
Conversation

@ctsk (Contributor) commented Mar 28, 2025

Relates to Issue: #15478

Rationale for this change

Blocking operators (the HashJoin build side, aggregations) are often planned on top of a RepartitionExec with a CoalesceBatchesExec in between. However, one of the first things these operators do is concatenate the freshly coalesced batches.
This PR tests whether the overhead of the two-step coalesce + concat outweighs the gains from fewer dispatches of the consuming operators.
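For illustration, the plan shape in question typically looks like the following abbreviated fragment (a hypothetical, simplified EXPLAIN output with options elided, not taken from this PR):

```text
AggregateExec: mode=FinalPartitioned, gby=[...], aggr=[...]
  CoalesceBatchesExec: target_batch_size=8192
    RepartitionExec: partitioning=Hash([...], 16)
      AggregateExec: mode=Partial, gby=[...], aggr=[...]
        ...
```

The final aggregation buffers (and effectively concatenates) its input anyway, so the CoalesceBatchesExec in the middle may only add copies without saving any work.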

What changes are included in this PR?

This PR adds a physical optimizer rule, UncoalesceBatches. It runs after the CoalesceBatches rule and removes CoalesceBatchesExec nodes that sit on the build side of HashJoins and in front of non-partial aggregations.
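For context, a minimal sketch of what such a rule can look like follows. This is not the code from this PR: it only covers the aggregation case, ignores the HashJoin build side, and assumes recent DataFusion trait signatures (children() returning references, with_new_children() taking an Arc receiver), which vary between versions.

```rust
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::{
    aggregates::{AggregateExec, AggregateMode},
    coalesce_batches::CoalesceBatchesExec,
    ExecutionPlan,
};

/// Sketch of a rule that removes a `CoalesceBatchesExec` sitting directly
/// under a non-partial `AggregateExec`.
#[derive(Default, Debug)]
pub struct UncoalesceBatches {}

impl PhysicalOptimizerRule for UncoalesceBatches {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        rewrite(plan)
    }

    fn name(&self) -> &str {
        "UncoalesceBatches"
    }

    fn schema_check(&self) -> bool {
        // Dropping a CoalesceBatchesExec does not change the schema.
        true
    }
}

fn rewrite(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    // Rewrite the children first (bottom-up traversal).
    let children = plan
        .children()
        .into_iter()
        .map(|child| rewrite(Arc::clone(child)))
        .collect::<Result<Vec<_>>>()?;
    let plan = if children.is_empty() {
        plan
    } else {
        plan.with_new_children(children)?
    };

    // If a non-partial aggregation is fed by a CoalesceBatchesExec, bypass the
    // coalesce and wire the aggregation directly to the coalesce's input.
    if let Some(agg) = plan.as_any().downcast_ref::<AggregateExec>() {
        if !matches!(agg.mode(), AggregateMode::Partial) {
            if let Some(coalesce) =
                agg.input().as_any().downcast_ref::<CoalesceBatchesExec>()
            {
                let new_input = Arc::clone(coalesce.input());
                return plan.clone().with_new_children(vec![new_input]);
            }
        }
    }
    Ok(plan)
}
```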

Are these changes tested?

Not yet!

Are there any user-facing changes?

Yes.

ctsk added 2 commits March 28, 2025 15:06
The partial aggregation can switch to a pass-through mode. In this case, coalescing might make sense
@Dandandan (Contributor)

That makes a lot of sense!


/// Remove CoalesceBatchesExec that are in front of an AggregateExec
#[derive(Default, Debug)]
pub struct UnCoalesceBatches {}
Contributor:

I am wondering if we instead can avoid adding them in the CoalesceBatches optimizer

Contributor Author:

Certainly something to attempt. I haven't done it (yet) because it isn't necessary for evaluating the impact of this change.

@berkaysynnada (Contributor)

I think you can generalize this logic by tracking the ExecutionPlanProperties::pipeline_behavior() of operators in the plan.

@alamb (Contributor) commented Mar 30, 2025

BTW thank you very much @ctsk -- it is really cool to see the joins get some careful love and attention ❤️

@Rachelint (Contributor)

Here are my results after removing CoalesceBatchesExec for Aggregate:

--------------------
Benchmark clickbench_1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃ remove-coalesc-test ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │     0.58ms │              0.59ms │     no change │
│ QQuery 1     │    69.10ms │             67.52ms │     no change │
│ QQuery 2     │   164.32ms │            162.57ms │     no change │
│ QQuery 3     │   174.68ms │            173.55ms │     no change │
│ QQuery 4     │  1530.10ms │           1486.31ms │     no change │
│ QQuery 5     │  1438.23ms │           1390.79ms │     no change │
│ QQuery 6     │    67.68ms │             68.17ms │     no change │
│ QQuery 7     │    78.59ms │             80.99ms │     no change │
│ QQuery 8     │  1646.02ms │           1594.09ms │     no change │
│ QQuery 9     │  1823.36ms │           1806.96ms │     no change │
│ QQuery 10    │   464.34ms │            443.72ms │     no change │
│ QQuery 11    │   521.13ms │            510.05ms │     no change │
│ QQuery 12    │  1606.39ms │           1559.59ms │     no change │
│ QQuery 13    │  2578.92ms │           2425.36ms │ +1.06x faster │
│ QQuery 14    │  1650.71ms │           1584.42ms │     no change │
│ QQuery 15    │  1807.01ms │           1756.57ms │     no change │
│ QQuery 16    │  3430.08ms │           3226.74ms │ +1.06x faster │
│ QQuery 17    │  3177.38ms │           2923.88ms │ +1.09x faster │
│ QQuery 18    │  7348.28ms │           6759.03ms │ +1.09x faster │
│ QQuery 19    │   145.29ms │            142.43ms │     no change │
│ QQuery 20    │  2650.47ms │           2652.96ms │     no change │
│ QQuery 21    │  3416.49ms │           3382.16ms │     no change │
│ QQuery 22    │  8195.70ms │           8383.83ms │     no change │
│ QQuery 23    │ 21618.48ms │          21754.72ms │     no change │
│ QQuery 24    │   997.94ms │            999.24ms │     no change │
│ QQuery 25    │   908.12ms │            885.02ms │     no change │
│ QQuery 26    │  1168.94ms │           1169.06ms │     no change │
│ QQuery 27    │  3827.54ms │           3838.34ms │     no change │
│ QQuery 28    │ 22386.67ms │          22554.63ms │     no change │
│ QQuery 29    │   910.11ms │            910.88ms │     no change │
│ QQuery 30    │  1633.75ms │           1606.75ms │     no change │
│ QQuery 31    │  1876.00ms │           1838.23ms │     no change │
│ QQuery 32    │  7765.63ms │           7456.08ms │     no change │
│ QQuery 33    │  7439.70ms │           7022.39ms │ +1.06x faster │
│ QQuery 34    │  7414.28ms │           7033.60ms │ +1.05x faster │
│ QQuery 35    │  2212.33ms │           2106.79ms │     no change │
│ QQuery 36    │   220.97ms │            204.14ms │ +1.08x faster │
│ QQuery 37    │   142.39ms │            141.67ms │     no change │
│ QQuery 38    │   136.95ms │            134.41ms │     no change │
│ QQuery 39    │   388.01ms │            373.10ms │     no change │
│ QQuery 40    │    56.90ms │             57.45ms │     no change │
│ QQuery 41    │    54.00ms │             53.49ms │     no change │
│ QQuery 42    │    61.03ms │             60.48ms │     no change │
└──────────────┴────────────┴─────────────────────┴───────────────┘

@berkaysynnada (Contributor)

> I think you can generalize this logic by tracking the ExecutionPlanProperties::pipeline_behavior() of operators in the plan.

I can give some guidance on that, by the way, if you'd like.

ctsk marked this pull request as draft April 7, 2025 10:37
@ctsk (Contributor Author) commented Apr 7, 2025

Marking as draft to signify that the implementation is not finished yet.

Thanks for the benchmark @Rachelint. Did you use the implementation in this PR or write your own? Overall the impact seems small (still good for how little it takes to implement!). I think it's worth investigating if a Coalesce with a different threshold makes sense.

Thanks for the offer @berkaysynnada. I think I know what to do, but haven't found the time the past few days. I'll give it a try later today.

@alamb (Contributor) commented Apr 7, 2025

> Thanks for the benchmark @Rachelint. Did you use the implementation in this PR or write your own? Overall the impact seems small (still good for how little it takes to implement!). I think it's worth investigating if a Coalesce with a different threshold makes sense.

These look to me like ClickBench results from bench.sh: https://github.com/apache/datafusion/blob/main/benchmarks/README.md

@ctsk (Contributor Author) commented Apr 7, 2025

@berkaysynnada I had a look at ExecutionPlanProperties::pipeline_behavior(). I think it is not quite what I want here: For the HashJoin, I want to remove the coalesce on the build side, but keep it on the probe side. The pipeline behaviour doesn't tell me which child is processed batch-wise, and which child is processed incrementally.

I could add a blanket rule for other plans - potentially outside of the datafusion repo - that removes the coalesce for each child of a plan that does not have EmissionType::Incremental (sketched below). Unfortunately this does not cover the rule for the aggregation: here I purposefully kept the CoalesceBatchesExec underneath partial aggregations, because those can switch to passing batches through without aggregating.
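A minimal sketch of that blanket variant, under the assumption that ExecutionPlanProperties is implemented for Arc<dyn ExecutionPlan> and that EmissionType lives in the execution_plan module (paths and signatures may differ between DataFusion versions):

```rust
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::physical_plan::{
    coalesce_batches::CoalesceBatchesExec, execution_plan::EmissionType, ExecutionPlan,
    ExecutionPlanProperties,
};

/// Bypass any CoalesceBatchesExec directly below an operator that does not
/// emit results incrementally (i.e. one that buffers its input anyway).
fn uncoalesce_children(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    // Incremental operators benefit from larger input batches; leave them alone.
    if matches!(plan.pipeline_behavior(), EmissionType::Incremental) {
        return Ok(plan);
    }

    let new_children: Vec<Arc<dyn ExecutionPlan>> = plan
        .children()
        .into_iter()
        .map(|child| match child.as_any().downcast_ref::<CoalesceBatchesExec>() {
            // Skip the coalesce and feed its input to the operator directly.
            Some(coalesce) => Arc::clone(coalesce.input()),
            None => Arc::clone(child),
        })
        .collect();

    if new_children.is_empty() {
        Ok(plan)
    } else {
        plan.with_new_children(new_children)
    }
}
```

As noted above, this treats all children alike, so it cannot keep the coalesce on the probe side of a join, nor handle the partial-aggregation exception.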

So far, this PR is a lot of reasoning about what might make sense, but in the end it comes down to measuring the impact for each operator. I plan on trying different coalesce thresholds tomorrow and seeing what works best.

@berkaysynnada (Contributor)

> @berkaysynnada I had a look at ExecutionPlanProperties::pipeline_behavior(). I think it is not quite what I want here: For the HashJoin, I want to remove the coalesce on the build side, but keep it on the probe side. The pipeline behaviour doesn't tell me which child is processed batch-wise, and which child is processed incrementally.

I understand why it doesn't fit in this use case.

Maybe we should have another API for operators, like pipeline_behavior: accumulate_input_batches(&self) -> Vec<bool>? HashJoin would implement it as vec![true, false], SortExec as [true], AggregateExec as [true] if its input is not ordered on the group-by keys, FilterExec as [false], etc. WDYT? Would that be over-engineering, or would it reflect the behaviors better? Maybe we can utilize it in other places as well, where we currently downcast operators and check the type (a rough sketch follows).
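A rough sketch of what that could look like. The trait name, method, and per-operator return values are hypothetical (they mirror the proposal above) and are not an existing DataFusion API; the real method would live on ExecutionPlan or ExecutionPlanProperties:

```rust
/// Hypothetical extension trait: for each child, report whether the operator
/// accumulates (buffers) that child's batches before producing output.
pub trait AccumulateInputBatches {
    fn accumulate_input_batches(&self) -> Vec<bool>;
}

// Stand-in operator types for illustration only; the real implementations
// would be on HashJoinExec, SortExec, AggregateExec, FilterExec, etc.
// (AggregateExec would return [true] only when its input is not ordered on
// the group-by keys.)
struct HashJoinLike;
struct SortLike;
struct FilterLike;

impl AccumulateInputBatches for HashJoinLike {
    fn accumulate_input_batches(&self) -> Vec<bool> {
        // Build (left) side is fully buffered, probe (right) side is streamed.
        vec![true, false]
    }
}

impl AccumulateInputBatches for SortLike {
    fn accumulate_input_batches(&self) -> Vec<bool> {
        // Sorting buffers its entire input.
        vec![true]
    }
}

impl AccumulateInputBatches for FilterLike {
    fn accumulate_input_batches(&self) -> Vec<bool> {
        // Filtering is purely streaming.
        vec![false]
    }
}
```

An optimizer rule could then decide per child whether a CoalesceBatchesExec in front of it pays off, instead of downcasting to concrete operator types.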

@Dandandan (Contributor) commented Apr 18, 2025

I think this is a nice experiment.
That said, I think a better approach would be to change the build side of the join to use Vec<RecordBatch> instead of concatenating into a single RecordBatch.
I remember we (I) changed it to concatenate all build batches into one (to improve performance back then), but it would be preferable not to concatenate everything into one batch.
One downside of doing that (concatenating everything) is that we can't load more than 4 GiB of Utf8 columns on the left side; it will fail with overflowing offsets.
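A purely illustrative sketch of the difference; the type and field names here are made up and do not match DataFusion's actual build-side state:

```rust
use arrow::record_batch::RecordBatch;

/// Today (simplified): the whole build side is concatenated into one batch, so
/// a row can be addressed by a single row index, but string offsets can
/// overflow once the concatenated Utf8 data gets too large.
struct ConcatenatedBuildSide {
    batch: RecordBatch,
}

/// Suggested alternative: keep the original batches and address build-side
/// rows by (batch_index, row_index) in the hash table, avoiding the giant
/// concatenation.
struct ChunkedBuildSide {
    batches: Vec<RecordBatch>,
}
```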

@ctsk (Contributor Author) commented Apr 20, 2025

Sorry for the silence. I've been extensively benchmarking this PR and the results have been fairly mixed. I've also tried different thresholds for coalescing. I plan on generating tables and pushing the results later today.

@ctsk (Contributor Author) commented Apr 20, 2025

benchmarks/hashagg-results.md
benchmarks/join-results.md
benchmarks/sort-results.md

I've checked in the results because I think they would be too large to include as a comment.

Each file contains the results of reducing the coalesce threshold for a single operator - joins, hash aggregations, and sorts - while coalescing before all other operators remains unchanged. The number in each configuration name gives the coalesce threshold: SORT0 means the CoalesceBatches operators in front of sorts were removed entirely, whereas SORT256 means the CoalesceBatchesExec in front of a sort was configured to emit a batch once it had 256 rows buffered. The same applies to joins and hash aggregations.
The CHANGE column shows the relative change of the column to its right compared to the base column (the baseline when this PR branched off main).

The benchmarks were run with 16 target partitions. I suspect that the more target partitions there are, the smaller the batches produced by RepartitionExec become. Therefore, removing coalesce might work better with smaller target partition counts (for hash aggregation and joins).

ctsk closed this May 5, 2025
Labels: optimizer (Optimizer rules)