Commits in this pull request (29):
- f6fdd6a  Spark on Kubernetes - basic scheduler backend (foxish, Sep 15, 2017)
- 75e31a9  Adding to modules.py and SparkBuild.scala (foxish, Oct 17, 2017)
- cf82b21  Exclude from unidoc, update travis (foxish, Oct 17, 2017)
- 488c535  Address a bunch of style and other comments (foxish, Oct 17, 2017)
- 82b79a7  Fix some style concerns (foxish, Oct 18, 2017)
- c052212  Clean up YARN constants, unit test updates (foxish, Oct 20, 2017)
- c565c9f  Couple of more style comments (foxish, Oct 20, 2017)
- 2fb596d  Address CR comments. (mccheah, Oct 25, 2017)
- 992acbe  Extract initial executor count to utils class (mccheah, Oct 25, 2017)
- b0a5839  Fix scalastyle (mccheah, Oct 25, 2017)
- a4f9797  Fix more scalastyle (mccheah, Oct 25, 2017)
- 2b5dcac  Pin down app ID in tests. Fix test style. (mccheah, Oct 26, 2017)
- 018f4d8  Address comments. (mccheah, Nov 1, 2017)
- 4b32134  Various fixes to the scheduler (mccheah, Nov 1, 2017)
- 6cf4ed7  Address comments (mccheah, Nov 4, 2017)
- 1f271be  Update fabric8 client version to 3.0.0 (foxish, Nov 13, 2017)
- 71a971f  Addressed more comments (liyinan926, Nov 13, 2017)
- 0ab9ca7  One more round of comments (liyinan926, Nov 14, 2017)
- 7f14b71  Added a comment regarding how failed executor pods are handled (liyinan926, Nov 15, 2017)
- 7afce3f  Addressed more comments (liyinan926, Nov 21, 2017)
- b75b413  Fixed Scala style error (liyinan926, Nov 21, 2017)
- 3b587b4  Removed unused parameter in parsePrefixedKeyValuePairs (liyinan926, Nov 22, 2017)
- cb12fec  Another round of comments (liyinan926, Nov 22, 2017)
- ae396cf  Addressed latest comments (liyinan926, Nov 27, 2017)
- f8e3249  Addressed comments around licensing on new dependencies (liyinan926, Nov 27, 2017)
- a44c29e  Fixed unit tests and made maximum executor lost reason checks configu… (liyinan926, Nov 27, 2017)
- 4bed817  Removed default value for executor Docker image (liyinan926, Nov 27, 2017)
- c386186  Close the executor pod watcher before deleting the executor pods (liyinan926, Nov 27, 2017)
- b85cfc4  Addressed more comments (liyinan926, Nov 28, 2017)
Changes from commit 0ab9ca7b2056c0de7a820fa7bb9391227bcf5275 ("One more round of comments"), committed by liyinan926 on Nov 14, 2017.
10 changes: 1 addition & 9 deletions resource-managers/kubernetes/core/pom.xml
@@ -54,15 +54,7 @@
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
+ <artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
@@ -34,14 +34,7 @@ private[spark] object ConfigurationUtils {
sparkConf: SparkConf,
prefix: String,
configType: String): Map[String, String] = {
Contributor: where do we use this parameter?

Contributor: We are not really using it in the context of this PR. Removed this parameter.

- val fromPrefix = sparkConf.getAllWithPrefix(prefix)
- fromPrefix.groupBy(_._1).foreach {
- case (key, values) =>
- require(values.size == 1,
- s"Cannot have multiple values for a given $configType key, got key $key with" +
- s" values $values")
- }
- fromPrefix.toMap
+ sparkConf.getAllWithPrefix(prefix).toMap
}

def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
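For illustration, a minimal sketch of what the simplified parsePrefixedKeyValuePairs body does. It assumes SparkConf.getAllWithPrefix strips the prefix from each returned key (it is a private[spark] API, so the sketch also assumes Spark-internal package access); the label keys and values are made up:

import org.apache.spark.SparkConf

// Hypothetical executor labels set on a SparkConf (illustrative names only).
val conf = new SparkConf(loadDefaults = false)
  .set("spark.kubernetes.executor.label.team", "data-eng")
  .set("spark.kubernetes.executor.label.tier", "batch")

// getAllWithPrefix returns (key-with-prefix-stripped, value) pairs,
// so the simplified method reduces to a Map conversion.
val labels = conf.getAllWithPrefix("spark.kubernetes.executor.label.").toMap
// labels == Map("team" -> "data-eng", "tier" -> "batch")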
@@ -40,13 +40,13 @@ private[spark] object SparkKubernetesClientFactory {
namespace: Option[String],
kubernetesAuthConfPrefix: String,
sparkConf: SparkConf,
- maybeServiceAccountToken: Option[File],
- maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
+ defaultServiceAccountToken: Option[File],
+ defaultServiceAccountCaCert: Option[File]): KubernetesClient = {
val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf)
Contributor: Why not create constants like for other config options?

Contributor: This lacks context from the spark-submit implementation, which is not in this PR.

We intend to have two different sets of authentication options for the Kubernetes API. The first is the credentials for creating the driver pod and all the Kubernetes resources the application requires outside of executor pods. The second is a set of credentials the driver can use to create executor pods. These options will share suffixes in the configuration keys but use different prefixes (a sketch follows this list).

The reasoning for two sets of credentials is twofold:

  • The driver needs strictly fewer privileges than spark-submit, because the driver only creates and deletes pods, while spark-submit must create pods and other Kubernetes resources. Two sets of credentials allow the driver an appropriately limited scope of API access.
  • Part of the credentials includes TLS certificates for accessing the Kubernetes API over HTTPS. A common environment exposes the Kubernetes API server through a proxy so it can be reached from outside the cluster, but the driver accesses the API server from inside the cluster. The front door of the API server typically asks for a different certificate than the one a client would present when accessing the API server internally.
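To make the prefix/suffix scheme concrete, here is a small hypothetical sketch. The driver-side prefix matches APISERVER_AUTH_DRIVER_CONF_PREFIX from this change; the submission-side prefix is not part of this PR and is assumed only for illustration:

// Sketch only: the two credential sets share key suffixes but use different prefixes.
val driverAuthPrefix = "spark.kubernetes.authenticate.driver"           // defined in this PR
val submissionAuthPrefix = "spark.kubernetes.authenticate.submission"   // assumed, not in this PR
val credentialSuffixes = Seq("oauthTokenFile", "clientKeyFile", "clientCertFile", "caCertFile")

// Yields keys such as:
//   spark.kubernetes.authenticate.driver.clientKeyFile      (driver -> executor pods)
//   spark.kubernetes.authenticate.submission.clientKeyFile  (spark-submit -> driver pod and other resources)
for (prefix <- Seq(driverAuthPrefix, submissionAuthPrefix); suffix <- credentialSuffixes) {
  println(s"$prefix.$suffix")
}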

.map(new File(_))
- .orElse(maybeServiceAccountToken)
+ .orElse(defaultServiceAccountToken)
val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
ConfigurationUtils.requireNandDefined(
oauthTokenFile,
@@ -56,7 +56,7 @@ private[spark] object SparkKubernetesClientFactory {

val caCertFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
- .orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath))
+ .orElse(defaultServiceAccountCaCert.map(_.getAbsolutePath))
val clientKeyFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
Contributor: If I understand from the response to @vanzin, this is used both in spark-submit and in the driver? I am assuming clientKeyFile is the private key of the submitting user?

Assuming both are true: how is the private key transmitted and secured? Are there any security concerns? Is it available only at the driver, or is it available/required at executors too? If yes, any concerns there?

Contributor: The submission client will have a bootstrap to send these over via a Kubernetes secret volume. This secret material isn't necessarily used by spark-submit itself, but spark-submit provides this secret material through said volume. This should only be read by the driver; we don't mount secrets for Kubernetes credentials into executor pods in ExecutorPodFactory.

Contributor: Thanks for clarifying. I am assuming the key is used transparently for all submitted requests, that is, without any explicit use of it?

Contributor: The client is configured to always use this value in HTTP requests once the client object is constructed by the builder.
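As a rough sketch of that behavior (not code from this PR; the master URL and file paths below are assumptions), the fabric8 builder takes the credentials once and the resulting client then attaches them to every request it issues:

import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}

// Hedged sketch: credentials are supplied when the client Config is built;
// every API call made through the client afterwards uses them transparently.
val clientConfig = new ConfigBuilder()
  .withMasterUrl("https://kubernetes.default.svc")                            // assumed in-cluster API address
  .withCaCertFile("/mnt/secrets/spark-kubernetes-credentials/ca.crt")         // hypothetical mounted secret paths
  .withClientKeyFile("/mnt/secrets/spark-kubernetes-credentials/client.key")
  .withClientCertFile("/mnt/secrets/spark-kubernetes-credentials/client.crt")
  .build()
val client = new DefaultKubernetesClient(clientConfig)

// Example call: the client key/cert are presented on this request without any
// explicit handling by the caller.
val executorPods = client.pods().inNamespace("default").list()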

val clientCertFile = sparkConf
@@ -16,45 +16,45 @@
*/
package org.apache.spark.deploy.k8s

- import org.apache.spark.{SPARK_VERSION => sparkVersion}
+ import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

- package object config extends Logging {
+ private[spark] object config extends Logging {

- private[spark] val KUBERNETES_NAMESPACE =
+ val KUBERNETES_NAMESPACE =
ConfigBuilder("spark.kubernetes.namespace")
.doc("The namespace that will be used for running the driver and executor pods. When using" +
" spark-submit in cluster mode, this can also be passed to spark-submit via the" +
" --kubernetes-namespace command line argument.")
.stringConf
.createWithDefault("default")

- private[spark] val EXECUTOR_DOCKER_IMAGE =
+ val EXECUTOR_DOCKER_IMAGE =
ConfigBuilder("spark.kubernetes.executor.docker.image")
.doc("Docker image to use for the executors. Specify this using the standard Docker tag" +
" format.")
.stringConf
.createWithDefault(s"spark-executor:$sparkVersion")
.createWithDefault(s"spark-executor:$SPARK_VERSION")

- private[spark] val DOCKER_IMAGE_PULL_POLICY =
+ val DOCKER_IMAGE_PULL_POLICY =
ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
.doc("Docker image pull policy when pulling any docker image in Kubernetes integration")
.stringConf
.createWithDefault("IfNotPresent")

Contributor: nit: duplicated empty lines.

Contributor: Removed.

- private[spark] val APISERVER_AUTH_DRIVER_CONF_PREFIX =
+ val APISERVER_AUTH_DRIVER_CONF_PREFIX =
"spark.kubernetes.authenticate.driver"
- private[spark] val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+ val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
"spark.kubernetes.authenticate.driver.mounted"
- private[spark] val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
- private[spark] val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
- private[spark] val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
- private[spark] val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
- private[spark] val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+ val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+ val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+ val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+ val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+ val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"

- private[spark] val KUBERNETES_SERVICE_ACCOUNT_NAME =
+ val KUBERNETES_SERVICE_ACCOUNT_NAME =
ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
.doc("Service account that is used when running the driver pod. The driver pod uses" +
" this service account when requesting executor pods from the API server. If specific" +
@@ -66,49 +66,49 @@ package object config extends Logging {
// Note that while we set a default for this when we start up the
// scheduler, the specific default value is dynamically determined
// based on the executor memory.
- private[spark] val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
+ val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
ConfigBuilder("spark.kubernetes.executor.memoryOverhead")
.doc("The amount of off-heap memory (in megabytes) to be allocated per executor. This" +
" is memory that accounts for things like VM overheads, interned strings, other native" +
" overheads, etc. This tends to grow with the executor size. (typically 6-10%).")
.bytesConf(ByteUnit.MiB)
.createOptional

- private[spark] val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
- private[spark] val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
+ val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
+ val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."

- private[spark] val KUBERNETES_DRIVER_POD_NAME =
+ val KUBERNETES_DRIVER_POD_NAME =
ConfigBuilder("spark.kubernetes.driver.pod.name")
.doc("Name of the driver pod.")
.stringConf
.createOptional

- private[spark] val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
+ val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
.doc("Prefix to use in front of the executor pod names.")
.internal()
.stringConf
.createWithDefault("spark")

- private[spark] val KUBERNETES_ALLOCATION_BATCH_SIZE =
+ val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
.doc("Number of pods to launch at once in each round of executor allocation.")
.intConf
.checkValue(value => value > 0, "Allocation batch size should be a positive integer")
.createWithDefault(5)

- private[spark] val KUBERNETES_ALLOCATION_BATCH_DELAY =
+ val KUBERNETES_ALLOCATION_BATCH_DELAY =
ConfigBuilder("spark.kubernetes.allocation.batch.delay")
.doc("Number of seconds to wait between each round of executor allocation.")
.longConf
.checkValue(value => value > 0, s"Allocation batch delay should be a positive integer")
.createWithDefault(1)

- private[spark] val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ val KUBERNETES_EXECUTOR_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.executor.limit.cores")
.doc("Specify the hard cpu limit for a single executor pod")
.stringConf
.createOptional

- private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
+ val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
}
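For reference, a brief sketch of how entries declared with ConfigBuilder above are typically consumed. SparkConf's ConfigEntry-aware set/get are private[spark], so this assumes Spark-internal code; the values set here are made up:

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.config._

// Hypothetical settings, for illustration only.
val conf = new SparkConf(loadDefaults = false)
  .set(KUBERNETES_NAMESPACE, "spark-jobs")
  .set(EXECUTOR_DOCKER_IMAGE, "my-registry/spark-executor:latest")

conf.get(KUBERNETES_NAMESPACE)             // "spark-jobs"
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) // 5, the declared default
conf.get(KUBERNETES_DRIVER_POD_NAME)       // Option[String]; None unless explicitly set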
@@ -27,10 +27,13 @@ import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.util.Utils

/**
- * Configures executor pods. Construct one of these with a SparkConf to set up properties that are
- * common across all executors. Then, pass in dynamic parameters into createExecutorPod.
+ * A factory class for configuring and creating executor pods.
*/
private[spark] trait ExecutorPodFactory {
+
+ /**
+ * Configure and construct an executor pod with the given parameters.
+ */
def createExecutorPod(
executorId: String,
applicationId: String,
@@ -161,12 +164,12 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
val requiredPorts = Seq(
(EXECUTOR_PORT_NAME, executorPort),
(BLOCK_MANAGER_PORT_NAME, blockManagerPort))
- .map(port => {
+ .map { case (name, port) =>
new ContainerPortBuilder()
- .withName(port._1)
- .withContainerPort(port._2)
+ .withName(name)
+ .withContainerPort(port)
.build()
- })
+ }

val executorContainer = new ContainerBuilder()
.withName(s"executor")
Member: nit: remove s.

Contributor: Done.

@@ -202,16 +205,15 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
.endSpec()
.build()

- val containerWithExecutorLimitCores = executorLimitCores.map {
- limitCores =>
- val executorCpuLimitQuantity = new QuantityBuilder(false)
- .withAmount(limitCores)
- .build()
- new ContainerBuilder(executorContainer)
- .editResources()
- .addToLimits("cpu", executorCpuLimitQuantity)
- .endResources()
- .build()
+ val containerWithExecutorLimitCores = executorLimitCores.map { limitCores =>
+ val executorCpuLimitQuantity = new QuantityBuilder(false)
+ .withAmount(limitCores)
+ .build()
+ new ContainerBuilder(executorContainer)
+ .editResources()
+ .addToLimits("cpu", executorCpuLimitQuantity)
+ .endResources()
+ .build()
}.getOrElse(executorContainer)

new PodBuilder(executorPod)