25 commits
fb5b9ed
initial architecture for PySpark w/o dockerfile work
ifilonenko Apr 16, 2018
b7b3db0
included entrypoint logic
ifilonenko Apr 17, 2018
98cef8c
satisfying integration tests
ifilonenko Apr 18, 2018
dc670dc
end-to-end working pyspark
ifilonenko Apr 18, 2018
eabe4b9
Merge pull request #1 from ifilonenko/py-spark
ifilonenko Apr 18, 2018
8d3debb
resolved comments and fixed --pyfiles issue and allowed for python2 o…
ifilonenko May 2, 2018
91e2a2c
Merge pull request #2 from ifilonenko/py-spark
ifilonenko May 2, 2018
5761ee8
Merge branch 'master' of https://github.com/ifilonenko/spark
ifilonenko May 2, 2018
98cc044
restructured step based pattern to resolve comments
ifilonenko May 7, 2018
678d381
Merge pull request #3 from ifilonenko/py-spark
ifilonenko May 7, 2018
bf738dc
resolved comments
ifilonenko May 8, 2018
c59068d
Merge pull request #4 from ifilonenko/py-spark
ifilonenko May 8, 2018
0344f90
resolving style issues
ifilonenko May 9, 2018
306f3ed
Merge pull request #5 from ifilonenko/py-spark
ifilonenko May 9, 2018
f2fc53e
resolved commits
ifilonenko May 13, 2018
6f66d60
merge conflicts
ifilonenko May 13, 2018
914ff75
resolve rebase conflicts
ifilonenko May 11, 2018
d400607
import statements refactoring
ifilonenko May 13, 2018
72953a3
Merge branch 'py-spark'
ifilonenko May 13, 2018
7bedeb6
resolved comments
ifilonenko May 31, 2018
1801e96
merge conflicts
ifilonenko May 31, 2018
24a704e
setIfMissing
ifilonenko Jun 1, 2018
6a6d69d
added e2e tests on --py-files and inclusion of docs on config values
ifilonenko Jun 7, 2018
ab92913
style issues
ifilonenko Jun 7, 2018
a61d897
resolve comments on docs and addition of unit test
ifilonenko Jun 8, 2018
23 changes: 17 additions & 6 deletions bin/docker-image-tool.sh
@@ -63,12 +63,20 @@ function build {
if [ ! -d "$IMG_PATH" ]; then
error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
fi

local DOCKERFILE=${DOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
local BINDING_BUILD_ARGS=(
--build-arg
base_img=$(image_ref spark)
)
local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"}

docker build "${BUILD_ARGS[@]}" \
-t $(image_ref spark) \
-f "$DOCKERFILE" .
-f "$BASEDOCKERFILE" .

docker build "${BINDING_BUILD_ARGS[@]}" \
-t $(image_ref spark-py) \
-f "$PYDOCKERFILE" .
}

function push {
@@ -86,7 +94,8 @@ Commands:
push Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
-f file Dockerfile to build. By default builds the Dockerfile shipped with Spark.
-f file Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
-p file Dockerfile with Python baked in. By default builds the Dockerfile shipped with Spark.
Contributor:
One (future concern) is how we would handle the overlay with both Python and R at the same time.

-r repo Repository address.
-t tag Tag to apply to the built image, or to identify the image to be pushed.
-m Use minikube's Docker daemon.
@@ -116,12 +125,14 @@ fi

REPO=
TAG=
DOCKERFILE=
BASEDOCKERFILE=
PYDOCKERFILE=
while getopts f:mr:t: option
do
case "${option}"
in
f) DOCKERFILE=${OPTARG};;
f) BASEDOCKERFILE=${OPTARG};;
p) PYDOCKERFILE=${OPTARG};;
r) REPO=${OPTARG};;
t) TAG=${OPTARG};;
m)
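For reference, a usage sketch of the updated script. The repository, tag, and Python Dockerfile path are placeholders, and this assumes -p is accepted by the option parser (the getopts spec shown above still reads f:mr:t:):

# Builds both the JVM image (<repo>/spark) and the Python-binding image
# (<repo>/spark-py) in one invocation; names and paths are illustrative.
./bin/docker-image-tool.sh \
  -r docker.io/myrepo \
  -t my-tag \
  -p kubernetes/dockerfiles/spark/bindings/python/Dockerfile \
  build
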
14 changes: 10 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -285,8 +285,6 @@ private[spark] class SparkSubmit extends Logging {
case (STANDALONE, CLUSTER) if args.isR =>
error("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (KUBERNETES, _) if args.isPython =>
error("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
error("R applications are currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
@@ -694,9 +692,17 @@ private[spark] class SparkSubmit extends Logging {
if (isKubernetesCluster) {
childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
if (args.isPython) {
Contributor:
This logic appears to be duplicated from YARN; would it make sense to factor this out into a common function?

Contributor:
We chatted about this offline, and while it's close it's not exactly the same, so we can deal with minor parts of duplication for now.

childArgs ++= Array("--primary-py-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
if (args.pyFiles != null) {
childArgs ++= Array("--other-py-files", args.pyFiles)
}
} else {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
}
}
childArgs ++= Array("--main-class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg =>
childArgs += ("--arg", arg)
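As an illustration of the branch added above: with a .py primary resource, SparkSubmit now passes --primary-py-file, forces the main class to org.apache.spark.deploy.PythonRunner, and forwards --py-files as --other-py-files, while JAR submissions keep the --primary-java-resource path. A hedged cluster-mode example (the master URL, image name, and file paths are placeholders):

./bin/spark-submit \
  --master k8s://https://<api-server>:6443 \
  --deploy-mode cluster \
  --name my-pyspark-app \
  --conf spark.kubernetes.container.image=docker.io/myrepo/spark-py:my-tag \
  --py-files local:///opt/spark/deps/helpers.py \
  local:///opt/spark/app/main.py
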
16 changes: 16 additions & 0 deletions docs/running-on-kubernetes.md
@@ -624,4 +624,20 @@ specific to Spark on Kubernetes.
<code>spark.kubernetes.executor.secrets.ENV_VAR=spark-secret:key</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.memoryOverheadFactor</code></td>
<td><code>0.1</code></td>
<td>
This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs.
Contributor:
I think we can maybe improve this documentation a little bit. It's not so much how much memory is set aside for non-JVM jobs, it's how much memory is set aside for non-JVM memory, including off-heap allocations, non-JVM jobs (like Python or R), and system processes.

This is done as non-JVM tasks need more non-JVM heap space and such tasks commonly fail with "Memory Overhead Exceeded" errors. This preempts this error with
a higher default.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.pyspark.pythonversion</code></td>
<td><code>"2"</code></td>
<td>
This sets the major Python version of the docker image used to run the driver and executor containers. Can either be 2 or 3.
</td>
</tr>
</table>
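A rough worked example for the first property, assuming the overhead is computed as max(factor × driver memory, 384 MiB) as it is for other resource managers: a 4g driver with the non-JVM default factor of 0.4 would get roughly max(0.4 × 4096, 384) = 1638 MiB of overhead added to its pod memory request. Both properties can be overridden at submission time; the values below are illustrative:

# Illustrative overrides; remaining submission arguments omitted.
./bin/spark-submit \
  --conf spark.kubernetes.memoryOverheadFactor=0.2 \
  --conf spark.kubernetes.pyspark.pythonversion=3 \
  <other arguments>
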
32 changes: 32 additions & 0 deletions examples/src/main/python/py_container_checks.py
@@ -0,0 +1,32 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import sys


def version_check(python_env, major_python_version):
"""
These are various tests to test the Python container image.
This file will be distributed via --py-files in the e2e tests.
"""
env_version = os.environ.get('PYSPARK_PYTHON')
print("Python runtime version check is: " +
str(sys.version_info[0] == major_python_version))

print("Python environment version check is: " +
str(env_version == python_env))
38 changes: 38 additions & 0 deletions examples/src/main/python/pyfiles.py
@@ -0,0 +1,38 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function

import sys

from pyspark.sql import SparkSession


if __name__ == "__main__":
"""
Usage: pyfiles [major_python_version]
"""
spark = SparkSession \
.builder \
.appName("PyFilesTest") \
.getOrCreate()

from py_container_checks import version_check
# Begin of Python container checks
version_check(sys.argv[1], 2 if sys.argv[1] == "python" else 3)

spark.stop()
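These two example files are meant to be used together in the end-to-end tests: py_container_checks.py is shipped via --py-files and imported by pyfiles.py, whose single argument names the interpreter the container is expected to run ("python" for 2, anything else, e.g. "python3", for 3). A hedged invocation against a Python 3 image (image name, master URL, and in-image paths are assumptions):

./bin/spark-submit \
  --master k8s://https://<api-server>:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=docker.io/myrepo/spark-py:my-tag \
  --conf spark.kubernetes.pyspark.pythonversion=3 \
  --py-files local:///opt/spark/examples/src/main/python/py_container_checks.py \
  local:///opt/spark/examples/src/main/python/pyfiles.py python3
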
@@ -117,6 +117,28 @@ private[spark] object Config extends Logging {
.stringConf
.createWithDefault("spark")

val KUBERNETES_PYSPARK_PY_FILES =
ConfigBuilder("spark.kubernetes.python.pyFiles")
.doc("The PyFiles that are distributed via client arguments")
.internal()
.stringConf
.createOptional

val KUBERNETES_PYSPARK_MAIN_APP_RESOURCE =
ConfigBuilder("spark.kubernetes.python.mainAppResource")
.doc("The main app resource for pyspark jobs")
.internal()
.stringConf
.createOptional

val KUBERNETES_PYSPARK_APP_ARGS =
ConfigBuilder("spark.kubernetes.python.appArgs")
.doc("The app arguments for PySpark Jobs")
.internal()
.stringConf
.createOptional


val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
.doc("Number of pods to launch at once in each round of executor allocation.")
@@ -154,6 +176,24 @@ private[spark] object Config extends Logging {
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
.createWithDefaultString("1s")

val MEMORY_OVERHEAD_FACTOR =
ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
.doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
"which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs")
Contributor:
+1 to this, thanks for adding this.

.doubleConf
.checkValue(mem_overhead => mem_overhead >= 0 && mem_overhead < 1,
"Ensure that memory overhead is a double between 0 --> 1.0")
.createWithDefault(0.1)

val PYSPARK_MAJOR_PYTHON_VERSION =
ConfigBuilder("spark.kubernetes.pyspark.pythonversion")
Member:
Sorry for leaving a comment on an ancient PR, but I couldn't hold it. Why did we add a configuration to control the Python version instead of using the existing PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON?

Doing this in a configuration breaks or disables many things, for example PEX (https://medium.com/criteo-labs/packaging-code-with-pex-a-pyspark-example-9057f9f144f3), which requires setting PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON manually.

Member (@HyukjinKwon, Dec 10, 2020):
cc @dongjoon-hyun too, FYI. Conda / virtualenv support enabled by #30486 wouldn't work in Kubernetes because of this.

Contributor:
@HyukjinKwon sounds reasonable to include support for that; we just need to agree on a policy for which takes precedence.

.doc("This sets the major Python version. Either 2 or 3. (Python2 or Python3)")
.stringConf
.checkValue(pv => List("2", "3").contains(pv),
"Ensure that major Python version is either Python2 or Python3")
.createWithDefault("2")
Comment:
Am I reading this right that the default is Python 2? Is there a reason for that? Thanks!

Contributor (Author):
No particular reason. I just thought that the major version should default to 2.

Comment:
There is only ~18 months of support left for Python 2. Python 3 has been around for 10 years and, unless there's a good reason, I think it should be the default.

Contributor (Author):
I am willing to do that: thoughts @holdenk?

Contributor:
I'm fine with either as the default. While Py2 is officially EOL, I think we'll still see PySpark Py2 apps for a while after.

val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"

@@ -71,9 +71,14 @@ private[spark] object Constants {
val SPARK_CONF_FILE_NAME = "spark.properties"
val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"

// BINDINGS
val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
val ENV_PYSPARK_FILES = "PYSPARK_FILES"
val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"

// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
val MEMORY_OVERHEAD_FACTOR = 0.10
val MEMORY_OVERHEAD_MIN_MIB = 384L
}
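These environment variables are presumably consumed by the Python image's entrypoint, which is not part of this excerpt; the fragment below is only a hypothetical sketch of how a container entrypoint could use them:

# Hypothetical entrypoint fragment (not from this PR): pick the interpreter
# from PYSPARK_MAJOR_PYTHON_VERSION and expose the primary resource and
# its dependencies to the driver command.
if [ -n "$PYSPARK_PRIMARY" ]; then
  if [ "$PYSPARK_MAJOR_PYTHON_VERSION" = "3" ]; then
    export PYSPARK_PYTHON=python3
  else
    export PYSPARK_PYTHON=python
  fi
  # PYSPARK_FILES would feed --py-files, and PYSPARK_APP_ARGS the trailing
  # application arguments.
fi
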
@@ -16,14 +16,17 @@
*/
package org.apache.spark.deploy.k8s

import scala.collection.mutable

import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config.ConfigEntry


private[spark] sealed trait KubernetesRoleSpecificConf

/*
@@ -55,7 +58,8 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
roleAnnotations: Map[String, String],
roleSecretNamesToMountPaths: Map[String, String],
roleSecretEnvNamesToKeyRefs: Map[String, String],
roleEnvs: Map[String, String]) {
roleEnvs: Map[String, String],
sparkFiles: Seq[String]) {

def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)

@@ -64,10 +68,14 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String])

def sparkFiles(): Seq[String] = sparkConf
.getOption("spark.files")
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String])
def pyFiles(): Option[String] = sparkConf
.get(KUBERNETES_PYSPARK_PY_FILES)

def pySparkMainResource(): Option[String] = sparkConf
Contributor:
This seems redundant with the driver-specific spark conf's MainAppResource. Perhaps remove the need to specify this thing twice?

Contributor (Author):
I need to parse out the MainAppResource, which I thought we should be doing only once; as such, I thought it would be cleaner to do this.

.get(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE)

def pySparkPythonVersion(): String = sparkConf
.get(PYSPARK_MAJOR_PYTHON_VERSION)

def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)

@@ -102,17 +110,30 @@ private[spark] object KubernetesConf {
appId: String,
mainAppResource: Option[MainAppResource],
mainClass: String,
appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
appArgs: Array[String],
maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
val sparkConfWithMainAppJar = sparkConf.clone()
val additionalFiles = mutable.ArrayBuffer.empty[String]
mainAppResource.foreach {
case JavaMainAppResource(res) =>
val previousJars = sparkConf
.getOption("spark.jars")
.map(_.split(","))
.getOrElse(Array.empty)
if (!previousJars.contains(res)) {
sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
}
case JavaMainAppResource(res) =>
val previousJars = sparkConf
.getOption("spark.jars")
.map(_.split(","))
.getOrElse(Array.empty)
if (!previousJars.contains(res)) {
sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
}
// The function of this outer match is to account for multiple nonJVM
// bindings that will all have increased MEMORY_OVERHEAD_FACTOR to 0.4
case nonJVM: NonJVMResource =>
Contributor:
Why can't we just match PythonMainAppResource immediately here - why the two layers of matching?

Contributor (Author):
Because the R step should have the same default MemoryOverhead, as should all NonJVMResources.

Contributor:
Maybe worth a comment then? Especially since R support isn't integrated right now, it's perhaps not super clear to folks why this is being done.

Contributor:
Maybe add a comment? Since R isn't currently integrated, this could be a bit difficult to infer.

nonJVM match {
case PythonMainAppResource(res) =>
additionalFiles += res
maybePyFiles.foreach{maybePyFiles =>
additionalFiles.appendAll(maybePyFiles.split(","))}
Contributor:
Not for this PR or JIRA, but later maybe we should normalize our parsing of input files in a way that allows escape characters, and share the logic between YARN/K8s/Mesos/standalone. What do y'all think? Possible follow-up JIRA: https://issues.apache.org/jira/browse/SPARK-24184

sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
}
sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4)
Contributor:
Do we want to set this in the JVM case?

Contributor (Author):
This is set later in BaseDriverStep.

}

val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
Expand All @@ -135,6 +156,11 @@ private[spark] object KubernetesConf {
val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)

val sparkFiles = sparkConf
.getOption("spark.files")
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String]) ++ additionalFiles

KubernetesConf(
sparkConfWithMainAppJar,
KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
Expand All @@ -144,7 +170,8 @@ private[spark] object KubernetesConf {
driverAnnotations,
driverSecretNamesToMountPaths,
driverSecretEnvNamesToKeyRefs,
driverEnvs)
driverEnvs,
sparkFiles)
}

def createExecutorConf(
@@ -186,6 +213,7 @@ private[spark] object KubernetesConf {
executorAnnotations,
executorMountSecrets,
executorEnvSecrets,
executorEnv)
executorEnv,
Seq.empty[String])
}
}
@@ -52,7 +52,7 @@ private[spark] object KubernetesUtils {
}
}

private def resolveFileUri(uri: String): String = {
def resolveFileUri(uri: String): String = {
val fileUri = Utils.resolveURI(uri)
val fileScheme = Option(fileUri.getScheme).getOrElse("file")
fileScheme match {