Merged
Changes from 1 commit
Use spark-submit
kimoonkim committed Dec 16, 2017
commit 5585a0a4dbc99d9c8c6228d265f1077895e27a2e
@@ -19,9 +19,10 @@ package org.apache.spark.deploy.k8s.integrationtest
import java.io.{File, FileOutputStream}
import java.nio.file.Paths
import java.util.{Properties, UUID}
import java.util.regex.Pattern

import com.google.common.base.Charsets
import com.google.common.io.Files
import com.google.common.io.{Files, PatternFilenameFilter}
import io.fabric8.kubernetes.client.internal.readiness.Readiness
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
@@ -30,13 +31,14 @@ import scala.collection.JavaConverters._

import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackendFactory
import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND
import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH

private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter {
import KubernetesSuite._
Member: Add an empty line before the import.
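A minimal sketch of the requested change, a blank line between the class declaration and the import:

private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter {

  import KubernetesSuite._

  // ... rest of the suite unchanged
}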

private val testBackend = IntegrationTestBackendFactory.getTestBackend()
private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "")
private var kubernetesTestComponents: KubernetesTestComponents = _
private var testAppConf: TestAppConf = _
private var sparkAppConf: SparkAppConf = _

override def beforeAll(): Unit = {
testBackend.initialize()
@@ -48,7 +50,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
}

before {
testAppConf = kubernetesTestComponents.newTestJobConf()
sparkAppConf = kubernetesTestComponents.newSparkAppConf()
.set("spark.kubernetes.initcontainer.docker.image", "spark-init:latest")
Member: The property names for images have been changed in apache/spark#19995. They need to be updated once that PR is merged.

Member Author: Ah, good to know. I'll leave a NOTE. Also, let me delete the initcontainer lines since we don't use it here yet.
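For reference, a hypothetical sketch of how the conf might look after the rename; the property names below are assumptions and should be confirmed against apache/spark#19995 once it is merged:

// Hypothetical property names only; confirm against apache/spark#19995 after it merges.
sparkAppConf
  .set("spark.kubernetes.driver.container.image", "spark-driver:latest")
  .set("spark.kubernetes.executor.container.image", "spark-executor:latest")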

.set("spark.kubernetes.driver.docker.image", "spark-driver:latest")
Member: It seems newSparkAppConf already sets spark.kubernetes.driver.docker.image.

.set("spark.kubernetes.driver.label.spark-app-locator", APP_LOCATOR_LABEL)
@@ -59,40 +61,21 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
kubernetesTestComponents.deleteNamespace()
}

test("Use container-local resources without the resource staging server") {
test("Run SparkPi with no resources") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

testAppConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
runSparkPiAndVerifyCompletion(CONTAINER_LOCAL_MAIN_APP_RESOURCE)
runSparkPiAndVerifyCompletion()
}

test("Submit small local files without the resource staging server.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
testAppConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
val testExistenceFileTempDir = Files.createTempDir()
testExistenceFileTempDir.deleteOnExit()
val testExistenceFile = new File(testExistenceFileTempDir, "input.txt")
Files.write(TEST_EXISTENCE_FILE_CONTENTS, testExistenceFile, Charsets.UTF_8)
testAppConf.set("spark.files", testExistenceFile.getAbsolutePath)
runSparkApplicationAndVerifyCompletion(
CONTAINER_LOCAL_MAIN_APP_RESOURCE,
FILE_EXISTENCE_MAIN_CLASS,
Seq(
s"File found at /opt/spark/work-dir/${testExistenceFile.getName} with correct contents.",
s"File found on the executors at the relative path ${testExistenceFile.getName} with" +
s" the correct contents."),
Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS))
}

test("Use a very long application name.") {
test("Run SparkPi with a very long application name.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

testAppConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
.set("spark.app.name", "long" * 40)
runSparkPiAndVerifyCompletion(CONTAINER_LOCAL_MAIN_APP_RESOURCE)
sparkAppConf.set("spark.app.name", "long" * 40)
runSparkPiAndVerifyCompletion()
}

private def runSparkPiAndVerifyCompletion(appResource: String = ""): Unit = {
private def runSparkPiAndVerifyCompletion(
appResource: String = CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR): Unit = {
runSparkApplicationAndVerifyCompletion(
appResource,
SPARK_PI_MAIN_CLASS,
@@ -105,12 +88,10 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
mainClass: String,
expectedLogOnCompletion: Seq[String],
appArgs: Array[String]): Unit = {
val appArguments = TestAppArguments(
val appArguments = SparkAppArguments(
mainAppResource = appResource,
mainClass = mainClass,
driverArgs = appArgs,
hadoopConfDir = None)
TestApp.run(testAppConf, appArguments)
mainClass = mainClass)
SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt)
val driverPod = kubernetesTestComponents.kubernetesClient
.pods()
.withLabel("spark-app-locator", APP_LOCATOR_LABEL)
@@ -130,40 +111,16 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
}

private[spark] object KubernetesSuite {
val EXAMPLES_JAR_FILE = Paths.get("target", "integration-tests-spark-jobs")
.toFile
.listFiles()(0)

val HELPER_JAR_FILE = Paths.get("target", "integration-tests-spark-jobs-helpers")
.toFile
.listFiles()(0)
val SUBMITTER_LOCAL_MAIN_APP_RESOURCE = s"file://${EXAMPLES_JAR_FILE.getAbsolutePath}"
val CONTAINER_LOCAL_MAIN_APP_RESOURCE = s"local:///opt/spark/examples/" +
s"integration-tests-jars/${EXAMPLES_JAR_FILE.getName}"
val CONTAINER_LOCAL_HELPER_JAR_PATH = s"local:///opt/spark/examples/" +
s"integration-tests-jars/${HELPER_JAR_FILE.getName}"
val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
val SPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.k8s" +
".integrationtest.jobs.SparkPiWithInfiniteWait"
val PYSPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner"
val SPARK_R_MAIN_CLASS = "org.apache.spark.deploy.RRunner"
val PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION =
"local:///opt/spark/examples/src/main/python/pi.py"
val PYSPARK_SORT_CONTAINER_LOCAL_FILE_LOCATION =
"local:///opt/spark/examples/src/main/python/sort.py"
val SPARK_R_DATAFRAME_SUBMITTER_FILE_LOCATION =
"local:///opt/spark/examples/src/main/r/dataframe.R"
val SPARK_R_DATAFRAME_CONTAINER_LOCAL_FILE_LOCATION =
"src/test/R/dataframe.R"
val PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION = "src/test/python/pi.py"
val FILE_EXISTENCE_MAIN_CLASS = "org.apache.spark.deploy.k8s" +
".integrationtest.jobs.FileExistenceTest"
val GROUP_BY_MAIN_CLASS = "org.apache.spark.deploy.k8s" +
".integrationtest.jobs.GroupByTest"
val JAVA_OPTIONS_MAIN_CLASS = "org.apache.spark.deploy.k8s" +
".integrationtest.jobs.JavaOptionsTest"
val TEST_EXISTENCE_FILE_CONTENTS = "contents"
val SPARK_DISTRO_EXAMPLES_JAR_FILE: File = Paths.get(SPARK_DISTRO_PATH.toFile.getAbsolutePath,
"examples", "jars")
.toFile
.listFiles(new PatternFilenameFilter(Pattern.compile("^spark-examples_.*\\.jar$")))(0)
val CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR: String = s"local:///opt/spark/examples/" +
s"${SPARK_DISTRO_EXAMPLES_JAR_FILE.getName}"
val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"

case object ShuffleNotReadyException extends Exception
}
@@ -16,12 +16,16 @@
*/
package org.apache.spark.deploy.k8s.integrationtest

import java.nio.file.Paths
import java.util.UUID

import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import scala.collection.mutable
Member: Scala packages should be in a group after java packages.
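A minimal sketch of the requested grouping, using this file's own imports (java first, then scala, then third-party, then org.apache.spark):

import java.nio.file.Paths
import java.util.UUID

import scala.collection.JavaConverters._
import scala.collection.mutable

import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}

import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH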

import scala.collection.JavaConverters._

import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH

private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) {

val namespace = UUID.randomUUID().toString.replaceAll("-", "")
@@ -48,15 +52,14 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
}
}

def newTestJobConf(): TestAppConf = {
new TestAppConf()
def newSparkAppConf(): SparkAppConf = {
new SparkAppConf()
.set("spark.master", s"k8s://${kubernetesClient.getMasterUrl}")
.set("spark.kubernetes.namespace", namespace)
.set("spark.kubernetes.driver.docker.image",
System.getProperty("spark.docker.test.driverImage", "spark-driver:latest"))
.set("spark.kubernetes.executor.docker.image",
System.getProperty("spark.docker.test.executorImage", "spark-executor:latest"))
.setJars(Seq(KubernetesSuite.HELPER_JAR_FILE.getAbsolutePath))
.set("spark.executor.memory", "500m")
.set("spark.executor.cores", "1")
.set("spark.executors.instances", "1")
@@ -66,3 +69,42 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
.set("spark.kubernetes.submission.waitAppCompletion", "false")
}
}

private[spark] class SparkAppConf {

private val map = mutable.Map[String, String]()

def set(key: String, value: String): SparkAppConf = {
map.put(key, value)
this
}

def get(key: String): String = map.getOrElse(key, "")

def setJars(jars: Seq[String]) = set("spark.jars", jars.mkString(","))

override def toString: String = map.toString

def toStringArray: Iterable[String] = map.toList.flatMap(t => List("--conf", s"${t._1}=${t._2}"))
}
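A small usage sketch with hypothetical values, showing how toStringArray expands the stored properties into spark-submit --conf flags:

// Hypothetical values; each entry becomes a "--conf key=value" pair
// (ordering may vary because the backing map is unordered).
val conf = new SparkAppConf()
  .set("spark.kubernetes.namespace", "test-ns")
  .set("spark.executor.memory", "500m")
conf.toStringArray
// e.g. List("--conf", "spark.kubernetes.namespace=test-ns",
//           "--conf", "spark.executor.memory=500m")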

private[spark] case class SparkAppArguments(
mainAppResource: String,
mainClass: String)

private[spark] object SparkAppLauncher extends Logging {

private val SPARK_SUBMIT_EXECUTABLE_DEST = Paths.get(SPARK_DISTRO_PATH.toFile.getAbsolutePath,
"bin", "spark-submit").toFile

def launch(appArguments: SparkAppArguments, appConf: SparkAppConf, timeoutSecs: Int): Unit = {
logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf")
val commandLine = Array(SPARK_SUBMIT_EXECUTABLE_DEST.getAbsolutePath,
"--deploy-mode", "cluster",
"--class", appArguments.mainClass,
"--master", appConf.get("spark.master")
) ++ appConf.toStringArray :+ appArguments.mainAppResource
logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}")
ProcessUtils.executeProcess(commandLine, timeoutSecs)
}
}
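A hypothetical invocation sketch; the resource, class, master URL, and timeout below are illustrative only, not taken from the test suite:

// Illustrative values only.
val appConf = new SparkAppConf()
  .set("spark.master", "k8s://https://192.168.99.100:8443")
val appArguments = SparkAppArguments(
  mainAppResource = "local:///opt/spark/examples/spark-examples.jar",
  mainClass = "org.apache.spark.examples.SparkPi")
SparkAppLauncher.launch(appArguments, appConf, timeoutSecs = 180)
// Roughly assembles:
//   target/spark-distro/bin/spark-submit --deploy-mode cluster \
//     --class org.apache.spark.examples.SparkPi \
//     --master k8s://https://192.168.99.100:8443 \
//     --conf spark.master=k8s://https://192.168.99.100:8443 \
//     local:///opt/spark/examples/spark-examples.jar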

This file was deleted.

@@ -27,7 +27,9 @@ private[spark] class MinikubeTestBackend extends IntegrationTestBackend {

override def initialize(): Unit = {
Minikube.startMinikube()
new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
if (!System.getProperty("spark.docker.test.skipBuildImages", "false").toBoolean) {
new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
}
defaultClient = Minikube.getKubernetesClient
}
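A usage note, assuming the standard JVM system-property mechanism: the flag is read with System.getProperty, so it would typically be passed as a -D option when the images are already loaded into Minikube's Docker daemon:

// Illustrative only: skip the image build by launching the tests with
//   -Dspark.docker.test.skipBuildImages=true
// which makes the property read below return true.
val skipBuild = System.getProperty("spark.docker.test.skipBuildImages", "false").toBoolean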

@@ -16,7 +16,10 @@
*/
package org.apache.spark.deploy.k8s.integrationtest

import java.nio.file.Paths

package object constants {
val MINIKUBE_TEST_BACKEND = "minikube"
val GCE_TEST_BACKEND = "gce"
val SPARK_DISTRO_PATH = Paths.get("target", "spark-distro")
}
@@ -24,12 +24,13 @@ import org.apache.http.client.utils.URIBuilder
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Minutes, Seconds, Span}

import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH
import org.apache.spark.deploy.k8s.integrationtest.Logging

private[spark] class SparkDockerImageBuilder
(private val dockerEnv: Map[String, String]) extends Logging {

private val DOCKER_BUILD_PATH = Paths.get("target", "spark-distro")
private val DOCKER_BUILD_PATH = SPARK_DISTRO_PATH
// Dockerfile paths must be relative to the build path.
private val BASE_DOCKER_FILE = "dockerfiles/spark-base/Dockerfile"
private val DRIVER_DOCKER_FILE = "dockerfiles/driver/Dockerfile"