Add more comments for the invocation of f.cancel()
advancedxy committed Feb 27, 2018
commit 8c15c564c7d2d0adc0cfd725e34dbd359c6a0ab6
@@ -362,6 +362,12 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
})

taskStartedSemaphore.acquire()
Contributor

Why not taskStartedSemaphore.acquire(numSlice)?

Contributor Author

As soon as one task starts, we can cancel the job.
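The difference between the two calls can be sketched with a plain `java.util.concurrent.Semaphore`, outside of Spark. This is only an illustration under assumed conditions: `numSlice = 2` and the single `release()` stand in for a moment when exactly one task has started, and the object name `FirstTaskStarted` is hypothetical. Waiting for `numSlice` permits would hold the test until every task has started, while a single permit is enough to know that at least one has.

```scala
import java.util.concurrent.{Semaphore, TimeUnit}

// Hypothetical stand-alone sketch; not the code from JobCancellationSuite.
object FirstTaskStarted {
  /** Returns (oneStarted, allStarted) for a state where exactly one task has started. */
  def run(): (Boolean, Boolean) = {
    val numSlice = 2 // assumed number of tasks
    val taskStartedSemaphore = new Semaphore(0)

    // Simulate exactly one task announcing that it has started.
    val worker = new Thread(new Runnable {
      def run(): Unit = taskStartedSemaphore.release()
    })
    worker.start()
    worker.join()

    // Asking for numSlice permits times out: only one permit is available.
    val allStarted = taskStartedSemaphore.tryAcquire(numSlice, 100, TimeUnit.MILLISECONDS)
    // Asking for a single permit succeeds as soon as one task has started.
    val oneStarted = taskStartedSemaphore.tryAcquire(1, 100, TimeUnit.MILLISECONDS)
    (oneStarted, allStarted)
  }
}
```

In the test itself the blocking `acquire()` plays the role of the single-permit `tryAcquire` here.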

// The job is cancelled when:
// 1. A task in the reduce stage has started, guaranteed by the previous line.
// 2. That task is blocked after processing at most 10 records, because
//    taskCancelledSemaphore is not released until the cancelTasks event is posted.
// After the job is cancelled, the task in the reduce stage is cancelled and no more
// iterations are executed.
f.cancel()
Contributor

@cloud-fan Feb 27, 2018

We should add a comment explaining the state of the test when we reach this point. From what I can see:

  1. taskStartedSemaphore.release() must have been called, so at least one task has started.
  2. The first task has processed no more than 10 records and the second task hasn't processed any data, because the reduce stage has not finished and taskCancelledSemaphore.acquire() blocks.
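The two conditions above can be sketched without Spark, using a thread pool task in place of the reduce-stage task. Everything here is an assumption made for illustration: the object name `CancelAfterFirstTaskStarts`, the record count of 1000, and the use of `Future.cancel(true)` on a `java.util.concurrent` future in place of cancelling a Spark job. The point is only the ordering: the canceller waits for the started signal, and the task cannot get past its blocking point before it is interrupted.

```scala
import java.util.concurrent.{Executors, Semaphore, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-alone sketch; not the code from JobCancellationSuite.
object CancelAfterFirstTaskStarts {
  /** Returns how many records the task processed before it was cancelled. */
  def run(): Int = {
    val taskStartedSemaphore = new Semaphore(0)
    val taskCancelledSemaphore = new Semaphore(0) // never released in this sketch
    val processed = new AtomicInteger(0)
    val pool = Executors.newSingleThreadExecutor()

    val f = pool.submit(new Runnable {
      def run(): Unit = {
        taskStartedSemaphore.release() // condition 1: the task has started
        var i = 1
        while (i <= 1000 && !Thread.currentThread().isInterrupted) {
          if (i >= 10) {
            // Condition 2: the task blocks here until it is interrupted;
            // acquire() then throws InterruptedException and ends the task.
            taskCancelledSemaphore.acquire()
          }
          processed.incrementAndGet()
          i += 1
        }
      }
    })

    taskStartedSemaphore.acquire() // wait until the task has started
    f.cancel(true)                 // interrupt the running task
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    processed.get()
  }
}
```

Because the task blocks before counting the tenth record, the returned count stays below 10 no matter how the cancel races with the loop.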

Contributor Author

Will do.


val e = intercept[SparkException](f.get()).getCause