Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
2f6c80d
Merge remote-tracking branch 'upstream/master' into outerJoinElimination
gatorsmile Dec 31, 2015
90576aa
outer join conversion
gatorsmile Jan 1, 2016
5adec63
[SPARK-10359][PROJECT-INFRA] Multiple fixes to dev/test-dependencies.…
JoshRosen Jan 1, 2016
192ab19
added test cases.
gatorsmile Jan 1, 2016
c9dbfcc
[SPARK-11743][SQL] Move the test for arrayOfUDT
viirya Jan 1, 2016
a59a357
[SPARK-3873][MLLIB] Import order fixes.
Jan 1, 2016
ad5b7cf
[SPARK-12409][SPARK-12387][SPARK-12391][SQL] Refactor filter pushdown…
viirya Jan 1, 2016
c04b53b
renaming
gatorsmile Jan 1, 2016
01a2986
[SPARK-12592][SQL][TEST] Don't mute Spark loggers in TestHive.reset()
liancheng Jan 1, 2016
6c20b3c
Disable test-dependencies.sh.
rxin Jan 1, 2016
0da7bd5
[SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][SQL] always outp…
Jan 1, 2016
44ee920
Revert "[SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][SQL] alw…
rxin Jan 2, 2016
970635a
[SPARK-12362][SQL][WIP] Inline Hive Parser
hvanhovell Jan 2, 2016
94f7a12
[SPARK-10180][SQL] JDBC datasource are not processing EqualNullSafe f…
HyukjinKwon Jan 2, 2016
15bd736
[SPARK-12481][CORE][STREAMING][SQL] Remove usage of Hadoop deprecated…
srowen Jan 2, 2016
65f9125
extend the condition to cover more cases in non null predicates.
gatorsmile Jan 3, 2016
513e3b0
[SPARK-12599][MLLIB][SQL] Remove the use of callUDF in MLlib
rxin Jan 3, 2016
6c5bbd6
Revert "Revert "[SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][…
rxin Jan 3, 2016
9398644
added three more expressions: and, or and not
gatorsmile Jan 3, 2016
0bb07cb
style fix.
gatorsmile Jan 3, 2016
c5ff632
support non-local predicates and bug fix.
gatorsmile Jan 3, 2016
c3d5056
[SPARK-12327][SPARKR] fix code for lintr warning for commented code
felixcheung Jan 3, 2016
ee29dd2
scala style fix.
gatorsmile Jan 3, 2016
c82924d
[SPARK-12533][SQL] hiveContext.table() throws the wrong exception
thomastechs Jan 3, 2016
7b92922
Update MimaExcludes now Spark 1.6 is in Maven.
rxin Jan 4, 2016
b8410ff
[SPARK-12537][SQL] Add option to accept quoting of all character back…
Cazen Jan 4, 2016
13dab9c
[SPARK-12611][SQL][PYSPARK][TESTS] Fix test_infer_schema_to_local
holdenk Jan 4, 2016
7b7ea90
outer join conversion
gatorsmile Jan 1, 2016
7558e70
added test cases.
gatorsmile Jan 1, 2016
d3cbf46
renaming
gatorsmile Jan 1, 2016
2535cb1
extend the condition to cover more cases in non null predicates.
gatorsmile Jan 3, 2016
6c3f4b0
added three more expressions: and, or and not
gatorsmile Jan 3, 2016
fcd757c
style fix.
gatorsmile Jan 3, 2016
5bc7f52
support non-local predicates and bug fix.
gatorsmile Jan 3, 2016
34a0056
scala style fix.
gatorsmile Jan 3, 2016
ee7db1a
code refactoring and code merge
gatorsmile Jan 4, 2016
63d5d62
Merge remote-tracking branch 'origin/outerJoinConversion' into outerJ…
gatorsmile Jan 4, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
SetOperationPushDown,
SamplePushDown,
ReorderJoin,
OuterJoinConversion,
PushPredicateThroughJoin,
PushPredicateThroughProject,
PushPredicateThroughGenerate,
Expand Down Expand Up @@ -768,6 +769,63 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
}
}

