Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix
  • Loading branch information
jiangxb1987 committed Dec 2, 2023
commit f31420de5ae8f212e263628bf1f80032e9fe00d2
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ private[spark] class CoarseGrainedExecutorBackend(

private var decommissioned = false

// Track the last time (in ns) that a task finished. If no task is running and all
// shuffle/RDD data migrations are done, the decommissioned executor should exit.
private var lastTaskFinishTime = System.nanoTime()

override def onStart(): Unit = {
if (env.conf.get(DECOMMISSION_ENABLED)) {
val signal = env.conf.get(EXECUTOR_DECOMMISSION_SIGNAL)
Expand Down Expand Up @@ -273,6 +277,7 @@ private[spark] class CoarseGrainedExecutorBackend(
val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources)
if (TaskState.isFinished(state)) {
taskResources.remove(taskId)
lastTaskFinishTime = System.nanoTime()
}
driver match {
case Some(driverRef) => driverRef.send(msg)
Expand Down Expand Up @@ -345,7 +350,6 @@ private[spark] class CoarseGrainedExecutorBackend(

val shutdownThread = new Thread("wait-for-blocks-to-migrate") {
override def run(): Unit = {
var lastTaskRunningTime = System.nanoTime()
val sleep_time = 1000 // 1s
// This config is internal and only used by unit tests to force an executor
// to hang around for longer when decommissioned.
Expand All @@ -362,7 +366,7 @@ private[spark] class CoarseGrainedExecutorBackend(
val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
// We can only trust allBlocksMigrated boolean value if there were no tasks running
// since the start of computing it.
if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {
if (allBlocksMigrated && (migrationTime > lastTaskFinishTime)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is read from a different thread, so it needs to be thread-safe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch! updated...

logInfo("No running tasks, all blocks migrated, stopping.")
exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true)
} else {
Expand All @@ -374,12 +378,6 @@ private[spark] class CoarseGrainedExecutorBackend(
}
} else {
logInfo(s"Blocked from shutdown by ${executor.numRunningTasks} running tasks")
// If there is a running task it could store blocks, so make sure we wait for a
// migration loop to complete after the last task is done.
// Note: this is only advanced if there is a running task, if there
// is no running task but the blocks are not done migrating this does not
// move forward.
lastTaskRunningTime = System.nanoTime()
}
Thread.sleep(sleep_time)
}
Expand Down