-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-22757][Kubernetes] Enable use of remote dependencies (http, s3, gcs, etc.) in Kubernetes mode #19954
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-22757][Kubernetes] Enable use of remote dependencies (http, s3, gcs, etc.) in Kubernetes mode #19954
Changes from 1 commit
d3cbbdd
5d2cbc8
4ee76af
9c8051a
1f65417
109ad80
c21fdcf
a3cd71d
23c5cd9
2ec15c4
5d1f889
9d9c841
c51bc56
28343fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,13 +14,49 @@ | |
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.spark.deploy.k8s.submit | ||
| package org.apache.spark.deploy.k8s | ||
|
|
||
| import java.io.File | ||
|
|
||
| import io.fabric8.kubernetes.api.model.{Container, Pod, PodBuilder} | ||
|
|
||
| import org.apache.spark.SparkConf | ||
| import org.apache.spark.util.Utils | ||
|
|
||
| private[spark] object KubernetesFileUtils { | ||
| private[spark] object KubernetesUtils { | ||
|
|
||
| /** | ||
| * Extract and parse Spark configuration properties with a given name prefix and | ||
| * return the result as a Map. Keys must not have more than one value. | ||
| * | ||
| * @param sparkConf Spark configuration | ||
| * @param prefix the given property name prefix | ||
| * @return a Map storing the configuration property keys and values | ||
| */ | ||
| def parsePrefixedKeyValuePairs( | ||
| sparkConf: SparkConf, | ||
| prefix: String): Map[String, String] = { | ||
| sparkConf.getAllWithPrefix(prefix).toMap | ||
| } | ||
|
|
||
| def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = { | ||
| opt1.foreach { _ => require(opt2.isEmpty, errMessage) } | ||
| } | ||
|
|
||
| /** | ||
| * Append the given init-container to a pod's list of init-containers.
|
||
| * | ||
| * @param originalPodSpec original specification of the pod | ||
| * @param initContainer the init-container to add to the pod | ||
| * @return the pod with the init-container added to the list of InitContainers | ||
| */ | ||
| def appendInitContainer(originalPodSpec: Pod, initContainer: Container): Pod = { | ||
| new PodBuilder(originalPodSpec) | ||
| .editOrNewSpec() | ||
| .addToInitContainers(initContainer) | ||
| .endSpec() | ||
| .build() | ||
| } | ||
|
|
||
| /** | ||
| * For the given collection of file URIs, resolves them as follows: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,7 +21,7 @@ import java.util.UUID | |
| import com.google.common.primitives.Longs | ||
|
|
||
| import org.apache.spark.SparkConf | ||
| import org.apache.spark.deploy.k8s.{ConfigurationUtils, MountSecretsBootstrap} | ||
| import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap} | ||
| import org.apache.spark.deploy.k8s.Config._ | ||
| import org.apache.spark.deploy.k8s.Constants._ | ||
| import org.apache.spark.deploy.k8s.submit.steps._ | ||
|
|
@@ -61,7 +61,7 @@ private[spark] class DriverConfigOrchestrator( | |
| private val filesDownloadPath = sparkConf.get(FILES_DOWNLOAD_LOCATION) | ||
|
|
||
| def getAllConfigurationSteps: Seq[DriverConfigurationStep] = { | ||
| val driverCustomLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( | ||
| val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( | ||
| sparkConf, | ||
| KUBERNETES_DRIVER_LABEL_PREFIX) | ||
| require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " + | ||
|
|
@@ -71,15 +71,15 @@ private[spark] class DriverConfigOrchestrator( | |
| s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " + | ||
| "operations.") | ||
|
|
||
| val secretNamesToMountPaths = ConfigurationUtils.parsePrefixedKeyValuePairs( | ||
| val secretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs( | ||
| sparkConf, | ||
| KUBERNETES_DRIVER_SECRETS_PREFIX) | ||
|
|
||
| val allDriverLabels = driverCustomLabels ++ Map( | ||
| SPARK_APP_ID_LABEL -> kubernetesAppId, | ||
| SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) | ||
|
|
||
| val initialSubmissionStep = new BaseDriverConfigurationStep( | ||
| val initialSubmissionStep = new BasicDriverConfigurationStep( | ||
| kubernetesAppId, | ||
| kubernetesResourceNamePrefix, | ||
| allDriverLabels, | ||
|
|
@@ -117,50 +117,49 @@ private[spark] class DriverConfigOrchestrator( | |
| .map(_.split(",")) | ||
| .getOrElse(Array.empty[String]) | ||
|
|
||
| val maybeDependencyResolutionStep = if (sparkJars.nonEmpty || sparkFiles.nonEmpty) { | ||
| Some(new DependencyResolutionStep( | ||
| val dependencyResolutionStep = if (sparkJars.nonEmpty || sparkFiles.nonEmpty) { | ||
| Seq(new DependencyResolutionStep( | ||
| sparkJars, | ||
| sparkFiles, | ||
| jarsDownloadPath, | ||
| filesDownloadPath)) | ||
| } else { | ||
| None | ||
| Nil | ||
| } | ||
|
|
||
| val mayBeInitContainerBootstrapStep = | ||
| if (areAnyFilesNonContainerLocal(sparkJars ++ sparkFiles)) { | ||
| val orchestrator = new InitContainerConfigOrchestrator( | ||
| sparkJars, | ||
| sparkFiles, | ||
| jarsDownloadPath, | ||
| filesDownloadPath, | ||
| imagePullPolicy, | ||
| initContainerConfigMapName, | ||
| INIT_CONTAINER_PROPERTIES_FILE_NAME, | ||
| sparkConf) | ||
| val bootstrapStep = new DriverInitContainerBootstrapStep( | ||
| orchestrator.getAllConfigurationSteps, | ||
| initContainerConfigMapName, | ||
| INIT_CONTAINER_PROPERTIES_FILE_NAME) | ||
|
|
||
| Some(bootstrapStep) | ||
| } else { | ||
| None | ||
| } | ||
| val initContainerBootstrapStep = if (areAnyFilesNonContainerLocal(sparkJars ++ sparkFiles)) { | ||
| val orchestrator = new InitContainerConfigOrchestrator( | ||
| sparkJars, | ||
| sparkFiles, | ||
| jarsDownloadPath, | ||
| filesDownloadPath, | ||
| imagePullPolicy, | ||
| initContainerConfigMapName, | ||
| INIT_CONTAINER_PROPERTIES_FILE_NAME, | ||
| sparkConf) | ||
| val bootstrapStep = new DriverInitContainerBootstrapStep( | ||
| orchestrator.getAllConfigurationSteps, | ||
| initContainerConfigMapName, | ||
| INIT_CONTAINER_PROPERTIES_FILE_NAME) | ||
|
|
||
| Seq(bootstrapStep) | ||
| } else { | ||
| Nil | ||
| } | ||
|
|
||
| val mayBeMountSecretsStep = if (secretNamesToMountPaths.nonEmpty) { | ||
| Some(new DriverMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths))) | ||
| val mountSecretsStep = if (secretNamesToMountPaths.nonEmpty) { | ||
| Seq(new DriverMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths))) | ||
| } else { | ||
| None | ||
| Nil | ||
| } | ||
|
|
||
| Seq( | ||
| initialSubmissionStep, | ||
| serviceBootstrapStep, | ||
| kubernetesCredentialsStep) ++ | ||
| maybeDependencyResolutionStep.toSeq ++ | ||
| mayBeInitContainerBootstrapStep.toSeq ++ | ||
| mayBeMountSecretsStep.toSeq | ||
| dependencyResolutionStep ++ | ||
| initContainerBootstrapStep ++ | ||
| mountSecretsStep | ||
| } | ||
|
|
||
| private def areAnyFilesNonContainerLocal(files: Seq[String]): Boolean = { | ||
|
||
|
|
||
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not TimeUnit.SECONDS?

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.