
Commit d566017

[SPARK-40149][SQL][3.2] Propagate metadata columns through Project
backport apache#37758 to 3.2

### What changes were proposed in this pull request?

This PR fixes a regression caused by apache#32017. In apache#32017, we tried to be more conservative and decided not to propagate metadata columns in certain operators, including `Project`. However, that decision considered only the SQL API, not the DataFrame API. In fact, it is very common to chain `Project` operators in a DataFrame, e.g. `df.withColumn(...).withColumn(...)...`, and it is very inconvenient if metadata columns are not propagated through `Project`.

This PR makes 2 changes:
1. `Project` should propagate metadata columns.
2. `SubqueryAlias` should only propagate metadata columns if the child is a leaf node or also a `SubqueryAlias`.

The second change is needed to still forbid weird queries like `SELECT m from (SELECT a from t)`, which was the main motivation of apache#32017.

After propagating metadata columns, a problem from apache#31666 is exposed: the natural join metadata columns may confuse the analyzer and lead to a wrong analyzed plan. For example, given `SELECT t1.value FROM t1 LEFT JOIN t2 USING (key) ORDER BY key`, how should we resolve `ORDER BY key`? It should be resolved to `t1.key` via the rule `ResolveMissingReferences`, since it is in the output of the left join. However, if `Project` can propagate metadata columns, `ORDER BY key` will be resolved to `t2.key`.

To solve this problem, this PR only allows qualified access for metadata columns of natural joins. This is not a breaking change, as previously people could only access natural join metadata columns with a qualifier, in the `Project` right after the `Join`. It actually enables more use cases, as people can now access natural join metadata columns in ORDER BY. I've added a test for it.

### Why are the changes needed?

Fix a regression.

### Does this PR introduce _any_ user-facing change?

For the SQL API, there is no change, as a `SubqueryAlias` always comes with a `Project` or `Aggregate`, so we still don't propagate metadata columns through a SELECT group. For the DataFrame API, the behavior becomes more lenient. The only breaking case is an operator that can propagate metadata columns followed by a `SubqueryAlias`, e.g. `df.filter(...).as("t").select("t.metadata_col")`. But this is a weird use case and I don't think we should have supported it in the first place.

### How was this patch tested?

New tests.

Closes apache#37818 from cloud-fan/backport.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent c7cc0ae commit d566017
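
As a concrete illustration of the DataFrame pattern this backport fixes, here is a minimal sketch (not part of the patch). It assumes a running SparkSession and a DSv2 table `t` whose source exposes a `_partition` metadata column:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

object ProjectMetadataPropagation {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("metadata-through-project").getOrCreate()

    // Every withColumn call stacks another Project on top of the scan. Before this
    // fix, the final reference to `_partition` could not be resolved because Project
    // did not propagate metadata columns; with the fix it resolves against the scan.
    spark.table("t")
      .withColumn("flag", lit(1))
      .withColumn("double_flag", lit(2))
      .select("id", "flag", "double_flag", "_partition")
      .show()

    spark.stop()
  }
}
```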

File tree: 9 files changed (+263, -238 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 5 additions & 3 deletions
@@ -1043,9 +1043,11 @@ class Analyzer(override val catalogManager: CatalogManager)
     private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
       case r: DataSourceV2Relation => r.withMetadataColumns()
       case p: Project =>
-        p.copy(
+        val newProj = p.copy(
           projectList = p.metadataOutput ++ p.projectList,
           child = addMetadataCol(p.child))
+        newProj.copyTagsFrom(p)
+        newProj
       case _ => plan.withNewChildren(plan.children.map(addMetadataCol))
     }
   }
@@ -3480,8 +3482,8 @@ class Analyzer(override val catalogManager: CatalogManager)
     val project = Project(projectList, Join(left, right, joinType, newCondition, hint))
     project.setTagValue(
       Project.hiddenOutputTag,
-      hiddenList.map(_.markAsSupportsQualifiedStar()) ++
-        project.child.metadataOutput.filter(_.supportsQualifiedStar))
+      hiddenList.map(_.markAsQualifiedAccessOnly()) ++
+        project.child.metadataOutput.filter(_.qualifiedAccessOnly))
     project
   }
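
The second hunk above tags the hidden columns of a natural or USING join as qualified-access-only. To see what that means in practice, here is a hedged, self-contained sketch; the table names and data are invented for illustration and are not part of the patch:

```scala
import org.apache.spark.sql.SparkSession

object UsingJoinQualifiedAccess {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("using-join").getOrCreate()
    import spark.implicits._

    Seq(("one", 1), ("two", 2), ("three", 3)).toDF("key", "value").createOrReplaceTempView("t1")
    Seq(("one", 10), ("two", 20)).toDF("key", "value").createOrReplaceTempView("t2")

    // Unqualified `key` still resolves to the join output (t1.key), as before.
    spark.sql("SELECT t1.value FROM t1 LEFT JOIN t2 USING (key) ORDER BY key").show()

    // The hidden t2.key can now be referenced with a qualifier, including in ORDER BY.
    spark.sql("SELECT t1.key, t2.key FROM t1 LEFT JOIN t2 USING (key) ORDER BY t2.key").show()

    spark.stop()
  }
}
```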

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

Lines changed: 1 addition & 1 deletion
@@ -386,7 +386,7 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu
     if (target.isEmpty) return input.output
 
     // If there is a table specified, use hidden input attributes as well
-    val hiddenOutput = input.metadataOutput.filter(_.supportsQualifiedStar)
+    val hiddenOutput = input.metadataOutput.filter(_.qualifiedAccessOnly)
     val expandedAttributes = (hiddenOutput ++ input.output).filter(
       matchedQualifier(_, target.get, resolver))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala

Lines changed: 8 additions & 5 deletions
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.util.MetadataColumnHelper
 import org.apache.spark.sql.types.{StructField, StructType}
 
 /**
@@ -265,7 +266,7 @@ package object expressions {
       case (Seq(), _) =>
         val name = nameParts.head
        val attributes = collectMatches(name, direct.get(name.toLowerCase(Locale.ROOT)))
-        (attributes, nameParts.tail)
+        (attributes.filterNot(_.qualifiedAccessOnly), nameParts.tail)
       case _ => matches
     }
   }
@@ -314,10 +315,12 @@ package object expressions {
     var i = nameParts.length - 1
     while (i >= 0 && candidates.isEmpty) {
       val name = nameParts(i)
-      candidates = collectMatches(
-        name,
-        nameParts.take(i),
-        direct.get(name.toLowerCase(Locale.ROOT)))
+      val attrsToLookup = if (i == 0) {
+        direct.get(name.toLowerCase(Locale.ROOT)).map(_.filterNot(_.qualifiedAccessOnly))
+      } else {
+        direct.get(name.toLowerCase(Locale.ROOT))
+      }
+      candidates = collectMatches(name, nameParts.take(i), attrsToLookup)
       if (candidates.nonEmpty) {
         nestedFields = nameParts.takeRight(nameParts.length - i - 1)
       }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 9 additions & 4 deletions
@@ -88,7 +88,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
     getAllValidConstraints(projectList)
 
   override def metadataOutput: Seq[Attribute] =
-    getTagValue(Project.hiddenOutputTag).getOrElse(Nil)
+    getTagValue(Project.hiddenOutputTag).getOrElse(child.metadataOutput)
 
   override protected def withNewChildInternal(newChild: LogicalPlan): Project =
     copy(child = newChild)
@@ -1307,9 +1307,14 @@ case class SubqueryAlias(
   }
 
   override def metadataOutput: Seq[Attribute] = {
-    val qualifierList = identifier.qualifier :+ alias
-    val nonHiddenMetadataOutput = child.metadataOutput.filter(!_.supportsQualifiedStar)
-    nonHiddenMetadataOutput.map(_.withQualifier(qualifierList))
+    // Propagate metadata columns from leaf nodes through a chain of `SubqueryAlias`.
+    if (child.isInstanceOf[LeafNode] || child.isInstanceOf[SubqueryAlias]) {
+      val qualifierList = identifier.qualifier :+ alias
+      val nonHiddenMetadataOutput = child.metadataOutput.filter(!_.qualifiedAccessOnly)
+      nonHiddenMetadataOutput.map(_.withQualifier(qualifierList))
+    } else {
+      Nil
+    }
   }
 
   override def maxRows: Option[Long] = child.maxRows
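
A rough illustration of the new `SubqueryAlias` rule above, as a sketch under the same assumption as earlier (a DSv2 table `t` exposing a `_partition` metadata column; this is not code from the patch):

```scala
import org.apache.spark.sql.SparkSession

object SubqueryAliasMetadataColumns {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("subquery-alias-metadata").getOrCreate()
    val tbl = spark.table("t")

    // Alias directly over the leaf relation: metadata columns still flow through.
    tbl.as("sbq").select("sbq.id", "sbq._partition").show()

    // Alias over a non-leaf, non-alias child (a Filter here): metadata columns are
    // no longer propagated, so this qualified reference fails with an AnalysisException.
    tbl.filter("id > 1").as("sbq").select("sbq._partition")

    spark.stop()
  }
}
```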

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala

Lines changed: 8 additions & 7 deletions
@@ -206,22 +206,23 @@ package object util extends Logging {
 
   implicit class MetadataColumnHelper(attr: Attribute) {
     /**
-     * If set, this metadata column is a candidate during qualified star expansions.
+     * If set, this metadata column can only be accessed with qualifiers, e.g. `qualifiers.col` or
+     * `qualifiers.*`. If not set, metadata columns cannot be accessed via star.
      */
-    val SUPPORTS_QUALIFIED_STAR = "__supports_qualified_star"
+    val QUALIFIED_ACCESS_ONLY = "__qualified_access_only"
 
     def isMetadataCol: Boolean = attr.metadata.contains(METADATA_COL_ATTR_KEY) &&
       attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)
 
-    def supportsQualifiedStar: Boolean = attr.isMetadataCol &&
-      attr.metadata.contains(SUPPORTS_QUALIFIED_STAR) &&
-      attr.metadata.getBoolean(SUPPORTS_QUALIFIED_STAR)
+    def qualifiedAccessOnly: Boolean = attr.isMetadataCol &&
+      attr.metadata.contains(QUALIFIED_ACCESS_ONLY) &&
+      attr.metadata.getBoolean(QUALIFIED_ACCESS_ONLY)
 
-    def markAsSupportsQualifiedStar(): Attribute = attr.withMetadata(
+    def markAsQualifiedAccessOnly(): Attribute = attr.withMetadata(
       new MetadataBuilder()
         .withMetadata(attr.metadata)
         .putBoolean(METADATA_COL_ATTR_KEY, true)
-        .putBoolean(SUPPORTS_QUALIFIED_STAR, true)
+        .putBoolean(QUALIFIED_ACCESS_ONLY, true)
         .build()
     )
   }
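
For reference, a tiny hypothetical usage sketch of the renamed helper (an internal Catalyst API; shown only to illustrate the semantics of the flag, not taken from the patch):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.util.MetadataColumnHelper
import org.apache.spark.sql.types.StringType

object QualifiedAccessOnlyFlag {
  def main(args: Array[String]): Unit = {
    val key = AttributeReference("key", StringType)()

    // Marking adds both the metadata-column flag and the qualified-access-only flag.
    val hidden = key.markAsQualifiedAccessOnly()
    assert(hidden.qualifiedAccessOnly)   // may only be resolved as `alias.key` or `alias.*`
    assert(!key.qualifiedAccessOnly)     // ordinary attributes are unaffected
  }
}
```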

sql/core/src/test/resources/sql-tests/inputs/using-join.sql

Lines changed: 2 additions & 0 deletions
@@ -19,6 +19,8 @@ SELECT nt1.*, nt2.* FROM nt1 left outer join nt2 using (k);
 
 SELECT nt1.k, nt2.k FROM nt1 left outer join nt2 using (k);
 
+SELECT nt1.k, nt2.k FROM nt1 left outer join nt2 using (k) ORDER BY nt2.k;
+
 SELECT k, nt1.k FROM nt1 left outer join nt2 using (k);
 
 SELECT k, nt2.k FROM nt1 left outer join nt2 using (k);

sql/core/src/test/resources/sql-tests/results/using-join.sql.out

Lines changed: 11 additions & 0 deletions
@@ -71,6 +71,17 @@ three NULL
 two two
 
 
+-- !query
+SELECT nt1.k, nt2.k FROM nt1 left outer join nt2 using (k) ORDER BY nt2.k
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+three NULL
+one one
+one one
+two two
+
+
 -- !query
 SELECT k, nt1.k FROM nt1 left outer join nt2 using (k)
 -- !query schema

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 0 additions & 218 deletions
@@ -2524,100 +2524,6 @@ class DataSourceV2SQLSuite
     }
   }
 
-  test("SPARK-31255: Project a metadata column") {
-    val t1 = s"${catalogAndNamespace}table"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
-        "PARTITIONED BY (bucket(4, id), id)")
-      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
-
-      val sqlQuery = spark.sql(s"SELECT id, data, index, _partition FROM $t1")
-      val dfQuery = spark.table(t1).select("id", "data", "index", "_partition")
-
-      Seq(sqlQuery, dfQuery).foreach { query =>
-        checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
-      }
-    }
-  }
-
-  test("SPARK-31255: Projects data column when metadata column has the same name") {
-    val t1 = s"${catalogAndNamespace}table"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 (index bigint, data string) USING $v2Format " +
-        "PARTITIONED BY (bucket(4, index), index)")
-      sql(s"INSERT INTO $t1 VALUES (3, 'c'), (2, 'b'), (1, 'a')")
-
-      val sqlQuery = spark.sql(s"SELECT index, data, _partition FROM $t1")
-      val dfQuery = spark.table(t1).select("index", "data", "_partition")
-
-      Seq(sqlQuery, dfQuery).foreach { query =>
-        checkAnswer(query, Seq(Row(3, "c", "1/3"), Row(2, "b", "0/2"), Row(1, "a", "3/1")))
-      }
-    }
-  }
-
-  test("SPARK-31255: * expansion does not include metadata columns") {
-    val t1 = s"${catalogAndNamespace}table"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
-        "PARTITIONED BY (bucket(4, id), id)")
-      sql(s"INSERT INTO $t1 VALUES (3, 'c'), (2, 'b'), (1, 'a')")
-
-      val sqlQuery = spark.sql(s"SELECT * FROM $t1")
-      val dfQuery = spark.table(t1)
-
-      Seq(sqlQuery, dfQuery).foreach { query =>
-        checkAnswer(query, Seq(Row(3, "c"), Row(2, "b"), Row(1, "a")))
-      }
-    }
-  }
-
-  test("SPARK-31255: metadata column should only be produced when necessary") {
-    val t1 = s"${catalogAndNamespace}table"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
-        "PARTITIONED BY (bucket(4, id), id)")
-
-      val sqlQuery = spark.sql(s"SELECT * FROM $t1 WHERE index = 0")
-      val dfQuery = spark.table(t1).filter("index = 0")
-
-      Seq(sqlQuery, dfQuery).foreach { query =>
-        assert(query.schema.fieldNames.toSeq == Seq("id", "data"))
-      }
-    }
-  }
-
-  test("SPARK-34547: metadata columns are resolved last") {
-    val t1 = s"${catalogAndNamespace}tableOne"
-    val t2 = "t2"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
-        "PARTITIONED BY (bucket(4, id), id)")
-      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
-      withTempView(t2) {
-        sql(s"CREATE TEMPORARY VIEW $t2 AS SELECT * FROM " +
-          s"VALUES (1, -1), (2, -2), (3, -3) AS $t2(id, index)")
-
-        val sqlQuery = spark.sql(s"SELECT $t1.id, $t2.id, data, index, $t1.index, $t2.index FROM " +
-          s"$t1 JOIN $t2 WHERE $t1.id = $t2.id")
-        val t1Table = spark.table(t1)
-        val t2Table = spark.table(t2)
-        val dfQuery = t1Table.join(t2Table, t1Table.col("id") === t2Table.col("id"))
-          .select(s"$t1.id", s"$t2.id", "data", "index", s"$t1.index", s"$t2.index")
-
-        Seq(sqlQuery, dfQuery).foreach { query =>
-          checkAnswer(query,
-            Seq(
-              Row(1, 1, "a", -1, 0, -1),
-              Row(2, 2, "b", -2, 0, -2),
-              Row(3, 3, "c", -3, 0, -3)
-            )
-          )
-        }
-      }
-    }
-  }
-
   test("SPARK-33505: insert into partitioned table") {
     val t = "testpart.ns1.ns2.tbl"
     withTable(t) {
@@ -2702,27 +2608,6 @@ class DataSourceV2SQLSuite
     }
   }
 
-  test("SPARK-34555: Resolve DataFrame metadata column") {
-    val tbl = s"${catalogAndNamespace}table"
-    withTable(tbl) {
-      sql(s"CREATE TABLE $tbl (id bigint, data string) USING $v2Format " +
-        "PARTITIONED BY (bucket(4, id), id)")
-      sql(s"INSERT INTO $tbl VALUES (1, 'a'), (2, 'b'), (3, 'c')")
-      val table = spark.table(tbl)
-      val dfQuery = table.select(
-        table.col("id"),
-        table.col("data"),
-        table.col("index"),
-        table.col("_partition")
-      )
-
-      checkAnswer(
-        dfQuery,
-        Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3"))
-      )
-    }
-  }
-
   test("SPARK-34561: drop/add columns to a dataset of `DESCRIBE TABLE`") {
     val tbl = s"${catalogAndNamespace}tbl"
     withTable(tbl) {
@@ -2785,109 +2670,6 @@ class DataSourceV2SQLSuite
     }
   }
 
-  test("SPARK-34923: do not propagate metadata columns through Project") {
-    val t1 = s"${catalogAndNamespace}table"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
-        "PARTITIONED BY (bucket(4, id), id)")
-      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
-
-      assertThrows[AnalysisException] {
-        sql(s"SELECT index, _partition from (SELECT id, data FROM $t1)")
-      }
-      assertThrows[AnalysisException] {
-        spark.table(t1).select("id", "data").select("index", "_partition")
-      }
-    }
-  }
-
-  test("SPARK-34923: do not propagate metadata columns through View") {
-    val t1 = s"${catalogAndNamespace}table"
-    val view = "view"
-
-    withTable(t1) {
-      withTempView(view) {
-        sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
-          "PARTITIONED BY (bucket(4, id), id)")
-        sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
-        sql(s"CACHE TABLE $view AS SELECT * FROM $t1")
-        assertThrows[AnalysisException] {
-          sql(s"SELECT index, _partition FROM $view")
-        }
-      }
-    }
-  }
-
-  test("SPARK-34923: propagate metadata columns through Filter") {
-    val t1 = s"${catalogAndNamespace}table"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
-        "PARTITIONED BY (bucket(4, id), id)")
-      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
-
-      val sqlQuery = spark.sql(s"SELECT id, data, index, _partition FROM $t1 WHERE id > 1")
-      val dfQuery = spark.table(t1).where("id > 1").select("id", "data", "index", "_partition")
-
-      Seq(sqlQuery, dfQuery).foreach { query =>
-        checkAnswer(query, Seq(Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
-      }
-    }
-  }
-
-  test("SPARK-34923: propagate metadata columns through Sort") {
-    val t1 = s"${catalogAndNamespace}table"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
-        "PARTITIONED BY (bucket(4, id), id)")
-      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
-
-      val sqlQuery = spark.sql(s"SELECT id, data, index, _partition FROM $t1 ORDER BY id")
-      val dfQuery = spark.table(t1).orderBy("id").select("id", "data", "index", "_partition")
-
-      Seq(sqlQuery, dfQuery).foreach { query =>
-        checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
-      }
-    }
-  }
-
-  test("SPARK-34923: propagate metadata columns through RepartitionBy") {
-    val t1 = s"${catalogAndNamespace}table"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
-        "PARTITIONED BY (bucket(4, id), id)")
-      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
-
-      val sqlQuery = spark.sql(
-        s"SELECT /*+ REPARTITION_BY_RANGE(3, id) */ id, data, index, _partition FROM $t1")
-      val tbl = spark.table(t1)
-      val dfQuery = tbl.repartitionByRange(3, tbl.col("id"))
-        .select("id", "data", "index", "_partition")
-
-      Seq(sqlQuery, dfQuery).foreach { query =>
-        checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
-      }
-    }
-  }
-
-  test("SPARK-34923: propagate metadata columns through SubqueryAlias") {
-    val t1 = s"${catalogAndNamespace}table"
-    val sbq = "sbq"
-    withTable(t1) {
-      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
-        "PARTITIONED BY (bucket(4, id), id)")
-      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
-
-      val sqlQuery = spark.sql(
-        s"SELECT $sbq.id, $sbq.data, $sbq.index, $sbq._partition FROM $t1 as $sbq")
-      val dfQuery = spark.table(t1).as(sbq).select(
-        s"$sbq.id", s"$sbq.data", s"$sbq.index", s"$sbq._partition")
-
-      Seq(sqlQuery, dfQuery).foreach { query =>
-        checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
-      }
-    }
-  }
-
   private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
     val e = intercept[AnalysisException] {
       sql(s"$sqlCommand $sqlParams")
