-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-23503][SS] Enforce sequencing of committed epochs for Continuous Execution #20936
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
5a3aec9
88035d0
0a3409a
478291b
b1b9985
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -137,30 +137,65 @@ private[continuous] class EpochCoordinator( | |
| private val partitionOffsets = | ||
| mutable.Map[(Long, Int), PartitionOffset]() | ||
|
|
||
| private var lastCommittedEpoch = startEpoch - 1 | ||
| // Remembers epochs that have to wait for previous epochs to be committed first. | ||
| private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long] | ||
|
|
||
| private def resolveCommitsAtEpoch(epoch: Long) = { | ||
| val thisEpochCommits = | ||
| partitionCommits.collect { case ((e, _), msg) if e == epoch => msg } | ||
| val thisEpochCommits = findCommitsForEpoch(epoch) | ||
| val nextEpochOffsets = | ||
| partitionOffsets.collect { case ((e, _), o) if e == epoch => o } | ||
|
|
||
| if (thisEpochCommits.size == numWriterPartitions && | ||
| nextEpochOffsets.size == numReaderPartitions) { | ||
| logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.") | ||
| // Sequencing is important here. We must commit to the writer before recording the commit | ||
| // in the query, or we will end up dropping the commit if we restart in the middle. | ||
| writer.commit(epoch, thisEpochCommits.toArray) | ||
| query.commit(epoch) | ||
|
|
||
| // Cleanup state from before this epoch, now that we know all partitions are forever past it. | ||
| for (k <- partitionCommits.keys.filter { case (e, _) => e < epoch }) { | ||
| partitionCommits.remove(k) | ||
| } | ||
| for (k <- partitionOffsets.keys.filter { case (e, _) => e < epoch }) { | ||
| partitionOffsets.remove(k) | ||
|
|
||
| // Check that last committed epoch is the previous one for sequencing of committed epochs. | ||
| // If not, add the epoch being currently processed to epochs waiting to be committed, | ||
| // otherwise commit it. | ||
| if (lastCommittedEpoch != epoch - 1) { | ||
| logDebug(s"Epoch $epoch has received commits from all partitions " + | ||
| s"and is waiting for epoch ${epoch - 1} to be committed first.") | ||
| epochsWaitingToBeCommitted.add(epoch) | ||
| } else { | ||
| commitEpoch(epoch, thisEpochCommits) | ||
| lastCommittedEpoch = epoch | ||
|
|
||
| // Commit subsequent epochs that are waiting to be committed. | ||
| var nextEpoch = lastCommittedEpoch + 1 | ||
| while (epochsWaitingToBeCommitted.contains(nextEpoch)) { | ||
| val nextEpochCommits = findCommitsForEpoch(nextEpoch) | ||
| commitEpoch(nextEpoch, nextEpochCommits) | ||
|
|
||
| epochsWaitingToBeCommitted.remove(nextEpoch) | ||
| lastCommittedEpoch = nextEpoch | ||
| nextEpoch += 1 | ||
| } | ||
|
|
||
| // Cleanup state from before last committed epoch, | ||
| // now that we know all partitions are forever past it. | ||
| for (k <- partitionCommits.keys.filter { case (e, _) => e < lastCommittedEpoch }) { | ||
| partitionCommits.remove(k) | ||
| } | ||
| for (k <- partitionOffsets.keys.filter { case (e, _) => e < lastCommittedEpoch }) { | ||
| partitionOffsets.remove(k) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private def findCommitsForEpoch(epoch: Long): Iterable[WriterCommitMessage] = { | ||
|
||
| partitionCommits.collect { case ((e, _), msg) if e == epoch => msg } | ||
| } | ||
|
|
||
| private def commitEpoch(epoch: Long, messages: Iterable[WriterCommitMessage]): Unit = { | ||
| logDebug(s"Epoch $epoch has received commits from all partitions " + | ||
| s"and is ready to be committed. Committing epoch $epoch.") | ||
| // Sequencing is important here. We must commit to the writer before recording the commit | ||
| // in the query, or we will end up dropping the commit if we restart in the middle. | ||
| writer.commit(epoch, messages.toArray) | ||
| query.commit(epoch) | ||
| } | ||
|
|
||
| override def receive: PartialFunction[Any, Unit] = { | ||
| // If we just drop these messages, we won't do any writes to the query. The lame duck tasks | ||
| // won't shed errors or anything. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is orthogonal to the current PR, but I realized that both this set and the commits/offsets maps are unbounded queues. We should probably introduce a SQLConf setting for the maximum epoch backlog, and report an error when too many epochs stack up. I'll file a JIRA ticket for this.