Fix: eliminate unnecessary repartitioning for small datasets #19073
Sorry for opening a new PR. My goal was to keep the diff clean and reviewable, so I rebuilt the changes from scratch in this PR.
Pull request overview
This PR optimizes aggregate query planning by eliminating unnecessary repartitioning for small datasets that don't have enough rows to justify the overhead of parallel processing.
Key Changes:
- Added `has_sufficient_rows_for_repartition()` function to check if input has enough rows (≥ batch_size) to warrant repartitioning
- Modified aggregate planning to skip `FinalPartitioned` mode for small datasets, using `Single` or `Final` mode instead
- Updated existing test with larger dataset (10,000 rows) to ensure repartitioning still occurs for large data
- Added new test case to verify single-mode execution for small datasets (6 rows)
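The row-count check summarized above could look roughly like the following. This is a minimal sketch using simplified stand-in types, not DataFusion's actual `Statistics` or `AggregateMode`: the `Precision` and `Mode` enums, the `choose_mode` helper, and the function signature are hypothetical. Only the function name and the `≥ batch_size` rule come from the overview. Treating unknown row counts as "sufficient" is also an assumption, and the sketch collapses the "Single or Final" choice into `Single` for brevity.

```rust
/// Stand-in for a statistic that may be exact, estimated, or unknown.
/// (Hypothetical simplification, not DataFusion's own type.)
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

impl Precision {
    /// Returns the value if one is known, whether exact or estimated.
    pub fn get_value(&self) -> Option<usize> {
        match self {
            Precision::Exact(v) | Precision::Inexact(v) => Some(*v),
            Precision::Absent => None,
        }
    }
}

/// Simplified aggregate modes for illustration only.
#[derive(Debug, PartialEq)]
pub enum Mode {
    Single,
    FinalPartitioned,
}

/// True when the input has at least `batch_size` rows.
/// Assumption: with no row-count statistic, keep the old behavior and repartition.
pub fn has_sufficient_rows_for_repartition(num_rows: Precision, batch_size: usize) -> bool {
    match num_rows.get_value() {
        Some(rows) => rows >= batch_size,
        None => true,
    }
}

/// Picks a partitioned aggregate only when the input is large enough.
pub fn choose_mode(num_rows: Precision, batch_size: usize) -> Mode {
    if has_sufficient_rows_for_repartition(num_rows, batch_size) {
        Mode::FinalPartitioned
    } else {
        Mode::Single
    }
}
```

With an example batch size of 8192, a 6-row input yields `Mode::Single` and a 10,000-row input yields `Mode::FinalPartitioned`, mirroring the two test cases listed above.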
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| datafusion/core/src/physical_planner.rs | Core implementation: adds row count check before repartitioning aggregates; updates test dataset size and adds new small dataset test |
| datafusion/core/tests/dataframe/mod.rs | Updates dataframe test snapshots to reflect new single-mode aggregation plans |
| datafusion/sqllogictest/test_files/*.slt | Regenerates expected query plans across multiple test files to show Single/Final modes instead of FinalPartitioned for small datasets |
| datafusion/sqllogictest/test_files/encrypted_parquet.slt | Changes test from expecting error to expecting results with rowsort |
| datafusion/sqllogictest/test_files/clickbench_extended.slt | Updates result ordering for non-deterministic query results |
…tition-small-datasets-revised
…tition-small-datasets-revised
Pull request overview
Copilot reviewed 18 out of 18 changed files in this pull request and generated 2 comments.
alamb
left a comment
Thanks @ShashidharM0118 -- this is looking good. The basic idea certainly looks right to me.
I have a suggestion for exactly which setting to use. Let me know if that makes sense.
        Err(_) => return Ok(true),
    };

    if let Some(num_rows) = stats.num_rows.get_value().copied() {
I think there is already a config setting that would be appropriate for this threshold (rather than using the batch size), `datafusion.optimizer.repartition_file_min_size`: https://datafusion.apache.org/user-guide/configs.html
What do you think?
Hi @alamb, thanks for the suggestion to use `datafusion.optimizer.repartition_file_min_size`. It makes more sense to me, so I've switched the heuristic over to be size-based.
I have two concerns:
- I've added a follow-up `num_rows` check that is used only when `total_byte_size` is missing. In other words, the code first applies the size-based threshold, and only if no size statistics are available does it fall back to `num_rows >= batch_size`. Does this fallback approach look reasonable?
- The `hash_agg_group_by_partitioned_on_dicts` test in `physical_planner.rs` (lines 3275–3304) was originally written to assert that a partitioned aggregate is produced on dictionary keys, but with the new size-based heuristic the small in-memory dict dataset no longer triggers repartitioning and the test now asserts `mode: Single`, which no longer matches the test name or original intent.
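The size-first heuristic with a row-count fallback described in the first concern might be sketched as follows. The `InputStats` struct and the `should_repartition_with_fallback` function are hypothetical names for illustration. The `>=` comparison against the size threshold and the choice to repartition when no statistics exist are assumptions; the comment only states the order of the checks and the `num_rows >= batch_size` fallback.

```rust
/// Hypothetical, simplified view of the statistics the planner consults.
pub struct InputStats {
    /// Estimated total size of the input in bytes, if known.
    pub total_byte_size: Option<usize>,
    /// Estimated number of rows, if known.
    pub num_rows: Option<usize>,
}

/// Applies the size threshold first; consults row count only when size is unknown.
pub fn should_repartition_with_fallback(
    stats: &InputStats,
    min_size_bytes: usize, // stands in for repartition_file_min_size
    batch_size: usize,
) -> bool {
    // Primary check: size-based threshold.
    if let Some(bytes) = stats.total_byte_size {
        return bytes >= min_size_bytes;
    }
    // Fallback: used only when no size statistic is available.
    if let Some(rows) = stats.num_rows {
        return rows >= batch_size;
    }
    // Assumption: with no statistics at all, keep the previous behavior.
    true
}
```

Note that when a size statistic is present, the row count is never consulted, so a small input with many rows still stays unpartitioned in this sketch.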
Throwing my two cents in here. I think this configuration would be great: letting users "turn knobs" is a good way to make DataFusion extensible, and I have experimented with it myself.
I see a use for this configuration in my work, and I think the fallback behavior should not exist alongside the min_size configuration. As a user, if I turn a knob to declare a min_size, I would prefer that the planner stick to that setting without falling back to another heuristic.
Let me know your thoughts on this.
Understood, I removed the fallback.
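With the fallback removed, the decision depends on the size threshold alone. A minimal sketch under stated assumptions: `should_repartition_size_only` is a hypothetical name, and the thread does not say what happens when no size statistic is available, so defaulting to repartitioning in that case is a guess made only to keep the example complete.

```rust
/// Size-only decision: the configured minimum size is the single knob.
pub fn should_repartition_size_only(
    total_byte_size: Option<usize>,
    min_size_bytes: usize, // stands in for repartition_file_min_size
) -> bool {
    match total_byte_size {
        Some(bytes) => bytes >= min_size_bytes,
        // Assumption: unknown size keeps the previous (repartitioning) behavior.
        None => true,
    }
}
```

The row count no longer appears in the signature, which matches the reviewer's request that the setting behave predictably once a user has chosen a value.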
…tion-small-datasets-revised
…ning and update plans
…nd fix formatting
…eliminate-repartition-small-datasets-revised
Which issue does this PR close?
Rationale for this change
Aggregate planning was repartitioning tiny datasets, adding unnecessary cost to lightweight queries.
The planner now uses statistics to keep small inputs in single-partition mode when there aren't enough rows to justify the overhead of repartitioning.
What changes are included in this PR?
- Adds `has_sufficient_rows_for_repartition(...)` in `datafusion/core/src/physical_planner.rs`.
- Skips `AggregateMode::FinalPartitioned` when the input is too small to warrant repartitioning.
- Updates tests (including `hash_agg_small_dataset_single_mode`) and dataframe insta snapshots to assert the new single-mode plan and its explain output.
Are these changes tested?
Yes.
Are there any user-facing changes?
Yes.