Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
PushPredicateThroughJoin,
PushDownPredicate,
PushDownLeftSemiAntiJoin,
PushLeftSemiLeftAntiThroughJoin,
LimitPushDown,
ColumnPruning,
InferFiltersFromConstraints,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,106 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
}
}
}

/**
* This rule is a variant of [[PushPredicateThroughJoin]] which can handle
* pushing down Left semi and Left Anti joins below a join operator. The
* allowable join types are:
* 1) Inner
* 2) Cross
* 3) LeftOuter
* 4) RightOuter
*/
object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
/**
* Define an enumeration to identify whether a LeftSemi/LeftAnti join can be pushed down to
* the left leg or the right leg of the join.
*/
object pushdownDirection extends Enumeration {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we upper case the first letter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Sure.

val toRightBranch, toLeftBranch, none = Value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

upper-case the first letter for them too. I think this is the policy when defining enums.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, the policy is upper-case all the letters, e.g.

object PushdownDirection extends Enumeration {
  val TO_RIGHT_BRANCH, ...
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Will change.

}

/**
* LeftSemi/LeftAnti joins are pushed down when its left child is a join operator
* with a join type that is in the AllowedJoinTypes.
*/
object AllowedJoinTypes {
def unapply(joinType: JoinType): Option[JoinType] = joinType match {
case Inner | Cross | LeftOuter | RightOuter => Some(joinType)
case _ => None
}
}

/**
* Determine which side of the join a LeftSemi/LeftAnti join can be pushed to.
*/
private def pushTo(leftChild: Join, rightChild: LogicalPlan, joinCond: Option[Expression]) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we can only push down left semi/anti join to either left or right side, there is no need to return the new join condition in this method, because the join condition won't change.

val left = leftChild.left
val right = leftChild.right
val joinType = leftChild.joinType
val rightOutput = rightChild.outputSet

if (joinCond.nonEmpty) {
val noPushdown = (pushdownDirection.none, None)
val conditions = splitConjunctivePredicates(joinCond.get)
val (leftConditions, rest) =
conditions.partition(_.references.subsetOf(left.outputSet ++ rightOutput))
val (rightConditions, commonConditions) =
rest.partition(_.references.subsetOf(right.outputSet ++ rightOutput))

if (rest.isEmpty && leftConditions.nonEmpty) {
// When the join conditions can be computed based on the left leg of
// leftsemi/anti join then push the leftsemi/anti join to the left side.
(pushdownDirection.toLeftBranch, leftConditions.reduceLeftOption(And))
} else if (leftConditions.isEmpty && rightConditions.nonEmpty && commonConditions.isEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if commonConditions is not empty? can we add a filter at top?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Please refer to my answer above.

// When the join conditions can be computed based on the attributes from right leg of
// leftsemi/anti join then push the leftsemi/anti join to the right side.
(pushdownDirection.toRightBranch, rightConditions.reduceLeftOption(And))
} else {
noPushdown
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PushPredicateThroughJoin may pushdown to both sides, do we have such a case here?

Copy link
Contributor Author

@dilipbiswal dilipbiswal Apr 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan To the best of my knowledge, we don't have this case. I actually tried to get a subquery to push down to both legs of the join but couldn't. Normal filter conditions can trigger pushing down to both legs currently though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a TODO and say we will revisit it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Sure. I will add a TODO

}
} else {
/**
* When the join condition is empty,
* 1) if this is a left outer join or inner join, push leftsemi/anti join down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we push to both legs if it's inner join?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Perhaps its possible. In this PR, i was focusing on what is happening today in PushPredicateThroughJoin and keep the behaviour same. We can look into improving both this rule and PushPredicateThroughJoin as follow-up. The reason i say it is, probably we need to test more and prove that pushdown to both sides don't create any side effects or can cause wrong results ?

* to the left leg of join.
* 2) if a right outer join, to the right leg of join,
*/
val action = joinType match {
case RightOuter =>
pushdownDirection.toRightBranch
case _: InnerLike | LeftOuter =>
pushdownDirection.toLeftBranch
case _ =>
pushdownDirection.none
}
(action, None)
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// push LeftSemi/LeftAnti down into the join below
case j @ Join(left @ Join(gLeft, gRight, AllowedJoinTypes(_), belowJoinCond, childHint),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is hard to read when 2 joins are extracted together. How about

case j: Join(AllowedJoin(left), right, ...) =>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Good idea wenchen.

right, LeftSemiOrAnti(joinType), joinCond, parentHint) =>
val belowJoinType = left.joinType
val (action, newJoinCond) = pushTo(left, right, joinCond)

action match {
case pushdownDirection.toLeftBranch
if (belowJoinType == LeftOuter || belowJoinType.isInstanceOf[InnerLike]) =>
// push down leftsemi/anti join to the left table
val newLeft = Join(gLeft, right, joinType, newJoinCond, parentHint)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I have a question here. Will it be safe to propagate the hints here ? I am inclined to only do this optimization if no join hints are specified in either parent and child joins. Currently i am propagating them as is but thinking of changing it. Wanted to check your opinion before i made the change.

Join(newLeft, gRight, belowJoinType, belowJoinCond, childHint)
case pushdownDirection.toRightBranch
if (belowJoinType == RightOuter || belowJoinType.isInstanceOf[InnerLike]) =>
// push down leftsemi/anti join to the right table
val newRight = Join(gRight, right, joinType, newJoinCond, parentHint)
Join(gLeft, newRight, belowJoinType, belowJoinCond, childHint)
case _ =>
// Do nothing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when can this happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan When we decide that we can't pushdown the parent join. For example this test should exercise the default case.

j
}
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ class LeftSemiPushdownSuite extends PlanTest {
CombineFilters,
PushDownPredicate,
PushDownLeftSemiAntiJoin,
PushLeftSemiLeftAntiThroughJoin,
BooleanSimplification,
CollapseProject) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

val testRelation1 = LocalRelation('d.int)
val testRelation2 = LocalRelation('e.int)

test("Project: LeftSemiAnti join pushdown") {
val originalQuery = testRelation
Expand Down Expand Up @@ -314,4 +315,155 @@ class LeftSemiPushdownSuite extends PlanTest {
val optimized = Optimize.execute(originalQuery.analyze)
comparePlans(optimized, originalQuery.analyze)
}

Seq(LeftSemi, LeftAnti).foreach { case outerJT =>
Seq(Inner, LeftOuter, Cross).foreach { case innerJT =>
test(s"$outerJT pushdown with empty join condition join type $innerJT") {
val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the inner join can have a join condition, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it supports, we should add Seq(Some('d === 'e), None).foreach at the begining too

val originalQuery = joinedRelation.join(testRelation, joinType = outerJT, None)
val optimized = Optimize.execute(originalQuery.analyze)

val pushedDownJoin = testRelation1.join(testRelation, joinType = outerJT, None)
val correctAnswer = pushedDownJoin.join(testRelation2, joinType = innerJT, None)
comparePlans(optimized, correctAnswer)
}
}
}

Seq(LeftSemi, LeftAnti).foreach { case jt =>
test(s"$jt pushdown with empty join condition join type RightOuter") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we merge the test case with above? something like

val correctAnswer = if (innerJT = RightOuter) {
  ...
} else {
  ...
}

val joinedRelation = testRelation1.join(testRelation2, joinType = RightOuter, None)
val originalQuery = joinedRelation.join(testRelation, joinType = jt, None)
val optimized = Optimize.execute(originalQuery.analyze)

val pushedDownJoin = testRelation2.join(testRelation, joinType = jt, None)
val correctAnswer = testRelation1.join(pushedDownJoin, joinType = RightOuter, None)
comparePlans(optimized, correctAnswer)
}
}

Seq(LeftSemi, LeftAnti).foreach { case outerJT =>
Seq(Inner, LeftOuter, Cross).foreach { case innerJT =>
test(s"$outerJT pushdown with join condition referring to left leg of join type $innerJT") {
val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None)
val originalQuery =
joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'd))
val optimized = Optimize.execute(originalQuery.analyze)

val pushedDownJoin =
testRelation1.join(testRelation, joinType = outerJT, condition = Some('a === 'd))
val correctAnswer = pushedDownJoin.join(testRelation2, joinType = innerJT, None).analyze
comparePlans(optimized, correctAnswer)
}
}
}

Seq(LeftSemi, LeftAnti).foreach { case outerJT =>
Seq(Inner, LeftOuter, Cross).foreach { case innerJT =>
test(s"$outerJT pushdown with outer and inner join condition for join type $innerJT") {
val joinedRelation =
testRelation1.join(testRelation2, joinType = innerJT, condition = Some('d === 'e))
val originalQuery =
joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'd))
val optimized = Optimize.execute(originalQuery.analyze)

val pushedDownJoin =
testRelation1.join(testRelation, joinType = outerJT, condition = Some('a === 'd))
val correctAnswer = pushedDownJoin
.join(testRelation2, joinType = innerJT, condition = Some('d === 'e))
.analyze
comparePlans(optimized, correctAnswer)
}
}
}

Seq(LeftSemi, LeftAnti).foreach { case jt =>
test(s"$jt no pushdown - join condition refers left leg - join type for RightOuter") {
val joinedRelation = testRelation1.join(testRelation2, joinType = RightOuter, None)
val originalQuery =
joinedRelation.join(testRelation, joinType = jt, condition = Some('a === 'd))
val optimized = Optimize.execute(originalQuery.analyze)
comparePlans(optimized, originalQuery.analyze)
}
}

Seq(LeftSemi, LeftAnti).foreach { case outerJT =>
Seq(Inner, RightOuter, Cross).foreach { case innerJT =>
test(s"$outerJT pushdown with join condition referring to right leg - join type $innerJT") {
val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None)
val originalQuery =
joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'e))
val optimized = Optimize.execute(originalQuery.analyze)

val pushedDownJoin =
testRelation2.join(testRelation, joinType = outerJT, condition = Some('a === 'e))
val correctAnswer = testRelation1.join(pushedDownJoin, joinType = innerJT, None).analyze
comparePlans(optimized, correctAnswer)
}
}
}

Seq(LeftSemi, LeftAnti).foreach { case outerJT =>
Seq(Inner, RightOuter, Cross).foreach { case innerJT =>
test(s"$outerJT pushdown with outer and inner join conditions for join type $innerJT") {
val joinedRelation = testRelation1.
join(testRelation2, joinType = innerJT, condition = Some('e === 'd))
val originalQuery =
joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'e))
val optimized = Optimize.execute(originalQuery.analyze)

val pushedDownJoin =
testRelation2.join(testRelation, joinType = outerJT, condition = Some('a === 'e))
val correctAnswer = testRelation1.
join(pushedDownJoin, joinType = innerJT, condition = Some('e === 'd))
.analyze
comparePlans(optimized, correctAnswer)
}
}
}

Seq(LeftSemi, LeftAnti).foreach { case jt =>
test(s"$jt no pushdown - join condition refers right leg - join type for LeftOuter") {
val joinedRelation = testRelation1.join(testRelation2, joinType = LeftOuter, None)
val originalQuery =
joinedRelation.join(testRelation, joinType = jt, condition = Some('a === 'e))
val optimized = Optimize.execute(originalQuery.analyze)
comparePlans(optimized, originalQuery.analyze)
}
}

Seq(LeftSemi, LeftAnti).foreach { case outerJT =>
Seq(Inner, LeftOuter, RightOuter, Cross).foreach { case innerJT =>
test(s"$outerJT no pushdown - join condition refers both leg - join type $innerJT") {
val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None)
val originalQuery = joinedRelation
.join(testRelation, joinType = outerJT, condition = Some('a === 'd && 'a === 'e))
val optimized = Optimize.execute(originalQuery.analyze)
comparePlans(optimized, originalQuery.analyze)
}
}
}

Seq(LeftSemi, LeftAnti).foreach { case outerJT =>
Seq(Inner, LeftOuter, RightOuter, Cross).foreach { case innerJT =>
test(s"$outerJT no pushdown - join condition refers none of the leg - join type $innerJT") {
val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None)
val originalQuery = joinedRelation
.join(testRelation, joinType = outerJT, condition = Some('d + 'e === 'a))
val optimized = Optimize.execute(originalQuery.analyze)
comparePlans(optimized, originalQuery.analyze)
}
}
}

Seq(LeftSemi, LeftAnti).foreach { case jt =>
test(s"$jt no pushdown when child join type is FullOuter") {
val joinedRelation = testRelation1.join(testRelation2, joinType = FullOuter, None)
val originalQuery =
joinedRelation.join(testRelation, joinType = jt, condition = Some('a === 'e))
val optimized = Optimize.execute(originalQuery.analyze)
comparePlans(optimized, originalQuery.analyze)
}
}

}