Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion docs/running-on-mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,16 @@ See the [configuration page](configuration.html) for information on Spark config
Set the maximum number GPU resources to acquire for this job. Note that executors will still launch when no GPU resources are found
since this configuration is just an upper limit and not a guaranteed amount.
</td>
</tr>
</tr>
<tr>
<td><code>spark.mesos.disk</code></td>
<td><code>(none)</code></td>
<td>
Set the amount of disk to acquire for this job. You might need to set this value depending on the type of disk isolation set up in Mesos.
For instance, setting an amount of disk is required when XFS isolator is enabled with hard limit enforced otherwise the isolator will kill
the Mesos executor when downloading the Spark executor archive.
</td>
</tr>
<tr>
<td><code>spark.mesos.network.name</code></td>
<td><code>(none)</code></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,16 @@ package object config {
.intConf
.createWithDefault(0)

private[spark] val EXECUTOR_DISK =
ConfigBuilder("spark.mesos.disk")
.doc("Set the amount of disk to acquire for this job. You might need to set this value " +
"depending on the type of disk isolation set up in Mesos. For instance, setting an " +
"amount of disk is required when XFS isolator is enabled with hard limit enforced " +
"otherwise the isolator will kill the Mesos executor when downloading the Spark executor " +
"archive.")
.intConf
.createOptional

private[spark] val TASK_LABELS =
ConfigBuilder("spark.mesos.task.labels")
.doc("Set the Mesos labels to add to each task. Labels are free-form key-value pairs. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private val useFetcherCache = conf.get(ENABLE_FETCHER_CACHE)

private val maxGpus = conf.get(MAX_GPUS)
private val diskPerExecutor = conf.get(EXECUTOR_DISK)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this throws an exception if not set, and it's optional. You can either check .contains here and make this an Option, or have a default value of '0' or something that would indicate no reservation.


private val taskLabels = conf.get(TASK_LABELS)

Expand Down Expand Up @@ -399,6 +400,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
val offerMem = getResource(offer.getResourcesList, "mem")
val offerCpus = getResource(offer.getResourcesList, "cpus")
val offerPorts = getRangeResource(offer.getResourcesList, "ports")
val offerDisk = getResource(offer.getResourcesList, "disk")
val offerReservationInfo = offer
.getResourcesList
.asScala
Expand All @@ -411,18 +413,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
offerReservationInfo.map(resInfo =>
s"reservation info: ${resInfo.getReservation.toString}").getOrElse("") +
s"mem: $offerMem cpu: $offerCpus ports: $offerPorts " +
s"mem: $offerMem cpu: $offerCpus ports: $offerPorts disk: $offerDisk " +
s"resources: ${offer.getResourcesList.asScala.mkString(",")}." +
s" Launching ${offerTasks.size} Mesos tasks.")

for (task <- offerTasks) {
val taskId = task.getTaskId
val mem = getResource(task.getResourcesList, "mem")
val cpus = getResource(task.getResourcesList, "cpus")
val disk = getResource(task.getResourcesList, "disk")
val ports = getRangeResource(task.getResourcesList, "ports").mkString(",")

logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" +
s" ports: $ports" + s" on slave with slave id: ${task.getSlaveId.getValue} ")
s" disk: $disk ports: $ports on slave with slave id: ${task.getSlaveId.getValue} ")
}

driver.launchTasks(
Expand Down Expand Up @@ -497,11 +500,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

val taskCPUs = executorCores(offerCPUs)
val taskMemory = executorMemory(sc)
val taskDisk = diskPerExecutor

slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)

val (resourcesLeft, resourcesToUse) =
partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs)
partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs, taskDisk)

val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
Expand Down Expand Up @@ -534,7 +538,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
resources: JList[Resource],
taskCPUs: Int,
taskMemory: Int,
taskGPUs: Int)
taskGPUs: Int,
taskDisk: Option[Int])
: (List[Resource], List[Resource]) = {

// partition cpus & mem
Expand All @@ -550,14 +555,24 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
val (nonPortResources, portResourcesToUse) =
partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterGPUResources)

(nonPortResources,
var (remainingResources, resourcesToUse) = (nonPortResources,
cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse ++ gpuResourcesToUse)

if (taskDisk.isDefined) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nice and safe. I think it's OK to be consistent with how GPUs are handled -- which may mean it's good to copy your approach for GPU config too. You can use a default of 0 for disk too (I suppose that's nice as if someone sets it to 0, that should have a similar meaning as not set).

val (afterDiskResources, diskResourcesToUse) =
partitionResources(remainingResources.asJava, "disk", taskDisk.get)

remainingResources = afterDiskResources
resourcesToUse ++= diskResourcesToUse
}
(remainingResources, resourcesToUse)
}

