[SPARK-46308] Forbid recursive error handling by adding recursion guards #44210
Conversation
…it wouldn't fall into infinite recursion". This reverts commit c9df53f.
FYI @heyihong. @grundprinzip proposed to take back the change we discussed and go with my original version.

@cdkrot I don't have a strong opinion on either approach, but we probably need to implement similar logic for the Scala client as well: https://github.com/apache/spark/blob/master/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala#L339-L347 (not necessarily in this PR).
client = SparkConnectClient(chan)
self.assertEqual(client._session_id, chan.session_id)
...
def test_forbid_recursion(self):
This test does not directly test the scenario we're talking about. Ideally you could just use the mock tests we have to fail any query and verify that the recursion guard works.
I actually tried, but it seems hard to make a mock test for this because it needs to pass through this piece of code:

spark/python/pyspark/sql/connect/client/core.py, line 1545 in 348d881:

status = rpc_status.from_call(cast(grpc.Call, rpc_error))

It seems hard to create a mock exception that would pass this without poking grpc's internals significantly. Alternatively, we could introduce some testing crutches here, i.e. check whether the exception comes from testing code, but that's not great either.
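For illustration, a hypothetical sketch of what a mock would have to reproduce for `rpc_status.from_call` to return a status: the `grpc-status-details-bin` metadata key and the code/message consistency checks are `grpcio-status` implementation details, which is exactly the "poking grpc's internals" concern above.

```python
# Hypothetical sketch only, not proposed test code. It fakes the gRPC
# internals that rpc_status.from_call() inspects.
from unittest import mock

import grpc
from google.rpc import code_pb2, status_pb2
from grpc_status import rpc_status

# The rich status proto a real server would attach to the failed call.
rich_status = status_pb2.Status(code=code_pb2.INTERNAL, message="boom")

fake_call = mock.create_autospec(grpc.Call, instance=True)
# from_call() cross-checks code() and details() against the proto...
fake_call.code.return_value = grpc.StatusCode.INTERNAL
fake_call.details.return_value = "boom"
# ...and looks for the serialized proto under this internal metadata key.
fake_call.trailing_metadata.return_value = (
    ("grpc-status-details-bin", rich_status.SerializeToString()),
)

assert rpc_status.from_call(fake_call).message == "boom"
```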
grundprinzip left a comment:
I requested changes to the code that I believe make it more readable and require fewer changes overall. Specifically, I think we should use a simpler flag mechanism to track whether we're processing an exception, instead of adding a new class.
Changed based on @grundprinzip's request.
Removed.
cc @HyukjinKwon
Please update the PR title.
raise error
...
try:
    self._inside_error_handling = True
I know Python has the Global Interpreter Lock, but is this thread-safe?
Dang, I wanted to leave a comment on that. It's not thread-safe.
Ah, I'm a simple person and was simply following your review. The original proposal was thread-safe.
Updated the title; added thread-locals.
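A minimal sketch of the resulting pattern, assuming illustrative names (`_handle_error`, `_convert_exception`) rather than the exact PR code: a `threading.local` flag marks that error conversion is in progress, so a failure inside the handler re-raises unchanged instead of recursing, and each thread tracks its own flag.

```python
import threading


class ClientSketch:
    """Illustrative recursion guard only; not the actual SparkConnectClient."""

    def __init__(self):
        # threading.local() gives each thread its own attribute namespace,
        # so concurrent error handling in two threads cannot interfere.
        self._error_handling = threading.local()

    def _handle_error(self, error: Exception) -> None:
        if getattr(self._error_handling, "active", False):
            # The error handler itself failed; re-raise unchanged
            # instead of recursing into the handler again.
            raise error
        try:
            self._error_handling.active = True
            raise self._convert_exception(error)
        finally:
            self._error_handling.active = False

    def _convert_exception(self, error: Exception) -> Exception:
        # Stand-in for the real gRPC-error-to-Spark-exception conversion.
        return RuntimeError(f"converted: {error}")
```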
It seems we forgot to merge this because there were some unrelated test failures again. I retriggered those and the tests pass.
Merged to master. |
…threadlocal

### What changes were proposed in this pull request?

This PR changes the `thread.local` usage in `SparkConnectClient` to be used properly, fixing a bug introduced by #44210, which misused `thread.local` by inheriting from it and setting class-level variables, which always exist.

### Why are the changes needed?

So users can properly use thread-based `interruptTag`. Without the fix, the code below cancels both queries:

```python
import concurrent.futures
import time
import threading

from pyspark.sql.functions import udf


def run_query_with_tag(query, tag):
    try:
        spark.addTag(tag)
        print(f"starting query {tag}")
        df = spark.sql(query).select(udf(lambda: time.sleep(10))())
        print(f"collecting query {tag}")
        res = df.collect()
        print(f"done with query {tag}")
    finally:
        spark.removeTag(tag)


queries_with_tags = [
    ("SELECT * FROM range(100)", "tag1"),
    ("SELECT * FROM range(100)", "tag2"),
]

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {
        executor.submit(run_query_with_tag, query, tag): (query, tag)
        for query, tag in queries_with_tags
    }
    time.sleep(5)
    print("Interrupting tag1")
    print(spark.interruptTag("tag1"))
    for f in futures:
        try:
            f.result()
            print(f"done with {f.result()}")
        except:
            print(f"failed with {f.exception()}")
```

### Does this PR introduce _any_ user-facing change?

No, this was caused by #44210, but the change has not been released yet.

### How was this patch tested?

A unit test was added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47005 from HyukjinKwon/thread-local.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
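The underlying pitfall, in a standalone sketch (class names are illustrative): attributes set at class level on a `threading.local` subclass live on the class and are shared by every thread, whereas attributes initialized in `__init__` are created fresh for each thread.

```python
import threading


class BuggyLocal(threading.local):
    tags = set()  # class attribute: one set object shared by all threads


class FixedLocal(threading.local):
    def __init__(self):
        self.tags = set()  # instance attribute: re-created per thread


buggy, fixed = BuggyLocal(), FixedLocal()


def worker():
    buggy.tags.add(threading.get_ident())
    fixed.tags.add(threading.get_ident())


threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(buggy.tags))  # 2 -- both workers mutated the same shared set
print(len(fixed.tags))  # 0 -- the main thread's set was never touched
```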
What changes were proposed in this pull request?
Revert #44144 and introduce a recursion guard as previously proposed. This way, infinite error-handling recursion is still prevented, but the client-side knob is still present.
Why are the changes needed?
This was previously proposed as part of #44144 but was set aside in favour of another approach. However, it now seems (per a proposal by @grundprinzip) that the original approach was more correct, since it appears the driver stacktrace is decided on the client, not the server (see #43667).
Does this PR introduce any user-facing change?
No
How was this patch tested?
Manual testing.
Was this patch authored or co-authored using generative AI tooling?
No