Changes from 1 commit
Commits
29 commits
f6fdd6a
Spark on Kubernetes - basic scheduler backend
foxish Sep 15, 2017
75e31a9
Adding to modules.py and SparkBuild.scala
foxish Oct 17, 2017
cf82b21
Exclude from unidoc, update travis
foxish Oct 17, 2017
488c535
Address a bunch of style and other comments
foxish Oct 17, 2017
82b79a7
Fix some style concerns
foxish Oct 18, 2017
c052212
Clean up YARN constants, unit test updates
foxish Oct 20, 2017
c565c9f
Couple of more style comments
foxish Oct 20, 2017
2fb596d
Address CR comments.
mccheah Oct 25, 2017
992acbe
Extract initial executor count to utils class
mccheah Oct 25, 2017
b0a5839
Fix scalastyle
mccheah Oct 25, 2017
a4f9797
Fix more scalastyle
mccheah Oct 25, 2017
2b5dcac
Pin down app ID in tests. Fix test style.
mccheah Oct 26, 2017
018f4d8
Address comments.
mccheah Nov 1, 2017
4b32134
Various fixes to the scheduler
mccheah Nov 1, 2017
6cf4ed7
Address comments
mccheah Nov 4, 2017
1f271be
Update fabric8 client version to 3.0.0
foxish Nov 13, 2017
71a971f
Addressed more comments
liyinan926 Nov 13, 2017
0ab9ca7
One more round of comments
liyinan926 Nov 14, 2017
7f14b71
Added a comment regarding how failed executor pods are handled
liyinan926 Nov 15, 2017
7afce3f
Addressed more comments
liyinan926 Nov 21, 2017
b75b413
Fixed Scala style error
liyinan926 Nov 21, 2017
3b587b4
Removed unused parameter in parsePrefixedKeyValuePairs
liyinan926 Nov 22, 2017
cb12fec
Another round of comments
liyinan926 Nov 22, 2017
ae396cf
Addressed latest comments
liyinan926 Nov 27, 2017
f8e3249
Addressed comments around licensing on new dependencies
liyinan926 Nov 27, 2017
a44c29e
Fixed unit tests and made maximum executor lost reason checks configu…
liyinan926 Nov 27, 2017
4bed817
Removed default value for executor Docker image
liyinan926 Nov 27, 2017
c386186
Close the executor pod watcher before deleting the executor pods
liyinan926 Nov 27, 2017
b85cfc4
Addressed more comments
liyinan926 Nov 28, 2017
Addressed more comments
liyinan926 committed Nov 13, 2017
commit 71a971f2108fd9e04532fe73ecbd1ec00b36d132
@@ -20,6 +20,16 @@ package org.apache.spark.deploy.k8s
import org.apache.spark.SparkConf

private[spark] object ConfigurationUtils {

/**
* Extract and parse Spark configuration properties with a given name prefix and
* return the result as a Map. Keys must not have more than one value.
*
* @param sparkConf Spark configuration
* @param prefix the given property name prefix
* @param configType a descriptive note on the type of entities of interest
* @return a Map storing the configuration property keys and values
*/
def parsePrefixedKeyValuePairs(
Contributor: We should add a comment explaining what the function does; it not only returns the configs but also ensures no duplicate configs are set.

Contributor: Done.

sparkConf: SparkConf,
prefix: String,
@@ -34,15 +44,6 @@ private[spark] object ConfigurationUtils {
fromPrefix.toMap
}

def requireSecondIfFirstIsDefined(
opt1: Option[_],
opt2: Option[_],
errMessageWhenSecondIsMissing: String): Unit = {
opt1.foreach { _ =>
require(opt2.isDefined, errMessageWhenSecondIsMissing)
}
}

def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
}
@@ -51,8 +51,8 @@ private[spark] object SparkKubernetesClientFactory {
ConfigurationUtils.requireNandDefined(
oauthTokenFile,
oauthTokenValue,
s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" +
s" value $oauthTokenConf.")
s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a " +
s"value $oauthTokenConf.")

val caCertFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
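
As a brief, hedged sketch of the requireNandDefined guard used above: defining both options fails the check (the token values here are invented for the example).

```scala
import org.apache.spark.deploy.k8s.ConfigurationUtils

object OAuthExclusionSketch {
  def main(args: Array[String]): Unit = {
    // Invented values for illustration only.
    val oauthTokenFile  = Some("/etc/secrets/oauth-token")
    val oauthTokenValue = Some("abc123")

    // Both options are defined, so this throws IllegalArgumentException
    // with the message below.
    ConfigurationUtils.requireNandDefined(
      oauthTokenFile,
      oauthTokenValue,
      "Cannot specify OAuth token through both a file and a value.")
  }
}
```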
@@ -44,7 +44,6 @@ package object config extends Logging {
.stringConf
.createWithDefault("IfNotPresent")

Contributor: nit: duplicated empty lines.

Contributor: Removed.


private[spark] val APISERVER_AUTH_DRIVER_CONF_PREFIX =
"spark.kubernetes.authenticate.driver"
private[spark] val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
@@ -95,12 +94,14 @@
ConfigBuilder("spark.kubernetes.allocation.batch.size")
.doc("Number of pods to launch at once in each round of executor allocation.")
.intConf
.checkValue(value => value > 0, "Allocation batch size should be a positive integer")
.createWithDefault(5)

private[spark] val KUBERNETES_ALLOCATION_BATCH_DELAY =
ConfigBuilder("spark.kubernetes.allocation.batch.delay")
.doc("Number of seconds to wait between each round of executor allocation.")
.longConf
.checkValue(value => value > 0, s"Allocation batch delay should be a positive integer")
.createWithDefault(1)

private[spark] val KUBERNETES_EXECUTOR_LIMIT_CORES =
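
A usage sketch for the two allocation settings above (the values are illustrative): with the new checkValue guards, non-positive values are rejected when the config entry is read rather than by a require() in the scheduler backend.

```scala
import org.apache.spark.SparkConf

object AllocationConfSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative values; "0" or a negative number would now fail the
    // checkValue guards attached to these ConfigBuilder entries.
    val conf = new SparkConf()
      .set("spark.kubernetes.allocation.batch.size", "10") // pods per allocation round
      .set("spark.kubernetes.allocation.batch.delay", "2") // seconds between rounds
  }
}
```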
@@ -20,7 +20,7 @@ import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model._

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.ConfigurationUtils
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
@@ -77,11 +77,8 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT)
Contributor: Is this required? IIRC, as of 2.0 the config was removed / is no longer used.

Contributor: This is only used to set the environment variable SPARK_EXECUTOR_PORT in the executor container, which then gets used to set spark.executor.port for the executor. If it's no longer used by Spark, it's not required.

private val blockmanagerPort = sparkConf
private val blockManagerPort = sparkConf
.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
Contributor: Are we assuming the ports will always be available (for executor and driver) to bind to?

Contributor: Yes - because we're deploying the driver and executors in a containerized setting, we're a lot more free to make looser assumptions about available ports. The exception will be if Spark applications are running sidecar containers along with the main driver/executor containers, but support for that is further out in the future when/if we expect Pod Presets to interact with our code.

Contributor: Sounds good. I wanted to make sure I understood it right that we are making the assumption that the ports will be unbound and available for Spark.

private val kubernetesDriverPodName = sparkConf
.get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(throw new SparkException("Must specify the driver pod name"))

private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
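
For context on the SPARK_EXECUTOR_PORT reply above, here is a hedged sketch of how such an environment variable is typically built with the fabric8 API; it is illustrative and not the PR's exact code.

```scala
import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder}

object ExecutorPortEnvSketch {
  // Sketch only: expose the executor port to the container as an env var,
  // which the executor entrypoint can turn into spark.executor.port.
  def executorPortEnv(executorPort: Int): EnvVar =
    new EnvVarBuilder()
      .withName("SPARK_EXECUTOR_PORT")
      .withValue(executorPort.toString)
      .build()
}
```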

@@ -163,7 +160,7 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
val requiredPorts = Seq(
(EXECUTOR_PORT_NAME, executorPort),
(BLOCK_MANAGER_PORT_NAME, blockmanagerPort))
(BLOCK_MANAGER_PORT_NAME, blockManagerPort))
.map(port => {
Contributor: .map { case (name, port) => ?

Contributor: Done.

new ContainerPortBuilder()
.withName(port._1)
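
A sketch of the `.map { case (name, port) => ... }` form suggested in the conversation above, using the fabric8 ContainerPortBuilder from this hunk; the literal port names stand in for the EXECUTOR_PORT_NAME and BLOCK_MANAGER_PORT_NAME constants.

```scala
import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder}

object RequiredPortsSketch {
  // Destructuring the tuple avoids the positional accessors port._1 / port._2.
  def buildRequiredPorts(executorPort: Int, blockManagerPort: Int): Seq[ContainerPort] =
    Seq(
      ("executor", executorPort),         // stands in for EXECUTOR_PORT_NAME
      ("blockmanager", blockManagerPort)  // stands in for BLOCK_MANAGER_PORT_NAME
    ).map { case (name, port) =>
      new ContainerPortBuilder()
        .withName(name)
        .withContainerPort(port)
        .build()
    }
}
```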
@@ -36,8 +36,10 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
new TaskSchedulerImpl(sc)
}

override def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler)
: SchedulerBackend = {
override def createSchedulerBackend(
sc: SparkContext,
masterURL: String,
scheduler: TaskScheduler): SchedulerBackend = {
val sparkConf = sc.getConf

val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
@@ -87,14 +87,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)

private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
require(podAllocationInterval > 0, "Allocation batch delay " +
s"${KUBERNETES_ALLOCATION_BATCH_DELAY} " +
s"is ${podAllocationInterval}, should be a positive integer")

private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
require(podAllocationSize > 0, "Allocation batch size " +
s"${KUBERNETES_ALLOCATION_BATCH_SIZE} " +
s"is ${podAllocationSize}, should be a positive integer")

private val allocatorRunnable = new Runnable {

@@ -304,39 +298,40 @@
private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1

override def eventReceived(action: Action, pod: Pod): Unit = {
Contributor: nit: we could also consider using a case match for action.

Contributor: Done.

if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running"
&& pod.getMetadata.getDeletionTimestamp == null) {
val podIP = pod.getStatus.getPodIP
val clusterNodeName = pod.getSpec.getNodeName
logInfo(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.")
executorPodsByIPs.put(podIP, pod)
} else if (action == Action.DELETED || action == Action.ERROR) {
val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
require(executorId != null, "Unexpected pod metadata; expected all executor pods" +
s" to have label $SPARK_EXECUTOR_ID_LABEL.")
val podName = pod.getMetadata.getName
val podIP = pod.getStatus.getPodIP
logDebug(s"Executor pod $podName at IP $podIP was at $action.")
if (podIP != null) {
executorPodsByIPs.remove(podIP)
}
val executorExitReason = if (action == Action.ERROR) {
logWarning(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
executorExitReasonOnError(pod)
} else if (action == Action.DELETED) {
logWarning(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
executorExitReasonOnDelete(pod)
} else {
throw new IllegalStateException(
s"Unknown action that should only be DELETED or ERROR: $action")
}
podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
log.warn(s"Executor with id $executorId was not marked as disconnected, but the" +
s" watch received an event of type $action for this executor. The executor may" +
s" have failed to start in the first place and never registered with the driver.")
}
disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
action match {
Contributor: Doesn't this block need a default match? Seems like you could get a MatchError if you don't get the right set of inputs here.

Contributor: Added a default match.

case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
&& pod.getMetadata.getDeletionTimestamp == null) =>
Contributor: Indent one extra level.

Contributor: Done.

val podIP = pod.getStatus.getPodIP
val clusterNodeName = pod.getSpec.getNodeName
logInfo(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.")
executorPodsByIPs.put(podIP, pod)
case Action.DELETED | Action.ERROR =>
Contributor: Add blank line before.

Contributor: Done.

val executorId = getExecutorId(pod)
val podName = pod.getMetadata.getName
val podIP = pod.getStatus.getPodIP
logDebug(s"Executor pod $podName at IP $podIP was at $action.")
if (podIP != null) {
executorPodsByIPs.remove(podIP)
}

val executorExitReason = if (action == Action.ERROR) {
logWarning(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
executorExitReasonOnError(pod)
} else if (action == Action.DELETED) {
logWarning(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
executorExitReasonOnDelete(pod)
} else {
throw new IllegalStateException(
s"Unknown action that should only be DELETED or ERROR: $action")
}
podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)

if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
log.warn(s"Executor with id $executorId was not marked as disconnected, but the" +
s" watch received an event of type $action for this executor. The executor may" +
s" have failed to start in the first place and never registered with the driver.")
Member: nit: remove s.

Contributor: Done.

}
disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
}
}
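
The default match mentioned in the conversation above is outside the lines shown in this hunk; the following is a hypothetical sketch of what a catch-all branch could look like, not the PR's actual code.

```scala
import io.fabric8.kubernetes.client.Watcher.Action

object WatcherActionSketch {
  // Hypothetical catch-all so an unexpected action (e.g. ADDED) does not
  // raise a MatchError; the branch actually added in the PR may differ.
  def describe(action: Action): String = action match {
    case Action.MODIFIED               => "executor pod is running"
    case Action.DELETED | Action.ERROR => "executor pod exited or failed"
    case _                             => s"unexpected action $action, ignoring"
  }
}
```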

@@ -391,6 +386,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
ExecutorExited(
getExecutorExitStatus(pod), exitCausedByApp = false, exitMessage)
Contributor: Fits on the previous line.

Contributor: Done.

}

def getExecutorId(pod: Pod): String = {
val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
require(executorId != null, "Unexpected pod metadata; expected all executor pods " +
s"to have label $SPARK_EXECUTOR_ID_LABEL.")
executorId
}
}

override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {