CoarseGrainedExecutorBackend.scala
@@ -298,7 +298,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       }
     }
 
-    if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
+    if (hostname == null) {
+      hostname = Utils.localHostName()
+      log.info(s"Executor hostname is not provided, will use '$hostname' to advertise itself")
+    }
+
+    if (driverUrl == null || executorId == null || cores <= 0 ||
       appId == null) {
       printUsageAndExit()
     }
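With this hunk, `--hostname` becomes optional: when the flag is absent, the executor advertises the local host's name instead of failing usage validation. A minimal standalone sketch of that fallback, using `java.net.InetAddress` as a stand-in for Spark's `Utils.localHostName()` (object and value names here are illustrative):

import java.net.InetAddress

object HostnameFallbackSketch {
  // An explicitly passed hostname wins; otherwise fall back to the machine's
  // own hostname, standing in for Utils.localHostName().
  def resolveHostname(passed: Option[String]): String =
    passed.getOrElse {
      val local = InetAddress.getLocalHost.getHostName
      println(s"Executor hostname is not provided, will use '$local' to advertise itself")
      local
    }

  def main(args: Array[String]): Unit = {
    println(resolveHostname(Some("agent-1.example.com"))) // explicit --hostname
    println(resolveHostname(None))                        // new fallback path
  }
}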
MesosCoarseGrainedSchedulerBackend.scala
@@ -279,27 +279,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         .getOrElse {
           throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
         }
-      val runScript = new File(executorSparkHome, "./bin/spark-class").getPath
-      command.setValue(
-        "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
-          .format(prefixEnv, runScript) +
-        s" --driver-url $driverURL" +
-        s" --executor-id $taskId" +
-        s" --hostname ${executorHostname(offer)}" +
-        s" --cores $numCores" +
-        s" --app-id $appId")
+      val executable = new File(executorSparkHome, "./bin/spark-class").getPath
+      val runScript = s"$prefixEnv $executable " +
+        s"org.apache.spark.executor.CoarseGrainedExecutorBackend"
+
+      command.setValue(buildExecutorCommand(runScript, taskId, numCores, offer))
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.get.split('/').last.split('.').head
-      command.setValue(
-        s"cd $basename*; $prefixEnv " +
-        "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
-        s" --driver-url $driverURL" +
-        s" --executor-id $taskId" +
-        s" --hostname ${executorHostname(offer)}" +
-        s" --cores $numCores" +
-        s" --app-id $appId")
+      val runScript = s"cd $basename*; $prefixEnv " +
+        "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend"
+
+      command.setValue(buildExecutorCommand(runScript, taskId, numCores, offer))
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache))
     }
 
@@ -308,6 +300,28 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     command.build()
   }
 
+  private def buildExecutorCommand(
+      runScript: String, taskId: String, numCores: Int, offer: Offer): String = {
+
+    val sb = new StringBuilder()
+      .append(runScript)
+      .append(" --driver-url ")
+      .append(driverURL)
+      .append(" --executor-id ")
+      .append(taskId)
+      .append(" --cores ")
+      .append(numCores)
+      .append(" --app-id ")
+      .append(appId)
+
+    if (sc.conf.get(NETWORK_NAME).isEmpty) {
+      sb.append(" --hostname ")
+      sb.append(offer.getHostname)
+    }
+
+    sb.toString()
+  }
+
   protected def driverURL: String = {
     if (conf.contains("spark.testing")) {
       "driverURL"
@@ -809,15 +823,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     slaves.values.map(_.taskIDs.size).sum
   }
 
-  private def executorHostname(offer: Offer): String = {
-    if (sc.conf.get(NETWORK_NAME).isDefined) {
-      // The agent's IP is not visible in a CNI container, so we bind to 0.0.0.0
-      "0.0.0.0"
-    } else {
-      offer.getHostname
-    }
-  }
-
   override def fetchHadoopDelegationTokens(): Option[Array[Byte]] = {
     if (UserGroupInformation.isSecurityEnabled) {
       Some(hadoopDelegationTokenManager.getTokens())
MesosSchedulerBackendUtil.scala
@@ -153,7 +153,9 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
       .getOrElse(List.empty)
 
     if (containerType == ContainerInfo.Type.DOCKER) {
-      containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps, params))
+      containerInfo.setDocker(
+        dockerInfo(image, forcePullImage, portMaps, params, conf.get(NETWORK_NAME))
+      )
     } else {
       containerInfo.setMesos(mesosInfo(image, forcePullImage))
     }
@@ -263,13 +265,24 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
       image: String,
       forcePullImage: Boolean,
       portMaps: List[ContainerInfo.DockerInfo.PortMapping],
-      params: List[Parameter]): DockerInfo = {
+      params: List[Parameter],
+      networkName: Option[String]): DockerInfo = {
     val dockerBuilder = ContainerInfo.DockerInfo.newBuilder()
       .setImage(image)
       .setForcePullImage(forcePullImage)
     portMaps.foreach(dockerBuilder.addPortMappings(_))
     params.foreach(dockerBuilder.addParameters(_))
 
+    networkName.foreach { net =>
+      val network = Parameter.newBuilder()
+        .setKey("net")
+        .setValue(net)
+        .build()
+
+      dockerBuilder.setNetwork(DockerInfo.Network.USER)
+      dockerBuilder.addParameters(network)
+    }
+
     dockerBuilder.build
   }
 
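Mesos' Docker containerizer forwards custom parameters to `docker run` as `--key=value`, so `Network.USER` plus a `net` parameter should attach the container to the named user network via `--net=<name>`. A minimal sketch of the protobuf this change produces (the image and network name are placeholders):

import org.apache.mesos.Protos.Parameter
import org.apache.mesos.Protos.ContainerInfo.DockerInfo

object DockerNetworkSketch {
  def main(args: Array[String]): Unit = {
    // What dockerInfo emits once spark.mesos.network.name is set.
    val info = DockerInfo.newBuilder()
      .setImage("example/spark")                 // placeholder image
      .setNetwork(DockerInfo.Network.USER)       // user-defined Docker network
      .addParameters(Parameter.newBuilder().setKey("net").setValue("my-net").build())
      .build()

    assert(info.getNetwork == DockerInfo.Network.USER)
    assert(info.getParameters(0).getKey == "net")
  }
}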
MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -657,6 +657,30 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
   }
 
+  test("scheduler backend skips '--hostname' for executor when virtual network is enabled") {
+    setBackend()
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+    val offer = createOffer("o1", "s1", mem, cpu)
+
+    assert(backend.createCommand(offer, cpu, "test").getValue.contains("--hostname"))
+    sc.stop()
+
+    setBackend(Map("spark.executor.uri" -> "hdfs://test/executor.jar"))
+    assert(backend.createCommand(offer, cpu, "test").getValue.contains("--hostname"))
+    sc.stop()
+
+    setBackend(Map("spark.mesos.network.name" -> "test"))
+    assert(!backend.createCommand(offer, cpu, "test").getValue.contains("--hostname"))
+    sc.stop()
+
+    setBackend(Map(
+      "spark.mesos.network.name" -> "test",
+      "spark.executor.uri" -> "hdfs://test/executor.jar"
+    ))
+    assert(!backend.createCommand(offer, cpu, "test").getValue.contains("--hostname"))
+    sc.stop()
+  }
+
   test("supports spark.scheduler.minRegisteredResourcesRatio") {
     val expectedCores = 1
     setBackend(Map(
MesosSchedulerBackendUtilSuite.scala
@@ -17,8 +17,9 @@

 package org.apache.spark.scheduler.cluster.mesos
 
+import org.apache.mesos.Protos.ContainerInfo.DockerInfo
+
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.mesos.config
 
 class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
 
@@ -50,4 +51,18 @@ class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
     assert(params.get(2).getKey == "c")
     assert(params.get(2).getValue == "3")
   }
 
+  test("ContainerInfo respects Docker network configuration") {
+    val networkName = "test"
+    val conf = new SparkConf()
+    conf.set("spark.mesos.network.name", networkName)
+
+    val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(conf)
+
+    assert(containerInfo.getDocker.getNetwork == DockerInfo.Network.USER)
+    val params = containerInfo.getDocker.getParametersList
+    assert(params.size() == 1)
+    assert(params.get(0).getKey == "net")
+    assert(params.get(0).getValue == networkName)
+  }
 }