Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 32 additions & 13 deletions python/pyspark/sql/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
if sys.version >= '3':
basestring = unicode = str
long = int
from functools import reduce
else:
from itertools import imap as map

Expand Down Expand Up @@ -503,36 +504,52 @@ def alias(self, alias):

@ignore_unicode_prefix
@since(1.3)
def join(self, other, joinExprs=None, joinType=None):
def join(self, other, on=None, how=None):
"""Joins with another :class:`DataFrame`, using the given join expression.

The following performs a full outer join between ``df1`` and ``df2``.

:param other: Right side of the join
:param joinExprs: a string for join column name, or a join expression (Column).
If joinExprs is a string indicating the name of the join column,
the column must exist on both sides, and this performs an inner equi-join.
:param joinType: str, default 'inner'.
:param on: a string for join column name, a list of column names,
, a join expression (Column) or a list of Columns.
If `on` is a string or a list of string indicating the name of the join column(s),
the column(s) must exist on both sides, and this performs an inner equi-join.
:param how: str, default 'inner'.
One of `inner`, `outer`, `left_outer`, `right_outer`, `semijoin`.

>>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
[Row(name=None, height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)]

>>> cond = [df.name == df3.name, df.age == df3.age]
>>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect()
[Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)]

>>> df.join(df2, 'name').select(df.name, df2.height).collect()
[Row(name=u'Bob', height=85)]

>>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect()
[Row(name=u'Bob', age=5)]
"""

if joinExprs is None:
if on is not None and not isinstance(on, list):
on = [on]

if on is None or len(on) == 0:
jdf = self._jdf.join(other._jdf)
elif isinstance(joinExprs, basestring):
jdf = self._jdf.join(other._jdf, joinExprs)

if isinstance(on[0], basestring):
jdf = self._jdf.join(other._jdf, self._jseq(on))
else:
assert isinstance(joinExprs, Column), "joinExprs should be Column"
if joinType is None:
jdf = self._jdf.join(other._jdf, joinExprs._jc)
assert isinstance(on[0], Column), "on should be Column or list of Column"
if len(on) > 1:
on = reduce(lambda x, y: x.__and__(y), on)
else:
on = on[0]
if how is None:
jdf = self._jdf.join(other._jdf, on._jc, "inner")
else:
assert isinstance(joinType, basestring), "joinType should be basestring"
jdf = self._jdf.join(other._jdf, joinExprs._jc, joinType)
assert isinstance(how, basestring), "how should be basestring"
jdf = self._jdf.join(other._jdf, on._jc, how)
return DataFrame(jdf, self.sql_ctx)

@ignore_unicode_prefix
Expand Down Expand Up @@ -1291,6 +1308,8 @@ def _test():
.toDF(StructType([StructField('age', IntegerType()),
StructField('name', StringType())]))
globs['df2'] = sc.parallelize([Row(name='Tom', height=80), Row(name='Bob', height=85)]).toDF()
globs['df3'] = sc.parallelize([Row(name='Alice', age=2),
Row(name='Bob', age=5)]).toDF()
globs['df4'] = sc.parallelize([Row(name='Alice', age=10, height=80),
Row(name='Bob', age=5, height=None),
Row(name='Tom', age=None, height=None),
Expand Down
40 changes: 34 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -395,22 +395,50 @@ class DataFrame private[sql](
* @since 1.4.0
*/
def join(right: DataFrame, usingColumn: String): DataFrame = {
join(right, Seq(usingColumn))
}

/**
* Inner equi-join with another [[DataFrame]] using the given columns.
*
* Different from other join functions, the join columns will only appear once in the output,
* i.e. similar to SQL's `JOIN USING` syntax.
*
* {{{
* // Joining df1 and df2 using the columns "user_id" and "user_name"
* df1.join(df2, Seq("user_id", "user_name"))
* }}}
*
* Note that if you perform a self-join using this function without aliasing the input
* [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
* there is no way to disambiguate which side of the join you would like to reference.
*
* @param right Right side of the join operation.
* @param usingColumns Names of the columns to join on. This columns must exist on both sides.
* @group dfops
* @since 1.4.0
*/
def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add javadoc

// Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
// by creating a new instance for one of the branch.
val joined = sqlContext.executePlan(
Join(logicalPlan, right.logicalPlan, joinType = Inner, None)).analyzed.asInstanceOf[Join]

// Project only one of the join column.
val joinedCol = joined.right.resolve(usingColumn)
// Project only one of the join columns.
val joinedCols = usingColumns.map(col => joined.right.resolve(col))
val condition = usingColumns.map { col =>
catalyst.expressions.EqualTo(joined.left.resolve(col), joined.right.resolve(col))
}.reduceLeftOption[catalyst.expressions.BinaryExpression] { (cond, eqTo) =>
catalyst.expressions.And(cond, eqTo)
}

Project(
joined.output.filterNot(_ == joinedCol),
joined.output.filterNot(joinedCols.contains(_)),
Join(
joined.left,
joined.right,
joinType = Inner,
Some(catalyst.expressions.EqualTo(
joined.left.resolve(usingColumn),
joined.right.resolve(usingColumn))))
condition)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ class DataFrameJoinSuite extends QueryTest {
Row(1, "1", "2") :: Row(2, "2", "3") :: Row(3, "3", "4") :: Nil)
}

test("join - join using multiple columns") {
val df = Seq(1, 2, 3).map(i => (i, i + 1, i.toString)).toDF("int", "int2", "str")
val df2 = Seq(1, 2, 3).map(i => (i, i + 1, (i + 1).toString)).toDF("int", "int2", "str")

checkAnswer(
df.join(df2, Seq("int", "int2")),
Row(1, 2, "1", "2") :: Row(2, 3, "2", "3") :: Row(3, 4, "3", "4") :: Nil)
}

test("join - join using self join") {
val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")

Expand Down