Keep and deprecate previous api
Dandandan committed Apr 28, 2025
commit 28e25c42d41b183d433a1236afb8a9391f8d062e
26 changes: 23 additions & 3 deletions datafusion/physical-plan/src/repartition/mod.rs
@@ -224,7 +224,7 @@ impl BatchPartitioner {
Ok(Self { state, timer })
}

/// Partition the provided [`RecordBatch`] into one or more partitioned [`RecordBatch`]
/// Partition a single [`RecordBatch`] into one or more partitioned [`RecordBatch`]
/// based on the [`Partitioning`] specified on construction
///
/// `f` will be called for each partitioned [`RecordBatch`] with the corresponding
@@ -233,7 +233,27 @@ impl BatchPartitioner {
///
/// The time spent repartitioning, not including time spent in `f` will be recorded
/// to the [`metrics::Time`] provided on construction
pub fn partition<F>(&mut self, batches: Vec<RecordBatch>, mut f: F) -> Result<()>
#[deprecated(since = "48.0.0", note = "use partition_batches instead")]
pub fn partition<F>(&mut self, batch: RecordBatch, mut f: F) -> Result<()>
where
F: FnMut(usize, RecordBatch) -> Result<()>,
{
self.partition_iter(vec![batch])?.try_for_each(|res| match res {
Ok((partition, batch)) => f(partition, batch),
Err(e) => Err(e),
})
}

/// Partition the provided [`Vec<RecordBatch>`] into one or more partitioned [`RecordBatch`]
/// based on the [`Partitioning`] specified on construction
///
/// `f` will be called for each partitioned [`RecordBatch`] with the corresponding
/// partition index. Any error returned by `f` will be immediately returned by this
/// function without attempting to publish further [`RecordBatch`]
///
/// The time spent repartitioning, not including time spent in `f` will be recorded
/// to the [`metrics::Time`] provided on construction
pub fn partition_batches<F>(&mut self, batches: Vec<RecordBatch>, mut f: F) -> Result<()>
where
F: FnMut(usize, RecordBatch) -> Result<()>,
{
@@ -314,7 +334,7 @@ impl BatchPartitioner {
// Tracking time required for repartitioned batches construction
let _timer = partitioner_timer.timer();
let b: Vec<&RecordBatch> = batches.iter().collect();

// Produce batches based on indices
let batch = interleave_record_batch(&b, &indices)?;
@Dandandan (Contributor, Author) commented on Apr 18, 2025:

Probably an API like apache/arrow-rs#7325 would be even faster (avoiding one level of "trivial" indexing).

@Dandandan (Contributor, Author):

FYI @ctsk

Contributor:

Nice change, and a much clearer performance win than #15479. I expect (without testing) that these two PRs interact negatively with one another - removing coalesce will mean the data is "more scattered" in memory, which will probably make interleave work worse, as well as the computation of the left join keys.

@Dandandan (Contributor, Author):

I think removing coalesce after this change (for all hash repartitions) might be possible, as the output batch size will be roughly equal to the input batch size (instead of roughly 1/partitions * batch_size), unless hash values are somehow skewed (but that is currently not good either way).

A future API could perhaps use your take_in API to only output rows once the batch size has been reached.

@ctsk (Contributor) commented on Apr 20, 2025:

I see I had misunderstood this PR. It makes a lot of sense to do this. As part of prototyping the integration of a take_in API in DataFusion, I made a similar change - moving the buffering to before the small batches are sent to their destination thread. I don't remember seeing as much speedup when I benchmarked that change independently - I guess using interleave instead of a take/concat combo (as I did back then) makes a significant difference. Awesome!

@Dandandan (Contributor, Author):

Yeah, the speedup comes from avoiding copying the data a second time in concat / CoalesceBatches. So when using take_in we should be careful to use it once (for a single destination batch) to avoid doing the concat on the small batches.

Ok((partition, batch))
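
For context, a minimal call-site sketch of the migration is shown below. It uses only the signatures visible in this diff; the import paths, the `repartition_all` helper, and the way the partitioner and batches are obtained are assumptions for illustration, not part of the PR.

```rust
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
// Assumed import path; `BatchPartitioner` is defined in
// datafusion/physical-plan/src/repartition/mod.rs.
use datafusion_physical_plan::repartition::BatchPartitioner;

/// Hypothetical helper showing the new API: repartition a buffered
/// `Vec<RecordBatch>` in a single call.
fn repartition_all(
    partitioner: &mut BatchPartitioner,
    batches: Vec<RecordBatch>,
) -> Result<()> {
    // `partition_batches` (added by this commit) receives the whole Vec,
    // so the partitioner can interleave rows across all input batches.
    partitioner.partition_batches(batches, |partition, batch| {
        // A real operator would publish `batch` to the channel for output
        // partition `partition`; printing stands in for that here.
        println!("partition {partition}: {} rows", batch.num_rows());
        Ok(())
    })
}
```

The single-batch `partition` method is kept but marked `#[deprecated]`, so existing callers keep compiling (with a warning) and simply forward to the iterator-based path; per the review discussion above, feeding several batches at once yields output batches roughly the size of the input batches rather than roughly `batch_size / partitions`.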