Closed
Changes from 1 commit
Commits (22)
9f5b195
[SPARK-26329][CORE] Faster polling of executor memory metrics.
wypoon Jan 4, 2019
03e41a8
[SPARK-26329][CORE] Fix test compilation error in sql.
wypoon Feb 12, 2019
7f6bd74
[SPARK-26329][CORE] Fix Mima issues.
wypoon Feb 13, 2019
7397897
[SPARK-26329][CORE] Fix possible NPE.
wypoon Feb 13, 2019
e1aeafc
[SPARK-26329][CORE] Fix JsonProtocolSuite post-rebase to account for …
wypoon Mar 6, 2019
75ba39d
[SPARK-26329][CORE] Extract polling logic into a separate class.
wypoon Mar 8, 2019
ea2ff0d
[SPARK-26329][CORE] On task failure, send executor metrics in the Tas…
wypoon Mar 18, 2019
0cbfc04
[SPARK-26329][CORE] Unit tests for sending executor metrics in TaskRe…
wypoon Mar 21, 2019
8cb30a8
[SPARK-26329][CORE] Add driver updates to test for executor metrics a…
wypoon Mar 22, 2019
077abb0
[SPARK-26329][CORE] Add SparkListenerTaskEnd events to test for execu…
wypoon Mar 23, 2019
7a3c90d
[SPARK-26329][CORE] Address feedback from irashid.
wypoon Mar 27, 2019
3ed583a
[SPARK-26329][CORE] Fix ExecutorSuite failures.
wypoon Mar 28, 2019
0a4828a
[SPARK-26329][CORE] Delete a comment on irashid's suggestion.
wypoon Mar 28, 2019
9530b75
[SPARK-26329][CORE] Change executorUpdates to be a scala.collection.m…
wypoon Mar 28, 2019
38a397c
[SPARK-26329][CORE] Update HistoryServerSuite.
wypoon Apr 9, 2019
e062e60
[SPARK-26329][CORE] Get executor updates and reset the peaks in a sin…
wypoon May 15, 2019
20b4b7e
[SPARK-26329][CORE] Test fixes after rebase on master.
wypoon Jul 3, 2019
b898ad2
[SPARK-26329][CORE] Adopt some suggestions from attilapiros.
wypoon Jul 4, 2019
fbb55bf
[SPARK-26329][CORE] Address feedback from Imran Rashid.
wypoon Jul 19, 2019
99addf1
[SPARK-26329][CORE] Make TCMP case class private.
wypoon Jul 19, 2019
7331b27
[SPARK-26329][CORE] Fix a test post-rebase.
wypoon Jul 29, 2019
7556d6a
[SPARK-26329][CORE] Update a doc comment based on feedback from Imran…
wypoon Jul 31, 2019
[SPARK-26329][CORE] Add driver updates to test for executor metrics aggregation.

Add an internal check within the test showing how the expected aggregated metrics are calculated. This documents how the expected values are arrived at.
wypoon committed Jul 29, 2019
commit 8cb30a876151a7a6c155c2e3ba2fe09f6dfb3984
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler

import java.io.{File, FileOutputStream, InputStream, IOException}
import java.util.Arrays

import scala.collection.immutable.Map
import scala.collection.mutable
@@ -288,6 +289,79 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
val listenerBus = new LiveListenerBus(conf)

// Executor metrics
// driver
val md_1 = Array(4000L, 50L, 0L, 0L, 40L, 0L, 40L, 0L, 70L, 0L, 7500L, 3500L,
0L, 0L, 0L, 0L, 10L, 90L, 2L, 20L)
val md_2 = Array(4500L, 50L, 0L, 0L, 40L, 0L, 40L, 0L, 70L, 0L, 8000L, 3500L,
0L, 0L, 0L, 0L, 10L, 90L, 3L, 20L)
val md_3 = Array(4200L, 50L, 0L, 0L, 40L, 0L, 40L, 0L, 70L, 0L, 7800L, 3500L,
0L, 0L, 0L, 0L, 15L, 100L, 5L, 20L)
Contributor

This test is becoming pretty impossible to reason about. It's not really your fault; it's taking something that was probably a bit too complex in the first place and adding to it. Anyway, I will think about whether there is a better way we can organize this test; don't worry about it for now (unless you have a better idea ...).

Contributor Author

I agree that it is virtually impossible to reason about the arrays of numbers.
However, that is why I introduced:

    // calculated metric peaks per stage per executor
    // metrics sent during stage 0 for each executor
    val cp0_1 = Seq(m1_1, m1_2, m1_3, t1, m1_4, t3).reduceLeft(max)
    val cp0_2 = Seq(m2_1, m2_2, m2_3, t2, m2_4, t4).reduceLeft(max)
    val cp0_d = Seq(md_1, md_2).reduceLeft(max)
    // metrics sent during stage 1 for each executor
    val cp1_1 = Seq(m1_4, m1_5, m1_6, m1_7, t5).reduceLeft(max)
    val cp1_2 = Seq(m2_4, m2_5, m2_6, m2_7, t6).reduceLeft(max)
    val cp1_d = Seq(md_2, md_3).reduceLeft(max)

and the internal check that the calculated peaks match the peak values that I check against at the end.
When you look at what events are sent, you can match them with the above definitions, and you can reason about the correctness of the metric aggregation.
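
For readers skimming the thread, here is a minimal, self-contained sketch of that internal-check pattern (plain Scala; the names m1, m2, m3 and the 3-element arrays are illustrative stand-ins, while the real test uses 20-element metric arrays wrapped in ExecutorMetrics):

    // Element-wise maximum of two metric snapshots; equivalent to the test's
    // `max` helper, which is written as (a, b).zipped.map(Math.max).
    def max(a: Array[Long], b: Array[Long]): Array[Long] =
      a.zip(b).map { case (x, y) => Math.max(x, y) }

    // Illustrative metric updates sent by one executor during a stage.
    val m1 = Array(4000L, 50L, 20L)
    val m2 = Array(4000L, 70L, 10L)
    val m3 = Array(2000L, 40L, 50L)

    // Calculated peak: the per-index maximum over all updates in the stage.
    val calculated = Seq(m1, m2, m3).reduceLeft(max)

    // Hand-written expected peak, analogous to p0_1, p0_2, ... in the test.
    val expected = Array(4000L, 70L, 50L)

    // The internal check: the hand-written expectation must equal the computed peak.
    assert(java.util.Arrays.equals(expected, calculated))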


// executors 1 and 2
val m1_1 = Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L, 7500L, 3500L,
6500L, 2500L, 5500L, 1500L, 10L, 90L, 2L, 20L)
val m2_1 = Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L, 8500L, 3500L,
7500L, 2500L, 6500L, 1500L, 10L, 90L, 2L, 20L)
val m1_2 = Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L, 8000L, 4000L,
7000L, 3000L, 6000L, 2000L, 10L, 90L, 2L, 20L)
val m2_2 = Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L, 9000L, 4000L,
8000L, 3000L, 7000L, 2000L, 10L, 90L, 2L, 20L)
val m1_3 = Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L, 8000L, 3500L,
7000L, 2500L, 6000L, 1500L, 10L, 90L, 2L, 20L)
val m2_3 = Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L, 8500L, 3500L,
7500L, 2500L, 6500L, 1500L, 10L, 90L, 2L, 20L)
val m1_4 = Array(5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 50L,
0L, 5000L, 3000L, 4000L, 2000L, 3000L, 1000L, 10L, 90L, 2L, 20L)
val m2_4 = Array(7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 10L,
40L, 8000L, 4000L, 7000L, 3000L, 6000L, 2000L, 10L, 90L, 2L, 20L)
val m1_5 = Array(6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 30L, 0L, 5000L, 3000L,
4000L, 2000L, 3000L, 1000L, 10L, 90L, 2L, 20L)
val m2_5 = Array(5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 40L,
20L, 8000L, 5000L, 7000L, 4000L, 6000L, 3000L, 10L, 90L, 2L, 20L)
val m1_6 = Array(7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 30L, 0L, 3000L, 2500L,
2000L, 1500L, 1000L, 500L, 10L, 90L, 2L, 20L)
val m2_6 = Array(5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 0L,
20L, 7000L, 3000L, 6000L, 2000L, 5000L, 1000L, 10L, 90L, 2L, 20L)
val m1_7 = Array(5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 20L,
0L, 4000L, 2500L, 3000L, 1500L, 2000L, 500L, 10L, 90L, 2L, 20L)
val m2_7 = Array(4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 0L, 7000L,
4000L, 6000L, 3000L, 5000L, 2000L, 10L, 90L, 2L, 20L)

def max(a: Array[Long], b: Array[Long]): Array[Long] =
(a, b).zipped.map(Math.max)
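// Note: `max` is an element-wise maximum, so reduceLeft(max) over a Seq of
// snapshots yields the per-metric peak across all of them.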

// calculated metric peaks per stage per executor
// metrics sent during stage 0 for each executor
val cp0_1 = Seq(m1_1, m1_2, m1_3, m1_4).reduceLeft(max)
val cp0_2 = Seq(m2_1, m2_2, m2_3, m2_4).reduceLeft(max)
val cp0_d = Seq(md_1, md_2).reduceLeft(max)
// metrics sent during stage 1 for each executor
val cp1_1 = Seq(m1_4, m1_5, m1_6, m1_7).reduceLeft(max)
val cp1_2 = Seq(m2_4, m2_5, m2_6, m2_7).reduceLeft(max)
val cp1_d = Seq(md_2, md_3).reduceLeft(max)

// expected metric peaks per stage per executor
val p0_1 = Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L,
70L, 20L, 8000L, 4000L, 7000L, 3000L, 6000L, 2000L, 10L, 90L, 2L, 20L)
val p0_2 = Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L,
80L, 40L, 9000L, 4000L, 8000L, 3000L, 7000L, 2000L, 10L, 90L, 2L, 20L)
val p0_d = Array(4500L, 50L, 0L, 0L, 40L, 0L, 40L, 0L,
70L, 0L, 8000L, 3500L, 0L, 0L, 0L, 0L, 10L, 90L, 3L, 20L)
val p1_1 = Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L,
50L, 0L, 5000L, 3000L, 4000L, 2000L, 3000L, 1000L, 10L, 90L, 2L, 20L)
val p1_2 = Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L,
40L, 40L, 8000L, 5000L, 7000L, 4000L, 6000L, 3000L, 10L, 90L, 2L, 20L)
val p1_d = Array(4500L, 50L, 0L, 0L, 40L, 0L, 40L, 0L,
70L, 0L, 8000L, 3500L, 0L, 0L, 0L, 0L, 15L, 100L, 5L, 20L)

assert(java.util.Arrays.equals(p0_1, cp0_1))
assert(java.util.Arrays.equals(p0_2, cp0_2))
assert(java.util.Arrays.equals(p0_d, cp0_d))
assert(java.util.Arrays.equals(p1_1, cp1_1))
assert(java.util.Arrays.equals(p1_2, cp1_2))
assert(java.util.Arrays.equals(p1_d, cp1_d))

// Events to post.
val events = Array(
SparkListenerApplicationStart("executionMetrics", None,
@@ -297,68 +371,47 @@
createStageSubmittedEvent(0),
// receive 3 metric updates from each executor with just stage 0 running,
// with different peak updates for each executor
createExecutorMetricsUpdateEvent(List(0), 1,
new ExecutorMetrics(Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L, 7500L, 3500L,
6500L, 2500L, 5500L, 1500L, 10L, 90L, 2L, 20L))),
createExecutorMetricsUpdateEvent(List(0), 2,
new ExecutorMetrics(Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L, 8500L, 3500L,
7500L, 2500L, 6500L, 1500L, 10L, 90L, 2L, 20L))),
// exec 1: new stage 0 peaks for metrics at indexes: 2, 4, 6
createExecutorMetricsUpdateEvent(List(0), 1,
new ExecutorMetrics(Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L, 8000L, 4000L,
7000L, 3000L, 6000L, 2000L, 10L, 90L, 2L, 20L))),
// also, receive 1 metric update from the driver
createExecutorMetricsUpdateEvent(List(0), "1", new ExecutorMetrics(m1_1)),
createExecutorMetricsUpdateEvent(List(0), "2", new ExecutorMetrics(m2_1)),
// exec 1: new stage 0 peaks for metrics at indexes: 2, 4, 6
createExecutorMetricsUpdateEvent(List(0), "1", new ExecutorMetrics(m1_2)),
// exec 2: new stage 0 peaks for metrics at indexes: 0, 4, 6
createExecutorMetricsUpdateEvent(List(0), 2,
new ExecutorMetrics(Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L, 9000L, 4000L,
8000L, 3000L, 7000L, 2000L, 10L, 90L, 2L, 20L))),
createExecutorMetricsUpdateEvent(List(0), "2", new ExecutorMetrics(m2_2)),
// driver
createExecutorMetricsUpdateEvent(List(-1), "driver", new ExecutorMetrics(md_1)),
// exec 1: new stage 0 peaks for metrics at indexes: 5, 7
createExecutorMetricsUpdateEvent(List(0), 1,
new ExecutorMetrics(Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L, 8000L, 3500L,
7000L, 2500L, 6000L, 1500L, 10L, 90L, 2L, 20L))),
createExecutorMetricsUpdateEvent(List(0), "1", new ExecutorMetrics(m1_3)),
// exec 2: new stage 0 peaks for metrics at indexes: 0, 5, 6, 7, 8
createExecutorMetricsUpdateEvent(List(0), 2,
new ExecutorMetrics(Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L, 8500L, 3500L,
7500L, 2500L, 6500L, 1500L, 10L, 90L, 2L, 20L))),
createExecutorMetricsUpdateEvent(List(0), "2", new ExecutorMetrics(m2_3)),
// now start stage 1, one more metric update for each executor, and new
// peaks for some stage 1 metrics (as listed), initialize stage 1 peaks
createStageSubmittedEvent(1),
// exec 1: new stage 0 peaks for metrics at indexes: 0, 3, 7; initialize stage 1 peaks
createExecutorMetricsUpdateEvent(List(0, 1), 1,
new ExecutorMetrics(Array(5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 50L,
0L, 5000L, 3000L, 4000L, 2000L, 3000L, 1000L, 10L, 90L, 2L, 20L))),
createExecutorMetricsUpdateEvent(List(0, 1), "1", new ExecutorMetrics(m1_4)),
// exec 2: new stage 0 peaks for metrics at indexes: 0, 1, 3, 6, 7, 9;
// initialize stage 1 peaks
createExecutorMetricsUpdateEvent(List(0, 1), 2,
new ExecutorMetrics(Array(7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 10L,
40L, 8000L, 4000L, 7000L, 3000L, 6000L, 2000L, 10L, 90L, 2L, 20L))),
createExecutorMetricsUpdateEvent(List(0, 1), "2", new ExecutorMetrics(m2_4)),
// driver
createExecutorMetricsUpdateEvent(List(-1), "driver", new ExecutorMetrics(md_2)),
// complete stage 0, and 3 more updates for each executor with just
// stage 1 running
createStageCompletedEvent(0),
// exec 1: new stage 1 peaks for metrics at indexes: 0, 1, 3
createExecutorMetricsUpdateEvent(List(1), 1,
new ExecutorMetrics(Array(6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 30L, 0L, 5000L, 3000L,
4000L, 2000L, 3000L, 1000L, 10L, 90L, 2L, 20L))),
createExecutorMetricsUpdateEvent(List(1), "1", new ExecutorMetrics(m1_5)),
// exec 2: new stage 1 peaks for metrics at indexes: 3, 4, 7, 8
createExecutorMetricsUpdateEvent(List(1), 2,
new ExecutorMetrics(Array(5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 40L,
20L, 8000L, 5000L, 7000L, 4000L, 6000L, 3000L, 10L, 90L, 2L, 20L))),
createExecutorMetricsUpdateEvent(List(1), "2", new ExecutorMetrics(m2_5)),
// exec 1: new stage 1 peaks for metrics at indexes: 0, 4, 5, 7
createExecutorMetricsUpdateEvent(List(1), 1,
new ExecutorMetrics(Array(7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 30L, 0L, 3000L, 2500L,
2000L, 1500L, 1000L, 500L, 10L, 90L, 2L, 20L))),
createExecutorMetricsUpdateEvent(List(1), "1", new ExecutorMetrics(m1_6)),
// exec 2: new stage 1 peak for metrics at index: 7
createExecutorMetricsUpdateEvent(List(1), 2,
new ExecutorMetrics(Array(5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 0L,
20L, 7000L, 3000L, 6000L, 2000L, 5000L, 1000L, 10L, 90L, 2L, 20L))),
createExecutorMetricsUpdateEvent(List(1), "2", new ExecutorMetrics(m2_6)),
// driver
createExecutorMetricsUpdateEvent(List(-1), "driver", new ExecutorMetrics(md_3)),
// exec 1: no new stage 1 peaks
createExecutorMetricsUpdateEvent(List(1), 1,
new ExecutorMetrics(Array(5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 20L,
0L, 4000L, 2500L, 3000L, 1500L, 2000L, 500L, 10L, 90L, 2L, 20L))),
createExecutorMetricsUpdateEvent(List(1), "1", new ExecutorMetrics(m1_7)),
createExecutorRemovedEvent(1),
// exec 2: new stage 1 peak for metrics at index: 6
createExecutorMetricsUpdateEvent(List(1), 2,
new ExecutorMetrics(Array(4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 0L, 7000L,
4000L, 6000L, 3000L, 5000L, 2000L, 10L, 90L, 2L, 20L))),
createExecutorMetricsUpdateEvent(List(1), "2", new ExecutorMetrics(m2_7)),
createStageCompletedEvent(1),
SparkListenerApplicationEnd(1000L))

@@ -374,29 +427,25 @@
val expectedMetricsEvents: Map[(Int, String), SparkListenerStageExecutorMetrics] =
Map(
((0, "1"),
new SparkListenerStageExecutorMetrics("1", 0, 0,
new ExecutorMetrics(Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L,
70L, 20L, 8000L, 4000L, 7000L, 3000L, 6000L, 2000L, 10L, 90L, 2L, 20L)))),
new SparkListenerStageExecutorMetrics("1", 0, 0, new ExecutorMetrics(p0_1))),
((0, "2"),
new SparkListenerStageExecutorMetrics("2", 0, 0,
new ExecutorMetrics(Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L,
80L, 40L, 9000L, 4000L, 8000L, 3000L, 7000L, 2000L, 10L, 90L, 2L, 20L)))),
new SparkListenerStageExecutorMetrics("2", 0, 0, new ExecutorMetrics(p0_2))),
((0, "driver"),
new SparkListenerStageExecutorMetrics("driver", 0, 0, new ExecutorMetrics(p0_d))),
((1, "1"),
new SparkListenerStageExecutorMetrics("1", 1, 0,
new ExecutorMetrics(Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L,
50L, 0L, 5000L, 3000L, 4000L, 2000L, 3000L, 1000L, 10L, 90L, 2L, 20L)))),
new SparkListenerStageExecutorMetrics("1", 1, 0, new ExecutorMetrics(p1_1))),
((1, "2"),
new SparkListenerStageExecutorMetrics("2", 1, 0,
new ExecutorMetrics(Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L,
40L, 40L, 8000L, 5000L, 7000L, 4000L, 6000L, 3000L, 10L, 90L, 2L, 20L)))))
new SparkListenerStageExecutorMetrics("2", 1, 0, new ExecutorMetrics(p1_2))),
((1, "driver"),
new SparkListenerStageExecutorMetrics("driver", 1, 0, new ExecutorMetrics(p1_d))))
// Verify the log file contains the expected events.
// Posted events should be logged, except for ExecutorMetricsUpdate events -- these
// are consolidated, and the peak values for each stage are logged at stage end.
val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
try {
val lines = readLines(logData)
val logStart = SparkListenerLogStart(SPARK_VERSION)
assert(lines.size === 14)
assert(lines.size === 16)
assert(lines(0).contains("SparkListenerLogStart"))
assert(lines(1).contains("SparkListenerApplicationStart"))
assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
Expand All @@ -406,13 +455,13 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
case stageCompleted: SparkListenerStageCompleted =>
val execIds = Set[String]()
(1 to 2).foreach { _ =>
(1 to 3).foreach { _ =>
val execId = checkStageExecutorMetrics(lines(logIdx),
stageCompleted.stageInfo.stageId, expectedMetricsEvents)
execIds += execId
logIdx += 1
}
assert(execIds.size == 2) // check that each executor was logged
assert(execIds.size == 3) // check that each executor/driver was logged
checkEvent(lines(logIdx), event)
logIdx += 1
case _ =>
@@ -444,16 +493,30 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
SparkListenerExecutorRemoved(0L, executorId.toString, "test")
}

/**
* Helper to create a SparkListenerExecutorMetricsUpdate event.
* For the driver (executorId == "driver"), the executorUpdates contain a single entry with
* the key (-1, -1). There should be a single stageId passed in stageIds, namely -1.
* For the executors, for each stage, we assume there is a single stage attempt (attempt 0);
* the executorUpdates contain an entry for each stageId passed in stageIds, with the key
* (stageId, 0).
* The same executorMetrics are associated with each key in the executorUpdates.
*/
private def createExecutorMetricsUpdateEvent(
stageIds: Seq[Int],
executorId: Int,
executorId: String,
executorMetrics: ExecutorMetrics): SparkListenerExecutorMetricsUpdate = {
val taskMetrics = TaskMetrics.empty
taskMetrics.incDiskBytesSpilled(111)
taskMetrics.incMemoryBytesSpilled(222)
val accum = Array((333L, 1, 1, taskMetrics.accumulators().map(AccumulatorSuite.makeInfo)))
val executorUpdates = stageIds.map(id => (id, 0) -> executorMetrics).toMap
SparkListenerExecutorMetricsUpdate(executorId.toString, accum, executorUpdates)
val executorUpdates =
if (executorId == "driver") {
stageIds.map(id => (id, -1) -> executorMetrics).toMap
} else {
stageIds.map(id => (id, 0) -> executorMetrics).toMap
}
SparkListenerExecutorMetricsUpdate(executorId, accum, executorUpdates)
}
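// For illustration, two calls from the events list above and the executorUpdates
// keys they produce under the doc comment's convention:
//   createExecutorMetricsUpdateEvent(List(-1), "driver", new ExecutorMetrics(md_1))
//     -> executorUpdates keyed by (-1, -1)
//   createExecutorMetricsUpdateEvent(List(0, 1), "1", new ExecutorMetrics(m1_4))
//     -> executorUpdates keyed by (0, 0) and (1, 0), both mapping to m1_4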

/** Check that the Spark history log line matches the expected event. */