[SPARK-35264][SQL] Support AQE side broadcastJoin threshold #32391
Conversation
Kubernetes integration test starting
Kubernetes integration test status failure
cc @maropu @cloud-fan @maryannxue do you have any thoughts about this feature?
My worry is that we need to keep the join selection logic in sync with the normal planner, which can be hard to maintain.
Can we put the logic in JoinSelectionHelper directly? We can distinguish AQE stats and normal stats by adding a boolean flag to Statistics.
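A minimal sketch of that suggestion, with illustrative names (the real `Statistics` class has more fields, and the flag name and threshold handling here are assumptions, not the merged code):

```scala
// Simplified stand-in for catalyst's Statistics, extended with a flag that marks
// stats produced by AQE at runtime; the flag name is an assumption, and the real
// class has additional fields (e.g. attributeStats).
case class Statistics(
    sizeInBytes: BigInt,
    rowCount: Option[BigInt] = None,
    isAdaptive: Boolean = false)

// In JoinSelectionHelper the broadcast check could then pick a threshold per flag.
// Both threshold values are passed in explicitly to keep the sketch self-contained.
def canBroadcastBySize(
    stats: Statistics,
    normalThreshold: Long,
    adaptiveThreshold: Long): Boolean = {
  val threshold = if (stats.isAdaptive) adaptiveThreshold else normalThreshold
  threshold >= 0 && stats.sizeInBytes <= threshold
}
```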
@cloud-fan That might work, but I want to explain the reason for adding this rule on the AQE optimizer side. Currently it looks like we follow the JoinSelection logic, but that coupling may break, and with a separate rule we can easily add other join selection logic in the future.
For example, if we had another config like spark.sql.adaptive.shuffledHashJoinBuildSideThreshold, we could add the check in this rule so that we pick SMJ instead of SHJ when the partition size is over the threshold (see the sketch below).
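Purely for illustration, a rough sketch of that hypothetical check in an AQE-side rule; the config mentioned above and the helper below are not part of this PR:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical: with a config like spark.sql.adaptive.shuffledHashJoinBuildSideThreshold,
// an AQE-side rule could keep sort merge join when the build side exceeds the threshold.
def preferSmjOverShj(buildSide: LogicalPlan, buildSideThresholdBytes: Long): Boolean =
  buildSide.stats.sizeInBytes > buildSideThresholdBytes
```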
Seems we are not on the same page. I'm fine with a new config; I'm asking to move the logic into the existing planner rule and use different configs for AQE stats and normal stats.
@cloud-fan Updated it. Added a flag `isAdaptive` in `Statistics`, as you suggested, to simplify the logic.
Test build #138068 has finished for PR 32391 at commit
21591b8 to 2923933
Kubernetes integration test starting
Kubernetes integration test status failure
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala (review comments resolved, outdated)
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala (review comments resolved, outdated)
```diff
 private def canBuildLocalHashMapBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
-  plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
+  plan.stats.sizeInBytes <
+    autoBroadcastJoinThreshold(plan.stats.isAdaptive, conf) * conf.numShufflePartitions
```
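For context, a rough sketch of what a helper like `autoBroadcastJoinThreshold(isAdaptive, conf)` could look like; the name and shape of the AQE-side config entry are assumptions, not the merged code:

```scala
import org.apache.spark.sql.internal.SQLConf

// Sketch: use the AQE-side threshold when the stats were produced at runtime,
// otherwise the normal broadcast threshold. Assumes an optional config entry
// registered in SQLConf for spark.sql.adaptive.autoBroadcastJoinThreshold.
private def autoBroadcastJoinThreshold(isAdaptive: Boolean, conf: SQLConf): Long =
  if (isAdaptive) {
    conf.getConf(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD)
      .getOrElse(conf.autoBroadcastJoinThreshold)
  } else {
    conf.autoBroadcastJoinThreshold
  }
```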
Let's not touch shuffle hash join for now. I think in AQE we should check per-partition size to decide if we want to go SHJ or not, instead of using the old formula.
Yeah, I had a similar thought. The current condition for converting a join to shuffled hash join is a bit rough, since it assumes the data is not skewed across partitions. A per-partition check is sketched below.
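A rough sketch of what such a per-partition check could look like, assuming the build side's per-partition shuffle sizes are available (names are illustrative):

```scala
// Hypothetical: only allow shuffled hash join when every partition of the build side
// is small enough to be hashed in memory, instead of dividing the total size by the
// number of shuffle partitions.
def canBuildLocalHashMapPerPartition(
    buildSidePartitionBytes: Array[Long],
    maxPartitionBytes: Long): Boolean =
  buildSidePartitionBytes.nonEmpty && buildSidePartitionBytes.max <= maxPartitionBytes
```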
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala (review comments resolved, outdated)
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala (two review threads resolved, outdated)
Kubernetes integration test unable to build dist. exiting with code: 1
@cloud-fan thank you for the review, addressed all comments:
Test build #138093 has finished for PR 32391 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #138098 has finished for PR 32391 at commit
thanks, merging to master!
Test build #138099 has finished for PR 32391 at commit
thanks for merging!
Hi @ulysses-you, I have some questions about this PR. In the current version, I think the stats estimation is always larger than or equal to the actual value, so it seems like if a join is determined to be converted to BHJ, it also meets the broadcast conditions in AQE.
@Gabriel39 Not sure I see your point. Do you mean that a plan which can be broadcast in AQE is always smaller than the config we specified? If so, what's the issue with that?
@ulysses-you Well, I think you want to isolate the broadcast threshold between AQE and the normal planner because the current broadcast behavior can lead to OOM. However, when a join is converted to a BHJ during the normal planning process using static stats, it is definitely a BHJ and AQE should not optimize it to another join type, since the static stats (e.g. sizeInBytes) are always larger than or equal to the actual value. So a driver-side OOM will occur only if the broadcast threshold is too large, and I'm not sure this PR makes sense, since OOM is commonly due to an unreasonable broadcast threshold. If I misunderstand your point, feel free to point out my mistake. Thanks.
@Gabriel39 I guess you misunderstand the logic of AQE.
That's wrong: AQE can never change a BHJ to another join strategy once it has been decided on the normal planner side. It's not about the stats; you can see some key code in
And this new config assumes a join is not a BHJ before AQE, so that AQE can use the new config and runtime stats to plan a join (mostly an SMJ) as a BHJ. So the usual way to use this new config is: 1) forbid the normal auto broadcast or reduce its value, 2) tune the new config value, as sketched in the example below.
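For example, given an existing `SparkSession` named `spark`, that setup could look like this (the 64MB value is only illustrative):

```scala
// Forbid (or lower) the static broadcast threshold so the initial plan keeps the join
// as SMJ, then let AQE re-plan it as BHJ using runtime stats and the AQE-side threshold.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "64MB")
```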
To add a bit more color: the static size estimation in Spark is usually an underestimate, due to things like file compression. We can set the AQE broadcast threshold a bit higher, as the AQE size estimation is more precise.
@ulysses-you @cloud-fan Thank you for your patience! I think I really misunderstood this PR and stats estimation before.
### What changes were proposed in this pull request?

The `RemoveRedundantProjects` feature can sometimes conflict with the AQE broadcast threshold ([PR](#32391)). After removing the project, the physical plan to logical plan link can change and we may end up with a `Project` above `LogicalQueryStage`. This breaks the AQE broadcast threshold, because the stats of the `Project` do not have the `isRuntime = true` flag and thus still use the normal broadcast threshold. This PR updates `RemoveRedundantProjects` to not remove a `ProjectExec` that has a different logical plan link than its child.

### Why are the changes needed?

Make the AQE broadcast threshold work in more cases.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new tests

Closes #33222 from cloud-fan/aqe2.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
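A simplified sketch of the kind of guard described above; the real rule checks more conditions, and this only shows the logical-link comparison:

```scala
import org.apache.spark.sql.execution.ProjectExec

// Sketch: only treat a ProjectExec as removable when it points at the same logical plan
// as its child, so removing it cannot change which logical node (and whose stats, e.g. a
// LogicalQueryStage carrying runtime stats) the remaining physical plan links back to.
private def hasSameLogicalLinkAsChild(project: ProjectExec): Boolean =
  project.logicalLink.exists(link => project.child.logicalLink.contains(link))
```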
~~This PR aims to add a new AQE optimizer rule `DynamicJoinSelection`. Like other AQE partition number configs, this rule adds a new broadcast threshold config `spark.sql.adaptive.autoBroadcastJoinThreshold`.~~

This PR aims to add a flag in `Statistics` to distinguish AQE stats from normal stats, so that we can isolate some SQL configs between AQE and normal planning.

The main idea here is to isolate the join configs between the normal planner and the AQE planner, which share the same code path. We do not fully trust the static stats when deciding whether a join can be built as a broadcast hash join; in our experience it is very common for Spark to throw a broadcast timeout or a driver-side OOM exception when executing a fairly large plan. And since a broadcast join is not reversible, once we convert a join to a broadcast hash join the first time, AQE cannot optimize it again. So it makes sense to decide whether we can broadcast on the AQE side using a different SQL config.

Yes, a new config `spark.sql.adaptive.autoBroadcastJoinThreshold` is added.

Add new test.

Closes apache#32391 from ulysses-you/SPARK-35264.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?

~~This PR aims to add a new AQE optimizer rule `DynamicJoinSelection`. Like other AQE partition number configs, this rule adds a new broadcast threshold config `spark.sql.adaptive.autoBroadcastJoinThreshold`.~~

This PR aims to add a flag in `Statistics` to distinguish AQE stats from normal stats, so that we can isolate some SQL configs between AQE and normal planning.

### Why are the changes needed?

The main idea here is to isolate the join configs between the normal planner and the AQE planner, which share the same code path. We do not fully trust the static stats when deciding whether a join can be built as a broadcast hash join; in our experience it is very common for Spark to throw a broadcast timeout or a driver-side OOM exception when executing a fairly large plan. And since a broadcast join is not reversible, once we convert a join to a broadcast hash join the first time, AQE cannot optimize it again. So it makes sense to decide whether we can broadcast on the AQE side using a different SQL config.

### Does this PR introduce _any_ user-facing change?

Yes, a new config `spark.sql.adaptive.autoBroadcastJoinThreshold` is added.

### How was this patch tested?

Add new test.
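For reference, a sketch of how such a threshold could be declared inside `SQLConf` (`buildConf` is SQLConf's internal config builder; the doc text, version, and fallback behavior here are assumptions, not the merged definition):

```scala
import org.apache.spark.network.util.ByteUnit

// Hypothetical declaration of the AQE-side broadcast threshold inside SQLConf.
val ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD =
  buildConf("spark.sql.adaptive.autoBroadcastJoinThreshold")
    .doc("Configures the maximum size in bytes for a table that will be broadcast when " +
      "adaptive query execution re-plans a join using runtime statistics. " +
      "If not set, it falls back to spark.sql.autoBroadcastJoinThreshold.")
    .version("3.2.0")
    .bytesConf(ByteUnit.BYTE)
    .createOptional
```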