added pyspark testing
ifilonenko committed Apr 18, 2018
commit b1703d9da0433150a89d55abd780a6866617879e
@@ -40,6 +40,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
   private var kubernetesTestComponents: KubernetesTestComponents = _
   private var sparkAppConf: SparkAppConf = _
   private var image: String = _
+  private var pyImage: String = _
   private var containerLocalSparkDistroExamplesJar: String = _
   private var appLocator: String = _
   private var driverPodName: String = _
@@ -62,6 +63,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
     val imageTag = getTestImageTag
     val imageRepo = getTestImageRepo
     image = s"$imageRepo/spark:$imageTag"
+    pyImage = s"$imageRepo/spark-py:$imageTag"
 
     val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars"))
       .toFile
@@ -184,20 +186,37 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
       appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME))
   }
 
+  test("Run PySpark on simple pi.py example") {
+    sparkAppConf
+      .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
+    runSparkApplicationAndVerifyCompletion(
+      appResource = PYSPARK_PI,
+      mainClass = "",
+      expectedLogOnCompletion = Seq("Pi is roughly 3"),
+      appArgs = Array("5"),
+      driverPodChecker = doBasicDriverPyPodCheck,
+      executorPodChecker = doBasicExecutorPyPodCheck,
+      appLocator = appLocator,
+      isJVM = false)
+  }
+
+
   private def runSparkPiAndVerifyCompletion(
       appResource: String = containerLocalSparkDistroExamplesJar,
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
       executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
       appArgs: Array[String] = Array.empty[String],
-      appLocator: String = appLocator): Unit = {
+      appLocator: String = appLocator,
+      isJVM: Boolean = true ): Unit = {
     runSparkApplicationAndVerifyCompletion(
       appResource,
       SPARK_PI_MAIN_CLASS,
       Seq("Pi is roughly 3"),
       appArgs,
       driverPodChecker,
       executorPodChecker,
-      appLocator)
+      appLocator,
+      isJVM)
   }
 
   private def runSparkRemoteCheckAndVerifyCompletion(
@@ -213,7 +232,8 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
       appArgs,
       driverPodChecker,
       executorPodChecker,
-      appLocator)
+      appLocator,
+      true)
   }
 
   private def runSparkJVMCheckAndVerifyCompletion(
@@ -226,7 +246,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
       mainAppResource = appResource,
       mainClass = mainClass,
       appArgs = appArgs)
-    SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir)
+    SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir, true)
 
     val driverPod = kubernetesTestComponents.kubernetesClient
       .pods()
Expand All @@ -248,19 +268,22 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
     }
   }
 
+
+
   private def runSparkApplicationAndVerifyCompletion(
       appResource: String,
       mainClass: String,
       expectedLogOnCompletion: Seq[String],
       appArgs: Array[String],
       driverPodChecker: Pod => Unit,
       executorPodChecker: Pod => Unit,
-      appLocator: String): Unit = {
+      appLocator: String,
+      isJVM: Boolean): Unit = {
     val appArguments = SparkAppArguments(
       mainAppResource = appResource,
       mainClass = mainClass,
       appArgs = appArgs)
-    SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir)
+    SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir, isJVM)
 
     val driverPod = kubernetesTestComponents.kubernetesClient
       .pods()
@@ -298,11 +321,22 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
     assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
   }
 
+  private def doBasicDriverPyPodCheck(driverPod: Pod): Unit = {
+    assert(driverPod.getMetadata.getName === driverPodName)
+    assert(driverPod.getSpec.getContainers.get(0).getImage === pyImage)
+    assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
+  }
+
   private def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
     assert(executorPod.getSpec.getContainers.get(0).getImage === image)
     assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
   }
 
+  private def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = {
+    assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage)
+    assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+  }
+
   private def checkCustomSettings(pod: Pod): Unit = {
     assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
     assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
@@ -384,7 +418,8 @@ private[spark] object KubernetesSuite {
   val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
   val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest"
   val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest"
-
+  val CONTAINER_LOCAL_PYSPARK: String = "local:///opt/spark/examples/src/main/python/"
+  val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py"
   val TEST_SECRET_NAME_PREFIX = "test-secret-"
   val TEST_SECRET_KEY = "test-key"
   val TEST_SECRET_VALUE = "test-data"
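For reference, a condensed sketch of the spark-submit invocation the new test should produce once these pieces are wired together. Only the resource path (PYSPARK_PI) and the spark-py image name come from this diff; the repo, tag, and master URL below are illustrative assumptions, and appConf.toStringArray would contribute further --conf pairs.

  // Scala sketch with illustrative values; not part of this commit.
  val expectedPySubmitArgs = Seq(
    "/opt/spark/bin/spark-submit",
    "--deploy-mode", "cluster",
    "--master", "k8s://https://192.168.99.100:8443",  // illustrative master URL
    "--conf", "spark.kubernetes.container.image=docker.io/kubespark/spark-py:latest",  // illustrative repo/tag
    "local:///opt/spark/examples/src/main/python/pi.py",  // PYSPARK_PI
    "5")  // appArgs; no --class pair because isJVM = false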
@@ -99,15 +99,21 @@ private[spark] object SparkAppLauncher extends Logging {
       appArguments: SparkAppArguments,
       appConf: SparkAppConf,
       timeoutSecs: Int,
-      sparkHomeDir: Path): Unit = {
+      sparkHomeDir: Path,
+      isJVM: Boolean): Unit = {
     val sparkSubmitExecutable = sparkHomeDir.resolve(Paths.get("bin", "spark-submit"))
     logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf")
-    val commandLine = mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath,
-      "--deploy-mode", "cluster",
-      "--class", appArguments.mainClass,
-      "--master", appConf.get("spark.master")
-    ) ++ appConf.toStringArray :+
-      appArguments.mainAppResource
+    val preCommandLine = if (isJVM) {
+      mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath,
+        "--deploy-mode", "cluster",
+        "--class", appArguments.mainClass,
+        "--master", appConf.get("spark.master"))
+    } else {
+      mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath,
+        "--deploy-mode", "cluster",
+        "--master", appConf.get("spark.master"))
+    }
+    val commandLine = preCommandLine ++ appConf.toStringArray :+ appArguments.mainAppResource
     if (appArguments.appArgs.nonEmpty) {
       commandLine += appArguments.appArgs.mkString(" ")
    }
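To make the launcher change concrete, here is a minimal, self-contained sketch of the new branching, runnable on its own. Identifier names mirror the diff; the paths and master URL are illustrative stand-ins, so treat this as a sketch of the pattern rather than the actual test harness code.

  import scala.collection.mutable

  // Minimal sketch of the isJVM branch added above: the only difference between
  // the two command lines is the "--class <mainClass>" pair, which does not
  // apply to a Python application. Values in main() are illustrative.
  object LaunchSketch {
    def preCommandLine(
        sparkSubmit: String,
        mainClass: String,
        master: String,
        isJVM: Boolean): mutable.ArrayBuffer[String] = {
      if (isJVM) {
        mutable.ArrayBuffer(sparkSubmit,
          "--deploy-mode", "cluster",
          "--class", mainClass,
          "--master", master)
      } else {
        mutable.ArrayBuffer(sparkSubmit,
          "--deploy-mode", "cluster",
          "--master", master)
      }
    }

    def main(args: Array[String]): Unit = {
      val jvm = preCommandLine("/opt/spark/bin/spark-submit",
        "org.apache.spark.examples.SparkPi", "k8s://https://host:6443", isJVM = true)
      val py = preCommandLine("/opt/spark/bin/spark-submit",
        "", "k8s://https://host:6443", isJVM = false)
      println(jvm.mkString(" "))  // includes --class org.apache.spark.examples.SparkPi
      println(py.mkString(" "))   // omits the --class pair entirely
    }
  }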