Closed
Changes from 1 commit
22 commits
ae1186f
[SPARK-34581][SQL] Don't optimize out grouping expressions from aggre…
peter-toth Mar 21, 2021
5ab9f75
comment fix
peter-toth Mar 21, 2021
2293fd4
move logic to the beginning of optimization, simplify test
peter-toth Mar 22, 2021
3de19ca
regenerate approved plans
peter-toth Mar 22, 2021
04e61c5
Merge branch 'master' into SPARK-34581-keep-grouping-expressions
peter-toth Mar 23, 2021
6e05f14
define GroupingExpression as TaggingExpression
peter-toth Mar 23, 2021
09f1a85
move test to SQLQueryTestSuite
peter-toth Mar 24, 2021
f46b89d
add more explanation
peter-toth Mar 24, 2021
56589a3
Merge commit 'c8233f1be5c2f853f42cda367475eb135a83afd5' into SPARK-34…
peter-toth Mar 26, 2021
ea95bff
Merge commit '3951e3371a83578a81474ed99fb50d59f27aac62' into SPARK-34…
peter-toth Mar 31, 2021
7ea2306
Merge commit '89ae83d19b9652348a685550c2c49920511160d5' into SPARK-34…
peter-toth Apr 1, 2021
468534f
Merge commit '65da9287bc5112564836a555cd2967fc6b05856f' into SPARK-34…
peter-toth Apr 2, 2021
977c0bf
new GroupingExprRef approach
peter-toth Mar 27, 2021
c2ba804
simplify
peter-toth Apr 11, 2021
0622444
minor fixes
peter-toth Apr 12, 2021
343f35e
Merge commit 'e40fce919ab77f5faeb0bbd34dc86c56c04adbaa' into SPARK-34…
peter-toth Apr 12, 2021
2e79eb9
review fixes
peter-toth Apr 13, 2021
cff9b9a
fix latest test failures, add new test case
peter-toth Apr 14, 2021
78296a8
better non-deterministic test case
peter-toth Apr 14, 2021
72c173b
make new rules non excludable
peter-toth Apr 15, 2021
34f0439
Merge branch 'fork/master' into SPARK-34581-keep-grouping-expressions
peter-toth Apr 15, 2021
fb3a19d
fix validConstraints, minor changes
peter-toth Apr 17, 2021
[SPARK-34581][SQL] Don't optimize out grouping expressions from aggregate expressions
peter-toth committed Mar 21, 2021
commit ae1186f4be87b2136c3e55bf4ae3d41c58b03142
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions

 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

@@ -155,3 +155,20 @@ object GroupingID {
     if (SQLConf.get.integerGroupingIdEnabled) IntegerType else LongType
   }
 }
+
+/**
+ * Wrapper expression to avoid further optimizations of child
+ */
+case class GroupingExpression(child: Expression) extends UnaryExpression {
Member

Is this a tagging-like expression (TaggingExpression)?

Contributor Author

Thanks, I think it is. Changed in 6e05f14

+  override def eval(input: InternalRow): Any = {
+    child.eval(input)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    child.genCode(ctx)
+  }
+
+  override def dataType: DataType = {
+    child.dataType
+  }
+}
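As a rough illustration of what a wrapper like this buys, the sketch below uses a toy expression tree rather than Catalyst. `Expr`, `Grouping`, `wrap`, and `simplify` are hypothetical names; `Grouping` stands in for `GroupingExpression`, and treating it as opaque to the rewrite is an assumption of the sketch, not a statement about how Catalyst rules traverse it.

```scala
object GroupingWrapperSketch {
  // Toy expression tree (illustrative only, not Spark's Catalyst classes).
  sealed trait Expr
  case class Col(name: String) extends Expr
  case class IsNull(child: Expr) extends Expr
  case class IsNotNull(child: Expr) extends Expr
  case class Not(child: Expr) extends Expr
  // Hypothetical stand-in for the GroupingExpression wrapper.
  case class Grouping(child: Expr) extends Expr

  // Wrap every occurrence of a grouping expression, top-down.
  def wrap(e: Expr, grouping: Set[Expr]): Expr =
    if (grouping.contains(e)) Grouping(e)
    else e match {
      case Not(c)       => Not(wrap(c, grouping))
      case IsNull(c)    => IsNull(wrap(c, grouping))
      case IsNotNull(c) => IsNotNull(wrap(c, grouping))
      case other        => other
    }

  // Toy rewrite: NOT(x IS NULL) => x IS NOT NULL.
  // Assumption of this sketch: the wrapper is opaque, so nothing inside it is rewritten.
  def simplify(e: Expr): Expr = e match {
    case Not(IsNull(c)) => IsNotNull(simplify(c))
    case Not(c)         => Not(simplify(c))
    case IsNull(c)      => IsNull(simplify(c))
    case IsNotNull(c)   => IsNotNull(simplify(c))
    case g: Grouping    => g
    case c: Col         => c
  }

  val grouping: Set[Expr] = Set(IsNull(Col("id")))
  val selected: Expr = Not(IsNull(Col("id")))

  // Without the wrapper the grouping expression is rewritten away.
  val unwrappedResult: Expr = simplify(selected)
  // With the wrapper the NOT(... IS NULL) pattern no longer matches.
  val wrappedResult: Expr = simplify(wrap(selected, grouping))
}
```

In this toy model `unwrappedResult` no longer contains `IsNull(Col("id"))`, while `wrappedResult` still does, inside the `Grouping` node.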
@@ -870,8 +870,19 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper {
       if (haveCommonNonDeterministicOutput(p.projectList, agg.aggregateExpressions)) {
         p
       } else {
-        agg.copy(aggregateExpressions = buildCleanedProjectList(
-          p.projectList, agg.aggregateExpressions))
+        val complexGroupingExpressions =
+          ExpressionSet(agg.groupingExpressions.filter(_.children.nonEmpty))
+
+        def wrapGroupingExpression(e: Expression): Expression = e match {
+          case _: AggregateExpression => e
+          case _ if complexGroupingExpressions.contains(e) => GroupingExpression(e)
+          case _ => e.mapChildren(wrapGroupingExpression)
+        }
+
+        val wrappedAggregateExpressions =
+          agg.aggregateExpressions.map(wrapGroupingExpression(_).asInstanceOf[NamedExpression])
+        agg.copy(aggregateExpressions =
+          buildCleanedProjectList(p.projectList, wrappedAggregateExpressions))
Member

I think this issue is not related to CollapseProject. For example, we can reproduce it with CollapseProject excluded:

$ ./bin/spark-shell --conf spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.CollapseProject

Seq(Some(1), None).toDF("id").createOrReplaceTempView("t")
val df = sql("""
  SELECT NOT(t.id IS NULL) AS X, count(*) AS c
  FROM t
  GROUP BY t.id IS NULL
""")

df.show()

The query fails because BooleanSimplification applies an illegal expression transformation that breaks the group-by constraint (that is, columns used outside aggregate functions must appear in the GROUP BY clause). In other DBMS-like systems (e.g., PostgreSQL), the transformed query fails like this:

postgres=# SELECT t.id IS NOT NULL AS X, count(*) AS c FROM t GROUP BY t.id IS NULL;
ERROR:  column "t.id" must appear in the GROUP BY clause or be used in an aggregate function
LINE 1: SELECT t.id IS NOT NULL AS X, count(*) AS c FROM t GROUP BY ...

I'm not sure yet whether this issue can happen only in BooleanSimplification, but I think we need a general solution for this kind of illegal transformation instead of band-aid fixes.
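To make the constraint concrete, here is a minimal, self-contained sketch on a toy expression tree. The `Expr` types, `simplify`, and `respectsGroupBy` are hypothetical and are not Catalyst classes or rules; `simplify` only imitates the NOT(x IS NULL) rewrite discussed above.

```scala
object GroupByConstraintSketch {
  // Toy expression tree (illustrative only, not Spark's Catalyst classes).
  sealed trait Expr
  case class Col(name: String) extends Expr
  case class IsNull(child: Expr) extends Expr
  case class IsNotNull(child: Expr) extends Expr
  case class Not(child: Expr) extends Expr

  // Hypothetical rewrite in the spirit of the one described above:
  // NOT(x IS NULL) => x IS NOT NULL.
  def simplify(e: Expr): Expr = e match {
    case Not(IsNull(c)) => IsNotNull(simplify(c))
    case Not(c)         => Not(simplify(c))
    case IsNull(c)      => IsNull(simplify(c))
    case IsNotNull(c)   => IsNotNull(simplify(c))
    case c: Col         => c
  }

  // Group-by constraint for a non-aggregate select expression:
  // every column must be reachable only through a grouping expression.
  def respectsGroupBy(e: Expr, grouping: Set[Expr]): Boolean =
    if (grouping.contains(e)) true
    else e match {
      case Col(_)       => false
      case IsNull(c)    => respectsGroupBy(c, grouping)
      case IsNotNull(c) => respectsGroupBy(c, grouping)
      case Not(c)       => respectsGroupBy(c, grouping)
    }

  // GROUP BY t.id IS NULL, SELECT NOT(t.id IS NULL)
  val grouping: Set[Expr] = Set(IsNull(Col("id")))
  val selected: Expr = Not(IsNull(Col("id")))

  val validBefore: Boolean = respectsGroupBy(selected, grouping)
  val validAfter: Boolean = respectsGroupBy(simplify(selected), grouping)
}
```

In this toy model `validBefore` is true and `validAfter` is false: after the rewrite the select expression refers to the bare column `id`, which is not a grouping expression.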

Contributor Author
peter-toth Mar 22, 2021

Ahh, you are right: even the initial query can contain terms in which Spark should keep grouping expressions. I think I just inserted the GroupingExpression wrapper around id IS NULL at the wrong place; it should happen during analysis. Will look into this today.

Contributor Author
peter-toth Mar 22, 2021

Thanks for the test case @maropu.
I moved the logic to a new rule and simplified the test case in 2293fd4, and updated the PR description.

       }
     case Project(l1, g @ GlobalLimit(_, limit @ LocalLimit(_, p2 @ Project(l2, _))))
       if isRenaming(l1, l2) =>
17 changes: 17 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -4116,6 +4116,23 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       }
     }
   }
+
+  test("SPARK-34581: Don't optimize out grouping expressions from aggregate expressions") {
+    withTempView("t") {
+      Seq[Integer](null, 1, 2, 3, null).toDF("id").createOrReplaceTempView("t")
+
+      val df = spark.sql(
Member

If this test only checks whether a query works correctly, could you move it into SQLQueryTestSuite?

Contributor Author

It is, moved in 09f1a85

+        """
+          |SELECT not(id), c
+          |FROM (
+          | SELECT t.id IS NULL AS id, count(*) AS c
+          | FROM t
+          | GROUP BY t.id IS NULL
+          |) t
+          |""".stripMargin)
+      checkAnswer(df, Row(true, 3) :: Row(false, 2) :: Nil)
+    }
+  }
 }

 case class Foo(bar: Option[String])