[SPARK-22757][Kubernetes] Enable use of remote dependencies (http, s3, gcs, etc.) in Kubernetes mode #19954
Changes from 12 commits
Changes to the Config object (org.apache.spark.deploy.k8s):

```diff
@@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigBuilder
 import org.apache.spark.network.util.ByteUnit

 private[spark] object Config extends Logging {
@@ -132,30 +131,84 @@ private[spark] object Config extends Logging {
   val JARS_DOWNLOAD_LOCATION =
     ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
-      .doc("Location to download jars to in the driver and executors. When using" +
-        " spark-submit, this directory must be empty and will be mounted as an empty directory" +
-        " volume on the driver and executor pod.")
+      .doc("Location to download jars to in the driver and executors. When using " +
+        "spark-submit, this directory must be empty and will be mounted as an empty directory " +
+        "volume on the driver and executor pod.")
       .stringConf
       .createWithDefault("/var/spark-data/spark-jars")

   val FILES_DOWNLOAD_LOCATION =
     ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
-      .doc("Location to download files to in the driver and executors. When using" +
-        " spark-submit, this directory must be empty and will be mounted as an empty directory" +
-        " volume on the driver and executor pods.")
+      .doc("Location to download files to in the driver and executors. When using " +
+        "spark-submit, this directory must be empty and will be mounted as an empty directory " +
+        "volume on the driver and executor pods.")
       .stringConf
       .createWithDefault("/var/spark-data/spark-files")

+  val INIT_CONTAINER_IMAGE =
+    ConfigBuilder("spark.kubernetes.initContainer.image")
+      .doc("Image for the driver and executor's init-container for downloading dependencies.")
+      .stringConf
+      .createOptional
+
+  val INIT_CONTAINER_MOUNT_TIMEOUT =
+    ConfigBuilder("spark.kubernetes.mountDependencies.timeout")
+      .doc("Timeout before aborting the attempt to download and unpack dependencies from remote " +
+        "locations into the driver and executor pods.")
+      .timeConf(TimeUnit.MINUTES)
+      .createWithDefault(5)
+
+  val INIT_CONTAINER_MAX_THREAD_POOL_SIZE =
+    ConfigBuilder("spark.kubernetes.mountDependencies.maxSimultaneousDownloads")
+      .doc("Maximum number of remote dependencies to download simultaneously in a driver or " +
+        "executor pod.")
+      .intConf
+      .createWithDefault(5)
+
+  val INIT_CONTAINER_REMOTE_JARS =
+    ConfigBuilder("spark.kubernetes.initContainer.remoteJars")
+      .doc("Comma-separated list of jar URIs to download in the init-container. This is " +
+        "calculated from spark.jars.")
+      .internal()
+      .stringConf
+      .createOptional
+
+  val INIT_CONTAINER_REMOTE_FILES =
+    ConfigBuilder("spark.kubernetes.initContainer.remoteFiles")
+      .doc("Comma-separated list of file URIs to download in the init-container. This is " +
+        "calculated from spark.files.")
+      .internal()
+      .stringConf
+      .createOptional
+
+  val INIT_CONTAINER_CONFIG_MAP_NAME =
+    ConfigBuilder("spark.kubernetes.initContainer.configMapName")
+      .doc("Name of the config map to use in the init-container that retrieves submitted files " +
+        "for the executor.")
+      .internal()
+      .stringConf
+      .createOptional
+
+  val INIT_CONTAINER_CONFIG_MAP_KEY_CONF =
+    ConfigBuilder("spark.kubernetes.initContainer.configMapKey")
+      .doc("Key for the entry in the init container config map for submitted files that " +
+        "corresponds to the properties for this init-container.")
+      .internal()
+      .stringConf
+      .createOptional
+
   val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
     "spark.kubernetes.authenticate.submission"

   val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."

   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."

   val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
   val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
   val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."

   val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
 }
```

Review thread on spark.kubernetes.mountDependencies.maxSimultaneousDownloads:

Contributor: nit: […]
Author: I think the current name is already pretty long. Adding […]
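For orientation, a minimal sketch of how an application might supply the user-facing settings above. The image name, URLs, and bucket are illustrative assumptions, not values from this diff; the internal initContainer.remoteJars/remoteFiles entries are derived from spark.jars and spark.files at submission time, so users set only the public keys:

```scala
import org.apache.spark.SparkConf

// Sketch only: the image name and dependency URIs below are assumptions.
val conf = new SparkConf()
  .set("spark.kubernetes.initContainer.image", "my-registry/spark-init:latest")
  .set("spark.kubernetes.mountDependencies.jarsDownloadDir", "/var/spark-data/spark-jars")
  .set("spark.kubernetes.mountDependencies.timeout", "10m") // timeConf: bare numbers are minutes here
  .set("spark.kubernetes.mountDependencies.maxSimultaneousDownloads", "3")
  // Remote dependencies enter through the standard properties; the internal
  // initContainer.remoteJars/remoteFiles values are computed from these.
  .set("spark.jars", "https://example.com/libs/app-dep.jar")
  .set("spark.files", "s3a://example-bucket/conf/app.properties")
```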
New file adding org.apache.spark.deploy.k8s.InitContainerBootstrap (+119 lines):

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, EnvVarBuilder, PodBuilder, VolumeMount, VolumeMountBuilder}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._

/**
 * Bootstraps an init-container for downloading remote dependencies. This is separated out from
 * the init-container steps API because this component can be used to bootstrap init-containers
 * for both the driver and executors.
 */
private[spark] class InitContainerBootstrap(
    initContainerImage: String,
    imagePullPolicy: String,
    jarsDownloadPath: String,
    filesDownloadPath: String,
    configMapName: String,
    configMapKey: String,
    sparkRole: String,
    sparkConf: SparkConf) {

  /**
   * Bootstraps an init-container that downloads dependencies to be used by a main container.
   */
  def bootstrapInitContainer(
      original: PodWithDetachedInitContainer): PodWithDetachedInitContainer = {
    val sharedVolumeMounts = Seq[VolumeMount](
      new VolumeMountBuilder()
        .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
        .withMountPath(jarsDownloadPath)
        .build(),
      new VolumeMountBuilder()
        .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
        .withMountPath(filesDownloadPath)
        .build())

    val customEnvVarKeyPrefix = sparkRole match {
      case SPARK_POD_DRIVER_ROLE => KUBERNETES_DRIVER_ENV_KEY
      case SPARK_POD_EXECUTOR_ROLE => "spark.executorEnv."
      case _ => throw new SparkException(s"$sparkRole is not a valid Spark pod role")
    }
    val customEnvVars = sparkConf.getAllWithPrefix(customEnvVarKeyPrefix).toSeq.map {
      case (key, value) =>
        new EnvVarBuilder()
          .withName(key)
          .withValue(value)
          .build()
    }

    val initContainer = new ContainerBuilder(original.initContainer)
      .withName("spark-init")
      .withImage(initContainerImage)
      .withImagePullPolicy(imagePullPolicy)
      .addAllToEnv(customEnvVars.asJava)
      .addNewVolumeMount()
        .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
        .withMountPath(INIT_CONTAINER_PROPERTIES_FILE_DIR)
        .endVolumeMount()
      .addToVolumeMounts(sharedVolumeMounts: _*)
      .addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
      .build()

    val podWithBasicVolumes = new PodBuilder(original.pod)
      .editSpec()
      .addNewVolume()
        .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
        .withNewConfigMap()
          .withName(configMapName)
          .addNewItem()
            .withKey(configMapKey)
            .withPath(INIT_CONTAINER_PROPERTIES_FILE_NAME)
            .endItem()
          .endConfigMap()
        .endVolume()
      .addNewVolume()
        .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
        .withEmptyDir(new EmptyDirVolumeSource())
        .endVolume()
      .addNewVolume()
        .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
        .withEmptyDir(new EmptyDirVolumeSource())
        .endVolume()
      .endSpec()
      .build()

    val mainContainer = new ContainerBuilder(original.mainContainer)
      .addToVolumeMounts(sharedVolumeMounts: _*)
      .addNewEnv()
        .withName(ENV_MOUNTED_FILES_DIR)
        .withValue(filesDownloadPath)
        .endEnv()
      .build()

    PodWithDetachedInitContainer(
      podWithBasicVolumes,
      initContainer,
      mainContainer)
  }
}
```
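A hypothetical caller sketch for the class above, assuming PodWithDetachedInitContainer is a case class of (pod, initContainer, mainContainer) as used here, and that basePod, baseInitContainer, and baseMainContainer are prebuilt fabric8 objects. The image, config map name, and key are invented for illustration:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Constants._

// Env vars under the role-specific prefix are copied into the init-container.
val sparkConf = new SparkConf()
  .set("spark.kubernetes.driverEnv.INIT_LOG_LEVEL", "DEBUG")

val bootstrap = new InitContainerBootstrap(
  initContainerImage = "my-registry/spark-init:latest", // assumed image
  imagePullPolicy = "IfNotPresent",
  jarsDownloadPath = "/var/spark-data/spark-jars",
  filesDownloadPath = "/var/spark-data/spark-files",
  configMapName = "spark-init-config",    // assumed config map name
  configMapKey = "spark-init.properties", // assumed key
  sparkRole = SPARK_POD_DRIVER_ROLE,
  sparkConf = sparkConf)

// Returns the pod with the properties/jars/files volumes added, the configured
// init-container, and the main container with the shared mounts attached.
val bootstrapped = bootstrap.bootstrapInitContainer(
  PodWithDetachedInitContainer(basePod, baseInitContainer, baseMainContainer))
```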
New file adding org.apache.spark.deploy.k8s.MountSecretsBootstrap (+62 lines):

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

/**
 * Bootstraps a driver or executor container or an init-container with needed secrets mounted.
 */
private[spark] class MountSecretsBootstrap(secretNamesToMountPaths: Map[String, String]) {

  /**
   * Mounts Kubernetes secrets as secret volumes into the given container in the given pod.
   *
   * @param pod the pod into which the secret volumes are being added.
   * @param container the container into which the secret volumes are being mounted.
   * @return the updated pod and container with the secrets mounted.
   */
  def mountSecrets(pod: Pod, container: Container): (Pod, Container) = {
    var podBuilder = new PodBuilder(pod)
    secretNamesToMountPaths.keys.foreach { name =>
      podBuilder = podBuilder
        .editOrNewSpec()
          .addNewVolume()
            .withName(secretVolumeName(name))
            .withNewSecret()
              .withSecretName(name)
              .endSecret()
            .endVolume()
          .endSpec()
    }

    var containerBuilder = new ContainerBuilder(container)
    secretNamesToMountPaths.foreach { case (name, path) =>
      containerBuilder = containerBuilder
        .addNewVolumeMount()
          .withName(secretVolumeName(name))
          .withMountPath(path)
          .endVolumeMount()
    }

    (podBuilder.build(), containerBuilder.build())
  }

  private def secretVolumeName(secretName: String): String = {
    secretName + "-volume"
  }
}
```
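A minimal usage sketch for MountSecretsBootstrap; the secret names, mount paths, and the driverPod/driverContainer inputs are illustrative assumptions:

```scala
// Hypothetical secret-to-path mapping, e.g. cloud credentials used to fetch
// dependencies from s3 or gcs.
val secretsBootstrap = new MountSecretsBootstrap(Map(
  "aws-credentials" -> "/mnt/secrets/aws",
  "gcs-credentials" -> "/mnt/secrets/gcs"))

// Produces copies of the inputs: one secret volume per entry on the pod spec,
// plus a matching "<secretName>-volume" mount on the container.
val (podWithSecrets, containerWithSecrets) =
  secretsBootstrap.mountSecrets(driverPod, driverContainer)
```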
Review thread on spark.kubernetes.mountDependencies.timeout:

Contributor: nit: spark.kubernetes.initContainer.mountDependencies.timeout?
Author: Please see the response regarding spark.kubernetes.mountDependencies.maxSimultaneousDownloads.