Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
fb5b9ed
initial architecture for PySpark w/o dockerfile work
ifilonenko Apr 16, 2018
b7b3db0
included entrypoint logic
ifilonenko Apr 17, 2018
98cef8c
satisfying integration tests
ifilonenko Apr 18, 2018
dc670dc
end-to-end working pyspark
ifilonenko Apr 18, 2018
eabe4b9
Merge pull request #1 from ifilonenko/py-spark
ifilonenko Apr 18, 2018
8d3debb
resolved comments and fixed --pyfiles issue and allowed for python2 o…
ifilonenko May 2, 2018
91e2a2c
Merge pull request #2 from ifilonenko/py-spark
ifilonenko May 2, 2018
5761ee8
Merge branch 'master' of https://github.com/ifilonenko/spark
ifilonenko May 2, 2018
98cc044
restructured step based pattern to resolve comments
ifilonenko May 7, 2018
678d381
Merge pull request #3 from ifilonenko/py-spark
ifilonenko May 7, 2018
bf738dc
resolved comments
ifilonenko May 8, 2018
c59068d
Merge pull request #4 from ifilonenko/py-spark
ifilonenko May 8, 2018
0344f90
resolving style issues
ifilonenko May 9, 2018
306f3ed
Merge pull request #5 from ifilonenko/py-spark
ifilonenko May 9, 2018
f2fc53e
resolved commits
ifilonenko May 13, 2018
6f66d60
merge conflicts
ifilonenko May 13, 2018
914ff75
resolve rebase conflicts
ifilonenko May 11, 2018
d400607
import statements refactoring
ifilonenko May 13, 2018
72953a3
Merge branch 'py-spark'
ifilonenko May 13, 2018
7bedeb6
resolved comments
ifilonenko May 31, 2018
1801e96
merge conflicts
ifilonenko May 31, 2018
24a704e
setIfMissing
ifilonenko Jun 1, 2018
6a6d69d
added e2e tests on --py-files and inclusion of docs on config values
ifilonenko Jun 7, 2018
ab92913
style issues
ifilonenko Jun 7, 2018
a61d897
resolve comments on docs and addition of unit test
ifilonenko Jun 8, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ private[spark] class BasicDriverFeatureStep(
private val driverCpuCores = conf.get("spark.driver.cores", "1")
private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES)

