[SPARK-22372][core, yarn] Make cluster submission use SparkApplication. #19631
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,11 +25,10 @@ import javax.net.ssl._ | |
|
|
||
| import com.google.common.hash.HashCodes | ||
| import com.google.common.io.Files | ||
| import org.apache.hadoop.io.Text | ||
|
|
||
| import org.apache.spark.deploy.SparkHadoopUtil | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.internal.config._ | ||
| import org.apache.spark.launcher.SparkLauncher | ||
| import org.apache.spark.network.sasl.SecretKeyHolder | ||
| import org.apache.spark.util.Utils | ||
|
|
||
|
|
@@ -225,7 +224,6 @@ private[spark] class SecurityManager( | |
| setViewAclsGroups(sparkConf.get("spark.ui.view.acls.groups", "")); | ||
| setModifyAclsGroups(sparkConf.get("spark.modify.acls.groups", "")); | ||
|
|
||
| private val secretKey = generateSecretKey() | ||
| logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") + | ||
| "; ui acls " + (if (aclsOn) "enabled" else "disabled") + | ||
| "; users with view permissions: " + viewAcls.toString() + | ||
|
|
@@ -416,50 +414,6 @@ private[spark] class SecurityManager( | |
|
|
||
| def getIOEncryptionKey(): Option[Array[Byte]] = ioEncryptionKey | ||
|
|
||
| /** | ||
| * Generates or looks up the secret key. | ||
| * | ||
| * The way the key is stored depends on the Spark deployment mode. Yarn | ||
| * uses the Hadoop UGI. | ||
| * | ||
| * For non-Yarn deployments, If the config variable is not set | ||
| * we throw an exception. | ||
| */ | ||
| private def generateSecretKey(): String = { | ||
| if (!isAuthenticationEnabled) { | ||
| null | ||
| } else if (SparkHadoopUtil.get.isYarnMode) { | ||
| // In YARN mode, the secure cookie will be created by the driver and stashed in the | ||
| // user's credentials, where executors can get it. The check for an array of size 0 | ||
| // is because of the test code in YarnSparkHadoopUtilSuite. | ||
| val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(SECRET_LOOKUP_KEY) | ||
| if (secretKey == null || secretKey.length == 0) { | ||
| logDebug("generateSecretKey: yarn mode, secret key from credentials is null") | ||
| val rnd = new SecureRandom() | ||
| val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE | ||
| val secret = new Array[Byte](length) | ||
| rnd.nextBytes(secret) | ||
|
|
||
| val cookie = HashCodes.fromBytes(secret).toString() | ||
| SparkHadoopUtil.get.addSecretKeyToUserCredentials(SECRET_LOOKUP_KEY, cookie) | ||
| cookie | ||
| } else { | ||
| new Text(secretKey).toString | ||
| } | ||
| } else { | ||
| // user must have set spark.authenticate.secret config | ||
| // For Master/Worker, auth secret is in conf; for Executors, it is in env variable | ||
| Option(sparkConf.getenv(SecurityManager.ENV_AUTH_SECRET)) | ||
| .orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match { | ||
| case Some(value) => value | ||
| case None => | ||
| throw new IllegalArgumentException( | ||
| "Error: a secret key must be specified via the " + | ||
| SecurityManager.SPARK_AUTH_SECRET_CONF + " config") | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Check to see if Acls for the UI are enabled | ||
| * @return true if UI authentication is enabled, otherwise false | ||
|
|
@@ -542,7 +496,55 @@ private[spark] class SecurityManager( | |
| * Gets the secret key. | ||
| * @return the secret key as a String if authentication is enabled, otherwise returns null | ||
| */ | ||
| def getSecretKey(): String = secretKey | ||
| def getSecretKey(): String = { | ||
| if (isAuthenticationEnabled) { | ||
| Option(sparkConf.getenv(ENV_AUTH_SECRET)) | ||
| .orElse(sparkConf.getOption(SPARK_AUTH_SECRET_CONF)) | ||
| .getOrElse { | ||
| throw new IllegalArgumentException( | ||
| s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config") | ||
| } | ||
| } else { | ||
| null | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Initialize the configuration object held by this class for authentication. | ||
| * | ||
| * If authentication is disabled, do nothing. | ||
| * | ||
| * In YARN mode, generate a secret key and store it in the configuration object, setting it up to | ||
| * also be propagated to executors using an env variable. | ||
| * | ||
| * In other modes, assert that the auth secret is set in the configuration. | ||
| */ | ||
| def initializeAuth(): Unit = { | ||
| if (!sparkConf.get(NETWORK_AUTH_ENABLED)) { | ||
| return | ||
| } | ||
|
|
||
| if (sparkConf.get(SparkLauncher.SPARK_MASTER, null) != "yarn") { | ||
| require(sparkConf.contains(SPARK_AUTH_SECRET_CONF), | ||
| s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config.") | ||
| return | ||
| } | ||
|
|
||
| // In YARN, force creation of a new secret if this is client mode. This ensures each | ||
|
||
| // YARN app uses a different secret. For cluster mode, this relies on YARN's client to | ||
| // not propagate the secret to the driver, which will then generate a new one. | ||
| val deployMode = sparkConf.get(SparkLauncher.DEPLOY_MODE, "client") | ||
| if (!sparkConf.contains(SPARK_AUTH_SECRET_CONF) || deployMode == "client") { | ||
| val rnd = new SecureRandom() | ||
| val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE | ||
| val secretBytes = new Array[Byte](length) | ||
| rnd.nextBytes(secretBytes) | ||
|
|
||
| val secret = HashCodes.fromBytes(secretBytes).toString() | ||
| sparkConf.set(SPARK_AUTH_SECRET_CONF, secret) | ||
| sparkConf.setExecutorEnv(ENV_AUTH_SECRET, secret) | ||
| } | ||
| } | ||
|
|
||
| // Default SecurityManager only has a single secret key, so ignore appId. | ||
| override def getSaslUser(appId: String): String = getSaslUser() | ||
|
|
@@ -551,13 +553,10 @@ private[spark] class SecurityManager( | |
|
|
||
| private[spark] object SecurityManager { | ||
|
|
||
| val SPARK_AUTH_CONF: String = "spark.authenticate" | ||
| val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret" | ||
| val SPARK_AUTH_CONF = NETWORK_AUTH_ENABLED.key | ||
| val SPARK_AUTH_SECRET_CONF = "spark.authenticate.secret" | ||
|
Contributor
I think we can also make this a
Contributor Author
In a separate change, maybe. There are too many references to this right now; it would be really noisy here. |
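If the truncated suggestion above is about turning the secret key into a typed config entry (an assumption, since the comment is cut off), a minimal sketch using Spark's ConfigBuilder might look like this:

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical sketch, not part of this PR: defining the auth secret as a
// ConfigEntry instead of a bare string key.
private[spark] object AuthConfigSketch {
  // Optional, because the secret is only required when spark.authenticate is on.
  val AUTH_SECRET = ConfigBuilder("spark.authenticate.secret")
    .doc("Secret key used for authentication between Spark processes.")
    .stringConf
    .createOptional
}
```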
||
| // This is used to set auth secret to an executor's env variable. It should have the same | ||
| // value as SPARK_AUTH_SECRET_CONF set in SparkConf | ||
| val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET" | ||
|
|
||
| // key used to store the spark secret in the Hadoop UGI | ||
| val SECRET_LOOKUP_KEY = "sparkCookie" | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -412,8 +412,6 @@ class SparkContext(config: SparkConf) extends Logging { | |
| } | ||
| } | ||
|
|
||
| if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true") | ||
|
Contributor
Not sure why this is not required anymore?
Contributor Author
This change is removing all references to |
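For reference, the pattern this PR uses in place of the old SPARK_YARN_MODE system property is to consult the master setting carried in SparkConf (as in initializeAuth above). A rough, illustrative helper in that style follows; the name and placement are hypothetical.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.launcher.SparkLauncher

// Hypothetical helper: determine YARN mode from the conf rather than from
// the removed SPARK_YARN_MODE system property.
def runningOnYarn(conf: SparkConf): Boolean =
  conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn"
```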
||
|
|
||
| _listenerBus = new LiveListenerBus(_conf) | ||
|
|
||
| // "_jobProgressListener" should be set up before creating SparkEnv because when creating | ||
|
|
@@ -1943,7 +1941,6 @@ class SparkContext(config: SparkConf) extends Logging { | |
| // `SparkContext` is stopped. | ||
| localProperties.remove() | ||
| // Unset YARN mode system env variable, to allow switching between cluster types. | ||
| System.clearProperty("SPARK_YARN_MODE") | ||
| SparkContext.clearActiveContext() | ||
| logInfo("Successfully stopped SparkContext") | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -92,6 +92,11 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
|
|
||
| private val CLASS_NOT_FOUND_EXIT_STATUS = 101 | ||
|
|
||
| // Following constants are visible for testing. | ||
| private[deploy] val YARN_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication" | ||
| private[deploy] val REST_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName() | ||
| private[deploy] val STANDALONE_SUBMIT_CLASS = classOf[ClientApp].getName() | ||
|
||
|
|
||
| // scalastyle:off println | ||
| private[spark] def printVersionAndExit(): Unit = { | ||
| printStream.println("""Welcome to | ||
|
|
@@ -281,7 +286,7 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
| } | ||
|
|
||
| // Make sure YARN is included in our build if we're trying to use it | ||
| if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) { | ||
| if (!Utils.classIsLoadable(YARN_SUBMIT_CLASS) && !Utils.isTesting) { | ||
| printErrorAndExit( | ||
| "Could not load YARN classes. " + | ||
| "This copy of Spark may not have been compiled with YARN support.") | ||
|
|
@@ -365,22 +370,21 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
|
|
||
| // This security manager will not need an auth secret, but set a dummy value in case | ||
| // spark.authenticate is enabled, otherwise an exception is thrown. | ||
|
||
| lazy val downloadConf = sparkConf.clone().set(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused") | ||
| lazy val secMgr = new SecurityManager(downloadConf) | ||
| lazy val secMgr = new SecurityManager(sparkConf) | ||
|
|
||
| // In client mode, download remote files. | ||
| var localPrimaryResource: String = null | ||
| var localJars: String = null | ||
| var localPyFiles: String = null | ||
| if (deployMode == CLIENT) { | ||
| localPrimaryResource = Option(args.primaryResource).map { | ||
| downloadFile(_, targetDir, downloadConf, hadoopConf, secMgr) | ||
| downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr) | ||
| }.orNull | ||
| localJars = Option(args.jars).map { | ||
| downloadFileList(_, targetDir, downloadConf, hadoopConf, secMgr) | ||
| downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr) | ||
| }.orNull | ||
| localPyFiles = Option(args.pyFiles).map { | ||
| downloadFileList(_, targetDir, downloadConf, hadoopConf, secMgr) | ||
| downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr) | ||
| }.orNull | ||
| } | ||
|
|
||
|
|
@@ -391,8 +395,6 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
| // For yarn client mode, since we already download them with above code, so we only need to | ||
| // figure out the local path and replace the remote one. | ||
| if (clusterManager == YARN) { | ||
| sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused") | ||
| val secMgr = new SecurityManager(sparkConf) | ||
| val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES) | ||
|
|
||
| def shouldDownload(scheme: String): Boolean = { | ||
|
|
@@ -409,7 +411,7 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
| if (file.exists()) { | ||
| file.toURI.toString | ||
| } else { | ||
| downloadFile(resource, targetDir, downloadConf, hadoopConf, secMgr) | ||
| downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr) | ||
| } | ||
| case _ => uri.toString | ||
| } | ||
|
|
@@ -634,11 +636,11 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
| // All Spark parameters are expected to be passed to the client through system properties. | ||
| if (args.isStandaloneCluster) { | ||
| if (args.useRest) { | ||
| childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient" | ||
| childMainClass = REST_SUBMIT_CLASS | ||
| childArgs += (args.primaryResource, args.mainClass) | ||
| } else { | ||
| // In legacy standalone cluster mode, use Client as a wrapper around the user class | ||
| childMainClass = "org.apache.spark.deploy.Client" | ||
| childMainClass = STANDALONE_SUBMIT_CLASS | ||
| if (args.supervise) { childArgs += "--supervise" } | ||
| Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) } | ||
| Option(args.driverCores).foreach { c => childArgs += ("--cores", c) } | ||
|
|
@@ -663,7 +665,7 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
|
|
||
| // In yarn-cluster mode, use yarn.Client as a wrapper around the user class | ||
| if (isYarnCluster) { | ||
| childMainClass = "org.apache.spark.deploy.yarn.Client" | ||
| childMainClass = YARN_SUBMIT_CLASS | ||
| if (args.isPython) { | ||
| childArgs += ("--primary-py-file", args.primaryResource) | ||
| childArgs += ("--class", "org.apache.spark.deploy.PythonRunner") | ||
|
|
@@ -684,7 +686,7 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
|
|
||
| if (isMesosCluster) { | ||
| assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API") | ||
| childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient" | ||
| childMainClass = REST_SUBMIT_CLASS | ||
| if (args.isPython) { | ||
| // Second argument is main class | ||
| childArgs += (args.primaryResource, "") | ||
|
|
||
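Per the PR title, cluster submission now goes through SparkApplication: the constants used above (YARN_SUBMIT_CLASS, REST_SUBMIT_CLASS, STANDALONE_SUBMIT_CLASS) name entry points that SparkSubmit can instantiate and start in-process instead of reflectively invoking a static main(). The exact trait is not visible in these hunks, so the sketch below is an assumption about its shape rather than the actual interface.

```scala
import org.apache.spark.SparkConf

// Sketch only: the real SparkApplication trait lives in org.apache.spark.deploy
// and may differ. The idea is that each submission path (YARN cluster client,
// REST submission client, legacy standalone Client) exposes one entry point
// that receives the child arguments plus the SparkConf assembled by SparkSubmit.
trait SparkApplicationSketch {
  def start(args: Array[String], conf: SparkConf): Unit
}

// Hypothetical implementation showing how a cluster-mode wrapper would plug in.
class ExampleClusterApp extends SparkApplicationSketch {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    // A real implementation would submit the user's app to the cluster manager.
    println(s"Submitting ${args.mkString(" ")} to ${conf.get("spark.master")}")
  }
}
```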
It seems that now, for the driver, we are using the conf (SPARK_AUTH_SECRET_CONF) just as a holding point for YARN. To me this introduces a bit of risk that it more easily gets out to the user. We are filtering it out from the Spark conf written for executors, but that seems more brittle than if it's just not in there.
I realize this makes the code a bit more common with the other modes, but the other modes aren't really secure. I would almost rather keep the in-memory secretKey variable as storage on YARN.
I think this also makes the secret key available for the user to get on the driver side (sc.getConf.get..), which I think would be better to hide.
FWIW, in Mesos we are planning on using the Secrets primitives to distribute ENV_AUTH_SECRET. This way Mesos and YARN can both use the same secret-generation code and only differ in the distribution of the secret. SPARK_AUTH_SECRET_CONF is already somewhat awkward because it has to be in the config.
I can add a different, internal config for this if re-using SPARK_AUTH_SECRET_CONF is confusing. But I'm not too concerned about exposing this to the user code running the application; they can just as easily get that info from the UGI currently. Spark already redacts this kind of information when writing it to things like the event log, which would be one place where it might leak out.
I changed things and now they work pretty much as before. It would be good to separate secret generation from distribution, but I'd rather do that separately.
I agree SPARK_AUTH_SECRET_CONF is awkward and not really secure; when I initially did this, that is what was requested by other committers, since standalone and Mesos needed more security work around it anyway.
I don't fully follow how the MesosSecretConfig is going to be used. Are these just regular Spark configs passed around, or distributed through Mesos somehow?
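On the redaction point mentioned above: the protection relied on here is that secret-like keys are masked before the conf is written anywhere user-visible, such as the event log. The snippet below is only a toy illustration of that pattern, not Spark's actual redaction code, and the key-matching regex is an assumption.

```scala
// Toy illustration of masking secret-like config entries before they are
// logged or persisted. Spark's real redaction logic is configurable and
// lives elsewhere; this only demonstrates the idea.
def redact(entries: Seq[(String, String)],
           pattern: scala.util.matching.Regex = "(?i)secret|password|token".r)
    : Seq[(String, String)] =
  entries.map { case (key, value) =>
    if (pattern.findFirstIn(key).isDefined) key -> "*********(redacted)" else key -> value
  }

// Example: the auth secret is masked, ordinary settings pass through.
redact(Seq("spark.authenticate.secret" -> "s3cr3t", "spark.master" -> "yarn"))
  .foreach { case (k, v) => println(s"$k=$v") }
```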