From 5a3aec9035482cb26fc7e9af450c22f0e8b81dbe Mon Sep 17 00:00:00 2001 From: Efim Poberezkin Date: Thu, 29 Mar 2018 18:24:06 +0400 Subject: [PATCH 1/5] [SPARK-23503][Structured Streaming] Continuous Execution should sequence committed epochs --- .../continuous/EpochCoordinator.scala | 52 ++++++++++++++----- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala index cc6808065c0c..db618aee82d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala @@ -137,6 +137,10 @@ private[continuous] class EpochCoordinator( private val partitionOffsets = mutable.Map[(Long, Int), PartitionOffset]() + private var lastCommittedEpoch = startEpoch - 1 + // Remembers epochs that have to wait for previous epochs to be committed first. + private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long] + private def resolveCommitsAtEpoch(epoch: Long) = { val thisEpochCommits = partitionCommits.collect { case ((e, _), msg) if e == epoch => msg } @@ -145,18 +149,42 @@ private[continuous] class EpochCoordinator( if (thisEpochCommits.size == numWriterPartitions && nextEpochOffsets.size == numReaderPartitions) { - logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.") - // Sequencing is important here. We must commit to the writer before recording the commit - // in the query, or we will end up dropping the commit if we restart in the middle. - writer.commit(epoch, thisEpochCommits.toArray) - query.commit(epoch) - - // Cleanup state from before this epoch, now that we know all partitions are forever past it. - for (k <- partitionCommits.keys.filter { case (e, _) => e < epoch }) { - partitionCommits.remove(k) - } - for (k <- partitionOffsets.keys.filter { case (e, _) => e < epoch }) { - partitionOffsets.remove(k) + + // Check that last committed epoch is the previous one for sequencing of committed epochs. + if (lastCommittedEpoch == epoch - 1) { + logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.") + // Sequencing is important here. We must commit to the writer before recording the commit + // in the query, or we will end up dropping the commit if we restart in the middle. + writer.commit(epoch, thisEpochCommits.toArray) + query.commit(epoch) + lastCommittedEpoch = epoch + + // Commit subsequent epochs that are waiting to be committed. + var nextEpoch = lastCommittedEpoch + 1 + while (epochsWaitingToBeCommitted.contains(nextEpoch)) { + val nextEpochCommits = + partitionCommits.collect { case ((e, _), msg) if e == nextEpoch => msg } + logDebug(s"Committing epoch $nextEpoch.") + writer.commit(nextEpoch, nextEpochCommits.toArray) + query.commit(nextEpoch) + + epochsWaitingToBeCommitted.remove(nextEpoch) + lastCommittedEpoch = nextEpoch + nextEpoch += 1 + } + + // Cleanup state from before last committed epoch, + // now that we know all partitions are forever past it. + for (k <- partitionCommits.keys.filter { case (e, _) => e < lastCommittedEpoch }) { + partitionCommits.remove(k) + } + for (k <- partitionOffsets.keys.filter { case (e, _) => e < lastCommittedEpoch }) { + partitionOffsets.remove(k) + } + } else { + logDebug(s"Epoch $epoch has received commits from all partitions" + + s"and is waiting for epoch ${epoch - 1} to be committed first.") + epochsWaitingToBeCommitted.add(epoch) } } } From 88035d0530a0cbd878cd6b1a096985e03b88abf4 Mon Sep 17 00:00:00 2001 From: Efim Poberezkin Date: Fri, 30 Mar 2018 12:43:42 +0400 Subject: [PATCH 2/5] Moved duplicated code to methods and swapped the order of if else --- .../continuous/EpochCoordinator.scala | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala index db618aee82d9..8f950b2c7e52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala @@ -142,8 +142,7 @@ private[continuous] class EpochCoordinator( private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long] private def resolveCommitsAtEpoch(epoch: Long) = { - val thisEpochCommits = - partitionCommits.collect { case ((e, _), msg) if e == epoch => msg } + val thisEpochCommits = findCommitsForEpoch(epoch) val nextEpochOffsets = partitionOffsets.collect { case ((e, _), o) if e == epoch => o } @@ -151,22 +150,21 @@ private[continuous] class EpochCoordinator( nextEpochOffsets.size == numReaderPartitions) { // Check that last committed epoch is the previous one for sequencing of committed epochs. - if (lastCommittedEpoch == epoch - 1) { - logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.") - // Sequencing is important here. We must commit to the writer before recording the commit - // in the query, or we will end up dropping the commit if we restart in the middle. - writer.commit(epoch, thisEpochCommits.toArray) - query.commit(epoch) + // If not, add the epoch being currently processed to epochs waiting to be committed, + // otherwise commit it. + if (lastCommittedEpoch != epoch - 1) { + logDebug(s"Epoch $epoch has received commits from all partitions" + + s"and is waiting for epoch ${epoch - 1} to be committed first.") + epochsWaitingToBeCommitted.add(epoch) + } else { + commitEpoch(epoch, thisEpochCommits) lastCommittedEpoch = epoch // Commit subsequent epochs that are waiting to be committed. var nextEpoch = lastCommittedEpoch + 1 while (epochsWaitingToBeCommitted.contains(nextEpoch)) { - val nextEpochCommits = - partitionCommits.collect { case ((e, _), msg) if e == nextEpoch => msg } - logDebug(s"Committing epoch $nextEpoch.") - writer.commit(nextEpoch, nextEpochCommits.toArray) - query.commit(nextEpoch) + val nextEpochCommits = findCommitsForEpoch(nextEpoch) + commitEpoch(nextEpoch, nextEpochCommits) epochsWaitingToBeCommitted.remove(nextEpoch) lastCommittedEpoch = nextEpoch @@ -181,14 +179,23 @@ private[continuous] class EpochCoordinator( for (k <- partitionOffsets.keys.filter { case (e, _) => e < lastCommittedEpoch }) { partitionOffsets.remove(k) } - } else { - logDebug(s"Epoch $epoch has received commits from all partitions" + - s"and is waiting for epoch ${epoch - 1} to be committed first.") - epochsWaitingToBeCommitted.add(epoch) } } } + private def findCommitsForEpoch(epoch: Long): Iterable[WriterCommitMessage] = { + partitionCommits.collect { case ((e, _), msg) if e == epoch => msg } + } + + private def commitEpoch(epoch: Long, messages: Iterable[WriterCommitMessage]): Unit = { + logDebug(s"Epoch $epoch has received commits from all partitions" + + s"and is ready to be committed. Committing epoch $epoch.") + // Sequencing is important here. We must commit to the writer before recording the commit + // in the query, or we will end up dropping the commit if we restart in the middle. + writer.commit(epoch, messages.toArray) + query.commit(epoch) + } + override def receive: PartialFunction[Any, Unit] = { // If we just drop these messages, we won't do any writes to the query. The lame duck tasks // won't shed errors or anything. From 0a3409a4fe41c484dbe1456fe6ee1db057fb2151 Mon Sep 17 00:00:00 2001 From: Efim Poberezkin Date: Fri, 30 Mar 2018 14:42:26 +0400 Subject: [PATCH 3/5] Added missing spaces to debug messages --- .../sql/execution/streaming/continuous/EpochCoordinator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala index 8f950b2c7e52..238e1a1a7762 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala @@ -153,7 +153,7 @@ private[continuous] class EpochCoordinator( // If not, add the epoch being currently processed to epochs waiting to be committed, // otherwise commit it. if (lastCommittedEpoch != epoch - 1) { - logDebug(s"Epoch $epoch has received commits from all partitions" + + logDebug(s"Epoch $epoch has received commits from all partitions " + s"and is waiting for epoch ${epoch - 1} to be committed first.") epochsWaitingToBeCommitted.add(epoch) } else { @@ -188,7 +188,7 @@ private[continuous] class EpochCoordinator( } private def commitEpoch(epoch: Long, messages: Iterable[WriterCommitMessage]): Unit = { - logDebug(s"Epoch $epoch has received commits from all partitions" + + logDebug(s"Epoch $epoch has received commits from all partitions " + s"and is ready to be committed. Committing epoch $epoch.") // Sequencing is important here. We must commit to the writer before recording the commit // in the query, or we will end up dropping the commit if we restart in the middle. From 478291b26ecc764b4444d2e078edcf0c2d0e313c Mon Sep 17 00:00:00 2001 From: Efim Poberezkin Date: Wed, 18 Apr 2018 13:55:34 +0400 Subject: [PATCH 4/5] Unignored tests for EpochCoordinator sequencing of late epochs --- .../sql/streaming/continuous/EpochCoordinatorSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala index 99e30561f81d..82836dced9df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala @@ -120,7 +120,7 @@ class EpochCoordinatorSuite verifyCommitsInOrderOf(List(1, 2)) } - ignore("consequent epochs, a message for epoch k arrives after messages for epoch (k + 1)") { + test("consequent epochs, a message for epoch k arrives after messages for epoch (k + 1)") { setWriterPartitions(2) setReaderPartitions(2) @@ -141,7 +141,7 @@ class EpochCoordinatorSuite verifyCommitsInOrderOf(List(1, 2)) } - ignore("several epochs, messages arrive in order 1 -> 3 -> 4 -> 2") { + test("several epochs, messages arrive in order 1 -> 3 -> 4 -> 2") { setWriterPartitions(1) setReaderPartitions(1) @@ -162,7 +162,7 @@ class EpochCoordinatorSuite verifyCommitsInOrderOf(List(1, 2, 3, 4)) } - ignore("several epochs, messages arrive in order 1 -> 3 -> 5 -> 4 -> 2") { + test("several epochs, messages arrive in order 1 -> 3 -> 5 -> 4 -> 2") { setWriterPartitions(1) setReaderPartitions(1) From b1b9985a5b745625bd7606331fde9b05a9af9442 Mon Sep 17 00:00:00 2001 From: Efim Poberezkin Date: Thu, 26 Apr 2018 12:06:32 +0400 Subject: [PATCH 5/5] Improved readability --- .../streaming/continuous/EpochCoordinator.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala index 238e1a1a7762..8877ebeb2673 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala @@ -142,7 +142,7 @@ private[continuous] class EpochCoordinator( private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long] private def resolveCommitsAtEpoch(epoch: Long) = { - val thisEpochCommits = findCommitsForEpoch(epoch) + val thisEpochCommits = findPartitionCommitsForEpoch(epoch) val nextEpochOffsets = partitionOffsets.collect { case ((e, _), o) if e == epoch => o } @@ -163,7 +163,7 @@ private[continuous] class EpochCoordinator( // Commit subsequent epochs that are waiting to be committed. var nextEpoch = lastCommittedEpoch + 1 while (epochsWaitingToBeCommitted.contains(nextEpoch)) { - val nextEpochCommits = findCommitsForEpoch(nextEpoch) + val nextEpochCommits = findPartitionCommitsForEpoch(nextEpoch) commitEpoch(nextEpoch, nextEpochCommits) epochsWaitingToBeCommitted.remove(nextEpoch) @@ -183,10 +183,16 @@ private[continuous] class EpochCoordinator( } } - private def findCommitsForEpoch(epoch: Long): Iterable[WriterCommitMessage] = { + /** + * Collect per-partition commits for an epoch. + */ + private def findPartitionCommitsForEpoch(epoch: Long): Iterable[WriterCommitMessage] = { partitionCommits.collect { case ((e, _), msg) if e == epoch => msg } } + /** + * Commit epoch to the offset log. + */ private def commitEpoch(epoch: Long, messages: Iterable[WriterCommitMessage]): Unit = { logDebug(s"Epoch $epoch has received commits from all partitions " + s"and is ready to be committed. Committing epoch $epoch.")