From 6eb516a8da1c82b1d1a837f79c35a128af836cc6 Mon Sep 17 00:00:00 2001
From: Sandy Ryza
Date: Sat, 25 Apr 2015 23:38:57 -0700
Subject: [PATCH 1/2] SPARK-6954. ExecutorAllocationManager can end up
 requesting a negative number of executors

---
 .../spark/ExecutorAllocationManager.scala     |  84 ++++++--------
 .../ExecutorAllocationManagerSuite.scala      | 105 +++++++++---------
 2 files changed, 87 insertions(+), 102 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b986fa87dc2f4..00156178999af 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -27,13 +27,20 @@ import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
  *
- * The add policy depends on whether there are backlogged tasks waiting to be scheduled. If
- * the scheduler queue is not drained in N seconds, then new executors are added. If the queue
- * persists for another M seconds, then more executors are added and so on. The number added
- * in each round increases exponentially from the previous round until an upper bound on the
- * number of executors has been reached. The upper bound is based both on a configured property
- * and on the number of tasks pending: the policy will never increase the number of executor
- * requests past the number needed to handle all pending tasks.
+ * The ExecutorAllocationManager maintains a moving target number of executors which is periodically
+ * synced to the cluster manager. The target starts at a configured initial value and changes with
+ * the number of pending and running tasks.
+ *
+ * Decreasing the target number of executors happens when the current target is more than needed to
+ * handle the current load. The target number of executors is always truncated to the number of
+ * executors that could run all current running and pending tasks at once.
+ *
+ * Increasing the target number of executors happens in response to backlogged tasks waiting to be
+ * scheduled. If the scheduler queue is not drained in N seconds, then new executors are added. If
+ * the queue persists for another M seconds, then more executors are added and so on. The number
+ * added in each round increases exponentially from the previous round until an upper bound has been
+ * reached. The upper bound is based both on a configured property and on the current number of
+ * running and pending tasks, as described above.
  *
  * The rationale for the exponential increase is twofold: (1) Executors should be added slowly
  * in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
@@ -105,8 +112,9 @@ private[spark] class ExecutorAllocationManager(
   // Number of executors to add in the next round
   private var numExecutorsToAdd = 1
 
-  // Number of executors that have been requested but have not registered yet
-  private var numExecutorsPending = 0
+  // The desired number of executors at this moment in time. If all our executors were to die, this
+  // is the number of executors we would immediately want from the cluster manager.
+  private var numExecutorsTarget = 0
 
   // Executors that have been requested to be removed but have not been killed yet
   private val executorsPendingToRemove = new mutable.HashSet[String]
@@ -199,13 +207,6 @@ private[spark] class ExecutorAllocationManager(
     executor.awaitTermination(10, TimeUnit.SECONDS)
   }
 
-  /**
-   * The number of executors we would have if the cluster manager were to fulfill all our existing
-   * requests.
-   */
-  private def targetNumExecutors(): Int =
-    numExecutorsPending + executorIds.size - executorsPendingToRemove.size
-
   /**
    * The maximum number of executors we would need under the current load to satisfy all running
    * and pending tasks, rounded up.
@@ -240,8 +241,8 @@ private[spark] class ExecutorAllocationManager(
 
   /**
    * Check to see whether our existing allocation and the requests we've made previously exceed our
-   * current needs. If so, let the cluster manager know so that it can cancel pending requests that
-   * are unneeded.
+   * current needs. If so, truncate our target and let the cluster manager know so that it can
+   * cancel pending requests that are unneeded.
    *
    * If not, and the add time has expired, see if we can request new executors and refresh the add
   * time.
@@ -249,16 +250,16 @@ private[spark] class ExecutorAllocationManager(
    * @return the delta in the target number of executors.
    */
   private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
-    val currentTarget = targetNumExecutors
     val maxNeeded = maxNumExecutorsNeeded
 
-    if (maxNeeded < currentTarget) {
+    if (maxNeeded < numExecutorsTarget) {
       // The target number exceeds the number we actually need, so stop adding new
-      // executors and inform the cluster manager to cancel the extra pending requests.
-      val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
-      client.requestTotalExecutors(newTotalExecutors)
+      // executors and inform the cluster manager to cancel the extra pending requests
+      val oldNumExecutorsTarget = numExecutorsTarget
+      numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
+      client.requestTotalExecutors(numExecutorsTarget)
       numExecutorsToAdd = 1
-      updateNumExecutorsPending(newTotalExecutors)
+      numExecutorsTarget - oldNumExecutorsTarget
     } else if (addTime != NOT_SET && now >= addTime) {
       val delta = addExecutors(maxNeeded)
       logDebug(s"Starting timer to add more executors (to " +
@@ -281,21 +282,22 @@ private[spark] class ExecutorAllocationManager(
    */
   private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
     // Do not request more executors if it would put our target over the upper bound
-    val currentTarget = targetNumExecutors
-    if (currentTarget >= maxNumExecutors) {
-      logDebug(s"Not adding executors because there are already ${executorIds.size} " +
-        s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
+    if (numExecutorsTarget >= maxNumExecutors) {
+      val numExecutorsPending = numExecutorsTarget - executorIds.size
+      logDebug(s"Not adding executors because there are already ${executorIds.size} registered " +
+        s"and ${numExecutorsPending} pending executor(s) (limit $maxNumExecutors)")
       numExecutorsToAdd = 1
       return 0
     }
 
     val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
-    val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
-    val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
+    val oldNumExecutorsTarget = numExecutorsTarget
+    numExecutorsTarget = math.min(numExecutorsTarget + numExecutorsToAdd, actualMaxNumExecutors)
+    val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
     if (addRequestAcknowledged) {
-      val delta = updateNumExecutorsPending(newTotalExecutors)
+      val delta = numExecutorsTarget - oldNumExecutorsTarget
       logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
-        s" (new desired total will be $newTotalExecutors)")
+        s" (new desired total will be $numExecutorsTarget)")
       numExecutorsToAdd = if (delta == numExecutorsToAdd) {
         numExecutorsToAdd * 2
       } else {
@@ -304,23 +306,11 @@ private[spark] class ExecutorAllocationManager(
       delta
     } else {
       logWarning(
-        s"Unable to reach the cluster manager to request $newTotalExecutors total executors!")
+        s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
       0
     }
   }
 
-  /**
-   * Given the new target number of executors, update the number of pending executor requests,
-   * and return the delta from the old number of pending requests.
-   */
-  private def updateNumExecutorsPending(newTotalExecutors: Int): Int = {
-    val newNumExecutorsPending =
-      newTotalExecutors - executorIds.size + executorsPendingToRemove.size
-    val delta = newNumExecutorsPending - numExecutorsPending
-    numExecutorsPending = newNumExecutorsPending
-    delta
-  }
-
   /**
    * Request the cluster manager to remove the given executor.
    * Return whether the request is received.
@@ -372,10 +362,6 @@ private[spark] class ExecutorAllocationManager(
       // as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
       executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
       logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
-      if (numExecutorsPending > 0) {
-        numExecutorsPending -= 1
-        logDebug(s"Decremented number of pending executors ($numExecutorsPending left)")
-      }
     } else {
       logWarning(s"Duplicate executor $executorId has registered")
     }
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 22acc270b983e..eece2676f5a17 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -78,7 +78,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
   test("starting state") {
     sc = createSparkContext()
     val manager = sc.executorAllocationManager.get
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     assert(executorIds(manager).isEmpty)
     assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
@@ -91,42 +91,42 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
     // Keep adding until the limit is reached
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 0)
     assert(numExecutorsToAdd(manager) === 1)
     assert(addExecutors(manager) === 1)
-    assert(numExecutorsPending(manager) === 1)
+    assert(numExecutorsTarget(manager) === 1)
     assert(numExecutorsToAdd(manager) === 2)
     assert(addExecutors(manager) === 2)
-    assert(numExecutorsPending(manager) === 3)
+    assert(numExecutorsTarget(manager) === 3)
     assert(numExecutorsToAdd(manager) === 4)
     assert(addExecutors(manager) === 4)
-    assert(numExecutorsPending(manager) === 7)
+    assert(numExecutorsTarget(manager) === 7)
     assert(numExecutorsToAdd(manager) === 8)
     assert(addExecutors(manager) === 3) // reached the limit of 10
-    assert(numExecutorsPending(manager) === 10)
+    assert(numExecutorsTarget(manager) === 10)
     assert(numExecutorsToAdd(manager) === 1)
     assert(addExecutors(manager) === 0)
-    assert(numExecutorsPending(manager) === 10)
+    assert(numExecutorsTarget(manager) === 10)
     assert(numExecutorsToAdd(manager) === 1)
 
     // Register previously requested executors
     onExecutorAdded(manager, "first")
-    assert(numExecutorsPending(manager) === 9)
+    assert(numExecutorsTarget(manager) === 10)
     onExecutorAdded(manager, "second")
     onExecutorAdded(manager, "third")
     onExecutorAdded(manager, "fourth")
-    assert(numExecutorsPending(manager) === 6)
+    assert(numExecutorsTarget(manager) === 10)
     onExecutorAdded(manager, "first") // duplicates should not count
     onExecutorAdded(manager, "second")
-    assert(numExecutorsPending(manager) === 6)
+    assert(numExecutorsTarget(manager) === 10)
 
     // Try adding again
     // This should still fail because the number pending + running is still at the limit
     assert(addExecutors(manager) === 0)
-    assert(numExecutorsPending(manager) === 6)
+    assert(numExecutorsTarget(manager) === 10)
     assert(numExecutorsToAdd(manager) === 1)
     assert(addExecutors(manager) === 0)
-    assert(numExecutorsPending(manager) === 6)
+    assert(numExecutorsTarget(manager) === 10)
     assert(numExecutorsToAdd(manager) === 1)
   }
 
@@ -136,49 +136,49 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5)))
 
     // Verify that we're capped at number of tasks in the stage
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 0)
     assert(numExecutorsToAdd(manager) === 1)
     assert(addExecutors(manager) === 1)
-    assert(numExecutorsPending(manager) === 1)
+    assert(numExecutorsTarget(manager) === 1)
     assert(numExecutorsToAdd(manager) === 2)
     assert(addExecutors(manager) === 2)
-    assert(numExecutorsPending(manager) === 3)
+    assert(numExecutorsTarget(manager) === 3)
     assert(numExecutorsToAdd(manager) === 4)
     assert(addExecutors(manager) === 2)
-    assert(numExecutorsPending(manager) === 5)
+    assert(numExecutorsTarget(manager) === 5)
     assert(numExecutorsToAdd(manager) === 1)
 
-    // Verify that running a task reduces the cap
+    // Verify that running a task doesn't affect the target
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3)))
     sc.listenerBus.postToAll(SparkListenerExecutorAdded(
       0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
     sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
-    assert(numExecutorsPending(manager) === 4)
+    assert(numExecutorsTarget(manager) === 5)
     assert(addExecutors(manager) === 1)
-    assert(numExecutorsPending(manager) === 5)
+    assert(numExecutorsTarget(manager) === 6)
     assert(numExecutorsToAdd(manager) === 2)
     assert(addExecutors(manager) === 2)
-    assert(numExecutorsPending(manager) === 7)
+    assert(numExecutorsTarget(manager) === 8)
     assert(numExecutorsToAdd(manager) === 4)
     assert(addExecutors(manager) === 0)
-    assert(numExecutorsPending(manager) === 7)
+    assert(numExecutorsTarget(manager) === 8)
     assert(numExecutorsToAdd(manager) === 1)
 
-    // Verify that re-running a task doesn't reduce the cap further
+    // Verify that re-running a task doesn't blow things up
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3)))
     sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1")))
     sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1")))
     assert(addExecutors(manager) === 1)
-    assert(numExecutorsPending(manager) === 8)
+    assert(numExecutorsTarget(manager) === 9)
     assert(numExecutorsToAdd(manager) === 2)
     assert(addExecutors(manager) === 1)
-    assert(numExecutorsPending(manager) === 9)
+    assert(numExecutorsTarget(manager) === 10)
     assert(numExecutorsToAdd(manager) === 1)
 
     // Verify that running a task once we're at our limit doesn't blow things up
     sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1")))
     assert(addExecutors(manager) === 0)
-    assert(numExecutorsPending(manager) === 9)
+    assert(numExecutorsTarget(manager) === 10)
   }
 
@@ -186,13 +186,13 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     val manager = sc.executorAllocationManager.get
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5)))
 
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 0)
     assert(numExecutorsToAdd(manager) === 1)
     assert(addExecutors(manager) === 1)
-    assert(numExecutorsPending(manager) === 1)
+    assert(numExecutorsTarget(manager) === 1)
     assert(numExecutorsToAdd(manager) === 2)
     assert(addExecutors(manager) === 2)
-    assert(numExecutorsPending(manager) === 3)
+    assert(numExecutorsTarget(manager) === 3)
 
     val task1Info = createTaskInfo(0, 0, "executor-1")
     sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task1Info))
@@ -286,7 +286,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     assert(executorIds(manager).size === 5)
 
     // Add until limit
-    assert(addExecutors(manager) === 5) // upper limit reached
+    assert(addExecutors(manager) === 3) // upper limit reached
     assert(addExecutors(manager) === 0)
     assert(!removeExecutor(manager, "3")) // still at lower limit
     assert(!removeExecutor(manager, "4"))
@@ -303,15 +303,13 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     assert(removeExecutor(manager, "5"))
     assert(removeExecutor(manager, "6"))
     assert(executorIds(manager).size === 10)
-    assert(addExecutors(manager) === 1)
+    assert(addExecutors(manager) === 0)
     onExecutorRemoved(manager, "3")
     onExecutorRemoved(manager, "4")
     assert(executorIds(manager).size === 8)
 
-    // Add succeeds again, now that we are no longer at the upper limit
-    // Number of executors added restarts at 1
-    assert(addExecutors(manager) === 2)
-    assert(addExecutors(manager) === 1) // upper limit reached
+    // Number of executors to add restarts at 1
+    assert(numExecutorsToAdd(manager) === 1)
     assert(addExecutors(manager) === 0)
     assert(executorIds(manager).size === 8)
     onExecutorRemoved(manager, "5")
@@ -323,6 +321,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     onExecutorAdded(manager, "15")
     onExecutorAdded(manager, "16")
     assert(executorIds(manager).size === 10)
+    assert(numExecutorsTarget(manager) === 10)
   }
 
   test("starting/canceling add timer") {
@@ -411,22 +410,22 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     manager.setClock(clock)
 
     // No events - we should not be adding or removing
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     clock.advance(100L)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     clock.advance(1000L)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     clock.advance(10000L)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 0)
+    assert(numExecutorsTarget(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
   }
 
@@ -441,43 +440,43 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     onSchedulerBacklogged(manager)
     clock.advance(schedulerBacklogTimeout * 1000 / 2)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 0) // timer not exceeded yet
+    assert(numExecutorsTarget(manager) === 0) // timer not exceeded yet
     clock.advance(schedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 1) // first timer exceeded
+    assert(numExecutorsTarget(manager) === 1) // first timer exceeded
     clock.advance(sustainedSchedulerBacklogTimeout * 1000 / 2)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 1) // second timer not exceeded yet
+    assert(numExecutorsTarget(manager) === 1) // second timer not exceeded yet
    clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 1 + 2) // second timer exceeded
+    assert(numExecutorsTarget(manager) === 1 + 2) // second timer exceeded
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 1 + 2 + 4) // third timer exceeded
+    assert(numExecutorsTarget(manager) === 1 + 2 + 4) // third timer exceeded
 
     // Scheduler queue drained
     onSchedulerQueueEmpty(manager)
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 7) // timer is canceled
+    assert(numExecutorsTarget(manager) === 7) // timer is canceled
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 7)
+    assert(numExecutorsTarget(manager) === 7)
 
     // Scheduler queue backlogged again
     onSchedulerBacklogged(manager)
     clock.advance(schedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 7 + 1) // timer restarted
+    assert(numExecutorsTarget(manager) === 7 + 1) // timer restarted
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 7 + 1 + 2)
+    assert(numExecutorsTarget(manager) === 7 + 1 + 2)
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 7 + 1 + 2 + 4)
+    assert(numExecutorsTarget(manager) === 7 + 1 + 2 + 4)
     clock.advance(sustainedSchedulerBacklogTimeout * 1000)
     schedule(manager)
-    assert(numExecutorsPending(manager) === 20) // limit reached
+    assert(numExecutorsTarget(manager) === 20) // limit reached
   }
 
   test("mock polling loop remove behavior") {
@@ -713,7 +712,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
    * ------------------------------------------------------- */
 
   private val _numExecutorsToAdd = PrivateMethod[Int]('numExecutorsToAdd)
-  private val _numExecutorsPending = PrivateMethod[Int]('numExecutorsPending)
+  private val _numExecutorsTarget = PrivateMethod[Int]('numExecutorsTarget)
   private val _maxNumExecutorsNeeded = PrivateMethod[Int]('maxNumExecutorsNeeded)
   private val _executorsPendingToRemove =
     PrivateMethod[collection.Set[String]]('executorsPendingToRemove)
@@ -735,8 +734,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
     manager invokePrivate _numExecutorsToAdd()
   }
 
-  private def numExecutorsPending(manager: ExecutorAllocationManager): Int = {
-    manager invokePrivate _numExecutorsPending()
+  private def numExecutorsTarget(manager: ExecutorAllocationManager): Int = {
+    manager invokePrivate _numExecutorsTarget()
   }
 
   private def executorsPendingToRemove(

From b7890fb51cea21d30956f870ab8828359191824d Mon Sep 17 00:00:00 2001
From: Sandy Ryza
Date: Wed, 29 Apr 2015 13:34:47 -0700
Subject: [PATCH 2/2] Avoid ramping up to an existing number of executors

---
 .../spark/ExecutorAllocationManager.scala     | 21 +++--
 .../ExecutorAllocationManagerSuite.scala      | 78 +++++++++++++------
 2 files changed, 69 insertions(+), 30 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 00156178999af..9d8fb100d59b5 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -114,7 +114,8 @@ private[spark] class ExecutorAllocationManager(
 
   // The desired number of executors at this moment in time. If all our executors were to die, this
   // is the number of executors we would immediately want from the cluster manager.
-  private var numExecutorsTarget = 0
+  private var numExecutorsTarget =
+    conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
 
   // Executors that have been requested to be removed but have not been killed yet
   private val executorsPendingToRemove = new mutable.HashSet[String]
@@ -228,7 +229,7 @@ private[spark] class ExecutorAllocationManager(
   private def schedule(): Unit = synchronized {
     val now = clock.getTimeMillis
 
-    addOrCancelExecutorRequests(now)
+    updateAndSyncNumExecutorsTarget(now)
 
     removeTimes.retain { case (executorId, expireTime) =>
       val expired = now >= expireTime
@@ -240,6 +241,8 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Updates our target number of executors and syncs the result with the cluster manager.
+   *
    * Check to see whether our existing allocation and the requests we've made previously exceed our
    * current needs. If so, truncate our target and let the cluster manager know so that it can
    * cancel pending requests that are unneeded.
@@ -249,7 +252,7 @@ private[spark] class ExecutorAllocationManager(
    *
    * @return the delta in the target number of executors.
    */
-  private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
+  private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
     val maxNeeded = maxNumExecutorsNeeded
 
     if (maxNeeded < numExecutorsTarget) {
@@ -290,9 +293,17 @@ private[spark] class ExecutorAllocationManager(
       return 0
     }
 
-    val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
     val oldNumExecutorsTarget = numExecutorsTarget
-    numExecutorsTarget = math.min(numExecutorsTarget + numExecutorsToAdd, actualMaxNumExecutors)
+    // There's no point in wasting time ramping up to the number of executors we already have, so
+    // make sure our target is at least as much as our current allocation:
+    numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
+    // Boost our target with the amount to add for this round:
+    numExecutorsTarget += numExecutorsToAdd
+    // Ensure that our target doesn't exceed what we need at the present moment:
+    numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
+    // Ensure that our target fits within configured bounds:
+    numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
+
     val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
     if (addRequestAcknowledged) {
       val delta = numExecutorsTarget - oldNumExecutorsTarget
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index eece2676f5a17..49e6de4e0bafd 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -78,7 +78,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
   test("starting state") {
     sc = createSparkContext()
     val manager = sc.executorAllocationManager.get
-    assert(numExecutorsTarget(manager) === 0)
+    assert(numExecutorsTarget(manager) === 1)
     assert(executorsPendingToRemove(manager).isEmpty)
     assert(executorIds(manager).isEmpty)
     assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
@@ -91,18 +91,18 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
     // Keep adding until the limit is reached
-    assert(numExecutorsTarget(manager) === 0)
+    assert(numExecutorsTarget(manager) === 1)
     assert(numExecutorsToAdd(manager) === 1)
     assert(addExecutors(manager) === 1)
-    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsTarget(manager) === 2)
     assert(numExecutorsToAdd(manager) === 2)
     assert(addExecutors(manager) === 2)
-    assert(numExecutorsTarget(manager) === 3)
+    assert(numExecutorsTarget(manager) === 4)
     assert(numExecutorsToAdd(manager) === 4)
     assert(addExecutors(manager) === 4)
-    assert(numExecutorsTarget(manager) === 7)
+    assert(numExecutorsTarget(manager) === 8)
     assert(numExecutorsToAdd(manager) === 8)
-    assert(addExecutors(manager) === 3) // reached the limit of 10
+    assert(addExecutors(manager) === 2) // reached the limit of 10
     assert(numExecutorsTarget(manager) === 10)
     assert(numExecutorsToAdd(manager) === 1)
     assert(addExecutors(manager) === 0)
@@ -131,7 +131,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
   }
 
   test("add executors capped by num pending tasks") {
-    sc = createSparkContext(1, 10)
+    sc = createSparkContext(0, 10)
     val manager = sc.executorAllocationManager.get
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5)))
 
@@ -182,7 +182,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
   }
 
   test("cancel pending executors when no longer needed") {
-    sc = createSparkContext(1, 10)
+    sc = createSparkContext(0, 10)
     val manager = sc.executorAllocationManager.get
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5)))
 
@@ -266,7 +266,6 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     // Add a few executors
     assert(addExecutors(manager) === 1)
     assert(addExecutors(manager) === 2)
-    assert(addExecutors(manager) === 4)
     onExecutorAdded(manager, "1")
     onExecutorAdded(manager, "2")
     onExecutorAdded(manager, "3")
@@ -274,52 +273,55 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     onExecutorAdded(manager, "5")
     onExecutorAdded(manager, "6")
     onExecutorAdded(manager, "7")
-    assert(executorIds(manager).size === 7)
+    onExecutorAdded(manager, "8")
+    assert(executorIds(manager).size === 8)
 
     // Remove until limit
     assert(removeExecutor(manager, "1"))
     assert(removeExecutor(manager, "2"))
-    assert(!removeExecutor(manager, "3")) // lower limit reached
-    assert(!removeExecutor(manager, "4"))
+    assert(removeExecutor(manager, "3"))
+    assert(!removeExecutor(manager, "4")) // lower limit reached
+    assert(!removeExecutor(manager, "5"))
     onExecutorRemoved(manager, "1")
     onExecutorRemoved(manager, "2")
+    onExecutorRemoved(manager, "3")
     assert(executorIds(manager).size === 5)
 
     // Add until limit
-    assert(addExecutors(manager) === 3) // upper limit reached
+    assert(addExecutors(manager) === 2) // upper limit reached
     assert(addExecutors(manager) === 0)
-    assert(!removeExecutor(manager, "3")) // still at lower limit
-    assert(!removeExecutor(manager, "4"))
-    onExecutorAdded(manager, "8")
+    assert(!removeExecutor(manager, "4")) // still at lower limit
+    assert(!removeExecutor(manager, "5"))
     onExecutorAdded(manager, "9")
     onExecutorAdded(manager, "10")
     onExecutorAdded(manager, "11")
     onExecutorAdded(manager, "12")
+    onExecutorAdded(manager, "13")
     assert(executorIds(manager).size === 10)
 
     // Remove succeeds again, now that we are no longer at the lower limit
-    assert(removeExecutor(manager, "3"))
     assert(removeExecutor(manager, "4"))
     assert(removeExecutor(manager, "5"))
     assert(removeExecutor(manager, "6"))
+    assert(removeExecutor(manager, "7"))
     assert(executorIds(manager).size === 10)
     assert(addExecutors(manager) === 0)
-    onExecutorRemoved(manager, "3")
     onExecutorRemoved(manager, "4")
+    onExecutorRemoved(manager, "5")
     assert(executorIds(manager).size === 8)
 
     // Number of executors to add restarts at 1
     assert(numExecutorsToAdd(manager) === 1)
     assert(addExecutors(manager) === 0)
     assert(executorIds(manager).size === 8)
-    onExecutorRemoved(manager, "5")
     onExecutorRemoved(manager, "6")
-    onExecutorAdded(manager, "13")
+    onExecutorRemoved(manager, "7")
     onExecutorAdded(manager, "14")
+    onExecutorAdded(manager, "15")
     assert(executorIds(manager).size === 8)
     assert(addExecutors(manager) === 0) // still at upper limit
-    onExecutorAdded(manager, "15")
     onExecutorAdded(manager, "16")
+    onExecutorAdded(manager, "17")
     assert(executorIds(manager).size === 10)
     assert(numExecutorsTarget(manager) === 10)
   }
@@ -404,7 +406,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
   }
 
   test("mock polling loop with no events") {
-    sc = createSparkContext(1, 20)
+    sc = createSparkContext(0, 20)
     val manager = sc.executorAllocationManager.get
     val clock = new ManualClock(2020L)
     manager.setClock(clock)
@@ -430,7 +432,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
   }
 
   test("mock polling loop add behavior") {
-    sc = createSparkContext(1, 20)
+    sc = createSparkContext(0, 20)
     val clock = new ManualClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -670,6 +672,31 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     assert(!removeTimes(manager).contains("executor-1"))
   }
 
+  test("avoid ramp up when target < running executors") {
+    sc = createSparkContext(0, 100000)
+    val manager = sc.executorAllocationManager.get
+    val stage1 = createStageInfo(0, 1000)
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
+
+    assert(addExecutors(manager) === 1)
+    assert(addExecutors(manager) === 2)
+    assert(addExecutors(manager) === 4)
+    assert(addExecutors(manager) === 8)
+    assert(numExecutorsTarget(manager) === 15)
+    (0 until 15).foreach { i =>
+      onExecutorAdded(manager, s"executor-$i")
+    }
+    assert(executorIds(manager).size === 15)
+    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1))
+
+    adjustRequestedExecutors(manager)
+    assert(numExecutorsTarget(manager) === 0)
+
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 1000)))
+    addExecutors(manager)
+    assert(numExecutorsTarget(manager) === 16)
+  }
+
   private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
     val conf = new SparkConf()
       .setMaster("local")
@@ -721,7 +748,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   private val _removeTimes = PrivateMethod[collection.Map[String, Long]]('removeTimes)
   private val _schedule = PrivateMethod[Unit]('schedule)
   private val _addExecutors = PrivateMethod[Int]('addExecutors)
-  private val _addOrCancelExecutorRequests = PrivateMethod[Int]('addOrCancelExecutorRequests)
+  private val _updateAndSyncNumExecutorsTarget =
+    PrivateMethod[Int]('updateAndSyncNumExecutorsTarget)
  private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor)
   private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded)
   private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved)
@@ -765,7 +793,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   }
 
   private def adjustRequestedExecutors(manager: ExecutorAllocationManager): Int = {
-    manager invokePrivate _addOrCancelExecutorRequests(0L)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(0L)
   }
 
   private def removeExecutor(manager: ExecutorAllocationManager, id: String): Boolean = {
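
Reviewer note: the sketch below is a minimal, self-contained model of the numExecutorsTarget
arithmetic that the two commits above converge on. It is an illustration under stated assumptions,
not the real ExecutorAllocationManager: the object name NumExecutorsTargetModel, the numRunning
field (standing in for executorIds.size), and the hard-coded bounds are invented for the example,
and the cluster-manager sync (client.requestTotalExecutors) and the testing flag are omitted.

  // Illustrative model of the patches' target arithmetic (assumed names/defaults).
  object NumExecutorsTargetModel {
    val minNumExecutors = 0
    val maxNumExecutors = 10

    var numExecutorsTarget = 1 // plays the role of spark.dynamicAllocation.initialExecutors
    var numExecutorsToAdd = 1  // exponential step, doubled after each fully-applied round
    var numRunning = 0         // plays the role of executorIds.size

    /** Downward path: truncate the target when it exceeds the current need. */
    def truncateTarget(maxNeeded: Int): Int = {
      val old = numExecutorsTarget
      numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
      numExecutorsToAdd = 1
      // The delta is <= 0 here; the total sent to the cluster manager is the
      // clamped target itself, so it can never be a negative request (SPARK-6954).
      numExecutorsTarget - old
    }

    /** Upward path: exponential ramp-up, clamped on both sides. */
    def addExecutors(maxNeeded: Int): Int = {
      val old = numExecutorsTarget
      // PATCH 2/2: never ramp up from below what is already running
      numExecutorsTarget = math.max(numExecutorsTarget, numRunning)
      numExecutorsTarget += numExecutorsToAdd
      // Cap at the current need, then at the configured bounds
      numExecutorsTarget = math.min(numExecutorsTarget, maxNeeded)
      numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
      val delta = numExecutorsTarget - old
      // Double the step only when the full step was applied, else reset it
      numExecutorsToAdd = if (delta == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
      delta
    }
  }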
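
Under the assumed defaults above (initial target 1, bounds [0, 10]), a sustained backlog of 1000
pending tasks ramps the target exactly as the updated "add executors" test in PATCH 2/2 expects:

  // Hypothetical trace of addExecutors(maxNeeded = 1000) against the model:
  //   returns 1: target 1 -> 2,  next step 2
  //   returns 2: target 2 -> 4,  next step 4
  //   returns 4: target 4 -> 8,  next step 8
  //   returns 2: target 8 -> 10 (capped by maxNumExecutors), step resets to 1
  //   returns 0: target stays 10

The math.max(numExecutorsTarget, executorIds.size) floor added in PATCH 2/2 is what keeps a
freshly-truncated target from re-ramping from scratch below the live allocation: in the new
"avoid ramp up when target < running executors" test, after the stage completes the target is
truncated to 0 while 15 executors are still registered, so the next round starts from 15 and
lands on 16 rather than re-requesting 1.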