Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
split columnpruning to two rules
  • Loading branch information
gatorsmile committed Mar 14, 2016
commit dd8c54273f3b4db2532aa86f095ea912b75661a1
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
PushPredicateThroughAggregate,
LimitPushDown,
ColumnPruning,
EliminateObjects,
// Operator combine
CollapseRepartition,
CollapseProject,
Expand Down Expand Up @@ -315,10 +316,6 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
* - LeftSemiJoin
*/
object ColumnPruning extends Rule[LogicalPlan] {
private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
output1.size == output2.size &&
output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Prunes the unused columns from project list of Project/Aggregate/Expand
case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty =>
Expand Down Expand Up @@ -355,9 +352,6 @@ object ColumnPruning extends Rule[LogicalPlan] {
case j @ Join(left, right, LeftSemi, condition) =>
j.copy(right = prunedChild(right, j.references))

// Project should not be pushed below Filter. See PushPredicateThroughProject
case p @ Project(_, _: Filter) => p

// all the columns will be used to compare, so we can't prune them
case p @ Project(_, _: SetOperation) => p
case p @ Project(_, _: Distinct) => p
Expand All @@ -383,12 +377,6 @@ object ColumnPruning extends Rule[LogicalPlan] {
p.copy(child = w.copy(
windowExpressions = w.windowExpressions.filter(p.references.contains)))

// Eliminate no-op Window
case w: Window if w.windowExpressions.isEmpty => w.child

// Eliminate no-op Projects
case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child

// Can't prune the columns on LeafNode
case p @ Project(_, l: LeafNode) => p

Expand All @@ -412,6 +400,24 @@ object ColumnPruning extends Rule[LogicalPlan] {
}
}

/**
* Eliminate no-op Project and Window.
*
* Note: this rule should be executed just after ColumnPruning.
*/
object EliminateObjects extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not what we want, it cause that we can't prune columsn for Filter(Join()).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the latest PR: #11745. It will enable it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize it after this PR. Thus, I submitted a new one to fix the issue. Sorry for that.

// Eliminate no-op Projects
case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child
// Eliminate no-op Window
case w: Window if w.windowExpressions.isEmpty => w.child
}

private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
output1.size == output2.size &&
output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))
}

/**
* Combines two adjacent [[Project]] operators into one and perform alias substitution,
* merging the expressions into one single expression.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class ColumnPruningSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches = Batch("Column pruning", FixedPoint(100),
ColumnPruning,
EliminateObjects,
CollapseProject) :: Nil
}

Expand Down Expand Up @@ -136,7 +137,8 @@ class ColumnPruningSuite extends PlanTest {
val query = Project('a :: Nil, Filter('c > Literal(0.0), input)).analyze
val expected =
Project('a :: Nil,
Filter('c > Literal(0.0), input)).analyze
Filter('c > Literal(0.0),
Project(Seq('a, 'c), input))).analyze
comparePlans(Optimize.execute(query), expected)
}

Expand Down Expand Up @@ -326,8 +328,8 @@ class ColumnPruningSuite extends PlanTest {
val input2 = LocalRelation('c.int, 'd.string, 'e.double)
val query = Project('b :: Nil,
Union(input1 :: input2 :: Nil)).analyze
val expected = Project('b :: Nil,
Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil)).analyze
val expected =
Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil).analyze
comparePlans(Optimize.execute(query), expected)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class CombiningLimitsSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Filter Pushdown", FixedPoint(100),
ColumnPruning) ::
ColumnPruning,
EliminateObjects) ::
Batch("Combine Limit", FixedPoint(10),
CombineLimits) ::
Batch("Constant Folding", FixedPoint(10),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class JoinOptimizationSuite extends PlanTest {
PushPredicateThroughGenerate,
PushPredicateThroughAggregate,
ColumnPruning,
EliminateObjects,
CollapseProject) :: Nil

}
Expand Down