Add InferFiltersFromConstraints rule
sameeragarwal committed Mar 16, 2016
commit 382903b56d24213345bd0567134b01ae189dbb38
@@ -79,7 +79,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
CombineLimits,
CombineUnions,
// Constant folding and strength reduction
NullFiltering,
InferFiltersFromConstraints,
Contributor: This rule doesn't seem related to the comment above it (same for NullFiltering).

NullPropagation,
OptimizeIn,
ConstantFolding,
@@ -607,50 +607,44 @@ object NullPropagation extends Rule[LogicalPlan] {
}

/**
* Attempts to eliminate reading (unnecessary) NULL values if they are not required for correctness
* by inserting isNotNull filters in the query plan. These filters are currently inserted beneath
* existing Filters and Join operators and are inferred based on their data constraints.
 * Eliminates reading unnecessary values when they are not required for correctness (and can help
 * optimize the query) by inserting relevant filters into the query plan based on an operator's
 * data constraints. These filters are currently added to the existing conditions of Filter
 * operators and to either side of Join operators.
*
* Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
* LeftSemi joins.
*/
object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
// We generate a list of additional filters from the operator's existing constraints but remove
Contributor: This comment seems like a clearer version of the first sentence in the object comment. I'd remove it here and replace the first sentence of the comment above.

// those that are either already part of the operator's condition or are part of the operator's
// child constraints.
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter(condition, child) =>
// We generate a list of additional isNotNull filters from the operator's existing constraints
// but remove those that are either already part of the filter condition or are part of the
// operator's child constraints.
val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) --
// For the Filter operator, we try to generate additional filters by only inferring the
// IsNotNull constraints. These IsNotNull filters are then used while generating the
// physical plan to quickly short circuit the null checks in the generated code.
val newFilters = filter.constraints.filter(_.isInstanceOf[IsNotNull]) --
Member: We need to infer the Filter conditions too.

Member (Author): This was actually intentional, as I couldn't think of an optimization that would benefit from inferring other filter conditions. Do you have something in mind?

Member: The rules PushPredicateThroughJoin and OuterJoinElimination could utilize the inferred conditions in Filter.

Member (Author): Can you please give an example where non-IsNotNull filter conditions might be useful? Thanks!

Member: For example, given case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition)): when the join type is a left or right outer join, the common filter conditions cannot be pushed down into the join condition. In these cases, we can infer left/right filter conditions from the common filter conditions and push more conditions down into the Join.
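To make the reviewer's example concrete, here is a self-contained, hypothetical sketch (plain Scala, not Catalyst; the Pred type and the attribute names are invented for illustration). It models the check that decides whether a filter condition sitting above a join references only one child's output and can therefore be pushed to that side:

```scala
// Hypothetical model: a predicate plus the set of attributes it references.
// Catalyst uses Expression trees and AttributeSets; strings stand in here.
case class Pred(references: Set[String], text: String)

object OuterJoinPushdownSketch {
  // A predicate can be evaluated by a child iff it references only that
  // child's output attributes (mirrors references.subsetOf(outputSet)).
  def pushableTo(childOutput: Set[String], filters: Seq[Pred]): Seq[Pred] =
    filters.filter(_.references.subsetOf(childOutput))

  def main(args: Array[String]): Unit = {
    val leftOutput  = Set("l.a", "l.b")
    val rightOutput = Set("r.a", "r.c")
    // Filter conditions sitting above a left outer join:
    val filters = Seq(Pred(Set("l.b"), "l.b > 5"), Pred(Set("r.c"), "r.c = 10"))
    // Each condition references exactly one side, so each can be evaluated
    // by that child rather than above the join.
    println(pushableTo(leftOutput, filters).map(_.text))
    println(pushableTo(rightOutput, filters).map(_.text))
  }
}
```

This is only the reference-subset test; the actual rule must additionally respect outer-join null-extension semantics before moving a condition.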

Member (Author): By the way, slightly unrelated to this discussion: Yin, Davies, and I had an offline discussion earlier today, and we are considering generating the IsNotNull filters in the physical plan to prevent some TPC-DS regressions due to filter pushdown.

Member: Thank you for letting me know. Will you do it, or do you want me to? Thanks!

Member (Author): Yes, please see the last commit 4b0cf5f581901919a5513554b2982ef327a4d87a.

Member (Author): I'll have the physical plan changes in a separate PR (need to look into TPC-DS q56 and q59).

Member: Great! Looking forward to your PR. Thanks!

(child.constraints ++ splitConjunctivePredicates(condition))
if (newIsNotNullConstraints.nonEmpty) {
Filter(And(newIsNotNullConstraints.reduce(And), condition), child)
if (newFilters.nonEmpty) {
Filter(And(newFilters.reduce(And), condition), child)
} else {
filter
}

case join @ Join(left, right, joinType, condition) =>
val leftIsNotNullConstraints = join.constraints
.filter(_.isInstanceOf[IsNotNull])
.filter(_.references.subsetOf(left.outputSet)) -- left.constraints
val rightIsNotNullConstraints =
join.constraints
.filter(_.isInstanceOf[IsNotNull])
.filter(_.references.subsetOf(right.outputSet)) -- right.constraints
val newLeftChild = if (leftIsNotNullConstraints.nonEmpty) {
Filter(leftIsNotNullConstraints.reduce(And), left)
} else {
left
}
val newRightChild = if (rightIsNotNullConstraints.nonEmpty) {
Filter(rightIsNotNullConstraints.reduce(And), right)
} else {
right
}
if (newLeftChild != left || newRightChild != right) {
Join(newLeftChild, newRightChild, joinType, condition)
} else {
join
case join@Join(left, right, joinType, conditionOpt) =>
Member: Nit: join @ Join

val additionalConstraints = join.constraints.filter { c =>
// Only consider constraints that can be pushed down to either the left or the right child
c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)} --
(left.constraints ++ right.constraints)
val newConditionOpt = conditionOpt match {
case Some(condition) =>
val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
Member: Nit: Option -> Some

case None =>
if (additionalConstraints.nonEmpty) Option(additionalConstraints.reduce(And)) else None
Member: Nit: Option -> Some

Member (Author): We prefer Option(x) over Some(x), as the former handles null values better (see https://github.com/databricks/scala-style-guide#options). Here is how Scala implements Option(x):

  /** An Option factory which creates Some(x) if the argument is not null,
   *  and None if it is null.
   *
   *  @param  x the value
   *  @return   Some(value) if value != null, None if value == null
   */
  def apply[A](x: A): Option[A] = if (x == null) None else Some(x)

Member: Ah, I see. In the codebase, though, most of the code uses Some.
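The difference under discussion is easy to demonstrate. This small, standalone snippet (illustrative only; the object name is invented) shows why Option(x) is the null-safe constructor:

```scala
object OptionVsSomeDemo {
  def main(args: Array[String]): Unit = {
    val s: String = null
    // Option.apply checks for null and collapses it to None...
    assert(Option(s) == None)
    // ...whereas Some wraps the null as-is, deferring any NPE to callers.
    assert(Some(s) != None)
    assert(Some(s).get == null)
    println("Option(null) is None; Some(null) is a Some wrapping null")
  }
}
```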

}
if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
}
}
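As a hedged illustration of the set arithmetic the Filter branch performs (new filters = IsNotNull constraints minus the child's constraints and the condition's existing conjuncts), here is a standalone sketch with plain strings standing in for Catalyst expressions; all names are invented for this example:

```scala
object InferFiltersSketch {
  def main(args: Array[String]): Unit = {
    // Constraints the optimizer would derive for Filter('a === 1 && 'a === 'b):
    val constraints = Set("IsNotNull(a)", "IsNotNull(b)", "a = 1", "a = b")
    // Conjuncts already present in the filter condition:
    val conditionConjuncts = Set("a = 1", "a = b")
    // Constraints already guaranteed by the child plan:
    val childConstraints = Set.empty[String]
    // Mirrors: filter.constraints.filter(_.isInstanceOf[IsNotNull]) --
    //          (child.constraints ++ splitConjunctivePredicates(condition))
    val newFilters = constraints.filter(_.startsWith("IsNotNull")) --
      (childConstraints ++ conditionConjuncts)
    println(newFilters.toList.sorted) // only the genuinely new IsNotNull filters remain
  }
}
```

If this set is non-empty, the rule prepends it (reduced with And) to the existing condition; otherwise the Filter is left untouched.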

@@ -24,33 +24,33 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._

class NullFilteringSuite extends PlanTest {
class InferFiltersFromConstraintsSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches = Batch("NullFiltering", Once, NullFiltering) ::
val batches = Batch("InferFilters", FixedPoint(5), InferFiltersFromConstraints) ::
Batch("PredicatePushdown", FixedPoint(5), PushPredicateThroughJoin) ::
Batch("CombineFilters", FixedPoint(5), CombineFilters) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

test("filter: filter out nulls in condition") {
val originalQuery = testRelation.where('a === 1).analyze
val correctAnswer = testRelation.where(IsNotNull('a) && 'a === 1).analyze
val originalQuery = testRelation.where('a === 1 && 'a === 'b).analyze
val correctAnswer =
testRelation.where(IsNotNull('a) && IsNotNull('b) && 'a === 'b && 'a === 1).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}

test("single inner join: filter out nulls on either side on equi-join keys") {
test("single inner join: filter out values on either side on equi-join keys") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val originalQuery = x.join(y,
condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
.analyze
val left = x.where(IsNotNull('a) && IsNotNull('b))
val right = y.where(IsNotNull('a) && IsNotNull('c))
val correctAnswer = left.join(right,
condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
condition = Some(("x.a".attr === "y.a".attr) && ("x.a".attr === 1) && ("y.c".attr > 5)))
.analyze
val left = x.where(IsNotNull('a) && "x.a".attr === 1)
val right = y.where(IsNotNull('a) && IsNotNull('c) && "y.c".attr > 5 && "y.a".attr === 1)
val correctAnswer = left.join(right, condition = Some("x.a".attr === "y.a".attr)).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
@@ -61,24 +61,22 @@ class NullFilteringSuite extends PlanTest {
val originalQuery = x.join(y,
condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
.analyze
val left = x.where(IsNotNull('a) && IsNotNull('b))
val right = y.where(IsNotNull('a) && IsNotNull('c))
val correctAnswer = left.join(right,
condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
.analyze
val left = x.where(IsNotNull('a) && IsNotNull('b) && "x.b".attr === 1)
val right = y.where(IsNotNull('a) && IsNotNull('c) && "y.c".attr > 5)
val correctAnswer = left.join(right, condition = Some("x.a".attr =!= "y.a".attr)).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}

test("single inner join with pre-existing filters: filter out nulls on either side") {
test("single inner join with pre-existing filters: filter out values on either side") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val originalQuery = x.where('b > 5).join(y.where('c === 10),
condition = Some("x.a".attr === "y.a".attr)).analyze
val left = x.where(IsNotNull('a) && IsNotNull('b) && 'b > 5)
val right = y.where(IsNotNull('a) && IsNotNull('c) && 'c === 10)
val originalQuery = x.where('b > 5).join(y.where('a === 10),
condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).analyze
val left = x.where(IsNotNull('a) && 'a === 10 && IsNotNull('b) && 'b > 5)
val right = y.where(IsNotNull('a) && IsNotNull('b) && 'a === 10 && 'b > 5)
val correctAnswer = left.join(right,
condition = Some("x.a".attr === "y.a".attr)).analyze
condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}
@@ -92,20 +90,21 @@ class NullFilteringSuite extends PlanTest {
comparePlans(optimized, originalQuery)
}

test("multiple inner joins: filter out nulls on all sides on equi-join keys") {
test("multiple inner joins: filter out values on all sides on equi-join keys") {
val t1 = testRelation.subquery('t1)
val t2 = testRelation.subquery('t2)
val t3 = testRelation.subquery('t3)
val t4 = testRelation.subquery('t4)

val originalQuery = t1
val originalQuery = t1.where('b > 5)
.join(t2, condition = Some("t1.b".attr === "t2.b".attr))
.join(t3, condition = Some("t2.b".attr === "t3.b".attr))
.join(t4, condition = Some("t3.b".attr === "t4.b".attr)).analyze
val correctAnswer = t1.where(IsNotNull('b))
.join(t2.where(IsNotNull('b)), condition = Some("t1.b".attr === "t2.b".attr))
.join(t3.where(IsNotNull('b)), condition = Some("t2.b".attr === "t3.b".attr))
.join(t4.where(IsNotNull('b)), condition = Some("t3.b".attr === "t4.b".attr)).analyze
val correctAnswer = t1.where(IsNotNull('b) && 'b > 5)
.join(t2.where(IsNotNull('b) && 'b > 5), condition = Some("t1.b".attr === "t2.b".attr))
.join(t3.where(IsNotNull('b) && 'b > 5), condition = Some("t2.b".attr === "t3.b".attr))
.join(t4.where(IsNotNull('b) && 'b > 5), condition = Some("t3.b".attr === "t4.b".attr))
.analyze
val optimized = Optimize.execute(originalQuery)
comparePlans(optimized, correctAnswer)
}