@@ -56,6 +56,31 @@ package object config {
.createOptional
}

private[spark] val CREDENTIAL_PRINCIPAL =
ConfigBuilder("spark.mesos.principal")
Member:
Do you think we should have a .doc here, like the previous conf SECRET_FILENAMES at lines 50 ~ 56?

.doc("Name of the Kerberos principal to authenticate Spark to Mesos.")
.stringConf
.createOptional

private[spark] val CREDENTIAL_PRINCIPAL_FILE =
ConfigBuilder("spark.mesos.principal.file")
.doc("The path of file which contains the name of the Kerberos principal " +
"to authenticate Spark to Mesos.")
.stringConf
.createOptional

private[spark] val CREDENTIAL_SECRET =
ConfigBuilder("spark.mesos.secret")
.doc("The secret value to authenticate Spark to Mesos.")
.stringConf
.createOptional

private[spark] val CREDENTIAL_SECRET_FILE =
ConfigBuilder("spark.mesos.secret.file")
.doc("The path of file which contains the secret value to authenticate Spark to Mesos.")
.stringConf
.createOptional

/* Common app configuration. */

private[spark] val SHUFFLE_CLEANER_INTERVAL_S =
@@ -85,6 +110,11 @@ package object config {
.stringConf
.createOptional

private[spark] val DRIVER_WEBUI_URL =
ConfigBuilder("spark.mesos.driver.webui.url")
.stringConf
.createOptional

private[spark] val driverSecretConfig = new MesosSecretConfig("driver")

private[spark] val executorSecretConfig = new MesosSecretConfig("executor")
@@ -118,6 +148,143 @@ package object config {
.stringConf
.createWithDefault("")

private[spark] val DRIVER_FRAMEWORK_ID =
ConfigBuilder("spark.mesos.driver.frameworkId")
.stringConf
.createOptional

private[spark] val EXECUTOR_URI =
ConfigBuilder("spark.executor.uri").stringConf.createOptional

private[spark] val PROXY_BASE_URL =
ConfigBuilder("spark.mesos.proxy.baseURL").stringConf.createOptional

private[spark] val COARSE_MODE =
ConfigBuilder("spark.mesos.coarse").booleanConf.createWithDefault(true)

private[spark] val COARSE_SHUTDOWN_TIMEOUT =
ConfigBuilder("spark.mesos.coarse.shutdownTimeout")
.timeConf(TimeUnit.MILLISECONDS)
dongjoon-hyun (Member), Feb 8, 2019:
Let's add .checkValue(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0") here.

.checkValue(_ >= 0, s"spark.mesos.coarse.shutdownTimeout must be >= 0")
.createWithDefaultString("10s")
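
To illustrate what the added check buys, a hypothetical usage sketch (not part of this PR; it assumes reading the entry applies the validator and that SparkConf and COARSE_SHUTDOWN_TIMEOUT are in scope):

import org.apache.spark.SparkConf

// Hypothetical: the validator runs when the entry is read, so a negative
// timeout now fails fast instead of being silently accepted.
val conf = new SparkConf().set(COARSE_SHUTDOWN_TIMEOUT.key, "-1s")
conf.get(COARSE_SHUTDOWN_TIMEOUT) // IllegalArgumentException: spark.mesos.coarse.shutdownTimeout must be >= 0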

private[spark] val MAX_DRIVERS =
ConfigBuilder("spark.mesos.maxDrivers").intConf.createWithDefault(200)

private[spark] val RETAINED_DRIVERS =
ConfigBuilder("spark.mesos.retainedDrivers").intConf.createWithDefault(200)

private[spark] val CLUSTER_RETRY_WAIT_MAX_SECONDS =
ConfigBuilder("spark.mesos.cluster.retry.wait.max")
.intConf
.createWithDefault(60) // 1 minute

private[spark] val ENABLE_FETCHER_CACHE =
ConfigBuilder("spark.mesos.fetcherCache.enable")
Member:
While you are here, can we copy the documentation descriptions from running-on-mesos.md into .doc for the new configurations? At least for the ones that are already documented, I think we can.

.booleanConf
.createWithDefault(false)
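
Following the reviewer's suggestion above, a sketch of how a .doc could look for this entry; the description wording is assumed (paraphrasing running-on-mesos.md), not quoted verbatim:

private[spark] val ENABLE_FETCHER_CACHE =
  ConfigBuilder("spark.mesos.fetcherCache.enable")
    // Assumed wording, to be checked against running-on-mesos.md:
    .doc("If set to true, all URIs (example: spark.executor.uri, spark.mesos.uris) " +
      "will be cached by the Mesos Fetcher Cache.")
    .booleanConf
    .createWithDefault(false)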

private[spark] val APP_JAR_LOCAL_RESOLUTION_MODE =
ConfigBuilder("spark.mesos.appJar.local.resolution.mode")
.stringConf
.checkValues(Set("host", "container"))
.createWithDefault("host")

private[spark] val REJECT_OFFER_DURATION =
ConfigBuilder("spark.mesos.rejectOfferDuration")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("120s")

private[spark] val REJECT_OFFER_DURATION_FOR_UNMET_CONSTRAINTS =
ConfigBuilder("spark.mesos.rejectOfferDurationForUnmetConstraints")
.timeConf(TimeUnit.SECONDS)
.createOptional

private[spark] val REJECT_OFFER_DURATION_FOR_REACHED_MAX_CORES =
ConfigBuilder("spark.mesos.rejectOfferDurationForReachedMaxCores")
.timeConf(TimeUnit.SECONDS)
.createOptional

private[spark] val URIS_TO_DOWNLOAD =
ConfigBuilder("spark.mesos.uris")
.stringConf
.toSequence
.createWithDefault(Nil)

private[spark] val EXECUTOR_HOME =
ConfigBuilder("spark.mesos.executor.home")
.stringConf
.createOptional

private[spark] val EXECUTOR_CORES =
ConfigBuilder("spark.mesos.mesosExecutor.cores")
.doubleConf
.createWithDefault(1)

private[spark] val EXTRA_CORES_PER_EXECUTOR =
ConfigBuilder("spark.mesos.extra.cores")
.intConf
.createWithDefault(0)

private[spark] val EXECUTOR_MEMORY_OVERHEAD =
ConfigBuilder("spark.mesos.executor.memoryOverhead")
.intConf
.createOptional

private[spark] val EXECUTOR_DOCKER_IMAGE =
ConfigBuilder("spark.mesos.executor.docker.image")
.stringConf
.createOptional

private[spark] val EXECUTOR_DOCKER_FORCE_PULL_IMAGE =
ConfigBuilder("spark.mesos.executor.docker.forcePullImage")
.booleanConf
.createOptional

private[spark] val EXECUTOR_DOCKER_PORT_MAPS =
ConfigBuilder("spark.mesos.executor.docker.portmaps")
.stringConf
.toSequence
.createOptional

private[spark] val EXECUTOR_DOCKER_PARAMETERS =
ConfigBuilder("spark.mesos.executor.docker.parameters")
.stringConf
.toSequence
.createOptional

private[spark] val EXECUTOR_DOCKER_VOLUMES =
ConfigBuilder("spark.mesos.executor.docker.volumes")
.stringConf
.toSequence
.createOptional

private[spark] val MAX_GPUS =
ConfigBuilder("spark.mesos.gpus.max")
.intConf
.createWithDefault(0)

private[spark] val TASK_LABELS =
ConfigBuilder("spark.mesos.task.labels")
.stringConf
.createWithDefault("")

private[spark] val CONSTRAINTS =
ConfigBuilder("spark.mesos.constraints")
.stringConf
.createWithDefault("")

private[spark] val CONTAINERIZER =
ConfigBuilder("spark.mesos.containerizer")
.stringConf
Member:
Please add .checkValues(Set("docker", "mesos")).

.checkValues(Set("docker", "mesos"))
.createWithDefault("docker")

private[spark] val ROLE =
ConfigBuilder("spark.mesos.role")
.stringConf
Member:
Please add .checkValues(Set("host", "container")).

Contributor (Author):
Honestly, I have no idea what the available values are. Could you point me to where I can find them? I can't find it in the Mesos docs - http://mesos.apache.org/documentation/latest/roles/

Contributor (Author):
Ah, it looks like you're referring to APP_JAR_LOCAL_RESOLUTION_MODE in the comment below. I'll address it.

Member:
Yes. It seems I clicked the wrong one.

.createOptional

private[spark] val DRIVER_ENV_PREFIX = "spark.mesos.driverEnv."
private[spark] val DISPATCHER_DRIVER_DEFAULT_PREFIX = "spark.mesos.dispatcher.driverDefault."
}
@@ -77,7 +77,7 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(

private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
val id = state.driverDescription.submissionId
val proxy = parent.conf.getOption("spark.mesos.proxy.baseURL")
val proxy = parent.conf.get(PROXY_BASE_URL)

val sandboxCol = if (proxy.isDefined) {
val clusterSchedulerId = parent.scheduler.getSchedulerState().frameworkId
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.SparkContext
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

@@ -42,7 +43,7 @@ private[spark] class MesosClusterManager extends ExternalClusterManager {
"I/O encryption is currently not supported in Mesos.")

val mesosUrl = MESOS_REGEX.findFirstMatchIn(masterURL).get.group(1)
val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
val coarse = sc.conf.get(COARSE_MODE)
if (coarse) {
new MesosCoarseGrainedSchedulerBackend(
scheduler.asInstanceOf[TaskSchedulerImpl],
@@ -127,10 +127,10 @@ private[spark] class MesosClusterScheduler(
MetricsSystem.createMetricsSystem("mesos_cluster", conf, new SecurityManager(conf))
private val master = conf.get("spark.master")
private val appName = conf.get("spark.app.name")
private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
private val useFetchCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
private val queuedCapacity = conf.get(config.MAX_DRIVERS)
private val retainedDrivers = conf.get(config.RETAINED_DRIVERS)
private val maxRetryWaitTime = conf.get(config.CLUSTER_RETRY_WAIT_MAX_SECONDS)
private val useFetchCache = conf.get(config.ENABLE_FETCHER_CACHE)
private val schedulerState = engineFactory.createEngine("scheduler")
private val stateLock = new Object()
// Keyed by submission id
@@ -390,10 +390,10 @@ private[spark] class MesosClusterScheduler(
private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
// TODO(mgummelt): Don't do this here. This should be passed as a --conf
val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
v => s"$v -D${config.DRIVER_FRAMEWORK_ID.key}=${getDriverFrameworkID(desc)}"
)

val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv
val env = desc.conf.getAllWithPrefix(config.DRIVER_ENV_PREFIX) ++ commandEnv

val envBuilder = Environment.newBuilder()

@@ -419,22 +419,17 @@

private def isContainerLocalAppJar(desc: MesosDriverDescription): Boolean = {
val isLocalJar = desc.jarUrl.startsWith("local://")
val isContainerLocal = desc.conf.getOption("spark.mesos.appJar.local.resolution.mode").exists {
val isContainerLocal = desc.conf.get(config.APP_JAR_LOCAL_RESOLUTION_MODE) match {
case "container" => true
case "host" => false
case other =>
logWarning(s"Unknown spark.mesos.appJar.local.resolution.mode $other, using host.")
false
}
}
isLocalJar && isContainerLocal
}

private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
val confUris = List(conf.getOption("spark.mesos.uris"),
desc.conf.getOption("spark.mesos.uris"),
Some(desc.conf.get(SUBMIT_PYTHON_FILES).mkString(","))).flatMap(
_.map(_.split(",").map(_.trim))
).flatten
val confUris = (conf.get(config.URIS_TO_DOWNLOAD) ++
desc.conf.get(config.URIS_TO_DOWNLOAD) ++
desc.conf.get(SUBMIT_PYTHON_FILES)).toList
Member:
I'm not sure this is exactly the same? Note the _.split(",").map(_.trim) - previously each value could itself be a comma-separated list (probably not intentional, but it worked), and URIS_TO_DOWNLOAD and SUBMIT_PYTHON_FILES could contain whitespace.

Contributor (Author):
Element values will be trimmed the same as before - ConfigBuilder handles it:

/** Turns the config entry into a sequence of values of the underlying type. */
def toSequence: TypedConfigBuilder[Seq[T]] = {
  new TypedConfigBuilder(parent, stringToSeq(_, converter), seqToString(_, stringConverter))
}

def stringToSeq[T](str: String, converter: String => T): Seq[T] = {
  Utils.stringToSeq(str).map(converter)
}

def stringToSeq(str: String): Seq[String] = {
  str.split(",").map(_.trim()).filter(_.nonEmpty)
}
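
A quick REPL-style sketch of the parsing behavior quoted above - whitespace is trimmed and empty entries dropped, so the old and new code agree on element values:

scala> "a, b ,, c".split(",").map(_.trim).filter(_.nonEmpty).toSeq == Seq("a", "b", "c")
res0: Boolean = true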


if (isContainerLocalAppJar(desc)) {
(confUris ++ getDriverExecutorURI(desc).toList).map(uri =>
@@ -464,7 +459,7 @@ private[spark] class MesosClusterScheduler(
}

private def getDriverCommandValue(desc: MesosDriverDescription): String = {
val dockerDefined = desc.conf.contains("spark.mesos.executor.docker.image")
val dockerDefined = desc.conf.contains(config.EXECUTOR_DOCKER_IMAGE)
val executorUri = getDriverExecutorURI(desc)
// Gets the path to run spark-submit, and the path to the Mesos sandbox.
val (executable, sandboxPath) = if (dockerDefined) {
@@ -484,11 +479,11 @@
// Sandbox path points to the parent folder as we chdir into the folderBasename.
(cmdExecutable, "..")
} else {
val executorSparkHome = desc.conf.getOption("spark.mesos.executor.home")
val executorSparkHome = desc.conf.get(config.EXECUTOR_HOME)
.orElse(conf.getOption("spark.home"))
.orElse(Option(System.getenv("SPARK_HOME")))
.getOrElse {
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
throw new SparkException(s"Executor Spark home `${config.EXECUTOR_HOME}` is not set!")
}
val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getPath
// Sandbox points to the current directory by default with Mesos.
@@ -547,7 +542,7 @@ private[spark] class MesosClusterScheduler(
"spark.submit.deployMode", // this would be set to `cluster`, but we need client
"spark.master" // this contains the address of the dispatcher, not master
)
val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
val defaultConf = conf.getAllWithPrefix(config.DISPATCHER_DRIVER_DEFAULT_PREFIX).toMap
val driverConf = desc.conf.getAll
.filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
.toMap
@@ -76,15 +76,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
cores - (cores % minCoresPerExecutor)
}

private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
private val useFetcherCache = conf.get(ENABLE_FETCHER_CACHE)

private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
private val maxGpus = conf.get(MAX_GPUS)

private val taskLabels = conf.get("spark.mesos.task.labels", "")
private val taskLabels = conf.get(TASK_LABELS)

private[this] val shutdownTimeoutMS =
conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
.ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")
private[this] val shutdownTimeoutMS = conf.get(COARSE_SHUTDOWN_TIMEOUT)

// Synchronization protected by stateLock
private[this] var stopCalled: Boolean = false
@@ -144,11 +142,11 @@
// may lead to deadlocks since the superclass might also try to lock
private val stateLock = new ReentrantLock

private val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
private val extraCoresPerExecutor = conf.get(EXTRA_CORES_PER_EXECUTOR)

// Offer constraints
private val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
parseConstraintString(sc.conf.get(CONSTRAINTS))

// Reject offers with mismatched constraints in seconds
private val rejectOfferDurationForUnmetConstraints =
@@ -208,10 +206,10 @@
sc.sparkUser,
sc.appName,
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
sc.conf.get(DRIVER_WEBUI_URL).orElse(sc.ui.map(_.webUrl)),
None,
Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)),
sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
sc.conf.get(DRIVER_FRAMEWORK_ID).map(_ + suffix)
)

launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
@@ -264,10 +262,10 @@
val uri = conf.get(EXECUTOR_URI).orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))

if (uri.isEmpty) {
val executorSparkHome = conf.getOption("spark.mesos.executor.home")
val executorSparkHome = conf.get(EXECUTOR_HOME)
.orElse(sc.getSparkHome())
.getOrElse {
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
throw new SparkException(s"Executor Spark home `$EXECUTOR_HOME` is not set!")
}
val runScript = new File(executorSparkHome, "./bin/spark-class").getPath
command.setValue(
@@ -293,7 +291,7 @@
command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache))
}

conf.getOption("spark.mesos.uris").foreach(setupUris(_, command, useFetcherCache))
setupUris(conf.get(URIS_TO_DOWNLOAD), command, useFetcherCache)

command.build()
}