Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,31 @@ class ContinuousSuiteBase extends StreamTest {

protected def waitForRateSourceCommittedValue(
query: ContinuousExecution,
desiredValue: Long,
partitionIdToDesiredValue: Map[Int, Long],
maxWaitTimeMs: Long): Unit = {
def readHighestCommittedValue(c: ContinuousExecution): Option[Long] = {
def readCommittedValues(c: ContinuousExecution): Option[Map[Int, Long]] = {
c.committedOffsets.lastOption.map { case (_, offset) =>
offset match {
case o: RateStreamOffset =>
o.partitionToValueAndRunTimeMs.map {
case (_, ValueRunTimeMsPair(value, _)) => value
}.max
o.partitionToValueAndRunTimeMs.mapValues(_.value).toMap
}
}
}

def reachDesiredValues: Boolean = {
val committedValues = readCommittedValues(query).getOrElse(Map.empty)
partitionIdToDesiredValue.forall { case (key, value) =>
committedValues.contains(key) && committedValues(key) > value
}
}

val maxWait = System.currentTimeMillis() + maxWaitTimeMs
while (System.currentTimeMillis() < maxWait &&
readHighestCommittedValue(query).getOrElse(Long.MinValue) < desiredValue) {
while (System.currentTimeMillis() < maxWait && !reachDesiredValues) {
Thread.sleep(100)
}
if (System.currentTimeMillis() > maxWait) {
logWarning(s"Couldn't reach desired value in $maxWaitTimeMs milliseconds!" +
s"Current highest committed value is ${readHighestCommittedValue(query)}")
s"Current committed values is ${readCommittedValues(query)}")
}
}

Expand Down Expand Up @@ -264,7 +268,7 @@ class ContinuousSuite extends ContinuousSuiteBase {
val expected = Set(0, 1, 2, 3)
val continuousExecution =
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.asInstanceOf[ContinuousExecution]
waitForRateSourceCommittedValue(continuousExecution, expected.max, 20 * 1000)
waitForRateSourceCommittedValue(continuousExecution, Map(0 -> 2, 1 -> 3), 20 * 1000)
query.stop()

val results = spark.read.table("noharness").collect()
Expand Down