Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
LimitPushDown,
ColumnPruning,
EliminateOperators,
InferFiltersFromConstraints,
// Operator combine
CollapseRepartition,
CollapseProject,
CombineFilters,
CombineLimits,
CombineUnions,
// Constant folding and strength reduction
NullFiltering,
NullPropagation,
OptimizeIn,
ConstantFolding,
Expand Down Expand Up @@ -607,50 +607,40 @@ object NullPropagation extends Rule[LogicalPlan] {
}

/**
* Attempts to eliminate reading (unnecessary) NULL values if they are not required for correctness
* by inserting isNotNull filters in the query plan. These filters are currently inserted beneath
* existing Filters and Join operators and are inferred based on their data constraints.
* Generates a list of additional filters from an operator's existing constraints, but removes
* those that are either already part of the operator's condition or are part of the operator's
* child constraints. These filters are currently added to the existing conditions of Filter
* operators and of Join operators (for joins, only constraints that reference a single side).
*
* Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
* LeftSemi joins.
*/
object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
  /**
   * Rewrites each Filter and Join in `plan` so that its condition also contains the predicates
   * implied by the operator's constraints that are not yet stated anywhere.
   *
   * @param plan the logical plan to transform
   * @return the transformed plan; an operator is returned as-is when nothing new can be inferred
   */
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case filter @ Filter(condition, child) =>
      // Keep only the constraints that are neither guaranteed by the child nor already one of
      // the conjuncts of the existing condition.
      val newFilters = filter.constraints --
        (child.constraints ++ splitConjunctivePredicates(condition))
      if (newFilters.nonEmpty) {
        Filter(And(newFilters.reduce(And), condition), child)
      } else {
        filter
      }

    case join @ Join(left, right, joinType, conditionOpt) =>
      // Only consider constraints that can be pushed down completely to either the left or the
      // right child
      val constraints = join.constraints.filter { c =>
        c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)
      }
      // Remove those constraints that are already enforced by either the left or the right child
      val additionalConstraints = constraints -- (left.constraints ++ right.constraints)
      val newConditionOpt = conditionOpt match {
        case Some(condition) =>
          // Skip predicates that already appear as conjuncts of the join condition.
          val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
          if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
        case None =>
          // No join condition yet: the inferred predicates (if any) become the condition.
          additionalConstraints.reduceOption(And)
      }
      // A defined result means at least one new predicate was added; otherwise keep the
      // original node.
      if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,33 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._

class NullFilteringSuite extends PlanTest {
class InferFiltersFromConstraintsSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches = Batch("NullFiltering", Once, NullFiltering) ::
val batches = Batch("InferFilters", FixedPoint(5), InferFiltersFromConstraints) ::
Batch("PredicatePushdown", FixedPoint(5), PushPredicateThroughJoin) ::
Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

test("filter: filter out nulls in condition") {
val originalQuery = testRelation.where('a === 1).analyze
val correctAnswer = testRelation.where(IsNotNull('a) && 'a === 1).analyze
test("filter: filter out constraints in condition") {
val originalQuery = testRelation.where('a === 1 && 'a === 'b).analyze
val correctAnswer = testRelation
.where(IsNotNull('a) && IsNotNull('b) && 'a === 'b && 'a === 1 && 'b === 1).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}

test("single inner join: filter out nulls on either side on equi-join keys") {
test("single inner join: filter out values on either side on equi-join keys") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val originalQuery = x.join(y,
condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
.analyze
val left = x.where(IsNotNull('a) && IsNotNull('b))
val right = y.where(IsNotNull('a) && IsNotNull('c))
val correctAnswer = left.join(right,
condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
condition = Some(("x.a".attr === "y.a".attr) && ("x.a".attr === 1) && ("y.c".attr > 5)))
.analyze
val left = x.where(IsNotNull('a) && "x.a".attr === 1)
val right = y.where(IsNotNull('a) && IsNotNull('c) && "y.c".attr > 5 && "y.a".attr === 1)
val correctAnswer = left.join(right, condition = Some("x.a".attr === "y.a".attr)).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
Expand All @@ -61,24 +61,22 @@ class NullFilteringSuite extends PlanTest {
val originalQuery = x.join(y,
condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
.analyze
val left = x.where(IsNotNull('a) && IsNotNull('b))
val right = y.where(IsNotNull('a) && IsNotNull('c))
val correctAnswer = left.join(right,
condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
.analyze
val left = x.where(IsNotNull('a) && IsNotNull('b) && "x.b".attr === 1)
val right = y.where(IsNotNull('a) && IsNotNull('c) && "y.c".attr > 5)
val correctAnswer = left.join(right, condition = Some("x.a".attr =!= "y.a".attr)).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}

test("single inner join with pre-existing filters: filter out nulls on either side") {
test("single inner join with pre-existing filters: filter out values on either side") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val originalQuery = x.where('b > 5).join(y.where('c === 10),
condition = Some("x.a".attr === "y.a".attr)).analyze
val left = x.where(IsNotNull('a) && IsNotNull('b) && 'b > 5)
val right = y.where(IsNotNull('a) && IsNotNull('c) && 'c === 10)
val originalQuery = x.where('b > 5).join(y.where('a === 10),
condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).analyze
val left = x.where(IsNotNull('a) && 'a === 10 && IsNotNull('b) && 'b > 5)
val right = y.where(IsNotNull('a) && IsNotNull('b) && 'a === 10 && 'b > 5)
val correctAnswer = left.join(right,
condition = Some("x.a".attr === "y.a".attr)).analyze
condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
Expand All @@ -92,20 +90,33 @@ class NullFilteringSuite extends PlanTest {
comparePlans(optimized, originalQuery)
}

test("multiple inner joins: filter out nulls on all sides on equi-join keys") {
test("multiple inner joins: filter out values on all sides on equi-join keys") {
val t1 = testRelation.subquery('t1)
val t2 = testRelation.subquery('t2)
val t3 = testRelation.subquery('t3)
val t4 = testRelation.subquery('t4)

val originalQuery = t1
val originalQuery = t1.where('b > 5)
.join(t2, condition = Some("t1.b".attr === "t2.b".attr))
.join(t3, condition = Some("t2.b".attr === "t3.b".attr))
.join(t4, condition = Some("t3.b".attr === "t4.b".attr)).analyze
val correctAnswer = t1.where(IsNotNull('b))
.join(t2.where(IsNotNull('b)), condition = Some("t1.b".attr === "t2.b".attr))
.join(t3.where(IsNotNull('b)), condition = Some("t2.b".attr === "t3.b".attr))
.join(t4.where(IsNotNull('b)), condition = Some("t3.b".attr === "t4.b".attr)).analyze
val correctAnswer = t1.where(IsNotNull('b) && 'b > 5)
.join(t2.where(IsNotNull('b) && 'b > 5), condition = Some("t1.b".attr === "t2.b".attr))
.join(t3.where(IsNotNull('b) && 'b > 5), condition = Some("t2.b".attr === "t3.b".attr))
.join(t4.where(IsNotNull('b) && 'b > 5), condition = Some("t3.b".attr === "t4.b".attr))
.analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}

test("inner join with filter: filter out values on all sides on equi-join keys") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery =
x.join(y, Inner, Some("x.a".attr === "y.a".attr)).where("x.a".attr > 5).analyze
val correctAnswer = x.where(IsNotNull('a) && 'a.attr > 5)
.join(y.where(IsNotNull('a) && 'a.attr > 5), Inner, Some("x.a".attr === "y.a".attr)).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
Expand Down