22 commits
9f5b195  [SPARK-26329][CORE] Faster polling of executor memory metrics. (wypoon, Jan 4, 2019)
03e41a8  [SPARK-26329][CORE] Fix test compilation error in sql. (wypoon, Feb 12, 2019)
7f6bd74  [SPARK-26329][CORE] Fix Mima issues. (wypoon, Feb 13, 2019)
7397897  [SPARK-26329][CORE] Fix possible NPE. (wypoon, Feb 13, 2019)
e1aeafc  [SPARK-26329][CORE] Fix JsonProtocolSuite post-rebase to account for … (wypoon, Mar 6, 2019)
75ba39d  [SPARK-26329][CORE] Extract polling logic into a separate class. (wypoon, Mar 8, 2019)
ea2ff0d  [SPARK-26329][CORE] On task failure, send executor metrics in the Tas… (wypoon, Mar 18, 2019)
0cbfc04  [SPARK-26329][CORE] Unit tests for sending executor metrics in TaskRe… (wypoon, Mar 21, 2019)
8cb30a8  [SPARK-26329][CORE] Add driver updates to test for executor metrics a… (wypoon, Mar 22, 2019)
077abb0  [SPARK-26329][CORE] Add SparkListenerTaskEnd events to test for execu… (wypoon, Mar 23, 2019)
7a3c90d  [SPARK-26329][CORE] Address feedback from irashid. (wypoon, Mar 27, 2019)
3ed583a  [SPARK-26329][CORE] Fix ExecutorSuite failures. (wypoon, Mar 28, 2019)
0a4828a  [SPARK-26329][CORE] Delete a comment on irashid's suggestion. (wypoon, Mar 28, 2019)
9530b75  [SPARK-26329][CORE] Change executorUpdates to be a scala.collection.m… (wypoon, Mar 28, 2019)
38a397c  [SPARK-26329][CORE] Update HistoryServerSuite. (wypoon, Apr 9, 2019)
e062e60  [SPARK-26329][CORE] Get executor updates and reset the peaks in a sin… (wypoon, May 15, 2019)
20b4b7e  [SPARK-26329][CORE] Test fixes after rebase on master. (wypoon, Jul 3, 2019)
b898ad2  [SPARK-26329][CORE] Adopt some suggestions from attilapiros. (wypoon, Jul 4, 2019)
fbb55bf  [SPARK-26329][CORE] Address feedback from Imran Rashid. (wypoon, Jul 19, 2019)
99addf1  [SPARK-26329][CORE] Make TCMP case class private. (wypoon, Jul 19, 2019)
7331b27  [SPARK-26329][CORE] Fix a test post-rebase. (wypoon, Jul 29, 2019)
7556d6a  [SPARK-26329][CORE] Update a doc comment based on feedback from Imran… (wypoon, Jul 31, 2019)
[SPARK-26329][CORE] Address feedback from Imran Rashid.
Fix some bugs in ExecutorSuite as well.
wypoon committed Jul 29, 2019
commit fbb55bfa295a5c9326b460dbad729fa82d530113
core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala
@@ -22,14 +22,12 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicLongArray}

import scala.collection.mutable.HashMap

-import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.memory.MemoryManager
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.util.{ThreadUtils, Utils}

/**
- * :: DeveloperApi ::
* A class that polls executor metrics, and tracks their peaks per task and per stage.
* Each executor keeps an instance of this class.
* The poll method polls the executor metrics, and is either run in its own thread or
@@ -38,19 +36,31 @@ import org.apache.spark.util.{ThreadUtils, Utils}
* executor's task runner threads concurrently with the polling thread. One thread may
* update one of these maps while another reads it, so the reading thread may not get
* the latest metrics, but this is ok.
+ * One ConcurrentHashMap tracks the number of running tasks and the executor metric
+ * peaks for each stage. A positive task count means the stage is active. When the task
+ * count reaches zero for a stage, we remove the entry from the map. That way, the map
+ * only contains entries for active stages and does not grow without bound. On every
+ * heartbeat, the executor gets the per-stage metric peaks from this class and sends
+ * them in the heartbeat; the peaks are then reset.
+ * The other ConcurrentHashMap tracks the executor metric peaks for each task (the peaks
+ * seen while each task is running). At task end, these peaks are sent with the task
+ * result by the task runner.
+ * The reason we track executor metric peaks per task in addition to per stage: if a
+ * stage completes between heartbeats and there are no more running tasks for that
+ * stage, then no metrics are sent for that stage in the next heartbeat; however, the
+ * peaks were already sent in the task result as each of its tasks ended, so they are
+ * not lost.
*
* @param memoryManager the memory manager used by the executor.
* @param pollingInterval the polling interval in milliseconds.
*/
-@DeveloperApi
private[spark] class ExecutorMetricsPoller(
Contributor:
Seems like a lot of threading and concurrency are added here. How much is the overhead? Is there a number?

Contributor Author (wypoon):
As I explained, there is only ever one thread that polls executor metrics: either a dedicated polling thread or the heartbeater thread. So, yes, there is possibly one more thread than before, but I don't think that by itself adds much overhead. We keep two ConcurrentHashMaps here; the polling thread updates them in a bulk operation while task runner threads may concurrently read from them, which is exactly what CHMs are designed to do well with little overhead. Task runner threads also put an entry in the CHMs when a task starts and update or remove an entry when the task ends; this requires synchronization, but the operation is short.

Contributor:
So I just asked whether there was an overhead analysis and whether there is a number, like 10-20% overhead. If you are saying that you have done some testing on an actual cluster with a large/normal/low workload and concluded that the overhead isn't even noticeable, I'm totally fine. It is for guiding users on what to expect if they want to use this.

Contributor Author (wypoon, Mar 29, 2019):
I have done testing on an actual cluster. I ran a few applications with and without faster polling (100 ms interval); in both cases I used a 1 second heartbeat interval and did not enable procfs metrics. There was no noticeable difference in the task times in the executors or in the stage times. However, the workload was small and the tasks took ~15 s to ~1 minute.
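To make the discussion concrete, here is a minimal, self-contained sketch of the two-map pattern being discussed. It is illustrative only: numMetrics and the object name are assumptions, not the actual Spark code, and the real class tracks both a per-stage and a per-task map.

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{AtomicLong, AtomicLongArray}

object TwoMapPatternSketch {
  type StageKey = (Int, Int) // (stageId, stageAttemptId)
  case class TCMP(count: AtomicLong, peaks: AtomicLongArray)

  val numMetrics = 4 // assumption: a small fixed set of metrics
  private val stageTCMP = new ConcurrentHashMap[StageKey, TCMP]

  // Task runner thread: a short, per-key synchronized insert at task start.
  def onTaskStart(stageId: Int, stageAttemptId: Int): Unit = {
    val entry = stageTCMP.computeIfAbsent((stageId, stageAttemptId),
      _ => TCMP(new AtomicLong(0), new AtomicLongArray(numMetrics)))
    entry.count.incrementAndGet()
  }

  // Task runner thread: returning null from the remapping function removes the
  // entry, so the map only ever holds active stages.
  def onTaskCompletion(stageId: Int, stageAttemptId: Int): Unit = {
    stageTCMP.computeIfPresent((stageId, stageAttemptId),
      (_: StageKey, v: TCMP) => if (v.count.decrementAndGet() == 0L) null else v)
  }

  // Single polling thread: bulk update of the peaks. Concurrent readers may see
  // slightly stale values, which is acceptable for metrics.
  def poll(current: Array[Long]): Unit = {
    stageTCMP.forEachValue(Long.MaxValue, (v: TCMP) => {
      var i = 0
      while (i < numMetrics) {
        if (current(i) > v.peaks.get(i)) v.peaks.set(i, current(i))
        i += 1
      }
    })
  }
}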

    memoryManager: MemoryManager,
-    pollingInterval: Long)
-  extends Logging {
+    pollingInterval: Long) extends Logging {

  type StageKey = (Int, Int)
-  // tuple for Task Count and Metric Peaks
-  type TCMP = (AtomicLong, AtomicLongArray)
+  // Task Count and Metric Peaks
+  case class TCMP(count: AtomicLong, peaks: AtomicLongArray)

// Map of (stageId, stageAttemptId) to (count of running tasks, executor metric peaks)
private val stageTCMP = new ConcurrentHashMap[StageKey, TCMP]
@@ -85,7 +95,7 @@ private[spark] class ExecutorMetricsPoller(
}

// for each active stage, update the peaks
-    stageTCMP.forEachValue(LONG_MAX_VALUE, v => updatePeaks(v._2))
+    stageTCMP.forEachValue(LONG_MAX_VALUE, v => updatePeaks(v.peaks))

// for each running task, update the peaks
taskMetricPeaks.forEachValue(LONG_MAX_VALUE, updatePeaks)
@@ -101,38 +111,29 @@

/**
* Called by TaskRunner#run.
-   *
-   * @param taskId the id of the task being run.
-   * @param stageId the id of the stage the task belongs to.
-   * @param stageAttemptId the attempt number of the stage the task belongs to.
*/
def onTaskStart(taskId: Long, stageId: Int, stageAttemptId: Int): Unit = {
// Put an entry in taskMetricPeaks for the task.
taskMetricPeaks.put(taskId, new AtomicLongArray(ExecutorMetricType.numMetrics))

// Put a new entry in stageTCMP for the stage if there isn't one already.
// Increment the task count.
-    val (count, _) = stageTCMP.computeIfAbsent((stageId, stageAttemptId),
-      _ => (new AtomicLong(0), new AtomicLongArray(ExecutorMetricType.numMetrics)))
-    val stageCount = count.incrementAndGet()
+    val countAndPeaks = stageTCMP.computeIfAbsent((stageId, stageAttemptId),
+      _ => TCMP(new AtomicLong(0), new AtomicLongArray(ExecutorMetricType.numMetrics)))
+    val stageCount = countAndPeaks.count.incrementAndGet()
logDebug(s"stageTCMP: ($stageId, $stageAttemptId) -> $stageCount")
}

/**
* Called by TaskRunner#run. It should only be called if onTaskStart has been called with
* the same arguments.
-   *
-   * @param taskId the id of the task that was run.
-   * @param stageId the id of the stage the task belongs to.
-   * @param stageAttemptId the attempt number of the stage the task belongs to.
*/
def onTaskCompletion(taskId: Long, stageId: Int, stageAttemptId: Int): Unit = {
// Decrement the task count.
// Remove the entry from stageTCMP if the task count reaches zero.

def decrementCount(stage: StageKey, countAndPeaks: TCMP): TCMP = {
-      val count = countAndPeaks._1
-      val countValue = count.decrementAndGet()
+      val countValue = countAndPeaks.count.decrementAndGet()
if (countValue == 0L) {
logDebug(s"removing (${stage._1}, ${stage._2}) from stageTCMP")
null
@@ -150,8 +151,6 @@

/**
* Called by TaskRunner#run.
-   *
-   * @param taskId the id of the task that was run.
*/
def getTaskMetricPeaks(taskId: Long): Array[Long] = {
// If this is called with an invalid taskId or a valid taskId but the task was killed and
@@ -177,8 +176,8 @@
val executorUpdates = new HashMap[StageKey, ExecutorMetrics]

def getUpdateAndResetPeaks(k: StageKey, v: TCMP): TCMP = {
-      executorUpdates.put(k, new ExecutorMetrics(v._2))
-      (v._1, new AtomicLongArray(ExecutorMetricType.numMetrics))
+      executorUpdates.put(k, new ExecutorMetrics(v.peaks))
+      TCMP(v.count, new AtomicLongArray(ExecutorMetricType.numMetrics))
}

stageTCMP.replaceAll(getUpdateAndResetPeaks)
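For context on the getUpdateAndResetPeaks change above, a minimal sketch of the snapshot-and-reset step, assuming the same TCMP shape; the numMetrics value and object name are illustrative:

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{AtomicLong, AtomicLongArray}
import scala.collection.mutable.HashMap

object SnapshotAndResetSketch {
  type StageKey = (Int, Int)
  case class TCMP(count: AtomicLong, peaks: AtomicLongArray)
  val numMetrics = 4 // assumption
  val stageTCMP = new ConcurrentHashMap[StageKey, TCMP]

  // replaceAll runs the remapping function under each bin's lock, so a concurrent
  // poll cannot raise a peak between the snapshot of an entry and its reset:
  // snapshot and reset are a single atomic step per entry.
  def getExecutorUpdates(): HashMap[StageKey, Array[Long]] = {
    val updates = new HashMap[StageKey, Array[Long]]
    stageTCMP.replaceAll((k: StageKey, v: TCMP) => {
      updates.put(k, Array.tabulate(numMetrics)(v.peaks.get))
      TCMP(v.count, new AtomicLongArray(numMetrics)) // keep the count, zero the peaks
    })
    updates
  }
}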
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -278,15 +278,15 @@ private[spark] class EventLoggingListener(

override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
if (shouldLogStageExecutorMetrics) {
-      event.executorUpdates.foreach { case (stageKey1, peaks) =>
+      event.executorUpdates.foreach { case (stageKey1, newPeaks) =>
liveStageExecutorMetrics.foreach { case (stageKey2, metricsPerExecutor) =>
// If the update came from the driver, stageKey1 will be the dummy key (-1, -1),
// so record those peaks for all active stages.
// Otherwise, record the peaks for the matching stage.
if (stageKey1 == DRIVER_STAGE_KEY || stageKey1 == stageKey2) {
val metrics = metricsPerExecutor.getOrElseUpdate(
event.execId, new ExecutorMetrics())
-            metrics.compareAndUpdatePeakValues(peaks)
+            metrics.compareAndUpdatePeakValues(newPeaks)
}
}
}
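compareAndUpdatePeakValues performs an element-wise peak merge. A rough sketch of the idea, assuming a plain Array[Long] representation rather than Spark's actual ExecutorMetrics internals (the Boolean return is also an assumption about the semantics):

// Merge incoming peak values into the stored peaks, keeping the max per metric.
def compareAndUpdatePeakValues(stored: Array[Long], incoming: Array[Long]): Boolean = {
  var updated = false
  var i = 0
  while (i < stored.length) {
    if (incoming(i) > stored(i)) {
      stored(i) = incoming(i) // raise the stored peak
      updated = true
    }
    i += 1
  }
  updated
}

This is why the rename to newPeaks is purely cosmetic: whichever stage key matched, the incoming values are merged into the per-executor accumulator by taking the maximum per metric.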
core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala (28 additions, 34 deletions)
@@ -39,6 +39,7 @@ import org.scalatest.mockito.MockitoSugar

import org.apache.spark._
import org.apache.spark.TaskState.TaskState
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
import org.apache.spark.memory.TestMemoryManager
@@ -55,6 +56,12 @@ import org.apache.spark.util.{LongAccumulator, UninterruptibleThread}
class ExecutorSuite extends SparkFunSuite
with LocalSparkContext with MockitoSugar with Eventually with PrivateMethodTester {

+  override def afterEach() {
+    // Unset any latches after each test; each test that needs them initializes new ones.
+    ExecutorSuiteHelper.latches = null
+    super.afterEach()
+  }

test("SPARK-15963: Catch `TaskKilledException` correctly in Executor.TaskRunner") {
// mock some objects to make Executor.launchTask() happy
val conf = new SparkConf
@@ -136,10 +143,6 @@
}
}

-  // This test does not use ExecutorSuiteHelper.latches.
-  // It instantiates a FetchFailureHidingRDD with throwOOM = false and interrupt = false.
-  // It calls runTaskGetFailReasonAndExceptionHandler (via runTaskAndGetFailReason) with
-  // killTask = false and poll = false.
test("SPARK-19276: Handle FetchFailedExceptions that are hidden by user exceptions") {
val conf = new SparkConf().setMaster("local").setAppName("executor suite test")
sc = new SparkContext(conf)
@@ -151,7 +154,8 @@
// fetch failure, not a generic exception from user code.
val inputRDD = new FetchFailureThrowingRDD(sc)
val secondRDD = new FetchFailureHidingRDD(sc, inputRDD, throwOOM = false, interrupt = false)
-    val taskDescription = createResultTaskDescription(serializer, resultFunc, secondRDD, 1)
+    val taskBinary = sc.broadcast(serializer.serialize((secondRDD, resultFunc)).array())
+    val taskDescription = createResultTaskDescription(serializer, taskBinary, secondRDD, 1)

val failReason = runTaskAndGetFailReason(taskDescription)
assert(failReason.isInstanceOf[FetchFailed])
@@ -211,14 +215,12 @@
// (and to poll executor metrics if necessary)
ExecutorSuiteHelper.latches = new ExecutorSuiteHelper
val secondRDD = new FetchFailureHidingRDD(sc, inputRDD, throwOOM = oom, interrupt = !oom)
-    val taskDescription = createResultTaskDescription(serializer, resultFunc, secondRDD, 1)
+    val taskBinary = sc.broadcast(serializer.serialize((secondRDD, resultFunc)).array())
+    val taskDescription = createResultTaskDescription(serializer, taskBinary, secondRDD, 1)

runTaskGetFailReasonAndExceptionHandler(taskDescription, killTask = !oom, poll)
}

-  // This test does not use ExecutorSuiteHelper.latches.
-  // It calls runTaskGetFailReasonAndExceptionHandler (via runTaskAndGetFailReason) with
-  // killTask = false and poll = false.
test("Gracefully handle error in task deserialization") {
val conf = new SparkConf
val serializer = new JavaSerializer(conf)
@@ -330,40 +332,35 @@
ExecutorSuiteHelper.latches = new ExecutorSuiteHelper
val resultFunc =
(context: TaskContext, itr: Iterator[Int]) => {
-        ExecutorSuiteHelper.latches.latch1.await(300, TimeUnit.MILLISECONDS)
-        ExecutorSuiteHelper.latches.latch2.countDown()
-        ExecutorSuiteHelper.latches.latch3.await(500, TimeUnit.MILLISECONDS)
+        // latch1 tells the test that the task is running, so it can ask the metricsPoller
+        // to poll; latch2 waits for the polling to be done
+        ExecutorSuiteHelper.latches.latch1.countDown()
+        ExecutorSuiteHelper.latches.latch2.await(5, TimeUnit.SECONDS)
itr.size
}
val rdd = new RDD[Int](sc, Nil) {
override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
-        val l = List(1)
-        l.iterator
+        Iterator(1)
}
override protected def getPartitions: Array[Partition] = {
Array(new SimplePartition)
}
}
-    val taskDescription = createResultTaskDescription(serializer, resultFunc, rdd, 0)
+    val taskBinary = sc.broadcast(serializer.serialize((rdd, resultFunc)).array())
+    val taskDescription = createResultTaskDescription(serializer, taskBinary, rdd, 0)

val mockBackend = mock[ExecutorBackend]
-    when(mockBackend.statusUpdate(any(), meq(TaskState.RUNNING), any()))
-      .thenAnswer(new Answer[Unit] {
-        override def answer(invocationOnMock: InvocationOnMock): Unit = {
-          ExecutorSuiteHelper.latches.latch1.countDown()
-        }
-      })
var executor: Executor = null
try {
executor = new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil, isLocal = true)
executor.launchTask(mockBackend, taskDescription)

// Ensure that the executor's metricsPoller is polled so that values are recorded for
// the task metrics
-      ExecutorSuiteHelper.latches.latch2.await(500, TimeUnit.MILLISECONDS)
+      ExecutorSuiteHelper.latches.latch1.await(5, TimeUnit.SECONDS)
executor.metricsPoller.poll()
-      ExecutorSuiteHelper.latches.latch3.countDown()
-      eventually(timeout(1.seconds), interval(10.milliseconds)) {
+      ExecutorSuiteHelper.latches.latch2.countDown()
+      eventually(timeout(5.seconds), interval(10.milliseconds)) {
assert(executor.numRunningTasks === 0)
}
} finally {
@@ -421,10 +418,9 @@

private def createResultTaskDescription(
serializer: SerializerInstance,
-      resultFunc: (TaskContext, Iterator[Int]) => Int,
+      taskBinary: Broadcast[Array[Byte]],
rdd: RDD[Int],
stageId: Int): TaskDescription = {
-    val taskBinary = sc.broadcast(serializer.serialize((rdd, resultFunc)).array())
val serializedTaskMetrics = serializer.serialize(TaskMetrics.registered).array()
val task = new ResultTask(
stageId = stageId,
@@ -459,8 +455,6 @@
runTaskGetFailReasonAndExceptionHandler(taskDescription, false)._1
}

-  // NOTE: This method is ever only called with killTask = false and poll = false when we are
-  // not using ExecutorSuiteHelper.latches.
private def runTaskGetFailReasonAndExceptionHandler(
taskDescription: TaskDescription,
killTask: Boolean,
@@ -492,11 +486,11 @@
}
killingThread.start()
} else {
-      // As noted above, when killTask = false and poll = false, we are not using latches;
-      // thus we only need to wait for latch1 and countdown latch2 when poll = true.
-      if (poll) {
-        ExecutorSuiteHelper.latches.latch1.await(1, TimeUnit.SECONDS)
-        executor.metricsPoller.poll()
+      if (ExecutorSuiteHelper.latches != null) {
+        ExecutorSuiteHelper.latches.latch1.await(5, TimeUnit.SECONDS)
+        if (poll) {
+          executor.metricsPoller.poll()
+        }
ExecutorSuiteHelper.latches.latch2.countDown()
}
}
@@ -566,7 +560,7 @@
if (throwOOM) {
// Allow executor metrics to be polled (if necessary) before throwing the OOMError
ExecutorSuiteHelper.latches.latch1.countDown()
-      ExecutorSuiteHelper.latches.latch2.await(500, TimeUnit.MILLISECONDS)
+      ExecutorSuiteHelper.latches.latch2.await(5, TimeUnit.SECONDS)
// scalastyle:off throwerror
throw new OutOfMemoryError("OOM while handling another exception")
// scalastyle:on throwerror
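The latch changes in this file all follow one handshake: the task signals that it is running (latch1), the test polls the metrics, then the test releases the task (latch2). A minimal sketch of that pattern with hypothetical names, not the suite's actual ExecutorSuiteHelper:

import java.util.concurrent.{CountDownLatch, TimeUnit}

object LatchHandshakeSketch {
  val taskRunning = new CountDownLatch(1) // plays the role of latch1
  val pollDone = new CountDownLatch(1)    // plays the role of latch2

  // Runs on the task thread.
  def taskBody(): Int = {
    taskRunning.countDown()               // signal: the task is now running
    pollDone.await(5, TimeUnit.SECONDS)   // wait for the test to finish polling
    42
  }

  // Runs on the test thread.
  def testBody(poll: () => Unit): Unit = {
    taskRunning.await(5, TimeUnit.SECONDS) // wait until the task is inside its body
    poll()                                 // e.g. executor.metricsPoller.poll()
    pollDone.countDown()                   // release the task
  }
}

The move from 300-500 ms awaits to 5 second awaits trades latency for reliability: the latches normally release almost immediately, so the longer timeout only matters when something is already wrong, whereas a short await can elapse before a slow thread reaches the handshake and fail the test spuriously.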
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -562,9 +562,6 @@ private[spark] object JsonProtocolSuite extends Assertions {
* --------------------------------- */

private[spark] def assertEquals(event1: SparkListenerEvent, event2: SparkListenerEvent) {
-    def lexOrder(x: (Int, Int), y: (Int, Int)) =
-      x._1 < y._1 || x._2 < y._2
-
(event1, event2) match {
case (e1: SparkListenerStageSubmitted, e2: SparkListenerStageSubmitted) =>
assert(e1.properties === e2.properties)
@@ -612,8 +609,8 @@
assertSeqEquals[AccumulableInfo](updates1, updates2, (a, b) => a.equals(b))
})
assertSeqEquals[((Int, Int), ExecutorMetrics)](
-          e1.executorUpdates.toSeq.sortWith((x, y) => lexOrder(x._1, y._1)),
-          e2.executorUpdates.toSeq.sortWith((x, y) => lexOrder(x._1, y._1)),
+          e1.executorUpdates.toSeq.sortBy(_._1),
+          e2.executorUpdates.toSeq.sortBy(_._1),
(a, b) => {
val (k1, v1) = a
val (k2, v2) = b
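Removing lexOrder is more than a cleanup: with x = (1, 2) and y = (2, 1) it reports each tuple as less than the other, violating the strict-ordering contract that sortWith requires, so the sort result was undefined. sortBy(_._1) instead delegates to Scala's built-in lexicographic tuple Ordering, a proper total order. A small standalone demonstration:

object LexOrderDemo extends App {
  // The removed comparator: not a strict ordering.
  def lexOrder(x: (Int, Int), y: (Int, Int)): Boolean =
    x._1 < y._1 || x._2 < y._2

  val a = (1, 2)
  val b = (2, 1)
  println(lexOrder(a, b)) // true: 1 < 2 in the first component
  println(lexOrder(b, a)) // also true: 1 < 2 in the second component

  // The implicit lexicographic Ordering for tuples is consistent.
  val keys = Seq((2, 1), (1, 2), (1, 1))
  println(keys.sorted) // List((1,1), (1,2), (2,1))
}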