
Conversation

CTCC1 (Contributor) commented Apr 15, 2024

What changes were proposed in this pull request?

https://issues.apache.org/jira/projects/SPARK/issues/SPARK-47845

Support `Column` type in the `split` function for Scala and Python. SQL already supports this, but the Scala/Python functions currently don't.

A hypothetical example to illustrate:

```scala
import org.apache.spark.sql.functions.{col, split}

val example = spark.createDataFrame(
    Seq(
      ("Doe, John", ", ", 2),
      ("Smith,Jane", ",", 2),
      ("Johnson", ",", 1)
    )
  )
  .toDF("name", "delim", "expected_parts_count")

example.createOrReplaceTempView("test_data")

// works for SQL
spark.sql("SELECT split(name, delim, expected_parts_count) AS name_parts FROM test_data").show()

// currently doesn't compile for Scala, but will be supported after this PR
example.withColumn("name_parts", split(col("name"), col("delim"), col("expected_parts_count"))).show()
```
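
The corresponding PySpark call enabled by this PR would look like this (a sketch mirroring the Scala example above, assuming an existing `spark` session):

```python
from pyspark.sql import functions as F

example = spark.createDataFrame(
    [("Doe, John", ", ", 2), ("Smith,Jane", ",", 2), ("Johnson", ",", 1)],
    ["name", "delim", "expected_parts_count"],
)

# After this PR, pattern and limit can be Column expressions too:
example.withColumn(
    "name_parts",
    F.split(F.col("name"), F.col("delim"), F.col("expected_parts_count")),
).show()
```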

Why are the changes needed?

I have a use case that requires splitting a String-typed column using delimiters defined in other columns of the DataFrame.

I am also following this guidance:

* This function APIs usually have methods with `Column` signature only because it can support not
* only `Column` but also other types such as a native string. The other variants currently exist
* for historical reasons.

We might want to do this for a number of other functions as well, but I will start with this one, and maybe do more if I have time and people generally think it's useful.

Does this PR introduce any user-facing change?

Yes, a new feature (via function overloading in Scala, and broader type handling in Python).

How was this patch tested?

Added unit tests with `Column` input types to the Scala functions.
Added doctests for PySpark and PySpark Connect.
Added the explain output and plan protos generated for the JVM connect test.

Was this patch authored or co-authored using generative AI tooling?

No.

Others

The contribution is my original work and I license the work to the project under the project’s open source license.

CTCC1 changed the title from "[SPARK-47845][SQL][PYTHON][CONNECT] Support column type in split function for scala and python" to "[SPARK-47845][SQL][PYTHON][CONNECT] Support Column type in split function for scala and python" on Apr 15, 2024
CTCC1 marked this pull request as draft on April 15, 2024 01:46
CTCC1 (Contributor, Author) commented Apr 15, 2024

We might want to consider renaming variables that shadow Python builtins, such as `str`. It's annoying here because it breaks the usage of `isinstance(pattern, str)`.
Given that renaming the variable would be a backwards-incompatible change (for user code that passes arguments by keyword) and needs further discussion, I will work around it by aliasing the Python builtin (see the workaround comment in the code).
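
To make the shadowing concrete, a minimal sketch (the parameter name `str` mirrors the existing signature; the alias is the workaround):

```python
from builtins import str as py_str  # alias the builtin before it is shadowed

def split(str, pattern, limit=-1):  # `str` kept for backwards compatibility
    # Inside this body, `str` refers to the argument, not the builtin type,
    # so isinstance(pattern, str) would raise TypeError; the alias still works:
    return isinstance(pattern, py_str)

print(split("a,b", ","))  # True
```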

CTCC1 force-pushed the SPARK-47845-support-column-type-in-split-function branch from c7aa7af to dd68c9f on April 15, 2024 08:26
CTCC1 marked this pull request as ready for review on April 15, 2024 08:36
CTCC1 force-pushed the SPARK-47845-support-column-type-in-split-function branch from dd68c9f to a48100d on April 15, 2024 17:17
CTCC1 (Contributor, Author) commented Apr 16, 2024

This is actually my first PR here :)
@zhengruifeng, based on git blame, you did something very similar before. Do you want to take a look? Thanks in advance!

Contributor commented:

I think the `lit` function accepts both `Column` and `int`.

CTCC1 (Contributor, Author) replied:

`limit` can also be of type `str`, referring to a column, hence the check, to avoid turning a column name into a string literal.

Contributor replied:

Yeah, we can use it for `pattern`.

Comment on lines 2484 to 2498
Contributor commented:

Suggested change:
```diff
-# work around shadowing of str in the input variable name
-from builtins import str as py_str
-if isinstance(pattern, py_str):
-    _pattern = lit(pattern)
-elif isinstance(pattern, Column):
-    _pattern = pattern
-else:
-    raise PySparkTypeError(
-        error_class="NOT_COLUMN_OR_STR",
-        message_parameters={"arg_name": "pattern", "arg_type": type(pattern).__name__},
-    )
-limit = lit(limit) if isinstance(limit, int) else _to_col(limit)
-return _invoke_function("split", _to_col(str), _pattern, limit)
+limit = lit(limit) if isinstance(limit, int) else _to_col(limit)
+return _invoke_function("split", _to_col(str), lit(pattern), limit)
```
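
For context on why the simplification works: `lit` passes a `Column` argument through unchanged and wraps a plain string as a literal, so `lit(pattern)` covers both cases. A minimal sketch, assuming classic PySpark behavior:

```python
from pyspark.sql import Column, SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# A Column passes through lit unchanged, so no isinstance dispatch is needed:
assert isinstance(F.lit(F.col("delim")), Column)
# A plain string becomes a string-literal Column:
assert isinstance(F.lit(","), Column)
```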

CTCC1 (Contributor, Author) replied Apr 16, 2024:

Thanks for the suggestion, this is simpler for sure! The only concern is that we will no longer raise `PySparkTypeError` if `pattern` is passed with a type other than `Column` or `str`; it will still form an `UnresolvedFunction`. Is raising such an error early a requirement for Connect?

For now I simplified the code but kept the error raise. Let me know if we want to skip raising the error.

Contributor replied:

Only a few functions have such a check, and most functions don't check the types.
We might need to figure out an easy way to do type checking.

As for this function, let's keep it simpler for now.

CTCC1 (Contributor, Author) replied:

Sure, I removed the type check for now. Maybe in the future we can standardize this, e.g. with a decorator that inspects the function signature and does type checking accordingly.
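
A hypothetical sketch of that decorator idea (illustrative only, not an existing PySpark API):

```python
import functools
import inspect

def check_arg_types(func):
    """Validate arguments against plain class annotations before calling func."""
    sig = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = sig.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            annotation = sig.parameters[name].annotation
            # Only enforce plain class annotations; skip unannotated params.
            if (
                annotation is not inspect.Parameter.empty
                and isinstance(annotation, type)
                and not isinstance(value, annotation)
            ):
                raise TypeError(
                    f"{name} should be {annotation.__name__}, "
                    f"got {type(value).__name__}"
                )
        return func(*args, **kwargs)

    return wrapper

@check_arg_types
def repeat(s: str, n: int) -> str:
    return s * n

repeat("ab", 3)       # ok
# repeat("ab", "3")   # raises TypeError: n should be int, got str
```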

Contributor commented:

Please add more doctests in the Examples section to test the newly supported types.

Those doctests will automatically be reused in the Spark Connect Python Client.
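
For example, something along these lines (a sketch; values and column names are illustrative):

```python
>>> df = spark.createDataFrame([("a,b,c", ",", 2)], ["s", "delim", "n"])
>>> df.select(split(df.s, df.delim, df.n).alias("parts")).collect()
[Row(parts=['a', 'b,c'])]
```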

zhengruifeng (Contributor) commented:

also cc @HyukjinKwon and @LuciferYang

Contributor commented:

Let's add some new test cases for the connect module in PlanGenerationTestSuite.

Contributor commented:

@CTCC1 You also need to run `connect/testOnly org.apache.spark.sql.connect.ProtoToParsedPlanTestSuite` to generate the golden files needed for reverse validation testing.

CTCC1 (Contributor, Author) replied:

Thanks for the pointer, fixed.

HyukjinKwon (Member) commented:

I am fine with this change

CTCC1 force-pushed the SPARK-47845-support-column-type-in-split-function branch from 3d70901 to 46355bb on April 20, 2024 04:20
zhengruifeng (Contributor) commented:

thank you @CTCC1

merged to master

HyukjinKwon pushed a commit that referenced this pull request Jun 17, 2024
…rs for several functions in pyspark/scala

### What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-48555
For PySpark, added the ability to use the `Column` type or column names for the parameters of the following functions:

- array_remove
- array_position
- map_contains_key
- substring

For Scala, added the ability to use the `Column` type for the parameters of the `substring` function.

This functionality already exists in the SQL syntax:
```
select array_remove(col1, col2) from values (array(1,2,3), 2)
```

however, it isn't possible to do the same in python
```python
df.select(F.array_remove(F.col("col1"), F.col("col2")))
```
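
For reference, a minimal runnable sketch of the call above (hypothetical data, assuming this change):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([([1, 2, 3], 2)], ["col1", "col2"])

# With this change, the element to remove can be a Column instead of a literal:
df.select(F.array_remove(F.col("col1"), F.col("col2")).alias("removed")).show()
```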

Note that in Scala, the functions other than `substring` already accepted `Column` params (or rather, they accept `Any` and pass whatever the param is to `lit`, so it ends up working), so I only needed to change `substring` on the Scala side.

### Why are the changes needed?
To align the Scala/Python API with the SQL one.

### Does this PR introduce _any_ user-facing change?
Yes, added new overloaded functions in Scala and changed type hints/docstrings in Python.

### How was this patch tested?
Added doctests for the python changes, and tests in the scala test suites, then tested both manually and using the CI.

### Was this patch authored or co-authored using generative AI tooling?
No.

### Notes:

- I opened the related JIRA ticket, but it looks like I don't have the option to assign it to myself, so if that is required and any reviewer has the permissions for it, I'd appreciate it.
- The "Build" workflow passed successfully, but the "report test results" one didn't, due to some authorization issue; I see that this is the same for many other open PRs right now, so I assume it's OK.
- For the Python changes, I tried to follow the convention used by other functions (such as `array_contains` or `when`) of using `value._jc if isinstance(value, Column) else value`.
- I'm not really familiar with the `connect` functions, but it seems that on the Python side they already supported the use of columns, so no extra changes were needed there.
- For the Scala side, this is the first time I'm touching Scala; I think I covered it all, as I tried to match similar changes done in [a similar PR](#46045).
- The same issue also exists for `substring_index`; however, I wasn't able to fix that one the same way I did for `substring`. Calling it with a `lit` for the `count` arg worked, but using a `col` fails with a `NumberFormatException` for "project_value_3". I assume the error is related to trying to parse the Int [here](https://github.com/apache/spark/blob/7cba1ab4d6acef4e9d73a8e6018b0902aac3a18d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala#L1449). At that point I got lost in the Scala code and decided to drop it, but if anyone knows how to fix this error, I could change that function as well.

The contribution is my original work and I license the work to the project under the project’s open source license.

Closes #46901 from Ronserruya/support_columns_in_pyspark_functions.

Authored-by: Ron Serruya <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
