test case
scwf committed Sep 23, 2016
commit d02cf93586d40035f6bb1f5b635917fe41bf6e99
@@ -2112,6 +2112,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
test("The failed stage never resubmitted due to abort stage in another thread") {
@squito (Contributor), Sep 27, 2016:
A couple of changes for this test:

(a) There was already some discussion around naming and the PR title has been updated; this test should be renamed as well, since multiple threads really have nothing to do with it.
(b) I'd prefer if tests are named to indicate the positive behavior that we want to verify. So with the above, I'd suggest a name like "After one stage is aborted for too many failed attempts, subsequent stages still behave correctly on fetch failures".
(c) The duplicated code can be cleaned up (at first when I read the code, I was looking for differences between the two calls; although it's only one copy-paste, the intent is a lot clearer if it appears just once).

(d) I think it would also be nice to include a job which succeeds after a fetch failure at the end (3 jobs total). Unfortunately this is a bit of a pain to do in a test right now since you don't have access to stageAttemptId, but you can do it with something like this:

...
rdd1.map {
  case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) =>
    throw new FetchFailedException(
      BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
...

with this helper:

import java.util.concurrent.atomic.AtomicBoolean

object FailThisAttempt extends Logging {
  val _fail = new AtomicBoolean(true)
}
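
Putting (d) together, a rough sketch of how that third job could look (illustration only: rdd3 and shuffleHandle3 are hypothetical names following the pattern of the existing jobs, and the ShuffleDependency cast is assumed to match how the test obtains its shuffle handle):

// Sketch: a job that hits exactly one fetch failure and then succeeds, because
// FailThisAttempt._fail flips to false after the first throw, so the resubmitted
// stage runs clean and count() completes.
val rdd3 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
val shuffleHandle3 =
  rdd3.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
rdd3.map {
  case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) =>
    throw new FetchFailedException(
      BlockManagerId("1", "1", 1), shuffleHandle3.shuffleId, 0, 0, "test")
  case (x, _) => x
}.count()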

implicit val executorContext = ExecutionContext
.fromExecutorService(Executors.newFixedThreadPool(5))
val duration = 60.seconds

val f1 = Future {
Member:
You can just use failAfter(60.seconds) { ... } to get rid of ExecutionContext and Future.
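
For instance, a minimal sketch of that shape (assuming the suite mixes in ScalaTest's time-limit trait so failAfter is available, reusing rdd1 and shuffleHandle as the diff defines them, and using intercept only as one way to assert the expected job failure):

// Sketch: run the first job inline under a time limit instead of wrapping it in a
// Future and calling ThreadUtils.awaitResult; the aborted stage surfaces as a
// SparkException once the scheduler gives up on the repeated fetch failures.
failAfter(60.seconds) {
  intercept[SparkException] {
    rdd1.map {
      case (x, _) if x == 1 =>
        throw new FetchFailedException(
          BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
      case (x, _) => x
    }.count()
  }
}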

try {
val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
@@ -2125,10 +2127,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
}.count()
} catch {
case e: Throwable =>
Member:
Don't catch Throwable; it's better to check the exception like this:

    failAfter(60.seconds) {
      val e = intercept[SparkException] {
        ...
      }
      assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
    }

logInfo("expected abort stage: " + e.getMessage)
logInfo("expected abort stage1: " + e.getMessage)
}
}
Thread.sleep(10000)
ThreadUtils.awaitResult(f1, duration)
val f2 = Future {
Member:
Could you add a comment to explain why two identical jobs are needed here? It took me a while to figure out. E.g.,

The following job, which fails due to a fetch failure, will hang without the fix for SPARK-17644.
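
For example, something along these lines right above the second job (the wording is only a suggestion):

// Note: the job below, which fails due to a fetch failure, would hang without
// the fix for SPARK-17644, because the failed stage would never be resubmitted.
val rdd2 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()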

try {
val rdd2 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
@@ -2142,11 +2144,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
}.count()
} catch {
case e: Throwable =>
println("expected abort stage2: " + e.getMessage)
logInfo("expected abort stage2: " + e.getMessage)
}
}

val duration = 60.seconds
ThreadUtils.awaitResult(f2, duration)
}
