Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
thread locals
  • Loading branch information
cdkrot committed Dec 7, 2023
commit 8220c46aa2a28200f9195278e44ba384dc44623b
15 changes: 8 additions & 7 deletions python/pyspark/sql/connect/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,11 @@ def __init__(
use_reattachable_execute: bool
Enable reattachable execution.
"""
self.thread_local = threading.local()
class ClientThreadLocals(threading.local):
tags: set = set()
inside_error_handling: bool = False

self.thread_local = ClientThreadLocals()

# Parse the connection string.
self._builder = (
Expand Down Expand Up @@ -631,8 +635,6 @@ def __init__(
# be updated on the first response received.
self._server_session_id: Optional[str] = None

self._inside_error_handling: bool = False

def _retrying(self) -> "Retrying":
return Retrying(self._retry_policies)

Expand Down Expand Up @@ -1497,14 +1499,13 @@ def _handle_error(self, error: Exception) -> NoReturn:
Throws the appropriate internal Python exception.
"""

if not self._inside_error_handling:
if self.thread_local.inside_error_handling:
# We are already inside error handling routine,
# avoid recursive error processing (with potentially infinite recursion)
raise error

try:
self._inside_error_handling = True

self.thread_local.inside_error_handling = True
if isinstance(error, grpc.RpcError):
self._handle_rpc_error(error)
elif isinstance(error, ValueError):
Expand All @@ -1514,7 +1515,7 @@ def _handle_error(self, error: Exception) -> NoReturn:
) from None
raise error
finally:
self._inside_error_handling = False
self.thread_local.inside_error_handling = False

def _fetch_enriched_error(self, info: "ErrorInfo") -> Optional[pb2.FetchErrorDetailsResponse]:
if "errorId" not in info.metadata:
Expand Down