Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -2558,8 +2558,8 @@ test_that("coalesce, repartition, numPartitions", {

df2 <- repartition(df1, 10)
expect_equal(getNumPartitions(df2), 10)
expect_equal(getNumPartitions(coalesce(df2, 13)), 5)
expect_equal(getNumPartitions(coalesce(df2, 7)), 5)
expect_equal(getNumPartitions(coalesce(df2, 13)), 10)
expect_equal(getNumPartitions(coalesce(df2, 7)), 7)
expect_equal(getNumPartitions(coalesce(df2, 3)), 3)
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,9 @@ package object dsl {

def as(alias: String): LogicalPlan = SubqueryAlias(alias, logicalPlan, None)

def coalesce(num: Integer): LogicalPlan =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it conflict with sql coalesce by having it here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are used for the test cases in catalyst package, in which Dataset APIs are not available. Thus, that is why we add these DSL for test cases

Repartition(num, shuffle = false, logicalPlan)

def repartition(num: Integer): LogicalPlan =
Repartition(num, shuffle = true, logicalPlan)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,27 +562,29 @@ object CollapseProject extends Rule[LogicalPlan] {
}

/**
* Combines adjacent [[Repartition]] and [[RepartitionByExpression]] operator combinations
* by keeping only the one.
* 1. For adjacent [[Repartition]]s, collapse into the last [[Repartition]].
* 2. For adjacent [[RepartitionByExpression]]s, collapse into the last [[RepartitionByExpression]].
* 3. For a combination of [[Repartition]] and [[RepartitionByExpression]], collapse as a single
* [[RepartitionByExpression]] with the expression and last number of partition.
* Combines adjacent [[RepartitionOperation]] operators
*/
object CollapseRepartition extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
// Case 1
case Repartition(numPartitions, shuffle, Repartition(_, _, child)) =>
Repartition(numPartitions, shuffle, child)
// Case 2
case RepartitionByExpression(exprs, RepartitionByExpression(_, child, _), numPartitions) =>
RepartitionByExpression(exprs, child, numPartitions)
// Case 3
case Repartition(numPartitions, _, r: RepartitionByExpression) =>
r.copy(numPartitions = numPartitions)
// Case 3
case RepartitionByExpression(exprs, Repartition(_, _, child), numPartitions) =>
RepartitionByExpression(exprs, child, numPartitions)
// Case 1: When a Repartition has a child of Repartition or RepartitionByExpression,
// 1) When the top node does not enable the shuffle (i.e., coalesce API), but the child
// enables the shuffle. Returns the child node if the last numPartitions is bigger;
// otherwise, keep unchanged.
// 2) In the other cases, returns the child node with the last numPartitions.
case r @ Repartition(_, _, child: RepartitionOperation) => (r.shuffle, child.shuffle) match {
case (false, true) =>
if (r.numPartitions >= child.numPartitions) child else r
case _ => child match {
case child: Repartition =>
child.copy(numPartitions = r.numPartitions, shuffle = r.shuffle)
case child: RepartitionByExpression =>
child.copy(numPartitions = r.numPartitions)
}
}
// Case 2: When a RepartitionByExpression has a child of Repartition or RepartitionByExpression
// we can remove the child.
case r @ RepartitionByExpression(_, child: RepartitionOperation, _) =>
r.copy(child = child.child)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,16 +835,24 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}

/**
* A base interface for [[RepartitionByExpression]] and [[Repartition]]
*/
abstract class RepartitionOperation extends UnaryNode {
def shuffle: Boolean
def numPartitions: Int
override def output: Seq[Attribute] = child.output
}

/**
* Returns a new RDD that has exactly `numPartitions` partitions. Differs from
* [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user
* asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
* of the output requires some specific ordering or distribution of the data.
*/
case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
extends UnaryNode {
extends RepartitionOperation {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
override def output: Seq[Attribute] = child.output
}

/**
Expand All @@ -856,12 +864,12 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
case class RepartitionByExpression(
partitionExpressions: Seq[Expression],
child: LogicalPlan,
numPartitions: Int) extends UnaryNode {
numPartitions: Int) extends RepartitionOperation {

require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

override def maxRows: Option[Long] = child.maxRows
override def output: Seq[Attribute] = child.output
override def shuffle: Boolean = true
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,47 +32,180 @@ class CollapseRepartitionSuite extends PlanTest {

val testRelation = LocalRelation('a.int, 'b.int)


test("collapse two adjacent coalesces into one") {
// Always respects the top coalesces amd removes useless coalesce below coalesce
val query1 = testRelation
.coalesce(10)
.coalesce(20)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, I can see the argument.
but there are 2 adjacent coalesces like this shouldn't it take the smaller number? (since coalesce can't increase partition numbers)
whereas if there are 2 adjacent repartition it could take the last number

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to respect the later input number, which is specified by users, for avoiding any surprise to users.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, agreed.

val query2 = testRelation
.coalesce(30)
.coalesce(20)

val optimized1 = Optimize.execute(query1.analyze)
val optimized2 = Optimize.execute(query2.analyze)

val correctAnswer = testRelation.coalesce(20).analyze

comparePlans(optimized1, correctAnswer)
comparePlans(optimized2, correctAnswer)
}

test("collapse two adjacent repartitions into one") {
val query = testRelation
// Always respects the top repartition amd removes useless repartition below repartition
val query1 = testRelation
.repartition(10)
.repartition(20)
val query2 = testRelation
.repartition(30)
.repartition(20)

val optimized1 = Optimize.execute(query1.analyze)
val optimized2 = Optimize.execute(query2.analyze)
val correctAnswer = testRelation.repartition(20).analyze

comparePlans(optimized1, correctAnswer)
comparePlans(optimized2, correctAnswer)
}

test("coalesce above repartition") {
// Remove useless coalesce above repartition
val query1 = testRelation
.repartition(10)
.coalesce(20)

val optimized1 = Optimize.execute(query1.analyze)
val correctAnswer1 = testRelation.repartition(10).analyze

comparePlans(optimized1, correctAnswer1)

// No change in this case
val query2 = testRelation
.repartition(30)
.coalesce(20)

val optimized2 = Optimize.execute(query2.analyze)
val correctAnswer2 = query2.analyze

comparePlans(optimized2, correctAnswer2)
}

test("repartition above coalesce") {
// Always respects the top repartition amd removes useless coalesce below repartition
val query1 = testRelation
.coalesce(10)
.repartition(20)
// Remove useless coalesce above repartition
val query2 = testRelation
.coalesce(30)
.repartition(20)

val optimized = Optimize.execute(query.analyze)
val optimized1 = Optimize.execute(query1.analyze)
val optimized2 = Optimize.execute(query2.analyze)

val correctAnswer = testRelation.repartition(20).analyze

comparePlans(optimized, correctAnswer)
comparePlans(optimized1, correctAnswer)
comparePlans(optimized2, correctAnswer)
}

test("collapse repartition and repartitionBy into one") {
val query = testRelation
test("repartitionBy above repartition") {
val query1 = testRelation
.repartition(10)
.distribute('a)(20)

val optimized = Optimize.execute(query.analyze)
val correctAnswer = testRelation.distribute('a)(20).analyze
val optimized1 = Optimize.execute(query1.analyze)
val correctAnswer1 = testRelation.distribute('a)(20).analyze

comparePlans(optimized1, correctAnswer1)

val query2 = testRelation
.repartition(30)
.distribute('a)(20)

val optimized2 = Optimize.execute(query2.analyze)
val correctAnswer2 = testRelation.distribute('a)(20).analyze
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is same as correctAnswer1


comparePlans(optimized, correctAnswer)
comparePlans(optimized2, correctAnswer2)
}

test("collapse repartitionBy and repartition into one") {
val query = testRelation
test("repartitionBy above coalesce") {
val query1 = testRelation
.coalesce(10)
.distribute('a)(20)

val optimized1 = Optimize.execute(query1.analyze)
val correctAnswer1 = testRelation.distribute('a)(20).analyze

comparePlans(optimized1, correctAnswer1)

val query2 = testRelation
.coalesce(20)
.distribute('a)(30)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to make query2 as

testRelation
  .coalesce(30)
  .distribute('a)(20)

i.e. the numPartitions of coalesce is bigger than distribute


val optimized2 = Optimize.execute(query2.analyze)
val correctAnswer2 = testRelation.distribute('a)(30).analyze

comparePlans(optimized2, correctAnswer2)
}

test("repartition above repartitionBy") {
val query1 = testRelation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can add a comment: // Always respects the top repartition amd removes useless distribute below repartition

.distribute('a)(20)
.repartition(10)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can still pick the same numPartition pairs: 10, 20 and 30, 20


val optimized = Optimize.execute(query.analyze)
val correctAnswer = testRelation.distribute('a)(10).analyze
val optimized1 = Optimize.execute(query1.analyze)
val correctAnswer1 = testRelation.distribute('a)(10).analyze
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I not quite sure about this. Shall we optimize to relation.repartition(10)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, I just followed what we did before. After more code reading, I think we can do it, since RoundRobinPartitioning looks cheaper.

      case logical.Repartition(numPartitions, shuffle, child) =>
        if (shuffle) {
          ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
        } else {
          execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
        }
      case logical.RepartitionByExpression(expressions, child, numPartitions) =>
        exchange.ShuffleExchange(HashPartitioning(
          expressions, numPartitions), planLater(child)) :: Nil

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is, optimization should not change the result. relation.distributeBy('a, 10).repartition(10) should have same result of relation.repartition(10), instead of relation.distributeBy('a, 10). It's not about which one is cheaper, we should not surprise users.


comparePlans(optimized1, correctAnswer1)

val query2 = testRelation
.distribute('a)(20)
.repartition(30)

val optimized2 = Optimize.execute(query2.analyze)
val correctAnswer2 = testRelation.distribute('a)(30).analyze

comparePlans(optimized, correctAnswer)
comparePlans(optimized2, correctAnswer2)
}

test("coalesce above repartitionBy") {
val query1 = testRelation
.distribute('a)(20)
.coalesce(10)

val optimized1 = Optimize.execute(query1.analyze)
val correctAnswer1 = testRelation.distribute('a)(20).coalesce(10).analyze

comparePlans(optimized1, correctAnswer1)

val query2 = testRelation
.distribute('a)(20)
.coalesce(30)

val optimized2 = Optimize.execute(query2.analyze)
val correctAnswer2 = testRelation.distribute('a)(20).analyze

comparePlans(optimized2, correctAnswer2)
}

test("collapse two adjacent repartitionBys into one") {
val query = testRelation
val query1 = testRelation
.distribute('b)(10)
.distribute('a)(20)

val optimized = Optimize.execute(query.analyze)
val correctAnswer = testRelation.distribute('a)(20).analyze
val optimized1 = Optimize.execute(query1.analyze)
val correctAnswer1 = testRelation.distribute('a)(20).analyze

comparePlans(optimized1, correctAnswer1)

val query2 = testRelation
.distribute('b)(30)
.distribute('a)(20)

val optimized2 = Optimize.execute(query2.analyze)
val correctAnswer2 = testRelation.distribute('a)(20).analyze
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's same with correctAnswer1


comparePlans(optimized, correctAnswer)
comparePlans(optimized2, correctAnswer2)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,12 @@ class PlannerSuite extends SharedSQLContext {
val doubleRepartitioned = testData.repartition(10).repartition(20).coalesce(5)
def countRepartitions(plan: LogicalPlan): Int = plan.collect { case r: Repartition => r }.length
assert(countRepartitions(doubleRepartitioned.queryExecution.logical) === 3)
assert(countRepartitions(doubleRepartitioned.queryExecution.optimizedPlan) === 1)
assert(countRepartitions(doubleRepartitioned.queryExecution.optimizedPlan) === 2)
doubleRepartitioned.queryExecution.optimizedPlan match {
case r: Repartition =>
assert(r.numPartitions === 5)
assert(r.shuffle === false)
case Repartition (numPartitions, shuffle, Repartition(_, shuffleChild, _)) =>
assert(numPartitions === 5)
assert(shuffle === false)
assert(shuffleChild === true)
}
}

Expand Down