
Commit 7b06a89

viirya authored and cloud-fan committed
[SPARK-16686][SQL] Remove PushProjectThroughSample since it is handled by ColumnPruning
## What changes were proposed in this pull request?

We push `Project` down through `Sample` in the `Optimizer` via the rule `PushProjectThroughSample`. However, if the projected columns produce new output, they will see the whole data instead of the sampled data, which introduces an inconsistency between the original plan (Sample then Project) and the optimized plan (Project then Sample). In the extreme case attached to the JIRA, where the projected column is a UDF that is not supposed to see the sampled-out data, the UDF's result will be incorrect.

Since the rule `ColumnPruning` already handles general `Project` pushdown, we don't need `PushProjectThroughSample` anymore, and `ColumnPruning` also avoids the described issue.

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #14327 from viirya/fix-sample-pushdown.
1 parent 815f3ee commit 7b06a89
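
To make the described inconsistency concrete, here is a minimal, Spark-free sketch (all names hypothetical; plain Scala collections stand in for the Sample and Project operators, and a side-effecting function plays the UDF). Pushing the projection below the sample makes the function run over every row:

// Hypothetical, Spark-free illustration of SPARK-16686 (not Spark code).
object SamplePushdownSketch extends App {
  def sample(rows: Seq[Int], fraction: Double, seed: Long): Seq[Int] = {
    val rng = new scala.util.Random(seed)
    rows.filter(_ => rng.nextDouble() < fraction)
  }

  // A "UDF" with an observable side effect: it records every row it sees.
  var seen = List.empty[Int]
  def udf(n: Int): Int = { seen ::= n; n * 2 }

  val rows = (0 until 10).toList

  // Original plan: Sample, then Project. The UDF sees only sampled rows.
  seen = Nil
  sample(rows, 0.6, 11L).map(udf)
  println(s"Sample then Project: UDF saw ${seen.size} rows")

  // Old "optimized" plan: Project pushed below Sample. The UDF now sees
  // all ten rows, including ones the sample would have dropped.
  seen = Nil
  sample(rows.map(udf), 0.6, 11L)
  println(s"Project then Sample: UDF saw ${seen.size} rows")
  assert(seen.size == rows.size)
}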

File tree

4 files changed (+40, -29 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 0 additions & 12 deletions
@@ -76,7 +76,6 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
     Batch("Operator Optimizations", fixedPoint,
       // Operator push down
       PushThroughSetOperations,
-      PushProjectThroughSample,
       ReorderJoin,
       EliminateOuterJoin,
       PushPredicateThroughJoin,
@@ -148,17 +147,6 @@ class SimpleTestOptimizer extends Optimizer(
     new SimpleCatalystConf(caseSensitiveAnalysis = true)),
   new SimpleCatalystConf(caseSensitiveAnalysis = true))
 
-/**
- * Pushes projects down beneath Sample to enable column pruning with sampling.
- */
-object PushProjectThroughSample extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    // Push down projection into sample
-    case Project(projectList, Sample(lb, up, replace, seed, child)) =>
-      Sample(lb, up, replace, seed, Project(projectList, child))()
-  }
-}
-
 /**
  * Removes the Project only conducting Alias of its child node.
  * It is created mainly for removing extra Project added in EliminateSerialization rule,
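
For context, `ColumnPruning` makes the removed rule redundant without reordering the two operators: it prunes unused columns beneath the `Sample` while the original `Project`, and any expressions it computes, stays above. A simplified sketch of that shape, reusing the constructors visible in the removed rule above (illustrative only, not the exact Spark source):

// Illustrative sketch only -- not the actual ColumnPruning source.
// Assumes the Catalyst imports already present in Optimizer.scala.
object PruneBelowSampleSketch extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Fire only when the sample's child produces columns the project never uses.
    case p @ Project(projectList, Sample(lb, up, replace, seed, child))
        if (child.outputSet -- p.references).nonEmpty =>
      // Prune below the Sample; keep the original Project above it, so its
      // expressions (e.g. UDFs) still see only sampled rows.
      val pruned = Project(child.output.filter(p.references.contains), child)
      Project(projectList, Sample(lb, up, replace, seed, pruned)())
  }
}

This keeps the Sample-then-Project ordering, so only the column set changes, never which rows an expression observes.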

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala

Lines changed: 15 additions & 0 deletions
@@ -346,5 +346,20 @@ class ColumnPruningSuite extends PlanTest {
     comparePlans(Optimize.execute(plan1.analyze), correctAnswer1)
   }
 
+  test("push project down into sample") {
+    val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+    val x = testRelation.subquery('x)
+
+    val query1 = Sample(0.0, 0.6, false, 11L, x)().select('a)
+    val optimized1 = Optimize.execute(query1.analyze)
+    val expected1 = Sample(0.0, 0.6, false, 11L, x.select('a))()
+    comparePlans(optimized1, expected1.analyze)
+
+    val query2 = Sample(0.0, 0.6, false, 11L, x)().select('a as 'aa)
+    val optimized2 = Optimize.execute(query2.analyze)
+    val expected2 = Sample(0.0, 0.6, false, 11L, x.select('a))().select('a as 'aa)
+    comparePlans(optimized2, expected2.analyze)
+  }
+
   // todo: add more tests for column pruning
 }
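
The two cases exercise both sides of the pruning: `query1` selects only an existing column, so the whole projection can safely move below the `Sample`; `query2` computes a new output (`'a as 'aa`), so the optimized plan keeps the aliasing `Project` above the `Sample` and only the pruning `select('a)` moves below it.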

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala

Lines changed: 0 additions & 17 deletions
@@ -34,7 +34,6 @@ class FilterPushdownSuite extends PlanTest {
       Batch("Subqueries", Once,
        EliminateSubqueryAliases) ::
       Batch("Filter Pushdown", FixedPoint(10),
-        PushProjectThroughSample,
        CombineFilters,
        PushDownPredicate,
        BooleanSimplification,
@@ -585,22 +584,6 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, originalQuery)
   }
 
-  test("push project and filter down into sample") {
-    val x = testRelation.subquery('x)
-    val originalQuery =
-      Sample(0.0, 0.6, false, 11L, x)().select('a)
-
-    val originalQueryAnalyzed =
-      EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(originalQuery))
-
-    val optimized = Optimize.execute(originalQueryAnalyzed)
-
-    val correctAnswer =
-      Sample(0.0, 0.6, false, 11L, x.select('a))()
-
-    comparePlans(optimized, correctAnswer.analyze)
-  }
-
   test("aggregate: push down filter when filter on group by expression") {
     val originalQuery = testRelation
       .groupBy('a)('a, count('b) as 'c)

sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala

Lines changed: 25 additions & 0 deletions
@@ -422,6 +422,31 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       3, 17, 27, 58, 62)
   }
 
+  test("SPARK-16686: Dataset.sample with seed results shouldn't depend on downstream usage") {
+    val simpleUdf = udf((n: Int) => {
+      require(n != 1, "simpleUdf shouldn't see id=1!")
+      1
+    })
+
+    val df = Seq(
+      (0, "string0"),
+      (1, "string1"),
+      (2, "string2"),
+      (3, "string3"),
+      (4, "string4"),
+      (5, "string5"),
+      (6, "string6"),
+      (7, "string7"),
+      (8, "string8"),
+      (9, "string9")
+    ).toDF("id", "stringData")
+    val sampleDF = df.sample(false, 0.7, 50)
+    // After sampling, sampleDF doesn't contain id=1.
+    assert(!sampleDF.select("id").collect.contains(1))
+    // simpleUdf should not encounter id=1.
+    checkAnswer(sampleDF.select(simpleUdf($"id")), List.fill(sampleDF.count.toInt)(Row(1)))
+  }
+
   test("SPARK-11436: we should rebind right encoder when join 2 datasets") {
     val ds1 = Seq("1", "2").toDS().as("a")
     val ds2 = Seq(2, 3).toDS().as("b")
