Changes from 1 commit
Commits
36 commits
c8e8abe
SPARK-23429: Add executor memory metrics to heartbeat and expose in e…
edwinalu Mar 9, 2018
5d6ae1c
modify MimaExcludes.scala to filter changes to SparkListenerExecutorM…
edwinalu Apr 2, 2018
ad10d28
Address code review comments, change event logging to stage end.
edwinalu Apr 22, 2018
10ed328
Add configuration parameter spark.eventLog.logExecutorMetricsUpdates.…
edwinalu May 15, 2018
2d20367
wip on enum based metrics
squito May 23, 2018
f904f1e
wip ... has both enum and non-enum version
squito May 23, 2018
c502ec4
case objects, mostly complete
squito May 23, 2018
7879e66
Merge pull request #1 from squito/metric_enums
edwinalu Jun 3, 2018
2662f6f
Address comments (move heartbeater from DAGScheduler to SparkContext,…
edwinalu Jun 10, 2018
2871335
SPARK-23429: Add executor memory metrics to heartbeat and expose in e…
edwinalu Mar 9, 2018
da83f2e
modify MimaExcludes.scala to filter changes to SparkListenerExecutorM…
edwinalu Apr 2, 2018
f25a44b
Address code review comments, change event logging to stage end.
edwinalu Apr 22, 2018
ca85c82
Add configuration parameter spark.eventLog.logExecutorMetricsUpdates.…
edwinalu May 15, 2018
8b74ba8
wip on enum based metrics
squito May 23, 2018
036148c
wip ... has both enum and non-enum version
squito May 23, 2018
91fb1db
case objects, mostly complete
squito May 23, 2018
2d8894a
Address comments (move heartbeater from DAGScheduler to SparkContext,…
edwinalu Jun 10, 2018
99044e6
Merge branch 'SPARK-23429.2' of https://github.com/edwinalu/spark int…
edwinalu Jun 14, 2018
263c8c8
code review comments
edwinalu Jun 14, 2018
812fdcf
code review comments:
edwinalu Jun 22, 2018
7ed42a5
Address code review comments. Also make executorUpdates in SparkListe…
edwinalu Jun 28, 2018
8d9acdf
Revert and make executorUpdates in SparkListenerExecutorMetricsUpdate…
edwinalu Jun 29, 2018
20799d2
code review comments: hid array implementation of executor metrics, a…
edwinalu Jul 25, 2018
8905d23
merge with master
edwinalu Jul 25, 2018
04875b8
Integration of ProcessTreeMetrics with PR 21221
Jul 26, 2018
a0eed11
address code review comments
edwinalu Aug 5, 2018
162b9b2
Merge branch 'SPARK-23429.2' of https://github.com/edwinalu/spark int…
Aug 6, 2018
29a44c7
Changing the position of ptree and also make the computation configur…
Aug 7, 2018
3671427
Separate metrics for jvm, python and others and update the tests
Aug 8, 2018
03cd5bc
code review comments
edwinalu Aug 13, 2018
c79b5ab
Merge branch 'SPARK-23429.2' of https://github.com/edwinalu/spark int…
Aug 14, 2018
10e7f15
Merge branch 'master' into SPARK-23429.2
edwinalu Aug 14, 2018
a14b82a
merge conflicts
edwinalu Aug 14, 2018
2897281
disable stage executor metrics logging by default
edwinalu Aug 16, 2018
8f97b50
Merge branch 'SPARK-23429.2' of https://github.com/rezasafi/spark int…
Aug 17, 2018
b14cebc
Update JsonProtocolSuite with new metrics.
Aug 17, 2018
SPARK-23429: Add executor memory metrics to heartbeat and expose in executors REST API

Add new executor-level memory metrics (JVM used memory, on/off heap execution memory, on/off heap storage
memory), and expose them via the executors REST API. This information provides insight into how executor
and driver JVM memory is used across the different memory regions, and can help determine good values
for spark.executor.memory, spark.driver.memory, spark.memory.fraction, and spark.memory.storageFraction.
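
As an illustration of how the new peaks could feed back into those settings, here is a minimal sketch; the peak numbers and the resulting values are hypothetical, not taken from any real run.

```scala
import org.apache.spark.SparkConf

object MemoryTuningSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical peaks read off the new executor metrics:
    // ~6 GB JVM used memory, execution peaking near 3 GB, storage rarely above 1 GB.
    val conf = new SparkConf()
      .set("spark.executor.memory", "8g")         // headroom above the observed JVM peak
      .set("spark.memory.fraction", "0.6")        // unified execution + storage region
      .set("spark.memory.storageFraction", "0.3") // smaller storage share, matching usage
    conf.getAll.sorted.foreach { case (k, v) => println(s"$k=$v") }
  }
}
```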

Add an ExecutorMetrics class with jvmUsedMemory, onHeapExecutionMemory, offHeapExecutionMemory,
onHeapStorageMemory, and offHeapStorageMemory. The new ExecutorMetrics is sent by each executor to the
driver as part of Heartbeat. A heartbeat is also added for the driver, so that the same metrics are
collected for the driver.
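
For a sense of what one heartbeat sample contains, here is a self-contained sketch of assembling such a snapshot. The MemoryUsageSource trait is a stand-in for Spark's private[spark] MemoryManager, whose getters appear later in this diff; the field list mirrors the new ExecutorMetrics class.

```scala
import java.lang.management.ManagementFactory

// Stand-in for org.apache.spark.memory.MemoryManager (private[spark]); the four
// getters mirror the ones added to MemoryManager in this commit.
trait MemoryUsageSource {
  def onHeapExecutionMemoryUsed: Long
  def offHeapExecutionMemoryUsed: Long
  def onHeapStorageMemoryUsed: Long
  def offHeapStorageMemoryUsed: Long
}

// Same fields, in the same order, as the ExecutorMetrics class added by this commit.
case class ExecutorMetricsSample(
    timestamp: Long,
    jvmUsedMemory: Long,
    onHeapExecutionMemory: Long,
    offHeapExecutionMemory: Long,
    onHeapStorageMemory: Long,
    offHeapStorageMemory: Long)

object MetricsSampler {
  def sample(mm: MemoryUsageSource): ExecutorMetricsSample =
    ExecutorMetricsSample(
      System.currentTimeMillis(),
      ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getUsed,
      mm.onHeapExecutionMemoryUsed,
      mm.offHeapExecutionMemoryUsed,
      mm.onHeapStorageMemoryUsed,
      mm.offHeapStorageMemoryUsed)
}
```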

Modify the EventLoggingListener to log an ExecutorMetricsUpdate event whenever there is a new peak value
for any of the memory metrics for a given executor and stage. Only the ExecutorMetrics are logged, not the
TaskMetrics, to minimize the additional logging.
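
The listener relies on a small compare-and-update helper (PeakExecutorMetrics, added elsewhere in this PR). The sketch below is only an illustration of the intended behavior: track the largest value seen per metric, report whether any peak moved, and reset at stage boundaries.

```scala
// Illustrative stand-in for PeakExecutorMetrics; not the class used by the PR.
class PeakTracker(numMetrics: Int = 5) {
  private val peaks = Array.fill(numMetrics)(0L)

  /** Returns true if any value in `sample` exceeds its recorded peak (and records it). */
  def compareAndUpdate(sample: Array[Long]): Boolean = {
    var updated = false
    var i = 0
    while (i < peaks.length && i < sample.length) {
      if (sample(i) > peaks(i)) {
        peaks(i) = sample(i)
        updated = true
      }
      i += 1
    }
    updated
  }

  /** Clears all recorded peaks, e.g. when a new stage is submitted. */
  def reset(): Unit = java.util.Arrays.fill(peaks, 0L)
}
```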

Modify the AppStatusListener to record the peak values for each memory metric.
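
A custom SparkListener could surface the same peaks outside the UI. The sketch below assumes the executorUpdates field has the Option[ExecutorMetrics] shape this commit gives SparkListenerExecutorMetricsUpdate, and that jvmUsedMemory is read from the new ExecutorMetrics class.

```scala
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// Tracks the peak JVM used memory reported by each executor (and the driver).
class PeakJvmMemoryListener extends SparkListener {
  private val peakJvmUsed = mutable.Map.empty[String, Long].withDefaultValue(0L)

  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    event.executorUpdates.foreach { metrics =>
      if (metrics.jvmUsedMemory > peakJvmUsed(event.execId)) {
        peakJvmUsed(event.execId) = metrics.jvmUsedMemory
      }
    }
  }

  def peaks: Map[String, Long] = peakJvmUsed.toMap
}
```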

Add the new memory metrics to the executors REST API.
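
The metrics ride along on the existing executors endpoint of the REST API. A rough sketch of pulling it follows; the exact JSON field names added by this change are an assumption here.

```scala
import scala.io.Source

object FetchExecutorMetrics {
  def main(args: Array[String]): Unit = {
    val appId = args(0)  // e.g. the running application's ID from sc.applicationId
    // Standard REST endpoint served by the driver UI (default port 4040).
    val url = s"http://localhost:4040/api/v1/applications/$appId/executors"
    val body = Source.fromURL(url).mkString
    // The per-executor entries now carry the peak memory metrics alongside existing fields.
    println(body)
  }
}
```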
edwinalu committed Jun 13, 2018
commit 287133597f819417f96ae5965895c1b640703d86
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{ScheduledFuture, TimeUnit}
import scala.collection.mutable
import scala.concurrent.Future

import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler._
@@ -37,7 +38,8 @@ import org.apache.spark.util._
private[spark] case class Heartbeat(
executorId: String,
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updates
blockManagerId: BlockManagerId)
blockManagerId: BlockManagerId,
executorUpdates: ExecutorMetrics) // executor level updates

/**
* An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
@@ -120,14 +122,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
context.reply(true)

// Messages received from executors
case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId, executorMetrics) =>
if (scheduler != null) {
if (executorLastSeen.contains(executorId)) {
executorLastSeen(executorId) = clock.getTimeMillis()
eventLoopThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, accumUpdates, blockManagerId)
executorId, accumUpdates, blockManagerId, executorMetrics)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
context.reply(response)
}
52 changes: 52 additions & 0 deletions core/src/main/scala/org/apache/spark/Heartbeater.scala
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import java.util.concurrent.TimeUnit

import org.apache.spark.util.{ThreadUtils, Utils}

/**
* Creates a heartbeat thread which will call the specified reportHeartbeat function at
* intervals of intervalMs.
*
* @param reportHeartbeat the heartbeat reporting function to call.
* @param intervalMs the interval between heartbeats.
*/
private[spark] class Heartbeater(reportHeartbeat: () => Unit, intervalMs: Long) {
// Executor for the heartbeat task
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")

/** Schedules a task to report a heartbeat. */
private[spark] def start(): Unit = {
// Wait a random interval so the heartbeats don't end up in sync
val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]

val heartbeatTask = new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions(reportHeartbeat())
}
heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
}

/** Stops the heartbeat thread. */
private[spark] def stop(): Unit = {
heartbeater.shutdown()
heartbeater.awaitTermination(10, TimeUnit.SECONDS)
}
}
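
A small usage sketch for the new class follows. Heartbeater is private[spark], so this only compiles from code inside the org.apache.spark package; the interval and the printed message are arbitrary.

```scala
package org.apache.spark

object HeartbeaterExample {
  def main(args: Array[String]): Unit = {
    val heartbeater = new Heartbeater(
      () => println(s"heartbeat at ${System.currentTimeMillis()}"),
      intervalMs = 1000L)
    heartbeater.start()   // first beat fires after a randomized initial delay
    Thread.sleep(5000L)
    heartbeater.stop()    // shuts down the daemon thread, waiting up to 10 seconds
  }
}
```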

32 changes: 12 additions & 20 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -148,7 +148,8 @@ private[spark] class Executor(
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

// Executor for the heartbeat task.
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
private val heartbeater = new Heartbeater(reportHeartBeat,
conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))

// must be initialized before running startDriverHeartbeat()
private val heartbeatReceiverRef =
@@ -167,7 +168,7 @@
*/
private var heartbeatFailures = 0

startDriverHeartbeater()
heartbeater.start()

private[executor] def numRunningTasks: Int = runningTasks.size()

@@ -216,8 +217,7 @@

def stop(): Unit = {
env.metricsSystem.report()
heartbeater.shutdown()
heartbeater.awaitTermination(10, TimeUnit.SECONDS)
heartbeater.stop()
threadPool.shutdown()
if (!isLocal) {
env.stop()
@@ -787,6 +787,12 @@
val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
val curGCTime = computeTotalGcTime()

// get executor level memory metrics
val executorUpdates = new ExecutorMetrics(System.currentTimeMillis(),
ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
env.memoryManager.onHeapExecutionMemoryUsed, env.memoryManager.offHeapExecutionMemoryUsed,
env.memoryManager.onHeapStorageMemoryUsed, env.memoryManager.offHeapStorageMemoryUsed)

for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
taskRunner.task.metrics.mergeShuffleReadMetrics()
@@ -795,7 +801,8 @@
}
}

val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId,
executorUpdates)
try {
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
@@ -815,21 +822,6 @@
}
}
}

/**
* Schedules a task to report heartbeat and partial metrics for active tasks to driver.
*/
private def startDriverHeartbeater(): Unit = {
val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")

// Wait a random interval so the heartbeats don't end up in sync
val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]

val heartbeatTask = new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
}
heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
}
}

private[spark] object Executor {
43 changes: 43 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* Executor level metrics.
*
* This is sent to the driver periodically (on executor heartbeat), to provide
* information about each executor's metrics.
*
* @param timestamp the time the metrics were collected
* @param jvmUsedMemory the amount of JVM used memory for the executor
* @param onHeapExecutionMemory the amount of on heap execution memory used
* @param offHeapExecutionMemory the amount of off heap execution memory used
* @param onHeapStorageMemory the amount of on heap storage memory used
* @param offHeapStorageMemory the amount of off heap storage memory used
*/
@DeveloperApi
class ExecutorMetrics private[spark] (
val timestamp: Long,
val jvmUsedMemory: Long,
val onHeapExecutionMemory: Long,
val offHeapExecutionMemory: Long,
val onHeapStorageMemory: Long,
val offHeapStorageMemory: Long) extends Serializable
20 changes: 20 additions & 0 deletions core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -180,6 +180,26 @@
onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
}

/**
* On heap execution memory currently in use, in bytes.
*/
final def onHeapExecutionMemoryUsed: Long = onHeapExecutionMemoryPool.memoryUsed

/**
* Off heap execution memory currently in use, in bytes.
*/
final def offHeapExecutionMemoryUsed: Long = offHeapExecutionMemoryPool.memoryUsed

/**
* On heap storage memory currently in use, in bytes.
*/
final def onHeapStorageMemoryUsed: Long = onHeapStorageMemoryPool.memoryUsed

/**
* Off heap storage memory currently in use, in bytes.
*/
final def offHeapStorageMemoryUsed: Long = offHeapStorageMemoryPool.memoryUsed

/**
* Returns the execution memory consumption, in bytes, for the given task.
*/
29 changes: 26 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler

import java.io.NotSerializableException
import java.lang.management.ManagementFactory
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
@@ -34,7 +35,7 @@ import org.apache.commons.lang3.SerializationUtils

import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.network.util.JavaUtils
Expand Down Expand Up @@ -209,6 +210,10 @@ class DAGScheduler(
private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

/** driver heartbeat for collecting metrics */
private val heartbeater: Heartbeater = new Heartbeater(reportHeartBeat,
sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))

/**
* Called by the TaskSetManager to report task's starting.
*/
@@ -246,8 +251,10 @@
execId: String,
// (taskId, stageId, stageAttemptId, accumUpdates)
accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId): Boolean = {
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
blockManagerId: BlockManagerId,
executorUpdates: ExecutorMetrics): Boolean = {
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates,
Some(executorUpdates)))
blockManagerMaster.driverEndpoint.askSync[Boolean](
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
}
@@ -1751,9 +1758,25 @@
messageScheduler.shutdownNow()
eventProcessLoop.stop()
taskScheduler.stop()
heartbeater.stop()
}

/** Reports heartbeat metrics for the driver. */
private def reportHeartBeat(): Unit = {
// get driver memory metrics
val driverUpdates = new ExecutorMetrics(System.currentTimeMillis(),
ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed(),
sc.env.memoryManager.onHeapExecutionMemoryUsed,
sc.env.memoryManager.offHeapExecutionMemoryUsed,
sc.env.memoryManager.onHeapStorageMemoryUsed,
sc.env.memoryManager.offHeapStorageMemoryUsed)
val accumUpdates = new Array[(Long, Int, Int, Seq[AccumulableInfo])](0)
listenerBus.post(SparkListenerExecutorMetricsUpdate("driver", accumUpdates,
Some(driverUpdates)))
}

eventProcessLoop.start()
heartbeater.start()
}

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -36,6 +36,7 @@ import org.json4s.jackson.JsonMethods._

import org.apache.spark.{SPARK_VERSION, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.io.CompressionCodec
@@ -93,6 +94,9 @@ private[spark] class EventLoggingListener(
// Visible for tests only.
private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)

// Peak metric values for each executor
private var peakExecutorMetrics = new mutable.HashMap[String, PeakExecutorMetrics]()

/**
* Creates the log file in the configured log directory.
*/
@@ -155,7 +159,11 @@
}

// Events that do not trigger a flush
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event)
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
logEvent(event)
// clear the peak metrics when a new stage starts
peakExecutorMetrics.values.foreach(_.reset())
}

override def onTaskStart(event: SparkListenerTaskStart): Unit = logEvent(event)

@@ -197,10 +205,12 @@
}
override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
logEvent(event, flushLogger = true)
peakExecutorMetrics.put(event.executorId, new PeakExecutorMetrics())
}

override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
logEvent(event, flushLogger = true)
peakExecutorMetrics.remove(event.executorId)
}

override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
@@ -234,8 +244,22 @@
}
}

// No-op because logging every update would be overkill
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
/**
* Log if there is a new peak value for one of the memory metrics for the given executor.
* Metrics are cleared out when a new stage is started in onStageSubmitted, so this will
* log new peak memory metric values per executor per stage.
*/
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
var log: Boolean = false
event.executorUpdates.foreach { executorUpdates =>
val peakMetrics = peakExecutorMetrics.getOrElseUpdate(event.execId, new PeakExecutorMetrics())
if (peakMetrics.compareAndUpdate(executorUpdates)) {
val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
logEvent(new SparkListenerExecutorMetricsUpdate(event.execId, accumUpdates,
event.executorUpdates), flushLogger = true)
}
}
}

override def onOtherEvent(event: SparkListenerEvent): Unit = {
if (event.logEvent) {