[SPARK-22757][Kubernetes] Enable use of remote dependencies in Kubernetes mode
liyinan926 committed Dec 23, 2017
commit d3cbbdd8ce45592caa706d0b4d29c873a176e40c
@@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
-import org.apache.spark.network.util.ByteUnit

private[spark] object Config extends Logging {

@@ -132,30 +131,78 @@ private[spark] object Config extends Logging {

  val JARS_DOWNLOAD_LOCATION =
    ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
-     .doc("Location to download jars to in the driver and executors. When using" +
-       " spark-submit, this directory must be empty and will be mounted as an empty directory" +
-       " volume on the driver and executor pod.")
+     .doc("Location to download jars to in the driver and executors. When using " +
+       "spark-submit, this directory must be empty and will be mounted as an empty directory " +
+       "volume on the driver and executor pod.")
      .stringConf
      .createWithDefault("/var/spark-data/spark-jars")

  val FILES_DOWNLOAD_LOCATION =
    ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
-     .doc("Location to download files to in the driver and executors. When using" +
-       " spark-submit, this directory must be empty and will be mounted as an empty directory" +
-       " volume on the driver and executor pods.")
+     .doc("Location to download files to in the driver and executors. When using " +
+       "spark-submit, this directory must be empty and will be mounted as an empty directory " +
+       "volume on the driver and executor pods.")
      .stringConf
      .createWithDefault("/var/spark-data/spark-files")

  val INIT_CONTAINER_DOCKER_IMAGE =
    ConfigBuilder("spark.kubernetes.initContainer.docker.image")
Contributor: Mirroring the discussion in the other PR, are these really restricted to Docker? Is it a required config?

Contributor: None of it is Docker-specific; it can be "container" everywhere.

Contributor:
> Is it a required config?
No, as one may forgo the init-container if they're building the deps into the Docker image itself and supplying them via local:/// paths.

Contributor (Author): Renamed to spark.kubernetes.initContainer.image.

.doc("Image for the driver and executor's init-container that downloads dependencies.")
.stringConf
.createOptional

  val INIT_CONTAINER_MOUNT_TIMEOUT =
    ConfigBuilder("spark.kubernetes.mountDependencies.mountTimeout")
      .doc("Timeout before aborting the attempt to download and unpack local dependencies from " +
        "remote locations and the resource staging server when initializing the driver and " +
        "executor pods.")
      .timeConf(TimeUnit.MINUTES)
Contributor: Why not TimeUnit.SECONDS?

Contributor (Author): Done.

      .createWithDefault(5)

  val INIT_CONTAINER_REMOTE_JARS =
    ConfigBuilder("spark.kubernetes.initContainer.remoteJars")
      .doc("Comma-separated list of jar URIs to download in the init-container. This is " +
        "calculated from spark.jars.")
      .internal()
      .stringConf
      .createOptional

  val INIT_CONTAINER_REMOTE_FILES =
    ConfigBuilder("spark.kubernetes.initContainer.remoteFiles")
      .doc("Comma-separated list of file URIs to download in the init-container. This is " +
        "calculated from spark.files.")
      .internal()
      .stringConf
      .createOptional

  val INIT_CONTAINER_CONFIG_MAP_NAME =
    ConfigBuilder("spark.kubernetes.initContainer.configMapName")
      .doc("Name of the config map to use in the init-container that retrieves submitted files " +
        "for the executor.")
      .internal()
      .stringConf
      .createOptional

  val INIT_CONTAINER_CONFIG_MAP_KEY_CONF =
    ConfigBuilder("spark.kubernetes.initContainer.configMapKey")
      .doc("Key for the entry in the init container config map for submitted files that " +
        "corresponds to the properties for this init-container.")
      .internal()
      .stringConf
      .createOptional

  val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
    "spark.kubernetes.authenticate.submission"

  val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."

  val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
  val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
  val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."

  val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
  val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
  val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."

  val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
}
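
For context, a minimal sketch of how the user-facing options above might be set at submission time. The image name and paths are hypothetical; the keys are as defined in this commit (before the rename to spark.kubernetes.initContainer.image discussed above):

```scala
import org.apache.spark.SparkConf

// Hypothetical values; config keys as defined in Config at this commit.
val conf = new SparkConf()
  .set("spark.kubernetes.initContainer.docker.image", "my-registry/spark-init:latest")
  .set("spark.kubernetes.mountDependencies.jarsDownloadDir", "/var/spark-data/spark-jars")
  .set("spark.kubernetes.mountDependencies.filesDownloadDir", "/var/spark-data/spark-files")
  .set("spark.kubernetes.mountDependencies.mountTimeout", "5m") // timeConf(MINUTES); default 5
```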
@@ -69,6 +69,17 @@ private[spark] object Constants {
  val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
  val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
  val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
  val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"

  // Bootstrapping dependencies with the init-container
  val INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME = "download-jars-volume"
  val INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME = "download-files-volume"
  val INIT_CONTAINER_PROPERTIES_FILE_VOLUME = "spark-init-properties"
  val INIT_CONTAINER_PROPERTIES_FILE_DIR = "/etc/spark-init"
  val INIT_CONTAINER_PROPERTIES_FILE_NAME = "spark-init.properties"
  val INIT_CONTAINER_PROPERTIES_FILE_PATH =
    s"$INIT_CONTAINER_PROPERTIES_FILE_DIR/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
  val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"

  // Miscellaneous
  val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, EnvVarBuilder, PodBuilder, VolumeMount, VolumeMountBuilder}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._

/**
* This is separated out from the init-container steps API because this component can be reused to
* set up the init-container for executors as well.
*/
private[spark] trait InitContainerBootstrap {
Contributor: What's the purpose of all these traits that have a single implementation? That seems unnecessary.

Contributor: It's more idiomatic to mock a trait than a class, and our unit tests always create mocks for every component that isn't the class under test.

Contributor:
> It's more idiomatic to mock a trait than a class
Why? You can mock classes just fine.

Contributor: It's probably fine to just use the class here, but some classes can't be mocked, such as final classes or classes with final methods. Having traits everywhere ensures that even if we change the classes down the road to have such characteristics, our tests won't break.
This also is not entirely without precedent. TaskScheduler is only implemented by TaskSchedulerImpl in the main scheduler code, as is TaskContext being extended only by TaskContextImpl. Putting a trait in front of an implementation communicates that it's expected for tests that dependency-inject instances of this to create stub implementations.
But we might be splitting hairs at this point, so using only the class could suffice until we run into problems from having done so.
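
To ground the mockability argument, a hypothetical test fragment (Mockito assumed, as in Spark's test suites); the trait, not the Impl, is what gets stubbed:

```scala
import org.mockito.Mockito.{mock, when}

// Hypothetical test setup: stub the trait so the class under test
// never needs a real InitContainerBootstrapImpl.
val bootstrap = mock(classOf[InitContainerBootstrap])
val podSpec = mock(classOf[PodWithDetachedInitContainer])
when(bootstrap.bootstrapInitContainer(podSpec)).thenReturn(podSpec)
```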

  /**
   * Bootstraps an init-container that downloads dependencies to be used by a main container.
   */
  def bootstrapInitContainer(
      originalPodWithInitContainer: PodWithDetachedInitContainer)
    : PodWithDetachedInitContainer
}

private[spark] class InitContainerBootstrapImpl(
    initContainerImage: String,
    dockerImagePullPolicy: String,
    jarsDownloadPath: String,
    filesDownloadPath: String,
    downloadTimeoutMinutes: Long,
    initContainerConfigMapName: String,
    initContainerConfigMapKey: String,
    sparkRole: String,
    sparkConf: SparkConf)
  extends InitContainerBootstrap {

  override def bootstrapInitContainer(
      podWithDetachedInitContainer: PodWithDetachedInitContainer): PodWithDetachedInitContainer = {
Contributor: I commented about this in my previous review, but could you try to use shorter variable names throughout the PR? For example, here, just repeating the name of the already long type to name the variable doesn't really help with readability. Imagine if you have two of those, are you going to start adding counters to the already long name?

    val sharedVolumeMounts = Seq[VolumeMount](
      new VolumeMountBuilder()
        .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
        .withMountPath(jarsDownloadPath)
        .build(),
      new VolumeMountBuilder()
        .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
        .withMountPath(filesDownloadPath)
        .build())

    val initContainerCustomEnvVarKeyPrefix = sparkRole match {
      case SPARK_POD_DRIVER_ROLE => KUBERNETES_DRIVER_ENV_KEY
      case SPARK_POD_EXECUTOR_ROLE => "spark.executorEnv."
      case _ => throw new SparkException(s"$sparkRole is not a valid Spark pod role")
    }
    val initContainerCustomEnvVars = sparkConf.getAllWithPrefix(initContainerCustomEnvVarKeyPrefix)
      .toSeq
      .map(env =>
Contributor: `.map { env =>`

        new EnvVarBuilder()
          .withName(env._1)
          .withValue(env._2)
          .build())

    val initContainer = new ContainerBuilder(podWithDetachedInitContainer.initContainer)
      .withName(s"spark-init")
Member: nit: remove s.

Contributor (Author): Done.

      .withImage(initContainerImage)
      .withImagePullPolicy(dockerImagePullPolicy)
      .addAllToEnv(initContainerCustomEnvVars.asJava)
      .addNewVolumeMount()
        .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
        .withMountPath(INIT_CONTAINER_PROPERTIES_FILE_DIR)
        .endVolumeMount()
      .addToVolumeMounts(sharedVolumeMounts: _*)
      .addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
      .build()

    val podWithBasicVolumes = new PodBuilder(podWithDetachedInitContainer.pod)
      .editSpec()
      .addNewVolume()
        .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
        .withNewConfigMap()
          .withName(initContainerConfigMapName)
          .addNewItem()
            .withKey(initContainerConfigMapKey)
            .withPath(INIT_CONTAINER_PROPERTIES_FILE_NAME)
            .endItem()
          .endConfigMap()
        .endVolume()
      .addNewVolume()
        .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
        .withEmptyDir(new EmptyDirVolumeSource())
        .endVolume()
      .addNewVolume()
        .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
        .withEmptyDir(new EmptyDirVolumeSource())
        .endVolume()
      .endSpec()
      .build()

    val mainContainerWithMountedFiles = new ContainerBuilder(
        podWithDetachedInitContainer.mainContainer)
      .addToVolumeMounts(sharedVolumeMounts: _*)
      .addNewEnv()
        .withName(ENV_MOUNTED_FILES_DIR)
        .withValue(filesDownloadPath)
        .endEnv()
      .build()

    PodWithDetachedInitContainer(
      podWithBasicVolumes,
      initContainer,
      mainContainerWithMountedFiles)
  }

Member: nit: remove an extra line.

Contributor (Author): Done.

}
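
As a rough usage sketch (constructor values hypothetical; basePod, baseInitContainer, mainContainer, and sparkConf assumed in scope), the submission steps would build the bootstrap from the configs above and hand it a pod whose init-container is not yet attached:

```scala
// Hypothetical wiring; in practice the values come from the Config entries above.
val bootstrap = new InitContainerBootstrapImpl(
  initContainerImage = "my-registry/spark-init:latest",
  dockerImagePullPolicy = "IfNotPresent",
  jarsDownloadPath = "/var/spark-data/spark-jars",
  filesDownloadPath = "/var/spark-data/spark-files",
  downloadTimeoutMinutes = 5L,
  initContainerConfigMapName = "spark-init-config",
  initContainerConfigMapKey = "spark-init.properties",
  sparkRole = SPARK_POD_DRIVER_ROLE,
  sparkConf = sparkConf)

// The result carries the configured init-container, the pod with the shared
// emptyDir volumes added, and the main container with those volumes mounted.
val bootstrapped = bootstrap.bootstrapInitContainer(
  PodWithDetachedInitContainer(basePod, baseInitContainer, mainContainer))
```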
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

/**
 * Bootstraps a driver or executor container or an init-container with needed secrets mounted.
 */
private[spark] trait MountSecretsBootstrap {

  /**
   * Mounts Kubernetes secrets as secret volumes into the given container in the given pod.
   *
   * @param pod the pod into which the secret volumes are being added.
   * @param container the container into which the secret volumes are being mounted.
   * @return the updated pod and container with the secrets mounted.
   */
  def mountSecrets(pod: Pod, container: Container): (Pod, Container)
}

private[spark] class MountSecretsBootstrapImpl(
    secretNamesToMountPaths: Map[String, String]) extends MountSecretsBootstrap {

  override def mountSecrets(pod: Pod, container: Container): (Pod, Container) = {
    var podBuilder = new PodBuilder(pod)
    secretNamesToMountPaths.keys.foreach(name =>
Contributor: `.foreach { name =>`

      podBuilder = podBuilder
        .editOrNewSpec()
        .addNewVolume()
          .withName(secretVolumeName(name))
          .withNewSecret()
            .withSecretName(name)
            .endSecret()
          .endVolume()
        .endSpec())

    var containerBuilder = new ContainerBuilder(container)
    secretNamesToMountPaths.foreach(namePath =>
Contributor: `.foreach { namePath =>`. You get the idea. Please fix all of these.

      containerBuilder = containerBuilder
        .addNewVolumeMount()
          .withName(secretVolumeName(namePath._1))
          .withMountPath(namePath._2)
          .endVolumeMount()
    )

    (podBuilder.build(), containerBuilder.build())
  }

  private def secretVolumeName(secretName: String): String = {
    secretName + "-volume"
  }
}
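
A brief usage sketch (secret name, mount path, driverPod, and driverContainer are hypothetical): each map entry yields a secret volume on the pod and a matching volume mount on the container:

```scala
// Mounts the Kubernetes secret "db-credentials" at /mnt/secrets/db,
// via an intermediate volume named "db-credentials-volume".
val bootstrap = new MountSecretsBootstrapImpl(Map("db-credentials" -> "/mnt/secrets/db"))
val (podWithSecrets, containerWithSecrets) = bootstrap.mountSecrets(driverPod, driverContainer)
```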
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{Container, Pod}

/**
* Represents a pod with a detached init-container (not yet added to the pod).
*
* @param pod the pod
* @param initContainer the init-container in the pod
* @param mainContainer the main container in the pod
*/
private[spark] case class PodWithDetachedInitContainer(
    pod: Pod,
    initContainer: Container,
    mainContainer: Container)