@@ -190,6 +190,10 @@ class MicroBatchExecution(
       currentBatchHasNewData = isNewDataAvailable

       currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
+
+      // remember the committed offsets to report progress correctly
+      prevCommittedOffsets ++= committedOffsets
+
       if (isCurrentBatchConstructed) {
         if (currentBatchHasNewData) updateStatusMessage("Processing new data")
         else updateStatusMessage("No new data but cleaning up state")
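The hunk above snapshots the offsets committed by earlier batches before the new batch runs, so the progress reporter can later use that snapshot as the batch's start offset while committedOffsets moves on to the batch's end. A minimal, self-contained sketch of the pattern, using hypothetical types rather than Spark's actual classes:

object SnapshotBeforeCommit {
  // Hypothetical stand-ins: a source name and a numeric offset.
  type Source = String
  type Offset = Long

  var prevCommittedOffsets: Map[Source, Offset] = Map.empty
  var committedOffsets: Map[Source, Offset] = Map.empty

  def runBatch(source: Source, newOffset: Offset): Unit = {
    // Remember what earlier batches committed...
    prevCommittedOffsets ++= committedOffsets
    // ...then commit the offset reached by this batch.
    committedOffsets += (source -> newOffset)
  }

  def main(args: Array[String]): Unit = {
    val src = "memory-stream"
    runBatch(src, 0L)
    // First batch: nothing was committed before it, so there is no start offset yet.
    println(s"start=${prevCommittedOffsets.get(src)} end=${committedOffsets.get(src)}")
    // prints: start=None end=Some(0)
  }
}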
@@ -57,6 +57,7 @@ trait ProgressReporter extends Logging {
   protected def lastExecution: QueryExecution
   protected def newData: Map[BaseStreamingSource, LogicalPlan]
   protected def availableOffsets: StreamProgress
+  protected def prevCommittedOffsets: StreamProgress
   protected def committedOffsets: StreamProgress
   protected def sources: Seq[BaseStreamingSource]
   protected def sink: BaseStreamingSink
@@ -147,7 +148,7 @@ trait ProgressReporter extends Logging {
       val numRecords = executionStats.inputRows.getOrElse(source, 0L)
       new SourceProgress(
         description = source.toString,
-        startOffset = committedOffsets.get(source).map(_.json).orNull,
+        startOffset = prevCommittedOffsets.get(source).map(_.json).orNull,
         endOffset = availableOffsets.get(source).map(_.json).orNull,
         numInputRows = numRecords,
         inputRowsPerSecond = numRecords / inputTimeSec,
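With this change, startOffset comes from the snapshot taken before the batch and endOffset from the offsets the batch read up to; a source with no previously committed offset simply yields null. A small sketch of the `.get(source).map(_.json).orNull` derivation, using a hypothetical LongOffset stand-in rather than Spark's offset classes:

object NullableOffsets {
  // Hypothetical offset type with a JSON representation.
  final case class LongOffset(value: Long) { def json: String = value.toString }

  def main(args: Array[String]): Unit = {
    val prevCommitted = Map.empty[String, LongOffset]        // nothing committed before this batch
    val available     = Map("src" -> LongOffset(0L))         // what the batch read up to

    val startOffset = prevCommitted.get("src").map(_.json).orNull  // null: no prior commit
    val endOffset   = available.get("src").map(_.json).orNull      // "0"
    println(s"startOffset=$startOffset endOffset=$endOffset")
  }
}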
@@ -92,6 +92,13 @@ abstract class StreamExecution(

   def logicalPlan: LogicalPlan

+  /**
+   * Tracks the previously committed offsets so that the progress reporter can report the
+   * start offsets correctly after the current batch of data is processed and committed.
+   */
+  @volatile
+  var prevCommittedOffsets = new StreamProgress
+
   /**
    * Tracks how much data we have processed and committed to the sink or state store from each
    * input source.
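A note on the new field: assuming `StreamProgress` is an immutable, map-like collection (its use with `++=` suggests as much), `prevCommittedOffsets ++= committedOffsets` desugars to reassigning the `var`, and `@volatile` is what publishes that reassignment to the thread that builds the progress report. A generic sketch of the same idiom:

object VolatileVarIdiom {
  // With an immutable Map held in a var, `++=` is sugar for `offsets = offsets ++ other`,
  // so each update swaps in a whole new map; @volatile publishes the new reference to
  // other threads, e.g. one that reads the map to build a progress report.
  @volatile private var offsets: Map[String, Long] = Map.empty

  def record(newlyCommitted: Map[String, Long]): Unit = offsets ++= newlyCommitted

  def snapshot: Map[String, Long] = offsets   // safe to read from another thread

  def main(args: Array[String]): Unit = {
    record(Map("source-0" -> 0L))
    record(Map("source-0" -> 1L, "source-1" -> 5L))
    println(snapshot)   // Map(source-0 -> 1, source-1 -> 5)
  }
}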
@@ -311,6 +311,7 @@ class ContinuousExecution(
       commitLog.add(epoch)
       val offset =
         continuousSources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json)
+      prevCommittedOffsets ++= committedOffsets
       committedOffsets ++= Seq(continuousSources(0) -> offset)
       continuousSources(0).commit(offset.asInstanceOf[v2.reader.streaming.Offset])
     } else {
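The ordering in this hunk matters: the snapshot must be taken before `committedOffsets` is updated for the epoch, otherwise the previous epoch's offsets are overwritten before they can be remembered. A compressed illustration with plain maps rather than the real classes:

object CommitOrdering {
  def main(args: Array[String]): Unit = {
    var committed = Map("src" -> 0L)        // offsets committed by epoch N
    var prev      = Map.empty[String, Long]

    prev ++= committed                      // snapshot first: remember epoch N
    committed ++= Map("src" -> 1L)          // then commit epoch N + 1

    println(s"start=${prev("src")} end=${committed("src")}")   // start=0 end=1
    // If the two updates were swapped, prev would already hold the new offset
    // and the reported start offset would equal the end offset.
  }
}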
@@ -335,7 +335,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi

       assert(progress.sources.length === 1)
       assert(progress.sources(0).description contains "MemoryStream")
-      assert(progress.sources(0).startOffset === "0")
+      assert(progress.sources(0).startOffset === null)
       assert(progress.sources(0).endOffset !== null)
       assert(progress.sources(0).processedRowsPerSecond === 4.0) // 2 rows processed in 500 ms
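From the user's side the same behaviour shows up through `StreamingQuery.lastProgress`: the first batch has no previously committed offset, so its reported startOffset is null. A rough, hedged sketch of how one might observe this with a MemoryStream (the setup below is illustrative; the exact progress entry seen depends on trigger timing):

import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.MemoryStream

object StartOffsetDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("start-offset-demo").getOrCreate()
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    val input = MemoryStream[Int]
    val query = input.toDF().writeStream.format("memory").queryName("demo").start()

    input.addData(1, 2)
    query.processAllAvailable()
    val src = query.lastProgress.sources(0)
    // Expected under this change: startOffset is null for the first batch, endOffset is not.
    println(s"startOffset=${src.startOffset} endOffset=${src.endOffset}")

    spark.stop()
  }
}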

@@ -362,6 +362,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
         assert(query.lastProgress.batchId === 1)
         assert(query.lastProgress.inputRowsPerSecond === 2.0)
         assert(query.lastProgress.sources(0).inputRowsPerSecond === 2.0)
+        assert(query.lastProgress.sources(0).startOffset.toInt <
+          query.lastProgress.sources(0).endOffset.toInt)
         true
       },
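The added assertion relies on MemoryStream offsets serializing as increasing integers, so once a second batch has run, its startOffset (the previous batch's end offset) parses to a smaller number than its endOffset. Continuing the hypothetical StartOffsetDemo sketched earlier, the same check would look like:

    // Continuing the StartOffsetDemo sketch above, after the first batch has completed:
    input.addData(3, 4)
    query.processAllAvailable()
    val second = query.lastProgress.sources(0)
    // The second batch starts where the first one ended, so start < end when parsed as ints.
    assert(second.startOffset.toInt < second.endOffset.toInt)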
