[SPARK-13919] [SQL] fix column pruning through filter #11828
Changes to `Optimizer.scala`:

```diff
@@ -71,7 +71,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
       PushPredicateThroughAggregate,
       LimitPushDown,
       ColumnPruning,
-      InferFiltersFromConstraints,
+      // TODO: enable this once it's fixed
+      // InferFiltersFromConstraints,
       // Operator combine
       CollapseRepartition,
       CollapseProject,
@@ -305,21 +306,14 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
 }

 /**
- * Attempts to eliminate the reading of unneeded columns from the query plan using the following
- * transformations:
- *
- *  - Inserting Projections beneath the following operators:
- *   - Aggregate
- *   - Generate
- *   - Project <- Join
- *   - LeftSemiJoin
+ * Attempts to eliminate the reading of unneeded columns from the query plan.
  */
 object ColumnPruning extends Rule[LogicalPlan] {
   private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
     output1.size == output2.size &&
       output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))

-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  def apply(plan: LogicalPlan): LogicalPlan = removeProjectBeforeFilter(plan transform {
     // Prunes the unused columns from project list of Project/Aggregate/Expand
     case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty =>
       p.copy(child = p2.copy(projectList = p2.projectList.filter(p.references.contains)))
@@ -398,7 +392,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
     } else {
       p
     }
-  }
+  })

   /** Applies a projection only when the child is producing unnecessary attributes */
   private def prunedChild(c: LogicalPlan, allReferences: AttributeSet) =
@@ -407,6 +401,16 @@ object ColumnPruning extends Rule[LogicalPlan] {
     } else {
       c
     }
+
+  /**
+   * The Project before Filter is not necessary but conflict with PushPredicatesThroughProject,
+   * so remove it.
+   */
+  private def removeProjectBeforeFilter(plan: LogicalPlan): LogicalPlan = plan transform {
```
Review comments on `removeProjectBeforeFilter` (some inline code spans were lost in extraction and are marked with `…`):

**Member:** Same here. We still need to explicitly use …

**Author (Contributor):** We usually use … I think it's fine, or we should update all these places.

**Author (Contributor):** It's still correct if someone changes …

**Member:** I see. Is it possible there are two consecutive …

**Author (Contributor):** Two consecutive `Project`s will be combined together by other rules.
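The point that consecutive `Project`s get merged can be sketched with a toy rewrite. This is a hypothetical mini-ADT for illustration, not Spark's actual `CollapseProject` rule (which also substitutes expression aliases rather than only matching column names):

```scala
// Toy plan nodes (hypothetical; not Spark's LogicalPlan API).
sealed trait TinyPlan
case class Scan(cols: Seq[String]) extends TinyPlan
case class Proj(cols: Seq[String], child: TinyPlan) extends TinyPlan

// Merge a Proj directly on top of another Proj when the outer one only
// selects columns the inner one already produces.
def collapse(p: TinyPlan): TinyPlan = p match {
  case Proj(outer, Proj(inner, child)) if outer.forall(inner.contains) =>
    collapse(Proj(outer, child))
  case other => other
}

val merged = collapse(Proj(Seq("a"), Proj(Seq("a", "b"), Scan(Seq("a", "b", "c")))))
// merged selects "a" directly from the scan, with the inner Proj gone
```

Because other batches keep re-running rules like this, a `Project` pair created by one rule rarely survives a full optimizer pass.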
```diff
+    case p1 @ Project(_, f @ Filter(_, p2 @ Project(_, child)))
+        if p2.outputSet.subsetOf(child.outputSet) =>
+      p1.copy(child = f.copy(child = child))
+  }
 }

 /**
```
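The shape of this rewrite can be illustrated on a toy plan. The sketch below uses a hypothetical mini-ADT (not Spark's real `LogicalPlan` types) to mimic the pattern match above: a `Project` sitting directly under a `Filter` is dropped when it only forwards columns its own child already produces:

```scala
// Hypothetical miniature of the Catalyst pattern; column sets stand in for AttributeSet.
sealed trait Plan { def output: Set[String] }
case class Relation(cols: Set[String]) extends Plan { def output = cols }
case class Project(cols: Set[String], child: Plan) extends Plan { def output = cols }
case class Filter(cond: String, child: Plan) extends Plan { def output = child.output }

// Mirrors removeProjectBeforeFilter: remove the inner Project when its output
// is a subset of its child's output, so the Filter reads from the child directly.
def removeProjectBeforeFilter(p: Plan): Plan = p match {
  case p1 @ Project(_, f @ Filter(_, Project(cols, child)))
      if cols.subsetOf(child.output) =>
    p1.copy(child = f.copy(child = child))
  case other => other
}

val plan =
  Project(Set("a"),
    Filter("a > 1",
      Project(Set("a", "b"), Relation(Set("a", "b", "c")))))

val pruned = removeProjectBeforeFilter(plan)
// The inner Project(a, b) is gone; the Filter now sits directly on the relation,
// which is the shape PushPredicateThroughProject no longer has to fight with.
```

The outer `Project` still prunes the final output, so no columns leak; only the redundant intermediate node is removed.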
Review discussion on `transform` vs. `transformUp` (speaker roles inferred from context; lost code spans marked with `…`):

**Reviewer:** Here we are using `transform`, which is actually `transformDown`. In this rule `ColumnPruning`, we could add many `Project`s into the child. This could easily cause a stack overflow. That is why my PR #11745 changes it to `transformUp`. Do you think this change makes sense?

**Author:** Column pruning has to go from top to bottom, or you will need multiple runs of this rule. The added `Project` is exactly the same whether you go from the top or the bottom. Going from the bottom sometimes will not work (because the added `Project` will be moved by other rules, for example filter pushdown). Have you actually seen the stack overflow on this rule? I do not think so.

**Reviewer:** If we are using `transformUp`, `removeProjectBeforeFilter`'s assumption is not right. The following line does not cover all the cases: …

**Reviewer:** I saw the stack overflow in my local environment.

**Reviewer:** I think my PR #11745 covers all the cases even if we change it from `transform` to `transformUp`.

**Author:** We should not change `transform` to `transformUp`. It would be great if you could post a test case that causes the StackOverflow, thanks!

**Reviewer:** Will do it tonight. I do not have it now.

**Reviewer:** Unable to reproduce the stack overflow now if we keep the following lines in `ColumnPruning`: … If we remove the above line, we will get the stack overflow easily because we can generate duplicate `Project`s. Anyway, I am fine if you want to use `transformDown`.

**Author:** There is no reason we should remove this line.

**Reviewer:** If `transformDown` is required here, could you change `transform` to `transformDown`? Got it from the comment in the function `transform`: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L242-L243
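The disagreement above hinges on traversal order: `transform` applies a rule before recursing into the (possibly rewritten) children, while `transformUp` rewrites children first. A toy sketch (hypothetical mini-tree, not Spark's `TreeNode` API) shows how a single pass can yield different results depending on direction:

```scala
// Hypothetical tree and rule to contrast top-down vs. bottom-up rewriting.
sealed trait Node
case class Leaf(n: Int) extends Node
case class Add(l: Node, r: Node) extends Node

// Top-down: apply the rule at this node, then recurse into the result's children.
def transformDown(n: Node)(rule: PartialFunction[Node, Node]): Node =
  rule.applyOrElse(n, identity[Node]) match {
    case Add(l, r) => Add(transformDown(l)(rule), transformDown(r)(rule))
    case leaf      => leaf
  }

// Bottom-up: rewrite children first, then apply the rule at this node.
def transformUp(n: Node)(rule: PartialFunction[Node, Node]): Node = {
  val rebuilt = n match {
    case Add(l, r) => Add(transformUp(l)(rule), transformUp(r)(rule))
    case leaf      => leaf
  }
  rule.applyOrElse(rebuilt, identity[Node])
}

// Rule: Add(Leaf(0), x) => x. Bottom-up, rewriting a child creates a new match
// at the parent, so one pass finishes the job; top-down misses the cascade.
val rule: PartialFunction[Node, Node] = { case Add(Leaf(0), x) => x }

val t = Add(Leaf(0), Add(Leaf(0), Leaf(5)))
val down = transformDown(t)(rule) // Add(Leaf(0), Leaf(5)): root rewrite not revisited
val up   = transformUp(t)(rule)   // Leaf(5): inner rewrite feeds the outer match
```

This is the crux of the thread: for rewrites that cascade upward, `transformUp` converges in one pass, whereas `ColumnPruning` inserts `Project`s beneath the node being visited, so top-down lets the same pass immediately prune through what it just created.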