Addressed first round of review comments
liyinan926 committed Dec 4, 2017
commit 27c67ff2693a1aaac3d0863410c67691a0784ea9
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (7 changes: 4 additions & 3 deletions)
@@ -718,12 +718,13 @@ object SparkSubmit extends CommandLineUtils with Logging {
 
     if (isKubernetesCluster) {
       childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
-      childArgs ++= Array("--primary-java-resource", args.primaryResource)
+      if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
+        childArgs ++= Array("--primary-java-resource", args.primaryResource)
+      }
       childArgs ++= Array("--main-class", args.mainClass)
       if (args.childArgs != null) {
         args.childArgs.foreach { arg =>
-          childArgs += "--arg"
-          childArgs += arg
+          childArgs += ("--arg", arg)
         }
       }
     }
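For context on the SparkLauncher.NO_RESOURCE guard above, here is a minimal, self-contained sketch of how the child arguments come together. buildChildArgs is a hypothetical helper written for illustration; NO_RESOURCE mirrors SparkLauncher's "spark-internal" placeholder.

```scala
import scala.collection.mutable.ArrayBuffer

object ChildArgsSketch {
  // SparkLauncher.NO_RESOURCE is the "spark-internal" placeholder used when the
  // main class ships inside Spark itself (examples, thrift server, ...).
  val NO_RESOURCE = "spark-internal"

  // Hypothetical helper mirroring the SparkSubmit logic in the diff above.
  def buildChildArgs(
      primaryResource: String,
      mainClass: String,
      appArgs: Seq[String]): Seq[String] = {
    val childArgs = ArrayBuffer.empty[String]
    if (primaryResource != NO_RESOURCE) {
      childArgs ++= Array("--primary-java-resource", primaryResource)
    }
    childArgs ++= Array("--main-class", mainClass)
    appArgs.foreach { arg =>
      childArgs += ("--arg", arg) // varargs += appends both elements, as in the diff
    }
    childArgs.toSeq
  }

  def main(args: Array[String]): Unit = {
    // No primary resource: the --primary-java-resource flag is omitted entirely.
    println(buildChildArgs(NO_RESOURCE, "org.apache.spark.examples.SparkPi", Seq("100")))
  }
}
```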
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -64,10 +64,10 @@ private[spark] object Config extends Logging {

   val KUBERNETES_SERVICE_ACCOUNT_NAME =
     ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
-      .doc("Service account that is used when running the driver pod. The driver pod uses" +
-        " this service account when requesting executor pods from the API server. If specific" +
-        " credentials are given for the driver pod to use, the driver will favor" +
-        " using those credentials instead.")
+      .doc("Service account that is used when running the driver pod. The driver pod uses " +
+        "this service account when requesting executor pods from the API server. If specific " +
+        "credentials are given for the driver pod to use, the driver will favor " +
+        "using those credentials instead.")
       .stringConf
       .createOptional

@@ -85,10 +85,10 @@ private[spark] object Config extends Logging {

   val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
     ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
-      .doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the" +
-        " driver submission server. This is memory that accounts for things like VM overheads," +
-        " interned strings, other native overheads, etc. This tends to grow with the driver's" +
-        " memory size (typically 6-10%).")
+      .doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the " +
+        "driver submission server. This is memory that accounts for things like VM overheads, " +
+        "interned strings, other native overheads, etc. This tends to grow with the driver's " +
+        "memory size (typically 6-10%).")
       .bytesConf(ByteUnit.MiB)
       .createOptional

@@ -141,8 +141,8 @@ private[spark] object Config extends Logging {

   val WAIT_FOR_APP_COMPLETION =
     ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
-      .doc("In cluster mode, whether to wait for the application to finish before exiting the" +
-        " launcher process.")
+      .doc("In cluster mode, whether to wait for the application to finish before exiting the " +
+        "launcher process.")
       .booleanConf
       .createWithDefault(true)
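As a usage illustration, these entries can be set on a SparkConf with their string keys. The values below are arbitrary, and the serviceAccountName key assumes KUBERNETES_AUTH_DRIVER_CONF_PREFIX expands to spark.kubernetes.authenticate.driver:

```scala
import org.apache.spark.SparkConf

object ConfSketch {
  // Illustrative values only; the keys match the ConfigBuilder entries above.
  val conf: SparkConf = new SparkConf()
    .set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
    .set("spark.kubernetes.driver.memoryOverhead", "512m") // bytesConf(MiB) accepts size strings
    .set("spark.kubernetes.submission.waitAppCompletion", "false")
}
```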

@@ -166,17 +166,16 @@ private[spark] object Config extends Logging {
val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."

   def getK8sMasterUrl(rawMasterString: String): String = {
-    if (!rawMasterString.startsWith("k8s://")) {
-      throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
-    }
+    require(rawMasterString.startsWith("k8s://"),
+      "Master URL should start with k8s:// in Kubernetes mode.")
     val masterWithoutK8sPrefix = rawMasterString.replaceFirst("k8s://", "")

[Member] nit: rawMasterString.substring("k8s://".length)?

[Contributor Author] Done.

     if (masterWithoutK8sPrefix.startsWith("http://")
         || masterWithoutK8sPrefix.startsWith("https://")) {
       masterWithoutK8sPrefix
     } else {
       val resolvedURL = s"https://$masterWithoutK8sPrefix"
-      logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved" +
-        s" URL is $resolvedURL")
+      logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
+        s"URL is $resolvedURL")
       resolvedURL
     }
   }
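To make the resolution behavior concrete, here is a standalone sketch of getK8sMasterUrl that also applies the substring form suggested in the review; the assertions show the intended input/output pairs:

```scala
object MasterUrlSketch {
  // Standalone re-implementation of getK8sMasterUrl, applying the review's substring nit.
  def getK8sMasterUrl(rawMasterString: String): String = {
    require(rawMasterString.startsWith("k8s://"),
      "Master URL should start with k8s:// in Kubernetes mode.")
    val masterWithoutK8sPrefix = rawMasterString.substring("k8s://".length)
    if (masterWithoutK8sPrefix.startsWith("http://") ||
        masterWithoutK8sPrefix.startsWith("https://")) {
      masterWithoutK8sPrefix
    } else {
      s"https://$masterWithoutK8sPrefix" // no scheme given, so default to https
    }
  }

  def main(args: Array[String]): Unit = {
    assert(getK8sMasterUrl("k8s://https://host:6443") == "https://host:6443")
    assert(getK8sMasterUrl("k8s://host:6443") == "https://host:6443")
    // getK8sMasterUrl("host:6443") throws IllegalArgumentException via require
  }
}
```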
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
@@ -33,6 +33,13 @@ import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
 private[spark] case class ClientArguments(

[Contributor] nit: add comments to describe the usage of this class, and the meaning of each params.

[Contributor Author] Done.

     mainAppResource: MainAppResource,
     mainClass: String,
@@ -58,7 +65,7 @@ private[spark] object ClientArguments {
   }
 
   require(mainAppResource.isDefined,
-    "Main app resource must be defined by either --primary-py-file or --primary-java-resource.")
+    "Main app resource must be defined by --primary-java-resource.")
   require(mainClass.isDefined, "Main class must be specified via --main-class")

[Contributor] mainAppResource need not be valid here.

[Contributor Author] Wondering why?

[Contributor] For the case of the Spark examples, the Spark thrift server, etc., there is no main app resource; the bits are bundled as part of Spark itself. You can take a look at how YARN or Mesos handles this case: I would assume something similar should suffice (look for usages of SparkLauncher.NO_RESOURCE).

[Contributor Author] Got it, removed the check.


     ClientArguments(
@@ -68,6 +75,19 @@
   }
 }

+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a
+ * watcher that monitors and logs the application status. Waits for the application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should wait for the application
+ *                             to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the application status
+ */
 private[spark] class Client(

[Contributor] nit: add comments to describe the usage of this class, and the meaning of each params.

[Contributor Author] Done.

     submissionSteps: Seq[DriverConfigurationStep],
     submissionSparkConf: SparkConf,
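A rough sketch of the waitForAppCompletion behavior the new scaladoc describes, assuming the status watcher releases a latch when the driver pod terminates (AwaitSketch and its members are illustrative stand-ins, not the PR's Client):

```scala
import java.util.concurrent.CountDownLatch

// Illustrative only: models the waitForAppCompletion branch of the submission client.
class AwaitSketch(waitForAppCompletion: Boolean, appName: String) {
  // Stands in for the watcher's internal latch (cf. podCompletedFuture below).
  private val podCompleted = new CountDownLatch(1)

  def run(): Unit = {
    // ... create the driver pod and register the status watcher here ...
    if (waitForAppCompletion) {
      println(s"Waiting for application $appName to finish...")
      podCompleted.await() // counted down by the watcher when the pod terminates
      println(s"Application $appName finished.")
    } else {
      println(s"Deployed Spark application $appName into Kubernetes.")
    }
  }
}
```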
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala
@@ -51,8 +51,12 @@ private[spark] class DriverConfigurationStepsOrchestrator(
       submissionSparkConf,
       KUBERNETES_DRIVER_LABEL_PREFIX)
     require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +

[Contributor] nit: SPARK_ROLE_LABEL also?

[Contributor Author] Yes, done.

s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping" +
" operations.")
s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
"operations.")
require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
"operations.")

val allDriverLabels = driverCustomLabels ++ Map(
SPARK_APP_ID_LABEL -> kubernetesAppId,
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
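Both require calls follow the same shape, so a compact equivalent folds them into one loop. The label values below are assumed from Spark's constants (spark-app-selector and spark-role) for illustration:

```scala
object LabelCheckSketch {
  // Assumed constant values: SPARK_APP_ID_LABEL = "spark-app-selector",
  // SPARK_ROLE_LABEL = "spark-role".
  private val reservedLabels = Seq("spark-app-selector", "spark-role")

  def checkCustomLabels(driverCustomLabels: Map[String, String]): Unit =
    reservedLabels.foreach { key =>
      require(!driverCustomLabels.contains(key),
        s"Label with key $key is not allowed as it is reserved for Spark bookkeeping operations.")
    }
}
```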
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -41,8 +41,9 @@ private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
  * number.
  */
 private[k8s] class LoggingPodStatusWatcherImpl(
-    appId: String, maybeLoggingInterval: Option[Long])
-  extends LoggingPodStatusWatcher with Logging {
+    appId: String,
+    maybeLoggingInterval: Option[Long])
+  extends LoggingPodStatusWatcher with Logging {
 
   private val podCompletedFuture = new CountDownLatch(1)
   // start timer for periodic logging
@@ -66,10 +67,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(
   override def eventReceived(action: Action, pod: Pod): Unit = {
     this.pod = Option(pod)
     action match {
-      case Action.DELETED =>
-        closeWatch()
-
-      case Action.ERROR =>
+      case Action.DELETED | Action.ERROR =>
         closeWatch()

[Contributor] No need for hasCompleted? Are all errors non-recoverable?

[Contributor Author] hasCompleted is checked below. Refactored the cases.

[Contributor] It is checked only for the case of action != DELETED and action != ERROR, right?

[Contributor Author] It's because in both the DELETED and ERROR cases, the application is considered terminated. In the other cases, we need to check the phase of the driver pod to determine whether the application terminated.


       case _ =>
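Summarizing the thread: DELETED and ERROR terminate the watch immediately, while any other event defers to the driver pod's phase. A simplified, self-contained sketch of that flow, with stand-in types rather than the fabric8 API:

```scala
import java.util.concurrent.CountDownLatch

// Action, Pod, and the helpers here are simplified stand-ins for illustration.
object WatcherSketch {
  sealed trait Action
  case object Deleted extends Action
  case object Error extends Action
  case object Modified extends Action

  final case class Pod(phase: String)

  private val podCompletedFuture = new CountDownLatch(1)

  // Assumed phase check: terminal driver pod phases in Kubernetes.
  private def hasCompleted(pod: Pod): Boolean =
    pod.phase == "Succeeded" || pod.phase == "Failed"

  private def closeWatch(): Unit = podCompletedFuture.countDown()

  def eventReceived(action: Action, pod: Pod): Unit = action match {
    case Deleted | Error =>
      // Deletion or an error both mean the application is terminated.
      closeWatch()
    case _ =>
      // Otherwise the driver pod's phase decides whether the app finished.
      if (hasCompleted(pod)) closeWatch()
  }
}
```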
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala
@@ -83,10 +83,11 @@ private[spark] class BaseDriverConfigurationStep(
" Spark bookkeeping operations.")

val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
.map(env => new EnvVarBuilder()
.withName(env._1)
.withValue(env._2)
.build())
.map(env =>

[Contributor] .map { env =>

[Contributor Author] Done.

+        new EnvVarBuilder()
+          .withName(env._1)
+          .withValue(env._2)
+          .build())

     val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)

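The brace style the reviewer asked for, sketched with a tuple pattern in place of env._1 and env._2; a plain list of pairs stands in for sparkConf.getAllWithPrefix(...):

```scala
import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder}

object EnvVarsSketch {
  // Builds one fabric8 EnvVar per (name, value) pair, in the suggested brace style.
  def buildDriverEnvs(pairs: Seq[(String, String)]): Seq[EnvVar] =
    pairs.map { case (name, value) =>
      new EnvVarBuilder()
        .withName(name)
        .withValue(value)
        .build()
    }
}
```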
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
@@ -237,8 +237,8 @@ private[spark] class DriverKubernetesCredentialsStep(

 private class OptionSettableSparkConf(sparkConf: SparkConf) {
   def setOption(configEntry: String, option: Option[String]): SparkConf = {
-    option.map( opt => {
+    option.map( opt =>
       sparkConf.set(configEntry, opt)
-    }).getOrElse(sparkConf)
+    ).getOrElse(sparkConf)
   }
 }
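For comparison, Option.fold collapses the map/getOrElse pair into one call; this is a behavioral equivalent of setOption above, not a change proposed in the PR:

```scala
import org.apache.spark.SparkConf

// Same effect as setOption above: set the entry only when the Option is non-empty.
def setOptionViaFold(sparkConf: SparkConf, configEntry: String, option: Option[String]): SparkConf =
  option.fold(sparkConf)(value => sparkConf.set(configEntry, value))
```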
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
@@ -52,9 +52,9 @@ private[spark] class DriverServiceBootstrapStep(
     } else {
       val randomServiceId = clock.getTimeMillis()
       val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
-      logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is" +
-        s" too long (must be <= 63 characters). Falling back to use $shorterServiceName" +
-        " as the driver service's name.")
+      logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
+        s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
+        s"$shorterServiceName as the driver service's name.")
       shorterServiceName
     }
 
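A sketch of the fallback naming above, assuming MAX_SERVICE_NAME_LENGTH is 63 (the Kubernetes DNS label limit) and DRIVER_SVC_POSTFIX is "-driver-svc":

```scala
object ServiceNameSketch {
  // Assumed constants; the real ones live in the Kubernetes constants file.
  val MAX_SERVICE_NAME_LENGTH = 63       // Kubernetes DNS label limit
  val DRIVER_SVC_POSTFIX = "-driver-svc" // assumed postfix value

  def driverServiceName(resourceNamePrefix: String, nowMillis: Long): String = {
    val preferredServiceName = s"$resourceNamePrefix$DRIVER_SVC_POSTFIX"
    if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
      preferredServiceName
    } else {
      s"spark-$nowMillis$DRIVER_SVC_POSTFIX" // shorter fallback, unique per submission
    }
  }
}
```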