diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index 68f6921153d8..de94ce6f55f3 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -74,7 +74,9 @@ private[mesos] class MesosSubmitRequestServlet(
    * This does not currently consider fields used by python applications since python
    * is not supported in mesos cluster mode yet.
    */
-  private def buildDriverDescription(request: CreateSubmissionRequest): MesosDriverDescription = {
+  // Visible for testing.
+  private[rest] def buildDriverDescription(
+      request: CreateSubmissionRequest): MesosDriverDescription = {
     // Required fields, including the main class because python is not yet supported
     val appResource = Option(request.appResource).getOrElse {
       throw new SubmitRestMissingFieldException("Application jar 'appResource' is missing.")
@@ -101,11 +103,15 @@ private[mesos] class MesosSubmitRequestServlet(
     val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)
 
     // Construct driver description
-    val conf = new SparkConf(false).setAll(sparkProperties)
+    val defaultConf = this.conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
+    val driverConf = new SparkConf(false)
+      .setAll(defaultConf)
+      .setAll(sparkProperties)
+
     val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
     val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
     val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
-    val sparkJavaOpts = Utils.sparkJavaOpts(conf)
+    val sparkJavaOpts = Utils.sparkJavaOpts(driverConf)
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = new Command(
       mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
@@ -117,7 +123,7 @@ private[mesos] class MesosSubmitRequestServlet(
 
     new MesosDriverDescription(
       name, appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver,
-      command, request.sparkProperties, submissionId, submitDate)
+      command, driverConf.getAll.toMap, submissionId, submitDate)
   }
 
   protected override def handleSubmit(
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index c7f4e7e52c60..bc78f1b1edb3 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -535,12 +535,13 @@ private[spark] class MesosClusterScheduler(
       "spark.submit.deployMode", // this would be set to `cluster`, but we need client
       "spark.master" // this contains the address of the dispatcher, not master
     )
-    val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
-    val driverConf = desc.conf.getAll
+
+    desc.conf.getAll
       .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
       .toMap
-    (defaultConf ++ driverConf).foreach { case (key, value) =>
-      options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
+      .foreach { case (key, value) =>
+        options ++= Seq("--conf", s"$key=${shellEscape(value)}")
+      }
     options
   }
 
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosSubmitRequestServletSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosSubmitRequestServletSuite.scala
new file mode 100644
index 000000000000..08bb48518820
--- /dev/null
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosSubmitRequestServletSuite.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest.mesos
+
+import org.mockito.Mockito.mock
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.TestPrematureExit
+import org.apache.spark.deploy.rest.CreateSubmissionRequest
+import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
+
+class MesosSubmitRequestServletSuite extends SparkFunSuite
+  with TestPrematureExit {
+
+  test("test buildDriverDescription applies default settings from dispatcher conf to Driver") {
+    val conf = new SparkConf(loadDefaults = false)
+
+    conf.set("spark.mesos.dispatcher.driverDefault.spark.mesos.network.name", "test_network")
+    conf.set("spark.mesos.dispatcher.driverDefault.spark.mesos.network.labels", "k0:v0,k1:v1")
+
+    val submitRequestServlet = new MesosSubmitRequestServlet(
+      scheduler = mock(classOf[MesosClusterScheduler]),
+      conf
+    )
+
+    val request = new CreateSubmissionRequest
+    request.appResource = "hdfs://test.jar"
+    request.mainClass = "foo.Bar"
+    request.appArgs = Array.empty[String]
+    request.sparkProperties = Map.empty[String, String]
+    request.environmentVariables = Map.empty[String, String]
+
+    val driverConf = submitRequestServlet.buildDriverDescription(request).conf
+
+    assert("test_network" == driverConf.get("spark.mesos.network.name"))
+    assert("k0:v0,k1:v1" == driverConf.get("spark.mesos.network.labels"))
+  }
+}