[SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API #21221
Changes from 1 commit
…enabled to enable/disable executor metrics update logging. Code review comments.
@@ -69,6 +69,11 @@ package object config {
      .bytesConf(ByteUnit.KiB)
      .createWithDefaultString("100k")

+  private[spark] val EVENT_LOG_EXECUTOR_METRICS_UPDATES =
+    ConfigBuilder("spark.eventLog.logExecutorMetricsUpdates.enabled")
+      .booleanConf
+      .createWithDefault(true)
+
   private[spark] val EVENT_LOG_OVERWRITE =
     ConfigBuilder("spark.eventLog.overwrite").booleanConf.createWithDefault(false)
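For anyone trying the new flag out, here is a minimal sketch (not part of this PR) of setting it next to the existing event-log options; the app name and log directory below are placeholders:

```scala
import org.apache.spark.SparkConf

// Sketch only: event logging itself has to be enabled for the new flag to matter.
val conf = new SparkConf()
  .setAppName("executor-metrics-demo")             // placeholder app name
  .set("spark.eventLog.enabled", "true")           // existing Spark setting
  .set("spark.eventLog.dir", "/tmp/spark-events")  // placeholder directory
  // New flag added in this PR; defaults to true. Set to false to stop the
  // EventLoggingListener from logging executor metrics updates.
  .set("spark.eventLog.logExecutorMetricsUpdates.enabled", "false")
```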
@@ -211,7 +211,7 @@ class DAGScheduler(
   taskScheduler.setDAGScheduler(this)

   /** driver heartbeat for collecting metrics */
-  private val heartbeater: Heartbeater = new Heartbeater(reportHeartBeat,
+  private val heartbeater: Heartbeater = new Heartbeater(reportHeartBeat, "driver-heartbeater",
     sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))

   /** BufferPoolMXBean for direct memory */
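The only change here is threading a thread name through so the driver-side heartbeater is distinguishable from the executor one. As a rough sketch of the general pattern, assuming a plain ScheduledExecutorService rather than Spark's actual Heartbeater class:

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

// Sketch of a periodic reporter running on a named daemon thread; the real
// Heartbeater touched by this PR may differ in details.
class SimpleHeartbeater(reportHeartbeat: () => Unit, name: String, intervalMs: Long) {
  private val threadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)  // the name passed in, e.g. "driver-heartbeater"
      t.setDaemon(true)
      t
    }
  }

  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(threadFactory)

  def start(): Unit = {
    val task = new Runnable { override def run(): Unit = reportHeartbeat() }
    scheduler.scheduleAtFixedRate(task, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = scheduler.shutdown()
}
```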
@@ -52,6 +52,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  *   spark.eventLog.overwrite - Whether to overwrite any existing files.
  *   spark.eventLog.dir - Path to the directory in which events are logged.
  *   spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
+ *   spark.eventLog.logExecutorMetricsUpdates.enabled - Whether to log executor metrics updates
  */
 private[spark] class EventLoggingListener(
     appId: String,

@@ -70,6 +71,7 @@ private[spark] class EventLoggingListener(
   private val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
   private val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
   private val shouldLogBlockUpdates = sparkConf.get(EVENT_LOG_BLOCK_UPDATES)
+  private val shouldLogExecutorMetricsUpdates = sparkConf.get(EVENT_LOG_EXECUTOR_METRICS_UPDATES)
   private val testing = sparkConf.get(EVENT_LOG_TESTING)
   private val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
   private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)

@@ -82,7 +84,7 @@ private[spark] class EventLoggingListener(
   private val compressionCodecName = compressionCodec.map { c =>
     CompressionCodec.getShortName(c.getClass.getName)
   }
+  logInfo("spark.eventLog.logExecutorMetricsUpdates.enabled is " + shouldLogExecutorMetricsUpdates)

   // Only defined if the file system scheme is not local
   private var hadoopDataStream: Option[FSDataOutputStream] = None
@@ -162,9 +164,11 @@ private[spark] class EventLoggingListener(
   // Events that do not trigger a flush
   override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
     logEvent(event)
-    // clear the peak metrics when a new stage starts
-    liveStageExecutorMetrics.put((event.stageInfo.stageId, event.stageInfo.attemptNumber()),
-      new mutable.HashMap[String, PeakExecutorMetrics]())
+    if (shouldLogExecutorMetricsUpdates) {
+      // record the peak metrics for the new stage
+      liveStageExecutorMetrics.put((event.stageInfo.stageId, event.stageInfo.attemptNumber()),
+        new mutable.HashMap[String, PeakExecutorMetrics]())
+    }
   }

   override def onTaskStart(event: SparkListenerTaskStart): Unit = logEvent(event)
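For readers following the bookkeeping across these hunks: the live peak-metrics map is keyed by (stageId, stageAttemptNumber) and created on stage submit. A REPL-style sketch of that lifecycle, with a plain Long standing in for the PR's PeakExecutorMetrics:

```scala
import scala.collection.mutable

// Sketch only: (stageId, stageAttemptNumber) -> (executorId -> peak value seen so far).
// In the PR the inner value is a PeakExecutorMetrics, not a Long.
val liveStageExecutorPeaks =
  mutable.HashMap[(Int, Int), mutable.HashMap[String, Long]]()

// onStageSubmitted: start tracking the new attempt (guarded by the config flag in the PR).
liveStageExecutorPeaks.put((3, 0), new mutable.HashMap[String, Long]())

// onExecutorMetricsUpdate: record a higher peak for executor "1".
val stagePeaks = liveStageExecutorPeaks((3, 0))
stagePeaks("1") = math.max(stagePeaks.getOrElse("1", 0L), 512L * 1024 * 1024)

// onStageCompleted: emit the peaks and drop the entry.
liveStageExecutorPeaks.remove((3, 0))
```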
@@ -179,22 +183,30 @@ private[spark] class EventLoggingListener(
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
-    // log the peak executor metrics for the stage, for each executor
-    val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
-    val executorMap = liveStageExecutorMetrics.remove(
-      (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
-    executorMap.foreach {
-      executorEntry => {
-        for ((executorId, peakExecutorMetrics) <- executorEntry) {
-          val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.jvmUsedHeapMemory,
-            peakExecutorMetrics.jvmUsedNonHeapMemory, peakExecutorMetrics.onHeapExecutionMemory,
-            peakExecutorMetrics.offHeapExecutionMemory, peakExecutorMetrics.onHeapStorageMemory,
-            peakExecutorMetrics.offHeapStorageMemory, peakExecutorMetrics.onHeapUnifiedMemory,
-            peakExecutorMetrics.offHeapUnifiedMemory, peakExecutorMetrics.directMemory,
-            peakExecutorMetrics.mappedMemory)
-          val executorUpdate = new SparkListenerExecutorMetricsUpdate(
-            executorId, accumUpdates, Some(executorMetrics))
-          logEvent(executorUpdate)
+    if (shouldLogExecutorMetricsUpdates) {
+      // clear out any previous attempts, that did not have a stage completed event
+      val prevAttemptId = event.stageInfo.attemptNumber() - 1
+      for (attemptId <- 0 to prevAttemptId) {
+        liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId))
+      }
+
+      // log the peak executor metrics for the stage, for each executor
+      val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
+      val executorMap = liveStageExecutorMetrics.remove(
+        (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+      executorMap.foreach {
+        executorEntry => {
+          for ((executorId, peakExecutorMetrics) <- executorEntry) {
+            val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.jvmUsedHeapMemory,
+              peakExecutorMetrics.jvmUsedNonHeapMemory, peakExecutorMetrics.onHeapExecutionMemory,
+              peakExecutorMetrics.offHeapExecutionMemory, peakExecutorMetrics.onHeapStorageMemory,
+              peakExecutorMetrics.offHeapStorageMemory, peakExecutorMetrics.onHeapUnifiedMemory,
+              peakExecutorMetrics.offHeapUnifiedMemory, peakExecutorMetrics.directMemory,
+              peakExecutorMetrics.mappedMemory)
+            val executorUpdate = new SparkListenerExecutorMetricsUpdate(
+              executorId, accumUpdates, Some(executorMetrics))
+            logEvent(executorUpdate)
+          }
         }
       }
     }
   }

Review exchange on the attempt-cleanup lines above:

Contributor: One potential issue here -- even though there is a stage completed event, you can still have tasks running from a previous stage attempt (when there is a fetch failure, all existing tasks keep running). Those leftover tasks will affect the memory usage for other tasks which run on those executors. That said, I don't know if we can do much better here; the alternative would be to track the task start and end events for each stage attempt.

Author: Tracking task start and end would add some overhead. If it's a relatively unlikely corner case, and unlikely to have much impact on the numbers, it may be better to leave it as is.
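For reference, here is a sketch of the alternative the reviewer describes -- counting running tasks per stage attempt from task start/end events. This is not what the PR does; all names below are hypothetical:

```scala
import scala.collection.mutable

// Hypothetical sketch: count running tasks per (stageId, stageAttemptId) so that
// peaks reported while old-attempt tasks are still running could be flagged or
// excluded. This is NOT part of the PR; it illustrates the alternative discussed
// in the review thread above.
class RunningTaskTracker {
  private val runningTasks = mutable.HashMap[(Int, Int), Int]().withDefaultValue(0)

  def onTaskStart(stageId: Int, stageAttemptId: Int): Unit = synchronized {
    runningTasks((stageId, stageAttemptId)) += 1
  }

  def onTaskEnd(stageId: Int, stageAttemptId: Int): Unit = synchronized {
    val key = (stageId, stageAttemptId)
    runningTasks(key) -= 1
    if (runningTasks(key) <= 0) {
      runningTasks.remove(key)
    }
  }

  /** True if an earlier attempt of this stage still has tasks running. */
  def hasStragglersFromPreviousAttempts(stageId: Int, currentAttemptId: Int): Boolean =
    synchronized {
      runningTasks.keys.exists { case (sid, attempt) =>
        sid == stageId && attempt < currentAttemptId
      }
    }
}
```

The trade-off raised in the thread applies directly: this adds per-task bookkeeping on every task start and end, which is why the author prefers leaving the corner case unhandled.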
@@ -266,12 +278,14 @@ private[spark] class EventLoggingListener(
   }

   override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
-    // For the active stages, record any new peak values for the memory metrics for the executor
-    event.executorUpdates.foreach { executorUpdates =>
-      liveStageExecutorMetrics.values.foreach { peakExecutorMetrics =>
-        val peakMetrics = peakExecutorMetrics.getOrElseUpdate(
-          event.execId, new PeakExecutorMetrics())
-        peakMetrics.compareAndUpdate(executorUpdates)
+    if (shouldLogExecutorMetricsUpdates) {
+      // For the active stages, record any new peak values for the memory metrics for the executor
+      event.executorUpdates.foreach { executorUpdates =>
+        liveStageExecutorMetrics.values.foreach { peakExecutorMetrics =>
+          val peakMetrics = peakExecutorMetrics.getOrElseUpdate(
+            event.execId, new PeakExecutorMetrics())
+          peakMetrics.compareAndUpdate(executorUpdates)
+        }
       }
     }
   }
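The compareAndUpdate call is where the peaks are maintained. A hedged sketch of that pattern on two metrics only; the PR's actual PeakExecutorMetrics covers the full set (JVM heap/non-heap, on/off-heap execution, storage, unified, direct, mapped):

```scala
// Illustrative only: keep the maximum value seen for a couple of metrics.
// Not the PR's PeakExecutorMetrics class.
class PeakTracker {
  private var peakJvmUsedHeapMemory: Long = 0L
  private var peakOnHeapExecutionMemory: Long = 0L

  /** Raise the stored peaks if the new sample is higher; return true if anything changed. */
  def compareAndUpdate(jvmUsedHeapMemory: Long, onHeapExecutionMemory: Long): Boolean = {
    var updated = false
    if (jvmUsedHeapMemory > peakJvmUsedHeapMemory) {
      peakJvmUsedHeapMemory = jvmUsedHeapMemory
      updated = true
    }
    if (onHeapExecutionMemory > peakOnHeapExecutionMemory) {
      peakOnHeapExecutionMemory = onHeapExecutionMemory
      updated = true
    }
    updated
  }
}
```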
nit: the whole class is `private[spark]`, so you don't need to add this to individual methods.

Reply: Removed.
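For context on the nit, a small illustration of why the member-level modifier is redundant; the package and class names below are made up:

```scala
package org.apache.spark.example  // hypothetical package under org.apache.spark

// The class-level modifier already limits visibility to the org.apache.spark package,
// so members do not need their own private[spark] annotation.
private[spark] class MetricsHelper {
  def report(): Unit = ()  // effectively spark-private via the enclosing class
}
```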