improve test
scwf committed Sep 28, 2016
commit f91d86f92a7070b0e8ed63773ecf1020975bc2fb
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler

import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean

import scala.annotation.meta.param
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -2106,18 +2107,37 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC))
}

test("The failed stage never resubmitted due to abort stage in another thread") {
test("After one stage is aborted for too many failed attempts, subsequent stages" +
"still behave correctly on fetch failures") {
Review comment (Contributor): Can you add the JIRA number here? That helps with tracking tests in the future. So something like "[SPARK-17644] After one stage is aborted..."
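
If that suggestion were taken, the test declaration might read along these lines (an illustrative sketch, not the author's final wording):

test("[SPARK-17644] After one stage is aborted for too many failed attempts, " +
  "subsequent stages still behave correctly on fetch failures") {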

def fetchFailJob: Unit = {
Review comment (Contributor): To make this a little more descriptive / easy to read, how about calling the helper "runJobWithPersistentFetchFailure" and then add a comment that says "Runs a job that always encounters a fetch failure, so should eventually be aborted."
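
Applying that naming suggestion, the helper's header could look roughly like this (a sketch only; the body below would stay the same):

// Runs a job that always encounters a fetch failure, so should eventually be aborted.
def runJobWithPersistentFetchFailure: Unit = {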

  val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
  val shuffleHandle =
    rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
  rdd1.map {
    case (x, _) if (x == 1) =>
      throw new FetchFailedException(
        BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
    case (x, _) => x
  }.count()
}

def successJob: Unit = {
Review comment (Contributor): And for this perhaps call it "runJobWithTemporaryFetchFailure" and then comment saying "Runs a job that encounters a single fetch failure but succeeds on the second attempt".
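
Likewise, a sketch of the rename proposed for this helper (the body below is unchanged):

// Runs a job that encounters a single fetch failure but succeeds on the second attempt.
def runJobWithTemporaryFetchFailure: Unit = {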

  object FailThisAttempt {
    val _fail = new AtomicBoolean(true)
  }
  val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
  val shuffleHandle =
    rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
  rdd1.map {
    case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) =>
      throw new FetchFailedException(
        BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
    case (x, _) => x
  }.count()
}

failAfter(60.seconds) {
  val e = intercept[SparkException] {
    val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
    val shuffleHandle =
      rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
    rdd1.map {
      case (x, _) if (x == 1) =>
        throw new FetchFailedException(
          BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
      case (x, _) => x
    }.count()
    fetchFailJob
  }
  assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
}
@@ -2126,18 +2146,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// the fix for SPARK-17644
Review comment (Contributor): Can you change to something like "Run a second job that will fail due to a fetch failure. This job will hang without the fix for SPARK-17644."
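
The reworded comment could read, for instance:

// Run a second job that will fail due to a fetch failure.
// This job will hang without the fix for SPARK-17644.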

failAfter(60.seconds) {
Review comment (Contributor): I think a shorter timeout would be appropriate here to avoid slowness when this fails... maybe 10 seconds? That still seems plenty conservative since the resubmit timeout is 200 millis.
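
With the shorter timeout the reviewer has in mind, the guard would become something like:

failAfter(10.seconds) {  // well above the 200 ms resubmit timeout, but fails fast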

  val e = intercept[SparkException] {
    val rdd2 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
    val shuffleHandle =
      rdd2.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
    rdd2.map {
      case (x, _) if (x == 1) =>
        throw new FetchFailedException(
          BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
      case (x, _) => x
    }.count()
    fetchFailJob
  }
  assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
}

failAfter(60.seconds) {
  try {
    successJob
  } catch {
    case e: Throwable => fail("this job should success")
Review comment (Contributor): Can you make this a little more descriptive -- maybe "A job with one fetch failure should eventually succeed"?
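
A more descriptive failure message could be, for example:

case e: Throwable => fail("A job with one fetch failure should eventually succeed")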

  }
}
}

/**