Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
fb5b9ed
initial architecture for PySpark w/o dockerfile work
ifilonenko Apr 16, 2018
b7b3db0
included entrypoint logic
ifilonenko Apr 17, 2018
98cef8c
satisfying integration tests
ifilonenko Apr 18, 2018
dc670dc
end-to-end working pyspark
ifilonenko Apr 18, 2018
eabe4b9
Merge pull request #1 from ifilonenko/py-spark
ifilonenko Apr 18, 2018
8d3debb
resolved comments and fixed --pyfiles issue and allowed for python2 o…
ifilonenko May 2, 2018
91e2a2c
Merge pull request #2 from ifilonenko/py-spark
ifilonenko May 2, 2018
5761ee8
Merge branch 'master' of https://github.com/ifilonenko/spark
ifilonenko May 2, 2018
98cc044
restructured step based pattern to resolve comments
ifilonenko May 7, 2018
678d381
Merge pull request #3 from ifilonenko/py-spark
ifilonenko May 7, 2018
bf738dc
resolved comments
ifilonenko May 8, 2018
c59068d
Merge pull request #4 from ifilonenko/py-spark
ifilonenko May 8, 2018
0344f90
resolving style issues
ifilonenko May 9, 2018
306f3ed
Merge pull request #5 from ifilonenko/py-spark
ifilonenko May 9, 2018
f2fc53e
resolved commits
ifilonenko May 13, 2018
6f66d60
merge conflicts
ifilonenko May 13, 2018
914ff75
resolve rebase conflicts
ifilonenko May 11, 2018
d400607
import statements refactoring
ifilonenko May 13, 2018
72953a3
Merge branch 'py-spark'
ifilonenko May 13, 2018
7bedeb6
resolved comments
ifilonenko May 31, 2018
1801e96
merge conflicts
ifilonenko May 31, 2018
24a704e
setIfMissing
ifilonenko Jun 1, 2018
6a6d69d
added e2e tests on --py-files and inclusion of docs on config values
ifilonenko Jun 7, 2018
ab92913
style issues
ifilonenko Jun 7, 2018
a61d897
resolve comments on docs and addition of unit test
ifilonenko Jun 8, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
merge conflicts
  • Loading branch information
ifilonenko committed May 31, 2018
commit 1801e9665dedda4ce1fc4286f49cbcf5ef1b1b0c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
roleLabels: Map[String, String],
roleAnnotations: Map[String, String],
roleSecretNamesToMountPaths: Map[String, String],
roleSecretEnvNamesToKeyRefs: Map[String, String],
roleEnvs: Map[String, String],
sparkFiles: Seq[String]) {

Expand Down Expand Up @@ -168,6 +169,7 @@ private[spark] object KubernetesConf {
driverLabels,
driverAnnotations,
driverSecretNamesToMountPaths,
driverSecretEnvNamesToKeyRefs,
driverEnvs,
sparkFiles)
}
Expand Down Expand Up @@ -209,7 +211,8 @@ private[spark] object KubernetesConf {
appId,
executorLabels,
executorAnnotations,
executorSecrets,
executorMountSecrets,
executorEnvSecrets,
executorEnv,
Seq.empty[String])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
package org.apache.spark.deploy.k8s.submit

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeatureConfigStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep}

private[spark] class KubernetesDriverBuilder(
Expand All @@ -32,6 +31,9 @@ private[spark] class KubernetesDriverBuilder(
provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> MountSecretsFeatureStep) =
new MountSecretsFeatureStep(_),
provideEnvSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> EnvSecretsFeatureStep) =
new EnvSecretsFeatureStep(_),
provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> LocalDirsFeatureStep) =
new LocalDirsFeatureStep(_),
Expand All @@ -51,16 +53,24 @@ private[spark] class KubernetesDriverBuilder(
provideCredentialsStep(kubernetesConf),
provideServiceStep(kubernetesConf),
provideLocalDirsStep(kubernetesConf))

val maybeRoleSecretNamesStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
Some(provideSecretsStep(kubernetesConf)) } else None

