-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-47148][SQL] Avoid to materialize AQE ExchangeQueryStageExec on the cancellation #45234
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
Outdated
Show resolved
Hide resolved
|
Should we also backport this patch to |
53dd089 to
a7a869f
Compare
|
cc @cloud-fan @maryannxue as well |
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
Outdated
Show resolved
Hide resolved
a7a869f to
ef8c50e
Compare
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
Outdated
Show resolved
Hide resolved
355aeb0 to
a923c2a
Compare
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Outdated
Show resolved
Hide resolved
867134b to
edf95b3
Compare
c178f2e to
d0e4127
Compare
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Outdated
Show resolved
Hide resolved
aaafa1d to
4378034
Compare
4378034 to
c17240b
Compare
|
Thanks @cloud-fan and @ulysses-you for the reviews and approval. |
|
Build is green now so PR is ready to be merged. Thanks in advance. |
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
Outdated
Show resolved
Hide resolved
ae77cbb to
1105282
Compare
|
thanks, merging to master! |
### What changes were proposed in this pull request? A followup of #45234 to make the test more stable by using broadcast hint. ### Why are the changes needed? test improvement ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? N/A ### Was this patch authored or co-authored using generative AI tooling? no Closes #47007 from cloud-fan/follow. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: yangjie01 <[email protected]>
| * such as waiting for the subqueries. | ||
| */ | ||
| @transient private lazy val shuffleFuture: Future[MapOutputStatistics] = executeQuery { | ||
| materializationStarted.set(true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After a closer look, I don't think this change works as we expect. We set this materializationStarted flag before we return the Future, which means we are still on the AQE loop's main thread. That said, once we submit a query stage, its materializationStarted becomes true immediately and we can't really avoid the wasted query stage execution when cancelling it.
The test passed because ShuffleExchangeExec calls child.execute() before returning the Future. Then we exit the AQE loop without cancelling other stages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Further more, I don't think this idea works. Let's say when we want to cancel a query stage and the materializationStarted flag is false, we decide to skip the cancellation but maybe the next second the materializationStarted becomes true and we miss to cancel the shuffle job.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need a bit of synchronization here. The shuffle node should have two fields: isCancelled flag and the shuffle job Future.
- When we cancel a shuffle, we lock on the shuffle node, and set
isCancelledflag to true. Then if the shuffle jobFutureis present, we cancel it. - When we are going to submit a shuffle, we lock on the shuffle node. Then: if the
isCancelledflag is true, fail immediately, otherwise, submit the shuffle job and set theFuturefield.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems not an acutally issue for now. AQE always do materilize stage and cancel stage at main thread. So if we decide to cancel stage then that means we will never do materilize stage again. We may need improve this code if we support do materilize concurrently in future.
…ages ### What changes were proposed in this pull request? We missed the fact that submitting a shuffle or broadcast query stage can be heavy, as it needs to submit subqueries and wait for the results. This blocks the AQE loop and hurts the parallelism of AQE. This PR fixes the problem by using shuffle/broadcast's own thread pool to wait for subqueries and other preparations. This PR also re-implements #45234 to avoid submitting the shuffle job if the query is failed and all query stages need to be cancelled. ### Why are the changes needed? better parallelism for AQE ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new test case ### Was this patch authored or co-authored using generative AI tooling? no Closes #47533 from cloud-fan/aqe. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
AQE can materialize both
ShuffleQueryStageandBroadcastQueryStageon the cancellation. This causes unnecessary stage materialization by submitting Shuffle Job and Broadcast Job. Under normal circumstances, if the stage is already non-materialized (a.k.aShuffleQueryStage.shuffleFutureorBroadcastQueryStage.broadcastFutureis not initialized yet), it should just be skipped without materializing it.Problematic Stacktrace:
Please find sample use-case:
1- Stage Materialization Steps:
When stage materialization is failed:
2- Stage Cancellation Steps:
Reproduce Steps:
https://github.com/apache/spark/pull/45234/files#diff-f89f2fe78b324c6bc7190bef84220181f3616efc156ea99b3f15d375a22d7f88R900
Why are the changes needed?
Current logic introduces unnecessary Shuffle Job / Broadcast Job to be able to cancel
ShuffleQueryStage/BroadcastQueryStage.Does this PR introduce any user-facing change?
No
How was this patch tested?
Added new Unit Tests
Was this patch authored or co-authored using generative AI tooling?
No