
Conversation

@cloud-fan
Contributor

What changes were proposed in this pull request?

We missed the fact that submitting a shuffle or broadcast query stage can be heavy, as it needs to submit subqueries and wait for the results. This blocks the AQE loop and hurts the parallelism of AQE.

This PR fixes the problem by using shuffle/broadcast's own thread pool to wait for subqueries and other preparations.

This PR also re-implements #45234 to avoid submitting the shuffle job if the query has failed and all query stages need to be cancelled.
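
As a rough illustration of the idea (a minimal sketch with made-up names such as `exchangePool`, `waitForSubqueries`, and `submitShuffleJob`, not the code in this PR): the heavy preparation runs as a `Future` on the exchange's own pool, so the AQE loop thread only schedules the work and never blocks on it.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

object ShuffleStageSketch {
  // Stand-in for the exchange's dedicated pool; the real pool and its size
  // come from Spark's configs, not from this sketch.
  private val exchangePool: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

  // Hypothetical placeholders for the heavy preparation (subquery execution)
  // and the actual shuffle job submission.
  private def waitForSubqueries(): Unit = Thread.sleep(1000)
  private def submitShuffleJob(): Unit = println("shuffle job submitted")

  // The AQE loop only obtains a Future; the waiting happens on the
  // exchange's own pool instead of the AQE loop thread.
  def materialize(): Future[Unit] = Future {
    waitForSubqueries()
    submitShuffleJob()
  }(exchangePool)
}
```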

Why are the changes needed?

better parallelism for AQE

Does this PR introduce any user-facing change?

no

How was this patch tested?

new test case

Was this patch authored or co-authored using generative AI tooling?

no

@github-actions bot added the SQL label Jul 30, 2024
```diff
   */
  final def cancelBroadcastJob(reason: Option[String]): Unit = {
-    if (!this.relationFuture.isDone) {
+    if (isMaterializationStarted() && !this.relationFuture.isDone) {
```
Contributor Author

I do not re-implement broadcast cancellation here, as we need more refactoring to move the creation of the `Future` into `BroadcastExchangeLike`.

@cloud-fan
Contributor Author

cc @ulysses-you @yaooqinn

```scala
      .version("4.0.0")
      .intConf
      .checkValue(thres => thres > 0 && thres <= 1024, "The threshold must be in (0,1024].")
      .createWithDefault(1024)
```
Member

@yaooqinn Jul 30, 2024

Can you explain why we pick this number? It might create memory pressure on the driver

Contributor Author

The shuffle async job just waits for other work (subquery expression execution) to finish, which is very lightweight. The broadcast async job executes a query and collects the result on the driver, which is very heavy. That's why we can give much larger parallelism to the shuffle async jobs. In our benchmarks we found this number to be reasonably good for TPC.
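
For illustration, a pool bounded by such a threshold could look like the sketch below (illustrative only; `boundedCachedPool` and `maxThreads` are made-up names standing in for the new config, and the real code would go through Spark's own thread-pool utilities):

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}

object ShufflePoolSketch {
  // `maxThreads` stands in for the (0, 1024] threshold discussed above.
  def boundedCachedPool(maxThreads: Int): ThreadPoolExecutor = {
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "shuffle-exchange-sketch")
        t.setDaemon(true) // daemon threads so an idle pool never blocks driver exit
        t
      }
    }
    // Threads are created on demand up to maxThreads, extra tasks queue up,
    // and idle threads are reclaimed after 60 seconds, so a high cap mostly
    // costs memory only while threads are alive and parked on a wait.
    val pool = new ThreadPoolExecutor(
      maxThreads, maxThreads, 60L, TimeUnit.SECONDS,
      new LinkedBlockingQueue[Runnable](), factory)
    pool.allowCoreThreadTimeOut(true)
    pool
  }
}
```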

Member

Is there a correlation with the number of system cores?

Contributor Author

I don't think so; `BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD` is also way larger than the number of the driver's system cores.

Member

I'm not sure whether this parameter has anything to do with SPARK-49091, or whether that issue was just caused by SPARK-41914, which the reporter pointed to.

Also cc @wangyum

Member

Update: SPARK-49091 is not related

@cloud-fan
Contributor Author

thanks for the review, merging to master!

@cloud-fan closed this in f01eafd Aug 5, 2024
```diff
   }

-  test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the cancellation") {
+  test("SPARK-47148: AQE should avoid to submit shuffle job on cancellation") {
```
Contributor

This test case seems to be a bit unstable:

```
[info] - SPARK-47148: AQE should avoid to submit shuffle job on cancellation *** FAILED *** (6 seconds, 94 milliseconds)
[info]   "Multiple failures in stage materialization." did not contain "coalesce test error" (AdaptiveQueryExecSuite.scala:939)

- SPARK-47148: AQE should avoid to submit shuffle job on cancellation *** FAILED ***
  "[SCALAR_SUBQUERY_TOO_MANY_ROWS] More than one row returned by a subquery used as an expression. SQLSTATE: 21000
  == SQL (line 1, position 12) ==
  SELECT id, (SELECT slow_udf() FROM range(2)) FROM range(5)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  " did not contain "coalesce test error" (AdaptiveQueryExecSuite.scala:939)
```

any good solutions?

Contributor Author

I think it's because the slow_udf is not slow enough and the shuffle stage was submitted too early. Can you try to increase the sleep time in slow_udf and see if it fixes the problem?
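
For reference, the kind of registration involved looks roughly like this (illustrative only; the suite's actual UDF body and sleep duration may differ, and a `SparkSession` named `spark` is assumed to be in scope):

```scala
// Hypothetical sketch of the deliberately slow UDF used by the test; the
// sleep must be long enough that the injected failure arrives before the
// shuffle stage is submitted.
spark.udf.register("slow_udf", () => {
  Thread.sleep(5000)
  1
})
```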

@dongjoon-hyun
Member

To @cloud-fan and @LuciferYang, I made a follow-up PR to fix the `Multiple failures in stage materialization` failure. I also hit the same issue multiple times in CI.

```
[info] - SPARK-47148: AQE should avoid to submit shuffle job on cancellation *** FAILED *** (6 seconds, 94 milliseconds)
[info]   "Multiple failures in stage materialization." did not contain "coalesce test error" (AdaptiveQueryExecSuite.scala:939)
```

dongjoon-hyun added a commit that referenced this pull request Oct 16, 2024
…error case

### What changes were proposed in this pull request?

This PR aims to fix a flaky test by handling `_LEGACY_ERROR_TEMP_2235` (the multiple-failures exception) in addition to the single-exception case.

### Why are the changes needed?

After merging
- #47533

The following failures were reported multiple times in the PR and today.
- https://github.com/apache/spark/actions/runs/11358629880/job/31593568476
- https://github.com/apache/spark/actions/runs/11367718498/job/31621128680
- https://github.com/apache/spark/actions/runs/11360602982/job/31598792247
```
[info] - SPARK-47148: AQE should avoid to submit shuffle job on cancellation *** FAILED *** (6 seconds, 92 milliseconds)
[info]   "Multiple failures in stage materialization." did not contain "coalesce test error" (AdaptiveQueryExecSuite.scala:939)
```

The root cause is that `AdaptiveSparkPlanExec.cleanUpAndThrowException` can throw two types of exceptions: when there are multiple errors, `_LEGACY_ERROR_TEMP_2235` is thrown instead of the single original exception. The test case needs to handle this as well.

https://github.com/apache/spark/blob/bcfe62b9988f9b00c23de0b71acc1c6170edee9e/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L843-L850

https://github.com/apache/spark/blob/bcfe62b9988f9b00c23de0b71acc1c6170edee9e/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala#L1916-L1921
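
Roughly, the handling needs the following shape (a sketch only, not the exact assertion in this PR; `containsMessage` and `caughtException` are made-up names): check the exception's own message, its cause chain, and its suppressed exceptions, so the assertion passes whether the single error or the aggregated one is thrown.

```scala
// Sketch: search an exception, its causes, and its suppressed exceptions for
// the expected error fragment, covering both the single-failure and the
// "Multiple failures in stage materialization" cases.
def containsMessage(t: Throwable, fragment: String): Boolean = {
  if (t == null) {
    false
  } else {
    (t.getMessage != null && t.getMessage.contains(fragment)) ||
      containsMessage(t.getCause, fragment) ||
      t.getSuppressed.exists(containsMessage(_, fragment))
  }
}

// Hypothetical usage inside the test:
// assert(containsMessage(caughtException, "coalesce test error"))
```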

### Does this PR introduce _any_ user-facing change?

No, this is a test-only change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48498 from dongjoon-hyun/SPARK-49057.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>