Skip to content

Commit c35427f

Browse files
Ngone51 authored; cloud-fan committed
[SPARK-30355][CORE] Unify isExecutorActive between CoarseGrainedSchedulerBackend and DriverEndpoint
### What changes were proposed in this pull request? Unify `DriverEndpoint.executorIsAlive()` and `CoarseGrainedSchedulerBackend.isExecutorActive()`. ### Why are the changes needed? `DriverEndPoint` has the method `executorIsAlive()` to check whether an executor is alive/active, while `CoarseGrainedSchedulerBackend` has the method `isExecutorActive()` to do the same work. But `isExecutorActive()` seems to forget to consider `executorsPendingLossReason`. Unifying these two methods makes the behavior consistent between `DriverEndPoint` and `CoarseGrainedSchedulerBackend` and makes the code easier to maintain. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Pass Jenkins. Closes apache#27012 from Ngone51/unify-is-executor-alive. Authored-by: yi.wu <yi.wu@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 9c046dc commit c35427f

File tree

1 file changed

+9
-12
lines changed

1 file changed

+9
-12
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
9999
@GuardedBy("CoarseGrainedSchedulerBackend.this")
100100
private val executorsPendingToRemove = new HashMap[String, Boolean]
101101

102+
// Executors that have been lost, but for which we don't yet know the real exit reason.
103+
private val executorsPendingLossReason = new HashSet[String]
104+
102105
// A map to store hostname with its possible task number running on it
103106
@GuardedBy("CoarseGrainedSchedulerBackend.this")
104107
protected var hostToLocalTaskCount: Map[String, Int] = Map.empty
@@ -123,9 +126,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
123126

124127
override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv
125128

126-
// Executors that have been lost, but for which we don't yet know the real exit reason.
127-
protected val executorsPendingLossReason = new HashSet[String]
128-
129129
protected val addressToExecutorId = new HashMap[RpcAddress, String]
130130

131131
// Spark configuration sent to executors. This is a lazy val so that subclasses of the
@@ -285,7 +285,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
285285
// Make sure no executor is killed while some task is launching on it
286286
val taskDescs = withLock {
287287
// Filter out executors under killing
288-
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
288+
val activeExecutors = executorDataMap.filterKeys(isExecutorActive)
289289
val workOffers = activeExecutors.map {
290290
case (id, executorData) =>
291291
new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
@@ -314,7 +314,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
314314
// Make sure no executor is killed while some task is launching on it
315315
val taskDescs = withLock {
316316
// Filter out executors under killing
317-
if (executorIsAlive(executorId)) {
317+
if (isExecutorActive(executorId)) {
318318
val executorData = executorDataMap(executorId)
319319
val workOffers = IndexedSeq(
320320
new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores,
@@ -332,11 +332,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
332332
}
333333
}
334334

335-
private def executorIsAlive(executorId: String): Boolean = synchronized {
336-
!executorsPendingToRemove.contains(executorId) &&
337-
!executorsPendingLossReason.contains(executorId)
338-
}
339-
340335
// Launch tasks returned by a set of resource offers
341336
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
342337
for (task <- tasks.flatten) {
@@ -415,7 +410,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
415410
*/
416411
protected def disableExecutor(executorId: String): Boolean = {
417412
val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
418-
if (executorIsAlive(executorId)) {
413+
if (isExecutorActive(executorId)) {
419414
executorsPendingLossReason += executorId
420415
true
421416
} else {
@@ -560,7 +555,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
560555
}
561556

562557
override def isExecutorActive(id: String): Boolean = synchronized {
563-
executorDataMap.contains(id) && !executorsPendingToRemove.contains(id)
558+
executorDataMap.contains(id) &&
559+
!executorsPendingToRemove.contains(id) &&
560+
!executorsPendingLossReason.contains(id)
564561
}
565562

566563
override def maxNumConcurrentTasks(): Int = synchronized {

0 commit comments

Comments
 (0)