[SPARK-22646] [Submission] Spark on Kubernetes - basic submission client #19717
Changes from 1 commit
File: SparkSubmit.scala

```diff
@@ -76,7 +76,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
   private val STANDALONE = 2
   private val MESOS = 4
   private val LOCAL = 8
-  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
+  private val KUBERNETES = 16
+  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES

   // Deploy modes
   private val CLIENT = 1
```
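A quick orientation note (not part of the diff): the cluster-manager constants are bit flags, so KUBERNETES takes the next free power of two, and sets like ALL_CLUSTER_MGRS are plain bitwise ORs. A minimal sketch of a membership check:

```scala
// Bit-flag cluster managers, mirroring the constants above (illustrative standalone snippet).
val YARN = 1; val STANDALONE = 2; val MESOS = 4; val LOCAL = 8; val KUBERNETES = 16

val supported = STANDALONE | YARN | KUBERNETES
assert((supported & KUBERNETES) != 0) // Kubernetes is in the set
assert((supported & MESOS) == 0)      // Mesos is not
```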
```diff
@@ -257,6 +258,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
         YARN
       case m if m.startsWith("spark") => STANDALONE
       case m if m.startsWith("mesos") => MESOS
+      case m if m.startsWith("k8s") => KUBERNETES
       case m if m.startsWith("local") => LOCAL
       case _ =>
         printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
```
```diff
@@ -302,6 +304,12 @@ object SparkSubmit extends CommandLineUtils with Logging {
       case (STANDALONE, CLUSTER) if args.isR =>
         printErrorAndExit("Cluster deploy mode is currently not supported for R " +
           "applications on standalone clusters.")
+      case (KUBERNETES, CLIENT) =>
+        printErrorAndExit("Client mode is currently not supported for Kubernetes.")
+      case (KUBERNETES, _) if args.isPython =>
+        printErrorAndExit("Python applications are currently not supported for Kubernetes.")
+      case (KUBERNETES, _) if args.isR =>
+        printErrorAndExit("R applications are currently not supported for Kubernetes.")
       case (LOCAL, CLUSTER) =>
         printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
       case (_, CLUSTER) if isShell(args.primaryResource) =>
```
```diff
@@ -322,6 +330,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
     val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
+    val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER

     if (!isMesosCluster && !isStandAloneCluster) {
       // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
```
```diff
@@ -556,20 +565,24 @@ object SparkSubmit extends CommandLineUtils with Logging {
       OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"),
       OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),

+      // Kubernetes only
+      OptionAssigner(args.kubernetesNamespace, KUBERNETES, ALL_DEPLOY_MODES,
+        confKey = "spark.kubernetes.namespace"),
+
       // Other options
-      OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
+      OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
         confKey = "spark.executor.cores"),
-      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
+      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
         confKey = "spark.executor.memory"),
-      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
+      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
         confKey = "spark.cores.max"),
       OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
         confKey = "spark.files"),
       OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
       OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, confKey = "spark.jars"),
-      OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
+      OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
         confKey = "spark.driver.memory"),
-      OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
+      OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
         confKey = "spark.driver.cores"),
       OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
         confKey = "spark.driver.supervise"),
```
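For context, here is a minimal sketch of how SparkSubmit consumes these rows; the real loop lives outside this hunk, so the exact shape below is an assumption:

```scala
// An option contributes its conf key only when both bit masks match the current
// submission (sketch, not the verbatim Spark loop).
case class OptionAssigner(value: String, clusterManager: Int, deployMode: Int, confKey: String)

def applyOptions(
    options: Seq[OptionAssigner],
    clusterManager: Int,
    deployMode: Int,
    sparkConf: scala.collection.mutable.Map[String, String]): Unit = {
  for (opt <- options) {
    if (opt.value != null &&
        (deployMode & opt.deployMode) != 0 &&
        (clusterManager & opt.clusterManager) != 0 &&
        opt.confKey != null) {
      sparkConf(opt.confKey) = opt.value
    }
  }
}
```

With the rows above, a Kubernetes cluster-mode submission that passes --executor-memory 5g therefore lands as spark.executor.memory=5g, which is exactly what the new suite test asserts.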
```diff
@@ -703,6 +716,18 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }
     }

+    if (isKubernetesCluster) {
+      childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
+      childArgs ++= Array("--primary-java-resource", args.primaryResource)
+      childArgs ++= Array("--main-class", args.mainClass)
+      if (args.childArgs != null) {
+        args.childArgs.foreach { arg =>
+          childArgs += "--arg"
+          childArgs += arg
+        }
+      }
+    }
+
     // Load any properties specified through --conf and the default properties file
     for ((k, v) <- args.sparkProperties) {
       sparkConf.setIfMissing(k, v)
```
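To make the hand-off concrete: for a plain jar submission, the block above produces a child invocation like the following (values taken from the "handles k8s cluster mode" test later in this diff):

```scala
// spark-submit --deploy-mode cluster --master k8s://host:port \
//   --class org.SomeClass /home/thejar.jar arg1
// yields:
//   childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
//   childArgs      = Seq("--primary-java-resource", "file:/home/thejar.jar",
//                        "--main-class", "org.SomeClass",
//                        "--arg", "arg1")
```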
File: SparkSubmitArguments.scala

```diff
@@ -81,6 +81,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var submissionToRequestStatusFor: String = null
   var useRest: Boolean = true // used internally

+  // Kubernetes only
+  var kubernetesNamespace: String = null
+
   /** Default properties present in the currently defined defaults file. */
   lazy val defaultSparkProperties: HashMap[String, String] = {
     val defaultProperties = new HashMap[String, String]()
```
```diff
@@ -199,6 +202,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
     principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull

+    kubernetesNamespace = Option(kubernetesNamespace)
+      .orElse(sparkProperties.get("spark.kubernetes.namespace"))
+      .orNull
+
     // Try to set main class from JAR if no --class argument is given
     if (mainClass == null && !isPython && !isR && primaryResource != null) {
       val uri = new URI(primaryResource)
```
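So the explicit flag wins, then the Spark property, else the field stays null; an illustrative trace with made-up values:

```scala
// --kubernetes-namespace foo                        => kubernetesNamespace == "foo"
// --conf spark.kubernetes.namespace=bar (no flag)   => kubernetesNamespace == "bar"
// neither                                           => kubernetesNamespace == null
// (a "default" fallback appears to be applied separately by the
//  createWithDefault("default") entry in Config.scala, shown later in this diff)
```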
```diff
@@ -454,6 +461,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case KEYTAB =>
         keytab = value

+      case KUBERNETES_NAMESPACE =>
+        kubernetesNamespace = value
+
       case HELP =>
         printUsageAndExit(0)
```
```diff
@@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         |  the node running the Application Master via the Secure
         |  Distributed Cache, for renewing the login tickets and the
         |  delegation tokens periodically.
+        |
+        | Kubernetes only:
+        |  --kubernetes-namespace NS   The namespace in the Kubernetes cluster within which the
+        |                              application must be launched. The namespace must already
+        |                              exist in the cluster. (Default: default).
       """.stripMargin
     )
```
File: SparkSubmitSuite.scala

```diff
@@ -388,6 +388,32 @@ class SparkSubmitSuite
     conf.get("spark.ui.enabled") should be ("false")
   }

+  test("handles k8s cluster mode") {
+    val clArgs = Seq(
+      "--deploy-mode", "cluster",
+      "--master", "k8s://host:port",
+      "--executor-memory", "5g",
+      "--class", "org.SomeClass",
+      "--kubernetes-namespace", "foo",
+      "--driver-memory", "4g",
+      "--conf", "spark.kubernetes.driver.docker.image=bar",
+      "/home/thejar.jar",
+      "arg1")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)
+
+    val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap
+    childArgsMap.get("--primary-java-resource") should be (Some("file:/home/thejar.jar"))
+    childArgsMap.get("--main-class") should be (Some("org.SomeClass"))
+    childArgsMap.get("--arg") should be (Some("arg1"))
+    mainClass should be ("org.apache.spark.deploy.k8s.submit.Client")
+    classpath should have length (0)
+    conf.get("spark.executor.memory") should be ("5g")
+    conf.get("spark.driver.memory") should be ("4g")
+    conf.get("spark.kubernetes.namespace") should be ("foo")
+    conf.get("spark.kubernetes.driver.docker.image") should be ("bar")
+  }
+
   test("handles confs with flag equivalents") {
     val clArgs = Seq(
       "--deploy-mode", "cluster",
```

Inline review thread on this test:
Contributor: Should we also test the arg "--kubernetes-namespace"?
Author: Done.
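A side note on the assertion helper: grouped(2) pairs each flag with the value that follows it, which is what lets the test treat childArgs as a map. A standalone illustration:

```scala
// Pairing flag/value sequences into a Map (plain Scala collections).
val pairs = Seq("--main-class", "org.SomeClass", "--arg", "arg1")
val asMap = pairs.grouped(2).map(a => a(0) -> a(1)).toMap
assert(asMap == Map("--main-class" -> "org.SomeClass", "--arg" -> "arg1"))
```

Note that repeated flags (for example several --arg entries) would collapse to the last occurrence in the map; that is fine here because the test passes a single application argument.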
File: Config.scala (package org.apache.spark.deploy.k8s)

```diff
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.deploy.k8s

+import java.util.concurrent.TimeUnit
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigBuilder
 import org.apache.spark.network.util.ByteUnit
```
```diff
@@ -30,6 +32,12 @@ private[spark] object Config extends Logging {
       .stringConf
       .createWithDefault("default")

+  val DRIVER_DOCKER_IMAGE =
+    ConfigBuilder("spark.kubernetes.driver.docker.image")
+      .doc("Docker image to use for the driver. Specify this using the standard Docker tag format.")
+      .stringConf
+      .createOptional
+
   val EXECUTOR_DOCKER_IMAGE =
     ConfigBuilder("spark.kubernetes.executor.docker.image")
       .doc("Docker image to use for the executors. Specify this using the standard Docker tag " +
```
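As a usage sketch, assuming Spark's internal SparkConf.get(entry) accessor for typed config entries: optional entries like this one read back as an Option:

```scala
// None unless the user sets spark.kubernetes.driver.docker.image explicitly
// (the error message below is illustrative, not from this PR).
val maybeImage: Option[String] = sparkConf.get(DRIVER_DOCKER_IMAGE)
val image = maybeImage.getOrElse(
  throw new IllegalArgumentException("Driver Docker image must be specified"))
```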
```diff
@@ -44,9 +52,9 @@ private[spark] object Config extends Logging {
       .checkValues(Set("Always", "Never", "IfNotPresent"))
       .createWithDefault("IfNotPresent")

-  val APISERVER_AUTH_DRIVER_CONF_PREFIX =
+  val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
     "spark.kubernetes.authenticate.driver"
-  val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+  val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
     "spark.kubernetes.authenticate.driver.mounted"
   val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
   val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
```
```diff
@@ -55,14 +63,35 @@ private[spark] object Config extends Logging {
   val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"

   val KUBERNETES_SERVICE_ACCOUNT_NAME =
-    ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
-      .doc("Service account that is used when running the driver pod. The driver pod uses " +
-        "this service account when requesting executor pods from the API server. If specific " +
-        "credentials are given for the driver pod to use, the driver will favor " +
-        "using those credentials instead.")
+    ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+      .doc("Service account that is used when running the driver pod. The driver pod uses" +
+        " this service account when requesting executor pods from the API server. If specific" +
+        " credentials are given for the driver pod to use, the driver will favor" +
+        " using those credentials instead.")
       .stringConf
       .createOptional

+  val KUBERNETES_DRIVER_LIMIT_CORES =
+    ConfigBuilder("spark.kubernetes.driver.limit.cores")
+      .doc("Specify the hard cpu limit for the driver pod")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+    ConfigBuilder("spark.kubernetes.executor.limit.cores")
+      .doc("Specify the hard cpu limit for a single executor pod")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
+    ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
+      .doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the" +
+        " driver submission server. This is memory that accounts for things like VM overheads," +
+        " interned strings, other native overheads, etc. This tends to grow with the driver's" +
+        " memory size (typically 6-10%).")
+      .bytesConf(ByteUnit.MiB)
+      .createOptional
+
   // Note that while we set a default for this when we start up the
   // scheduler, the specific default value is dynamically determined
   // based on the executor memory.
```
```diff
@@ -74,9 +103,6 @@ private[spark] object Config extends Logging {
       .bytesConf(ByteUnit.MiB)
       .createOptional

-  val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
-  val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
-
   val KUBERNETES_DRIVER_POD_NAME =
     ConfigBuilder("spark.kubernetes.driver.pod.name")
       .doc("Name of the driver pod.")
```
```diff
@@ -104,12 +130,6 @@ private[spark] object Config extends Logging {
       .checkValue(value => value > 0, "Allocation batch delay should be a positive integer")
       .createWithDefault(1)

-  val KUBERNETES_EXECUTOR_LIMIT_CORES =
-    ConfigBuilder("spark.kubernetes.executor.limit.cores")
-      .doc("Specify the hard cpu limit for a single executor pod")
-      .stringConf
-      .createOptional
-
   val KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS =
     ConfigBuilder("spark.kubernetes.executor.lostCheck.maxAttempts")
      .doc("Maximum number of attempts allowed for checking the reason of an executor loss " +
```
```diff
@@ -119,5 +139,45 @@ private[spark] object Config extends Logging {
         "must be a positive integer")
       .createWithDefault(10)

+  val WAIT_FOR_APP_COMPLETION =
+    ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
+      .doc("In cluster mode, whether to wait for the application to finish before exiting the" +
+        " launcher process.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val REPORT_INTERVAL =
+    ConfigBuilder("spark.kubernetes.report.interval")
+      .doc("Interval between reports of the current app status in cluster mode.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1s")
+
+  val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
+    "spark.kubernetes.authenticate.submission"
+
+  val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
+
+  val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
+  val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
+
+  val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
+  val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
+
+  val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
+
+  def getK8sMasterUrl(rawMasterString: String): String = {
+    if (!rawMasterString.startsWith("k8s://")) {
+      throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
+    }
+    val masterWithoutK8sPrefix = rawMasterString.replaceFirst("k8s://", "")
+    if (masterWithoutK8sPrefix.startsWith("http://")
+        || masterWithoutK8sPrefix.startsWith("https://")) {
+      masterWithoutK8sPrefix
+    } else {
+      val resolvedURL = s"https://$masterWithoutK8sPrefix"
+      logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved" +
+        s" URL is $resolvedURL")
+      resolvedURL
+    }
+  }
 }
```
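A usage sketch of the new helper, with made-up endpoints; the results follow directly from the branches above:

```scala
Config.getK8sMasterUrl("k8s://https://apiserver:6443") // "https://apiserver:6443" (scheme preserved)
Config.getK8sMasterUrl("k8s://apiserver:6443")         // "https://apiserver:6443" (https assumed, logged)
Config.getK8sMasterUrl("k8s://http://apiserver:8080")  // "http://apiserver:8080"
Config.getK8sMasterUrl("spark://host:7077")            // throws IllegalArgumentException
```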
Inline review on the printErrorAndExit message in SparkSubmit.scala, suggesting the new manager be listed:
Contributor: Master must either be yarn or start with spark, mesos, k8s, local
Author: Done.