@@ -171,6 +171,11 @@ case class AttributeReference(
     val exprId: ExprId = NamedExpression.newExprId,
     val qualifiers: Seq[String] = Nil) extends Attribute with trees.LeafNode[Expression] {
 
+  /**
+   * Returns true iff the expression id is the same for both attributes.
+   */
+  def sameRef(other: AttributeReference): Boolean = this.exprId == other.exprId
+
   override def equals(other: Any): Boolean = other match {
     case ar: AttributeReference => name == ar.name && exprId == ar.exprId && dataType == ar.dataType
     case _ => false

Review thread on the sameRef doc comment:

Reviewer: iff => if ???

Author: iff == if and only if

Reviewer: "if and only if" would be much more specific in the method comment, IMHO. iff can be confusing.

Author: Sorry, but I disagree. We use iff a lot in SQL / algebra, and if you Google it, the first thing that comes up is "if and only if". Plus, this is an internal comment.

edit: This was in response to @damiencarol's comment, not @marmbrus'.
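For contrast with equals in the hunk above, a minimal sketch of the difference (not part of the patch; constructor arguments abbreviated from the signature shown above, values hypothetical): sameRef compares only the expression id, while equals also compares name and data type.

  import org.apache.spark.sql.catalyst.expressions._
  import org.apache.spark.sql.types.IntegerType

  // Two references sharing an ExprId but differing in name:
  val id = NamedExpression.newExprId
  val a = AttributeReference("key", IntegerType)(exprId = id)
  val b = AttributeReference("Key", IntegerType)(exprId = id)
  assert(a.sameRef(b))   // true: same expression id
  assert(!(a == b))      // false: names differ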
38 changes: 34 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -416,9 +416,7 @@ class DataFrame private[sql](
    * }}}
    * @group dfops
    */
-  def join(right: DataFrame, joinExprs: Column): DataFrame = {
-    Join(logicalPlan, right.logicalPlan, joinType = Inner, Some(joinExprs.expr))
-  }
+  def join(right: DataFrame, joinExprs: Column): DataFrame = join(right, joinExprs, "inner")
 
   /**
    * Join with another [[DataFrame]], using the given join expression. The following performs
@@ -440,7 +438,39 @@
    * @group dfops
    */
   def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = {
-    Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr))
+    // Note that in this function, we introduce a hack in the case of self-join to automatically
+    // resolve ambiguous join conditions into ones that might make sense [SPARK-6231].
+    // Consider this case: df.join(df, df("key") === df("key"))
+    // Since df("key") === df("key") is a trivially true condition, this actually becomes a
+    // cartesian join. However, most likely users expect to perform a self join using "key".
+    // With that assumption, this hack turns the trivially true condition into equality on join
+    // keys that are resolved to both sides.
+
+    // Trigger analysis so that, in the case of a self-join, the analyzer will clone the plan.
+    // After the cloning, the left and right sides will have distinct expression ids.
+    val plan = Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr))
+      .queryExecution.analyzed.asInstanceOf[Join]
+
+    // If automatic self-join alias resolution is disabled, return the plan.
+    if (!sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity) {
+      return plan
+    }
+
+    // If the left and right output sets do not intersect, return the plan.
+    val lanalyzed = this.logicalPlan.queryExecution.analyzed
+    val ranalyzed = right.logicalPlan.queryExecution.analyzed
+    if (lanalyzed.outputSet.intersect(ranalyzed.outputSet).isEmpty) {
+      return plan
+    }
+
+    // Otherwise, find the trivially true predicates and automatically resolve them to both sides.
+    // By the time we get here, since we have already run analysis, all attributes should have
+    // been resolved and become AttributeReference.
+    val cond = plan.condition.map { _.transform {
+      case EqualTo(a: AttributeReference, b: AttributeReference) if a.sameRef(b) =>
+        EqualTo(plan.left.resolve(a.name), plan.right.resolve(b.name))
+    }}
+    plan.copy(condition = cond)
   }
 
   /**
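A usage sketch of the rewrite implemented above (not part of the patch; assumes a SQLContext named sqlContext with its implicits imported, mirroring the tests below):

  val df = Seq((1, "1"), (2, "2")).toDF("key", "value")

  // df("key") === df("key") is trivially true: both sides resolve to the same
  // attribute. After analysis clones the self-joined plan, the condition is
  // rewritten to equality between the left and right "key" attributes, so this
  // behaves as a self equi-join (2 rows) rather than a cartesian join (4 rows).
  df.join(df, df("key") === df("key")).show()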
7 changes: 7 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -67,6 +67,10 @@ private[spark] object SQLConf {
   // Set to false when debugging requires the ability to look at invalid query plans.
   val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis"
 
+  // Whether to automatically resolve ambiguity in join conditions for self-joins.
+  // See SPARK-6231.
+  val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY = "spark.sql.selfJoinAutoResolveAmbiguity"
+
   val USE_SQL_SERIALIZER2 = "spark.sql.useSerializer2"
 
   object Deprecated {
@@ -219,6 +223,9 @@ private[sql] class SQLConf extends Serializable {
   private[spark] def dataFrameEagerAnalysis: Boolean =
     getConf(DATAFRAME_EAGER_ANALYSIS, "true").toBoolean
 
+  private[spark] def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
+    getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY, "true").toBoolean
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
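A hedged sketch of opting out via the flag above (assuming a live SQLContext named sqlContext; setConf takes string keys and values):

  // Disable the automatic self-join resolution; the trivially true condition is
  // then left untouched and the join degenerates into a cartesian product.
  sqlContext.setConf("spark.sql.selfJoinAutoResolveAmbiguity", "false")
  val df = Seq((1, "1"), (2, "2")).toDF("key", "value")
  df.join(df, df("key") === df("key")).count()  // 4 rows: full cross product
  sqlContext.setConf("spark.sql.selfJoinAutoResolveAmbiguity", "true")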
@@ -0,0 +1,86 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql

import org.apache.spark.sql.TestData._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.sql.test.TestSQLContext.implicits._


class DataFrameJoinSuite extends QueryTest {

  test("join - join using") {
    val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
    val df2 = Seq(1, 2, 3).map(i => (i, (i + 1).toString)).toDF("int", "str")

    checkAnswer(
      df.join(df2, "int"),
      Row(1, "1", "2") :: Row(2, "2", "3") :: Row(3, "3", "4") :: Nil)
  }

  test("join - join using self join") {
    val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")

    // self join
    checkAnswer(
      df.join(df, "int"),
      Row(1, "1", "1") :: Row(2, "2", "2") :: Row(3, "3", "3") :: Nil)
  }

  test("join - self join") {
    val df1 = testData.select(testData("key")).as('df1)
    val df2 = testData.select(testData("key")).as('df2)

    checkAnswer(
      df1.join(df2, $"df1.key" === $"df2.key"),
      sql("SELECT a.key, b.key FROM testData a JOIN testData b ON a.key = b.key").collect().toSeq)
  }

  test("join - using aliases after self join") {
    val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
    checkAnswer(
      df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("x.str").count(),
      Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)

    checkAnswer(
      df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").count(),
      Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
  }

  test("[SPARK-6231] join - self join auto resolve ambiguity") {
    val df = Seq((1, "1"), (2, "2")).toDF("key", "value")
    checkAnswer(
      df.join(df, df("key") === df("key")),
      Row(1, "1", 1, "1") :: Row(2, "2", 2, "2") :: Nil)

    checkAnswer(
      df.join(df.filter($"value" === "2"), df("key") === df("key")),
      Row(2, "2", 2, "2") :: Nil)

    checkAnswer(
      df.join(df, df("key") === df("key") && df("value") === 1),
      Row(1, "1", 1, "1") :: Nil)

    val left = df.groupBy("key").agg($"key", count("*"))
    val right = df.groupBy("key").agg($"key", sum("key"))
    checkAnswer(
      left.join(right, left("key") === right("key")),
      Row(1, 1, 1, 1) :: Row(2, 1, 2, 2) :: Nil)
  }
}
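Where the automatic resolution cannot infer intent, for example when joining different columns of the same DataFrame, explicit aliases remain the general escape hatch, as in the "using aliases after self join" test above. A brief sketch (same TestSQLContext implicits assumed):

  val df = Seq((1, "1"), (2, "2")).toDF("key", "value")
  // Each side gets its own qualifier, so columns can be referenced
  // unambiguously without relying on the trivially-true-condition rewrite.
  df.as('l).join(df.as('r), $"l.key" === $"r.key").select($"l.value", $"r.value")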
39 changes: 0 additions & 39 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -24,7 +24,6 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, TestSQLContext}
 import org.apache.spark.sql.test.TestSQLContext.logicalPlanToSparkQuery
 import org.apache.spark.sql.test.TestSQLContext.implicits._
-import org.apache.spark.sql.test.TestSQLContext.sql
 
 
 class DataFrameSuite extends QueryTest {
@@ -118,44 +117,6 @@ class DataFrameSuite extends QueryTest {
     )
   }
 
-  test("join - join using") {
-    val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
-    val df2 = Seq(1, 2, 3).map(i => (i, (i + 1).toString)).toDF("int", "str")
-
-    checkAnswer(
-      df.join(df2, "int"),
-      Row(1, "1", "2") :: Row(2, "2", "3") :: Row(3, "3", "4") :: Nil)
-  }
-
-  test("join - join using self join") {
-    val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
-
-    // self join
-    checkAnswer(
-      df.join(df, "int"),
-      Row(1, "1", "1") :: Row(2, "2", "2") :: Row(3, "3", "3") :: Nil)
-  }
-
-  test("join - self join") {
-    val df1 = testData.select(testData("key")).as('df1)
-    val df2 = testData.select(testData("key")).as('df2)
-
-    checkAnswer(
-      df1.join(df2, $"df1.key" === $"df2.key"),
-      sql("SELECT a.key, b.key FROM testData a JOIN testData b ON a.key = b.key").collect().toSeq)
-  }
-
-  test("join - using aliases after self join") {
-    val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
-    checkAnswer(
-      df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("x.str").count(),
-      Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
-
-    checkAnswer(
-      df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").count(),
-      Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
-  }
-
   test("explode") {
     val df = Seq((1, "a b c"), (2, "a b"), (3, "a")).toDF("number", "letters")
     val df2 =
@@ -0,0 +1,38 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import org.apache.spark.sql.{Row, QueryTest}
import org.apache.spark.sql.hive.test.TestHive.implicits._


class HiveDataFrameJoinSuite extends QueryTest {

  // We should move this into SQL package if we make case sensitivity configurable in SQL.
  test("join - self join auto resolve ambiguity with case insensitivity") {
    val df = Seq((1, "1"), (2, "2")).toDF("key", "value")
    checkAnswer(
      df.join(df, df("key") === df("Key")),
      Row(1, "1", 1, "1") :: Row(2, "2", 2, "2") :: Nil)

    checkAnswer(
      df.join(df.filter($"value" === "2"), df("key") === df("Key")),
      Row(2, "2", 2, "2") :: Nil)
  }

}