Skip to content
Closed
9 changes: 5 additions & 4 deletions python/pyspark/sql/connect/streaming/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
QueryProgressEvent,
QueryIdleEvent,
QueryTerminatedEvent,
StreamingQueryProgress,
)
from pyspark.sql.streaming.query import (
StreamingQuery as PySparkStreamingQuery,
Expand Down Expand Up @@ -110,21 +111,21 @@ def status(self) -> Dict[str, Any]:
status.__doc__ = PySparkStreamingQuery.status.__doc__

@property
def recentProgress(self) -> List[Dict[str, Any]]:
def recentProgress(self) -> List[StreamingQueryProgress]:
cmd = pb2.StreamingQueryCommand()
cmd.recent_progress = True
progress = self._execute_streaming_query_cmd(cmd).recent_progress.recent_progress_json
return [json.loads(p) for p in progress]
return [StreamingQueryProgress.fromJson(json.loads(p)) for p in progress]

recentProgress.__doc__ = PySparkStreamingQuery.recentProgress.__doc__

@property
def lastProgress(self) -> Optional[Dict[str, Any]]:
def lastProgress(self) -> Optional[StreamingQueryProgress]:
cmd = pb2.StreamingQueryCommand()
cmd.last_progress = True
progress = self._execute_streaming_query_cmd(cmd).recent_progress.recent_progress_json
if len(progress) > 0:
return json.loads(progress[-1])
return StreamingQueryProgress.fromJson(json.loads(progress[-1]))
else:
return None

Expand Down
Loading