diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index f1a9aa353e76..4bf4929c1339 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -21,7 +21,7 @@ import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Locale
 import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
@@ -77,6 +77,10 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   private var decommissioned = false
 
+  // Track the last time in ns that at least one task is running. If no task is running and all
+  // shuffle/RDD data migration are done, the decommissioned executor should exit.
+  private var lastTaskFinishTime = new AtomicLong(System.nanoTime())
+
   override def onStart(): Unit = {
     if (env.conf.get(DECOMMISSION_ENABLED)) {
       val signal = env.conf.get(EXECUTOR_DECOMMISSION_SIGNAL)
@@ -273,6 +277,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources)
       if (TaskState.isFinished(state)) {
         taskResources.remove(taskId)
+        lastTaskFinishTime.set(System.nanoTime())
       }
       driver match {
         case Some(driverRef) => driverRef.send(msg)
@@ -345,7 +350,6 @@ private[spark] class CoarseGrainedExecutorBackend(
 
       val shutdownThread = new Thread("wait-for-blocks-to-migrate") {
         override def run(): Unit = {
-          var lastTaskRunningTime = System.nanoTime()
           val sleep_time = 1000 // 1s
           // This config is internal and only used by unit tests to force an executor
           // to hang around for longer when decommissioned.
@@ -362,7 +366,7 @@ private[spark] class CoarseGrainedExecutorBackend(
                 val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
                 // We can only trust allBlocksMigrated boolean value if there were no tasks running
                 // since the start of computing it.
-                if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {
+                if (allBlocksMigrated && (migrationTime > lastTaskFinishTime.get())) {
                   logInfo("No running tasks, all blocks migrated, stopping.")
                   exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true)
                 } else {
@@ -374,12 +378,6 @@ private[spark] class CoarseGrainedExecutorBackend(
               }
             } else {
               logInfo(s"Blocked from shutdown by ${executor.numRunningTasks} running tasks")
-              // If there is a running task it could store blocks, so make sure we wait for a
-              // migration loop to complete after the last task is done.
-              // Note: this is only advanced if there is a running task, if there
-              // is no running task but the blocks are not done migrating this does not
-              // move forward.
-              lastTaskRunningTime = System.nanoTime()
             }
             Thread.sleep(sleep_time)
           }
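
The gist of the change, restated outside the diff as a minimal, self-contained Scala sketch: the last-task timestamp now lives in an AtomicLong that statusUpdate() bumps whenever a task reaches a finished state, and the shutdown thread compares the block-migration snapshot time against that shared value instead of a local variable it only advanced while tasks were still running. The object and method names below (DecommissionExitCheck, recordTaskFinished, safeToExit) are hypothetical and exist only for illustration; only the exit condition mirrors the patched code.

import java.util.concurrent.atomic.AtomicLong

// Illustrative sketch only, not part of the patch. It models the decision the
// patched shutdown loop makes using the same condition as the diff above:
// allBlocksMigrated && migrationTime > lastTaskFinishTime.get()
object DecommissionExitCheck {
  // Updated from the task-status path whenever a task reaches a finished state,
  // mirroring the new lastTaskFinishTime field set in statusUpdate().
  private val lastTaskFinishTime = new AtomicLong(System.nanoTime())

  def recordTaskFinished(): Unit = lastTaskFinishTime.set(System.nanoTime())

  // The executor may exit only if no task is running AND the migration snapshot
  // was taken after the last task finished, so a task that finished late cannot
  // have written blocks the snapshot missed.
  def safeToExit(numRunningTasks: Int, migrationTime: Long, allBlocksMigrated: Boolean): Boolean =
    numRunningTasks == 0 && allBlocksMigrated && migrationTime > lastTaskFinishTime.get()

  def main(args: Array[String]): Unit = {
    recordTaskFinished()
    // Pretend the migration snapshot was taken after the last task finished.
    val migrationTime = System.nanoTime()
    // Expected to print true: the snapshot postdates the last task finish.
    println(safeToExit(numRunningTasks = 0, migrationTime = migrationTime, allBlocksMigrated = true))
  }
}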