Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
01e4cdf
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 13, 2015
6835704
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
9180687
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
b38a21e
SPARK-11633
gatorsmile Nov 17, 2015
d2b84af
Merge remote-tracking branch 'upstream/master' into joinMakeCopy
gatorsmile Nov 17, 2015
fda8025
Merge remote-tracking branch 'upstream/master'
gatorspark Nov 17, 2015
ac0dccd
Merge branch 'master' of https://github.com/gatorsmile/spark
gatorspark Nov 17, 2015
6e0018b
Merge remote-tracking branch 'upstream/master'
Nov 20, 2015
0546772
converge
gatorsmile Nov 20, 2015
b37a64f
converge
gatorsmile Nov 20, 2015
bde74f8
Merge remote-tracking branch 'upstream/master' into OuterJoinEliminat…
gatorsmile Jan 3, 2016
e18ba75
Merge remote-tracking branch 'upstream/master' into OuterJoinEliminat…
gatorsmile Jan 4, 2016
d6a6e9c
outer join elimination by parent join.
gatorsmile Jan 4, 2016
c2a872c
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
ab6dbd7
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
4276356
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
2dab708
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 7, 2016
0458770
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 8, 2016
1debdfa
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 9, 2016
763706d
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 14, 2016
4de6ec1
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 18, 2016
9422a4f
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 19, 2016
52bdf48
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 20, 2016
1e95df3
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 23, 2016
fab24cf
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 1, 2016
8b2e33b
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 5, 2016
2ee1876
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 11, 2016
b9f0090
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 12, 2016
ade6f7e
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 15, 2016
9fd63d2
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 19, 2016
5199d49
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 22, 2016
404214c
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 23, 2016
c001dd9
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 25, 2016
4c657fd
integrate it into the existing outer-join elimination.
gatorsmile Feb 25, 2016
1a9ebdf
integrate it into the existing outer-join elimination.
gatorsmile Feb 25, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
outer join elimination by parent join.
  • Loading branch information
gatorsmile committed Jan 4, 2016
commit d6a6e9cc31b0f7547b35cf25884135ea65b03676
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueri
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types._
Expand All @@ -44,6 +44,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
// Operator push down
SetOperationPushDown,
SamplePushDown,
OuterJoinElimination,
ReorderJoin,
PushPredicateThroughJoin,
PushPredicateThroughProject,
Expand Down Expand Up @@ -768,6 +769,107 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
}
}

/**
* Elimination of Outer Join by Parent Join Condition
*
* Given an outer join that is a child of another join (called the parent join), when the join type
* of the parent join is inner, left-semi, left-outer or right-outer, check whether the join
* condition of the parent join satisfies the following two conditions:
*
* 1) there exist null filtering predicates against the columns in the null-supplying side of
* parent join.
* 2) these columns are from the child join.
*
* If such join predicates exist, apply the following elimination rules:
* - full outer -> inner if both sides of the child join have such predicates
* - left outer -> inner if the right side of the child join has such predicates
* - right outer -> inner if the left side of the child join has such predicates
* - full outer -> left outer if only the left side of the child join has such predicates
* - full outer -> right outer if only the right side of the child join has such predicates
*
*/
object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper {

/** Returns true when some attribute in the output set of `plan` is semantically equal to `attr`. */
private def containsAttr(plan: LogicalPlan, attr: Attribute): Boolean = {
  plan.outputSet.exists { candidate => candidate.semanticEquals(attr) }
}

private def hasNullFilteringPredicate(predicate: Expression, plan: LogicalPlan): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I commented on the other PR, I think we should have a more general way to infer null propagation / filtering. Maybe you can discuss with @sameeragarwal and then update these PRs after his machinery is available.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do. Thank you!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile similar to #10566, I think we should now be just able to apply this optimization rule more generally along the lines of:

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
      Filter(condition, buildNewJoin(f, j))

