[SPARK-19674][SQL] Ignore driver accumulator updates that don't belong to the execution when merging all accumulator updates

## What changes were proposed in this pull request?
In `SQLListener.getExecutionMetrics`, driver accumulator updates that don't belong to the execution should be ignored when merging all accumulator updates, to prevent a `NoSuchElementException`.
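
For context, here is a minimal, self-contained sketch of the failure mode (plain Scala, not Spark code; the object and variable names are illustrative): each merged update's accumulator id is resolved against the execution's metric map, so a stray driver update with an unknown id makes the lookup throw unless it is filtered out first.

```scala
// Hypothetical, minimal sketch of the failure mode (not actual Spark code).
object NoSuchElementSketch {
  def main(args: Array[String]): Unit = {
    // Metrics this execution actually tracks, keyed by accumulator id.
    val accumulatorMetrics = Map(1L -> "sum", 2L -> "size")
    // Merged updates; 999L is a driver update from a different execution.
    val updates = Seq((1L, 10L), (2L, 20L), (999L, 2L))

    // Before the fix: resolving every id against the map throws
    //   java.util.NoSuchElementException: key not found: 999
    // updates.map { case (id, v) => (accumulatorMetrics(id), v) }

    // After the fix: drop ids the execution doesn't know about first.
    val totalUpdates = updates.filter { case (id, _) => accumulatorMetrics.contains(id) }
    totalUpdates.foreach { case (id, v) => println(s"$id:${accumulatorMetrics(id)} -> $v") }
  }
}
```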

## How was this patch tested?
Updated the existing unit test in `SQLListenerSuite`.

Author: Carson Wang <[email protected]>

Closes #17009 from carsonwang/FixSQLMetrics.
carsonwang authored and Michael Allman committed Mar 24, 2017
commit 87af9872eb64ab489edc554759fb34e100a5b065

```diff
@@ -343,10 +343,13 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
              accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield {
           (accumulatorUpdate._1, accumulatorUpdate._2)
         }
-      }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
+      }

       val driverUpdates = executionUIData.driverAccumUpdates.toSeq
-      mergeAccumulatorUpdates(accumulatorUpdates ++ driverUpdates, accumulatorId =>
+      val totalUpdates = (accumulatorUpdates ++ driverUpdates).filter {
+        case (id, _) => executionUIData.accumulatorMetrics.contains(id)
+      }
+      mergeAccumulatorUpdates(totalUpdates, accumulatorId =>
         executionUIData.accumulatorMetrics(accumulatorId).metricType)
     case None =>
       // This execution has been dropped
```
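
Read outside the diff, the fixed control flow looks roughly like the sketch below. This is a simplified stand-in: `mergeAccumulatorUpdates` here is a hypothetical reduction that just sums per id, whereas Spark's real implementation formats values according to each metric's type.

```scala
// Simplified stand-in for the fixed path in SQLListener.getExecutionMetrics.
object MergeSketch {
  // Hypothetical merge: group updates by accumulator id and sum them.
  def mergeAccumulatorUpdates(
      updates: Seq[(Long, Long)],
      metricType: Long => String): Map[Long, String] =
    updates.groupBy(_._1).map { case (id, pairs) =>
      id -> s"${metricType(id)}=${pairs.map(_._2).sum}"
    }

  def main(args: Array[String]): Unit = {
    val accumulatorMetrics = Map(1L -> "sum")
    val accumulatorUpdates = Seq((1L, 10L), (1L, 15L)) // from tasks
    val driverUpdates = Seq((1L, 5L), (999L, 2L))      // 999L is stray

    // As in the patch: filter *after* concatenating, so stray driver ids
    // are dropped along with stray task ids.
    val totalUpdates = (accumulatorUpdates ++ driverUpdates).filter {
      case (id, _) => accumulatorMetrics.contains(id)
    }
    println(mergeAccumulatorUpdates(totalUpdates, id => accumulatorMetrics(id)))
  }
}
```

Note the design choice visible in the diff: the `filter` moves from the task-side comprehension onto the concatenated sequence, so driver updates now receive the same filtering as task updates.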

```diff
@@ -147,6 +147,11 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest

     checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))

+    // Driver accumulator updates that don't belong to this execution should be
+    // filtered out, and no exception should be thrown.
+    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
+    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+
     listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
       // (task id, stage id, stage attempt, accum updates)
       (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
```
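
The regression test injects a `SparkListenerDriverAccumUpdates` event carrying an accumulator id (`999L`) that execution `0` does not track, then asserts that `getExecutionMetrics(0)` still returns the previously merged metrics instead of throwing.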