private def canLaunchTask(slaveId: String, offerHostname: String,
resources: JList[Resource]): Boolean = {
val offerMem = getResource(resources, "mem")
val offerCPUs = getResource(resources, "cpus").toInt
val offerDisk = getResource(resources, "disk").toInt
val cpus = executorCores(offerCPUs)
val mem = executorMemory(sc)
val ports = getRangeResource(resources, "ports")
Expand All @@ -568,6 +583,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
cpus + totalCoresAcquired <= maxCores &&
mem <= offerMem &&
numExecutors < executorLimit &&
diskPerExecutor.fold(true)(_ <= offerDisk) &&
slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
meetsPortRequirements &&
satisfiesLocality(offerHostname)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ trait MesosSchedulerUtils extends Logging {
res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum
}

def resourceExists(res: JList[Resource], name: String): Boolean = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is only used in test code, I'd move it there.

res.asScala.exists(_.getName == name)
}

/**
* Transforms a range resource to a list of ranges
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
Utils.createTextAttribute("c2", "b"))
val offers = List(
Utils.createOffer("o1", "s1", mem, cpu, None, 0),
Utils.createOffer("o2", "s2", mem, cpu, None, 0, s2Attributes),
Utils.createOffer("o3", "s3", mem, cpu, None, 0, s3Attributes))
Utils.createOffer("o2", "s2", mem, cpu, None, 0, None, s2Attributes),
Utils.createOffer("o3", "s3", mem, cpu, None, 0, None, s3Attributes))

def submitDriver(driverConstraints: String): Unit = {
val response = scheduler.submitDriver(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,46 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(cpus == offerCores)
}

test("mesos supports spark.mesos.disk") {
val claimedDisk = 40
setBackend(Map("spark.mesos.disk" -> claimedDisk.toString))

val executorMemory = backend.executorMemory(sc)
val offers = List(Resources(executorMemory, 1, 0, Some(100)))
offerResources(offers)

val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.length == 1)

val taskDisk = backend.getResource(taskInfos.head.getResourcesList, "disk")
assert(taskDisk == claimedDisk)
}

test("mesos supports unset spark.mesos.disk") {
setBackend()

val executorMemory = backend.executorMemory(sc)
val offers = List(Resources(executorMemory, 1, 0, Some(100)))
offerResources(offers)

val taskInfos = verifyTaskLaunched(driver, "o1")
assert(taskInfos.length == 1)

val taskDiskExist = backend.resourceExists(taskInfos.head.getResourcesList, "disk")
assert(!taskDiskExist)
}

test("mesos declines offer if not enough disk available") {
val claimedDisk = 400
setBackend(Map("spark.mesos.disk" -> claimedDisk.toString))

val executorMemory = backend.executorMemory(sc)
val offers = List(Resources(executorMemory, 1, 0, Some(100)))
offerResources(offers)

verifyDeclinedOffer(driver, createOfferId("o1"))
}

test("mesos does not acquire more than spark.cores.max") {
val maxCores = 10
setBackend(Map(CORES_MAX.key -> maxCores.toString))
Expand Down Expand Up @@ -686,7 +726,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
verifyTaskLaunched(driver, "o1")
}

private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
private case class Resources(mem: Int, cpus: Int, gpus: Int = 0, disk: Option[Int] = None)

private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = {
val mockEndpointRef = mock[RpcEndpointRef]
Expand All @@ -709,7 +749,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

private def offerResources(offers: List[Resources], startId: Int = 1): Unit = {
val mesosOffers = offers.zipWithIndex.map {case (offer, i) =>
createOffer(s"o${i + startId}", s"s${i + startId}", offer.mem, offer.cpus, None, offer.gpus)}
createOffer(s"o${i + startId}", s"s${i + startId}", offer.mem, offer.cpus, None, offer.gpus,
offer.disk)}

backend.resourceOffers(driver, mesosOffers.asJava)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ object Utils {
cpus: Int,
ports: Option[(Long, Long)] = None,
gpus: Int = 0,
disk: Option[Int] = None,
attributes: List[Attribute] = List.empty): Offer = {
val builder = Offer.newBuilder()
builder.addResourcesBuilder()
Expand All @@ -73,6 +74,12 @@ object Utils {
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(gpus))
}
if (disk.isDefined) {
builder.addResourcesBuilder()
.setName("disk")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(disk.get))
}
builder.setId(createOfferId(offerId))
.setFrameworkId(FrameworkID.newBuilder()
.setValue("f1"))
Expand Down