Pull request status: Closed

Changes from 1 commit (29 commits in this pull request)

Commits:
5ad6efd  [SPARK-25250] : On successful completion of a task attempt on a parti… (Oct 23, 2018)
8667c28  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Oct 23, 2018)
a73f619  [SPARK-25250] : Calling maybeFinishTaskSet() from method and adding c… (Dec 28, 2018)
5509165  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Dec 28, 2018)
ee5bc68  [SPARK-25250] : Fixing scalastyle tests (Dec 28, 2018)
7677aec  [SPARK-25250] : Addressing Reviews January 2, 2019 (Jan 2, 2019)
67e1644  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Jan 8, 2019)
f395b65  [SPARK-25250] : Addressing Reviews January 8, 2019 (Jan 8, 2019)
f7102ca  [SPARK-25250] : Addressing Reviews January 9, 2019 (Jan 9, 2019)
6709fe1  [SPARK-25250] : Addressing Reviews January 10, 2019 (Jan 10, 2019)
5234e87  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Jan 10, 2019)
9efbc58  [SPARK-25250] : Addressing Reviews January 15, 2019 (Jan 15, 2019)
6abd52c  [SPARK-25250] : Addressing Reviews January 15, 2019 - 2 (Jan 15, 2019)
89373af  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Jan 16, 2019)
231c51b  [SPARK-25250] : Addressing Reviews January 16, 2019 (Jan 16, 2019)
fcfe9f5  [SPARK-25250] : Addressing Reviews January 18, 2019 (Jan 18, 2019)
7ce6f10  [SPARK-25250] : Adding unit test (Jan 22, 2019)
0610939  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Jan 22, 2019)
929fbf9  [SPARK-25250] : Addressing Reviews January 24, 2019 (Jan 24, 2019)
393f901  [SPARK-25250] : Fixing Unit Tests (Jan 25, 2019)
52e832a  [SPARK-25250] : Addressing Reviews January 30, 2019 (Jan 30, 2019)
afbac96  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Jan 30, 2019)
024ec53  [SPARK-25250] : Fixing Scalastyle Checks (Jan 30, 2019)
d2b7044  [SPARK-25250] : Addressing Minor Reviews January 30, 2019 (Jan 30, 2019)
d6ac4a9  [SPARK-25250] : Addressing Reviews January 31, 2019 (Jan 31, 2019)
e9b363b  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Feb 15, 2019)
b55dbb0  [SPARK-25250] : Restructuring PR and trying out a different solution (Feb 18, 2019)
551f412  [SPARK-25250] : Fixing indentation (Feb 18, 2019)
28017ed  [SPARK-25250] : Addressing Reviews February 19, 2019 (Feb 19, 2019)
[SPARK-25250] : Addressing Reviews January 8, 2019

Refactors the method name to completeTasks; the same method is now also called on task completion in a ShuffleMapStage, but without killing the remaining task attempts there.

pgandhi committed Jan 8, 2019
commit f395b6551732d67656676be9289f4436713c7ca6
@@ -1384,8 +1384,7 @@ private[spark] class DAGScheduler(
           if (!job.finished(rt.outputId)) {
             job.finished(rt.outputId) = true
             job.numFinished += 1
-            taskScheduler.markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(
-              task.partitionId, task.stageId)
+            taskScheduler.completeTasks(task.partitionId, task.stageId, true)
             // If the whole job has finished, remove it
             if (job.numFinished == job.numPartitions) {
               markStageAsFinished(resultStage)

@@ -1429,6 +1428,7 @@ private[spark] class DAGScheduler(
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
             logDebug("ShuffleMapTask finished on " + execId)
+            taskScheduler.completeTasks(task.partitionId, task.stageId, false)
Contributor: why not kill shuffle map task?

Member: +1

Author: As per @squito's earlier comment, it was agreed that in this PR we kill tasks only for the result stage (as an extension of SPARK-25773). I am not sure whether we can kill ShuffleMapTasks as well here, but I do not see any harm in doing so. @squito, WDYT?

Contributor: Sorry to be late to respond here, I have been traveling. This question has come up a lot, and while there are reasons to do it, there are some complications as well, and I don't think we should roll that change into this PR, which is trying to solve a different bug. In short, it has been argued in the past that a shuffle map task may still make useful progress on other tasks. There are also complications with handling tasks that don't respond well to killing (I think Hadoop input readers?). To be honest, I feel there is a stronger argument in favor of doing the killing now, though we'd probably want it behind a conf. So I'd be +1 for the change, just that it should be separate. (And I'm probably not recalling all of the gotchas with killing tasks at the moment, so maybe with a dedicated discussion on this we can dredge up all the cases we need to think through.)

Contributor:
> There are also complications with handling tasks that don't respond well to killing (I think Hadoop input readers?)

If that's the case, maybe we should not kill result tasks either, to be super safe.

             if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
               logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
             } else {
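Following the "behind a conf" suggestion in the thread above, here is a purely illustrative sketch of how killing redundant attempts for completed shuffle-map partitions could be gated behind a flag. The configuration key and object name below are made up for this sketch and are not part of Spark or of this PR.

```scala
import org.apache.spark.SparkConf

object KillGateSketch {
  // Hypothetical flag; NOT an existing Spark configuration.
  private val KillShuffleMapAttempts =
    "spark.scheduler.killTaskAttemptsForCompletedShuffleMapPartitions"

  // Result-stage attempts are killed as this PR already does; shuffle-map attempts
  // would only be killed when the (made-up) flag is turned on.
  def shouldKillAttempts(conf: SparkConf, isResultStage: Boolean): Boolean =
    isResultStage || conf.getBoolean(KillShuffleMapAttempts, defaultValue = false)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false).set(KillShuffleMapAttempts, "true")
    println(shouldKillAttempts(conf, isResultStage = false)) // true only when flag enabled
  }
}
```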
@@ -109,7 +109,6 @@ private[spark] trait TaskScheduler {
    */
   def applicationAttemptId(): Option[String]
 
-  def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(
-    partitionId: Int, stageId: Int): Unit
+  def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit
 
 }
@@ -288,20 +288,22 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * SPARK-25250: Whenever any Result Task gets successfully completed, we simply mark the
-   * corresponding partition id as completed in all attempts for that particular stage. As a
-   * result, we do not see any Killed tasks due to TaskCommitDenied Exceptions showing up
-   * in the UI.
+   * corresponding partition id as completed in all attempts for that particular stage and
+   * additionally, for a Result Stage, we also kill the remaining task attempts running on the
+   * same partition. As a result, we do not see any Killed tasks due to
+   * TaskCommitDenied Exceptions showing up in the UI.
Member: I think you should revise the comment, since we also call this method after a ShuffleMapTask succeeds now.

Author: Done

    */
-  override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(
-      partitionId: Int, stageId: Int): Unit = {
+  override def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit = {
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
       tsm.partitionToIndex.get(partitionId) match {
         case Some(index) =>
           tsm.markPartitionIdAsCompletedForTaskAttempt(index)
Contributor: shall we call markPartitionCompleted instead of creating this new method?

Author: I had this in mind when I was writing the code, but a question popped into my head: should we really update the tasksSuccessful counter if we are killing those tasks? Wouldn't that be incorrect? Let me know your thoughts @cloud-fan @Ngone51.

Member: I think if tsm is the newest TaskSet, then updating tasksSuccessful is necessary, so that it has a chance to finish itself at this point. If not, I think it is not necessary, and even successful(index) = true is not, because we have already sent killing signals to those tasks, so zombie TaskSets will handle the failed task attempts in the end and finish normally. (But you told me above that tsm may not happen to be the newest TaskSet, so I'm not sure about it.)

Contributor: Sounds reasonable to me, but we should pick a better name to highlight this difference.

Author: I have renamed the method; please let me know if it still sounds wrong. Thank you.
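For illustration only, a small self-contained toy (not Spark's TaskSetManager) of the trade-off debated above: counting a completed partition toward task-set success versus merely flagging its index so it is not rescheduled. All names below are made up for this sketch.

```scala
// Toy model; it only demonstrates the bookkeeping difference discussed in the thread.
class ToyTaskSet(numTasks: Int) {
  private val successful = Array.fill(numTasks)(false)
  private var tasksSuccessful = 0

  // markPartitionCompleted-style: the completed partition counts toward finishing
  // this task set, so the success counter advances.
  def markCompletedAndCount(index: Int): Unit = {
    if (!successful(index)) {
      successful(index) = true
      tasksSuccessful += 1
    }
  }

  // The alternative raised above: only flag the index as completed (so no further
  // attempts are scheduled) without claiming a successful task for killed attempts.
  def markCompletedOnly(index: Int): Unit = {
    successful(index) = true
  }

  def isFullySuccessful: Boolean = tasksSuccessful == numTasks
}
```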

-          val taskInfoList = tsm.taskAttempts(index)
-          taskInfoList.filter(_.running).foreach { taskInfo =>
-            killTaskAttempt(taskInfo.taskId, false,
-              s"Corresponding Partition ID $partitionId has been marked as Completed")
+          if (killTasks) {
+            val taskInfoList = tsm.taskAttempts(index)
+            taskInfoList.filter(_.running).foreach { taskInfo =>
+              killTaskAttempt(taskInfo.taskId, false,
Contributor: We need a try-catch here. Not all backends support killing tasks, and we should not fail if killTaskAttempt throws an exception. Killing the task is just an optimization here, not a must-have.

Author: Makes sense, have updated the code.

s"Corresponding Partition ID $partitionId has been marked as Completed")
Contributor: We can simplify it to "Partition $partitionId is already completed".

Author: Done

+            }
           }
 
         case None =>
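The try-catch suggestion in the thread above treats the kill as best-effort: the completion path must not fail just because a backend cannot kill tasks. Below is a minimal, self-contained sketch of that pattern; the killTask stub and object name are stand-ins invented here, not the real TaskSchedulerImpl.killTaskAttempt.

```scala
import scala.util.control.NonFatal

object BestEffortKillSketch {
  // Stand-in for a scheduler backend's kill call; some backends do not support
  // killing and may throw.
  private def killTask(taskId: Long, reason: String): Unit =
    throw new UnsupportedOperationException(s"cannot kill task $taskId: unsupported backend")

  // Killing redundant attempts is an optimization, so failures are swallowed and
  // logged rather than propagated to the caller marking the partition completed.
  def tryKill(taskId: Long, partitionId: Int): Unit = {
    try {
      killTask(taskId, s"Partition $partitionId is already completed")
    } catch {
      case NonFatal(e) =>
        println(s"Best-effort kill of task $taskId failed: ${e.getMessage}")
    }
  }

  def main(args: Array[String]): Unit = tryKill(42L, 2)
}
```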
@@ -160,8 +160,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
     override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
     override def applicationAttemptId(): Option[String] = None
-    override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(
-      partitionId: Int, stageId: Int): Unit = {}
+    override def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit = {}
   }
 
   /** Length of time to wait while draining listener events. */

@@ -669,8 +668,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
       override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
       override def applicationAttemptId(): Option[String] = None
-      override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(
-        partitionId: Int, stageId: Int): Unit = {}
+      override def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit = {}
     }
     val noKillScheduler = new DAGScheduler(
       sc,
@@ -94,6 +94,5 @@ private class DummyTaskScheduler extends TaskScheduler {
       accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
       blockManagerId: BlockManagerId,
       executorMetrics: ExecutorMetrics): Boolean = true
-  override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(
-    partitionId: Int, stageId: Int): Unit = {}
+  override def completeTasks(partitionId: Int, stageId: Int, killTasks: Boolean): Unit = {}
 }
@@ -1350,7 +1350,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     val offersSecondAttempt = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
     taskScheduler.resourceOffers(offersSecondAttempt)
 
-    taskScheduler.markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(2, 0)
+    taskScheduler.completeTasks(2, 0, true)
 
     val tsm1 = taskScheduler.taskSetManagerForAttempt(0, 1).get
     val indexInTsm = tsm1.partitionToIndex(2)