From 641936f6ac61c21dbe20049e4745e61c2ed41ed9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?=
Date: Thu, 15 Jan 2015 11:52:10 +0800
Subject: [PATCH 01/17] Make sure mapStage.pendingTasks is empty when
 MapStage.isAvailable is true after the stage is retried
---
 .../org/apache/spark/scheduler/Task.scala     | 15 ++++++
 .../spark/scheduler/TaskSetManager.scala      |  5 +-
 .../spark/scheduler/DAGSchedulerSuite.scala   | 46 +++++++++++++++++++
 3 files changed, 64 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 76a19aeac467..d9ff87778283 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -136,6 +136,21 @@ private[spark] abstract class Task[T](
       taskThread.interrupt()
     }
   }
+
+  override def hashCode(): Int = {
+    val state = Seq(stageId, partitionId)
+    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+  }
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[Task[T]]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: Task[_] =>
+      (that canEqual this) &&
+        stageId == that.stageId &&
+        partitionId == that.partitionId
+    case _ => false
+  }
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 82455b0426a5..9c5d0f3fb6b6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -487,8 +487,9 @@ private[spark] class TaskSetManager(
         // a good proxy to task serialization time.
         // val timeTaken = clock.getTime() - startTime
         val taskName = s"task ${info.id} in stage ${taskSet.id}"
-        logInfo("Starting %s (TID %d, %s, %s, %d bytes)".format(
-          taskName, taskId, host, taskLocality, serializedTask.limit))
+        logInfo("Starting %s (TID %d, %s, %d, %s, %d bytes)".format(
+          taskName, taskId, host,
+          task.partitionId, taskLocality, serializedTask.limit))
 
         sched.dagScheduler.taskStarted(task, info)
         return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 86728cb2b62a..b5427fb817c6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -739,6 +739,52 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }
 
+  test("Make sure mapStage.pendingtasks is set() " +
+    "while MapStage.isAvailable is true while stage was retry ") {
+    val firstRDD = new MyRDD(sc, 6, Nil)
+    val firstShuffleDep = new ShuffleDependency(firstRDD, null)
+    val firstShuyffleId = firstShuffleDep.shuffleId
+    val shuffleMapRdd = new MyRDD(sc, 6, List(firstShuffleDep))
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostB", 1)),
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostC", 3)),
+      (Success, makeMapStatus("hostB", 4)),
+      (Success, makeMapStatus("hostB", 5)),
+      (Success, makeMapStatus("hostC", 6))
+    ))
+    complete(taskSets(1), Seq(
+      (Success, makeMapStatus("hostA", 1)),
+      (Success,
makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostA", 1))
+    ))
+    runEvent(ExecutorLost("exec-hostA"))
+    runEvent(CompletionEvent(taskSets(1).tasks(0), Resubmitted, null, null, null, null))
+    runEvent(CompletionEvent(taskSets(1).tasks(2), Resubmitted, null, null, null, null))
+    runEvent(CompletionEvent(taskSets(1).tasks(0),
+      FetchFailed(null, firstShuyffleId, -1, 0, "Fetch Mata data failed"),
+      null, null, null, null))
+    scheduler.resubmitFailedStages()
+    runEvent(CompletionEvent(taskSets(1).tasks(0), Success,
+      makeMapStatus("hostC", 1), null, null, null))
+    runEvent(CompletionEvent(taskSets(1).tasks(2), Success,
+      makeMapStatus("hostC", 1), null, null, null))
+    runEvent(CompletionEvent(taskSets(1).tasks(4), Success,
+      makeMapStatus("hostC", 1), null, null, null))
+    runEvent(CompletionEvent(taskSets(1).tasks(5), Success,
+      makeMapStatus("hostB", 2), null, null, null))
+    val stage = scheduler.stageIdToStage(taskSets(1).stageId)
+    assert(stage.attemptId == 2)
+    assert(stage.isAvailable)
+    assert(stage.pendingTasks.size == 0)
+  }
+
   /**
    * Makes sure that failures of stage used by multiple jobs are correctly handled.
    *

From 741ab4dbbb1be88d290a1d773103645a32615f34 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?=
Date: Mon, 26 Jan 2015 15:23:16 +0800
Subject: [PATCH 02/17] Add subclass canEqual so a ShuffleMapTask does not
 equal a ResultTask when stageId and partitionId are the same
---
 .../main/scala/org/apache/spark/scheduler/ResultTask.scala | 2 ++
 .../scala/org/apache/spark/scheduler/ShuffleMapTask.scala  | 2 ++
 core/src/main/scala/org/apache/spark/scheduler/Task.scala  | 5 ++---
 3 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 9c2606e278c5..72a2994d2ff5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -68,4 +68,6 @@ private[spark] class ResultTask[T, U](
   override def preferredLocations: Seq[TaskLocation] = preferredLocs
 
   override def toString: String = "ResultTask(" + stageId + ", " + partitionId + ")"
+
+  override def canEqual(other: Any): Boolean = other.isInstanceOf[ResultTask[T, U]]
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 14c8c0096148..13a7a9c23884 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -87,4 +87,6 @@ private[spark] class ShuffleMapTask(
   override def preferredLocations: Seq[TaskLocation] = preferredLocs
 
   override def toString: String = "ShuffleMapTask(%d, %d)".format(stageId, partitionId)
+
+  override def canEqual(other: Any): Boolean = other.isInstanceOf[ShuffleMapTask]
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index d9ff87778283..5071844fd4fc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -138,11 +138,10 @@ private[spark] abstract class Task[T](
   }
 
   override def hashCode(): Int = {
-    val state = Seq(stageId, partitionId)
-    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
+    31 * stageId.hashCode() + partitionId.hashCode()
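// ---------------------------------------------------------------------------
// Editorial aside, a minimal sketch and not part of the patch above: this is
// the standard Scala equals/hashCode/canEqual recipe that patches 01-02 apply
// to Task. The names SimpleTask, SimpleShuffleTask and SimpleResultTask are
// hypothetical stand-ins, not Spark classes.
// ---------------------------------------------------------------------------
abstract class SimpleTask(val stageId: Int, val partitionId: Int) {
  // Hash exactly the fields that participate in equality, so equal tasks
  // always share a hash code.
  override def hashCode(): Int = 31 * stageId.hashCode() + partitionId.hashCode()

  // Subclasses narrow canEqual so that two different task types never compare
  // equal, even when they share the same (stageId, partitionId).
  def canEqual(other: Any): Boolean = other.isInstanceOf[SimpleTask]

  override def equals(other: Any): Boolean = other match {
    case that: SimpleTask =>
      (that canEqual this) && stageId == that.stageId && partitionId == that.partitionId
    case _ => false
  }
}

class SimpleShuffleTask(s: Int, p: Int) extends SimpleTask(s, p) {
  override def canEqual(other: Any): Boolean = other.isInstanceOf[SimpleShuffleTask]
}

class SimpleResultTask(s: Int, p: Int) extends SimpleTask(s, p) {
  override def canEqual(other: Any): Boolean = other.isInstanceOf[SimpleResultTask]
}
// With the narrowed canEqual, new SimpleShuffleTask(0, 1) != new SimpleResultTask(0, 1),
// even though both hash to the same bucket.
// ---------------------------------------------------------------------------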
} - def canEqual(other: Any): Boolean = other.isInstanceOf[Task[T]] + def canEqual(other: Any): Boolean = other.isInstanceOf[Task[_]] override def equals(other: Any): Boolean = other match { case that: Task[_] => From 5ec8a8217832b5f27cf724a2b3f2ee41092c9a4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?= Date: Thu, 29 Jan 2015 17:16:48 +0800 Subject: [PATCH 03/17] Refine --- core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala | 2 +- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index 72a2994d2ff5..99986760478c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -69,5 +69,5 @@ private[spark] class ResultTask[T, U]( override def toString: String = "ResultTask(" + stageId + ", " + partitionId + ")" - override def canEqual(other: Any): Boolean = other.isInstanceOf[ResultTask[T, U]] + override def canEqual(other: Any): Boolean = other.isInstanceOf[ResultTask] } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index b5427fb817c6..2afa3b517044 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -743,10 +743,8 @@ class DAGSchedulerSuite "while MapStage.isAvailable is true while stage was retry ") { val firstRDD = new MyRDD(sc, 6, Nil) val firstShuffleDep = new ShuffleDependency(firstRDD, null) - val firstShuyffleId = firstShuffleDep.shuffleId val shuffleMapRdd = new MyRDD(sc, 6, List(firstShuffleDep)) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) - val shuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) submit(reduceRdd, Array(0, 1)) complete(taskSets(0), Seq( From 90447201931bccee87d60a42c4501d90993f10b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?= Date: Fri, 30 Jan 2015 11:21:14 +0800 Subject: [PATCH 04/17] add parameter type --- core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index 99986760478c..72a2994d2ff5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -69,5 +69,5 @@ private[spark] class ResultTask[T, U]( override def toString: String = "ResultTask(" + stageId + ", " + partitionId + ")" - override def canEqual(other: Any): Boolean = other.isInstanceOf[ResultTask] + override def canEqual(other: Any): Boolean = other.isInstanceOf[ResultTask[T, U]] } From 03db62475d0f0245563ee7d0148b6683c2a254d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?= Date: Fri, 30 Jan 2015 15:01:20 +0800 Subject: [PATCH 05/17] Refine --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2afa3b517044..d81340065adc 100644 --- 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -743,6 +743,7 @@ class DAGSchedulerSuite
     "while MapStage.isAvailable is true while stage was retry ") {
     val firstRDD = new MyRDD(sc, 6, Nil)
     val firstShuffleDep = new ShuffleDependency(firstRDD, null)
+    val firstShuyffleId = firstShuffleDep.shuffleId
     val shuffleMapRdd = new MyRDD(sc, 6, List(firstShuffleDep))
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))

From 8dfdc18e3f1d92930cdd6f818111d88dc4bbc358 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?=
Date: Fri, 27 Feb 2015 10:43:56 +0800
Subject: [PATCH 06/17] Refine log message to use string interpolation
---
 .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 9c5d0f3fb6b6..5b79cdaf94b0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -487,9 +487,8 @@ private[spark] class TaskSetManager(
         // a good proxy to task serialization time.
         // val timeTaken = clock.getTime() - startTime
         val taskName = s"task ${info.id} in stage ${taskSet.id}"
-        logInfo("Starting %s (TID %d, %s, %d, %s, %d bytes)".format(
-          taskName, taskId, host,
-          task.partitionId, taskLocality, serializedTask.limit))
+        logInfo(s"Starting $taskName (TID $taskId, $host, ${task.partitionId}," +
+          s"$taskLocality, ${serializedTask.limit} bytes)")
 
         sched.dagScheduler.taskStarted(task, info)
         return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,

From c286f7a3bb167817eae423a83901041476a35b4a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?=
Date: Fri, 27 Feb 2015 17:19:15 +0800
Subject: [PATCH 07/17] Refine to match the latest Spark
---
 .../spark/scheduler/DAGSchedulerSuite.scala   | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d81340065adc..aca7df2b0066 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -764,20 +764,22 @@ class DAGSchedulerSuite
       (Success, makeMapStatus("hostA", 1))
     ))
     runEvent(ExecutorLost("exec-hostA"))
-    runEvent(CompletionEvent(taskSets(1).tasks(0), Resubmitted, null, null, null, null))
-    runEvent(CompletionEvent(taskSets(1).tasks(2), Resubmitted, null, null, null, null))
+    runEvent(CompletionEvent(taskSets(1).tasks(0), Resubmitted,
+      null, null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(taskSets(1).tasks(2), Resubmitted,
+      null, null, createFakeTaskInfo(), null))
     runEvent(CompletionEvent(taskSets(1).tasks(0),
       FetchFailed(null, firstShuyffleId, -1, 0, "Fetch Mata data failed"),
-      null, null, null, null))
+      null, null, createFakeTaskInfo(), null))
     scheduler.resubmitFailedStages()
     runEvent(CompletionEvent(taskSets(1).tasks(0), Success,
-      makeMapStatus("hostC", 1), null, null, null))
+      makeMapStatus("hostC", 1), null, createFakeTaskInfo(), null))
     runEvent(CompletionEvent(taskSets(1).tasks(2), Success,
-      makeMapStatus("hostC", 1), null, null, null))
+
makeMapStatus("hostC", 1), null, createFakeTaskInfo(), null)) runEvent(CompletionEvent(taskSets(1).tasks(4), Success, - makeMapStatus("hostC", 1), null, null, null)) + makeMapStatus("hostC", 1), null, createFakeTaskInfo(), null)) runEvent(CompletionEvent(taskSets(1).tasks(5), Success, - makeMapStatus("hostB", 2), null, null, null)) + makeMapStatus("hostB", 2), null, createFakeTaskInfo(), null)) val stage = scheduler.stageIdToStage(taskSets(1).stageId) assert(stage.attemptId == 2) assert(stage.isAvailable) From dcf153309894f19271d5db0e069a2a6474e1a664 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?= Date: Thu, 18 Jun 2015 21:59:03 +0800 Subject: [PATCH 08/17] Refine solution and fix bug in code --- .../org/apache/spark/MapOutputTracker.scala | 2 +- .../apache/spark/scheduler/DAGScheduler.scala | 2 +- .../apache/spark/scheduler/ResultTask.scala | 2 - .../spark/scheduler/ShuffleMapTask.scala | 2 - .../org/apache/spark/scheduler/Task.scala | 3 - .../spark/scheduler/DAGSchedulerSuite.scala | 56 +++++++++---------- 6 files changed, 28 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 862ffe868f58..15e71719fd83 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -305,7 +305,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) if (mapStatuses.contains(shuffleId)) { val statuses = mapStatuses(shuffleId) - if (statuses.nonEmpty) { + if (statuses.nonEmpty && statuses.exists(_ != null)) { // HashMap to add up sizes of all blocks at the same location val locs = new HashMap[BlockManagerId, Long] var totalOutputSize = 0L diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 71a219a4f341..76f67b49a081 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1215,7 +1215,7 @@ class DAGScheduler( if (!env.blockManager.externalShuffleServiceEnabled || fetchFailed) { // TODO: This will be really slow if we keep accumulating shuffle map stages - for ((shuffleId, stage) <- shuffleToMapStage) { + for ((shuffleId, stage) <- shuffleToMapStage if !runningStages.contains(stage) ) { stage.removeOutputsOnExecutor(execId) val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index 72a2994d2ff5..9c2606e278c5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -68,6 +68,4 @@ private[spark] class ResultTask[T, U]( override def preferredLocations: Seq[TaskLocation] = preferredLocs override def toString: String = "ResultTask(" + stageId + ", " + partitionId + ")" - - override def canEqual(other: Any): Boolean = other.isInstanceOf[ResultTask[T, U]] } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 13a7a9c23884..14c8c0096148 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -87,6 +87,4 @@ private[spark] class ShuffleMapTask( override def preferredLocations: Seq[TaskLocation] = preferredLocs override def toString: String = "ShuffleMapTask(%d, %d)".format(stageId, partitionId) - - override def canEqual(other: Any): Boolean = other.isInstanceOf[ShuffleMapTask] } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 5071844fd4fc..ab7e435e19a7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -141,11 +141,8 @@ private[spark] abstract class Task[T]( 31 * stageId.hashCode() + partitionId.hashCode() } - def canEqual(other: Any): Boolean = other.isInstanceOf[Task[_]] - override def equals(other: Any): Boolean = other match { case that: Task[_] => - (that canEqual this) && stageId == that.stageId && partitionId == that.partitionId case _ => false diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index aca7df2b0066..3cf774c01691 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -739,51 +739,47 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } - test("Make sure mapStage.pendingtasks is set() " + - "while MapStage.isAvailable is true while stage was retry ") { - val firstRDD = new MyRDD(sc, 6, Nil) + test("run with ShuffleMapStage retry") { + val firstRDD = new MyRDD(sc, 3, Nil) val firstShuffleDep = new ShuffleDependency(firstRDD, null) - val firstShuyffleId = firstShuffleDep.shuffleId - val shuffleMapRdd = new MyRDD(sc, 6, List(firstShuffleDep)) + val firstShuffleId = firstShuffleDep.shuffleId + val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep)) val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) - val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) - submit(reduceRdd, Array(0, 1)) + val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) + submit(reduceRdd, Array(0)) + complete(taskSets(0), Seq( - (Success, makeMapStatus("hostB", 1)), - (Success, makeMapStatus("hostB", 2)), - (Success, makeMapStatus("hostC", 3)), - (Success, makeMapStatus("hostB", 4)), - (Success, makeMapStatus("hostB", 5)), - (Success, makeMapStatus("hostC", 6)) + (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.size)), + (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.size)), + (Success, makeMapStatus("hostC", shuffleMapRdd.partitions.size)) )) + complete(taskSets(1), Seq( - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostB", 2)), - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostB", 2)), - (Success, makeMapStatus("hostA", 1)) + (Success, makeMapStatus("hostA", reduceRdd.partitions.size)), + (Success, makeMapStatus("hostB", reduceRdd.partitions.size)) )) runEvent(ExecutorLost("exec-hostA")) + // Resubmit already succcessd in hostA task runEvent(CompletionEvent(taskSets(1).tasks(0), Resubmitted, null, null, createFakeTaskInfo(), null)) - runEvent(CompletionEvent(taskSets(1).tasks(2), Resubmitted, - null, null, createFakeTaskInfo(), null)) + + // Cause mapOutTracker remove hostA outputs for taskset(0). + // Task that resubmitted will fetch matadata failed. 
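// ---------------------------------------------------------------------------
// Editorial aside, a minimal sketch and not part of the patch above: why the
// MapOutputTracker guard in this patch checks statuses.exists(_ != null).
// When an executor is lost, its map outputs are nulled out rather than
// removed, so a non-empty status array can still hold no usable location.
// NullStatusSketch is a hypothetical stand-in; String stands in for MapStatus.
// ---------------------------------------------------------------------------
object NullStatusSketch {
  def hasUsableOutput(statuses: Array[String]): Boolean =
    statuses.nonEmpty && statuses.exists(_ != null)

  def main(args: Array[String]): Unit = {
    // After "hostA" is lost, every entry for its map outputs is null:
    // the array is non-empty, yet nothing can be fetched from it.
    val afterExecutorLost: Array[String] = Array(null, null, null)
    assert(!hasUsableOutput(afterExecutorLost))
  }
}
// ---------------------------------------------------------------------------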
runEvent(CompletionEvent(taskSets(1).tasks(0), - FetchFailed(null, firstShuyffleId, -1, 0, "Fetch Mata data failed"), + FetchFailed(null, firstShuffleId, -1, 0, "Fetch matadata failed"), null, null, createFakeTaskInfo(), null)) + //FetchFailed cause resubmit failed Stages. scheduler.resubmitFailedStages() + runEvent(CompletionEvent(taskSets(1).tasks(0), Success, - makeMapStatus("hostC", 1), null, createFakeTaskInfo(), null)) + makeMapStatus("hostC", reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) runEvent(CompletionEvent(taskSets(1).tasks(2), Success, - makeMapStatus("hostC", 1), null, createFakeTaskInfo(), null)) - runEvent(CompletionEvent(taskSets(1).tasks(4), Success, - makeMapStatus("hostC", 1), null, createFakeTaskInfo(), null)) - runEvent(CompletionEvent(taskSets(1).tasks(5), Success, - makeMapStatus("hostB", 2), null, createFakeTaskInfo(), null)) - val stage = scheduler.stageIdToStage(taskSets(1).stageId) - assert(stage.attemptId == 2) + makeMapStatus("hostC", reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) + + val stage = scheduler.stageIdToStage(taskSets(1).stageId).asInstanceOf[ShuffleMapStage] + assert(stage.attemptId === 2) assert(stage.isAvailable) - assert(stage.pendingTasks.size == 0) + assert(stage.pendingTasks.isEmpty) } /** From 27da8e7e4d76b36d38c8d01d7751bec51132bc0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?= Date: Thu, 18 Jun 2015 22:07:30 +0800 Subject: [PATCH 09/17] Refine codeStyle --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 3cf774c01691..cd0e6892d887 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -768,7 +768,7 @@ class DAGSchedulerSuite runEvent(CompletionEvent(taskSets(1).tasks(0), FetchFailed(null, firstShuffleId, -1, 0, "Fetch matadata failed"), null, null, createFakeTaskInfo(), null)) - //FetchFailed cause resubmit failed Stages. + // FetchFailed cause resubmit failed Stages. 
scheduler.resubmitFailedStages() runEvent(CompletionEvent(taskSets(1).tasks(0), Success, From bd5fec46125f362e4fc2c96327cc5a169f809dfa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?= Date: Fri, 19 Jun 2015 15:38:52 +0800 Subject: [PATCH 10/17] Refine Tests --- .../org/apache/spark/scheduler/DAGScheduler.scala | 8 +++++--- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 13 +++++++++++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 76f67b49a081..5d56a921dbd6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1215,10 +1215,12 @@ class DAGScheduler( if (!env.blockManager.externalShuffleServiceEnabled || fetchFailed) { // TODO: This will be really slow if we keep accumulating shuffle map stages - for ((shuffleId, stage) <- shuffleToMapStage if !runningStages.contains(stage) ) { + for ((shuffleId, stage) <- shuffleToMapStage) { stage.removeOutputsOnExecutor(execId) - val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray - mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true) + if (!runningStages.contains(stage)) { + val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray + mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true) + } } if (shuffleToMapStage.isEmpty) { mapOutputTracker.incrementEpoch() diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index cd0e6892d887..8bfbbaa9f74c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -699,6 +699,7 @@ class DAGSchedulerSuite runEvent(ExecutorLost("exec-hostA")) val newEpoch = mapOutputTracker.getEpoch assert(newEpoch > oldEpoch) + val taskSet = taskSets(0) // should be ignored for being too old runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", @@ -751,7 +752,7 @@ class DAGSchedulerSuite complete(taskSets(0), Seq( (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.size)), (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.size)), - (Success, makeMapStatus("hostC", shuffleMapRdd.partitions.size)) + (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.size)) )) complete(taskSets(1), Seq( @@ -766,11 +767,19 @@ class DAGSchedulerSuite // Cause mapOutTracker remove hostA outputs for taskset(0). // Task that resubmitted will fetch matadata failed. runEvent(CompletionEvent(taskSets(1).tasks(0), - FetchFailed(null, firstShuffleId, -1, 0, "Fetch matadata failed"), + FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"), null, null, createFakeTaskInfo(), null)) + // FetchFailed cause resubmit failed Stages. 
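// ---------------------------------------------------------------------------
// Editorial aside, a minimal sketch and not part of the patch above: the shape
// of the executor-loss handling this patch moves to in DAGScheduler. Outputs
// on the lost executor are always cleared, but map outputs are re-registered
// with the tracker only for stages that are not currently running, so a
// running stage's partially-null output list is not published. FakeStage and
// ExecutorLostSketch are hypothetical stand-ins.
// ---------------------------------------------------------------------------
object ExecutorLostSketch {
  case class FakeStage(id: Int) {
    def removeOutputsOnExecutor(execId: String): Unit = ()
  }

  def main(args: Array[String]): Unit = {
    val runningStages = Set(FakeStage(1))
    val shuffleToMapStage = Map(0 -> FakeStage(0), 1 -> FakeStage(1))
    for ((shuffleId, stage) <- shuffleToMapStage) {
      stage.removeOutputsOnExecutor("exec-hostA")
      if (!runningStages.contains(stage)) {
        // The real code computes the surviving locations here and calls
        // mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true).
        println(s"re-registering map outputs for shuffle $shuffleId")
      }
    }
  }
}
// ---------------------------------------------------------------------------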
scheduler.resubmitFailedStages() + runEvent(CompletionEvent(taskSets(0).tasks(2), Success, + makeMapStatus("hostC", shuffleMapRdd.partitions.size), null, createFakeTaskInfo(), null)) + + val thrown = intercept[Exception] { mapOutputTracker.getServerStatuses(0, 2) } + // Assert not throw MetadataFetchFailedException: Missing an output location for shuffle 0 + assert(thrown == null) + runEvent(CompletionEvent(taskSets(1).tasks(0), Success, makeMapStatus("hostC", reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) runEvent(CompletionEvent(taskSets(1).tasks(2), Success, From 4dbe4d32c9364d8c11c417e0e91688ee20160dab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?= Date: Fri, 19 Jun 2015 15:47:35 +0800 Subject: [PATCH 11/17] Refine assert exception --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 8bfbbaa9f74c..83f022cadc6e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import org.apache.spark.shuffle.MetadataFetchFailedException + import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map} import scala.language.reflectiveCalls import scala.util.control.NonFatal @@ -776,9 +778,11 @@ class DAGSchedulerSuite runEvent(CompletionEvent(taskSets(0).tasks(2), Success, makeMapStatus("hostC", shuffleMapRdd.partitions.size), null, createFakeTaskInfo(), null)) - val thrown = intercept[Exception] { mapOutputTracker.getServerStatuses(0, 2) } - // Assert not throw MetadataFetchFailedException: Missing an output location for shuffle 0 - assert(thrown == null) + try { + mapOutputTracker.getServerStatuses(0, 2) + } catch { + case e: MetadataFetchFailedException => fail("Should not throw metadataFetchFailedException") + } runEvent(CompletionEvent(taskSets(1).tasks(0), Success, makeMapStatus("hostC", reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) From 9dfff638ba76883404fb8166622e86089c646b5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?= Date: Fri, 19 Jun 2015 16:34:39 +0800 Subject: [PATCH 12/17] Refine the test 'ignore late map task completions' --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 83f022cadc6e..c42e6996bc20 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -697,10 +697,7 @@ class DAGSchedulerSuite val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) submit(reduceRdd, Array(0, 1)) // pretend we were told hostA went away - val oldEpoch = mapOutputTracker.getEpoch runEvent(ExecutorLost("exec-hostA")) - val newEpoch = mapOutputTracker.getEpoch - assert(newEpoch > oldEpoch) val taskSet = taskSets(0) // should be ignored for being too old @@ -712,8 +709,8 @@ class DAGSchedulerSuite // should be ignored for being too old runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) - // should work because it's 
a new epoch - taskSet.tasks(1).epoch = newEpoch + // should work because the host is re-add + runEvent(ExecutorAdded("exec-hostA", "hostA")) runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === From e1e0b66abc7430e88b8a58ec0c81a8d500bf6caa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?= Date: Fri, 3 Jul 2015 15:12:27 +0800 Subject: [PATCH 13/17] Refine the testcase --- .../spark/scheduler/DAGSchedulerSuite.scala | 81 +++++++++++++------ 1 file changed, 55 insertions(+), 26 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index c42e6996bc20..5781283969aa 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -748,48 +748,77 @@ class DAGSchedulerSuite val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) submit(reduceRdd, Array(0)) + // things start out smoothly, stage 0 completes with no issues complete(taskSets(0), Seq( (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.size)), (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.size)), (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.size)) )) - complete(taskSets(1), Seq( - (Success, makeMapStatus("hostA", reduceRdd.partitions.size)), - (Success, makeMapStatus("hostB", reduceRdd.partitions.size)) - )) + // then one executor dies, and a task fails in stage 1 runEvent(ExecutorLost("exec-hostA")) - // Resubmit already succcessd in hostA task - runEvent(CompletionEvent(taskSets(1).tasks(0), Resubmitted, - null, null, createFakeTaskInfo(), null)) - - // Cause mapOutTracker remove hostA outputs for taskset(0). - // Task that resubmitted will fetch matadata failed. runEvent(CompletionEvent(taskSets(1).tasks(0), FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"), null, null, createFakeTaskInfo(), null)) - // FetchFailed cause resubmit failed Stages. 
- scheduler.resubmitFailedStages() - - runEvent(CompletionEvent(taskSets(0).tasks(2), Success, + // so we resubmit stage 0, which completes happily + Thread.sleep(1000) + val stage0Resubmit = taskSets(2) + assert(stage0Resubmit.stageId == 0) + assert(stage0Resubmit.attempt === 1) + val task = stage0Resubmit.tasks(0) + assert(task.partitionId === 2) + runEvent(CompletionEvent(task, Success, makeMapStatus("hostC", shuffleMapRdd.partitions.size), null, createFakeTaskInfo(), null)) - try { - mapOutputTracker.getServerStatuses(0, 2) - } catch { - case e: MetadataFetchFailedException => fail("Should not throw metadataFetchFailedException") - } - - runEvent(CompletionEvent(taskSets(1).tasks(0), Success, + // now here is where things get tricky : we will now have a task set representing + // the second attempt for stage 1, but we *also* have some tasks for the first attempt for + // stage 1 still going + val stage1Resubmit = taskSets(3) + assert(stage1Resubmit.stageId == 1) + assert(stage1Resubmit.attempt === 1) + assert(stage1Resubmit.tasks.length === 3) + + // we'll have some tasks finish from the first attempt, and some finish from the second attempt, + // so that we actually have all stage outputs, though no attempt has completed all its + // tasks + runEvent(CompletionEvent(taskSets(3).tasks(0), Success, makeMapStatus("hostC", reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) - runEvent(CompletionEvent(taskSets(1).tasks(2), Success, + runEvent(CompletionEvent(taskSets(3).tasks(1), Success, makeMapStatus("hostC", reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) + // late task finish from the first attempt + runEvent(CompletionEvent(taskSets(1).tasks(2), Success, + makeMapStatus("hostB", reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) + + // What should happen now is that we submit stage 2. However, we might not see an error + // b/c of DAGScheduler's error handling (it tends to swallow errors and just log them). But + // we can check some conditions. + // Note that the really important thing here is not so much that we submit stage 2 *immediately* + // but that we don't end up with some error from these interleaved completions. 
It would also
+    // be OK (though sub-optimal) if stage 2 simply waited until the resubmission of stage 1 had
+    // all its tasks complete
+
+    // check that we have all the map output for stage 0 (it should have been there even before
+    // the last round of completions from stage 1, but just to double check it hasn't been messed
+    // up)
+    (0 until 3).foreach { reduceIdx =>
+      val arr = mapOutputTracker.getServerStatuses(0, reduceIdx)
+      assert(arr != null)
+      assert(arr.nonEmpty)
+    }
+
+    // and check we have all the map output for stage 1
+    (0 until 1).foreach { reduceIdx =>
+      val arr = mapOutputTracker.getServerStatuses(1,reduceIdx)
+      assert(arr != null)
+      assert(arr.nonEmpty)
+    }
+
+    // and check that stage 2 has been submitted
+    assert(taskSets.size == 5)
+    val stage2TaskSet = taskSets(4)
+    assert(stage2TaskSet.stageId == 2)
+    assert(stage2TaskSet.attempt == 0)
   }
 
   /**

From 23792509f2061012495035717071c5c196303343 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?=
Date: Fri, 3 Jul 2015 20:56:44 +0800
Subject: [PATCH 14/17] Refine Scala style
---
 .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 5781283969aa..9e66d6810192 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -809,7 +809,7 @@ class DAGSchedulerSuite
 
     // and check we have all the map output for stage 1
     (0 until 1).foreach { reduceIdx =>
-      val arr = mapOutputTracker.getServerStatuses(1,reduceIdx)
+      val arr = mapOutputTracker.getServerStatuses(1, reduceIdx)
       assert(arr != null)
       assert(arr.nonEmpty)
     }

From 314873af23f391254dc89cf4e233d2571f3358d4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?=
Date: Tue, 21 Jul 2015 11:25:24 +0800
Subject: [PATCH 15/17] Track only the partitionId instead of the Task
---
 .../org/apache/spark/MapOutputTracker.scala   |  2 +-
 .../apache/spark/scheduler/DAGScheduler.scala | 18 ++++++++----------
 .../org/apache/spark/scheduler/Stage.scala    |  2 +-
 .../org/apache/spark/scheduler/Task.scala     | 11 -----------
 .../spark/scheduler/DAGSchedulerSuite.scala   |  7 +++++--
 5 files changed, 15 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 15e71719fd83..862ffe868f58 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -305,7 +305,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
 
     if (mapStatuses.contains(shuffleId)) {
       val statuses = mapStatuses(shuffleId)
-      if (statuses.nonEmpty && statuses.exists(_ != null)) {
+      if (statuses.nonEmpty) {
         // HashMap to add up sizes of all blocks at the same location
         val locs = new HashMap[BlockManagerId, Long]
         var totalOutputSize = 0L
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5d56a921dbd6..85b6893245a2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -855,7 +855,7 @@ class DAGScheduler( private def submitMissingTasks(stage: Stage, jobId: Int) { logDebug("submitMissingTasks(" + stage + ")") // Get our pending tasks and remember them in our pendingTasks entry - stage.pendingTasks.clear() + stage.pendingPartitions.clear() // First figure out the indexes of partition ids to compute. val partitionsToCompute: Seq[Int] = { @@ -938,8 +938,8 @@ class DAGScheduler( if (tasks.size > 0) { logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")") - stage.pendingTasks ++= tasks - logDebug("New pending tasks: " + stage.pendingTasks) + stage.pendingPartitions ++= tasks.map(_.partitionId) + logDebug("New pending partitions: " + stage.pendingPartitions) taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties)) stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) @@ -1027,7 +1027,7 @@ class DAGScheduler( case Success => listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType, event.reason, event.taskInfo, event.taskMetrics)) - stage.pendingTasks -= task + stage.pendingPartitions -= task.partitionId task match { case rt: ResultTask[_, _] => // Cast to ResultStage here because it's part of the ResultTask @@ -1073,7 +1073,7 @@ class DAGScheduler( shuffleStage.addOutputLoc(smt.partitionId, status) } - if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) { + if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) { markStageAsFinished(shuffleStage) logInfo("looking for newly runnable stages") logInfo("running: " + runningStages) @@ -1126,7 +1126,7 @@ class DAGScheduler( case Resubmitted => logInfo("Resubmitted " + task + ", so marking it as still running") - stage.pendingTasks += task + stage.pendingPartitions += task.partitionId case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) => val failedStage = stageIdToStage(task.stageId) @@ -1217,10 +1217,8 @@ class DAGScheduler( // TODO: This will be really slow if we keep accumulating shuffle map stages for ((shuffleId, stage) <- shuffleToMapStage) { stage.removeOutputsOnExecutor(execId) - if (!runningStages.contains(stage)) { - val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray - mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true) - } + val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray + mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true) } if (shuffleToMapStage.isEmpty) { mapOutputTracker.incrementEpoch() diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index b86724de2cb7..f1c37fd82cfc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -60,7 +60,7 @@ private[spark] abstract class Stage( /** Set of jobs that this stage belongs to. */ val jobIds = new HashSet[Int] - var pendingTasks = new HashSet[Task[_]] + var pendingPartitions = new HashSet[Int] /** The ID to use for the next new attempt for this stage. 
*/ private var nextAttemptId: Int = 0 diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index ab7e435e19a7..76a19aeac467 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -136,17 +136,6 @@ private[spark] abstract class Task[T]( taskThread.interrupt() } } - - override def hashCode(): Int = { - 31 * stageId.hashCode() + partitionId.hashCode() - } - - override def equals(other: Any): Boolean = other match { - case that: Task[_] => - stageId == that.stageId && - partitionId == that.partitionId - case _ => false - } } /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9e66d6810192..8170220cd5b5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -697,7 +697,10 @@ class DAGSchedulerSuite val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) submit(reduceRdd, Array(0, 1)) // pretend we were told hostA went away + val oldEpoch = mapOutputTracker.getEpoch runEvent(ExecutorLost("exec-hostA")) + val newEpoch = mapOutputTracker.getEpoch + assert(newEpoch > oldEpoch) val taskSet = taskSets(0) // should be ignored for being too old @@ -709,8 +712,8 @@ class DAGSchedulerSuite // should be ignored for being too old runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) - // should work because the host is re-add - runEvent(ExecutorAdded("exec-hostA", "hostA")) + // should work because it's a new epoch + taskSet.tasks(1).epoch = newEpoch runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", reduceRdd.partitions.size), null, createFakeTaskInfo(), null)) assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === From 7ae128e2f2888a5840be785da34cc20902ba8b5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?= Date: Tue, 21 Jul 2015 11:31:15 +0800 Subject: [PATCH 16/17] Refine test name --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 8170220cd5b5..2fa9bed79b98 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -742,7 +742,7 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } - test("run with ShuffleMapStage retry") { + test("verify not submit next stage while not have registered mapStatus") { val firstRDD = new MyRDD(sc, 3, Nil) val firstShuffleDep = new ShuffleDependency(firstRDD, null) val firstShuffleId = firstShuffleDep.shuffleId From b2df3fd330325a1bf3dc5782e973506202af92f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?hushan=5B=E8=83=A1=E7=8F=8A=5D?= Date: Tue, 21 Jul 2015 20:23:33 +0800 Subject: [PATCH 17/17] refine suite code --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2fa9bed79b98..f389ce869dec 100644 --- 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -768,7 +768,7 @@ class DAGSchedulerSuite Thread.sleep(1000) val stage0Resubmit = taskSets(2) assert(stage0Resubmit.stageId == 0) - assert(stage0Resubmit.attempt === 1) + assert(stage0Resubmit.stageAttemptId === 1) val task = stage0Resubmit.tasks(0) assert(task.partitionId === 2) runEvent(CompletionEvent(task, Success, @@ -779,7 +779,7 @@ class DAGSchedulerSuite // stage 1 still going val stage1Resubmit = taskSets(3) assert(stage1Resubmit.stageId == 1) - assert(stage1Resubmit.attempt === 1) + assert(stage1Resubmit.stageAttemptId === 1) assert(stage1Resubmit.tasks.length === 3) // we'll have some tasks finish from the first attempt, and some finish from the second attempt, @@ -821,7 +821,7 @@ class DAGSchedulerSuite assert(taskSets.size == 5) val stage2TaskSet = taskSets(4) assert(stage2TaskSet.stageId == 2) - assert(stage2TaskSet.attempt == 0) + assert(stage2TaskSet.stageAttemptId == 0) } /** @@ -834,7 +834,7 @@ class DAGSchedulerSuite * | \ | * | \ | * | \ | - * reduceRdd1 reduceRdd2 + * reduceRdd1 reduceRddi2 * * We start both shuffleMapRdds and then fail shuffleMapRdd1. As a result, the job listeners for * reduceRdd1 and reduceRdd2 should both be informed that the job failed. shuffleMapRDD2 should