-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-23984][K8S] Initial Python Bindings for PySpark on K8s #21092
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
fb5b9ed
b7b3db0
98cef8c
dc670dc
eabe4b9
8d3debb
91e2a2c
5761ee8
98cc044
678d381
bf738dc
c59068d
0344f90
306f3ed
f2fc53e
6f66d60
914ff75
d400607
72953a3
7bedeb6
1801e96
24a704e
6a6d69d
ab92913
a61d897
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -284,8 +284,6 @@ private[spark] class SparkSubmit extends Logging { | |
| case (STANDALONE, CLUSTER) if args.isR => | ||
| error("Cluster deploy mode is currently not supported for R " + | ||
| "applications on standalone clusters.") | ||
| case (KUBERNETES, _) if args.isPython => | ||
| error("Python applications are currently not supported for Kubernetes.") | ||
| case (KUBERNETES, _) if args.isR => | ||
| error("R applications are currently not supported for Kubernetes.") | ||
| case (LOCAL, CLUSTER) => | ||
|
|
@@ -695,9 +693,17 @@ private[spark] class SparkSubmit extends Logging { | |
| if (isKubernetesCluster) { | ||
| childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS | ||
| if (args.primaryResource != SparkLauncher.NO_RESOURCE) { | ||
| childArgs ++= Array("--primary-java-resource", args.primaryResource) | ||
| if (args.isPython) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic appears to be duplicated from YARN; would it make sense to factor this out into a common function?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We chatted about this off-line, and while it's close, it's not exactly the same, so we can deal with minor parts of duplication for now. |
||
| childArgs ++= Array("--primary-py-file", args.primaryResource) | ||
| childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner") | ||
| if (args.pyFiles != null) { | ||
| childArgs ++= Array("--other-py-files", args.pyFiles) | ||
| } | ||
| } else { | ||
| childArgs ++= Array("--primary-java-resource", args.primaryResource) | ||
| childArgs ++= Array("--main-class", args.mainClass) | ||
| } | ||
| } | ||
| childArgs ++= Array("--main-class", args.mainClass) | ||
| if (args.childArgs != null) { | ||
| args.childArgs.foreach { arg => | ||
| childArgs += ("--arg", arg) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -117,6 +117,28 @@ private[spark] object Config extends Logging { | |
| .stringConf | ||
| .createWithDefault("spark") | ||
|
|
||
| val KUBERNETES_PYSPARK_PY_FILES = | ||
| ConfigBuilder("spark.kubernetes.python.pyFiles") | ||
| .doc("The PyFiles that are distributed via client arguments") | ||
| .internal() | ||
| .stringConf | ||
| .createOptional | ||
|
|
||
| val KUBERNETES_PYSPARK_MAIN_APP_RESOURCE = | ||
| ConfigBuilder("spark.kubernetes.python.mainAppResource") | ||
| .doc("The main app resource for pyspark jobs") | ||
| .internal() | ||
| .stringConf | ||
| .createOptional | ||
|
|
||
| val KUBERNETES_PYSPARK_APP_ARGS = | ||
| ConfigBuilder("spark.kubernetes.python.appArgs") | ||
| .doc("The app arguments for PySpark Jobs") | ||
| .internal() | ||
| .stringConf | ||
| .createOptional | ||
|
|
||
|
|
||
| val KUBERNETES_ALLOCATION_BATCH_SIZE = | ||
| ConfigBuilder("spark.kubernetes.allocation.batch.size") | ||
| .doc("Number of pods to launch at once in each round of executor allocation.") | ||
|
|
@@ -154,6 +176,13 @@ private[spark] object Config extends Logging { | |
| .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.") | ||
| .createWithDefaultString("1s") | ||
|
|
||
| val MEMORY_OVERHEAD_FACTOR = | ||
| ConfigBuilder("spark.kubernetes.memoryOverheadFactor") | ||
| .doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " + | ||
| "which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 to this thanks for adding this. |
||
| .doubleConf | ||
| .createWithDefault(0.10) | ||
|
|
||
| val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX = | ||
| "spark.kubernetes.authenticate.submission" | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,14 +16,17 @@ | |
| */ | ||
| package org.apache.spark.deploy.k8s | ||
|
|
||
| import scala.collection.mutable | ||
|
|
||
| import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod} | ||
|
|
||
| import org.apache.spark.SparkConf | ||
| import org.apache.spark.deploy.k8s.Config._ | ||
| import org.apache.spark.deploy.k8s.Constants._ | ||
| import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource} | ||
| import org.apache.spark.deploy.k8s.submit._ | ||
| import org.apache.spark.internal.config.ConfigEntry | ||
|
|
||
|
|
||
| private[spark] sealed trait KubernetesRoleSpecificConf | ||
|
|
||
| /* | ||
|
|
@@ -54,7 +57,8 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( | |
| roleLabels: Map[String, String], | ||
| roleAnnotations: Map[String, String], | ||
| roleSecretNamesToMountPaths: Map[String, String], | ||
| roleEnvs: Map[String, String]) { | ||
| roleEnvs: Map[String, String], | ||
| sparkFiles: Seq[String]) { | ||
|
|
||
| def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE) | ||
|
|
||
|
|
@@ -63,10 +67,14 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( | |
| .map(str => str.split(",").toSeq) | ||
| .getOrElse(Seq.empty[String]) | ||
|
|
||
| def sparkFiles(): Seq[String] = sparkConf | ||
| .getOption("spark.files") | ||
| .map(str => str.split(",").toSeq) | ||
| .getOrElse(Seq.empty[String]) | ||
| def pyFiles(): Option[String] = sparkConf | ||
| .get(KUBERNETES_PYSPARK_PY_FILES) | ||
|
|
||
| def pySparkMainResource(): Option[String] = sparkConf | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems redundant with the driver specific spark conf's MainAppResource. Perhaps remove the need to specify this thing twice?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I need to parse out the MainAppResource (which I thought we should be doing only once); as such, I thought it would be cleaner to do this. |
||
| .get(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE) | ||
|
|
||
| def pySparkAppArgs(): Option[String] = sparkConf | ||
| .get(KUBERNETES_PYSPARK_APP_ARGS) | ||
|
|
||
| def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY) | ||
|
|
||
|
|
@@ -101,17 +109,29 @@ private[spark] object KubernetesConf { | |
| appId: String, | ||
| mainAppResource: Option[MainAppResource], | ||
| mainClass: String, | ||
| appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = { | ||
| appArgs: Array[String], | ||
| maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = { | ||
| val sparkConfWithMainAppJar = sparkConf.clone() | ||
| val additionalFiles = mutable.ArrayBuffer.empty[String] | ||
| mainAppResource.foreach { | ||
| case JavaMainAppResource(res) => | ||
| val previousJars = sparkConf | ||
| .getOption("spark.jars") | ||
| .map(_.split(",")) | ||
| .getOrElse(Array.empty) | ||
| if (!previousJars.contains(res)) { | ||
| sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) | ||
| } | ||
| case JavaMainAppResource(res) => | ||
| val previousJars = sparkConf | ||
| .getOption("spark.jars") | ||
| .map(_.split(",")) | ||
| .getOrElse(Array.empty) | ||
| if (!previousJars.contains(res)) { | ||
| sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) | ||
| } | ||
| case nonJVM: NonJVMResource => | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why can't we just match
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because the R step should have the same amount of default MemoryOverhead. As should all NonJVMResources.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe worth a comment then? Especially since R support isn't integrated right now it's perhaps not super clear to folks why this is being done.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe add a comment since R isn't currently integrated could be a bit difficult to infer? |
||
| nonJVM match { | ||
| case PythonMainAppResource(res) => | ||
| additionalFiles += res | ||
| maybePyFiles.foreach{maybePyFiles => | ||
| additionalFiles.appendAll(maybePyFiles.split(","))} | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not for this PR or JIRA, but for later maybe we should normalize our parsing of input files in a way which allows escape characters and share the logic between Yarn/K8s/Mesos/standalone. What do y'all think? Possible follow up JIRA: https://issues.apache.org/jira/browse/SPARK-24184 |
||
| sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res) | ||
| sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_APP_ARGS, appArgs.mkString(" ")) | ||
| } | ||
| sparkConfWithMainAppJar.set(MEMORY_OVERHEAD_FACTOR, 0.4) | ||
|
||
| } | ||
|
|
||
| val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( | ||
|
|
@@ -132,6 +152,11 @@ private[spark] object KubernetesConf { | |
| val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs( | ||
| sparkConf, KUBERNETES_DRIVER_ENV_PREFIX) | ||
|
|
||
| val sparkFiles = sparkConf | ||
| .getOption("spark.files") | ||
| .map(str => str.split(",").toSeq) | ||
| .getOrElse(Seq.empty[String]) ++ additionalFiles | ||
|
|
||
| KubernetesConf( | ||
| sparkConfWithMainAppJar, | ||
| KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs), | ||
|
|
@@ -140,7 +165,8 @@ private[spark] object KubernetesConf { | |
| driverLabels, | ||
| driverAnnotations, | ||
| driverSecretNamesToMountPaths, | ||
| driverEnvs) | ||
| driverEnvs, | ||
| sparkFiles) | ||
| } | ||
|
|
||
| def createExecutorConf( | ||
|
|
@@ -179,6 +205,7 @@ private[spark] object KubernetesConf { | |
| executorLabels, | ||
| executorAnnotations, | ||
| executorSecrets, | ||
| executorEnv) | ||
| executorEnv, | ||
| Seq.empty[String]) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ import org.apache.spark.SparkException | |
| import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod} | ||
| import org.apache.spark.deploy.k8s.Config._ | ||
| import org.apache.spark.deploy.k8s.Constants._ | ||
| import org.apache.spark.deploy.k8s.submit._ | ||
| import org.apache.spark.internal.config._ | ||
| import org.apache.spark.launcher.SparkLauncher | ||
|
|
||
|
|
@@ -44,11 +45,16 @@ private[spark] class BasicDriverFeatureStep( | |
| private val driverCpuCores = conf.get("spark.driver.cores", "1") | ||
| private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES) | ||
|
|
||
| private val driverDockerContainer = conf.roleSpecificConf.mainAppResource.map { | ||
| case JavaMainAppResource(_) => "driver" | ||
| case PythonMainAppResource(_) => "driver-py" | ||
| }.getOrElse(throw new SparkException("Must specify a JVM or Python Resource")) | ||
|
||
| // Memory settings | ||
| private val driverMemoryMiB = conf.get(DRIVER_MEMORY) | ||
| private val memoryOverheadMiB = conf | ||
| .get(DRIVER_MEMORY_OVERHEAD) | ||
| .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB)) | ||
| .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt, | ||
| MEMORY_OVERHEAD_MIN_MIB)) | ||
| private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB | ||
|
|
||
| override def configurePod(pod: SparkPod): SparkPod = { | ||
|
|
@@ -71,7 +77,7 @@ private[spark] class BasicDriverFeatureStep( | |
| ("cpu", new QuantityBuilder(false).withAmount(limitCores).build()) | ||
| } | ||
|
|
||
| val driverContainer = new ContainerBuilder(pod.container) | ||
| val withoutArgsDriverContainer: ContainerBuilder = new ContainerBuilder(pod.container) | ||
|
||
| .withName(DRIVER_CONTAINER_NAME) | ||
| .withImage(driverContainerImage) | ||
| .withImagePullPolicy(conf.imagePullPolicy()) | ||
|
|
@@ -88,15 +94,22 @@ private[spark] class BasicDriverFeatureStep( | |
| .addToRequests("memory", driverMemoryQuantity) | ||
| .addToLimits("memory", driverMemoryQuantity) | ||
| .endResources() | ||
| .addToArgs("driver") | ||
| .addToArgs(driverDockerContainer) | ||
| .addToArgs("--properties-file", SPARK_CONF_PATH) | ||
| .addToArgs("--class", conf.roleSpecificConf.mainClass) | ||
| // The user application jar is merged into the spark.jars list and managed through that | ||
| // property, so there is no need to reference it explicitly here. | ||
| .addToArgs(SparkLauncher.NO_RESOURCE) | ||
| .addToArgs(conf.roleSpecificConf.appArgs: _*) | ||
| .build() | ||
|
|
||
| val driverContainer = | ||
| if (driverDockerContainer == "driver-py") { | ||
|
||
| withoutArgsDriverContainer | ||
| .build() | ||
| } else { | ||
| // The user application jar is merged into the spark.jars list and managed through that | ||
| // property, so there is no need to reference it explicitly here. | ||
| withoutArgsDriverContainer | ||
| .addToArgs(SparkLauncher.NO_RESOURCE) | ||
| .addToArgs(conf.roleSpecificConf.appArgs: _*) | ||
| .build() | ||
| } | ||
| val driverPod = new PodBuilder(pod.pod) | ||
| .editOrNewMetadata() | ||
| .withName(driverPodName) | ||
|
|
@@ -122,7 +135,7 @@ private[spark] class BasicDriverFeatureStep( | |
| val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath( | ||
| conf.sparkJars()) | ||
| val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath( | ||
| conf.sparkFiles()) | ||
| conf.sparkFiles) | ||
| if (resolvedSparkJars.nonEmpty) { | ||
| additionalProps.put("spark.jars", resolvedSparkJars.mkString(",")) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -54,7 +54,7 @@ private[spark] class BasicExecutorFeatureStep( | |
|
|
||
| private val memoryOverheadMiB = kubernetesConf | ||
| .get(EXECUTOR_MEMORY_OVERHEAD) | ||
| .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt, | ||
| .getOrElse(math.max((kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt, | ||
|
||
| MEMORY_OVERHEAD_MIN_MIB)) | ||
| private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One (future concern) is how we would handle the overlay with both Python and R at the same time.