Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Create one task per jar/file to download in the init-container
  • Loading branch information
liyinan926 committed Dec 23, 2017
commit 9c8051a0156d58d13b91eb740aab83447ceddf66
Original file line number Diff line number Diff line change
Expand Up @@ -125,38 +125,14 @@ private[spark] class DriverConfigOrchestrator(

val mayBeInitContainerBootstrapStep =
if (areAnyFilesNonContainerLocal(sparkJars ++ sparkFiles)) {
<<<<<<< HEAD:resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala
val initContainerConfigurationStepsOrchestrator =
new InitContainerConfigurationStepsOrchestrator(
namespace,
kubernetesResourceNamePrefix,
sparkJars,
sparkFiles,
jarsDownloadPath,
filesDownloadPath,
imagePullPolicy,
allDriverLabels,
initContainerConfigMapName,
INIT_CONTAINER_PROPERTIES_FILE_NAME,
submissionSparkConf)
val initContainerConfigurationSteps =
initContainerConfigurationStepsOrchestrator.getAllConfigurationSteps()
val initContainerBootstrapStep =
new DriverInitContainerBootstrapStep(
initContainerConfigurationSteps,
initContainerConfigMapName,
INIT_CONTAINER_PROPERTIES_FILE_NAME)

Some(initContainerBootstrapStep)
=======
val orchestrator = new InitContainerConfigOrchestrator(
namespace,
kubernetesResourceNamePrefix,
sparkJars,
sparkFiles,
jarsDownloadPath,
filesDownloadPath,
dockerImagePullPolicy,
imagePullPolicy,
allDriverLabels,
initContainerConfigMapName,
INIT_CONTAINER_PROPERTIES_FILE_NAME,
Expand All @@ -167,7 +143,6 @@ private[spark] class DriverConfigOrchestrator(
INIT_CONTAINER_PROPERTIES_FILE_NAME)

Some(bootstrapStep)
>>>>>>> Addressed the second round of comments:resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import java.io.File
import java.util.concurrent.TimeUnit

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
Expand Down Expand Up @@ -54,26 +53,22 @@ private[spark] class SparkPodInitContainer(
private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)

def run(): Unit = {
val remoteJarsDownload = Future[Unit] {
logInfo(s"Downloading remote jars: $remoteJars")
downloadFiles(
remoteJars,
jarsDownloadDir,
s"Remote jars download directory specified at $jarsDownloadDir does not exist " +
"or is not a directory.")
}
val remoteFilesDownload = Future[Unit] {
logInfo(s"Downloading remote files: $remoteFiles")
downloadFiles(
remoteFiles,
filesDownloadDir,
s"Remote files download directory specified at $filesDownloadDir does not exist " +
"or is not a directory.")
}

Seq(remoteJarsDownload, remoteFilesDownload).foreach {
ThreadUtils.awaitResult(_, Duration.create(downloadTimeoutMinutes, TimeUnit.MINUTES))
}
logInfo(s"Downloading remote jars: $remoteJars")
downloadFiles(
remoteJars,
jarsDownloadDir,
s"Remote jars download directory specified at $jarsDownloadDir does not exist " +
"or is not a directory.")

logInfo(s"Downloading remote files: $remoteFiles")
downloadFiles(
remoteFiles,
filesDownloadDir,
s"Remote files download directory specified at $filesDownloadDir does not exist " +
"or is not a directory.")

downloadExecutor.shutdown()
downloadExecutor.awaitTermination(downloadTimeoutMinutes, TimeUnit.MINUTES)
}

private def downloadFiles(
Expand All @@ -84,7 +79,9 @@ private[spark] class SparkPodInitContainer(
require(downloadDir.isDirectory, errMessageOnDestinationNotADirectory)
}
filesCommaSeparated.map(_.split(",")).toSeq.flatten.foreach { file =>
fileFetcher.fetchFile(file, downloadDir)
Future[Unit] {
fileFetcher.fetchFile(file, downloadDir)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {

private val NAMESPACE = "default"
private val DRIVER_IMAGE = "driver-image"
private val INIT_CONTAINER_IMAGE = "init-container-image"
private val IC_IMAGE = "init-container-image"
private val APP_ID = "spark-app-id"
private val LAUNCH_TIME = 975256L
private val APP_NAME = "spark"
Expand Down Expand Up @@ -58,13 +58,8 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {

test("Base submission steps without a main app resource.") {
val sparkConf = new SparkConf(false)
<<<<<<< HEAD:resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala
.set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE)
val orchestrator = new DriverConfigurationStepsOrchestrator(
=======
.set(DRIVER_DOCKER_IMAGE, DRIVER_IMAGE)
val orchestrator = new DriverConfigOrchestrator(
>>>>>>> Addressed the second round of comments:resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
NAMESPACE,
APP_ID,
LAUNCH_TIME,
Expand All @@ -83,13 +78,8 @@ class DriverConfigOrchestratorSuite extends SparkFunSuite {

test("Submission steps with an init-container.") {
val sparkConf = new SparkConf(false)
<<<<<<< HEAD:resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala
.set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE)
.set(INIT_CONTAINER_DOCKER_IMAGE, INIT_CONTAINER_IMAGE)
=======
.set(DRIVER_DOCKER_IMAGE, DRIVER_IMAGE)
.set(INIT_CONTAINER_IMAGE, INIT_CONTAINER_IMAGE)
>>>>>>> Addressed the second round of comments:resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala
.set(INIT_CONTAINER_IMAGE, IC_IMAGE)
.set("spark.jars", "hdfs://localhost:9000/var/apps/jars/jar1.jar")
val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
val orchestrator = new DriverConfigOrchestrator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ class SparkPodInitContainerSuite
Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/jar1.jar", downloadJarsDir)
Mockito.verify(fileFetcher).fetchFile("hdfs://localhost:9000/jar2.jar", downloadJarsDir)
Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/file.txt", downloadFilesDir)

}

private def getSparkConfForRemoteFileDownloads: SparkConf = {
Expand Down