Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
PushPredicateThroughAggregate,
LimitPushDown,
ColumnPruning,
EliminateOperators,
// Operator combine
CollapseRepartition,
CollapseProject,
Expand Down Expand Up @@ -315,11 +316,7 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
* - LeftSemiJoin
*/
object ColumnPruning extends Rule[LogicalPlan] {
private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
output1.size == output2.size &&
output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
// Prunes the unused columns from project list of Project/Aggregate/Expand
case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty =>
p.copy(child = p2.copy(projectList = p2.projectList.filter(p.references.contains)))
Expand Down Expand Up @@ -380,12 +377,6 @@ object ColumnPruning extends Rule[LogicalPlan] {
p.copy(child = w.copy(
windowExpressions = w.windowExpressions.filter(p.references.contains)))

// Eliminate no-op Window
case w: Window if w.windowExpressions.isEmpty => w.child

// Eliminate no-op Projects
case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child

// Can't prune the columns on LeafNode
case p @ Project(_, l: LeafNode) => p

Expand All @@ -409,6 +400,24 @@ object ColumnPruning extends Rule[LogicalPlan] {
}
}

/**
* Eliminate no-op Project and Window.
*
* Note: this rule should be executed just after ColumnPruning.
*/
object EliminateOperators extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not what we want, it cause that we can't prune columsn for Filter(Join()).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the latest PR: #11745. It will enable it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize it after this PR. Thus, I submitted a new one to fix the issue. Sorry for that.

// Eliminate no-op Projects
case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child
// Eliminate no-op Window
case w: Window if w.windowExpressions.isEmpty => w.child
}

private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
output1.size == output2.size &&
output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))
}

/**
* Combines two adjacent [[Project]] operators into one and perform alias substitution,
* merging the expressions into one single expression.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class ColumnPruningSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches = Batch("Column pruning", FixedPoint(100),
ColumnPruning,
EliminateOperators,
CollapseProject) :: Nil
}

Expand Down Expand Up @@ -327,8 +328,8 @@ class ColumnPruningSuite extends PlanTest {
val input2 = LocalRelation('c.int, 'd.string, 'e.double)
val query = Project('b :: Nil,
Union(input1 :: input2 :: Nil)).analyze
val expected = Project('b :: Nil,
Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil)).analyze
val expected =
Union(Project('b :: Nil, input1) :: Project('d :: Nil, input2) :: Nil).analyze
comparePlans(Optimize.execute(query), expected)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class CombiningLimitsSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Filter Pushdown", FixedPoint(100),
ColumnPruning) ::
ColumnPruning,
EliminateOperators) ::
Batch("Combine Limit", FixedPoint(10),
CombineLimits) ::
Batch("Constant Folding", FixedPoint(10),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class JoinOptimizationSuite extends PlanTest {
PushPredicateThroughGenerate,
PushPredicateThroughAggregate,
ColumnPruning,
EliminateOperators,
CollapseProject) :: Nil

}
Expand Down