Commit 2c72a44

ahmed-mahran authored and srowen committed
[SPARK-16487][STREAMING] Fix some batches might not get marked as fully processed in JobGenerator
## What changes were proposed in this pull request?

In `JobGenerator`, the code reads as though some batches might not get marked as fully processed. In the following flowchart, the batch should be marked as fully processed before reaching endpoint C, but it is not. Currently this does not cause an actual issue, because the condition `(time - zeroTime) is multiple of checkpoint duration?` always evaluates to `true`: the `checkpoint duration` is always set equal to the `batch duration`.

![Flowchart](https://s31.postimg.org/udy9lti2j/spark_streaming_job_generator.png)

This PR fixes the issue to improve code readability and to guard against potential problems should a future change allow the checkpoint duration to differ from the batch duration.

Author: Ahmed Mahran <ahmed.mahran@mashin.io>

Closes apache#14145 from ahmed-mahran/b-mark-batch-fully-processed.
1 parent e1bd70f commit 2c72a44

1 file changed: +3 -1 lines changed

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala

Lines changed: 3 additions & 1 deletion
@@ -287,12 +287,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     markBatchFullyProcessed(time)
   }
 
-  /** Perform checkpoint for the give `time`. */
+  /** Perform checkpoint for the given `time`. */
   private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
     if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
       logInfo("Checkpointing graph for time " + time)
       ssc.graph.updateCheckpointData(time)
       checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
+    } else if (clearCheckpointDataLater) {
+      markBatchFullyProcessed(time)
     }
   }
 
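
The effect of the added `else if` branch can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not the actual Spark classes: `CheckpointFlowSketch`, `Model`, `patched`, and `lastMarked` are hypothetical names, times are plain milliseconds, and the checkpoint write is assumed to complete immediately and lead to the batch being marked. It shows that the old and new code behave identically when the checkpoint duration equals the batch duration, and diverge only when the two differ.

```scala
// Simplified model of the control flow in doCheckpoint; NOT the real JobGenerator.
object CheckpointFlowSketch {

  // Hypothetical stand-in for the generator state. `patched` selects whether
  // the `else if (clearCheckpointDataLater)` branch from this commit is active.
  final class Model(zeroTime: Long, checkpointDuration: Long, patched: Boolean) {
    var lastProcessedBatch: Option[Long] = None

    private def markBatchFullyProcessed(time: Long): Unit =
      lastProcessedBatch = Some(time)

    def doCheckpoint(time: Long, clearCheckpointDataLater: Boolean): Unit = {
      if ((time - zeroTime) % checkpointDuration == 0) {
        // Assumption: the checkpoint write finishes right away and the
        // follow-up "clear checkpoint data" step marks the batch.
        if (clearCheckpointDataLater) markBatchFullyProcessed(time)
      } else if (patched && clearCheckpointDataLater) {
        // Branch added by this commit: no checkpoint is due for this batch,
        // but the batch still has to be marked as fully processed.
        markBatchFullyProcessed(time)
      }
    }
  }

  // Runs three 1000 ms batches and returns the last batch that was marked.
  def lastMarked(checkpointDuration: Long, patched: Boolean): Option[Long] = {
    val model = new Model(zeroTime = 0L, checkpointDuration = checkpointDuration, patched = patched)
    Seq(1000L, 2000L, 3000L).foreach { t =>
      model.doCheckpoint(t, clearCheckpointDataLater = true)
    }
    model.lastProcessedBatch
  }
}
```

In this model, a checkpoint duration of 1000 ms (equal to the batch duration) marks every batch with or without the patch. With a hypothetical checkpoint duration of 2000 ms, the unpatched flow leaves the batch at 3000 ms unmarked, while the patched flow marks it.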
