Skip to content

Commit f7ac2d6

Browse files
committed
[SPARK-34474][SQL] Remove unnecessary Union under Distinct/Deduplicate
### What changes were proposed in this pull request? This patch proposes to let the optimizer remove an unnecessary `Union` under `Distinct`/`Deduplicate`. ### Why are the changes needed? For a `Union` under `Distinct`/`Deduplicate`, if its children are all the same, we can just keep one of them and remove the `Union`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit tests. Closes #31595 from viirya/remove-union. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
1 parent 4a3200b commit f7ac2d6

File tree

7 files changed

+259
-94
lines changed

7 files changed

+259
-94
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
157157
// since the other rules might make two separate Unions operators adjacent.
158158
Batch("Union", Once,
159159
RemoveNoopOperators,
160+
RemoveNoopUnion,
160161
CombineUnions) ::
161162
Batch("OptimizeLimitZero", Once,
162163
OptimizeLimitZero) ::
@@ -501,6 +502,48 @@ object RemoveNoopOperators extends Rule[LogicalPlan] {
501502
}
502503
}
503504

505+
/**
 * Removes a no-op `Union` that sits directly under a `Distinct` or `Deduplicate`.
 *
 * When every child of the `Union` yields the same result (ignoring an alias-only `Project`
 * on top of a child), the `Union` is replaced by its first child.
 */
object RemoveNoopUnion extends Rule[LogicalPlan] {
  /**
   * Returns the child of `plan` when `plan` is a `Project` whose project list consists solely
   * of attributes, or aliases of attributes, that are semantically equal to the child's
   * output, position by position. Any other plan is returned unchanged.
   */
  private def removeAliasOnlyProject(plan: LogicalPlan): LogicalPlan = plan match {
    case Project(exprs, source) =>
      // Evaluated only after the length check below has passed.
      def passesThrough: Boolean = exprs.zip(source.output).forall {
        case (Alias(from: Attribute, _), to) => from.semanticEquals(to)
        case (from: Attribute, to) => from.semanticEquals(to)
        case _ => false
      }
      if (exprs.length != source.output.length || !passesThrough) plan else source

    case _ => plan
  }

  /**
   * Returns the first (original, unstripped) child of `u` if all children have the same
   * result once alias-only projects are stripped; otherwise `None`.
   */
  private def removeUnion(u: Union): Option[LogicalPlan] = {
    val stripped = u.children.map(removeAliasOnlyProject)
    val rest = stripped.tail
    val first = stripped.head
    val allSame = rest.forall(other => first.sameResult(other))
    if (allSame) u.children.headOption else None
  }

  /** Re-parents `parent` onto the surviving child of `u`, or keeps `parent` untouched. */
  private def dropUnionUnder(parent: LogicalPlan, u: Union): LogicalPlan =
    removeUnion(u) match {
      case Some(survivor) => parent.withNewChildren(Seq(survivor))
      case None => parent
    }

  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case d @ Distinct(u: Union) => dropUnionUnder(d, u)
    case d @ Deduplicate(_, u: Union) => dropUnionUnder(d, u)
  }
}
546+
504547
/**
505548
* Pushes down [[LocalLimit]] beneath UNION ALL and joins.
506549
*/
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.optimizer
19+
20+
import org.apache.spark.sql.catalyst.dsl.expressions._
21+
import org.apache.spark.sql.catalyst.plans.PlanTest
22+
import org.apache.spark.sql.catalyst.plans.logical._
23+
import org.apache.spark.sql.catalyst.rules.RuleExecutor
24+
25+
/**
 * Tests for [[RemoveNoopUnion]]: a `Union` of identical children under `Distinct` or
 * `Deduplicate` is removed, while a `Union` whose children differ is left in place.
 */
class RemoveNoopUnionSuite extends PlanTest {

