Changes from 1 commit (this pull request has 25 commits):

fb5b9ed: initial architecture for PySpark w/o dockerfile work (ifilonenko, Apr 16, 2018)
b7b3db0: included entrypoint logic (ifilonenko, Apr 17, 2018)
98cef8c: satisfying integration tests (ifilonenko, Apr 18, 2018)
dc670dc: end-to-end working pyspark (ifilonenko, Apr 18, 2018)
eabe4b9: Merge pull request #1 from ifilonenko/py-spark (ifilonenko, Apr 18, 2018)
8d3debb: resolved comments and fixed --pyfiles issue and allowed for python2 or python3 to be specified (ifilonenko, May 2, 2018)
91e2a2c: Merge pull request #2 from ifilonenko/py-spark (ifilonenko, May 2, 2018)
5761ee8: Merge branch 'master' of https://github.com/ifilonenko/spark (ifilonenko, May 2, 2018)
98cc044: restructured step based pattern to resolve comments (ifilonenko, May 7, 2018)
678d381: Merge pull request #3 from ifilonenko/py-spark (ifilonenko, May 7, 2018)
bf738dc: resolved comments (ifilonenko, May 8, 2018)
c59068d: Merge pull request #4 from ifilonenko/py-spark (ifilonenko, May 8, 2018)
0344f90: resolving style issues (ifilonenko, May 9, 2018)
306f3ed: Merge pull request #5 from ifilonenko/py-spark (ifilonenko, May 9, 2018)
f2fc53e: resolved commits (ifilonenko, May 13, 2018)
6f66d60: merge conflicts (ifilonenko, May 13, 2018)
914ff75: resolve rebase conflicts (ifilonenko, May 11, 2018)
d400607: import statements refactoring (ifilonenko, May 13, 2018)
72953a3: Merge branch 'py-spark' (ifilonenko, May 13, 2018)
7bedeb6: resolved comments (ifilonenko, May 31, 2018)
1801e96: merge conflicts (ifilonenko, May 31, 2018)
24a704e: setIfMissing (ifilonenko, Jun 1, 2018)
6a6d69d: added e2e tests on --py-files and inclusion of docs on config values (ifilonenko, Jun 7, 2018)
ab92913: style issues (ifilonenko, Jun 7, 2018)
a61d897: resolve comments on docs and addition of unit test (ifilonenko, Jun 8, 2018)
resolved comments and fixed --pyfiles issue and allowed for python2 or python3 to be specified
ifilonenko committed May 2, 2018
commit 8d3debb88d065df152ec84a900f963fa361e2bb1
@@ -181,7 +181,18 @@ private[spark] object Config extends Logging {
.doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
"which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs")
Contributor: +1 to this, thanks for adding this.

.doubleConf
.createWithDefault(0.10)
.checkValue(mem_overhead => mem_overhead >= 0 && mem_overhead < 1,
"Ensure that memory overhead is a double between 0 --> 1.0")
.createOptional

val PYSPARK_PYTHON_VERSION =
Contributor: This is minor, but I have a few questions about this element of the config. First off, if this is going to be the major version only, let's call it something like majorPythonVersion (e.g. many python2s and python3s exist).

ConfigBuilder("spark.kubernetes.pyspark.pythonversion")
Member: Sorry for leaving a comment on an ancient PR, but I couldn't hold it. Why did we add a configuration to control the Python version instead of using the existing PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON?

Doing this in a configuration breaks or disables many things, for example PEX (https://medium.com/criteo-labs/packaging-code-with-pex-a-pyspark-example-9057f9f144f3), which requires setting PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON manually.

Member (@HyukjinKwon, Dec 10, 2020): cc @dongjoon-hyun too, FYI. Conda / virtualenv support enabled by #30486 wouldn't work in Kubernetes because of this.

Contributor: @HyukjinKwon sounds reasonable to include support for that; we just need to agree on a policy for which takes precedence.
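As a rough illustration of the precedence policy being discussed (this helper is hypothetical, not something in the PR or in Spark), an explicitly provided PYSPARK_PYTHON could win over the Kubernetes config, which would then only fill the gap:

import org.apache.spark.SparkConf

// Hypothetical sketch: a user-supplied PYSPARK_PYTHON (e.g. set by PEX or conda)
// takes precedence over spark.kubernetes.pyspark.pythonversion.
def resolvePysparkPython(conf: SparkConf): String =
  sys.env.get("PYSPARK_PYTHON")
    .orElse(conf.getOption("spark.kubernetes.pyspark.pythonversion")
      .map(v => if (v == "3") "python3" else "python"))
    .getOrElse("python")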

.doc("This sets the python version. Either 2 or 3. (Python2 or Python3)")
.stringConf
.checkValue(pv => List("2", "3").contains(pv),
"Ensure that Python Version is either Python2 or Python3")
.createWithDefault("2")
Am I reading this right that the default is Python 2? Is there a reason for that? Thanks!

Contributor Author: No particular reason. I just thought that the major version should default to 2.

There is only ~18 months of support left for Python 2. Python 3 has been around for 10 years and unless there’s a good reason, I think it should be the default.

Contributor Author: I am willing to do that; thoughts, @holdenk?

Contributor: I'm fine with either as the default. While Py2 is officially EOL, I think we'll still see PySpark Py2 apps for a while after.



val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"
@@ -75,6 +75,7 @@ private[spark] object Constants {
val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
val ENV_PYSPARK_FILES = "PYSPARK_FILES"
val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
val ENV_PYSPARK_PYTHON_VERSION = "PYSPARK_PYTHON_VERSION"

// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
@@ -76,6 +76,9 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
def pySparkAppArgs(): Option[String] = sparkConf
.get(KUBERNETES_PYSPARK_APP_ARGS)

def pySparkPythonVersion(): String = sparkConf
.get(PYSPARK_PYTHON_VERSION)

def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)

def imagePullSecrets(): Seq[LocalObjectReference] = {
@@ -131,7 +134,7 @@ private[spark] object KubernetesConf {
sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_APP_ARGS, appArgs.mkString(" "))
}
sparkConfWithMainAppJar.set(MEMORY_OVERHEAD_FACTOR, 0.4)
sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4)
Contributor: Do we want to set this in the JVM case?

Contributor Author: This is set later in BaseDriverStep.
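As a minimal, self-contained illustration of why setIfMissing matters here (the values and the string form of the key are examples, not part of the diff): an overhead factor the user set explicitly survives, and the 0.4 non-JVM default only fills in when the key is absent.

import org.apache.spark.SparkConf

val conf = new SparkConf(false)
conf.set("spark.kubernetes.memoryOverheadFactor", "0.2")          // explicit user value
conf.setIfMissing("spark.kubernetes.memoryOverheadFactor", "0.4") // no-op: key already set
assert(conf.get("spark.kubernetes.memoryOverheadFactor") == "0.2")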

}

val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
@@ -53,7 +53,7 @@ private[spark] class BasicDriverFeatureStep(
private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
private val memoryOverheadMiB = conf
.get(DRIVER_MEMORY_OVERHEAD)
.getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt,
.getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR).getOrElse(0.1) * driverMemoryMiB).toInt,
Contributor: Seems a bit strange we set the default to 0.1 here while in KubernetesConf#createDriverConf we're setting it to 0.4 if it's missing.

Contributor Author: Isn't it easier to set it here as such, since it should default to 0.1 unless the non-JVM check modifies it to 0.4?

MEMORY_OVERHEAD_MIN_MIB))
private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
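To make the arithmetic above concrete, here is a small sketch of how the driver value is resolved; the 1024 MiB driver memory and the 384 MiB floor (assumed value of MEMORY_OVERHEAD_MIN_MIB) are illustrative assumptions only.

// Illustrative numbers only: a 1024 MiB driver with no explicit overhead settings.
val driverMemoryMiB = 1024L
val overheadFactor = 0.1      // getOrElse(0.1) when MEMORY_OVERHEAD_FACTOR is unset
val minOverheadMiB = 384L     // assumed value of MEMORY_OVERHEAD_MIN_MIB
val memoryOverheadMiB =
  math.max((overheadFactor * driverMemoryMiB).toInt, minOverheadMiB)
// 1024 * 0.1 = 102 MiB, below the floor, so the overhead becomes 384 MiB
val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB  // 1408 MiB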

@@ -54,7 +54,8 @@ private[spark] class BasicExecutorFeatureStep(

private val memoryOverheadMiB = kubernetesConf
.get(EXECUTOR_MEMORY_OVERHEAD)
.getOrElse(math.max((kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
.getOrElse(math.max(
(kubernetesConf.get(MEMORY_OVERHEAD_FACTOR).getOrElse(0.1) * executorMemoryMiB).toInt,
Contributor: This logic happens twice, and I'd rather see it in a utils or config class, where updating it in one place will take effect everywhere.

Contributor Author: The logic is different, since it takes in driver vs. executor configs to determine memory size. The only shared piece is kubernetesConf.get(MEMORY_OVERHEAD_FACTOR).getOrElse(0.1), which seems unnecessary to put into a Utils.

Contributor: Yeah, multiple places, yes; but since we have two instances of this magic 0.1 constant, I'd rather have it shared somewhere in case we go to update it in the future and miss one of them. It could also be a shared constant rather than a getter for the memory overhead factor; either way keeps us honest.
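A sketch of what that shared default could look like; the object and constant names below are hypothetical, not something this PR adds.

// Hypothetical home for the defaults so the 0.1 (and 0.4) factors live in one place.
private[spark] object MemoryOverheadDefaults {
  val JVM_OVERHEAD_FACTOR = 0.1
  val NON_JVM_OVERHEAD_FACTOR = 0.4
}

// Both the driver and executor steps would then read:
//   conf.get(MEMORY_OVERHEAD_FACTOR).getOrElse(MemoryOverheadDefaults.JVM_OVERHEAD_FACTOR)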

MEMORY_OVERHEAD_MIN_MIB))
private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB

@@ -20,7 +20,7 @@ import io.fabric8.kubernetes.api.model.ContainerBuilder
import io.fabric8.kubernetes.api.model.HasMetadata

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants.{ENV_PYSPARK_ARGS, ENV_PYSPARK_FILES, ENV_PYSPARK_PRIMARY}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesUtils
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep

@@ -32,7 +32,7 @@ private[spark] class PythonDriverFeatureStep(
require(mainResource.isDefined, "PySpark Main Resource must be defined")
val otherPyFiles = kubernetesConf.pyFiles().map(pyFile =>
KubernetesUtils.resolveFileUrisAndPath(pyFile.split(","))
.mkString(",")).getOrElse("")
.mkString(":")).getOrElse("")
Contributor: Leave a comment that we are switching from "," to ":" to match the format expected by the PYTHONPATH environment variable. (http://xkcd.com/1987)
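A minimal illustration of the new joining behaviour (the paths are examples): entries joined with ":" can be appended directly to PYTHONPATH by the entrypoint script.

// Example only: ":"-separated output matches what PYTHONPATH expects.
val resolvedPyFiles = Seq("/example2.py", "/example3.py")
val pysparkFiles = resolvedPyFiles.mkString(":")   // "/example2.py:/example3.py"
val pythonPath = sys.env.getOrElse("PYTHONPATH", "") + ":" + pysparkFiles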

val withPythonPrimaryFileContainer = new ContainerBuilder(pod.container)
.addNewEnv()
.withName(ENV_PYSPARK_ARGS)
@@ -46,6 +46,10 @@
.withName(ENV_PYSPARK_FILES)
.withValue(if (otherPyFiles == "") {""} else otherPyFiles)
Contributor: Wait, what is this logic?

Contributor: Don't add empty env vars; see above.

.endEnv()
.addNewEnv()
.withName(ENV_PYSPARK_PYTHON_VERSION)
.withValue(kubernetesConf.pySparkPythonVersion())
.endEnv()
.build()
SparkPod(pod.pod, withPythonPrimaryFileContainer)
}
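Regarding the empty-env-var comments above, one way to avoid publishing an empty variable would be to add PYSPARK_FILES conditionally. This is only a sketch of an alternative, not what the commit does, and it reuses pod, otherPyFiles, and ENV_PYSPARK_FILES from the step above.

// Sketch: only attach PYSPARK_FILES when there is actually something to put in it.
val builder = new ContainerBuilder(pod.container)
val withOptionalFiles =
  if (otherPyFiles.nonEmpty) {
    builder.addNewEnv()
      .withName(ENV_PYSPARK_FILES)
      .withValue(otherPyFiles)
      .endEnv()
  } else {
    builder
  }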
@@ -93,7 +93,7 @@ class KubernetesConfSuite extends SparkFunSuite {
None)
assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",")
=== Array("local:///opt/spark/jar1.jar"))
assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.1)
assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR).isEmpty)
}

test("Creating driver conf with a python primary file") {
Contributor: Would also like to see a unit test with a PyFile and an overridden memory overhead.

Contributor Author: Defaults are checked on 96 and 117. (But I need to ensure that it is possible to override as well. Will add.)

Contributor: Just a follow-up: we should have a test with Python that overrides MEMORY_OVERHEAD_FACTOR (e.g. a test to make sure setIfMissing behaves, since we had it the other way earlier in the PR).

@@ -114,14 +114,15 @@
Some(inputPyFiles.mkString(",")))
assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",")
=== Array("local:///opt/spark/jar1.jar"))
assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4)
assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === Some(0.4))
assert(kubernetesConfWithMainResource.sparkFiles
=== Array("local:///opt/spark/example4.py", mainResourceFile) ++ inputPyFiles)
}


test("Resolve driver labels, annotations, secret mount paths, and envs.") {
test("Resolve driver labels, annotations, secret mount paths, envs, and memory overhead") {
val sparkConf = new SparkConf(false)
.set(MEMORY_OVERHEAD_FACTOR, 0.3)
CUSTOM_LABELS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$key", value)
}
@@ -151,6 +152,7 @@
assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS)
assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
assert(conf.roleEnvs === CUSTOM_ENVS)
assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === Some(0.3))
}

test("Basic executor translated fields.") {
@@ -32,7 +32,7 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
val mainResource = "local:///main.py"
val pyFiles = Seq("local:///example2.py", "local:///example3.py")
val expectedPySparkFiles =
"/example2.py,/example3.py"
"/example2.py:/example3.py"
val baseDriverPod = SparkPod.initialPod()
val sparkConf = new SparkConf(false)
.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource)
@@ -57,7 +57,7 @@
val step = new PythonDriverFeatureStep(kubernetesConf)
val driverPod = step.configurePod(baseDriverPod).pod
val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
assert(driverContainerwithPySpark.getEnv.size === 3)
assert(driverContainerwithPySpark.getEnv.size === 4)
val envs = driverContainerwithPySpark
.getEnv
.asScala
Expand All @@ -66,12 +66,14 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite {
assert(envs(ENV_PYSPARK_PRIMARY) === expectedMainResource)
assert(envs(ENV_PYSPARK_FILES) === expectedPySparkFiles)
assert(envs(ENV_PYSPARK_ARGS) === "5 7")
assert(envs(ENV_PYSPARK_PYTHON_VERSION) === "2")
}
test("Python Step testing empty pyfiles") {
val mainResource = "local:///main.py"
val baseDriverPod = SparkPod.initialPod()
val sparkConf = new SparkConf(false)
.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource)
.set(PYSPARK_PYTHON_VERSION, "3")
val kubernetesConf = KubernetesConf(
sparkConf,
KubernetesDriverSpecificConf(
@@ -89,13 +91,14 @@
val step = new PythonDriverFeatureStep(kubernetesConf)
val driverPod = step.configurePod(baseDriverPod).pod
val driverContainerwithPySpark = step.configurePod(baseDriverPod).container
assert(driverContainerwithPySpark.getEnv.size === 3)
assert(driverContainerwithPySpark.getEnv.size === 4)
val envs = driverContainerwithPySpark
.getEnv
.asScala
.map(env => (env.getName, env.getValue))
.toMap
assert(envs(ENV_PYSPARK_FILES) === "")
assert(envs(ENV_PYSPARK_ARGS) === "")
assert(envs(ENV_PYSPARK_PYTHON_VERSION) === "3")
}
}
@@ -18,16 +18,17 @@
ARG base_img
FROM $base_img
WORKDIR /
COPY python /opt/spark/python
RUN mkdir ${SPARK_HOME}/python
COPY python/lib ${SPARK_HOME}/python/lib
RUN apk add --no-cache python && \
apk add --no-cache python3 && \
python -m ensurepip && \
python3 -m ensurepip && \
rm -r /usr/lib/python*/ensurepip && \
Contributor: Can we add a comment about why this part?

pip install --upgrade pip setuptools && \
Member: This goes to python2 only, I think?

Contributor Author: Correct. I would love recommendations on dependency management with regards to pip, as it's tricky to allow for both pip and pip3 installation, unless I use two separate virtual environments for dependency management.

Contributor: You can run both pip and pip3 with the same packages; they'll get installed in different directories and shouldn't stomp on top of each other. That being said, long-term venvs are probably the way we want to go, but as we've discussed those are probably non-trivial and should go in a second PR.

Contributor: ping

Contributor Author: I will include pip3.6 installation for now until we figure out a long-term venv solution in the next PR.

rm -r /root/.cache
Contributor: Is this just being done for space reasons?

Contributor Author: Yes.

ENV PYTHON_VERSION 2.7.13
ENV PYSPARK_PYTHON python
ENV PYSPARK_DRIVER_PYTHON python
ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.6-src.zip:${PYTHONPATH}

ENV PYTHONPATH ${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-*.zip

WORKDIR /opt/spark/work-dir
ENTRYPOINT [ "/opt/entrypoint.sh" ]
@@ -53,12 +53,21 @@ if [ -n "$SPARK_MOUNTED_FILES_DIR" ]; then
cp -R "$SPARK_MOUNTED_FILES_DIR/." .
fi

PYSPARK_SECONDARY="$PYSPARK_APP_ARGS"
if [ ! -z "$PYSPARK_FILES" ]; then
PYSPARK_SECONDARY="$PYSPARK_FILES $PYSPARK_APP_ARGS"
if [ -n "$PYSPARK_FILES" ]; then
PYTHONPATH="$PYTHONPATH:$PYSPARK_FILES"
fi


if [ "$PYSPARK_PYTHON_VERSION" == "2" ]; then
Contributor: Should be fine if we don't set this at all, yeah?

Contributor Author: We set the PySpark version to default to 2 in the configs.

pyv="$(python -V 2>&1)"
export PYTHON_VERSION="${pyv:7}"
export PYSPARK_PYTHON="python"
export PYSPARK_DRIVER_PYTHON="python"
elif [ "$PYSPARK_PYTHON_VERSION" == "3" ]; then
pyv3="$(python3 -V 2>&1)"
export PYTHON_VERSION="${pyv3:7}"
export PYSPARK_PYTHON="python3"
export PYSPARK_DRIVER_PYTHON="python3"
fi
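For reference, the submission-side setting that drives this branch would look roughly like the following (the surrounding spark-submit wiring is assumed); leaving the key unset falls back to the createWithDefault("2") behaviour discussed earlier.

import org.apache.spark.SparkConf

// Sketch: opting into Python 3 for a Kubernetes PySpark submission via the new config.
val submissionConf = new SparkConf()
  .set("spark.kubernetes.pyspark.pythonversion", "3")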

case "$SPARK_K8S_CMD" in
driver)
Expand All @@ -74,7 +83,7 @@ case "$SPARK_K8S_CMD" in
"$SPARK_HOME/bin/spark-submit"
--conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
--deploy-mode client
"$@" $PYSPARK_PRIMARY $PYSPARK_SECONDARY
"$@" $PYSPARK_PRIMARY $PYSPARK_APP_ARGS
)
;;
