Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[SPARK-23503][Structured Streaming] Continuous Execution should seque…
…nce committed epochs
  • Loading branch information
Efim Poberezkin committed Apr 18, 2018
commit 5a3aec9035482cb26fc7e9af450c22f0e8b81dbe
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ private[continuous] class EpochCoordinator(
private val partitionOffsets =
mutable.Map[(Long, Int), PartitionOffset]()

private var lastCommittedEpoch = startEpoch - 1
// Remembers epochs that have to wait for previous epochs to be committed first.
private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is orthogonal to the current PR, but I realized that both this and the commits/offsets maps are unbounded queues. We probably should introduce some SQLConf for the maximum epoch backlog, and report an error when too many stack up. I'll file a JIRA ticket for this.


private def resolveCommitsAtEpoch(epoch: Long) = {
val thisEpochCommits =
partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
Expand All @@ -145,18 +149,42 @@ private[continuous] class EpochCoordinator(

if (thisEpochCommits.size == numWriterPartitions &&
nextEpochOffsets.size == numReaderPartitions) {
logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.")
// Sequencing is important here. We must commit to the writer before recording the commit
// in the query, or we will end up dropping the commit if we restart in the middle.
writer.commit(epoch, thisEpochCommits.toArray)
query.commit(epoch)

// Cleanup state from before this epoch, now that we know all partitions are forever past it.
for (k <- partitionCommits.keys.filter { case (e, _) => e < epoch }) {
partitionCommits.remove(k)
}
for (k <- partitionOffsets.keys.filter { case (e, _) => e < epoch }) {
partitionOffsets.remove(k)

// Check that last committed epoch is the previous one for sequencing of committed epochs.
if (lastCommittedEpoch == epoch - 1) {
logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.")
// Sequencing is important here. We must commit to the writer before recording the commit
// in the query, or we will end up dropping the commit if we restart in the middle.
writer.commit(epoch, thisEpochCommits.toArray)
query.commit(epoch)
lastCommittedEpoch = epoch

// Commit subsequent epochs that are waiting to be committed.
var nextEpoch = lastCommittedEpoch + 1
while (epochsWaitingToBeCommitted.contains(nextEpoch)) {
val nextEpochCommits =
partitionCommits.collect { case ((e, _), msg) if e == nextEpoch => msg }
logDebug(s"Committing epoch $nextEpoch.")
writer.commit(nextEpoch, nextEpochCommits.toArray)
query.commit(nextEpoch)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a bit of duplicated logic here - helper methods would probably be nice.


epochsWaitingToBeCommitted.remove(nextEpoch)
lastCommittedEpoch = nextEpoch
nextEpoch += 1
}

// Cleanup state from before last committed epoch,
// now that we know all partitions are forever past it.
for (k <- partitionCommits.keys.filter { case (e, _) => e < lastCommittedEpoch }) {
partitionCommits.remove(k)
}
for (k <- partitionOffsets.keys.filter { case (e, _) => e < lastCommittedEpoch }) {
partitionOffsets.remove(k)
}
} else {
logDebug(s"Epoch $epoch has received commits from all partitions" +
s"and is waiting for epoch ${epoch - 1} to be committed first.")
epochsWaitingToBeCommitted.add(epoch)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe swap the order of the if else. I'd forgotten what the condition was for after scrolling down here.

}
}
}
Expand Down