Skip to content

Commit 6b3ab82

Browse files
committed
[SPARK-36020][SQL] Check logical link in remove redundant projects
### What changes were proposed in this pull request? The `RemoveRedundantProjects` feature can conflict with the AQE broadcast threshold ([PR](#32391)) sometimes. After removing the project, the physical plan to logical plan link can be changed and we may have a `Project` above `LogicalQueryStage`. This breaks AQE broadcast threshold, because the stats of `Project` does not have the `isRuntime = true` flag, and thus still use the normal broadcast threshold. This PR updates `RemoveRedundantProjects` to not remove `ProjectExec` that has a different logical plan link than its child. ### Why are the changes needed? Make AQE broadcast threshold work in more cases. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new tests Closes #33222 from cloud-fan/aqe2. Lead-authored-by: Wenchen Fan <wenchen@databricks.com> Co-authored-by: Wenchen Fan <cloud0fan@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 2fff060 commit 6b3ab82

File tree

8 files changed

+208
-153
lines changed

8 files changed

+208
-153
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,8 @@ object RemoveRedundantProjects extends Rule[SparkPlan] {
4848
private def removeProject(plan: SparkPlan, requireOrdering: Boolean): SparkPlan = {
4949
plan match {
5050
case p @ ProjectExec(_, child) =>
51-
if (isRedundant(p, child, requireOrdering)) {
52-
val newPlan = removeProject(child, requireOrdering)
53-
newPlan.setLogicalLink(child.logicalLink.get)
54-
newPlan
51+
if (isRedundant(p, child, requireOrdering) && canRemove(p, child)) {
52+
removeProject(child, requireOrdering)
5553
} else {
5654
p.mapChildren(removeProject(_, false))
5755
}
@@ -110,4 +108,11 @@ object RemoveRedundantProjects extends Rule[SparkPlan] {
110108
}
111109
}
112110
}
111+
112+
// SPARK-36020: Currently a project can only be removed if (1) its logical link is empty or (2)
113+
// its logical link is the same as the child's logical link. This is to ensure the physical
114+
// plan node can correctly map to its logical plan node in AQE.
115+
private def canRemove(project: ProjectExec, child: SparkPlan): Boolean = {
116+
project.logicalLink.isEmpty || project.logicalLink.exists(child.logicalLink.contains)
117+
}
113118
}

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22.sf100/explain.txt

Lines changed: 45 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,29 @@
11
== Physical Plan ==
2-
TakeOrderedAndProject (24)
3-
+- * HashAggregate (23)
4-
+- Exchange (22)
5-
+- * HashAggregate (21)
6-
+- * Expand (20)
7-
+- * BroadcastNestedLoopJoin Inner BuildRight (19)
8-
:- * Project (15)
9-
: +- * SortMergeJoin Inner (14)
10-
: :- * Sort (8)
11-
: : +- Exchange (7)
12-
: : +- * Project (6)
13-
: : +- * BroadcastHashJoin Inner BuildRight (5)
14-
: : :- * Filter (3)
15-
: : : +- * ColumnarToRow (2)
16-
: : : +- Scan parquet default.inventory (1)
17-
: : +- ReusedExchange (4)
18-
: +- * Sort (13)
19-
: +- Exchange (12)
20-
: +- * Filter (11)
21-
: +- * ColumnarToRow (10)
22-
: +- Scan parquet default.item (9)
23-
+- BroadcastExchange (18)
24-
+- * ColumnarToRow (17)
25-
+- Scan parquet default.warehouse (16)
2+
TakeOrderedAndProject (25)
3+
+- * HashAggregate (24)
4+
+- Exchange (23)
5+
+- * HashAggregate (22)
6+
+- * Expand (21)
7+
+- * Project (20)
8+
+- * BroadcastNestedLoopJoin Inner BuildRight (19)
9+
:- * Project (15)
10+
: +- * SortMergeJoin Inner (14)
11+
: :- * Sort (8)
12+
: : +- Exchange (7)
13+
: : +- * Project (6)
14+
: : +- * BroadcastHashJoin Inner BuildRight (5)
15+
: : :- * Filter (3)
16+
: : : +- * ColumnarToRow (2)
17+
: : : +- Scan parquet default.inventory (1)
18+
: : +- ReusedExchange (4)
19+
: +- * Sort (13)
20+
: +- Exchange (12)
21+
: +- * Filter (11)
22+
: +- * ColumnarToRow (10)
23+
: +- Scan parquet default.item (9)
24+
+- BroadcastExchange (18)
25+
+- * ColumnarToRow (17)
26+
+- Scan parquet default.warehouse (16)
2627

2728

2829
(1) Scan parquet default.inventory
@@ -40,7 +41,7 @@ Input [3]: [inv_item_sk#1, inv_quantity_on_hand#2, inv_date_sk#3]
4041
Input [3]: [inv_item_sk#1, inv_quantity_on_hand#2, inv_date_sk#3]
4142
Condition : isnotnull(inv_item_sk#1)
4243

43-
(4) ReusedExchange [Reuses operator id: 29]
44+
(4) ReusedExchange [Reuses operator id: 30]
4445
Output [1]: [d_date_sk#5]
4546

4647
(5) BroadcastHashJoin [codegen id : 2]
@@ -107,61 +108,65 @@ Arguments: IdentityBroadcastMode, [id=#13]
107108
(19) BroadcastNestedLoopJoin [codegen id : 7]
108109
Join condition: None
109110

110-
(20) Expand [codegen id : 7]
111+
(20) Project [codegen id : 7]
112+
Output [5]: [inv_quantity_on_hand#2, i_product_name#11, i_brand#8, i_class#9, i_category#10]
111113
Input [5]: [inv_quantity_on_hand#2, i_brand#8, i_class#9, i_category#10, i_product_name#11]
114+
115+
(21) Expand [codegen id : 7]
116+
Input [5]: [inv_quantity_on_hand#2, i_product_name#11, i_brand#8, i_class#9, i_category#10]
112117
Arguments: [[inv_quantity_on_hand#2, i_product_name#11, i_brand#8, i_class#9, i_category#10, 0], [inv_quantity_on_hand#2, i_product_name#11, i_brand#8, i_class#9, null, 1], [inv_quantity_on_hand#2, i_product_name#11, i_brand#8, null, null, 3], [inv_quantity_on_hand#2, i_product_name#11, null, null, null, 7], [inv_quantity_on_hand#2, null, null, null, null, 15]], [inv_quantity_on_hand#2, i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18]
113118

114-
(21) HashAggregate [codegen id : 7]
119+
(22) HashAggregate [codegen id : 7]
115120
Input [6]: [inv_quantity_on_hand#2, i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18]
116121
Keys [5]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18]
117122
Functions [1]: [partial_avg(inv_quantity_on_hand#2)]
118123
Aggregate Attributes [2]: [sum#19, count#20]
119124
Results [7]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18, sum#21, count#22]
120125

121-
(22) Exchange
126+
(23) Exchange
122127
Input [7]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18, sum#21, count#22]
123128
Arguments: hashpartitioning(i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18, 5), ENSURE_REQUIREMENTS, [id=#23]
124129

125-
(23) HashAggregate [codegen id : 8]
130+
(24) HashAggregate [codegen id : 8]
126131
Input [7]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18, sum#21, count#22]
127132
Keys [5]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18]
128133
Functions [1]: [avg(inv_quantity_on_hand#2)]
129134
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#2)#24]
130135
Results [5]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, avg(inv_quantity_on_hand#2)#24 AS qoh#25]
131136

132-
(24) TakeOrderedAndProject
137+
(25) TakeOrderedAndProject
133138
Input [5]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, qoh#25]
134139
Arguments: 100, [qoh#25 ASC NULLS FIRST, i_product_name#14 ASC NULLS FIRST, i_brand#15 ASC NULLS FIRST, i_class#16 ASC NULLS FIRST, i_category#17 ASC NULLS FIRST], [i_product_name#14, i_brand#15, i_class#16, i_category#17, qoh#25]
135140

136141
===== Subqueries =====
137142

138143
Subquery:1 Hosting operator id = 1 Hosting Expression = inv_date_sk#3 IN dynamicpruning#4
139-
BroadcastExchange (29)
140-
+- * Project (28)
141-
+- * Filter (27)
142-
+- * ColumnarToRow (26)
143-
+- Scan parquet default.date_dim (25)
144+
BroadcastExchange (30)
145+
+- * Project (29)
146+
+- * Filter (28)
147+
+- * ColumnarToRow (27)
148+
+- Scan parquet default.date_dim (26)
144149

145150

146-
(25) Scan parquet default.date_dim
151+
(26) Scan parquet default.date_dim
147152
Output [2]: [d_date_sk#5, d_month_seq#26]
148153
Batched: true
149154
Location [not included in comparison]/{warehouse_dir}/date_dim]
150155
PushedFilters: [IsNotNull(d_month_seq), GreaterThanOrEqual(d_month_seq,1200), LessThanOrEqual(d_month_seq,1211), IsNotNull(d_date_sk)]
151156
ReadSchema: struct<d_date_sk:int,d_month_seq:int>
152157

153-
(26) ColumnarToRow [codegen id : 1]
158+
(27) ColumnarToRow [codegen id : 1]
154159
Input [2]: [d_date_sk#5, d_month_seq#26]
155160

156-
(27) Filter [codegen id : 1]
161+
(28) Filter [codegen id : 1]
157162
Input [2]: [d_date_sk#5, d_month_seq#26]
158163
Condition : (((isnotnull(d_month_seq#26) AND (d_month_seq#26 >= 1200)) AND (d_month_seq#26 <= 1211)) AND isnotnull(d_date_sk#5))
159164

160-
(28) Project [codegen id : 1]
165+
(29) Project [codegen id : 1]
161166
Output [1]: [d_date_sk#5]
162167
Input [2]: [d_date_sk#5, d_month_seq#26]
163168

164-
(29) BroadcastExchange
169+
(30) BroadcastExchange
165170
Input [1]: [d_date_sk#5]
166171
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#27]
167172

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22.sf100/simplified.txt

Lines changed: 41 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -6,44 +6,45 @@ TakeOrderedAndProject [qoh,i_product_name,i_brand,i_class,i_category]
66
WholeStageCodegen (7)
77
HashAggregate [i_product_name,i_brand,i_class,i_category,spark_grouping_id,inv_quantity_on_hand] [sum,count,sum,count]
88
Expand [inv_quantity_on_hand,i_product_name,i_brand,i_class,i_category]
9-
BroadcastNestedLoopJoin
10-
Project [inv_quantity_on_hand,i_brand,i_class,i_category,i_product_name]
11-
SortMergeJoin [inv_item_sk,i_item_sk]
12-
InputAdapter
13-
WholeStageCodegen (3)
14-
Sort [inv_item_sk]
9+
Project [inv_quantity_on_hand,i_product_name,i_brand,i_class,i_category]
10+
BroadcastNestedLoopJoin
11+
Project [inv_quantity_on_hand,i_brand,i_class,i_category,i_product_name]
12+
SortMergeJoin [inv_item_sk,i_item_sk]
13+
InputAdapter
14+
WholeStageCodegen (3)
15+
Sort [inv_item_sk]
16+
InputAdapter
17+
Exchange [inv_item_sk] #2
18+
WholeStageCodegen (2)
19+
Project [inv_item_sk,inv_quantity_on_hand]
20+
BroadcastHashJoin [inv_date_sk,d_date_sk]
21+
Filter [inv_item_sk]
22+
ColumnarToRow
23+
InputAdapter
24+
Scan parquet default.inventory [inv_item_sk,inv_quantity_on_hand,inv_date_sk]
25+
SubqueryBroadcast [d_date_sk] #1
26+
BroadcastExchange #3
27+
WholeStageCodegen (1)
28+
Project [d_date_sk]
29+
Filter [d_month_seq,d_date_sk]
30+
ColumnarToRow
31+
InputAdapter
32+
Scan parquet default.date_dim [d_date_sk,d_month_seq]
33+
InputAdapter
34+
ReusedExchange [d_date_sk] #3
35+
InputAdapter
36+
WholeStageCodegen (5)
37+
Sort [i_item_sk]
38+
InputAdapter
39+
Exchange [i_item_sk] #4
40+
WholeStageCodegen (4)
41+
Filter [i_item_sk]
42+
ColumnarToRow
43+
InputAdapter
44+
Scan parquet default.item [i_item_sk,i_brand,i_class,i_category,i_product_name]
45+
InputAdapter
46+
BroadcastExchange #5
47+
WholeStageCodegen (6)
48+
ColumnarToRow
1549
InputAdapter
16-
Exchange [inv_item_sk] #2
17-
WholeStageCodegen (2)
18-
Project [inv_item_sk,inv_quantity_on_hand]
19-
BroadcastHashJoin [inv_date_sk,d_date_sk]
20-
Filter [inv_item_sk]
21-
ColumnarToRow
22-
InputAdapter
23-
Scan parquet default.inventory [inv_item_sk,inv_quantity_on_hand,inv_date_sk]
24-
SubqueryBroadcast [d_date_sk] #1
25-
BroadcastExchange #3
26-
WholeStageCodegen (1)
27-
Project [d_date_sk]
28-
Filter [d_month_seq,d_date_sk]
29-
ColumnarToRow
30-
InputAdapter
31-
Scan parquet default.date_dim [d_date_sk,d_month_seq]
32-
InputAdapter
33-
ReusedExchange [d_date_sk] #3
34-
InputAdapter
35-
WholeStageCodegen (5)
36-
Sort [i_item_sk]
37-
InputAdapter
38-
Exchange [i_item_sk] #4
39-
WholeStageCodegen (4)
40-
Filter [i_item_sk]
41-
ColumnarToRow
42-
InputAdapter
43-
Scan parquet default.item [i_item_sk,i_brand,i_class,i_category,i_product_name]
44-
InputAdapter
45-
BroadcastExchange #5
46-
WholeStageCodegen (6)
47-
ColumnarToRow
48-
InputAdapter
49-
Scan parquet default.warehouse
50+
Scan parquet default.warehouse

sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22/explain.txt

Lines changed: 42 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,26 @@
11
== Physical Plan ==
2-
TakeOrderedAndProject (21)
3-
+- * HashAggregate (20)
4-
+- Exchange (19)
5-
+- * HashAggregate (18)
6-
+- * Expand (17)
7-
+- * BroadcastNestedLoopJoin Inner BuildRight (16)
8-
:- * Project (12)
9-
: +- * BroadcastHashJoin Inner BuildRight (11)
10-
: :- * Project (6)
11-
: : +- * BroadcastHashJoin Inner BuildRight (5)
12-
: : :- * Filter (3)
13-
: : : +- * ColumnarToRow (2)
14-
: : : +- Scan parquet default.inventory (1)
15-
: : +- ReusedExchange (4)
16-
: +- BroadcastExchange (10)
17-
: +- * Filter (9)
18-
: +- * ColumnarToRow (8)
19-
: +- Scan parquet default.item (7)
20-
+- BroadcastExchange (15)
21-
+- * ColumnarToRow (14)
22-
+- Scan parquet default.warehouse (13)
2+
TakeOrderedAndProject (22)
3+
+- * HashAggregate (21)
4+
+- Exchange (20)
5+
+- * HashAggregate (19)
6+
+- * Expand (18)
7+
+- * Project (17)
8+
+- * BroadcastNestedLoopJoin Inner BuildRight (16)
9+
:- * Project (12)
10+
: +- * BroadcastHashJoin Inner BuildRight (11)
11+
: :- * Project (6)
12+
: : +- * BroadcastHashJoin Inner BuildRight (5)
13+
: : :- * Filter (3)
14+
: : : +- * ColumnarToRow (2)
15+
: : : +- Scan parquet default.inventory (1)
16+
: : +- ReusedExchange (4)
17+
: +- BroadcastExchange (10)
18+
: +- * Filter (9)
19+
: +- * ColumnarToRow (8)
20+
: +- Scan parquet default.item (7)
21+
+- BroadcastExchange (15)
22+
+- * ColumnarToRow (14)
23+
+- Scan parquet default.warehouse (13)
2324

2425

2526
(1) Scan parquet default.inventory
@@ -37,7 +38,7 @@ Input [3]: [inv_item_sk#1, inv_quantity_on_hand#2, inv_date_sk#3]
3738
Input [3]: [inv_item_sk#1, inv_quantity_on_hand#2, inv_date_sk#3]
3839
Condition : isnotnull(inv_item_sk#1)
3940

40-
(4) ReusedExchange [Reuses operator id: 26]
41+
(4) ReusedExchange [Reuses operator id: 27]
4142
Output [1]: [d_date_sk#5]
4243

4344
(5) BroadcastHashJoin [codegen id : 4]
@@ -92,61 +93,65 @@ Arguments: IdentityBroadcastMode, [id=#12]
9293
(16) BroadcastNestedLoopJoin [codegen id : 4]
9394
Join condition: None
9495

95-
(17) Expand [codegen id : 4]
96+
(17) Project [codegen id : 4]
97+
Output [5]: [inv_quantity_on_hand#2, i_product_name#10, i_brand#7, i_class#8, i_category#9]
9698
Input [5]: [inv_quantity_on_hand#2, i_brand#7, i_class#8, i_category#9, i_product_name#10]
99+
100+
(18) Expand [codegen id : 4]
101+
Input [5]: [inv_quantity_on_hand#2, i_product_name#10, i_brand#7, i_class#8, i_category#9]
97102
Arguments: [[inv_quantity_on_hand#2, i_product_name#10, i_brand#7, i_class#8, i_category#9, 0], [inv_quantity_on_hand#2, i_product_name#10, i_brand#7, i_class#8, null, 1], [inv_quantity_on_hand#2, i_product_name#10, i_brand#7, null, null, 3], [inv_quantity_on_hand#2, i_product_name#10, null, null, null, 7], [inv_quantity_on_hand#2, null, null, null, null, 15]], [inv_quantity_on_hand#2, i_product_name#13, i_brand#14, i_class#15, i_category#16, spark_grouping_id#17]
98103

99-
(18) HashAggregate [codegen id : 4]
104+
(19) HashAggregate [codegen id : 4]
100105
Input [6]: [inv_quantity_on_hand#2, i_product_name#13, i_brand#14, i_class#15, i_category#16, spark_grouping_id#17]
101106
Keys [5]: [i_product_name#13, i_brand#14, i_class#15, i_category#16, spark_grouping_id#17]
102107
Functions [1]: [partial_avg(inv_quantity_on_hand#2)]
103108
Aggregate Attributes [2]: [sum#18, count#19]
104109
Results [7]: [i_product_name#13, i_brand#14, i_class#15, i_category#16, spark_grouping_id#17, sum#20, count#21]
105110

106-
(19) Exchange
111+
(20) Exchange
107112
Input [7]: [i_product_name#13, i_brand#14, i_class#15, i_category#16, spark_grouping_id#17, sum#20, count#21]
108113
Arguments: hashpartitioning(i_product_name#13, i_brand#14, i_class#15, i_category#16, spark_grouping_id#17, 5), ENSURE_REQUIREMENTS, [id=#22]
109114

110-
(20) HashAggregate [codegen id : 5]
115+
(21) HashAggregate [codegen id : 5]
111116
Input [7]: [i_product_name#13, i_brand#14, i_class#15, i_category#16, spark_grouping_id#17, sum#20, count#21]
112117
Keys [5]: [i_product_name#13, i_brand#14, i_class#15, i_category#16, spark_grouping_id#17]
113118
Functions [1]: [avg(inv_quantity_on_hand#2)]
114119
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#2)#23]
115120
Results [5]: [i_product_name#13, i_brand#14, i_class#15, i_category#16, avg(inv_quantity_on_hand#2)#23 AS qoh#24]
116121

117-
(21) TakeOrderedAndProject
122+
(22) TakeOrderedAndProject
118123
Input [5]: [i_product_name#13, i_brand#14, i_class#15, i_category#16, qoh#24]
119124
Arguments: 100, [qoh#24 ASC NULLS FIRST, i_product_name#13 ASC NULLS FIRST, i_brand#14 ASC NULLS FIRST, i_class#15 ASC NULLS FIRST, i_category#16 ASC NULLS FIRST], [i_product_name#13, i_brand#14, i_class#15, i_category#16, qoh#24]
120125

121126
===== Subqueries =====
122127

123128
Subquery:1 Hosting operator id = 1 Hosting Expression = inv_date_sk#3 IN dynamicpruning#4
124-
BroadcastExchange (26)
125-
+- * Project (25)
126-
+- * Filter (24)
127-
+- * ColumnarToRow (23)
128-
+- Scan parquet default.date_dim (22)
129+
BroadcastExchange (27)
130+
+- * Project (26)
131+
+- * Filter (25)
132+
+- * ColumnarToRow (24)
133+
+- Scan parquet default.date_dim (23)
129134

130135

131-
(22) Scan parquet default.date_dim
136+
(23) Scan parquet default.date_dim
132137
Output [2]: [d_date_sk#5, d_month_seq#25]
133138
Batched: true
134139
Location [not included in comparison]/{warehouse_dir}/date_dim]
135140
PushedFilters: [IsNotNull(d_month_seq), GreaterThanOrEqual(d_month_seq,1200), LessThanOrEqual(d_month_seq,1211), IsNotNull(d_date_sk)]
136141
ReadSchema: struct<d_date_sk:int,d_month_seq:int>
137142

138-
(23) ColumnarToRow [codegen id : 1]
143+
(24) ColumnarToRow [codegen id : 1]
139144
Input [2]: [d_date_sk#5, d_month_seq#25]
140145

141-
(24) Filter [codegen id : 1]
146+
(25) Filter [codegen id : 1]
142147
Input [2]: [d_date_sk#5, d_month_seq#25]
143148
Condition : (((isnotnull(d_month_seq#25) AND (d_month_seq#25 >= 1200)) AND (d_month_seq#25 <= 1211)) AND isnotnull(d_date_sk#5))
144149

145-
(25) Project [codegen id : 1]
150+
(26) Project [codegen id : 1]
146151
Output [1]: [d_date_sk#5]
147152
Input [2]: [d_date_sk#5, d_month_seq#25]
148153

149-
(26) BroadcastExchange
154+
(27) BroadcastExchange
150155
Input [1]: [d_date_sk#5]
151156
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#26]
152157

0 commit comments

Comments
 (0)