Skip to content
Prev Previous commit
Next Next commit
merge master, lint
  • Loading branch information
WweiL committed Jun 10, 2024
commit 917adb6838eb173f99c206dc67334de15f0c21ac
4 changes: 3 additions & 1 deletion python/pyspark/sql/streaming/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,9 @@ def fromJson(cls, j: Dict[str, Any]) -> "StreamingQueryProgress":
sink=SinkProgress.fromJson(j["sink"]),
numInputRows=j["numInputRows"] if "numInputRows" in j else None,
inputRowsPerSecond=j["inputRowsPerSecond"] if "inputRowsPerSecond" in j else None,
processedRowsPerSecond=j["processedRowsPerSecond"] if "processedRowsPerSecond" in j else None,
processedRowsPerSecond=j["processedRowsPerSecond"]
if "processedRowsPerSecond" in j
else None,
observedMetrics={
k: Row(*row_dict.keys())(*row_dict.values()) # Assume no nested rows
for k, row_dict in j["observedMetrics"].items()
Expand Down
20 changes: 20 additions & 0 deletions python/pyspark/sql/tests/streaming/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,26 @@ def test_streaming_progress(self):
finally:
query.stop()

def test_streaming_query_name_edge_case(self):
# Query name should be None when not specified
q1 = self.spark.readStream.format("rate").load().writeStream.format("noop").start()
self.assertEqual(q1.name, None)

# Cannot set query name to be an empty string
error_thrown = False
try:
(
self.spark.readStream.format("rate")
.load()
.writeStream.format("noop")
.queryName("")
.start()
)
except PySparkValueError:
error_thrown = True

self.assertTrue(error_thrown)

def test_stream_trigger(self):
df = self.spark.readStream.format("text").load("python/test_support/sql/streaming")

Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.