
Conversation

@nsyca (Contributor) commented Dec 10, 2016

What changes were proposed in this pull request?

Move the checking of GROUP BY column in correlated scalar subquery from CheckAnalysis
to Analysis to fix a regression caused by SPARK-18504.

This problem can be reproduced with a simple script now.

Seq((1,1)).toDF("pk","pv").createOrReplaceTempView("p")
Seq((1,1)).toDF("ck","cv").createOrReplaceTempView("c")
sql("select * from p,c where p.pk=c.ck and c.cv = (select avg(c1.cv) from c c1 where c1.ck = p.pk)").show

The requirements are:

  1. We need to reference the same table twice in both the parent and the subquery. Here is the table c.
  2. We need to have a correlated predicate but to a different table. Here is from c (as c1) in the subquery to p in the parent.
  3. We will then "deduplicate" c1.ck in the subquery to `ck#<n1>#<n2>` at the `Project` above the `Aggregate` of `avg`. When we then compare `ck#<n1>#<n2>` and the original group by column `ck#<n1>` by their canonicalized forms, we get #<n2> != #<n1>. That is how we trigger the exception added in SPARK-18504.
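Step 3 can be illustrated with a toy model of attribute canonicalization. The `Attr` class below is a hypothetical stand-in for Catalyst's `AttributeReference`, not the actual implementation: canonicalization erases the column name and keeps only the expression id, so the deduplicated reference and the original grouping column no longer compare equal.

```scala
// Toy stand-in for Catalyst attributes: canonicalization keeps only the exprId,
// so equality between canonicalized attributes reduces to exprId equality.
case class Attr(name: String, exprId: Long) {
  def canonicalized: String = s"#$exprId"
}

// Original grouping column ck#<n1> vs. the deduplicated reference ck#<n1>#<n2>,
// which was given a fresh exprId by the Project above the Aggregate.
val groupByCol = Attr("ck", 1)
val dedupedCol = Attr("ck", 2)

// The canonicalized forms differ, so the SPARK-18504 check fires even though
// both attributes denote the same underlying column.
println(groupByCol.canonicalized == dedupedCol.canonicalized) // false
```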

How was this patch tested?

SubquerySuite and a simplified version of TPCDS-Q32

nsyca added 28 commits July 29, 2016 17:43
…rrect results

## What changes were proposed in this pull request?

This patch fixes the incorrect results in the rule ResolveSubquery in Catalyst's Analysis phase.

## How was this patch tested?
./dev/run-tests
a new unit test on the problematic pattern.
@SparkQA commented Dec 10, 2016

Test build #69967 has finished for PR 16246 at commit e871783.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 10, 2016

Test build #69968 has finished for PR 16246 at commit b93b3ce.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// Block cases where GROUP BY columns are not part of the correlated columns
// of a scalar subquery.
sub collect {
case a @ Aggregate(grouping, _, _) if (isScalarSubq) =>
Member:

case a @ Aggregate(grouping, _, _) if (isScalarSubq)
->
case Aggregate(grouping, _, _) if isScalarSubq

@SparkQA commented Dec 12, 2016

Test build #70020 has finished for PR 16246 at commit f88a205.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 12, 2016

Test build #70025 has finished for PR 16246 at commit 724335a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// Block cases where GROUP BY columns are not part of the correlated columns
// of a scalar subquery.
sub collect {
case Aggregate(grouping, _, _) if isScalarSubq =>
Contributor:

Doesn't this break if you have nested aggregates in the subquery? I.e.:

create or replace temporary view t as
select id,
       id % 10 val,
       id % 100 fk
from   range(1000);

select *
from   t
where t.id = (select max(id)
              from   (select tt.val,
                             max(tt.id) as id
                      from   t as tt
                      where  t.fk = tt.fk
                      group by tt.val))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. Thank you. It seems we need to have this part from CheckAnalysis included to walk the subquery plan:

// Skip projects and subquery aliases added by the Analyzer and the SQLBuilder.
def cleanQuery(p: LogicalPlan): LogicalPlan = p match {
  case s: SubqueryAlias => cleanQuery(s.child)
  case p: Project => cleanQuery(p.child)
  case child => child
}

cleanQuery(query) match {
  case a: Aggregate => checkAggregate(a)
  case Filter(_, a: Aggregate) => checkAggregate(a)
  case fail => failAnalysis(s"Correlated scalar subqueries must be Aggregated: $fail")
}

@hvanhovell (Contributor)

I think we need to revisit the rule in CheckAnalysis and make it Alias-aware. We only need to keep track of the inner references. We traverse/recurse down the tree and do a few things:

  • When we encounter any operator other than a Project/Filter/SubqueryAlias/Aggregate, we fail.
  • When we encounter a Project, assume that it contains only Alias or AttributeReference expressions (this should be the case anyway), and update the references for columns that are aliased.
  • When we encounter a Filter/SubqueryAlias, do nothing.
  • When we encounter the top-level Aggregate, validate the grouping expressions using the updated references (the current logic should be fine here), and return.
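The traversal above can be sketched with simplified plan nodes. Everything here (the case classes, column names as plain strings, the function name) is a hypothetical stand-in for Catalyst's LogicalPlan and AttributeReference, not the actual implementation:

```scala
// Hypothetical, simplified plan nodes standing in for Catalyst operators.
sealed trait Plan
case class Project(aliases: Map[String, String], child: Plan) extends Plan // output -> input column
case class Filter(child: Plan) extends Plan
case class SubqueryAlias(child: Plan) extends Plan
case class Aggregate(grouping: Set[String], child: Plan) extends Plan
case class Leaf() extends Plan

// Recurse down the subquery plan, rewriting the inner (correlated) references
// through Project aliases; validate the grouping columns at the top-level
// Aggregate; fail on any other operator.
def checkSubquery(plan: Plan, refs: Set[String]): Either[String, Unit] = plan match {
  case Project(aliases, child) =>
    checkSubquery(child, refs.map(r => aliases.getOrElse(r, r)))
  case Filter(child)        => checkSubquery(child, refs) // do nothing
  case SubqueryAlias(child) => checkSubquery(child, refs) // do nothing
  case Aggregate(grouping, _) =>
    val invalid = grouping -- refs
    if (invalid.isEmpty) Right(()) else Left(s"GROUP BY columns not correlated: $invalid")
  case other => Left(s"unexpected operator in correlated scalar subquery: $other")
}
```

For example, a Project that aliases ck to ck2 above an Aggregate grouping on ck passes when the correlated reference is ck2, because the alias map rewrites ck2 back to ck before the Aggregate is validated.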

@gatorsmile (Member)

@hvanhovell I like your ideas. : )

This JIRA is marked as a blocker for Spark 2.1. If we are doing a major refactoring in CheckAnalysis, is it too risky at the last minute? I am fine if you think the above proposal has a limited impact. Thanks!

@nsyca (Contributor Author) commented Dec 13, 2016

I am working on the code based on @hvanhovell's proposal.

@gatorsmile (Member)

Sure, thanks! @nsyca

@SparkQA commented Dec 13, 2016

Test build #70094 has finished for PR 16246 at commit 6040dcf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case s @ ScalarSubquery(query, conditions, _) if conditions.nonEmpty =>

// Collect the columns from the subquery for further checking.
var subqueryColumns = conditions.flatMap(_.references).collect {
Contributor:

NIT: conditions.flatMap(_.references).filter(query.output.contains)

// SPARK-18504/SPARK-18814: Block cases where GROUP BY columns
// are not part of the correlated columns.
val groupByCols = ExpressionSet(agg.groupingExpressions.flatMap(_.references))
val invalidCols = groupByCols.diff(predicateCols)
Contributor:

Nit: Using an AttributeSet is more natural (I should have seen that in the initial PR), and probably a little faster. You can perform a diff by subtracting the sets, i.e.: `val invalidCols = groupByCols -- correlatedCols`
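The suggested set subtraction can be seen with plain Scala sets, used here as a stand-in for Catalyst's AttributeSet (`correlatedCols` and the column names are made up for illustration):

```scala
// Plain Scala sets standing in for Catalyst's AttributeSet.
// `--` subtracts one set from another.
val groupByCols    = Set("ck", "cv")
val correlatedCols = Set("ck")

// Grouping columns that are not covered by the correlated columns are invalid.
val invalidCols = groupByCols -- correlatedCols
println(invalidCols) // Set(cv)
```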

subqueryColumns = subqueryColumns.map {
case xs =>
p.projectList.collectFirst {
case e @ Alias(child : AttributeReference, _) if e.toAttribute equals xs =>
@hvanhovell (Contributor), Dec 13, 2016:

Nit: It is quicker to compare the exprIds of the alias and the attribute, e.g.:
case e @ Alias(child: Attribute, _) if e.exprId == xs.exprId =>

// SPARK-18814: Map any aliases to their AttributeReference children
// for the checking in the Aggregate operators below this Project.
subqueryColumns = subqueryColumns.map {
case xs =>
Contributor:

Nit: you don't need the case statement here.

@hvanhovell (Contributor) left a comment:

Some minor things, but otherwise LGTM.

@hvanhovell (Contributor)

LGTM - pending jenkins.

@nsyca (Contributor Author) commented Dec 13, 2016

@hvanhovell, I really appreciate your time reviewing my code. I have addressed all four of your comments. Thanks!

@SparkQA commented Dec 14, 2016

Test build #70110 has finished for PR 16246 at commit 0b6bfd4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

FROM (SELECT c1.cv, avg(c1.cv) avg
FROM c c1
WHERE c1.ck = p.pk
GROUP BY c1.cv));
Member:

I have a question that is not related to this JIRA. In the above query, if we do not have the GROUP BY c1.cv, it still works. It sounds like the subquery processing ignores GROUP BY clauses. What is the reason?

@nsyca (Contributor Author), Dec 14, 2016:

Nice catch! I think this is a bug. There could be multiple values of c1.cv. Without a GROUP BY clause, which value does it return? Could you please open a JIRA to track this? I will investigate along with my subquery work. Do you think this is a blocker?

@hvanhovell (Contributor), Dec 14, 2016:

It is not a blocker. We are probably missing this case in CheckAnalysis. This currently works because it gets eliminated during optimization (the optimizer prunes the unused output). @nsyca it would be great if you can take a look at it, could you also create a separate JIRA to track this?

Contributor Author:

I opened SPARK-18863 to track this problem.

Member:

Thank you for confirming this is a bug. I would expect to get an error message like

org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, and 'c1.`cv`' is not an aggregate function. 

Member:

Or in some other cases, we should see the error message like

org.apache.spark.sql.AnalysisException: expression 'c1.`cv`' is neither present in the group by, nor is it an aggregate function.

Both error-handling cases are missing.

@hvanhovell (Contributor) commented Dec 14, 2016

Ok, I am merging this to master/2.1. Thanks!

asfgit pushed a commit that referenced this pull request Dec 14, 2016
## What changes were proposed in this pull request?
Move the checking of GROUP BY column in correlated scalar subquery from CheckAnalysis
to Analysis to fix a regression caused by SPARK-18504.

This problem can be reproduced with a simple script now.

Seq((1,1)).toDF("pk","pv").createOrReplaceTempView("p")
Seq((1,1)).toDF("ck","cv").createOrReplaceTempView("c")
sql("select * from p,c where p.pk=c.ck and c.cv = (select avg(c1.cv) from c c1 where c1.ck = p.pk)").show

The requirements are:
1. We need to reference the same table twice in both the parent and the subquery. Here is the table c.
2. We need to have a correlated predicate but to a different table. Here is from c (as c1) in the subquery to p in the parent.
3. We will then "deduplicate" c1.ck in the subquery to `ck#<n1>#<n2>` at `Project` above `Aggregate` of `avg`. Then when we compare `ck#<n1>#<n2>` and the original group by column `ck#<n1>` by their canonicalized form, which is #<n2> != #<n1>. That's how we trigger the exception added in SPARK-18504.

## How was this patch tested?

SubquerySuite and a simplified version of TPCDS-Q32

Author: Nattavut Sutyanyong <[email protected]>

Closes #16246 from nsyca/18814.

(cherry picked from commit cccd643)
Signed-off-by: Herman van Hovell <[email protected]>
@asfgit asfgit closed this in cccd643 Dec 14, 2016
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 15, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
@nsyca nsyca deleted the 18814 branch March 24, 2017 18:36