[SPARK-45112][SQL] Use UnresolvedFunction based resolution in SQL Dataset functions #42864
Conversation
| abc| value|
+------------------+------+
+---------------+------+
|substr(l, 1, 3)|d[key]|
To avoid this change, I think we can call self.substring in this method. df.l[slice(1, 3)] does not require a certain function name, so keeping it as it was is better.
Ping @peter-toth
// scalastyle:on line.size.limit
case class In(value: Expression, list: Seq[Expression]) extends Predicate {

  def this(valueAndList: Seq[Expression]) = {
does it mean we support select in(a, b, c) now?
Yes, but it was not a goal; it is just a useful side effect.
I'm not sure if this is useful. in(a, b, c) looks pretty weird to me. Can we revert it and treat def in as a special case? The SQL parser also treats IN as a special case and has dedicated syntax for it.
All right, reverted in 8bf64a7.
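For context, a minimal sketch (simplified, not the exact PR code) of why the extra constructor mattered: FunctionRegistry builds expressions from a flat Seq[Expression] of arguments, so a Seq-taking constructor is what would make a SQL-level in(a, b, c) call resolvable. As agreed above, this was reverted and In remains a special case in the Dataset API.

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, In}

object InBuilder {
  // Equivalent of the reverted auxiliary constructor: the first argument is
  // the tested value, the remaining ones form the IN list.
  def fromArgs(valueAndList: Seq[Expression]): In =
    In(valueAndList.head, valueAndList.tail)
}
```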
def count(e: Column): Column = withAggregateFunction {
  e.expr match {

def count(e: Column): Column = {
  val withoutStar = e.expr match {
    // Turn count(*) into count(1)
Unrelated issue: this is not the right place to do this conversion; it should be done in the analyzer. cc @zhengruifeng, does Spark Connect hit the same issue?
No, Spark Connect doesn't hit such an issue.
spark/python/pyspark/sql/connect/functions.py, lines 1014 to 1015 in 89041a4:

def count(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("count", col)

spark/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala, line 391 in 89041a4:

def count(e: Column): Column = Column.fn("count", e)
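For reference, a rough sketch of the SQL-side shape under discussion (helper names are assumptions, not the exact PR code; Column.fn is assumed to build an UnresolvedFunction, mirroring the Connect client's version above, and may be package-private depending on the Spark version):

```scala
import org.apache.spark.sql.{functions => F, Column}
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar

object CountSketch {
  def count(e: Column): Column = {
    // Turn count(*) into count(1): a star is not a valid aggregate argument,
    // so replace it with a literal before resolution.
    val withoutStar = e.expr match {
      case _: UnresolvedStar => F.lit(1)
      case _ => e
    }
    Column.fn("count", withoutStar)
  }
}
```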
def schema_of_json(json: Column, options: java.util.Map[String, String]): Column = {
  withExpr(SchemaOfJson(json.expr, options.asScala.toMap))
}

def schema_of_json(json: Column, options: java.util.Map[String, String]): Column =
I think these should be among the few exceptions that we keep as they were, because the expression (SchemaOfJson in this case) takes non-expression inputs, and it's tricky to define a protocol that can pass non-expression inputs as expressions.
Not sure I get your point. SchemaOfJson seems to accept options as expressions: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala#L776-L778. We just follow Connect here.
Oh I see, then I'm fine with it. It's inevitable that we need to create a map expression from language-native (Scala or Python) map values.
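To make the map-expression point concrete, a minimal sketch (helper name and exact shapes are assumptions, mirroring what the Connect client does rather than this PR's code) of turning a language-native options map into a single map-typed argument:

```scala
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.catalyst.expressions.CreateMap

object OptionsAsExpression {
  // Flatten the options into alternating key/value literals so the whole map
  // can travel through function resolution as one expression.
  def toMapColumn(options: java.util.Map[String, String]): Column = {
    val kvs = options.asScala.toSeq.flatMap { case (k, v) => Seq(lit(k).expr, lit(v).expr) }
    new Column(CreateMap(kvs))
  }
}
```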
case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
  UnresolvedAlias(a, Some(Column.generateAlias))
case u: UnresolvedFunction => UnresolvedAlias(expr, None)
case e if !e.resolved => UnresolvedAlias(expr, None)
Why is this change needed?
Without this change some Python tests fail like sum_udf(col("v2")) + 5 here: https://github.com/apache/spark/blob/master/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py#L404C17-L404C40.
I debugged this today and it seems these are AggregateExpressions but the aggregate function is PythonUDAF so they don't match the previous case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] => case. Shall we remove the if condition? Column.generateAlias seems to take care of it: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Column.scala#L44-L50
I changed the AggregateExpression path to case a: AggregateExpression => UnresolvedAlias(a, Some(Column.generateAlias)) but still kept the case _ if !expr.resolved => UnresolvedAlias(expr, None) because in the plus_one(sum_udf(col("v1"))) case an unresolved PythonUDF is passed here.
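In other words, the conversion ends up roughly like the sketch below (simplified; Column.generateAlias is package-private in Spark, so its visibility here is an assumption, and the final fallback case is illustrative only):

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression

object NamedExpressionSketch {
  def toNamed(expr: Expression): NamedExpression = expr match {
    // Aggregates (including PythonUDAF-backed ones) get a generated alias.
    case a: AggregateExpression => UnresolvedAlias(a, Some(Column.generateAlias))
    // Anything still unresolved (e.g. an unresolved PythonUDF call) is aliased
    // later, once the analyzer has resolved it.
    case e if !e.resolved => UnresolvedAlias(e, None)
    // Resolved non-aggregate expressions get a plain alias (illustrative only).
    case e => Alias(e, e.sql)()
  }
}
```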
also cc @beliefer @panbingkun
@zhengruifeng Thank you for your ping.
The ultimate goal is to move the Dataset and Connect functions into the sql-api module.
To add more color to @peter-toth's comment: think about a base trait to define these function APIs. Then on the Spark Connect side, we override how each function call is turned into a Column.
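A minimal sketch of that idea (the names below are illustrative, not Spark's actual API):

```scala
import org.apache.spark.sql.Column

// Shared surface: every backend gets the same function definitions for free.
trait FunctionApi {
  // Single extension point: how a function name plus arguments becomes a Column.
  protected def fn(name: String, args: Column*): Column

  def count(e: Column): Column = fn("count", e)
  def upper(e: Column): Column = fn("upper", e)
}

// The classic SQL module would implement fn with an UnresolvedFunction
// expression, while the Connect client would implement it with an
// UnresolvedFunction protobuf message.
```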
This reverts commit d611fd5.
override def builder(e: Seq[Expression]): Expression = {
  assert(e.length == 1, "Defined UDF only has one column")
  val expr = e.head
  assert(expr.resolved, "column should be resolved to use the same type " +
I'm a bit confused about this change. IIUC we always call builder with resolved expressions.
And(
  LessThan(lit(1).expr, lit(5).expr),
  LessThan(lit(6).expr, lit(7).expr)),
EqualTo(lit(0).expr, lit(-1).expr))
Shall we use the DSL to build expressions?
fixed in 580f97b
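For reference, a sketch of the DSL style suggested above (test context assumed; the operator enclosing the original tree is not shown):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal

object DslSketch {
  // The same sub-trees as the explicit constructors above, written with the
  // catalyst expression DSL.
  val conjunction = Literal(1) < Literal(5) && Literal(6) < Literal(7)
  val equality = Literal(0) === Literal(-1)
}
```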
Thanks, merging to master!
Thanks for the review!
Referenced follow-up (#43146, SPARK-45354):

What changes were proposed in this pull request?
This PR proposes bottom-up resolution in ResolveFunctions, which is much faster (it needs fewer resolution rounds) when there are deeply nested UnresolvedFunctions. Such structures are more likely to occur after #42864.

Why are the changes needed?
Performance optimization.

Does this PR introduce any user-facing change?
No.

How was this patch tested?
Existing UTs.

Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43146 from peter-toth/SPARK-45354-resolve-functions-bottom-up.
Authored-by: Peter Toth <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
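Illustratively (this is a sketch, not the actual ResolveFunctions rule), bottom-up resolution means a nested call such as f(g(h(x))) can be resolved in a single pass, because each child is resolved before its parent is visited:

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction
import org.apache.spark.sql.catalyst.expressions.Expression

object BottomUpSketch {
  // Resolve nested UnresolvedFunctions in one bottom-up traversal, given a
  // lookup that turns a single unresolved call into a resolved expression.
  def resolve(e: Expression, lookup: UnresolvedFunction => Expression): Expression =
    e.transformUp { case u: UnresolvedFunction => lookup(u) }
}
```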
What changes were proposed in this pull request?
This is the first cleanup PR of the ticket to use UnresolvedFunction/FunctionRegistry based resolution in SQL Dataset functions, similar to what Spark Connect does.

Why are the changes needed?
If we can make the SQL and Connect Dataset functions similar, then we can move the functions to the sql-api module.

Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing UTs.
Was this patch authored or co-authored using generative AI tooling?
No.
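As a final illustration, a minimal sketch (assumed shapes, simplified) of what the change means for a single Dataset function: instead of constructing the concrete catalyst expression eagerly, the function records only a registered name and its arguments, and the analyzer resolves it through the FunctionRegistry later, the way Spark Connect already does.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction
import org.apache.spark.sql.catalyst.expressions.Upper

object ResolutionStyles {
  // Before: build the catalyst expression directly in the Dataset API.
  def upperEager(e: Column): Column = new Column(Upper(e.expr))

  // After: defer to the analyzer via an UnresolvedFunction lookup in the
  // FunctionRegistry.
  def upperDeferred(e: Column): Column =
    new Column(UnresolvedFunction("upper", Seq(e.expr), isDistinct = false))
}
```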