[SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API #20940
Changes from 2 commits
New file defining org.apache.spark.Heartbeater:

@@ -0,0 +1,52 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.util.concurrent.TimeUnit

import org.apache.spark.util.{ThreadUtils, Utils}

/**
 * Creates a heartbeat thread which will call the specified reportHeartbeat function at
 * intervals of intervalMs.
 *
 * @param reportHeartbeat the heartbeat reporting function to call.
 * @param intervalMs the interval between heartbeats.
 */
private[spark] class Heartbeater(reportHeartbeat: () => Unit, intervalMs: Long) {
  // Executor for the heartbeat task
  private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")

  /** Schedules a task to report a heartbeat. */
  private[spark] def start(): Unit = {
    // Wait a random interval so the heartbeats don't end up in sync
    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]

    val heartbeatTask = new Runnable() {
      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartbeat())
    }
    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
  }

  /** Stops the heartbeat thread. */
  private[spark] def stop(): Unit = {
    heartbeater.shutdown()
    heartbeater.awaitTermination(10, TimeUnit.SECONDS)
  }
}
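
As an illustration of how this class is meant to be driven, a minimal usage sketch follows. It is not part of the PR's diff; the reportAllMetrics() callback and the 10-second interval are assumptions made up for the example.

```scala
// Hedged usage sketch for Heartbeater; not code from this PR.
// reportAllMetrics() and the 10-second interval are assumed names/values.
def reportAllMetrics(): Unit = {
  // gather the current metrics and send them in a heartbeat here
}

val heartbeatIntervalMs = 10 * 1000L
val heartbeater = new Heartbeater(() => reportAllMetrics(), heartbeatIntervalMs)

heartbeater.start()  // schedules the task at a fixed rate after a randomized initial delay
// ... later, during shutdown:
heartbeater.stop()   // shuts the scheduler down and waits up to 10 seconds
```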
New file defining org.apache.spark.executor.ExecutorMetrics:

@@ -0,0 +1,43 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.executor

import org.apache.spark.annotation.DeveloperApi

/**
 * :: DeveloperApi ::
 * Executor level metrics.
 *
 * This is sent to the driver periodically (on executor heartbeat), to provide
 * information about each executor's metrics.
 *
 * @param timestamp the time the metrics were collected
 * @param jvmUsedMemory the amount of JVM used memory for the executor
 * @param onHeapExecutionMemory the amount of on heap execution memory used
 * @param offHeapExecutionMemory the amount of off heap execution memory used
 * @param onHeapStorageMemory the amount of on heap storage memory used
 * @param offHeapStorageMemory the amount of off heap storage memory used
 */
@DeveloperApi
class ExecutorMetrics private[spark] (
    val timestamp: Long,
    val jvmUsedMemory: Long,
    val onHeapExecutionMemory: Long,
    val offHeapExecutionMemory: Long,
    val onHeapStorageMemory: Long,
    val offHeapStorageMemory: Long) extends Serializable
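
Purely for illustration (not code from this PR), a sketch of constructing one of these snapshots: the JVM figure uses the standard MemoryMXBean, while the execution and storage figures are placeholder values standing in for the executor's memory manager counters. Since the constructor is private[spark], this only compiles inside a Spark package; it is shown to make the field order concrete.

```scala
import java.lang.management.ManagementFactory

// Hedged sketch: building an ExecutorMetrics snapshot by hand. In the executor,
// the execution/storage values would come from its memory manager; the numbers
// below are placeholders.
val jvmHeapUsed = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getUsed

val snapshot = new ExecutorMetrics(
  System.currentTimeMillis(),  // timestamp
  jvmHeapUsed,                 // jvmUsedMemory
  512L * 1024 * 1024,          // onHeapExecutionMemory (placeholder)
  0L,                          // offHeapExecutionMemory (placeholder)
  128L * 1024 * 1024,          // onHeapStorageMemory (placeholder)
  0L)                          // offHeapStorageMemory (placeholder)
```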
Changes to EventLoggingListener:

@@ -36,6 +36,7 @@ import org.json4s.jackson.JsonMethods._

 import org.apache.spark.{SPARK_VERSION, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.io.CompressionCodec

@@ -93,6 +94,9 @@ private[spark] class EventLoggingListener(
   // Visible for tests only.
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)

+  // Peak metric values for each executor
+  private var peakExecutorMetrics = new mutable.HashMap[String, PeakExecutorMetrics]()
+
   /**
    * Creates the log file in the configured log directory.
    */

@@ -155,7 +159,11 @@ private[spark] class EventLoggingListener(
   }

   // Events that do not trigger a flush
-  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event)
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
+    logEvent(event)
+    // clear the peak metrics when a new stage starts
+    peakExecutorMetrics.values.foreach(_.reset())
+  }

   override def onTaskStart(event: SparkListenerTaskStart): Unit = logEvent(event)

@@ -197,10 +205,12 @@
   }
   override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
     logEvent(event, flushLogger = true)
+    peakExecutorMetrics.put(event.executorId, new PeakExecutorMetrics())
   }

   override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
     logEvent(event, flushLogger = true)
+    peakExecutorMetrics.remove(event.executorId)
   }

   override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {

@@ -234,8 +244,22 @@
     }
   }

-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
+  /**
+   * Log if there is a new peak value for one of the memory metrics for the given executor.
+   * Metrics are cleared out when a new stage is started in onStageSubmitted, so this will
+   * log new peak memory metric values per executor per stage.
+   */
+  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {

Review thread on onExecutorMetricsUpdate:

Contributor:
I wouldn't think you'd want to log for every new peak, as I'd expect it would be natural for the peak to keep growing, so you'd just end up with a lot of logs. I'd expect you'd just log the peak when the stage ended, or when the executor died. The downside of that approach is that you never log a peak if the driver dies ... but then you've got to figure out the driver issue anyway.

Contributor (Author):
For a longer running stage, once it ramps up, hopefully there wouldn't be a lot of new peak values. Looking at a subset of our applications, the extra logging overhead has mostly been between 0.25% and 1%, but it can be 8%. By logging each peak value at the time it occurs (and reinitializing when a stage starts), it's possible to tell which stages are active at the time, and it would potentially be possible to graph these changes on a timeline -- this information wouldn't be available if the metrics are only logged at stage end, and the times are lost. Logging at stage end would limit the amount of extra logging. If we add more metrics (such as for off-heap), then there could be more new peaks and more extra logging with the current approach. Excess logging is a concern, and I can move to stage end if the overhead is too much.

Contributor:
You could tell which stages are active from the other events. Do you think that timeline would be useful? Since it's only keeping the peak, I'm not sure how interesting that graph is. With overlapping stages, you might lose some information by only logging at stage end -- e.g. if stage 1 is running for a while with peak X, and then stage 2 starts up with peak Y > X, you would only ever log peak Y for stage 1. You could address this by also logging at stage start, but then you're back to more logging (and it's really tricky to figure out a minimal set of things to log when stage 2 starts, as you don't know which executor it's going to run on). But I'm still not sure what we'd do with those extra values, and if there is any value in capturing them. (Some of this design discussion probably belongs on JIRA -- I'll also go reread the design doc now.)

Contributor:
Also on log size -- was there anything special about the 8% case? E.g. was it a tiny application running on a ton of executors, so the logs were small to begin with? If so, then it's probably fine.

Contributor (Author):
We're not constructing a timeline currently, and yes, with only peak values, it could be rather sparse. We would get new values with new stages, but would not see decreases in memory during a stage. The situation where there is a stage 1 with peak X, and then stage 2 starts with peak Y > X is interesting though, and it would be useful to have this information, since we would then know to focus on stage 2 for memory consumption, even though both 1 and 2 could have the same peak values. Logging at stage start would double the amount of logging, and would be trickier, so I'd prefer either keeping the current approach or only logging at stage end. The higher logging was for smaller applications (and smaller logs).

Contributor:
Yeah, logging an event per executor at stage end seems good to me. It would be great if we could see how much that version affects log size as well, if you can get those metrics. Also, these tradeoffs should go into the design doc; it's harder to find comments from a PR after this feature has been merged. For now, it would also be nice if you could post a version that everyone can comment on, e.g. a Google doc.

Contributor (Author):
I will make the change to log at stage end, and will update the design doc. I haven't pushed the recent changes yet -- I'll make the rest of the changes from this round of review, and will push when done. Thanks for your comments.

Contributor (Author):
ExecutorMetrics right now has: jvmUsedHeapMemory, jvmUsedNonHeapMemory, onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, and offHeapStorageMemory. For logging at stage end, we can log the peak for each of these, but unified memory is more problematic. We could add new fields for on-heap/off-heap unified memory, but I'm inclined to remove unified memory (from all the places it is currently used) rather than add more fields. Users can still sum peak execution and peak storage values, which may be larger than the actual peak unified memory if they are not at peak values at the same time, but should still be a reasonable estimate for sizing.

Contributor:
Is your concern the size of the message from the executors to the driver? That certainly is valid, but I wonder if we should think a bit harder about this if that is going to be a common concern, as I think we'll want to add more metrics. One possibility is for the executor to do the peak calculation itself, and then only send an update for the metrics with a new peak. Also, that would let us just send the peak on task end events. I'm just brainstorming at the moment, not saying it should be changed one way or the other ... need to think about it more.

Contributor (Author):
Thanks for your thoughts on this. Size of message, and also logging, but it is only an extra few longs per heartbeat, and similarly for logging. Task end would help with minimizing communication for longer running tasks. The heartbeats are only every 10 seconds, so perhaps not so bad.

(diff continued)

+    var log: Boolean = false
+    event.executorUpdates.foreach { executorUpdates =>
+      val peakMetrics = peakExecutorMetrics.getOrElseUpdate(event.execId, new PeakExecutorMetrics())
+      if (peakMetrics.compareAndUpdate(executorUpdates)) {
+        val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
+        logEvent(new SparkListenerExecutorMetricsUpdate(event.execId, accumUpdates,
+          event.executorUpdates), flushLogger = true)
+      }
+    }
+  }

   override def onOtherEvent(event: SparkListenerEvent): Unit = {
     if (event.logEvent) {
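
The PeakExecutorMetrics class used above is defined elsewhere in this PR and is not shown in this excerpt. As a rough, hedged sketch of the shape such a tracker takes (the field set and names here are assumptions, so the PR's actual class may differ): compareAndUpdate records any new per-metric peaks and returns true only when one was reached, which is what gates the logEvent call, while reset is what onStageSubmitted calls so peaks are tracked per stage.

```scala
import org.apache.spark.executor.ExecutorMetrics

// Hedged sketch of a peak tracker in the spirit of PeakExecutorMetrics.
// The PR's actual class likely tracks more fields and may differ in detail.
class PeakMetricsSketch {
  private var jvmUsedMemory = Long.MinValue
  private var onHeapExecutionMemory = Long.MinValue
  private var onHeapStorageMemory = Long.MinValue

  /** Records new peaks; returns true if at least one metric hit a new peak. */
  def compareAndUpdate(metrics: ExecutorMetrics): Boolean = {
    var updated = false
    if (metrics.jvmUsedMemory > jvmUsedMemory) {
      jvmUsedMemory = metrics.jvmUsedMemory
      updated = true
    }
    if (metrics.onHeapExecutionMemory > onHeapExecutionMemory) {
      onHeapExecutionMemory = metrics.onHeapExecutionMemory
      updated = true
    }
    if (metrics.onHeapStorageMemory > onHeapStorageMemory) {
      onHeapStorageMemory = metrics.onHeapStorageMemory
      updated = true
    }
    updated
  }

  /** Clears the recorded peaks, e.g. when a new stage starts. */
  def reset(): Unit = {
    jvmUsedMemory = Long.MinValue
    onHeapExecutionMemory = Long.MinValue
    onHeapStorageMemory = Long.MinValue
  }
}
```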

A second review thread:

What about including the JVM's direct & memory-mapped usage as well? See https://issues.apache.org/jira/browse/SPARK-22483

It would be useful to have more information about off-heap memory usage.

I also think it's totally fine to not have every metric possible now, but if this one is easy to add here, it would be nice. In particular, I'm thinking we'd also like to capture the memory associated with Python if it's a PySpark app, though that is significantly more complicated, so we don't need to do that now.

We could add ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed(), for total non-heap memory used by the JVM. For direct and memory-mapped usage, would collecting these be similar to https://gist.github.com/t3rmin4t0r/1a753ccdcfa8d111f07c ?

To be honest, I haven't tried this out myself; I only knew this was possible. The version I was familiar with is what's used by the dropwizard metrics: https://github.com/dropwizard/metrics/blob/4.1-development/metrics-jvm/src/main/java/com/codahale/metrics/jvm/BufferPoolMetricSet.java. Sorry, you'll need to experiment with it a bit. (Again, not a blocker for this.)

Thanks, I will play around with it a bit. If it seems more complicated or expensive, I'll file a separate subtask.
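
For reference, the direct and memory-mapped pool figures discussed here are exposed through the standard java.lang.management BufferPoolMXBean, which is what dropwizard's BufferPoolMetricSet wraps. A minimal sketch (not part of this PR) of reading them, together with the total non-heap usage mentioned above:

```scala
import java.lang.management.{BufferPoolMXBean, ManagementFactory}
import scala.collection.JavaConverters._

// Minimal sketch (not from this PR): usage of the JVM's "direct" and "mapped"
// buffer pools, plus total non-heap memory used.
val pools = ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).asScala
val usedByPool: Map[String, Long] = pools.map(p => p.getName -> p.getMemoryUsed).toMap

val directUsed = usedByPool.getOrElse("direct", 0L)   // bytes held by direct byte buffers
val mappedUsed = usedByPool.getOrElse("mapped", 0L)   // bytes held by memory-mapped buffers
val nonHeapUsed = ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage.getUsed
```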