-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-25250][CORE] : Late zombie task completions handled correctly even before new taskset launched #22806
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
5ad6efd
8667c28
a73f619
5509165
ee5bc68
7677aec
67e1644
f395b65
f7102ca
6709fe1
5234e87
9efbc58
6abd52c
89373af
231c51b
fcfe9f5
7ce6f10
0610939
929fbf9
393f901
52e832a
afbac96
024ec53
d2b7044
d6ac4a9
e9b363b
b55dbb0
551f412
28017ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -133,6 +133,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi | |
| /** Stages for which the DAGScheduler has called TaskScheduler.cancelTasks(). */ | ||
| val cancelledStages = new HashSet[Int]() | ||
|
|
||
| val completedPartitions = new HashMap[Int, HashSet[Int]]() | ||
|
|
||
| val taskScheduler = new TaskScheduler() { | ||
| override def schedulingMode: SchedulingMode = SchedulingMode.FIFO | ||
| override def rootPool: Pool = new Pool("", schedulingMode, 0, 0) | ||
|
|
@@ -160,7 +162,15 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi | |
| override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} | ||
| override def workerRemoved(workerId: String, host: String, message: String): Unit = {} | ||
| override def applicationAttemptId(): Option[String] = None | ||
| override def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo): Unit = {} | ||
| // Since the method completeTasks in TaskSchedulerImpl.scala marks the partition complete | ||
| // for all stage attempts in the particular stage id, it does not need any info about | ||
| // stageAttemptId. Hence, completed partition ids are stored only for stage ids to mock | ||
| // the method implementation here. | ||
| override def completeTasks(partitionId: Int, stageId: Int, taskInfo: TaskInfo): Unit = { | ||
| val partitionIds = completedPartitions.getOrElseUpdate(stageId, new HashSet[Int]) | ||
| partitionIds.add(partitionId) | ||
| completedPartitions.put(stageId, partitionIds) | ||
| } | ||
| } | ||
|
|
||
| /** Length of time to wait while draining listener events. */ | ||
|
|
@@ -249,6 +259,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi | |
| cancelledStages.clear() | ||
| cacheLocations.clear() | ||
| results.clear() | ||
| completedPartitions.clear() | ||
| securityMgr = new SecurityManager(conf) | ||
| broadcastManager = new BroadcastManager(true, conf, securityMgr) | ||
| mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) { | ||
|
|
@@ -2851,6 +2862,40 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi | |
| } | ||
| } | ||
|
|
||
| test("SPARK-25250: Late zombie task completions handled correctly even before" + | ||
| " new taskset launched") { | ||
| val shuffleMapRdd = new MyRDD(sc, 4, Nil) | ||
| val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(4)) | ||
| val reduceRdd = new MyRDD(sc, 4, List(shuffleDep), tracker = mapOutputTracker) | ||
| submit(reduceRdd, Array(0, 1, 2, 3)) | ||
|
|
||
| completeShuffleMapStageSuccessfully(0, 0, numShufflePartitions = 4) | ||
|
|
||
| // Fail Stage 1 Attempt 0 with Fetch Failure | ||
| runEvent(makeCompletionEvent( | ||
| taskSets(1).tasks(0), | ||
| FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0, 0, "ignored"), | ||
| null)) | ||
|
|
||
| // this will trigger a resubmission of stage 0, since we've lost some of its | ||
| // map output, for the next iteration through the loop | ||
| scheduler.resubmitFailedStages() | ||
| completeShuffleMapStageSuccessfully(0, 1, numShufflePartitions = 4) | ||
|
|
||
| runEvent(makeCompletionEvent( | ||
| taskSets(1).tasks(3), Success, Nil, Nil)) | ||
| assert(completedPartitions.get(taskSets(3).stageId).get.contains( | ||
|
||
| taskSets(3).tasks(1).partitionId) == false, "Corresponding partition id for" + | ||
| " stage 1 attempt 1 is not complete yet") | ||
|
||
|
|
||
| // this will mark partition id 1 of stage 1 attempt 0 as complete. So we expect the status | ||
| // of that partition id to be reflected for stage 1 attempt 1 as well. | ||
| runEvent(makeCompletionEvent( | ||
| taskSets(1).tasks(1), Success, Nil, Nil)) | ||
| assert(completedPartitions.get(taskSets(3).stageId).get.contains( | ||
| taskSets(3).tasks(1).partitionId) == true) | ||
|
||
| } | ||
|
|
||
| /** | ||
| * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. | ||
| * Note that this checks only the host and not the executor ID. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need the last `put`; you're updating a mutable HashSet.

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done