-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-13427][SQL] Support USING clause in JOIN. #11297
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -387,6 +387,7 @@ TOK_SETCONFIG; | |
| TOK_DFS; | ||
| TOK_ADDFILE; | ||
| TOK_ADDJAR; | ||
| TOK_USING; | ||
| } | ||
|
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -87,7 +87,7 @@ class Analyzer( | |
| ResolveSubquery :: | ||
| ResolveWindowOrder :: | ||
| ResolveWindowFrame :: | ||
| ResolveNaturalJoin :: | ||
| ResolveNaturalAndUsingJoin :: | ||
| ExtractWindowExpressions :: | ||
| GlobalAggregates :: | ||
| ResolveAggregateFunctions :: | ||
|
|
@@ -1329,48 +1329,72 @@ class Analyzer( | |
| } | ||
|
|
||
| /** | ||
| * Removes natural joins by calculating output columns based on output from two sides, | ||
| * Then apply a Project on a normal Join to eliminate natural join. | ||
| * Removes natural or using joins by calculating output columns based on output from two sides, | ||
| * Then apply a Project on a normal Join to eliminate natural or using join. | ||
| */ | ||
| object ResolveNaturalJoin extends Rule[LogicalPlan] { | ||
| object ResolveNaturalAndUsingJoin extends Rule[LogicalPlan] { | ||
| override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { | ||
| case j @ Join(left, right, UsingJoin(joinType, usingCols), condition) | ||
| if left.resolved && right.resolved && j.duplicateResolved => | ||
| // Resolve the column names referenced in using clause from both the legs of join. | ||
| val lCols = usingCols.flatMap(col => left.resolveQuoted(col.name, resolver)) | ||
| val rCols = usingCols.flatMap(col => right.resolveQuoted(col.name, resolver)) | ||
| if ((lCols.length == usingCols.length) && (rCols.length == usingCols.length)) { | ||
| val joinNames = lCols.map(exp => exp.name) | ||
| commonNaturalJoinProcessing(left, right, joinType, joinNames, None) | ||
| } else { | ||
| j | ||
| } | ||
| case j @ Join(left, right, NaturalJoin(joinType), condition) if j.resolvedExceptNatural => | ||
| // find common column names from both sides | ||
| val joinNames = left.output.map(_.name).intersect(right.output.map(_.name)) | ||
| val leftKeys = joinNames.map(keyName => left.output.find(_.name == keyName).get) | ||
| val rightKeys = joinNames.map(keyName => right.output.find(_.name == keyName).get) | ||
| val joinPairs = leftKeys.zip(rightKeys) | ||
|
|
||
| // Add joinPairs to joinConditions | ||
| val newCondition = (condition ++ joinPairs.map { | ||
| case (l, r) => EqualTo(l, r) | ||
| }).reduceOption(And) | ||
|
|
||
| // columns not in joinPairs | ||
| val lUniqueOutput = left.output.filterNot(att => leftKeys.contains(att)) | ||
| val rUniqueOutput = right.output.filterNot(att => rightKeys.contains(att)) | ||
|
|
||
| // the output list looks like: join keys, columns from left, columns from right | ||
| val projectList = joinType match { | ||
| case LeftOuter => | ||
| leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true)) | ||
| case RightOuter => | ||
| rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput | ||
| case FullOuter => | ||
| // in full outer join, joinCols should be non-null if there is. | ||
| val joinedCols = joinPairs.map { case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() } | ||
| joinedCols ++ | ||
| lUniqueOutput.map(_.withNullability(true)) ++ | ||
| rUniqueOutput.map(_.withNullability(true)) | ||
| case Inner => | ||
| rightKeys ++ lUniqueOutput ++ rUniqueOutput | ||
| case _ => | ||
| sys.error("Unsupported natural join type " + joinType) | ||
| } | ||
| // use Project to trim unnecessary fields | ||
| Project(projectList, Join(left, right, joinType, newCondition)) | ||
| commonNaturalJoinProcessing(left, right, joinType, joinNames, condition) | ||
| } | ||
| } | ||
|
|
||
| private def commonNaturalJoinProcessing( | ||
| left: LogicalPlan, | ||
| right: LogicalPlan, | ||
| joinType: JoinType, | ||
| joinNames: Seq[String], | ||
| condition: Option[Expression]) = { | ||
| val leftKeys = joinNames.map(keyName => left.output.find(_.name == keyName).get) | ||
| val rightKeys = joinNames.map(keyName => right.output.find(_.name == keyName).get) | ||
| val joinPairs = leftKeys.zip(rightKeys) | ||
|
|
||
| // Add joinPairs to joinConditions | ||
| val newCondition = (condition ++ joinPairs.map { | ||
|
||
| case (l, r) => EqualTo(l, r) | ||
| }).reduceOption(And) | ||
|
|
||
| // columns not in joinPairs | ||
| val lUniqueOutput = left.output.filterNot(att => leftKeys.contains(att)) | ||
| val rUniqueOutput = right.output.filterNot(att => rightKeys.contains(att)) | ||
|
|
||
| // the output list looks like: join keys, columns from left, columns from right | ||
| val projectList = joinType match { | ||
| case LeftOuter => | ||
| leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true)) | ||
| case LeftSemi => | ||
| leftKeys ++ lUniqueOutput | ||
| case RightOuter => | ||
| rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput | ||
| case FullOuter => | ||
| // in full outer join, joinCols should be non-null if there is. | ||
| val joinedCols = joinPairs.map { case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() } | ||
| joinedCols ++ | ||
| lUniqueOutput.map(_.withNullability(true)) ++ | ||
| rUniqueOutput.map(_.withNullability(true)) | ||
| case Inner => | ||
| leftKeys ++ lUniqueOutput ++ rUniqueOutput | ||
| case _ => | ||
| sys.error("Unsupported natural join type " + joinType) | ||
| } | ||
| // use Project to trim unnecessary fields | ||
| Project(projectList, Join(left, right, joinType, newCondition)) | ||
| } | ||
|
|
||
|
|
||
| } | ||
|
|
||
| /** | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1136,6 +1136,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { | |
| reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin) | ||
| case FullOuter => f // DO Nothing for Full Outer Join | ||
| case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node") | ||
| case UsingJoin(_, _) => sys.error("Untransformed Using join node") | ||
| } | ||
|
|
||
| // push down the join filter into sub query scanning if applicable | ||
|
|
@@ -1171,6 +1172,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { | |
| Join(newLeft, newRight, LeftOuter, newJoinCond) | ||
| case FullOuter => f | ||
| case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node") | ||
| case UsingJoin(_, _) => sys.error("Untransformed Using join node") | ||
|
||
| } | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/cc @hvanhovell
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks pretty good.