-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-17454][MESOS] Use Mesos disk resources for executors. #23758
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -79,6 +79,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( | |
| private val useFetcherCache = conf.get(ENABLE_FETCHER_CACHE) | ||
|
|
||
| private val maxGpus = conf.get(MAX_GPUS) | ||
| private val diskPerExecutor = conf.get(EXECUTOR_DISK) | ||
|
|
||
| private val taskLabels = conf.get(TASK_LABELS) | ||
|
|
||
|
|
@@ -399,6 +400,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( | |
| val offerMem = getResource(offer.getResourcesList, "mem") | ||
| val offerCpus = getResource(offer.getResourcesList, "cpus") | ||
| val offerPorts = getRangeResource(offer.getResourcesList, "ports") | ||
| val offerDisk = getResource(offer.getResourcesList, "disk") | ||
| val offerReservationInfo = offer | ||
| .getResourcesList | ||
| .asScala | ||
|
|
@@ -411,18 +413,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( | |
| logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + | ||
| offerReservationInfo.map(resInfo => | ||
| s"reservation info: ${resInfo.getReservation.toString}").getOrElse("") + | ||
| s"mem: $offerMem cpu: $offerCpus ports: $offerPorts " + | ||
| s"mem: $offerMem cpu: $offerCpus ports: $offerPorts disk: $offerDisk " + | ||
| s"resources: ${offer.getResourcesList.asScala.mkString(",")}." + | ||
| s" Launching ${offerTasks.size} Mesos tasks.") | ||
|
|
||
| for (task <- offerTasks) { | ||
| val taskId = task.getTaskId | ||
| val mem = getResource(task.getResourcesList, "mem") | ||
| val cpus = getResource(task.getResourcesList, "cpus") | ||
| val disk = getResource(task.getResourcesList, "disk") | ||
| val ports = getRangeResource(task.getResourcesList, "ports").mkString(",") | ||
|
|
||
| logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" + | ||
| s" ports: $ports" + s" on slave with slave id: ${task.getSlaveId.getValue} ") | ||
| s" disk: $disk ports: $ports on slave with slave id: ${task.getSlaveId.getValue} ") | ||
| } | ||
|
|
||
| driver.launchTasks( | ||
|
|
@@ -497,11 +500,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( | |
|
|
||
| val taskCPUs = executorCores(offerCPUs) | ||
| val taskMemory = executorMemory(sc) | ||
| val taskDisk = diskPerExecutor | ||
|
|
||
| slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId) | ||
|
|
||
| val (resourcesLeft, resourcesToUse) = | ||
| partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs) | ||
| partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs, taskDisk) | ||
|
|
||
| val taskBuilder = MesosTaskInfo.newBuilder() | ||
| .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) | ||
|
|
@@ -534,7 +538,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( | |
| resources: JList[Resource], | ||
| taskCPUs: Int, | ||
| taskMemory: Int, | ||
| taskGPUs: Int) | ||
| taskGPUs: Int, | ||
| taskDisk: Option[Int]) | ||
| : (List[Resource], List[Resource]) = { | ||
|
|
||
| // partition cpus & mem | ||
|
|
@@ -550,14 +555,24 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( | |
| val (nonPortResources, portResourcesToUse) = | ||
| partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterGPUResources) | ||
|
|
||
| (nonPortResources, | ||
| var (remainingResources, resourcesToUse) = (nonPortResources, | ||
| cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse ++ gpuResourcesToUse) | ||
|
|
||
| if (taskDisk.isDefined) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is nice and safe. I think it's OK to be consistent with how GPUs are handled -- which may mean it's good to copy your approach for the GPU config too. You can use a default of 0 for disk as well (I suppose that's nice, since if someone sets it to 0, that should have a similar meaning as not setting it). |
||
| val (afterDiskResources, diskResourcesToUse) = | ||
| partitionResources(remainingResources.asJava, "disk", taskDisk.get) | ||
|
|
||
| remainingResources = afterDiskResources | ||
| resourcesToUse ++= diskResourcesToUse | ||
| } | ||
| (remainingResources, resourcesToUse) | ||
| } | ||
|
|
||
| private def canLaunchTask(slaveId: String, offerHostname: String, | ||
| resources: JList[Resource]): Boolean = { | ||
| val offerMem = getResource(resources, "mem") | ||
| val offerCPUs = getResource(resources, "cpus").toInt | ||
| val offerDisk = getResource(resources, "disk").toInt | ||
| val cpus = executorCores(offerCPUs) | ||
| val mem = executorMemory(sc) | ||
| val ports = getRangeResource(resources, "ports") | ||
|
|
@@ -568,6 +583,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( | |
| cpus + totalCoresAcquired <= maxCores && | ||
| mem <= offerMem && | ||
| numExecutors < executorLimit && | ||
| diskPerExecutor.fold(true)(_ <= offerDisk) && | ||
| slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES && | ||
| meetsPortRequirements && | ||
| satisfiesLocality(offerHostname) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -180,6 +180,10 @@ trait MesosSchedulerUtils extends Logging { | |
| res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum | ||
| } | ||
|
|
||
| def resourceExists(res: JList[Resource], name: String): Boolean = { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If this is only used in test code, I'd move it there. |
||
| res.asScala.exists(_.getName == name) | ||
| } | ||
|
|
||
| /** | ||
| * Transforms a range resource to a list of ranges | ||
| * | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
I think this throws an exception if not set, and it's optional. You can either check `.contains` here and make this an `Option`, or have a default value of '0' or something that would indicate no reservation.