val maybeProvideSecretsStep = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
Some(provideEnvSecretsStep(kubernetesConf)) } else None

val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map {
case JavaMainAppResource(_) =>
provideJavaStep(kubernetesConf)
case PythonMainAppResource(_) =>
providePythonStep(kubernetesConf)}.getOrElse(provideJavaStep(kubernetesConf))

val allFeatures: Seq[KubernetesFeatureConfigStep] =
(baseFeatures :+ bindingsStep) ++
maybeRoleSecretNamesStep.toSeq
maybeRoleSecretNamesStep.toSeq ++
maybeProvideSecretsStep.toSeq

var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
for (feature <- allFeatures) {
val configuredPod = feature.configurePod(spec.pod)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.spark.scheduler.cluster.k8s

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, KubernetesFeatureConfigStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, KubernetesFeatureConfigStep, LocalDirsFeatureStep, MountSecretsFeatureStep}

private[spark] class KubernetesExecutorBuilder(
provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf]) => BasicExecutorFeatureStep =
Expand All @@ -37,11 +37,18 @@ private[spark] class KubernetesExecutorBuilder(
val baseFeatures = Seq(
provideBasicStep(kubernetesConf),
provideLocalDirsStep(kubernetesConf))

val maybeRoleSecretNamesStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
Some(provideSecretsStep(kubernetesConf)) } else None

val maybeProvideSecretsStep = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
Some(provideEnvSecretsStep(kubernetesConf)) } else None

val allFeatures: Seq[KubernetesFeatureConfigStep] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not need any changes/arg passing during executor pod construction?

Copy link
Contributor Author

@ifilonenko ifilonenko Apr 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, but there will be more features and I thought that doing options in the setting of allFeatures was cleaner

baseFeatures ++
maybeRoleSecretNamesStep.toSeq
maybeRoleSecretNamesStep.toSeq ++
maybeProvideSecretsStep.toSeq

var executorPod = SparkPod.initialPod()
for (feature <- allFeatures) {
executorPod = feature.configurePod(executorPod)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_LABELS,
DRIVER_ANNOTATIONS,
Map.empty,
Map.empty,
DRIVER_ENVS,
Seq.empty[String])

Expand Down Expand Up @@ -140,6 +141,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_LABELS,
DRIVER_ANNOTATIONS,
Map.empty,
Map.empty,
DRIVER_ENVS,
Seq.empty[String])
val pythonKubernetesConf = KubernetesConf(
Expand All @@ -154,6 +156,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_LABELS,
DRIVER_ANNOTATIONS,
Map.empty,
Map.empty,
DRIVER_ENVS,
Seq.empty[String])
val javaFeatureStep = new BasicDriverFeatureStep(javaKubernetesConf)
Expand Down Expand Up @@ -184,6 +187,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_ANNOTATIONS,
Map.empty,
Map.empty,
DRIVER_ENVS,
allFiles)
val step = new BasicDriverFeatureStep(kubernetesConf)
val additionalProperties = step.getAdditionalPodSystemProperties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class BasicExecutorFeatureStepSuite
ANNOTATIONS,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String]))
val executor = step.configurePod(SparkPod.initialPod())

Expand Down Expand Up @@ -126,6 +127,7 @@ class BasicExecutorFeatureStepSuite
ANNOTATIONS,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String]))
assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
}
Expand All @@ -144,6 +146,7 @@ class BasicExecutorFeatureStepSuite
LABELS,
ANNOTATIONS,
Map.empty,
Map.empty,
Map("qux" -> "quux"),
Seq.empty[String]))
val executor = step.configurePod(SparkPod.initialPod())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String])
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
Expand Down Expand Up @@ -90,6 +91,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String])

