80 commits (changes shown from 1 commit)
40ffdef
[SPARK-50250][SQL] Assign appropriate error condition for `_LEGACY_ER…
itholic Nov 13, 2024
ede05fa
[SPARK-50248][SQL] Assign appropriate error condition for `_LEGACY_ER…
itholic Nov 13, 2024
6fb1d43
[SPARK-50246][SQL] Assign appropriate error condition for `_LEGACY_ER…
itholic Nov 13, 2024
898bff2
[SPARK-50245][SQL][TESTS] Extended CollationSuite and added tests whe…
vladanvasi-db Nov 13, 2024
bd94419
[SPARK-50226][SQL] Correct MakeDTInterval and MakeYMInterval to catch…
gotocoding-DB Nov 13, 2024
bc9b259
[SPARK-50066][SQL] Codegen Support for `SchemaOfXml` (by Invoke & Run…
panbingkun Nov 13, 2024
558fc89
[SPARK-49611][SQL][FOLLOW-UP] Make collations TVF consistent and retu…
mihailomilosevic2001 Nov 13, 2024
7b1b450
Revert [SPARK-50215][SQL] Refactored StringType pattern matching in j…
vladanvasi-db Nov 13, 2024
87ad4b4
[SPARK-50139][INFRA][SS][PYTHON] Introduce scripts to re-generate and…
LuciferYang Nov 13, 2024
05508cf
[SPARK-42838][SQL] Assign a name to the error class _LEGACY_ERROR_TEM…
mihailomilosevic2001 Nov 13, 2024
5cc60f4
[SPARK-50300][BUILD] Use mirror host instead of `archive.apache.org`
dongjoon-hyun Nov 13, 2024
33378a6
[SPARK-50304][INFRA] Remove `(any|empty).proto` from RAT exclusion
dongjoon-hyun Nov 14, 2024
891f694
[SPARK-50306][PYTHON][CONNECT] Support Python 3.13 in Spark Connect
HyukjinKwon Nov 14, 2024
2fd4702
[SPARK-49913][SQL] Add check for unique label names in nested labeled…
miland-db Nov 14, 2024
6bee268
[SPARK-50299][BUILD] Upgrade jupiter-interface to 0.13.1 and Junit5 t…
LuciferYang Nov 14, 2024
09d6b32
[SPARK-48755][DOCS][PYTHON][FOLLOWUP] Add PySpark doc for `transformW…
itholic Nov 14, 2024
0b1b676
[SPARK-50092][SQL] Fix PostgreSQL connector behaviour for multidimens…
PetarVasiljevic-DB Nov 14, 2024
aea9e87
[SPARK-50291][PYTHON] Standardize verifySchema parameter of createDat…
xinrong-meng Nov 14, 2024
c1968a1
[SPARK-50216][SQL][TESTS] Update `CollationBenchmark` to invoke `coll…
stevomitric Nov 14, 2024
0aee601
[SPARK-50153][SQL] Add `name` to `RuleExecutor` to make printing `Que…
panbingkun Nov 14, 2024
c2343f7
[SPARK-45265][SQL] Support Hive 4.0 metastore
yaooqinn Nov 14, 2024
e0a83f6
[SPARK-50317][BUILD] Upgrade ORC to 2.0.3
dongjoon-hyun Nov 14, 2024
c90efae
[SPARK-50318][SQL] Add IntervalUtils.makeYearMonthInterval to dedupli…
gotocoding-DB Nov 15, 2024
3237885
[SPARK-50312][SQL] SparkThriftServer createServer parameter passing e…
CuiYanxiang Nov 15, 2024
e615e3f
[SPARK-50049][SQL] Support custom driver metrics in writing to v2 table
cloud-fan Nov 15, 2024
3f5e846
[SPARK-50237][SQL] Assign appropriate error condition for `_LEGACY_ER…
itholic Nov 15, 2024
cf90271
[MINOR] Fix code style for if/for/while statements
exmy Nov 15, 2024
cc81ed0
[SPARK-50325][SQL] Factor out alias resolution to be reused in the si…
vladimirg-db Nov 15, 2024
d317002
[SPARK-50322][SQL] Fix parameterized identifier in a sub-query
MaxGekk Nov 15, 2024
77e006f
[SPARK-50327][SQL] Factor out function resolution to be reused in the…
vladimirg-db Nov 15, 2024
11e4706
[SPARK-50320][CORE] Make `--remote` an official option by removing `e…
dongjoon-hyun Nov 15, 2024
007c31d
[SPARK-50236][SQL] Assign appropriate error condition for `_LEGACY_ER…
itholic Nov 15, 2024
281a8e1
[SPARK-50309][DOCS] Document `SQL Pipe` Syntax
dtenedor Nov 15, 2024
b626528
[SPARK-50313][SQL][TESTS] Enable ANSI in SQL *SQLQueryTestSuite by de…
yaooqinn Nov 18, 2024
a01856d
[SPARK-50330][SQL] Add hints to Sort and Window nodes
agubichev Nov 18, 2024
8b2d032
[SPARK-45265][SQL][BUILD][FOLLOWUP] Add `-Xss64m` for Maven testing o…
LuciferYang Nov 18, 2024
05750de
[MINOR][PYTHON][DOCS] Fix the type hint of `histogram_numeric`
zhengruifeng Nov 18, 2024
400a8d3
Revert "[SPARK-49787][SQL] Cast between UDT and other types"
cloud-fan Nov 18, 2024
fa36e8b
[SPARK-50335][PYTHON][DOCS] Refine docstrings for window/aggregation …
zhengruifeng Nov 19, 2024
b61411d
[SPARK-50328][INFRA] Add a separate docker file for SparkR
zhengruifeng Nov 19, 2024
e1477a3
[SPARK-50298][PYTHON][CONNECT] Implement verifySchema parameter of cr…
xinrong-meng Nov 19, 2024
6d47981
[SPARK-50331][INFRA] Add a daily test for PySpark on MacOS
LuciferYang Nov 19, 2024
5a57efd
[SPARK-50313][SQL][TESTS][FOLLOWUP] Restore some tests in *SQLQueryTe…
yaooqinn Nov 19, 2024
b74aa8c
[SPARK-50340][SQL] Unwrap UDT in INSERT input query
cloud-fan Nov 19, 2024
87a5b37
[SPARK-50313][SQL][TESTS][FOLLOWUP] Regenerate golden files for Java 21
LuciferYang Nov 19, 2024
f1b68d8
[SPARK-50315][SQL] Support custom metrics for V1Fallback writes
olaky Nov 19, 2024
19509d0
Revert "[SPARK-49002][SQL] Consistently handle invalid locations in W…
cloud-fan Nov 19, 2024
37497e6
[SPARK-50335][PYTHON][DOCS][FOLLOW-UP] Make percentile doctests more …
zhengruifeng Nov 20, 2024
c149dcb
[SPARK-50352][PYTHON][DOCS] Refine docstrings for window/aggregation …
zhengruifeng Nov 20, 2024
8791767
[SPARK-48344][SQL] Prepare SQL Scripting for addition of Execution Fr…
miland-db Nov 20, 2024
b7cf448
[SPARK-49550][FOLLOWUP][SQL][DOC] Switch Hadoop to 3.4.1 in IsolatedC…
pan3793 Nov 20, 2024
2185f3c
[SPARK-50359][PYTHON] Upgrade PyArrow to 18.0
zhengruifeng Nov 20, 2024
0157778
[SPARK-50358][SQL][TESTS] Update postgres docker image to 17.1
panbingkun Nov 20, 2024
b582dac
[MINOR][DOCS] Fix a HTML/Markdown syntax error in sql-migration-guide.md
yaooqinn Nov 20, 2024
19b8250
[SPARK-50331][INFRA][FOLLOW-UP] Skip Torch/DeepSpeed tests in MacOS P…
zhengruifeng Nov 20, 2024
7a4f3c4
[SPARK-50345][BUILD] Upgrade Kafka to 3.9.0
panbingkun Nov 20, 2024
3151d97
[SPARK-49801][INFRA][FOLLOWUP] Sync pandas version in release environ…
yaooqinn Nov 20, 2024
23f276f
[SPARK-50353][SQL] Refactor ResolveSQLOnFile
mihailoale-db Nov 20, 2024
533b8ca
[SPARK-50363][PYTHON][DOCS] Refine the docstring for datetime functio…
zhengruifeng Nov 20, 2024
81a56df
[SPARK-50362][PYTHON][ML] Skip `CrossValidatorTests` if `torch/torche…
LuciferYang Nov 20, 2024
6ee53da
[SPARK-50258][SQL] Fix output column order changed issue after AQE op…
wangyum Nov 20, 2024
30d0b01
[SPARK-50364][SQL] Implement serialization for LocalDateTime type in …
krm95 Nov 20, 2024
ad46db4
[SPARK-50130][SQL][FOLLOWUP] Make Encoder generation lazy
ueshin Nov 20, 2024
a409199
[SPARK-50376][PYTHON][ML][TESTS] Centralize the dependency check in M…
zhengruifeng Nov 21, 2024
3bc374d
[SPARK-50333][SQL] Codegen Support for `CsvToStructs` (by Invoke & Ru…
panbingkun Nov 21, 2024
95faa02
[SPARK-49490][SQL] Add benchmarks for initCap
mrk-andreev Nov 21, 2024
ee21e6b
[SPARK-50113][CONNECT][PYTHON][TESTS] Add `@remote_only` to check the…
itholic Nov 21, 2024
0f1e410
[SPARK-50016][SQL] Assign appropriate error condition for `_LEGACY_ER…
itholic Nov 21, 2024
b05ef45
[SPARK-50175][SQL] Change collation precedence calculation
stefankandic Nov 21, 2024
fbf255e
[SPARK-50379][SQL] Fix DayTimeIntevalType handling in WindowExecBase
mihailomilosevic2001 Nov 21, 2024
cbb16b9
[MINOR][DOCS] Fix miss semicolon on create table example sql
camilesing Nov 21, 2024
f2de888
[MINOR][DOCS] Remove wrong and ambiguous default statement in datetim…
yaooqinn Nov 21, 2024
229b1b8
[SPARK-50375][BUILD] Upgrade `commons-io` to 2.18.0
panbingkun Nov 21, 2024
136c722
[SPARK-50334][SQL] Extract common logic for reading the descriptor of…
panbingkun Nov 21, 2024
2e1c3dc
[SPARK-50087] Robust handling of boolean expressions in CASE WHEN for…
cloud-fan Nov 21, 2024
2d09ef2
[SPARK-50381][CORE] Support `spark.master.rest.maxThreads`
dongjoon-hyun Nov 21, 2024
69324bd
Merge branch 'master' into pr48820
ueshin Nov 21, 2024
349df78
Fix.
ueshin Nov 21, 2024
1079339
Fix.
ueshin Nov 21, 2024
c6b0651
Fix.
ueshin Nov 22, 2024
[SPARK-50113][CONNECT][PYTHON][TESTS] Add `@remote_only` to check the APIs that are only supported with Spark Connect

### What changes were proposed in this pull request?

This PR proposes to add a `remote_only` decorator to check the APIs that are only supported with Spark Connect.
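
For illustration only, a minimal sketch of how the marker is applied (the class here is hypothetical; the real decorated methods are in `python/pyspark/sql/session.py` below):

```python
from pyspark.sql.utils import remote_only  # added by this PR


class MySession:
    # Spark Connect-specific API: the decorator only tags the function with
    # `_remote_only = True` so the compatibility test can recognize it.
    @remote_only
    def create(self) -> "MySession":
        """A Spark Connect-only API."""
        ...
```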

### Why are the changes needed?

The current compatibility check cannot capture missing methods that are only supported with Spark Connect.

### Does this PR introduce _any_ user-facing change?

No, it's test-only

### How was this patch tested?

Updated the existing UT

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#48651 from itholic/SPARK-50113.

Authored-by: Haejoon Lee <[email protected]>
Signed-off-by: Haejoon Lee <[email protected]>
itholic committed Nov 21, 2024
commit ee21e6b07a0d30cbdf78a2dd6bfe43d8fc23d518
21 changes: 20 additions & 1 deletion python/pyspark/sql/session.py
@@ -63,7 +63,12 @@
_from_numpy_type,
)
from pyspark.errors.exceptions.captured import install_exception_handler
from pyspark.sql.utils import is_timestamp_ntz_preferred, to_str, try_remote_session_classmethod
from pyspark.sql.utils import (
is_timestamp_ntz_preferred,
to_str,
try_remote_session_classmethod,
remote_only,
)
from pyspark.errors import PySparkValueError, PySparkTypeError, PySparkRuntimeError

if TYPE_CHECKING:
@@ -550,6 +555,7 @@ def getOrCreate(self) -> "SparkSession":
return session

# Spark Connect-specific API
@remote_only
def create(self) -> "SparkSession":
"""Creates a new SparkSession. Can only be used in the context of Spark Connect
and will throw an exception otherwise.
@@ -2067,6 +2073,7 @@ def __exit__(

# SparkConnect-specific API
@property
@remote_only
def client(self) -> "SparkConnectClient":
"""
Gives access to the Spark Connect client. In normal cases this is not necessary to be used
@@ -2090,6 +2097,7 @@ def client(self) -> "SparkConnectClient":
messageParameters={"feature": "SparkSession.client"},
)

@remote_only
def addArtifacts(
self, *path: str, pyfile: bool = False, archive: bool = False, file: bool = False
) -> None:
@@ -2125,6 +2133,7 @@

addArtifact = addArtifacts

@remote_only
def registerProgressHandler(self, handler: "ProgressHandler") -> None:
"""
Register a progress handler to be called when a progress update is received from the server.
@@ -2153,6 +2162,7 @@ def registerProgressHandler(self, handler: "ProgressHandler") -> None:
messageParameters={"feature": "SparkSession.registerProgressHandler"},
)

@remote_only
def removeProgressHandler(self, handler: "ProgressHandler") -> None:
"""
Remove a progress handler that was previously registered.
@@ -2169,6 +2179,7 @@ def removeProgressHandler(self, handler: "ProgressHandler") -> None:
messageParameters={"feature": "SparkSession.removeProgressHandler"},
)

@remote_only
def clearProgressHandlers(self) -> None:
"""
Clear all registered progress handlers.
@@ -2180,6 +2191,7 @@ def clearProgressHandlers(self) -> None:
messageParameters={"feature": "SparkSession.clearProgressHandlers"},
)

@remote_only
def copyFromLocalToFs(self, local_path: str, dest_path: str) -> None:
"""
Copy file from local to cloud storage file system.
@@ -2208,6 +2220,7 @@ def copyFromLocalToFs(self, local_path: str, dest_path: str) -> None:
messageParameters={"feature": "SparkSession.copyFromLocalToFs"},
)

@remote_only
def interruptAll(self) -> List[str]:
"""
Interrupt all operations of this session currently running on the connected server.
@@ -2228,6 +2241,7 @@ def interruptAll(self) -> List[str]:
messageParameters={"feature": "SparkSession.interruptAll"},
)

@remote_only
def interruptTag(self, tag: str) -> List[str]:
"""
Interrupt all operations of this session with the given operation tag.
@@ -2248,6 +2262,7 @@ def interruptTag(self, tag: str) -> List[str]:
messageParameters={"feature": "SparkSession.interruptTag"},
)

@remote_only
def interruptOperation(self, op_id: str) -> List[str]:
"""
Interrupt an operation of this session with the given operationId.
@@ -2268,6 +2283,7 @@ def interruptOperation(self, op_id: str) -> List[str]:
messageParameters={"feature": "SparkSession.interruptOperation"},
)

@remote_only
def addTag(self, tag: str) -> None:
"""
Add a tag to be assigned to all the operations started by this thread in this session.
@@ -2292,6 +2308,7 @@ def addTag(self, tag: str) -> None:
messageParameters={"feature": "SparkSession.addTag"},
)

@remote_only
def removeTag(self, tag: str) -> None:
"""
Remove a tag previously added to be assigned to all the operations started by this thread in
@@ -2309,6 +2326,7 @@ def removeTag(self, tag: str) -> None:
messageParameters={"feature": "SparkSession.removeTag"},
)

@remote_only
def getTags(self) -> Set[str]:
"""
Get the tags that are currently set to be assigned to all the operations started by this
@@ -2326,6 +2344,7 @@ def getTags(self) -> Set[str]:
messageParameters={"feature": "SparkSession.getTags"},
)

@remote_only
def clearTags(self) -> None:
"""
Clear the current thread's operation tags.
43 changes: 35 additions & 8 deletions python/pyspark/sql/tests/test_connect_compatibility.py
@@ -64,12 +64,16 @@
class ConnectCompatibilityTestsMixin:
def get_public_methods(self, cls):
"""Get public methods of a class."""
return {
name: method
for name, method in inspect.getmembers(cls)
if (inspect.isfunction(method) or isinstance(method, functools._lru_cache_wrapper))
and not name.startswith("_")
}
methods = {}
for name, method in inspect.getmembers(cls):
if (
inspect.isfunction(method) or isinstance(method, functools._lru_cache_wrapper)
) and not name.startswith("_"):
if getattr(method, "_remote_only", False):
methods[name] = None
else:
methods[name] = method
return methods

def get_public_properties(self, cls):
"""Get public properties of a class."""
@@ -88,6 +92,10 @@ def compare_method_signatures(self, classic_cls, connect_cls, cls_name):
common_methods = set(classic_methods.keys()) & set(connect_methods.keys())

for method in common_methods:
# Skip non-callable, Spark Connect-specific methods
if classic_methods[method] is None or connect_methods[method] is None:
continue

classic_signature = inspect.signature(classic_methods[method])
connect_signature = inspect.signature(connect_methods[method])

@@ -145,7 +153,11 @@ def check_missing_methods(
connect_methods = self.get_public_methods(connect_cls)

# Identify missing methods
classic_only_methods = set(classic_methods.keys()) - set(connect_methods.keys())
classic_only_methods = {
name
for name, method in classic_methods.items()
if name not in connect_methods or method is None
}
connect_only_methods = set(connect_methods.keys()) - set(classic_methods.keys())

# Compare the actual missing methods with the expected ones
@@ -249,7 +261,22 @@ def test_spark_session_compatibility(self):
"""Test SparkSession compatibility between classic and connect."""
expected_missing_connect_properties = {"sparkContext"}
expected_missing_classic_properties = {"is_stopped", "session_id"}
expected_missing_connect_methods = {"newSession"}
expected_missing_connect_methods = {
"addArtifact",
"addArtifacts",
"addTag",
"clearProgressHandlers",
"clearTags",
"copyFromLocalToFs",
"getTags",
"interruptAll",
"interruptOperation",
"interruptTag",
"newSession",
"registerProgressHandler",
"removeProgressHandler",
"removeTag",
}
expected_missing_classic_methods = set()
self.check_compatibility(
ClassicSparkSession,
16 changes: 16 additions & 0 deletions python/pyspark/sql/utils.py
@@ -458,3 +458,19 @@ def linspace(start: float, stop: float, num: int) -> Sequence[float]:
return [float(start)]
step = (float(stop) - float(start)) / (num - 1)
return [start + step * i for i in range(num)]


def remote_only(func: Union[Callable, property]) -> Union[Callable, property]:
"""
Decorator to mark a function or method as only available in Spark Connect.

This decorator allows for easy identification of Spark Connect-specific APIs.
"""
if isinstance(func, property):
# If it's a property, we need to set the attribute on the getter function
getter_func = func.fget
getter_func._remote_only = True # type: ignore[union-attr]
return property(getter_func)
else:
func._remote_only = True # type: ignore[attr-defined]
return func
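
A rough usage sketch of `remote_only` as defined above, assuming only what this hunk shows (the decorator sets a `_remote_only` attribute on the function, or on a property's getter):

```python
from pyspark.sql.utils import remote_only


class Example:
    @remote_only
    def connect_only(self) -> None:
        """A Spark Connect-only method."""
        ...

    @property
    @remote_only  # applied to the getter before property() wraps it
    def connect_only_prop(self) -> str:
        return "connect"


# The marker can then be read back, which is what the compatibility test does:
assert Example.connect_only._remote_only is True
assert Example.connect_only_prop.fget._remote_only is True
```

In `test_connect_compatibility.py` above, this marker is what makes `get_public_methods` map such methods to `None`, so they are treated as intentionally missing from the classic API.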
19 changes: 13 additions & 6 deletions python/pyspark/util.py
@@ -382,19 +382,24 @@ def inheritable_thread_target(f: Optional[Union[Callable, "SparkSession"]] = Non
assert session is not None, "Spark Connect session must be provided."

def outer(ff: Callable) -> Callable:
thread_local = session.client.thread_local # type: ignore[union-attr, operator]
session_client_thread_local_attrs = [
(attr, copy.deepcopy(value))
for (
attr,
value,
) in session.client.thread_local.__dict__.items() # type: ignore[union-attr]
) in thread_local.__dict__.items()
]

@functools.wraps(ff)
def inner(*args: Any, **kwargs: Any) -> Any:
# Set thread locals in child thread.
for attr, value in session_client_thread_local_attrs:
setattr(session.client.thread_local, attr, value) # type: ignore[union-attr]
setattr(
session.client.thread_local, # type: ignore[union-attr, operator]
attr,
value,
)
return ff(*args, **kwargs)

return inner
@@ -489,7 +494,8 @@ def __init__(
def copy_local_properties(*a: Any, **k: Any) -> Any:
# Set tags in child thread.
assert hasattr(self, "_tags")
session.client.thread_local.tags = self._tags # type: ignore[union-attr, has-type]
thread_local = session.client.thread_local # type: ignore[union-attr, operator]
thread_local.tags = self._tags # type: ignore[has-type]
return target(*a, **k)

super(InheritableThread, self).__init__(
@@ -523,9 +529,10 @@ def start(self) -> None:
if is_remote():
# Spark Connect
assert hasattr(self, "_session")
if not hasattr(self._session.client.thread_local, "tags"):
self._session.client.thread_local.tags = set()
self._tags = set(self._session.client.thread_local.tags)
thread_local = self._session.client.thread_local # type: ignore[union-attr, operator]
if not hasattr(thread_local, "tags"):
thread_local.tags = set()
self._tags = set(thread_local.tags)
else:
# Non Spark Connect
from pyspark import SparkContext
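
For context, the thread-local copying that `inheritable_thread_target` performs in the hunks above follows a common pattern: snapshot the parent thread's `threading.local` state when the target is wrapped, then replay it inside the child thread. A self-contained sketch (using a plain `threading.local` rather than the Spark Connect client's) might look like this:

```python
import copy
import threading

thread_local = threading.local()
thread_local.tags = {"job-group-1"}  # state set on the parent thread


def inherit_thread_local(target):
    # Snapshot the parent thread's thread-local attributes at wrap time ...
    attrs = [(k, copy.deepcopy(v)) for k, v in thread_local.__dict__.items()]

    def wrapper(*args, **kwargs):
        # ... and replay them in the child thread before running the target.
        for k, v in attrs:
            setattr(thread_local, k, v)
        return target(*args, **kwargs)

    return wrapper


def job():
    print(getattr(thread_local, "tags", set()))  # prints {'job-group-1'}


threading.Thread(target=inherit_thread_local(job)).start()
```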