Commits (36)
c8e8abe
SPARK-23429: Add executor memory metrics to heartbeat and expose in e…
edwinalu Mar 9, 2018
5d6ae1c
modify MimaExcludes.scala to filter changes to SparkListenerExecutorM…
edwinalu Apr 2, 2018
ad10d28
Address code review comments, change event logging to stage end.
edwinalu Apr 22, 2018
10ed328
Add configuration parameter spark.eventLog.logExecutorMetricsUpdates.…
edwinalu May 15, 2018
2d20367
wip on enum based metrics
squito May 23, 2018
f904f1e
wip ... has both enum and non-enum version
squito May 23, 2018
c502ec4
case objects, mostly complete
squito May 23, 2018
7879e66
Merge pull request #1 from squito/metric_enums
edwinalu Jun 3, 2018
2662f6f
Address comments (move heartbeater from DAGScheduler to SparkContext,…
edwinalu Jun 10, 2018
2871335
SPARK-23429: Add executor memory metrics to heartbeat and expose in e…
edwinalu Mar 9, 2018
da83f2e
modify MimaExcludes.scala to filter changes to SparkListenerExecutorM…
edwinalu Apr 2, 2018
f25a44b
Address code review comments, change event logging to stage end.
edwinalu Apr 22, 2018
ca85c82
Add configuration parameter spark.eventLog.logExecutorMetricsUpdates.…
edwinalu May 15, 2018
8b74ba8
wip on enum based metrics
squito May 23, 2018
036148c
wip ... has both enum and non-enum version
squito May 23, 2018
91fb1db
case objects, mostly complete
squito May 23, 2018
2d8894a
Address comments (move heartbeater from DAGScheduler to SparkContext,…
edwinalu Jun 10, 2018
99044e6
Merge branch 'SPARK-23429.2' of https://github.com/edwinalu/spark int…
edwinalu Jun 14, 2018
263c8c8
code review comments
edwinalu Jun 14, 2018
812fdcf
code review comments:
edwinalu Jun 22, 2018
7ed42a5
Address code review comments. Also make executorUpdates in SparkListe…
edwinalu Jun 28, 2018
8d9acdf
Revert and make executorUpdates in SparkListenerExecutorMetricsUpdate…
edwinalu Jun 29, 2018
20799d2
code review comments: hid array implementation of executor metrics, a…
edwinalu Jul 25, 2018
8905d23
merge with master
edwinalu Jul 25, 2018
04875b8
Integration of ProcessTreeMetrics with PR 21221
Jul 26, 2018
a0eed11
address code review comments
edwinalu Aug 5, 2018
162b9b2
Merge branch 'SPARK-23429.2' of https://github.com/edwinalu/spark int…
Aug 6, 2018
29a44c7
Changing the position of ptree and also make the computation configur…
Aug 7, 2018
3671427
Seperate metrics for jvm, python and others and update the tests
Aug 8, 2018
03cd5bc
code review comments
edwinalu Aug 13, 2018
c79b5ab
Merge branch 'SPARK-23429.2' of https://github.com/edwinalu/spark int…
Aug 14, 2018
10e7f15
Merge branch 'master' into SPARK-23429.2
edwinalu Aug 14, 2018
a14b82a
merge conflicts
edwinalu Aug 14, 2018
2897281
disable stage executor metrics logging by default
edwinalu Aug 16, 2018
8f97b50
Merge branch 'SPARK-23429.2' of https://github.com/rezasafi/spark int…
Aug 17, 2018
b14cebc
Update JsonProtocolSuite with new metrics.
Aug 17, 2018
Address comments (move heartbeater from DAGScheduler to SparkContext, move logic for getting metrics to Heartbeater), and modify tests for the new ExecutorMetrics format.
edwinalu committed Jun 14, 2018
commit 2d8894a91f4a0dacd49114dc74cc97b7c9426879
17 changes: 16 additions & 1 deletion core/src/main/scala/org/apache/spark/Heartbeater.scala
@@ -19,17 +19,26 @@ package org.apache.spark

import java.util.concurrent.TimeUnit

import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.memory.MemoryManager
import org.apache.spark.metrics.MetricGetter
import org.apache.spark.util.{ThreadUtils, Utils}

/**
* Creates a heartbeat thread which will call the specified reportHeartbeat function at
* intervals of intervalMs.
*
* @param memoryManager the memory manager for execution and storage memory.
* @param reportHeartbeat the heartbeat reporting function to call.
* @param name the thread name for the heartbeater.
* @param intervalMs the interval between heartbeats.
*/
private[spark] class Heartbeater(reportHeartbeat: () => Unit, name: String, intervalMs: Long) {
private[spark] class Heartbeater(
memoryManager: MemoryManager,
reportHeartbeat: () => Unit,
name: String,
intervalMs: Long) extends Logging {
// Executor for the heartbeat task
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor(name)

@@ -49,5 +58,11 @@ private[spark] class Heartbeater(reportHeartbeat: () => Unit, name: String, inte
heartbeater.shutdown()
heartbeater.awaitTermination(10, TimeUnit.SECONDS)
}

/** Get the current metrics. */
def getCurrentMetrics(): ExecutorMetrics = {
new ExecutorMetrics(System.currentTimeMillis(),
MetricGetter.values.map(_.getMetricValue(memoryManager)).toArray)
}
}

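For readers unfamiliar with the class being extended here, the following is a minimal, self-contained sketch of the scheduling pattern Heartbeater wraps, using plain java.util.concurrent in place of Spark's private ThreadUtils. SimpleHeartbeater and the demo thread name are hypothetical; the real class now also holds a MemoryManager so that getCurrentMetrics() can poll every MetricGetter in a fixed order.

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

// A single daemon thread calls the supplied report function every intervalMs milliseconds.
class SimpleHeartbeater(reportHeartbeat: () => Unit, name: String, intervalMs: Long) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)
      t.setDaemon(true)
      t
    }
  })

  def start(): Unit = {
    scheduler.scheduleAtFixedRate(
      new Runnable { override def run(): Unit = reportHeartbeat() },
      intervalMs, intervalMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = {
    scheduler.shutdown()
    scheduler.awaitTermination(10, TimeUnit.SECONDS)
  }
}

// Illustrative use: new SimpleHeartbeater(() => println("beat"), "demo-heartbeater", 1000L).start()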
25 changes: 25 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -213,6 +213,7 @@ class SparkContext(config: SparkConf) extends Logging {
private var _files: Seq[String] = _
private var _shutdownHookRef: AnyRef = _
private var _statusStore: AppStatusStore = _
private var _heartbeater: Heartbeater = _

/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -304,6 +305,11 @@ class SparkContext(config: SparkConf) extends Logging {
_dagScheduler = ds
}

private[spark] def heartbeater: Heartbeater = _heartbeater
private[spark] def heartbeater_=(hb: Heartbeater): Unit = {
_heartbeater = hb
}

/**
* A unique identifier for the Spark application.
* Its format depends on the scheduler implementation.
@@ -496,6 +502,11 @@ class SparkContext(config: SparkConf) extends Logging {
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// create and start the heartbeater for collecting memory metrics
_heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, "driver-heartbeater",
conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
_heartbeater.start()

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
@@ -1922,6 +1933,12 @@ class SparkContext(config: SparkConf) extends Logging {
Utils.tryLogNonFatalError {
_eventLogger.foreach(_.stop())
}
if (_heartbeater != null) {
Utils.tryLogNonFatalError {
_heartbeater.stop()
}
_heartbeater = null
}
if (_dagScheduler != null) {
Utils.tryLogNonFatalError {
_dagScheduler.stop()
@@ -2398,6 +2415,14 @@ class SparkContext(config: SparkConf) extends Logging {
}
}

/** Reports heartbeat metrics for the driver. */
private def reportHeartBeat(): Unit = {
val driverUpdates = _heartbeater.getCurrentMetrics()
val accumUpdates = new Array[(Long, Int, Int, Seq[AccumulableInfo])](0)
listenerBus.post(SparkListenerExecutorMetricsUpdate("driver", accumUpdates,
Some(driverUpdates)))
}

// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having finished construction.
// NOTE: this must be placed at the end of the SparkContext constructor.
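With the heartbeater moved into SparkContext, the driver now reports itself as executor "driver" on the listener bus. Below is a hedged sketch of how an application-level listener could observe those events, assuming the executorUpdates field and ExecutorMetrics layout defined on this branch; DriverMetricsListener is a hypothetical name, not part of the PR.

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// The driver heartbeat added above posts SparkListenerExecutorMetricsUpdate events
// with execId == "driver" and the metrics snapshot in executorUpdates.
class DriverMetricsListener extends SparkListener {
  override def onExecutorMetricsUpdate(
      update: SparkListenerExecutorMetricsUpdate): Unit = {
    if (update.execId == "driver") {
      update.executorUpdates.foreach { m =>
        // m.metrics is an Array[Long] ordered as in MetricGetter.values
        println(s"driver heartbeat at ${m.timestamp}: ${m.metrics.mkString(", ")}")
      }
    }
  }
}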
20 changes: 4 additions & 16 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -36,8 +36,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.memory.{MemoryManager, SparkOutOfMemoryError, TaskMemoryManager}
import org.apache.spark.metrics.MetricGetter
import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.scheduler._
import org.apache.spark.shuffle.FetchFailedException
@@ -149,8 +148,8 @@ private[spark] class Executor(
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

// Executor for the heartbeat task.
private val heartbeater = new Heartbeater(reportHeartBeat, "executor-heartbeater",
conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat,
"executor-heartbeater", conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))

// must be initialized before running startDriverHeartbeat()
private val heartbeatReceiverRef =
@@ -789,7 +788,7 @@ private[spark] class Executor(
val curGCTime = computeTotalGcTime()

// get executor level memory metrics
val executorUpdates = Executor.getCurrentExecutorMetrics(env.memoryManager)
val executorUpdates = heartbeater.getCurrentMetrics()

for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
@@ -827,15 +826,4 @@ private[spark] object Executor {
// task is fully deserialized. When possible, the TaskContext.getLocalProperty call should be
// used instead.
val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties]

/**
* Get the current executor level memory metrics.
*/
def getCurrentExecutorMetrics(memoryManager: MemoryManager): ExecutorMetrics = {
val metrics = new ExecutorMetrics(System.currentTimeMillis())
MetricGetter.idxAndValues.foreach { case (idx, metric) =>
metrics.metrics(idx) = metric.getMetricValue(memoryManager)
}
metrics
}
}
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
@@ -29,8 +29,15 @@ import org.apache.spark.metrics.MetricGetter
*
* @param timestamp the time the metrics were collected, or -1 for Spark history
* log events which are logged when a stage has completed
* @param metrics the array of executor metrics values, order and elements as
* specified in MetricGetter
*/
@DeveloperApi
class ExecutorMetrics private[spark] (val timestamp: Long) extends Serializable {
val metrics = new Array[Long](MetricGetter.values.length)
class ExecutorMetrics private[spark] (
val timestamp: Long,
val metrics: Array[Long]) extends Serializable {
if (metrics.length != MetricGetter.values.length) {
throw new IllegalArgumentException("invalid metrics length " + metrics.length +
" does not equal expected length " + MetricGetter.values.length)
}
}
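The constructor change above trades the fixed-size internal array for a caller-supplied one, guarded by a length check. A standalone sketch of that invariant follows; the expected length of 10 simply counts the MetricGetter.values list as it stands after this commit, and mkSnapshot is illustrative only (the real constructor is private[spark]).

object ExecutorMetricsSketch {
  // Each snapshot is a timestamp plus exactly one Long per entry of
  // MetricGetter.values, in that fixed order.
  val expectedLength = 10  // size of MetricGetter.values after this commit

  def mkSnapshot(timestamp: Long, metrics: Array[Long]): (Long, Array[Long]) = {
    if (metrics.length != expectedLength) {
      throw new IllegalArgumentException("invalid metrics length " + metrics.length +
        " does not equal expected length " + expectedLength)
    }
    (timestamp, metrics)
  }
}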
26 changes: 17 additions & 9 deletions core/src/main/scala/org/apache/spark/metrics/MetricGetter.scala
@@ -23,7 +23,7 @@ import org.apache.spark.memory.MemoryManager

sealed trait MetricGetter {
def getMetricValue(memoryManager: MemoryManager): Long
val name = getClass().getName().stripSuffix("$")
val name = getClass().getName().stripSuffix("$").split("""\.""").last
}

abstract class MemoryManagerMetricGetter(f: MemoryManager => Long) extends MetricGetter {
Expand Down Expand Up @@ -53,13 +53,19 @@ case object JVMOffHeapMemory extends MetricGetter {
}
}

case object OnHeapExecution extends MemoryManagerMetricGetter(_.onHeapExecutionMemoryUsed)
case object OnHeapExecutionMemory extends MemoryManagerMetricGetter(_.onHeapExecutionMemoryUsed)

case object OffHeapExecution extends MemoryManagerMetricGetter(_.offHeapExecutionMemoryUsed)
case object OffHeapExecutionMemory extends MemoryManagerMetricGetter(_.offHeapExecutionMemoryUsed)

case object OnHeapStorage extends MemoryManagerMetricGetter(_.onHeapStorageMemoryUsed)
case object OnHeapStorageMemory extends MemoryManagerMetricGetter(_.onHeapStorageMemoryUsed)

case object OffHeapStorage extends MemoryManagerMetricGetter(_.offHeapStorageMemoryUsed)
case object OffHeapStorageMemory extends MemoryManagerMetricGetter(_.offHeapStorageMemoryUsed)

case object OnHeapUnifiedMemory extends MemoryManagerMetricGetter(
(m => m.onHeapExecutionMemoryUsed + m.onHeapStorageMemoryUsed))

case object OffHeapUnifiedMemory extends MemoryManagerMetricGetter(
(m => m.offHeapExecutionMemoryUsed + m.offHeapStorageMemoryUsed))

case object DirectPoolMemory extends MBeanMetricGetter("java.nio:type=BufferPool,name=direct")
case object MappedPoolMemory extends MBeanMetricGetter("java.nio:type=BufferPool,name=mapped")
@@ -68,10 +74,12 @@ object MetricGetter {
val values = IndexedSeq(
JVMHeapMemory,
JVMOffHeapMemory,
OnHeapExecution,
OffHeapExecution,
OnHeapStorage,
OffHeapStorage,
OnHeapExecutionMemory,
OffHeapExecutionMemory,
OnHeapStorageMemory,
OffHeapStorageMemory,
OnHeapUnifiedMemory,
OffHeapUnifiedMemory,
DirectPoolMemory,
MappedPoolMemory
)
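The renames above (OnHeapExecution to OnHeapExecutionMemory, plus the two new unified-memory getters) sit on top of the name derivation fixed in this file, which now drops the package prefix from getClass.getName. Here is a self-contained sketch of the case-object pattern with hypothetical metrics standing in for the MemoryManager-backed ones; HeapUsed, HeapMax and Metric are illustrative names only.

sealed trait Metric {
  def value: Long
  // Same derivation as MetricGetter.name: strip the trailing '$' from the case
  // object's class name, then keep only the last '.'-separated segment.
  val name: String = getClass.getName.stripSuffix("$").split("""\.""").last
}

case object HeapUsed extends Metric {
  def value: Long = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory
}

case object HeapMax extends Metric {
  def value: Long = Runtime.getRuntime.maxMemory
}

object Metric {
  // As with MetricGetter.values, this fixed ordering gives each metric a stable
  // index into the Array[Long] carried by ExecutorMetrics.
  val values: IndexedSeq[Metric] = IndexedSeq(HeapUsed, HeapMax)

  def snapshot(): Array[Long] = values.map(_.value).toArray
}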
20 changes: 2 additions & 18 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -18,7 +18,6 @@
package org.apache.spark.scheduler

import java.io.NotSerializableException
import java.lang.management.ManagementFactory
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
@@ -35,7 +34,7 @@ import org.apache.commons.lang3.SerializationUtils

import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.executor.{Executor, ExecutorMetrics, TaskMetrics}
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.network.util.JavaUtils
@@ -210,10 +209,6 @@ class DAGScheduler(
private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

/** driver heartbeat for collecting metrics */
private val heartbeater: Heartbeater = new Heartbeater(reportHeartBeat, "driver-heartbeater",
sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))

/**
* Called by the TaskSetManager to report task's starting.
*/
@@ -1758,20 +1753,9 @@ class DAGScheduler(
messageScheduler.shutdownNow()
eventProcessLoop.stop()
taskScheduler.stop()
heartbeater.stop()
}

/** Reports heartbeat metrics for the driver. */
private def reportHeartBeat(): Unit = {
// get driver memory metrics
val driverUpdates = Executor.getCurrentExecutorMetrics(sc.env.memoryManager)
val accumUpdates = new Array[(Long, Int, Int, Seq[AccumulableInfo])](0)
listenerBus.post(SparkListenerExecutorMetricsUpdate("driver", accumUpdates,
Some(driverUpdates)))
}
}

eventProcessLoop.start()
heartbeater.start()
}

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -24,7 +24,7 @@ import java.util.EnumSet
import java.util.Locale

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
@@ -84,7 +84,7 @@ private[spark] class EventLoggingListener(
private val compressionCodecName = compressionCodec.map { c =>
CompressionCodec.getShortName(c.getClass.getName)
}
logInfo("spark.eventLog.logExecutorMetricsUpdates.enabled is " + shouldLogExecutorMetricsUpdates)

// Only defined if the file system scheme is not local
private var hadoopDataStream: Option[FSDataOutputStream] = None

@@ -97,8 +97,7 @@ logInfo("spark.eventLog.logExecutorMetricsUpdates.enabled is " + shouldLogExecut
private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)

// map of live stages, to peak executor metrics for the stage
private val liveStageExecutorMetrics = mutable.HashMap[(Int, Int),
mutable.HashMap[String, PeakExecutorMetrics]]()
private val liveStageExecutorMetrics = HashMap[(Int, Int), HashMap[String, PeakExecutorMetrics]]()

/**
* Creates the log file in the configured log directory.
@@ -167,7 +166,7 @@ logInfo("spark.eventLog.logExecutorMetricsUpdates.enabled is " + shouldLogExecut
if (shouldLogExecutorMetricsUpdates) {
// record the peak metrics for the new stage
liveStageExecutorMetrics.put((event.stageInfo.stageId, event.stageInfo.attemptNumber()),
new mutable.HashMap[String, PeakExecutorMetrics]())
new HashMap[String, PeakExecutorMetrics]())
}
}

@@ -190,16 +189,15 @@ logInfo("spark.eventLog.logExecutorMetricsUpdates.enabled is " + shouldLogExecut
liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId))
}

// log the peak executor metrics for the stage, for each executor
// log the peak executor metrics for the stage, for each live executor,
// whether or not the executor is running tasks for the stage
val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
val executorMap = liveStageExecutorMetrics.remove(
(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
executorMap.foreach {
executorEntry => {
for ((executorId, peakExecutorMetrics) <- executorEntry) {
val executorMetrics = new ExecutorMetrics(-1)
System.arraycopy(peakExecutorMetrics.metrics, 0, executorMetrics.metrics, 0,
peakExecutorMetrics.metrics.size)
val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics)
val executorUpdate = new SparkListenerExecutorMetricsUpdate(
executorId, accumUpdates, Some(executorMetrics))
logEvent(executorUpdate)
@@ -346,7 +344,7 @@ private[spark] object EventLoggingListener extends Logging {
private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)

// A cache for compression codecs to avoid creating the same codec many times
private val codecMap = new mutable.HashMap[String, CompressionCodec]
private val codecMap = new HashMap[String, CompressionCodec]

/**
* Write metadata about an event log to the given stream.
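To make the stage-end logging flow above easier to follow, here is a simplified, self-contained model of the bookkeeping: peaks are tracked per (stageId, stageAttempt) and per executor while a stage is live, then flushed once when the stage completes. PeakTracking, numMetrics, and the method names are illustrative only, not the EventLoggingListener API.

import scala.collection.mutable.HashMap

object PeakTracking {
  private val numMetrics = 10  // MetricGetter.values.length on this branch
  private val liveStagePeaks =
    HashMap[(Int, Int), HashMap[String, Array[Long]]]()

  def onStageSubmitted(stageId: Int, attempt: Int): Unit =
    liveStagePeaks.put((stageId, attempt), HashMap[String, Array[Long]]())

  def onHeartbeat(executorId: String, metrics: Array[Long]): Unit = {
    // Every live stage sees the update, whether or not the executor is running
    // tasks for that stage (matching the comment in the diff above).
    liveStagePeaks.values.foreach { byExecutor =>
      val peaks = byExecutor.getOrElseUpdate(executorId, Array.fill(numMetrics)(0L))
      for (i <- metrics.indices) {
        peaks(i) = math.max(peaks(i), metrics(i))
      }
    }
  }

  def onStageCompleted(stageId: Int, attempt: Int): Seq[(String, Array[Long])] =
    liveStagePeaks.remove((stageId, attempt)).map(_.toSeq).getOrElse(Seq.empty)
}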
core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala
@@ -19,7 +19,6 @@ package org.apache.spark.scheduler

import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.metrics.MetricGetter
import org.apache.spark.status.api.v1.PeakMemoryMetrics

/**
* Records the peak values for executor level metrics. If jvmUsedHeapMemory is -1, then no
@@ -49,20 +48,6 @@ private[spark] class PeakExecutorMetrics {
updated
}

/**
* @return None if no peak metrics have been recorded, else PeakMemoryMetrics with the peak
* values set.
*/
def getPeakMemoryMetrics: Option[PeakMemoryMetrics] = {
if (metrics(0) < 0) {
None
} else {
val copy = new PeakMemoryMetrics
System.arraycopy(this.metrics, 0, copy.metrics, 0, this.metrics.length)
Some(copy)
}
}

/** Clears/resets the saved peak values. */
def reset(): Unit = {
(0 until metrics.length).foreach { idx => metrics(idx) = 0}
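The `updated` flag visible at the end of the retained method suggests the peak-update contract: compare element-wise, keep the larger value, and report whether anything grew. A hedged standalone sketch of that logic follows; the real method lives on PeakExecutorMetrics and takes an ExecutorMetrics, so the signature here is simplified.

object PeakUpdateSketch {
  // Returns true if any stored peak was raised, so the caller knows whether a
  // new peak value needs to be recorded or logged.
  def compareAndUpdatePeakValues(current: Array[Long], peaks: Array[Long]): Boolean = {
    var updated = false
    for (i <- current.indices if current(i) > peaks(i)) {
      peaks(i) = current(i)
      updated = true
    }
    updated
  }
}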
core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -306,7 +306,7 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
executorLogs,
memoryMetrics,
blacklistedInStages,
peakExecutorMetrics.getPeakMemoryMetrics)
if (peakExecutorMetrics.metrics(0) == -1) None else Some(peakExecutorMetrics.metrics))
new ExecutorSummaryWrapper(info)
}
}