Skip to content

Commit 7b4a9dd

Browse files
committed
fix.
1 parent 0fbecc7 commit 7b4a9dd

File tree

4 files changed

+74
-14
lines changed

4 files changed

+74
-14
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,9 @@ package object dsl {
374374
case plan => SubqueryAlias(alias, plan, None)
375375
}
376376

377+
def coalesce(num: Integer): LogicalPlan =
378+
Repartition(num, shuffle = false, logicalPlan)
379+
377380
def repartition(num: Integer): LogicalPlan =
378381
Repartition(num, shuffle = true, logicalPlan)
379382

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -563,23 +563,27 @@ object CollapseProject extends Rule[LogicalPlan] {
563563
/**
564564
* Combines adjacent [[Repartition]] and [[RepartitionByExpression]] operator combinations
565565
* by keeping only the one.
566-
* 1. For adjacent [[Repartition]]s, collapse into the last [[Repartition]].
566+
* 1. For adjacent [[Repartition]]s, collapse into the last [[Repartition]] if their shuffle types
567+
* are the same or the parent's shuffle is true.
567568
* 2. For adjacent [[RepartitionByExpression]]s, collapse into the last [[RepartitionByExpression]].
568-
* 3. For a combination of [[Repartition]] and [[RepartitionByExpression]], collapse as a single
569-
* [[RepartitionByExpression]] with the expression and last number of partition.
569+
* 3. When a shuffle-enabled [[Repartition]] is above a [[RepartitionByExpression]], collapse as a
570+
* single [[RepartitionByExpression]] with the expression and the last number of partition.
571+
* 4. When a [[RepartitionByExpression]] is above a [[Repartition]], collapse as a single
572+
* [[RepartitionByExpression]] with the expression and the last number of partition.
570573
*/
571574
object CollapseRepartition extends Rule[LogicalPlan] {
572575
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
573576
// Case 1
574-
case Repartition(numPartitions, shuffle, Repartition(_, _, child)) =>
577+
case Repartition(numPartitions, shuffle, Repartition(_, shuffleChild, child))
578+
if shuffle == shuffleChild || shuffle =>
575579
Repartition(numPartitions, shuffle, child)
576580
// Case 2
577581
case RepartitionByExpression(exprs, RepartitionByExpression(_, child, _), numPartitions) =>
578582
RepartitionByExpression(exprs, child, numPartitions)
579583
// Case 3
580-
case Repartition(numPartitions, _, r: RepartitionByExpression) =>
584+
case Repartition(numPartitions, shuffle, r: RepartitionByExpression) if shuffle =>
581585
r.copy(numPartitions = Some(numPartitions))
582-
// Case 3
586+
// Case 4
583587
case RepartitionByExpression(exprs, Repartition(_, _, child), numPartitions) =>
584588
RepartitionByExpression(exprs, child, numPartitions)
585589
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,18 @@ class CollapseRepartitionSuite extends PlanTest {
3232

3333
val testRelation = LocalRelation('a.int, 'b.int)
3434

35+
36+
test("collapse two adjacent coalesces into one") {
37+
val query = testRelation
38+
.coalesce(10)
39+
.coalesce(20)
40+
41+
val optimized = Optimize.execute(query.analyze)
42+
val correctAnswer = testRelation.coalesce(20).analyze
43+
44+
comparePlans(optimized, correctAnswer)
45+
}
46+
3547
test("collapse two adjacent repartitions into one") {
3648
val query = testRelation
3749
.repartition(10)
@@ -43,15 +55,44 @@ class CollapseRepartitionSuite extends PlanTest {
4355
comparePlans(optimized, correctAnswer)
4456
}
4557

58+
test("collapse one coalesce and one repartition into one") {
59+
val query1 = testRelation
60+
.coalesce(20)
61+
.repartition(5)
62+
63+
val optimized1 = Optimize.execute(query1.analyze)
64+
val correctAnswer1 = testRelation.repartition(5).analyze
65+
66+
comparePlans(optimized1, correctAnswer1)
67+
68+
val query2 = testRelation
69+
.repartition(5)
70+
.coalesce(20)
71+
72+
val optimized2 = Optimize.execute(query2.analyze)
73+
val correctAnswer2 = testRelation.repartition(5).coalesce(20).analyze
74+
75+
comparePlans(optimized2, correctAnswer2)
76+
}
77+
4678
test("collapse repartition and repartitionBy into one") {
47-
val query = testRelation
79+
val query1 = testRelation
4880
.repartition(10)
4981
.distribute('a)(20)
5082

51-
val optimized = Optimize.execute(query.analyze)
52-
val correctAnswer = testRelation.distribute('a)(20).analyze
83+
val optimized1 = Optimize.execute(query1.analyze)
84+
val correctAnswer1 = testRelation.distribute('a)(20).analyze
5385

54-
comparePlans(optimized, correctAnswer)
86+
comparePlans(optimized1, correctAnswer1)
87+
88+
val query2 = testRelation
89+
.coalesce(10)
90+
.distribute('a)(20)
91+
92+
val optimized2 = Optimize.execute(query2.analyze)
93+
val correctAnswer2 = testRelation.distribute('a)(20).analyze
94+
95+
comparePlans(optimized2, correctAnswer2)
5596
}
5697

5798
test("collapse repartitionBy and repartition into one") {
@@ -65,6 +106,17 @@ class CollapseRepartitionSuite extends PlanTest {
65106
comparePlans(optimized, correctAnswer)
66107
}
67108

109+
test("do not collapse coalesce above repartitionBy into one") {
110+
val query = testRelation
111+
.distribute('a)(20)
112+
.coalesce(10)
113+
114+
val optimized = Optimize.execute(query.analyze)
115+
val correctAnswer = testRelation.distribute('a)(20).coalesce(10).analyze
116+
117+
comparePlans(optimized, correctAnswer)
118+
}
119+
68120
test("collapse two adjacent repartitionBys into one") {
69121
val query = testRelation
70122
.distribute('b)(10)

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -242,11 +242,12 @@ class PlannerSuite extends SharedSQLContext {
242242
val doubleRepartitioned = testData.repartition(10).repartition(20).coalesce(5)
243243
def countRepartitions(plan: LogicalPlan): Int = plan.collect { case r: Repartition => r }.length
244244
assert(countRepartitions(doubleRepartitioned.queryExecution.logical) === 3)
245-
assert(countRepartitions(doubleRepartitioned.queryExecution.optimizedPlan) === 1)
245+
assert(countRepartitions(doubleRepartitioned.queryExecution.optimizedPlan) === 2)
246246
doubleRepartitioned.queryExecution.optimizedPlan match {
247-
case r: Repartition =>
248-
assert(r.numPartitions === 5)
249-
assert(r.shuffle === false)
247+
case Repartition (numPartitions, shuffle, Repartition(_, shuffleChild, _)) =>
248+
assert(numPartitions === 5)
249+
assert(shuffle === false)
250+
assert(shuffleChild === true)
250251
}
251252
}
252253

0 commit comments

Comments
 (0)