Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.net.URL
import java.nio.ByteBuffer
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import scala.util.{Failure, Success}
import scala.util.control.NonFatal
Expand Down Expand Up @@ -77,6 +77,10 @@ private[spark] class CoarseGrainedExecutorBackend(

private var decommissioned = false

// Track the last time in ns that at least one task is running. If no task is running and all
// shuffle/RDD data migration are done, the decommissioned executor should exit.
private var lastTaskFinishTime = new AtomicLong(System.nanoTime())

override def onStart(): Unit = {
if (env.conf.get(DECOMMISSION_ENABLED)) {
val signal = env.conf.get(EXECUTOR_DECOMMISSION_SIGNAL)
Expand Down Expand Up @@ -273,6 +277,7 @@ private[spark] class CoarseGrainedExecutorBackend(
val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources)
if (TaskState.isFinished(state)) {
taskResources.remove(taskId)
lastTaskFinishTime.set(System.nanoTime())
}
driver match {
case Some(driverRef) => driverRef.send(msg)
Expand Down Expand Up @@ -345,7 +350,6 @@ private[spark] class CoarseGrainedExecutorBackend(

val shutdownThread = new Thread("wait-for-blocks-to-migrate") {
override def run(): Unit = {
var lastTaskRunningTime = System.nanoTime()
val sleep_time = 1000 // 1s
// This config is internal and only used by unit tests to force an executor
// to hang around for longer when decommissioned.
Expand All @@ -362,7 +366,7 @@ private[spark] class CoarseGrainedExecutorBackend(
val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
// We can only trust allBlocksMigrated boolean value if there were no tasks running
// since the start of computing it.
if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {
if (allBlocksMigrated && (migrationTime > lastTaskFinishTime.get())) {
logInfo("No running tasks, all blocks migrated, stopping.")
exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true)
} else {
Expand All @@ -374,12 +378,6 @@ private[spark] class CoarseGrainedExecutorBackend(
}
} else {
logInfo(s"Blocked from shutdown by ${executor.numRunningTasks} running tasks")
// If there is a running task it could store blocks, so make sure we wait for a
// migration loop to complete after the last task is done.
// Note: this is only advanced if there is a running task, if there
// is no running task but the blocks are not done migrating this does not
// move forward.
lastTaskRunningTime = System.nanoTime()
}
Thread.sleep(sleep_time)
}
Expand Down