Added test for client mode pyspark shell into PythonTestsSuite
AzureQ committed Nov 19, 2018
commit 4bf6bc6b2b9e12c7a29efe3929c4101db4957d4c
@@ -16,6 +16,11 @@
*/
package org.apache.spark.deploy.k8s.integrationtest

import scala.collection.JavaConverters._

import org.scalatest.concurrent.Eventually

import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag, INTERVAL, TIMEOUT}

private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>

import PythonTestsSuite._
@@ -89,6 +94,90 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
isJVM = false,
pyFiles = Some(PYSPARK_CONTAINER_TESTS))
}

test("Run bin/pyspark in client mode", k8sTestTag) {
val labels = Map("spark-app-selector" -> driverPodName)
val driverPort = 7077
val blockManagerPort = 10000
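// Headless service (clusterIP: None) selecting the driver pod, exposing the driver and
// block manager ports so executors can connect back to the driver.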
val driverService = testBackend
.getKubernetesClient
.services()
.inNamespace(kubernetesTestComponents.namespace)
.createNew()
.withNewMetadata()
.withName(s"$driverPodName-svc")
.endMetadata()
.withNewSpec()
.withClusterIP("None")
.withSelector(labels.asJava)
.addNewPort()
.withName("driver-port")
.withPort(driverPort)
.withNewTargetPort(driverPort)
.endPort()
.addNewPort()
.withName("block-manager")
.withPort(blockManagerPort)
.withNewTargetPort(blockManagerPort)
.endPort()
.endSpec()
.done()
try {
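// Driver pod that runs bin/pyspark directly, pointing it at the in-cluster API server in client mode.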
val driverPod = testBackend
.getKubernetesClient
.pods()
.inNamespace(kubernetesTestComponents.namespace)
.createNew()
.withNewMetadata()
.withName(driverPodName)
.withLabels(labels.asJava)
.endMetadata()
.withNewSpec()
.withServiceAccountName(kubernetesTestComponents.serviceAccountName)
.addNewContainer()
.withName("pyspark-example")
.withImage(image)
.withImagePullPolicy("IfNotPresent")
.withCommand("/opt/spark/bin/pyspark")
.addToArgs("--master", s"k8s://https://kubernetes.default.svc")
.addToArgs("--deploy-mode", "client")
.addToArgs("--conf", s"spark.kubernetes.container.image="+pyImage)
.addToArgs(
"--conf",
s"spark.kubernetes.namespace=${kubernetesTestComponents.namespace}")
.addToArgs("--conf", "spark.kubernetes.authenticate.oauthTokenFile=" +
"/var/run/secrets/kubernetes.io/serviceaccount/token")
.addToArgs("--conf", "spark.kubernetes.authenticate.caCertFile=" +
"/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
.addToArgs("--conf", s"spark.kubernetes.driver.pod.name=$driverPodName")
.addToArgs("--conf", "spark.executor.memory=500m")
.addToArgs("--conf", "spark.executor.cores=1")
.addToArgs("--conf", "spark.executor.instances=1")
.addToArgs("--conf",
s"spark.driver.host=" +
s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc")
.addToArgs("--conf", s"spark.driver.port=$driverPort")
.addToArgs("--conf", s"spark.driver.blockManager.port=$blockManagerPort")
.endContainer()
.endSpec()
.done()
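// The shell is considered up once the driver log reports that the SparkSession is available.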
Eventually.eventually(TIMEOUT, INTERVAL) {
assert(kubernetesTestComponents.kubernetesClient
.pods()
.withName(driverPodName)
.getLog
.contains("SparkSession available"), "The application did not complete.")
}
} finally {
// Have to delete the service manually since it doesn't have an owner reference
kubernetesTestComponents
.kubernetesClient
.services()
.inNamespace(kubernetesTestComponents.namespace)
.delete(driverService)
}
}

}

private[spark] object PythonTestsSuite {