core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -445,6 +445,7 @@ private[spark] class AppStatusListener(
       val locality = event.taskInfo.taskLocality.toString()
       val count = stage.localitySummary.getOrElse(locality, 0L) + 1L
       stage.localitySummary = stage.localitySummary ++ Map(locality -> count)
+      stage.activeTasksPerExecutor(event.taskInfo.executorId) += 1
       maybeUpdate(stage, now)
 
       stage.jobs.foreach { job =>
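A note on the locality bookkeeping in this hunk: `localitySummary` is an immutable `Map`, so each task start performs a read-modify-write rather than an in-place increment. A minimal stand-alone sketch of the same idiom (the object and method names here are illustrative, not from the patch):

```scala
object LocalityCountSketch extends App {
  // Immutable summary, rebuilt on every update, as in LiveStage.
  var localitySummary: Map[String, Long] = Map()

  def recordTaskStart(locality: String): Unit = {
    val count = localitySummary.getOrElse(locality, 0L) + 1L
    localitySummary = localitySummary ++ Map(locality -> count)
  }

  Seq("PROCESS_LOCAL", "NODE_LOCAL", "PROCESS_LOCAL").foreach(recordTaskStart)
  println(localitySummary) // Map(PROCESS_LOCAL -> 2, NODE_LOCAL -> 1)
}
```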
@@ -530,6 +531,7 @@ private[spark] class AppStatusListener(
       if (killedDelta > 0) {
         stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary)
       }
+      stage.activeTasksPerExecutor(event.taskInfo.executorId) -= 1
       // [SPARK-24415] Wait for all tasks to finish before removing stage from live list
       val removeStage =
         stage.activeTasks == 0 &&
@@ -554,7 +556,11 @@
         if (killedDelta > 0) {
           job.killedSummary = killedTasksSummary(event.reason, job.killedSummary)
         }
-        conditionalLiveUpdate(job, now, removeStage)
+        if (removeStage) {
+          update(job, now)
+        } else {
+          maybeUpdate(job, now)
+        }
       }
 
       val esummary = stage.executorSummary(event.taskInfo.executorId)
@@ -565,7 +571,16 @@
       if (metricsDelta != null) {
         esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta)
       }
-      conditionalLiveUpdate(esummary, now, removeStage)
+
+      val isLastTask = stage.activeTasksPerExecutor(event.taskInfo.executorId) == 0
+
+      // If the last task of the executor finished, then update the esummary
+      // for both live and history events.
+      if (isLastTask) {
+        update(esummary, now)

[Review comment (Contributor): "indentation is off"]

+      } else {
+        maybeUpdate(esummary, now)
+      }
 
       if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
         stage.cleaning = true
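The distinction between `update` and `maybeUpdate` is what makes the forced flush above matter: live writes are rate-limited by `LIVE_ENTITY_UPDATE_PERIOD`, so a `maybeUpdate` issued near the end of a stage can silently be skipped. A rough, self-contained sketch of that pattern, assuming a write period in nanoseconds — illustrative only, not the listener's actual code:

```scala
// Illustrative only: a rate-limited "maybe write" vs. a forced write.
// The field and method names are assumptions, not copied from AppStatusListener.
class ThrottledEntity(updatePeriodNs: Long) {
  private var lastWriteTime = -1L
  var writes = 0

  // Unconditional flush, e.g. when an executor's last task finishes.
  def update(now: Long): Unit = {
    writes += 1
    lastWriteTime = now
  }

  // Skipped whenever the previous write is still within the update period.
  def maybeUpdate(now: Long): Unit = {
    if (now - lastWriteTime > updatePeriodNs) update(now)
  }
}

object ThrottleDemo extends App {
  val e = new ThrottledEntity(Long.MaxValue) // like the test's LIVE_ENTITY_UPDATE_PERIOD
  e.maybeUpdate(now = 100) // suppressed: the period never elapses
  e.update(now = 200)      // forced write still lands
  println(e.writes)        // 1
}
```

Under a scheme like this, without the forced path an executor summary could miss its final task counts whenever the stage finished inside the rate-limit window.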
core/src/main/scala/org/apache/spark/status/LiveEntity.scala (2 additions & 0 deletions)
@@ -376,6 +376,8 @@ private class LiveStage extends LiveEntity {

   val executorSummaries = new HashMap[String, LiveExecutorStageSummary]()
 
+  val activeTasksPerExecutor = new HashMap[String, Int]().withDefaultValue(0)
+
   var blackListedExecutors = new HashSet[String]()
 
   // Used for cleanup of tasks after they reach the configured limit. Not written to the store.
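The `withDefaultValue(0)` wrapper is what lets the listener write `activeTasksPerExecutor(id) += 1` without first seeding an entry: a lookup on a missing key yields 0 instead of throwing. A quick stand-alone illustration using only the Scala standard library:

```scala
import scala.collection.mutable.HashMap

object CounterDemo extends App {
  val activeTasksPerExecutor = new HashMap[String, Int]().withDefaultValue(0)

  activeTasksPerExecutor("exec-1") += 1 // no prior entry needed: 0 + 1
  activeTasksPerExecutor("exec-1") += 1 // another task start
  activeTasksPerExecutor("exec-1") -= 1 // a task finished

  println(activeTasksPerExecutor("exec-1")) // 1
  // Note: the default applies to apply(), not to contains()/get.
  println(activeTasksPerExecutor("exec-2")) // 0, though no entry is stored
}
```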
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1275,6 +1275,51 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(allJobs.head.numFailedStages == 1)
   }
 
test("SPARK-25451: total tasks in the executor summary should match total stage tasks") {
val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)

val listener = new AppStatusListener(store, testConf, true)

val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))

val tasks = createTasks(4, Array("1", "2"))
tasks.foreach { task =>
listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
}

time += 1
tasks(0).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
Success, tasks(0), null))
time += 1
tasks(1).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
Success, tasks(1), null))

stage.failureReason = Some("Failed")
listener.onStageCompleted(SparkListenerStageCompleted(stage))
time += 1
listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor"))))

time += 1
tasks(2).markFinished(TaskState.FAILED, time)
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
time += 1
tasks(3).markFinished(TaskState.FAILED, time)
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))

val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
esummary.foreach { execSummary =>
assert(execSummary.failedTasks === 1)
assert(execSummary.succeededTasks === 1)
assert(execSummary.killedTasks === 0)
}
}
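Worth noting: the test pins LIVE_ENTITY_UPDATE_PERIOD to Long.MaxValue, which (per the throttling pattern sketched earlier) suppresses every rate-limited maybeUpdate. The per-executor assertions can therefore only pass if the new last-task path forces the executor summaries into the store, which is exactly the behavior this change adds.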

test("driver logs") {
val listener = new AppStatusListener(store, conf, true)
