fix metastore
skonto committed Apr 1, 2018
commit 1060405b11cc4da88e1a65b5d694fb7f9e7e18b5
@@ -123,7 +123,9 @@ class SparkHadoopUtil extends Logging {
*/
def addCredentials(conf: JobConf): Unit = {
val jobCreds = conf.getCredentials()
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
val userCreds = UserGroupInformation.getCurrentUser().getCredentials()
logInfo(s"Adding user credentials: ${SparkHadoopUtil.get.dumpTokens(userCreds)}")
Contributor:
Don't use dumpTokens in an info message. It's OK to add it as debug if you want, but I'm not really sure why you're adding this in this PR.

Contributor Author (@skonto, Apr 3, 2018):
OK, I saw it being used here, so I thought it would be helpful at the info level. The reason I added it is that I would like to see which credentials the HadoopRDD uses. Credentials are added in several different parts of the code base, and understanding what is happening can be confusing when looking at a job's logs. It is not clear to people that HadoopRDD fetches tokens on its own.

jobCreds.mergeAll(userCreds)
}

def addCurrentUserCredentials(creds: Credentials): Unit = {
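As a side note on the review comment above: a minimal sketch of the debug-level variant, assuming the same SparkHadoopUtil.dumpTokens helper stays available (an illustration, not the PR's code):

  def addCredentials(conf: JobConf): Unit = {
    val jobCreds = conf.getCredentials()
    val userCreds = UserGroupInformation.getCurrentUser().getCredentials()
    // Token contents are only interesting when debugging credential propagation,
    // so keep the dump out of the default info-level logs.
    logDebug(s"Adding user credentials: ${SparkHadoopUtil.get.dumpTokens(userCreds)}")
    jobCreds.mergeAll(userCreds)
  }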
@@ -611,7 +611,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
// An internal option used only for spark-shell to add user jars to repl's classloader,
// previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
// remote jars, so adding a new option to only specify local jars for spark-shell internally.
OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars")
OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars"),
OptionAssigner(args.proxyUser, MESOS, CLUSTER, confKey = "spark.mesos.proxyUser")
)

// In client mode, launch the application main class directly
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -197,6 +197,7 @@ class HadoopRDD[K, V](
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
logInfo(s"HadoopRDD credentials: ${SparkHadoopUtil.get.dumpTokens(jobConf.getCredentials)}")
Contributor:
Same here.

Contributor Author (@skonto, Apr 3, 2018):
Commented above.

val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
val inputSplits = if (ignoreEmptySplits) {
allInputSplits.filter(_.getLength > 0)
@@ -506,6 +506,10 @@ private[spark] class MesosClusterScheduler(
options ++= Seq("--class", desc.command.mainClass)
}

desc.conf.getOption("spark.mesos.proxyUser").foreach { v =>
options ++= Seq("--proxy-user", v)
Contributor:
This looks a little odd. How is a cluster-mode app run in Mesos?

Basically what I want to know:

  • which process starts the driver
  • what user that process runs as, and which user the driver process will run as
  • what Kerberos credentials it has and how they are managed

The gist is that running the Spark driver in client mode (which I think is how the driver in cluster mode is started eventually?) with a proxy user is a weird combination. It means the user code running in that driver has access to the credentials of the more privileged user - and could in turn use those to run anything as any other user...

In comparison, YARN + cluster mode + proxy user starts the YARN application as the proxy user. So the user code, which only runs in a YARN container, has no access to the privileged credentials, which only exist in the launcher.

Contributor Author (@skonto, Apr 3, 2018):
@vanzin

On DC/OS the Spark DC/OS CLI, which supports Kerberos and keytab paths, submits jobs directly to the Mesos REST API on the Mesos dispatcher side. The keytabs are stored in the DC/OS secret store before the job is launched, and they are mounted on the container before the container is launched.
Thus, the idea here is to store the superuser's keytab in the secret store, so that the Spark driver, which is eventually launched in client mode within the cluster, can log in to Kerberos and impersonate another user. The driver will start the Spark job's main as a proxy user (as usual) and will use the superuser credentials to impersonate the passed proxy user.
The driver is started by the Mesos dispatcher, and the dispatcher does not have any access to keytabs; it just passes the Spark config options along. The driver can access a secret only if it is allowed to (this is controlled by DC/OS labels).
The OS user used by the container depends on the setup, but that user should be the appropriate one.
Right now DC/OS has switched back to root for Spark containers; previously it used nobody, but users can customize the image to add their own users anyway.
You can change the user by passing --conf spark.mesos.driverEnv.SPARK_USER=.
Spark on Mesos uses that value, if defined, when setting up Mesos tasks for the executors, for example.
In containerized environments this adds extra headaches.
As a whole this is not that different from running in client mode, because in client mode I also need to access the superuser's credentials somehow. The whole setup is migrated inside a container, and then the environment (DC/OS) should make sure that the same user is consistent from the submit side all the way into the container and should enforce restrictions. That is the intention here.

The other option, to mimic YARN, would be for spark-submit to upload a locally created DT (to the secret store) in the cluster and for the driver to use that for impersonation. But this is not how things work on DC/OS deployments; as Michael mentioned in the past here: https://issues.apache.org/jira/browse/SPARK-16742, you may not even have access to the keytab on the launcher side. YARN has a different approach for that, as you mentioned.
At the end of the day, if impersonation also includes launching the driver container as the proxy user, then this can be supported with this PR by setting the appropriate user, but that container will still have access to the superuser's credentials, which is not OK. On the other hand, if impersonation for Mesos starts within Spark at the integration level with the Hadoop ecosystem (actually it starts with launching the user's main as that user), then I don't see how this PR differs from Mesos client mode with impersonation enabled.

Contributor Author:
@susanxhuynh feel free to add more on how DC/OS (mesos) handles the multi-tenancy story in general and user identity management.

Contributor:
> The driver will start the Spark job's main as a proxy user (as usual) and will use the superuser credentials to impersonate the passed proxy user.

That's a big problem because, as I said, that makes the super user credentials available to untrusted user code. How do you prevent the user's Spark app from using those credentials?

In YARN cluster mode the super user's credentials are never available to the user application. (In client mode they are, but really, if you're using --proxy-user in client mode you're missing the point, or I hope you know what you're doing.)

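For context on why client mode + proxy user puts the privileged credentials next to user code: a hedged sketch of how a proxy-user launch roughly works with Hadoop's UserGroupInformation API (the helper name runAsProxyUser is illustrative, not the actual SparkSubmit code):

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

// Illustrative helper: run `body` (the user's main) as `proxyUser`,
// impersonated by the currently logged-in, privileged Kerberos user.
def runAsProxyUser(proxyUser: String)(body: => Unit): Unit = {
  // This UGI holds the privileged TGT/tokens and lives in the same JVM as the user code.
  val realUser = UserGroupInformation.getCurrentUser()
  val proxyUgi = UserGroupInformation.createProxyUser(proxyUser, realUser)
  proxyUgi.doAs(new PrivilegedExceptionAction[Unit] {
    override def run(): Unit = body
  })
}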
Contributor:
Basically, you have a problem here that you need to solve.

You either have to require Kerberos creds on the launcher side, so you can upload DTs in cluster mode, or you need some level of separation between the code that launches the driver and the driver itself. The current system you have here is not secure at all - any user can just impersonate any other user, since they have access to the super user's credentials.

Contributor Author (@skonto, Apr 3, 2018):
> My problem here is that you're making spark-submit + proxy user + client mode the official way to run Spark on Mesos in cluster mode, and now you're basically exposing everyone to that security issue.

Yes, because the assumption was that client mode was safe. There is no warning about this, especially for end users, and I have just started looking into the Hadoop security part.
Anyway, good to know; I will get back with an update. Thanks for the comments - discussing via comments is hard sometimes...

Contributor:
> Yes, because the assumption was that client mode was safe. There is no warning about this

Could probably use something in the documentation - warnings printed to logs are easily ignored. Still, there are legitimate uses for client mode + proxy user, but I don't think this is one of them.

Contributor Author (@skonto, Apr 4, 2018):
What are the legitimate uses if it is not safe? Like knowing what your code does so it's OK, e.g. spark-shell?

> require the launcher to have a kerberos login, and send DTs to the application. a.k.a. what Spark-on-YARN does.
> in the code that launches the driver on the Mesos side, create the DTs in a safe context (e.g. not as part of the spark-submit invocation) and provide them to the Spark driver using the HADOOP_TOKEN_FILE_LOCATION env var.

For the first option: when I ran the Hive examples with YARN (EMR) in cluster mode (without a TGT) it did fail, but it was not Spark code that required any credentials (no Spark code does that; it's Hadoop code). I got:

(Mechanism level: Failed to find any Kerberos tgt)

Coming from these lines:

at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getYarnClusterMetrics(YarnClientImpl.java:550)
at org.apache.spark.deploy.yarn.Client$$anonfun$submitApplication$1.apply(Client.scala:156)

So I am not sure what you mean here, unless you mean doing that check and then, if it passes, creating the DTs at the launcher anyway.

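For reference, a minimal sketch of the second quoted option: a trusted launcher context obtains delegation tokens for the proxy user and writes only those tokens to a file the driver can read via HADOOP_TOKEN_FILE_LOCATION. The user name, renewer principal, and file path below are illustrative assumptions:

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Runs in a trusted context that already has the super user's Kerberos login.
val hadoopConf = new Configuration()
val creds = new Credentials()
val proxyUgi = UserGroupInformation.createProxyUser(
  "someUser", UserGroupInformation.getLoginUser())   // illustrative proxy user

// Fetch HDFS delegation tokens on behalf of the proxy user.
proxyUgi.doAs(new PrivilegedExceptionAction[Unit] {
  override def run(): Unit = {
    FileSystem.get(hadoopConf).addDelegationTokens("yarn", creds)   // renewer is illustrative
  }
})

// Write only the tokens to a location the driver container can see, then start the
// driver with HADOOP_TOKEN_FILE_LOCATION pointing at that file, so UserGroupInformation
// picks the tokens up at login without ever seeing the keytab or TGT.
creds.writeTokenStorageFile(new Path("/run/secrets/driver-tokens"), hadoopConf)   // illustrative path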
Contributor:
> What are the legitimate uses if it is not safe? Like knowing what your code does so it's OK

Yes. For example, you can run some trusted code as a less-privileged user, so that you don't accidentally do something stupid as a super user.

> (Mechanism level: Failed to find any Kerberos tgt)

That means you don't have any credentials (neither a TGT nor a DT). I don't know EMR, so I don't know how to use it (with or without Kerberos).

Contributor Author (@skonto, Apr 4, 2018):
> That means you don't have any credentials (neither a TGT nor a DT). I don't know EMR, so I don't know how to use it (with or without Kerberos).

Yes, that was the intention: to check where the Spark on YARN code fails when there is no TGT (I removed it with kdestroy). I am using it with Kerberos.

}

Contributor Author:
In cluster mode we need to pass the proxy user to the dispatcher.

desc.conf.getOption("spark.executor.memory").foreach { v =>
options ++= Seq("--executor-memory", v)
}
@@ -521,6 +525,7 @@ private[spark] class MesosClusterScheduler(

// --conf
val replicatedOptionsBlacklist = Set(
"spark.mesos.proxyUser",
"spark.jars", // Avoids duplicate classes in classpath
"spark.submit.deployMode", // this would be set to `cluster`, but we need client
"spark.master" // this contains the address of the dispatcher, not master
@@ -31,6 +31,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.SchedulerDriver

import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.internal.config
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
@@ -62,6 +63,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private lazy val hadoopDelegationTokenManager: MesosHadoopDelegationTokenManager =
new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)

private val isProxyUser = SparkHadoopUtil.get.isProxyUser(UserGroupInformation.getCurrentUser())

// Blacklist a slave after this many failures
private val MAX_SLAVE_FAILURES = 2

@@ -194,8 +197,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
super.start()

if (sc.deployMode == "client") {
if (isProxyUser) {
fetchHadoopDelegationTokens()
Contributor Author:
This will only happen if security is enabled and a proxy user exists.

}
launcherBackend.connect()
}

val startedBefore = IdHelper.startedBefore.getAndSet(true)

val suffix = if (startedBefore) {
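For context, the isProxyUser check used above presumably just inspects the UGI's authentication method; a sketch of what such a helper boils down to (an assumption, not a quote of SparkHadoopUtil):

import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod

// A UGI created via UserGroupInformation.createProxyUser reports PROXY as its
// authentication method, which is what the client-mode branch above keys off.
def isProxyUser(ugi: UserGroupInformation): Boolean =
  ugi.getAuthenticationMethod() == AuthenticationMethod.PROXY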
@@ -60,9 +60,13 @@ private[spark] class MesosHadoopDelegationTokenManager(

private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
try {
val creds = UserGroupInformation.getCurrentUser.getCredentials
val currentUser = UserGroupInformation.getCurrentUser()
val creds = currentUser.getCredentials
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
if (SparkHadoopUtil.get.isProxyUser(currentUser)) {
currentUser.addCredentials(creds)
}
logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
(SparkHadoopUtil.get.serialize(creds), SparkHadoopUtil.nextCredentialRenewalTime(rt, conf))
} catch {
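On the receiving side the serialized bytes are turned back into Credentials and merged into the current user's UGI; a hedged sketch of that counterpart, assuming a deserialize helper symmetric to the serialize call above:

import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil

// Sketch of the receiving end (e.g. an executor): rebuild the Credentials from the
// serialized bytes and make the tokens visible to the current user's UGI.
def applyTokens(tokenBytes: Array[Byte]): Unit = {
  val creds = SparkHadoopUtil.get.deserialize(tokenBytes)   // assumed counterpart of serialize(...)
  UserGroupInformation.getCurrentUser().addCredentials(creds)
}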