[SPARK-21835][SQL] RewritePredicateSubquery should not produce unresolved query plans #19050
Test build #81117 has finished for PR 19050 at commit
retest this please.
Test build #81121 has finished for PR 19050 at commit
Shall we just move this into a separate file? The analyzer is way too big as it is.
Agreed.
Can you explain when this becomes a problem? LeftSemi/LeftAnti joins do not output attributes from the right-hand side of the join. Is the condition messed up?
`duplicateResolved` in the logical `Join` doesn't take into account whether the join type is LeftSemi/LeftAnti.
But since LeftSemi/LeftAnti joins only output one side (`left.output` in `logical.Join`), should we revise `duplicateResolved` for `logical.Join`? Then we wouldn't need to dedup here.
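To make the point above concrete, here is a minimal, Spark-free sketch of how a `duplicateResolved`-style check behaves. `Attr`, `Plan`, `Relation`, and this `Join` are toy stand-ins invented for illustration (they are not Catalyst's classes), and join types are plain strings. The check is modeled on an intersection of the two children's outputs, which is an assumption about the shape of the real check rather than a copy of it.

```scala
object DuplicateResolvedSketch {
  // Toy attribute: a name plus an expression id, like `i#236` in the plans.
  final case class Attr(name: String, exprId: Long)

  sealed trait Plan { def output: Seq[Attr] }
  final case class Relation(output: Seq[Attr]) extends Plan

  final case class Join(left: Plan, right: Plan, joinType: String) extends Plan {
    // LeftSemi/LeftAnti only expose the left child's attributes.
    def output: Seq[Attr] = joinType match {
      case "LeftSemi" | "LeftAnti" => left.output
      case _                       => left.output ++ right.output
    }

    // Ignores joinType: any attribute produced by both children is a conflict.
    def duplicateResolved: Boolean =
      left.output.toSet.intersect(right.output.toSet).isEmpty
  }

  // The same relation on both sides, as in a self-referencing NOT EXISTS.
  val t1: Relation = Relation(Seq(Attr("i", 236L), Attr("j", 237L)))
  val selfAntiJoin: Join = Join(t1, t1, "LeftAnti")
}
```

In this model `selfAntiJoin.output` contains no duplicates, yet `selfAntiJoin.duplicateResolved` is false, which is the mismatch being discussed.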
@viirya Just for my understanding, can you please post an example query that shows this problem?
One example:
Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath)
sql(s"CREATE TABLE t1 USING parquet LOCATION '${path.toURI}'")
val sqlText =
"""
|SELECT * FROM t1
|WHERE
|NOT EXISTS (SELECT * FROM t1)
""".stripMargin
val ds = sql(sqlText)
println(ds.queryExecution.analyzed)
println(ds.queryExecution.optimizedPlan)
Analyzed plan:
Project [i#236, j#237]
+- Filter NOT exists#235 []
   :  +- Project [i#236, j#237]
   :     +- SubqueryAlias t1
   :        +- Relation[i#236,j#237] parquet
   +- SubqueryAlias t1
      +- Relation[i#236,j#237] parquet

Optimized plan:
'Join LeftAnti
:- Relation[i#236,j#237] parquet
+- Relation[i#236,j#237] parquet
I think it is better to fix duplicateResolved.
We should add a project when outputs are duplicate. Could you check this for a correlated subquery?
Sure. Thanks for suggestion.
@viirya Thanks. I think we should be de-duplicating the subquery output in the case of a correlated subquery. That's why I was wondering how we end up with duplicate attributes.
@hvanhovell I have a question: do we anticipate future rewrites that convert left-semi/left-anti joins to other join types, such as inner joins? Could this be a problem then?
@hvanhovell Predicate subqueries are rewritten into left semi/anti joins, which don't have duplicate outputs. I think you mean correlated scalar subqueries, which are rewritten into left outer joins. Is that right?
@hvanhovell We may not be able to revise `duplicateResolved`.
Even though LeftSemi only outputs the left side of the join, we still need `duplicateResolved` to be false if there are duplicate attributes between the left and right sides.
Otherwise, if there is a join condition, it will be pushed down to the left side of the join, because all attribute references in the condition appear to belong to one side. That changes the join results.
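The pushdown hazard described above can be sketched without Spark. The model below assumes a join predicate is treated as left-only when every attribute it references is produced by the left child; `Attr`, `EqualTo`, and `canPushToLeft` are hypothetical names used only for this illustration, and the id `300` is an arbitrary fresh id.

```scala
object PushdownSketch {
  final case class Attr(name: String, exprId: Long)

  // Toy equality predicate between two attributes.
  final case class EqualTo(l: Attr, r: Attr) {
    def references: Set[Attr] = Set(l, r)
  }

  // Hypothetical helper: a predicate looks pushable to the left child when
  // all of its references are produced by that child.
  def canPushToLeft(cond: EqualTo, leftOutput: Seq[Attr]): Boolean =
    cond.references.subsetOf(leftOutput.toSet)

  val left: Seq[Attr] = Seq(Attr("i", 236L), Attr("j", 237L))

  // Same relation on both sides: the right side's `i` reuses id 236, so the
  // condition is indistinguishable from a left-only predicate.
  val conflicting: EqualTo = EqualTo(Attr("i", 236L), Attr("i", 236L))

  // After de-duplication the right side's `i` carries a fresh id.
  val deduped: EqualTo = EqualTo(Attr("i", 236L), Attr("i", 300L))
}
```

With conflicting ids, `canPushToLeft(conflicting, left)` is true and the condition would stop acting as a join condition; with distinct ids it is false and the predicate stays on the join.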
Force-pushed from 121ad5a to bf07e2a.
Test build #81173 has started for PR 19050 at commit
Test build #81172 has finished for PR 19050 at commit
Test build #81179 has finished for PR 19050 at commit
ping @hvanhovell Does the current change look good to you?
also cc @cloud-fan for review. Thanks.
ping @cloud-fan @hvanhovell Do you have time to review this? Thanks.
ping @cloud-fan @hvanhovell This blocks #18956 from going forward. Can you help review this change? Thanks.
@cloud-fan and @hvanhovell seem too busy these days. Maybe @gatorsmile can also help review this. Thanks.
    }
  }

  def dedupJoin(plan: LogicalPlan): LogicalPlan = {
-> private def
ok.
  def dedupJoin(plan: LogicalPlan): LogicalPlan = {
    plan transform {
      case j @ Join(left, right, joinType, joinCond) =>
All join types?
To be safe: this only makes sense for LeftAnti and LeftSemi.
Sure.
        val (newCond, inputPlan) = rewriteExistentialExpr(Seq(predicate), p)
        Project(p.output, Filter(newCond.get, inputPlan))
    }
    dedupJoin(rewritten)
add a comment above this line to explain it.
After rethinking it, we can be more conservative. Instead of doing a dedup at the end, we should do it when we convert it to the Join.
Fair point. Will follow it.
          |WHERE
          |NOT EXISTS (SELECT * FROM t1)
        """.stripMargin
      val ds = sql(sqlText)
useless ds?
Yes, missing this. I'll remove it.
      val optimizedPlan = sql(sqlText).queryExecution.optimizedPlan
      val join = optimizedPlan.collect {
        case j: Join => j
      }.head.asInstanceOf[Join]
val join = optimizedPlan.collectFirst { case j: Join => j }.get

      val joinNodes = optimizedPlan.collect {
        case j: Join => j
      }.map(_.asInstanceOf[Join])
      joinNodes.map(j => assert(j.duplicateResolved))
val joinNodes = optimizedPlan.collect { case j: Join => j }
joinNodes.foreach(j => assert(j.duplicateResolved))

LGTM pending Jenkins
Thanks @gatorsmile
Test build #81439 has finished for PR 19050 at commit
Test build #81440 has finished for PR 19050 at commit
Thanks! Merging to master
…duce unresolved query plans

## What changes were proposed in this pull request?
This is a follow-up of apache#19050 to deal with the `ExistenceJoin` case.

## How was this patch tested?
Added test.

Author: Liang-Chi Hsieh <[email protected]>
Closes apache#19151 from viirya/SPARK-21835-followup.

What changes were proposed in this pull request?
Correlated predicate subqueries are rewritten into `Join` by the rule `RewritePredicateSubquery` during optimization.

It is possible that the two sides of the `Join` have conflicting attributes. In that case the query plans produced by `RewritePredicateSubquery` become unresolved and break structural integrity.

We should check whether there are conflicting attributes in the `Join` and de-duplicate them by adding a `Project`.

How was this patch tested?
Added tests.
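
The de-duplication idea in the description can be illustrated with a small, Spark-free sketch. It is not the code merged in this PR: `Attr`, `freshId`, and `dedupRight` are hypothetical names, and the fresh-id counter merely stands in for whatever mechanism assigns new expression ids. The re-aliased right output plays the role of the added `Project`.

```scala
object DedupJoinSketch {
  final case class Attr(name: String, exprId: Long)

  // Hypothetical id generator standing in for fresh expression ids.
  private var nextId = 1000L
  private def freshId(): Long = { nextId += 1; nextId }

  // Returns the right side's output with conflicting attributes re-aliased,
  // plus the old -> new mapping needed to rewrite the join condition.
  def dedupRight(left: Seq[Attr], right: Seq[Attr]): (Seq[Attr], Map[Attr, Attr]) = {
    val duplicates = right.toSet.intersect(left.toSet)
    val aliasMap = duplicates.map(d => d -> d.copy(exprId = freshId())).toMap
    // Conflicting attributes get fresh ids; the rest pass through unchanged.
    val newRight = right.map(a => aliasMap.getOrElse(a, a))
    (newRight, aliasMap)
  }
}
```

For the self-join example in the thread, calling `dedupRight` with the same `i#236, j#237` output on both sides yields a right side whose ids no longer overlap with the left. Any reference to the right side inside the join condition would also have to be rewritten through the returned map.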