@@ -374,6 +374,9 @@ package object dsl {
case plan => SubqueryAlias(alias, plan, None)
}

def coalesce(num: Integer): LogicalPlan =
Member
Does having coalesce here conflict with the SQL coalesce function?

Member Author
These are used by the test cases in the catalyst package, where the Dataset APIs are not available. That is why we add these DSL methods for the test cases.

Repartition(num, shuffle = false, logicalPlan)

def repartition(num: Integer): LogicalPlan =
Repartition(num, shuffle = true, logicalPlan)

@@ -563,23 +563,27 @@ object CollapseProject extends Rule[LogicalPlan] {
/**
* Combines adjacent [[Repartition]] and [[RepartitionByExpression]] operator combinations
* by keeping only the one.
* 1. For adjacent [[Repartition]]s, collapse into the last [[Repartition]].
* 1. For adjacent [[Repartition]]s, collapse into the last [[Repartition]] if their shuffle types
* are the same or the parent's shuffle is true.
* 2. For adjacent [[RepartitionByExpression]]s, collapse into the last [[RepartitionByExpression]].
* 3. For a combination of [[Repartition]] and [[RepartitionByExpression]], collapse as a single
* [[RepartitionByExpression]] with the expression and last number of partition.
* 3. When a shuffle-enabled [[Repartition]] is above a [[RepartitionByExpression]], collapse as a
* single [[RepartitionByExpression]] with the expression and the last number of partition.
* 4. When a [[RepartitionByExpression]] is above a [[Repartition]], collapse as a single
Member
Does the shuffle type matter for Repartition in this case?

Member Author
RepartitionByExpression always uses ShuffleExchange, so it behaves like a Repartition with shuffle enabled.

Member
Right, I was referring to the shuffle flag on Repartition, but I see your point that RepartitionByExpression overrides it regardless.

* [[RepartitionByExpression]] with the expression and the last number of partition.
*/
object CollapseRepartition extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
// Case 1
case Repartition(numPartitions, shuffle, Repartition(_, _, child)) =>
case Repartition(numPartitions, shuffle, Repartition(_, shuffleChild, child))
if shuffle == shuffleChild || shuffle =>
Member
Oh, thank you for fixing this!

Repartition(numPartitions, shuffle, child)
// Case 2
case RepartitionByExpression(exprs, RepartitionByExpression(_, child, _), numPartitions) =>
RepartitionByExpression(exprs, child, numPartitions)
// Case 3
case Repartition(numPartitions, _, r: RepartitionByExpression) =>
case Repartition(numPartitions, shuffle, r: RepartitionByExpression) if shuffle =>
r.copy(numPartitions = Some(numPartitions))
// Case 3
// Case 4
case RepartitionByExpression(exprs, Repartition(_, _, child), numPartitions) =>
RepartitionByExpression(exprs, child, numPartitions)
}
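
For readers following the four cases outside of Spark, the rewritten rule can be approximated with a small stand-alone model. This is only a sketch under stated assumptions: Plan, Relation and ToyCollapse are hypothetical names, the case classes mirror but are not Catalyst's real operators (expressions are plain strings and numPartitions is a plain Int), and the recursion to a fixed point stands in for the optimizer re-running the rule.

```scala
// Toy stand-ins for the Catalyst operators; NOT Spark's real classes.
sealed trait Plan
final case class Relation(name: String) extends Plan
final case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan
final case class RepartitionByExpression(exprs: Seq[String], child: Plan, numPartitions: Int)
  extends Plan

object ToyCollapse {
  // Rewrites children first (bottom-up), then applies the four cases to the
  // current node, repeating until no case matches.
  def collapse(plan: Plan): Plan = {
    val withCollapsedChild: Plan = plan match {
      case r: Repartition             => r.copy(child = collapse(r.child))
      case r: RepartitionByExpression => r.copy(child = collapse(r.child))
      case leaf                       => leaf
    }
    withCollapsedChild match {
      // Case 1: same shuffle type, or the parent shuffles anyway.
      case Repartition(n, shuffle, Repartition(_, shuffleChild, child))
          if shuffle == shuffleChild || shuffle =>
        collapse(Repartition(n, shuffle, child))
      // Case 2: keep only the last RepartitionByExpression.
      case RepartitionByExpression(exprs, RepartitionByExpression(_, child, _), n) =>
        collapse(RepartitionByExpression(exprs, child, n))
      // Case 3: only a shuffle-enabled Repartition may override the partition count.
      case Repartition(n, true, r: RepartitionByExpression) =>
        collapse(r.copy(numPartitions = n))
      // Case 4: RepartitionByExpression always shuffles, so the child is redundant.
      case RepartitionByExpression(exprs, Repartition(_, _, child), n) =>
        collapse(RepartitionByExpression(exprs, child, n))
      case other => other
    }
  }
}
```

In this model a coalesce (shuffle = false) sitting above a shuffle-enabled Repartition or a RepartitionByExpression is left in place, which matches the expectations in the updated tests.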
@@ -32,6 +32,18 @@ class CollapseRepartitionSuite extends PlanTest {

val testRelation = LocalRelation('a.int, 'b.int)


test("collapse two adjacent coalesces into one") {
val query = testRelation
.coalesce(10)
.coalesce(20)
Member
Hmm, I can see the argument.
But when there are two adjacent coalesces like this, shouldn't it take the smaller number (since coalesce can't increase the number of partitions)?
Whereas with two adjacent repartitions it could take the last number.

Member Author
I think it is better to respect the later number, since that is the one the user specified last; this avoids surprising users.

Member
ok, agreed.


val optimized = Optimize.execute(query.analyze)
val correctAnswer = testRelation.coalesce(20).analyze

comparePlans(optimized, correctAnswer)
}

test("collapse two adjacent repartitions into one") {
val query = testRelation
.repartition(10)
@@ -43,15 +55,44 @@ class CollapseRepartitionSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("collapse one coalesce and one repartition into one") {
val query1 = testRelation
.coalesce(20)
.repartition(5)

val optimized1 = Optimize.execute(query1.analyze)
val correctAnswer1 = testRelation.repartition(5).analyze

comparePlans(optimized1, correctAnswer1)

val query2 = testRelation
.repartition(5)
.coalesce(20)

val optimized2 = Optimize.execute(query2.analyze)
val correctAnswer2 = testRelation.repartition(5).coalesce(20).analyze
Member
That might be the plan, but the end result should be numPartitions == 5, correct? Is there another suite where we could add tests for repartition/coalesce like this?

Member Author
Yeah. We can get rid of the coalesce if the child repartition's number of partitions is smaller than the coalesce's.

Actually, I can add some simple end-to-end test cases, like what you did on the R side.

Member Author
@gatorsmile, Feb 19, 2017
To improve this rule, we first need to clean up the resolution of RepartitionByExpression. See PR #16988.
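
The follow-up idea in this thread, dropping a coalesce that cannot reduce the partition count, is not part of this diff. A minimal sketch of what such a check might look like is given here, using hypothetical stand-in types (Node, Leaf, Repart) and a hypothetical DropRedundantCoalesce object rather than Spark's classes. It relies only on the point raised earlier in the review that coalesce cannot increase the number of partitions.

```scala
// Hypothetical stand-in types for illustration; NOT Spark's real classes.
sealed trait Node
final case class Leaf(name: String) extends Node
final case class Repart(numPartitions: Int, shuffle: Boolean, child: Node) extends Node

object DropRedundantCoalesce {
  def simplify(plan: Node): Node = plan match {
    // A coalesce (shuffle = false) asking for at least as many partitions as
    // its shuffle-enabled child already produces has no effect, so drop it.
    case Repart(n, false, child @ Repart(m, true, _)) if n >= m =>
      simplify(child)
    // Otherwise keep the node and simplify below it.
    case Repart(n, shuffle, child) =>
      Repart(n, shuffle, simplify(child))
    case leaf => leaf
  }
}
```

Under this sketch, repartition(5) followed by coalesce(20) reduces to repartition(5), while repartition(5) followed by coalesce(3) is kept as two operators.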


comparePlans(optimized2, correctAnswer2)
}

test("collapse repartition and repartitionBy into one") {
val query = testRelation
val query1 = testRelation
.repartition(10)
.distribute('a)(20)

val optimized = Optimize.execute(query.analyze)
val correctAnswer = testRelation.distribute('a)(20).analyze
val optimized1 = Optimize.execute(query1.analyze)
val correctAnswer1 = testRelation.distribute('a)(20).analyze

comparePlans(optimized, correctAnswer)
comparePlans(optimized1, correctAnswer1)

val query2 = testRelation
.coalesce(10)
.distribute('a)(20)

val optimized2 = Optimize.execute(query2.analyze)
val correctAnswer2 = testRelation.distribute('a)(20).analyze

comparePlans(optimized2, correctAnswer2)
}

test("collapse repartitionBy and repartition into one") {
@@ -65,6 +106,17 @@ class CollapseRepartitionSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("do not collapse coalesce above repartitionBy into one") {
val query = testRelation
.distribute('a)(20)
.coalesce(10)

val optimized = Optimize.execute(query.analyze)
val correctAnswer = testRelation.distribute('a)(20).coalesce(10).analyze

comparePlans(optimized, correctAnswer)
}

test("collapse two adjacent repartitionBys into one") {
val query = testRelation
.distribute('b)(10)
@@ -242,11 +242,12 @@ class PlannerSuite extends SharedSQLContext {
val doubleRepartitioned = testData.repartition(10).repartition(20).coalesce(5)
def countRepartitions(plan: LogicalPlan): Int = plan.collect { case r: Repartition => r }.length
assert(countRepartitions(doubleRepartitioned.queryExecution.logical) === 3)
assert(countRepartitions(doubleRepartitioned.queryExecution.optimizedPlan) === 1)
assert(countRepartitions(doubleRepartitioned.queryExecution.optimizedPlan) === 2)
doubleRepartitioned.queryExecution.optimizedPlan match {
case r: Repartition =>
assert(r.numPartitions === 5)
assert(r.shuffle === false)
case Repartition (numPartitions, shuffle, Repartition(_, shuffleChild, _)) =>
assert(numPartitions === 5)
assert(shuffle === false)
assert(shuffleChild === true)
}
}
