-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19712][SQL] Pushdown LeftSemi/LeftAnti below join #24331
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
a5f1f50
14ed943
813075f
9945c28
4a5b4ca
fe3e168
a5ed3d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -202,7 +202,7 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH | |
| val rightOutput = rightChild.outputSet | ||
|
|
||
| if (joinCond.nonEmpty) { | ||
| val noPushdown = (PushdownDirection.NONE, None) | ||
| val noPushdown = PushdownDirection.NONE | ||
| val conditions = splitConjunctivePredicates(joinCond.get) | ||
| val (leftConditions, rest) = | ||
| conditions.partition(_.references.subsetOf(left.outputSet ++ rightOutput)) | ||
|
|
@@ -212,11 +212,11 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH | |
| if (rest.isEmpty && leftConditions.nonEmpty) { | ||
| // When the join conditions can be computed based on the left leg of | ||
| // leftsemi/anti join then push the leftsemi/anti join to the left side. | ||
| (PushdownDirection.TO_LEFT_BRANCH, leftConditions.reduceLeftOption(And)) | ||
| (PushdownDirection.TO_LEFT_BRANCH) | ||
|
||
| } else if (leftConditions.isEmpty && rightConditions.nonEmpty && commonConditions.isEmpty) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. what if
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @cloud-fan Please refer to my answer above. |
||
| // When the join conditions can be computed based on the attributes from right leg of | ||
| // leftsemi/anti join then push the leftsemi/anti join to the right side. | ||
| (PushdownDirection.TO_RIGHT_BRANCH, rightConditions.reduceLeftOption(And)) | ||
| (PushdownDirection.TO_RIGHT_BRANCH) | ||
|
||
| } else { | ||
| noPushdown | ||
|
||
| } | ||
|
|
@@ -227,15 +227,14 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH | |
| * to the left leg of join. | ||
| * 2) if a right outer join, to the right leg of join, | ||
| */ | ||
| val action = joinType match { | ||
| joinType match { | ||
| case _: InnerLike | LeftOuter => | ||
| PushdownDirection.TO_LEFT_BRANCH | ||
| case RightOuter => | ||
| PushdownDirection.TO_RIGHT_BRANCH | ||
| case _ => | ||
| PushdownDirection.NONE | ||
| } | ||
| (action, None) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -244,18 +243,18 @@ object PushLeftSemiLeftAntiThroughJoin extends Rule[LogicalPlan] with PredicateH | |
| case j @ Join(AllowedJoin(left), right, LeftSemiOrAnti(joinType), joinCond, parentHint) => | ||
| val (childJoinType, childLeft, childRight, childCondition, childHint) = | ||
| (left.joinType, left.left, left.right, left.condition, left.hint) | ||
| val (action, newJoinCond) = pushTo(left, right, joinCond) | ||
| val action = pushTo(left, right, joinCond) | ||
|
|
||
| action match { | ||
| case PushdownDirection.TO_LEFT_BRANCH | ||
| if (childJoinType == LeftOuter || childJoinType.isInstanceOf[InnerLike]) => | ||
| // push down leftsemi/anti join to the left table | ||
| val newLeft = Join(childLeft, right, joinType, newJoinCond, parentHint) | ||
| val newLeft = Join(childLeft, right, joinType, joinCond, parentHint) | ||
| Join(newLeft, childRight, childJoinType, childCondition, childHint) | ||
| case PushdownDirection.TO_RIGHT_BRANCH | ||
| if (childJoinType == RightOuter || childJoinType.isInstanceOf[InnerLike]) => | ||
| // push down leftsemi/anti join to the right table | ||
| val newRight = Join(childRight, right, joinType, newJoinCond, parentHint) | ||
| val newRight = Join(childRight, right, joinType, joinCond, parentHint) | ||
| Join(childLeft, newRight, childJoinType, childCondition, childHint) | ||
| case _ => | ||
| // Do nothing | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When can this happen?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @cloud-fan When we decide that we can't push down the parent join. For example, this test should exercise the default case. |
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -317,62 +317,58 @@ class LeftSemiPushdownSuite extends PlanTest { | |
| } | ||
|
|
||
| Seq(LeftSemi, LeftAnti).foreach { case outerJT => | ||
| Seq(Inner, LeftOuter, Cross).foreach { case innerJT => | ||
| Seq(Inner, LeftOuter, Cross, RightOuter).foreach { case innerJT => | ||
| test(s"$outerJT pushdown with empty join condition join type $innerJT") { | ||
| val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None) | ||
|
||
| val originalQuery = joinedRelation.join(testRelation, joinType = outerJT, None) | ||
| val optimized = Optimize.execute(originalQuery.analyze) | ||
|
|
||
| val pushedDownJoin = testRelation1.join(testRelation, joinType = outerJT, None) | ||
| val correctAnswer = pushedDownJoin.join(testRelation2, joinType = innerJT, None) | ||
| val correctAnswer = if (innerJT == RightOuter) { | ||
| val pushedDownJoin = testRelation2.join(testRelation, joinType = outerJT, None) | ||
| testRelation1.join(pushedDownJoin, joinType = innerJT, None) | ||
| } else { | ||
| val pushedDownJoin = testRelation1.join(testRelation, joinType = outerJT, None) | ||
| pushedDownJoin.join(testRelation2, joinType = innerJT, None) | ||
| } | ||
| comparePlans(optimized, correctAnswer) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Seq(LeftSemi, LeftAnti).foreach { case jt => | ||
| test(s"$jt pushdown with empty join condition join type RightOuter") { | ||
| val joinedRelation = testRelation1.join(testRelation2, joinType = RightOuter, None) | ||
| val originalQuery = joinedRelation.join(testRelation, joinType = jt, None) | ||
| val optimized = Optimize.execute(originalQuery.analyze) | ||
|
|
||
| val pushedDownJoin = testRelation2.join(testRelation, joinType = jt, None) | ||
| val correctAnswer = testRelation1.join(pushedDownJoin, joinType = RightOuter, None) | ||
| comparePlans(optimized, correctAnswer) | ||
| } | ||
| } | ||
|
|
||
| Seq(LeftSemi, LeftAnti).foreach { case outerJT => | ||
| Seq(Inner, LeftOuter, Cross).foreach { case innerJT => | ||
| test(s"$outerJT pushdown with join condition referring to left leg of join type $innerJT") { | ||
| val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None) | ||
| val originalQuery = | ||
| joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'd)) | ||
| val optimized = Optimize.execute(originalQuery.analyze) | ||
|
|
||
| val pushedDownJoin = | ||
| testRelation1.join(testRelation, joinType = outerJT, condition = Some('a === 'd)) | ||
| val correctAnswer = pushedDownJoin.join(testRelation2, joinType = innerJT, None).analyze | ||
| comparePlans(optimized, correctAnswer) | ||
| Seq(Some('d === 'e), None).foreach { case innerJoinCond => | ||
| Seq(LeftSemi, LeftAnti).foreach { case outerJT => | ||
| Seq(Inner, LeftOuter, Cross).foreach { case innerJT => | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These test cases almost cover the above one, except the RightOuter join type. Can we merge them?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nvm, I was wrong |
||
| test(s"$outerJT pushdown to left of join type: $innerJT join condition $innerJoinCond") { | ||
| val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, innerJoinCond) | ||
| val originalQuery = | ||
| joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'd)) | ||
| val optimized = Optimize.execute(originalQuery.analyze) | ||
|
|
||
| val pushedDownJoin = | ||
| testRelation1.join(testRelation, joinType = outerJT, condition = Some('a === 'd)) | ||
| val correctAnswer = | ||
| pushedDownJoin.join(testRelation2, joinType = innerJT, innerJoinCond).analyze | ||
| comparePlans(optimized, correctAnswer) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Seq(LeftSemi, LeftAnti).foreach { case outerJT => | ||
| Seq(Inner, LeftOuter, Cross).foreach { case innerJT => | ||
| test(s"$outerJT pushdown with outer and inner join condition for join type $innerJT") { | ||
| val joinedRelation = | ||
| testRelation1.join(testRelation2, joinType = innerJT, condition = Some('d === 'e)) | ||
| val originalQuery = | ||
| joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'd)) | ||
| val optimized = Optimize.execute(originalQuery.analyze) | ||
|
|
||
| val pushedDownJoin = | ||
| testRelation1.join(testRelation, joinType = outerJT, condition = Some('a === 'd)) | ||
| val correctAnswer = pushedDownJoin | ||
| .join(testRelation2, joinType = innerJT, condition = Some('d === 'e)) | ||
| .analyze | ||
| comparePlans(optimized, correctAnswer) | ||
| Seq(Some('e === 'd), None).foreach { case innerJoinCond => | ||
| Seq(LeftSemi, LeftAnti).foreach { case outerJT => | ||
| Seq(Inner, RightOuter, Cross).foreach { case innerJT => | ||
| test(s"$outerJT pushdown to right of join type: $innerJT join condition $innerJoinCond") { | ||
| val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, innerJoinCond) | ||
| val originalQuery = | ||
| joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'e)) | ||
| val optimized = Optimize.execute(originalQuery.analyze) | ||
|
|
||
| val pushedDownJoin = | ||
| testRelation2.join(testRelation, joinType = outerJT, condition = Some('a === 'e)) | ||
| val correctAnswer = | ||
| testRelation1.join(pushedDownJoin, joinType = innerJT, innerJoinCond).analyze | ||
| comparePlans(optimized, correctAnswer) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -387,41 +383,6 @@ class LeftSemiPushdownSuite extends PlanTest { | |
| } | ||
| } | ||
|
|
||
| Seq(LeftSemi, LeftAnti).foreach { case outerJT => | ||
| Seq(Inner, RightOuter, Cross).foreach { case innerJT => | ||
| test(s"$outerJT pushdown with join condition referring to right leg - join type $innerJT") { | ||
| val joinedRelation = testRelation1.join(testRelation2, joinType = innerJT, None) | ||
| val originalQuery = | ||
| joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'e)) | ||
| val optimized = Optimize.execute(originalQuery.analyze) | ||
|
|
||
| val pushedDownJoin = | ||
| testRelation2.join(testRelation, joinType = outerJT, condition = Some('a === 'e)) | ||
| val correctAnswer = testRelation1.join(pushedDownJoin, joinType = innerJT, None).analyze | ||
| comparePlans(optimized, correctAnswer) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Seq(LeftSemi, LeftAnti).foreach { case outerJT => | ||
| Seq(Inner, RightOuter, Cross).foreach { case innerJT => | ||
| test(s"$outerJT pushdown with outer and inner join conditions for join type $innerJT") { | ||
| val joinedRelation = testRelation1. | ||
| join(testRelation2, joinType = innerJT, condition = Some('e === 'd)) | ||
| val originalQuery = | ||
| joinedRelation.join(testRelation, joinType = outerJT, condition = Some('a === 'e)) | ||
| val optimized = Optimize.execute(originalQuery.analyze) | ||
|
|
||
| val pushedDownJoin = | ||
| testRelation2.join(testRelation, joinType = outerJT, condition = Some('a === 'e)) | ||
| val correctAnswer = testRelation1. | ||
| join(pushedDownJoin, joinType = innerJT, condition = Some('e === 'd)) | ||
| .analyze | ||
| comparePlans(optimized, correctAnswer) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Seq(LeftSemi, LeftAnti).foreach { case jt => | ||
| test(s"$jt no pushdown - join condition refers right leg - join type for LeftOuter") { | ||
| val joinedRelation = testRelation1.join(testRelation2, joinType = LeftOuter, None) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use
`PushdownDirection.NONE` directly.