
Commit cdcccd7

huleileicloud-fan authored and committed
[SPARK-23405] Generate additional constraints for Join's children
## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) I run a sql: `select ls.cs_order_number from ls left semi join catalog_sales cs on ls.cs_order_number = cs.cs_order_number`, The `ls` table is a small table ,and the number is one. The `catalog_sales` table is a big table, and the number is 10 billion. The task will be hang up. And i find the many null values of `cs_order_number` in the `catalog_sales` table. I think the null value should be removed in the logical plan. >== Optimized Logical Plan == >Join LeftSemi, (cs_order_number#1 = cs_order_number#22) >:- Project cs_order_number#1 > : +- Filter isnotnull(cs_order_number#1) > : +- MetastoreRelation 100t, ls >+- Project cs_order_number#22 > +- MetastoreRelation 100t, catalog_sales Now, use this patch, the plan will be: >== Optimized Logical Plan == >Join LeftSemi, (cs_order_number#1 = cs_order_number#22) >:- Project cs_order_number#1 > : +- Filter isnotnull(cs_order_number#1) > : +- MetastoreRelation 100t, ls >+- Project cs_order_number#22 > : **+- Filter isnotnull(cs_order_number#22)** > :+- MetastoreRelation 100t, catalog_sales ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Author: KaiXinXiaoLei <[email protected]> Author: hanghang <[email protected]> Closes #20670 from KaiXinXiaoLei/Spark-23405.
1 parent ff14801 commit cdcccd7
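As a rough illustration of the reported scenario, here is a spark-shell sketch. The synthetic data, the `IF` expression, and the tiny table sizes are made up for illustration (the real tables live in the `100t` metastore database); with this patch, `explain(true)` should show a `Filter isnotnull(cs_order_number...)` above `catalog_sales` as well, not only above `ls`:

```scala
// Hypothetical spark-shell session sketching the reported query on synthetic data.
val ls = spark.range(1).selectExpr("id + 1 AS cs_order_number")
val catalogSales = spark.range(100).selectExpr(
  "IF(id % 2 = 0, CAST(NULL AS BIGINT), id) AS cs_order_number")  // many null join keys
ls.createOrReplaceTempView("ls")
catalogSales.createOrReplaceTempView("catalog_sales")

spark.sql(
  """SELECT ls.cs_order_number
    |FROM ls LEFT SEMI JOIN catalog_sales cs
    |ON ls.cs_order_number = cs.cs_order_number""".stripMargin)
  .explain(true)  // with the patch, both join children should show Filter isnotnull(...)
```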

File tree

3 files changed, +28 -13 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -661,7 +661,7 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
     case join @ Join(left, right, joinType, conditionOpt) =>
       // Only consider constraints that can be pushed down completely to either the left or the
       // right child
-      val constraints = join.constraints.filter { c =>
+      val constraints = join.allConstraints.filter { c =>
         c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)
       }
       // Remove those constraints that are already enforced by either the left or the right child
```
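This one-line change matters because `join.constraints` (defined in `QueryPlanConstraints`, shown in the next file) is pre-filtered to the join operator's own output, and for a left-semi join the right child's attributes are not part of that output, so the inferred `isnotnull` constraint on the right join key never reaches this rule. `allConstraints` skips that output filter, and the rule's own check only requires a constraint to fit entirely into one child's output. A minimal, self-contained sketch of the difference, using toy case classes rather than the real Catalyst types:

```scala
// Toy stand-ins for Catalyst attributes and constraints (not the real classes).
object ConstraintScopeSketch extends App {
  case class Attr(name: String)
  case class Constraint(desc: String, refs: Set[Attr])

  val leftOut  = Set(Attr("x.a"))   // output of the left child
  val rightOut = Set(Attr("y.a"))   // output of the right child
  val joinOut  = leftOut            // a LeftSemi join only outputs the left side

  val all = Set(
    Constraint("isnotnull(x.a)", Set(Attr("x.a"))),
    Constraint("isnotnull(y.a)", Set(Attr("y.a"))))

  // Old behaviour: constraints are pruned to the join's own output first,
  // so the right-side constraint is lost before the rule can push it down.
  val oldVisible = all.filter(_.refs.subsetOf(joinOut))

  // New behaviour: start from allConstraints and only require that a constraint
  // fits entirely into one child's output, matching the filter in the rule above.
  val pushable = all.filter(c => c.refs.subsetOf(leftOut) || c.refs.subsetOf(rightOut))

  println(oldVisible.map(_.desc))  // Set(isnotnull(x.a))
  println(pushable.map(_.desc))    // Set(isnotnull(x.a), isnotnull(y.a))
}
```

Running the sketch shows the old view keeps only `isnotnull(x.a)`, while the new view keeps both, which is why the right side of the join now gets its own `Filter isnotnull(...)`.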

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala

Lines changed: 15 additions & 12 deletions
```diff
@@ -23,25 +23,28 @@ import org.apache.spark.sql.catalyst.expressions._
 trait QueryPlanConstraints { self: LogicalPlan =>
 
   /**
-   * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For
-   * example, if this set contains the expression `a = 2` then that expression is guaranteed to
-   * evaluate to `true` for all rows produced.
+   * An [[ExpressionSet]] that contains an additional set of constraints, such as equality
+   * constraints and `isNotNull` constraints, etc.
    */
-  lazy val constraints: ExpressionSet = {
+  lazy val allConstraints: ExpressionSet = {
     if (conf.constraintPropagationEnabled) {
-      ExpressionSet(
-        validConstraints
-          .union(inferAdditionalConstraints(validConstraints))
-          .union(constructIsNotNullConstraints(validConstraints))
-          .filter { c =>
-            c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
-          }
-      )
+      ExpressionSet(validConstraints
+        .union(inferAdditionalConstraints(validConstraints))
+        .union(constructIsNotNullConstraints(validConstraints)))
     } else {
       ExpressionSet(Set.empty)
     }
   }
 
+  /**
+   * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For
+   * example, if this set contains the expression `a = 2` then that expression is guaranteed to
+   * evaluate to `true` for all rows produced.
+   */
+  lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter { c =>
+    c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
+  })
+
   /**
    * This method can be overridden by any child class of QueryPlan to specify a set of constraints
    * based on the given operator's constraint propagation logic. These constraints are then
```
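For context on where those `isnotnull` constraints come from in the first place: `constructIsNotNullConstraints` (untouched by this patch) derives them from null-intolerant expressions such as the equality in the join condition. A rough stand-alone sketch of that idea, with toy classes instead of the real Catalyst expressions:

```scala
// Simplified illustration: equality is null-intolerant, so if `l = r` must hold,
// every attribute it references must be non-null.
object IsNotNullSketch extends App {
  sealed trait Expr { def references: Set[String] }
  case class Attr(name: String) extends Expr { def references: Set[String] = Set(name) }
  case class EqualTo(l: Expr, r: Expr) extends Expr {
    def references: Set[String] = l.references ++ r.references
  }
  case class IsNotNull(name: String) extends Expr { def references: Set[String] = Set(name) }

  // Derive an IsNotNull constraint for every attribute referenced by an equality.
  def constructIsNotNullConstraints(constraints: Set[Expr]): Set[Expr] =
    constraints.flatMap {
      case e: EqualTo => e.references.map(n => IsNotNull(n): Expr)
      case _          => Set.empty[Expr]
    }

  val joinCondition: Set[Expr] = Set(EqualTo(Attr("x.a"), Attr("y.a")))
  println(constructIsNotNullConstraints(joinCondition))
  // Set(IsNotNull(x.a), IsNotNull(y.a))
}
```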

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala

Lines changed: 12 additions & 0 deletions
```diff
@@ -192,4 +192,16 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
 
     comparePlans(Optimize.execute(original.analyze), correct.analyze)
   }
+
+  test("SPARK-23405: left-semi equal-join should filter out null join keys on both sides") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val condition = Some("x.a".attr === "y.a".attr)
+    val originalQuery = x.join(y, LeftSemi, condition).analyze
+    val left = x.where(IsNotNull('a))
+    val right = y.where(IsNotNull('a))
+    val correctAnswer = left.join(right, LeftSemi, condition).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
 }
```