    // Case 1: when parent join is Inner|LeftSemi|LeftOuter and the child join is on the right side
    case pj @ Join(pLeft, j @ Join(left, right, RightOuter|LeftOuter|FullOuter, condition), Inner|LeftSemi|LeftOuter, Some(pJoinCond)) =>
      Join(pLeft, buildNewJoin(pj, j), pj.joinType, Some(pJoinCond))

    // Case 2: when parent join is Inner|LeftSemi|RightOuter and the child join is on the left side
    case pj @ Join(j @ Join(left, right, RightOuter|LeftOuter|FullOuter, condition), pRight, Inner|LeftSemi|RightOuter, Some(pJoinCond)) =>
      Join(buildNewJoin(pj, j), pRight, pj.joinType, Some(pJoinCond))
  }

Thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do the changes. Thank you!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sameeragarwal Unfortunately, they are unable to share the same buildNewJoin function.

For example, if the parent join is full outer, the parent join will not have any IsNotNull constraint. In the current constraint propagation, its constraint set is Set.empty[Expression]. However, the join condition of this parent join can still be used for outer join elimination of the child join.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me do the outer join elimination by Filter at first. That one can directly use the existing infrastructure of constraint propagation. #10567 Thanks!

// Decides whether `predicate` rejects (evaluates to null or false for) rows in which an
// attribute produced by `plan` is null.
//  - `=`, `<`, `<=`, `>`, `>=` and IN yield null when the referenced attribute is null.
//  - `<=>` never yields null, so it only qualifies when the other operand is not nullable.
//  - AND rejects the row if either conjunct does; OR only if both disjuncts do.
//  - NOT keeps a null result null, so it preserves the property for the null-propagating
//    operators above. It does NOT preserve it for predicates that turn null into false
//    (`IsNotNull`, `<=>`), and the negation of a predicate that is not null filtering tells us
//    nothing. Any other negation is therefore conservatively treated as not null filtering.
predicate match {
  case EqualTo(ar: AttributeReference, _) if containsAttr(plan, ar) => true
  case EqualTo(_, ar: AttributeReference) if containsAttr(plan, ar) => true
  case EqualNullSafe(ar: AttributeReference, l)
    if !l.nullable && containsAttr(plan, ar) => true
  case EqualNullSafe(l, ar: AttributeReference)
    if !l.nullable && containsAttr(plan, ar) => true
  case GreaterThan(ar: AttributeReference, _) if containsAttr(plan, ar) => true
  case GreaterThan(_, ar: AttributeReference) if containsAttr(plan, ar) => true
  case GreaterThanOrEqual(ar: AttributeReference, _) if containsAttr(plan, ar) => true
  case GreaterThanOrEqual(_, ar: AttributeReference) if containsAttr(plan, ar) => true
  case LessThan(ar: AttributeReference, _) if containsAttr(plan, ar) => true
  case LessThan(_, ar: AttributeReference) if containsAttr(plan, ar) => true
  case LessThanOrEqual(ar: AttributeReference, _) if containsAttr(plan, ar) => true
  case LessThanOrEqual(_, ar: AttributeReference) if containsAttr(plan, ar) => true
  case In(ar: AttributeReference, _) if containsAttr(plan, ar) => true
  case IsNotNull(ar: AttributeReference) if containsAttr(plan, ar) => true
  case And(l, r) => hasNullFilteringPredicate(l, plan) || hasNullFilteringPredicate(r, plan)
  case Or(l, r) => hasNullFilteringPredicate(l, plan) && hasNullFilteringPredicate(r, plan)
  // NOT (x IS NULL) is x IS NOT NULL.
  case Not(IsNull(ar: AttributeReference)) if containsAttr(plan, ar) => true
  // Negating a null-propagating comparison still yields null for a null attribute.
  case Not(e @ (_: EqualTo | _: GreaterThan | _: GreaterThanOrEqual |
      _: LessThan | _: LessThanOrEqual | _: In)) =>
    hasNullFilteringPredicate(e, plan)
  // Every other negation (and every other expression) is not known to filter nulls.
  case _ => false
}
}

