Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,13 @@ private[spark] class TaskSetManager(
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
val info = taskInfos(tid)
val index = info.index
// Check if any other attempt succeeded before this and this attempt has not been handled
if (successful(index) && killedByOtherAttempt(index)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For completeness, we will also need to 'undo' the changes in enqueueSuccessfulTask : to reverse the counters in canFetchMoreResults.

(Orthogonal to this PR): Looking at use of killedByOtherAttempt, I see that there is a bug in executorLost w.r.t how it is updated - hopefully a fix for SPARK-24755 wont cause issues here.

handleFailedTask(tid, TaskState.KILLED,
TaskKilled("Finish but did not commit due to another attempt succeeded"))
return
}

info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
if (speculationEnabled) {
successfulTaskDurations.insert(info.duration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1371,4 +1371,74 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val valueSer = SparkEnv.get.serializer.newInstance()
new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
}

test("SPARK-13343 speculative tasks that didn't commit shouldn't be marked as success") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val taskSet = FakeTask.createTaskSet(4)
// Set the speculation multiplier to be 0 so speculative tasks are launched immediately
sc.conf.set("spark.speculation.multiplier", "0.0")
sc.conf.set("spark.speculation", "true")
val clock = new ManualClock()
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
task.metrics.internalAccums
}
// Offer resources for 4 tasks to start
for ((k, v) <- List(
"exec1" -> "host1",
"exec1" -> "host1",
"exec2" -> "host2",
"exec2" -> "host2")) {
val taskOption = manager.resourceOffer(k, v, NO_PREF)
assert(taskOption.isDefined)
val task = taskOption.get
assert(task.executorId === k)
}
assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
clock.advance(1)
// Complete the 3 tasks and leave 1 task in running
for (id <- Set(0, 1, 2)) {
manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
assert(sched.endedTasks(id) === Success)
}
// checkSpeculatableTasks checks that the task runtime is greater than the threshold for
// speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
// > 0ms, so advance the clock by 1ms here.
clock.advance(1)
assert(manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set(3))

// Offer resource to start the speculative attempt for the running task
val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption5.isDefined)
val task5 = taskOption5.get
assert(task5.index === 3)
assert(task5.taskId === 4)
assert(task5.executorId === "exec1")
assert(task5.attemptNumber === 1)
sched.backend = mock(classOf[SchedulerBackend])
sched.dagScheduler.stop()
sched.dagScheduler = mock(classOf[DAGScheduler])
// Complete one attempt for the running task
val result = createTaskResult(3, accumUpdatesByTask(3))
manager.handleSuccessfulTask(3, result)
// There is a race between the scheduler asking to kill the other task, and that task
// actually finishing. We simulate what happens if the other task finishes before we kill it.
verify(sched.backend).killTask(4, "exec1", true, "another attempt succeeded")
manager.handleSuccessfulTask(4, result)

val info3 = manager.taskInfos(3)
val info4 = manager.taskInfos(4)
assert(info3.successful)
assert(info4.killed)
verify(sched.dagScheduler).taskEnded(
manager.tasks(3),
TaskKilled("Finish but did not commit due to another attempt succeeded"),
null,
Seq.empty,
info4)
verify(sched.dagScheduler).taskEnded(manager.tasks(3), Success, result.value(),
result.accumUpdates, info3)
}
}