Review fix
gaborgsomogyi committed Feb 27, 2019
commit 41656f1218b795f91033e70683684ce014900c3d
@@ -233,15 +233,15 @@ private[continuous] class EpochCoordinator(
   private def checkProcessingQueueBoundaries() = {
     if (partitionOffsets.size > epochBacklogQueueSize) {
       query.stopInNewThread(new IllegalStateException("Size of the partition offset queue has " +
-        "exceeded it's maximum"))
+        "exceeded its maximum"))
     }
     if (partitionCommits.size > epochBacklogQueueSize) {
       query.stopInNewThread(new IllegalStateException("Size of the partition commit queue has " +
-        "exceeded it's maximum"))
+        "exceeded its maximum"))
     }
     if (epochsWaitingToBeCommitted.size > epochBacklogQueueSize) {
       query.stopInNewThread(new IllegalStateException("Size of the epoch queue has " +
-        "exceeded it's maximum"))
+        "exceeded its maximum"))
     }
   }

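For readers skimming the diff: the hunk above only corrects the error-message wording ("it's" to "its"), but the surrounding method is the backlog guard this change set introduces. Below is a minimal, self-contained Scala sketch of that guarded-queue pattern. FakeQuery, BacklogGuard, reportPartitionOffset, and the demo object are hypothetical stand-ins for illustration, not the actual EpochCoordinator code; that the check runs after each new entry is an assumption made for the sketch.

// Illustrative sketch only: a bounded-backlog guard in the spirit of
// checkProcessingQueueBoundaries above. FakeQuery and BacklogGuard are
// hypothetical; the real coordinator stops the streaming query rather than printing.
import scala.collection.mutable

class FakeQuery {
  // Stand-in for the query handle whose stopInNewThread the real code calls.
  def stopInNewThread(t: Throwable): Unit =
    println(s"stopping query: ${t.getMessage}")
}

class BacklogGuard(epochBacklogQueueSize: Int, query: FakeQuery) {
  private val partitionOffsets = mutable.Map[(Long, Int), String]()
  private val partitionCommits = mutable.Map[(Long, Int), Boolean]()
  private val epochsWaitingToBeCommitted = mutable.Set[Long]()

  // Same shape as the method in the hunk: every tracked backlog is compared
  // against the configured maximum, and the query is stopped if any overflows.
  private def checkProcessingQueueBoundaries(): Unit = {
    if (partitionOffsets.size > epochBacklogQueueSize) {
      query.stopInNewThread(new IllegalStateException(
        "Size of the partition offset queue has exceeded its maximum"))
    }
    if (partitionCommits.size > epochBacklogQueueSize) {
      query.stopInNewThread(new IllegalStateException(
        "Size of the partition commit queue has exceeded its maximum"))
    }
    if (epochsWaitingToBeCommitted.size > epochBacklogQueueSize) {
      query.stopInNewThread(new IllegalStateException(
        "Size of the epoch queue has exceeded its maximum"))
    }
  }

  // Hypothetical entry point: record an offset, then re-check the boundaries.
  def reportPartitionOffset(epoch: Long, partitionId: Int, offset: String): Unit = {
    partitionOffsets((epoch, partitionId)) = offset
    checkProcessingQueueBoundaries()
  }
}

object BacklogGuardDemo extends App {
  val guard = new BacklogGuard(2, new FakeQuery)
  // The third report pushes the offset backlog past the limit of 2,
  // so the sketch prints the stop message.
  (1 to 3).foreach(epoch => guard.reportPartitionOffset(epoch.toLong, 0, s"offset-$epoch"))
}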
@@ -56,8 +56,7 @@ class EpochCoordinatorSuite
       new SparkContext(
         "local[2]", "test-sql-context",
         new SparkConf().set("spark.sql.testkey", "true")
-          .set(CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE.key,
-            epochBacklogQueueSize.toString)))
+          .set(CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE, epochBacklogQueueSize)))

   epochCoordinator
     = EpochCoordinatorRef.create(writeSupport, stream, query, "test", 1, spark, SparkEnv.get)
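The suite hunk switches the test configuration from the string-keyed form (the entry's .key plus a manual toString) to the overload that takes the config entry itself with a value of its native type. Below is a minimal, self-contained sketch of why the typed setter reads better; Conf, ConfEntry, and the demo entry are hypothetical stand-ins, not Spark's internal config classes, and the real SparkConf overloads may differ.

// Illustrative sketch (not Spark's config API) of a typed setter next to a
// stringly-typed one: the typed form checks the value type at compile time
// and needs no manual .toString or parsing at the call site.
final case class ConfEntry[T](key: String, default: T)

class Conf {
  private var entries = Map.empty[String, String]

  // Stringly-typed setter: the caller converts the value and spells the key.
  def set(key: String, value: String): Conf = { entries += key -> value; this }

  // Typed setter: the entry carries its key, and the value must match its type.
  def set[T](entry: ConfEntry[T], value: T): Conf = set(entry.key, value.toString)

  def get[T](entry: ConfEntry[T], parse: String => T): T =
    entries.get(entry.key).map(parse).getOrElse(entry.default)
}

object ConfDemo extends App {
  // Hypothetical entry mirroring CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE.
  val EpochBacklogQueueSize = ConfEntry("demo.epochBacklogQueueSize", 10000)

  val epochBacklogQueueSize = 10
  val conf = new Conf()
    // Before the review fix: raw key plus manual toString.
    .set(EpochBacklogQueueSize.key, epochBacklogQueueSize.toString)
    // After the review fix: typed entry, no conversion at the call site.
    .set(EpochBacklogQueueSize, epochBacklogQueueSize)

  println(conf.get(EpochBacklogQueueSize, _.toInt)) // prints 10
}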