[SPARK-34527][SQL] Resolve duplicated common columns from USING/NATURAL JOIN #31666
Signed-off-by: Karen Feng <[email protected]>
Changes to `Analyzer.scala`:

```diff
@@ -988,9 +988,19 @@ class Analyzer(override val catalogManager: CatalogManager)
   object AddMetadataColumns extends Rule[LogicalPlan] {
     import org.apache.spark.sql.catalyst.util._
 
+    private def getMetadataAttributes(plan: LogicalPlan): Seq[Attribute] = {
+      lazy val childMetadataOutput = plan.children.flatMap(_.metadataOutput)
+      plan.expressions.collect {
+        case a: Attribute if a.isMetadataCol => a
+        case a: Attribute if childMetadataOutput.exists(_.exprId == a.exprId) =>
+          childMetadataOutput.find(_.exprId == a.exprId).get
+      }
+    }
+
     private def hasMetadataCol(plan: LogicalPlan): Boolean = {
+      lazy val childMetadataOutput = plan.children.flatMap(_.metadataOutput)
       plan.expressions.exists(_.find {
-        case a: Attribute => a.isMetadataCol
+        case a: Attribute => a.isMetadataCol || childMetadataOutput.exists(_.exprId == a.exprId)
         case _ => false
       }.isDefined)
     }
 
@@ -1006,7 +1016,7 @@ class Analyzer(override val catalogManager: CatalogManager)
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
       case node if node.children.nonEmpty && node.resolved && hasMetadataCol(node) =>
         val inputAttrs = AttributeSet(node.children.flatMap(_.output))
-        val metaCols = node.expressions.flatMap(_.collect {
+        val metaCols = getMetadataAttributes(node).flatMap(_.collect {
           case a: Attribute if a.isMetadataCol && !inputAttrs.contains(a) => a
         })
         if (metaCols.isEmpty) {
```
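For context, a minimal sketch (not part of this patch; the session setup, view names `t1`/`t2`, and columns `a`/`b`/`c` are illustrative) of the user-visible behavior the `AddMetadataColumns` change enables, assuming a Spark build that includes it:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical local session and views t1(a, b) and t2(a, c).
val spark = SparkSession.builder().master("local[*]").appName("using-join-demo").getOrCreate()
import spark.implicits._

Seq((1, "x"), (2, "y")).toDF("a", "b").createOrReplaceTempView("t1")
Seq((2, "u"), (3, "v")).toDF("a", "c").createOrReplaceTempView("t2")

// The qualified duplicates t1.a and t2.a resolve against the join's hidden
// (metadata) output, in addition to the coalesced USING column `a`.
spark.sql(
  """SELECT a, t1.a, t2.a, b, c
    |FROM t1 FULL OUTER JOIN t2 USING (a)""".stripMargin
).show()
```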
Changes to `AnalysisHelper.scala`:

```diff
@@ -85,7 +85,7 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
     if (!analyzed) {
       AnalysisHelper.allowInvokingTransformsInAnalyzer {
         val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
-        if (self fastEquals afterRuleOnChildren) {
+        val newNode = if (self fastEquals afterRuleOnChildren) {
           CurrentOrigin.withOrigin(origin) {
             rule.applyOrElse(self, identity[LogicalPlan])
           }
@@ -94,6 +94,8 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
             rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
           }
         }
+        newNode.copyTagsFrom(this)
+        newNode
       }
     } else {
       self
```
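The new `copyTagsFrom` call means node tags survive a rule that returns a new node. A hedged sketch of that effect, assuming it lives inside Spark's own `catalyst.plans.logical` package (the tag APIs are `private[spark]`); the object name and tag name are purely illustrative:

```scala
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.types.IntegerType

object CopyTagsSketch {
  // Illustrative tag; real rules attach markers like this to plan nodes.
  private val exampleTag = TreeNodeTag[String]("example")

  def demo(): Unit = {
    val a = AttributeReference("a", IntegerType)()
    val original: LogicalPlan = Project(Seq(a), LocalRelation(a))
    original.setTagValue(exampleTag, "keep-me")

    // The rule returns a structurally equal but newly constructed Project node.
    val rewritten = original.resolveOperatorsUp {
      case p: Project => Project(p.projectList, p.child)
    }

    // With newNode.copyTagsFrom(this) in resolveOperatorsUp, the tag set on the
    // original node is carried over to the node returned by the rule.
    assert(rewritten.getTagValue(exampleTag).contains("keep-me"))
  }
}
```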
Changes to `DataFrameJoinSuite.scala`:

```diff
@@ -477,4 +477,26 @@ class DataFrameJoinSuite extends QueryTest
 
     checkAnswer(df3.except(df4), Row(10, 50, 2, Row(10, 50, 2)))
   }
+
+  test("SPARK-34527: Resolve common columns from USING JOIN") {
+    val joinDf = testData2.as("testData2").join(
+      testData3.as("testData3"), usingColumns = Seq("a"), joinType = "fullouter")
+    val dfQuery = joinDf.select(
+      $"a", $"testData2.a", $"testData2.b", $"testData3.a", $"testData3.b")
+    val dfQuery2 = joinDf.select(
+      $"a", testData2.col("a"), testData2.col("b"), testData3.col("a"), testData3.col("b"))
+
+    Seq(dfQuery, dfQuery2).map { query =>
+      checkAnswer(query,
+        Seq(
+          Row(1, 1, 1, 1, null),
+          Row(1, 1, 2, 1, null),
+          Row(2, 2, 1, 2, 2),
+          Row(2, 2, 2, 2, 2),
+          Row(3, 3, 1, null, null),
+          Row(3, 3, 2, null, null)
+        )
+      )
+    }
+  }
 }
```

Author's inline comment on the `dfQuery`/`dfQuery2` selects: These demonstrate that the behavior now works in Scala.
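A possible follow-up in a spark-shell against the same test data (not part of the PR's tests): the duplicated key column stays out of star expansion but remains addressable with its qualifier.

```scala
// Assuming joinDf from the test above: the schema exposes only the coalesced `a`
// plus the two `b` columns, yet the qualified duplicates still resolve.
joinDf.printSchema()
joinDf.select($"testData2.a", $"testData3.a").show()
```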
Reviewer comments on `getMetadataAttributes` in the Analyzer change:

- nit: we can avoid building a new `Seq` frequently. The check can be `plan.children.exists(c => c.metadataOutput.exists(_.exprId == a.exprId))`.
- The same applies to `hasMetadataCol`.
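For illustration, a sketch of the two helpers with that suggestion applied (assuming the surrounding `AddMetadataColumns` rule and imports shown above; not necessarily the shape the PR ends up with):

```scala
// Sketch only: the guards consult the children's metadataOutput directly instead
// of materializing childMetadataOutput as a Seq for every check.
private def getMetadataAttributes(plan: LogicalPlan): Seq[Attribute] = {
  plan.expressions.collect {
    case a: Attribute if a.isMetadataCol => a
    case a: Attribute if plan.children.exists(_.metadataOutput.exists(_.exprId == a.exprId)) =>
      plan.children.flatMap(_.metadataOutput).find(_.exprId == a.exprId).get
  }
}

private def hasMetadataCol(plan: LogicalPlan): Boolean = {
  plan.expressions.exists(_.find {
    case a: Attribute =>
      a.isMetadataCol || plan.children.exists(_.metadataOutput.exists(_.exprId == a.exprId))
    case _ => false
  }.isDefined)
}
```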