Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
100 commits
Select commit Hold shift + click to select a range
86db9b2
[SPARK-22833][IMPROVEMENT] in SparkHive Scala Examples
chetkhatri Dec 23, 2017
ea2642e
[SPARK-20694][EXAMPLES] Update SQLDataSourceExample.scala
CNRui Dec 23, 2017
f6084a8
[HOTFIX] Fix Scala style checks
HyukjinKwon Dec 23, 2017
aeb45df
[SPARK-22844][R] Adds date_trunc in R API
HyukjinKwon Dec 23, 2017
1219d7a
[SPARK-22889][SPARKR] Set overwrite=T when install SparkR in tests
shivaram Dec 23, 2017
0bf1a74
[SPARK-22465][CORE] Add a safety-check to RDD defaultPartitioner
Dec 24, 2017
fba0313
[SPARK-22707][ML] Optimize CrossValidator memory occupation by models…
WeichenXu123 Dec 25, 2017
33ae243
[SPARK-22893][SQL] Unified the data type mismatch message
wangyum Dec 25, 2017
12d20dd
[SPARK-22874][PYSPARK][SQL][FOLLOW-UP] Modify error messages to show …
ueshin Dec 25, 2017
be03d3a
[SPARK-22893][SQL][HOTFIX] Fix a error message of VersionsSuite
dongjoon-hyun Dec 26, 2017
0e68330
[SPARK-20168][DSTREAM] Add changes to use kinesis fetches from specif…
yashs360 Dec 26, 2017
eb386be
[SPARK-21552][SQL] Add DecimalType support to ArrowWriter.
ueshin Dec 26, 2017
ff48b1b
[SPARK-22901][PYTHON] Add deterministic flag to pyspark UDF
mgaido91 Dec 26, 2017
9348e68
[SPARK-22833][EXAMPLE] Improvement SparkHive Scala Examples
cloud-fan Dec 26, 2017
91d1b30
[SPARK-22894][SQL] DateTimeOperations should accept SQL like string type
wangyum Dec 26, 2017
6674acd
[SPARK-22846][SQL] Fix table owner is null when creating table throug…
Dec 27, 2017
b8bfce5
[SPARK-22324][SQL][PYTHON][FOLLOW-UP] Update setup.py file.
ueshin Dec 27, 2017
774715d
[SPARK-22904][SQL] Add tests for decimal operations and string casts
mgaido91 Dec 27, 2017
753793b
[SPARK-22899][ML][STREAMING] Fix OneVsRestModel transform on streamin…
WeichenXu123 Dec 28, 2017
5683984
[SPARK-18016][SQL][FOLLOW-UP] Code Generation: Constant Pool Limit - …
kiszk Dec 28, 2017
32ec269
[SPARK-22909][SS] Move Structured Streaming v2 APIs to streaming folder
zsxwing Dec 28, 2017
171f6dd
[SPARK-22757][KUBERNETES] Enable use of remote dependencies (http, s3…
liyinan926 Dec 28, 2017
ded6d27
[SPARK-22648][K8S] Add documentation covering init containers and sec…
liyinan926 Dec 28, 2017
76e8a1d
[SPARK-22843][R] Adds localCheckpoint in R
HyukjinKwon Dec 28, 2017
1eebfbe
[SPARK-21208][R] Adds setLocalProperty and getLocalProperty in R
HyukjinKwon Dec 28, 2017
755f2f5
[SPARK-20392][SQL][FOLLOWUP] should not add extra AnalysisBarrier
cloud-fan Dec 28, 2017
2877817
[SPARK-22917][SQL] Should not try to generate histogram for empty/nul…
Dec 28, 2017
5536f31
[MINOR][BUILD] Fix Java linter errors
dongjoon-hyun Dec 28, 2017
8f6d573
[SPARK-22875][BUILD] Assembly build fails for a high user id
gerashegalov Dec 28, 2017
9c21ece
[SPARK-22836][UI] Show driver logs in UI when available.
Dec 28, 2017
613b71a
[SPARK-22890][TEST] Basic tests for DateTimeOperations
wangyum Dec 28, 2017
cfcd746
[SPARK-11035][CORE] Add in-process Spark app launcher.
Dec 28, 2017
ffe6fd7
[SPARK-22818][SQL] csv escape of quote escape
Dec 28, 2017
c745730
[SPARK-22905][MLLIB] Fix ChiSqSelectorModel save implementation
WeichenXu123 Dec 29, 2017
796e48c
[SPARK-22313][PYTHON][FOLLOWUP] Explicitly import warnings namespace …
HyukjinKwon Dec 29, 2017
67ea11e
[SPARK-22891][SQL] Make hive client creation thread safe
Dec 29, 2017
d4f0b1d
[SPARK-22834][SQL] Make insertion commands have real children to fix …
gengliangwang Dec 29, 2017
224375c
[SPARK-22892][SQL] Simplify some estimation logic by using double ins…
Dec 29, 2017
cc30ef8
[SPARK-22916][SQL] shouldn't bias towards build right if user does no…
Dec 29, 2017
fcf66a3
[SPARK-21657][SQL] optimize explode quadratic memory consumpation
uzadude Dec 29, 2017
dbd492b
[SPARK-22921][PROJECT-INFRA] Choices for Assigning Jira on Merge
squito Dec 29, 2017
11a849b
[SPARK-22370][SQL][PYSPARK][FOLLOW-UP] Fix a test failure when xmlrun…
ueshin Dec 29, 2017
8b49704
[SPARK-20654][CORE] Add config to limit disk usage of the history ser…
Dec 29, 2017
4e9e6ae
[SPARK-22864][CORE] Disable allocation schedule in ExecutorAllocation…
Dec 29, 2017
afc3641
[SPARK-22905][ML][FOLLOWUP] Fix GaussianMixtureModel save
zhengruifeng Dec 29, 2017
66a7d6b
[SPARK-22920][SPARKR] sql functions for current_date, current_timesta…
felixcheung Dec 29, 2017
ccda75b
[SPARK-22921][PROJECT-INFRA] Bug fix in jira assigning
squito Dec 29, 2017
30fcdc0
[SPARK-22922][ML][PYSPARK] Pyspark portion of the fit-multiple API
MrBago Dec 30, 2017
8169630
[SPARK-22734][ML][PYSPARK] Added Python API for VectorSizeHint.
MrBago Dec 30, 2017
2ea17af
[SPARK-22881][ML][TEST] ML regression package testsuite add Structure…
WeichenXu123 Dec 30, 2017
f2b3525
[SPARK-22771][SQL] Concatenate binary inputs into a binary output
maropu Dec 30, 2017
14c4a62
[SPARK-21475][Core]Revert "[SPARK-21475][CORE] Use NIO's Files API to…
zsxwing Dec 30, 2017
234d943
[TEST][MINOR] remove redundant `EliminateSubqueryAliases` in test code
wzhfy Dec 30, 2017
fd7d141
[SPARK-22919] Bump httpclient versions
Dec 30, 2017
ea0a5ee
[SPARK-22924][SPARKR] R API for sortWithinPartitions
felixcheung Dec 30, 2017
ee3af15
[SPARK-22363][SQL][TEST] Add unit test for Window spilling
gaborgsomogyi Dec 31, 2017
cfbe11e
[SPARK-22895][SQL] Push down the deterministic predicates that are af…
gatorsmile Dec 31, 2017
3d8837e
[SPARK-22397][ML] add multiple columns support to QuantileDiscretizer
huaxingao Dec 31, 2017
028ee40
[SPARK-22801][ML][PYSPARK] Allow FeatureHasher to treat numeric colum…
Dec 31, 2017
5955a2d
[MINOR][DOCS] s/It take/It takes/g
jkremser Dec 31, 2017
994065d
[SPARK-13030][ML] Create OneHotEncoderEstimator for OneHotEncoder as …
viirya Dec 31, 2017
f5b7714
[BUILD] Close stale PRs
srowen Jan 1, 2018
7a702d8
[SPARK-21616][SPARKR][DOCS] update R migration guide and vignettes
felixcheung Jan 1, 2018
c284c4e
[MINOR] Fix a bunch of typos
srowen Dec 31, 2017
1c9f95c
[SPARK-22530][PYTHON][SQL] Adding Arrow support for ArrayType
BryanCutler Jan 1, 2018
e734a4b
[SPARK-21893][SPARK-22142][TESTS][FOLLOWUP] Enables PySpark tests for…
HyukjinKwon Jan 1, 2018
e0c090f
[SPARK-22932][SQL] Refactor AnalysisContext
gatorsmile Jan 2, 2018
a6fc300
[SPARK-22897][CORE] Expose stageAttemptId in TaskContext
advancedxy Jan 2, 2018
247a089
[SPARK-22938] Assert that SQLConf.get is accessed only on the driver.
juliuszsompolski Jan 3, 2018
1a87a16
[SPARK-22934][SQL] Make optional clauses order insensitive for CREATE…
gatorsmile Jan 3, 2018
a66fe36
[SPARK-20236][SQL] dynamic partition overwrite
cloud-fan Jan 3, 2018
9a2b65a
[SPARK-22896] Improvement in String interpolation
chetkhatri Jan 3, 2018
b297029
[SPARK-20960][SQL] make ColumnVector public
cloud-fan Jan 3, 2018
7d045c5
[SPARK-22944][SQL] improve FoldablePropagation
cloud-fan Jan 4, 2018
df95a90
[SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, t…
felixcheung Jan 4, 2018
9fa703e
[SPARK-22950][SQL] Handle ChildFirstURLClassLoader's parent
yaooqinn Jan 4, 2018
d5861ab
[SPARK-22945][SQL] add java UDF APIs in the functions object
cloud-fan Jan 4, 2018
5aadbc9
[SPARK-22939][PYSPARK] Support Spark UDF in registerFunction
gatorsmile Jan 4, 2018
6f68316
[SPARK-22771][SQL] Add a missing return statement in Concat.checkInpu…
maropu Jan 4, 2018
93f92c0
[SPARK-21475][CORE][2ND ATTEMPT] Change to use NIO's Files API for ex…
jerryshao Jan 4, 2018
d2cddc8
[SPARK-22850][CORE] Ensure queued events are delivered to all event q…
Jan 4, 2018
95f9659
[SPARK-22948][K8S] Move SparkPodInitContainer to correct package.
Jan 4, 2018
e288fc8
[SPARK-22953][K8S] Avoids adding duplicated secret volumes when init-…
liyinan926 Jan 4, 2018
0428368
[SPARK-22960][K8S] Make build-push-docker-images.sh more dev-friendly.
Jan 5, 2018
df7fc3e
[SPARK-22957] ApproxQuantile breaks if the number of rows exceeds MaxInt
juliuszsompolski Jan 5, 2018
52fc5c1
[SPARK-22825][SQL] Fix incorrect results of Casting Array to String
maropu Jan 5, 2018
cf0aa65
[SPARK-22949][ML] Apply CrossValidator approach to Driver/Distributed…
MrBago Jan 5, 2018
6cff7d1
[SPARK-22757][K8S] Enable spark.jars and spark.files in KUBERNETES mode
liyinan926 Jan 5, 2018
51c33bd
[SPARK-22961][REGRESSION] Constant columns should generate QueryPlanC…
adrian-ionescu Jan 5, 2018
c0b7424
[SPARK-22940][SQL] HiveExternalCatalogVersionsSuite should succeed on…
bersprockets Jan 5, 2018
930b90a
[SPARK-13030][ML] Follow-up cleanups for OneHotEncoderEstimator
jkbradley Jan 5, 2018
ea95683
[SPARK-22914][DEPLOY] Register history.ui.port
gerashegalov Jan 6, 2018
e8af7e8
[SPARK-22937][SQL] SQL elt output binary for binary inputs
maropu Jan 6, 2018
bf65cd3
[SPARK-22960][K8S] Revert use of ARG base_image in images
liyinan926 Jan 6, 2018
f2dd8b9
[SPARK-22930][PYTHON][SQL] Improve the description of Vectorized UDFs…
icexelloss Jan 6, 2018
be9a804
[SPARK-22793][SQL] Memory leak in Spark Thrift Server
Jan 6, 2018
7b78041
[SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 'Par…
fjh100456 Jan 6, 2018
993f215
[SPARK-22901][PYTHON][FOLLOWUP] Adds the doc for asNondeterministic f…
HyukjinKwon Jan 6, 2018
9a7048b
[HOTFIX] Fix style checking failure
gatorsmile Jan 6, 2018
18e9414
[SPARK-22973][SQL] Fix incorrect results of Casting Map to String
maropu Jan 7, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[SPARK-22895][SQL] Push down the deterministic predicates that are af…
…ter the first non-deterministic

## What changes were proposed in this pull request?
Currently, we do not guarantee an order evaluation of conjuncts in either Filter or Join operator. This is also true to the mainstream RDBMS vendors like DB2 and MS SQL Server. Thus, we should also push down the deterministic predicates that are after the first non-deterministic, if possible.

## How was this patch tested?
Updated the existing test cases.

Author: gatorsmile <[email protected]>

Closes apache#20069 from gatorsmile/morePushDown.
  • Loading branch information
gatorsmile committed Dec 31, 2017
commit cfbe11e8164c04cd7d388e4faeded21a9331dac4
1 change: 1 addition & 0 deletions docs/sql-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1636,6 +1636,7 @@ options.

- Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`.
- The `percentile_approx` function previously accepted numeric type input and output double type results. Now it supports date type, timestamp type and numeric types as input types. The result type is also changed to be the same as the input type, which is more reasonable for percentiles.
- Since Spark 2.3, the Join/Filter's deterministic predicates that are after the first non-deterministic predicates are also pushed down/through the child operators, if possible. In prior Spark versions, these filters are not eligible for predicate pushdown.
- Partition column inference previously found incorrect common type for different inferred types, for example, previously it ended up with double type as the common type for double type and date type. Now it finds the correct common type for such conflicts. The conflict resolution follows the table below:

<table class="table">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -805,15 +805,15 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {

// For each filter, expand the alias and check if the filter can be evaluated using
// attributes produced by the aggregate operator's child operator.
val (candidates, containingNonDeterministic) =
splitConjunctivePredicates(condition).span(_.deterministic)
val (candidates, nonDeterministic) =
splitConjunctivePredicates(condition).partition(_.deterministic)

val (pushDown, rest) = candidates.partition { cond =>
val replaced = replaceAlias(cond, aliasMap)
cond.references.nonEmpty && replaced.references.subsetOf(aggregate.child.outputSet)
}

val stayUp = rest ++ containingNonDeterministic
val stayUp = rest ++ nonDeterministic

if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduce(And)
Expand All @@ -835,14 +835,14 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))

val (candidates, containingNonDeterministic) =
splitConjunctivePredicates(condition).span(_.deterministic)
val (candidates, nonDeterministic) =
splitConjunctivePredicates(condition).partition(_.deterministic)

val (pushDown, rest) = candidates.partition { cond =>
cond.references.subsetOf(partitionAttrs)
}

val stayUp = rest ++ containingNonDeterministic
val stayUp = rest ++ nonDeterministic

if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduce(And)
Expand All @@ -854,7 +854,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {

case filter @ Filter(condition, union: Union) =>
// Union could change the rows, so non-deterministic predicate can't be pushed down
val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(_.deterministic)
val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition(_.deterministic)

if (pushDown.nonEmpty) {
val pushDownCond = pushDown.reduceLeft(And)
Expand All @@ -878,13 +878,9 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
}

case filter @ Filter(condition, watermark: EventTimeWatermark) =>
// We can only push deterministic predicates which don't reference the watermark attribute.
// We could in theory span() only on determinism and pull out deterministic predicates
// on the watermark separately. But it seems unnecessary and a bit confusing to not simply
// use the prefix as we do for nondeterminism in other cases.

val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(
p => p.deterministic && !p.references.contains(watermark.eventTime))
val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { p =>
p.deterministic && !p.references.contains(watermark.eventTime)
}

if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduceLeft(And)
Expand Down Expand Up @@ -925,14 +921,14 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
// come from grandchild.
// TODO: non-deterministic predicates could be pushed through some operators that do not change
// the rows.
val (candidates, containingNonDeterministic) =
splitConjunctivePredicates(filter.condition).span(_.deterministic)
val (candidates, nonDeterministic) =
splitConjunctivePredicates(filter.condition).partition(_.deterministic)

val (pushDown, rest) = candidates.partition { cond =>
cond.references.subsetOf(grandchild.outputSet)
}

val stayUp = rest ++ containingNonDeterministic
val stayUp = rest ++ nonDeterministic

if (pushDown.nonEmpty) {
val newChild = insertFilter(pushDown.reduceLeft(And))
Expand Down Expand Up @@ -975,23 +971,19 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
/**
* Splits join condition expressions or filter predicates (on a given join's output) into three
* categories based on the attributes required to evaluate them. Note that we explicitly exclude
* on-deterministic (i.e., stateful) condition expressions in canEvaluateInLeft or
* non-deterministic (i.e., stateful) condition expressions in canEvaluateInLeft or
* canEvaluateInRight to prevent pushing these predicates on either side of the join.
*
* @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
*/
private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
// Note: In order to ensure correctness, it's important to not change the relative ordering of
// any deterministic expression that follows a non-deterministic expression. To achieve this,
// we only consider pushing down those expressions that precede the first non-deterministic
// expression in the condition.
val (pushDownCandidates, containingNonDeterministic) = condition.span(_.deterministic)
val (pushDownCandidates, nonDeterministic) = condition.partition(_.deterministic)
val (leftEvaluateCondition, rest) =
pushDownCandidates.partition(_.references.subsetOf(left.outputSet))
val (rightEvaluateCondition, commonCondition) =
rest.partition(expr => expr.references.subsetOf(right.outputSet))

(leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ containingNonDeterministic)
(leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic)
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,9 +831,9 @@ class FilterPushdownSuite extends PlanTest {
val optimized = Optimize.execute(originalQuery.analyze)

val correctAnswer = Union(Seq(
testRelation.where('a === 2L),
testRelation2.where('d === 2L)))
.where('b + Rand(10).as("rnd") === 3 && 'c > 5L)
testRelation.where('a === 2L && 'c > 5L),
testRelation2.where('d === 2L && 'f > 5L)))
.where('b + Rand(10).as("rnd") === 3)
.analyze

comparePlans(optimized, correctAnswer)
Expand Down Expand Up @@ -1134,29 +1134,30 @@ class FilterPushdownSuite extends PlanTest {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

// Verify that all conditions preceding the first non-deterministic condition are pushed down
// Verify that all conditions except the watermark touching condition are pushed down
// by the optimizer and others are not.
val originalQuery = x.join(y, condition = Some("x.a".attr === 5 && "y.a".attr === 5 &&
"x.a".attr === Rand(10) && "y.b".attr === 5))
val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5),
condition = Some("x.a".attr === Rand(10) && "y.b".attr === 5))
val correctAnswer =
x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5),
condition = Some("x.a".attr === Rand(10)))

// CheckAnalysis will ensure nondeterministic expressions not appear in join condition.
// TODO support nondeterministic expressions in join condition.
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
checkAnalysis = false)
}

test("watermark pushdown: no pushdown on watermark attribute") {
test("watermark pushdown: no pushdown on watermark attribute #1") {
val interval = new CalendarInterval(2, 2000L)

// Verify that all conditions preceding the first watermark touching condition are pushed down
// Verify that all conditions except the watermark touching condition are pushed down
// by the optimizer and others are not.
val originalQuery = EventTimeWatermark('b, interval, testRelation)
.where('a === 5 && 'b === 10 && 'c === 5)
val correctAnswer = EventTimeWatermark(
'b, interval, testRelation.where('a === 5))
.where('b === 10 && 'c === 5)
'b, interval, testRelation.where('a === 5 && 'c === 5))
.where('b === 10)

comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
checkAnalysis = false)
Expand All @@ -1165,7 +1166,7 @@ class FilterPushdownSuite extends PlanTest {
test("watermark pushdown: no pushdown for nondeterministic filter") {
val interval = new CalendarInterval(2, 2000L)

// Verify that all conditions preceding the first watermark touching condition are pushed down
// Verify that all conditions except the watermark touching condition are pushed down
// by the optimizer and others are not.
val originalQuery = EventTimeWatermark('c, interval, testRelation)
.where('a === 5 && 'b === Rand(10) && 'c === 5)
Expand All @@ -1180,7 +1181,7 @@ class FilterPushdownSuite extends PlanTest {
test("watermark pushdown: full pushdown") {
val interval = new CalendarInterval(2, 2000L)

// Verify that all conditions preceding the first watermark touching condition are pushed down
// Verify that all conditions except the watermark touching condition are pushed down
// by the optimizer and others are not.
val originalQuery = EventTimeWatermark('c, interval, testRelation)
.where('a === 5 && 'b === 10)
Expand All @@ -1191,15 +1192,15 @@ class FilterPushdownSuite extends PlanTest {
checkAnalysis = false)
}

test("watermark pushdown: empty pushdown") {
test("watermark pushdown: no pushdown on watermark attribute #2") {
val interval = new CalendarInterval(2, 2000L)

// Verify that all conditions preceding the first watermark touching condition are pushed down
// by the optimizer and others are not.
val originalQuery = EventTimeWatermark('a, interval, testRelation)
.where('a === 5 && 'b === 10)
val correctAnswer = EventTimeWatermark(
'a, interval, testRelation.where('b === 10)).where('a === 5)

comparePlans(Optimize.execute(originalQuery.analyze), originalQuery.analyze,
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
checkAnalysis = false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,8 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
// top-down, then we can simplify the logic here and only collect target operators.
val filterPushed = plan transformUp {
case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, reader)) =>
// Non-deterministic expressions are stateful and we must keep the input sequence unchanged
// to avoid changing the result. This means, we can't evaluate the filter conditions that
// are after the first non-deterministic condition ahead. Here we only try to push down
// deterministic conditions that are before the first non-deterministic condition.
val (candidates, containingNonDeterministic) =
splitConjunctivePredicates(condition).span(_.deterministic)
val (candidates, nonDeterministic) =
splitConjunctivePredicates(condition).partition(_.deterministic)

val stayUpFilters: Seq[Expression] = reader match {
case r: SupportsPushDownCatalystFilters =>
Expand Down Expand Up @@ -74,7 +70,7 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
case _ => candidates
}

val filterCondition = (stayUpFilters ++ containingNonDeterministic).reduceLeftOption(And)
val filterCondition = (stayUpFilters ++ nonDeterministic).reduceLeftOption(And)
val withFilter = filterCondition.map(Filter(_, r)).getOrElse(r)
if (withFilter.output == fields) {
withFilter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,12 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
private def trySplitFilter(plan: SparkPlan): SparkPlan = {
plan match {
case filter: FilterExec =>
val (candidates, containingNonDeterministic) =
splitConjunctivePredicates(filter.condition).span(_.deterministic)
val (candidates, nonDeterministic) =
splitConjunctivePredicates(filter.condition).partition(_.deterministic)
val (pushDown, rest) = candidates.partition(!hasPythonUDF(_))
if (pushDown.nonEmpty) {
val newChild = FilterExec(pushDown.reduceLeft(And), filter.child)
FilterExec((rest ++ containingNonDeterministic).reduceLeft(And), newChild)
FilterExec((rest ++ nonDeterministic).reduceLeft(And), newChild)
} else {
filter
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ object StreamingSymmetricHashJoinHelper extends Logging {
* left AND right AND joined is equivalent to full.
*
* Note that left and right do not necessarily contain *all* conjuncts which satisfy
* their condition. Any conjuncts after the first nondeterministic one are treated as
* nondeterministic for purposes of the split.
* their condition.
*
* @param leftSideOnly Deterministic conjuncts which reference only the left side of the join.
* @param rightSideOnly Deterministic conjuncts which reference only the right side of the join.
Expand Down Expand Up @@ -111,7 +110,7 @@ object StreamingSymmetricHashJoinHelper extends Logging {
// Span rather than partition, because nondeterministic expressions don't commute
// across AND.
val (deterministicConjuncts, nonDeterministicConjuncts) =
splitConjunctivePredicates(condition.get).span(_.deterministic)
splitConjunctivePredicates(condition.get).partition(_.deterministic)

val (leftConjuncts, nonLeftConjuncts) = deterministicConjuncts.partition { cond =>
cond.references.subsetOf(left.outputSet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,17 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
assert(qualifiedPlanNodes.size == 2)
}

test("Python UDF: no push down on predicates starting from the first non-deterministic") {
test("Python UDF: push down on deterministic predicates after the first non-deterministic") {
val df = Seq(("Hello", 4)).toDF("a", "b")
.where("dummyPythonUDF(a) and rand() > 0.3 and b > 4")

val qualifiedPlanNodes = df.queryExecution.executedPlan.collect {
case f @ FilterExec(And(_: And, _: GreaterThan), InputAdapter(_: BatchEvalPythonExec)) => f
case f @ FilterExec(
And(_: AttributeReference, _: GreaterThan),
InputAdapter(_: BatchEvalPythonExec)) => f
case b @ BatchEvalPythonExec(_, _, WholeStageCodegenExec(_: FilterExec)) => b
}
assert(qualifiedPlanNodes.size == 1)
assert(qualifiedPlanNodes.size == 2)
}

test("Python UDF refers to the attributes from more than one child") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
package org.apache.spark.sql.streaming

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec, SparkPlan}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.LocalTableScanExec
import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.JoinConditionSplitPredicates
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -95,19 +93,17 @@ class StreamingSymmetricHashJoinHelperSuite extends StreamTest {
}

test("conjuncts after nondeterministic") {
// All conjuncts after a nondeterministic conjunct shouldn't be split because they don't
// commute across it.
val predicate =
(rand() > lit(0)
(rand(9) > lit(0)
&& leftColA > leftColB
&& rightColC > rightColD
&& leftColA === rightColC
&& lit(1) === lit(1)).expr
val split = JoinConditionSplitPredicates(Some(predicate), left, right)

assert(split.leftSideOnly.isEmpty)
assert(split.rightSideOnly.isEmpty)
assert(split.bothSides.contains(predicate))
assert(split.leftSideOnly.contains((leftColA > leftColB && lit(1) === lit(1)).expr))
assert(split.rightSideOnly.contains((rightColC > rightColD && lit(1) === lit(1)).expr))
assert(split.bothSides.contains((leftColA === rightColC && rand(9) > lit(0)).expr))
assert(split.full.contains(predicate))
}

Expand Down