Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
temp
  • Loading branch information
gatorsmile committed Feb 19, 2017
commit 7722781c140b29750a4eec6cb833f1893ac59853
Original file line number Diff line number Diff line change
Expand Up @@ -574,18 +574,30 @@ object CollapseProject extends Rule[LogicalPlan] {
// Optimizer rule that rewrites stacked Repartition / RepartitionByExpression operators.
// The plan is traversed with `transformUp`, and each `case` below rewrites one
// parent/child combination of these operators.
//
// NOTE(review): this text looks like a diff rendering in which the removed and the added
// version of every case appear back to back (each "Case N" has two alternatives matching
// the same shape) -- confirm against the real file before treating it as compilable code.
object CollapseRepartition extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
// Case 1: Repartition directly above another Repartition.
// First alternative: collapse onto the inner child when both operators use the same
// shuffle flag, or when the outer one shuffles.
case Repartition(numPartitions, shuffle, Repartition(_, shuffleChild, child))
if shuffle == shuffleChild || shuffle =>
Repartition(numPartitions, shuffle, child)
// Second alternative: decide on the (outer shuffle, inner shuffle) pair.
case r @ Repartition(numPartitions, shuffle, child @ Repartition(_, _, grandChild)) =>
(shuffle, child.shuffle) match {
// The inner Repartition is dropped and the outer one is re-attached to the grandchild.
case (true, true) | (true, false) | (false, false) =>
Repartition(numPartitions, shuffle, grandChild)
// Non-shuffle outer above a shuffle inner: when the outer asks for at least as many
// partitions as the inner, the outer is dropped and the inner is kept as-is.
case (false, true) if numPartitions >= child.numPartitions =>
child
// Otherwise (non-shuffle outer with fewer partitions than the shuffle inner) the plan
// is left unchanged.
case _ =>
r
}
// Case 2: RepartitionByExpression above another RepartitionByExpression.
// Both alternatives keep the outer expressions and numPartitions and drop the inner
// operator; they differ only in the name bound to the inner operator's child.
case RepartitionByExpression(exprs, RepartitionByExpression(_, child, _), numPartitions) =>
RepartitionByExpression(exprs, child, numPartitions)
case RepartitionByExpression(exprs, RepartitionByExpression(_, grandChild, _), numPartitions) =>
RepartitionByExpression(exprs, grandChild, numPartitions)
// Case 3: Repartition above RepartitionByExpression.
// First alternative: only when the outer shuffles, keep the RepartitionByExpression and
// overwrite its numPartitions with the outer value.
case Repartition(numPartitions, shuffle, r: RepartitionByExpression) if shuffle =>
r.copy(numPartitions = Some(numPartitions))
// Second alternative: same rewrite for the shuffle case, plus explicit non-shuffle branches.
case r @ Repartition(numPartitions, shuffle, child: RepartitionByExpression) =>
if (shuffle) {
child.copy(numPartitions = Some(numPartitions))
// NOTE(review): `child.numPartitions` is an Option here (it is set with `Some(...)` above),
// so `.get` throws if it is empty -- confirm it is always defined when this rule runs.
} else if (numPartitions >= child.numPartitions.get) {
r
// NOTE(review): this branch and the one above both return `r` unchanged, so the
// comparison currently has no effect on the result.
} else {
r
}
// Case 4: RepartitionByExpression above Repartition.
// Both alternatives drop the inner Repartition and re-attach the outer operator to its child.
case RepartitionByExpression(exprs, Repartition(_, _, child), numPartitions) =>
RepartitionByExpression(exprs, child, numPartitions)
case RepartitionByExpression(exprs, Repartition(_, _, grandChild), numPartitions) =>
RepartitionByExpression(exprs, grandChild, numPartitions)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,8 @@ case class RepartitionByExpression(
case None => // Ok
}

override lazy val resolved: Boolean = super.resolved && numPartitions.nonEmpty

override def maxRows: Option[Long] = child.maxRows
override def output: Seq[Attribute] = child.output
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class CollapseRepartitionSuite extends PlanTest {
}

test("collapse one coalesce and one repartition into one") {
// Remove useless coalesce below repartition
val query1 = testRelation
.coalesce(20)
.repartition(5)
Expand All @@ -65,14 +66,26 @@ class CollapseRepartitionSuite extends PlanTest {

comparePlans(optimized1, correctAnswer1)

// Remove useless coalesce above repartition when its numPartitions is larger than or equal to
// the child's numPartitions
val query2 = testRelation
.repartition(5)
.coalesce(20)

val optimized2 = Optimize.execute(query2.analyze)
val correctAnswer2 = testRelation.repartition(5).coalesce(20).analyze
val correctAnswer2 = testRelation.repartition(5).analyze

comparePlans(optimized2, correctAnswer2)

// Keep coalesce above repartition unchanged when its numPartitions is smaller than the child
val query3 = testRelation
.repartition(5)
.coalesce(3)

val optimized3 = Optimize.execute(query3.analyze)
val correctAnswer3 = testRelation.repartition(5).coalesce(3).analyze

comparePlans(optimized3, correctAnswer3)
}

test("collapse repartition and repartitionBy into one") {
Expand All @@ -96,17 +109,26 @@ class CollapseRepartitionSuite extends PlanTest {
}

test("collapse repartitionBy and repartition into one") {
val query = testRelation
val query1 = testRelation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add a comment: // Always respects the top repartition and removes the useless distribute below repartition

.distribute('a)(20)
.repartition(10)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can still pick the same numPartition pairs: 10, 20 and 30, 20


val optimized = Optimize.execute(query.analyze)
val correctAnswer = testRelation.distribute('a)(10).analyze
val optimized1 = Optimize.execute(query1.analyze)
val correctAnswer1 = testRelation.distribute('a)(10).analyze
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not quite sure about this. Should we optimize this to relation.repartition(10) instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, I just followed what we did before. After more code reading, I think we can do it, since RoundRobinPartitioning looks cheaper.

      case logical.Repartition(numPartitions, shuffle, child) =>
        if (shuffle) {
          ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
        } else {
          execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
        }
      case logical.RepartitionByExpression(expressions, child, numPartitions) =>
        exchange.ShuffleExchange(HashPartitioning(
          expressions, numPartitions), planLater(child)) :: Nil

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is that an optimization should not change the result. relation.distributeBy('a, 10).repartition(10) should have the same result as relation.repartition(10), not relation.distributeBy('a, 10). It's not about which one is cheaper; we should not surprise users.


comparePlans(optimized, correctAnswer)
comparePlans(optimized1, correctAnswer1)

val query2 = testRelation
.distribute('a)(20)
.repartition(30)

val optimized2 = Optimize.execute(query2.analyze)
val correctAnswer2 = testRelation.distribute('a)(20).analyze

comparePlans(optimized2, correctAnswer2)
}

test("do not collapse coalesce above repartitionBy into one") {
test("coalesce above repartitionBy") {
val query = testRelation
.distribute('a)(20)
.coalesce(10)
Expand Down