/**
 * Rebuilds the child join `left <joinType> right`, weakening its outer join type when
 * `otherCondition` (the parent join's condition) has a null filtering predicate on the left
 * and/or right side of the child join.
 *
 * @param otherCondition condition of the parent join that is inspected for such predicates
 * @param left left child of the join being rebuilt
 * @param right right child of the join being rebuilt
 * @param joinType current join type of the join being rebuilt
 * @param condition join condition of the join being rebuilt; carried over unchanged
 * @return a [[Join]] over the same children and condition with the possibly simplified type
 */
private def buildNewJoin(
    otherCondition: Expression,
    left: LogicalPlan,
    right: LogicalPlan,
    joinType: JoinType,
    condition: Option[Expression]): Join = {
  val filtersNullsOnLeft = hasNullFilteringPredicate(otherCondition, left)
  val filtersNullsOnRight = hasNullFilteringPredicate(otherCondition, right)

  val simplifiedType: JoinType = (joinType, filtersNullsOnLeft, filtersNullsOnRight) match {
    case (RightOuter, true, _) => Inner
    case (LeftOuter, _, true) => Inner
    case (FullOuter, true, true) => Inner
    case (FullOuter, true, false) => LeftOuter
    case (FullOuter, false, true) => RightOuter
    case _ => joinType
  }

  Join(left, right, simplifiedType, condition)
}

/**
 * Rewrites every parent join that has a join condition and at least one outer-join child on a
 * side for which the parent join type allows the elimination, replacing that child with the
 * result of `buildNewJoin`. The parent join itself keeps its type and condition.
 */
def apply(plan: LogicalPlan): LogicalPlan = plan transform {

  // Case 0: both children are outer joins and the parent is Inner|LeftSemi, the only parent
  // types that permit the elimination on either side. Without this case, Case 1 would match
  // on every pass whenever the right child stays an outer join, and the left child would
  // never be simplified.
  case pj @ Join(
      lj @ Join(lLeft, lRight, RightOuter | LeftOuter | FullOuter, lCondition),
      rj @ Join(rLeft, rRight, RightOuter | LeftOuter | FullOuter, rCondition),
      Inner | LeftSemi,
      Some(pJoinCond)) =>
    Join(
      buildNewJoin(pJoinCond, lLeft, lRight, lj.joinType, lCondition),
      buildNewJoin(pJoinCond, rLeft, rRight, rj.joinType, rCondition),
      pj.joinType,
      Some(pJoinCond))

  // Case 1: when parent join is Inner|LeftSemi|LeftOuter and the child join is on the right side
  case pj @ Join(
      pLeft,
      j @ Join(left, right, RightOuter | LeftOuter | FullOuter, condition),
      Inner | LeftSemi | LeftOuter,
      Some(pJoinCond)) =>
    Join(
      pLeft,
      buildNewJoin(pJoinCond, left, right, j.joinType, condition),
      pj.joinType,
      Some(pJoinCond))

  // Case 2: when parent join is Inner|LeftSemi|RightOuter and the child join is on the left side
  case pj @ Join(
      j @ Join(left, right, RightOuter | LeftOuter | FullOuter, condition),
      pRight,
      Inner | LeftSemi | RightOuter,
      Some(pJoinCond)) =>
    Join(
      buildNewJoin(pJoinCond, left, right, j.joinType, condition),
      pRight,
      pj.joinType,
      Some(pJoinCond))
}
}