/**
* Conversion of outer joins, if the local predicates can restrict the result sets so that
* all null-supplying rows are eliminated
*
* - full outer -> inner if both sides have such local predicates
* - left outer -> inner if the right side has such local predicates
* - right outer -> inner if the left side has such local predicates
* - full outer -> left outer if only the left side has such local predicates
* - full outer -> right outer if only the right side has such local predicates
*
* This rule should be executed before pushing down the Filter
*/
object OuterJoinConversion extends Rule[LogicalPlan] with PredicateHelper {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a better name for this? this is a form of strength reduction right? I don't know if there is a better term in the database land. Can you look into postgres source code and see what they call this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. OuterJoinElimination might sound better. Let me rename it today.

Since full outer is union distinct of left outer and right outer, we are removing right outer from full outer when conversion from full outer to left outer.


// Todo: is it complete?
private def hasNonNullPredicate(condition: Seq[Expression], child: LogicalPlan): Boolean = {
val localCondition = condition.filter(_.references subsetOf child.outputSet)
localCondition.exists(_.collect {
case EqualTo(ar: AttributeReference, l: Literal) => true
case EqualTo(l: Literal, ar: AttributeReference) => true
case EqualNullSafe(ar: AttributeReference, l: Literal) => true
case EqualNullSafe(l: Literal, ar: AttributeReference) => true
case GreaterThan(ar: AttributeReference, l: Literal) => true
case GreaterThan(l: Literal, ar: AttributeReference) => true
case GreaterThanOrEqual(ar: AttributeReference, l: Literal) => true
case GreaterThanOrEqual(l: Literal, ar: AttributeReference) => true
case LessThan(ar: AttributeReference, l: Literal) => true
case LessThan(l: Literal, ar: AttributeReference) => true
case LessThanOrEqual(ar: AttributeReference, l: Literal) => true
case LessThanOrEqual(l: Literal, ar: AttributeReference) => true
case In(ar: AttributeReference, l) => true
case IsNotNull(ar: AttributeReference) => true
}.nonEmpty)
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition)) =>
val leftHasNonNullPredicate =
hasNonNullPredicate(splitConjunctivePredicates(filterCondition), left)
val rightHasNonNullPredicate =
hasNonNullPredicate(splitConjunctivePredicates(filterCondition), right)
joinType match {
case RightOuter if leftHasNonNullPredicate =>
Filter(filterCondition, Join(left, right, Inner, joinCondition))
case LeftOuter if rightHasNonNullPredicate =>
Filter(filterCondition, Join(left, right, Inner, joinCondition))
case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate =>
Filter(filterCondition, Join(left, right, Inner, joinCondition))
case FullOuter if leftHasNonNullPredicate =>
Filter(filterCondition, Join(left, right, LeftOuter, joinCondition))
case FullOuter if rightHasNonNullPredicate =>
Filter(filterCondition, Join(left, right, RightOuter, joinCondition))
case _ => f
}
}
}

