Skip to content

Commit 30c1884

Browse files
committed
Revert "[SPARK-13840][SQL] Split Optimizer Rule ColumnPruning to ColumnPruning and EliminateOperator"
This reverts commit 99bd2f0.
1 parent 82066a1 commit 30c1884

File tree

4 files changed

+14
-26
lines changed

4 files changed

+14
-26
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
7171
PushPredicateThroughAggregate,
7272
LimitPushDown,
7373
ColumnPruning,
74-
EliminateOperators,
7574
InferFiltersFromConstraints,
7675
// Operator combine
7776
CollapseRepartition,
@@ -316,7 +315,11 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
316315
* - LeftSemiJoin
317316
*/
318317
object ColumnPruning extends Rule[LogicalPlan] {
319-
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
318+
private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
319+
output1.size == output2.size &&
320+
output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))
321+
322+
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
320323
// Prunes the unused columns from project list of Project/Aggregate/Expand
321324
case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty =>
322325
p.copy(child = p2.copy(projectList = p2.projectList.filter(p.references.contains)))
@@ -377,6 +380,12 @@ object ColumnPruning extends Rule[LogicalPlan] {
377380
p.copy(child = w.copy(
378381
windowExpressions = w.windowExpressions.filter(p.references.contains)))
379382

383+
// Eliminate no-op Window
384+
case w: Window if w.windowExpressions.isEmpty => w.child
385+
386+
// Eliminate no-op Projects
387+
case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child
388+
380389
// Can't prune the columns on LeafNode
381390
case p @ Project(_, l: LeafNode) => p
382391

@@ -400,24 +409,6 @@ object ColumnPruning extends Rule[LogicalPlan] {
400409
}
401410
}
402411

403-
/**
404-
* Eliminate no-op Project and Window.
405-
*
406-
* Note: this rule should be executed just after ColumnPruning.
407-
*/
408-
object EliminateOperators extends Rule[LogicalPlan] {
409-
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
410-
// Eliminate no-op Projects
411-
case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child
412-
// Eliminate no-op Window
413-
case w: Window if w.windowExpressions.isEmpty => w.child
414-
}
415-
416-
private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
417-
output1.size == output2.size &&
418-
output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))
419-
}
420-
421412
/**
422413
* Combines two adjacent [[Project]] operators into one and perform alias substitution,
423414
* merging the expressions into one single expression.

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ class ColumnPruningSuite extends PlanTest {
3535
object Optimize extends RuleExecutor[LogicalPlan] {
3636
val batches = Batch("Column pruning", FixedPoint(100),
3737
ColumnPruning,
38-
EliminateOperators,
3938
CollapseProject) :: Nil
4039
}
4140

@@ -328,8 +327,8 @@ class ColumnPruningSuite extends PlanTest {
328327
val input2 = LocalRelation('c.int, 'd.string, 'e.double)
329328
val query = Project('b :: Nil,
330329
Union(input1 :: input2 :: Nil)).analyze
331-
val expected =
332-
Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil).analyze
330+
val expected = Project('b :: Nil,
331+
Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil)).analyze
333332
comparePlans(Optimize.execute(query), expected)
334333
}
335334

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ class CombiningLimitsSuite extends PlanTest {
2828
object Optimize extends RuleExecutor[LogicalPlan] {
2929
val batches =
3030
Batch("Filter Pushdown", FixedPoint(100),
31-
ColumnPruning,
32-
EliminateOperators) ::
31+
ColumnPruning) ::
3332
Batch("Combine Limit", FixedPoint(10),
3433
CombineLimits) ::
3534
Batch("Constant Folding", FixedPoint(10),

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ class JoinOptimizationSuite extends PlanTest {
4343
PushPredicateThroughGenerate,
4444
PushPredicateThroughAggregate,
4545
ColumnPruning,
46-
EliminateOperators,
4746
CollapseProject) :: Nil
4847

4948
}

0 commit comments

Comments
 (0)