Skip to content
Next Next commit
Add new optimization rule to flip adjacent Window expressions.
  • Loading branch information
ptkool committed Oct 29, 2017
commit f444b2cf0f2a41149a2d052c34e8a5058d257c53
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
CollapseRepartition,
CollapseProject,
CollapseWindow,
FlipWindow,
CombineFilters,
CombineLimits,
CombineUnions,
Expand Down Expand Up @@ -624,6 +625,19 @@ object CollapseWindow extends Rule[LogicalPlan] {
}
}

/**
 * Flip adjacent [[Window]] operators.
 *
 * A parent Window (`w1`) and its child Window (`w2`) are swapped only when ALL of the following
 * hold:
 *  - The partition spec of the parent is strictly shorter than the partition spec of the child
 *    and occurs in it as a contiguous slice (`containsSlice`). The strict length requirement
 *    excludes equal partition specs: in that case the flipped pair would match this rule again,
 *    so repeated application would keep swapping the two operators back and forth.
 *  - Every expression of both operators is deterministic, so that the results are the same with
 *    and without the rule.
 *  - The parent does not reference any attribute produced by the window expressions of the
 *    child; otherwise the parent would end up below the operator that defines its input.
 *
 * NOTE(review): flipping also changes the order of the output columns (the parent's window
 * columns now come before the child's). A projection on top that restores the original output
 * order is still required for the plan output to be fully unchanged. It is intentionally not
 * added here because it changes the plan shape the existing FlipWindowSuite expectations are
 * compared against; rule and suite have to be updated together.
 */
object FlipWindow extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild))
        if ps1.length < ps2.length && ps2.containsSlice(ps1) &&
          // Reordering non-deterministic expressions could change the query result.
          w1.expressions.forall(_.deterministic) &&
          w2.expressions.forall(_.deterministic) &&
          // w1 must not consume a column that is only produced by w2's window expressions.
          !we2.exists(produced => w1.references.contains(produced.toAttribute)) =>
      Window(we2, ps2, os2, Window(we1, ps1, os1, grandChild))
  }
}

/**
* Generate a list of additional filters from an operator's existing constraint but remove those
* that are either already part of the operator's condition or are part of the operator's child
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{RowFrame, SpecifiedWindowFrame, UnboundedFollowing, UnboundedPreceding, UnspecifiedFrame}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor


/**
 * Plan-level tests for the [[FlipWindow]] optimizer rule: two cases in which adjacent Window
 * operators are expected to be swapped and one case in which they must be left alone.
 */
class FlipWindowSuite extends PlanTest {

  // Optimizer under test: the projection clean-up batch runs with FixedPoint(100), followed by
  // a single (Once) pass of CollapseWindow and FlipWindow.
  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches = List(
      Batch("CollapseProject", FixedPoint(100), CollapseProject, RemoveRedundantProject),
      Batch("FlipWindow", Once, CollapseWindow, FlipWindow))
  }

  val testRelation = LocalRelation('a.string, 'b.string, 'c.int, 'd.string)

  // The four output attributes of the relation, in declaration order.
  val Seq(a, b, c, d) = testRelation.output

  val partitionSpec1 = Seq(a)
  val partitionSpec2 = Seq(a, b)
  val partitionSpec3 = Seq(d)

  val orderSpec1 = Seq(d.asc)
  val orderSpec2 = Seq(d.desc)

  test("flip two adjacent windows with compatible partitions in multiple selects") {
    // Local defs (not vals) so that every use builds a fresh expression.
    def sumOverAB = windowExpr(sum('c), windowSpec(partitionSpec2, Seq.empty, UnspecifiedFrame))
    def sumOverA = windowExpr(sum('c), windowSpec(partitionSpec1, Seq.empty, UnspecifiedFrame))

    // One window function per select, stacked on top of each other.
    val stackedSelects = testRelation
      .select('a, 'b, 'c, sumOverAB.as('sum_a_2))
      .select('a, 'b, 'c, 'sum_a_2, sumOverA.as('sum_a_1))
    val optimized = Optimize.execute(stackedSelects.analyze)

    // The same two window functions requested in a single select.
    val singleSelect = testRelation
      .select('a, 'b, 'c, sumOverAB.as('sum_a_2), sumOverA.as('sum_a_1))
    val expected = Optimize.execute(singleSelect.analyze)

    comparePlans(optimized, expected)
  }

  test("flip two adjacent windows with compatible partitions") {
    def sumOverAB = Seq(sum(c).as('sum_a_2))
    def sumOverA = Seq(sum(c).as('sum_a_1))

    val optimized = Optimize.execute(
      testRelation
        .window(sumOverAB, partitionSpec2, orderSpec2)
        .window(sumOverA, partitionSpec1, orderSpec1)
        .analyze)

    // Same operators, with the window partitioned by (a) moved below the one on (a, b).
    val expected = testRelation
      .window(sumOverA, partitionSpec1, orderSpec1)
      .window(sumOverAB, partitionSpec2, orderSpec2)
      .analyze

    comparePlans(optimized, expected)
  }

  test("don't flip two adjacent windows with incompatible partitions") {
    // Builds the (d)-partitioned window below the (a)-partitioned one; fresh plan on each call.
    def stackedWindows = testRelation
      .window(Seq(sum(c).as('sum_a_2)), partitionSpec3, Seq.empty)
      .window(Seq(sum(c).as('sum_a_1)), partitionSpec1, Seq.empty)

    val optimized = Optimize.execute(stackedWindows.analyze)

    // The plan is expected to come out of the optimizer unchanged.
    comparePlans(optimized, stackedWindows.analyze)
  }

}