Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
assert(jobB.get() === 100)
}

ignore("two jobs sharing the same stage") {
test("two jobs sharing the same stage") {
// sem1: make sure cancel is issued after some tasks are launched
// sem2: make sure the first stage is not finished until cancel is issued
// twoJobsSharingStageSemaphore:
// make sure the first stage is not finished until cancel is issued
val sem1 = new Semaphore(0)
val sem2 = new Semaphore(0)

sc = new SparkContext("local[2]", "test")
sc.addSparkListener(new SparkListener {
Expand All @@ -186,7 +186,7 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter

// Create two actions that would share the some stages.
val rdd = sc.parallelize(1 to 10, 2).map { i =>
sem2.acquire()
JobCancellationSuite.twoJobsSharingStageSemaphore.acquire()
(i, i)
}.reduceByKey(_+_)
val f1 = rdd.collectAsync()
Expand All @@ -196,13 +196,13 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
future {
sem1.acquire()
f1.cancel()
sem2.release(10)
JobCancellationSuite.twoJobsSharingStageSemaphore.release(10)
}

// Expect both to fail now.
// TODO: update this test when we change Spark so cancelling f1 wouldn't affect f2.
// Expect f1 to fail due to cancellation,
intercept[SparkException] { f1.get() }
intercept[SparkException] { f2.get() }
// but f2 should not be affected
f2.get()
}

def testCount() {
Expand Down Expand Up @@ -268,4 +268,5 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
object JobCancellationSuite {
val taskStartedSemaphore = new Semaphore(0)
val taskCancelledSemaphore = new Semaphore(0)
val twoJobsSharingStageSemaphore = new Semaphore(0)
}