Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove erroring test case for pyspark shell
  • Loading branch information
AzureQ committed Nov 29, 2018
commit 149bed4c7083d04a7a05e931c8584670fc937cb2
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@
*/
package org.apache.spark.deploy.k8s.integrationtest

import scala.collection.JavaConverters._
import org.scalatest.concurrent.Eventually

import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag, INTERVAL, TIMEOUT}

private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>

import PythonTestsSuite._
Expand Down Expand Up @@ -94,90 +89,6 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
isJVM = false,
pyFiles = Some(PYSPARK_CONTAINER_TESTS))
}

test("Run PySpark shell", k8sTestTag) {
val labels = Map("spark-app-selector" -> driverPodName)
val driverPort = 7077
val blockManagerPort = 10000
val driverService = testBackend
.getKubernetesClient
.services()
.inNamespace(kubernetesTestComponents.namespace)
.createNew()
.withNewMetadata()
.withName(s"$driverPodName-svc")
.endMetadata()
.withNewSpec()
.withClusterIP("None")
.withSelector(labels.asJava)
.addNewPort()
.withName("driver-port")
.withPort(driverPort)
.withNewTargetPort(driverPort)
.endPort()
.addNewPort()
.withName("block-manager")
.withPort(blockManagerPort)
.withNewTargetPort(blockManagerPort)
.endPort()
.endSpec()
.done()
try {
val driverPod = testBackend
.getKubernetesClient
.pods()
.inNamespace(kubernetesTestComponents.namespace)
.createNew()
.withNewMetadata()
.withName(driverPodName)
.withLabels(labels.asJava)
.endMetadata()
.withNewSpec()
.withServiceAccountName(kubernetesTestComponents.serviceAccountName)
.addNewContainer()
.withName("pyspark-shell")
.withImage(pyImage)
.withImagePullPolicy("IfNotPresent")
.withCommand("/opt/spark/bin/pyspark")
.addToArgs("--master", s"k8s://https://kubernetes.default.svc")
.addToArgs("--deploy-mode", "client")
.addToArgs("--conf", s"spark.kubernetes.container.image="+pyImage)
.addToArgs(
"--conf",
s"spark.kubernetes.namespace=${kubernetesTestComponents.namespace}")
.addToArgs("--conf", "spark.kubernetes.authenticate.oauthTokenFile=" +
"/var/run/secrets/kubernetes.io/serviceaccount/token")
.addToArgs("--conf", "spark.kubernetes.authenticate.caCertFile=" +
"/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
.addToArgs("--conf", s"spark.kubernetes.driver.pod.name=$driverPodName")
.addToArgs("--conf", "spark.executor.memory=500m")
.addToArgs("--conf", "spark.executor.cores=1")
.addToArgs("--conf", "spark.executor.instances=1")
.addToArgs("--conf",
s"spark.driver.host=" +
s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc")
.addToArgs("--conf", s"spark.driver.port=$driverPort")
.addToArgs("--conf", s"spark.driver.blockManager.port=$blockManagerPort")
.endContainer()
.endSpec()
.done()
Eventually.eventually(TIMEOUT, INTERVAL) {
assert(kubernetesTestComponents.kubernetesClient
.pods()
.withName(driverPodName)
.getLog
.contains("SparkSession available"), "The application did not complete.")
}
} finally {
// Have to delete the service manually since it doesn't have an owner reference
kubernetesTestComponents
.kubernetesClient
.services()
.inNamespace(kubernetesTestComponents.namespace)
.delete(driverService)
}
}

}

private[spark] object PythonTestsSuite {
Expand All @@ -187,4 +98,4 @@ private[spark] object PythonTestsSuite {
val PYSPARK_FILES: String = TEST_LOCAL_PYSPARK + "pyfiles.py"
val PYSPARK_CONTAINER_TESTS: String = TEST_LOCAL_PYSPARK + "py_container_checks.py"
val PYSPARK_MEMORY_CHECK: String = TEST_LOCAL_PYSPARK + "worker_memory_check.py"
}
}