@@ -56,6 +56,31 @@ package object config {
.createOptional
}

private[spark] val CREDENTIAL_PRINCIPAL =
ConfigBuilder("spark.mesos.principal")
Member

Do you think we had better have .doc like the previous conf SECRET_FILENAMES at line 50 ~ 56?

.doc("Name of the Kerberos principal to authenticate Spark to Mesos.")
.stringConf
.createOptional

private[spark] val CREDENTIAL_PRINCIPAL_FILE =
ConfigBuilder("spark.mesos.principal.file")
.doc("The path of file which contains the name of the Kerberos principal " +
"to authenticate Spark to Mesos.")
.stringConf
.createOptional

private[spark] val CREDENTIAL_SECRET =
ConfigBuilder("spark.mesos.secret")
.doc("The secret value to authenticate Spark to Mesos.")
.stringConf
.createOptional

private[spark] val CREDENTIAL_SECRET_FILE =
ConfigBuilder("spark.mesos.secret.file")
.doc("The path of file which contains the secret value to authenticate Spark to Mesos.")
.stringConf
.createOptional

/* Common app configuration. */

private[spark] val SHUFFLE_CLEANER_INTERVAL_S =
@@ -85,6 +110,13 @@ package object config {
.stringConf
.createOptional

private[spark] val DRIVER_WEBUI_URL =
ConfigBuilder("spark.mesos.driver.webui.url")
.doc("Set the Spark Mesos driver webui_url for interacting with the framework. " +
"If unset it will point to Spark's internal web UI.")
.stringConf
.createOptional

private[spark] val driverSecretConfig = new MesosSecretConfig("driver")

private[spark] val executorSecretConfig = new MesosSecretConfig("executor")
@@ -118,6 +150,221 @@ package object config {
.stringConf
.createWithDefault("")

private[spark] val DRIVER_FRAMEWORK_ID =
ConfigBuilder("spark.mesos.driver.frameworkId")
.stringConf
.createOptional

private[spark] val EXECUTOR_URI =
ConfigBuilder("spark.executor.uri").stringConf.createOptional

private[spark] val PROXY_BASE_URL =
ConfigBuilder("spark.mesos.proxy.baseURL").stringConf.createOptional

private[spark] val COARSE_MODE =
ConfigBuilder("spark.mesos.coarse")
.doc("If set to true, runs over Mesos clusters in \"coarse-grained\" sharing mode, where " +
"Spark acquires one long-lived Mesos task on each machine. If set to false, runs over " +
"Mesos cluster in \"fine-grained\" sharing mode, where one Mesos task is created per " +
"Spark task.")
.booleanConf.createWithDefault(true)

private[spark] val COARSE_SHUTDOWN_TIMEOUT =
ConfigBuilder("spark.mesos.coarse.shutdownTimeout")
.timeConf(TimeUnit.MILLISECONDS)
Member
@dongjoon-hyun Feb 8, 2019

Let's add .checkValue(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0") here.
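
For illustration only (not part of the PR; the SparkConf value below is hypothetical), the effect of the suggested check is that a negative timeout fails fast when the entry is read:

import org.apache.spark.SparkConf

// Hypothetical configuration carrying an invalid value for the new entry.
val conf = new SparkConf().set("spark.mesos.coarse.shutdownTimeout", "-1s")
// Reading it through COARSE_SHUTDOWN_TIMEOUT would now fail with
// IllegalArgumentException("spark.mesos.coarse.shutdownTimeout must be >= 0")
// instead of silently accepting a negative duration.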

.checkValue(_ >= 0, s"spark.mesos.coarse.shutdownTimeout must be >= 0")
.createWithDefaultString("10s")

private[spark] val MAX_DRIVERS =
ConfigBuilder("spark.mesos.maxDrivers").intConf.createWithDefault(200)

private[spark] val RETAINED_DRIVERS =
ConfigBuilder("spark.mesos.retainedDrivers").intConf.createWithDefault(200)

private[spark] val CLUSTER_RETRY_WAIT_MAX_SECONDS =
ConfigBuilder("spark.mesos.cluster.retry.wait.max")
.intConf
.createWithDefault(60) // 1 minute

private[spark] val ENABLE_FETCHER_CACHE =
ConfigBuilder("spark.mesos.fetcherCache.enable")
Member

While you are here, can we copy the documentation descriptions in running-on-mesos.md into .doc for the new configurations? At least for the ones already documented, I think we can have them.

.doc("If set to true, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be " +
"cached by the Mesos Fetcher Cache.")
.booleanConf
.createWithDefault(false)

private[spark] val APP_JAR_LOCAL_RESOLUTION_MODE =
ConfigBuilder("spark.mesos.appJar.local.resolution.mode")
.doc("Provides support for the `local:///` scheme to reference the app jar resource in " +
"cluster mode. If user uses a local resource (`local:///path/to/jar`) and the config " +
"option is not used it defaults to `host` eg. the mesos fetcher tries to get the " +
"resource from the host's file system. If the value is unknown it prints a warning msg " +
"in the dispatcher logs and defaults to `host`. If the value is `container` then spark " +
"submit in the container will use the jar in the container's path: `/path/to/jar`.")
.stringConf
.checkValues(Set("host", "container"))
.createWithDefault("host")

private[spark] val REJECT_OFFER_DURATION =
ConfigBuilder("spark.mesos.rejectOfferDuration")
.doc("Time to consider unused resources refused, serves as a fallback of " +
"`spark.mesos.rejectOfferDurationForUnmetConstraints`, " +
"`spark.mesos.rejectOfferDurationForReachedMaxCores`.")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("120s")

private[spark] val REJECT_OFFER_DURATION_FOR_UNMET_CONSTRAINTS =
ConfigBuilder("spark.mesos.rejectOfferDurationForUnmetConstraints")
.doc("Time to consider unused resources refused with unmet constraints.")
.timeConf(TimeUnit.SECONDS)
.createOptional

private[spark] val REJECT_OFFER_DURATION_FOR_REACHED_MAX_CORES =
ConfigBuilder("spark.mesos.rejectOfferDurationForReachedMaxCores")
.doc("Time to consider unused resources refused when maximum number of cores " +
"`spark.cores.max` is reached.")
.timeConf(TimeUnit.SECONDS)
.createOptional

private[spark] val URIS_TO_DOWNLOAD =
ConfigBuilder("spark.mesos.uris")
.doc("A comma-separated list of URIs to be downloaded to the sandbox when driver or " +
"executor is launched by Mesos. This applies to both coarse-grained and fine-grained " +
"mode.")
.stringConf
.toSequence
.createWithDefault(Nil)

private[spark] val EXECUTOR_HOME =
ConfigBuilder("spark.mesos.executor.home")
.doc("Set the directory in which Spark is installed on the executors in Mesos. " +
"By default, the executors will simply use the driver's Spark home directory, which may " +
"not be visible to them. Note that this is only relevant if a Spark binary package is " +
"not specified through `spark.executor.uri`.")
.stringConf
.createOptional

private[spark] val EXECUTOR_CORES =
ConfigBuilder("spark.mesos.mesosExecutor.cores")
.doc("(Fine-grained mode only) Number of cores to give each Mesos executor. This does not " +
"include the cores used to run the Spark tasks. In other words, even if no Spark task " +
"is being run, each Mesos executor will occupy the number of cores configured here. " +
"The value can be a floating point number.")
.doubleConf
.createWithDefault(1.0)

private[spark] val EXTRA_CORES_PER_EXECUTOR =
ConfigBuilder("spark.mesos.extra.cores")
.doc("Set the extra number of cores for an executor to advertise. This does not result in " +
"more cores allocated. It instead means that an executor will \"pretend\" it has more " +
"cores, so that the driver will send it more tasks. Use this to increase parallelism. " +
"This setting is only used for Mesos coarse-grained mode.")
.intConf
.createWithDefault(0)

private[spark] val EXECUTOR_MEMORY_OVERHEAD =
ConfigBuilder("spark.mesos.executor.memoryOverhead")
.doc("The amount of additional memory, specified in MiB, to be allocated per executor. " +
"By default, the overhead will be larger of either 384 or 10% of " +
"`spark.executor.memory`. If set, the final overhead will be this value.")
.intConf
.createOptional

private[spark] val EXECUTOR_DOCKER_IMAGE =
ConfigBuilder("spark.mesos.executor.docker.image")
.doc("Set the name of the docker image that the Spark executors will run in. The selected " +
"image must have Spark installed, as well as a compatible version of the Mesos library. " +
"The installed path of Spark in the image can be specified with " +
"`spark.mesos.executor.home`; the installed path of the Mesos library can be specified " +
"with `spark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY`.")
.stringConf
.createOptional

private[spark] val EXECUTOR_DOCKER_FORCE_PULL_IMAGE =
ConfigBuilder("spark.mesos.executor.docker.forcePullImage")
.doc("Force Mesos agents to pull the image specified in " +
"`spark.mesos.executor.docker.image`. By default Mesos agents will not pull images they " +
"already have cached.")
.booleanConf
.createOptional

private[spark] val EXECUTOR_DOCKER_PORT_MAPS =
ConfigBuilder("spark.mesos.executor.docker.portmaps")
.stringConf
.toSequence
.createOptional

private[spark] val EXECUTOR_DOCKER_PARAMETERS =
ConfigBuilder("spark.mesos.executor.docker.parameters")
.doc("Set the list of custom parameters which will be passed into the `docker run` " +
"command when launching the Spark executor on Mesos using the docker containerizer. " +
"The format of this property is a list of key/value pairs which pair looks key1=value1.")
.stringConf
.toSequence
.createOptional

private[spark] val EXECUTOR_DOCKER_VOLUMES =
ConfigBuilder("spark.mesos.executor.docker.volumes")
.doc("Set the list of volumes which will be mounted into the Docker image, which was set " +
"using `spark.mesos.executor.docker.image`. The format of this property is a list of " +
"mappings following the form passed to `docker run -v`. That is they take the form: " +
"`[host_path:]container_path[:ro|:rw]`")
.stringConf
.toSequence
.createOptional

private[spark] val MAX_GPUS =
ConfigBuilder("spark.mesos.gpus.max")
.doc("Set the maximum number GPU resources to acquire for this job. Note that executors " +
"will still launch when no GPU resources are found since this configuration is just an " +
"upper limit and not a guaranteed amount.")
.intConf
.createWithDefault(0)

private[spark] val TASK_LABELS =
ConfigBuilder("spark.mesos.task.labels")
.doc("Set the Mesos labels to add to each task. Labels are free-form key-value pairs. " +
"Key-value pairs should be separated by a colon, and commas used to list more than one. " +
"If your label includes a colon or comma, you can escape it with a backslash. " +
"Ex. key:value,key2:a\\:b.")
.stringConf
.createWithDefault("")

private[spark] val CONSTRAINTS =
ConfigBuilder("spark.mesos.constraints")
.doc("Attribute-based constraints on mesos resource offers. By default, all resource " +
"offers will be accepted. This setting applies only to executors. Refer to Mesos " +
"Attributes & Resources doc for more information on attributes. " +
Member

Hi, @HeartSaVioR .
Could you remove the following lines 338 ~ 347? Since it already says Refer to Mesos ..., it looks okay to remove the rest.

Contributor Author

Totally agreed. Just addressed it.

"* Scalar constraints are matched with \"less than equal\" semantics i.e. value in the " +
"constraint must be less than or equal to the value in the resource offer. " +
"* Range constraints are matched with \"contains\" semantics i.e. value in the " +
"constraint must be within the resource offer's value. " +
"* Set constraints are matched with \"subset of\" semantics i.e. value in the constraint " +
"must be a subset of the resource offer's value. " +
"* Text constraints are matched with \"equality\" semantics i.e. value in the constraint " +
"must be exactly equal to the resource offer's value. " +
"* In case there is no value present as a part of the constraint any offer with the " +
"corresponding attribute will be accepted (without value check).")
.stringConf
.createWithDefault("")

private[spark] val CONTAINERIZER =
ConfigBuilder("spark.mesos.containerizer")
.doc("This only affects docker containers, and must be one of \"docker\" or \"mesos\". " +
"Mesos supports two types of containerizers for docker: the \"docker\" containerizer, " +
"and the preferred \"mesos\" containerizer. " +
"Read more here: http://mesos.apache.org/documentation/latest/container-image/")
.stringConf
Member

Please add .checkValues(Set("docker", "mesos")).

.checkValues(Set("docker", "mesos"))
.createWithDefault("docker")

private[spark] val ROLE =
ConfigBuilder("spark.mesos.role")
.doc("Set the role of this Spark framework for Mesos. Roles are used in Mesos for " +
"reservations and resource weight sharing.")
.stringConf
Member

Please add .checkValues(Set("host", "container")).

Contributor Author

Honestly I have no idea about the available values. Could you point me to where I could find them? I can't find it in the Mesos doc - http://mesos.apache.org/documentation/latest/roles/

Contributor Author

Ah, looks like you're referring to APP_JAR_LOCAL_RESOLUTION_MODE in the comment below. I'll address it.

Member

Yes. It seems that I clicked the wrong one.

.createOptional

private[spark] val DRIVER_ENV_PREFIX = "spark.mesos.driverEnv."
private[spark] val DISPATCHER_DRIVER_DEFAULT_PREFIX = "spark.mesos.dispatcher.driverDefault."
}
@@ -77,7 +77,7 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(

private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
val id = state.driverDescription.submissionId
val proxy = parent.conf.getOption("spark.mesos.proxy.baseURL")
val proxy = parent.conf.get(PROXY_BASE_URL)

val sandboxCol = if (proxy.isDefined) {
val clusterSchedulerId = parent.scheduler.getSchedulerState().frameworkId
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.SparkContext
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

@@ -42,7 +43,7 @@ private[spark] class MesosClusterManager extends ExternalClusterManager {
"I/O encryption is currently not supported in Mesos.")

val mesosUrl = MESOS_REGEX.findFirstMatchIn(masterURL).get.group(1)
val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
val coarse = sc.conf.get(COARSE_MODE)
if (coarse) {
new MesosCoarseGrainedSchedulerBackend(
scheduler.asInstanceOf[TaskSchedulerImpl],
@@ -127,10 +127,10 @@ private[spark] class MesosClusterScheduler(
MetricsSystem.createMetricsSystem("mesos_cluster", conf, new SecurityManager(conf))
private val master = conf.get("spark.master")
private val appName = conf.get("spark.app.name")
private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
private val useFetchCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
private val queuedCapacity = conf.get(config.MAX_DRIVERS)
private val retainedDrivers = conf.get(config.RETAINED_DRIVERS)
private val maxRetryWaitTime = conf.get(config.CLUSTER_RETRY_WAIT_MAX_SECONDS)
private val useFetchCache = conf.get(config.ENABLE_FETCHER_CACHE)
private val schedulerState = engineFactory.createEngine("scheduler")
private val stateLock = new Object()
// Keyed by submission id
@@ -390,10 +390,10 @@ private[spark] class MesosClusterScheduler(
private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
// TODO(mgummelt): Don't do this here. This should be passed as a --conf
val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
v => s"$v -D${config.DRIVER_FRAMEWORK_ID.key}=${getDriverFrameworkID(desc)}"
)

val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv
val env = desc.conf.getAllWithPrefix(config.DRIVER_ENV_PREFIX) ++ commandEnv

val envBuilder = Environment.newBuilder()

@@ -419,22 +419,17 @@

private def isContainerLocalAppJar(desc: MesosDriverDescription): Boolean = {
val isLocalJar = desc.jarUrl.startsWith("local://")
val isContainerLocal = desc.conf.getOption("spark.mesos.appJar.local.resolution.mode").exists {
val isContainerLocal = desc.conf.get(config.APP_JAR_LOCAL_RESOLUTION_MODE) match {
case "container" => true
case "host" => false
case other =>
logWarning(s"Unknown spark.mesos.appJar.local.resolution.mode $other, using host.")
false
}
}
isLocalJar && isContainerLocal
}

private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
val confUris = List(conf.getOption("spark.mesos.uris"),
desc.conf.getOption("spark.mesos.uris"),
Some(desc.conf.get(SUBMIT_PYTHON_FILES).mkString(","))).flatMap(
_.map(_.split(",").map(_.trim))
).flatten
val confUris = (conf.get(config.URIS_TO_DOWNLOAD) ++
desc.conf.get(config.URIS_TO_DOWNLOAD) ++
desc.conf.get(SUBMIT_PYTHON_FILES)).toList
Member

I'm not sure this is exactly the same? Before, _.split(",").map(_.trim) meant URIS_TO_DOWNLOAD was treated as a comma-separated list (probably not intentional, but it works), and URIS_TO_DOWNLOAD & SUBMIT_PYTHON_FILES values could have whitespace.

Contributor Author

Element values will be trimmed the same as before - ConfigBuilder handles it:

/** Turns the config entry into a sequence of values of the underlying type. */
def toSequence: TypedConfigBuilder[Seq[T]] = {
  new TypedConfigBuilder(parent, stringToSeq(_, converter), seqToString(_, stringConverter))
}

def stringToSeq[T](str: String, converter: String => T): Seq[T] = {
  Utils.stringToSeq(str).map(converter)
}

def stringToSeq(str: String): Seq[String] = {
  str.split(",").map(_.trim()).filter(_.nonEmpty)
}
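
For illustration (the values below are hypothetical, not from the PR), the split/trim/filter above means stray whitespace and empty elements do not survive into the URI list:

// Hypothetical comma-separated value with extra whitespace and an empty element.
val raw = " http://host/a.jar , http://host/b.jar , , local:///tmp/c.py "
// Mirrors Utils.stringToSeq: split on commas, trim each element, drop empties.
val uris = raw.split(",").map(_.trim).filter(_.nonEmpty).toSeq
// uris == Seq("http://host/a.jar", "http://host/b.jar", "local:///tmp/c.py")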


if (isContainerLocalAppJar(desc)) {
(confUris ++ getDriverExecutorURI(desc).toList).map(uri =>
Expand Down Expand Up @@ -464,7 +459,7 @@ private[spark] class MesosClusterScheduler(
}

private def getDriverCommandValue(desc: MesosDriverDescription): String = {
val dockerDefined = desc.conf.contains("spark.mesos.executor.docker.image")
val dockerDefined = desc.conf.contains(config.EXECUTOR_DOCKER_IMAGE)
val executorUri = getDriverExecutorURI(desc)
// Gets the path to run spark-submit, and the path to the Mesos sandbox.
val (executable, sandboxPath) = if (dockerDefined) {
Expand All @@ -484,11 +479,11 @@ private[spark] class MesosClusterScheduler(
// Sandbox path points to the parent folder as we chdir into the folderBasename.
(cmdExecutable, "..")
} else {
val executorSparkHome = desc.conf.getOption("spark.mesos.executor.home")
val executorSparkHome = desc.conf.get(config.EXECUTOR_HOME)
.orElse(conf.getOption("spark.home"))
.orElse(Option(System.getenv("SPARK_HOME")))
.getOrElse {
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
throw new SparkException(s"Executor Spark home `${config.EXECUTOR_HOME}` is not set!")
}
val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getPath
// Sandbox points to the current directory by default with Mesos.
Expand Down Expand Up @@ -547,7 +542,7 @@ private[spark] class MesosClusterScheduler(
"spark.submit.deployMode", // this would be set to `cluster`, but we need client
"spark.master" // this contains the address of the dispatcher, not master
)
val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
val defaultConf = conf.getAllWithPrefix(config.DISPATCHER_DRIVER_DEFAULT_PREFIX).toMap
val driverConf = desc.conf.getAll
.filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
.toMap