Changes from 1 commit

Commits (36):
c8e8abe  SPARK-23429: Add executor memory metrics to heartbeat and expose in e… (edwinalu, Mar 9, 2018)
5d6ae1c  modify MimaExcludes.scala to filter changes to SparkListenerExecutorM… (edwinalu, Apr 2, 2018)
ad10d28  Address code review comments, change event logging to stage end. (edwinalu, Apr 22, 2018)
10ed328  Add configuration parameter spark.eventLog.logExecutorMetricsUpdates.… (edwinalu, May 15, 2018)
2d20367  wip on enum based metrics (squito, May 23, 2018)
f904f1e  wip ... has both enum and non-enum version (squito, May 23, 2018)
c502ec4  case objects, mostly complete (squito, May 23, 2018)
7879e66  Merge pull request #1 from squito/metric_enums (edwinalu, Jun 3, 2018)
2662f6f  Address comments (move heartbeater from DAGScheduler to SparkContext,… (edwinalu, Jun 10, 2018)
2871335  SPARK-23429: Add executor memory metrics to heartbeat and expose in e… (edwinalu, Mar 9, 2018)
da83f2e  modify MimaExcludes.scala to filter changes to SparkListenerExecutorM… (edwinalu, Apr 2, 2018)
f25a44b  Address code review comments, change event logging to stage end. (edwinalu, Apr 22, 2018)
ca85c82  Add configuration parameter spark.eventLog.logExecutorMetricsUpdates.… (edwinalu, May 15, 2018)
8b74ba8  wip on enum based metrics (squito, May 23, 2018)
036148c  wip ... has both enum and non-enum version (squito, May 23, 2018)
91fb1db  case objects, mostly complete (squito, May 23, 2018)
2d8894a  Address comments (move heartbeater from DAGScheduler to SparkContext,… (edwinalu, Jun 10, 2018)
99044e6  Merge branch 'SPARK-23429.2' of https://github.com/edwinalu/spark int… (edwinalu, Jun 14, 2018)
263c8c8  code review comments (edwinalu, Jun 14, 2018)
812fdcf  code review comments: (edwinalu, Jun 22, 2018)
7ed42a5  Address code review comments. Also make executorUpdates in SparkListe… (edwinalu, Jun 28, 2018)
8d9acdf  Revert and make executorUpdates in SparkListenerExecutorMetricsUpdate… (edwinalu, Jun 29, 2018)
20799d2  code review comments: hid array implementation of executor metrics, a… (edwinalu, Jul 25, 2018)
8905d23  merge with master (edwinalu, Jul 25, 2018)
04875b8  Integration of ProcessTreeMetrics with PR 21221 (Jul 26, 2018)
a0eed11  address code review comments (edwinalu, Aug 5, 2018)
162b9b2  Merge branch 'SPARK-23429.2' of https://github.com/edwinalu/spark int… (Aug 6, 2018)
29a44c7  Changing the position of ptree and also make the computation configur… (Aug 7, 2018)
3671427  Seperate metrics for jvm, python and others and update the tests (Aug 8, 2018)
03cd5bc  code review comments (edwinalu, Aug 13, 2018)
c79b5ab  Merge branch 'SPARK-23429.2' of https://github.com/edwinalu/spark int… (Aug 14, 2018)
10e7f15  Merge branch 'master' into SPARK-23429.2 (edwinalu, Aug 14, 2018)
a14b82a  merge conflicts (edwinalu, Aug 14, 2018)
2897281  disable stage executor metrics logging by default (edwinalu, Aug 16, 2018)
8f97b50  Merge branch 'SPARK-23429.2' of https://github.com/rezasafi/spark int… (Aug 17, 2018)
b14cebc  Update JsonProtocolSuite with new metrics. (Aug 17, 2018)
Address code review comments. Also make executorUpdates in SparkListenerExecutorMetricsUpdate not optional.

These events are no longer logged, so backward compatibility should not be an issue. They should only be used to send task and executor updates for heartbeats, and the executors and the driver should be running the same Spark version.
edwinalu committed Jun 29, 2018
commit 7ed42a5d0eb0b93bb9ddecf14d9461c80dfe1ea0
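For context before the diff: the central API change is that executorUpdates on SparkListenerExecutorMetricsUpdate becomes a required Array[Long], indexed by MetricGetter.values, instead of an Option. A minimal sketch of posting a driver heartbeat after this change; the zero-filled sample array is purely illustrative, and since MetricGetter is now private[spark] the snippet assumes it lives in Spark-internal code:

import org.apache.spark.scheduler.{AccumulableInfo, SparkListenerExecutorMetricsUpdate}
import org.apache.spark.metrics.MetricGetter

// executorUpdates is positional: one Long per entry in MetricGetter.values.
// Zeros stand in for real samples here.
val executorUpdates: Array[Long] = Array.fill(MetricGetter.values.length)(0L)
val accumUpdates = new Array[(Long, Int, Int, Seq[AccumulableInfo])](0)

// Before this commit the last argument was Some(executorUpdates); it is now required.
val event = SparkListenerExecutorMetricsUpdate("driver", accumUpdates, executorUpdates)
// listenerBus.post(event)  // as done in SparkContext.reportHeartBeat below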
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/Heartbeater.scala
@@ -42,7 +42,7 @@ private[spark] class Heartbeater(
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor(name)

/** Schedules a task to report a heartbeat. */
private[spark] def start(): Unit = {
def start(): Unit = {
// Wait a random interval so the heartbeats don't end up in sync
val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]

@@ -53,7 +53,7 @@
}

/** Stops the heartbeat thread. */
private[spark] def stop(): Unit = {
def stop(): Unit = {
heartbeater.shutdown()
heartbeater.awaitTermination(10, TimeUnit.SECONDS)
}
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1928,7 +1928,7 @@ class SparkContext(config: SparkConf) extends Logging {
Utils.tryLogNonFatalError {
_eventLogger.foreach(_.stop())
}
if(_heartbeater != null) {
if (_heartbeater != null) {
Utils.tryLogNonFatalError {
_heartbeater.stop()
}
@@ -2414,8 +2414,7 @@
private def reportHeartBeat(): Unit = {
val driverUpdates = _heartbeater.getCurrentMetrics()
val accumUpdates = new Array[(Long, Int, Int, Seq[AccumulableInfo])](0)
listenerBus.post(SparkListenerExecutorMetricsUpdate("driver", accumUpdates,
Some(driverUpdates)))
listenerBus.post(SparkListenerExecutorMetricsUpdate("driver", accumUpdates, driverUpdates))
}

// In order to prevent multiple SparkContexts from being active at the same time, mark this
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -69,8 +69,8 @@ package object config {
.bytesConf(ByteUnit.KiB)
.createWithDefaultString("100k")

private[spark] val EVENT_LOG_EXECUTOR_METRICS_UPDATES =
ConfigBuilder("spark.eventLog.logExecutorMetricsUpdates.enabled")
private[spark] val EVENT_LOG_STAGE_EXECUTOR_METRICS =
ConfigBuilder("spark.eventLog.logStageExecutorMetrics.enabled")
.booleanConf
.createWithDefault(true)

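As a usage note, not part of the patch: a sketch of turning the renamed flag on when configuring an application. The event-log directory is just an illustrative value, and event logging itself must be enabled for the new flag to have any effect.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "/tmp/spark-events")                 // illustrative path
  .set("spark.eventLog.logStageExecutorMetrics.enabled", "true")  // flag renamed in this PR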
36 changes: 22 additions & 14 deletions core/src/main/scala/org/apache/spark/metrics/MetricGetter.scala
@@ -21,18 +21,20 @@ import javax.management.ObjectName

import org.apache.spark.memory.MemoryManager

sealed trait MetricGetter {
private[spark] sealed trait MetricGetter {
def getMetricValue(memoryManager: MemoryManager): Long
val name = getClass().getName().stripSuffix("$").split("""\.""").last
}

abstract class MemoryManagerMetricGetter(f: MemoryManager => Long) extends MetricGetter {
private[spark] abstract class MemoryManagerMetricGetter(
f: MemoryManager => Long) extends MetricGetter {
override def getMetricValue(memoryManager: MemoryManager): Long = {
f(memoryManager)
}
}

abstract class MBeanMetricGetter(mBeanName: String) extends MetricGetter {
private[spark] abstract class MBeanMetricGetter(mBeanName: String)
extends MetricGetter {
val bean = ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer,
new ObjectName(mBeanName).toString, classOf[BufferPoolMXBean])

@@ -41,36 +41,42 @@ abstract class MBeanMetricGetter(mBeanName: String) extends MetricGetter {
}
}

case object JVMHeapMemory extends MetricGetter {
private[spark] case object JVMHeapMemory extends MetricGetter {
override def getMetricValue(memoryManager: MemoryManager): Long = {
ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed()
}
}

case object JVMOffHeapMemory extends MetricGetter {
private[spark] case object JVMOffHeapMemory extends MetricGetter {
override def getMetricValue(memoryManager: MemoryManager): Long = {
ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed()
}
}

case object OnHeapExecutionMemory extends MemoryManagerMetricGetter(_.onHeapExecutionMemoryUsed)
private[spark] case object OnHeapExecutionMemory extends MemoryManagerMetricGetter(
_.onHeapExecutionMemoryUsed)

case object OffHeapExecutionMemory extends MemoryManagerMetricGetter(_.offHeapExecutionMemoryUsed)
private[spark] case object OffHeapExecutionMemory extends MemoryManagerMetricGetter(
_.offHeapExecutionMemoryUsed)

case object OnHeapStorageMemory extends MemoryManagerMetricGetter(_.onHeapStorageMemoryUsed)
private[spark] case object OnHeapStorageMemory extends MemoryManagerMetricGetter(
_.onHeapStorageMemoryUsed)

case object OffHeapStorageMemory extends MemoryManagerMetricGetter(_.offHeapStorageMemoryUsed)
private[spark] case object OffHeapStorageMemory extends MemoryManagerMetricGetter(
_.offHeapStorageMemoryUsed)

case object OnHeapUnifiedMemory extends MemoryManagerMetricGetter(
private[spark] case object OnHeapUnifiedMemory extends MemoryManagerMetricGetter(
(m => m.onHeapExecutionMemoryUsed + m.onHeapStorageMemoryUsed))

case object OffHeapUnifiedMemory extends MemoryManagerMetricGetter(
private[spark] case object OffHeapUnifiedMemory extends MemoryManagerMetricGetter(
(m => m.offHeapExecutionMemoryUsed + m.offHeapStorageMemoryUsed))

case object DirectPoolMemory extends MBeanMetricGetter("java.nio:type=BufferPool,name=direct")
case object MappedPoolMemory extends MBeanMetricGetter("java.nio:type=BufferPool,name=mapped")
private[spark] case object DirectPoolMemory extends MBeanMetricGetter(
"java.nio:type=BufferPool,name=direct")
private[spark] case object MappedPoolMemory extends MBeanMetricGetter(
"java.nio:type=BufferPool,name=mapped")

object MetricGetter {
private[spark] object MetricGetter {
val values = IndexedSeq(
JVMHeapMemory,
JVMOffHeapMemory,
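A small sketch of how the positional Array[Long] used throughout this PR maps back to metric names; since the MetricGetter hierarchy is now private[spark], this only works from Spark-internal code:

import org.apache.spark.metrics.MetricGetter

// Pair each metric name with its slot in an executorUpdates array.
// name is derived from each case object's class name, e.g. "JVMHeapMemory".
def namedMetrics(executorUpdates: Array[Long]): Map[String, Long] =
  MetricGetter.values.map(_.name).zip(executorUpdates).toMap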
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -247,9 +247,9 @@ class DAGScheduler(
// (taskId, stageId, stageAttemptId, accumUpdates)
accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId,
// executor metrics indexed by MetricGetter.values
executorUpdates: Array[Long]): Boolean = {
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates,
Some(executorUpdates)))
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates, executorUpdates))
blockManagerMaster.driverEndpoint.askSync[Boolean](
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
}
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -51,7 +51,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
* spark.eventLog.overwrite - Whether to overwrite any existing files.
* spark.eventLog.dir - Path to the directory in which events are logged.
* spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
* spark.eventLog.logExecutorMetricsUpdates.enabled - Whether to log executor metrics updates
* spark.eventLog.logStageExecutorMetrics.enabled - Whether to log stage executor metrics
*/
private[spark] class EventLoggingListener(
appId: String,
@@ -70,7 +70,7 @@ private[spark] class EventLoggingListener(
private val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
private val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
private val shouldLogBlockUpdates = sparkConf.get(EVENT_LOG_BLOCK_UPDATES)
private val shouldLogExecutorMetricsUpdates = sparkConf.get(EVENT_LOG_EXECUTOR_METRICS_UPDATES)
private val shouldLogStageExecutorMetrics = sparkConf.get(EVENT_LOG_STAGE_EXECUTOR_METRICS)
private val testing = sparkConf.get(EVENT_LOG_TESTING)
private val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
@@ -162,7 +162,7 @@ private[spark] class EventLoggingListener(
// Events that do not trigger a flush
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
logEvent(event)
if (shouldLogExecutorMetricsUpdates) {
if (shouldLogStageExecutorMetrics) {
// record the peak metrics for the new stage
liveStageExecutorMetrics.put((event.stageInfo.stageId, event.stageInfo.attemptNumber()),
new HashMap[String, PeakExecutorMetrics]())
@@ -181,7 +181,7 @@

// Events that trigger a flush
override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
if (shouldLogExecutorMetricsUpdates) {
if (shouldLogStageExecutorMetrics) {
// clear out any previous attempts, that did not have a stage completed event
val prevAttemptId = event.stageInfo.attemptNumber() - 1
for (attemptId <- 0 to prevAttemptId) {
@@ -190,14 +190,12 @@

// log the peak executor metrics for the stage, for each live executor,
// whether or not the executor is running tasks for the stage
val executorMap = liveStageExecutorMetrics.remove(
val executorOpt = liveStageExecutorMetrics.remove(
(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
executorMap.foreach {
executorEntry => {
for ((executorId, peakExecutorMetrics) <- executorEntry) {
executorOpt.foreach { execMap =>
execMap.foreach { case (executorId, peakExecutorMetrics) =>
logEvent(new SparkListenerStageExecutorMetrics(executorId, event.stageInfo.stageId,
event.stageInfo.attemptNumber(), peakExecutorMetrics.metrics))
}
}
}
}
@@ -269,14 +267,12 @@
}

override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
if (shouldLogExecutorMetricsUpdates) {
if (shouldLogStageExecutorMetrics) {
// For the active stages, record any new peak values for the memory metrics for the executor
event.executorUpdates.foreach { executorUpdates =>
liveStageExecutorMetrics.values.foreach { peakExecutorMetrics =>
val peakMetrics = peakExecutorMetrics.getOrElseUpdate(
event.execId, new PeakExecutorMetrics())
peakMetrics.compareAndUpdate(executorUpdates)
}
liveStageExecutorMetrics.values.foreach { peakExecutorMetrics =>
val peakMetrics = peakExecutorMetrics.getOrElseUpdate(
event.execId, new PeakExecutorMetrics())
peakMetrics.compareAndUpdate(event.executorUpdates)
}
}
}
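The bookkeeping above reduces to an elementwise max per (stage attempt, executor). A standalone sketch of that pattern, with names invented for illustration rather than taken from the patch:

import scala.collection.mutable

// (stageId, stageAttemptId) -> executorId -> running peak for each metric slot
val liveStagePeaks = mutable.HashMap[(Int, Int), mutable.HashMap[String, Array[Long]]]()

def recordHeartbeat(stage: (Int, Int), execId: String, update: Array[Long]): Unit = {
  liveStagePeaks.get(stage).foreach { peaksByExec =>
    val peaks = peaksByExec.getOrElseUpdate(execId, Array.fill(update.length)(Long.MinValue))
    for (i <- update.indices if update(i) > peaks(i)) {
      peaks(i) = update(i)  // keep the largest value seen while the stage was running
    }
  }
}

// At stage completion, liveStagePeaks.remove(stage) yields one peak array per executor,
// which is what gets written out as a SparkListenerStageExecutorMetrics event.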
@@ -24,6 +24,7 @@ import org.apache.spark.metrics.MetricGetter
* values have been recorded yet.
*/
private[spark] class PeakExecutorMetrics {
// Metrics are indexed by MetricGetter.values
val metrics = new Array[Long](MetricGetter.values.length)
metrics(0) = -1

@@ -46,10 +47,4 @@
}
updated
}

/** Clears/resets the saved peak values. */
def reset(): Unit = {
(0 until metrics.length).foreach { idx => metrics(idx) = 0}
metrics(0) = -1
}
}
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -166,7 +166,7 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
case class SparkListenerExecutorMetricsUpdate(
execId: String,
accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
executorUpdates: Option[Array[Long]] = None)
executorUpdates: Array[Long])
extends SparkListenerEvent

/**
@@ -175,7 +175,7 @@ case class SparkListenerExecutorMetricsUpdate(
* @param execId executor id
* @param stageId stage id
* @param stageAttemptId stage attempt
* @param executorMetrics executor level metrics
* @param executorMetrics executor level metrics, indexed by MetricGetter.values
*/
@DeveloperApi
case class SparkListenerStageExecutorMetrics(
@@ -283,7 +283,9 @@ private[spark] trait SparkListenerInterface {
def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit

/**
* Called when the driver reads stage executor metrics from the history log.
* Called with the peak memory metrics for a given (executor, stage) combination. Note that this
* is only present when reading from the event log (as in the history server), and is never
* called in a live application.
*/
def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit

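A hypothetical listener that consumes the new event while a history log is replayed. It assumes the concrete SparkListener base class gains a no-op default for onStageExecutorMetrics (not shown in this diff) and that executorMetrics is the Array[Long] indexed by MetricGetter.values, with JVMHeapMemory in slot 0:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageExecutorMetrics}

class PeakHeapListener extends SparkListener {
  override def onStageExecutorMetrics(event: SparkListenerStageExecutorMetrics): Unit = {
    // JVMHeapMemory is the first entry in MetricGetter.values, hence index 0.
    val peakHeapBytes = event.executorMetrics(0)
    println(s"stage ${event.stageId} attempt ${event.stageAttemptId} " +
      s"executor ${event.execId}: peak JVM heap $peakHeapBytes bytes")
  }
}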
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -686,20 +686,23 @@
}
}
}
event.executorUpdates.foreach { updates: Array[Long] =>
// check if there is a new peak value for any of the executor level memory metrics
liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
if (exec.peakExecutorMetrics.compareAndUpdate(updates)) {
maybeUpdate(exec, now)
}

// check if there is a new peak value for any of the executor level memory metrics
// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed
// for the live UI.
liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
if (exec.peakExecutorMetrics.compareAndUpdate(event.executorUpdates)) {
maybeUpdate(exec, now)
}
}
}

override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = {
val now = System.nanoTime()

// check if there is a new peak value for any of the executor level memory metrics
// check if there is a new peak value for any of the executor level memory metrics,
// while reading from the log. SparkListenerStageExecutorMetrics are only processed
// when reading logs.
liveExecutors.get(executorMetrics.execId)
.orElse(deadExecutors.get(executorMetrics.execId)) match {
case Some(exec) =>
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -113,8 +113,8 @@ class MemoryMetrics private[spark](
val totalOnHeapStorageMemory: Long,
val totalOffHeapStorageMemory: Long)

/** deserialzer for peakMemoryMetrics: convert to array ordered by metric name */
class PeakMemoryMetricsDeserializer private[spark] extends JsonDeserializer[Option[Array[Long]]] {
/** deserializer for peakMemoryMetrics: convert to array ordered by metric name */
private class PeakMemoryMetricsDeserializer extends JsonDeserializer[Option[Array[Long]]] {
override def deserialize(
jsonParser: JsonParser,
deserializationContext: DeserializationContext): Option[Array[Long]] = {
@@ -128,7 +128,7 @@ class PeakMemoryMetricsDeserializer private[spark] extends JsonDeserializer[Opti
}
}
/** serializer for peakMemoryMetrics: convert array to map with metric name as key */
class PeakMemoryMetricsSerializer private[spark] extends JsonSerializer[Option[Array[Long]]] {
private class PeakMemoryMetricsSerializer extends JsonSerializer[Option[Array[Long]]] {
override def serialize(
metrics: Option[Array[Long]],
jsonGenerator: JsonGenerator,
7 changes: 2 additions & 5 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -239,7 +239,7 @@
def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = {
val execId = metricsUpdate.execId
val accumUpdates = metricsUpdate.accumUpdates
val executorMetrics = metricsUpdate.executorUpdates.map(executorMetricsToJson(_))
val executorMetrics = executorMetricsToJson(metricsUpdate.executorUpdates)
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~
("Executor ID" -> execId) ~
("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
@@ -723,10 +723,7 @@
(json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson)
(taskId, stageId, stageAttemptId, updates)
}
val executorUpdates = jsonOption(json \ "Executor Metrics Updated") match {
case None => None
case Some(executorUpdate) => Some(executorMetricsFromJson(executorUpdate))
}
val executorUpdates = executorMetricsFromJson(json \ "Executor Metrics Updated")
SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates, executorUpdates)
}

core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -144,7 +144,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
}

test("Executor metrics update") {
testExecutorMetricsUpdateEventLogging()
testStageExecutorMetricsEventLogging()
}

/* ----------------- *
Expand Down Expand Up @@ -262,19 +262,17 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
}

/**
* Test executor metrics update logging functionality. This checks that a
* SparkListenerExecutorMetricsUpdate event is added to the Spark history
* log if one of the executor metrics is larger than any previously
* recorded value for the metric, per executor per stage. The task metrics
* should not be added.
* Test stage executor metrics logging functionality. This checks that peak
* values from SparkListenerExecutorMetricsUpdate events during a stage are
* logged in a StageExecutorMetrics event for each executor at stage completion.
*/
private def testExecutorMetricsUpdateEventLogging() {
private def testStageExecutorMetricsEventLogging() {
val conf = getLoggingConf(testDirPath, None)
val logName = "executorMetricsUpdated-test"
val logName = "stageExecutorMetrics-test"
val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
val listenerBus = new LiveListenerBus(conf)

// expected ExecutorMetricsUpdate, for the given stage id and executor id
// expected StageExecutorMetrics, for the given stage id and executor id
val expectedMetricsEvents: Map[(Int, String), SparkListenerStageExecutorMetrics] =
Map(
((0, "1"),
@@ -375,7 +373,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
case stageCompleted: SparkListenerStageCompleted =>
val execIds = Set[String]()
(1 to 2).foreach { _ =>
val execId = checkExecutorMetricsUpdate(lines(logIdx),
val execId = checkStageExecutorMetrics(lines(logIdx),
stageCompleted.stageInfo.stageId, expectedMetricsEvents)
execIds += execId
logIdx += 1
@@ -418,7 +416,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
taskMetrics.incDiskBytesSpilled(111)
taskMetrics.incMemoryBytesSpilled(222)
val accum = Array((333L, 1, 1, taskMetrics.accumulators().map(AccumulatorSuite.makeInfo)))
SparkListenerExecutorMetricsUpdate(executorId.toString, accum, Some(executorMetrics))
SparkListenerExecutorMetricsUpdate(executorId.toString, accum, executorMetrics)
}

/** Check that the Spark history log line matches the expected event. */
@@ -446,7 +444,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
* @param stageId the stage ID the ExecutorMetricsUpdate is associated with
* @param expectedEvents map of expected ExecutorMetricsUpdate events, for (stageId, executorId)
*/
private def checkExecutorMetricsUpdate(
private def checkStageExecutorMetrics(
line: String,
stageId: Int,
expectedEvents: Map[(Int, String), SparkListenerStageExecutorMetrics]): String = {