Skip to content

Commit d9fd36e

Browse files
amaliujia authored and cloud-fan committed
[SPARK-39144][SQL] Nested subquery expressions deduplicate relations should be done bottom up
### What changes were proposed in this pull request? When we have nested subquery expressions, there is a chance that deduplicating relations replaces an attribute with a wrong one. This is because the attribute replacement is done top down rather than bottom up. This can happen if a subplan has its relations deduplicated first (so the same relation appears twice with different attribute ids), and then a more complex plan built on top of the subplan (e.g. a UNION of queries with nested subquery expressions) triggers this wrong attribute replacement. For a concrete example, please see the added unit test. ### Why are the changes needed? This is a bug that we can fix. Without this PR, we could hit a case where an outer attribute reference does not exist in the outer relation in certain scenarios. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes apache#36503 from amaliujia/testnestedsubqueryexpression. Authored-by: Rui Wang <rui.wang@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent e7b8be3 commit d9fd36e

File tree

3 files changed

+69
-13
lines changed

3 files changed

+69
-13
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -745,9 +745,27 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
745745
expressions.exists(_.exists(_.semanticEquals(expr)))
746746
}
747747

748+
// Verifies the outer references used by a subquery expression that sits on a Filter.
// For every OuterReference found in the expressions of the subquery's plan node, each
// child of `p` must output an attribute with the same exprId; otherwise analysis fails.
// Plans other than Filter, and subqueries without outer references, are not checked.
def checkOuterReference(p: LogicalPlan, expr: SubqueryExpression): Unit = {
  // Fails analysis if any child of `p` does not output the referenced outer attribute.
  def verifyResolvable(outer: OuterReference): Unit =
    p.children.foreach { child =>
      if (!child.output.exists(_.exprId == outer.exprId)) {
        failAnalysis("outer attribute not found")
      }
    }

  p match {
    case _: Filter if hasOuterReferences(expr.plan) =>
      expr.plan.expressions.foreach(_.foreachUp {
        case outer: OuterReference => verifyResolvable(outer)
        case _ =>
      })
    case _ =>
  }
}
762+
748763
// Validate the subquery plan.
749764
checkAnalysis(expr.plan)
750765

766+
// Check if there is outer attribute that cannot be found from the plan.
767+
checkOuterReference(plan, expr)
768+
751769
expr match {
752770
case ScalarSubquery(query, outerAttrs, _, _) =>
753771
// Scalar subquery must return one column as output.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,18 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
125125
}
126126
}
127127

128+
val planWithNewSubquery = plan.transformExpressions {
129+
case subquery: SubqueryExpression =>
130+
val (renewed, collected, changed) = renewDuplicatedRelations(
131+
existingRelations ++ relations, subquery.plan)
132+
relations ++= collected
133+
if (changed) planChanged = true
134+
subquery.withNewPlan(renewed)
135+
}
136+
128137
if (planChanged) {
129-
if (plan.childrenResolved) {
130-
val planWithNewChildren = plan.withNewChildren(newChildren.toSeq)
138+
if (planWithNewSubquery.childrenResolved) {
139+
val planWithNewChildren = planWithNewSubquery.withNewChildren(newChildren.toSeq)
131140
val attrMap = AttributeMap(
132141
plan
133142
.children
@@ -140,24 +149,15 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
140149
planWithNewChildren.rewriteAttrs(attrMap)
141150
}
142151
} else {
143-
plan.withNewChildren(newChildren.toSeq)
152+
planWithNewSubquery.withNewChildren(newChildren.toSeq)
144153
}
145154
} else {
146155
plan
147156
}
148157
} else {
149158
plan
150159
}
151-
152-
val planWithNewSubquery = newPlan.transformExpressions {
153-
case subquery: SubqueryExpression =>
154-
val (renewed, collected, changed) = renewDuplicatedRelations(
155-
existingRelations ++ relations, subquery.plan)
156-
relations ++= collected
157-
if (changed) planChanged = true
158-
subquery.withNewPlan(renewed)
159-
}
160-
(planWithNewSubquery, relations, planChanged)
160+
(newPlan, relations, planChanged)
161161
}
162162

163163
/**

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1176,4 +1176,42 @@ class AnalysisSuite extends AnalysisTest with Matchers {
11761176
false)
11771177
}
11781178
}
1179+
1180+
test("SPARK-39144: nested subquery expressions deduplicate relations should be done bottom up") {
  // Two aliases over the very same relation, so both expose the same attributes.
  val src1 = SubqueryAlias("src1", testRelation)
  val src2 = SubqueryAlias("src2", testRelation)
  val firstAttr = testRelation.output.head

  // Correlated EXISTS whose inner filter compares the outer reference with the same attribute.
  val correlatedExists =
    Exists(Filter(EqualTo(OuterReference(firstAttr), firstAttr), src1))

  // Run the analyzer on the sub plan first, before it is embedded into a larger plan.
  val analyzedSubPlan = getAnalyzer.execute(
    Project(Seq(UnresolvedStar(None)), Filter(correlatedExists, src2)))

  // Reuse the same sub plan as a union branch, as a filter child and inside a nested EXISTS.
  val unionPlan = Union(
    Project(Seq(UnresolvedStar(None)), analyzedSubPlan),
    Filter(Exists(analyzedSubPlan), analyzedSubPlan))

  assertAnalysisSuccess(unionPlan)
}
11791217
}

0 commit comments

Comments
 (0)