Commits
fb5b9ed
initial architecture for PySpark w/o dockerfile work
ifilonenko Apr 16, 2018
b7b3db0
included entrypoint logic
ifilonenko Apr 17, 2018
98cef8c
satisfying integration tests
ifilonenko Apr 18, 2018
dc670dc
end-to-end working pyspark
ifilonenko Apr 18, 2018
eabe4b9
Merge pull request #1 from ifilonenko/py-spark
ifilonenko Apr 18, 2018
8d3debb
resolved comments and fixed --pyfiles issue and allowed for python2 o…
ifilonenko May 2, 2018
91e2a2c
Merge pull request #2 from ifilonenko/py-spark
ifilonenko May 2, 2018
5761ee8
Merge branch 'master' of https://github.com/ifilonenko/spark
ifilonenko May 2, 2018
98cc044
restructured step based pattern to resolve comments
ifilonenko May 7, 2018
678d381
Merge pull request #3 from ifilonenko/py-spark
ifilonenko May 7, 2018
bf738dc
resolved comments
ifilonenko May 8, 2018
c59068d
Merge pull request #4 from ifilonenko/py-spark
ifilonenko May 8, 2018
0344f90
resolving style issues
ifilonenko May 9, 2018
306f3ed
Merge pull request #5 from ifilonenko/py-spark
ifilonenko May 9, 2018
f2fc53e
resolved commits
ifilonenko May 13, 2018
6f66d60
merge conflicts
ifilonenko May 13, 2018
914ff75
resolve rebase conflicts
ifilonenko May 11, 2018
d400607
import statements refactoring
ifilonenko May 13, 2018
72953a3
Merge branch 'py-spark'
ifilonenko May 13, 2018
7bedeb6
resolved comments
ifilonenko May 31, 2018
1801e96
merge conflicts
ifilonenko May 31, 2018
24a704e
setIfMissing
ifilonenko Jun 1, 2018
6a6d69d
added e2e tests on --py-files and inclusion of docs on config values
ifilonenko Jun 7, 2018
ab92913
style issues
ifilonenko Jun 7, 2018
a61d897
resolve comments on docs and addition of unit test
ifilonenko Jun 8, 2018
23 changes: 17 additions & 6 deletions bin/docker-image-tool.sh
@@ -63,12 +63,20 @@ function build {
if [ ! -d "$IMG_PATH" ]; then
error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
fi

local DOCKERFILE=${DOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
local BINDING_BUILD_ARGS=(
--build-arg
base_img=$(image_ref spark)
)
local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"}

docker build "${BUILD_ARGS[@]}" \
-t $(image_ref spark) \
-f "$DOCKERFILE" .
-f "$BASEDOCKERFILE" .

docker build "${BINDING_BUILD_ARGS[@]}" \
-t $(image_ref spark-py) \
-f "$PYDOCKERFILE" .
}

function push {
@@ -86,7 +94,8 @@ Commands:
push Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
-f file Dockerfile to build. By default builds the Dockerfile shipped with Spark.
-f file Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
-p file Dockerfile with Python baked in. By default builds the Dockerfile shipped with Spark.
Contributor: One future concern is how we would handle the overlay with both Python and R at the same time.

-r repo Repository address.
-t tag Tag to apply to the built image, or to identify the image to be pushed.
-m Use minikube's Docker daemon.
@@ -116,12 +125,14 @@ fi

REPO=
TAG=
DOCKERFILE=
BASEDOCKERFILE=
PYDOCKERFILE=
while getopts f:mr:t: option
do
case "${option}"
in
f) DOCKERFILE=${OPTARG};;
f) BASEDOCKERFILE=${OPTARG};;
p) PYDOCKERFILE=${OPTARG};;
r) REPO=${OPTARG};;
t) TAG=${OPTARG};;
m)
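For context, a hypothetical invocation of the updated script might look like the following; the repository, tag, and Python Dockerfile path are placeholders rather than values taken from this PR.

# Build the JVM image and the Python-binding image, then push both.
# -p selects the Python Dockerfile; omitting it falls back to the default
# path bundled with the distribution.
./bin/docker-image-tool.sh -r docker.io/myrepo -t my-tag \
  -p ./kubernetes/dockerfiles/spark/bindings/python/Dockerfile build
./bin/docker-image-tool.sh -r docker.io/myrepo -t my-tag push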
14 changes: 10 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -285,8 +285,6 @@ private[spark] class SparkSubmit extends Logging {
case (STANDALONE, CLUSTER) if args.isR =>
error("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (KUBERNETES, _) if args.isPython =>
error("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
error("R applications are currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
@@ -694,9 +692,17 @@ private[spark] class SparkSubmit extends Logging {
if (isKubernetesCluster) {
childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
if (args.isPython) {
Contributor: This logic appears to be duplicated from YARN; would it make sense to factor this out into a common function?

Contributor: We chatted about this offline, and while it's close, it's not exactly the same, so we can deal with minor parts of duplication for now.

childArgs ++= Array("--primary-py-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
if (args.pyFiles != null) {
childArgs ++= Array("--other-py-files", args.pyFiles)
}
} else {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
}
}
childArgs ++= Array("--main-class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg =>
childArgs += ("--arg", arg)
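To sketch how this branch is exercised (the master URL, image name, and file paths below are placeholders): a cluster-mode PySpark submission turns the primary .py file into the --primary-py-file child argument, forces the main class to org.apache.spark.deploy.PythonRunner, and forwards anything given via --py-files as --other-py-files.

bin/spark-submit \
  --master k8s://https://example-apiserver:6443 \
  --deploy-mode cluster \
  --name pyspark-pi \
  --conf spark.kubernetes.container.image=docker.io/myrepo/spark-py:my-tag \
  --py-files local:///opt/spark/examples/src/main/python/deps.py \
  local:///opt/spark/examples/src/main/python/pi.py 100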
@@ -117,6 +117,28 @@ private[spark] object Config extends Logging {
.stringConf
.createWithDefault("spark")

val KUBERNETES_PYSPARK_PY_FILES =
ConfigBuilder("spark.kubernetes.python.pyFiles")
.doc("The PyFiles that are distributed via client arguments")
.internal()
.stringConf
.createOptional

val KUBERNETES_PYSPARK_MAIN_APP_RESOURCE =
ConfigBuilder("spark.kubernetes.python.mainAppResource")
.doc("The main app resource for pyspark jobs")
.internal()
.stringConf
.createOptional

val KUBERNETES_PYSPARK_APP_ARGS =
ConfigBuilder("spark.kubernetes.python.appArgs")
.doc("The app arguments for PySpark Jobs")
.internal()
.stringConf
.createOptional


val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
.doc("Number of pods to launch at once in each round of executor allocation.")
@@ -154,6 +176,24 @@ private[spark] object Config extends Logging {
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
.createWithDefaultString("1s")

val MEMORY_OVERHEAD_FACTOR =
ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
.doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
"which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs")
Contributor: +1 to this, thanks for adding this.

.doubleConf
.checkValue(mem_overhead => mem_overhead >= 0 && mem_overhead < 1,
"Ensure that memory overhead is a double between 0 --> 1.0")
.createWithDefault(0.1)

val PYSPARK_MAJOR_PYTHON_VERSION =
ConfigBuilder("spark.kubernetes.pyspark.pythonversion")
Member: Sorry for leaving a comment on an ancient PR, but I couldn't hold it. Why did we add a configuration to control the Python version instead of using the existing PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON?

Doing this in a configuration breaks or disables many things, for example PEX (https://medium.com/criteo-labs/packaging-code-with-pex-a-pyspark-example-9057f9f144f3), which requires setting PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON manually.

Member (@HyukjinKwon, Dec 10, 2020): cc @dongjoon-hyun too, FYI. Conda / virtualenv support enabled by #30486 wouldn't work in Kubernetes because of this.

Contributor: @HyukjinKwon sounds reasonable to include support for that; we just need to agree on a policy for which takes precedence.

.doc("This sets the python version. Either 2 or 3. (Python2 or Python3)")
.stringConf
.checkValue(pv => List("2", "3").contains(pv),
"Ensure that Python Version is either Python2 or Python3")
.createWithDefault("2")
Comment: Am I reading this right that the default is Python 2? Is there a reason for that? Thanks!

Contributor Author: No particular reason. I just thought that the major version should default to 2.

Comment: There is only ~18 months of support left for Python 2. Python 3 has been around for 10 years, and unless there's a good reason, I think it should be the default.

Contributor Author: I am willing to do that: thoughts @holdenk?

Contributor: I'm fine with either as the default. While Py2 is officially EOL, I think we'll still see PySpark Py2 apps for a while after.

val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"

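Of the new entries, only spark.kubernetes.memoryOverheadFactor and spark.kubernetes.pyspark.pythonversion are user-facing; the spark.kubernetes.python.* entries are marked internal and are populated by the submission client. A hypothetical way to opt into Python 3 at submit time (value illustrative):

# Added to a spark-submit invocation; without it the default of 2 applies.
--conf spark.kubernetes.pyspark.pythonversion=3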
@@ -71,9 +71,14 @@ private[spark] object Constants {
val SPARK_CONF_FILE_NAME = "spark.properties"
val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"

// BINDINGS
val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
val ENV_PYSPARK_FILES = "PYSPARK_FILES"
val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"

// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
val MEMORY_OVERHEAD_FACTOR = 0.10
val MEMORY_OVERHEAD_MIN_MIB = 384L
}
@@ -16,14 +16,17 @@
*/
package org.apache.spark.deploy.k8s

import scala.collection.mutable

import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config.ConfigEntry


private[spark] sealed trait KubernetesRoleSpecificConf

/*
@@ -55,7 +58,8 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
roleAnnotations: Map[String, String],
roleSecretNamesToMountPaths: Map[String, String],
roleSecretEnvNamesToKeyRefs: Map[String, String],
roleEnvs: Map[String, String]) {
roleEnvs: Map[String, String],
sparkFiles: Seq[String]) {

def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)

@@ -64,10 +68,14 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String])

def sparkFiles(): Seq[String] = sparkConf
.getOption("spark.files")
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String])
def pyFiles(): Option[String] = sparkConf
.get(KUBERNETES_PYSPARK_PY_FILES)

def pySparkMainResource(): Option[String] = sparkConf
Contributor: This seems redundant with the driver-specific spark conf's MainAppResource. Perhaps remove the need to specify this thing twice?

Contributor Author: I need to parse out the MainAppResource, which I thought we should be doing only once; as such, I thought it would be cleaner to do this.

.get(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE)

def pySparkPythonVersion(): String = sparkConf
.get(PYSPARK_MAJOR_PYTHON_VERSION)

def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)

@@ -102,17 +110,30 @@ private[spark] object KubernetesConf {
appId: String,
mainAppResource: Option[MainAppResource],
mainClass: String,
appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
appArgs: Array[String],
maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
val sparkConfWithMainAppJar = sparkConf.clone()
val additionalFiles = mutable.ArrayBuffer.empty[String]
mainAppResource.foreach {
case JavaMainAppResource(res) =>
val previousJars = sparkConf
.getOption("spark.jars")
.map(_.split(","))
.getOrElse(Array.empty)
if (!previousJars.contains(res)) {
sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
}
case JavaMainAppResource(res) =>
val previousJars = sparkConf
.getOption("spark.jars")
.map(_.split(","))
.getOrElse(Array.empty)
if (!previousJars.contains(res)) {
sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
}
// The function of this outer match is to account for multiple nonJVM
// bindings that will all have increased MEMORY_OVERHEAD_FACTOR to 0.4
case nonJVM: NonJVMResource =>
Contributor: Why can't we just match PythonMainAppResource immediately here? Why the two layers of matching?

Contributor Author: Because the R step should have the same default memory overhead, as should all NonJVMResources.

Contributor: Maybe worth a comment then? Especially since R support isn't integrated right now, it's perhaps not super clear to folks why this is being done.

Contributor: Maybe add a comment, since R isn't currently integrated and this could be a bit difficult to infer?

nonJVM match {
case PythonMainAppResource(res) =>
additionalFiles += res
maybePyFiles.foreach{maybePyFiles =>
additionalFiles.appendAll(maybePyFiles.split(","))}
Contributor: Not for this PR or JIRA, but later maybe we should normalize our parsing of input files in a way that allows escape characters, and share the logic between YARN/K8s/Mesos/standalone. What do y'all think? Possible follow-up JIRA: https://issues.apache.org/jira/browse/SPARK-24184

sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
}
sparkConfWithMainAppJar.set(MEMORY_OVERHEAD_FACTOR, 0.4)
Contributor: So wait, if the user has specified a different value, I don't think we should override it, and it's not clear to me that this code will not override a user-specified value.

Contributor Author: Very true, will need to ensure that it does not override the set value.

Contributor: @holdenk this behavior isn't intuitive, that the memory overhead factor default will be calculated differently depending on what language binding the job is running with. Is there a good page in Spark's configuration documentation on https://spark.apache.org/docs/latest/ where this should be documented? Is this logic Kubernetes specific?

Contributor: Also you probably only want to setIfMissing?

Contributor: Yup, you can see my statement about not overriding the explicitly user-provided value in the comment on the 20th ("if the user has specified a different value I don't think we should override it").

So this logic, as it stands, is K8s specific, and I don't think we can change how YARN chooses its memory overhead in a minor release, so I'd expect this to remain K8s specific until at least 3.0, when we can evaluate whether we want to change this in YARN as well.

The memory overhead configuration is documented on the YARN page right now (see spark.yarn.am.memoryOverhead on http://spark.apache.org/docs/latest/running-on-yarn.html), so I would document this in http://spark.apache.org/docs/latest/running-on-kubernetes.html#spark-properties (i.e. ./docs/running-on-kubernetes.md).

As for intuitive, I'd argue that this actually is more intuitive than what we do in YARN: we know that users who run R & Python need more non-JVM heap space, and many users don't know to think about this until their job fails. We can take advantage of our knowledge to handle this setting for the user more often. You can see how often this confuses folks on the list, docs, and Stack Overflow by looking at "memory overhead exceeded" and "Container killed by YARN for exceeding memory limits" and similar.

Contributor: I think that power users would want the ability to overwrite this if they have a specific amount of memory overhead that they want and know that they need. Configurations should always be configurable, with defaults that are sane. I agree that we can afford to set a better default for Kubernetes, but there should always be a way to override default settings if the user knows the characteristics of their job. For example, if the user does memory profiling of their container and sees that it's not using the full amount of memory, they can afford to drop this value and leave more resources for other applications.

Contributor Author: Users have the ability to overwrite. Hence the change to setIfMissing().

}

val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
@@ -135,6 +156,11 @@ private[spark] object KubernetesConf {
val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)

val sparkFiles = sparkConf
.getOption("spark.files")
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String]) ++ additionalFiles

KubernetesConf(
sparkConfWithMainAppJar,
KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
@@ -144,7 +170,8 @@ private[spark] object KubernetesConf {
driverAnnotations,
driverSecretNamesToMountPaths,
driverSecretEnvNamesToKeyRefs,
driverEnvs)
driverEnvs,
sparkFiles)
}

def createExecutorConf(
@@ -186,6 +213,7 @@ private[spark] object KubernetesConf {
executorAnnotations,
executorMountSecrets,
executorEnvSecrets,
executorEnv)
executorEnv,
Seq.empty[String])
}
}
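To illustrate the precedence the thread above settles on (a sketch assuming the final setIfMissing behavior, not the .set call shown in this revision): a Python or R primary resource only raises the factor when the user has not set one.

# No explicit setting: a PySpark job falls back to the non-JVM default of 0.4
# (JVM jobs keep 0.1). An explicit setting is left untouched, e.g.:
--conf spark.kubernetes.memoryOverheadFactor=0.2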
@@ -52,7 +52,7 @@ private[spark] object KubernetesUtils {
}
}

private def resolveFileUri(uri: String): String = {
def resolveFileUri(uri: String): String = {
val fileUri = Utils.resolveURI(uri)
val fileScheme = Option(fileUri.getScheme).getOrElse("file")
fileScheme match {
@@ -25,8 +25,8 @@ import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config._
import org.apache.spark.launcher.SparkLauncher

private[spark] class BasicDriverFeatureStep(
conf: KubernetesConf[KubernetesDriverSpecificConf])
@@ -48,7 +48,8 @@ private[spark] class BasicDriverFeatureStep(
private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
private val memoryOverheadMiB = conf
.get(DRIVER_MEMORY_OVERHEAD)
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
.getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB

override def configurePod(pod: SparkPod): SparkPod = {
@@ -88,13 +89,6 @@ private[spark] class BasicDriverFeatureStep(
.addToRequests("memory", driverMemoryQuantity)
.addToLimits("memory", driverMemoryQuantity)
.endResources()
.addToArgs("driver")
.addToArgs("--properties-file", SPARK_CONF_PATH)
.addToArgs("--class", conf.roleSpecificConf.mainClass)
// The user application jar is merged into the spark.jars list and managed through that
// property, so there is no need to reference it explicitly here.
.addToArgs(SparkLauncher.NO_RESOURCE)
.addToArgs(conf.roleSpecificConf.appArgs: _*)
.build()

val driverPod = new PodBuilder(pod.pod)
@@ -122,7 +116,7 @@ private[spark] class BasicDriverFeatureStep(
val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(
conf.sparkJars())
val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(
conf.sparkFiles())
conf.sparkFiles)
if (resolvedSparkJars.nonEmpty) {
additionalProps.put("spark.jars", resolvedSparkJars.mkString(","))
}
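As a worked example with illustrative numbers: with spark.driver.memory=2g and the non-JVM factor of 0.4, the overhead resolves to max((0.4 * 2048).toInt, 384) = 819 MiB, so the driver container requests 2048 + 819 = 2867 MiB; under the JVM default of 0.1 it would instead get max(204, 384) = 384 MiB.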
@@ -54,7 +54,8 @@ private[spark] class BasicExecutorFeatureStep(

private val memoryOverheadMiB = kubernetesConf
.get(EXECUTOR_MEMORY_OVERHEAD)
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
.getOrElse(math.max(
(kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB

@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features.bindings

import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata}

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants.SPARK_CONF_PATH
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
import org.apache.spark.launcher.SparkLauncher

private[spark] class JavaDriverFeatureStep(
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod = {
val withDriverArgs = new ContainerBuilder(pod.container)
.addToArgs("driver")
.addToArgs("--properties-file", SPARK_CONF_PATH)
.addToArgs("--class", kubernetesConf.roleSpecificConf.mainClass)
// The user application jar is merged into the spark.jars list and managed through that
// property, so there is no need to reference it explicitly here.
.addToArgs(SparkLauncher.NO_RESOURCE)
.addToArgs(kubernetesConf.roleSpecificConf.appArgs: _*)
.build()
SparkPod(pod.pod, withDriverArgs)
}
override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
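Roughly, the container arguments this step appends would come out as follows; the properties path, class name, and application argument are placeholders, and spark-internal stands in for SparkLauncher.NO_RESOURCE.

driver --properties-file /opt/spark/conf/spark.properties \
  --class org.apache.spark.examples.SparkPi spark-internal 1000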