@@ -1235,6 +1235,7 @@ class DAGScheduler(
case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleIdToMapStage(shuffleId)
var abortedStage = false
Member

Could you move this line just above if (disallowStageRetryForTest) { since it's only used in that scope.
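
A rough sketch of the suggested placement (the rest of the FetchFailed handler is elided; the abortStage message in the test-config branch is quoted from the surrounding DAGScheduler code, not from this diff):

var abortedStage = false   // declared where it is used, rather than at the top of the handler
if (disallowStageRetryForTest) {
  abortStage(failedStage, "Fetch failure will not retry stage due to testing config", None)
} else if (/* ...the branches shown in the diff above... */) {
  // ...
}
if (!abortedStage) {
  failedStages += failedStage
  failedStages += mapStage
}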


if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
@@ -1261,6 +1262,7 @@ class DAGScheduler(
s"has failed the maximum allowable number of " +
s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
s"Most recent failure reason: ${failureMessage}", None)
abortedStage = true
Member

There is another abortStage call in the if (disallowStageRetryForTest) { branch.
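
For reference, a paraphrase of that branch (it lives in the surrounding DAGScheduler code, not in this diff); if aborts are tracked with a flag, that path would presumably need to set it as well:

if (disallowStageRetryForTest) {
  abortStage(failedStage, "Fetch failure will not retry stage due to testing config", None)
  // abortedStage = true would be needed here too
}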

} else if (failedStages.isEmpty) {
Contributor

Instead of having the abortedStage variable, how about re-writing the "else if" statement to be:

else {
  if (failedStages.isEmpty) {
    ... stuff currently in else-if ...
  }
  failedStages += failedStage
  failedStages += mapStage
}

That eliminates the confusion of multiple abortStage variables, as @zsxwing pointed out, and also makes the relationship between (i) adding the stage to failed stages and (ii) scheduling the Resubmit event more clear.
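
A hedged sketch of how the handler's bookkeeping could read after that restructuring, folding in the abort and resubmit code from the diff above (the second condition is paraphrased; surrounding branches are elided):

if (disallowStageRetryForTest) {
  abortStage(failedStage, "Fetch failure will not retry stage due to testing config", None)
} else if (/* failedStage has hit Stage.MAX_CONSECUTIVE_FETCH_FAILURES */) {
  abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
    s"has failed the maximum allowable number of " +
    s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
    s"Most recent failure reason: ${failureMessage}", None)
} else {
  if (failedStages.isEmpty) {
    // Only schedule a ResubmitFailedStages event when failedStages was empty;
    // otherwise one is already pending.
    messageScheduler.schedule(new Runnable {
      override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
    }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
  }
  failedStages += failedStage
  failedStages += mapStage
}
// aborted stages never land in failedStages, so the abortedStage flag goes away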

Contributor Author

Makes sense to me, updated.

// Don't schedule an event to resubmit failed stages if failed isn't empty, because
// in that case the event will already have been scheduled.
@@ -1271,8 +1273,10 @@ class DAGScheduler(
override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
}, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
}
failedStages += failedStage
failedStages += mapStage
if (!abortedStage) {
failedStages += failedStage
failedStages += mapStage
}
// Mark the map whose fetch failed as broken in the map stage
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
@@ -18,9 +18,12 @@
package org.apache.spark.scheduler

import java.util.Properties
import java.util.concurrent.Executors

import scala.annotation.meta.param
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.DurationConversions
import scala.language.reflectiveCalls
import scala.util.control.NonFatal

@@ -31,9 +34,10 @@ import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.shuffle.FetchFailedException
Contributor

nit: can you group this with the next import (so import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException})?

import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}
import org.apache.spark.util._

class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -2105,6 +2109,52 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC))
}

test("The failed stage never resubmitted due to abort stage in another thread") {
Contributor
@squito Sep 27, 2016

A couple of changes for this test:

(a) Earlier there was already some discussion around naming, and the PR title has been updated; this test should be renamed as well, since multiple threads really have nothing to do with it.
(b) I'd prefer that tests are named to indicate the positive behavior we want to verify. So with the above, I'd suggest a name like "After one stage is aborted for too many failed attempts, subsequent stages still behave correctly on fetch failures".
(c) Duplicated code can be cleaned up (at first, when I read the code, I was looking for differences between the two calls; even though it's only one copy-paste, the intent is a lot clearer if it appears just once).

(d) I'd think it would be nice to also include a job which succeeds after a fetch failure at the end (3 jobs total). Unfortunately this is a bit of a pain to do in a test right now since you don't have access to stageAttemptId, but you can do it with something like this:

...
rdd1.map {
        case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) =>
          throw new FetchFailedException(
            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
...

with helper

object FailThisAttempt extends Logging {
  val _fail = new AtomicBoolean(true)
}
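
Putting (d) together with the existing test body, one way that third job could look; this assumes the surrounding DAGSchedulerSuite context (sc, the FetchFailedException and BlockManagerId imports, and the FailThisAttempt helper above), and rdd3/shuffleHandle3 are illustrative names:

val rdd3 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
val shuffleHandle3 =
  rdd3.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
rdd3.map {
  // fail only on the first attempt; the resubmitted attempt then succeeds
  case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) =>
    throw new FetchFailedException(
      BlockManagerId("1", "1", 1), shuffleHandle3.shuffleId, 0, 0, "test")
  case (x, _) => x
}.count()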

implicit val executorContext = ExecutionContext
.fromExecutorService(Executors.newFixedThreadPool(5))
val duration = 60.seconds

val f1 = Future {
Member

You can just use failAfter(60.seconds) { ... } to get rid of ExecutionContext and Future.

try {
val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
val shuffleHandle =
rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
rdd1.map {
case (x, _) if (x == 1) =>
throw new FetchFailedException(
BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
case (x, _) => x
}.count()
} catch {
case e: Throwable =>
Member

Don't catch Throwable; it's better to check the exception like this:

    failAfter(60.seconds) {
      val e = intercept[SparkException] {
        ...
      }
      assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
    }

logInfo("expected abort stage1: " + e.getMessage)
}
}
ThreadUtils.awaitResult(f1, duration)
val f2 = Future {
Member

Could you add a comment to explain why two identical jobs are needed here? It took me a while to figure out. E.g.,

The following job that fails due to fetching failure will hang without the fix for SPARK-17644

try {
val rdd2 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
val shuffleHandle =
rdd2.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
rdd2.map {
case (x, _) if (x == 1) =>
throw new FetchFailedException(
BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
case (x, _) => x
}.count()
} catch {
case e: Throwable =>
logInfo("expected abort stage2: " + e.getMessage)
}
}
try {
ThreadUtils.awaitResult(f2, duration)
} catch {
case e: Throwable => fail("The failed stage never resubmitted")
}
executorContext.shutdown()
}

/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.