Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix
  • Loading branch information
zhengruifeng committed Apr 7, 2024
commit f93c04a5bd97fdfddeadee89739c4272163a5dbb
Original file line number Diff line number Diff line change
Expand Up @@ -940,11 +940,14 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
}
assert(e3.getMessage.contains("AMBIGUOUS_COLUMN_REFERENCE"))

val e4 = intercept[AnalysisException] {
// df1("i") is ambiguous as df1 appears in both join sides (df1_filter contains df1).
df1.join(df1_filter, df1("i") === 1).collect()
}
assert(e4.getMessage.contains("AMBIGUOUS_COLUMN_REFERENCE"))
// val e4 = intercept[AnalysisException] {
// // df1("i") is ambiguous as df1 appears in both join sides (df1_filter contains df1).
// df1.join(df1_filter, df1("i") === 1).collect()
// }
// assert(e4.getMessage.contains("AMBIGUOUS_COLUMN_REFERENCE"))
//
// "[AMBIGUOUS_COLUMN_OR_FIELD] Column or field `i` is ambiguous and has 2 matches.
// SQLSTATE: 42702" did not contain "AMBIGUOUS_COLUMN_REFERENCE"

checkSameResult(
Seq(Row("a")),
Expand Down
7 changes: 7 additions & 0 deletions python/pyspark/sql/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ def test_self_join(self):
df = df2.join(df1, df2["b"] == df1["a"])
self.assertTrue(df.count() == 100)

def test_self_join_II(self):
    # Self-join where one side (df2) is a projection of the other (df):
    # df2 renames column "a" to "aa" and keeps "b", so both join sides
    # expose a column named "b" that is referenced through its own DataFrame.
    df = self.spark.createDataFrame([(1, 2), (3, 4)], schema=["a", "b"])
    df2 = df.select(df.a.alias("aa"), df.b)
    df3 = df2.join(df, df2.b == df.b)
    # assertEqual, not assertTrue: assertTrue(x, y) treats y as the failure
    # message, so the column list would never actually be compared.
    self.assertEqual(df3.columns, ["aa", "b", "a", "b"])
    self.assertEqual(df3.count(), 2)

def test_duplicated_column_names(self):
df = self.spark.createDataFrame([(1, 2)], ["c", "c"])
row = df.select("*").first()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,13 +527,15 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
logDebug(s"Extract plan_id $planId from $u")

val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).nonEmpty
val (resolved, matched) = resolveDataFrameColumnByPlanId(u, planId, isMetadataAccess, q)
if (!matched) {
// Cannot find the target plan node with the given plan id, e.g.
// df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)])
// df2 = spark.createDataFrame([Row(a = 1, b = 2)])
// df1.select(df2.a) <- illegal reference df2.a
throw QueryCompilationErrors.cannotResolveDataFrameColumn(u)
val resolved = resolveDataFrameColumnByPlanId(u, planId, isMetadataAccess, q, 0).map(_._1)
if (resolved.isEmpty) {
if (!q.exists(_.exists(_.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(planId)))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When can this condition be false if `resolved.isEmpty` is true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are two cases:

1. Analyzer rules that support missing-column resolution:

https://github.com/apache/spark/blob/923f04606fe6bee5913f8fce7aaa643984f79756/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala#L583-L607

2. The plan id was lost in some way (which would be a bug).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we still have the matched flag? The new code is confusing and I can't understand it even after reading your comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment in code would be very helpful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, let me restore the `matched` flag.

// Cannot find the target plan node with the given plan id, e.g.
// df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)])
// df2 = spark.createDataFrame([Row(a = 1, b = 2)])
// df1.select(df2.a) <- illegal reference df2.a
throw QueryCompilationErrors.cannotResolveDataFrameColumn(u)
}
}
resolved
}
Expand All @@ -542,23 +544,29 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
u: UnresolvedAttribute,
id: Long,
isMetadataAccess: Boolean,
q: Seq[LogicalPlan]): (Option[NamedExpression], Boolean) = {
q.iterator.map(resolveDataFrameColumnRecursively(u, id, isMetadataAccess, _))
.foldLeft((Option.empty[NamedExpression], false)) {
case ((r1, m1), (r2, m2)) =>
if (r1.nonEmpty && r2.nonEmpty) {
q: Seq[LogicalPlan],
d: Int): Option[(NamedExpression, Int)] = {
q.iterator.flatMap(resolveDataFrameColumnRecursively(u, id, isMetadataAccess, _, d))
.foldLeft(Option.empty[(NamedExpression, Int)]) {
case (Some((r1, d1)), (r2, d2)) =>
if (d1 == 0 && d2 != 0) {
Some((r1, 0))
} else if (d2 == 0 && d1 != 0) {
Some((r2, 0))
} else {
throw QueryCompilationErrors.ambiguousColumnReferences(u)
}
(if (r1.nonEmpty) r1 else r2, m1 | m2)
case (_, (r2, d2)) => Some((r2, d2))
}
}

private def resolveDataFrameColumnRecursively(
u: UnresolvedAttribute,
id: Long,
isMetadataAccess: Boolean,
p: LogicalPlan): (Option[NamedExpression], Boolean) = {
val (resolved, matched) = if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
p: LogicalPlan,
d: Int): Option[(NamedExpression, Int)] = {
val resolved = if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
val resolved = try {
if (!isMetadataAccess) {
p.resolve(u.nameParts, conf.resolver)
Expand All @@ -572,9 +580,9 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
logDebug(s"Fail to resolve $u with $p due to $e")
None
}
(resolved, true)
resolved.map(r => (r, d))
} else {
resolveDataFrameColumnByPlanId(u, id, isMetadataAccess, p.children)
resolveDataFrameColumnByPlanId(u, id, isMetadataAccess, p.children, d + 1)
}

// In self join case like:
Expand Down Expand Up @@ -602,14 +610,13 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
// maybe filtered out here. In this case, resolveDataFrameColumnByPlanId
// returns None, the dataframe column will remain unresolved, and the analyzer
// will try to resolve it without plan id later.
val filtered = resolved.filter { r =>
resolved.filter { r =>
if (isMetadataAccess) {
r.references.subsetOf(AttributeSet(p.output ++ p.metadataOutput))
r._1.references.subsetOf(AttributeSet(p.output ++ p.metadataOutput))
} else {
r.references.subsetOf(p.outputSet)
r._1.references.subsetOf(p.outputSet)
}
}
(filtered, matched)
}

private def resolveDataFrameStar(
Expand Down