Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,6 @@ private[spark] class ExecutorAllocationManager(
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis

updateAndSyncNumExecutorsTarget(now)

val executorIdsToBeRemoved = ArrayBuffer[String]()
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
Expand All @@ -323,6 +321,8 @@ private[spark] class ExecutorAllocationManager(
}
!expired
}
// Update executor target number only after initializing flag is unset
updateAndSyncNumExecutorsTarget(now)
if (executorIdsToBeRemoved.nonEmpty) {
removeExecutors(executorIdsToBeRemoved)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -936,12 +936,7 @@ class ExecutorAllocationManagerSuite

assert(maxNumExecutorsNeeded(manager) === 0)
schedule(manager)
// Verify that the executor has timed out but numExecutorsTarget is not recalculated
assert(numExecutorsTarget(manager) === 3)

// Schedule again to recalculate numExecutorsTarget after the executor has timed out
schedule(manager)
// Verify that the current number of executors is ramped down once the executor has timed out
// Verify that the executor has timed out and numExecutorsTarget is recalculated
assert(numExecutorsTarget(manager) === 2)
}

Expand Down Expand Up @@ -1148,6 +1143,25 @@ class ExecutorAllocationManagerSuite
verify(mockAllocationClient).killExecutors(Seq("executor-1"), false, false, false)
}

test("SPARK-26758 check executor target number after idle time out ") {
  // Arguments are (minExecutors = 1, maxExecutors = 5, <third> = 3); the third one is
  // presumably the initial executor count, which matches the starting target checked below.
  sc = createSparkContext(1, 5, 3)
  val allocationManager = sc.executorAllocationManager.get
  val manualClock = new ManualClock(10000L)
  allocationManager.setClock(manualClock)
  assert(numExecutorsTarget(allocationManager) === 3)

  // Report executor-1 .. executor-3 as added on host1; each ExecutorInfo carries the
  // executor's index as its numeric argument.
  (1 to 3).foreach { idx =>
    val added = SparkListenerExecutorAdded(
      manualClock.getTimeMillis(), s"executor-$idx", new ExecutorInfo("host1", idx, Map.empty))
    allocationManager.listener.onExecutorAdded(added)
  }

  // Move the clock past the idle timeout so that every executor counts as idle and
  // becomes a candidate for removal.
  manualClock.advance(executorIdleTimeout * 1000)
  schedule(allocationManager)

  // A single schedule pass must already have brought the target down to 1.
  assert(numExecutorsTarget(allocationManager) === 1)
}

private def createSparkContext(
minExecutors: Int = 1,
maxExecutors: Int = 5,
Expand Down