From 9521a5aca87bead3dcfeabd7abe3468194984ea3 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 23 Jul 2016 18:13:07 +0800
Subject: [PATCH 1/8] Project shouldn't be pushed down through Sample if it has new output.

---
 .../sql/catalyst/optimizer/Optimizer.scala      |  9 +++++-
 .../optimizer/FilterPushdownSuite.scala         | 16 ++++++++++
 .../org/apache/spark/sql/DatasetSuite.scala     | 29 +++++++++++++++++++
 3 files changed, 53 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index c8e9d8e2f9dd..1d98b4e7d691 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -150,13 +150,20 @@ class SimpleTestOptimizer extends Optimizer(
 
 /**
  * Pushes projects down beneath Sample to enable column pruning with sampling.
+ * This rule is only doable when the projects don't add new attributes.
  */
 object PushProjectThroughSample extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // Push down projection into sample
-    case Project(projectList, Sample(lb, up, replace, seed, child)) =>
+    case p @ Project(projectList, Sample(lb, up, replace, seed, child))
+      if !hasNewOutput(projectList, p.child.output) =>
       Sample(lb, up, replace, seed, Project(projectList, child))()
   }
+
+  private def hasNewOutput(
+      projectList: Seq[NamedExpression],
+      childOutput: Seq[Attribute]): Boolean = {
+    projectList.exists(p => !childOutput.exists(_.semanticEquals(p)))
+  }
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 780e78ed1cf2..0a5d97632a2d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -601,6 +601,22 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer.analyze)
   }
 
+  test("don't push project down into sample if project brings new attributes") {
+    val x = testRelation.subquery('x)
+    val originalQuery =
+      Sample(0.0, 0.6, false, 11L, x)().select('a as 'aa)
+
+    val originalQueryAnalyzed =
+      EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(originalQuery))
+
+    val optimized = Optimize.execute(originalQueryAnalyzed)
+
+    val correctAnswer =
+      Sample(0.0, 0.6, false, 11L, x)().select('a as 'aa)
+
+    comparePlans(optimized, correctAnswer.analyze)
+  }
+
   test("aggregate: push down filter when filter on group by expression") {
     val originalQuery = testRelation
       .groupBy('a)('a, count('b) as 'c)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 0b6f40872f2e..27b504dc40e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -422,6 +422,35 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       3, 17, 27, 58, 62)
   }
 
+  test("SPARK-16686: Dataset.sample with seed results shouldn't depend on downstream usage") {
+    val udfOne = spark.udf.register("udfOne", (n: Int) => {
+      if (n == 1) {
+        throw new RuntimeException("udfOne shouldn't see swid=1!")
+      } else {
+        1
+      }
+    })
+
+    val d = Seq(
+      (0, "string0"),
+      (1, "string1"),
+      (2, "string2"),
+      (3, "string3"),
+      (4, "string4"),
+      (5, "string5"),
+      (6, "string6"),
+      (7, "string7"),
+      (8, "string8"),
+      (9, "string9")
+    )
+    val df = spark.createDataFrame(d).toDF("swid", "stringData")
+    val sampleDF = df.sample(false, 0.7, 50)
+    // After sampling, sampleDF doesn't contain swid=1.
+    assert(!sampleDF.select("swid").collect.contains(1))
+    // udfOne should not encounter swid=1.
+    sampleDF.select(udfOne($"swid")).collect
+  }
+
   test("SPARK-11436: we should rebind right encoder when join 2 datasets") {
     val ds1 = Seq("1", "2").toDS().as("a")
     val ds2 = Seq(2, 3).toDS().as("b")

From 6d1616d41cc1158089ac0f38a6402a0fef58b191 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 23 Jul 2016 21:33:24 +0800
Subject: [PATCH 2/8] Address comment.

---
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 27b504dc40e5..eecf9579c9ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -424,11 +424,8 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
 
   test("SPARK-16686: Dataset.sample with seed results shouldn't depend on downstream usage") {
     val udfOne = spark.udf.register("udfOne", (n: Int) => {
-      if (n == 1) {
-        throw new RuntimeException("udfOne shouldn't see swid=1!")
-      } else {
-        1
-      }
+      require(n != 1, "udfOne shouldn't see swid=1!")
+      1
     })
 
     val d = Seq(

From 31a6f6fef63b0f130738a6fc7a5f628e60947cb9 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 25 Jul 2016 10:35:13 +0800
Subject: [PATCH 3/8] Address comments.
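
The guard can be stated directly on attribute sets: a Project adds new output
exactly when its outputSet is not a subset of its child's outputSet, because
an Alias always mints a fresh ExprId. A rough illustration with the Catalyst
test DSL (illustrative snippet only, not part of the patch):

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    val rel = LocalRelation('a.int, 'b.int)
    // A plain column reference resolves to the child's own attribute, so the
    // projection's output {a} stays a subset of the child's output {a, b}.
    rel.select('a).analyze.outputSet.subsetOf(rel.outputSet)         // true
    // An alias mints a new attribute aa, so the subset check fails and the
    // rule no longer fires.
    rel.select('a as 'aa).analyze.outputSet.subsetOf(rel.outputSet)  // false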
---
 .../apache/spark/sql/catalyst/optimizer/Optimizer.scala    | 7 +------
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 7 +++----
 2 files changed, 4 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1d98b4e7d691..c47ae91d9a74 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -156,14 +156,9 @@ object PushProjectThroughSample extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // Push down projection into sample
     case p @ Project(projectList, Sample(lb, up, replace, seed, child))
-      if !hasNewOutput(projectList, p.child.output) =>
+      if p.outputSet.subsetOf(p.child.outputSet) =>
       Sample(lb, up, replace, seed, Project(projectList, child))()
   }
-  private def hasNewOutput(
-      projectList: Seq[NamedExpression],
-      childOutput: Seq[Attribute]): Boolean = {
-    projectList.exists(p => !childOutput.exists(_.semanticEquals(p)))
-  }
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index eecf9579c9ea..923065f4c5df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -423,12 +423,12 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-16686: Dataset.sample with seed results shouldn't depend on downstream usage") {
-    val udfOne = spark.udf.register("udfOne", (n: Int) => {
+    val udfOne = udf((n: Int) => {
       require(n != 1, "udfOne shouldn't see swid=1!")
       1
     })
 
-    val d = Seq(
+    val df = Seq(
       (0, "string0"),
       (1, "string1"),
       (2, "string2"),
       (3, "string3"),
       (4, "string4"),
       (5, "string5"),
       (6, "string6"),
       (7, "string7"),
       (8, "string8"),
       (9, "string9")
-    )
-    val df = spark.createDataFrame(d).toDF("swid", "stringData")
+    ).toDF("swid", "stringData")
     val sampleDF = df.sample(false, 0.7, 50)

From 5c4e7ffff5bd438ad7abbb37dc84cff9e5036567 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 25 Jul 2016 12:04:46 +0800
Subject: [PATCH 4/8] Check Dataset result rather than just calling collect.

---
 sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 923065f4c5df..45c1d577ee80 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -444,7 +444,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     // After sampling, sampleDF doesn't contain swid=1.
     assert(!sampleDF.select("swid").collect.contains(1))
     // udfOne should not encounter swid=1.
-    sampleDF.select(udfOne($"swid")).collect
+    checkAnswer(sampleDF.select(udfOne($"swid")), List.fill(sampleDF.count.toInt)(Row(1)))
   }
 
   test("SPARK-11436: we should rebind right encoder when join 2 datasets") {

From 20e94367bf1b4aba226e2929aa2056427e9651ba Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 25 Jul 2016 14:36:55 +0800
Subject: [PATCH 5/8] Address comments.
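
PushProjectThroughSample can be removed outright because generic column
pruning already covers the safe part of this optimization: it prunes unused
columns beneath the Sample while keeping the Project itself (and any aliases
or new expressions it carries) above the Sample, so those expressions are
evaluated only on the sampled rows. A rough sketch of that pruning pattern,
under an assumed rule name (this is not the actual ColumnPruning code):

    import org.apache.spark.sql.catalyst.plans.logical._
    import org.apache.spark.sql.catalyst.rules.Rule

    object PruneColumnsBelowSample extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        // Keep the Project on top; only drop child columns it never reads.
        case p @ Project(_, Sample(lb, up, replace, seed, grandChild))
            if (grandChild.outputSet -- p.references).nonEmpty =>
          p.copy(child = Sample(lb, up, replace, seed,
            Project(grandChild.output.filter(p.references.contains), grandChild))())
      }
    }

Pruning below the Sample is safe because a column-only Project changes
neither the number nor the order of the rows the sampler sees for a given
seed.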
---
 .../sql/catalyst/optimizer/Optimizer.scala      | 14 --------
 .../optimizer/ColumnPruningSuite.scala          | 31 +++++++++++++++++
 .../optimizer/FilterPushdownSuite.scala         | 33 -------------------
 3 files changed, 31 insertions(+), 47 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index c47ae91d9a74..fe328fd598d7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -76,7 +76,6 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
     Batch("Operator Optimizations", fixedPoint,
       // Operator push down
       PushThroughSetOperations,
-      PushProjectThroughSample,
       ReorderJoin,
       EliminateOuterJoin,
       PushPredicateThroughJoin,
@@ -148,19 +147,6 @@ class SimpleTestOptimizer extends Optimizer(
     new SimpleCatalystConf(caseSensitiveAnalysis = true)),
   new SimpleCatalystConf(caseSensitiveAnalysis = true))
 
-/**
- * Pushes projects down beneath Sample to enable column pruning with sampling.
- * This rule is only doable when the projects don't add new attributes.
- */
-object PushProjectThroughSample extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    // Push down projection into sample
-    case p @ Project(projectList, Sample(lb, up, replace, seed, child))
-      if p.outputSet.subsetOf(p.child.outputSet) =>
-      Sample(lb, up, replace, seed, Project(projectList, child))()
-  }
-}
-
 /**
  * Removes the Project only conducting Alias of its child node.
  * It is created mainly for removing extra Project added in EliminateSerialization rule,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index b5664a5e699e..bf5a68692c57 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -346,5 +347,35 @@ class ColumnPruningSuite extends PlanTest {
     comparePlans(Optimize.execute(plan1.analyze), correctAnswer1)
   }
 
+  test("push project down into sample") {
+    val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+    val x = testRelation.subquery('x)
+    val originalQuery =
+      Sample(0.0, 0.6, false, 11L, x)().select('a)
+
+    val originalQueryAnalyzed =
+      EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(originalQuery))
+
+    val optimized = Optimize.execute(originalQueryAnalyzed)
+
+    val correctAnswer =
+      Sample(0.0, 0.6, false, 11L, x.select('a))()
+
+    comparePlans(optimized, correctAnswer.analyze)
+
+    val originalQuery2 =
+      Sample(0.0, 0.6, false, 11L, x)().select('a as 'aa)
+
+    val originalQueryAnalyzed2 =
+      EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(originalQuery2))
+
+    val optimized2 = Optimize.execute(originalQueryAnalyzed2)
+
+    val correctAnswer2 =
+      Sample(0.0, 0.6, false, 11L, x.select('a))().select('a as 'aa)
+
+    comparePlans(optimized2, correctAnswer2.analyze)
+  }
+
   // todo: add more tests for column pruning
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 0a5d97632a2d..596b8fcea194 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -34,7 +34,6 @@ class FilterPushdownSuite extends PlanTest {
       Batch("Subqueries", Once,
         EliminateSubqueryAliases) ::
       Batch("Filter Pushdown", FixedPoint(10),
-        PushProjectThroughSample,
         CombineFilters,
         PushDownPredicate,
         BooleanSimplification,
@@ -585,38 +584,6 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, originalQuery)
   }
 
-  test("push project and filter down into sample") {
-    val x = testRelation.subquery('x)
-    val originalQuery =
-      Sample(0.0, 0.6, false, 11L, x)().select('a)
-
-    val originalQueryAnalyzed =
-      EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(originalQuery))
-
-    val optimized = Optimize.execute(originalQueryAnalyzed)
-
-    val correctAnswer =
-      Sample(0.0, 0.6, false, 11L, x.select('a))()
-
-    comparePlans(optimized, correctAnswer.analyze)
-  }
-
-  test("don't push project down into sample if project brings new attributes") {
-    val x = testRelation.subquery('x)
-    val originalQuery =
-      Sample(0.0, 0.6, false, 11L, x)().select('a as 'aa)
-
-    val originalQueryAnalyzed =
-      EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(originalQuery))
-
-    val optimized = Optimize.execute(originalQueryAnalyzed)
-
-    val correctAnswer =
-      Sample(0.0, 0.6, false, 11L, x)().select('a as 'aa)
-
-    comparePlans(optimized, correctAnswer.analyze)
-  }
-
   test("aggregate: push down filter when filter on group by expression") {
     val originalQuery = testRelation
       .groupBy('a)('a, count('b) as 'c)

From dc70f1d0238a6a45e8026ab4938c0938cac048fe Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 25 Jul 2016 16:19:54 +0800
Subject: [PATCH 6/8] Use proper name.

---
 .../scala/org/apache/spark/sql/DatasetSuite.scala | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 45c1d577ee80..7e3b7b63d8b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -423,8 +423,8 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-16686: Dataset.sample with seed results shouldn't depend on downstream usage") {
-    val udfOne = udf((n: Int) => {
-      require(n != 1, "udfOne shouldn't see swid=1!")
+    val simpleUdf = udf((n: Int) => {
+      require(n != 1, "simpleUdf shouldn't see id=1!")
       1
     })
 
@@ -439,12 +439,12 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       (7, "string7"),
       (8, "string8"),
       (9, "string9")
-    ).toDF("swid", "stringData")
+    ).toDF("id", "stringData")
     val sampleDF = df.sample(false, 0.7, 50)
-    // After sampling, sampleDF doesn't contain swid=1.
-    assert(!sampleDF.select("swid").collect.contains(1))
-    // udfOne should not encounter swid=1.
-    checkAnswer(sampleDF.select(udfOne($"swid")), List.fill(sampleDF.count.toInt)(Row(1)))
+    // After sampling, sampleDF doesn't contain id=1.
+    assert(!sampleDF.select("id").collect.contains(1))
+    // simpleUdf should not encounter id=1.
+    checkAnswer(sampleDF.select(simpleUdf($"id")), List.fill(sampleDF.count.toInt)(Row(1)))
   }
 
   test("SPARK-11436: we should rebind right encoder when join 2 datasets") {
     val ds1 = Seq("1", "2").toDS().as("a")
     val ds2 = Seq(2, 3).toDS().as("b")

From 2186b7ef95a2a2fc3dd047fb256d6c71fdd6c6ad Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 25 Jul 2016 21:19:34 +0800
Subject: [PATCH 7/8] Address comment.

---
 .../spark/sql/catalyst/optimizer/ColumnPruningSuite.scala | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index bf5a68692c57..d9343d8cf2d8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -353,8 +352,7 @@ class ColumnPruningSuite extends PlanTest {
     val originalQuery =
       Sample(0.0, 0.6, false, 11L, x)().select('a)
 
-    val originalQueryAnalyzed =
-      EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(originalQuery))
+    val originalQueryAnalyzed = originalQuery.analyze
 
     val optimized = Optimize.execute(originalQueryAnalyzed)
 
@@ -366,8 +364,7 @@ class ColumnPruningSuite extends PlanTest {
     val originalQuery2 =
       Sample(0.0, 0.6, false, 11L, x)().select('a as 'aa)
 
-    val originalQueryAnalyzed2 =
-      EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(originalQuery2))
+    val originalQueryAnalyzed2 = originalQuery2.analyze
 
     val optimized2 = Optimize.execute(originalQueryAnalyzed2)
 

From 3e134f18f5b3fc678d95e2bf10997185ab6dd6e9 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 26 Jul 2016 09:39:10 +0800
Subject: [PATCH 8/8] Fix style.
---
 .../optimizer/ColumnPruningSuite.scala | 29 +++++--------------
 1 file changed, 8 insertions(+), 21 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index d9343d8cf2d8..589607e3ad5c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -349,29 +349,16 @@ class ColumnPruningSuite extends PlanTest {
   test("push project down into sample") {
     val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
     val x = testRelation.subquery('x)
-    val originalQuery =
-      Sample(0.0, 0.6, false, 11L, x)().select('a)
-
-    val originalQueryAnalyzed = originalQuery.analyze
-
-    val optimized = Optimize.execute(originalQueryAnalyzed)
-
-    val correctAnswer =
-      Sample(0.0, 0.6, false, 11L, x.select('a))()
-
-    comparePlans(optimized, correctAnswer.analyze)
-
-    val originalQuery2 =
-      Sample(0.0, 0.6, false, 11L, x)().select('a as 'aa)
-
-    val originalQueryAnalyzed2 = originalQuery2.analyze
-
-    val optimized2 = Optimize.execute(originalQueryAnalyzed2)
-    val correctAnswer2 =
-      Sample(0.0, 0.6, false, 11L, x.select('a))().select('a as 'aa)
+    val query1 = Sample(0.0, 0.6, false, 11L, x)().select('a)
+    val optimized1 = Optimize.execute(query1.analyze)
+    val expected1 = Sample(0.0, 0.6, false, 11L, x.select('a))()
+    comparePlans(optimized1, expected1.analyze)
 
-    comparePlans(optimized2, correctAnswer2.analyze)
+    val query2 = Sample(0.0, 0.6, false, 11L, x)().select('a as 'aa)
+    val optimized2 = Optimize.execute(query2.analyze)
+    val expected2 = Sample(0.0, 0.6, false, 11L, x.select('a))().select('a as 'aa)
+    comparePlans(optimized2, expected2.analyze)
   }
 
   // todo: add more tests for column pruning
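
As a final sanity check, the fixed behavior can be observed end to end from
spark-shell (illustrative snippet only, not part of the patch series; assumes
a SparkSession named spark, mirroring the DatasetSuite test above):

    import org.apache.spark.sql.functions.udf
    import spark.implicits._

    val failingUdf = udf((n: Int) => {
      // With the fix, this runs only on rows that survived the sample.
      require(n != 1, "UDF saw a row the sample should have dropped!")
      n
    })
    val df = (0 to 9).map(i => (i, s"string$i")).toDF("id", "stringData")
    val sampled = df.sample(withReplacement = false, fraction = 0.7, seed = 50)
    // Seed 50 samples out id=1, so this must not throw: the projection (and
    // the UDF inside it) now stays above the Sample in the optimized plan.
    sampled.select(failingUdf($"id")).collect()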