-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-20334][SQL] Return a better error message when correlated predicates contain aggregate expression that has mixture of outer and local references. #17636
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
a266c8d
bb1bdad
ff88651
c4e1a01
af3d367
55c64ca
d986ddc
2411f3e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1210,16 +1210,41 @@ class Analyzer( | |
| private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = { | ||
| val outerReferences = ArrayBuffer.empty[Expression] | ||
|
|
||
| // Validate that correlated aggregate expressions do not contain a mixture | ||
| // of outer and local references. | ||
| def checkMixedReferencesInsideAggregateExpr(expr: Expression): Unit = { | ||
| expr.foreach { | ||
| case a: AggregateExpression if containsOuter(a) => | ||
| val outer = a.collect { case OuterReference(e) => e.toAttribute } | ||
| val local = a.references -- outer | ||
| if (local.nonEmpty) { | ||
| val msg = | ||
| s""" | ||
| |Found an aggregate expression in a correlated predicate that has both | ||
| |outer and local references, which is not supported yet. | ||
| |Aggregate expression: ${SubExprUtils.stripOuterReference(a).sql}, | ||
| |Outer references: ${outer.map(_.sql).mkString(", ")}, | ||
| |Local references: ${local.map(_.sql).mkString(", ")}. | ||
| """.stripMargin.replace("\n", " ").trim() | ||
| failAnalysis(msg) | ||
| } | ||
| case _ => | ||
| } | ||
| } | ||
|
|
||
| // Make sure a plan's subtree does not contain outer references | ||
| def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = { | ||
| if (hasOuterReferences(p)) { | ||
| failAnalysis(s"Accessing outer query column is not allowed in:\n$p") | ||
| } | ||
| } | ||
|
|
||
| // Make sure a plan's expressions do not contain outer references | ||
| def failOnOuterReference(p: LogicalPlan): Unit = { | ||
| if (p.expressions.exists(containsOuter)) { | ||
| // Make sure a plan's expressions do not contain: | ||
| // 1. Aggregate expressions that have mixture of outer and local references. | ||
| // 2. Expressions containing outer references on plan nodes other than Filter. | ||
| def failOnInvalidOuterReference(p: LogicalPlan): Unit = { | ||
| p.expressions.foreach(checkMixedReferencesInsideAggregateExpr) | ||
| if (!p.isInstanceOf[Filter] && p.expressions.exists(containsOuter)) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this new condition
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @viirya In the new code, we are calling failOnOuterReference on Filter based on @gatorsmile's comment.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh. I found it.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should put the check of
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @viirya This is by choice. So we want to return the same error for TC.01.03 and TC.01.04. Currently we return a "..not supported outside of WHERE/HAVING" error for TC.01.04 which is misleading. |
||
| failAnalysis( | ||
| "Expressions referencing the outer query are not supported outside of WHERE/HAVING " + | ||
| s"clauses:\n$p") | ||
|
|
@@ -1289,9 +1314,9 @@ class Analyzer( | |
| // These operators can be anywhere in a correlated subquery. | ||
| // so long as they do not host outer references in the operators. | ||
| case s: Sort => | ||
| failOnOuterReference(s) | ||
| failOnInvalidOuterReference(s) | ||
| case r: RepartitionByExpression => | ||
| failOnOuterReference(r) | ||
| failOnInvalidOuterReference(r) | ||
|
|
||
| // Category 3: | ||
| // Filter is one of the two operators allowed to host correlated expressions. | ||
|
|
@@ -1305,6 +1330,8 @@ class Analyzer( | |
| case _: EqualTo | _: EqualNullSafe => false | ||
| case _ => true | ||
| } | ||
|
|
||
| failOnInvalidOuterReference(f) | ||
| // The aggregate expressions are treated in a special way by getOuterReferences. If the | ||
| // aggregate expression contains only outer reference attributes then the entire aggregate | ||
| // expression is isolated as an OuterReference. | ||
|
|
@@ -1314,23 +1341,23 @@ class Analyzer( | |
| // Project cannot host any correlated expressions | ||
| // but can be anywhere in a correlated subquery. | ||
| case p: Project => | ||
| failOnOuterReference(p) | ||
| failOnInvalidOuterReference(p) | ||
|
|
||
| // Aggregate cannot host any correlated expressions | ||
| // It can be on a correlation path if the correlation contains | ||
| // only equality correlated predicates. | ||
| // It cannot be on a correlation path if the correlation has | ||
| // non-equality correlated predicates. | ||
| case a: Aggregate => | ||
| failOnOuterReference(a) | ||
| failOnInvalidOuterReference(a) | ||
| failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a) | ||
|
|
||
| // Join can host correlated expressions. | ||
| case j @ Join(left, right, joinType, _) => | ||
| joinType match { | ||
| // Inner join, like Filter, can be anywhere. | ||
| case _: InnerLike => | ||
| failOnOuterReference(j) | ||
| failOnInvalidOuterReference(j) | ||
|
|
||
| // Left outer join's right operand cannot be on a correlation path. | ||
| // LeftAnti and ExistenceJoin are special cases of LeftOuter. | ||
|
|
@@ -1341,12 +1368,12 @@ class Analyzer( | |
| // Any correlated references in the subplan | ||
| // of the right operand cannot be pulled up. | ||
| case LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) => | ||
| failOnOuterReference(j) | ||
| failOnInvalidOuterReference(j) | ||
| failOnOuterReferenceInSubTree(right) | ||
|
|
||
| // Likewise, Right outer join's left operand cannot be on a correlation path. | ||
| case RightOuter => | ||
| failOnOuterReference(j) | ||
| failOnInvalidOuterReference(j) | ||
| failOnOuterReferenceInSubTree(left) | ||
|
|
||
| // Any other join types not explicitly listed above, | ||
|
|
@@ -1362,7 +1389,7 @@ class Analyzer( | |
| // Note: | ||
| // Generator with join=false is treated as Category 4. | ||
| case g: Generate if g.join => | ||
| failOnOuterReference(g) | ||
| failOnInvalidOuterReference(g) | ||
|
|
||
| // Category 4: Any other operators not in the above 3 categories | ||
| // cannot be on a correlation path, that is they are allowed only | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have any test case to cover this scenario?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gatorsmile I couldn't find a test for this. I will add one for now in SubquerySuite. I will move the negative tests to the sqlquerytestsuite in a follow-up pr. |
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,42 +1,72 @@ | ||
| -- The test file contains negative test cases | ||
| -- of invalid queries where error messages are expected. | ||
|
|
||
| create temporary view t1 as select * from values | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Those just change for case, right?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @viirya Yeah.. since i was on this test case, thought i should fix the case. |
||
| CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES | ||
| (1, 2, 3) | ||
| as t1(t1a, t1b, t1c); | ||
| AS t1(t1a, t1b, t1c); | ||
|
|
||
| create temporary view t2 as select * from values | ||
| CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES | ||
| (1, 0, 1) | ||
| as t2(t2a, t2b, t2c); | ||
| AS t2(t2a, t2b, t2c); | ||
|
|
||
| create temporary view t3 as select * from values | ||
| CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES | ||
| (3, 1, 2) | ||
| as t3(t3a, t3b, t3c); | ||
| AS t3(t3a, t3b, t3c); | ||
|
|
||
| -- TC 01.01 | ||
| -- The column t2b in the SELECT of the subquery is invalid | ||
| -- because it is neither an aggregate function nor a GROUP BY column. | ||
| select t1a, t2b | ||
| from t1, t2 | ||
| where t1b = t2c | ||
| and t2b = (select max(avg) | ||
| from (select t2b, avg(t2b) avg | ||
| from t2 | ||
| where t2a = t1.t1b | ||
| SELECT t1a, t2b | ||
| FROM t1, t2 | ||
| WHERE t1b = t2c | ||
| AND t2b = (SELECT max(avg) | ||
| FROM (SELECT t2b, avg(t2b) avg | ||
| FROM t2 | ||
| WHERE t2a = t1.t1b | ||
| ) | ||
| ) | ||
| ; | ||
|
|
||
| -- TC 01.02 | ||
| -- Invalid due to the column t2b not part of the output from table t2. | ||
| select * | ||
| from t1 | ||
| where t1a in (select min(t2a) | ||
| from t2 | ||
| group by t2c | ||
| having t2c in (select max(t3c) | ||
| from t3 | ||
| group by t3b | ||
| having t3b > t2b )) | ||
| SELECT * | ||
| FROM t1 | ||
| WHERE t1a IN (SELECT min(t2a) | ||
| FROM t2 | ||
| GROUP BY t2c | ||
| HAVING t2c IN (SELECT max(t3c) | ||
| FROM t3 | ||
| GROUP BY t3b | ||
| HAVING t3b > t2b )) | ||
| ; | ||
|
|
||
| -- TC 01.03 | ||
| -- Invalid due to a mixture of outer and local references under an AggregateExpression | ||
| -- in a correlated predicate | ||
| SELECT t1a | ||
| FROM t1 | ||
| GROUP BY 1 | ||
| HAVING EXISTS (SELECT 1 | ||
| FROM t2 | ||
| WHERE t2a < min(t1a + t2a)); | ||
|
|
||
| -- TC 01.04 | ||
| -- Invalid due to a mixture of outer and local references under an AggregateExpression | ||
| SELECT t1a | ||
| FROM t1 | ||
| WHERE t1a IN (SELECT t2a | ||
| FROM t2 | ||
| WHERE EXISTS (SELECT 1 | ||
| FROM t3 | ||
| GROUP BY 1 | ||
| HAVING min(t2a + t3a) > 1)); | ||
|
|
||
| -- TC 01.05 | ||
| -- Invalid due to outer reference appearing in projection list | ||
| SELECT t1a | ||
| FROM t1 | ||
| WHERE t1a IN (SELECT t2a | ||
| FROM t2 | ||
| WHERE EXISTS (SELECT min(t2a) | ||
| FROM t3)); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -815,7 +815,7 @@ class SubquerySuite extends QueryTest with SharedSQLContext { | |
|
|
||
| // Generate operator | ||
| test("Correlated subqueries in LATERAL VIEW") { | ||
| withTempView("t1", "t2") { | ||
| withTempView("t1", "t2", "t3") { | ||
|
||
| Seq((1, 1), (2, 0)).toDF("c1", "c2").createOrReplaceTempView("t1") | ||
| Seq[(Int, Array[Int])]((1, Array(1, 2)), (2, Array(-1, -3))) | ||
| .toDF("c1", "arr_c2").createTempView("t2") | ||
|
|
@@ -828,6 +828,19 @@ class SubquerySuite extends QueryTest with SharedSQLContext { | |
| | from t2 lateral view explode(arr_c2) q as c2 | ||
| where t1.c1 = t2.c1)""".stripMargin), | ||
| Row(1) :: Row(0) :: Nil) | ||
|
|
||
| val msg1 = intercept[AnalysisException] { | ||
| sql( | ||
| """ | ||
| | select c1 | ||
| | from t2 | ||
| | where exists (select * | ||
| | from t1 lateral view explode(t2.arr_c2) q as c2 | ||
| | where t1.c1 = t2.c1) | ||
|
||
| """.stripMargin) | ||
| } | ||
| assert(msg1.getMessage.contains( | ||
| "Expressions referencing the outer query are not supported outside of WHERE/HAVING")) | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we do not add the comma, we can remove `.replace("\n", " ").trim()`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gatorsmile It will be too many lines sean ? I think adding comma is better ?