Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,30 +137,71 @@ private[continuous] class EpochCoordinator(
private val partitionOffsets =
mutable.Map[(Long, Int), PartitionOffset]()

private var lastCommittedEpoch = startEpoch - 1
// Remembers epochs that have to wait for previous epochs to be committed first.
private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is orthogonal to the current PR, but I realized that both this and the commits/offsets maps are unbounded queues. We probably should introduce some SQLConf for the maximum epoch backlog, and report an error when too many stack up. I'll file a JIRA ticket for this.


private def resolveCommitsAtEpoch(epoch: Long) = {
val thisEpochCommits =
partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
val thisEpochCommits = findPartitionCommitsForEpoch(epoch)
val nextEpochOffsets =
partitionOffsets.collect { case ((e, _), o) if e == epoch => o }

if (thisEpochCommits.size == numWriterPartitions &&
nextEpochOffsets.size == numReaderPartitions) {
logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.")
// Sequencing is important here. We must commit to the writer before recording the commit
// in the query, or we will end up dropping the commit if we restart in the middle.
writer.commit(epoch, thisEpochCommits.toArray)
query.commit(epoch)

// Cleanup state from before this epoch, now that we know all partitions are forever past it.
for (k <- partitionCommits.keys.filter { case (e, _) => e < epoch }) {
partitionCommits.remove(k)
}
for (k <- partitionOffsets.keys.filter { case (e, _) => e < epoch }) {
partitionOffsets.remove(k)

// Check that last committed epoch is the previous one for sequencing of committed epochs.
// If not, add the epoch being currently processed to epochs waiting to be committed,
// otherwise commit it.
if (lastCommittedEpoch != epoch - 1) {
logDebug(s"Epoch $epoch has received commits from all partitions " +
s"and is waiting for epoch ${epoch - 1} to be committed first.")
epochsWaitingToBeCommitted.add(epoch)
} else {
commitEpoch(epoch, thisEpochCommits)
lastCommittedEpoch = epoch

// Commit subsequent epochs that are waiting to be committed.
var nextEpoch = lastCommittedEpoch + 1
while (epochsWaitingToBeCommitted.contains(nextEpoch)) {
val nextEpochCommits = findPartitionCommitsForEpoch(nextEpoch)
commitEpoch(nextEpoch, nextEpochCommits)

epochsWaitingToBeCommitted.remove(nextEpoch)
lastCommittedEpoch = nextEpoch
nextEpoch += 1
}

// Cleanup state from before last committed epoch,
// now that we know all partitions are forever past it.
for (k <- partitionCommits.keys.filter { case (e, _) => e < lastCommittedEpoch }) {
partitionCommits.remove(k)
}
for (k <- partitionOffsets.keys.filter { case (e, _) => e < lastCommittedEpoch }) {
partitionOffsets.remove(k)
}
}
}
}

/**
* Collect per-partition commits for an epoch.
*/
private def findPartitionCommitsForEpoch(epoch: Long): Iterable[WriterCommitMessage] = {
partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
}

/**
* Commit epoch to the offset log.
*/
private def commitEpoch(epoch: Long, messages: Iterable[WriterCommitMessage]): Unit = {
logDebug(s"Epoch $epoch has received commits from all partitions " +
s"and is ready to be committed. Committing epoch $epoch.")
// Sequencing is important here. We must commit to the writer before recording the commit
// in the query, or we will end up dropping the commit if we restart in the middle.
writer.commit(epoch, messages.toArray)
query.commit(epoch)
}

override def receive: PartialFunction[Any, Unit] = {
// If we just drop these messages, we won't do any writes to the query. The lame duck tasks
// won't shed errors or anything.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class EpochCoordinatorSuite
verifyCommitsInOrderOf(List(1, 2))
}

ignore("consequent epochs, a message for epoch k arrives after messages for epoch (k + 1)") {
test("consequent epochs, a message for epoch k arrives after messages for epoch (k + 1)") {
setWriterPartitions(2)
setReaderPartitions(2)

Expand All @@ -141,7 +141,7 @@ class EpochCoordinatorSuite
verifyCommitsInOrderOf(List(1, 2))
}

ignore("several epochs, messages arrive in order 1 -> 3 -> 4 -> 2") {
test("several epochs, messages arrive in order 1 -> 3 -> 4 -> 2") {
setWriterPartitions(1)
setReaderPartitions(1)

Expand All @@ -162,7 +162,7 @@ class EpochCoordinatorSuite
verifyCommitsInOrderOf(List(1, 2, 3, 4))
}

ignore("several epochs, messages arrive in order 1 -> 3 -> 5 -> 4 -> 2") {
test("several epochs, messages arrive in order 1 -> 3 -> 5 -> 4 -> 2") {
setWriterPartitions(1)
setReaderPartitions(1)

Expand Down