Skip to content
Prev Previous commit
Next Next commit
Add comments and tests for gc_string_view_batch
  • Loading branch information
alamb committed Jul 24, 2024
commit 1fac94a996013441efe8241b0e485513abab9810
122 changes: 118 additions & 4 deletions datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,26 @@ pub fn concat_batches(
arrow::compute::concat_batches(schema, batches)
}

/// `StringViewArray` reference to the raw parquet decoded buffer, which reduces copy but prevents those buffer from being released.
/// When `StringViewArray`'s cardinality significantly drops (e.g., after `FilterExec` or `HashJoinExec` or many others),
/// we should consider consolidating it so that we can release the buffer to reduce memory usage and improve string locality for better performance.
/// Heuristically compact [`StringViewArray`]s to reduce memory usage, if needed
///
/// This function decides when to consolidate the StringView into a new buffer
/// to reduce memory usage and improve string locality for better performance.
///
/// This differs from [`StringViewArray::gc`] because:
/// 1. It may not compact the array depending on a heuristic.
/// 2. It uses a larger default block size (2MB) to reduce the number of buffers to track.
///
/// # Heuristic
///
/// If the average size of each view is larger than 32 bytes, we compact the array.
///
/// `StringViewArray` include pointers to buffer that hold the underlying data.
/// One of the great benefits of `StringViewArray` is that many operations
/// (e.g., `filter`) can be done without copying the underlying data.
///
/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
/// `StringViewArray` may only refer to a small portion of the buffer,
/// significantly increasing memory usage.
fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
let new_columns: Vec<ArrayRef> = batch
.columns()
Expand Down Expand Up @@ -339,7 +356,8 @@ mod tests {
use crate::{memory::MemoryExec, repartition::RepartitionExec, Partitioning};

use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::UInt32Array;
use arrow_array::builder::ArrayBuilder;
use arrow_array::{StringViewArray, UInt32Array};

#[tokio::test(flavor = "multi_thread")]
async fn test_concat_batches() -> Result<()> {
Expand Down Expand Up @@ -412,4 +430,100 @@ mod tests {
)
.unwrap()
}

#[test]
fn test_gc_string_view_batch_small_no_compact() {
// view with only short strings (no buffers) --> no need to compact
let array = StringViewTest {
rows: 1000,
strings: vec![Some("a"), Some("b"), Some("c")],
}
.build();

let gc_array = do_gc(array.clone());
compare_string_array_values(&array, &gc_array);
assert_eq!(array.data_buffers().len(), 0);
assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
}

#[test]
fn test_gc_string_view_batch_large_no_compact() {
// view with large strings (has buffers) but full --> no need to compact
let array = StringViewTest {
rows: 1000,
strings: vec![Some("This string is longer than 12 bytes")],
}
.build();

let gc_array = do_gc(array.clone());
compare_string_array_values(&array, &gc_array);
assert_eq!(array.data_buffers().len(), 5);
// TODO this is failing now (it always compacts)
assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
}

#[test]
fn test_gc_string_view_batch_large_slice_compact() {
// view with large strings (has buffers) and only partially used --> no need to compact
let array = StringViewTest {
rows: 1000,
strings: vec![Some("this string is longer than 12 bytes")],
}
.build();

// slice only 11 rows, so most of the buffer is not used
let array = array.slice(11, 22);

let gc_array = do_gc(array.clone());
compare_string_array_values(&array, &gc_array);
assert_eq!(array.data_buffers().len(), 5);
assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a single buffer
}

/// Compares the values of two string view arrays
fn compare_string_array_values(arr1: &StringViewArray, arr2: &StringViewArray) {
assert_eq!(arr1.len(), arr2.len());
for (s1, s2) in arr1.iter().zip(arr2.iter()) {
assert_eq!(s1, s2);
}
}

/// runs garbage collection on string view array
/// and ensures the number of rows are the same
fn do_gc(array: StringViewArray) -> StringViewArray {
let batch =
RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap();
let gc_batch = gc_string_view_batch(&batch);
assert_eq!(batch.num_rows(), gc_batch.num_rows());
assert_eq!(batch.schema(), gc_batch.schema());
gc_batch
.column(0)
.as_any()
.downcast_ref::<StringViewArray>()
.unwrap()
.clone()
}

/// Describes parameters for creating a `StringViewArray`
struct StringViewTest {
/// The number of rows in the array
rows: usize,
/// The strings to use in the array (repeated over and over
strings: Vec<Option<&'static str>>,
}

impl StringViewTest {
/// Create a `StringViewArray` with the parameters specified in this struct
fn build(self) -> StringViewArray {
let mut builder = StringViewBuilder::with_capacity(100);
loop {
for &v in self.strings.iter() {
builder.append_option(v);
if builder.len() >= self.rows {
return builder.finish();
}
}
}
}
}
}