Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,29 @@ class Analyzer(
private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
val outerReferences = ArrayBuffer.empty[Expression]

// Validate that correlated aggregate expression do not contain a mixture
// of outer and local references.
def checkMixedReferencesInsideAggregation(expr: Expression): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: checkMixedReferencesInsideAggregation -> checkMixedReferencesInsideAggregationExpr

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya Will change it to checkMixedReferencesInsideAggregateExpr

expr.foreach {
case a: AggregateExpression if containsOuter(a) =>
val outer = a.collect { case OuterReference(e) => e.toAttribute }
val local = a.references -- outer
if (local.nonEmpty) {
val msg =
s"""
|Found an aggregate expression in a correlated predicate that has both
|outer and local references, which is not supported yet.
|Aggregate expression: ${a.sql}
|Outer references: ${outer.map(_.sql).mkString(", ")}
|Local references: ${local.map(_.sql).mkString(", ")}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comma at each line

                   |Aggregate expression: ${a.sql},
                   |Outer references: ${outer.map(_.sql).mkString(", ")},
                   |Local references: ${local.map(_.sql).mkString(", ")}.

""".
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile Thanks. fixed.

stripMargin.replace("\n", " ").trim()
failAnalysis(msg)
}
case _ =>
}
}

// Make sure a plan's subtree does not contain outer references
def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
if (hasOuterReferences(p)) {
Expand All @@ -1219,6 +1242,7 @@ class Analyzer(

// Make sure a plan's expressions do not contain outer references
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should change this comment accordingly too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya Ok. will do.

def failOnOuterReference(p: LogicalPlan): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

failOnOuterReference -> failOnOuterOrMixedReference? Or is it fine to keep it unchanged?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya ok, will change it to failOnInvalidOuterReference. That should include both the cases.

p.expressions.foreach(checkMixedReferencesInsideAggregation)
Copy link
Member

@viirya viirya Apr 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

p is not definitely an Aggregate, so I think checkMixedReferencesInsideAggregation will not only apply to aggregate expression.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya checkMixedReferencesInsideAggregation only looks for AggregateExpression ? For other types its just a pass-through.

Copy link
Member

@viirya viirya Apr 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. got it. :-) Btw, Filter can also have aggregate expression here.

if (p.expressions.exists(containsOuter)) {
failAnalysis(
"Expressions referencing the outer query are not supported outside of WHERE/HAVING " +
Expand Down Expand Up @@ -1305,6 +1329,8 @@ class Analyzer(
case _: EqualTo | _: EqualNullSafe => false
case _ => true
}

correlated.foreach(checkMixedReferencesInsideAggregation(_))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: correlated.foreach(checkMixedReferencesInsideAggregation)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile Thanks. fixed.

// The aggregate expressions are treated in a special way by getOuterReferences. If the
// aggregate expression contains only outer reference attributes then the entire aggregate
// expression is isolated as an OuterReference.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ case class OuterReference(e: NamedExpression)
override def exprId: ExprId = e.exprId
override def toAttribute: Attribute = e.toAttribute
override def newInstance(): NamedExpression = OuterReference(e.newInstance())
override def sql: String = e.sql
override def toString: String = e.toString
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to override this? If we keep it unchanged, will it be easier for us to find whether it is an outer reference in the plans?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile Thats a good point Sean. I will revert this and error raising code to make sure its printed properly.

}

object VirtualColumn {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,42 +1,72 @@
-- The test file contains negative test cases
-- of invalid queries where error messages are expected.

create temporary view t1 as select * from values
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those just change for case, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya Yeah.. since i was on this test case, thought i should fix the case.

CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
(1, 2, 3)
as t1(t1a, t1b, t1c);
AS t1(t1a, t1b, t1c);

create temporary view t2 as select * from values
CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
(1, 0, 1)
as t2(t2a, t2b, t2c);
AS t2(t2a, t2b, t2c);

create temporary view t3 as select * from values
CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES
(3, 1, 2)
as t3(t3a, t3b, t3c);
AS t3(t3a, t3b, t3c);

-- TC 01.01
-- The column t2b in the SELECT of the subquery is invalid
-- because it is neither an aggregate function nor a GROUP BY column.
select t1a, t2b
from t1, t2
where t1b = t2c
and t2b = (select max(avg)
from (select t2b, avg(t2b) avg
from t2
where t2a = t1.t1b
SELECT t1a, t2b
FROM t1, t2
WHERE t1b = t2c
AND t2b = (SELECT max(avg)
FROM (SELECT t2b, avg(t2b) avg
FROM t2
WHERE t2a = t1.t1b
)
)
;

-- TC 01.02
-- Invalid due to the column t2b not part of the output from table t2.
select *
from t1
where t1a in (select min(t2a)
from t2
group by t2c
having t2c in (select max(t3c)
from t3
group by t3b
having t3b > t2b ))
SELECT *
FROM t1
WHERE t1a in (SELECT min(t2a)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: in -> IN

FROM t2
GROUP by t2c
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: by -> BY

HAVING t2c IN (SELECT max(t3c)
FROM t3
GROUP BY t3b
HAVING t3b > t2b ))
;

-- TC 01.03
-- Invalid due to mixure of outer and local references under an AggegatedExpression
-- in a correlated predicate
SELECT t1a
FROM t1
GROUP BY 1
HAVING EXISTS (SELECT 1
FROM t2
WHERE t2a < min(t1a + t2a));

-- TC 01.04
-- Invalid due to mixure of outer and local references under an AggegatedExpression
SELECT t1a
FROM t1
WHERE t1a IN (SELECT t2a
FROM t2
WHERE EXISTS (SELECT 1
FROM t3
GROUP BY 1
HAVING min(t2a + t3a) > 1));

-- TC 01.05
-- Invalid due to outer reference appearing in projection list
SELECT t1a
FROM t1
WHERE t1a IN (SELECT t2a
FROM t2
WHERE EXISTS (SELECT min(t2a)
FROM t3));

Original file line number Diff line number Diff line change
@@ -1,45 +1,45 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 5
-- Number of queries: 8


-- !query 0
create temporary view t1 as select * from values
CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
(1, 2, 3)
as t1(t1a, t1b, t1c)
AS t1(t1a, t1b, t1c)
-- !query 0 schema
struct<>
-- !query 0 output



-- !query 1
create temporary view t2 as select * from values
CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
(1, 0, 1)
as t2(t2a, t2b, t2c)
AS t2(t2a, t2b, t2c)
-- !query 1 schema
struct<>
-- !query 1 output



-- !query 2
create temporary view t3 as select * from values
CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES
(3, 1, 2)
as t3(t3a, t3b, t3c)
AS t3(t3a, t3b, t3c)
-- !query 2 schema
struct<>
-- !query 2 output



-- !query 3
select t1a, t2b
from t1, t2
where t1b = t2c
and t2b = (select max(avg)
from (select t2b, avg(t2b) avg
from t2
where t2a = t1.t1b
SELECT t1a, t2b
FROM t1, t2
WHERE t1b = t2c
AND t2b = (SELECT max(avg)
FROM (SELECT t2b, avg(t2b) avg
FROM t2
WHERE t2a = t1.t1b
)
)
-- !query 3 schema
Expand All @@ -50,17 +50,67 @@ grouping expressions sequence is empty, and 't2.`t2b`' is not an aggregate funct


-- !query 4
select *
from t1
where t1a in (select min(t2a)
from t2
group by t2c
having t2c in (select max(t3c)
from t3
group by t3b
having t3b > t2b ))
SELECT *
FROM t1
WHERE t1a in (SELECT min(t2a)
FROM t2
GROUP by t2c
HAVING t2c IN (SELECT max(t3c)
FROM t3
GROUP BY t3b
HAVING t3b > t2b ))
-- !query 4 schema
struct<>
-- !query 4 output
org.apache.spark.sql.AnalysisException
resolved attribute(s) t2b#x missing from min(t2a)#x,t2c#x in operator !Filter t2c#x IN (list#x [t2b#x]);


-- !query 5
SELECT t1a
FROM t1
GROUP BY 1
HAVING EXISTS (SELECT 1
FROM t2
WHERE t2a < min(t1a + t2a))
-- !query 5 schema
struct<>
-- !query 5 output
org.apache.spark.sql.AnalysisException
Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t1.`t1a` + t2.`t2a`)) Outer references: t1.`t1a` Local references: t2.`t2a`;


-- !query 6
SELECT t1a
FROM t1
WHERE t1a IN (SELECT t2a
FROM t2
WHERE EXISTS (SELECT 1
FROM t3
GROUP BY 1
HAVING min(t2a + t3a) > 1))
-- !query 6 schema
struct<>
-- !query 6 output
org.apache.spark.sql.AnalysisException
Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t2.`t2a` + t3.`t3a`)) Outer references: t2.`t2a` Local references: t3.`t3a`;


-- !query 7
SELECT t1a
FROM t1
WHERE t1a IN (SELECT t2a
FROM t2
WHERE EXISTS (SELECT min(t2a)
FROM t3))
-- !query 7 schema
struct<>
-- !query 7 output
org.apache.spark.sql.AnalysisException
Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses:
Aggregate [min(t2a#x) AS min(t2.`t2a`)#x]
+- SubqueryAlias t3
+- Project [t3a#x, t3b#x, t3c#x]
+- SubqueryAlias t3
+- LocalRelation [t3a#x, t3b#x, t3c#x]
;