/**
* Pushes down [[Filter]] operators where the `condition` can be
* evaluated using only the attributes of the left or right side of a join. Other
Expand Down
155 changes: 155 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Project, Join}
import org.apache.spark.sql.execution.joins.BroadcastHashJoin
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
Expand Down Expand Up @@ -140,4 +142,157 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
assert(df1.join(broadcast(pf1)).count() === 4)
}
}

// A left outer child join (a LEFT JOIN b) sits on the right side of an inner parent join whose
// condition references b.int, a column from the child's right side. The optimizer is expected
// to turn the child join into an inner join.
test("join - left outer to inner by the parent join's join condition") {
  val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
  val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
  val df3 = Seq((1, 3, "1"), (3, 6, "5")).toDF("int", "int2", "str").as("c")

  // Left -> Inner
  val right = df.join(df2, $"a.int" === $"b.int", "left")
  val left2Inner =
    df3.join(right, $"c.int" === $"b.int", "inner").select($"a.*", $"b.*", $"c.*")

  // The order before conversion: Left Then Inner
  assert(left2Inner.queryExecution.analyzed.collect {
    case j @ Join(_, Join(_, _, LeftOuter, _), Inner, _) => j
  }.size === 1)

  // The order after conversion: Inner Then Inner
  assert(left2Inner.queryExecution.optimizedPlan.collect {
    case j @ Join(_, Join(_, _, Inner, _), Inner, _) => j
  }.size === 1)

  checkAnswer(
    left2Inner,
    Row(1, 2, "1", 1, 3, "1", 1, 3, "1") :: Nil)
}

// A right outer child join (a RIGHT JOIN b) sits on the left side of an inner parent join whose
// condition references a.int, a column from the child's left side. The optimizer is expected
// to turn the child join into an inner join. The parent condition does not mention c, so each
// surviving child row is paired with both rows of c.
test("join - right outer to inner by the parent join's join condition") {
  val tableA = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
  val tableB = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
  val tableC = Seq((1, 9, "8"), (5, 0, "4")).toDF("int", "int2", "str").as("c")

  // Right -> Inner
  val rightJoined = tableA.join(tableB, $"a.int" === $"b.int", "right")
  val right2Inner = rightJoined
    .join(tableC, $"a.int" === $"b.int", "inner")
    .select($"a.*", $"b.*", $"c.*")

  // The order before conversion: Right Then Inner
  val matchesBefore = right2Inner.queryExecution.analyzed.collect {
    case parent @ Join(Join(_, _, RightOuter, _), _, Inner, _) => parent
  }
  assert(matchesBefore.size === 1)

  // The order after conversion: Inner Then Inner
  val matchesAfter = right2Inner.queryExecution.optimizedPlan.collect {
    case parent @ Join(Join(_, _, Inner, _), _, Inner, _) => parent
  }
  assert(matchesAfter.size === 1)

  checkAnswer(
    right2Inner,
    Seq(
      Row(1, 2, "1", 1, 3, "1", 1, 9, "8"),
      Row(1, 2, "1", 1, 3, "1", 5, 0, "4")))
}

// A full outer child join (a FULL JOIN b) sits on the right side of an inner parent join whose
// condition references columns from both sides of the child (a.int and b.int). The optimizer
// is expected to turn the child join into an inner join.
test("join - full outer to inner by the parent join's join condition") {
  val tableA = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
  val tableB = Seq((1, 2, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
  val tableC = Seq((1, 3, "1"), (3, 6, "5")).toDF("int", "int2", "str").as("c")

  // Full -> Inner
  val fullJoined = tableA.join(tableB, $"a.int" === $"b.int", "full")
  val parentCondition = $"c.int" === $"a.int" && $"b.int" === 1
  val full2Inner = tableC
    .join(fullJoined, parentCondition, "inner")
    .select($"a.*", $"b.*", $"c.*")

  // The order before conversion: Full Then Inner
  val matchesBefore = full2Inner.queryExecution.analyzed.collect {
    case parent @ Join(_, Join(_, _, FullOuter, _), Inner, _) => parent
  }
  assert(matchesBefore.size === 1)

  // The order after conversion: Inner Then Inner
  val matchesAfter = full2Inner.queryExecution.optimizedPlan.collect {
    case parent @ Join(_, Join(_, _, Inner, _), Inner, _) => parent
  }
  assert(matchesAfter.size === 1)

  checkAnswer(
    full2Inner,
    Seq(Row(1, 2, "1", 1, 2, "1", 1, 3, "1")))
}

// A full outer child join (a FULL JOIN b) sits on the right side of a left-semi parent join
// whose condition references only b.int, a column from the child's right side. The optimizer
// is expected to turn the child join into a right outer join.
test("join - full outer to right by the parent join's join condition") {
  val tableA = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
  val tableB = Seq((1, 2, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
  val tableC = Seq((1, 3, "1"), (3, 6, "5")).toDF("int", "int2", "str").as("c")

  // Full -> Right
  val fullJoined = tableA.join(tableB, $"a.int" === $"b.int", "full")
  val full2Right = tableC.join(fullJoined, $"b.int" === 1, "leftsemi")

  // The order before conversion: Full Then LeftSemi
  val matchesBefore = full2Right.queryExecution.analyzed.collect {
    case parent @ Join(_, Join(_, _, FullOuter, _), LeftSemi, _) => parent
  }
  assert(matchesBefore.size === 1)

  // The order after conversion: Right Then LeftSemi (the pattern expects a Project between them)
  val matchesAfter = full2Right.queryExecution.optimizedPlan.collect {
    case parent @ Join(_, Project(_, Join(_, _, RightOuter, _)), LeftSemi, _) => parent
  }
  assert(matchesAfter.size === 1)

  checkAnswer(
    full2Right,
    Seq(Row(1, 3, "1"), Row(3, 6, "5")))
}


// A full outer child join (a FULL JOIN b) sits on the right side of a left outer parent join
// whose condition references only a.int, a column from the child's left side. The optimizer is
// expected to turn the child join into a left outer join.
test("join - full outer to left by the parent join's join condition #1") {
  val tableA = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
  val tableB = Seq((1, 2, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
  val tableC = Seq((1, 3, "1"), (4, 6, "5")).toDF("int", "int2", "str").as("c")

  // Full -> Left
  val fullJoined = tableA.join(tableB, $"a.int" === $"b.int", "full")
  val full2Left = tableC
    .join(fullJoined, $"c.int" === $"a.int", "left")
    .select($"a.*", $"b.*", $"c.*")

  // The order before conversion: Full Then Left
  val matchesBefore = full2Left.queryExecution.analyzed.collect {
    case parent @ Join(_, Join(_, _, FullOuter, _), LeftOuter, _) => parent
  }
  assert(matchesBefore.size === 1)

  // The order after conversion: Left Then Left
  val matchesAfter = full2Left.queryExecution.optimizedPlan.collect {
    case parent @ Join(_, Join(_, _, LeftOuter, _), LeftOuter, _) => parent
  }
  assert(matchesAfter.size === 1)

  checkAnswer(
    full2Left,
    Seq(
      Row(1, 2, "1", 1, 2, "1", 1, 3, "1"),
      Row(null, null, null, null, null, null, 4, 6, "5")))
}

// A full outer child join (a FULL JOIN b) sits on the left side of a right outer parent join
// whose condition references only a.int, a column from the child's left side. The optimizer is
// expected to turn the child join into a left outer join.
test("join - full outer to left by the parent join's join condition #2") {
  val tableA = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
  val tableB = Seq((1, 2, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
  val tableC = Seq((1, 3, "1"), (4, 6, "5")).toDF("int", "int2", "str").as("c")

  // Full -> Left
  val fullJoined = tableA.join(tableB, $"a.int" === $"b.int", "full")
  val full2Left = fullJoined
    .join(tableC, $"c.int" === $"a.int", "right")
    .select($"a.*", $"b.*", $"c.*")

  // The order before conversion: Full Then Right
  val matchesBefore = full2Left.queryExecution.analyzed.collect {
    case parent @ Join(Join(_, _, FullOuter, _), _, RightOuter, _) => parent
  }
  assert(matchesBefore.size === 1)

  // The order after conversion: Left Then Right
  val matchesAfter = full2Left.queryExecution.optimizedPlan.collect {
    case parent @ Join(Join(_, _, LeftOuter, _), _, RightOuter, _) => parent
  }
  assert(matchesAfter.size === 1)

  checkAnswer(
    full2Left,
    Seq(
      Row(1, 2, "1", 1, 2, "1", 1, 3, "1"),
      Row(null, null, null, null, null, null, 4, 6, "5")))
}
}