Changes from 1 commit
Commits (29):
f6fdd6a - Spark on Kubernetes - basic scheduler backend (foxish, Sep 15, 2017)
75e31a9 - Adding to modules.py and SparkBuild.scala (foxish, Oct 17, 2017)
cf82b21 - Exclude from unidoc, update travis (foxish, Oct 17, 2017)
488c535 - Address a bunch of style and other comments (foxish, Oct 17, 2017)
82b79a7 - Fix some style concerns (foxish, Oct 18, 2017)
c052212 - Clean up YARN constants, unit test updates (foxish, Oct 20, 2017)
c565c9f - Couple of more style comments (foxish, Oct 20, 2017)
2fb596d - Address CR comments. (mccheah, Oct 25, 2017)
992acbe - Extract initial executor count to utils class (mccheah, Oct 25, 2017)
b0a5839 - Fix scalastyle (mccheah, Oct 25, 2017)
a4f9797 - Fix more scalastyle (mccheah, Oct 25, 2017)
2b5dcac - Pin down app ID in tests. Fix test style. (mccheah, Oct 26, 2017)
018f4d8 - Address comments. (mccheah, Nov 1, 2017)
4b32134 - Various fixes to the scheduler (mccheah, Nov 1, 2017)
6cf4ed7 - Address comments (mccheah, Nov 4, 2017)
1f271be - Update fabric8 client version to 3.0.0 (foxish, Nov 13, 2017)
71a971f - Addressed more comments (liyinan926, Nov 13, 2017)
0ab9ca7 - One more round of comments (liyinan926, Nov 14, 2017)
7f14b71 - Added a comment regarding how failed executor pods are handled (liyinan926, Nov 15, 2017)
7afce3f - Addressed more comments (liyinan926, Nov 21, 2017)
b75b413 - Fixed Scala style error (liyinan926, Nov 21, 2017)
3b587b4 - Removed unused parameter in parsePrefixedKeyValuePairs (liyinan926, Nov 22, 2017)
cb12fec - Another round of comments (liyinan926, Nov 22, 2017)
ae396cf - Addressed latest comments (liyinan926, Nov 27, 2017)
f8e3249 - Addressed comments around licensing on new dependencies (liyinan926, Nov 27, 2017)
a44c29e - Fixed unit tests and made maximum executor lost reason checks configu… (liyinan926, Nov 27, 2017)
4bed817 - Removed default value for executor Docker image (liyinan926, Nov 27, 2017)
c386186 - Close the executor pod watcher before deleting the executor pods (liyinan926, Nov 27, 2017)
b85cfc4 - Addressed more comments (liyinan926, Nov 28, 2017)
Address a bunch of style and other comments
foxish committed Oct 17, 2017
commit 488c535a16e9003aa9b73d209ede0a19e5658146
File: ConfigurationUtils.scala
@@ -17,10 +17,9 @@

package org.apache.spark.deploy.k8s

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.SparkConf

private[spark] object ConfigurationUtils extends Logging {
private[spark] object ConfigurationUtils {
def parsePrefixedKeyValuePairs(

Review comment (Contributor): We should add a comment explaining what this function does: it not only returns the configs, but also ensures that no duplicate configs are set.

Reply (Contributor): Done.

sparkConf: SparkConf,
prefix: String,
@@ -34,4 +33,24 @@ private[spark] object ConfigurationUtils extends Logging {
}
fromPrefix.toMap
}

def requireBothOrNeitherDefined(

Review comment (Contributor): Why do we need this function for this PR? It seems to be used only by ResourceStagingServerSslOptionsProvider, which is out of scope here.

opt1: Option[_],
opt2: Option[_],
errMessageWhenFirstIsMissing: String,
errMessageWhenSecondIsMissing: String): Unit = {
requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
}

def requireSecondIfFirstIsDefined(

Review comment (Contributor): It seems we don't need this function here.

Reply (Contributor): Removed.

opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = {
opt1.foreach { _ =>
require(opt2.isDefined, errMessageWhenSecondIsMissing)
}
}

def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
}
}
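
For context on the review thread above, here is a minimal sketch of what the documented helper might look like. The body is an assumption reconstructed from the visible signature and the reviewer's description (return the prefixed configs and reject duplicates), not the PR's exact code:

package org.apache.spark.deploy.k8s

import org.apache.spark.SparkConf

private[spark] object ConfigurationUtilsSketch {
  /**
   * Extracts all (key, value) pairs whose keys start with `prefix`, strips the
   * prefix from each key, and fails if any resulting key is set more than once.
   */
  def parsePrefixedKeyValuePairs(
      sparkConf: SparkConf,
      prefix: String,
      configType: String): Map[String, String] = {
    val fromPrefix = sparkConf.getAllWithPrefix(prefix)
    fromPrefix.groupBy(_._1).foreach { case (key, values) =>
      require(values.size == 1,
        s"Cannot have multiple values for a given $configType key; got key $key" +
          s" with values ${values.map(_._2).mkString(", ")}")
    }
    fromPrefix.toMap
  }
}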

OptionRequirements.scala: This file was deleted. (Its requireNandDefined helper moved into ConfigurationUtils; see the next file.)

File: SparkKubernetesClientFactory.scala
@@ -48,7 +48,7 @@ private[spark] object SparkKubernetesClientFactory {
.map(new File(_))
.orElse(maybeServiceAccountToken)
val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
OptionRequirements.requireNandDefined(
ConfigurationUtils.requireNandDefined(
oauthTokenFile,
oauthTokenValue,
s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" +
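
To make the contract of requireNandDefined concrete, a small hedged illustration (the values are made up, and Try is used only to capture the require failure): the check fails only when both options are defined.

import scala.util.Try

import org.apache.spark.deploy.k8s.ConfigurationUtils

// At most one of the two options may be set.
val oneSet = Try(ConfigurationUtils.requireNandDefined(
  Some("/path/to/token-file"), None, "cannot set both"))  // Success
val bothSet = Try(ConfigurationUtils.requireNandDefined(
  Some("/path/to/token-file"), Some("token-value"),
  "cannot set both"))  // Failure(IllegalArgumentException)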
File: config.scala (package object config)
@@ -31,12 +31,6 @@ package object config extends Logging {
.stringConf
.createWithDefault("default")

private[spark] val DRIVER_DOCKER_IMAGE =
ConfigBuilder("spark.kubernetes.driver.docker.image")
.doc("Docker image to use for the driver. Specify this using the standard Docker tag format.")
.stringConf
.createWithDefault(s"spark-driver:$sparkVersion")

private[spark] val EXECUTOR_DOCKER_IMAGE =
ConfigBuilder("spark.kubernetes.executor.docker.image")
.doc("Docker image to use for the executors. Specify this using the standard Docker tag" +
@@ -81,22 +75,8 @@ package object config extends Logging {
.bytesConf(ByteUnit.MiB)
.createOptional

private[spark] val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
.doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the" +
" driver submission server. This is memory that accounts for things like VM overheads," +
" interned strings, other native overheads, etc. This tends to grow with the driver's" +
" memory size (typically 6-10%).")
.bytesConf(ByteUnit.MiB)
.createOptional

private[spark] val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
private[spark] val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
private[spark] val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
private[spark] val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
private[spark] val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
private[spark] val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
private[spark] val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."

private[spark] val KUBERNETES_DRIVER_POD_NAME =
ConfigBuilder("spark.kubernetes.driver.pod.name")
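
The prefix constants above are consumed by parsePrefixedKeyValuePairs; a hedged illustration of the mapping (the label values are made up, and the prefix string is written out inline rather than via the private constant):

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.ConfigurationUtils

val conf = new SparkConf(false)
  .set("spark.kubernetes.executor.label.team", "infra")
  .set("spark.kubernetes.executor.label.env", "staging")

// The prefix is stripped and the remainder becomes the label key:
// Map("team" -> "infra", "env" -> "staging")
val labels = ConfigurationUtils.parsePrefixedKeyValuePairs(
  conf, "spark.kubernetes.executor.label.", "executor label")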
File: constants.scala (package object constants)
@@ -24,29 +24,10 @@ package object constants {
private[spark] val SPARK_POD_DRIVER_ROLE = "driver"
private[spark] val SPARK_POD_EXECUTOR_ROLE = "executor"

// Credentials secrets
private[spark] val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =
"/mnt/secrets/spark-kubernetes-credentials"
private[spark] val DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME = "ca-cert"
private[spark] val DRIVER_CREDENTIALS_CA_CERT_PATH =
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME"
private[spark] val DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME = "client-key"
private[spark] val DRIVER_CREDENTIALS_CLIENT_KEY_PATH =
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME"
private[spark] val DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME = "client-cert"
private[spark] val DRIVER_CREDENTIALS_CLIENT_CERT_PATH =
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME"
private[spark] val DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME = "oauth-token"
private[spark] val DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH =
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME"
private[spark] val DRIVER_CREDENTIALS_SECRET_VOLUME_NAME = "kubernetes-credentials"

// Default and fixed ports
private[spark] val DEFAULT_DRIVER_PORT = 7078
private[spark] val DEFAULT_BLOCKMANAGER_PORT = 7079
private[spark] val DEFAULT_UI_PORT = 4040
private[spark] val BLOCK_MANAGER_PORT_NAME = "blockmanager"
private[spark] val DRIVER_PORT_NAME = "driver-rpc-port"
private[spark] val EXECUTOR_PORT_NAME = "executor"

// Environment Variables
@@ -57,20 +38,11 @@
private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
private[spark] val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP"
private[spark] val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
private[spark] val ENV_SUBMIT_EXTRA_CLASSPATH = "SPARK_SUBMIT_EXTRA_CLASSPATH"
private[spark] val ENV_EXECUTOR_EXTRA_CLASSPATH = "SPARK_EXECUTOR_EXTRA_CLASSPATH"
private[spark] val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH"
private[spark] val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS"
private[spark] val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
private[spark] val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES"
private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
private[spark] val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"

// Miscellaneous
private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10
private[spark] val MEMORY_OVERHEAD_MIN_MIB = 384L
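
MEMORY_OVERHEAD_FACTOR and MEMORY_OVERHEAD_MIN_MIB are typically combined as max(factor * memory, minimum) when sizing the executor pod; a hedged sketch of that arithmetic, assuming the two constants above are in scope (the helper name is hypothetical):

// The pod's memory request is the executor heap plus
// max(10% of the heap, 384 MiB) of off-heap overhead.
def executorMemoryWithOverheadMiB(executorMemoryMiB: Long): Long = {
  val overhead = math.max(
    (MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toLong,
    MEMORY_OVERHEAD_MIN_MIB)
  executorMemoryMiB + overhead
}

// e.g. a 1024 MiB heap yields 1024 + max(102, 384) = 1408 MiB total.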
File: ExecutorPodFactory.scala
@@ -18,8 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
import org.apache.commons.io.FilenameUtils
import io.fabric8.kubernetes.api.model._

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.ConfigurationUtils
@@ -48,7 +47,7 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)

private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs (
private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
sparkConf,

Review comment (Contributor): We only double-indent parameter declarations. This happens in a bunch of places.
KUBERNETES_EXECUTOR_LABEL_PREFIX,
"executor label")
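
The indentation comment above reflects Spark's Scala style: parameter declarations are double-indented (4 spaces), while call-site argument lists are single-indented (2 spaces). A hedged illustration (the enclosing object and method are hypothetical):

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.ConfigurationUtils

object IndentStyleExample {
  // Declaration: the parameter list gets a double (4-space) indent.
  private def parseExecutorLabels(
      sparkConf: SparkConf,
      prefix: String): Map[String, String] = {
    // Call site: the argument list gets a single (2-space) indent.
    ConfigurationUtils.parsePrefixedKeyValuePairs(
      sparkConf,
      prefix,
      "executor label")
  }
}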
File: KubernetesClusterManager.scala
@@ -21,13 +21,12 @@ import java.io.File
import io.fabric8.kubernetes.client.Config

import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.{ConfigurationUtils, SparkKubernetesClientFactory}
import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.internal.Logging
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.ThreadUtils

private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {

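
For readers unfamiliar with the extension point: KubernetesClusterManager implements Spark's ExternalClusterManager trait (visible in the imports above). A hedged skeleton of that contract, with placeholder bodies rather than the PR's actual logic:

package org.apache.spark.scheduler.cluster.k8s

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

private[spark] class SketchClusterManager extends ExternalClusterManager {
  // Selected when the application's master URL uses the Kubernetes scheme.
  override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    new TaskSchedulerImpl(sc)

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend =
    ??? // the PR constructs its KubernetesClusterSchedulerBackend here

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}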
File: KubernetesClusterSchedulerBackend.scala
@@ -21,8 +21,8 @@ import java.net.InetAddress
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}

import scala.collection.{concurrent, mutable}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

import io.fabric8.kubernetes.api.model._
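
The java.util.concurrent imports above hint at the backend's bookkeeping: an atomic counter for executor IDs and thread-safe maps from executor to pod. A generic, hedged sketch of that pattern (all names here are hypothetical, not the PR's fields):

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

object ExecutorBookkeepingSketch {
  private val executorIdCounter = new AtomicLong(0L)
  // executor ID -> name of the pod backing that executor
  private val runningExecutorPods = new ConcurrentHashMap[String, String]()

  def registerNewExecutor(podName: String): String = {
    val executorId = executorIdCounter.incrementAndGet().toString
    runningExecutorPods.put(executorId, podName)
    executorId
  }
}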
File: ExecutorPodFactorySuite.scala
@@ -18,20 +18,14 @@ package org.apache.spark.scheduler.cluster.k8s

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Pod, VolumeBuilder, VolumeMountBuilder, _}
import io.fabric8.kubernetes.api.model.{Pod, _}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.commons.io.FilenameUtils
import org.mockito.{AdditionalAnswers, MockitoAnnotations}
import org.mockito.Matchers.{any, eq => mockitoEq}
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.mockito.MockitoAnnotations
import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants
import org.apache.spark.scheduler.cluster.k8s.ExecutorPodFactoryImpl

class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
private val driverPodName: String = "driver-pod"
File: KubernetesClusterSchedulerBackendSuite.scala
@@ -27,7 +27,7 @@ import io.fabric8.kubernetes.client.Watcher.Action
import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource}
import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.Matchers.{any, eq => mockitoEq}
import org.mockito.Mockito.{mock => _, _}
import org.mockito.Mockito.{doNothing, never, times, verify, when}
import org.scalatest.BeforeAndAfter
import org.scalatest.mock.MockitoSugar._

@@ -74,7 +74,7 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
.build()

private type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
private type LABELLED_PODS = FilterWatchListDeletable[
private type LABELED_PODS = FilterWatchListDeletable[
Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]]

Review comment (Member): ditto.

Reply (Contributor): Done.
private type IN_NAMESPACE_PODS = NonNamespaceOperation[
Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]

Review comment (Member): ditto.

Reply (Contributor): Done.
@@ -104,7 +104,7 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
private var podOperations: PODS = _

@Mock
private var podsWithLabelOperations: LABELLED_PODS = _
private var podsWithLabelOperations: LABELED_PODS = _

@Mock
private var podsInNamespace: IN_NAMESPACE_PODS = _
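
Finally, the @Mock fields above rely on MockitoAnnotations being run before each test to populate them; a hedged sketch of that setup pattern (the suite name and field are illustrative):

import io.fabric8.kubernetes.client.KubernetesClient
import org.mockito.{Mock, MockitoAnnotations}
import org.scalatest.BeforeAndAfter

import org.apache.spark.SparkFunSuite

class MockSetupSketchSuite extends SparkFunSuite with BeforeAndAfter {
  @Mock
  private var kubernetesClient: KubernetesClient = _

  before {
    MockitoAnnotations.initMocks(this)  // populates every @Mock field
  }

  test("the annotated field is a usable mock after initMocks") {
    assert(kubernetesClient != null)
  }
}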