From f31420de5ae8f212e263628bf1f80032e9fe00d2 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Wed, 29 Nov 2023 19:14:12 -0800 Subject: [PATCH 1/2] fix --- .../executor/CoarseGrainedExecutorBackend.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index f1a9aa353e76..4eb1a277dfc7 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -77,6 +77,10 @@ private[spark] class CoarseGrainedExecutorBackend( private var decommissioned = false + // Track the last time in ns that at least one task is running. If no task is running and all + // shuffle/RDD data migration are done, the decommissioned executor should exit. + private var lastTaskFinishTime = System.nanoTime() + override def onStart(): Unit = { if (env.conf.get(DECOMMISSION_ENABLED)) { val signal = env.conf.get(EXECUTOR_DECOMMISSION_SIGNAL) @@ -273,6 +277,7 @@ private[spark] class CoarseGrainedExecutorBackend( val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources) if (TaskState.isFinished(state)) { taskResources.remove(taskId) + lastTaskFinishTime = System.nanoTime() } driver match { case Some(driverRef) => driverRef.send(msg) @@ -345,7 +350,6 @@ private[spark] class CoarseGrainedExecutorBackend( val shutdownThread = new Thread("wait-for-blocks-to-migrate") { override def run(): Unit = { - var lastTaskRunningTime = System.nanoTime() val sleep_time = 1000 // 1s // This config is internal and only used by unit tests to force an executor // to hang around for longer when decommissioned. @@ -362,7 +366,7 @@ private[spark] class CoarseGrainedExecutorBackend( val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo() // We can only trust allBlocksMigrated boolean value if there were no tasks running // since the start of computing it. - if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) { + if (allBlocksMigrated && (migrationTime > lastTaskFinishTime)) { logInfo("No running tasks, all blocks migrated, stopping.") exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true) } else { @@ -374,12 +378,6 @@ private[spark] class CoarseGrainedExecutorBackend( } } else { logInfo(s"Blocked from shutdown by ${executor.numRunningTasks} running tasks") - // If there is a running task it could store blocks, so make sure we wait for a - // migration loop to complete after the last task is done. - // Note: this is only advanced if there is a running task, if there - // is no running task but the blocks are not done migrating this does not - // move forward. - lastTaskRunningTime = System.nanoTime() } Thread.sleep(sleep_time) } From f8aaaca8b9544142d3b22919c1338cc82b820b2b Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Fri, 1 Dec 2023 16:11:48 -0800 Subject: [PATCH 2/2] update --- .../spark/executor/CoarseGrainedExecutorBackend.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 4eb1a277dfc7..4bf4929c1339 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -21,7 +21,7 @@ import java.net.URL import java.nio.ByteBuffer import java.util.Locale import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import scala.util.{Failure, Success} import scala.util.control.NonFatal @@ -79,7 +79,7 @@ private[spark] class CoarseGrainedExecutorBackend( // Track the last time in ns that at least one task is running. If no task is running and all // shuffle/RDD data migration are done, the decommissioned executor should exit. - private var lastTaskFinishTime = System.nanoTime() + private var lastTaskFinishTime = new AtomicLong(System.nanoTime()) override def onStart(): Unit = { if (env.conf.get(DECOMMISSION_ENABLED)) { @@ -277,7 +277,7 @@ private[spark] class CoarseGrainedExecutorBackend( val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources) if (TaskState.isFinished(state)) { taskResources.remove(taskId) - lastTaskFinishTime = System.nanoTime() + lastTaskFinishTime.set(System.nanoTime()) } driver match { case Some(driverRef) => driverRef.send(msg) @@ -366,7 +366,7 @@ private[spark] class CoarseGrainedExecutorBackend( val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo() // We can only trust allBlocksMigrated boolean value if there were no tasks running // since the start of computing it. - if (allBlocksMigrated && (migrationTime > lastTaskFinishTime)) { + if (allBlocksMigrated && (migrationTime > lastTaskFinishTime.get())) { logInfo("No running tasks, all blocks migrated, stopping.") exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true) } else {