
Conversation

@ulysses-you
Contributor

@ulysses-you ulysses-you commented Aug 22, 2022

What changes were proposed in this pull request?

  • Support detecting the user-specified root repartition through DeserializeToObjectExec
  • Skip the empty-relation optimization for the root repartition when it is user-specified
  • Add a new rule AdjustShuffleExchangePosition to adjust the shuffle we add back, so that the shuffle can be restored safely.

Why are the changes needed?

AQE cannot completely respect a user-specified repartition. The main reasons are:

  1. the AQE optimizer converts an empty plan to a local relation, which does not preserve the partitioning info
  2. the AQE requiredDistribution machinery only restores the repartition, and it cannot look through DeserializeToObjectExec

After the fix:
spark.range(0).repartition(5).rdd.getNumPartitions now returns 5.
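The empty-relation issue can be illustrated with a toy Python sketch. The class names and the `propagate_empty` function below are illustrative assumptions, not Spark's actual internals: the old behavior collapses a repartition over an empty child into a local relation and loses `numPartitions`, while the fix preserves a user-specified repartition at the root.

```python
from dataclasses import dataclass

@dataclass
class LocalRelation:
    rows: list

@dataclass
class Repartition:
    num_partitions: int
    user_specified: bool
    child: object

def is_empty(plan):
    return isinstance(plan, LocalRelation) and not plan.rows

def propagate_empty(plan, is_root=True):
    # Old behavior: a repartition over an empty child collapses to a
    # LocalRelation, dropping the partitioning info.
    # Fixed behavior: a user-specified repartition at the root survives,
    # so its numPartitions still reaches the final RDD.
    if isinstance(plan, Repartition) and is_empty(plan.child):
        if is_root and plan.user_specified:
            return plan
        return LocalRelation([])
    return plan
```

In this model, `propagate_empty(Repartition(5, True, LocalRelation([])))` keeps the partition count of 5, mirroring `spark.range(0).repartition(5)`, while a non-root or non-user-specified repartition still collapses.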

Does this PR introduce any user-facing change?

Yes, the user-specified distribution is now respected.

How was this patch tested?

Added tests.

@github-actions github-actions bot added the SQL label Aug 22, 2022
@ulysses-you ulysses-you marked this pull request as draft August 22, 2022 11:24
@ulysses-you ulysses-you force-pushed the output-partition branch 3 times, most recently from 23eb828 to e744b4e Compare August 23, 2022 03:16
@ulysses-you ulysses-you marked this pull request as ready for review August 23, 2022 03:17
@ulysses-you
Contributor Author

cc @wangyum @zsxwing @cloud-fan @maryannxue if you have time to review

Contributor

as `EnsureRequirements` can only optimize out user-specified repartition with `HashPartitioning`

This is not true any more?

Contributor Author

I only removed the with HashPartitioning part, since we are going to check all partitionings that come from a repartition.

Contributor

But if EnsureRequirements can't remove a user-specified repartition that is not HashPartitioning, we don't need to change this here?

Contributor Author

That is right, but then it would not describe the following code accurately. I changed the comment to:

// Note, here are two cases of how user-specified repartition can be optimized out:
// 1. `EnsureRequirements` can only optimize out user-specified repartition with
//    `HashPartitioning`.
// 2. `AQEOptimizer` can optimize out user-specified repartition with all `Partitioning`,
//     e.g. convert empty to local relation.

Contributor

do we have a test case to cover this, or is it just a safeguard for now?

Contributor Author

I think it's just a safeguard; the error message is logged by logOnLevel, which is at debug level by default, in reOptimize.

Contributor Author

It's a little overkill to validate the partitioning again after EnsureRequirements, so I removed this part of the code.

@ulysses-you
Contributor Author

It should be clear now. This PR only does two things:

  1. Only apply AQE for the children of DeserializeToObjectExec
  2. Check all partitionings for requiredDistribution so we have a chance to add the shuffle back
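The first point can be sketched with a toy plan walker. The names below (`DeserializeToObject`, `find_root_repartition`, etc.) are hypothetical stand-ins, not the real SparkPlan API: deserialization nodes are treated as transparent when searching from the root for a user-specified repartition.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Scan:
    pass

@dataclass
class DeserializeToObject:
    child: object

@dataclass
class Repartition:
    num_partitions: int
    user_specified: bool
    child: object

def find_root_repartition(plan) -> Optional[Repartition]:
    # Look through deserialization nodes (e.g. what Dataset.rdd inserts)
    # so a repartition right below them still counts as the root one.
    while isinstance(plan, DeserializeToObject):
        plan = plan.child
    if isinstance(plan, Repartition) and plan.user_specified:
        return plan
    return None
```

With this model, a plan shaped like `DeserializeToObject(Repartition(5, ...))` still yields a root repartition of 5 partitions, which is the `Dataset.rdd` case the PR targets.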

Contributor

Suggested change
// `DeserializeToObjectExec` is used by Spark internal e.g. `Dataset.rdd`.
// `DeserializeToObjectExec` is used by Spark internally e.g. `Dataset.rdd`.

Contributor

Suggested change
// This is conflict with AQE framework since we may add shuffle back during re-optimize
// This conflicts with AQE framework since we may add shuffle back during re-optimize

Contributor

shall we fix PropagateEmptyRelationBase instead? I don't think we should optimize out a Repartition when doing so breaks user expectations. The change here only covers AQE, and I think this is a problem for non-AQE as well.

Contributor

Or, we can still optimize out Repartition, but we should assign a Partitioning to the LocalRelation. Ideally, empty data can have any partitioning.

Contributor Author

shall we fix PropagateEmptyRelationBase instead? I don't think we should optimize out a Repartition when doing so breaks user expectations. The change here only covers AQE, and I think this is a problem for non-AQE as well.

if we do not optimize the repartition on top of an empty relation, we also cannot optimize other plans on top of that repartition, which may become a regression. We should only preserve the final repartition, which can affect the final output partitioning, as requiredDistribution does.

Or, we can still optimize out Repartition, but we should assign a Partitioning to the LocalRelation. Ideally, empty data can have any partitioning.

I actually thought about that, but it's not easy. If we want to assign a Partitioning to LocalRelation, we also need to consider how to propagate that Partitioning through other plans, otherwise it is of little use. Then we would need to care about the output partitioning of every logical plan, which should not be expected since partitioning is a physical concept.

Contributor

OK, let me make my proposal clear: let's not optimize out a repartition if it's the root node, or below a Project/Filter, in any case. What do you think?

Contributor Author

skipped optimizing the root user-specified repartition in 5466289

Contributor

can we make a new PR for this change? It's not only related to AQE.

Contributor Author

ok, I will send a new PR for the non-AQE part

Contributor

how can a leaf node be a root repartition?

Contributor Author

it is a LogicalQueryStage in AQE, since the repartition has been planned as a shuffle

Contributor

is it possible to move the apply method to the base class, so that we can share more code?

Contributor

why is this not retained? A repartition under a Project should be respected.

Contributor Author

the tests were outdated; fixed them in 048e781bcaf5f78d28666c5af16745c6a93cc08c

Contributor Author

this is for AQE side to tag TreePattern. cc @cloud-fan

Contributor

do we still need this comment change?

Contributor

is this another bug? the number of partitions is not respected.

Contributor Author

it's a quirk of RangePartitioner; I'm not sure it is a bug. RangePartitioner samples the data to decide the range partition bounds, and the bounds will be empty if the data is empty, so the output partition number is 1.

def numPartitions: Int = rangeBounds.length + 1
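The quoted `numPartitions` behavior is easy to model. Below is a simplified Python sketch (the bounds computation is an assumption for illustration, not RangePartitioner's actual sampling logic) showing why empty input always collapses to a single partition.

```python
def range_bounds(sampled_rows, requested_partitions):
    # Sampling an empty dataset yields no rows, so there is nothing to
    # split on and the bounds come back empty.
    if not sampled_rows:
        return []
    rows = sorted(sampled_rows)
    step = max(1, len(rows) // requested_partitions)
    # Simplified: pick at most requested_partitions - 1 split points.
    return rows[step::step][: requested_partitions - 1]

def num_partitions(bounds):
    # Mirrors the quoted line: numPartitions = rangeBounds.length + 1
    return len(bounds) + 1
```

With enough data the model yields the requested partition count, but for empty data `range_bounds` returns `[]` and `num_partitions` is 1, regardless of how many partitions were requested.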

DisableUnnecessaryBucketedScan,
OptimizeSkewedJoin(ensureRequirements)
OptimizeSkewedJoin(ensureRequirements),
AdjustShuffleExchangePosition
Contributor

shall we put it right after ensureRequirements?

Contributor Author

addressed

testData.select("key").collect().toSeq)

assert(spark.emptyDataFrame.coalesce(1).rdd.partitions.size === 0)
assert(spark.emptyDataFrame.coalesce(1).rdd.partitions.size === 1)
Contributor Author

one more failed test

@cloud-fan
Contributor

@ulysses-you can you update the PR description? I think this is ready to merge.

@ulysses-you
Contributor Author

thank you @cloud-fan, updated

@cloud-fan
Contributor

cloud-fan commented Sep 14, 2022

thanks, merging to master/3.3! (the last commit only adds comments)

@cloud-fan cloud-fan closed this in 801ca25 Sep 14, 2022
cloud-fan added a commit that referenced this pull request Sep 14, 2022
…n AQE

Closes #37612 from ulysses-you/output-partition.

Lead-authored-by: ulysses-you <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 801ca25)
Signed-off-by: Wenchen Fan <[email protected]>
@ulysses-you ulysses-you deleted the output-partition branch September 14, 2022 01:29