Skip to content

Commit 785ae61

Browse files
sandeep-kattaJackey Lee
authored andcommitted
[SPARK-26758][CORE] Idle Executors are not getting killed after spark.dynamiAllocation.executorIdleTimeout value
## What changes were proposed in this pull request? **updateAndSyncNumExecutorsTarget** API should be called after **initializing** flag is unset ## How was this patch tested? Added UT and also manually tested After Fix ![afterfix](https://user-images.githubusercontent.com/35216143/51983136-ed4a5000-24bd-11e9-90c8-c4a562c17a4b.png) Closes apache#23697 from sandeep-katta/executorIssue. Authored-by: sandeep-katta <[email protected]> Signed-off-by: Sean Owen <[email protected]>
1 parent 55277cc commit 785ae61

File tree

2 files changed

+22
-8
lines changed

2 files changed

+22
-8
lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,8 +312,6 @@ private[spark] class ExecutorAllocationManager(
312312
private def schedule(): Unit = synchronized {
313313
val now = clock.getTimeMillis
314314

315-
updateAndSyncNumExecutorsTarget(now)
316-
317315
val executorIdsToBeRemoved = ArrayBuffer[String]()
318316
removeTimes.retain { case (executorId, expireTime) =>
319317
val expired = now >= expireTime
@@ -323,6 +321,8 @@ private[spark] class ExecutorAllocationManager(
323321
}
324322
!expired
325323
}
324+
// Update executor target number only after initializing flag is unset
325+
updateAndSyncNumExecutorsTarget(now)
326326
if (executorIdsToBeRemoved.nonEmpty) {
327327
removeExecutors(executorIdsToBeRemoved)
328328
}

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -936,12 +936,7 @@ class ExecutorAllocationManagerSuite
936936

937937
assert(maxNumExecutorsNeeded(manager) === 0)
938938
schedule(manager)
939-
// Verify executor is timeout but numExecutorsTarget is not recalculated
940-
assert(numExecutorsTarget(manager) === 3)
941-
942-
// Schedule again to recalculate the numExecutorsTarget after executor is timeout
943-
schedule(manager)
944-
// Verify that current number of executors should be ramp down when executor is timeout
939+
// Verify executor is timeout,numExecutorsTarget is recalculated
945940
assert(numExecutorsTarget(manager) === 2)
946941
}
947942

@@ -1148,6 +1143,25 @@ class ExecutorAllocationManagerSuite
11481143
verify(mockAllocationClient).killExecutors(Seq("executor-1"), false, false, false)
11491144
}
11501145

1146+
test("SPARK-26758 check executor target number after idle time out ") {
1147+
sc = createSparkContext(1, 5, 3)
1148+
val manager = sc.executorAllocationManager.get
1149+
val clock = new ManualClock(10000L)
1150+
manager.setClock(clock)
1151+
assert(numExecutorsTarget(manager) === 3)
1152+
manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
1153+
clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
1154+
manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
1155+
clock.getTimeMillis(), "executor-2", new ExecutorInfo("host1", 2, Map.empty)))
1156+
manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
1157+
clock.getTimeMillis(), "executor-3", new ExecutorInfo("host1", 3, Map.empty)))
1158+
// make all the executors as idle, so that it will be killed
1159+
clock.advance(executorIdleTimeout * 1000)
1160+
schedule(manager)
1161+
// once the schedule is run target executor number should be 1
1162+
assert(numExecutorsTarget(manager) === 1)
1163+
}
1164+
11511165
private def createSparkContext(
11521166
minExecutors: Int = 1,
11531167
maxExecutors: Int = 5,

0 commit comments

Comments
 (0)