Skip to content
Prev Previous commit
Next Next commit
Changes based on feedback.
  • Loading branch information
ptkool committed Oct 29, 2017
commit 6f8c036bb1c009f868bbce55cf4665ea40eb88bf
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
CollapseRepartition,
CollapseProject,
CollapseWindow,
FlipWindow,
TransposeWindow,
CombineFilters,
CombineLimits,
CombineUnions,
Expand Down Expand Up @@ -626,15 +626,15 @@ object CollapseWindow extends Rule[LogicalPlan] {
}

/**
* Flip Adjacent Window Expressions.
* Transpose Adjacent Window Expressions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this rule useful?

* - If the partition spec of the parent Window expression is a subset of the partition spec
* of the child window expression, flip them.
* of the child window expression, transpose them.
*/
object FlipWindow extends Rule[LogicalPlan] {
object TransposeWindow extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild))
Copy link
Member

@gatorsmile gatorsmile Jun 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The expressions in both w1.expressions and w2.expressions must be deterministic. If not, we should not flip

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? This seems overly restrictive to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to ensure the results are still the same with and without the rule.

if ps2.containsSlice(ps1) =>
Window(we2, ps2, os2, Window(we1, ps1, os1, grandChild))
Project(w1.output, Window(we2, ps2, os2, Window(we1, ps1, os1, grandChild)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably warrants a follow-up that tries to move projections that are wedged in between two window clauses.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor


class FlipWindowSuite extends PlanTest {
class TransposeWindowSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("CollapseProject", FixedPoint(100), CollapseProject, RemoveRedundantProject) ::
Batch("FlipWindow", Once, CollapseWindow, FlipWindow) :: Nil
Batch("FlipWindow", Once, CollapseWindow, TransposeWindow) :: Nil
}

val testRelation = LocalRelation('a.string, 'b.string, 'c.int, 'd.string)
Expand All @@ -51,13 +51,13 @@ class FlipWindowSuite extends PlanTest {
.select('a, 'b, 'c, windowExpr(sum('c), windowSpec(partitionSpec2, Seq.empty, UnspecifiedFrame)).as('sum_a_2))
.select('a, 'b, 'c, 'sum_a_2, windowExpr(sum('c), windowSpec(partitionSpec1, Seq.empty, UnspecifiedFrame)).as('sum_a_1))

val analyzed = query.analyze
val optimized = Optimize.execute(analyzed)
val optimized = Optimize.execute(query.analyze)

val query2 = testRelation
.select('a, 'b, 'c, windowExpr(sum('c), windowSpec(partitionSpec2, Seq.empty, UnspecifiedFrame)).as('sum_a_2),
windowExpr(sum('c), windowSpec(partitionSpec1, Seq.empty, UnspecifiedFrame)).as('sum_a_1)
)
.select('a, 'b, 'c)
.select('a, 'b, 'c, windowExpr(sum('c), windowSpec(partitionSpec1, Seq.empty, UnspecifiedFrame)).as('sum_a_1))
.select('a, 'b, 'c, windowExpr(sum('c), windowSpec(partitionSpec2, Seq.empty, UnspecifiedFrame)).as('sum_a_2), 'sum_a_1)
.select('a, 'b, 'c, 'sum_a_2, 'sum_a_1)

val correctAnswer = Optimize.execute(query2.analyze)

Expand All @@ -75,6 +75,7 @@ class FlipWindowSuite extends PlanTest {
val correctAnswer = testRelation
.window(Seq(sum(c).as('sum_a_1)), partitionSpec1, orderSpec1)
.window(Seq(sum(c).as('sum_a_2)), partitionSpec2, orderSpec2)
.select('a, 'b, 'c, 'd, 'sum_a_2, 'sum_a_1)

comparePlans(optimized, correctAnswer.analyze)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,4 +468,25 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext {
spark.read.schema(sampleSchema).json(input.toDS()).select(c0, c1).foreach { _ => () }
}
}

test("window functions in multiple selects") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this test? It does not really add anything new.

Copy link
Contributor Author

@ptkool ptkool May 18, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I'll remove it.

val df = Seq(
("S1", "P1", 100),
("S1", "P1", 700),
("S2", "P1", 200),
("S2", "P2", 300)
).toDF("sno", "pno", "qty")

val w1 = Window.partitionBy("sno")
val w2 = Window.partitionBy("sno", "pno")

checkAnswer(
df.select($"sno", $"pno", $"qty", sum($"qty").over(w2).alias("sum_qty_2"))
.select($"sno", $"pno", $"qty", col("sum_qty_2"), sum("qty").over(w1).alias("sum_qty_1")),
Seq(
Row("S1", "P1", 100, 800, 800),
Row("S1", "P1", 700, 800, 800),
Row("S2", "P1", 200, 200, 500),
Row("S2", "P2", 300, 300, 500)))
}
}