Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Fix 'two jobs sharing the same stage' test and reenable it: truly sha…
…re a Semaphore locally as intended, and update expectation of failure in non-cancelled task
  • Loading branch information
srowen committed Jan 23, 2015
commit 43da66fc6857f2199209e8a301d6b82c4afd3f5c
17 changes: 9 additions & 8 deletions core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
assert(jobB.get() === 100)
}

ignore("two jobs sharing the same stage") {
test("two jobs sharing the same stage") {
// sem1: make sure cancel is issued after some tasks are launched
// sem2: make sure the first stage is not finished until cancel is issued
// twoJobsSharingStageSemaphore:
// make sure the first stage is not finished until cancel is issued
val sem1 = new Semaphore(0)
val sem2 = new Semaphore(0)

sc = new SparkContext("local[2]", "test")
sc.addSparkListener(new SparkListener {
Expand All @@ -186,7 +186,7 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter

// Create two actions that would share the some stages.
val rdd = sc.parallelize(1 to 10, 2).map { i =>
sem2.acquire()
JobCancellationSuite.twoJobsSharingStageSemaphore.acquire()
(i, i)
}.reduceByKey(_+_)
val f1 = rdd.collectAsync()
Expand All @@ -196,13 +196,13 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
future {
sem1.acquire()
f1.cancel()
sem2.release(10)
JobCancellationSuite.twoJobsSharingStageSemaphore.release(10)
}

// Expect both to fail now.
// TODO: update this test when we change Spark so cancelling f1 wouldn't affect f2.
// Expect f1 to fail due to cancellation,
intercept[SparkException] { f1.get() }
intercept[SparkException] { f2.get() }
// but f2 should not be affected
f2.get()
}

def testCount() {
Expand Down Expand Up @@ -268,4 +268,5 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
object JobCancellationSuite {
val taskStartedSemaphore = new Semaphore(0)
val taskCancelledSemaphore = new Semaphore(0)
val twoJobsSharingStageSemaphore = new Semaphore(0)
}