diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala index 2fd13a590324..122f5ebd5219 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala @@ -25,6 +25,7 @@ import javax.servlet.http.HttpServletResponse import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf} import org.apache.spark.deploy.Command +import org.apache.spark.deploy.mesos.config._ import org.apache.spark.deploy.mesos.MesosDriverDescription import org.apache.spark.deploy.rest._ import org.apache.spark.internal.config @@ -80,7 +81,9 @@ private[mesos] class MesosSubmitRequestServlet( * This does not currently consider fields used by python applications since python * is not supported in mesos cluster mode yet. */ - private def buildDriverDescription(request: CreateSubmissionRequest): MesosDriverDescription = { + // Visible for testing + private[rest] def buildDriverDescription( + request: CreateSubmissionRequest): MesosDriverDescription = { // Required fields, including the main class because python is not yet supported val appResource = Option(request.appResource).getOrElse { throw new SubmitRestMissingFieldException("Application jar 'appResource' is missing.") @@ -108,6 +111,9 @@ private[mesos] class MesosSubmitRequestServlet( val driverCores = sparkProperties.get(config.DRIVER_CORES.key) val name = request.sparkProperties.getOrElse("spark.app.name", mainClass) + val defaultConf = this.conf.getAllWithPrefix(DISPATCHER_DRIVER_DEFAULT_PREFIX).toMap + val driverConf = new SparkConf(false).setAll(defaultConf).setAll(sparkProperties) + // Construct driver description val conf = new SparkConf(false).setAll(sparkProperties) val extraClassPath = 
driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator)) @@ -115,7 +121,7 @@ private[mesos] class MesosSubmitRequestServlet( val defaultJavaOpts = driverDefaultJavaOptions.map(Utils.splitCommandString) .getOrElse(Seq.empty) val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty) - val sparkJavaOpts = Utils.sparkJavaOpts(conf) + val sparkJavaOpts = Utils.sparkJavaOpts(driverConf) val javaOpts = sparkJavaOpts ++ defaultJavaOpts ++ extraJavaOpts val command = new Command( mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts) @@ -129,7 +135,7 @@ private[mesos] class MesosSubmitRequestServlet( new MesosDriverDescription( name, appResource, actualDriverMemory + actualDriverMemoryOverhead, actualDriverCores, - actualSuperviseDriver, command, request.sparkProperties, submissionId, submitDate) + actualSuperviseDriver, command, driverConf.getAll.toMap, submissionId, submitDate) } protected override def handleSubmit( diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 289b109a4274..dff7294b4505 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -544,12 +544,13 @@ private[spark] class MesosClusterScheduler( SUBMIT_DEPLOY_MODE.key, // this would be set to `cluster`, but we need client "spark.master" // this contains the address of the dispatcher, not master ) - val defaultConf = conf.getAllWithPrefix(config.DISPATCHER_DRIVER_DEFAULT_PREFIX).toMap - val driverConf = desc.conf.getAll + + desc.conf.getAll .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) } .toMap - (defaultConf ++ driverConf).foreach { case (key, value) 
=> - options ++= Seq("--conf", s"${key}=${value}") } + .foreach { case (key, value) => + options ++= Seq("--conf", s"${key}=${value}") + } options.map(shellEscape) } diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosSubmitRequestServletSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosSubmitRequestServletSuite.scala new file mode 100644 index 000000000000..15fcfe952ed8 --- /dev/null +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosSubmitRequestServletSuite.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.rest.mesos + +import org.mockito.Mockito.mock + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.TestPrematureExit +import org.apache.spark.deploy.mesos.config._ +import org.apache.spark.deploy.rest.{CreateSubmissionRequest, SubmitRestProtocolException} +import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler + +class MesosSubmitRequestServletSuite extends SparkFunSuite + with TestPrematureExit { + + def buildCreateSubmissionRequest(): CreateSubmissionRequest = { + val request = new CreateSubmissionRequest + request.appResource = "hdfs://test.jar" + request.mainClass = "foo.Bar" + request.appArgs = Array.empty[String] + request.sparkProperties = Map.empty[String, String] + request.environmentVariables = Map.empty[String, String] + request + } + + test("test buildDriverDescription applies default settings from dispatcher conf to Driver") { + val conf = new SparkConf(loadDefaults = false) + + conf.set(DISPATCHER_DRIVER_DEFAULT_PREFIX + NETWORK_NAME.key, "test_network") + conf.set(DISPATCHER_DRIVER_DEFAULT_PREFIX + NETWORK_LABELS.key, "k0:v0,k1:v1") + + val submitRequestServlet = new MesosSubmitRequestServlet( + scheduler = mock(classOf[MesosClusterScheduler]), + conf + ) + + val request = buildCreateSubmissionRequest() + val driverConf = submitRequestServlet.buildDriverDescription(request).conf + + assert("test_network" == driverConf.get(NETWORK_NAME)) + assert("k0:v0,k1:v1" == driverConf.get(NETWORK_LABELS)) + } +}