Use single partition in unit test
advancedxy committed Feb 27, 2018
commit d6ed9a15e24c414251ffaf09839eb5b3c0567d75
11 changes: 5 additions & 6 deletions core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -326,20 +326,19 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
// execution and a counter is used to make sure that the corresponding tasks are indeed
// cancelled.
import JobCancellationSuite._
- val numSlice = 2
+ val numSlice = 1
Contributor:
Can we hardcode it? Using a variable makes people feel like they can change its value and the test will still pass, but that's not true: assert(executionOfInterruptibleCounter.get() <= 10) needs to be updated too.

Contributor Author:
Will update it later.

But it looks like Jenkins has been having trouble these days? Is it back to normal?

Contributor:
I'm not sure, let's just try it :)

sc = new SparkContext(s"local[$numSlice]", "test")

val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
- .repartitionAndSortWithinPartitions(new HashPartitioner(2))
+ .repartitionAndSortWithinPartitions(new HashPartitioner(numSlice))
.mapPartitions { iter =>
taskStartedSemaphore.release()
Contributor:
This will be called twice since the root RDD has 2 partitions, so f.cancel might be called before both of these 2 partitions have finished.

Contributor Author:
f.cancel() should be called before these partitions (tasks) finish, and we want to make sure these tasks can be cancelled.

iter
}.foreachAsync { x =>
if (x._1 >= 10) {
// This block of code is partially executed. It will be blocked when x._1 >= 10 and the
// next iteration will be cancelled if the source iterator is interruptible. Then in this
- // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
- // element in another partition(assuming no ordering guarantee).
+ // case, the maximum num of increment would be 10(|1...10|)
taskCancelledSemaphore.acquire()
}
executionOfInterruptibleCounter.getAndIncrement()
Contributor:
Do you mean that without your PR, the task will keep running and hit this line 1000 times after cancellation?

Contributor Author:
Yes
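
A minimal sketch of the point being discussed (hypothetical code, not the PR's actual change): without an interruption check, a plain iterator never observes the kill signal, so the foreach keeps draining all 1000 elements after cancellation; a wrapper along these lines stops early.

object InterruptionSketch {
  // Hypothetical sketch, not Spark's real mechanism: check the thread's interrupt
  // flag before producing the next element, so a cancelled task stops instead of
  // draining the remaining records.
  def interruptible[T](underlying: Iterator[T]): Iterator[T] = new Iterator[T] {
    override def hasNext: Boolean = {
      if (Thread.currentThread().isInterrupted) {
        throw new InterruptedException("task was cancelled")
      }
      underlying.hasNext
    }
    override def next(): T = underlying.next()
  }
}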

@@ -365,12 +364,12 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
taskStartedSemaphore.acquire()
Contributor:
why not taskStartedSemaphore.acquire(numSlice)?

Contributor Author:
As soon as one task starts, we can cancel the job.

f.cancel()
Contributor (@cloud-fan, Feb 27, 2018):
We should add a comment to explain the state when we reach here. From what I can see:

  1. taskStartedSemaphore.release() must have been called, so at least one task has started.
  2. The first task has processed no more than 10 records, and the second task hasn't processed any data, because the reduce stage is not finished and taskCancelledSemaphore.acquire() will block.

Contributor Author:
will do
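
A possible shape for that comment, as a sketch (illustrative wording only, not the actual follow-up change):

// We can only reach this point when:
//  1. taskStartedSemaphore.release() has been called, so at least one task has started.
//  2. With a single partition there is only one such task, and it has processed no more
//     than 10 records, because once x._1 >= 10 it blocks on taskCancelledSemaphore.acquire().
// Cancelling now therefore means executionOfInterruptibleCounter can reach at most 10,
// which the assertion at the end of the test checks.
f.cancel()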


- val e = intercept[SparkException] { f.get() }.getCause
+ val e = intercept[SparkException](f.get()).getCause
assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))

// Make sure tasks are indeed completed.
taskCompletedSem.acquire(numSlice)
- assert(executionOfInterruptibleCounter.get() <= 11)
+ assert(executionOfInterruptibleCounter.get() <= 10)
}

def testCount() {
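
A standalone sketch of the semaphore handshake the test relies on (hypothetical names, plain JVM threads rather than Spark tasks): the worker announces that it has started and then parks before doing further work, so the driver can cancel and assert on its progress deterministically.

import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicInteger

object HandshakeSketch {
  def main(args: Array[String]): Unit = {
    val taskStarted = new Semaphore(0)     // released by the worker once it is running
    val taskMayContinue = new Semaphore(0) // never released here, so the worker stays parked
    val processed = new AtomicInteger(0)

    val worker = new Thread(() => {
      try {
        taskStarted.release()
        for (i <- 1 to 1000) {
          if (i >= 10) taskMayContinue.acquire() // parks at the 10th record
          processed.incrementAndGet()
        }
      } catch {
        case _: InterruptedException => () // treat interruption as cancellation
      }
    })
    worker.start()

    taskStarted.acquire() // driver side: wait until at least one "task" has started
    worker.interrupt()    // driver side: "cancel the job"
    worker.join()
    assert(processed.get() <= 10, s"worker processed ${processed.get()} records after cancellation")
  }
}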