Skip to content

Commit dee9e22

Browse files
committed
fix locality inversion bug in TaskManager by moving nopref branch
1 parent 8e7d5ba commit dee9e22

File tree

4 files changed

+67
-57
lines changed

4 files changed

+67
-57
lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,14 +250,14 @@ private[spark] class TaskSchedulerImpl(
250250
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
251251
// of locality levels so that it gets a chance to launch local tasks on all of them.
252252
var launchedTask = false
253-
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
253+
for (taskSet <- sortedTaskSets; preferredLocality <- taskSet.myLocalityLevels) {
254254
do {
255255
launchedTask = false
256256
for (i <- 0 until shuffledOffers.size) {
257257
val execId = shuffledOffers(i).executorId
258258
val host = shuffledOffers(i).host
259259
if (availableCpus(i) >= CPUS_PER_TASK) {
260-
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
260+
for (task <- taskSet.resourceOffer(execId, host, preferredLocality)) {
261261
tasks(i) += task
262262
val tid = task.taskId
263263
taskIdToTaskSetId(tid) = taskSet.taskSet.id

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,7 @@ private[spark] class TaskSetManager(
196196
}
197197
}
198198

199-
if (!hadAliveLocations) {
200-
// Even though the task might've had preferred locations, all of those hosts or executors
201-
// are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
199+
if (tasks(index).preferredLocations == Nil) {
202200
addTo(pendingTasksWithNoPrefs)
203201
}
204202

@@ -239,7 +237,6 @@ private[spark] class TaskSetManager(
239237
*/
240238
private def findTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
241239
var indexOffset = list.size
242-
243240
while (indexOffset > 0) {
244241
indexOffset -= 1
245242
val index = list(indexOffset)
@@ -352,6 +349,14 @@ private[spark] class TaskSetManager(
352349
for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
353350
return Some((index, TaskLocality.NODE_LOCAL, false))
354351
}
352+
// Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
353+
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
354+
return Some((index, TaskLocality.PROCESS_LOCAL, false))
355+
}
356+
// find a speculative task if all noPref tasks have been scheduled
357+
val specTask = findSpeculativeTask(execId, host, locality).map {
358+
case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
359+
if (specTask != None) return specTask
355360
}
356361

357362
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
@@ -363,21 +368,12 @@ private[spark] class TaskSetManager(
363368
}
364369
}
365370

366-
// Look for no-pref tasks after rack-local tasks since they can run anywhere.
367-
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
368-
return Some((index, TaskLocality.PROCESS_LOCAL, false))
369-
}
370-
371371
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
372372
for (index <- findTaskFromList(execId, allPendingTasks)) {
373373
return Some((index, TaskLocality.ANY, false))
374374
}
375375
}
376-
377-
// Finally, if all else has failed, find a speculative task
378-
findSpeculativeTask(execId, host, locality).map { case (taskIndex, allowedLocality) =>
379-
(taskIndex, allowedLocality, true)
380-
}
376+
None
381377
}
382378

383379
/**
@@ -386,17 +382,16 @@ private[spark] class TaskSetManager(
386382
def resourceOffer(
387383
execId: String,
388384
host: String,
389-
maxLocality: TaskLocality.TaskLocality)
385+
preferredLocality: TaskLocality.TaskLocality)
390386
: Option[TaskDescription] =
391387
{
392388
if (!isZombie) {
393389
val curTime = clock.getTime()
394390

395391
var allowedLocality = getAllowedLocalityLevel(curTime)
396-
if (allowedLocality > maxLocality) {
397-
allowedLocality = maxLocality // We're not allowed to search for farther-away tasks
392+
if (allowedLocality > preferredLocality) {
393+
allowedLocality = preferredLocality // We're not allowed to search for farther-away tasks
398394
}
399-
400395
findTask(execId, host, allowedLocality) match {
401396
case Some((index, taskLocality, speculative)) => {
402397
// Found a task; do some bookkeeping and return a task description
@@ -751,7 +746,6 @@ private[spark] class TaskSetManager(
751746
levels.toArray
752747
}
753748

754-
// Re-compute pendingTasksWithNoPrefs since new preferred locations may become available
755749
def executorAdded() {
756750
def newLocAvail(index: Int): Boolean = {
757751
for (loc <- tasks(index).preferredLocations) {
@@ -763,8 +757,6 @@ private[spark] class TaskSetManager(
763757
}
764758
false
765759
}
766-
logInfo("Re-computing pending task lists.")
767-
pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.filter(!newLocAvail(_))
768760
myLocalityLevels = computeValidLocalityLevels()
769761
localityWaits = myLocalityLevels.map(getLocalityWait)
770762
}

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class FakeTaskSetManager(
8080
override def resourceOffer(
8181
execId: String,
8282
host: String,
83-
maxLocality: TaskLocality.TaskLocality)
83+
preferredLocality: TaskLocality.TaskLocality)
8484
: Option[TaskDescription] =
8585
{
8686
if (tasksSuccessful + numRunningTasks < numTasks) {

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 51 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -134,18 +134,17 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
134134
sc = new SparkContext("local", "test")
135135
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
136136
val taskSet = FakeTask.createTaskSet(1)
137-
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
137+
val clock = new FakeClock
138+
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
138139

139-
// Offer a host with process-local as the constraint; this should work because the TaskSet
140-
// above won't have any locality preferences
141-
val taskOption = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL)
142-
assert(taskOption.isDefined)
143-
val task = taskOption.get
144-
assert(task.executorId === "exec1")
145-
assert(sched.startedTasks.contains(0))
140+
// Offer a host with process-local as the constraint;
141+
// we will get no task since the noPref/speculative task will only be considered
142+
// after NODE_LOCAL
143+
val taskOption = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
144+
assert(!taskOption.isDefined)
146145

147-
// Re-offer the host -- now we should get no more tasks
148-
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
146+
// Re-offer the host -- now we can get the task
147+
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL) !== None)
149148

150149
// Tell it the task has finished
151150
manager.handleSuccessfulTask(0, createTaskResult(0))
@@ -161,15 +160,15 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
161160

162161
// First three offers should all find tasks
163162
for (i <- 0 until 3) {
164-
val taskOption = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
163+
val taskOption = manager.resourceOffer("exec1", "host1", NODE_LOCAL)
165164
assert(taskOption.isDefined)
166165
val task = taskOption.get
167166
assert(task.executorId === "exec1")
168167
}
169168
assert(sched.startedTasks.toSet === Set(0, 1, 2))
170169

171170
// Re-offer the host -- now we should get no more tasks
172-
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
171+
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL) === None)
173172

174173
// Finish the first two tasks
175174
manager.handleSuccessfulTask(0, createTaskResult(0))
@@ -215,10 +214,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
215214
// First offer host1, exec1: first task should be chosen
216215
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
217216

218-
// Offer host1, exec1 again: the last task, which has no prefs, should be chosen
219-
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 3)
220-
221-
// Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
217+
// Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen due to the delay
222218
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
223219

224220
clock.advance(LOCALITY_WAIT)
@@ -229,19 +225,16 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
229225
// Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2
230226
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2)
231227

232-
// Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen
233-
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL) === None)
228+
// Offer host1, exec1 again, at NODE_LOCAL level: noPref task should be considered now
229+
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 3)
234230

235231
// Offer host1, exec1 again, at ANY level: nothing should get chosen
236232
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
237233

238-
clock.advance(LOCALITY_WAIT)
234+
clock.advance(LOCALITY_WAIT * 3)
239235

240-
// Offer host1, exec1 again, at ANY level: task 1 should get chosen
236+
// Offer host1, exec1 again, at ANY level: task 1 should be run as non-local task
241237
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
242-
243-
// Offer host1, exec1 again, at ANY level: nothing should be chosen as we've launched all tasks
244-
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
245238
}
246239

247240
test("delay scheduling with fallback") {
@@ -298,20 +291,24 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
298291
// First offer host1: first task should be chosen
299292
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
300293

301-
// Offer host1 again: third task should be chosen immediately because host3 is not up
302-
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)
303-
304-
// After this, nothing should get chosen
294+
// After this, nothing should get chosen, because we have separated tasks with unavailable preference
295+
// from the noPrefPendingTasks
305296
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
306297

307298
// Now mark host2 as dead
308299
sched.removeExecutor("exec2")
309300
manager.executorLost("exec2", "host2")
310301

311-
// Task 1 should immediately be launched on host1 because its original host is gone
302+
// nothing should be choosen
303+
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
304+
305+
clock.advance(LOCALITY_WAIT * 2)
306+
307+
// task 1 and 2 would be scheduled as nonLocal task
312308
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
309+
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)
313310

314-
// Now that all tasks have launched, nothing new should be launched anywhere else
311+
// all finished
315312
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
316313
assert(manager.resourceOffer("exec2", "host2", ANY) === None)
317314
}
@@ -456,15 +453,11 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
456453
Seq())
457454
val clock = new FakeClock
458455
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
459-
// All tasks added to no-pref list since no preferred location is available
460-
assert(manager.pendingTasksWithNoPrefs.size === 4)
461456
// Only ANY is valid
462457
assert(manager.myLocalityLevels.sameElements(Array(ANY)))
463458
// Add a new executor
464459
sched.addExecutor("execD", "host1")
465460
manager.executorAdded()
466-
// Task 0 and 1 should be removed from no-pref list
467-
assert(manager.pendingTasksWithNoPrefs.size === 2)
468461
// Valid locality should contain NODE_LOCAL and ANY
469462
assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, ANY)))
470463
// Add another executor
@@ -536,6 +529,31 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
536529
assert(manager.emittedTaskSizeWarning)
537530
}
538531

532+
test("speculative and noPref task should be scheduled after node-local but before rack-local") {
533+
sc = new SparkContext("local", "test")
534+
val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
535+
val taskSet = FakeTask.createTaskSet(4,
536+
Seq(TaskLocation("host1", "execA")),
537+
Seq(TaskLocation("host2"), TaskLocation("host1")),
538+
Seq(),
539+
Seq(TaskLocation("host3", "execC")))
540+
val clock = new FakeClock
541+
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
542+
543+
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0)
544+
clock.advance(LOCALITY_WAIT)
545+
assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 1)
546+
manager.speculatableTasks += 1
547+
// schedule the nonPref task
548+
assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 2)
549+
// schedule the speculative task
550+
clock.advance(LOCALITY_WAIT)
551+
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 1)
552+
clock.advance(LOCALITY_WAIT * 20)
553+
// schedule non-local tasks
554+
assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
555+
}
556+
539557
def createTaskResult(id: Int): DirectTaskResult[Int] = {
540558
val valueSer = SparkEnv.get.serializer.newInstance()
541559
new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)

0 commit comments

Comments
 (0)