[SPARK-44044][SS] Improve Error message for Window functions with streaming #41578
jaceklaskowski
left a comment
Tiny nits
nit: Remove found
nit: Streaming (upper case)
Interesting...should the error message include OVER spec? 🤔 It might not necessarily be related to this change but an issue with window aggregation in Spark SQL in general.
If we add OVER, it looks like this:
Unsupported window function found in column 'row_number() OVER (PARTITION BY col1 ORDER BY col2 ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as column rn_col'. Structured streaming only supports time-window aggregation using the `window` function.
I don't have a strong opinion on this, but I didn't include it because I personally find the less relevant information a bit overwhelming. What's your opinion?
Updated the PR. The message now looks like the following:
Unsupported window function in 'row_number()' as column 'rn_col'. Structured Streaming only supports time-window aggregation using the `window` function. (window specification: '(PARTITION BY col1 ORDER BY col2 ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)')
Could you introduce an error class with this error format and add it to error-classes.json, please? Also cc @HeartSaVioR to check the semantics.
@siying
The change itself seems to be OK.
We are applying the error class framework whenever a change touches an exception. Although we haven't made any progress on UnsupportedOperationChecker yet, we could treat this PR as a starting point.
Please look into the recent PR on error class migration in SS, especially error-classes.json and QueryExecutionErrors.scala.
#41205
Here is a guide doc for error class framework.
https://github.com/apache/spark/blob/master/core/src/main/resources/error/README.md
Please go through and apply it. Thanks!
case w @ Window(_, _, _, child) if child.isStreaming =>
w would be the same as subPlan.asInstanceOf[Window]
Looks like it doesn't even need to... we can just extract the value out from the pattern.
case Window(windowExpressions, _, _, child) if child.isStreaming =>
and then remove L512 and use windowExpressions directly.
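For illustration, the suggestion above can be sketched as follows. The types here are hypothetical, simplified stand-ins (strings instead of Catalyst expressions), not the real Spark classes; only the four-field shape of `Window` and the `isStreaming` guard are taken from the snippets in this thread.

```scala
// Hypothetical, simplified stand-ins for Catalyst logical plan nodes.
// The real Window node holds expressions, not strings.
sealed trait LogicalPlan { def isStreaming: Boolean }

final case class Relation(isStreaming: Boolean) extends LogicalPlan

final case class Window(
    windowExpressions: Seq[String],
    partitionSpec: Seq[String],
    orderSpec: Seq[String],
    child: LogicalPlan) extends LogicalPlan {
  // A window is streaming when its input is streaming.
  def isStreaming: Boolean = child.isStreaming
}

object WindowMatchSketch {
  // Bind windowExpressions directly in the pattern, so no cast such as
  // subPlan.asInstanceOf[Window] is needed afterwards.
  def streamingWindowExprs(subPlan: LogicalPlan): Option[Seq[String]] =
    subPlan match {
      case Window(windowExpressions, _, _, child) if child.isStreaming =>
        Some(windowExpressions)
      case _ =>
        None
    }
}
```

Destructuring in the pattern keeps the match type-safe and removes the extra local variable that the cast would otherwise require.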
Could you use checkError to check the error class and parameters? That way, tech editors can change the error message without breaking internal Spark tests.
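To show why this helps, here is a minimal sketch of the idea rather than Spark's actual `checkError` helper. `SketchAnalysisException`, `CheckErrorSketch.matches`, and the error class name `SKETCH_NON_TIME_WINDOW` are all hypothetical names introduced for this example.

```scala
// Hypothetical stand-in for an exception raised through the error class
// framework: it carries a stable error class plus named parameters.
final class SketchAnalysisException(
    val errorClass: String,
    val parameters: Map[String, String])
  extends Exception(errorClass)

object CheckErrorSketch {
  // Compare only the error class and the parameters. The rendered message
  // text is never inspected, so rewording the template cannot break a test.
  def matches(
      e: SketchAnalysisException,
      errorClass: String,
      parameters: Map[String, String]): Boolean =
    e.errorClass == errorClass && e.parameters == parameters
}
```

A test written this way asserts on `errorClass` and `parameters` and stays valid when only the wording of the message changes.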
Is it possible to avoid the dependency on the error message format?
Can the argument contain multiple names? If not, please, change it to windowFunc
Could you wrap windowFuncs by toSQLId, please.
Please, remove '' around windowSpec
@MaxGekk I addressed the comments. The CI failure is on python link. I don't know why it happens but I don't see how it is related.
@MaxGekk here are the only error messages in CI and I assume it is not related:
@MaxGekk Would you mind doing another round of review? Given that you've been providing review comments, I'd like to make sure we get sign-off from you. Thanks in advance!
Please, restore indentation here.
Yep, it seems not. I re-ran it locally. Could you rebase on the recent master?
@MaxGekk I removed the unintended indentation and rebased. Now all tests pass.
Could you change quoting of the expression and the id using toSQLExpr and toSQLId
and just pass both to nonTimeWindowNotSupportedInStreamingError, and put as column to error-classes.json.
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@MaxGekk I addressed the comments.
@siying Could you resolve conflicts?
Could you pass sequences of (windowFunc, columnName, windowSpec), and do quoting inside of QueryExecutionErrors, please.
Let's merge all of them and return (we.windowFunction, e.toAttribute, we.windowSpec)
Better to perform quoting here.
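A rough sketch of what quoting inside the error builder could look like, under stated assumptions: `quoteExpr` and `quoteId` are hypothetical stand-ins for `toSQLExpr` and `toSQLId`, the quoting conventions (double quotes for expressions, backticks for identifiers) are assumed rather than taken from this PR, and plain strings replace Catalyst expressions.

```scala
object QuotingSketch {
  // Hypothetical stand-in for toSQLExpr: wrap the expression SQL in double quotes.
  def quoteExpr(sql: String): String = "\"" + sql + "\""

  // Hypothetical stand-in for toSQLId: backtick-quote an identifier,
  // doubling any embedded backticks.
  def quoteId(name: String): String = "`" + name.replace("`", "``") + "`"

  // The caller passes raw values; all quoting happens in one place.
  // windowSpec is passed through unquoted, as requested in the review.
  def errorParameters(
      windowFunc: String,
      columnName: String,
      windowSpec: String): Map[String, String] =
    Map(
      "windowFunc" -> quoteExpr(windowFunc),
      "columnName" -> quoteId(columnName),
      "windowSpec" -> windowSpec)
}
```

Keeping the quoting in the error builder means call sites such as the checker only extract `(windowFunc, columnName, windowSpec)` and do not need to know the quoting rules.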
Co-authored-by: Maxim Gekk <[email protected]>
…xecutionErrors.scala Co-authored-by: Maxim Gekk <[email protected]>
Co-authored-by: Maxim Gekk <[email protected]>
@MaxGekk I addressed the comments.
+1, LGTM. Merging to master.
@MaxGekk thank you for your review!
What changes were proposed in this pull request?
Replace the existing error message, shown when a non-time window function is used with streaming, so that it includes the window function and the column. The error message now looks like the following:
org.apache.spark.sql.AnalysisException: Window function is not supported in 'row_number()' as column 'rn_col' on streaming DataFrames/Datasets. Structured Streaming only supports time-window aggregation using the `window` function. (window specification: '(PARTITION BY col1 ORDER BY col2 ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)')
Note that the message is a little unnatural because the existing unit test requires the exception message to include "not supported", "streaming", "DataFrames" and "Dataset".
Why are the changes needed?
The existing error message is vague and includes a full logical plan. A user reported that they were unable to identify what the problem was.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added a unit test.