  // Runs CollapseProject first, then the rule under test, each exactly once.
  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches = List(
      Batch("CollapseProject", Once, CollapseProject),
      Batch("RemoveNoopUnion", Once, RemoveNoopUnion))
  }

  val testRelation = LocalRelation('a.int, 'b.int)

  test("SPARK-34474: Remove redundant Union under Distinct") {
    val query = Distinct(Union(testRelation :: testRelation :: Nil))
    comparePlans(Optimize.execute(query), Distinct(testRelation))
  }

  test("SPARK-34474: Remove redundant Union under Deduplicate") {
    val keys = testRelation.output
    val query = Deduplicate(keys, Union(testRelation :: testRelation :: Nil))
    comparePlans(Optimize.execute(query), Deduplicate(testRelation.output, testRelation))
  }

  test("SPARK-34474: Do not remove necessary Project 1") {
    // The two children differ in the computed third column, so the Union must stay.
    val Seq(colA, colB) = testRelation.output
    val plusOne = Project(Seq(colA, colB, (colA + 1).as("expr")), testRelation)
    val plusTwo = Project(Seq(colA, colB, (colA + 2).as("expr")), testRelation)
    val query = Distinct(Union(plusOne :: plusTwo :: Nil))
    comparePlans(Optimize.execute(query), query)
  }

  test("SPARK-34474: Do not remove necessary Project 2") {
    // The second child swaps the column order, so the Union must stay.
    val Seq(colA, colB) = testRelation.output
    val inOrder = Project(Seq(colA, colB), testRelation)
    val swapped = Project(Seq(colB, colA), testRelation)
    val query = Distinct(Union(inOrder :: swapped :: Nil))
    comparePlans(Optimize.execute(query), query)
  }
}

sql/core/src/test/resources/sql-tests/inputs/explain.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ EXPLAIN FORMATTED
3434
EXPLAIN FORMATTED
3535
SELECT key, val FROM explain_temp1 WHERE key > 0
3636
UNION
37-
SELECT key, val FROM explain_temp1 WHERE key > 0;
37+
SELECT key, val FROM explain_temp1 WHERE key > 1;
3838

3939
-- Join
4040
EXPLAIN FORMATTED

sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ Arguments: isFinalPlan=false
205205
EXPLAIN FORMATTED
206206
SELECT key, val FROM explain_temp1 WHERE key > 0
207207
UNION
208-
SELECT key, val FROM explain_temp1 WHERE key > 0
208+
SELECT key, val FROM explain_temp1 WHERE key > 1
209209
-- !query schema
210210
struct<plan:string>
211211
-- !query output
@@ -236,12 +236,12 @@ Condition : (isnotnull(key#x) AND (key#x > 0))
236236
Output [2]: [key#x, val#x]
237237
Batched: true
238238
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
239-
PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
239+
PushedFilters: [IsNotNull(key), GreaterThan(key,1)]
240240
ReadSchema: struct<key:int,val:int>
241241

242242
(4) Filter
243243
Input [2]: [key#x, val#x]
244-
Condition : (isnotnull(key#x) AND (key#x > 0))
244+
Condition : (isnotnull(key#x) AND (key#x > 1))
245245

246246
(5) Union
247247

sql/core/src/test/resources/sql-tests/results/explain.sql.out

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ Input [3]: [key#x, max(val)#x, max(val#x)#x]
203203
EXPLAIN FORMATTED
204204
SELECT key, val FROM explain_temp1 WHERE key > 0
205205
UNION
206-
SELECT key, val FROM explain_temp1 WHERE key > 0
206+
SELECT key, val FROM explain_temp1 WHERE key > 1
207207
-- !query schema
208208
struct<plan:string>
209209
-- !query output
@@ -238,15 +238,15 @@ Condition : (isnotnull(key#x) AND (key#x > 0))
238238
Output [2]: [key#x, val#x]
239239
Batched: true
240240
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
241-
PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
241+
PushedFilters: [IsNotNull(key), GreaterThan(key,1)]
242242
ReadSchema: struct<key:int,val:int>
243243

244244
(5) ColumnarToRow [codegen id : 2]
245245
Input [2]: [key#x, val#x]
246246

247247
(6) Filter [codegen id : 2]
248248
Input [2]: [key#x, val#x]
249-
Condition : (isnotnull(key#x) AND (key#x > 0))
249+
Condition : (isnotnull(key#x) AND (key#x > 1))
250250

251251
(7) Union
252252

0 commit comments

Comments
 (0)