Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
test cases from Yuming
  • Loading branch information
wangyum authored and gengliangwang committed Jun 6, 2020
commit 2ca483e6ccfceb40c579d0094c43832f8896e84f
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ import org.apache.spark.unsafe.types.CalendarInterval
class FilterPushdownSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {

override protected val blacklistedOnceBatches: Set[String] =
Set("Push extra predicate through join")

val batches =
Batch("Subqueries", Once,
EliminateSubqueryAliases) ::
Expand All @@ -39,7 +43,9 @@ class FilterPushdownSuite extends PlanTest {
PushPredicateThroughNonJoin,
BooleanSimplification,
PushPredicateThroughJoin,
CollapseProject) :: Nil
CollapseProject) ::
Batch("Push extra predicate through join", Once,
PushExtraPredicateThroughJoin) :: Nil
}

val attrA = 'a.int
Expand Down Expand Up @@ -1230,4 +1236,125 @@ class FilterPushdownSuite extends PlanTest {

comparePlans(Optimize.execute(query.analyze), expected)
}

test("inner join: push down disjunctive predicates") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y)
.where(("x.b".attr === "y.b".attr)
&& (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11)))
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.where(('a > 3 || 'a > 1)).subquery('x)
val right = testRelation.where('a > 13 || 'a > 11).subquery('y)
val correctAnswer =
left.join(right, condition = Some("x.b".attr === "y.b".attr
&& (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11))))
.analyze

comparePlans(optimized, correctAnswer)
}

test("inner join: push down complex join predicates") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val joinCondition = (("x.b".attr === "y.b".attr)
&& ((("x.a".attr === 5) && ("y.a".attr >= 2) && ("y.a".attr <= 3))
|| (("x.a".attr === 2) && ("y.a".attr >= 1) && ("y.a".attr <= 14))
|| (("x.a".attr === 1) && ("y.a".attr >= 9) && ("y.a".attr <= 27))))

val originalQuery = x.join(y, condition = Some(joinCondition))
val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.where(
('a === 5 || 'a === 2 || 'a === 1)).subquery('x)
val right = testRelation.where(
('a >= 2 && 'a <= 3) || ('a >= 1 && 'a <= 14) || ('a >= 9 && 'a <= 27)).subquery('y)
val correctAnswer = left.join(right, condition = Some(joinCondition)).analyze

comparePlans(optimized, correctAnswer)
}

test("inner join: push down join predicates with NOT predicate)") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y, condition = Some(("x.b".attr === "y.b".attr)
&& Not(("x.a".attr > 3)
&& ("x.a".attr < 2 || ("y.a".attr > 13)) || ("x.a".attr > 1) && ("y.a".attr > 11))))
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.where('a <= 3 || 'a >= 2).subquery('x)
val right = testRelation.subquery('y)
val correctAnswer =
left.join(right, condition = Some("x.b".attr === "y.b".attr
&& (("x.a".attr <= 3) || (("x.a".attr >= 2) && ("y.a".attr <= 13)))
&& (("x.a".attr <= 1) || ("y.a".attr <= 11))))
.analyze
comparePlans(optimized, correctAnswer)
}

test("left join: push down disjunctive predicates") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y, joinType = LeftOuter, condition = Some(("x.b".attr === "y.b".attr)
&& (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11))))
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.subquery('x)
val right = testRelation.where('a > 13 || 'a > 11).subquery('y)
val correctAnswer =
left.join(right, joinType = LeftOuter, condition = Some("x.b".attr === "y.b".attr
&& (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11))))
.analyze

comparePlans(optimized, correctAnswer)
}

test("right join: push down disjunctive predicates") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y, joinType = RightOuter, condition = Some(("x.b".attr === "y.b".attr)
&& (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11))))
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.where('a > 3 || 'a > 1).subquery('x)
val right = testRelation.subquery('y)
val correctAnswer =
left.join(right, joinType = RightOuter, condition = Some("x.b".attr === "y.b".attr
&& (("x.a".attr > 3) && ("y.a".attr > 13) || ("x.a".attr > 1) && ("y.a".attr > 11))))
.analyze

comparePlans(optimized, correctAnswer)
}

test("inner join: avoid generating too many predicates") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)

val originalQuery = {
x.join(y, condition = Some(("x.b".attr === "y.b".attr) && ((("x.a".attr > 3) &&
("x.a".attr < 13) && ("y.c".attr <= 5)) || (("y.a".attr > 2) && ("y.c".attr < 1)))))
}

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.subquery('x)
val right = testRelation.where('c <= 5 || ('a > 2 && 'c < 1)).subquery('y)
val correctAnswer = left.join(right, condition = Some("x.b".attr === "y.b".attr &&
((("x.a".attr > 3) && ("x.a".attr < 13) && ("y.c".attr <= 5)) ||
(("y.a".attr > 2) && ("y.c".attr < 1))))).analyze

comparePlans(optimized, correctAnswer)
}
}