core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.scheduler.SchedulingMode._
-import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, Utils}
+import org.apache.spark.util.{AccumulatorV2, Clock, LongAccumulator, SystemClock, Utils}
 import org.apache.spark.util.collection.MedianHeap

 /**
@@ -723,6 +723,23 @@ private[spark] class TaskSetManager(
   def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
     val info = taskInfos(tid)
     val index = info.index
+    // Check if any other attempt succeeded before this one and this attempt has not been handled
+    if (successful(index) && killedByOtherAttempt.contains(tid)) {
+      // Undo the effect on calculatedTasks and totalResultSize made earlier when
+      // checking whether we could fetch more results
+      calculatedTasks -= 1
+      val resultSizeAcc = result.accumUpdates.find(a =>
+        a.name == Some(InternalAccumulator.RESULT_SIZE))
+      if (resultSizeAcc.isDefined) {
+        totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value
+      }
+
+      // Handle this task as a killed task
+      handleFailedTask(tid, TaskState.KILLED,
+        TaskKilled("Finish but did not commit due to another attempt succeeded"))
+      return
+    }
+
     info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
     if (speculationEnabled) {
       successfulTaskDurations.insert(info.duration)
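To make the undo step above concrete, here is a minimal, self-contained Scala sketch of the same bookkeeping. It is a hedged illustration, not Spark's API: NamedCounter stands in for Spark's LongAccumulator, and undoResultSize is a hypothetical helper mirroring how the patch locates the RESULT_SIZE entry among the result's accumulator updates and subtracts its value from the running total.

object UndoResultSizeSketch {
  // Hypothetical stand-in for Spark's LongAccumulator (illustration only).
  final case class NamedCounter(name: Option[String], value: Long)

  // Mirrors the patch: find the accumulator named `resultSizeKey` among the
  // task result's updates and subtract its value from the running total.
  def undoResultSize(
      totalResultSize: Long,
      accumUpdates: Seq[NamedCounter],
      resultSizeKey: String): Long = {
    accumUpdates.find(_.name.contains(resultSizeKey)) match {
      case Some(acc) => totalResultSize - acc.value
      case None => totalResultSize
    }
  }

  def main(args: Array[String]): Unit = {
    // Example: a duplicate result whose result-size accumulator recorded 1024
    // bytes is discarded, so those bytes come back out of the running total.
    val updates = Seq(NamedCounter(Some("internal.metrics.resultSize"), 1024L))
    assert(undoResultSize(4096L, updates, "internal.metrics.resultSize") == 3072L)
  }
}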
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1532,4 +1532,74 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging
     val valueSer = SparkEnv.get.serializer.newInstance()
     new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
   }
+
test("SPARK-13343 speculative tasks that didn't commit shouldn't be marked as success") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val taskSet = FakeTask.createTaskSet(4)
// Set the speculation multiplier to be 0 so speculative tasks are launched immediately
sc.conf.set("spark.speculation.multiplier", "0.0")
sc.conf.set("spark.speculation", "true")
val clock = new ManualClock()
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
task.metrics.internalAccums
}
// Offer resources for 4 tasks to start
for ((k, v) <- List(
"exec1" -> "host1",
"exec1" -> "host1",
"exec2" -> "host2",
"exec2" -> "host2")) {
val taskOption = manager.resourceOffer(k, v, NO_PREF)
assert(taskOption.isDefined)
val task = taskOption.get
assert(task.executorId === k)
}
assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
clock.advance(1)
// Complete the 3 tasks and leave 1 task in running
for (id <- Set(0, 1, 2)) {
manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
assert(sched.endedTasks(id) === Success)
}
// checkSpeculatableTasks checks that the task runtime is greater than the threshold for
// speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
// > 0ms, so advance the clock by 1ms here.
clock.advance(1)
assert(manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set(3))

// Offer resource to start the speculative attempt for the running task
val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption5.isDefined)
val task5 = taskOption5.get
assert(task5.index === 3)
assert(task5.taskId === 4)
assert(task5.executorId === "exec1")
assert(task5.attemptNumber === 1)
sched.backend = mock(classOf[SchedulerBackend])
sched.dagScheduler.stop()
sched.dagScheduler = mock(classOf[DAGScheduler])
// Complete one attempt for the running task
val result = createTaskResult(3, accumUpdatesByTask(3))
manager.handleSuccessfulTask(3, result)
// There is a race between the scheduler asking to kill the other task, and that task
// actually finishing. We simulate what happens if the other task finishes before we kill it.
verify(sched.backend).killTask(4, "exec1", true, "another attempt succeeded")
manager.handleSuccessfulTask(4, result)

val info3 = manager.taskInfos(3)
val info4 = manager.taskInfos(4)
assert(info3.successful)
assert(info4.killed)
verify(sched.dagScheduler).taskEnded(
manager.tasks(3),
TaskKilled("Finish but did not commit due to another attempt succeeded"),
null,
Seq.empty,
info4)
verify(sched.dagScheduler).taskEnded(manager.tasks(3), Success, result.value(),
result.accumUpdates, info3)
}
 }
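For context on the clock.advance(1) step in the test, here is a hedged sketch of the speculation check it exercises, simplified from what TaskSetManager.checkSpeculatableTasks does: the exact semantics (median of successful task durations, the spark.speculation.multiplier factor, and the minimum-time floor passed as the method's argument) are assumptions here, and isSpeculatable is an illustrative name, not Spark's API.

object SpeculationThresholdSketch {
  // Assumed rule: a running task becomes speculatable once its runtime exceeds
  // max(multiplier * median successful duration, minTimeToSpeculation).
  def isSpeculatable(
      runtimeMs: Long,
      medianSuccessfulDurationMs: Double,
      multiplier: Double,
      minTimeToSpeculationMs: Long): Boolean = {
    runtimeMs > math.max(multiplier * medianSuccessfulDurationMs, minTimeToSpeculationMs.toDouble)
  }

  def main(args: Array[String]): Unit = {
    // With spark.speculation.multiplier = 0.0 and checkSpeculatableTasks(0),
    // the threshold is 0 ms, so the test's 1 ms ManualClock advance suffices.
    assert(isSpeculatable(runtimeMs = 1, medianSuccessfulDurationMs = 2.0,
      multiplier = 0.0, minTimeToSpeculationMs = 0))
  }
}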