unify two configs
uncleGen committed Apr 23, 2019
commit d2c4296be90b5201b9f3f18f02c37035e22b2d71
@@ -1458,13 +1458,6 @@ object SQLConf {
.intConf
.createWithDefault(100)

val CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE =
buildConf("spark.sql.streaming.continuous.epochBacklogQueueSize")
.doc("The max number of entries to be stored in queue to wait for late epochs. " +
"If this parameter is exceeded by the size of the queue, stream will stop with an error.")
.intConf
.createWithDefault(10000)

val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
buildConf("spark.sql.streaming.continuous.executorQueueSize")
.internal()
@@ -2120,9 +2113,6 @@ class SQLConf extends Serializable with Logging {

def literalPickMinimumPrecision: Boolean = getConf(LITERAL_PICK_MINIMUM_PRECISION)

def continuousStreamingEpochBacklogQueueSize: Int =
getConf(CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE)

def continuousStreamingLateEpochThreshold: Int =
getConf(CONTINUOUS_STREAMING_LATE_EPOCH_THRESHOLD)

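For orientation, here is a minimal usage sketch (not part of this diff) showing how the entry deleted above surfaced through the public SparkSession conf API. The key string and the 10000 default come from the removed definition; the object name and session setup are illustrative assumptions, and after this commit the key is simply no longer registered in SQLConf.

import org.apache.spark.sql.SparkSession

object EpochBacklogConfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("epoch-backlog-conf-sketch")
      .getOrCreate()

    // Key and default taken from the deleted buildConf(...) entry above; shown
    // only to illustrate how an int conf created with
    // buildConf(...).intConf.createWithDefault(...) is set and read back.
    spark.conf.set("spark.sql.streaming.continuous.epochBacklogQueueSize", 10000L)
    println(spark.conf.get("spark.sql.streaming.continuous.epochBacklogQueueSize"))

    spark.stop()
  }
}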
@@ -123,9 +123,6 @@ private[continuous] class EpochCoordinator(
override val rpcEnv: RpcEnv)
extends ThreadSafeRpcEndpoint with Logging {

private val epochBacklogQueueSize =
session.sqlContext.conf.continuousStreamingEpochBacklogQueueSize

private val lateEpochsThreshold =
session.sqlContext.conf.continuousStreamingLateEpochThreshold

@@ -218,7 +215,7 @@ private[continuous] class EpochCoordinator(
if (!partitionCommits.isDefinedAt((epoch, partitionId))) {
partitionCommits.put((epoch, partitionId), message)
resolveCommitsAtEpoch(epoch)
checkProcessingQueueBoundaries()
checkLateEpochsBoundaries()
}

case ReportPartitionOffset(partitionId, epoch, offset) =>
@@ -230,18 +227,10 @@
query.addOffset(epoch, stream, thisEpochOffsets.toSeq)
resolveCommitsAtEpoch(epoch)
}
checkProcessingQueueBoundaries()
checkLateEpochsBoundaries()
}

private def checkProcessingQueueBoundaries() = {
if (partitionOffsets.size > epochBacklogQueueSize) {
query.stopInNewThread(new IllegalStateException("Size of the partition offset queue has " +
"exceeded its maximum"))
}
if (partitionCommits.size > epochBacklogQueueSize) {
query.stopInNewThread(new IllegalStateException("Size of the partition commit queue has " +
"exceeded its maximum"))
}
private def checkLateEpochsBoundaries() = {
if (epochsWaitingToBeCommitted.size > lateEpochsThreshold) {
query.stopInNewThread(new IllegalStateException(s"Epoch ${lastCommittedEpoch + 1} " +
s"is late for more than ${epochsWaitingToBeCommitted.max - lastCommittedEpoch} epochs."))
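To make the new checkLateEpochsBoundaries() check concrete, below is a standalone Scala sketch of the same idea outside Spark: a guard stops the query once the set of epochs waiting to be committed outgrows the configured late-epoch threshold. LateEpochGuard and the stopQuery callback are illustrative names; only the field names and the error message shape mirror the code in the hunk above.

import scala.collection.mutable

final class LateEpochGuard(lateEpochsThreshold: Int, stopQuery: String => Unit) {
  // Epochs that have everything they need but are blocked behind an earlier,
  // still-incomplete epoch.
  private val epochsWaitingToBeCommitted = mutable.Set.empty[Long]
  private var lastCommittedEpoch = 0L

  def epochReady(epoch: Long): Unit = {
    epochsWaitingToBeCommitted += epoch
    checkLateEpochsBoundaries()
  }

  def epochCommitted(epoch: Long): Unit = {
    epochsWaitingToBeCommitted -= epoch
    lastCommittedEpoch = math.max(lastCommittedEpoch, epoch)
  }

  // Same shape as the check above: stop once the backlog of blocked epochs
  // exceeds the threshold.
  private def checkLateEpochsBoundaries(): Unit = {
    if (epochsWaitingToBeCommitted.size > lateEpochsThreshold) {
      stopQuery(s"Epoch ${lastCommittedEpoch + 1} is late for more than " +
        s"${epochsWaitingToBeCommitted.max - lastCommittedEpoch} epochs.")
    }
  }
}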
@@ -27,7 +27,7 @@ import org.apache.spark._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql.LocalSparkSession
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.internal.SQLConf.{CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE, CONTINUOUS_STREAMING_LATE_EPOCH_THRESHOLD}
import org.apache.spark.sql.internal.SQLConf.CONTINUOUS_STREAMING_LATE_EPOCH_THRESHOLD
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
@@ -45,7 +45,6 @@ class EpochCoordinatorSuite
private var query: ContinuousExecution = _
private var orderVerifier: InOrder = _
private val lateEpochsThreshold = 2
private val epochBacklogQueueSize = 10

override def beforeEach(): Unit = {
val stream = mock[ContinuousStream]
@@ -58,7 +57,6 @@ class EpochCoordinatorSuite
"local[2]", "test-sql-context",
new SparkConf()
.set("spark.sql.testkey", "true")
.set(CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE, epochBacklogQueueSize)
.set(CONTINUOUS_STREAMING_LATE_EPOCH_THRESHOLD, lateEpochsThreshold)))

epochCoordinator
@@ -195,42 +193,6 @@ class EpochCoordinatorSuite
verifyCommitsInOrderOf(List(1, 2, 3, 4, 5))
}

test("several epochs, max epoch backlog reached by partitionOffsets") {
setWriterPartitions(1)
setReaderPartitions(1)

reportPartitionOffset(0, 1)
// Commit messages not arriving
for (i <- 2 to epochBacklogQueueSize + 1) {
reportPartitionOffset(0, i)
}

makeSynchronousCall()

for (i <- 1 to epochBacklogQueueSize + 1) {
verifyNoCommitFor(i)
}
verifyStoppedWithException("Size of the partition offset queue has exceeded its maximum")
}

test("several epochs, max epoch backlog reached by partitionCommits") {
setWriterPartitions(1)
setReaderPartitions(1)

commitPartitionEpoch(0, 1)
// Offset messages not arriving
for (i <- 2 to epochBacklogQueueSize + 1) {
commitPartitionEpoch(0, i)
}

makeSynchronousCall()

for (i <- 1 to epochBacklogQueueSize + 1) {
verifyNoCommitFor(i)
}
verifyStoppedWithException("Size of the partition commit queue has exceeded its maximum")
}

test("several epochs, max epoch backlog reached by epochsWaitingToBeCommitted") {
setWriterPartitions(2)
setReaderPartitions(2)
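As a usage note, the following driver mirrors the shape of the surviving "max epoch backlog reached by epochsWaitingToBeCommitted" test: epochs keep becoming ready without being committable until the threshold is crossed and the guard reports a stop. It reuses the illustrative LateEpochGuard sketch from earlier and is not the Mockito-based suite itself.

object LateEpochGuardExample extends App {
  val lateEpochsThreshold = 2
  var stoppedWith: Option[String] = None
  val guard = new LateEpochGuard(lateEpochsThreshold, msg => stoppedWith = Some(msg))

  // Epochs keep becoming ready while none of them can actually be committed.
  (1L to lateEpochsThreshold + 1L).foreach(guard.epochReady)

  assert(stoppedWith.exists(_.contains("is late for more than")))
  println(stoppedWith.get)
}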