Changes from 1 commit
25 commits
fb5b9ed
initial architecture for PySpark w/o dockerfile work
ifilonenko Apr 16, 2018
b7b3db0
included entrypoint logic
ifilonenko Apr 17, 2018
98cef8c
satisfying integration tests
ifilonenko Apr 18, 2018
dc670dc
end-to-end working pyspark
ifilonenko Apr 18, 2018
eabe4b9
Merge pull request #1 from ifilonenko/py-spark
ifilonenko Apr 18, 2018
8d3debb
resolved comments and fixed --pyfiles issue and allowed for python2 o…
ifilonenko May 2, 2018
91e2a2c
Merge pull request #2 from ifilonenko/py-spark
ifilonenko May 2, 2018
5761ee8
Merge branch 'master' of https://github.com/ifilonenko/spark
ifilonenko May 2, 2018
98cc044
restructured step based pattern to resolve comments
ifilonenko May 7, 2018
678d381
Merge pull request #3 from ifilonenko/py-spark
ifilonenko May 7, 2018
bf738dc
resolved comments
ifilonenko May 8, 2018
c59068d
Merge pull request #4 from ifilonenko/py-spark
ifilonenko May 8, 2018
0344f90
resolving style issues
ifilonenko May 9, 2018
306f3ed
Merge pull request #5 from ifilonenko/py-spark
ifilonenko May 9, 2018
f2fc53e
resolved commits
ifilonenko May 13, 2018
6f66d60
merge conflicts
ifilonenko May 13, 2018
914ff75
resolve rebase conflicts
ifilonenko May 11, 2018
d400607
import statements refactoring
ifilonenko May 13, 2018
72953a3
Merge branch 'py-spark'
ifilonenko May 13, 2018
7bedeb6
resolved comments
ifilonenko May 31, 2018
1801e96
merge conflicts
ifilonenko May 31, 2018
24a704e
setIfMissing
ifilonenko Jun 1, 2018
6a6d69d
added e2e tests on --py-files and inclusion of docs on config values
ifilonenko Jun 7, 2018
ab92913
style issues
ifilonenko Jun 7, 2018
a61d897
resolve comments on docs and addition of unit test
ifilonenko Jun 8, 2018
satisfying integration tests
ifilonenko committed Apr 18, 2018
commit 98cef8ceb0f04cfcefbc482c2a0fe39c75f620c4
@@ -696,7 +696,9 @@ private[spark] class SparkSubmit extends Logging {
if (args.isPython) {
Contributor
This logic appears to be duplicated from YARN; would it make sense to factor this out into a common function?

Contributor
We chatted about this offline, and while it's close it's not exactly the same, so we can live with minor duplication for now.
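For illustration only, a minimal sketch of what a shared helper might look like if the logic were factored out. The helper name `pythonChildArgs` and its placement are assumptions, not code from this PR, and in practice the YARN and Kubernetes branches pass slightly different flag names, which is why (per the reply above) the duplication was kept.

```scala
// Hypothetical sketch (not part of this PR): a method inside SparkSubmit that both
// the YARN and Kubernetes branches could call to build the Python child arguments.
private def pythonChildArgs(args: SparkSubmitArguments): Seq[String] = {
  val base = Seq(
    "--primary-py-file", args.primaryResource,
    "--main-class", "org.apache.spark.deploy.PythonRunner")
  // Only forward --other-py-files when --py-files was actually supplied.
  val extra = Option(args.pyFiles).toSeq.flatMap(files => Seq("--other-py-files", files))
  base ++ extra
}
```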

childArgs ++= Array("--primary-py-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
childArgs ++= Array("--other-py-files", args.pyFiles)
if (args.pyFiles != null) {
childArgs ++= Array("--other-py-files", args.pyFiles)
}
} else {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
@@ -67,10 +67,8 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String])

- def pyFiles(): Seq[String] = sparkConf
+ def pyFiles(): Option[String] = sparkConf
  .get(KUBERNETES_PYSPARK_PY_FILES)
- .map(str => str.split(",").toSeq)
- .getOrElse(Seq.empty[String])

def pySparkMainResource(): Option[String] = sparkConf
Contributor
This seems redundant with the driver-specific Spark conf's MainAppResource. Perhaps remove the need to specify this twice?

Contributor Author
I need to parse out the MainAppResource (which I thought we should be doing only once); as such, I thought it would be cleaner to do it this way.

.get(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE)
@@ -77,7 +77,7 @@ private[spark] class BasicDriverFeatureStep(
("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
}

- val driverContainer = new ContainerBuilder(pod.container)
+ val withoutArgsDriverContainer: ContainerBuilder = new ContainerBuilder(pod.container)
Contributor
The previous name seemed clearer to me.

Contributor
Is there a corresponding driver container with args?

Contributor Author
Yes, look below.

Contributor
But we do set arguments on this one, right? If not, please insert whitespace so I can see the difference visually.

.withName(DRIVER_CONTAINER_NAME)
.withImage(driverContainerImage)
.withImagePullPolicy(conf.imagePullPolicy())
@@ -97,12 +97,20 @@ private[spark] class BasicDriverFeatureStep(
.addToArgs(driverDockerContainer)
.addToArgs("--properties-file", SPARK_CONF_PATH)
.addToArgs("--class", conf.roleSpecificConf.mainClass)
- // The user application jar is merged into the spark.jars list and managed through that
- // property, so there is no need to reference it explicitly here.
- .addToArgs(SparkLauncher.NO_RESOURCE)
- .addToArgs(conf.roleSpecificConf.appArgs: _*)
- .build()

+ val driverContainer =
+   if (driverDockerContainer == "driver-py") {
Contributor
Wondering if we can discover whether it's a Python application in a better way here. Probably using the built-up Spark conf?

Contributor Author
We can check the appResource, but that was already done. I thought it would be overkill to check twice since it was already handled when setting driverDockerContainer.

Contributor
I think in general I'd prefer having two separate step types here. They can share some logic in either a utils class or a shared superclass, but you would only apply one step type for Java apps and another for Python apps.

Another way is to have the basic driver step only do work that is strictly agnostic of Python vs. Java, and then have a separate step for either Java or Python; the orchestrator picks which one to invoke based on the app resource type. To do this I think the step's constructor needs to take more than just the KubernetesConf as an argument - it needs to take the appropriate specifically-typed MainAppResource as an argument in the constructor as well. This breaks the convention that we've set so far, but for now that's probably ok, as long as we don't get parameter-length blowup as we go forward.
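As a rough sketch of this second approach (the orchestrator selects the bindings step and hands it the specifically-typed resource); the constructor signature and the helper `selectBindingsStep` below are assumptions based on this comment, not code from the PR:

```scala
// Hypothetical sketch: a Python-specific step whose constructor receives the
// specifically-typed main app resource chosen by the orchestrator. In the real
// code this would extend the feature-config-step trait used by the other steps.
class PythonDriverFeatureStep(
    conf: KubernetesConf[KubernetesDriverSpecificConf],
    mainResource: PythonMainAppResource) {
  def configurePod(pod: SparkPod): SparkPod = {
    // ...set ENV_PYSPARK_PRIMARY / ENV_PYSPARK_FILES from mainResource and conf...
    pod
  }
}

// The orchestrator would then pick and construct the step from the typed resource:
def selectBindingsStep(conf: KubernetesConf[KubernetesDriverSpecificConf]) =
  conf.roleSpecificConf.mainAppResource.collect {
    case res: PythonMainAppResource => new PythonDriverFeatureStep(conf, res)
  }
```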

Contributor Author
The second way is the approach that I envisioned and tried to implement. It seems that the approach (without putting too much work on the KubernetesConf) breaks the contract we defined, though.

Contributor
So what about applications which need Python support (e.g. have Python UDFs) but don't use a Python driver process?

Contributor
> So what about applications which need Python support (e.g. have Python UDFs) but don't use a Python driver process?

I think that's up to the user to make it work; I don't see this being specifically handled by the other cluster managers.

The goal of this PR should be to bring Kubernetes up to par with the other cluster managers with respect to what they provide. Do the other cluster managers provide any specific support for this?

Contributor Author
We currently only run the Python (and future R) step when we are leveraging a Python (or R) driver process. Otherwise the user would just specify the spark-py Docker image, no? And then continue to run a non-Python driver process.

Contributor
Sorry, I forgot that folks could specify the driver container separately from the worker container; never mind.

Contributor
@ifilonenko I think this still needs some work to clean up.

What I expect to happen is to have three step types:

  1. BasicDriverFeatureStep, which is what's here except we don't provide the args to the container in this step anymore.
  2. PythonDriverFeatureStep, which does what the PythonDriverFeatureStep does currently, plus adds the driver-py argument
  3. JavaDriverFeatureStep, which only adds the arguments SparkLauncher.NO_RESOURCE, conf.roleSpecificConf.appArgs, etc.

Then in the KubernetesDriverBuilder, always apply the first step, and select which of 2 or 3 to apply based on the app resource type.
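A minimal sketch of how that selection could look in the KubernetesDriverBuilder; `provideBasicStep` and `provideJavaStep` are assumed names used for illustration (only `providePythonStep` appears in this PR), and the step bodies are elided:

```scala
// Sketch only: always apply the basic (args-free) step, then exactly one
// bindings step chosen from the typed main app resource.
val basicStep = provideBasicStep(kubernetesConf) // sets up the container without args

val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource match {
  case Some(PythonMainAppResource(_)) =>
    providePythonStep(kubernetesConf) // adds the driver-py arg and the PySpark envs
  case _ =>
    provideJavaStep(kubernetesConf)   // adds SparkLauncher.NO_RESOURCE and the app args
}

val allFeatures: Seq[KubernetesFeatureConfigStep] = Seq(basicStep, bindingsStep)
```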

Contributor Author
Agreed. I didn't know if we wanted to include a JavaDriverFeatureStep. I will do so then.

+     withoutArgsDriverContainer
+       .addToArgs(conf.roleSpecificConf.appArgs: _*)
+       .build()
+   } else {
+     // The user application jar is merged into the spark.jars list and managed through that
+     // property, so there is no need to reference it explicitly here.
+     withoutArgsDriverContainer
+       .addToArgs(SparkLauncher.NO_RESOURCE)
+       .addToArgs(conf.roleSpecificConf.appArgs: _*)
+       .build()
+   }
val driverPod = new PodBuilder(pod.pod)
.editOrNewMetadata()
.withName(driverPodName)
@@ -30,15 +30,17 @@ private[spark] class PythonDriverFeatureStep(
override def configurePod(pod: SparkPod): SparkPod = {
val mainResource = kubernetesConf.pySparkMainResource()
require(mainResource.isDefined, "PySpark Main Resource must be defined")
- val otherPyFiles = kubernetesConf.pyFiles()
+ val otherPyFiles = kubernetesConf.pyFiles().map(pyFile =>
+   KubernetesUtils.resolveFileUrisAndPath(pyFile.split(","))
+     .mkString(",")).getOrElse("null")
val withPythonPrimaryFileContainer = new ContainerBuilder(pod.container)
.addNewEnv()
.withName(ENV_PYSPARK_PRIMARY)
.withValue(KubernetesUtils.resolveFileUri(mainResource.get))
.endEnv()
.addNewEnv()
.withName(ENV_PYSPARK_FILES)
- .withValue(KubernetesUtils.resolveFileUrisAndPath(otherPyFiles).mkString(","))
+ .withValue(if (otherPyFiles == "") {"null"} else otherPyFiles)
.endEnv()
.build()
SparkPod(pod.pod, withPythonPrimaryFileContainer)
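For context on the `"null"` sentinel set above, a hedged sketch of how a consumer of that environment variable might interpret it; the variable name `PYSPARK_FILES` is an assumption about what `ENV_PYSPARK_FILES` resolves to, and this is not code from the PR:

```scala
// Sketch only: treat an unset, empty, or "null" PYSPARK_FILES value as "no extra files".
val extraPyFiles: Seq[String] = sys.env.get("PYSPARK_FILES") match {
  case None | Some("") | Some("null") => Seq.empty
  case Some(files)                    => files.split(",").toSeq
}
```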
@@ -45,9 +45,11 @@ private[spark] class KubernetesDriverBuilder(
provideServiceStep(kubernetesConf))
val maybeRoleSecretNamesStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
Some(provideSecretsStep(kubernetesConf)) } else None
- val maybeNonJVMBindings = kubernetesConf.roleSpecificConf.mainAppResource.map {
-   case PythonMainAppResource(_) =>
-     providePythonStep(kubernetesConf)
+ val maybeNonJVMBindings = kubernetesConf.roleSpecificConf.mainAppResource.getOrElse(None)
+   match {
+     case PythonMainAppResource(_) =>
+       Some(providePythonStep(kubernetesConf))
+     case _ => None
}
val allFeatures: Seq[KubernetesFeatureConfigStep] =
baseFeatures ++
@@ -65,4 +65,35 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
assert(envs(ENV_PYSPARK_PRIMARY) === expectedMainResource)
assert(envs(ENV_PYSPARK_FILES) === expectedPySparkFiles)
}
test("Python Step testing empty pyfiles") {
val mainResource = "local:///main.py"
val baseDriverPod = SparkPod.initialPod()
val sparkConf = new SparkConf(false)
.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource)
.set(KUBERNETES_PYSPARK_PY_FILES, "")
val kubernetesConf = KubernetesConf(
sparkConf,
KubernetesDriverSpecificConf(
Some(PythonMainAppResource("local:///main.py")),
"test-app",
"python-runner",
Seq.empty[String]),
"",
"",
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Seq.empty[String])
val step = new PythonDriverFeatureStep(kubernetesConf)
val driverPod = step.configurePod(baseDriverPod).pod
val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
assert(driverContainerwithPySpark.getEnv.size === 2)
val envs = driverContainerwithPySpark
.getEnv
.asScala
.map(env => (env.getName, env.getValue))
.toMap
assert(envs(ENV_PYSPARK_FILES) === "null")
}
}