From b38a21ef6146784e4b93ef4ce8c899f1eee14572 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 16 Nov 2015 18:30:26 -0800 Subject: [PATCH 01/17] SPARK-11633 --- .../spark/sql/catalyst/analysis/Analyzer.scala | 3 ++- .../spark/sql/hive/execution/SQLQuerySuite.scala | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2f4670b55bdb..5a5b71e52dd7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -425,7 +425,8 @@ class Analyzer( */ j case Some((oldRelation, newRelation)) => - val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output)) + val attributeRewrites = + AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2)) val newRight = right transformUp { case r if r == oldRelation => newRelation } transformUp { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 3427152b2da0..5e00546a74c0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -51,6 +51,8 @@ case class Order( state: String, month: Int) +case class Individual(F1: Integer, F2: Integer) + case class WindowData( month: Int, area: String, @@ -1479,4 +1481,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test """.stripMargin), Row("value1", "12", 3.14, "hello")) } + + test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") { + val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1))) + val df = hiveContext.createDataFrame(rdd1) + df.registerTempTable("foo") + val df2 = sql("select f1, F2 as F2 from foo") + df2.registerTempTable("foo2") + df2.registerTempTable("foo3") + + checkAnswer(sql( + """ + SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2 + """.stripMargin), Row(2) :: Row(1) :: Nil) + } } From 0546772f151f83d6d3cf4d000cbe341f52545007 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 20 Nov 2015 10:56:45 -0800 Subject: [PATCH 02/17] converge --- .../spark/sql/catalyst/analysis/Analyzer.scala | 3 +-- .../spark/sql/hive/execution/SQLQuerySuite.scala | 15 --------------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7c9512fbd00a..47962ebe6ef8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -417,8 +417,7 @@ class Analyzer( */ j case Some((oldRelation, newRelation)) => - val attributeRewrites = - AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2)) + val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output)) val newRight = right transformUp { case r if r == oldRelation => newRelation } transformUp { diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 5e00546a74c0..61d9dcd37572 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -51,8 +51,6 @@ case class Order( state: String, month: Int) -case class Individual(F1: Integer, F2: Integer) - case class WindowData( month: Int, area: String, @@ -1481,18 +1479,5 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test """.stripMargin), Row("value1", "12", 3.14, "hello")) } - - test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") { - val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1))) - val df = hiveContext.createDataFrame(rdd1) - df.registerTempTable("foo") - val df2 = sql("select f1, F2 as F2 from foo") - df2.registerTempTable("foo2") - df2.registerTempTable("foo3") - - checkAnswer(sql( - """ - SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2 - """.stripMargin), Row(2) :: Row(1) :: Nil) } } From b37a64f13956b6ddd0e38ddfd9fe1caee611f1a8 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 20 Nov 2015 10:58:37 -0800 Subject: [PATCH 03/17] converge --- .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 61d9dcd37572..3427152b2da0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1479,5 +1479,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test """.stripMargin), Row("value1", "12", 3.14, "hello")) } - } } From 458f7be8f88cb83e348b72fe9350bfa9c12282a4 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 10 Mar 2016 10:26:36 -0800 Subject: [PATCH 04/17] ppd for window --- .../spark/sql/catalyst/dsl/package.scala | 9 +++++- .../sql/catalyst/optimizer/Optimizer.scala | 28 +++++++++++++++++++ .../optimizer/FilterPushdownSuite.scala | 27 ++++++++++++++++++ 3 files changed, 63 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index a12f7396fe81..5841df830f2c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -268,7 +268,14 @@ package object dsl { Aggregate(groupingExprs, aliasedExprs, logicalPlan) } - def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan) + def window( + projectList: Seq[Attribute], + windowExpressions: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder]): LogicalPlan = + Window(projectList, windowExpressions, partitionSpec, orderSpec, logicalPlan) + + def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan) def except(otherPlan: LogicalPlan): LogicalPlan = Except(logicalPlan, otherPlan) diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index deea7238f564..123c2693acd8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -68,6 +68,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { PushPredicateThroughProject, PushPredicateThroughGenerate, PushPredicateThroughAggregate, + PushPredicateThroughWindow, LimitPushDown, ColumnPruning, // Operator combine @@ -914,6 +915,33 @@ object PushPredicateThroughGenerate extends Rule[LogicalPlan] with PredicateHelp } } +/** + * Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be pushed + * beneath must satisfy three conditions: + * 1. involving one and only one column that is part of window partitioning key. + * 2. Window partitioning key should be just a sequence of [[AttributeReference]]. + * 3. deterministic + */ +object PushPredicateThroughWindow extends Rule[LogicalPlan] with PredicateHelper { + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case filter @ Filter(condition, w: Window) + if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) => + val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => + cond.references.size == 1 && + cond.references.subsetOf(w.outputSet) && + cond.deterministic + } + if (pushDown.nonEmpty) { + val pushDownPredicate = pushDown.reduce(And) + val newWindow = w.copy(child = Filter(pushDownPredicate, w.child)) + if (stayUp.isEmpty) newWindow else Filter(stayUp.reduce(And), newWindow) + } else { + filter + } + } +} + /** * Push [[Filter]] operators through [[Aggregate]] operators, iff the filters reference only * non-aggregate attributes (typically literals or grouping expressions). 
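For intuition, the new PushPredicateThroughWindow rule splits the filter condition into conjuncts and pushes the qualifying ones beneath the Window. Below is a standalone sketch of that split — a simplified model, not Catalyst code: Expr, refs, and deterministic stand in for Catalyst's Expression API, and at this stage of the series a conjunct qualifies when it is deterministic and references exactly one column of the Window's output.

    sealed trait Expr { def refs: Set[String]; def deterministic: Boolean = true }
    case class Attr(name: String) extends Expr { def refs = Set(name) }
    case class Gt(col: Attr, bound: Int) extends Expr { def refs = col.refs }
    case class And(left: Expr, right: Expr) extends Expr { def refs = left.refs ++ right.refs }

    // Mirror of splitConjunctivePredicates: flatten an And-tree into conjuncts.
    def splitConjuncts(e: Expr): Seq[Expr] = e match {
      case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
      case other => Seq(other)
    }

    // Mirror of the pushDown/stayUp partition in the rule above.
    def split(cond: Expr, windowOutput: Set[String]): (Seq[Expr], Seq[Expr]) =
      splitConjuncts(cond).partition { c =>
        c.refs.size == 1 && c.refs.subsetOf(windowOutput) && c.deterministic
      }

    // WHERE a > 1 AND c > 1: both conjuncts qualify under this first version,
    // even though c is not a partitioning column; the next patch restricts the
    // check to the partitioning key.
    val (pushDown, stayUp) =
      split(And(Gt(Attr("a"), 1), Gt(Attr("c"), 1)), Set("a", "b", "c"))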
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 97a0cde38123..a296eaa92203 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count} import org.apache.spark.sql.catalyst.plans.{LeftOuter, LeftSemi, PlanTest, RightOuter} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -41,6 +42,7 @@ class FilterPushdownSuite extends PlanTest { PushPredicateThroughJoin, PushPredicateThroughGenerate, PushPredicateThroughAggregate, + PushPredicateThroughWindow, CollapseProject) :: Nil } @@ -666,4 +668,29 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + + test("Window: push down filters -- basic") { + val originalQuery = testRelation + .select('a, 'b, 'c, + WindowExpression( + AggregateExpression(Count('b), Complete, isDistinct = false), + WindowSpecDefinition( 'a.attr :: 'b.attr :: Nil, + SortOrder('b, Ascending) :: Nil, + UnspecifiedFrame)).as('window)).where('a > 1).select('a, 'c) + + val correctAnswer = testRelation + .select('a, 'b, 'c).where('a > 1) + .window('a.attr :: 'b.attr :: 'c.attr :: Nil, + WindowExpression( + AggregateExpression(Count('b), Complete, isDistinct = false), + WindowSpecDefinition( 'a.attr :: 'b.attr :: Nil, + SortOrder('b, Ascending) :: Nil, + UnspecifiedFrame)).as('window) :: Nil, + 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) + .select('a, 'c).analyze + + val optimized = Optimize.execute(originalQuery.analyze) + + comparePlans(optimized, correctAnswer) + } } From f401d8bdc97ac163ff5fc3d0a75344a8b867414e Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 10 Mar 2016 10:47:02 -0800 Subject: [PATCH 05/17] only partitioning key --- .../sql/catalyst/optimizer/Optimizer.scala | 2 +- .../optimizer/FilterPushdownSuite.scala | 26 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 1fef6a7f49c4..bcd25bba02e3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -946,7 +946,7 @@ object PushPredicateThroughWindow extends Rule[LogicalPlan] with PredicateHelper if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) => val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => cond.references.size == 1 && - cond.references.subsetOf(w.outputSet) && + cond.references.subsetOf(AttributeSet(w.partitionSpec.flatMap(_.references))) && cond.deterministic } if (pushDown.nonEmpty) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 
a296eaa92203..c979ebd2a1df 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -693,4 +693,30 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + + test("Window: no push down -- predicates are not from partitioning keys") { + val originalQuery = testRelation + .select('a, 'b, 'c, + WindowExpression( + AggregateExpression(Count('b), Complete, isDistinct = false), + WindowSpecDefinition( 'a.attr :: 'b.attr :: Nil, + SortOrder('b, Ascending) :: Nil, + UnspecifiedFrame)).as('window)).where('c > 1).select('a, 'c) + + + val correctAnswer = testRelation + .select('a, 'b, 'c) + .window('a.attr :: 'b.attr :: 'c.attr :: Nil, + WindowExpression( + AggregateExpression(Count('b), Complete, isDistinct = false), + WindowSpecDefinition( 'a.attr :: 'b.attr :: Nil, + SortOrder('b, Ascending) :: Nil, + UnspecifiedFrame)).as('window) :: Nil, + 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil).where('c > 1) + .select('a, 'c).analyze + + val optimized = Optimize.execute(originalQuery.analyze) + + comparePlans(optimized, correctAnswer) + } } From d42024647dc04214fe08adb236399367f83a86f2 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 12 Mar 2016 10:30:45 -0800 Subject: [PATCH 06/17] add windowExpr and windowSpec to DSL --- .../spark/sql/catalyst/dsl/package.scala | 9 ++++ .../optimizer/ColumnPruningSuite.scala | 42 +++++------------- .../optimizer/FilterPushdownSuite.scala | 43 ++++++------------- 3 files changed, 32 insertions(+), 62 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index dc5264e2660d..c00e52c96846 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -162,6 +162,15 @@ package object dsl { def sqrt(e: Expression): Expression = Sqrt(e) def abs(e: Expression): Expression = Abs(e) + def windowSpec( + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + frame: WindowFrame): WindowSpecDefinition = + WindowSpecDefinition(partitionSpec, orderSpec, frame) + + def windowExpr(windowFunc: Expression, windowSpec: WindowSpecDefinition): WindowExpression = + WindowExpression(windowFunc, windowSpec) + implicit class DslSymbol(sym: Symbol) extends ImplicitAttribute { def s: String = sym.name } // TODO more implicit class for literal? 
implicit class DslString(val s: String) extends ImplicitOperators { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index dd7d65ddc9e9..79e267af30c2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -262,17 +262,11 @@ class ColumnPruningSuite extends PlanTest { test("Column pruning on Window with useless aggregate functions") { val input = LocalRelation('a.int, 'b.string, 'c.double, 'd.int) + val winSpec = windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) - val originalQuery = - input.groupBy('a, 'c, 'd)('a, 'c, 'd, - WindowExpression( - AggregateExpression(Count('b), Complete, isDistinct = false), - WindowSpecDefinition( 'a :: Nil, - SortOrder('b, Ascending) :: Nil, - UnspecifiedFrame)).as('window)).select('a, 'c) - + val originalQuery = input.groupBy('a, 'c, 'd)('a, 'c, 'd, winExpr.as('window)).select('a, 'c) val correctAnswer = input.select('a, 'c, 'd).groupBy('a, 'c, 'd)('a, 'c).analyze - val optimized = Optimize.execute(originalQuery.analyze) comparePlans(optimized, correctAnswer) @@ -280,25 +274,15 @@ class ColumnPruningSuite extends PlanTest { test("Column pruning on Window with selected agg expressions") { val input = LocalRelation('a.int, 'b.string, 'c.double, 'd.int) + val winSpec = windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) val originalQuery = - input.select('a, 'b, 'c, 'd, - WindowExpression( - AggregateExpression(Count('b), Complete, isDistinct = false), - WindowSpecDefinition( 'a :: Nil, - SortOrder('b, Ascending) :: Nil, - UnspecifiedFrame)).as('window)).where('window > 1).select('a, 'c) - + input.select('a, 'b, 'c, 'd, winExpr.as('window)).where('window > 1).select('a, 'c) val correctAnswer = input.select('a, 'b, 'c) - .window(WindowExpression( - AggregateExpression(Count('b), Complete, isDistinct = false), - WindowSpecDefinition( 'a :: Nil, - SortOrder('b, Ascending) :: Nil, - UnspecifiedFrame)).as('window) :: Nil, - 'a :: Nil, 'b.asc :: Nil) + .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil) .select('a, 'c, 'window).where('window > 1).select('a, 'c).analyze - val optimized = Optimize.execute(originalQuery.analyze) comparePlans(optimized, correctAnswer) @@ -306,17 +290,11 @@ class ColumnPruningSuite extends PlanTest { test("Column pruning on Window in select") { val input = LocalRelation('a.int, 'b.string, 'c.double, 'd.int) + val winSpec = windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) - val originalQuery = - input.select('a, 'b, 'c, 'd, - WindowExpression( - AggregateExpression(Count('b), Complete, isDistinct = false), - WindowSpecDefinition( 'a :: Nil, - SortOrder('b, Ascending) :: Nil, - UnspecifiedFrame)).as('window)).select('a, 'c) - + val originalQuery = input.select('a, 'b, 'c, 'd, winExpr.as('window)).select('a, 'c) val correctAnswer = input.select('a, 'c).analyze - val optimized = Optimize.execute(originalQuery.analyze) comparePlans(optimized, correctAnswer) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 
92e0d7f0d4b8..678302638750 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -670,24 +670,16 @@ class FilterPushdownSuite extends PlanTest { } test("Window: push down filters -- basic") { - val originalQuery = testRelation - .select('a, 'b, 'c, - WindowExpression( - AggregateExpression(Count('b), Complete, isDistinct = false), - WindowSpecDefinition( 'a.attr :: 'b.attr :: Nil, - SortOrder('b, Ascending) :: Nil, - UnspecifiedFrame)).as('window)).where('a > 1).select('a, 'c) + val winSpec = windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) + + val originalQuery = + testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1).select('a, 'c) val correctAnswer = testRelation .select('a, 'b, 'c).where('a > 1) - .window( - WindowExpression( - AggregateExpression(Count('b), Complete, isDistinct = false), - WindowSpecDefinition( 'a.attr :: 'b.attr :: Nil, - SortOrder('b, Ascending) :: Nil, - UnspecifiedFrame)).as('window) :: Nil, - 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) - .select('a, 'c).analyze + .window( winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) + .select('a, 'c).analyze val optimized = Optimize.execute(originalQuery.analyze) @@ -695,25 +687,16 @@ class FilterPushdownSuite extends PlanTest { } test("Window: no push down -- predicates are not from partitioning keys") { - val originalQuery = testRelation - .select('a, 'b, 'c, - WindowExpression( - AggregateExpression(Count('b), Complete, isDistinct = false), - WindowSpecDefinition( 'a.attr :: 'b.attr :: Nil, - SortOrder('b, Ascending) :: Nil, - UnspecifiedFrame)).as('window)).where('c > 1).select('a, 'c) + val winSpec = windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) + val originalQuery = + testRelation.select('a, 'b, 'c, winExpr.as('window)).where('c > 1).select('a, 'c) val correctAnswer = testRelation .select('a, 'b, 'c) - .window( - WindowExpression( - AggregateExpression(Count('b), Complete, isDistinct = false), - WindowSpecDefinition( 'a.attr :: 'b.attr :: Nil, - SortOrder('b, Ascending) :: Nil, - UnspecifiedFrame)).as('window) :: Nil, - 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil).where('c > 1) - .select('a, 'c).analyze + .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) + .where('c > 1).select('a, 'c).analyze val optimized = Optimize.execute(originalQuery.analyze) From c05b4aee70a6037ea3802f3cf8787c221c95aa28 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 12 Mar 2016 10:34:52 -0800 Subject: [PATCH 07/17] remove useless import --- .../apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala | 1 - .../spark/sql/catalyst/optimizer/FilterPushdownSuite.scala | 1 - 2 files changed, 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 79e267af30c2..12fdf9ff3b44 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count} import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.RuleExecutor diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 678302638750..b2a0ca2f83f7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count} import org.apache.spark.sql.catalyst.plans.{LeftOuter, LeftSemi, PlanTest, RightOuter} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ From 6db1940883e5c38d89a50c129fa30f26635ba3cb Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 14 Mar 2016 00:08:02 -0700 Subject: [PATCH 08/17] added more test cases --- .../optimizer/FilterPushdownSuite.scala | 142 +++++++++++++++--- 1 file changed, 122 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index b2a0ca2f83f7..a71eaade3db0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -668,37 +668,139 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } - test("Window: push down filters -- basic") { - val winSpec = windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame) - val winExpr = windowExpr(count('b), winSpec) + test("Window: predicate push down -- basic") { + val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame)) - val originalQuery = - testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1).select('a, 'c) + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1) + val correctAnswer = testRelation.select('a, 'b, 'c) + .where('a > 1).window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil) + .select('a, 'b, 'c, 'window).analyze - val correctAnswer = testRelation - .select('a, 'b, 'c).where('a > 1) - .window( winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) - .select('a, 'c).analyze + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } - val optimized = Optimize.execute(originalQuery.analyze) + test("Window: predicate push down -- predicates with compound predicate using only one column") { + val winExpr = + windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)) - comparePlans(optimized, correctAnswer) + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a * 3 > 15) + val 
correctAnswer = testRelation.select('a, 'b, 'c) + .where('a * 3 > 15) + .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) + .select('a, 'b, 'c, 'window).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) } - test("Window: no push down -- predicates are not from partitioning keys") { + test("Window: predicate push down -- multi window expressions with the same window spec") { val winSpec = windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame) - val winExpr = windowExpr(count('b), winSpec) + val winExpr1 = windowExpr(count('b), winSpec) + val winExpr2 = windowExpr(sum('b), winSpec) + val originalQuery = testRelation + .select('a, 'b, 'c, winExpr1.as('window1), winExpr2.as('window2)).where('a > 1) - val originalQuery = - testRelation.select('a, 'b, 'c, winExpr.as('window)).where('c > 1).select('a, 'c) + val correctAnswer = testRelation.select('a, 'b, 'c) + .where('a > 1) + .window(winExpr1.as('window1) :: winExpr2.as('window2) :: Nil, + 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) + .select('a, 'b, 'c, 'window1, 'window2).analyze - val correctAnswer = testRelation - .select('a, 'b, 'c) + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } + + test("Window: predicate push down -- multi window specification - 1") { + // order by clauses are different between winSpec1 and winSpec2 + val winSpec1 = windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + val winExpr1 = windowExpr(count('b), winSpec1) + val winSpec2 = windowSpec('a.attr :: 'b.attr :: Nil, 'a.asc :: Nil, UnspecifiedFrame) + val winExpr2 = windowExpr(count('b), winSpec2) + val originalQuery = testRelation + .select('a, 'b, 'c, winExpr1.as('window1), winExpr2.as('window2)).where('a > 1) + + val correctAnswer1 = testRelation.select('a, 'b, 'c) + .where('a > 1) + .window(winExpr1.as('window1) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) + .window(winExpr2.as('window2) :: Nil, 'a.attr :: 'b.attr :: Nil, 'a.asc :: Nil) + .select('a, 'b, 'c, 'window1, 'window2).analyze + + val correctAnswer2 = testRelation.select('a, 'b, 'c) + .where('a > 1) + .window(winExpr2.as('window2) :: Nil, 'a.attr :: 'b.attr :: Nil, 'a.asc :: Nil) + .window(winExpr1.as('window1) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) + .select('a, 'b, 'c, 'window1, 'window2).analyze + + val optimizedQuery = Optimize.execute(originalQuery.analyze) + try { + comparePlans(optimizedQuery, correctAnswer1) + } catch { + case ae: Throwable => comparePlans(optimizedQuery, correctAnswer2) + } + } + + test("Window: predicate push down -- multi window specification - 2") { + // partitioning clauses are different between winSpec1 and winSpec2 + val winSpec1 = windowSpec('a.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + val winExpr1 = windowExpr(count('b), winSpec1) + val winSpec2 = windowSpec('b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + val winExpr2 = windowExpr(count('a), winSpec2) + val originalQuery = testRelation + .select('a, winExpr1.as('window1), 'b, 'c, winExpr2.as('window2)).where('b > 1) + + val correctAnswer1 = testRelation.select('a, 'b, 'c) + .window(winExpr1.as('window1) :: Nil, 'a.attr :: Nil, 'b.asc :: Nil) + .where('b > 1) + .window(winExpr2.as('window2) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil) + .select('a, 'window1, 'b, 'c, 'window2).analyze + + val correctAnswer2 = testRelation.select('a, 'b, 'c) + .window(winExpr2.as('window2) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil) + .window(winExpr1.as('window1) :: Nil, 'a.attr :: Nil, 'b.asc :: Nil) + 
.where('b > 1) + .select('a, 'window1, 'b, 'c, 'window2).analyze + + val optimizedQuery = Optimize.execute(originalQuery.analyze) + try { + comparePlans(optimizedQuery, correctAnswer1) + } catch { + case ae: Throwable => comparePlans(optimizedQuery, correctAnswer2) + } + } + + test("Window: no predicate push down -- predicates are not from partitioning keys") { + val winExpr = + windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)) + + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('c > 1) + val correctAnswer = testRelation.select('a, 'b, 'c) .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) - .where('c > 1).select('a, 'c).analyze + .where('c > 1).select('a, 'b, 'c, 'window).analyze - val optimized = Optimize.execute(originalQuery.analyze) + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } - comparePlans(optimized, correctAnswer) + test("Window: no predicate push down -- predicates with multiple partitioning columns") { + val winExpr = + windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)) + + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1) + val correctAnswer = testRelation.select('a, 'b, 'c) + .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) + .where('a + 'b > 1).select('a, 'b, 'c, 'window).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } + + test("Window: no predicate push down -- compound partition key") { + val winSpec = windowSpec('a.attr + 'b.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1) + + val winSpecAnalyzed = windowSpec('_w0.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed) + val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0")) + .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: 'b.attr :: Nil, 'b.asc :: Nil) + .where('a > 1).select('a, 'b, 'c, 'window).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } } From 8fa029461ede82d659c4ab4f9ac889deb8a4eb46 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 20 Mar 2016 19:30:10 -0700 Subject: [PATCH 09/17] added two more test cases.
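These two tests run the pushdown end-to-end through the DataFrame API. For reference, a sketch of the query shape they exercise (assuming a spark-shell style session where the SQLContext implicits are in scope, as in the suite):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    val df = Seq((1, "1"), (2, "2"), (2, "3")).toDF("key", "value")
    // "value" is the window partitioning column, so a filter on it can run
    // before the window is computed: all rows of a partition agree on "value".
    val win = Window.partitionBy($"value").orderBy($"key").rowsBetween(0, Long.MaxValue)
    df.select($"key", $"value", last("key").over(win).as("last_key"))
      .where($"value" < "3")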
--- .../spark/sql/DataFrameWindowSuite.scala | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala index 2bcbb1983f7a..91095af0ddae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala @@ -354,4 +354,38 @@ class DataFrameWindowSuite extends QueryTest with SharedSQLContext { val df = src.select($"*", max("c").over(winSpec) as "max") checkAnswer(df, Row(5, Row(0, 3), 5)) } + + test("aggregation and rows between with unbounded + predicate pushdown") { + val df = Seq((1, "1"), (2, "2"), (2, "3"), (1, "3"), (3, "2"), (4, "3")).toDF("key", "value") + df.registerTempTable("window_table") + val selectList = Seq($"key", $"value", + last("key").over( + Window.partitionBy($"value").orderBy($"key").rowsBetween(0, Long.MaxValue)), + last("key").over( + Window.partitionBy($"value").orderBy($"key").rowsBetween(Long.MinValue, 0)), + last("key").over(Window.partitionBy($"value").orderBy($"key").rowsBetween(-1, 1))) + + checkAnswer( + df.select(selectList: _*).where($"value" < "3"), + Seq(Row(1, "1", 1, 1, 1), Row(2, "2", 3, 2, 3), Row(3, "2", 3, 3, 3))) + } + + test("aggregation and range between with unbounded + predicate pushdown") { + val df = Seq((5, "1"), (5, "2"), (4, "2"), (6, "2"), (3, "1"), (2, "2")).toDF("key", "value") + df.registerTempTable("window_table") + val selectList = Seq($"key", $"value", + last("value").over( + Window.partitionBy($"value").orderBy($"key").rangeBetween(-2, -1)).equalTo("2") + .as("last_v"), + avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(Long.MinValue, 1)) + .as("avg_key1"), + avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(0, Long.MaxValue)) + .as("avg_key2"), + avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(-1, 1)) + .as("avg_key3")) + + checkAnswer( + df.select(selectList: _*).where($"value" < 2), + Seq(Row(3, "1", null, 3.0, 4.0, 3.0), Row(5, "1", false, 4.0, 5.0, 5.0))) + } } From cc5f0bd7ca365b431bb1218879f1495c608225a0 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 19 Apr 2016 23:22:19 -0700 Subject: [PATCH 10/17] merge --- .../scala/org/apache/spark/sql/catalyst/dsl/package.scala | 6 +++--- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index b5d10e4a584f..848e079761a3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -176,9 +176,9 @@ package object dsl { } def windowSpec( - partitionSpec: Seq[Expression], - orderSpec: Seq[SortOrder], - frame: WindowFrame): WindowSpecDefinition = + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + frame: WindowFrame): WindowSpecDefinition = WindowSpecDefinition(partitionSpec, orderSpec, frame) def windowExpr(windowFunc: Expression, windowSpec: WindowSpecDefinition): WindowExpression = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 60e300414de1..0cedb1514889 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -963,8 +963,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { // 1. involving one and only one column that is part of window partitioning key. // 2. Window partitioning key should be just a sequence of [[AttributeReference]]. // 3. deterministic - case filter @ Filter(condition, w: Window) - if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) => + case filter@Filter(condition, w: Window) + if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) => val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => cond.references.size == 1 && cond.references.subsetOf(AttributeSet(w.partitionSpec.flatMap(_.references))) && From 436359fbd25854266215916a01e850980c953743 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 19 Apr 2016 23:24:33 -0700 Subject: [PATCH 11/17] style fix. --- .../scala/org/apache/spark/sql/catalyst/dsl/package.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 848e079761a3..b5d10e4a584f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -176,9 +176,9 @@ package object dsl { } def windowSpec( - partitionSpec: Seq[Expression], - orderSpec: Seq[SortOrder], - frame: WindowFrame): WindowSpecDefinition = + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + frame: WindowFrame): WindowSpecDefinition = WindowSpecDefinition(partitionSpec, orderSpec, frame) def windowExpr(windowFunc: Expression, windowSpec: WindowSpecDefinition): WindowExpression = From fae2694d88fe1bd9c3a0425c1d23bf224d33ca43 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 20 Apr 2016 06:57:44 -0700 Subject: [PATCH 12/17] address comments. --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0cedb1514889..28c5396e00c2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -965,9 +965,9 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { // 3. deterministic case filter@Filter(condition, w: Window) if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) => + val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references)) val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => - cond.references.size == 1 && - cond.references.subsetOf(AttributeSet(w.partitionSpec.flatMap(_.references))) && + cond.references.size == 1 && partitionAttrs.contains(cond.references.head) && cond.deterministic } if (pushDown.nonEmpty) { From 875d6b6cdeab3a82f63b4e82fdfc792b54345a40 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 20 Apr 2016 06:58:31 -0700 Subject: [PATCH 13/17] style fix. 
--- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 28c5396e00c2..073ed77b5035 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -963,7 +963,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { // 1. involving one and only one column that is part of window partitioning key. // 2. Window partitioning key should be just a sequence of [[AttributeReference]]. // 3. deterministic - case filter@Filter(condition, w: Window) + case filter @ Filter(condition, w: Window) if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) => val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references)) val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => From 04699231e53309861c62d3480c9f2e96bd542564 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 20 Apr 2016 06:59:17 -0700 Subject: [PATCH 14/17] style fix --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 073ed77b5035..0525ae595ca9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -964,7 +964,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { // 2. Window partitioning key should be just a sequence of [[AttributeReference]]. // 3. deterministic case filter @ Filter(condition, w: Window) - if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) => + if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) => val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references)) val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => cond.references.size == 1 && partitionAttrs.contains(cond.references.head) && From 5c4f4d3e02546eb4f011da6375aad2ea575255f0 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 20 Apr 2016 08:26:11 -0700 Subject: [PATCH 15/17] address comments. --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 7 +++---- .../catalyst/optimizer/FilterPushdownSuite.scala | 15 ++++++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0525ae595ca9..6a1ff3f3e993 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -960,15 +960,14 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { // Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be // pushed beneath must satisfy three conditions: - // 1. involving one and only one column that is part of window partitioning key. + // 1. All the columns are part of window partitioning key. // 2. 
Window partitioning key should be just a sequence of [[AttributeReference]]. - // 3. deterministic + // 3. Deterministic case filter @ Filter(condition, w: Window) if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) => val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references)) val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => - cond.references.size == 1 && partitionAttrs.contains(cond.references.head) && - cond.deterministic + cond.references.subsetOf(partitionAttrs) && cond.deterministic } if (pushDown.nonEmpty) { val pushDownPredicate = pushDown.reduce(And) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index a2725247b7ff..90c521a34420 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -858,26 +858,27 @@ class FilterPushdownSuite extends PlanTest { } } - test("Window: no predicate push down -- predicates are not from partitioning keys") { + test("Window: predicate push down -- predicates with multiple partitioning columns") { val winExpr = windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)) - val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('c > 1) - val correctAnswer = testRelation.select('a, 'b, 'c) + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1) + val correctAnswer = testRelation + .where('a + 'b > 1).select('a, 'b, 'c) .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) - .where('c > 1).select('a, 'b, 'c, 'window).analyze + .select('a, 'b, 'c, 'window).analyze comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) } - test("Window: no predicate push down -- predicates with multiple partitioning columns") { + test("Window: no predicate push down -- predicates are not from partitioning keys") { val winExpr = windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)) - val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1) + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('c > 1) val correctAnswer = testRelation.select('a, 'b, 'c) .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) - .where('a + 'b > 1).select('a, 'b, 'c, 'window).analyze + .where('c > 1).select('a, 'b, 'c, 'window).analyze comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) } From c4dedd21fd6a7986814d2da970e22762e8292aed Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 20 Apr 2016 11:16:51 -0700 Subject: [PATCH 16/17] address comments and added more test cases. 
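The classification is relaxed here: a conjunct is now pushable whenever all of its references belong to the window partitioning key, so compound predicates over partitioning columns qualify too. A standalone sketch of the new check (a simplified model, not Catalyst code; refs stands in for Expression.references):

    case class Conjunct(refs: Set[String], deterministic: Boolean = true)

    def pushable(c: Conjunct, partitionAttrs: Set[String]): Boolean =
      c.refs.subsetOf(partitionAttrs) && c.deterministic

    // Key (a, b): a compound predicate such as a + b > 1 now pushes down.
    assert(pushable(Conjunct(Set("a", "b")), Set("a", "b")))
    // c > 1 still stays above the Window.
    assert(!pushable(Conjunct(Set("c")), Set("a", "b")))
    // A compound key such as a + b is aliased by the Analyzer (e.g. to _w0),
    // so a predicate referencing a and b no longer matches the key and stays up.
    assert(!pushable(Conjunct(Set("a", "b")), Set("_w0")))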
--- .../sql/catalyst/optimizer/Optimizer.scala | 13 +-- .../optimizer/FilterPushdownSuite.scala | 85 +++++++++++++++++-- 2 files changed, 88 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 6a1ff3f3e993..39ff1ad269f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -959,15 +959,18 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild)) // Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be - // pushed beneath must satisfy three conditions: - // 1. All the columns are part of window partitioning key. - // 2. Window partitioning key should be just a sequence of [[AttributeReference]]. - // 3. Deterministic + // pushed beneath must satisfy the following two conditions: + // 1. All the expressions are part of the window partitioning key. The expressions can be compound. + // 2. Deterministic case filter @ Filter(condition, w: Window) if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) => val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references)) val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => - cond.references.subsetOf(partitionAttrs) && cond.deterministic + cond.references.subsetOf(partitionAttrs) && cond.deterministic && + // This ensures that all the partitioning expressions have been converted to aliases + // in the Analyzer. Thus, we do not need to check whether the expressions in the condition are + // the same as the expressions used in the partitioning columns. 
+ partitionAttrs.forall(_.isInstanceOf[Attribute]) } if (pushDown.nonEmpty) { val pushDownPredicate = pushDown.reduce(And) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 90c521a34420..4da7d45e3544 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -871,10 +871,39 @@ class FilterPushdownSuite extends PlanTest { comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) } + // complex predicates with the same references and the same expressions + // Todo: to enable it, the Analyzer needs to convert the expressions in the condition + // to the aliases defined for the same expressions + ignore("Window: predicate push down -- complex predicate with the same expressions") { + val winSpec = windowSpec( + partitionSpec = 'a.attr + 'b.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) + + val winSpecAnalyzed = windowSpec( + partitionSpec = '_w0.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) + val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed) + + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1) + val correctAnswer = testRelation + .where('a + 'b > 1).select('a, 'b, 'c, ('a + 'b).as("_w0")) + .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: Nil, 'b.asc :: Nil) + .select('a, 'b, 'c, 'window).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } + test("Window: no predicate push down -- predicates are not from partitioning keys") { - val winExpr = - windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)) + val winSpec = windowSpec( + partitionSpec = 'a.attr :: 'b.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) + // No push down: the predicate is c > 1, but the partitioning key is (a, b). 
val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('c > 1) val correctAnswer = testRelation.select('a, 'b, 'c) .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) .where('c > 1).select('a, 'b, 'c, 'window).analyze @@ -883,12 +912,20 @@ class FilterPushdownSuite extends PlanTest { comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) } - test("Window: no predicate push down -- compound partition key") { - val winSpec = windowSpec('a.attr + 'b.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + test("Window: no predicate push down -- partial compound partition key") { + val winSpec = windowSpec( + partitionSpec = 'a.attr + 'b.attr :: 'b.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) val winExpr = windowExpr(count('b), winSpec) + + // No push down: the predicate is a > 1, but the partitioning key is (a + b, b). val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1) - val winSpecAnalyzed = windowSpec('_w0.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + val winSpecAnalyzed = windowSpec( + partitionSpec = '_w0.attr :: 'b.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed) val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0")) .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: 'b.attr :: Nil, 'b.asc :: Nil) @@ -896,4 +933,42 @@ class FilterPushdownSuite extends PlanTest { comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) } + + test("Window: no predicate push down -- complex predicates containing non partitioning columns") { + val winSpec = + windowSpec(partitionSpec = 'b.attr :: Nil, orderSpec = 'b.asc :: Nil, UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) + + // No push down: the predicate is a + b > 1, but the partitioning key is b. + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1) + val correctAnswer = testRelation + .select('a, 'b, 'c) + .window(winExpr.as('window) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil) + .where('a + 'b > 1).select('a, 'b, 'c, 'window).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } + + // complex predicates with the same references but different expressions + test("Window: no predicate push down -- complex predicate with different expressions") { + val winSpec = windowSpec( + partitionSpec = 'a.attr + 'b.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) + + val winSpecAnalyzed = windowSpec( + partitionSpec = '_w0.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) + val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed) + + // No push down: the predicate is a - b > 1, but the partitioning key is a + b. + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a - 'b > 1) + val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0")) + .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: Nil, 'b.asc :: Nil) + .where('a - 'b > 1).select('a, 'b, 'c, 'window).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } } From e427ce98638504213acf6c11fa913cfb0fe0eca3 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 24 Apr 2016 21:20:23 -0700 Subject: [PATCH 17/17] address comments. 
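The added comments explain why these tests accept either of two plans: the Analyzer groups the extracted window expressions by their partition and order specs, and the relative order of the resulting Window operators is not fixed. A hypothetical helper capturing the try/catch idiom used in the suite (a sketch only; comparePlans comes from PlanTest, and the helper name is not part of this patch):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Accept whichever of two equivalent plans the optimizer produced.
    def comparePlansEither(
        actual: LogicalPlan,
        expected1: LogicalPlan,
        expected2: LogicalPlan): Unit = {
      try comparePlans(actual, expected1)
      catch { case _: Throwable => comparePlans(actual, expected2) }
    }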
--- .../spark/sql/catalyst/optimizer/FilterPushdownSuite.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 4da7d45e3544..e2cc80c56422 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -821,6 +821,9 @@ class FilterPushdownSuite extends PlanTest { .window(winExpr1.as('window1) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) .select('a, 'b, 'c, 'window1, 'window2).analyze + // When the Analyzer adds Window operators after grouping the extracted window expressions + // by their partition and order specs, the order of the Window operators is + // non-deterministic. Thus, there are two correct plans. val optimizedQuery = Optimize.execute(originalQuery.analyze) try { comparePlans(optimizedQuery, correctAnswer1) @@ -851,6 +854,9 @@ class FilterPushdownSuite extends PlanTest { .select('a, 'window1, 'b, 'c, 'window2).analyze val optimizedQuery = Optimize.execute(originalQuery.analyze) + // When the Analyzer adds Window operators after grouping the extracted window expressions + // by their partition and order specs, the order of the Window operators is + // non-deterministic. Thus, there are two correct plans. try { comparePlans(optimizedQuery, correctAnswer1) } catch {