Closed

29 commits
5ad6efd  [SPARK-25250] : On successful completion of a task attempt on a parti… (Oct 23, 2018)
8667c28  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Oct 23, 2018)
a73f619  [SPARK-25250] : Calling maybeFinishTaskSet() from method and adding c… (Dec 28, 2018)
5509165  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Dec 28, 2018)
ee5bc68  [SPARK-25250] : Fixing scalastyle tests (Dec 28, 2018)
7677aec  [SPARK-25250] : Addressing Reviews January 2, 2019 (Jan 2, 2019)
67e1644  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Jan 8, 2019)
f395b65  [SPARK-25250] : Addressing Reviews January 8, 2019 (Jan 8, 2019)
f7102ca  [SPARK-25250] : Addressing Reviews January 9, 2019 (Jan 9, 2019)
6709fe1  [SPARK-25250] : Addressing Reviews January 10, 2019 (Jan 10, 2019)
5234e87  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Jan 10, 2019)
9efbc58  [SPARK-25250] : Addressing Reviews January 15, 2019 (Jan 15, 2019)
6abd52c  [SPARK-25250] : Addressing Reviews January 15, 2019 - 2 (Jan 15, 2019)
89373af  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Jan 16, 2019)
231c51b  [SPARK-25250] : Addressing Reviews January 16, 2019 (Jan 16, 2019)
fcfe9f5  [SPARK-25250] : Addressing Reviews January 18, 2019 (Jan 18, 2019)
7ce6f10  [SPARK-25250] : Adding unit test (Jan 22, 2019)
0610939  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Jan 22, 2019)
929fbf9  [SPARK-25250] : Addressing Reviews January 24, 2019 (Jan 24, 2019)
393f901  [SPARK-25250] : Fixing Unit Tests (Jan 25, 2019)
52e832a  [SPARK-25250] : Addressing Reviews January 30, 2019 (Jan 30, 2019)
afbac96  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Jan 30, 2019)
024ec53  [SPARK-25250] : Fixing Scalastyle Checks (Jan 30, 2019)
d2b7044  [SPARK-25250] : Addressing Minor Reviews January 30, 2019 (Jan 30, 2019)
d6ac4a9  [SPARK-25250] : Addressing Reviews January 31, 2019 (Jan 31, 2019)
e9b363b  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Feb 15, 2019)
b55dbb0  [SPARK-25250] : Restructuring PR and trying out a different solution (Feb 18, 2019)
551f412  [SPARK-25250] : Fixing indentation (Feb 18, 2019)
28017ed  [SPARK-25250] : Addressing Reviews February 19, 2019 (Feb 19, 2019)
@@ -1383,6 +1383,8 @@ private[spark] class DAGScheduler(
if (!job.finished(rt.outputId)) {
job.finished(rt.outputId) = true
job.numFinished += 1
taskScheduler.markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(
@Ngone51 (Member), Jan 5, 2019:
Does this require that the newest TaskSet has been created in the TaskScheduler at this point? If so, can we make sure of that?

Author:
No, this method does not require that the newest TaskSet be created.

Member:
If not, how does it know about the completed partition? Or, is it possible that this happens during the time we have put the stage into the resubmit queue but have not yet submitted it to the TaskScheduler (so there is no newest TaskSet)?

Author:
Ah, I see what you are saying. Yes, this method assumes that the newest TaskSet has been created, and indeed there is a tiny possibility that, when this method gets called, the new TaskSet has not yet been added to taskSetsByStageIdAndAttempt. However, when I wrote the code, my assumption was that there is always a small delay before the task-completion event propagates to the DAGScheduler. I have tested this code by reproducing FetchFailures multiple times, and whenever this method is called the new TaskSet is always present, so I have not yet seen the race condition described above occur; before this fix, I was able to reproduce the bug roughly 4 out of 5 times. Still, a valid point.

Member:
Yeah, I agree this could be an extremely rare case, and your work has shown that this PR is really an effective fix.

And as I mentioned above, if we could record this completed partition in the TaskScheduler at this point and tell other TaskSets about the completed partitions once they are created in the TaskScheduler, we may avoid the potential issue entirely. But I think we don't need to do this right now, since your fix has already proved to be effective. Instead, how about leaving some comments to explain the potential issue? That way we can easily fix it once we really hit it.

Author:
Makes sense, have updated the comment. Thank you.
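For reference, the alternative the reviewer describes above — having the scheduler remember completed partitions per stage and replay them into task-set attempts created later — might look roughly like the standalone sketch below. All names here (CompletedPartitionRegistry, recordCompletion, alreadyCompleted) are illustrative, not Spark APIs or the PR's code.

// Standalone sketch, not Spark code: a scheduler-level registry of completed
// partitions per stage that later task-set attempts can consult on submission.
import scala.collection.mutable

object CompletedPartitionRegistry {
  private val completedByStage = mutable.Map.empty[Int, mutable.Set[Int]]

  // Called whenever any attempt reports a successful result task for a partition.
  def recordCompletion(stageId: Int, partitionId: Int): Unit =
    completedByStage.getOrElseUpdate(stageId, mutable.Set.empty) += partitionId

  // Called when a new attempt for the stage is submitted: these partitions can be
  // marked as done immediately, closing the race discussed in this thread.
  def alreadyCompleted(stageId: Int): Set[Int] =
    completedByStage.getOrElse(stageId, mutable.Set.empty).toSet
}

object CompletedPartitionRegistryDemo {
  def main(args: Array[String]): Unit = {
    CompletedPartitionRegistry.recordCompletion(stageId = 0, partitionId = 2)
    // A later attempt of stage 0 still learns that partition 2 is finished,
    // even though it completed before the attempt existed.
    println(CompletedPartitionRegistry.alreadyCompleted(0)) // Set(2)
  }
}

This only makes the reviewer's idea concrete; in the thread above, the PR opted for documenting the race in a comment instead.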

Contributor:
Seems it's no harm to do it for shuffle tasks as well?

Author:
Makes sense, have updated the code.

task.partitionId, task.stageId)
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
markStageAsFinished(resultStage)
@@ -109,4 +109,7 @@ private[spark] trait TaskScheduler
*/
def applicationAttemptId(): Option[String]

def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(
Contributor:
We already have submitTasks, cancelTasks, killTaskAttempt, etc. in this class; how about renaming this method to completeTasks? And add a comment to explain what this method does.

Author:
Done

partitionId: Int, stageId: Int): Unit

}
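The rename suggested in this thread, together with the requested explanatory comment, might look roughly like the fragment below on the TaskScheduler trait; the scaladoc wording is an assumption, not the PR's final text.

/**
 * Marks the given partition of the given stage as completed in every active
 * task-set attempt and kills any task attempts still running on that partition.
 */
def completeTasks(partitionId: Int, stageId: Int): Unit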
@@ -286,6 +286,29 @@ private[spark] class TaskSchedulerImpl(
}
}

/**
* SPARK-25250: Whenever any Result Task gets successfully completed, we simply mark the
* corresponding partition id as completed in all attempts for that particular stage. As a
* result, we do not see any Killed tasks due to TaskCommitDenied Exceptions showing up
* in the UI.
*/
override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(
Member:
Doesn't this logic overlap with killAllTaskAttempts? Should it reuse that logic? I understand it does something a little different, and I don't know this code well, but it seems like there are related but separate implementations of something similar here.

Author:
As far as I understand the code, killAllTaskAttempts kills all the running tasks for a particular stage, whereas markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts kills, across all stage attempts, only those running tasks that are working on a particular partition which has already been marked as completed by one of the previously running tasks for that partition. So the logic is different in the two cases, but we could modify the code to have a single method perform both of these tasks. Let me know what you think!

partitionId: Int, stageId: Int): Unit = {
taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
val index: Option[Int] = tsm.partitionToIndex.get(partitionId)
if (!index.isEmpty) {
Member:
Nit: it's more usual to match on the Option and use case Some(...), or use foreach. It avoids a couple of lines of code here. Same with the getOrElse above, I guess.

Author:
Makes sense, have updated the code.

tsm.markPartitionIdAsCompletedForTaskAttempt(index.get)
val taskInfoList = tsm.taskAttempts(index.get)
taskInfoList.foreach { taskInfo =>
Member:
Consider taskInfoList.filter(_.running).foreach or for (taskInfo <- taskInfoList if taskInfo.running).

Author:
Done

if (taskInfo.running) {
killTaskAttempt(taskInfo.taskId, false, "Corresponding Partition Id " + partitionId +
Member:
Nit: Id -> ID, and use string interpolation.

Author:
Done

" has been marked as Completed")
}
}
}
}
}
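Taken together, the three nits in this hunk (fold the Option with foreach, filter to running attempts, and use string interpolation with "ID") would turn the method into something like the sketch below. Member names mirror the diff above; this is a sketch of the suggested cleanup, not necessarily the code the author finally pushed.

// Sketch of the same loop after applying the review suggestions above.
override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(
    partitionId: Int, stageId: Int): Unit = {
  taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
    tsm.partitionToIndex.get(partitionId).foreach { index =>
      // Mark the partition done in this attempt, then kill any attempt still running it.
      tsm.markPartitionIdAsCompletedForTaskAttempt(index)
      tsm.taskAttempts(index).filter(_.running).foreach { taskInfo =>
        killTaskAttempt(taskInfo.taskId, interruptThread = false,
          s"Corresponding partition ID $partitionId has been marked as completed")
      }
    }
  }
}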

/**
* Called to indicate that all task attempts (including speculated tasks) associated with the
* given TaskSetManager have completed, so state associated with the TaskSetManager should be
@@ -1096,6 +1096,11 @@ private[spark] class TaskSetManager(
def executorAdded() {
recomputeLocality()
}

def markPartitionIdAsCompletedForTaskAttempt(index: Int): Unit = {
Member:
Also, I think we need to increase tasksSuccessful and mark this TaskSetManager as a zombie if its number of successful tasks has reached numTasks. You can refer to markPartitionCompleted() for details.

Author:
Have replied to your question in the comment below. Let me know your thoughts. Thank you.

successful(index) = true

Reviewer:
Should this method also make a call to TaskSetManager.maybeFinishTaskSet()?

Author:
Yes, have made the necessary changes. Thank you.

maybeFinishTaskSet()
}
}

private[spark] object TaskSetManager {
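If markPartitionIdAsCompletedForTaskAttempt were extended as the reviewer suggests — mirroring markPartitionCompleted() by also incrementing tasksSuccessful and turning the manager into a zombie once every task has succeeded — it might look roughly like the sketch below. The guard and bookkeeping here are an assumption based on that suggestion, not the PR's final code.

// Sketch only: bookkeeping added per the reviewer's markPartitionCompleted() analogy.
def markPartitionIdAsCompletedForTaskAttempt(index: Int): Unit = {
  if (!successful(index)) {
    tasksSuccessful += 1
    successful(index) = true
    if (tasksSuccessful == numTasks) {
      // No work left for this attempt; stop scheduling new tasks from it.
      isZombie = true
    }
  }
  maybeFinishTaskSet()
}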
@@ -160,6 +160,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
override def applicationAttemptId(): Option[String] = None
override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(
partitionId: Int, stageId: Int): Unit = {}
}

/** Length of time to wait while draining listener events. */
@@ -667,6 +669,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
override def applicationAttemptId(): Option[String] = None
override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(
partitionId: Int, stageId: Int): Unit = {}
}
val noKillScheduler = new DAGScheduler(
sc,
@@ -94,4 +94,6 @@ private class DummyTaskScheduler extends TaskScheduler {
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId,
executorMetrics: ExecutorMetrics): Boolean = true
override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(
partitionId: Int, stageId: Int): Unit = {}
}
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler

import java.nio.ByteBuffer
import java.util.HashSet

import scala.collection.mutable.HashMap
import scala.concurrent.duration._
@@ -39,6 +40,14 @@ class FakeSchedulerBackend extends SchedulerBackend {
def reviveOffers() {}
def defaultParallelism(): Int = 1
def maxNumConcurrentTasks(): Int = 0
val killedTaskIds: HashSet[Long] = new HashSet[Long]()
Member:
Nit: why not a Scala mutable Set?

Author:
Done

override def killTask(
taskId: Long,
Contributor:
Nit: 4-space indentation.

Author:
Done

executorId: String,
interruptThread: Boolean,
reason: String): Unit = {
killedTaskIds.add(taskId)
}
}
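With the two nits above applied (a Scala mutable set instead of java.util.HashSet, and 4-space indentation for the override's parameter list), the FakeSchedulerBackend addition might read like this sketch, mirroring the diff above rather than the PR's final code:

// Sketch of the fake backend change with the review nits applied.
import scala.collection.mutable

val killedTaskIds = mutable.HashSet.empty[Long]

override def killTask(
    taskId: Long,
    executorId: String,
    interruptThread: Boolean,
    reason: String): Unit = {
  // Record the kill so tests can assert on it.
  killedTaskIds += taskId
}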

class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach
Expand Down Expand Up @@ -1319,4 +1328,26 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, TaskKilled("test"))
assert(tsm.isZombie)
}
test("SPARK-25250 On successful completion of a task attempt on a partition id, kill other" +
Member:
Nit: blank line above, and please use blank lines in the body of the test to break it up into more readable chunks.

Author:
Done

Contributor:
This test isn't testing the important case that was missed before, and I think for now we don't want to kill tasks as part of this change. OTOH, we do need to take the "Completions in zombie tasksets update status of non-zombie taskset" test and move it to a test in DAGSchedulerSuite.

" running task attempts on that same partition") {
val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
val firstAttempt = FakeTask.createTaskSet(10, stageAttemptId = 0)
taskScheduler.submitTasks(firstAttempt)
val offersFirstAttempt = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
taskScheduler.resourceOffers(offersFirstAttempt)
val tsm0 = taskScheduler.taskSetManagerForAttempt(0, 0).get
val matchingTaskInfoFirstAttempt = tsm0.taskAttempts(0).head
tsm0.handleFailedTask(matchingTaskInfoFirstAttempt.taskId, TaskState.FAILED,
FetchFailed(null, 0, 0, 0, "fetch failed"))
val secondAttempt = FakeTask.createTaskSet(10, stageAttemptId = 1)
taskScheduler.submitTasks(secondAttempt)
val offersSecondAttempt = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
taskScheduler.resourceOffers(offersSecondAttempt)
taskScheduler.markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(2, 0)
val tsm1 = taskScheduler.taskSetManagerForAttempt(0, 1).get
val indexInTsm = tsm1.partitionToIndex(2)
val matchingTaskInfoSecondAttempt = tsm1.taskAttempts.flatten.filter(_.index == indexInTsm).head
assert(taskScheduler.backend.asInstanceOf[FakeSchedulerBackend].killedTaskIds.contains(
matchingTaskInfoSecondAttempt.taskId))
}
}