Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1433,17 +1433,18 @@ class DAGScheduler(
val failedStage = stageIdToStage(task.stageId)
logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " +
"failed.")
val message = s"Stage failed because barrier task $task finished unsuccessfully. " +
val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" +
failure.toErrorString
try {
// cancelTasks will fail if a SchedulerBackend does not implement killTask
taskScheduler.cancelTasks(stageId, interruptThread = false)
// killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask.
val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) failed."
taskScheduler.killAllTaskAttempts(stageId, interruptThread = false, reason)
} catch {
case e: UnsupportedOperationException =>
// Cannot continue with barrier stage if failed to cancel zombie barrier tasks.
// TODO SPARK-24877 leave the zombie tasks and ignore their completion events.
logWarning(s"Could not cancel tasks for stage $stageId", e)
abortStage(failedStage, "Could not cancel zombie barrier tasks for stage " +
logWarning(s"Could not kill all tasks for stage $stageId", e)
abortStage(failedStage, "Could not kill zombie barrier tasks for stage " +
s"$failedStage (${failedStage.name})", Some(e))
}
markStageAsFinished(failedStage, Some(message))
Expand All @@ -1457,7 +1458,8 @@ class DAGScheduler(

if (shouldAbortStage) {
val abortMessage = if (disallowStageRetryForTest) {
"Barrier stage will not retry stage due to testing config"
"Barrier stage will not retry stage due to testing config. Most recent failure " +
s"reason: $message"
} else {
s"""$failedStage (${failedStage.name})
|has failed the maximum allowable number of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ private[spark] trait TaskScheduler {
*/
def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean

// Kill all the running task attempts in a stage.
def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit

// Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
def setDAGScheduler(dagScheduler: DAGScheduler): Unit

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,22 @@ private[spark] class TaskSchedulerImpl(
}
}

override def killAllTaskAttempts(
stageId: Int,
interruptThread: Boolean,
reason: String): Unit = synchronized {
logInfo(s"Killing all running tasks in stage $stageId: $reason")
taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is some dup code and we dropped the useful comments from cancelTasks. It would be great if we move the common code here with comment and let cancelTasks call this method.

attempts.foreach { case (_, tsm) =>
tsm.runningTasksSet.foreach { tid =>
taskIdToExecutorId.get(tid).foreach { execId =>
backend.killTask(tid, execId, interruptThread, reason)
}
}
}
}
}

/**
* Called to indicate that all task attempts (including speculated tasks) associated with the
* given TaskSetManager have completed, so state associated with the TaskSetManager should be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
}
override def killTaskAttempt(
taskId: Long, interruptThread: Boolean, reason: String): Boolean = false
override def killAllTaskAttempts(
stageId: Int, interruptThread: Boolean, reason: String): Unit = {}
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
override def defaultParallelism() = 2
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
Expand Down Expand Up @@ -629,6 +631,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
taskId: Long, interruptThread: Boolean, reason: String): Boolean = {
throw new UnsupportedOperationException
}
override def killAllTaskAttempts(
stageId: Int, interruptThread: Boolean, reason: String): Unit = {
throw new UnsupportedOperationException
}
override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
override def defaultParallelism(): Int = 2
override def executorHeartbeatReceived(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ private class DummyTaskScheduler extends TaskScheduler {
override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {}
override def killTaskAttempt(
taskId: Long, interruptThread: Boolean, reason: String): Boolean = false
override def killAllTaskAttempts(
stageId: Int, interruptThread: Boolean, reason: String): Unit = {}
override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
override def defaultParallelism(): Int = 2
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
Expand Down