5 changes: 4 additions & 1 deletion project/MimaExcludes.scala
@@ -100,7 +100,10 @@ object MimaExcludes {
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="),

// [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables"),

// [SPARK-18657] Add StreamingQuery.runId
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.runId")
)
}
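For context: the new exclusion is needed because SPARK-18657 adds an abstract method to the published org.apache.spark.sql.streaming.StreamingQuery interface, which MiMa reports as a ReversedMissingMethodProblem for third-party implementations compiled against earlier releases. A minimal Scala sketch of the kind of change that triggers it (the trait shape here is assumed for illustration; the UUID return type matches the ProgressReporter change further down):

// Hedged sketch only: adding any abstract member to a published trait breaks
// binary compatibility for external implementers, hence the MiMa filter above.
trait StreamingQuery {
  def runId: java.util.UUID  // new in SPARK-18657 (return type assumed)
  // ... existing members unchanged
}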

19 changes: 17 additions & 2 deletions python/pyspark/sql/streaming.py
@@ -50,14 +50,29 @@ def __init__(self, jsq):
@property
@since(2.0)
def id(self):
"""The id of the streaming query.
"""Returns the unique id of this query that persists across restarts from checkpoint data.
That is, this id is generated when a query is started for the first time, and
will be the same every time it is restarted from checkpoint data.
There can only be one query with the same id active in a Spark cluster.
See also `runId`.
"""
return self._jsq.id().toString()

@property
@since(2.1)
def runId(self):
"""Returns the unique id of this query that does not persist across restarts. That is, every
query that is started (or restarted from checkpoint) will have a different runId.
"""
return self._jsq.runId().toString()

@property
@since(2.0)
def name(self):
"""The name of the streaming query. This name is unique across all active queries.
"""Returns the user-specified name of the query, or null if not specified.
This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter`
as `dataframe.writeStream.queryName("query").start()`.
This name, if set, must be unique across all active queries.
"""
return self._jsq.name()
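Taken together, the id, runId, and name docstrings above describe the following contract. A short Scala sketch of the intended behavior (the input path, output path, checkpoint location, and query name are illustrative only, and an active SparkSession named spark is assumed):

// Illustrative sketch of the documented id/runId/name semantics; not part of this PR.
val df = spark.readStream.format("text").load("/tmp/in")  // hypothetical input directory

val q1 = df.writeStream
  .queryName("my_query")                     // user-specified name, unique among active queries
  .option("checkpointLocation", "/tmp/chk")  // hypothetical checkpoint location
  .format("parquet")
  .start("/tmp/out")
val (idBefore, runIdBefore) = (q1.id, q1.runId)
q1.stop()

// Restart from the same checkpoint: id stays the same, runId does not.
val q2 = df.writeStream
  .queryName("my_query")
  .option("checkpointLocation", "/tmp/chk")
  .format("parquet")
  .start("/tmp/out")
assert(q2.id == idBefore)        // same logical query across restarts
assert(q2.runId != runIdBefore)  // a fresh runId for the new run
q2.stop()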

24 changes: 24 additions & 0 deletions python/pyspark/sql/tests.py
@@ -1082,6 +1082,30 @@ def test_stream_save_options_overwrite(self):
q.stop()
shutil.rmtree(tmpPath)

def test_id_runid_name(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for q in self.spark._wrapped.streams.active:
q.stop()
tmpPath = tempfile.mkdtemp()
shutil.rmtree(tmpPath)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
try:
q = df.writeStream \
.start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
self.assertTrue(any(x.id == q.id for x in self.spark.streams.active))
self.assertTrue(any(x.runId == q.runId for x in self.spark.streams.active))
self.assertTrue(any(x.name == 'this_query' for x in self.spark.streams.active))
q.stop()
q2 = df.writeStream \
.start(path=out, format='parquet', checkpointLocation=chk)
self.assertTrue(q2.name is None)
q2.stop()
finally:
for q in self.spark.streams.active:
q.stop()
shutil.rmtree(tmpPath)

def test_stream_status_and_progress(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for q in self.spark._wrapped.streams.active:
@@ -17,13 +17,20 @@

package org.apache.spark.sql.execution.streaming

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization


/**
* An ordered collection of offsets, used to track the progress of processing data from one or more
* [[Source]]s that are present in a streaming query. This is similar to a simplified, single-instance
* vector clock that must progress linearly forward.
*
* @param offsets Sequence of Offsets
* @param metadata Optional metadata associated with this sequence of offsets, persisted to the
*                 offset log as a JSON string generated from [[OffsetSeqMetadata]]
*/
case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[String] = None) {
case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMetadata] = None) {

/**
* Unpacks an offset into [[StreamProgress]] by associating each offset with the ordered list of
@@ -54,6 +61,26 @@ object OffsetSeq {
* `nulls` in the sequence are converted to `None`s.
*/
def fill(metadata: Option[String], offsets: Offset*): OffsetSeq = {
OffsetSeq(offsets.map(Option(_)), metadata)
OffsetSeq(offsets.map(Option(_)), metadata.map(OffsetSeqMetadata.apply))
}
}


/**
* Contains metadata associated with an [[OffsetSeq]]. This information is
* persisted to the offset log in the checkpoint location via the [[OffsetSeq]] metadata field.
*
* @param batchWatermarkMs: The current eventTime watermark, used to
bound the lateness of data that will be processed. Time unit: milliseconds
* @param batchTimestampMs: The current batch processing timestamp.
* Time unit: milliseconds
*/
case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
Contributor: Can we put this in its own file?

Contributor (Author): Sure. But it's a small class and closely tied to OffsetSeq, so I thought it's not worth having a separate file for it.

Contributor (Author): Not worth moving these 6 lines of code into a new file.

def json: String = Serialization.write(this)(OffsetSeqMetadata.format)
}

object OffsetSeqMetadata {
private implicit val format = Serialization.formats(NoTypeHints)
def apply(json: String): OffsetSeqMetadata = Serialization.read[OffsetSeqMetadata](json)
}
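For reference, a rough sketch of the round-trip that json and apply(json) provide (the literal output below assumes json4s' default case-class serialization, so treat the exact string as approximate):

// Hedged sketch of how OffsetSeqMetadata travels through the offset log as JSON.
val meta = OffsetSeqMetadata(batchWatermarkMs = 1000L, batchTimestampMs = 1480000000000L)
val json = meta.json
// json is roughly: {"batchWatermarkMs":1000,"batchTimestampMs":1480000000000}
assert(OffsetSeqMetadata(json) == meta)  // apply(json) restores an equal instance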

@@ -74,7 +74,7 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)

// write metadata
out.write('\n')
out.write(offsetSeq.metadata.getOrElse("").getBytes(UTF_8))
out.write(offsetSeq.metadata.map(_.json).getOrElse("").getBytes(UTF_8))

// write offsets, one per line
offsetSeq.offsets.map(_.map(_.json)).foreach { offset =>
@@ -43,6 +43,7 @@ trait ProgressReporter extends Logging {

// Internal state of the stream, required for computing metrics.
protected def id: UUID
protected def runId: UUID
protected def name: String
protected def triggerClock: Clock
protected def logicalPlan: LogicalPlan
@@ -52,7 +53,7 @@
protected def committedOffsets: StreamProgress
protected def sources: Seq[Source]
protected def sink: Sink
protected def streamExecutionMetadata: StreamExecutionMetadata
protected def offsetSeqMetadata: OffsetSeqMetadata
protected def currentBatchId: Long
protected def sparkSession: SparkSession

@@ -134,11 +135,12 @@

val newProgress = new StreamingQueryProgress(
id = id,
runId = runId,
name = name,
timestamp = currentTriggerStartTimestamp,
batchId = currentBatchId,
durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava,
currentWatermark = streamExecutionMetadata.batchWatermarkMs,
currentWatermark = offsetSeqMetadata.batchWatermarkMs,
stateOperators = executionStats.stateOperators.toArray,
sources = sourceProgress.toArray,
sink = sinkProgress)
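With runId threaded through ProgressReporter as above, it also surfaces in the progress events a query emits. A rough sketch of how monitoring code might use it (assumes query is an active StreamingQuery and that at least one trigger has completed):

// Hedged sketch: runId in StreamingQueryProgress tells apart restarts of the same logical query.
val progress = query.lastProgress  // may be null before the first progress update
if (progress != null) {
  println(s"query=${progress.name} id=${progress.id} runId=${progress.runId} batch=${progress.batchId}")
}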