Skip to content
Prev Previous commit
Next Next commit
ready for CI, pending query.name fix
  • Loading branch information
WweiL committed Jun 8, 2024
commit ef7a116828d1781dde35d422550ff369c548a6aa
8 changes: 7 additions & 1 deletion python/pyspark/sql/streaming/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,13 @@ def fromJson(cls, j: Dict[str, Any]) -> "StreamingQueryProgress":
)

def __getitem__(self, key):
return getattr(self, key)
# Before Spark 4.0, StreamingQuery.lastProgress returns a dict, which casts id and runId
# to string. To prevent breaking change, also cast them to string when accessed with
# __getitem__.
if key == "id" or key == "runId":
Copy link
Contributor Author

@WweiL WweiL Jun 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is really needed. But if we delete this if, now "query.lastProgress["id"]" would return type uuid, before it was string.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because there would be lots of breaking changes (e.g. now the sources method also return the actual SourceProgress

def sources(self) -> List["SourceProgress"]:

let me also make these subclass of dict...

return str(getattr(self, key))
else:
return getattr(self, key)

def __setitem__(self, key, value):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in the fear of users ever set the value of the returned dict before this change

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm .. but the end users can't access to this value if I am reading this correctly?

Copy link
Contributor Author

@WweiL WweiL Jun 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fear is backward compatibility. This is possible in current master:

>>> q = spark.readStream.format("rate").load().writeStream.format("noop").start()
24/06/10 16:10:35 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/9k/pbxb4_690wv4smwhwbzwmqkw0000gp/T/temporary-709975db-23ed-4838-b9ae-93a7ffe59183. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/06/10 16:10:35 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
>>> p = q.lastProgress
>>> p
{'id': '44510846-29f8-4218-95cf-616efecadb05', 'runId': 'afcac0a7-424b-428b-948e-2c0fc21a43a2', 'name': None, 'timestamp': '2024-06-10T23:10:38.257Z', 'batchId': 2, 'batchDuration': 215, 'numInputRows': 1, 'inputRowsPerSecond': 76.92307692307692, 'processedRowsPerSecond': 4.651162790697675, 'durationMs': {'addBatch': 30, 'commitOffsets': 82, 'getBatch': 0, 'latestOffset': 0, 'queryPlanning': 4, 'triggerExecution': 215, 'walCommit': 98}, 'stateOperators': [], 'sources': [{'description': 'RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default', 'startOffset': 1, 'endOffset': 2, 'latestOffset': 2, 'numInputRows': 1, 'inputRowsPerSecond': 76.92307692307692, 'processedRowsPerSecond': 4.651162790697675}], 'sink': {'description': 'org.apache.spark.sql.execution.datasources.noop.NoopTable$@67a2b2a4', 'numOutputRows': 1}}
>>> p["id"]
'44510846-29f8-4218-95cf-616efecadb05'
>>> p["id"] = "aaaaaaa"
>>> p["id"]
'aaaaaaa'

This is not possible in Scala of course, but not sure if we should keep this python specific behavior....

internal_key = "_" + key
Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/sql/tests/streaming/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def test_streaming_progress(self):
self.assertEqual(lastProgress["id"], query.id)
# SPARK-48567 Use attribute to access fields in q.lastProgress
self.assertEqual(lastProgress.name, query.name)
self.assertEqual(lastProgress.id, query.id)
self.assertEqual(str(lastProgress.id), query.id)
new_name = "myNewQuery"
lastProgress["name"] = new_name
self.assertEqual(lastProgress.name, new_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def check_streaming_query_progress(self, progress, is_stateful):

self.assertTrue(isinstance(progress.sink, SinkProgress))
self.check_sink_progress(progress.sink)
self.assertTrue(isinstance(progress.observedMetrics, dict))
self.assertTrue(isinstance(progress.observedMetrics, Row))

def check_state_operator_progress(self, progress):
"""Check StateOperatorProgress"""
Expand Down Expand Up @@ -264,9 +264,6 @@ def test_streaming_progress(self):
for p in q.recentProgress:
self.check_streaming_query_progress(p, True)

row = q.lastProgress.observedMetrics.get("my_event")
self.assertTrue(row["rc"] > 0)
self.assertTrue(row["erc"] > 0)
finally:
q.stop()

Expand Down