
Commit 1127ca1

address comments of zsxwing
1 parent d92adfc commit 1127ca1

2 files changed: +12 -26 lines changed


core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 2 additions & 1 deletion
@@ -1235,13 +1235,13 @@ class DAGScheduler(
       case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
         val failedStage = stageIdToStage(task.stageId)
         val mapStage = shuffleIdToMapStage(shuffleId)
-        var abortedStage = false

         if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
           logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
             s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
             s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
         } else {
+          var abortedStage = false
           // It is likely that we receive multiple FetchFailed for a single stage (because we have
           // multiple tasks running concurrently on different executors). In that case, it is
           // possible the fetch failure has already been handled by the scheduler.
@@ -1257,6 +1257,7 @@ class DAGScheduler(
           if (disallowStageRetryForTest) {
             abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
               None)
+            abortedStage = true
           } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
             abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
               s"has failed the maximum allowable number of " +

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 10 additions & 25 deletions
@@ -18,12 +18,9 @@
 package org.apache.spark.scheduler

 import java.util.Properties
-import java.util.concurrent.Executors

 import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
-import scala.concurrent.{ExecutionContext, Future}
-import scala.concurrent.duration.DurationConversions
 import scala.language.reflectiveCalls
 import scala.util.control.NonFatal

@@ -37,7 +34,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.shuffle.MetadataFetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
-import org.apache.spark.util._
+import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}

 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
   extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -2110,12 +2107,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   }

   test("The failed stage never resubmitted due to abort stage in another thread") {
-    implicit val executorContext = ExecutionContext
-      .fromExecutorService(Executors.newFixedThreadPool(5))
-    val duration = 60.seconds
-
-    val f1 = Future {
-      try {
+    failAfter(60.seconds) {
+      val e = intercept[SparkException] {
         val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
         val shuffleHandle =
           rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
@@ -2125,14 +2118,14 @@
           BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
         case (x, _) => x
       }.count()
-      } catch {
-        case e: Throwable =>
-          logInfo("expected abort stage1: " + e.getMessage)
       }
+      assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
     }
-    ThreadUtils.awaitResult(f1, duration)
-    val f2 = Future {
-      try {
+
+    // The following job that fails due to fetching failure will hang without
+    // the fix for SPARK-17644
+    failAfter(60.seconds) {
+      val e = intercept[SparkException] {
         val rdd2 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
         val shuffleHandle =
           rdd2.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
@@ -2142,17 +2135,9 @@
           BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
         case (x, _) => x
       }.count()
-      } catch {
-        case e: Throwable =>
-          logInfo("expected abort stage2: " + e.getMessage)
       }
+      assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
     }
-    try {
-      ThreadUtils.awaitResult(f2, duration)
-    } catch {
-      case e: Throwable => fail("The failed stage never resubmitted")
-    }
-    executorContext.shutdown()
   }

   /**
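The test rewrite swaps a hand-rolled thread pool, Future, and try/catch for ScalaTest's failAfter and intercept, which both bound the runtime and assert on the thrown exception. A self-contained sketch of the pattern, assuming the ScalaTest 2.x-era Timeouts trait (renamed TimeLimits in later ScalaTest releases); the suite and exception here are illustrative, not the real ones:

import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._

class TimeoutPatternSuite extends FunSuite with Timeouts {
  test("a failing job should fail fast instead of hanging") {
    // failAfter fails the test if the body does not finish in time,
    // which is how the suite would detect the pre-fix hang.
    failAfter(60.seconds) {
      // intercept returns the thrown exception so its message can be checked.
      val e = intercept[RuntimeException] {
        throw new RuntimeException("wrapped: FetchFailedException")
      }
      assert(e.getMessage.contains("FetchFailedException"))
    }
  }
}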