val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
Expand Down Expand Up @@ -127,6 +129,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String])
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String]))
assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
assert(configurationStep.getAdditionalKubernetesResources().size === 1)
Expand Down Expand Up @@ -96,6 +97,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String]))
val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
Expand All @@ -116,6 +118,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String]))
val resolvedService = configurationStep
.getAdditionalKubernetesResources()
Expand Down Expand Up @@ -145,6 +148,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String]),
clock)
val driverService = configurationStep
Expand All @@ -171,6 +175,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String]),
clock)
fail("The driver bind address should not be allowed.")
Expand All @@ -195,6 +200,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String]),
clock)
fail("The driver host address should not be allowed.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite{
Map.empty,
Map.empty,
envVarsToKeys,
Map.empty)
Map.empty,
Seq.empty[String])

val step = new EnvSecretsFeatureStep(kubernetesConf)
val driverContainerWithEnvSecrets = step.configurePod(baseDriverPod).container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String])
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite {
Map.empty,
secretNamesToMountPaths,
Map.empty,
Map.empty,
Seq.empty[String])

val step = new MountSecretsFeatureStep(kubernetesConf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class JavaDriverFeatureStepSuite extends SparkFunSuite {
roleLabels = Map.empty,
roleAnnotations = Map.empty,
roleSecretNamesToMountPaths = Map.empty,
roleSecretEnvNamesToKeyRefs = Map.empty,
roleEnvs = Map.empty,
sparkFiles = Seq.empty[String])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
roleLabels = Map.empty,
roleAnnotations = Map.empty,
roleSecretNamesToMountPaths = Map.empty,
roleSecretEnvNamesToKeyRefs = Map.empty,
roleEnvs = Map.empty,
sparkFiles = Seq.empty[String])

Expand Down Expand Up @@ -85,6 +86,7 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
roleLabels = Map.empty,
roleAnnotations = Map.empty,
roleSecretNamesToMountPaths = Map.empty,
roleSecretEnvNamesToKeyRefs = Map.empty,
roleEnvs = Map.empty,
sparkFiles = Seq.empty[String])
val step = new PythonDriverFeatureStep(kubernetesConf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String])
when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC)
when(kubernetesClient.pods()).thenReturn(podOperations)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf}
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep}
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep}
import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep}

class KubernetesDriverBuilderSuite extends SparkFunSuite {
Expand All @@ -30,6 +30,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
private val SECRETS_STEP_TYPE = "mount-secrets"
private val JAVA_STEP_TYPE = "java-bindings"
private val PYSPARK_STEP_TYPE = "pyspark-bindings"
private val ENV_SECRETS_STEP_TYPE = "env-secrets"

private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
BASIC_STEP_TYPE, classOf[BasicDriverFeatureStep])
Expand All @@ -52,12 +53,16 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
private val pythonStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
PYSPARK_STEP_TYPE, classOf[PythonDriverFeatureStep])

private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep])

private val builderUnderTest: KubernetesDriverBuilder =
new KubernetesDriverBuilder(
_ => basicFeatureStep,
_ => credentialsStep,
_ => serviceStep,
_ => secretsStep,
_ => envSecretsStep,
_ => localDirsStep,
_ => javaStep,
_ => pythonStep)
Expand All @@ -76,6 +81,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String])
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
Expand All @@ -90,7 +96,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
val conf = KubernetesConf(
new SparkConf(false),
KubernetesDriverSpecificConf(
Some(JavaMainAppResource("example.jar")),
None,
"test-app",
"main",
Seq.empty),
Expand All @@ -99,6 +105,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Map("secret" -> "secretMountPath"),
Map("EnvName" -> "SecretName:secretKey"),
Map.empty,
Seq.empty[String])
validateStepTypesApplied(
Expand All @@ -108,6 +115,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
SERVICE_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
SECRETS_STEP_TYPE,
ENV_SECRETS_STEP_TYPE,
JAVA_STEP_TYPE)
}

Expand All @@ -125,6 +133,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String])
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
Expand All @@ -149,6 +158,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String])
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String])
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
Expand All @@ -69,6 +70,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
Map.empty,
Map.empty,
Map("secret" -> "secretMountPath"),
Map("secret-name" -> "secret-key"),
Map.empty,
Seq.empty[String])
validateStepTypesApplied(
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.