Changes from 1 commit
22 commits
9f5b195  [SPARK-26329][CORE] Faster polling of executor memory metrics. (wypoon, Jan 4, 2019)
03e41a8  [SPARK-26329][CORE] Fix test compilation error in sql. (wypoon, Feb 12, 2019)
7f6bd74  [SPARK-26329][CORE] Fix Mima issues. (wypoon, Feb 13, 2019)
7397897  [SPARK-26329][CORE] Fix possible NPE. (wypoon, Feb 13, 2019)
e1aeafc  [SPARK-26329][CORE] Fix JsonProtocolSuite post-rebase to account for … (wypoon, Mar 6, 2019)
75ba39d  [SPARK-26329][CORE] Extract polling logic into a separate class. (wypoon, Mar 8, 2019)
ea2ff0d  [SPARK-26329][CORE] On task failure, send executor metrics in the Tas… (wypoon, Mar 18, 2019)
0cbfc04  [SPARK-26329][CORE] Unit tests for sending executor metrics in TaskRe… (wypoon, Mar 21, 2019)
8cb30a8  [SPARK-26329][CORE] Add driver updates to test for executor metrics a… (wypoon, Mar 22, 2019)
077abb0  [SPARK-26329][CORE] Add SparkListenerTaskEnd events to test for execu… (wypoon, Mar 23, 2019)
7a3c90d  [SPARK-26329][CORE] Address feedback from irashid. (wypoon, Mar 27, 2019)
3ed583a  [SPARK-26329][CORE] Fix ExecutorSuite failures. (wypoon, Mar 28, 2019)
0a4828a  [SPARK-26329][CORE] Delete a comment on irashid's suggestion. (wypoon, Mar 28, 2019)
9530b75  [SPARK-26329][CORE] Change executorUpdates to be a scala.collection.m… (wypoon, Mar 28, 2019)
38a397c  [SPARK-26329][CORE] Update HistoryServerSuite. (wypoon, Apr 9, 2019)
e062e60  [SPARK-26329][CORE] Get executor updates and reset the peaks in a sin… (wypoon, May 15, 2019)
20b4b7e  [SPARK-26329][CORE] Test fixes after rebase on master. (wypoon, Jul 3, 2019)
b898ad2  [SPARK-26329][CORE] Adopt some suggestions from attilapiros. (wypoon, Jul 4, 2019)
fbb55bf  [SPARK-26329][CORE] Address feedback from Imran Rashid. (wypoon, Jul 19, 2019)
99addf1  [SPARK-26329][CORE] Make TCMP case class private. (wypoon, Jul 19, 2019)
7331b27  [SPARK-26329][CORE] Fix a test post-rebase. (wypoon, Jul 29, 2019)
7556d6a  [SPARK-26329][CORE] Update a doc comment based on feedback from Imran… (wypoon, Jul 31, 2019)
[SPARK-26329][CORE] Extract polling logic into a separate class.
Address review feedback from Edwina Lu and Imran Rashid.
Yet to address: handle sending task executor metrics on task failure.
wypoon committed Jul 29, 2019
commit 75ba39d54bfe2901330f698f5fd0f338415138f3
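
The ExecutorMetricsPoller class this commit introduces is not among the files shown on this page, so the sketch below only reconstructs the interface implied by the call sites in the Executor.scala diff. The class name and the method names and arities are taken from the diff; the package (assumed to sit next to Executor in org.apache.spark.executor), the parameter and return types, and the stubbed bodies are guesses, not the committed implementation.

package org.apache.spark.executor

import org.apache.spark.memory.MemoryManager

// Sketch only: poller interface inferred from the Executor.scala call sites below.
private[spark] class ExecutorMetricsPoller(
    memoryManager: MemoryManager,
    pollingIntervalMs: Long) {

  // Starts a daemon polling thread when pollingIntervalMs > 0; otherwise the
  // Executor falls back to polling on each heartbeat (see pollOnHeartbeat).
  def start(): Unit = ???

  // Samples the current executor metrics and folds them into the per-stage
  // and per-task peak trackers.
  def poll(): Unit = ???

  // Task lifecycle hooks, called from Executor.launchTask and TaskRunner.run.
  def onTaskLaunch(taskId: Long): Unit = ???
  def onTaskStart(stageId: Int, stageAttemptId: Int): Unit = ???
  def onTaskCompletion(stageId: Int, stageAttemptId: Int): Unit = ???
  def onTaskCleanup(taskId: Long): Unit = ???

  // Peak metrics recorded for a single task, attached to its DirectTaskResult.
  def getTaskMetricPeaks(taskId: Long): Array[Long] = ???

  // Per-(stageId, stageAttemptId) peaks for the heartbeat; per commit e062e60
  // later in this PR, fetching also resets the peaks in a single operation.
  def getExecutorUpdates(): Map[(Int, Int), ExecutorMetrics] = ???

  // Stops the polling thread, if one was started.
  def stop(): Unit = ???
}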
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -39,9 +39,11 @@ import org.apache.spark.util._
*/
private[spark] case class Heartbeat(
executorId: String,
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updates
// taskId -> accumulator updates
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId,
executorUpdates: Map[(Int, Int), ExecutorMetrics]) // executor level updates
// (stageId, stageAttemptId) -> executor metric peaks
executorUpdates: Map[(Int, Int), ExecutorMetrics])

/**
* An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
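
For context on this HeartbeatReceiver change: executorUpdates is now documented as the executor metric peaks keyed by (stageId, stageAttemptId). Below is a minimal sketch of building such a map from raw per-stage peak values; the object and method names and the stagePeaks input are hypothetical, and the sketch assumes it is compiled under the org.apache.spark package because the ExecutorMetrics constructor taking Array[Long] is private[spark].

package org.apache.spark

import org.apache.spark.executor.ExecutorMetrics

// Illustrative only: the (stageId, stageAttemptId) -> ExecutorMetrics keying
// carried by Heartbeat.executorUpdates. One Array[Long] of peak values per
// stage attempt is assumed as input.
object ExecutorUpdatesSketch {
  def toExecutorUpdates(
      stagePeaks: Map[(Int, Int), Array[Long]]): Map[(Int, Int), ExecutorMetrics] = {
    stagePeaks.map { case (stageKey, peaks) => stageKey -> new ExecutorMetrics(peaks) }
  }
}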
140 changes: 20 additions & 120 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -18,14 +18,12 @@
package org.apache.spark.executor

import java.io.{File, NotSerializableException}
import java.lang.Long.{MAX_VALUE => LONG_MAX_VALUE}
import java.lang.Thread.UncaughtExceptionHandler
import java.lang.management.ManagementFactory
import java.net.{URI, URL}
import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicLong, AtomicLongArray}
import javax.annotation.concurrent.GuardedBy

import scala.collection.JavaConverters._
@@ -40,7 +38,6 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.metrics.source.JVMCPUSource
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.scheduler._
@@ -173,17 +170,6 @@ private[spark] class Executor(
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

type StageKey = (Int, Int)

// Map of (stageId, stageAttemptId) to count of running tasks
private val activeStages = new ConcurrentHashMap[StageKey, AtomicLong]

// Map of (stageId, stageAttemptId) to executor metric peaks
private val stageMetricPeaks = new ConcurrentHashMap[StageKey, AtomicLongArray]

// Map of taskId to executor metric peaks
private val taskMetricPeaks = new ConcurrentHashMap[Long, AtomicLongArray]

/**
* When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
* times, it should kill itself. The default value is 60. For example, if max failures is 60 and
@@ -207,36 +193,12 @@ private[spark] class Executor(
*/
private val METRICS_POLLING_INTERVAL_MS = conf.get(EXECUTOR_METRICS_POLLING_INTERVAL)

// Executor for the metrics polling task
private val poller =
if (METRICS_POLLING_INTERVAL_MS > 0) {
ThreadUtils.newDaemonSingleThreadScheduledExecutor("executor-poller")
} else {
null
}

private def poll(): Unit = {
// get the latest values for the metrics
val latestMetrics = ExecutorMetrics.getCurrentMetrics(env.memoryManager)

def compareAndUpdate(current: Long, latest: Long): Long =
if (latest > current) latest else current

def updatePeaks(metrics: AtomicLongArray): Unit = {
(0 until metrics.length).foreach { i =>
metrics.getAndAccumulate(i, latestMetrics(i), compareAndUpdate)
}
}

def peaksForStage(k: StageKey, v: AtomicLong): AtomicLongArray =
if (v.get() > 0) stageMetricPeaks.get(k) else null
private val pollOnHeartbeat = if (METRICS_POLLING_INTERVAL_MS > 0) false else true

// for each active stage (number of running tasks > 0), get the peaks and update them
activeStages.forEach[AtomicLongArray](LONG_MAX_VALUE, peaksForStage, updatePeaks)

// for each running task, update the peaks
taskMetricPeaks.forEachValue(LONG_MAX_VALUE, updatePeaks)
}
// Poller for the memory metrics.
private val metricsPoller = new ExecutorMetricsPoller(
env.memoryManager,
METRICS_POLLING_INTERVAL_MS)

// Executor for the heartbeat task.
private val heartbeater = new Heartbeater(
@@ -256,19 +218,14 @@ private[spark] class Executor(

heartbeater.start()

if (poller != null) {
val pollingTask = new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions(poll())
}
poller.scheduleAtFixedRate(pollingTask, 0L, METRICS_POLLING_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
metricsPoller.start()

private[executor] def numRunningTasks: Int = runningTasks.size()

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
val tr = new TaskRunner(context, taskDescription)
runningTasks.put(taskDescription.taskId, tr)
taskMetricPeaks.put(taskDescription.taskId, new AtomicLongArray(ExecutorMetricType.numMetrics))
metricsPoller.onTaskLaunch(taskDescription.taskId)
threadPool.execute(tr)
}

@@ -311,14 +268,11 @@ private[spark] class Executor(

def stop(): Unit = {
env.metricsSystem.report()
if (poller != null) {
try {
poller.shutdown()
poller.awaitTermination(METRICS_POLLING_INTERVAL_MS, TimeUnit.MILLISECONDS)
} catch {
case NonFatal(e) =>
logWarning("Unable to stop poller", e)
}
try {
metricsPoller.stop()
} catch {
case NonFatal(e) =>
logWarning("Unable to stop executor metrics poller", e)
}
try {
heartbeater.stop()
@@ -480,18 +434,7 @@ private[spark] class Executor(

val stageId = task.stageId
val stageAttemptId = task.stageAttemptId
val count = new AtomicLong(0)
val old = activeStages.putIfAbsent((stageId, stageAttemptId), count)
val stageCount =
if (old != null) {
old.incrementAndGet()
} else {
logDebug(s"added ($stageId, $stageAttemptId) to activeStages")
count.incrementAndGet()
}
logDebug(s"activeStages: ($stageId, $stageAttemptId) -> $stageCount")
stageMetricPeaks.putIfAbsent((stageId, stageAttemptId),
new AtomicLongArray(ExecutorMetricType.numMetrics))
metricsPoller.onTaskStart(stageId, stageAttemptId)

// Run the actual task and measure its runtime.
taskStartTimeNs = System.nanoTime()
@@ -608,19 +551,11 @@ private[spark] class Executor(
executorSource.METRIC_DISK_BYTES_SPILLED.inc(task.metrics.diskBytesSpilled)
executorSource.METRIC_MEMORY_BYTES_SPILLED.inc(task.metrics.memoryBytesSpilled)

def getMetricPeaks(): Array[Long] = {
val currentPeaks = taskMetricPeaks.get(taskId)
val metricPeaks = new Array[Long](ExecutorMetricType.numMetrics)
ExecutorMetricType.metricToOffset.foreach { case (_, i) =>
metricPeaks(i) = currentPeaks.get(i)
}
metricPeaks
}

// Note: accumulator updates must be collected after TaskMetrics is updated
val accumUpdates = task.collectAccumulatorUpdates()
val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId)
// TODO: do not serialize value twice
val directResult = new DirectTaskResult(valueBytes, accumUpdates, getMetricPeaks)
val directResult = new DirectTaskResult(valueBytes, accumUpdates, metricPeaks)
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit()

@@ -646,28 +581,7 @@ private[spark] class Executor(
}
}

def decrementCount(stage: StageKey, count: AtomicLong): AtomicLong = {
val countValue = count.decrementAndGet()
if (countValue == 0L) {
logDebug(s"removing (${stage._1}, ${stage._2}) from activeStages")
null
} else {
logDebug(s"activeStages: (${stage._1}, ${stage._2}) -> " + countValue)
count
}
}

// If the count is zero, the stage is removed from activeStages
activeStages.computeIfPresent((stageId, stageAttemptId), decrementCount)

// If the stage has been removed from activeStages, remove it from stageMetricPeaks too
def removeInactive(k: StageKey): Unit = {
if (activeStages.get(k) == null) {
stageMetricPeaks.remove(k)
}
}
stageMetricPeaks.forEachKey(LONG_MAX_VALUE, removeInactive)

metricsPoller.onTaskCompletion(stageId, stageAttemptId)
executorSource.SUCCEEDED_TASKS.inc(1L)
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
@@ -743,7 +657,7 @@ private[spark] class Executor(
}
} finally {
runningTasks.remove(taskId)
taskMetricPeaks.remove(taskId)
metricsPoller.onTaskCleanup(taskId)
}
}

@@ -960,25 +874,11 @@ private[spark] class Executor(
val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
val curGCTime = computeTotalGcTime()

// if not polling in a separater poller, poll here
if (poller == null) {
poll()
}

// build the executor level memory metrics
val executorUpdates = new HashMap[StageKey, ExecutorMetrics]

def peaksForStage(k: StageKey, v: AtomicLong): (StageKey, AtomicLongArray) =
if (v.get() > 0) (k, stageMetricPeaks.get(k)) else null

def addPeaks(nested: (StageKey, AtomicLongArray)): Unit = {
val (k, v) = nested
executorUpdates.put(k, new ExecutorMetrics(v))
// at the same time, reset the peaks in stageMetricPeaks
stageMetricPeaks.put(k, new AtomicLongArray(ExecutorMetricType.numMetrics))
if (pollOnHeartbeat) {
metricsPoller.poll()
}

activeStages.forEach[(StageKey, AtomicLongArray)](LONG_MAX_VALUE, peaksForStage, addPeaks)
val executorUpdates = metricsPoller.getExecutorUpdates()

for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
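
The Executor.scala hunks above delegate to the poller at several points in the task lifecycle. The sketch below lines those delegations up in call order against the interface sketched near the top of this page; it is illustrative only, not code or a test shipped with this PR.

package org.apache.spark.executor

// Hypothetical sequence mirroring the call order in the Executor.scala hunks above.
object PollerLifecycleSketch {
  def runOneTask(
      poller: ExecutorMetricsPoller,
      taskId: Long,
      stageId: Int,
      stageAttemptId: Int): (Array[Long], Map[(Int, Int), ExecutorMetrics]) = {
    poller.onTaskLaunch(taskId)                       // Executor.launchTask
    poller.onTaskStart(stageId, stageAttemptId)       // TaskRunner.run, before running the task
    poller.poll()                                     // poller thread, or reportHeartBeat when pollOnHeartbeat
    val taskPeaks = poller.getTaskMetricPeaks(taskId) // attached to DirectTaskResult on success
    poller.onTaskCompletion(stageId, stageAttemptId)  // the task has finished for this stage attempt
    poller.onTaskCleanup(taskId)                      // TaskRunner.run, finally block
    (taskPeaks, poller.getExecutorUpdates())          // sent to the driver on the next heartbeat
  }
}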