Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
comment fix
  • Loading branch information
scwf committed Sep 23, 2016
commit d92adfcd2c51090dd472a995b5853cd6396aeee7
Original file line number Diff line number Diff line change
Expand Up @@ -2119,12 +2119,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
val shuffleHandle =
rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
rdd1.map { x =>
if (x._1 == 1) {
throw new FetchFailedException(
BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
}
x._1
rdd1.map {
case (x, _) if (x == 1) =>
throw new FetchFailedException(
BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
case (x, _) => x
}.count()
} catch {
case e: Throwable =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't catch Throwable and it's better to check the exception like this:

    failAfter(60.seconds) {
      val e = intercept[SparkException] {
        ...
      }
      assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
    }

Expand All @@ -2137,12 +2136,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
val rdd2 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
val shuffleHandle =
rdd2.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
rdd2.map { x =>
if (x._1 == 1) {
rdd2.map {
case (x, _) if (x == 1) =>
throw new FetchFailedException(
BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
}
x._1
case (x, _) => x
}.count()
} catch {
case e: Throwable =>
Expand Down