Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
addition of --py-files test
  • Loading branch information
ifilonenko committed Jun 7, 2018
commit e06454f46f1bfa398293d884d500c7b12f2cde7f
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,41 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
isJVM = false)
}

test("Run PySpark with Python2 to test a pyfiles example") {
  // Target the PySpark container image and pin the runtime to Python 2.
  val pySparkImage = s"${getTestImageRepo}/spark-py:${getTestImageTag}"
  sparkAppConf
    .set("spark.kubernetes.container.image", pySparkImage)
    .set("spark.kubernetes.pyspark.pythonversion", "2")
  // The pyfiles.py app prints these lines when both the interpreter version
  // and the module distributed via --py-files check out inside the pod.
  val expectedLogs = Seq(
    "Python runtime version check is: True",
    "Python environment version check is: True")
  runSparkApplicationAndVerifyCompletion(
    appResource = PYSPARK_FILES,
    mainClass = "",
    expectedLogOnCompletion = expectedLogs,
    appArgs = Array("python"),
    driverPodChecker = doBasicDriverPyPodCheck,
    executorPodChecker = doBasicExecutorPyPodCheck,
    appLocator = appLocator,
    isJVM = false,
    pyFiles = Some(PYSPARK_CONTAINER_TESTS))
}

test("Run PySpark with Python3 to test a pyfiles example") {
  // Target the PySpark container image and pin the runtime to Python 3.
  val pySparkImage = s"${getTestImageRepo}/spark-py:${getTestImageTag}"
  sparkAppConf
    .set("spark.kubernetes.container.image", pySparkImage)
    .set("spark.kubernetes.pyspark.pythonversion", "3")
  // The pyfiles.py app prints these lines when both the interpreter version
  // and the module distributed via --py-files check out inside the pod.
  val expectedLogs = Seq(
    "Python runtime version check is: True",
    "Python environment version check is: True")
  runSparkApplicationAndVerifyCompletion(
    appResource = PYSPARK_FILES,
    mainClass = "",
    expectedLogOnCompletion = expectedLogs,
    appArgs = Array("python3"),
    driverPodChecker = doBasicDriverPyPodCheck,
    executorPodChecker = doBasicExecutorPyPodCheck,
    appLocator = appLocator,
    isJVM = false,
    pyFiles = Some(PYSPARK_CONTAINER_TESTS))
}

private def runSparkPiAndVerifyCompletion(
appResource: String = containerLocalSparkDistroExamplesJar,
Expand Down Expand Up @@ -278,12 +313,19 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
driverPodChecker: Pod => Unit,
executorPodChecker: Pod => Unit,
appLocator: String,
isJVM: Boolean): Unit = {
isJVM: Boolean,
pyFiles: Option[String] = None): Unit = {
val appArguments = SparkAppArguments(
mainAppResource = appResource,
mainClass = mainClass,
appArgs = appArgs)
SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir, isJVM)
SparkAppLauncher.launch(
appArguments,
sparkAppConf,
TIMEOUT.value.toSeconds.toInt,
sparkHomeDir,
isJVM,
pyFiles)

val driverPod = kubernetesTestComponents.kubernetesClient
.pods()
Expand Down Expand Up @@ -420,6 +462,8 @@ private[spark] object KubernetesSuite {
// Main class for the JVM driver-submission integration test application.
val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest"
// Base container-local path where the PySpark example scripts live inside the image.
val CONTAINER_LOCAL_PYSPARK: String = "local:///opt/spark/examples/src/main/python/"
// Pi example used by the basic PySpark smoke test.
val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py"
// Main application for the --py-files tests; prints version-check log lines on success.
val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py"
// Helper module shipped to the app via --py-files in the pyfiles tests.
val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + "py_container_checks.py"
// Prefix for Kubernetes secret names created by the secret-mounting tests.
val TEST_SECRET_NAME_PREFIX = "test-secret-"
// Key and value stored in the test secret.
val TEST_SECRET_KEY = "test-key"
val TEST_SECRET_VALUE = "test-data"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ private[spark] object SparkAppLauncher extends Logging {
appConf: SparkAppConf,
timeoutSecs: Int,
sparkHomeDir: Path,
isJVM: Boolean): Unit = {
isJVM: Boolean,
pyFiles: Option[String] = None): Unit = {
val sparkSubmitExecutable = sparkHomeDir.resolve(Paths.get("bin", "spark-submit"))
logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf")
val preCommandLine = if (isJVM) {
Expand All @@ -113,7 +114,10 @@ private[spark] object SparkAppLauncher extends Logging {
"--deploy-mode", "cluster",
"--master", appConf.get("spark.master"))
}
val commandLine = preCommandLine ++ appConf.toStringArray :+ appArguments.mainAppResource
val commandLine =
pyFiles.map(s => preCommandLine ++ Array("--py-files", s)).getOrElse(preCommandLine) ++
appConf.toStringArray :+ appArguments.mainAppResource

if (appArguments.appArgs.nonEmpty) {
commandLine += appArguments.appArgs.mkString(" ")
}
Expand Down