From 285b85ccae02517b520c221e02068aa88ef5687e Mon Sep 17 00:00:00 2001 From: Alice Sayutina Date: Mon, 4 Dec 2023 03:38:27 +0100 Subject: [PATCH 1/3] test --- python/pyspark/sql/connect/client/core.py | 37 ++++++++++++++++++----- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py index e36b7d74a787..fe9a4913fa87 100644 --- a/python/pyspark/sql/connect/client/core.py +++ b/python/pyspark/sql/connect/client/core.py @@ -544,6 +544,25 @@ def fromProto(cls, pb: pb2.ConfigResponse) -> "ConfigResult": ) +class ForbidRecursion: + def __init__(self): + self._local = threading.local() + self._local.in_recursion = False + + @property + def can_enter(self): + return self._local.in_recursion + + def __enter__(self): + if self._local.in_recursion: + raise RecursionError + + self._local.in_recursion = True + + def __exit__(self, exc_type, exc_val, exc_tb): + self._local.in_recursion = False + + class SparkConnectClient(object): """ Conceptually the remote spark session that communicates with the server @@ -631,6 +650,8 @@ def __init__( # be updated on the first response received. self._server_session_id: Optional[str] = None + self._forbid_recursive_error_handling = ForbidRecursion() + def _retrying(self) -> "Retrying": return Retrying(self._retry_policies) @@ -1494,13 +1515,15 @@ def _handle_error(self, error: Exception) -> NoReturn: ------- Throws the appropriate internal Python exception. """ - if isinstance(error, grpc.RpcError): - self._handle_rpc_error(error) - elif isinstance(error, ValueError): - if "Cannot invoke RPC" in str(error) and "closed" in str(error): - raise SparkConnectException( - error_class="NO_ACTIVE_SESSION", message_parameters=dict() - ) from None + if True or self._forbid_recursive_error_handling.can_enter: + with self._forbid_recursive_error_handling: + if isinstance(error, grpc.RpcError): + self._handle_rpc_error(error) + elif isinstance(error, ValueError): + if "Cannot invoke RPC" in str(error) and "closed" in str(error): + raise SparkConnectException( + error_class="NO_ACTIVE_SESSION", message_parameters=dict() + ) from None raise error def _fetch_enriched_error(self, info: "ErrorInfo") -> Optional[pb2.FetchErrorDetailsResponse]: From eaa16e62e481da336a34b25697a3e38de06a26f9 Mon Sep 17 00:00:00 2001 From: Alice Sayutina Date: Mon, 4 Dec 2023 03:52:02 +0100 Subject: [PATCH 2/3] minor --- python/pyspark/sql/connect/client/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py index fe9a4913fa87..e325025a6c89 100644 --- a/python/pyspark/sql/connect/client/core.py +++ b/python/pyspark/sql/connect/client/core.py @@ -1515,7 +1515,7 @@ def _handle_error(self, error: Exception) -> NoReturn: ------- Throws the appropriate internal Python exception. """ - if True or self._forbid_recursive_error_handling.can_enter: + if self._forbid_recursive_error_handling.can_enter: with self._forbid_recursive_error_handling: if isinstance(error, grpc.RpcError): self._handle_rpc_error(error) From 73e43d9151aaa684d8c4e70143b4592cda795ff9 Mon Sep 17 00:00:00 2001 From: Alice Sayutina Date: Mon, 4 Dec 2023 11:23:56 +0100 Subject: [PATCH 3/3] Yihong's suggestion to always display stack trace --- python/pyspark/sql/connect/client/core.py | 53 ++++------------------- 1 file changed, 8 insertions(+), 45 deletions(-) diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py index e325025a6c89..0b502494f781 100644 --- a/python/pyspark/sql/connect/client/core.py +++ b/python/pyspark/sql/connect/client/core.py @@ -544,25 +544,6 @@ def fromProto(cls, pb: pb2.ConfigResponse) -> "ConfigResult": ) -class ForbidRecursion: - def __init__(self): - self._local = threading.local() - self._local.in_recursion = False - - @property - def can_enter(self): - return self._local.in_recursion - - def __enter__(self): - if self._local.in_recursion: - raise RecursionError - - self._local.in_recursion = True - - def __exit__(self, exc_type, exc_val, exc_tb): - self._local.in_recursion = False - - class SparkConnectClient(object): """ Conceptually the remote spark session that communicates with the server @@ -650,8 +631,6 @@ def __init__( # be updated on the first response received. self._server_session_id: Optional[str] = None - self._forbid_recursive_error_handling = ForbidRecursion() - def _retrying(self) -> "Retrying": return Retrying(self._retry_policies) @@ -1515,15 +1494,13 @@ def _handle_error(self, error: Exception) -> NoReturn: ------- Throws the appropriate internal Python exception. """ - if self._forbid_recursive_error_handling.can_enter: - with self._forbid_recursive_error_handling: - if isinstance(error, grpc.RpcError): - self._handle_rpc_error(error) - elif isinstance(error, ValueError): - if "Cannot invoke RPC" in str(error) and "closed" in str(error): - raise SparkConnectException( - error_class="NO_ACTIVE_SESSION", message_parameters=dict() - ) from None + if isinstance(error, grpc.RpcError): + self._handle_rpc_error(error) + elif isinstance(error, ValueError): + if "Cannot invoke RPC" in str(error) and "closed" in str(error): + raise SparkConnectException( + error_class="NO_ACTIVE_SESSION", message_parameters=dict() + ) from None raise error def _fetch_enriched_error(self, info: "ErrorInfo") -> Optional[pb2.FetchErrorDetailsResponse]: @@ -1543,20 +1520,6 @@ def _fetch_enriched_error(self, info: "ErrorInfo") -> Optional[pb2.FetchErrorDet except grpc.RpcError: return None - def _display_server_stack_trace(self) -> bool: - from pyspark.sql.connect.conf import RuntimeConf - - conf = RuntimeConf(self) - try: - if conf.get("spark.sql.connect.serverStacktrace.enabled") == "true": - return True - return conf.get("spark.sql.pyspark.jvmStacktrace.enabled") == "true" - except Exception as e: # noqa: F841 - # Falls back to true if an exception occurs during reading the config. - # Otherwise, it will recursively try to get the conf when it consistently - # fails, ending up with `RecursionError`. - return True - def _handle_rpc_error(self, rpc_error: grpc.RpcError) -> NoReturn: """ Error handling helper for dealing with GRPC Errors. On the server side, certain @@ -1590,7 +1553,7 @@ def _handle_rpc_error(self, rpc_error: grpc.RpcError) -> NoReturn: info, status.message, self._fetch_enriched_error(info), - self._display_server_stack_trace(), + True, ) from None raise SparkConnectGrpcException(status.message) from None