
@HyukjinKwon (Member) commented Jun 18, 2024

What changes were proposed in this pull request?

This PR fixes how `threading.local` is used in SparkConnectClient, addressing a bug introduced by #44210. That change subclassed `threading.local` but set the variables at class level. Class-level attributes always exist and are shared by every thread, so the values were not actually thread-local.
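The pitfall described above can be reproduced without Spark. The following is a minimal sketch, not the actual SparkConnectClient code: `BrokenState`, `FixedState`, and `tags_seen_by_main_thread` are hypothetical names chosen for illustration. It relies only on the documented behavior of `threading.local`, where class attributes of a subclass are shared across threads while attributes set in `__init__` are created separately for each thread.

```python
import threading


class BrokenState(threading.local):
    # Hypothetical example class. A class-level attribute lives on the
    # class itself, so every thread sees (and mutates) the same set.
    tags: set = set()


class FixedState(threading.local):
    # Hypothetical example class. threading.local calls __init__ once in
    # each thread that touches the object, so each thread gets its own set.
    def __init__(self) -> None:
        self.tags: set = set()


def tags_seen_by_main_thread(state) -> set:
    """Add a tag from a worker thread, then report what the caller's thread sees."""
    worker = threading.Thread(target=lambda: state.tags.add("tag1"))
    worker.start()
    worker.join()
    return set(state.tags)


broken_view = tags_seen_by_main_thread(BrokenState())
fixed_view = tags_seen_by_main_thread(FixedState())
print(broken_view)  # {'tag1'}: the worker's tag leaked into the main thread
print(fixed_view)   # set(): the worker's tag stayed in the worker thread
```

With the class-level attribute, a tag added by one thread is attached to operations started from every other thread, which is consistent with an interrupt on one tag cancelling queries from other threads.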

Why are the changes needed?

So that users can properly use the thread-based interruptTag. Before this fix, the code below cancels both queries, even though only tag1 is interrupted:

# Assumes `spark` is an active Spark Connect session.
import concurrent.futures
import time
from pyspark.sql.functions import udf

def run_query_with_tag(query, tag):
    try:
        spark.addTag(tag)
        print(f"starting query {tag}")
        df = spark.sql(query).select(udf(lambda: time.sleep(10))())
        print(f"collecting query {tag}")
        res = df.collect()
        print(f"done with query {tag}")
    finally:
        spark.removeTag(tag)

queries_with_tags = [
    ("SELECT * FROM range(100)", "tag1"),
    ("SELECT * FROM range(100)", "tag2"),
]

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(run_query_with_tag, query, tag): (query, tag) for query, tag in queries_with_tags}
    time.sleep(5)
    print("Interrupting tag1")
    print(spark.interruptTag("tag1"))
    for f in futures:
        query, tag = futures[f]
        try:
            # result() re-raises any exception raised inside the worker thread
            f.result()
            print(f"done with {tag}")
        except Exception as e:
            print(f"{tag} failed with {e!r}")

Does this PR introduce any user-facing change?

No. The bug was introduced by #44210, and that change has not been released yet.

How was this patch tested?

A unit test was added.

Was this patch authored or co-authored using generative AI tooling?

No.

@HyukjinKwon (Member, Author)

Merged to master.
