Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c8e8abe
SPARK-23429: Add executor memory metrics to heartbeat and expose in e…
edwinalu Mar 9, 2018
5d6ae1c
modify MimaExcludes.scala to filter changes to SparkListenerExecutorM…
edwinalu Apr 2, 2018
ad10d28
Address code review comments, change event logging to stage end.
edwinalu Apr 22, 2018
10ed328
Add configuration parameter spark.eventLog.logExecutorMetricsUpdates.…
edwinalu May 15, 2018
2d20367
wip on enum based metrics
squito May 23, 2018
f904f1e
wip ... has both enum and non-enum version
squito May 23, 2018
c502ec4
case objects, mostly complete
squito May 23, 2018
7879e66
Merge pull request #1 from squito/metric_enums
edwinalu Jun 3, 2018
2662f6f
Address comments (move heartbeater from DAGScheduler to SparkContext,…
edwinalu Jun 10, 2018
2871335
SPARK-23429: Add executor memory metrics to heartbeat and expose in e…
edwinalu Mar 9, 2018
da83f2e
modify MimaExcludes.scala to filter changes to SparkListenerExecutorM…
edwinalu Apr 2, 2018
f25a44b
Address code review comments, change event logging to stage end.
edwinalu Apr 22, 2018
ca85c82
Add configuration parameter spark.eventLog.logExecutorMetricsUpdates.…
edwinalu May 15, 2018
8b74ba8
wip on enum based metrics
squito May 23, 2018
036148c
wip ... has both enum and non-enum version
squito May 23, 2018
91fb1db
case objects, mostly complete
squito May 23, 2018
2d8894a
Address comments (move heartbeater from DAGScheduler to SparkContext,…
edwinalu Jun 10, 2018
99044e6
Merge branch 'SPARK-23429.2' of https://github.com/edwinalu/spark int…
edwinalu Jun 14, 2018
263c8c8
code review comments
edwinalu Jun 14, 2018
812fdcf
code review comments:
edwinalu Jun 22, 2018
7ed42a5
Address code review comments. Also make executorUpdates in SparkListe…
edwinalu Jun 28, 2018
8d9acdf
Revert and make executorUpdates in SparkListenerExecutorMetricsUpdate…
edwinalu Jun 29, 2018
20799d2
code review comments: hid array implementation of executor metrics, a…
edwinalu Jul 25, 2018
8905d23
merge with master
edwinalu Jul 25, 2018
04875b8
Integration of ProcessTreeMetrics with PR 21221
Jul 26, 2018
a0eed11
address code review comments
edwinalu Aug 5, 2018
162b9b2
Merge branch 'SPARK-23429.2' of https://github.com/edwinalu/spark int…
Aug 6, 2018
29a44c7
Changing the position of ptree and also make the computation configur…
Aug 7, 2018
3671427
Seperate metrics for jvm, python and others and update the tests
Aug 8, 2018
03cd5bc
code review comments
edwinalu Aug 13, 2018
c79b5ab
Merge branch 'SPARK-23429.2' of https://github.com/edwinalu/spark int…
Aug 14, 2018
10e7f15
Merge branch 'master' into SPARK-23429.2
edwinalu Aug 14, 2018
a14b82a
merge conflicts
edwinalu Aug 14, 2018
2897281
disable stage executor metrics logging by default
edwinalu Aug 16, 2018
8f97b50
Merge branch 'SPARK-23429.2' of https://github.com/rezasafi/spark int…
Aug 17, 2018
b14cebc
Update JsonProtocolSuite with new metrics.
Aug 17, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
import java.util.EnumSet
import java.util.Locale

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.mutable.{ArrayBuffer, Map}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
Expand Down Expand Up @@ -96,7 +96,7 @@ private[spark] class EventLoggingListener(
private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)

// map of (stageId, stageAttempt), to peak executor metrics for the stage
private val liveStageExecutorMetrics = HashMap[(Int, Int), HashMap[String, ExecutorMetrics]]()
private val liveStageExecutorMetrics = Map.empty[(Int, Int), Map[String, ExecutorMetrics]]

/**
* Creates the log file in the configured log directory.
Expand Down Expand Up @@ -165,7 +165,7 @@ private[spark] class EventLoggingListener(
if (shouldLogStageExecutorMetrics) {
// record the peak metrics for the new stage
liveStageExecutorMetrics.put((event.stageInfo.stageId, event.stageInfo.attemptNumber()),
new HashMap[String, ExecutorMetrics]())
Map.empty[String, ExecutorMetrics])
}
}

Expand Down Expand Up @@ -338,7 +338,7 @@ private[spark] object EventLoggingListener extends Logging {
private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)

// A cache for compression codecs to avoid creating the same codec many times
private val codecMap = new HashMap[String, CompressionCodec]
private val codecMap = Map.empty[String, CompressionCodec]

/**
* Write metadata about an event log to the given stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,13 +706,10 @@ private[spark] class AppStatusListener(
// while reading from the log. SparkListenerStageExecutorMetrics are only processed
// when reading logs.
liveExecutors.get(executorMetrics.execId)
.orElse(deadExecutors.get(executorMetrics.execId)) match {
case Some(exec) =>
if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics.executorMetrics)) {
maybeUpdate(exec, now)
}
case None =>
logWarning("unable to find executor " + executorMetrics.execId)
.orElse(deadExecutors.get(executorMetrics.execId)).map { exec =>
if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics.executorMetrics)) {
update(exec, now)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
executorLogs,
memoryMetrics,
blacklistedInStages,
if (peakExecutorMetrics.isSet()) Some(peakExecutorMetrics) else None)
Some(peakExecutorMetrics).filter(_.isSet))
new ExecutorSummaryWrapper(info)
}
}
Expand Down
20 changes: 7 additions & 13 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -116,33 +116,27 @@ class MemoryMetrics private[spark](

/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */
private[spark] class ExecutorMetricsJsonDeserializer
extends JsonDeserializer[Option[ExecutorMetrics]] {
extends JsonDeserializer[Option[ExecutorMetrics]] {
override def deserialize(
jsonParser: JsonParser,
deserializationContext: DeserializationContext): Option[ExecutorMetrics] = {
val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]](
new TypeReference[Option[Map[String, java.lang.Long]]] {})
metricsMap match {
case Some(metrics) =>
Some(new ExecutorMetrics(metrics))
case None => None
}
metricsMap.map(metrics => new ExecutorMetrics(metrics))
}
}
/** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with metric name as key */
private[spark] class ExecutorMetricsJsonSerializer
extends JsonSerializer[Option[ExecutorMetrics]] {
extends JsonSerializer[Option[ExecutorMetrics]] {
override def serialize(
metrics: Option[ExecutorMetrics],
jsonGenerator: JsonGenerator,
serializerProvider: SerializerProvider): Unit = {
metrics match {
case Some(m) =>
val metricsMap = ExecutorMetricType.values.map { metricType =>
metrics.foreach { m: ExecutorMetrics =>
val metricsMap = ExecutorMetricType.values.map { metricType =>
metricType.name -> m.getMetricValue(metricType)
}.toMap
jsonGenerator.writeObject(metricsMap)
case None =>
}.toMap
jsonGenerator.writeObject(metricsMap)
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -723,9 +723,8 @@ private[spark] object JsonProtocol {
(json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson)
(taskId, stageId, stageAttemptId, updates)
}
val executorUpdates = jsonOption(json \ "Executor Metrics Updated") match {
case None => None
case Some(executorUpdate) => Some(executorMetricsFromJson(executorUpdate))
val executorUpdates = jsonOption(json \ "Executor Metrics Updated").map {
executorUpdate => executorMetricsFromJson(executorUpdate)
}
SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates, executorUpdates)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,26 +272,6 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
val listenerBus = new LiveListenerBus(conf)

// expected StageExecutorMetrics, for the given stage id and executor id
val expectedMetricsEvents: Map[(Int, String), SparkListenerStageExecutorMetrics] =
Map(
((0, "1"),
new SparkListenerStageExecutorMetrics("1", 0, 0,
new ExecutorMetrics(Array(5000L, 50L, 4000L, 8000L, 50L, 20L, 50L, 10L, 100L, 30L,
70L, 20L)))),
((0, "2"),
new SparkListenerStageExecutorMetrics("2", 0, 0,
new ExecutorMetrics(Array(7000L, 70L, 4000L, 9000L, 50L, 20L, 10L, 10L, 50L, 30L,
80L, 40L)))),
((1, "1"),
new SparkListenerStageExecutorMetrics("1", 1, 0,
new ExecutorMetrics(Array(7000L, 70L, 3000L, 5000L, 50L, 30L, 60L, 30L, 80L, 55L,
50L, 0L)))),
((1, "2"),
new SparkListenerStageExecutorMetrics("2", 1, 0,
new ExecutorMetrics(Array(7000L, 70L, 5000L, 8000L, 50L, 40L, 10L, 30L, 50L, 60L,
40L, 40L)))))

// Events to post.
val events = Array(
SparkListenerApplicationStart("executionMetrics", None,
Expand Down Expand Up @@ -365,6 +345,26 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
listenerBus.stop()
eventLogger.stop()

// expected StageExecutorMetrics, for the given stage id and executor id
val expectedMetricsEvents: Map[(Int, String), SparkListenerStageExecutorMetrics] =
Map(
((0, "1"),
new SparkListenerStageExecutorMetrics("1", 0, 0,
new ExecutorMetrics(Array(5000L, 50L, 4000L, 8000L, 50L, 20L, 50L, 10L, 100L, 30L,
70L, 20L)))),
((0, "2"),
new SparkListenerStageExecutorMetrics("2", 0, 0,
new ExecutorMetrics(Array(7000L, 70L, 4000L, 9000L, 50L, 20L, 10L, 10L, 50L, 30L,
80L, 40L)))),
((1, "1"),
new SparkListenerStageExecutorMetrics("1", 1, 0,
new ExecutorMetrics(Array(7000L, 70L, 3000L, 5000L, 50L, 30L, 60L, 30L, 80L, 55L,
50L, 0L)))),
((1, "2"),
new SparkListenerStageExecutorMetrics("2", 1, 0,
new ExecutorMetrics(Array(7000L, 70L, 5000L, 8000L, 50L, 40L, 10L, 30L, 50L, 60L,
40L, 40L)))))

// Verify the log file contains the expected events.
// Posted events should be logged, except for ExecutorMetricsUpdate events -- these
// are consolidated, and the peak values for each stage are logged at stage end.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
// Verify the same events are replayed in the same order
assert(sc.eventLogger.isDefined)
val originalEvents = sc.eventLogger.get.loggedEvents.filter { e =>
JsonProtocol.sparkEventFromJson(e) match {
case event: SparkListenerStageExecutorMetrics => false
case _ => true
}
!JsonProtocol.sparkEventFromJson(e).isInstanceOf[SparkListenerStageExecutorMetrics]
}
val replayedEvents = eventMonster.loggedEvents
originalEvents.zip(replayedEvents).foreach { case (e1, e2) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1293,7 +1293,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {

test("stage executor metrics") {
// simulate reading in StageExecutorMetrics events from the history log
val listener = new AppStatusListener(store, conf, true)
val listener = new AppStatusListener(store, conf, false)
val driver = BlockManagerId(SparkContext.DRIVER_IDENTIFIER, "localhost", 42)

listener.onExecutorAdded(createExecutorAddedEvent(1))
Expand Down