[SPARK-26843][MESOS] Use ConfigEntry for hardcoded configs for "mesos" resource manager #23743
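This PR replaces hardcoded `spark.mesos.*` config keys in the Mesos resource manager with typed `ConfigEntry` definitions: the key, type, and default are declared once with `ConfigBuilder`, and call sites read the value with a typed `conf.get(entry)`. A minimal sketch of that pattern (illustrative code, not taken from this PR; note that `ConfigBuilder` and the typed `SparkConf.get` overload are `private[spark]`, so this only compiles inside Spark's own source tree):

```scala
// Sketch of the ConfigEntry migration pattern, assuming we are inside
// Spark's source tree (private[spark] APIs are used below).
package org.apache.spark.deploy.mesos.example

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder

object ExampleConfig {
  // Before: every call site repeated the raw key and its default, e.g.
  //   conf.getInt("spark.mesos.maxDrivers", 200)
  // After: key, type, and default live in exactly one place.
  private[spark] val MAX_DRIVERS =
    ConfigBuilder("spark.mesos.maxDrivers")
      .intConf
      .createWithDefault(200)
}

class ExampleCaller(conf: SparkConf) {
  // conf.get(entry) is typed: this is an Int, with no parsing at the call site.
  private val queuedCapacity: Int = conf.get(ExampleConfig.MAX_DRIVERS)
}
```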
```diff
@@ -56,6 +56,26 @@ package object config {
       .createOptional
   }
 
+  private[spark] val CREDENTIAL_PRINCIPAL =
+    ConfigBuilder("spark.mesos.principal")
+      .stringConf
+      .createOptional
+
+  private[spark] val CREDENTIAL_PRINCIPAL_FILE =
+    ConfigBuilder("spark.mesos.principal.file")
+      .stringConf
+      .createOptional
+
+  private[spark] val CREDENTIAL_SECRET =
+    ConfigBuilder("spark.mesos.secret")
+      .stringConf
+      .createOptional
+
+  private[spark] val CREDENTIAL_SECRET_FILE =
+    ConfigBuilder("spark.mesos.secret.file")
+      .stringConf
+      .createOptional
+
   /* Common app configuration. */
 
   private[spark] val SHUFFLE_CLEANER_INTERVAL_S =
@@ -85,6 +105,11 @@ package object config {
       .stringConf
       .createOptional
 
+  private[spark] val DRIVER_WEBUI_URL =
+    ConfigBuilder("spark.mesos.driver.webui.url")
+      .stringConf
+      .createOptional
+
   private[spark] val driverSecretConfig = new MesosSecretConfig("driver")
 
   private[spark] val executorSecretConfig = new MesosSecretConfig("executor")
@@ -118,6 +143,140 @@ package object config {
       .stringConf
       .createWithDefault("")
 
+  private[spark] val DRIVER_FRAMEWORK_ID =
+    ConfigBuilder("spark.mesos.driver.frameworkId")
+      .stringConf
+      .createOptional
+
+  private[spark] val EXECUTOR_URI =
+    ConfigBuilder("spark.executor.uri").stringConf.createOptional
+
+  private[spark] val PROXY_BASE_URL =
+    ConfigBuilder("spark.mesos.proxy.baseURL").stringConf.createOptional
+
+  private[spark] val COARSE_MODE =
+    ConfigBuilder("spark.mesos.coarse").booleanConf.createWithDefault(true)
+
+  private[spark] val COARSE_SHUTDOWN_TIMEOUT =
+    ConfigBuilder("spark.mesos.coarse.shutdownTimeout")
+      .timeConf(TimeUnit.MILLISECONDS)
```
**Member:** Let's add
```diff
+      .createWithDefaultString("10s")
+
+  private[spark] val MAX_DRIVERS =
+    ConfigBuilder("spark.mesos.maxDrivers").intConf.createWithDefault(200)
+
+  private[spark] val RETAINED_DRIVERS =
+    ConfigBuilder("spark.mesos.retainedDrivers").intConf.createWithDefault(200)
+
+  private[spark] val CLUSTER_RETRY_WAIT_MAX_SECONDS =
+    ConfigBuilder("spark.mesos.cluster.retry.wait.max")
+      .intConf
+      .createWithDefault(60) // 1 minute
+
+  private[spark] val ENABLE_FETCHER_CACHE =
+    ConfigBuilder("spark.mesos.fetcherCache.enable")
```
**Member:** While you are here, can we copy a documentation description in
```diff
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val APPLICATION_JAR_LOCAL_RESOLUTION_MODE =
+    ConfigBuilder("spark.mesos.appJar.local.resolution.mode")
+      .stringConf
+      .createOptional
+
+  private[spark] val REJECT_OFFER_DURATION =
+    ConfigBuilder("spark.mesos.rejectOfferDuration")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("120s")
+
+  private[spark] val REJECT_OFFER_DURATION_FOR_UNMET_CONSTRAINTS =
+    ConfigBuilder("spark.mesos.rejectOfferDurationForUnmetConstraints")
+      .timeConf(TimeUnit.SECONDS)
+      .createOptional
+
+  private[spark] val REJECT_OFFER_DURATION_FOR_REACHED_MAX_CORES =
+    ConfigBuilder("spark.mesos.rejectOfferDurationForReachedMaxCores")
+      .timeConf(TimeUnit.SECONDS)
+      .createOptional
+
+  private[spark] val URIS_TO_DOWNLOAD =
+    ConfigBuilder("spark.mesos.uris")
+      .stringConf
+      .toSequence
+      .createWithDefault(Nil)
+
+  private[spark] val EXECUTOR_HOME =
+    ConfigBuilder("spark.mesos.executor.home")
+      .stringConf
+      .createOptional
+
+  private[spark] val EXECUTOR_CORES =
+    ConfigBuilder("spark.mesos.mesosExecutor.cores")
+      .doubleConf
+      .createWithDefault(1)
+
+  private[spark] val EXTRA_CORES_PER_EXECUTOR =
+    ConfigBuilder("spark.mesos.extra.cores")
+      .intConf
+      .createWithDefault(0)
+
+  private[spark] val EXECUTOR_MEMORY_OVERHEAD =
+    ConfigBuilder("spark.mesos.executor.memoryOverhead")
+      .intConf
+      .createOptional
+
+  private[spark] val EXECUTOR_DOCKER_IMAGE =
+    ConfigBuilder("spark.mesos.executor.docker.image")
+      .stringConf
+      .createOptional
+
+  private[spark] val EXECUTOR_DOCKER_FORCE_PULL_IMAGE =
+    ConfigBuilder("spark.mesos.executor.docker.forcePullImage")
+      .booleanConf
+      .createOptional
+
+  private[spark] val EXECUTOR_DOCKER_PORT_MAPS =
+    ConfigBuilder("spark.mesos.executor.docker.portmaps")
+      .stringConf
+      .toSequence
+      .createOptional
+
+  private[spark] val EXECUTOR_DOCKER_PARAMETERS =
+    ConfigBuilder("spark.mesos.executor.docker.parameters")
+      .stringConf
+      .toSequence
+      .createOptional
+
+  private[spark] val EXECUTOR_DOCKER_VOLUMES =
+    ConfigBuilder("spark.mesos.executor.docker.volumes")
+      .stringConf
+      .toSequence
+      .createOptional
+
+  private[spark] val MAX_GPUS =
+    ConfigBuilder("spark.mesos.gpus.max")
+      .intConf
+      .createWithDefault(0)
+
+  private[spark] val TASK_LABELS =
+    ConfigBuilder("spark.mesos.task.labels")
+      .stringConf
+      .createWithDefault("")
+
+  private[spark] val SLAVE_OFFER_CONSTRAINTS =
+    ConfigBuilder("spark.mesos.constraints")
+      .stringConf
+      .createWithDefault("")
+
+  private[spark] val CONTAINERIZER =
+    ConfigBuilder("spark.mesos.containerizer")
+      .stringConf
```
**Member:** Please add
```diff
+      .createWithDefault("docker")
+
+  private[spark] val ROLE =
+    ConfigBuilder("spark.mesos.role")
+      .stringConf
```
**Member:** Please add

**Contributor (Author):** Honestly, I have no idea about the available values. Could you point me to where I can find them? I can't find them in the Mesos doc - http://mesos.apache.org/documentation/latest/roles/

**Contributor (Author):** Ah, looks like you're referring

**Member:** Yes. It seems I clicked the wrong one.
```diff
+      .createOptional
+
   private[spark] val DRIVER_ENV_PREFIX = "spark.mesos.driverEnv."
   private[spark] val DISPATCHER_DRIVER_DEFAULT_PREFIX = "spark.mesos.dispatcher.driverDefault."
 }
```
```diff
@@ -127,10 +127,10 @@ private[spark] class MesosClusterScheduler(
     MetricsSystem.createMetricsSystem("mesos_cluster", conf, new SecurityManager(conf))
   private val master = conf.get("spark.master")
   private val appName = conf.get("spark.app.name")
-  private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
-  private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
-  private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
-  private val useFetchCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
+  private val queuedCapacity = conf.get(config.MAX_DRIVERS)
+  private val retainedDrivers = conf.get(config.RETAINED_DRIVERS)
+  private val maxRetryWaitTime = conf.get(config.CLUSTER_RETRY_WAIT_MAX_SECONDS)
+  private val useFetchCache = conf.get(config.ENABLE_FETCHER_CACHE)
   private val schedulerState = engineFactory.createEngine("scheduler")
   private val stateLock = new Object()
   // Keyed by submission id
```
```diff
@@ -390,10 +390,10 @@ private[spark] class MesosClusterScheduler(
   private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
     // TODO(mgummelt): Don't do this here. This should be passed as a --conf
     val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
-      v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
+      v => s"$v -D${config.DRIVER_FRAMEWORK_ID.key}=${getDriverFrameworkID(desc)}"
     )
 
-    val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv
+    val env = desc.conf.getAllWithPrefix(config.DRIVER_ENV_PREFIX) ++ commandEnv
 
     val envBuilder = Environment.newBuilder()
```
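`SparkConf.getAllWithPrefix` (used above for `DRIVER_ENV_PREFIX` and later for the dispatcher driver defaults) returns every entry whose key starts with the given prefix, with the prefix stripped off. A standalone sketch of that behavior, using an illustrative key and value:

```scala
import org.apache.spark.SparkConf

object PrefixDemo extends App {
  // Illustrative entry: anything under spark.mesos.driverEnv. becomes one
  // environment variable on the driver.
  val conf = new SparkConf(loadDefaults = false)
    .set("spark.mesos.driverEnv.LIBPROCESS_PORT", "9000")

  // The prefix is stripped from the returned keys:
  // Array(("LIBPROCESS_PORT", "9000"))
  val env: Array[(String, String)] =
    conf.getAllWithPrefix("spark.mesos.driverEnv.")
  println(env.toList) // List((LIBPROCESS_PORT,9000))
}
```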
```diff
@@ -419,22 +419,20 @@ private[spark] class MesosClusterScheduler(
 
   private def isContainerLocalAppJar(desc: MesosDriverDescription): Boolean = {
     val isLocalJar = desc.jarUrl.startsWith("local://")
-    val isContainerLocal = desc.conf.getOption("spark.mesos.appJar.local.resolution.mode").exists {
+    val isContainerLocal = desc.conf.get(config.APPLICATION_JAR_LOCAL_RESOLUTION_MODE).exists {
       case "container" => true
       case "host" => false
       case other =>
-        logWarning(s"Unknown spark.mesos.appJar.local.resolution.mode $other, using host.")
+        logWarning(s"Unknown ${config.APPLICATION_JAR_LOCAL_RESOLUTION_MODE} $other, using host.")
         false
     }
     isLocalJar && isContainerLocal
   }
 
   private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
-    val confUris = List(conf.getOption("spark.mesos.uris"),
-      desc.conf.getOption("spark.mesos.uris"),
-      Some(desc.conf.get(SUBMIT_PYTHON_FILES).mkString(","))).flatMap(
-      _.map(_.split(",").map(_.trim))
-    ).flatten
+    val confUris = (conf.get(config.URIS_TO_DOWNLOAD) ++
+      desc.conf.get(config.URIS_TO_DOWNLOAD) ++
+      desc.conf.get(SUBMIT_PYTHON_FILES)).toList
```
**Member:** I'm not sure this is exactly the same?

**Contributor (Author):** Element values will be trimmed the same as before - ConfigBuilder handles it:
- spark/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala, lines 123 to 126 in dc46fb7
- spark/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala, lines 48 to 50 in dc46fb7
- spark/core/src/main/scala/org/apache/spark/util/Utils.scala, lines 2619 to 2621 in dc46fb7
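As the lines cited above indicate, `stringConf.toSequence` funnels the raw value through `Utils.stringToSeq`, which splits on commas, trims each element, and drops empties - so the old `_.split(",").map(_.trim)` behavior is preserved. A small standalone sketch of that helper (reimplemented here for illustration):

```scala
object TrimDemo extends App {
  // Mirrors Utils.stringToSeq as cited above (reimplemented for illustration).
  def stringToSeq(str: String): Seq[String] =
    str.split(",").map(_.trim).filter(_.nonEmpty)

  // Whitespace around elements is trimmed and empty entries are dropped,
  // matching what the replaced hand-rolled parsing did.
  assert(stringToSeq(" a, b ,c,") == Seq("a", "b", "c"))
}
```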
```diff
 
     if (isContainerLocalAppJar(desc)) {
       (confUris ++ getDriverExecutorURI(desc).toList).map(uri =>
@@ -464,7 +462,7 @@ private[spark] class MesosClusterScheduler(
   }
 
   private def getDriverCommandValue(desc: MesosDriverDescription): String = {
-    val dockerDefined = desc.conf.contains("spark.mesos.executor.docker.image")
+    val dockerDefined = desc.conf.contains(config.EXECUTOR_DOCKER_IMAGE)
     val executorUri = getDriverExecutorURI(desc)
     // Gets the path to run spark-submit, and the path to the Mesos sandbox.
     val (executable, sandboxPath) = if (dockerDefined) {
@@ -484,11 +482,11 @@ private[spark] class MesosClusterScheduler(
       // Sandbox path points to the parent folder as we chdir into the folderBasename.
       (cmdExecutable, "..")
     } else {
-      val executorSparkHome = desc.conf.getOption("spark.mesos.executor.home")
+      val executorSparkHome = desc.conf.get(config.EXECUTOR_HOME)
         .orElse(conf.getOption("spark.home"))
         .orElse(Option(System.getenv("SPARK_HOME")))
         .getOrElse {
-          throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
+          throw new SparkException(s"Executor Spark home `${config.EXECUTOR_HOME}` is not set!")
         }
       val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getPath
       // Sandbox points to the current directory by default with Mesos.
@@ -547,7 +545,7 @@ private[spark] class MesosClusterScheduler(
       "spark.submit.deployMode", // this would be set to `cluster`, but we need client
       "spark.master" // this contains the address of the dispatcher, not master
     )
-    val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
+    val defaultConf = conf.getAllWithPrefix(config.DISPATCHER_DRIVER_DEFAULT_PREFIX).toMap
     val driverConf = desc.conf.getAll
       .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
       .toMap
```
```diff
@@ -76,15 +76,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     cores - (cores % minCoresPerExecutor)
   }
 
-  private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
+  private val useFetcherCache = conf.get(ENABLE_FETCHER_CACHE)
 
-  private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
+  private val maxGpus = conf.get(MAX_GPUS)
 
-  private val taskLabels = conf.get("spark.mesos.task.labels", "")
+  private val taskLabels = conf.get(TASK_LABELS)
 
   private[this] val shutdownTimeoutMS =
-    conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
-      .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")
+    conf.get(COARSE_SHUTDOWN_TIMEOUT).ensuring(_ >= 0, s"$COARSE_SHUTDOWN_TIMEOUT must be >= 0")
 
   // Synchronization protected by stateLock
   private[this] var stopCalled: Boolean = false
```
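Here the non-negativity check stays at the read site via `ensuring`. `ConfigBuilder` also supports validation at definition time through `checkValue`, which raises an `IllegalArgumentException` when an entry is read with a bad value. A hypothetical alternative shape (a sketch only, not what this commit does; it would live in the `package object config` shown earlier):

```scala
// Hypothetical alternative: validate in the ConfigEntry itself instead of
// calling .ensuring at every read site. Fragment of the mesos config object;
// assumes java.util.concurrent.TimeUnit is imported as in the diff above.
private[spark] val COARSE_SHUTDOWN_TIMEOUT =
  ConfigBuilder("spark.mesos.coarse.shutdownTimeout")
    .timeConf(TimeUnit.MILLISECONDS)
    .checkValue(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")
    .createWithDefaultString("10s")
```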
```diff
@@ -144,11 +143,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   // may lead to deadlocks since the superclass might also try to lock
   private val stateLock = new ReentrantLock
 
-  private val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
+  private val extraCoresPerExecutor = conf.get(EXTRA_CORES_PER_EXECUTOR)
 
   // Offer constraints
   private val slaveOfferConstraints =
-    parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
+    parseConstraintString(sc.conf.get(SLAVE_OFFER_CONSTRAINTS))
 
   // Reject offers with mismatched constraints in seconds
   private val rejectOfferDurationForUnmetConstraints =
@@ -208,10 +207,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       sc.sparkUser,
       sc.appName,
       sc.conf,
-      sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
+      sc.conf.get(DRIVER_WEBUI_URL).orElse(sc.ui.map(_.webUrl)),
       None,
       Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)),
-      sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
+      sc.conf.get(DRIVER_FRAMEWORK_ID).map(_ + suffix)
     )
 
     launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
@@ -264,10 +263,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     val uri = conf.get(EXECUTOR_URI).orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
 
     if (uri.isEmpty) {
-      val executorSparkHome = conf.getOption("spark.mesos.executor.home")
+      val executorSparkHome = conf.get(EXECUTOR_HOME)
         .orElse(sc.getSparkHome())
         .getOrElse {
-          throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
+          throw new SparkException(s"Executor Spark home `$EXECUTOR_HOME` is not set!")
         }
       val runScript = new File(executorSparkHome, "./bin/spark-class").getPath
       command.setValue(
@@ -293,7 +292,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache))
     }
 
-    conf.getOption("spark.mesos.uris").foreach(setupUris(_, command, useFetcherCache))
+    setupUris(conf.get(URIS_TO_DOWNLOAD), command, useFetcherCache)
 
     command.build()
   }
```
**Member:** Do you think we had better have `.doc` like the previous conf SECRET_FILENAMES at lines 50 ~ 56?
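For reference, `ConfigBuilder.doc(...)` attaches such a description before the type is fixed. A minimal sketch of what that could look like for one of the new entries (the description wording is illustrative, not text from the PR):

```scala
// Sketch only: the doc string below is made-up illustrative wording.
// Fragment of the mesos `package object config` shown earlier.
private[spark] val ENABLE_FETCHER_CACHE =
  ConfigBuilder("spark.mesos.fetcherCache.enable")
    .doc("Whether to use the Mesos fetcher cache when downloading executor " +
      "binaries and configured URIs.")
    .booleanConf
    .createWithDefault(false)
```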