From 7b314e1d68960eb47921ad269f47b32b6e1b0fe5 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 10 Mar 2016 22:31:22 -0800
Subject: [PATCH 1/4] no null filtering is added for compound expressions.

---
 .../sql/catalyst/optimizer/Optimizer.scala    | 21 ++++++++++++-------
 .../optimizer/NullFilteringSuite.scala        | 15 +++++++++++++
 2 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 650b4eef6e44..d4a235bef9ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -606,11 +606,12 @@ object NullPropagation extends Rule[LogicalPlan] {
 object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case filter @ Filter(condition, child) =>
-      // We generate a list of additional isNotNull filters from the operator's existing constraints
-      // but remove those that are either already part of the filter condition or are part of the
-      // operator's child constraints.
-      val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) --
-        (child.constraints ++ splitConjunctivePredicates(condition))
+      // We generate a list of additional isNotNull filters from the operator's existing
+      // non-compound constraints but remove those that are either already part of the filter
+      // condition or are part of the operator's child constraints.
+      val newIsNotNullConstraints =
+        filter.constraints.filter(isNullFilteringForNonCompoundExpr) --
+        (child.constraints ++ splitConjunctivePredicates(condition))
       if (newIsNotNullConstraints.nonEmpty) {
         Filter(And(newIsNotNullConstraints.reduce(And), condition), child)
       } else {
@@ -619,11 +620,11 @@ object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
 
     case join @ Join(left, right, joinType, condition) =>
       val leftIsNotNullConstraints = join.constraints
-        .filter(_.isInstanceOf[IsNotNull])
+        .filter(isNullFilteringForNonCompoundExpr)
         .filter(_.references.subsetOf(left.outputSet)) -- left.constraints
 
       val rightIsNotNullConstraints = join.constraints
-        .filter(_.isInstanceOf[IsNotNull])
+        .filter(isNullFilteringForNonCompoundExpr)
         .filter(_.references.subsetOf(right.outputSet)) -- right.constraints
       val newLeftChild = if (leftIsNotNullConstraints.nonEmpty) {
         Filter(leftIsNotNullConstraints.reduce(And), left)
@@ -641,6 +642,12 @@ object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
         join
       }
   }
+  private def isNullFilteringForNonCompoundExpr(exp: Expression): Boolean = {
+    exp match {
+      case c: IsNotNull if c.child.isInstanceOf[AttributeReference] => true
+      case _ => false
+    }
+  }
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
index 142e4ae6e439..fc058ccac4d4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
@@ -40,6 +40,12 @@ class NullFilteringSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("filter: do not push Null-filtering of compound expressions") {
+    val originalQuery = testRelation.where('a + 'b === 1).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, originalQuery)
+  }
+
   test("single inner join: filter out nulls on either side on equi-join keys") {
     val x = testRelation.subquery('x)
     val y = testRelation.subquery('y)
@@ -83,6 +89,15 @@ class NullFilteringSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("single inner join: no null filters are generated for compound expression") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val originalQuery = x.join(y,
+      condition = Some("x.a".attr * 2 === "y.a".attr - 4)).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, originalQuery)
+  }
+
   test("single outer join: no null filters are generated") {
     val x = testRelation.subquery('x)
     val y = testRelation.subquery('y)

From 35a69cda40a02a0c046ce335e96010684711bb35 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 11 Mar 2016 14:21:43 -0800
Subject: [PATCH 2/4] removed IsNotNull of compound expressions from
 constraints. added support for the operator Not for all the binary
 comparisons.

---
 .../sql/catalyst/optimizer/Optimizer.scala    | 22 ++++++----------
 .../spark/sql/catalyst/plans/QueryPlan.scala  | 26 +++++++------------
 .../plans/ConstraintPropagationSuite.scala    | 15 +++++++++++
 3 files changed, 33 insertions(+), 30 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6e91d7860859..2d89ad0e63a4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -608,12 +608,12 @@ object NullPropagation extends Rule[LogicalPlan] {
 object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case filter @ Filter(condition, child) =>
-      // We generate a list of additional isNotNull filters from the operator's existing
-      // non-compound constraints but remove those that are either already part of the filter
-      // condition or are part of the operator's child constraints.
-      val newIsNotNullConstraints =
-        filter.constraints.filter(isNullFilteringForNonCompoundExpr) --
-        (child.constraints ++ splitConjunctivePredicates(condition))
+      // We generate a list of additional isNotNull filters from the operator's existing constraints
+      // but remove those that are either already part of the filter condition or are part of the
+      // operator's child constraints.
+      val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) --
+        (child.constraints ++ splitConjunctivePredicates(condition))
+
       if (newIsNotNullConstraints.nonEmpty) {
         Filter(And(newIsNotNullConstraints.reduce(And), condition), child)
       } else {
@@ -622,11 +622,11 @@ object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
 
     case join @ Join(left, right, joinType, condition) =>
       val leftIsNotNullConstraints = join.constraints
-        .filter(isNullFilteringForNonCompoundExpr)
+        .filter(_.isInstanceOf[IsNotNull])
         .filter(_.references.subsetOf(left.outputSet)) -- left.constraints
 
       val rightIsNotNullConstraints = join.constraints
-        .filter(isNullFilteringForNonCompoundExpr)
+        .filter(_.isInstanceOf[IsNotNull])
         .filter(_.references.subsetOf(right.outputSet)) -- right.constraints
       val newLeftChild = if (leftIsNotNullConstraints.nonEmpty) {
         Filter(leftIsNotNullConstraints.reduce(And), left)
@@ -644,12 +644,6 @@ object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
         join
       }
   }
-  private def isNullFilteringForNonCompoundExpr(exp: Expression): Boolean = {
-    exp match {
-      case c: IsNotNull if c.child.isInstanceOf[AttributeReference] => true
-      case _ => false
-    }
-  }
 }
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index c222571a3464..f8921e409360 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -46,22 +46,16 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
   private def constructIsNotNullConstraints(constraints: Set[Expression]): Set[Expression] = {
     // Currently we only propagate constraints if the condition consists of equality
     // and ranges. For all other cases, we return an empty set of constraints
-    constraints.map {
-      case EqualTo(l, r) =>
-        Set(IsNotNull(l), IsNotNull(r))
-      case GreaterThan(l, r) =>
-        Set(IsNotNull(l), IsNotNull(r))
-      case GreaterThanOrEqual(l, r) =>
-        Set(IsNotNull(l), IsNotNull(r))
-      case LessThan(l, r) =>
-        Set(IsNotNull(l), IsNotNull(r))
-      case LessThanOrEqual(l, r) =>
-        Set(IsNotNull(l), IsNotNull(r))
-      case Not(EqualTo(l, r)) =>
-        Set(IsNotNull(l), IsNotNull(r))
-      case _ =>
-        Set.empty[Expression]
-    }.foldLeft(Set.empty[Expression])(_ union _.toSet)
+    var isNotNullConstraints = Set.empty[Expression]
+    constraints.collect {
+      case b @ BinaryComparison(l, r) if !b.isInstanceOf[EqualNullSafe] =>
+        if (l.isInstanceOf[AttributeReference]) isNotNullConstraints += IsNotNull(l)
+        if (r.isInstanceOf[AttributeReference]) isNotNullConstraints += IsNotNull(r)
+      case Not(b @ BinaryComparison(l, r)) if !b.isInstanceOf[EqualNullSafe] =>
+        if (l.isInstanceOf[AttributeReference]) isNotNullConstraints += IsNotNull(l)
+        if (r.isInstanceOf[AttributeReference]) isNotNullConstraints += IsNotNull(r)
+    }
+    isNotNullConstraints
   }
 
   /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index a9375a740daa..1b43be5734e2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -217,4 +217,19 @@ class ConstraintPropagationSuite extends SparkFunSuite {
        IsNotNull(resolveColumn(tr, "a")),
        IsNotNull(resolveColumn(tr, "b")))))
   }
+
+  test("IsNotNull constraints of compound expressions in filters") {
+    val tr = LocalRelation('a.int, 'b.string, 'c.int)
+    verifyConstraints(tr
+      .where('a.attr + 'c.attr > 10).analyze.constraints,
+      ExpressionSet(Seq(resolveColumn(tr, "a") + resolveColumn(tr, "c") > 10)))
+  }
+
+  test("IsNotNull constraints of BinaryComparison in Not in filters") {
+    val tr = LocalRelation('a.int, 'b.string, 'c.int)
+    verifyConstraints(tr
+      .where(!('a.attr < 10)).analyze.constraints,
+      ExpressionSet(Seq(IsNotNull(resolveColumn(tr, "a")),
+        Not(resolveColumn(tr, "a") < 10))))
+  }
 }

From baa2cda39134c68316491d22ee3a0c5bff26f52c Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 11 Mar 2016 14:24:04 -0800
Subject: [PATCH 3/4] remove the space.

---
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2d89ad0e63a4..85776670e5c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -613,7 +613,6 @@ object NullFiltering extends Rule[LogicalPlan] with PredicateHelper {
       // operator's child constraints.
       val newIsNotNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull]) --
         (child.constraints ++ splitConjunctivePredicates(condition))
-
       if (newIsNotNullConstraints.nonEmpty) {
         Filter(And(newIsNotNullConstraints.reduce(And), condition), child)
       } else {

From 55155a76f9e7e01d85819352b3e0852d7b6f6a20 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 11 Mar 2016 14:56:45 -0800
Subject: [PATCH 4/4] added a comment.
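
Almost all subclasses of BinaryComparison (EqualTo, LessThan, LessThanOrEqual,
GreaterThan and GreaterThanOrEqual) are NULL intolerant: they cannot evaluate
to true when either operand is NULL, so an IsNotNull constraint can be inferred
for their attribute operands. EqualNullSafe is the one exception, since
a <=> b is satisfied when both sides are NULL.

A minimal sketch of the intended behaviour, reusing the DSL and helpers already
used in ConstraintPropagationSuite (the snippet and the expectations in its
comments are illustrative only and are not part of this patch):

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.expressions.EqualNullSafe
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    val tr = LocalRelation('a.int, 'b.int)

    // EqualTo is NULL intolerant, so besides the predicate itself we expect
    // IsNotNull(a) and IsNotNull(b) among the propagated constraints.
    tr.where('a.attr === 'b.attr).analyze.constraints

    // EqualNullSafe tolerates NULL operands, so no IsNotNull constraint should
    // be inferred here; only the predicate itself is expected to propagate.
    tr.where(EqualNullSafe('a.attr, 'b.attr)).analyze.constraints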
---
 .../scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index f8921e409360..e7ca11963de8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -46,6 +46,8 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
   private def constructIsNotNullConstraints(constraints: Set[Expression]): Set[Expression] = {
     // Currently we only propagate constraints if the condition consists of equality
     // and ranges. For all other cases, we return an empty set of constraints
+    // Note: Almost all the subclasses of BinaryComparison (EqualTo, LessThan, LessThanOrEqual,
+    // GreaterThan and GreaterThanOrEqual) are NULL intolerant. The only exception is EqualNullSafe
     var isNotNullConstraints = Set.empty[Expression]
     constraints.collect {
       case b @ BinaryComparison(l, r) if !b.isInstanceOf[EqualNullSafe] =>