70 changes: 44 additions & 26 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1117,6 +1117,25 @@ class DAGScheduler(
}
}

private def postTaskEnd(event: CompletionEvent): Unit = {
val taskMetrics: TaskMetrics =
if (event.accumUpdates.nonEmpty) {
try {
TaskMetrics.fromAccumulators(event.accumUpdates)
} catch {
case NonFatal(e) =>
val taskId = event.taskInfo.taskId
logError(s"Error when attempting to reconstruct metrics for task $taskId", e)
null
}
} else {
null
}

listenerBus.post(SparkListenerTaskEnd(event.task.stageId, event.task.stageAttemptId,
Utils.getFormattedClassName(event.task), event.reason, event.taskInfo, taskMetrics))
}
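
For context on how the event posted by postTaskEnd is consumed, here is a minimal listener sketch (not part of this change, and using only the public SparkListener API). Note that taskMetrics may be null when the task failed or when metric reconstruction threw, so consumers should guard against that.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative only: logs the run time of each completed task, skipping tasks
// whose metrics could not be reconstructed (taskMetrics == null).
class TaskEndLoggingListener extends SparkListener {
  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    Option(event.taskMetrics).foreach { metrics =>
      println(s"Task ${event.taskInfo.taskId} in stage ${event.stageId} " +
        s"ran for ${metrics.executorRunTime} ms")
    }
  }
}

Such a listener would be registered via SparkContext.addSparkListener and receives the event whether or not the stage is still tracked by the scheduler.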

/**
* Responds to a task finishing. This is called inside the event loop so it assumes that it can
* modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
@@ -1133,34 +1152,36 @@ class DAGScheduler(
event.taskInfo.attemptNumber, // this is a task attempt number
event.reason)

// Reconstruct task metrics. Note: this may be null if the task has failed.
val taskMetrics: TaskMetrics =
if (event.accumUpdates.nonEmpty) {
try {
TaskMetrics.fromAccumulators(event.accumUpdates)
} catch {
case NonFatal(e) =>
logError(s"Error when attempting to reconstruct metrics for task $taskId", e)
null
}
} else {
null
}

// The stage may have already finished when we get this event -- e.g. maybe it was a
// speculative task. It is important that we send the TaskEnd event in any case, so listeners
// are properly notified and can choose to handle it. For instance, some listeners are
// doing their own accounting and if they don't get the task end event they think
// tasks are still running when they really aren't.
listenerBus.post(SparkListenerTaskEnd(
stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))

if (!stageIdToStage.contains(task.stageId)) {
// The stage may have already finished when we get this event -- e.g. maybe it was a
// speculative task. It is important that we send the TaskEnd event in any case, so listeners
// are properly notified and can choose to handle it. For instance, some listeners are
// doing their own accounting and if they don't get the task end event they think
// tasks are still running when they really aren't.
postTaskEnd(event)

// Skip all the actions if the stage has been cancelled.
return
}

val stage = stageIdToStage(task.stageId)

// Make sure the task's accumulators are updated before any other processing happens, so that
// we can post a task end event before any jobs or stages are updated. The accumulators are
// only updated in certain cases.
event.reason match {
case Success =>
stage match {
case rs: ResultStage if rs.activeJob.isEmpty =>
// Ignore update if task's job has finished.
case _ =>
updateAccumulators(event)
}
case _: ExceptionFailure => updateAccumulators(event)
case _ =>
}
postTaskEnd(event)

event.reason match {
case Success =>
task match {
@@ -1171,7 +1192,6 @@
resultStage.activeJob match {
case Some(job) =>
if (!job.finished(rt.outputId)) {
updateAccumulators(event)
job.finished(rt.outputId) = true
job.numFinished += 1
// If the whole job has finished, remove it
@@ -1198,7 +1218,6 @@

case smt: ShuffleMapTask =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
updateAccumulators(event)
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
@@ -1357,8 +1376,7 @@
// Do nothing here, left up to the TaskScheduler to decide how to handle denied commits

case exceptionFailure: ExceptionFailure =>
// Tasks failed with exceptions might still have accumulator updates.
updateAccumulators(event)
// Nothing left to do here; accumulator updates were already handled above.

case TaskResultLost =>
// Do nothing here; the TaskScheduler handles these failures and resubmits the task.
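The comment above about listeners "doing their own accounting" is the reason postTaskEnd is now called even when the stage is no longer in stageIdToStage. A hedged sketch of that kind of listener (hypothetical, not from the Spark codebase) is shown below.

import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

// Tracks how many tasks are currently running by pairing start and end events.
class RunningTaskCounter extends SparkListener {
  private val running = new AtomicInteger(0)

  override def onTaskStart(event: SparkListenerTaskStart): Unit = {
    running.incrementAndGet()
  }

  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    // If TaskEnd were skipped for tasks whose stage already finished (e.g. a
    // speculative duplicate), this counter would never reach zero and the
    // listener would report phantom running tasks.
    running.decrementAndGet()
  }

  def runningTasks: Int = running.get()
}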
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.scheduler

import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import scala.annotation.meta.param
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -2277,6 +2277,36 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
(Success, 1)))
}

test("task end event should have updated accumulators (SPARK-20342)") {
val tasks = 10

val accumId = new AtomicLong()
val foundCount = new AtomicLong()
val listener = new SparkListener() {
override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
event.taskInfo.accumulables.find(_.id == accumId.get).foreach { _ =>
foundCount.incrementAndGet()
}
}
}
sc.addSparkListener(listener)

// Run the job a few times in a loop to make the check more reliable. A single run is not
// guaranteed to fail when the bug exists, but repeated runs should at least make the test
// flaky. If the bug is fixed, this should always pass.
(1 to 10).foreach { i =>
foundCount.set(0L)

val accum = sc.longAccumulator(s"accum$i")
accumId.set(accum.id)

sc.parallelize(1 to tasks, tasks).foreach { _ =>
accum.add(1L)
}
sc.listenerBus.waitUntilEmpty(1000)
assert(foundCount.get() === tasks)
}
}

/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
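The new test above exercises the behavior that SPARK-20342 fixes: per-task accumulator updates are reflected in taskInfo.accumulables by the time onTaskEnd fires. Below is a sketch of the same pattern in application code, assuming an existing SparkContext named sc; the accumulator and listener names are illustrative, not part of this change.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical driver-side usage: count records per task via a named accumulator.
val recordsSeen = sc.longAccumulator("recordsSeen")

sc.addSparkListener(new SparkListener {
  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    // With this change, the per-task update for the accumulator is visible here
    // for successful result and shuffle-map tasks as well as failed tasks.
    event.taskInfo.accumulables.find(_.id == recordsSeen.id).foreach { acc =>
      println(s"Task ${event.taskInfo.taskId} contributed ${acc.update.getOrElse(0L)} record(s)")
    }
  }
})

sc.parallelize(1 to 100, 4).foreach(_ => recordsSeen.add(1L))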