Closed
Changes from 1 commit
Commits
36 commits
c8e8abe
SPARK-23429: Add executor memory metrics to heartbeat and expose in e…
edwinalu Mar 9, 2018
5d6ae1c
modify MimaExcludes.scala to filter changes to SparkListenerExecutorM…
edwinalu Apr 2, 2018
ad10d28
Address code review comments, change event logging to stage end.
edwinalu Apr 22, 2018
10ed328
Add configuration parameter spark.eventLog.logExecutorMetricsUpdates.…
edwinalu May 15, 2018
2d20367
wip on enum based metrics
squito May 23, 2018
f904f1e
wip ... has both enum and non-enum version
squito May 23, 2018
c502ec4
case objects, mostly complete
squito May 23, 2018
7879e66
Merge pull request #1 from squito/metric_enums
edwinalu Jun 3, 2018
2662f6f
Address comments (move heartbeater from DAGScheduler to SparkContext,…
edwinalu Jun 10, 2018
2871335
SPARK-23429: Add executor memory metrics to heartbeat and expose in e…
edwinalu Mar 9, 2018
da83f2e
modify MimaExcludes.scala to filter changes to SparkListenerExecutorM…
edwinalu Apr 2, 2018
f25a44b
Address code review comments, change event logging to stage end.
edwinalu Apr 22, 2018
ca85c82
Add configuration parameter spark.eventLog.logExecutorMetricsUpdates.…
edwinalu May 15, 2018
8b74ba8
wip on enum based metrics
squito May 23, 2018
036148c
wip ... has both enum and non-enum version
squito May 23, 2018
91fb1db
case objects, mostly complete
squito May 23, 2018
2d8894a
Address comments (move heartbeater from DAGScheduler to SparkContext,…
edwinalu Jun 10, 2018
99044e6
Merge branch 'SPARK-23429.2' of https://github.com/edwinalu/spark int…
edwinalu Jun 14, 2018
263c8c8
code review comments
edwinalu Jun 14, 2018
812fdcf
code review comments:
edwinalu Jun 22, 2018
7ed42a5
Address code review comments. Also make executorUpdates in SparkListe…
edwinalu Jun 28, 2018
8d9acdf
Revert and make executorUpdates in SparkListenerExecutorMetricsUpdate…
edwinalu Jun 29, 2018
20799d2
code review comments: hid array implementation of executor metrics, a…
edwinalu Jul 25, 2018
8905d23
merge with master
edwinalu Jul 25, 2018
04875b8
Integration of ProcessTreeMetrics with PR 21221
Jul 26, 2018
a0eed11
address code review comments
edwinalu Aug 5, 2018
162b9b2
Merge branch 'SPARK-23429.2' of https://github.com/edwinalu/spark int…
Aug 6, 2018
29a44c7
Changing the position of ptree and also make the computation configur…
Aug 7, 2018
3671427
Separate metrics for jvm, python and others and update the tests
Aug 8, 2018
03cd5bc
code review comments
edwinalu Aug 13, 2018
c79b5ab
Merge branch 'SPARK-23429.2' of https://github.com/edwinalu/spark int…
Aug 14, 2018
10e7f15
Merge branch 'master' into SPARK-23429.2
edwinalu Aug 14, 2018
a14b82a
merge conflicts
edwinalu Aug 14, 2018
2897281
disable stage executor metrics logging by default
edwinalu Aug 16, 2018
8f97b50
Merge branch 'SPARK-23429.2' of https://github.com/rezasafi/spark int…
Aug 17, 2018
b14cebc
Update JsonProtocolSuite with new metrics.
Aug 17, 2018
code review comments:
- remove timestamp
- change ExecutorMetrics to Array[Long]
- create new SparkListenerStageExecutorMetrics for recording stage executor metric peaks in
  the history log

Fix issue where metrics for a removed executor were ignored (save dead executors while
there are currently active stages that the executor was alive for).
edwinalu committed Jun 25, 2018
commit 812fdcf3961bae2a4fa20b4f60e739b45233fcd0
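The commit message above describes per-stage peak tracking: each executor's heartbeat snapshot is folded into a running element-wise maximum, and the peaks are emitted as SparkListenerStageExecutorMetrics events at stage completion. A minimal sketch of that folding step, with hypothetical names (the PR's actual logic is PeakExecutorMetrics.compareAndUpdate in the diff below):

// Sketch only: fold a heartbeat snapshot into the running per-metric peaks.
// Both arrays are indexed by MetricGetter.values, as elsewhere in this PR.
def updatePeaks(peaks: Array[Long], snapshot: Array[Long]): Boolean = {
  var updated = false
  snapshot.indices.foreach { i =>
    if (snapshot(i) > peaks(i)) {
      peaks(i) = snapshot(i)
      updated = true
    }
  }
  updated
}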
@@ -103,6 +103,12 @@ public final void onExecutorMetricsUpdate(
onEvent(executorMetricsUpdate);
}

@Override
public final void onStageExecutorMetrics(
SparkListenerStageExecutorMetrics executorMetrics) {
onEvent(executorMetrics);
}

@Override
public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
onEvent(executorAdded);
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.{ScheduledFuture, TimeUnit}
import scala.collection.mutable
import scala.concurrent.Future

import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler._
@@ -39,7 +38,7 @@ private[spark] case class Heartbeat(
executorId: String,
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updates
blockManagerId: BlockManagerId,
executorUpdates: ExecutorMetrics) // executor level updates
executorUpdates: Array[Long]) // executor level updates

/**
* An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/Heartbeater.scala
@@ -19,7 +19,6 @@ package org.apache.spark

import java.util.concurrent.TimeUnit

import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.memory.MemoryManager
import org.apache.spark.metrics.MetricGetter
@@ -59,10 +58,12 @@ private[spark] class Heartbeater(
heartbeater.awaitTermination(10, TimeUnit.SECONDS)
}

/** Get the current metrics. */
def getCurrentMetrics(): ExecutorMetrics = {
new ExecutorMetrics(System.currentTimeMillis(),
MetricGetter.values.map(_.getMetricValue(memoryManager)).toArray)
/**
* Get the current executor level metrics. These are returned as an array, with the index
* determined by MetricGetter.values
*/
def getCurrentMetrics(): Array[Long] = {
MetricGetter.values.map(_.getMetricValue(memoryManager)).toArray
}
}
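With the timestamp removed, executor metrics travel as a bare Array[Long], so consumers must interpret positions through the MetricGetter ordering mentioned in the comment above. A hedged sketch of such a lookup (assuming MetricGetter.values is the ordered sequence of metric getters that getCurrentMetrics iterates over):

// Sketch only: recover a named metric from the positional array.
import org.apache.spark.metrics.MetricGetter

def metricValue(metrics: Array[Long], metric: MetricGetter): Long =
  metrics(MetricGetter.values.indexOf(metric))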

This file was deleted.

@@ -34,7 +34,7 @@ import org.apache.commons.lang3.SerializationUtils

import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.network.util.JavaUtils
@@ -247,7 +247,7 @@ class DAGScheduler(
// (taskId, stageId, stageAttemptId, accumUpdates)
accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId,
executorUpdates: ExecutorMetrics): Boolean = {
executorUpdates: Array[Long]): Boolean = {
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates,
Some(executorUpdates)))
blockManagerMaster.driverEndpoint.askSync[Boolean](
@@ -36,7 +36,6 @@ import org.json4s.jackson.JsonMethods._

import org.apache.spark.{SPARK_VERSION, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.io.CompressionCodec
@@ -191,19 +190,13 @@ private[spark] class EventLoggingListener(

// log the peak executor metrics for the stage, for each live executor,
// whether or not the executor is running tasks for the stage
val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
val executorMap = liveStageExecutorMetrics.remove(
(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
executorMap.foreach {
executorEntry => {
for ((executorId, peakExecutorMetrics) <- executorEntry) {
// -1 timestamp indicates that the ExecutorMetricsUpdate event is being read from the
// history log, and contains the peak metrics for the stage whose StageCompleted event
// immediately follows
val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics)
val executorUpdate = new SparkListenerExecutorMetricsUpdate(
executorId, accumUpdates, Some(executorMetrics))
logEvent(executorUpdate)
logEvent(new SparkListenerStageExecutorMetrics(executorId, event.stageInfo.stageId,
event.stageInfo.attemptNumber(), peakExecutorMetrics.metrics))
}
}
}
@@ -17,7 +17,6 @@

package org.apache.spark.scheduler

import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.metrics.MetricGetter

/**
@@ -35,11 +34,11 @@ private[spark] class PeakExecutorMetrics {
* @param executorMetrics the executor metrics to compare
* @return if there is a new peak value for any metric
*/
def compareAndUpdate(executorMetrics: ExecutorMetrics): Boolean = {
def compareAndUpdate(executorMetrics: Array[Long]): Boolean = {
var updated: Boolean = false

(0 until MetricGetter.values.length).foreach { metricIdx =>
val newVal = executorMetrics.metrics(metricIdx)
val newVal = executorMetrics(metricIdx)
if ( newVal > metrics(metricIdx)) {
updated = true
metrics(metricIdx) = newVal
28 changes: 26 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -26,7 +26,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo

import org.apache.spark.{SparkConf, TaskEndReason}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
import org.apache.spark.ui.SparkUI
@@ -166,7 +166,23 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
case class SparkListenerExecutorMetricsUpdate(
execId: String,
accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
executorUpdates: Option[ExecutorMetrics] = None)
executorUpdates: Option[Array[Long]] = None)
extends SparkListenerEvent

/**
* Peak metric values for the executor for the stage, written to the history log at stage
* completion.
* @param execId executor id
* @param stageId stage id
* @param stageAttemptId stage attempt
* @param executorMetrics executor level metrics
*/
@DeveloperApi
case class SparkListenerStageExecutorMetrics(
execId: String,
stageId: Int,
stageAttemptId: Int,
executorMetrics: Array[Long])
extends SparkListenerEvent

@DeveloperApi
@@ -266,6 +282,11 @@ private[spark] trait SparkListenerInterface {
*/
def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit

/**
* Called when the driver reads stage executor metrics from the history log.
*/
def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit

/**
* Called when the driver registers a new executor.
*/
@@ -363,6 +384,9 @@ abstract class SparkListener extends SparkListenerInterface {
override def onExecutorMetricsUpdate(
executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }

override def onStageExecutorMetrics(
executorMetrics: SparkListenerStageExecutorMetrics): Unit = { }

override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }

override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }
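Because SparkListener provides a no-op default for onStageExecutorMetrics, existing listeners are unaffected; a listener that wants the per-stage peaks (for example when replaying a history log) can override it. A rough sketch against the event shape added above:

// Sketch only: print per-stage executor metric peaks as they arrive.
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageExecutorMetrics}

class StagePeakLogger extends SparkListener {
  override def onStageExecutorMetrics(event: SparkListenerStageExecutorMetrics): Unit = {
    println(s"stage ${event.stageId}.${event.stageAttemptId}, executor ${event.execId}: " +
      event.executorMetrics.mkString(", "))
  }
}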
@@ -57,6 +57,8 @@ private[spark] trait SparkListenerBus
listener.onApplicationEnd(applicationEnd)
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
listener.onExecutorMetricsUpdate(metricsUpdate)
case stageExecutorMetrics: SparkListenerStageExecutorMetrics =>
listener.onStageExecutorMetrics(stageExecutorMetrics)
case executorAdded: SparkListenerExecutorAdded =>
listener.onExecutorAdded(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
@@ -17,7 +17,6 @@

package org.apache.spark.scheduler

import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AccumulatorV2
@@ -77,7 +76,7 @@ private[spark] trait TaskScheduler {
execId: String,
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId,
executorUpdates: ExecutorMetrics): Boolean
executorUpdates: Array[Long]): Boolean

/**
* Get an application ID associated with the job.
@@ -28,7 +28,6 @@ import scala.util.Random

import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
@@ -444,7 +443,7 @@ private[spark] class TaskSchedulerImpl(
execId: String,
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId,
executorMetrics: ExecutorMetrics): Boolean = {
executorMetrics: Array[Long]): Boolean = {
// (taskId, stageId, stageAttemptId, accumUpdates)
val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
accumUpdates.flatMap { case (id, updates) =>
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap

import org.apache.spark._
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._
import org.apache.spark.status.api.v1
@@ -66,6 +66,7 @@ private[spark] class AppStatusListener(
private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()
private val liveJobs = new HashMap[Int, LiveJob]()
private val liveExecutors = new HashMap[String, LiveExecutor]()
private val deadExecutors = new HashMap[String, LiveExecutor]()
private val liveTasks = new HashMap[Long, LiveTask]()
private val liveRDDs = new HashMap[Int, LiveRDD]()
private val pools = new HashMap[String, SchedulerPool]()
@@ -204,6 +205,19 @@
update(rdd, now)
}
}
if (isExecutorActiveForLiveStages(exec)) {
// the executor was running for a currently active stage, so save it for now in
// deadExecutors, and remove when there are no active stages overlapping with the
// executor.
deadExecutors.put(event.executorId, exec)
}
}
}

/** Was the specified executor active for any currently live stages? */
private def isExecutorActiveForLiveStages(exec: LiveExecutor): Boolean = {
liveStages.values.asScala.exists { stage =>
stage.info.submissionTime.getOrElse(0L) < exec.removeTime.getTime
}
}

@@ -618,6 +632,9 @@
}
}

// remove any dead executors that were not running for any currently active stages
deadExecutors.retain((execId, exec) => isExecutorActiveForLiveStages(exec))

appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
kvstore.write(appSummary)
}
@@ -669,7 +686,7 @@
}
}
}
event.executorUpdates.foreach { updates: ExecutorMetrics =>
event.executorUpdates.foreach { updates: Array[Long] =>
// check if there is a new peak value for any of the executor level memory metrics
liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
if (exec.peakExecutorMetrics.compareAndUpdate(updates)) {
@@ -679,6 +696,21 @@
}
}

override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = {
val now = System.nanoTime()

// check if there is a new peak value for any of the executor level memory metrics
liveExecutors.get(executorMetrics.execId)
.orElse(deadExecutors.get(executorMetrics.execId)) match {
case Some(exec) =>
if (exec.peakExecutorMetrics.compareAndUpdate(executorMetrics.executorMetrics)) {
maybeUpdate(exec, now)
}
case None =>
logWarning("unable to find executor " + executorMetrics.execId)
}
}

override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
event.blockUpdatedInfo.blockId match {
case block: RDDBlockId => updateRDDBlock(event, block)
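As a hedged illustration of the retention rule above (times are made up): with live stages submitted at t=50 and t=300, an executor removed at t=200 stays in deadExecutors because the t=50 stage overlapped its lifetime, so replayed stage metrics can still be attributed to it; once no remaining live stage predates the removal, the retain call at stage completion drops the entry.

// Sketch only: the overlap check in miniature (epoch millis, hypothetical values).
val liveStageSubmissionTimes = Seq(50L, 300L)
val executorRemoveTime = 200L
val keepDeadExecutor = liveStageSubmissionTimes.exists(_ < executorRemoveTime) // true, via the t=50 stage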