@@ -1453,6 +1453,13 @@ object SQLConf {

val CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE =
buildConf("spark.sql.streaming.continuous.epochBacklogQueueSize")
.doc("The max number of entries to be stored in queue to wait for late epochs. " +
"If this parameter is exceeded by the size of the queue, stream will stop with an error.")
.intConf
.createWithDefault(10)

val CONTINUOUS_STREAMING_EPOCH_MSG_BACKLOG_QUEUE_SIZE =
buildConf("spark.sql.streaming.continuous.epochMessageBacklogQueueSize")
.doc("The max number of partition offset and commit entries to be stored in " +
"queues while waiting for late epoch messages. If a queue grows beyond this " +
"size, the stream will stop with an error.")
.intConf
@@ -2116,6 +2123,9 @@ class SQLConf extends Serializable with Logging {
def continuousStreamingEpochBacklogQueueSize: Int =
getConf(CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE)

def continuousStreamingEpochMsgBacklogQueueSize: Int =
getConf(CONTINUOUS_STREAMING_EPOCH_MSG_BACKLOG_QUEUE_SIZE)

def continuousStreamingExecutorQueueSize: Int = getConf(CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE)

def continuousStreamingExecutorPollIntervalMs: Long =
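For context on the fluent ConfigBuilder chain used above (buildConf(...).doc(...).intConf.createWithDefault(...)), here is a minimal self-contained sketch of the same pattern. SimpleConfEntry, SimpleConfBuilder, IntConfStage and ConfSketch are hypothetical stand-ins, not Spark's internal API, and the default shown is purely illustrative, since the new entry's createWithDefault line is truncated out of the diff above.

// Hypothetical stand-ins for the conf-entry builder pattern; not Spark's API.
final case class SimpleConfEntry[T](key: String, doc: String, default: T)

final class IntConfStage(key: String, doc: String) {
  def createWithDefault(default: Int): SimpleConfEntry[Int] =
    SimpleConfEntry(key, doc, default)
}

final class SimpleConfBuilder(key: String) {
  private var docText = ""
  def doc(text: String): SimpleConfBuilder = { docText = text; this }
  def intConf: IntConfStage = new IntConfStage(key, docText)
}

object ConfSketch {
  def buildConf(key: String): SimpleConfBuilder = new SimpleConfBuilder(key)

  val EPOCH_MSG_BACKLOG_QUEUE_SIZE: SimpleConfEntry[Int] =
    buildConf("spark.sql.streaming.continuous.epochMessageBacklogQueueSize")
      .doc("Max entries queued while waiting for late epoch messages.")
      .intConf
      .createWithDefault(10000) // illustrative default; the real one is not visible in the diff

  def main(args: Array[String]): Unit =
    println(s"${EPOCH_MSG_BACKLOG_QUEUE_SIZE.key} defaults to ${EPOCH_MSG_BACKLOG_QUEUE_SIZE.default}")
}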
@@ -117,7 +117,7 @@ private[sql] object EpochCoordinatorRef extends Logging {
private[continuous] class EpochCoordinator(
writeSupport: StreamingWrite,
stream: ContinuousStream,
query: ContinuousExecution,
queryExecution: ContinuousExecution,
startEpoch: Long,
session: SparkSession,
override val rpcEnv: RpcEnv)
@@ -126,6 +126,9 @@ private[continuous] class EpochCoordinator(
private val epochBacklogQueueSize =
session.sqlContext.conf.continuousStreamingEpochBacklogQueueSize

private val epochMsgBacklogQueueSize =
session.sqlContext.conf.continuousStreamingEpochMsgBacklogQueueSize

private var queryWritesStopped: Boolean = false

private var numReaderPartitions: Int = _
@@ -202,7 +205,7 @@ private[continuous] class EpochCoordinator(
// Sequencing is important here. We must commit to the writer before recording the commit
// in the query, or we will end up dropping the commit if we restart in the middle.
writeSupport.commit(epoch, messages.toArray)
query.commit(epoch)
queryExecution.commit(epoch)
}

override def receive: PartialFunction[Any, Unit] = {
@@ -224,24 +227,24 @@ private[continuous] class EpochCoordinator(
partitionOffsets.collect { case ((e, _), o) if e == epoch => o }
if (thisEpochOffsets.size == numReaderPartitions) {
logDebug(s"Epoch $epoch has offsets reported from all partitions: $thisEpochOffsets")
query.addOffset(epoch, stream, thisEpochOffsets.toSeq)
queryExecution.addOffset(epoch, stream, thisEpochOffsets.toSeq)
resolveCommitsAtEpoch(epoch)
}
checkProcessingQueueBoundaries()
}

private def checkProcessingQueueBoundaries() = {
if (partitionOffsets.size > epochBacklogQueueSize) {
query.stopInNewThread(new IllegalStateException("Size of the partition offset queue has " +
"exceeded its maximum"))
if (partitionOffsets.size > epochMsgBacklogQueueSize) {
queryExecution.stopInNewThread(new IllegalStateException("Size of the partition offset " +
"queue has exceeded its maximum"))
}
if (partitionCommits.size > epochBacklogQueueSize) {
query.stopInNewThread(new IllegalStateException("Size of the partition commit queue has " +
"exceeded its maximum"))
if (partitionCommits.size > epochMsgBacklogQueueSize) {
queryExecution.stopInNewThread(new IllegalStateException("Size of the partition commit " +
"queue has exceeded its maximum"))
}
if (epochsWaitingToBeCommitted.size > epochBacklogQueueSize) {
query.stopInNewThread(new IllegalStateException("Size of the epoch queue has " +
"exceeded its maximum"))
queryExecution.stopInNewThread(new IllegalStateException(s"Epoch ${lastCommittedEpoch + 1} " +
s"is late for more than ${epochsWaitingToBeCommitted.max - lastCommittedEpoch} epochs."))
}
}

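The change above splits one limit into two: the new epochMsgBacklogQueueSize bounds the raw partition offset and commit queues, while the existing epochBacklogQueueSize still bounds whole epochs waiting to be committed, whose error message now reports how late the blocking epoch is. A minimal sketch of that guard logic, with hypothetical names (BacklogGuard, stopWith) standing in for EpochCoordinator and queryExecution.stopInNewThread:

import scala.collection.mutable

// Illustrative stand-in for EpochCoordinator's queue-boundary checks.
final class BacklogGuard(msgLimit: Int, epochLimit: Int, stopWith: String => Unit) {
  val partitionOffsets = mutable.Map[(Long, Int), Long]()   // (epoch, partition) -> offset
  val partitionCommits = mutable.Map[(Long, Int), String]() // (epoch, partition) -> commit message
  val epochsWaitingToBeCommitted = mutable.Set[Long]()
  var lastCommittedEpoch = 0L

  def checkProcessingQueueBoundaries(): Unit = {
    if (partitionOffsets.size > msgLimit) {
      stopWith("Size of the partition offset queue has exceeded its maximum")
    }
    if (partitionCommits.size > msgLimit) {
      stopWith("Size of the partition commit queue has exceeded its maximum")
    }
    if (epochsWaitingToBeCommitted.size > epochLimit) {
      stopWith(s"Epoch ${lastCommittedEpoch + 1} is late for more than " +
        s"${epochsWaitingToBeCommitted.max - lastCommittedEpoch} epochs.")
    }
  }
}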
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE
import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_EPOCH_MSG_BACKLOG_QUEUE_SIZE
import org.apache.spark.sql.streaming.{StreamTest, Trigger}
import org.apache.spark.sql.test.TestSparkSession

@@ -357,7 +357,7 @@ class ContinuousEpochBacklogSuite extends ContinuousSuiteBase {
// This test forces the backlog to overflow by not standing up enough executors for the query
// to make progress.
test("epoch backlog overflow") {
withSQLConf((CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE.key, "10")) {
withSQLConf((CONTINUOUS_STREAMING_EPOCH_MSG_BACKLOG_QUEUE_SIZE.key, "10")) {
val df = spark.readStream
.format("rate")
.option("numPartitions", "2")
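This suite only needed the key rename; withSQLConf scopes a setting to the block and restores the previous value afterwards. A hedged sketch of that set/run/restore pattern, where MapConf and withConf are illustrative stand-ins rather than Spark's actual SQLConf plumbing:

object MapConf {
  private val settings = scala.collection.mutable.Map[String, String]()

  def set(key: String, value: String): Unit = settings(key) = value
  def get(key: String): Option[String] = settings.get(key)
  def unset(key: String): Unit = settings.remove(key)

  // Set the given keys, run the body, then restore whatever was there before,
  // even if the body throws; withSQLConf gives tests the same guarantee.
  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    val previous = pairs.map { case (key, _) => key -> get(key) }
    pairs.foreach { case (key, value) => set(key, value) }
    try body
    finally previous.foreach {
      case (key, Some(old)) => set(key, old)
      case (key, None) => unset(key)
    }
  }
}

// e.g. MapConf.withConf(
//   "spark.sql.streaming.continuous.epochMessageBacklogQueueSize" -> "10") { runQuery() }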
@@ -27,7 +27,7 @@ import org.apache.spark._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql.LocalSparkSession
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE
import org.apache.spark.sql.internal.SQLConf.{CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE, CONTINUOUS_STREAMING_EPOCH_MSG_BACKLOG_QUEUE_SIZE}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
@@ -44,7 +44,8 @@ class EpochCoordinatorSuite
private var writeSupport: StreamingWrite = _
private var query: ContinuousExecution = _
private var orderVerifier: InOrder = _
private val epochBacklogQueueSize = 10
private val epochBacklogQueueSize = 2
private val epochMsgBacklogQueueSize = 10

override def beforeEach(): Unit = {
val stream = mock[ContinuousStream]
@@ -55,8 +56,10 @@ class EpochCoordinatorSuite
spark = new TestSparkSession(
new SparkContext(
"local[2]", "test-sql-context",
new SparkConf().set("spark.sql.testkey", "true")
.set(CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE, epochBacklogQueueSize)))
new SparkConf()
.set("spark.sql.testkey", "true")
.set(CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE, epochBacklogQueueSize)
.set(CONTINUOUS_STREAMING_EPOCH_MSG_BACKLOG_QUEUE_SIZE, epochMsgBacklogQueueSize)))

epochCoordinator
= EpochCoordinatorRef.create(writeSupport, stream, query, "test", 1, spark, SparkEnv.get)
@@ -192,37 +195,37 @@ class EpochCoordinatorSuite
verifyCommitsInOrderOf(List(1, 2, 3, 4, 5))
}

test("several epochs, max epoch backlog reached by partitionOffsets") {
test("several epochs, max epoch msg backlog reached by partitionOffsets") {
setWriterPartitions(1)
setReaderPartitions(1)

reportPartitionOffset(0, 1)
// Commit messages not arriving
for (i <- 2 to epochBacklogQueueSize + 1) {
for (i <- 2 to epochMsgBacklogQueueSize + 1) {
reportPartitionOffset(0, i)
}

makeSynchronousCall()

for (i <- 1 to epochBacklogQueueSize + 1) {
for (i <- 1 to epochMsgBacklogQueueSize + 1) {
verifyNoCommitFor(i)
}
verifyStoppedWithException("Size of the partition offset queue has exceeded its maximum")
}

test("several epochs, max epoch backlog reached by partitionCommits") {
test("several epochs, max epoch msg backlog reached by partitionCommits") {
setWriterPartitions(1)
setReaderPartitions(1)

commitPartitionEpoch(0, 1)
// Offset messages not arriving
for (i <- 2 to epochBacklogQueueSize + 1) {
for (i <- 2 to epochMsgBacklogQueueSize + 1) {
commitPartitionEpoch(0, i)
}

makeSynchronousCall()

for (i <- 1 to epochBacklogQueueSize + 1) {
for (i <- 1 to epochMsgBacklogQueueSize + 1) {
verifyNoCommitFor(i)
}
verifyStoppedWithException("Size of the partition commit queue has exceeded its maximum")
@@ -235,7 +238,7 @@ class EpochCoordinatorSuite
commitPartitionEpoch(0, 1)
reportPartitionOffset(0, 1)

// For partition 2 epoch 1 messages never arriving
// For partition 1 epoch 1 messages never arriving
// +2 because the first epoch not yet arrived
for (i <- 2 to epochBacklogQueueSize + 2) {
commitPartitionEpoch(0, i)
@@ -249,7 +252,7 @@ class EpochCoordinatorSuite
for (i <- 1 to epochBacklogQueueSize + 2) {
verifyNoCommitFor(i)
}
verifyStoppedWithException("Size of the epoch queue has exceeded its maximum")
verifyStoppedWithException(s"Epoch 1 is late for more than $epochBacklogQueueSize epochs.")
}

private def setWriterPartitions(numPartitions: Int): Unit = {
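To make the reworked overflow tests concrete: with one reader partition and commit messages never arriving, each reportPartitionOffset leaves one more pending entry in partitionOffsets until the message-queue guard fires. A small driver for the BacklogGuard sketch above (illustrative only, not the suite's actual helpers):

object BacklogGuardDemo {
  def main(args: Array[String]): Unit = {
    var stopped: Option[String] = None
    val guard = new BacklogGuard(msgLimit = 10, epochLimit = 2, m => stopped = Some(m))

    // One reader partition reports offsets for epochs 1..11 while commit
    // messages never arrive, mirroring the partitionOffsets overflow test:
    // the 11th pending offset pushes the queue past msgLimit.
    for (epoch <- 1L to 11L) {
      guard.partitionOffsets((epoch, 0)) = epoch
      guard.checkProcessingQueueBoundaries()
    }

    assert(stopped.exists(_.contains("partition offset queue")))
    println(stopped.get)
  }
}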