Closed
Changes from all commits (34 commits)
b988651
[SPARK-16804][SQL] Correlated subqueries containing LIMIT return inco…
nsyca Jul 29, 2016
069ed8f
[SPARK-16804][SQL] Correlated subqueries containing LIMIT return inco…
nsyca Jul 29, 2016
edca333
New positive test cases
nsyca Jul 30, 2016
64184fd
Fix unit test case failure
nsyca Aug 1, 2016
29f82b0
blocking TABLESAMPLE
nsyca Aug 5, 2016
ac43ab4
Fixing code styling
nsyca Aug 5, 2016
631d396
Correcting Scala test style
nsyca Aug 7, 2016
7eb9b2d
One (last) attempt to correct the Scala style tests
nsyca Aug 8, 2016
1387cf5
Merge remote-tracking branch 'upstream/master'
nsyca Aug 12, 2016
6d9bade
Merge remote-tracking branch 'upstream/master'
nsyca Nov 4, 2016
9a1f80b
Merge remote-tracking branch 'upstream/master'
nsyca Nov 4, 2016
3fe9429
Merge remote-tracking branch 'upstream/master'
nsyca Nov 5, 2016
0757b81
Merge remote-tracking branch 'upstream/master'
nsyca Nov 11, 2016
35b77f0
Merge remote-tracking branch 'upstream/master'
nsyca Nov 12, 2016
c63b8c6
Merge remote-tracking branch 'upstream/master'
nsyca Nov 14, 2016
f3351d5
Merge remote-tracking branch 'upstream/master'
nsyca Nov 18, 2016
9fc5c33
Merge remote-tracking branch 'upstream/master'
nsyca Nov 18, 2016
402e1d9
Merge remote-tracking branch 'upstream/master'
nsyca Nov 22, 2016
b117281
Merge remote-tracking branch 'upstream/master'
nsyca Nov 23, 2016
3023399
Merge remote-tracking branch 'upstream/master'
nsyca Nov 24, 2016
4b692f0
Merge remote-tracking branch 'upstream/master'
nsyca Nov 25, 2016
c8aadb5
Merge remote-tracking branch 'upstream/master'
nsyca Nov 28, 2016
2181647
Merge remote-tracking branch 'upstream/master'
nsyca Nov 28, 2016
c8588de
Merge remote-tracking branch 'upstream/master'
nsyca Nov 30, 2016
0d823d5
Merge remote-tracking branch 'upstream/master'
nsyca Nov 30, 2016
bc27b4e
Merge remote-tracking branch 'upstream/master'
nsyca Dec 5, 2016
e871783
first pass
nsyca Dec 10, 2016
b93b3ce
second pass
nsyca Dec 10, 2016
09b543b
address @gatorsmile's comments
nsyca Dec 11, 2016
f88a205
Address @gatorsmile's 2nd round comments
nsyca Dec 12, 2016
ca1dc96
Address @gatorsmile's 3rd round comments
nsyca Dec 12, 2016
724335a
Address @gatorsmile's 3rd round comments(2)
nsyca Dec 12, 2016
6040dcf
Code the fix based on @hvanhovell's solution
nsyca Dec 13, 2016
0b6bfd4
Address @hvanhovell's 2nd comments
nsyca Dec 13, 2016
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -124,6 +124,10 @@ trait CheckAnalysis extends PredicateHelper {
         s"Scalar subquery must return only one column, but got ${query.output.size}")

       case s @ ScalarSubquery(query, conditions, _) if conditions.nonEmpty =>
+
+        // Collect the columns from the subquery for further checking.
+        var subqueryColumns = conditions.flatMap(_.references).filter(query.output.contains)
+
         def checkAggregate(agg: Aggregate): Unit = {
           // Make sure correlated scalar subqueries contain one row for every outer row by
           // enforcing that they are aggregates which contain exactly one aggregate expressions.
@@ -136,24 +140,35 @@
             failAnalysis("The output of a correlated scalar subquery must be aggregated")
           }

-          // SPARK-18504: block cases where GROUP BY columns
-          // are not part of the correlated columns
-          val groupByCols = ExpressionSet.apply(agg.groupingExpressions.flatMap(_.references))
-          val predicateCols = ExpressionSet.apply(conditions.flatMap(_.references))
-          val invalidCols = groupByCols.diff(predicateCols)
+          // SPARK-18504/SPARK-18814: Block cases where GROUP BY columns
+          // are not part of the correlated columns.
+          val groupByCols = AttributeSet(agg.groupingExpressions.flatMap(_.references))
+          val correlatedCols = AttributeSet(subqueryColumns)
+          val invalidCols = groupByCols -- correlatedCols
           // GROUP BY columns must be a subset of columns in the predicates
           if (invalidCols.nonEmpty) {
             failAnalysis(
-              "a GROUP BY clause in a scalar correlated subquery " +
+              "A GROUP BY clause in a scalar correlated subquery " +
                 "cannot contain non-correlated columns: " +
                 invalidCols.mkString(","))
           }
         }

-        // Skip projects and subquery aliases added by the Analyzer and the SQLBuilder.
+        // Skip subquery aliases added by the Analyzer and the SQLBuilder.
+        // For projects, do the necessary mapping and skip to its child.
         def cleanQuery(p: LogicalPlan): LogicalPlan = p match {
           case s: SubqueryAlias => cleanQuery(s.child)
-          case p: Project => cleanQuery(p.child)
+          case p: Project =>
+            // SPARK-18814: Map any aliases to their AttributeReference children
+            // for the checking in the Aggregate operators below this Project.
+            subqueryColumns = subqueryColumns.map {
+              xs => p.projectList.collectFirst {
+                case e @ Alias(child : AttributeReference, _) if e.exprId == xs.exprId =>
+                  child
+              }.getOrElse(xs)
+            }
+
+            cleanQuery(p.child)
           case child => child
         }
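The new mapping in cleanQuery is a central part of this change: when a Project sits between the correlated predicate and the Aggregate, the correlated columns may refer to aliases rather than to the attributes the Aggregate actually groups by. The following is a simplified, self-contained sketch of that mapping pattern only; it is not Catalyst code, and AttrRef, SimpleAlias, and AliasMappingSketch are hypothetical stand-ins for AttributeReference, Alias, and the surrounding check.

// Hypothetical stand-ins for Catalyst's AttributeReference and Alias.
case class AttrRef(name: String, exprId: Long)
case class SimpleAlias(child: AttrRef, name: String, exprId: Long)

object AliasMappingSketch {
  def main(args: Array[String]): Unit = {
    // A Project exposes the underlying column cv under a new expression id via an alias.
    val underlying  = AttrRef("cv", exprId = 1L)
    val projectList = Seq(SimpleAlias(underlying, "cv_out", exprId = 2L))

    // Correlated columns collected from the subquery refer to the alias's id.
    var subqueryColumns = Seq(AttrRef("cv_out", exprId = 2L))

    // As in the patch: rewrite each correlated column to the alias's child when the
    // Project defines an alias with a matching id; otherwise keep the column as-is.
    subqueryColumns = subqueryColumns.map { col =>
      projectList.collectFirst {
        case a if a.exprId == col.exprId => a.child
      }.getOrElse(col)
    }

    println(subqueryColumns) // List(AttrRef(cv,1))
  }
}

With the correlated columns mapped down to the attributes the Aggregate produces, the GROUP BY subset check above can compare like with like.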
20 changes: 20 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/scalar-subquery.sql
@@ -0,0 +1,20 @@
CREATE OR REPLACE TEMPORARY VIEW p AS VALUES (1, 1) AS T(pk, pv);
CREATE OR REPLACE TEMPORARY VIEW c AS VALUES (1, 1) AS T(ck, cv);

-- SPARK-18814.1: Simplified version of TPCDS-Q32
SELECT pk, cv
FROM p, c
WHERE p.pk = c.ck
AND c.cv = (SELECT avg(c1.cv)
FROM c c1
WHERE c1.ck = p.pk);

-- SPARK-18814.2: Adding stack of aggregates
SELECT pk, cv
FROM p, c
WHERE p.pk = c.ck
AND c.cv = (SELECT max(avg)
FROM (SELECT c1.cv, avg(c1.cv) avg
FROM c c1
WHERE c1.ck = p.pk
GROUP BY c1.cv));
Member

I have a question that is not related to this JIRA. In the above query, if we do not have the GROUP BY c1.cv, it still works. It sounds like the subquery processing ignores GROUP BY clauses. What is the reason?

Contributor Author — @nsyca, Dec 14, 2016

Nice catch! I think this is a bug. There could be multiple values of c1.cv. Without a GROUP BY clause, which value does it return? Could you please open a JIRA to track this? I will investigate along with my subquery work. Do you think this is a blocker?

Contributor — @hvanhovell, Dec 14, 2016

It is not a blocker. We are probably missing this case in CheckAnalysis. This currently works because it gets eliminated during optimization (the optimizer prunes the unused output). @nsyca, it would be great if you could take a look at it; could you also create a separate JIRA to track this?

Contributor Author

I opened SPARK-18863 to track this problem.

Member

Thank you for confirming this is a bug. I expect to get an error message like

org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, and 'c1.`cv`' is not an aggregate function. 

Member

Or, in some other cases, we should see an error message like

org.apache.spark.sql.AnalysisException: expression 'c1.`cv`' is neither present in the group by, nor is it an aggregate function.

Both error-handling cases are missing.
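To make the case discussed in this thread concrete, here is a minimal, hedged sketch (assuming Spark 2.x on the classpath and a local SparkSession; the object name is made up): the stacked-aggregate query from SPARK-18814.2 with GROUP BY c1.cv removed, which, per the discussion above, currently passes analysis only because the optimizer prunes the unused c1.cv output.

import org.apache.spark.sql.SparkSession

// Hypothetical standalone reproduction of the SPARK-18863 case discussed above.
object Spark18863Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("spark-18863-sketch").getOrCreate()

    spark.sql("CREATE OR REPLACE TEMPORARY VIEW p AS VALUES (1, 1) AS T(pk, pv)")
    spark.sql("CREATE OR REPLACE TEMPORARY VIEW c AS VALUES (1, 1) AS T(ck, cv)")

    // Same as SPARK-18814.2, but without GROUP BY c1.cv: the inner select list still
    // contains the non-aggregated, non-grouped column c1.cv, which CheckAnalysis
    // should reject but (per the discussion) currently does not, because the unused
    // column is pruned away during optimization.
    spark.sql(
      """SELECT pk, cv
        |FROM   p, c
        |WHERE  p.pk = c.ck
        |AND    c.cv = (SELECT max(avg)
        |               FROM   (SELECT c1.cv, avg(c1.cv) avg
        |                       FROM   c c1
        |                       WHERE  c1.ck = p.pk))
      """.stripMargin).show()

    spark.stop()
  }
}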

sql/core/src/test/resources/sql-tests/results/scalar-subquery.sql.out
@@ -0,0 +1,46 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 4


-- !query 0
CREATE OR REPLACE TEMPORARY VIEW p AS VALUES (1, 1) AS T(pk, pv)
-- !query 0 schema
struct<>
-- !query 0 output



-- !query 1
CREATE OR REPLACE TEMPORARY VIEW c AS VALUES (1, 1) AS T(ck, cv)
-- !query 1 schema
struct<>
-- !query 1 output



-- !query 2
SELECT pk, cv
FROM p, c
WHERE p.pk = c.ck
AND c.cv = (SELECT avg(c1.cv)
FROM c c1
WHERE c1.ck = p.pk)
-- !query 2 schema
struct<pk:int,cv:int>
-- !query 2 output
1 1


-- !query 3
SELECT pk, cv
FROM p, c
WHERE p.pk = c.ck
AND c.cv = (SELECT max(avg)
FROM (SELECT c1.cv, avg(c1.cv) avg
FROM c c1
WHERE c1.ck = p.pk
GROUP BY c1.cv))
-- !query 3 schema
struct<pk:int,cv:int>
-- !query 3 output
1 1
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -491,7 +491,7 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
       sql("select (select sum(-1) from t t2 where t1.c2 = t2.c1 group by t2.c2) sum from t t1")
     }
     assert(errMsg.getMessage.contains(
-      "a GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns:"))
+      "A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns:"))
   }
 }