private val driverDockerContainer = conf.roleSpecificConf.mainAppResource.map {
case JavaMainAppResource(_) => "driver"
case PythonMainAppResource(_) => "driver-py"
}.getOrElse(throw new SparkException("Must specify a JVM or Python Resource"))
// Memory settings
private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
private val memoryOverheadMiB = conf
Expand Down Expand Up @@ -93,9 +89,6 @@ private[spark] class BasicDriverFeatureStep(
.addToRequests("memory", driverMemoryQuantity)
.addToLimits("memory", driverMemoryQuantity)
.endResources()
.addToArgs(driverDockerContainer)
.addToArgs("--properties-file", SPARK_CONF_PATH)
.addToArgs("--class", conf.roleSpecificConf.mainClass)
.build()

val driverPod = new PodBuilder(pod.pod)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import io.fabric8.kubernetes.api.model.ContainerBuilder
import io.fabric8.kubernetes.api.model.HasMetadata

import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants.SPARK_CONF_PATH
import org.apache.spark.deploy.k8s.KubernetesDriverSpecificConf
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
import org.apache.spark.launcher.SparkLauncher
Expand All @@ -29,6 +30,9 @@ private[spark] class JavaDriverFeatureStep(
extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod = {
val withDriverArgs = new ContainerBuilder(pod.container)
.addToArgs("driver")
.addToArgs("--properties-file", SPARK_CONF_PATH)
.addToArgs("--class", kubernetesConf.roleSpecificConf.mainClass)
// The user application jar is merged into the spark.jars list and managed through that
// property, so there is no need to reference it explicitly here.
.addToArgs(SparkLauncher.NO_RESOURCE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.deploy.k8s.features.bindings
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.ContainerBuilder
import io.fabric8.kubernetes.api.model.EnvVar
import io.fabric8.kubernetes.api.model.EnvVarBuilder
import io.fabric8.kubernetes.api.model.HasMetadata

Expand All @@ -35,20 +34,20 @@ private[spark] class PythonDriverFeatureStep(
override def configurePod(pod: SparkPod): SparkPod = {
val roleConf = kubernetesConf.roleSpecificConf
require(roleConf.mainAppResource.isDefined, "PySpark Main Resource must be defined")
val maybePythonArgs: Option[EnvVar] = Option(roleConf.appArgs).filter(_.nonEmpty).map(
val maybePythonArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map(
s =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use something more descriptive than s.

new EnvVarBuilder()
.withName(ENV_PYSPARK_ARGS)
.withValue(s.mkString(","))
.build())
val maybePythonFiles: Option[EnvVar] = kubernetesConf.pyFiles().map(
val maybePythonFiles = kubernetesConf.pyFiles().map(
pyFiles =>
new EnvVarBuilder()
.withName(ENV_PYSPARK_FILES)
.withValue(KubernetesUtils.resolveFileUrisAndPath(pyFiles.split(","))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment on why we need to switch from ","s to ":"s.

.mkString(":"))
.build())
val envSeq : Seq[EnvVar] =
val envSeq =
Seq(new EnvVarBuilder()
.withName(ENV_PYSPARK_PRIMARY)
.withValue(KubernetesUtils.resolveFileUri(kubernetesConf.pySparkMainResource().get))
Expand All @@ -62,7 +61,11 @@ private[spark] class PythonDriverFeatureStep(
maybePythonFiles.toSeq

val withPythonPrimaryContainer = new ContainerBuilder(pod.container)
.addAllToEnv(pythonEnvs.asJava).build()
.addAllToEnv(pythonEnvs.asJava)
.addToArgs("driver-py")
.addToArgs("--properties-file", SPARK_CONF_PATH)
.addToArgs("--class", roleConf.mainClass)
.build()

SparkPod(pod.pod, withPythonPrimaryContainer)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private[spark] class KubernetesDriverBuilder(
provideJavaStep: (
KubernetesConf[KubernetesDriverSpecificConf]
=> JavaDriverFeatureStep) =
new JavaDriverFeatureStep(_),
new JavaDriverFeatureStep(_),
providePythonStep: (
KubernetesConf[KubernetesDriverSpecificConf]
=> PythonDriverFeatureStep) =
Expand All @@ -49,18 +49,14 @@ private[spark] class KubernetesDriverBuilder(
provideServiceStep(kubernetesConf))
val maybeRoleSecretNamesStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
Some(provideSecretsStep(kubernetesConf)) } else None
val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.getOrElse(None)
match {
val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map {
case JavaMainAppResource(_) =>
Some(provideJavaStep(kubernetesConf))
provideJavaStep(kubernetesConf)
case PythonMainAppResource(_) =>
Some(providePythonStep(kubernetesConf))
case _ => None
}
providePythonStep(kubernetesConf)}.getOrElse(provideJavaStep(kubernetesConf))
val allFeatures: Seq[KubernetesFeatureConfigStep] =
baseFeatures ++
maybeRoleSecretNamesStep.toSeq ++
bindingsStep.toSeq
(baseFeatures :+ bindingsStep) ++
maybeRoleSecretNamesStep.toSeq
var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
for (feature <- allFeatures) {
val configuredPod = feature.configurePod(spec.pod)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ class KubernetesConfSuite extends SparkFunSuite {
APP_NAME,
RESOURCE_NAME_PREFIX,
APP_ID,
None,
mainAppResource = None,
MAIN_CLASS,
APP_ARGS,
None)
maybePyFiles = None)
assert(conf.appId === APP_ID)
assert(conf.sparkConf.getAll.toMap === sparkConf.getAll.toMap)
assert(conf.appResourceNamePrefix === RESOURCE_NAME_PREFIX)
Expand All @@ -78,7 +78,7 @@ class KubernetesConfSuite extends SparkFunSuite {
mainAppJar,
MAIN_CLASS,
APP_ARGS,
None)
maybePyFiles = None)
assert(kubernetesConfWithMainJar.sparkConf.get("spark.jars")
.split(",")
=== Array("local:///opt/spark/jar1.jar", "local:///opt/spark/main.jar"))
Expand All @@ -87,10 +87,10 @@ class KubernetesConfSuite extends SparkFunSuite {
APP_NAME,
RESOURCE_NAME_PREFIX,
APP_ID,
None,
mainAppResource = None,
MAIN_CLASS,
APP_ARGS,
None)
maybePyFiles = None)
assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",")
=== Array("local:///opt/spark/jar1.jar"))
assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR).isEmpty)
Expand Down Expand Up @@ -141,10 +141,10 @@ class KubernetesConfSuite extends SparkFunSuite {
APP_NAME,
RESOURCE_NAME_PREFIX,
APP_ID,
None,
mainAppResource = None,
MAIN_CLASS,
APP_ARGS,
None)
maybePyFiles = None)
assert(conf.roleLabels === Map(
SPARK_APP_ID_LABEL -> APP_ID,
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
val basePod = SparkPod.initialPod()
val configuredJavaPod = javaFeatureStep.configurePod(basePod)
val configuredPythonPod = pythonFeatureStep.configurePod(basePod)
assert(configuredJavaPod.container.getArgs.get(0) === "driver")
assert(configuredPythonPod.container.getArgs.get(0) === "driver-py")
}

test("Additional system properties resolve jars and set cluster-mode confs.") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,40 @@ import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.PythonMainAppResource

class JavaDriverFeatureStepSuite extends SparkFunSuite {


test("Python Step modifies container correctly") {
test("Java Step modifies container correctly") {
val baseDriverPod = SparkPod.initialPod()
val sparkConf = new SparkConf(false)
val kubernetesConf = KubernetesConf(
sparkConf,
KubernetesDriverSpecificConf(
Some(PythonMainAppResource("local:///main.jar")),
"test-app",
"test-class",
"java-runner",
Seq("5 7")),
"",
"",
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String])
appResourceNamePrefix = "",
appId = "",
roleLabels = Map.empty,
roleAnnotations = Map.empty,
roleSecretNamesToMountPaths = Map.empty,
roleEnvs = Map.empty,
sparkFiles = Seq.empty[String])

val step = new JavaDriverFeatureStep(kubernetesConf)
val driverPod = step.configurePod(baseDriverPod).pod
val driverContainerwithJavaStep = step.configurePod(baseDriverPod).container
assert(driverContainerwithJavaStep.getArgs.size === 2)
assert(driverContainerwithJavaStep.getArgs.size === 7)
val args = driverContainerwithJavaStep
.getArgs.asScala
assert(args === List("spark-internal", "5 7"))
assert(args === List(
"driver",
"--properties-file", SPARK_CONF_PATH,
"--class", "test-class",
"spark-internal", "5 7"))

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,18 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
"test-app",
"python-runner",
Seq("5 7")),
"",
"",
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String])
appResourceNamePrefix = "",
appId = "",
roleLabels = Map.empty,
roleAnnotations = Map.empty,
roleSecretNamesToMountPaths = Map.empty,
roleEnvs = Map.empty,
sparkFiles = Seq.empty[String])

val step = new PythonDriverFeatureStep(kubernetesConf)
val driverPod = step.configurePod(baseDriverPod).pod
val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
// assert(driverContainerwithPySpark.getEnv.size === 4)
assert(driverContainerwithPySpark.getEnv.size === 4)
val envs = driverContainerwithPySpark
.getEnv
.asScala
Expand All @@ -78,20 +78,25 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
sparkConf,
KubernetesDriverSpecificConf(
Some(PythonMainAppResource("local:///main.py")),
"test-app",
"test-class-py",
"python-runner",
Seq.empty[String]),
"",
"",
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String])
appResourceNamePrefix = "",
appId = "",
roleLabels = Map.empty,
roleAnnotations = Map.empty,
roleSecretNamesToMountPaths = Map.empty,
roleEnvs = Map.empty,
sparkFiles = Seq.empty[String])
val step = new PythonDriverFeatureStep(kubernetesConf)
val driverPod = step.configurePod(baseDriverPod).pod
val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
assert(driverContainerwithPySpark.getEnv.size === 2)
val args = driverContainerwithPySpark
.getArgs.asScala
assert(driverContainerwithPySpark.getArgs.size === 5)
assert(args === List(
"driver-py",
"--properties-file", SPARK_CONF_PATH,
"--class", "test-class-py"))
val envs = driverContainerwithPySpark
.getEnv
.asScala
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
CREDENTIALS_STEP_TYPE,
SERVICE_STEP_TYPE,
JAVA_STEP_TYPE)
JAVA_STEP_TYPE,
SERVICE_STEP_TYPE)
}

test("Apply secrets step if secrets are present.") {
Expand All @@ -100,8 +100,31 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
BASIC_STEP_TYPE,
CREDENTIALS_STEP_TYPE,
SERVICE_STEP_TYPE,
SECRETS_STEP_TYPE,
JAVA_STEP_TYPE)
JAVA_STEP_TYPE,
SECRETS_STEP_TYPE)
}

test("Apply Java step if main resource is none.") {
val conf = KubernetesConf(
new SparkConf(false),
KubernetesDriverSpecificConf(
None,
"test-app",
"main",
Seq.empty),
"prefix",
"appId",
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String])
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
CREDENTIALS_STEP_TYPE,
JAVA_STEP_TYPE,
SERVICE_STEP_TYPE)
}

test("Apply Python step if main resource is python.") {
Expand All @@ -123,8 +146,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
CREDENTIALS_STEP_TYPE,
SERVICE_STEP_TYPE,
PYSPARK_STEP_TYPE)
PYSPARK_STEP_TYPE,
SERVICE_STEP_TYPE)
}

private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*)
Expand Down