diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7f577f015973d..3d6b25bae2432 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1465,12 +1465,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
-  val CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE =
-    buildConf("spark.sql.streaming.continuous.epochBacklogQueueSize")
-      .doc("The max number of entries to be stored in queue to wait for late epochs. " +
-        "If this parameter is exceeded by the size of the queue, stream will stop with an error.")
+  val CONTINUOUS_STREAMING_LATE_EPOCH_THRESHOLD =
+    buildConf("spark.sql.streaming.continuous.lateEpochThreshold")
+      .doc("The maximum number of epochs allowed to wait for a late epoch to be committed. " +
+        "If this threshold is exceeded, the stream will stop with an error.")
       .intConf
-      .createWithDefault(10000)
+      .createWithDefault(100)
 
   val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
     buildConf("spark.sql.streaming.continuous.executorQueueSize")
@@ -2146,8 +2146,8 @@ class SQLConf extends Serializable with Logging {
 
   def literalPickMinimumPrecision: Boolean = getConf(LITERAL_PICK_MINIMUM_PRECISION)
 
-  def continuousStreamingEpochBacklogQueueSize: Int =
-    getConf(CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE)
+  def continuousStreamingLateEpochThreshold: Int =
+    getConf(CONTINUOUS_STREAMING_LATE_EPOCH_THRESHOLD)
 
   def continuousStreamingExecutorQueueSize: Int = getConf(CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
index decf524f7167c..df00c4e8568e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
@@ -123,8 +123,8 @@ private[continuous] class EpochCoordinator(
     override val rpcEnv: RpcEnv)
   extends ThreadSafeRpcEndpoint with Logging {
 
-  private val epochBacklogQueueSize =
-    session.sqlContext.conf.continuousStreamingEpochBacklogQueueSize
+  private val lateEpochsThreshold =
+    session.sqlContext.conf.continuousStreamingLateEpochThreshold
 
   private var queryWritesStopped: Boolean = false
 
@@ -215,7 +215,7 @@ private[continuous] class EpochCoordinator(
       if (!partitionCommits.isDefinedAt((epoch, partitionId))) {
         partitionCommits.put((epoch, partitionId), message)
         resolveCommitsAtEpoch(epoch)
-        checkProcessingQueueBoundaries()
+        checkLateEpochsBoundaries()
       }
 
     case ReportPartitionOffset(partitionId, epoch, offset) =>
@@ -227,21 +227,13 @@ private[continuous] class EpochCoordinator(
         query.addOffset(epoch, stream, thisEpochOffsets.toSeq)
         resolveCommitsAtEpoch(epoch)
       }
-      checkProcessingQueueBoundaries()
+      checkLateEpochsBoundaries()
   }
 
-  private def checkProcessingQueueBoundaries() = {
-    if (partitionOffsets.size > epochBacklogQueueSize) {
-      query.stopInNewThread(new IllegalStateException("Size of the partition offset queue has " +
-        "exceeded its maximum"))
-    }
-    if (partitionCommits.size > epochBacklogQueueSize) {
-      query.stopInNewThread(new IllegalStateException("Size of the partition commit queue has " +
-        "exceeded its maximum"))
-    }
-    if (epochsWaitingToBeCommitted.size > epochBacklogQueueSize) {
-      query.stopInNewThread(new IllegalStateException("Size of the epoch queue has " +
-        "exceeded its maximum"))
+  private def checkLateEpochsBoundaries() = {
+    if (epochsWaitingToBeCommitted.size > lateEpochsThreshold) {
+      query.stopInNewThread(new IllegalStateException(s"Epoch ${lastCommittedEpoch + 1} " +
+        s"is late by more than $lateEpochsThreshold epochs."))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 9840c7f066780..8dccbe0658843 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE
 import org.apache.spark.sql.streaming.{StreamTest, Trigger}
 import org.apache.spark.sql.test.TestSparkSession
 
@@ -343,33 +342,3 @@ class ContinuousMetaSuite extends ContinuousSuiteBase {
     }
   }
 }
-
-class ContinuousEpochBacklogSuite extends ContinuousSuiteBase {
-  import testImplicits._
-
-  override protected def createSparkSession = new TestSparkSession(
-    new SparkContext(
-      "local[1]",
-      "continuous-stream-test-sql-context",
-      sparkConf.set("spark.sql.testkey", "true")))
-
-  // This test forces the backlog to overflow by not standing up enough executors for the query
-  // to make progress.
-  test("epoch backlog overflow") {
-    withSQLConf((CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE.key, "10")) {
-      val df = spark.readStream
-        .format("rate")
-        .option("numPartitions", "2")
-        .option("rowsPerSecond", "500")
-        .load()
-        .select('value)
-
-      testStream(df)(
-        StartStream(Trigger.Continuous(1)),
-        ExpectFailure[IllegalStateException] { e =>
-          e.getMessage.contains("queue has exceeded its maximum")
-        }
-      )
-    }
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
index e3498db4194e8..7b1ed7dc60c15 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.sql.LocalSparkSession
 import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE
+import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_LATE_EPOCH_THRESHOLD
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
@@ -44,7 +44,7 @@ class EpochCoordinatorSuite
   private var writeSupport: StreamingWrite = _
   private var query: ContinuousExecution = _
   private var orderVerifier: InOrder = _
-  private val epochBacklogQueueSize = 10
+  private val lateEpochsThreshold = 2
 
   override def beforeEach(): Unit = {
     val stream = mock[ContinuousStream]
@@ -55,8 +55,9 @@ class EpochCoordinatorSuite
     spark = new TestSparkSession(
      new SparkContext(
        "local[2]", "test-sql-context",
-        new SparkConf().set("spark.sql.testkey", "true")
-          .set(CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE, epochBacklogQueueSize)))
+        new SparkConf()
+          .set("spark.sql.testkey", "true")
+          .set(CONTINUOUS_STREAMING_LATE_EPOCH_THRESHOLD, lateEpochsThreshold)))
 
     epochCoordinator
       = EpochCoordinatorRef.create(writeSupport, stream, query, "test", 1, spark, SparkEnv.get)
@@ -192,42 +193,6 @@ class EpochCoordinatorSuite
     verifyCommitsInOrderOf(List(1, 2, 3, 4, 5))
   }
 
-  test("several epochs, max epoch backlog reached by partitionOffsets") {
-    setWriterPartitions(1)
-    setReaderPartitions(1)
-
-    reportPartitionOffset(0, 1)
-    // Commit messages not arriving
-    for (i <- 2 to epochBacklogQueueSize + 1) {
-      reportPartitionOffset(0, i)
-    }
-
-    makeSynchronousCall()
-
-    for (i <- 1 to epochBacklogQueueSize + 1) {
-      verifyNoCommitFor(i)
-    }
-    verifyStoppedWithException("Size of the partition offset queue has exceeded its maximum")
-  }
-
-  test("several epochs, max epoch backlog reached by partitionCommits") {
-    setWriterPartitions(1)
-    setReaderPartitions(1)
-
-    commitPartitionEpoch(0, 1)
-    // Offset messages not arriving
-    for (i <- 2 to epochBacklogQueueSize + 1) {
-      commitPartitionEpoch(0, i)
-    }
-
-    makeSynchronousCall()
-
-    for (i <- 1 to epochBacklogQueueSize + 1) {
-      verifyNoCommitFor(i)
-    }
-    verifyStoppedWithException("Size of the partition commit queue has exceeded its maximum")
-  }
-
   test("several epochs, max epoch backlog reached by epochsWaitingToBeCommitted") {
     setWriterPartitions(2)
     setReaderPartitions(2)
@@ -235,9 +200,9 @@ class EpochCoordinatorSuite
     commitPartitionEpoch(0, 1)
     reportPartitionOffset(0, 1)
 
-    // For partition 2 epoch 1 messages never arriving
+    // Messages for epoch 1 from partition 1 never arrive
     // +2 because the first epoch not yet arrived
-    for (i <- 2 to epochBacklogQueueSize + 2) {
+    for (i <- 2 to lateEpochsThreshold + 2) {
       commitPartitionEpoch(0, i)
       reportPartitionOffset(0, i)
       commitPartitionEpoch(1, i)
@@ -246,10 +211,10 @@ class EpochCoordinatorSuite
 
     makeSynchronousCall()
 
-    for (i <- 1 to epochBacklogQueueSize + 2) {
+    for (i <- 1 to lateEpochsThreshold + 2) {
       verifyNoCommitFor(i)
     }
-    verifyStoppedWithException("Size of the epoch queue has exceeded its maximum")
+    verifyStoppedWithException(s"Epoch 1 is late by more than $lateEpochsThreshold epochs.")
   }
 
   private def setWriterPartitions(numPartitions: Int): Unit = {
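A minimal usage sketch (not part of the patch) of the renamed option when starting a continuous query. Only the key `spark.sql.streaming.continuous.lateEpochThreshold` and its default of 100 come from this change; the rate source, console sink, trigger interval, and checkpoint path below are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object LateEpochThresholdExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("late-epoch-threshold-example")
      // Key introduced by this change; the old key was
      // spark.sql.streaming.continuous.epochBacklogQueueSize.
      .config("spark.sql.streaming.continuous.lateEpochThreshold", "100")
      .getOrCreate()

    // Rate source and console sink are both usable in continuous mode and are
    // chosen here only to keep the example self-contained.
    val df = spark.readStream
      .format("rate")
      .option("numPartitions", "2")
      .option("rowsPerSecond", "10")
      .load()

    val query = df.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/late-epoch-threshold-example")
      .trigger(Trigger.Continuous("1 second"))
      .start()

    query.awaitTermination()
  }
}
```

With this setting, once more than the configured number of resolved epochs queue up behind a late epoch, the EpochCoordinator stops the query with the IllegalStateException added above.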