[SPARK-11275][SQL] Rollup and Cube Generates the Incorrect Results when Aggregation Functions Use Group By Columns #9419
Conversation
Can one of the admins verify this patch?
Hi, Rick,
Thanks, Xiao Li
If I understand the problem correctly, the logical Expand operator nulls out items which are not in the grouping set. What I don't understand is why we are not fixing the logical Expand by introducing two sets of expressions, one for grouping and one for aggregation. This is much simpler and fixes the problem at its root.
@holdenk This is the PR I mentioned in the email. Could you review it too?
It would also help a lot to have unit tests covering this problem.
@hvanhovell Your understanding is right. If we merge both grouping and aggregation together, it will introduce extra complexity when generating the logical plan for the following case. Of course, in theory, it is doable, but the code will be harder to maintain in the future. The extra Project will be collapsed by the optimizer; thus, in the analyzer, I just introduce the extra Project. I am writing unit test cases and will try to deliver them ASAP.
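To make the Expand discussion concrete, here is a toy, plain-Scala model (hypothetical names, not Spark internals) of how Expand replicates each input row once per grouping set and nulls out the columns outside that set. An aggregate that reads a grouping column directly therefore sees nulls on subtotal rows, which is the bug; aggregating over a never-nulled copy of the column is the fix:

```scala
// Toy model of Expand for ROLLUP(a, b); not Spark code.
object ExpandSketch {
  // One output row per grouping set; columns outside the set become None.
  case class Row(a: Option[Int], b: Option[Int], gid: Int)

  val input = Seq((1, 2), (2, 4), (2, 9))

  // ROLLUP(a, b) expands to grouping sets (a, b), (a), and ().
  def expand(rows: Seq[(Int, Int)]): Seq[Row] =
    rows.flatMap { case (a, b) =>
      Seq(
        Row(Some(a), Some(b), 0), // (a, b): both columns kept
        Row(Some(a), None,    1), // (a): b is nulled out
        Row(None,    None,    3)  // (): grand total, both nulled out
      )
    }

  // Buggy: sum(a) on the grand-total rows reads the nulled grouping column.
  def buggyGrandTotal: Int =
    expand(input).filter(_.gid == 3).flatMap(_.a).sum

  // Fixed: aggregate over a copy of a that Expand never nulls out.
  def fixedGrandTotal: Int =
    input.map(_._1).sum
}
```

In this sketch `buggyGrandTotal` yields 0 instead of the correct 5, because every copy of `a` on the grand-total rows is null; the extra Project (or Herman's two expression sets) gives the aggregate a never-nulled copy.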
Maybe in addition to describing what we do, we should also explain why we are inserting this projection here?
Thanks for the ongoing discussion, Xiao and Herman. At the very least, it would be useful to capture Herman's analysis in the code comment which explains why we are inserting an extra projection. Xiao's approach sounds like a good, incremental improvement. More test cases would help test the correctness of an alternative solution if someone wants to try their hand at one. It would be interesting to see the code for Herman's approach so that we can evaluate whether it is simple (as Herman says) or complex (as Xiao says). Thanks,
@rick-ibm Will add more comments to explain it. In particular, I will emphasize that this design expects the optimizer to collapse these two projections into a single one. @chenghao-intel, could you also review the code changes? Does the solution look OK? Really appreciate your original work; it looks very concise to me. @holdenk Got it. Will try to follow your suggestions, do more code cleaning, and resend a request to you again. Thank you!
@gatorsmile Thanks for working on this fix. Can you also add the unit tests?
@chenghao-intel @hvanhovell Unit test cases are added. Will finish the code for resolving the comments by @holdenk. @rick-ibm @rxin @marmbrus @liancheng @yhuai I am wondering if my incremental low-risk fix will be merged into Spark 1.6? If not, personally, I prefer fixing all the bugs and improving things via the solution by @aray (Andrew Ray). That solution simplifies the implementation of rollup and cube.
The unit tests for the DataFrame methods should be in the base sql package, not hive, so that they are run when not compiled with Hive support.
Thank you, Andrew!
Using SQL statements might generate a different logical plan. Thus, we need to run the unit tests for both SQL and DataFrame in the package org.apache.spark.sql.hive.
Based on my understanding, DataFrame methods are shared by both HiveContext and SQLContext. Thus, we do not need to test them again in the base sql package. Right?
Take this for example:

```sql
select sum(a-b) as ab from mytable group by b with rollup;
```

I think we probably need to add extra column(s) to the output of Expand (e.g. `(a-b)` in this case). Will that make this fix simpler?
Hi, @chenghao-intel,
Could you explain it a little bit more?
So far, this query is correctly processed and returns the correct result. Since b is used inside an aggregate function, the fix added an extra column for b. Below is the generated plan:
```
== Analyzed Logical Plan ==
ab: bigint
Aggregate [b#3,grouping__id#12], [sum(cast((a#2 - b#3#13) as bigint)) AS ab#4L]
 Expand [0,1], [b#3], grouping__id#12
  Project [a#2,b#3,b#3 AS b#3#13]
   Subquery mytable
    Project [_1#0 AS a#2,_2#1 AS b#3]
     LocalRelation [_1#0,_2#1], [[1,2],[2,4],[2,9]]

== Optimized Logical Plan ==
Aggregate [b#3,grouping__id#12], [sum(cast((a#2 - b#3#13) as bigint)) AS ab#4L]
 Expand [0,1], [b#3], grouping__id#12
  LocalRelation [a#2,b#3,b#3#13], [[1,2,2],[2,4,4],[2,9,9]]
```
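The duplicated attribute `b#3#13` in the plan above is the key: Expand nulls only the grouping copy `b#3` per grouping set, while the aliased copy feeding `sum(a - b)` is left untouched. A hypothetical plain-Scala illustration of that behavior (toy names, not Spark internals):

```scala
// Toy illustration of the Project-before-Expand trick; not Spark code.
object DuplicateColumnSketch {
  // Project [a, b, b AS bDup]: each row carries a duplicate of b.
  val projected = Seq((1, 2, 2), (2, 4, 4), (2, 9, 9))

  // Grand-total grouping set: Expand nulls only the grouping copy of b.
  val grandTotalRows: Seq[(Int, Option[Int], Int)] =
    projected.map { case (a, _, bDup) => (a, None, bDup) }

  // sum(a - b) reads the never-nulled duplicate, so the subtotal is correct.
  val sumAMinusB: Int =
    grandTotalRows.map { case (a, _, bDup) => a - bDup }.sum
}
```

For the three rows above, `sumAMinusB` is `(1-2)+(2-4)+(2-9) = -10`, matching what the query should return for the grand-total row.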
Oh, sorry, actually there is not as big a difference as I mentioned.
But I got an error when running the query below; can you please take a look at it?

```sql
select sum(a+b) as ab from mytable group by a+b, b with rollup;
```
```
15/11/04 17:46:36 ERROR thriftserver.SparkSQLDriver: Failed in [select sum(a+b) as ab from mytable group by a+b, b with rollup]
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: '(cast(a#109 as double) + b#110)
	at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:59)
	at org.apache.spark.sql.catalyst.plans.logical.Expand$$anonfun$expand$1$$anonfun$5$$anonfun$apply$3.applyOrElse(basicOperators.scala:291)
	at org.apache.spark.sql.catalyst.plans.logical.Expand$$anonfun$expand$1$$anonfun$5$$anonfun$apply$3.applyOrElse(basicOperators.scala:287)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:226)
	at org.apache.spark.sql.catalyst.plans.logical.Expand$$anonfun$expand$1$$anonfun$5.apply(basicOperators.scala:287)
	at org.apache.spark.sql.catalyst.plans.logical.Expand$$anonfun$expand$1$$anonfun$5.apply(basicOperators.scala:287)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at org.apache.spark.sql.catalyst.plans.logical.Expand$$anonfun$expand$1.apply(basicOperators.scala:287)
	at org.apache.spark.sql.catalyst.plans.logical.Expand$$anonfun$expand$1.apply(basicOperators.scala:283)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at org.apache.spark.sql.catalyst.plans.logical.Expand.expand(basicOperators.scala:283)
	at org.apache.spark.sql.catalyst.plans.logical.Expand.<init>(basicOperators.scala:254)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$apply$6.applyOrElse(Analyzer.scala:293)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$apply$6.applyOrElse(Analyzer.scala:200)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$.apply(Analyzer.scala:200)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$.apply(Analyzer.scala:173)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
	at scala.collection.immutable.List.foldLeft(List.scala:84)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:38)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:38)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:36)
	at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
	at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:784)
```
@chenghao-intel, good catch! Thank you!
This issue is fixed if you integrate the latest change. I also added two more test cases to cover it.
Does this PR fix the same problem as #9429?
Yes, it does, but with a different approach.
…non-attribute expressions in group by
Please let me know if I need to resolve these conflicts. @cloud-fan @chenghao-intel @marmbrus @rxin
Yes, I think so. Sorry, I am busy with something else, and will dive in next week for the detailed review.
Thank you, Hao! Will do it in the next few days.
Fixes a bug with grouping sets (including cube/rollup) where aggregates that included grouping expressions would return the wrong (null) result. Also simplifies the analyzer rule a bit and leaves column pruning to the optimizer. Added multiple unit tests to DataFrameAggregateSuite and verified it passes the Hive compatibility suite:

```
build/sbt -Phive -Dspark.hive.whitelist='groupby.*_grouping.*' 'test-only org.apache.spark.sql.hive.execution.HiveCompatibilitySuite'
```

This is an alternative to PR #9419, but I think it's better as it simplifies the analyzer rule instead of adding another special case to it.

Author: Andrew Ray <[email protected]>

Closes #9815 from aray/groupingset-agg-fix.

(cherry picked from commit 37cff1b)
Signed-off-by: Yin Huai <[email protected]>
@gatorsmile Thank you for working on this. However, I believe #9815 is a better approach to fix this issue and I have merged that. How about we close this PR?
@yhuai Sorry, I did not pay attention to this. I will try the new implementation this weekend.
Thank you everyone!
In the current implementation, Rollup and Cube are unable to generate the correct results for the following cases.

When the aggregation functions use the group-by key columns:

```scala
sql("select b, a, sum(a), min(a), min(b+b) from mytable group by a, b with rollup").collect()
sql("select a, b, sum(a), min(a), min(b+b) from mytable group by b, a with cube").collect()
```

The problem becomes more complex if the group by clause contains functions whose inputs also appear in the group by:

```scala
sql("select a + b, b, sum(a - b) from mytable group by a + b, b with rollup").collect()
sql("select a + b, b, sum(a - b) from mytable group by a + b, b with cube").collect()
```

The basic solution is to add an extra Projection when the query falls into one of the above situations. The projection adds duplicate values for the affected columns under alias names, so that the column values will not be removed when Expand is evaluated at runtime.
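As a sanity check of what "correct results" means here, a plain-Scala reference computation (hypothetical names, not Spark code) of `sum(a)` under `ROLLUP(a, b)` for a small table, where `None` marks a rolled-up column:

```scala
// Reference rollup computation for sum(a); not Spark internals.
object RollupReference {
  val rows = Seq((1, 2), (2, 4), (2, 9))

  // ROLLUP(a, b) = grouping sets (a, b), (a), and ().
  def sums: Map[(Option[Int], Option[Int]), Int] = {
    val full  = rows.groupBy { case (a, b) => (Option(a), Option(b)) }
    val byA   = rows.groupBy { case (a, _) => (Option(a), None: Option[Int]) }
    val total = Map((None: Option[Int], None: Option[Int]) -> rows)
    (full ++ byA ++ total).map { case (k, group) => k -> group.map(_._1).sum }
  }
}
```

Any implementation should produce the grand-total row `(None, None) -> 5` here; the bug in question made such subtotal rows wrong whenever the aggregate referenced a grouping column like `a` or `b`.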
Working on the test cases. Will add more cases into the Hive golden files. Any comments and suggestions are welcome. Thank you!