[SPARK-48555][SQL][PYTHON][CONNECT] Support using Columns as parameters for several functions in pyspark/scala #46901
Conversation
Tagging @CTCC1 @LuciferYang @zhengruifeng, as you wrote/reviewed the similar PR I mentioned 😄
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
| """ | ||
| from pyspark.sql.classic.column import _to_java_column | ||
|
|
||
| value = value._jc if isinstance(value, Column) else value |
ditto
Example 3 ("Check for key using a column") was already supported.
Is it?
In Spark 3.5.1 this gives me an error:
```python
df = spark.sql("select map(1, 2, 3, 4) as m, 1 as k")
df.select(F.map_contains_key(df.m, df.k))
# pyspark.errors.exceptions.base.PySparkTypeError: [NOT_ITERABLE] Column is not iterable.
```
which makes sense, since this passes a `Column` to `_invoke_function`, which expects only native types or `JavaObject` for the args.
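For reference, a minimal sketch of the guard pattern the diff above applies in classic PySpark (the helper name here is hypothetical, for illustration only):

```python
from pyspark.sql import Column

def _prepare_arg(value):
    # Hypothetical helper illustrating the quoted diff line: a Column wraps
    # a py4j JavaObject in ._jc, so unwrap it before handing it to the JVM
    # bridge; plain Python values (int, str, ...) pass through and are
    # converted by py4j itself.
    return value._jc if isinstance(value, Column) else value
```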
I see, it was not supported in Classic mode, but supported in Connect mode.
Classic:
```python
In [2]: df = spark.sql("select map(1, 2, 3, 4) as m, 1 as k")
   ...: df.select(F.map_contains_key(df.m, df.k))
---------------------------------------------------------------------------
PySparkTypeError                          Traceback (most recent call last)
Cell In[2], line 2
      1 df = spark.sql("select map(1, 2, 3, 4) as m, 1 as k")
----> 2 df.select(F.map_contains_key(df.m, df.k))
...
File ~/Dev/spark/python/pyspark/sql/classic/column.py:415, in Column.__iter__(self)
    414 def __iter__(self) -> None:
--> 415     raise PySparkTypeError(
    416         error_class="NOT_ITERABLE", message_parameters={"objectName": "Column"}
    417     )

PySparkTypeError: [NOT_ITERABLE] Column is not iterable.
```
Connect:
```python
In [1]: from pyspark.sql import functions as F

In [2]: df = spark.sql("select map(1, 2, 3, 4) as m, 1 as k")
   ...: df.select(F.map_contains_key(df.m, df.k))
Out[2]: DataFrame[map_contains_key(m, k): boolean]
```
There is a slight difference in the handling of `value: Any` typed values: Spark Connect always converts `value: Any` to a Column/Expression (because of the requirement of the `UnresolvedFunction` proto), while some functions (e.g. `map_contains_key`) in Classic don't do this.
We will need to revisit all the `Any` typed parameters in functions. cc @HyukjinKwon
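A minimal sketch of the conversion Connect effectively performs (not the actual Connect internals, which build the `UnresolvedFunction` proto; the helper name is hypothetical):

```python
from pyspark.sql import Column, functions as F

def _to_expr(value):
    # Connect must represent every argument as a Column/Expression, so any
    # non-Column value gets wrapped as a literal.
    return value if isinstance(value, Column) else F.lit(value)
```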
This is an interesting behavior difference. What's the reason for not converting the value to a column/expression in classic PySpark?
I guess it was not by design; it seems to be just due to the type mismatch in the internal helper functions.
```rst
.. versionchanged:: 3.4.0
    Supports Spark Connect.

.. versionchanged:: 4.0.0
```
let's move it under the Parameters section, you may refer to spark/python/pyspark/sql/functions/builtin.py, lines 6216 to 6220 in bb1f026:
```rst
scale : :class:`~pyspark.sql.Column` or int, optional
    An optional parameter to control the rounding behavior.

    .. versionchanged:: 4.0.0
        Support Column type.
```
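Applied to one of the functions touched here, the suggested layout would look roughly like this (a hypothetical rendering for illustration, not the merged docstring text):

```python
def map_contains_key(col, value):
    """
    Returns true if the map contains the key.

    Parameters
    ----------
    value :
        A literal value, or a :class:`~pyspark.sql.Column` expression.

        .. versionchanged:: 4.0.0
            Supports Column type.
    """
```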
```rst
.. versionchanged:: 3.4.0
    Supports Spark Connect.

.. versionchanged:: 4.0.0
```
ditto
```rst
.. versionchanged:: 3.4.0
    Supports Spark Connect.

.. versionchanged:: 4.0.0
```
ditto
```rst
.. versionchanged:: 3.4.0
    Supports Spark Connect.

.. versionchanged:: 4.0.0
```
ditto
LGTM, only need a few minor doc changes. Thanks for working on this.
@zhengruifeng Fixed the docstrings, thanks for reviewing :)
Merged to master.
What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-48555
For PySpark, added the ability to use the `Column` type or column names for the parameters of several functions.
For Scala, added the ability to use the `Column` type as the parameters of the `substring` function.

This functionality already exists in the SQL syntax; however, it isn't possible to do the same in Python.
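A minimal sketch of what this enables in PySpark (assumes an active SparkSession named `spark`):

```python
from pyspark.sql import functions as F

df = spark.createDataFrame([("abcdef", 2, 3)], ["s", "pos", "len"])

# pos and len previously had to be Python ints; with this change, Columns
# (or column names) work as well. substring("abcdef", 2, 3) -> "bcd"
df.select(F.substring(df.s, df.pos, df.len)).show()
```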
Note that in Scala the functions other than `substring` already accepted Column params (or rather, they accept `Any` and pass whatever the param is to `lit`, so it ends up working), so I only needed to change `substring` on the Scala side.

Why are the changes needed?
To align the Scala/Python API with the SQL one.
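For comparison, the equivalent has long been expressible in SQL (a minimal sketch):

```python
# The SQL form accepts arbitrary expressions for pos/len, which is what this
# PR brings to the Scala/Python APIs.
spark.sql(
    "SELECT substring(s, pos, len) FROM VALUES ('abcdef', 2, 3) AS t(s, pos, len)"
).show()
```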
Does this PR introduce any user-facing change?
Yes, added new overloaded functions in Scala and changed type hints/docstrings in Python.
How was this patch tested?
Added doctests for the Python changes and tests in the Scala test suites, then tested both manually and using the CI.
Was this patch authored or co-authored using generative AI tooling?
No.
Notes:

- Followed the approach of similar functions (e.g. `array_contains` or `when`) of using `value._jc if isinstance(value, Column) else value`.
- Also checked the `connect` functions, but it seems like on the Python side they already supported the use of columns, so no extra changes were needed there.
- Tried `substring_index` as well; however, I wasn't able to fix this one the same way I did for `substring`. Calling it with a `lit` for the `count` arg worked, but using a `col` errored with a `NumberFormatError` for "project_value_3" (see the repro sketch after these notes). I assume the error is related to trying to parse the Int here. In that case I got lost in the Scala code and decided to drop it, but if anyone knows how to fix this error I could change that function as well.

The contribution is my original work and I license the work to the project under the project's open source license.
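The `substring_index` behavior mentioned in the notes, as a hedged repro sketch (the failing call is left commented out; that function was not changed by this PR):

```python
from pyspark.sql import functions as F

df = spark.createDataFrame([("a.b.c", 2)], ["s", "n"])

# Works: count passed as a plain int (a lit for count also worked per the
# notes above).
df.select(F.substring_index(df.s, ".", 2)).show()

# Fails as described: a column-typed count is not supported and errors with
# a NumberFormatError for "project_value_3" on the Scala side.
# df.select(F.substring_index(df.s, ".", df.n))
```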