diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 89d65e13fb57..ceb9726bc6d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -89,8 +89,9 @@ abstract class LogicalPlan
     }
   }
 
-  private[this] lazy val childAttributes =
-    AttributeSeq(children.flatMap(c => c.output ++ c.metadataOutput))
+  private[this] lazy val childAttributes = AttributeSeq(children.flatMap(_.output))
+
+  private[this] lazy val childMetadataAttributes = AttributeSeq(children.flatMap(_.metadataOutput))
 
   private[this] lazy val outputAttributes = AttributeSeq(output)
 
@@ -103,6 +104,7 @@ abstract class LogicalPlan
       nameParts: Seq[String],
       resolver: Resolver): Option[NamedExpression] =
     childAttributes.resolve(nameParts, resolver)
+      .orElse(childMetadataAttributes.resolve(nameParts, resolver))
 
   /**
    * Optionally resolves the given strings to a [[NamedExpression]] based on the output of this
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 2f57298856fb..be71ae2b4d7d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2492,6 +2492,37 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-34547: metadata columns are resolved last") {
+    val t1 = s"${catalogAndNamespace}tableOne"
+    val t2 = "t2"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+        "PARTITIONED BY (bucket(4, id), id)")
+      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+      withTempView(t2) {
+        sql(s"CREATE TEMPORARY VIEW $t2 AS SELECT * FROM " +
+          s"VALUES (1, -1), (2, -2), (3, -3) AS $t2(id, index)")
+
+        val sqlQuery = spark.sql(s"SELECT $t1.id, $t2.id, data, index, $t1.index, $t2.index FROM " +
+          s"$t1 JOIN $t2 WHERE $t1.id = $t2.id")
+        val t1Table = spark.table(t1)
+        val t2Table = spark.table(t2)
+        val dfQuery = t1Table.join(t2Table, t1Table.col("id") === t2Table.col("id"))
+          .select(s"$t1.id", s"$t2.id", "data", "index", s"$t1.index", s"$t2.index")
+
+        Seq(sqlQuery, dfQuery).foreach { query =>
+          checkAnswer(query,
+            Seq(
+              Row(1, 1, "a", -1, 0, -1),
+              Row(2, 2, "b", -2, 0, -2),
+              Row(3, 3, "c", -3, 0, -3)
+            )
+          )
+        }
+      }
+    }
+  }
+
   test("SPARK-33505: insert into partitioned table") {
     val t = "testpart.ns1.ns2.tbl"
     withTable(t) {
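
Note on the resolution-order change above: resolveChildren now consults the children's regular output first and falls back to metadataOutput only when the name is not found there. That is why the unqualified "index" in the test binds to the temp view's data column while the qualified "$t1.index" still reaches the table's metadata column. Below is a minimal, self-contained Scala sketch of that Option-based fallback; resolveFromOutput and resolveFromMetadata are hypothetical stand-ins for the two AttributeSeq.resolve calls, and the column names are made up for illustration.

// Sketch only: models the resolve-then-orElse pattern the patch uses in
// LogicalPlan.resolveChildren. Neither function below is a Spark API.
object MetadataResolutionSketch {
  // Stand-in for childAttributes.resolve: only the data column "index" exists.
  def resolveFromOutput(name: String): Option[String] =
    if (name == "index") Some("t2.index (data column)") else None

  // Stand-in for childMetadataAttributes.resolve: metadata columns "index"
  // and "_partition" exist (illustrative names, not a fixed Spark contract).
  def resolveFromMetadata(name: String): Option[String] =
    if (name == "index" || name == "_partition") Some(s"t1.$name (metadata column)")
    else None

  def main(args: Array[String]): Unit = {
    // Mirrors childAttributes.resolve(...).orElse(childMetadataAttributes.resolve(...)):
    // the regular output is tried first, so the data column wins on a name collision.
    val collided = resolveFromOutput("index").orElse(resolveFromMetadata("index"))
    println(collided) // Some(t2.index (data column))

    // A name that exists only as a metadata column still resolves via the fallback.
    val fallback = resolveFromOutput("_partition").orElse(resolveFromMetadata("_partition"))
    println(fallback) // Some(t1._partition (metadata column))
  }
}

Resolving metadata columns last is the conservative choice: existing queries whose column names happen to collide with a source's metadata columns keep their current bindings, while the metadata columns stay reachable through qualified references, as the expected Rows in the test exercise.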