diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index aa4132ebcd70..664da54cb53f 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -298,7 +298,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       }
     }
 
-    if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
+    if (hostname == null) {
+      hostname = Utils.localHostName()
+      log.info(s"Executor hostname is not provided, will use '$hostname' to advertise itself")
+    }
+
+    if (driverUrl == null || executorId == null || cores <= 0 ||
       appId == null) {
       printUsageAndExit()
     }
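Note: the hunk above makes `--hostname` optional for `CoarseGrainedExecutorBackend`; when the flag is absent, the executor falls back to `Utils.localHostName()` and advertises that name instead of failing argument validation. A minimal, self-contained sketch of the fallback follows. `localHostName` here is a simplified stand-in for Spark's `Utils.localHostName()` (the real implementation also considers `SPARK_LOCAL_IP`), and the object and values are hypothetical, for illustration only.

import java.net.InetAddress

object HostnameFallbackSketch {
  // Simplified stand-in for org.apache.spark.util.Utils.localHostName():
  // prefer an explicit SPARK_LOCAL_HOSTNAME, otherwise resolve the local host.
  def localHostName(): String =
    sys.env.getOrElse("SPARK_LOCAL_HOSTNAME", InetAddress.getLocalHost.getHostName)

  def main(args: Array[String]): Unit = {
    // Mirrors the patched validation: hostname is no longer a required argument.
    var hostname: String = null // would be populated by a "--hostname" flag
    if (hostname == null) {
      hostname = localHostName()
      println(s"Executor hostname is not provided, will use '$hostname' to advertise itself")
    }
  }
}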
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index f562c1e6aa7c..60880ab4c316 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -279,27 +279,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         .getOrElse {
           throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
         }
-      val runScript = new File(executorSparkHome, "./bin/spark-class").getPath
-      command.setValue(
-        "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
-          .format(prefixEnv, runScript) +
-        s" --driver-url $driverURL" +
-        s" --executor-id $taskId" +
-        s" --hostname ${executorHostname(offer)}" +
-        s" --cores $numCores" +
-        s" --app-id $appId")
+      val executable = new File(executorSparkHome, "./bin/spark-class").getPath
+      val runScript = s"$prefixEnv $executable " +
+        s"org.apache.spark.executor.CoarseGrainedExecutorBackend"
+
+      command.setValue(buildExecutorCommand(runScript, taskId, numCores, offer))
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.get.split('/').last.split('.').head
-      command.setValue(
-        s"cd $basename*; $prefixEnv " +
-        "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
-        s" --driver-url $driverURL" +
-        s" --executor-id $taskId" +
-        s" --hostname ${executorHostname(offer)}" +
-        s" --cores $numCores" +
-        s" --app-id $appId")
+      val runScript = s"cd $basename*; $prefixEnv " +
+        "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend"
+
+      command.setValue(buildExecutorCommand(runScript, taskId, numCores, offer))
 
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache))
     }
@@ -308,6 +300,28 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     command.build()
   }
 
+  private def buildExecutorCommand(
+      runScript: String, taskId: String, numCores: Int, offer: Offer): String = {
+
+    val sb = new StringBuilder()
+      .append(runScript)
+      .append(" --driver-url ")
+      .append(driverURL)
+      .append(" --executor-id ")
+      .append(taskId)
+      .append(" --cores ")
+      .append(numCores)
+      .append(" --app-id ")
+      .append(appId)
+
+    if (sc.conf.get(NETWORK_NAME).isEmpty) {
+      sb.append(" --hostname ")
+      sb.append(offer.getHostname)
+    }
+
+    sb.toString()
+  }
+
   protected def driverURL: String = {
     if (conf.contains("spark.testing")) {
       "driverURL"
@@ -809,15 +823,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     slaves.values.map(_.taskIDs.size).sum
   }
 
-  private def executorHostname(offer: Offer): String = {
-    if (sc.conf.get(NETWORK_NAME).isDefined) {
-      // The agent's IP is not visible in a CNI container, so we bind to 0.0.0.0
-      "0.0.0.0"
-    } else {
-      offer.getHostname
-    }
-  }
-
   override def fetchHadoopDelegationTokens(): Option[Array[Byte]] = {
     if (UserGroupInformation.isSecurityEnabled) {
       Some(hadoopDelegationTokenManager.getTokens())
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index bfb73611f053..b288f7e7bf3b 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -153,7 +153,9 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
       .getOrElse(List.empty)
 
     if (containerType == ContainerInfo.Type.DOCKER) {
-      containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps, params))
+      containerInfo.setDocker(
+        dockerInfo(image, forcePullImage, portMaps, params, conf.get(NETWORK_NAME))
+      )
     } else {
       containerInfo.setMesos(mesosInfo(image, forcePullImage))
     }
@@ -263,13 +265,24 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
       image: String,
       forcePullImage: Boolean,
       portMaps: List[ContainerInfo.DockerInfo.PortMapping],
-      params: List[Parameter]): DockerInfo = {
+      params: List[Parameter],
+      networkName: Option[String]): DockerInfo = {
     val dockerBuilder = ContainerInfo.DockerInfo.newBuilder()
       .setImage(image)
       .setForcePullImage(forcePullImage)
     portMaps.foreach(dockerBuilder.addPortMappings(_))
     params.foreach(dockerBuilder.addParameters(_))
+    networkName.foreach { net =>
+      val network = Parameter.newBuilder()
+        .setKey("net")
+        .setValue(net)
+        .build()
+
+      dockerBuilder.setNetwork(DockerInfo.Network.USER)
+      dockerBuilder.addParameters(network)
+    }
+
     dockerBuilder.build
   }
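Note: the two main-source hunks above concentrate the executor launch string in `buildExecutorCommand`, which drops `--hostname` whenever `spark.mesos.network.name` is set (inside a CNI container the agent's hostname is not usable, so the executor is left to resolve its own), and extend `dockerInfo` to attach the container to the named network via `Network.USER` plus a `net` parameter, the equivalent of `docker run --net=<name>`. Below is a self-contained sketch of the command-building behavior, decoupled from the backend class; all names and values are illustrative, with `networkName` standing in for `sc.conf.get(NETWORK_NAME)`.

object ExecutorCommandSketch {
  // Mirrors buildExecutorCommand above, minus the backend state.
  def buildExecutorCommand(
      runScript: String,
      driverURL: String,
      taskId: String,
      numCores: Int,
      appId: String,
      agentHostname: String,
      networkName: Option[String]): String = {
    val sb = new StringBuilder()
      .append(runScript)
      .append(" --driver-url ").append(driverURL)
      .append(" --executor-id ").append(taskId)
      .append(" --cores ").append(numCores)
      .append(" --app-id ").append(appId)
    if (networkName.isEmpty) {
      // No virtual network: advertise the agent's hostname, as before the patch.
      sb.append(" --hostname ").append(agentHostname)
    }
    sb.toString()
  }

  def main(args: Array[String]): Unit = {
    val runScript = "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend"
    // Hypothetical values throughout; the second call omits --hostname.
    println(buildExecutorCommand(runScript, "spark://driver:7077", "5", 4, "app-1",
      "agent-1.example.com", None))
    println(buildExecutorCommand(runScript, "spark://driver:7077", "5", 4, "app-1",
      "agent-1.example.com", Some("my-cni-network")))
  }
}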
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 2131bb999837..841c7881f34b 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -657,6 +657,30 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
   }
 
+  test("scheduler backend skips '--hostname' for executor when virtual network is enabled") {
+    setBackend()
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+    val offer = createOffer("o1", "s1", mem, cpu)
+
+    assert(backend.createCommand(offer, cpu, "test").getValue.contains("--hostname"))
+    sc.stop()
+
+    setBackend(Map("spark.executor.uri" -> "hdfs://test/executor.jar"))
+    assert(backend.createCommand(offer, cpu, "test").getValue.contains("--hostname"))
+    sc.stop()
+
+    setBackend(Map("spark.mesos.network.name" -> "test"))
+    assert(!backend.createCommand(offer, cpu, "test").getValue.contains("--hostname"))
+    sc.stop()
+
+    setBackend(Map(
+      "spark.mesos.network.name" -> "test",
+      "spark.executor.uri" -> "hdfs://test/executor.jar"
+    ))
+    assert(!backend.createCommand(offer, cpu, "test").getValue.contains("--hostname"))
+    sc.stop()
+  }
+
   test("supports spark.scheduler.minRegisteredResourcesRatio") {
     val expectedCores = 1
     setBackend(Map(
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
index 442c43960ec1..d5af3f5045af 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
+import org.apache.mesos.Protos.ContainerInfo.DockerInfo
+
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.mesos.config
 
 class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
@@ -50,4 +51,18 @@ class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
     assert(params.get(2).getKey == "c")
     assert(params.get(2).getValue == "3")
   }
+
+  test("ContainerInfo respects Docker network configuration") {
+    val networkName = "test"
+    val conf = new SparkConf()
+    conf.set("spark.mesos.network.name", networkName)
+
+    val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(conf)
+
+    assert(containerInfo.getDocker.getNetwork == DockerInfo.Network.USER)
+    val params = containerInfo.getDocker.getParametersList
+    assert(params.size() == 1)
+    assert(params.get(0).getKey == "net")
+    assert(params.get(0).getValue == networkName)
+  }
 }
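Note: taken together, enabling the feature is a single configuration switch. A hypothetical application-side example follows ("my-cni-network" is a made-up network name): with `spark.mesos.network.name` set, the executor's Docker container joins that network and the launch command omits `--hostname`, so the executor advertises the hostname resolved inside the container.

import org.apache.spark.SparkConf

object CniNetworkConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("cni-example")
      .set("spark.mesos.network.name", "my-cni-network")
    // The Mesos backends read this key (NETWORK_NAME) to decide both the
    // Docker network to join and whether to pass --hostname to the executor.
    println(conf.get("spark.mesos.network.name"))
  }
}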