fix flaky test
jiangxb1987 committed Apr 6, 2018
commit 965a80f2d02aacff68bae096483c2809e1c9d67e
29 changes: 16 additions & 13 deletions core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -332,13 +332,14 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     import JobCancellationSuite._
     sc = new SparkContext("local[2]", "test interruptible iterator")
 
+    val numElements = 10000
     val taskCompletedSem = new Semaphore(0)
 
     sc.addSparkListener(new SparkListener {
       override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
         // release taskCancelledSemaphore when cancelTasks event has been posted
         if (stageCompleted.stageInfo.stageId == 1) {
-          taskCancelledSemaphore.release(1000)
+          taskCancelledSemaphore.release(numElements)
         }
       }

@@ -349,36 +350,38 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
       }
     })
 
-    val f = sc.parallelize(1 to 1000).map { i => (i, i) }
+    // Explicitly disable interrupting the task thread when cancelling tasks, so the task
+    // thread can only be interrupted by `InterruptibleIterator`.
+    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
+
+    val f = sc.parallelize(1 to numElements).map { i => (i, i) }
       .repartitionAndSortWithinPartitions(new HashPartitioner(1))
       .mapPartitions { iter =>
        taskStartedSemaphore.release()
        iter
      }.foreachAsync { x =>
-        if (x._1 >= 10) {
-          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
-          // next iteration will be cancelled if the source iterator is interruptible. Then in this
-          // case, the maximum num of increment would be 10(|1...10|)
-          taskCancelledSemaphore.acquire()
-        }
+        // Block this code from being executed until the job gets cancelled. In this case, if
+        // the source iterator is interruptible, the max number of increments should stay under
+        // `numElements`.
+        taskCancelledSemaphore.acquire()
        executionOfInterruptibleCounter.getAndIncrement()
      }
 
    taskStartedSemaphore.acquire()
    // Job is cancelled when:
    // 1. task in reduce stage has been started, guaranteed by previous line.
-   // 2. task in reduce stage is blocked after processing at most 10 records as
-   //    taskCancelledSemaphore is not released until cancelTasks event is posted
-   // After job being cancelled, task in reduce stage will be cancelled and no more iteration are
-   // executed.
+   // 2. task in reduce stage is blocked, as taskCancelledSemaphore is not released until the
+   //    JobCancelled event is posted.
+   // After job being cancelled, task in reduce stage will be cancelled asynchronously, thus
+   // partial of the inputs should not get processed.
Contributor review comment:
Suggested rewording: replace "thus partial of the inputs should not get processed." with "It's very unlikely that Spark can process 10000 elements between JobCancelled is posted and the task is really killed."
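For background, `InterruptibleIterator` is what bounds the counter here. A simplified sketch of the idea (paraphrased, not the exact Spark source): the shuffle reader's iterator is wrapped so that every hasNext call checks the task's kill flag, meaning a killed task stops iterating within one element.

import org.apache.spark.TaskContext

// Simplified sketch of the idea behind org.apache.spark.InterruptibleIterator.
class InterruptibleIteratorSketch[T](context: TaskContext, delegate: Iterator[T])
  extends Iterator[T] {

  override def hasNext: Boolean = {
    // Throws TaskKilledException once the task has been marked as killed; this
    // is what keeps executionOfInterruptibleCounter below numElements above.
    context.killTaskIfInterrupted()
    delegate.hasNext
  }

  override def next(): T = delegate.next()
}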

     f.cancel()
 
     val e = intercept[SparkException](f.get()).getCause
     assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
 
     // Make sure tasks are indeed completed.
     taskCompletedSem.acquire()
-    assert(executionOfInterruptibleCounter.get() <= 10)
+    assert(executionOfInterruptibleCounter.get() < numElements)
   }
 
   def testCount() {
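To see the handshake in isolation: a minimal, self-contained sketch of the semaphore pattern the fixed test relies on. All names here are hypothetical, and a plain JVM thread stands in for the Spark task.

import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object HandshakeSketch {
  def main(args: Array[String]): Unit = {
    val numElements = 10000
    val taskStarted = new Semaphore(0)        // task -> driver: "I have started"
    val proceed = new Semaphore(0)            // driver -> task: one permit per element
    val cancelled = new AtomicBoolean(false)  // stand-in for the task kill flag
    val processed = new AtomicInteger(0)

    val task = new Thread(() => {
      taskStarted.release()
      var i = 0
      // Like an interruptible iterator, stop as soon as cancellation is observed.
      while (i < numElements && !cancelled.get()) {
        proceed.acquire()
        processed.getAndIncrement()
        i += 1
      }
    })
    task.start()

    taskStarted.acquire()  // 1. the "task" has definitely started
    cancelled.set(true)    // 2. cancel, like f.cancel() in the test
    // 3. Release one permit per element rather than a smaller constant: even if
    //    cancellation were never observed, the task could drain every permit and
    //    terminate, so the test cannot hang. This mirrors the change from
    //    release(1000) to release(numElements).
    proceed.release(numElements)
    task.join()
    assert(processed.get() < numElements)
  }
}

Under this scheme the assertion can only fail if the task consumes all numElements permits before it ever observes cancellation, which is the unlikely race the review comment describes.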