Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
lint
  • Loading branch information
WweiL committed Apr 15, 2024
commit ca066f7ecd4d40528154479c43414edfaefbf8b1
8 changes: 4 additions & 4 deletions python/pyspark/sql/connect/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1068,14 +1068,14 @@ def execute_command_as_iterator(
if self._user_id:
req.user_context.user_id = self._user_id
req.plan.command.CopyFrom(command)
for response in self._execute_and_fetch_as_iterator(req, observations):
for response in self._execute_and_fetch_as_iterator(req, observations or {}):
if isinstance(response, dict):
yield response
else:
raise PySparkValueError(
error_class="UNKNOWN_RESPONSE",
message_parameters={
"response": response,
"response": str(response),
},
)

Expand Down Expand Up @@ -1343,8 +1343,8 @@ def handle_response(
cmd_result = b.streaming_query_manager_command_result
yield {"streaming_query_manager_command_result": cmd_result}
if b.HasField("streaming_query_listener_events_result"):
cmd_result = b.streaming_query_listener_events_result
yield {"streaming_query_listener_events_result": cmd_result}
event_result = b.streaming_query_listener_events_result
yield {"streaming_query_listener_events_result": event_result}
if b.HasField("get_resources_command_result"):
resources = {}
for key, resource in b.get_resources_command_result.resources.items():
Expand Down
8 changes: 4 additions & 4 deletions python/pyspark/sql/connect/streaming/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,13 +270,13 @@ class StreamingQueryListenerBus:
receive listener events and invoke correct listener call backs.
"""

def __init__(self, sqm: "StreamingQueryManager"):
def __init__(self, sqm: "StreamingQueryManager") -> None:
self._sqm = sqm
self._listener_bus: List[StreamingQueryListener] = []
self._execution_thread: Optional[Thread] = None
self._lock = Lock()

def append(self, listener: StreamingQueryListener):
def append(self, listener: StreamingQueryListener) -> None:
"""
Append a listener to the local listener bus. When the added listener is
the first listener, request the server to create the server side listener
Expand All @@ -301,7 +301,7 @@ def append(self, listener: StreamingQueryListener):
)
self._execution_thread.start()

def remove(self, listener: StreamingQueryListener):
def remove(self, listener: StreamingQueryListener) -> None:
"""
Remove the listener from the local listener bus.

Expand Down Expand Up @@ -384,7 +384,7 @@ def _query_event_handler(self, iter: Iterator[Dict[str, Any]]) -> None:

@staticmethod
def deserialize(
event: str,
event: pb2.StreamingQueryListenerEvent,
) -> Union["QueryProgressEvent", "QueryIdleEvent", "QueryTerminatedEvent"]:
if event.event_type == proto.StreamingQueryEventType.QUERY_PROGRESS_EVENT:
return QueryProgressEvent.fromJson(json.loads(event.event_json))
Expand Down