-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API #21221
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API #21221
Changes from 1 commit
c8e8abe
5d6ae1c
ad10d28
10ed328
2d20367
f904f1e
c502ec4
7879e66
2662f6f
2871335
da83f2e
f25a44b
ca85c82
8b74ba8
036148c
91fb1db
2d8894a
99044e6
263c8c8
812fdcf
7ed42a5
8d9acdf
20799d2
8905d23
a0eed11
03cd5bc
10e7f15
a14b82a
2897281
ee4aa1d
571285b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,7 +35,7 @@ import org.apache.commons.lang3.SerializationUtils | |
|
|
||
| import org.apache.spark._ | ||
| import org.apache.spark.broadcast.Broadcast | ||
| import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} | ||
| import org.apache.spark.executor.{Executor, ExecutorMetrics, TaskMetrics} | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.internal.config | ||
| import org.apache.spark.network.util.JavaUtils | ||
|
|
@@ -214,6 +214,12 @@ class DAGScheduler( | |
| private val heartbeater: Heartbeater = new Heartbeater(reportHeartBeat, | ||
| sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")) | ||
|
|
||
| /** BufferPoolMXBean for direct memory */ | ||
| private val directBufferPool = Executor.getBufferPool(Executor.DIRECT_BUFFER_POOL_NAME) | ||
|
|
||
| /** BufferPoolMXBean for mapped memory */ | ||
| private val mappedBufferPool = Executor.getBufferPool(Executor.MAPPED_BUFFER_POOL_NAME) | ||
|
|
||
| /** | ||
| * Called by the TaskSetManager to report task's starting. | ||
| */ | ||
|
|
@@ -1764,12 +1770,8 @@ class DAGScheduler( | |
| /** Reports heartbeat metrics for the driver. */ | ||
| private def reportHeartBeat(): Unit = { | ||
|
||
| // get driver memory metrics | ||
| val driverUpdates = new ExecutorMetrics(System.currentTimeMillis(), | ||
| ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(), | ||
| sc.env.memoryManager.onHeapExecutionMemoryUsed, | ||
| sc.env.memoryManager.offHeapExecutionMemoryUsed, | ||
| sc.env.memoryManager.onHeapStorageMemoryUsed, | ||
| sc.env.memoryManager.offHeapStorageMemoryUsed) | ||
| val driverUpdates = Executor.getCurrentExecutorMetrics( | ||
| sc.env.memoryManager, directBufferPool, mappedBufferPool) | ||
| val accumUpdates = new Array[(Long, Int, Int, Seq[AccumulableInfo])](0) | ||
| listenerBus.post(SparkListenerExecutorMetricsUpdate("driver", accumUpdates, | ||
| Some(driverUpdates))) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -94,8 +94,9 @@ private[spark] class EventLoggingListener( | |
| // Visible for tests only. | ||
| private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName) | ||
|
|
||
| // Peak metric values for each executor | ||
| private var peakExecutorMetrics = new mutable.HashMap[String, PeakExecutorMetrics]() | ||
| // map of live stages, to peak executor metrics for the stage | ||
| private val liveStageExecutorMetrics = mutable.HashMap[(Int, Int), | ||
|
||
| mutable.HashMap[String, PeakExecutorMetrics]]() | ||
|
||
|
|
||
| /** | ||
| * Creates the log file in the configured log directory. | ||
|
|
@@ -162,7 +163,8 @@ private[spark] class EventLoggingListener( | |
| override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { | ||
| logEvent(event) | ||
| // clear the peak metrics when a new stage starts | ||
| peakExecutorMetrics.values.foreach(_.reset()) | ||
| liveStageExecutorMetrics.put((event.stageInfo.stageId, event.stageInfo.attemptNumber()), | ||
| new mutable.HashMap[String, PeakExecutorMetrics]()) | ||
| } | ||
|
|
||
| override def onTaskStart(event: SparkListenerTaskStart): Unit = logEvent(event) | ||
|
|
@@ -177,6 +179,27 @@ private[spark] class EventLoggingListener( | |
|
|
||
| // Events that trigger a flush | ||
| override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { | ||
| // log the peak executor metrics for the stage, for each executor | ||
| val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]() | ||
| val executorMap = liveStageExecutorMetrics.remove( | ||
|
||
| (event.stageInfo.stageId, event.stageInfo.attemptNumber())) | ||
| executorMap.foreach { | ||
| executorEntry => { | ||
| for ((executorId, peakExecutorMetrics) <- executorEntry) { | ||
|
||
| val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.jvmUsedHeapMemory, | ||
| peakExecutorMetrics.jvmUsedNonHeapMemory, peakExecutorMetrics.onHeapExecutionMemory, | ||
| peakExecutorMetrics.offHeapExecutionMemory, peakExecutorMetrics.onHeapStorageMemory, | ||
| peakExecutorMetrics.offHeapStorageMemory, peakExecutorMetrics.onHeapUnifiedMemory, | ||
| peakExecutorMetrics.offHeapUnifiedMemory, peakExecutorMetrics.directMemory, | ||
| peakExecutorMetrics.mappedMemory) | ||
| val executorUpdate = new SparkListenerExecutorMetricsUpdate( | ||
| executorId, accumUpdates, Some(executorMetrics)) | ||
| logEvent(executorUpdate) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // log stage completed event | ||
| logEvent(event, flushLogger = true) | ||
| } | ||
|
|
||
|
|
@@ -205,12 +228,10 @@ private[spark] class EventLoggingListener( | |
| } | ||
| override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { | ||
| logEvent(event, flushLogger = true) | ||
| peakExecutorMetrics.put(event.executorId, new PeakExecutorMetrics()) | ||
| } | ||
|
|
||
| override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { | ||
| logEvent(event, flushLogger = true) | ||
| peakExecutorMetrics.remove(event.executorId) | ||
| } | ||
|
|
||
| override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = { | ||
|
|
@@ -244,19 +265,13 @@ private[spark] class EventLoggingListener( | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Log if there is a new peak value for one of the memory metrics for the given executor. | ||
| * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will | ||
| * log new peak memory metric values per executor per stage. | ||
| */ | ||
| override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { | ||
| var log: Boolean = false | ||
| // For the active stages, record any new peak values for the memory metrics for the executor | ||
| event.executorUpdates.foreach { executorUpdates => | ||
| val peakMetrics = peakExecutorMetrics.getOrElseUpdate(event.execId, new PeakExecutorMetrics()) | ||
| if (peakMetrics.compareAndUpdate(executorUpdates)) { | ||
| val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]() | ||
| logEvent(new SparkListenerExecutorMetricsUpdate(event.execId, accumUpdates, | ||
| event.executorUpdates), flushLogger = true) | ||
| liveStageExecutorMetrics.values.foreach { peakExecutorMetrics => | ||
| val peakMetrics = peakExecutorMetrics.getOrElseUpdate( | ||
| event.execId, new PeakExecutorMetrics()) | ||
| peakMetrics.compareAndUpdate(executorUpdates) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,17 +21,40 @@ import org.apache.spark.executor.ExecutorMetrics | |
| import org.apache.spark.status.api.v1.PeakMemoryMetrics | ||
|
|
||
| /** | ||
| * Records the peak values for executor level metrics. If jvmUsedMemory is -1, then no values have | ||
| * been recorded yet. | ||
| * Records the peak values for executor level metrics. If jvmUsedHeapMemory is -1, then no | ||
| * values have been recorded yet. | ||
| */ | ||
| private[spark] class PeakExecutorMetrics { | ||
|
||
| private var jvmUsedMemory = -1L; | ||
| private var onHeapExecutionMemory = 0L | ||
| private var offHeapExecutionMemory = 0L | ||
| private var onHeapStorageMemory = 0L | ||
| private var offHeapStorageMemory = 0L | ||
| private var onHeapUnifiedMemory = 0L | ||
| private var offHeapUnifiedMemory = 0L | ||
| private var _jvmUsedHeapMemory = -1L; | ||
| private var _jvmUsedNonHeapMemory = 0L; | ||
| private var _onHeapExecutionMemory = 0L | ||
| private var _offHeapExecutionMemory = 0L | ||
| private var _onHeapStorageMemory = 0L | ||
| private var _offHeapStorageMemory = 0L | ||
| private var _onHeapUnifiedMemory = 0L | ||
| private var _offHeapUnifiedMemory = 0L | ||
| private var _directMemory = 0L | ||
| private var _mappedMemory = 0L | ||
|
|
||
| def jvmUsedHeapMemory: Long = _jvmUsedHeapMemory | ||
|
|
||
| def jvmUsedNonHeapMemory: Long = _jvmUsedNonHeapMemory | ||
|
|
||
| def onHeapExecutionMemory: Long = _onHeapExecutionMemory | ||
|
|
||
| def offHeapExecutionMemory: Long = _offHeapExecutionMemory | ||
|
|
||
| def onHeapStorageMemory: Long = _onHeapStorageMemory | ||
|
|
||
| def offHeapStorageMemory: Long = _offHeapStorageMemory | ||
|
|
||
| def onHeapUnifiedMemory: Long = _onHeapUnifiedMemory | ||
|
|
||
| def offHeapUnifiedMemory: Long = _offHeapUnifiedMemory | ||
|
|
||
| def directMemory: Long = _directMemory | ||
|
|
||
| def mappedMemory: Long = _mappedMemory | ||
|
|
||
| /** | ||
| * Compare the specified memory values with the saved peak executor memory | ||
|
|
@@ -43,36 +66,44 @@ private[spark] class PeakExecutorMetrics { | |
| def compareAndUpdate(executorMetrics: ExecutorMetrics): Boolean = { | ||
| var updated: Boolean = false | ||
|
|
||
| if (executorMetrics.jvmUsedMemory > jvmUsedMemory) { | ||
| jvmUsedMemory = executorMetrics.jvmUsedMemory | ||
| if (executorMetrics.jvmUsedHeapMemory > _jvmUsedHeapMemory) { | ||
| _jvmUsedHeapMemory = executorMetrics.jvmUsedHeapMemory | ||
| updated = true | ||
| } | ||
| if (executorMetrics.jvmUsedNonHeapMemory > _jvmUsedNonHeapMemory) { | ||
| _jvmUsedNonHeapMemory = executorMetrics.jvmUsedNonHeapMemory | ||
| updated = true | ||
| } | ||
| if (executorMetrics.onHeapExecutionMemory > _onHeapExecutionMemory) { | ||
| _onHeapExecutionMemory = executorMetrics.onHeapExecutionMemory | ||
| updated = true | ||
| } | ||
| if (executorMetrics.onHeapExecutionMemory > onHeapExecutionMemory) { | ||
| onHeapExecutionMemory = executorMetrics.onHeapExecutionMemory | ||
| if (executorMetrics.offHeapExecutionMemory > _offHeapExecutionMemory) { | ||
| _offHeapExecutionMemory = executorMetrics.offHeapExecutionMemory | ||
| updated = true | ||
| } | ||
| if (executorMetrics.offHeapExecutionMemory > offHeapExecutionMemory) { | ||
| offHeapExecutionMemory = executorMetrics.offHeapExecutionMemory | ||
| if (executorMetrics.onHeapStorageMemory > _onHeapStorageMemory) { | ||
| _onHeapStorageMemory = executorMetrics.onHeapStorageMemory | ||
| updated = true | ||
| } | ||
| if (executorMetrics.onHeapStorageMemory > onHeapStorageMemory) { | ||
| onHeapStorageMemory = executorMetrics.onHeapStorageMemory | ||
| if (executorMetrics.offHeapStorageMemory > _offHeapStorageMemory) { | ||
| _offHeapStorageMemory = executorMetrics.offHeapStorageMemory | ||
|
||
| updated = true | ||
| } | ||
| if (executorMetrics.offHeapStorageMemory > offHeapStorageMemory) { | ||
| offHeapStorageMemory = executorMetrics.offHeapStorageMemory | ||
| if (executorMetrics.onHeapUnifiedMemory > _onHeapUnifiedMemory) { | ||
| _onHeapUnifiedMemory = executorMetrics.onHeapUnifiedMemory | ||
| updated = true | ||
| } | ||
| val newOnHeapUnifiedMemory = (executorMetrics.onHeapExecutionMemory + | ||
| executorMetrics.onHeapStorageMemory) | ||
| if (newOnHeapUnifiedMemory > onHeapUnifiedMemory) { | ||
| onHeapUnifiedMemory = newOnHeapUnifiedMemory | ||
| if (executorMetrics.offHeapUnifiedMemory > _offHeapUnifiedMemory) { | ||
| _offHeapUnifiedMemory = executorMetrics.offHeapUnifiedMemory | ||
| updated = true | ||
| } | ||
| val newOffHeapUnifiedMemory = (executorMetrics.offHeapExecutionMemory + | ||
| executorMetrics.offHeapStorageMemory) | ||
| if ( newOffHeapUnifiedMemory > offHeapUnifiedMemory) { | ||
| offHeapUnifiedMemory = newOffHeapUnifiedMemory | ||
| if (executorMetrics.directMemory > _directMemory) { | ||
| _directMemory = executorMetrics.directMemory | ||
| updated = true | ||
| } | ||
| if (executorMetrics.mappedMemory > _mappedMemory) { | ||
| _mappedMemory = executorMetrics.mappedMemory | ||
| updated = true | ||
| } | ||
|
|
||
|
|
@@ -84,23 +115,13 @@ private[spark] class PeakExecutorMetrics { | |
| * values set. | ||
| */ | ||
| def getPeakMemoryMetrics: Option[PeakMemoryMetrics] = { | ||
| if (jvmUsedMemory < 0) { | ||
| if (_jvmUsedHeapMemory < 0) { | ||
| None | ||
| } else { | ||
| Some(new PeakMemoryMetrics(jvmUsedMemory, onHeapExecutionMemory, | ||
| offHeapExecutionMemory, onHeapStorageMemory, offHeapStorageMemory, | ||
| onHeapUnifiedMemory, offHeapUnifiedMemory)) | ||
| Some(new PeakMemoryMetrics(_jvmUsedHeapMemory, _jvmUsedNonHeapMemory, | ||
| _onHeapExecutionMemory, _offHeapExecutionMemory, _onHeapStorageMemory, | ||
| _offHeapStorageMemory, _onHeapUnifiedMemory, _offHeapUnifiedMemory, | ||
| _directMemory, _mappedMemory)) | ||
| } | ||
| } | ||
|
|
||
| /** Clears/resets the saved peak values. */ | ||
| def reset(): Unit = { | ||
| jvmUsedMemory = -1L; | ||
| onHeapExecutionMemory = 0L | ||
| offHeapExecutionMemory = 0L | ||
| onHeapStorageMemory = 0L | ||
| offHeapStorageMemory = 0L | ||
| onHeapUnifiedMemory = 0L | ||
| offHeapUnifiedMemory = 0L | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does it make more sense to move this inside
Heartbeater? Then you don't need to pass in any BufferPoolMXBeans. Also rename to "getCurrentMemoryMetrics".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, and easier to share the code between driver and executor.