Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Modified Master.scala to use numWorkersVisited and numWorkersAlive in…
…stead of stopPos
  • Loading branch information
sarutak committed Sep 17, 2014
commit 7a4deea8552664cc53db1e20876b42631d36fae3
38 changes: 17 additions & 21 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -489,28 +489,24 @@ private[spark] class Master(
// First schedule drivers, they take strict precedence over applications
// Randomization helps balance drivers
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val aliveWorkerNum = shuffledAliveWorkers.size

if (aliveWorkerNum > 0) {
var curPos = 0
var stopPos = aliveWorkerNum
for (driver <- waitingDrivers.toList) {
// iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
var launched = false
while (curPos != stopPos && !launched) {
val worker = shuffledAliveWorkers(curPos)
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % aliveWorkerNum
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0

for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (stopPos + 1) % aliveWorkerNum
stopPos = curPos + aliveWorkerNum
curPos = (curPos + 1) % numWorkersAlive
}
}

Expand Down