test case
scwf committed Sep 23, 2016
commit 8e667f532fa4509386ff6a6173b75a8e24cab40a
@@ -18,9 +18,12 @@
package org.apache.spark.scheduler

import java.util.Properties
import java.util.concurrent.Executors

import scala.annotation.meta.param
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.DurationConversions
import scala.language.reflectiveCalls
import scala.util.control.NonFatal

@@ -32,8 +35,9 @@ import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}
import org.apache.spark.util._

class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -2105,6 +2109,47 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC))
}

test("The failed stage never resubmitted due to abort stage in another thread") {
@squito (Contributor) commented on Sep 27, 2016:

A couple of changes for this test:

(a) There was already some discussion around naming and the PR title has been updated; this test should be renamed as well, since multiple threads really have nothing to do with it.
(b) I'd prefer that tests be named to indicate the positive behavior we want to verify. So with the above, I'd suggest a name like "After one stage is aborted for too many failed attempts, subsequent stages still behave correctly on fetch failures".
(c) Duplicated code can be cleaned up (when I first read the code, I was looking for differences between the two calls; though it's only one copy-paste, the intent is a lot clearer if it appears just once).

(d) I think it would be nice to also include a job which succeeds after a fetch failure at the end (3 jobs total). Unfortunately this is a bit of a pain to do in a test right now, since you don't have access to stageAttemptId, but you can do it with something like this:

...
rdd1.map {
  case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) =>
    throw new FetchFailedException(
      BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
...

with the helper:

object FailThisAttempt extends Logging {
  val _fail = new AtomicBoolean(true)
}
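
For reference, a sketch assembling those fragments into the third job described in (d): the first attempt hits an injected fetch failure, and the retry succeeds once the flag has flipped. rdd3 is an illustrative name; the sketch assumes the suite's sc, the imports already in this diff (FetchFailedException, BlockManagerId, ShuffleDependency), plus java.util.concurrent.atomic.AtomicBoolean, and drops the Logging mix-in since nothing is logged:

import java.util.concurrent.atomic.AtomicBoolean

// Flips to false after the first injected failure, so the resubmitted
// stage attempt succeeds.
object FailThisAttempt {
  val _fail = new AtomicBoolean(true)
}

val rdd3 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
val shuffleHandle =
  rdd3.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
rdd3.map {
  case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) =>
    throw new FetchFailedException(
      BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
  case (x, _) => x
}.count()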

  implicit val executorContext = ExecutionContext
    .fromExecutorService(Executors.newFixedThreadPool(5))
  val f1 = Future {
A project member commented:

You can just use failAfter(60.seconds) { ... } to get rid of ExecutionContext and Future.
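
A minimal sketch of that shape, assuming the suite mixes in ScalaTest's Timeouts trait (the class declaration above ends in "with Timeou…", truncated) and imports SpanSugar for .seconds; runJobThatHitsFetchFailure is a hypothetical stand-in for the job body shown below:

failAfter(60.seconds) {
  // Hypothetical helper standing in for the rdd1 job defined below;
  // running it inline removes the need for a thread pool and Future.
  runJobThatHitsFetchFailure()
}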

    try {
      val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
      val shuffleHandle =
        rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
      rdd1.map { x =>
A reviewer commented:

You could write:

rdd1.map {
  case (x, _) =>
    if (x == 1) {
      throw new FetchFailedException(
        BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
    }
    x
}.count()

instead. Pattern matching on the tuple reads better than accessing the _1 attribute.

A follow-up suggestion:

Or even:

rdd1.map {
  case (x, _) if x == 1 =>
    throw new FetchFailedException(
      BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
  case (x, _) => x
}.count()

The author (scwf) replied:

ok, thanks

        if (x._1 == 1) {
          throw new FetchFailedException(
            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
        }
        x._1
      }.count()
    } catch {
      case e: Throwable =>
A project member commented:

Don't catch Throwable; it's better to check the exception like this:

    failAfter(60.seconds) {
      val e = intercept[SparkException] {
        ...
      }
      assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
    }
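
Filling the elided body in from the diff, the first job would then look something like this (still a sketch, with the same Timeouts/SpanSugar assumptions as the failAfter suggestion above):

failAfter(60.seconds) {
  val e = intercept[SparkException] {
    val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
    val shuffleHandle =
      rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
    rdd1.map {
      case (x, _) if x == 1 =>
        throw new FetchFailedException(
          BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
      case (x, _) => x
    }.count()
  }
  // Per the snippet above, the abort surfaces as a SparkException whose
  // message names the underlying FetchFailedException.
  assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
}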

        logInfo("expected abort stage: " + e.getMessage)
    }
  }
  Thread.sleep(10000)
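  // As requested in the review below: the following job, which fails due to a
  // fetch failure, will hang without the fix for SPARK-17644.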
  val f2 = Future {
A project member commented:

Could you add a comment to explain why two identical jobs are needed here? It took me a while to figure out. E.g.,

The following job, which fails due to a fetch failure, will hang without the fix for SPARK-17644.

    try {
      val rdd2 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
      val shuffleHandle =
        rdd2.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
      rdd2.map { x =>
        if (x._1 == 1) {
          throw new FetchFailedException(
            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
        }
        x._1
      }.count()
    } catch {
      case e: Throwable =>
        println("expected abort stage2: " + e.getMessage)
    }
  }

  val duration = 60.seconds
  ThreadUtils.awaitResult(f2, duration)
}

/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.