This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Changes from 1 commit
37 commits
999ec13
[SPARK-22570][SQL] Avoid to create a lot of global variables by using…
kiszk Nov 30, 2017
6ac57fd
[SPARK-21417][SQL] Infer join conditions using propagated constraints
Nov 30, 2017
bcceab6
[SPARK-22489][SQL] Shouldn't change broadcast join buildSide if user …
wangyum Nov 30, 2017
f5f8e84
[SPARK-22614] Dataset API: repartitionByRange(...)
adrian-ionescu Nov 30, 2017
7e5f669
[SPARK-22428][DOC] Add spark application garbage collector configurat…
gaborgsomogyi Dec 1, 2017
7da1f57
[SPARK-22373] Bump Janino dependency version to fix thread safety issue…
Victsm Dec 1, 2017
dc36542
[SPARK-22653] executorAddress registered in CoarseGrainedSchedulerBac…
tgravescs Dec 1, 2017
16adaf6
[SPARK-22601][SQL] Data load is getting displayed successful on provi…
sujith71955 Dec 1, 2017
9d06a9e
[SPARK-22393][SPARK-SHELL] spark-shell can't find imported types in c…
mpetruska Dec 1, 2017
ee10ca7
[SPARK-22638][SS] Use a separate queue for StreamingQueryListenerBus
zsxwing Dec 1, 2017
aa4cf2b
[SPARK-22651][PYTHON][ML] Prevent initiating multiple Hive clients fo…
HyukjinKwon Dec 2, 2017
d2cf95a
[SPARK-22634][BUILD] Update Bouncy Castle to 1.58
srowen Dec 2, 2017
f23dddf
[SPARK-20682][SPARK-15474][SPARK-21791] Add new ORCFileFormat based o…
dongjoon-hyun Dec 3, 2017
2c16267
[SPARK-22669][SQL] Avoid unnecessary function calls in code generation
mgaido91 Dec 3, 2017
dff440f
[SPARK-22626][SQL] deals with wrong Hive's statistics (zero rowCount)
wangyum Dec 3, 2017
4131ad0
[SPARK-22489][DOC][FOLLOWUP] Update broadcast behavior changes in mig…
wangyum Dec 4, 2017
3927bb9
[SPARK-22473][FOLLOWUP][TEST] Remove deprecated Date functions
mgaido91 Dec 4, 2017
f81401e
[SPARK-22162] Executors and the driver should use consistent JobIDs i…
Dec 4, 2017
e1dd03e
[SPARK-22372][CORE, YARN] Make cluster submission use SparkApplication.
Dec 4, 2017
dcaac45
Spark on Kubernetes - basic submission client
liyinan926 Nov 10, 2017
27c67ff
Addressed first round of review comments
liyinan926 Nov 27, 2017
6d597d0
Made Client implement the SparkApplication trait
liyinan926 Nov 28, 2017
5b9fa39
Addressed the second round of comments
liyinan926 Nov 28, 2017
5ccadb5
Added missing step for supporting local:// dependencies and addressed…
liyinan926 Nov 30, 2017
12f2797
Fixed Scala style check errors
liyinan926 Nov 30, 2017
c35fe48
Addressed another round of comments
liyinan926 Dec 4, 2017
faa2849
Rebased on master and added a constant val for the Client class
liyinan926 Dec 4, 2017
347ed69
Addressed another major round of comments
liyinan926 Dec 5, 2017
0e8ca01
Addressed one more round of comments
liyinan926 Dec 5, 2017
3a0b8e3
Removed mentioning of kubernetes-namespace
liyinan926 Dec 6, 2017
83d0b9c
Fixed a couple of bugs found during manual tests
liyinan926 Dec 7, 2017
44c40b1
Guard against client mode in SparkContext
liyinan926 Dec 8, 2017
67bc847
Added libc6-compat into the base docker image
liyinan926 Dec 8, 2017
7d2b303
Addressed latest comments
liyinan926 Dec 8, 2017
caf2206
Addressed docs comments
liyinan926 Dec 9, 2017
2e7810b
Fixed a comment
liyinan926 Dec 11, 2017
cbcd30e
Addressed latest comments
liyinan926 Dec 11, 2017
Spark on Kubernetes - basic submission client
liyinan926 committed Dec 4, 2017
commit dcaac45bc7cdf9c1358e46380fbc18ff871c9f2e
10 changes: 10 additions & 0 deletions assembly/pom.xml
@@ -148,6 +148,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>kubernetes</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<dependencies>
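With this profile in place, builds can opt in to bundling the spark-kubernetes module in the assembly. A typical invocation (standard Maven profile activation; the exact flags here are illustrative, not taken from this commit):

    ./build/mvn -Pkubernetes -DskipTests package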
37 changes: 31 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -76,7 +76,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
private val KUBERNETES = 16
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES

// Deploy modes
private val CLIENT = 1
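Both the cluster-manager and deploy-mode constants are bit flags, so a mask like ALL_CLUSTER_MGRS supports cheap membership tests. A minimal self-contained sketch (constants copied from the diff; the supports helper is hypothetical):

    object ClusterManagerFlags {
      // Bit flags, as in the constants above.
      val YARN = 1; val STANDALONE = 2; val MESOS = 4; val LOCAL = 8; val KUBERNETES = 16
      val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES

      // Hypothetical helper: does `allowed` include `mgr`?
      def supports(allowed: Int, mgr: Int): Boolean = (allowed & mgr) != 0
    }

    // ClusterManagerFlags.supports(ALL_CLUSTER_MGRS, KUBERNETES) == true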
@@ -257,6 +258,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
@@ -302,6 +304,12 @@ object SparkSubmit extends CommandLineUtils with Logging {
case (STANDALONE, CLUSTER) if args.isR =>
printErrorAndExit("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isPython =>
printErrorAndExit("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
printErrorAndExit("R applications are currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
@@ -322,6 +330,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER

if (!isMesosCluster && !isStandAloneCluster) {
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
@@ -556,20 +565,24 @@ object SparkSubmit extends CommandLineUtils with Logging {
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"),
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),

// Kubernetes only
OptionAssigner(args.kubernetesNamespace, KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.kubernetes.namespace"),

// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.cores"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
confKey = "spark.files"),
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, confKey = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
confKey = "spark.driver.supervise"),
@@ -703,6 +716,18 @@ object SparkSubmit extends CommandLineUtils with Logging {
}
}

if (isKubernetesCluster) {
childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg =>
childArgs += "--arg"
childArgs += arg
}
}
}
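For a Kubernetes cluster-mode submission, the block above produces a child argument vector of the form (illustrative, matching the test added in SparkSubmitSuite below):

    // spark-submit --master k8s://host:port --class org.SomeClass /home/thejar.jar arg1
    val childArgs = Array(
      "--primary-java-resource", "file:/home/thejar.jar",
      "--main-class", "org.SomeClass",
      "--arg", "arg1")
    // which SparkSubmit then hands to the childMainClass set above,
    // org.apache.spark.deploy.k8s.submit.Client.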

// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sparkConf.setIfMissing(k, v)
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -81,6 +81,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var submissionToRequestStatusFor: String = null
var useRest: Boolean = true // used internally

// Kubernetes only
var kubernetesNamespace: String = null

/** Default properties present in the currently defined defaults file. */
lazy val defaultSparkProperties: HashMap[String, String] = {
val defaultProperties = new HashMap[String, String]()
@@ -199,6 +202,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull

kubernetesNamespace = Option(kubernetesNamespace)
.orElse(sparkProperties.get("spark.kubernetes.namespace"))
.orNull
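The orElse chain gives the --kubernetes-namespace flag precedence over a value supplied via spark properties. A minimal illustration of the same resolution pattern (hypothetical helper):

    def resolveNamespace(flag: Option[String], props: Map[String, String]): String =
      flag.orElse(props.get("spark.kubernetes.namespace")).orNull

    // resolveNamespace(Some("foo"), Map("spark.kubernetes.namespace" -> "bar")) == "foo"
    // resolveNamespace(None, Map("spark.kubernetes.namespace" -> "bar"))        == "bar"
    // resolveNamespace(None, Map.empty)                                         == null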

// Try to set main class from JAR if no --class argument is given
if (mainClass == null && !isPython && !isR && primaryResource != null) {
val uri = new URI(primaryResource)
@@ -454,6 +461,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case KEYTAB =>
keytab = value

case KUBERNETES_NAMESPACE =>
kubernetesNamespace = value

case HELP =>
printUsageAndExit(0)

@@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| the node running the Application Master via the Secure
| Distributed Cache, for renewing the login tickets and the
| delegation tokens periodically.
|
| Kubernetes only:
| --kubernetes-namespace NS The namespace in the Kubernetes cluster within which the
| application must be launched. The namespace must already
| exist in the cluster. (Default: default).
""".stripMargin
)

26 changes: 26 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -388,6 +388,32 @@ class SparkSubmitSuite
conf.get("spark.ui.enabled") should be ("false")
}

test("handles k8s cluster mode") {
val clArgs = Seq(
"--deploy-mode", "cluster",
"--master", "k8s://host:port",
"--executor-memory", "5g",
"--class", "org.SomeClass",
"--kubernetes-namespace", "foo",
"--driver-memory", "4g",
"--conf", "spark.kubernetes.driver.docker.image=bar",
"/home/thejar.jar",
"arg1")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)

val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap
childArgsMap.get("--primary-java-resource") should be (Some("file:/home/thejar.jar"))
childArgsMap.get("--main-class") should be (Some("org.SomeClass"))
childArgsMap.get("--arg") should be (Some("arg1"))
mainClass should be ("org.apache.spark.deploy.k8s.submit.Client")
classpath should have length (0)
conf.get("spark.executor.memory") should be ("5g")
conf.get("spark.driver.memory") should be ("4g")
conf.get("spark.kubernetes.namespace") should be ("foo")
conf.get("spark.kubernetes.driver.docker.image") should be ("bar")
}

test("handles confs with flag equivalents") {
val clArgs = Seq(
"--deploy-mode", "cluster",
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -76,13 +76,16 @@ class SparkSubmitOptionParser {
protected final String PRINCIPAL = "--principal";
protected final String QUEUE = "--queue";

// Kubernetes-only options.
protected final String KUBERNETES_NAMESPACE = "--kubernetes-namespace";

/**
* This is the canonical list of spark-submit options. Each entry in the array contains the
* different aliases for the same option; the first element of each entry is the "official"
* name of the option, passed to {@link #handle(String, String)}.
* <p>
* Options not listed here nor in the "switch" list below will result in a call to
* {@link $#handleUnknown(String)}.
* {@link #handleUnknown(String)}.
* <p>
* These two arrays are visible for tests.
*/
@@ -115,6 +118,7 @@
{ REPOSITORIES },
{ STATUS },
{ TOTAL_EXECUTOR_CORES },
{ KUBERNETES_NAMESPACE },
};

/**
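Because KUBERNETES_NAMESPACE is registered in the opts table rather than the switches list, the flag consumes a value, which the parser forwards to handle(String, String) per the Javadoc above. Sketched outcome (comments only, not code from this commit):

    // spark-submit ... --kubernetes-namespace foo ...
    //   => handle("--kubernetes-namespace", "foo")
    //   => SparkSubmitArguments sets kubernetesNamespace = "foo" (see the case added above)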
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.deploy.k8s

import java.util.concurrent.TimeUnit

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit
@@ -30,6 +32,12 @@ private[spark] object Config extends Logging {
.stringConf
.createWithDefault("default")

val DRIVER_DOCKER_IMAGE =
ConfigBuilder("spark.kubernetes.driver.docker.image")
.doc("Docker image to use for the driver. Specify this using the standard Docker tag format.")
.stringConf
.createOptional
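createOptional yields a config entry whose lookup returns an Option rather than failing when unset. A hedged usage sketch (the entry-based get is Spark's internal config API; shapes assumed from typical usage):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.kubernetes.driver.docker.image", "my-repo/spark-driver:latest") // hypothetical tag
    // Inside Spark, an entry built with createOptional reads back as Option[String]:
    // conf.get(DRIVER_DOCKER_IMAGE) == Some("my-repo/spark-driver:latest"); None when unset.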

val EXECUTOR_DOCKER_IMAGE =
ConfigBuilder("spark.kubernetes.executor.docker.image")
.doc("Docker image to use for the executors. Specify this using the standard Docker tag " +
@@ -44,9 +52,9 @@
.checkValues(Set("Always", "Never", "IfNotPresent"))
.createWithDefault("IfNotPresent")

val APISERVER_AUTH_DRIVER_CONF_PREFIX =
val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
"spark.kubernetes.authenticate.driver"
val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
"spark.kubernetes.authenticate.driver.mounted"
val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
@@ -55,14 +63,35 @@
val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"

val KUBERNETES_SERVICE_ACCOUNT_NAME =
ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
.doc("Service account that is used when running the driver pod. The driver pod uses " +
"this service account when requesting executor pods from the API server. If specific " +
"credentials are given for the driver pod to use, the driver will favor " +
"using those credentials instead.")
ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
.doc("Service account that is used when running the driver pod. The driver pod uses" +
" this service account when requesting executor pods from the API server. If specific" +
" credentials are given for the driver pod to use, the driver will favor" +
" using those credentials instead.")
.stringConf
.createOptional

val KUBERNETES_DRIVER_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.driver.limit.cores")
.doc("Specify the hard cpu limit for the driver pod")
.stringConf
.createOptional

val KUBERNETES_EXECUTOR_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.executor.limit.cores")
.doc("Specify the hard cpu limit for a single executor pod")
.stringConf
.createOptional

val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
.doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the" +
" driver submission server. This is memory that accounts for things like VM overheads," +
" interned strings, other native overheads, etc. This tends to grow with the driver's" +
" memory size (typically 6-10%).")
.bytesConf(ByteUnit.MiB)
.createOptional
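bytesConf(ByteUnit.MiB) parses human-readable sizes into MiB, so this entry always reads back as a count of mebibytes (illustrative values):

    // --conf spark.kubernetes.driver.memoryOverhead=1g   => Some(1024L) (MiB)
    // --conf spark.kubernetes.driver.memoryOverhead=512m => Some(512L)
    // unset => None; a default derived from the driver memory is applied later
    // (see the note on the executor-side entry below).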

// Note that while we set a default for this when we start up the
// scheduler, the specific default value is dynamically determined
// based on the executor memory.
Expand All @@ -74,9 +103,6 @@ private[spark] object Config extends Logging {
.bytesConf(ByteUnit.MiB)
.createOptional

val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."

val KUBERNETES_DRIVER_POD_NAME =
ConfigBuilder("spark.kubernetes.driver.pod.name")
.doc("Name of the driver pod.")
@@ -104,12 +130,6 @@
.checkValue(value => value > 0, "Allocation batch delay should be a positive integer")
.createWithDefault(1)

val KUBERNETES_EXECUTOR_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.executor.limit.cores")
.doc("Specify the hard cpu limit for a single executor pod")
.stringConf
.createOptional

val KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS =
ConfigBuilder("spark.kubernetes.executor.lostCheck.maxAttempts")
.doc("Maximum number of attempts allowed for checking the reason of an executor loss " +
@@ -119,5 +139,45 @@
"must be a positive integer")
.createWithDefault(10)

val WAIT_FOR_APP_COMPLETION =
ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
.doc("In cluster mode, whether to wait for the application to finish before exiting the" +
" launcher process.")
.booleanConf
.createWithDefault(true)

val REPORT_INTERVAL =
ConfigBuilder("spark.kubernetes.report.interval")
.doc("Interval between reports of the current app status in cluster mode.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")
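timeConf(TimeUnit.MILLISECONDS) with createWithDefaultString("1s") means the value always reads back in milliseconds (illustrative):

    // unset   => 1000L  (from the "1s" default)
    // "500ms" => 500L
    // "2s"    => 2000L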

val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"

val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."

val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."

val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."

val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."

def getK8sMasterUrl(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
}
val masterWithoutK8sPrefix = rawMasterString.replaceFirst("k8s://", "")
if (masterWithoutK8sPrefix.startsWith("http://")
|| masterWithoutK8sPrefix.startsWith("https://")) {
masterWithoutK8sPrefix
} else {
val resolvedURL = s"https://$masterWithoutK8sPrefix"
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved" +
s" URL is $resolvedURL")
resolvedURL
}
}
}
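Example behavior of getK8sMasterUrl, derived directly from the code above (host names illustrative):

    Config.getK8sMasterUrl("k8s://https://example.com:6443") // "https://example.com:6443"
    Config.getK8sMasterUrl("k8s://example.com:6443")         // "https://example.com:6443", logging the https default
    Config.getK8sMasterUrl("k8s://http://example.com:8080")  // "http://example.com:8080"
    // Config.getK8sMasterUrl("spark://example.com:7077")    // throws IllegalArgumentException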
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -25,9 +25,30 @@ private[spark] object Constants {
val SPARK_POD_DRIVER_ROLE = "driver"
val SPARK_POD_EXECUTOR_ROLE = "executor"

// Annotations
val SPARK_APP_NAME_ANNOTATION = "spark-app-name"

// Credentials secrets
val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =
"/mnt/secrets/spark-kubernetes-credentials"
val DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME = "ca-cert"
val DRIVER_CREDENTIALS_CA_CERT_PATH =
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME"
val DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME = "client-key"
val DRIVER_CREDENTIALS_CLIENT_KEY_PATH =
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME"
val DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME = "client-cert"
val DRIVER_CREDENTIALS_CLIENT_CERT_PATH =
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME"
val DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME = "oauth-token"
val DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH =
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME"
val DRIVER_CREDENTIALS_SECRET_VOLUME_NAME = "kubernetes-credentials"
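The interpolated constants above resolve to fixed paths under the credentials mount, e.g.:

    // DRIVER_CREDENTIALS_CA_CERT_PATH     == "/mnt/secrets/spark-kubernetes-credentials/ca-cert"
    // DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH == "/mnt/secrets/spark-kubernetes-credentials/oauth-token"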

// Default and fixed ports
val DEFAULT_DRIVER_PORT = 7078
val DEFAULT_BLOCKMANAGER_PORT = 7079
val DRIVER_PORT_NAME = "driver-rpc-port"
val BLOCK_MANAGER_PORT_NAME = "blockmanager"
val EXECUTOR_PORT_NAME = "executor"

@@ -42,9 +63,16 @@
val ENV_EXECUTOR_EXTRA_CLASSPATH = "SPARK_EXECUTOR_EXTRA_CLASSPATH"
val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH"
val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
val ENV_SUBMIT_EXTRA_CLASSPATH = "SPARK_SUBMIT_EXTRA_CLASSPATH"
val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS"
val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"

// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
val MEMORY_OVERHEAD_FACTOR = 0.10
val MEMORY_OVERHEAD_MIN_MIB = 384L
}
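A sketch of how the two overhead constants are typically combined, an assumption based on the memoryOverhead docs in Config.scala above rather than code from this commit:

    val driverMemoryMiB = 4096L // e.g. --driver-memory 4g
    val overheadMiB = math.max(
      (Constants.MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toLong, // 10% => 409 MiB
      Constants.MEMORY_OVERHEAD_MIN_MIB)                           // floor of 384 MiB
    // overheadMiB == 409; the pod memory request is then roughly 4096 + 409 MiB.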