/**
* Pushes down [[Filter]] operators where the `condition` can be
* evaluated using only the attributes of the left or right side of a join. Other
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._

class OuterJoinConversionSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
EliminateSubQueries) ::
Batch("OuterJoinConversion", Once,
OuterJoinConversion,
PushPredicateThroughJoin) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
val testRelation1 = LocalRelation('d.int, 'e.int, 'f.int)

test("joins: full outer to inner") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

val originalQuery =
x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
.where("x.b".attr >= 1 && "y.d".attr >= 2)

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.where('b >= 1)
val right = testRelation1.where('d >= 2)
val correctAnswer =
left.join(right, Inner, Option("a".attr === "d".attr)).analyze

comparePlans(optimized, correctAnswer)
}

test("joins: full outer to right") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

val originalQuery =
x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)).where("y.d".attr > 2)

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation
val right = testRelation1.where('d > 2)
val correctAnswer =
left.join(right, RightOuter, Option("a".attr === "d".attr)).analyze

comparePlans(optimized, correctAnswer)
}

test("joins: full outer to left") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

val originalQuery =
x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)).where("x.a".attr <=> 2)

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.where('a <=> 2)
val right = testRelation1
val correctAnswer =
left.join(right, LeftOuter, Option("a".attr === "d".attr)).analyze

comparePlans(optimized, correctAnswer)
}

test("joins: right to inner") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

val originalQuery =
x.join(y, RightOuter, Option("x.a".attr === "y.d".attr)).where("x.b".attr > 2)

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.where('b > 2)
val right = testRelation1
val correctAnswer =
left.join(right, Inner, Option("a".attr === "d".attr)).analyze

comparePlans(optimized, correctAnswer)
}

test("joins: left to inner") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

val originalQuery =
x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr)).where("y.e".attr.isNotNull)

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation
val right = testRelation1.where('e.isNotNull)
val correctAnswer =
left.join(right, Inner, Option("a".attr === "d".attr)).analyze

comparePlans(optimized, correctAnswer)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.Join
import org.apache.spark.sql.execution.joins.BroadcastHashJoin
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
Expand Down Expand Up @@ -140,4 +142,50 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
assert(df1.join(broadcast(pf1)).count() === 4)
}
}

test("join - outer join conversion") {
val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")

// outer -> left
val outerJoin2Left = df.join(df2, $"a.int" === $"b.int", "outer").where($"a.int" === 3)
assert(outerJoin2Left.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, LeftOuter, _) => j }.size === 1)
checkAnswer(
outerJoin2Left,
Row(3, 4, "3", null, null, null) :: Nil)

// outer -> right
val outerJoin2Right = df.join(df2, $"a.int" === $"b.int", "outer").where($"b.int" === 5)
assert(outerJoin2Right.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, RightOuter, _) => j }.size === 1)
checkAnswer(
outerJoin2Right,
Row(null, null, null, 5, 6, "5") :: Nil)

// outer -> inner
val outerJoin2Inner = df.join(df2, $"a.int" === $"b.int", "outer").
where($"a.int" === 1 && $"b.int2" === 3)
assert(outerJoin2Inner.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, Inner, _) => j }.size === 1)
checkAnswer(
outerJoin2Inner,
Row(1, 2, "1", 1, 3, "1") :: Nil)

// right -> inner
val rightJoin2Inner = df.join(df2, $"a.int" === $"b.int", "right").where($"a.int" === 1)
assert(rightJoin2Inner.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, Inner, _) => j }.size === 1)
checkAnswer(
rightJoin2Inner,
Row(1, 2, "1", 1, 3, "1") :: Nil)

// left -> inner
val leftJoin2Inner = df.join(df2, $"a.int" === $"b.int", "left").where($"b.int2" === 3)
assert(leftJoin2Inner.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, Inner, _) => j }.size === 1)
checkAnswer(
leftJoin2Inner,
Row(1, 2, "1", 1, 3, "1") :: Nil)
}
}
8 changes: 6 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin]),
("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[SortMergeOuterJoin]),
("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
classOf[SortMergeOuterJoin]),
classOf[SortMergeJoin]), // conversion from Right Outer to Inner
("SELECT * FROM testData right join testData2 ON key = a and key = 2",
classOf[SortMergeOuterJoin]),
("SELECT * FROM testData full outer join testData2 ON key = a",
Expand Down Expand Up @@ -123,8 +123,12 @@ class JoinSuite extends QueryTest with SharedSQLContext {
("SELECT * FROM testData LEFT JOIN testData2 ON key = a",
classOf[SortMergeOuterJoin]),
("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
classOf[BroadcastHashOuterJoin]),
classOf[BroadcastHashJoin]),
("SELECT * FROM testData right join testData2 ON key = a and key = 2",
classOf[BroadcastHashOuterJoin]),
("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where a = 2",
classOf[BroadcastHashOuterJoin]),
("SELECT * FROM testData right join testData2 ON key = a and a = 2",
classOf[BroadcastHashOuterJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
sql("UNCACHE TABLE testData")
Expand Down