Skip to content
Prev Previous commit
Next Next commit
Changes based on feedback.
  • Loading branch information
ptkool committed Oct 29, 2017
commit 9a28535983b35b0c71baf2c5c70d2d00a2f813cc
Original file line number Diff line number Diff line change
Expand Up @@ -627,13 +627,19 @@ object CollapseWindow extends Rule[LogicalPlan] {

/**
* Transpose Adjacent Window Expressions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this rule useful?

* - If the partition spec of the parent Window expression is a subset of the partition spec
* - If the partition spec of the parent Window expression is compatible with the partition spec
* of the child window expression, transpose them.
*/
object TransposeWindow extends Rule[LogicalPlan] {
/**
 * Decides whether two partition specs are "compatible" for transposition.
 *
 * Returns true only when `ps1` is strictly shorter than `ps2` and the leading
 * `ps1.length` expressions of `ps2` can be reordered so that each one is
 * `semanticEquals` to the expression at the same position in `ps1`.
 *
 * @param ps1 the shorter partition spec to look for
 * @param ps2 the longer partition spec whose prefix is examined
 * @return whether some permutation of the prefix of `ps2` matches `ps1` pairwise
 */
private def compatibleParititions(ps1 : Seq[Expression], ps2: Seq[Expression]): Boolean = {
  if (ps1.length >= ps2.length) {
    // Equal-length or longer specs are never considered compatible.
    false
  } else {
    val prefix = ps2.take(ps1.length)
    // Try every ordering of the prefix; stop at the first one that lines up with ps1.
    prefix.permutations.exists { candidate =>
      ps1.zip(candidate).forall { case (l, r) => l.semanticEquals(r) }
    }
  }
}

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild))
Copy link
Member

@gatorsmile gatorsmile Jun 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The expressions in both w1.expressions and w2.expressions must be deterministic. If not, we should not flip them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? This seems overly restrictive to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to ensure the results are still the same with and without the rule.

if ps1.length < ps2.length && ps2.containsSlice(ps1) =>
if w1.references.intersect(w2.windowOutputSet).isEmpty && compatibleParititions(ps1, ps2) =>
Project(w1.output, Window(we2, ps2, os2, Window(we1, ps1, os1, grandChild)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably warrants a follow-up that tries to move projections that are wedged in between two window clauses.

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,32 +42,12 @@ class TransposeWindowSuite extends PlanTest {
val partitionSpec1 = Seq(a)
val partitionSpec2 = Seq(a, b)
val partitionSpec3 = Seq(d)
val partitionSpec4 = Seq(b, a, d)

val orderSpec1 = Seq(d.asc)
val orderSpec2 = Seq(d.desc)

test("flip two adjacent windows with compatible partitions in multiple selects") {
  // Two window sums over 'c: one partitioned by partitionSpec2, one by partitionSpec1.
  val sumOverSpec2 = windowExpr(sum('c), windowSpec(partitionSpec2, Seq.empty, UnspecifiedFrame))
  val sumOverSpec1 = windowExpr(sum('c), windowSpec(partitionSpec1, Seq.empty, UnspecifiedFrame))

  // Input plan: the partitionSpec2 window is selected first, the partitionSpec1 window on top.
  val actualPlan = Optimize.execute(
    testRelation
      .select('a, 'b, 'c, sumOverSpec2.as('sum_a_2))
      .select('a, 'b, 'c, 'sum_a_2, sumOverSpec1.as('sum_a_1))
      .analyze)

  // Reference plan: the same window expressions written in the opposite order, followed by a
  // projection restoring the original column order. It is run through the optimizer as well.
  val expectedPlan = Optimize.execute(
    testRelation
      .select('a, 'b, 'c)
      .select('a, 'b, 'c, sumOverSpec1.as('sum_a_1))
      .select('a, 'b, 'c, sumOverSpec2.as('sum_a_2), 'sum_a_1)
      .select('a, 'b, 'c, 'sum_a_2, 'sum_a_1)
      .analyze)

  comparePlans(actualPlan, expectedPlan)
}

test("flip two adjacent windows with compatible partitions") {
test("transpose two adjacent windows with compatible partitions") {
val query = testRelation
.window(Seq(sum(c).as('sum_a_2)), partitionSpec2, orderSpec2)
.window(Seq(sum(c).as('sum_a_1)), partitionSpec1, orderSpec1)
Expand All @@ -83,19 +63,31 @@ class TransposeWindowSuite extends PlanTest {
comparePlans(optimized, correctAnswer.analyze)
}

test("don't flip two adjacent windows with incompatible partitions") {
test("transpose two adjacent windows with differently ordered compatible partitions") {
val query = testRelation
.window(Seq(sum(c).as('sum_a_2)), partitionSpec3, Seq.empty)
.window(Seq(sum(c).as('sum_a_1)), partitionSpec1, Seq.empty)
.window(Seq(sum(c).as('sum_a_2)), partitionSpec4, Seq.empty)
.window(Seq(sum(c).as('sum_a_1)), partitionSpec2, Seq.empty)

val analyzed = query.analyze
val optimized = Optimize.execute(analyzed)

val correctAnswer = testRelation
.window(Seq(sum(c).as('sum_a_1)), partitionSpec2, Seq.empty)
.window(Seq(sum(c).as('sum_a_2)), partitionSpec4, Seq.empty)
.select('a, 'b, 'c, 'd, 'sum_a_2, 'sum_a_1)

comparePlans(optimized, correctAnswer.analyze)
}

test("don't transpose two adjacent windows with incompatible partitions") {
val query = testRelation
.window(Seq(sum(c).as('sum_a_2)), partitionSpec3, Seq.empty)
.window(Seq(sum(c).as('sum_a_1)), partitionSpec1, Seq.empty)

comparePlans(optimized, correctAnswer.analyze)
val analyzed = query.analyze
val optimized = Optimize.execute(analyzed)

comparePlans(optimized, analyzed)
}

}