Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c8e8abe
SPARK-23429: Add executor memory metrics to heartbeat and expose in e…
edwinalu Mar 9, 2018
5d6ae1c
modify MimaExcludes.scala to filter changes to SparkListenerExecutorM…
edwinalu Apr 2, 2018
ad10d28
Address code review comments, change event logging to stage end.
edwinalu Apr 22, 2018
10ed328
Add configuration parameter spark.eventLog.logExecutorMetricsUpdates.…
edwinalu May 15, 2018
2d20367
wip on enum based metrics
squito May 23, 2018
f904f1e
wip ... has both enum and non-enum version
squito May 23, 2018
c502ec4
case objects, mostly complete
squito May 23, 2018
7879e66
Merge pull request #1 from squito/metric_enums
edwinalu Jun 3, 2018
2662f6f
Address comments (move heartbeater from DAGScheduler to SparkContext,…
edwinalu Jun 10, 2018
2871335
SPARK-23429: Add executor memory metrics to heartbeat and expose in e…
edwinalu Mar 9, 2018
da83f2e
modify MimaExcludes.scala to filter changes to SparkListenerExecutorM…
edwinalu Apr 2, 2018
f25a44b
Address code review comments, change event logging to stage end.
edwinalu Apr 22, 2018
ca85c82
Add configuration parameter spark.eventLog.logExecutorMetricsUpdates.…
edwinalu May 15, 2018
8b74ba8
wip on enum based metrics
squito May 23, 2018
036148c
wip ... has both enum and non-enum version
squito May 23, 2018
91fb1db
case objects, mostly complete
squito May 23, 2018
2d8894a
Address comments (move heartbeater from DAGScheduler to SparkContext,…
edwinalu Jun 10, 2018
99044e6
Merge branch 'SPARK-23429.2' of https://github.com/edwinalu/spark int…
edwinalu Jun 14, 2018
263c8c8
code review comments
edwinalu Jun 14, 2018
812fdcf
code review comments:
edwinalu Jun 22, 2018
7ed42a5
Address code review comments. Also make executorUpdates in SparkListe…
edwinalu Jun 28, 2018
8d9acdf
Revert and make executorUpdates in SparkListenerExecutorMetricsUpdate…
edwinalu Jun 29, 2018
20799d2
code review comments: hid array implementation of executor metrics, a…
edwinalu Jul 25, 2018
8905d23
merge with master
edwinalu Jul 25, 2018
a0eed11
address code review comments
edwinalu Aug 5, 2018
03cd5bc
code review comments
edwinalu Aug 13, 2018
10e7f15
Merge branch 'master' into SPARK-23429.2
edwinalu Aug 14, 2018
a14b82a
merge conflicts
edwinalu Aug 14, 2018
2897281
disable stage executor metrics logging by default
edwinalu Aug 16, 2018
ee4aa1d
Merge branch 'master' into SPARK-23429.2
edwinalu Sep 6, 2018
571285b
fix indentation
edwinalu Sep 7, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
code review comments
  • Loading branch information
edwinalu committed Aug 13, 2018
commit 03cd5bceddc7867a90918430b23bf9fa3771edfd
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ class ExecutorMetrics private[spark] extends Serializable {
* @return if there is a new peak value for any metric
*/
private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = {
var updated: Boolean = false
var updated = false

(0 until ExecutorMetricType.values.length).foreach { idx =>
if ( executorMetrics.metrics(idx) > metrics(idx)) {
if (executorMetrics.metrics(idx) > metrics(idx)) {
updated = true
metrics(idx) = executorMetrics.metrics(idx)
}
Expand Down
16 changes: 12 additions & 4 deletions core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -183,22 +183,30 @@ private[spark] abstract class MemoryManager(
/**
* On heap execution memory currently in use, in bytes.
*/
final def onHeapExecutionMemoryUsed: Long = onHeapExecutionMemoryPool.memoryUsed
final def onHeapExecutionMemoryUsed: Long = synchronized {
onHeapExecutionMemoryPool.memoryUsed
}

/**
* Off heap execution memory currently in use, in bytes.
*/
final def offHeapExecutionMemoryUsed: Long = offHeapExecutionMemoryPool.memoryUsed
final def offHeapExecutionMemoryUsed: Long = synchronized {
offHeapExecutionMemoryPool.memoryUsed
}

/**
* On heap storage memory currently in use, in bytes.
*/
final def onHeapStorageMemoryUsed: Long = onHeapStorageMemoryPool.memoryUsed
final def onHeapStorageMemoryUsed: Long = synchronized {
onHeapStorageMemoryPool.memoryUsed
}

/**
* Off heap storage memory currently in use, in bytes.
*/
final def offHeapStorageMemoryUsed: Long = offHeapStorageMemoryPool.memoryUsed
final def offHeapStorageMemoryUsed: Long = synchronized {
offHeapStorageMemoryPool.memoryUsed
}

/**
* Returns the execution memory consumption, in bytes, for the given task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private[spark] abstract class MemoryManagerExecutorMetricType(
}
}

private[spark]abstract class MBeanExecutorMetricType(mBeanName: String)
private[spark] abstract class MBeanExecutorMetricType(mBeanName: String)
extends ExecutorMetricType {
private val bean = ManagementFactory.newPlatformMXBeanProxy(
ManagementFactory.getPlatformMBeanServer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,8 +690,8 @@ private[spark] class AppStatusListener(
// check if there is a new peak value for any of the executor level memory metrics
// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed
// for the live UI.
event.executorUpdates.foreach { updates: ExecutorMetrics =>
liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
event.executorUpdates.foreach { updates =>
liveExecutors.get(event.execId).foreach { exec =>
if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
maybeUpdate(exec, now)
}
Expand Down