Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[SPARK-8881] [SPARK-9260] Fix algorithm for scheduling executors on w…
…orkers

Current scheduling algorithm allocates one core at a time and in doing so ends up ignoring spark.executor.cores. As a result, when spark.cores.max/spark.executor.cores (i.e, num_executors) < num_workers, executors are not launched and the app hangs. This PR fixes and refactors the scheduling algorithm.

andrewor14

Author: Nishkam Ravi <[email protected]>
Author: nishkamravi2 <[email protected]>

Closes apache#7274 from nishkamravi2/master_scheduler and squashes the following commits:

b998097 [nishkamravi2] Update Master.scala
da0f491 [Nishkam Ravi] Update Master.scala
79084e8 [Nishkam Ravi] Update Master.scala
1daf25f [Nishkam Ravi] Update Master.scala
f279cdf [Nishkam Ravi] Update Master.scala
adec84b [Nishkam Ravi] Update Master.scala
a06da76 [nishkamravi2] Update Master.scala
40c8f9f [nishkamravi2] Update Master.scala (to trigger retest)
c11c689 [nishkamravi2] Update EventLoggingListenerSuite.scala
5d6a19c [nishkamravi2] Update Master.scala (for the purpose of issuing a retest)
2d6371c [Nishkam Ravi] Update Master.scala
66362d5 [nishkamravi2] Update Master.scala
ee7cf0e [Nishkam Ravi] Improved scheduling algorithm for executors

(cherry picked from commit 41a7cdf)
Signed-off-by: Andrew Or <[email protected]>
  • Loading branch information
nishkamravi2 authored and Andrew Or committed Jul 26, 2015
commit a37796be7366bfa4b51f5f4675537c10de4469fe
112 changes: 75 additions & 37 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ private[master] class Master(

/**
* Schedule executors to be launched on the workers.
* Returns an array containing number of cores assigned to each worker.
*
* There are two modes of launching executors. The first attempts to spread out an application's
* executors on as many workers as possible, while the second does the opposite (i.e. launch them
Expand All @@ -543,59 +544,96 @@ private[master] class Master(
* multiple executors from the same application may be launched on the same worker if the worker
* has enough cores and memory. Otherwise, each executor grabs all the cores available on the
* worker by default, in which case only one executor may be launched on each worker.
*
* It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
* at a time). Consider the following example: cluster has 4 workers with 16 cores each.
* User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
* allocated at a time, 12 cores from each worker would be assigned to each executor.
* Since 12 < 16, no executors would launch [SPARK-8881].
*/
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
// Try to spread out each app among all the workers, until it has all its cores
for (app <- waitingApps if app.coresLeft > 0) {
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var pos = 0
while (toAssign > 0) {
if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
toAssign -= 1
assigned(pos) += 1
private[master] def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
// If the number of cores per executor is not specified, then we can just schedule
// 1 core at a time since we expect a single executor to be launched on each worker
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
val memoryPerExecutor = app.desc.memoryPerExecutorMB
val numUsable = usableWorkers.length
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var freeWorkers = (0 until numUsable).toIndexedSeq

def canLaunchExecutor(pos: Int): Boolean = {
usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor
}

while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) {
freeWorkers = freeWorkers.filter(canLaunchExecutor)
freeWorkers.foreach { pos =>
var keepScheduling = true
while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) {
coresToAssign -= coresPerExecutor
assignedCores(pos) += coresPerExecutor
assignedMemory(pos) += memoryPerExecutor

// Spreading out an application means spreading out its executors across as
// many workers as possible. If we are not spreading out, then we should keep
// scheduling executors on this worker until we use all of its resources.
// Otherwise, just move on to the next worker.
if (spreadOutApps) {
keepScheduling = false
}
pos = (pos + 1) % numUsable
}
// Now that we've decided how many cores to give on each node, let's actually give them
for (pos <- 0 until numUsable if assigned(pos) > 0) {
allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
}
}
} else {
// Pack each app into as few workers as possible until we've assigned all its cores
for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
for (app <- waitingApps if app.coresLeft > 0) {
allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
}
}
assignedCores
}

/**
* Schedule and launch executors on workers
*/
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
for (app <- waitingApps if app.coresLeft > 0) {
val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
// Filter out workers that don't have enough resources to launch an executor
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

// Now that we've decided how many cores to allocate on each worker, let's allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}

/**
* Allocate a worker's resources to one or more executors.
* @param app the info of the application which the executors belong to
* @param coresToAllocate cores on this worker to be allocated to this application
* @param assignedCores number of cores on this worker for this application
* @param coresPerExecutor number of cores per executor
* @param worker the worker info
*/
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
coresToAllocate: Int,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
val memoryPerExecutor = app.desc.memoryPerExecutorMB
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
var coresLeft = coresToAllocate
while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
val exec = app.addExecutor(worker, coresPerExecutor)
coresLeft -= coresPerExecutor
// If the number of cores per executor is specified, we divide the cores assigned
// to this worker evenly among the executors with no remainder.
// Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
for (i <- 1 to numExecutors) {
val exec = app.addExecutor(worker, coresToAssign)
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
Expand Down