This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Closed
Changes from 1 commit
37 commits
999ec13
[SPARK-22570][SQL] Avoid to create a lot of global variables by using…
kiszk Nov 30, 2017
6ac57fd
[SPARK-21417][SQL] Infer join conditions using propagated constraints
Nov 30, 2017
bcceab6
[SPARK-22489][SQL] Shouldn't change broadcast join buildSide if user …
wangyum Nov 30, 2017
f5f8e84
[SPARK-22614] Dataset API: repartitionByRange(...)
adrian-ionescu Nov 30, 2017
7e5f669
[SPARK-22428][DOC] Add spark application garbage collector configurat…
gaborgsomogyi Dec 1, 2017
7da1f57
[SPARK-22373] Bump Janino dependency version to fix thread safety issue…
Victsm Dec 1, 2017
dc36542
[SPARK-22653] executorAddress registered in CoarseGrainedSchedulerBac…
tgravescs Dec 1, 2017
16adaf6
[SPARK-22601][SQL] Data load is getting displayed successful on provi…
sujith71955 Dec 1, 2017
9d06a9e
[SPARK-22393][SPARK-SHELL] spark-shell can't find imported types in c…
mpetruska Dec 1, 2017
ee10ca7
[SPARK-22638][SS] Use a separate queue for StreamingQueryListenerBus
zsxwing Dec 1, 2017
aa4cf2b
[SPARK-22651][PYTHON][ML] Prevent initiating multiple Hive clients fo…
HyukjinKwon Dec 2, 2017
d2cf95a
[SPARK-22634][BUILD] Update Bouncy Castle to 1.58
srowen Dec 2, 2017
f23dddf
[SPARK-20682][SPARK-15474][SPARK-21791] Add new ORCFileFormat based o…
dongjoon-hyun Dec 3, 2017
2c16267
[SPARK-22669][SQL] Avoid unnecessary function calls in code generation
mgaido91 Dec 3, 2017
dff440f
[SPARK-22626][SQL] deals with wrong Hive's statistics (zero rowCount)
wangyum Dec 3, 2017
4131ad0
[SPARK-22489][DOC][FOLLOWUP] Update broadcast behavior changes in mig…
wangyum Dec 4, 2017
3927bb9
[SPARK-22473][FOLLOWUP][TEST] Remove deprecated Date functions
mgaido91 Dec 4, 2017
f81401e
[SPARK-22162] Executors and the driver should use consistent JobIDs i…
Dec 4, 2017
e1dd03e
[SPARK-22372][CORE, YARN] Make cluster submission use SparkApplication.
Dec 4, 2017
dcaac45
Spark on Kubernetes - basic submission client
liyinan926 Nov 10, 2017
27c67ff
Addressed first round of review comments
liyinan926 Nov 27, 2017
6d597d0
Made Client implement the SparkApplication trait
liyinan926 Nov 28, 2017
5b9fa39
Addressed the second round of comments
liyinan926 Nov 28, 2017
5ccadb5
Added missing step for supporting local:// dependencies and addressed…
liyinan926 Nov 30, 2017
12f2797
Fixed Scala style check errors
liyinan926 Nov 30, 2017
c35fe48
Addressed another round of comments
liyinan926 Dec 4, 2017
faa2849
Rebased on master and added a constant val for the Client class
liyinan926 Dec 4, 2017
347ed69
Addressed another major round of comments
liyinan926 Dec 5, 2017
0e8ca01
Addressed one more round of comments
liyinan926 Dec 5, 2017
3a0b8e3
Removed mentioning of kubernetes-namespace
liyinan926 Dec 6, 2017
83d0b9c
Fixed a couple of bugs found during manual tests
liyinan926 Dec 7, 2017
44c40b1
Guard against client mode in SparkContext
liyinan926 Dec 8, 2017
67bc847
Added libc6-compat into the base docker image
liyinan926 Dec 8, 2017
7d2b303
Addressed latest comments
liyinan926 Dec 8, 2017
caf2206
Addressed docs comments
liyinan926 Dec 9, 2017
2e7810b
Fixed a comment
liyinan926 Dec 11, 2017
cbcd30e
Addressed latest comments
liyinan926 Dec 11, 2017
Added missing step for supporting local:// dependencies and addressed more comments
liyinan926 committed Dec 4, 2017
commit 5ccadb5b29de157219a388fef28bcbd44e3ec788
File: org/apache/spark/deploy/k8s/Config.scala
@@ -152,6 +152,22 @@ private[spark] object Config extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")

private[spark] val JARS_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
.doc("Location to download jars to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pod.")
.stringConf
.createWithDefault("/var/spark-data/spark-jars")

private[spark] val FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
.doc("Location to download files to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pods.")
.stringConf
.createWithDefault("/var/spark-data/spark-files")

val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"


This file was deleted. (Judging by the import change below, this is presumably org/apache/spark/deploy/k8s/KubernetesClientFactory.scala, superseded by SparkKubernetesClientFactory.scala.)

File: org/apache/spark/deploy/k8s/submit/Client.scala
@@ -29,20 +29,20 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkApplication
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

/**
* Encapsulates arguments to the submission client.
*
-* @param mainAppResource the main application resource
+* @param mainAppResource the main application resource if any
* @param mainClass the main class of the application to run
* @param driverArgs arguments to the driver
*/
private[spark] case class ClientArguments(
-mainAppResource: MainAppResource,
+mainAppResource: Option[MainAppResource],
mainClass: String,
driverArgs: Array[String])

@@ -68,7 +68,7 @@ private[spark] object ClientArguments {
require(mainClass.isDefined, "Main class must be specified via --main-class")

ClientArguments(
-mainAppResource.get,
+mainAppResource,
mainClass.get,
driverArgs.toArray)
}
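With mainAppResource now an Option, a submission without a primary resource simply carries None instead of failing on mainAppResource.get. A hypothetical construction for illustration (the class name, jar URI, and arguments below are made up; JavaMainAppResource is the case class matched later in this commit's orchestrator):

val args = ClientArguments(
  mainAppResource = Some(JavaMainAppResource("local:///opt/app/example.jar")),
  mainClass = "com.example.Main",
  driverArgs = Array("--input", "/data/input.csv"))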
@@ -214,7 +214,7 @@ private[spark] object Client extends SparkApplication {
clientArguments.driverArgs,
sparkConf)

-Utils.tryWithResource(KubernetesClientFactory.createKubernetesClient(
+Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
Review comment (Member):

This particular line does not read well because the word "KubernetesClient" appears here twice, meaning two different things. The reader may fail to distinguish the "Spark client" (SparkKubernetesClientFactory) from the "K8s API client" (createKubernetesClient).

Reply (Member, author):
Renamed SparkKubernetesClientFactory to KubernetesClientFactory and renamed the method to create.

Reply (Member):

I see. I misread it myself; the two clients meant the same thing :-)

master,
Some(namespace),
KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
File: org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala
@@ -20,7 +20,8 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.ConfigurationUtils
import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.steps.{BaseDriverConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep}
+import org.apache.spark.deploy.k8s.submit.steps._
+import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.SystemClock

/**
@@ -30,7 +31,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
namespace: String,
kubernetesAppId: String,
launchTime: Long,
-mainAppResource: MainAppResource,
+mainAppResource: Option[MainAppResource],
appName: String,
mainClass: String,
appArgs: Array[String],
@@ -45,6 +46,8 @@
private val kubernetesResourceNamePrefix =
s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
private val dockerImagePullPolicy = submissionSparkConf.get(DOCKER_IMAGE_PULL_POLICY)
private val jarsDownloadPath = submissionSparkConf.get(JARS_DOWNLOAD_LOCATION)
private val filesDownloadPath = submissionSparkConf.get(FILES_DOWNLOAD_LOCATION)

def getAllConfigurationSteps(): Seq[DriverConfigurationStep] = {
val driverCustomLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
@@ -80,9 +83,39 @@
val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
submissionSparkConf, kubernetesResourceNamePrefix)

val additionalMainAppJar = if (mainAppResource.nonEmpty) {
val mayBeResource = mainAppResource.get match {
case JavaMainAppResource(resource) if resource != SparkLauncher.NO_RESOURCE =>
Some(resource)
case _ => Option.empty
}
mayBeResource
} else {
Option.empty
}

val sparkJars = submissionSparkConf.getOption("spark.jars")
.map(_.split(","))
.getOrElse(Array.empty[String]) ++
additionalMainAppJar.toSeq
val sparkFiles = submissionSparkConf.getOption("spark.files")
.map(_.split(","))
.getOrElse(Array.empty[String])

val maybeDependencyResolutionStep = if (sparkJars.nonEmpty || sparkFiles.nonEmpty) {
Some(new DependencyResolutionStep(
sparkJars,
sparkFiles,
jarsDownloadPath,
filesDownloadPath))
} else {
Option.empty
}

Seq(
initialSubmissionStep,
driverAddressStep,
-kubernetesCredentialsStep)
+kubernetesCredentialsStep) ++
+maybeDependencyResolutionStep.toSeq
}
}
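The dependency resolution step is optional, so it is appended via Option.toSeq, which contributes zero or one elements to the step sequence. A self-contained sketch of that pattern (the step names are placeholders):

val baseSteps = Seq("baseConfig", "driverAddress", "credentials")
val maybeStep: Option[String] = Some("dependencyResolution")

// toSeq turns Some(x) into Seq(x) and None into Seq.empty.
val allSteps = baseSteps ++ maybeStep.toSeq
// allSteps == Seq("baseConfig", "driverAddress", "credentials", "dependencyResolution")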
File: org/apache/spark/deploy/k8s/submit/KubernetesFileUtils.scala (new file)
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit

import java.io.File

import org.apache.spark.util.Utils

private[spark] object KubernetesFileUtils {

/**
* For the given collection of file URIs, resolves them as follows:
* - File URIs with scheme file:// are resolved to the given download path.
* - File URIs with scheme local:// resolve to just the path of the URI.
* - Otherwise, the URIs are returned as-is.
*/
def resolveSubmittedUris(fileUris: Iterable[String], fileDownloadPath: String)
: Iterable[String] = {
fileUris.map { uri =>
val fileUri = Utils.resolveURI(uri)
val fileScheme = Option(fileUri.getScheme).getOrElse("file")
fileScheme match {
case "file" =>
val fileName = new File(fileUri.getPath).getName
s"$fileDownloadPath/$fileName"
case "local" =>
fileUri.getPath
case _ => uri
}
}
}

/**
* If any file uri has any scheme other than local:// it is mapped as if the file
* was downloaded to the file download path. Otherwise, it is mapped to the path
* part of the URI.
*/
def resolveFilePaths(fileUris: Iterable[String], fileDownloadPath: String): Iterable[String] = {
fileUris.map { uri =>
resolveFilePath(uri, fileDownloadPath)
}
}

private def resolveFilePath(uri: String, fileDownloadPath: String): String = {
val fileUri = Utils.resolveURI(uri)
if (Option(fileUri.getScheme).getOrElse("file") == "local") {
fileUri.getPath
} else {
val fileName = new File(fileUri.getPath).getName
s"$fileDownloadPath/$fileName"
}
}
}
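A usage sketch of resolveSubmittedUris covering its three cases (the URIs and download path below are made up for illustration):

val uris = Seq(
  "file:///home/user/app.jar",         // submitted from the client machine
  "local:///opt/spark/libs/dep.jar",   // already present on the container image
  "https://repo.example.com/lib.jar")  // remote URI, returned as-is

val resolved = KubernetesFileUtils.resolveSubmittedUris(uris, "/var/spark-data/spark-jars")
// resolved == Seq(
//   "/var/spark-data/spark-jars/app.jar",
//   "/opt/spark/libs/dep.jar",
//   "https://repo.example.com/lib.jar")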
File: org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala (new file)
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit.steps

import java.io.File

import io.fabric8.kubernetes.api.model.ContainerBuilder

import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.{KubernetesDriverSpec, KubernetesFileUtils}

/**
* Step that configures the classpath, spark.jars, and spark.files for the driver given that the
* user may provide remote files or files with local:// schemes.
*/
private[spark] class DependencyResolutionStep(
sparkJars: Seq[String],
sparkFiles: Seq[String],
jarsDownloadPath: String,
localFilesDownloadPath: String) extends DriverConfigurationStep {

override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val resolvedSparkJars = KubernetesFileUtils.resolveSubmittedUris(sparkJars, jarsDownloadPath)
val resolvedSparkFiles = KubernetesFileUtils.resolveSubmittedUris(
sparkFiles, localFilesDownloadPath)
val sparkConfResolvedSparkDependencies = driverSpec.driverSparkConf.clone()
if (resolvedSparkJars.nonEmpty) {
sparkConfResolvedSparkDependencies.set("spark.jars", resolvedSparkJars.mkString(","))
}
if (resolvedSparkFiles.nonEmpty) {
sparkConfResolvedSparkDependencies.set("spark.files", resolvedSparkFiles.mkString(","))
}
val resolvedClasspath = KubernetesFileUtils.resolveFilePaths(sparkJars, jarsDownloadPath)
val driverContainerWithResolvedClasspath = if (resolvedClasspath.nonEmpty) {
new ContainerBuilder(driverSpec.driverContainer)
.addNewEnv()
.withName(ENV_MOUNTED_CLASSPATH)
.withValue(resolvedClasspath.mkString(File.pathSeparator))
.endEnv()
.build()
} else {
driverSpec.driverContainer
}
driverSpec.copy(
driverContainer = driverContainerWithResolvedClasspath,
driverSparkConf = sparkConfResolvedSparkDependencies)
}
}
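The classpath env var is attached with fabric8's fluent ContainerBuilder, the same builder pattern used throughout these steps. A standalone sketch, assuming the io.fabric8 kubernetes-model artifact is on the classpath; the env var name below stands in for the ENV_MOUNTED_CLASSPATH constant and is an assumption here:

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder}

val base = new Container()
val withClasspath = new ContainerBuilder(base)
  .addNewEnv()
    .withName("SPARK_MOUNTED_CLASSPATH")  // assumed name; the real value lives in Constants
    .withValue("/var/spark-data/spark-jars/example.jar")
    .endEnv()
  .build()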
File: org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
@@ -213,16 +213,16 @@ private[spark] class DriverKubernetesCredentialsStep(
mountedUserSpecified: Option[String],
valueMountedFromSubmitter: Option[String],
mountedCanonicalLocation: String): Option[String] = {
-mountedUserSpecified.orElse(valueMountedFromSubmitter.map( _ => {
+mountedUserSpecified.orElse(valueMountedFromSubmitter.map( _ =>
mountedCanonicalLocation
-}))
+))
}

private def resolveSecretData(
mountedUserSpecified: Option[String],
valueMountedFromSubmitter: Option[String],
secretName: String): Map[String, String] = {
-mountedUserSpecified.map { _ => Map.empty[String, String]}
+mountedUserSpecified.map { _ => Map.empty[String, String] }
.getOrElse {
valueMountedFromSubmitter.map { valueBase64 =>
Map(secretName -> valueBase64)
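The Option combinators in resolveSecretLocation encode a precedence rule: an explicitly mounted, user-specified location wins; otherwise a value supplied at submission maps to the canonical in-pod location. A standalone sketch of the same logic (the argument values are made up):

def resolveLocation(
    mountedUserSpecified: Option[String],
    valueMountedFromSubmitter: Option[String],
    mountedCanonicalLocation: String): Option[String] =
  mountedUserSpecified.orElse(valueMountedFromSubmitter.map(_ => mountedCanonicalLocation))

// resolveLocation(Some("/etc/user-creds"), Some("YWJj"), "/mnt/secrets/token") == Some("/etc/user-creds")
// resolveLocation(None, Some("YWJj"), "/mnt/secrets/token")                    == Some("/mnt/secrets/token")
// resolveLocation(None, None, "/mnt/secrets/token")                            == None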