Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
NullFiltering rule in catalyst
  • Loading branch information
sameeragarwal committed Mar 2, 2016
commit cc4323f36b3a38f193f049e8949c034adbf69c52
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
CombineLimits,
CombineUnions,
// Constant folding and strength reduction
NullFiltering,
NullPropagation,
OptimizeIn,
ConstantFolding,
Expand Down Expand Up @@ -585,6 +586,52 @@ object NullPropagation extends Rule[LogicalPlan] {
}
}

/**
* Attempts to eliminate reading (unnecessary) NULL values if they are not required for correctness
* by inserting isNotNull filters in the query plan. These filters are currently inserted beneath
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"in the query plan"

* existing Filters and Join operators and are inferred based on their data constraints.
*
* Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
* LeftSemi joins.
*/
object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
// Rewrites every Filter and Join node that `transform` matches; other nodes are left untouched.
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter(condition, child: LogicalPlan) =>
// We generate a list of additional isNotNull filters from the operator's existing constraints
// but remove those that are either already part of the filter condition or are part of the
// operator's child constraints.
val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) --
(child.constraints ++ splitConjunctivePredicates(condition))
// When there are new constraints, AND them together and place them in front of the
// original condition; otherwise the original condition is kept as is.
val newCondition = if (newIsNotNullConstraints.nonEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove newConditino and just return filter if this doesn't do anything so we can reuse that filter subplan

And(newIsNotNullConstraints.reduce(And), condition)
} else {
condition
}
// Note: a fresh Filter node is constructed even when no constraint was added.
Filter(newCondition, child)

case join @ Join(left: LogicalPlan, right: LogicalPlan, joinType: JoinType,
condition: Option[Expression]) =>
// IsNotNull constraints of the join that reference only the left child's output and
// are not already among the left child's own constraints.
val leftIsNotNullConstraints = join.constraints
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indenting

.filter(_.isInstanceOf[IsNotNull])
.filter(_.references.subsetOf(left.outputSet)) -- left.constraints
// Same selection for the right child: IsNotNull constraints over the right output only,
// minus those the right child already has.
val rightIsNotNullConstraints =
join.constraints
.filter(_.isInstanceOf[IsNotNull])
.filter(_.references.subsetOf(right.outputSet)) -- right.constraints
// A child is wrapped in a Filter only when at least one new constraint applies to it;
// the nonEmpty guard also keeps `reduce` from being called on an empty collection.
val newLeftChild = if (leftIsNotNullConstraints.nonEmpty) {
Filter(leftIsNotNullConstraints.reduce(And), left)
} else {
left
}
val newRightChild = if (rightIsNotNullConstraints.nonEmpty) {
Filter(rightIsNotNullConstraints.reduce(And), right)
} else {
right
}
// Join type and join condition are passed through unchanged. Note: a fresh Join node is
// constructed even when neither child was wrapped.
Join(newLeftChild, newRightChild, joinType, condition)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, would be nice to reuse join if it is not changed

}
}

/**
* Replaces [[Expression Expressions]] that can be statically evaluated with
* equivalent [[Literal]] values.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,9 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
testRelation.where('a > 2 || ('b > 3 && 'b < 5)))
comparePlans(actual, expected)
}

test("evaluate isNotNull expressions before others") {
checkCondition(input = 'a > 0 && IsNotNull('b) && 'c < 3,
expected = IsNotNull('b) && 'a > 0 && 'c < 3)
}
}
Loading