Skip to content

Commit b01dd12

Browse files
karenfeng authored and cloud-fan committed
[SPARK-34555][SQL] Resolve metadata output from DataFrame
### What changes were proposed in this pull request? Add metadataOutput as a fallback to resolution. Builds off apache#31654. ### Why are the changes needed? The metadata columns could not be resolved via `df.col("metadataColName")` from the DataFrame API. ### Does this PR introduce _any_ user-facing change? Yes, the metadata columns can now be resolved as described above. ### How was this patch tested? Scala unit test. Closes apache#31668 from karenfeng/spark-34555. Authored-by: Karen Feng <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 56edb81 commit b01dd12

File tree

2 files changed

+25
-1
lines changed

2 files changed

+25
-1
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -95,6 +95,8 @@ abstract class LogicalPlan
9595

9696
private[this] lazy val outputAttributes = AttributeSeq(output)
9797

98+
private[this] lazy val outputMetadataAttributes = AttributeSeq(metadataOutput)
99+
98100
/**
99101
* Optionally resolves the given strings to a [[NamedExpression]] using the input from all child
100102
* nodes of this LogicalPlan. The attribute is expressed as
@@ -115,6 +117,7 @@ abstract class LogicalPlan
115117
nameParts: Seq[String],
116118
resolver: Resolver): Option[NamedExpression] =
117119
outputAttributes.resolve(nameParts, resolver)
120+
.orElse(outputMetadataAttributes.resolve(nameParts, resolver))
118121

119122
/**
120123
* Given an attribute name, split it to name parts by dot, but
@@ -124,7 +127,7 @@ abstract class LogicalPlan
124127
def resolveQuoted(
125128
name: String,
126129
resolver: Resolver): Option[NamedExpression] = {
127-
outputAttributes.resolve(UnresolvedAttribute.parseAttributeName(name), resolver)
130+
resolve(UnresolvedAttribute.parseAttributeName(name), resolver)
128131
}
129132

130133
/**

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 21 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2607,6 +2607,27 @@ class DataSourceV2SQLSuite
26072607
}
26082608
}
26092609

2610+
test("SPARK-34555: Resolve DataFrame metadata column") {
2611+
val tbl = s"${catalogAndNamespace}table"
2612+
withTable(tbl) {
2613+
sql(s"CREATE TABLE $tbl (id bigint, data string) USING $v2Format " +
2614+
"PARTITIONED BY (bucket(4, id), id)")
2615+
sql(s"INSERT INTO $tbl VALUES (1, 'a'), (2, 'b'), (3, 'c')")
2616+
val table = spark.table(tbl)
2617+
val dfQuery = table.select(
2618+
table.col("id"),
2619+
table.col("data"),
2620+
table.col("index"),
2621+
table.col("_partition")
2622+
)
2623+
2624+
checkAnswer(
2625+
dfQuery,
2626+
Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3"))
2627+
)
2628+
}
2629+
}
2630+
26102631
test("SPARK-34561: drop/add columns to a dataset of `DESCRIBE TABLE`") {
26112632
val tbl = s"${catalogAndNamespace}tbl"
26122633
withTable(tbl) {

0 commit comments

Comments (0)