9 changes: 7 additions & 2 deletions python/pyspark/sql/streaming.py
@@ -125,10 +125,15 @@ def recentProgress(self):
@since(2.1)
def lastProgress(self):
"""
Returns the most recent :class:`StreamingQueryProgress` update of this streaming query.
Returns the most recent :class:`StreamingQueryProgress` update of this streaming query or
None if there were no progress updates.
:return: a map
"""
return json.loads(self._jsq.lastProgress().json())
lastProgress = self._jsq.lastProgress()
if lastProgress:
return json.loads(lastProgress.json())
else:
return None

@since(2.0)
def processAllAvailable(self):
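With this change, callers can no longer assume `lastProgress` returns a dict. A minimal sketch of the new contract (the query and names below are illustrative, assuming `df` is a streaming DataFrame on an active SparkSession):

```python
# Before the first trigger completes, lastProgress is now None instead of
# failing on an empty progress buffer.
query = df.writeStream.format("memory").queryName("probe").start()

progress = query.lastProgress
if progress is None:
    print("no progress updates yet")
else:
    print("last batch:", progress["batchId"])
```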
18 changes: 17 additions & 1 deletion python/pyspark/sql/tests.py
@@ -1128,9 +1128,25 @@ def test_stream_status_and_progress(self):
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
q = df.writeStream \

def func(x):
time.sleep(1)
return x

from pyspark.sql.functions import col, udf
sleep_udf = udf(func)

# Use "sleep_udf" to delay the progress update so that we can test `lastProgress` when there
# were no updates.
q = df.select(sleep_udf(col("value")).alias('value')).writeStream \
.start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
# "lastProgress" will return None in most cases. However, as it may be flaky when
# Jenkins is very slow, we don't assert it. If there is something wrong, "lastProgress"
# may throw error with a high chance and make this test flaky, so we should still be
# able to detect broken codes.
q.lastProgress

q.processAllAvailable()
lastProgress = q.lastProgress
recentProgress = q.recentProgress
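Since the test above deliberately avoids asserting on a racy value, a caller who actually needs the first update can poll for it. A sketch of such a helper (hypothetical, not part of this change):

```python
import time

def wait_for_first_progress(query, timeout=30.0):
    """Poll until query.lastProgress is non-None, i.e. the first
    trigger has completed and reported progress."""
    deadline = time.time() + timeout
    while query.lastProgress is None:
        if time.time() > deadline:
            raise TimeoutError("no progress update within %s seconds" % timeout)
        time.sleep(0.1)
    return query.lastProgress
```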
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -100,9 +100,9 @@ trait ProgressReporter extends Logging {
progressBuffer.toArray
}

/** Returns the most recent query progress update. */
/** Returns the most recent query progress update or null if there were no progress updates. */
def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
progressBuffer.last
progressBuffer.lastOption.orNull
}

/** Begins recording statistics about query progress for a given trigger. */
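The fix swaps `progressBuffer.last`, which throws `NoSuchElementException` on an empty buffer, for `lastOption.orNull`. A one-line Python analogue of the same semantics (illustrative only):

```python
def last_progress(progress_buffer):
    # Mirrors lastOption.orNull: the previous `.last` raised on an empty
    # buffer, just as progress_buffer[-1] would raise IndexError here.
    return progress_buffer[-1] if progress_buffer else None
```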
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkException
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.streaming.util.BlockingSource
import org.apache.spark.util.Utils

class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
@@ -217,7 +218,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {

test("SPARK-18811: Source resolution should not block main thread") {
failAfter(streamingTimeout) {
StreamingQueryManagerSuite.latch = new CountDownLatch(1)
BlockingSource.latch = new CountDownLatch(1)
withTempDir { tempDir =>
// if source resolution was happening on the main thread, it would block the start call,
// now it should only be blocking the stream execution thread
@@ -231,7 +232,7 @@
eventually(Timeout(streamingTimeout)) {
assert(sq.status.message.contains("Initializing sources"))
}
StreamingQueryManagerSuite.latch.countDown()
BlockingSource.latch.countDown()
sq.stop()
}
}
@@ -321,7 +322,3 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
(inputData, mapped)
}
}

object StreamingQueryManagerSuite {
var latch: CountDownLatch = null
}
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.streaming

import scala.collection.JavaConverters._
import java.util.concurrent.CountDownLatch

import org.apache.commons.lang3.RandomStringUtils
import org.scalactic.TolerantNumerics
@@ -32,6 +32,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.BlockingSource
import org.apache.spark.util.ManualClock


@@ -312,6 +313,24 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
)
}

test("lastProgress should be null when recentProgress is empty") {
Contributor: Should we add a similar test to Python just to make sure it returns None? Do we need to update any docs?

BlockingSource.latch = new CountDownLatch(1)
Contributor: Glad that you got to re-use BlockingSource!

withTempDir { tempDir =>
val sq = spark.readStream
.format("org.apache.spark.sql.streaming.util.BlockingSource")
.load()
.writeStream
.format("org.apache.spark.sql.streaming.util.BlockingSource")
.option("checkpointLocation", tempDir.toString)
.start()
// Creating source is blocked so recentProgress is empty and lastProgress should be null
assert(sq.lastProgress === null)
// Release the latch and stop the query
BlockingSource.latch.countDown()
sq.stop()
}
}

test("codahale metrics") {
val inputData = MemoryStream[Int]

sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
@@ -17,10 +17,12 @@

package org.apache.spark.sql.streaming.util

import java.util.concurrent.CountDownLatch

import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

/** Dummy provider: returns a SourceProvider with a blocking `createSource` call. */
@@ -42,7 +44,7 @@ class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source = {
StreamingQueryManagerSuite.latch.await()
BlockingSource.latch.await()
new Source {
override def schema: StructType = fakeSchema
override def getOffset: Option[Offset] = Some(new LongOffset(0))
@@ -64,3 +66,7 @@
}
}
}

object BlockingSource {
var latch: CountDownLatch = null
}
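Moving the latch into `BlockingSource`'s companion object keeps the blocking behavior self-contained, so any suite can reuse the source without reaching into another suite's state. A rough Python analogue of the pattern (hypothetical class, using `threading.Event` in place of `CountDownLatch`):

```python
import threading

class BlockingSource:
    # Class-level event shared by all instances, like a Scala companion object.
    latch = threading.Event()

    def create_source(self):
        # Hold source creation until the test releases the latch, so the
        # query reports no progress (lastProgress is None) in the meantime.
        BlockingSource.latch.wait()
        return "fake-source"  # stand-in for a real Source

# In a test: start the query on another thread, assert lastProgress is None,
# then call BlockingSource.latch.set() to unblock and stop the query.
```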