Merged

Changes from 1 commit (46 commits total)

Commits
1f93a93 update (XiangpengHao, Jul 1, 2025)
2e01e56 update (XiangpengHao, Jul 1, 2025)
0bd08c3 update (XiangpengHao, Jul 1, 2025)
d6ecbd4 update (XiangpengHao, Jul 1, 2025)
7cd5518 cleanup (XiangpengHao, Jul 2, 2025)
4520048 update (XiangpengHao, Jul 2, 2025)
e6281bc update (XiangpengHao, Jul 2, 2025)
6b6d4fc update (XiangpengHao, Jul 2, 2025)
b696b66 update (XiangpengHao, Jul 2, 2025)
f60581f update (XiangpengHao, Jul 2, 2025)
1851f0b clippy and license (XiangpengHao, Jul 2, 2025)
5e414a8 Merge remote-tracking branch 'apache/main' into pushdown-v4 (alamb, Jul 7, 2025)
58add51 bug fix (XiangpengHao, Jul 8, 2025)
470cc01 Merge remote-tracking branch 'refs/remotes/origin/pushdown-v3' into p… (XiangpengHao, Jul 8, 2025)
2bf3d38 clippy (XiangpengHao, Jul 8, 2025)
2cf1a8f bug fix (XiangpengHao, Jul 8, 2025)
86e149c switch to boolean array for row selection (XiangpengHao, Jul 15, 2025)
4d24172 Merge remote-tracking branch 'apache/main' into pushdown-v4 (alamb, Jul 15, 2025)
be134d6 Add comments (OCD) and rename some fields (alamb, Jul 15, 2025)
eecaf99 Merge pull request #4 from alamb/alamb/pushdown_suggestions (XiangpengHao, Jul 15, 2025)
5537bcb fmt (XiangpengHao, Jul 15, 2025)
b835163 fmt (alamb, Jul 16, 2025)
5132de8 Simplify projection caching (alamb, Jul 16, 2025)
253dad3 Move cache options construction to ArrayReaderBuilder, add builders (alamb, Jul 16, 2025)
5d9781e update memory accounting (XiangpengHao, Jul 17, 2025)
2e20902 Merge remote-tracking branch 'refs/remotes/origin/pushdown-v4' into p… (XiangpengHao, Jul 17, 2025)
721d00c Merge pull request #5 from alamb/alamb/simplify_cache (XiangpengHao, Jul 17, 2025)
f8aed80 Merge pull request #6 from alamb/alamb/cleaner_api (XiangpengHao, Jul 17, 2025)
884b591 update (XiangpengHao, Jul 17, 2025)
4f6b918 array size (XiangpengHao, Jul 17, 2025)
6c53bfd add test case (XiangpengHao, Jul 17, 2025)
8ebe579 fix bug (XiangpengHao, Jul 17, 2025)
c240a52 clippy & fmt (XiangpengHao, Jul 17, 2025)
30a0d1c Add config option for predicate cache memory limit (alamb, Jul 23, 2025)
ed3ce13 Add option to control predicate cache, documentation, ArrowReaderMetr… (alamb, Jul 23, 2025)
42d5520 Update parquet/src/arrow/arrow_reader/mod.rs (alamb, Jul 24, 2025)
6e618b3 Merge pull request #7 from alamb/alamb/test_memory_limit (XiangpengHao, Jul 24, 2025)
f70e46a Clarify in documentation that cache is only for async decoder (alamb, Jul 25, 2025)
15d6826 add comment (alamb, Jul 25, 2025)
bec6d9c Revert backwards incompatible changes to the Parquet reader API (alamb, Jul 25, 2025)
3e05cb2 Merge pull request #9 from alamb/alamb/revert_api_changes (XiangpengHao, Jul 25, 2025)
4d64dc0 Merge pull request #8 from alamb/alamb/pushdown-v4-cleanup (XiangpengHao, Jul 25, 2025)
8da582b Merge remote-tracking branch 'apache/main' into pushdown-v4 (alamb, Aug 6, 2025)
315e463 exclude nested column from cache (XiangpengHao, Aug 7, 2025)
1db701a only use expanded selection when the column is one of cache column (XiangpengHao, Aug 7, 2025)
bea4433 Merge remote-tracking branch 'upstream/main' into pushdown-v4 (XiangpengHao, Aug 7, 2025)
update
XiangpengHao committed Jul 1, 2025
commit d6ecbd4fb61b41afcc4f60b3d639f61f3551eed4
82 changes: 46 additions & 36 deletions parquet/src/arrow/array_reader/builder.rs
@@ -41,6 +41,8 @@ pub struct ArrayReaderBuilder<'a> {
row_groups: &'a dyn RowGroups,
}

type CacheContext<'a> = (&'a ProjectionMask, Arc<Mutex<RowGroupCache>>);

impl<'a> ArrayReaderBuilder<'a> {
pub fn new(row_groups: &'a dyn RowGroups) -> Self {
Self { row_groups }
@@ -51,12 +53,25 @@ impl<'a> ArrayReaderBuilder<'a> {
&self,
field: Option<&ParquetField>,
mask: &ProjectionMask,
cache_mask: &ProjectionMask,
cache: Arc<Mutex<RowGroupCache>>,
) -> Result<Box<dyn ArrayReader>> {
let reader = field
.and_then(|field| self.build_reader(field, mask, None).transpose())
.transpose()?
.unwrap_or_else(|| make_empty_array_reader(self.num_rows()));

Ok(reader)
}

/// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader.
pub fn build_array_reader_with_cache(
&self,
field: Option<&ParquetField>,
mask: &ProjectionMask,
cache_context: CacheContext,
) -> Result<Box<dyn ArrayReader>> {
let reader = field
.and_then(|field| {
self.build_reader(field, mask, cache_mask, cache)
self.build_reader(field, mask, Some(cache_context))
.transpose()
})
.transpose()?
Expand All @@ -74,32 +89,32 @@ impl<'a> ArrayReaderBuilder<'a> {
&self,
field: &ParquetField,
mask: &ProjectionMask,
cache_mask: &ProjectionMask,
cache: Arc<Mutex<RowGroupCache>>,
cache_context: Option<CacheContext>,
) -> Result<Option<Box<dyn ArrayReader>>> {
match field.field_type {
ParquetFieldType::Primitive { col_idx, .. } => {
if let Some(reader) = self.build_primitive_reader(field, mask)? {
if cache_mask.leaf_included(col_idx) {
Ok(Some(Box::new(CachedArrayReader::new(
reader, cache, col_idx,
))))
} else {
Ok(Some(reader))
}
let Some(reader) = self.build_primitive_reader(field, mask)? else {
return Ok(None);
};
let Some((cache_mask, cache)) = cache_context else {
return Ok(Some(reader));
};

if cache_mask.leaf_included(col_idx) {
Ok(Some(Box::new(CachedArrayReader::new(
reader, cache, col_idx,
))))
} else {
Ok(None)
Ok(Some(reader))
}
}
ParquetFieldType::Group { .. } => match &field.arrow_type {
DataType::Map(_, _) => self.build_map_reader(field, mask, cache_mask, cache),
DataType::Struct(_) => self.build_struct_reader(field, mask, cache_mask, cache),
DataType::List(_) => self.build_list_reader(field, mask, cache_mask, cache, false),
DataType::LargeList(_) => {
self.build_list_reader(field, mask, cache_mask, cache, true)
}
DataType::Map(_, _) => self.build_map_reader(field, mask, cache_context),
DataType::Struct(_) => self.build_struct_reader(field, mask, cache_context),
DataType::List(_) => self.build_list_reader(field, mask, cache_context, false),
DataType::LargeList(_) => self.build_list_reader(field, mask, cache_context, true),
DataType::FixedSizeList(_, _) => {
self.build_fixed_size_list_reader(field, mask, cache_mask, cache)
self.build_fixed_size_list_reader(field, mask, cache_context)
}
d => unimplemented!("reading group type {} not implemented", d),
},
@@ -111,14 +126,13 @@ impl<'a> ArrayReaderBuilder<'a> {
&self,
field: &ParquetField,
mask: &ProjectionMask,
cache_mask: &ProjectionMask,
cache: Arc<Mutex<RowGroupCache>>,
cache_context: Option<CacheContext>,
) -> Result<Option<Box<dyn ArrayReader>>> {
let children = field.children().unwrap();
assert_eq!(children.len(), 2);

let key_reader = self.build_reader(&children[0], mask, cache_mask, cache.clone())?;
let value_reader = self.build_reader(&children[1], mask, cache_mask, cache)?;
let key_reader = self.build_reader(&children[0], mask, cache_context.clone())?;
let value_reader = self.build_reader(&children[1], mask, cache_context)?;

match (key_reader, value_reader) {
(Some(key_reader), Some(value_reader)) => {
@@ -164,14 +178,13 @@ impl<'a> ArrayReaderBuilder<'a> {
&self,
field: &ParquetField,
mask: &ProjectionMask,
cache_mask: &ProjectionMask,
cache: Arc<Mutex<RowGroupCache>>,
cache_context: Option<CacheContext>,
is_large: bool,
) -> Result<Option<Box<dyn ArrayReader>>> {
let children = field.children().unwrap();
assert_eq!(children.len(), 1);

let reader = match self.build_reader(&children[0], mask, cache_mask, cache)? {
let reader = match self.build_reader(&children[0], mask, cache_context)? {
Some(item_reader) => {
// Need to retrieve underlying data type to handle projection
let item_type = item_reader.get_data_type().clone();
@@ -213,13 +226,12 @@ impl<'a> ArrayReaderBuilder<'a> {
&self,
field: &ParquetField,
mask: &ProjectionMask,
cache_mask: &ProjectionMask,
cache: Arc<Mutex<RowGroupCache>>,
cache_context: Option<CacheContext>,
) -> Result<Option<Box<dyn ArrayReader>>> {
let children = field.children().unwrap();
assert_eq!(children.len(), 1);

let reader = match self.build_reader(&children[0], mask, cache_mask, cache)? {
let reader = match self.build_reader(&children[0], mask, cache_context)? {
Some(item_reader) => {
let item_type = item_reader.get_data_type().clone();
let reader = match &field.arrow_type {
@@ -349,8 +361,7 @@ impl<'a> ArrayReaderBuilder<'a> {
&self,
field: &ParquetField,
mask: &ProjectionMask,
cache_mask: &ProjectionMask,
cache: Arc<Mutex<RowGroupCache>>,
cache_context: Option<CacheContext>,
) -> Result<Option<Box<dyn ArrayReader>>> {
let arrow_fields = match &field.arrow_type {
DataType::Struct(children) => children,
@@ -363,7 +374,7 @@ impl<'a> ArrayReaderBuilder<'a> {
let mut builder = SchemaBuilder::with_capacity(children.len());

for (arrow, parquet) in arrow_fields.iter().zip(children) {
if let Some(reader) = self.build_reader(parquet, mask, cache_mask, cache.clone())? {
if let Some(reader) = self.build_reader(parquet, mask, cache_context.clone())? {
// Need to retrieve underlying data type to handle projection
let child_type = reader.get_data_type().clone();
builder.push(arrow.as_ref().clone().with_data_type(child_type));
@@ -407,10 +418,9 @@ mod tests {
file_metadata.key_value_metadata(),
)
.unwrap();
let cache = Arc::new(Mutex::new(RowGroupCache::new(1000)));

let array_reader = ArrayReaderBuilder::new(&file_reader)
.build_array_reader(fields.as_ref(), &mask, &ProjectionMask::all(), cache)
.build_array_reader(fields.as_ref(), &mask)
.unwrap();

// Create arrow types
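This diff leaves ArrayReaderBuilder with two entry points: build_array_reader for the plain path and build_array_reader_with_cache for the cached one. Below is a minimal sketch of how they might be called, written as if it lived inside the parquet crate next to this builder; build_both is a hypothetical helper, not part of the PR, and the cache size of 1000 simply mirrors the tests in this diff.

use std::sync::{Arc, Mutex};

// Hypothetical within-crate helper: builds one reader without the predicate
// cache and one with it, using only the signatures visible in the diff above.
fn build_both(
    row_groups: &dyn RowGroups,
    fields: Option<&ParquetField>,
    output_mask: &ProjectionMask,
    cache_mask: &ProjectionMask,
) -> crate::errors::Result<()> {
    // Un-cached path: just a field tree and a projection mask.
    let _plain = ArrayReaderBuilder::new(row_groups)
        .build_array_reader(fields, output_mask)?;

    // Cached path: CacheContext = (&ProjectionMask, Arc<Mutex<RowGroupCache>>)
    // bundles the columns to cache with a shared, mutex-protected cache.
    let cache = Arc::new(Mutex::new(RowGroupCache::new(1000)));
    let _cached = ArrayReaderBuilder::new(row_groups)
        .build_array_reader_with_cache(fields, output_mask, (cache_mask, cache))?;
    Ok(())
}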
4 changes: 2 additions & 2 deletions parquet/src/arrow/array_reader/cached_array_reader.rs
@@ -204,11 +204,11 @@ impl ArrayReader for CachedArrayReader {
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.inner.get_def_levels()
None // we don't allow nullable parent for now.
Review comment from XiangpengHao (Contributor, Author): nested columns are not supported yet.
}

fn get_rep_levels(&self) -> Option<&[i16]> {
self.inner.get_rep_levels()
None
}
}

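The None returned from get_def_levels and get_rep_levels above is what restricts the cache to flat columns. As a self-contained illustration (a toy, not the parquet crate's actual code), here is what definition levels encode for an optional struct s with an optional leaf a, and why a parent reader needs the child's levels to build its own null mask:

fn main() {
    // Definition levels for 4 rows of s: Struct<a: Int64>, both optional:
    // 0 = s is null, 1 = s present but a null, 2 = a present.
    let def_levels: [i16; 4] = [2, 1, 0, 2];
    let struct_validity: Vec<bool> = def_levels.iter().map(|&d| d >= 1).collect();
    let leaf_validity: Vec<bool> = def_levels.iter().map(|&d| d >= 2).collect();
    assert_eq!(struct_validity, vec![true, true, false, true]);
    assert_eq!(leaf_validity, vec![true, false, false, true]);
    // A cached reader that reports None for its def levels cannot feed this
    // computation, so it must not sit under a nullable or repeated parent.
}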
6 changes: 2 additions & 4 deletions parquet/src/arrow/array_reader/list_array.rs
@@ -247,7 +247,6 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
mod tests {
use super::*;
use crate::arrow::array_reader::list_array::ListArrayReader;
use crate::arrow::array_reader::row_group_cache::RowGroupCache;
use crate::arrow::array_reader::test_util::InMemoryArrayReader;
use crate::arrow::array_reader::ArrayReaderBuilder;
use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
@@ -260,7 +259,7 @@ mod tests {
use arrow_array::{Array, PrimitiveArray};
use arrow_data::ArrayDataBuilder;
use arrow_schema::Fields;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

fn list_type<OffsetSize: OffsetSizeTrait>(
data_type: ArrowType,
@@ -563,10 +562,9 @@ mod tests {
file_metadata.key_value_metadata(),
)
.unwrap();
let cache = Arc::new(Mutex::new(RowGroupCache::new(1000)));

let mut array_reader = ArrayReaderBuilder::new(&file_reader)
.build_array_reader(fields.as_ref(), &mask, &ProjectionMask::all(), cache)
.build_array_reader(fields.as_ref(), &mask)
.unwrap();

let batch = array_reader.next_batch(100).unwrap();
32 changes: 7 additions & 25 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -24,10 +24,10 @@ use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, Mutex};
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder, RowGroupCache};
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use crate::column::page::{PageIterator, PageReader};
@@ -711,10 +711,6 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
let batch_size = self
.batch_size
.min(self.metadata.file_metadata().num_rows() as usize);
let cache_projection = match self.compute_cache_projection(&self.projection) {
Some(projection) => projection,
None => ProjectionMask::all(),
};

let row_groups = self
.row_groups
@@ -725,7 +721,6 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
metadata: self.metadata,
row_groups,
};
let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(batch_size)));

let mut filter = self.filter;
let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(self.selection);
@@ -741,23 +736,15 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
let mut cache_projection = predicate.projection().clone();
cache_projection.intersect(&self.projection);

let array_reader = ArrayReaderBuilder::new(&reader).build_array_reader(
self.fields.as_deref(),
predicate.projection(),
&cache_projection,
row_group_cache.clone(),
)?;
let array_reader = ArrayReaderBuilder::new(&reader)
.build_array_reader(self.fields.as_deref(), predicate.projection())?;

plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
}
}

let array_reader = ArrayReaderBuilder::new(&reader).build_array_reader(
self.fields.as_deref(),
&self.projection,
&cache_projection,
row_group_cache.clone(),
)?;
let array_reader = ArrayReaderBuilder::new(&reader)
.build_array_reader(self.fields.as_deref(), &self.projection)?;

let read_plan = plan_builder
.limited(reader.num_rows())
@@ -968,12 +955,7 @@ impl ParquetRecordBatchReader {
selection: Option<RowSelection>,
) -> Result<Self> {
let array_reader = ArrayReaderBuilder::new(row_groups)
.build_array_reader(
levels.levels.as_ref(),
&ProjectionMask::all(),
&ProjectionMask::all(),
Arc::new(Mutex::new(RowGroupCache::new(batch_size))),
)?;
.build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;

let read_plan = ReadPlanBuilder::new(batch_size)
.with_selection(selection)
24 changes: 11 additions & 13 deletions parquet/src/arrow/async_reader/mod.rs
@@ -621,12 +621,12 @@

let mut cache_projection = predicate.projection().clone();
cache_projection.intersect(&projection);
Review comment from XiangpengHao (Contributor, Author): A column is cached if and only if it appears in both the output projection and the filter projection.
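A minimal sketch of that rule, using only ProjectionMask::clone and the intersect call that appears in this diff (compute_cache_mask is a hypothetical name, not part of the PR):

use parquet::arrow::ProjectionMask;

// Cache exactly the columns projected by both the filter and the output.
fn compute_cache_mask(filter: &ProjectionMask, output: &ProjectionMask) -> ProjectionMask {
    let mut cache_mask = filter.clone();
    cache_mask.intersect(output);
    cache_mask
}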

Review comment from alamb (Contributor), Jul 15, 2025: One thing I didn't understand after reading this PR in detail was how the relative row positions are updated after applying a filter.

For example, if we are applying multiple filters, the first may reduce the original RowSelection down to [100->200]; when the second filter runs, it is only evaluated on the 100->200 rows, not the original selection.

In other words, I think there needs to be some sort of function equivalent to RowSelection::and_then that applies to the cache:

// Narrow the cache so that it only retains the results of evaluating the predicate
let row_group_cache = row_group_cache.and_then(resulting_selection)

Maybe this is the root cause of https://github.com/apache/datafusion/actions/runs/16302299778/job/46039904381?pr=16711
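To make the composition issue concrete, here is a hedged sketch using the existing RowSelection::and_then API (the selector values are illustrative only; the point is that the second selection is expressed relative to the first):

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // First predicate keeps rows 100..200 of the row group.
    let first = RowSelection::from(vec![RowSelector::skip(100), RowSelector::select(100)]);
    // Second predicate sees only those 100 surviving rows and keeps the first 10.
    let second = RowSelection::from(vec![RowSelector::select(10), RowSelector::skip(90)]);
    // Re-anchor `second` in the original row coordinates: rows 100..110 survive.
    let combined = first.and_then(&second);
    assert_eq!(
        combined,
        RowSelection::from(vec![
            RowSelector::skip(100),
            RowSelector::select(10),
            RowSelector::skip(90),
        ])
    );
    // The cache would need an analogous narrowing step so that its stored
    // batches line up with `combined` rather than with the original selection.
}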

let array_reader = ArrayReaderBuilder::new(&row_group).build_array_reader(
self.fields.as_deref(),
predicate.projection(),
&cache_projection,
row_group_cache.clone(),
)?;
let array_reader = ArrayReaderBuilder::new(&row_group)
.build_array_reader_with_cache(
self.fields.as_deref(),
predicate.projection(),
(&cache_projection, row_group_cache.clone()),
)?;

plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
}
@@ -673,13 +673,11 @@

let plan = plan_builder.build();

let array_reader = ArrayReaderBuilder::new(&row_group)
.build_array_reader(
self.fields.as_deref(),
&projection,
&cache_projection,
row_group_cache.clone(),
)?;
let array_reader = ArrayReaderBuilder::new(&row_group).build_array_reader_with_cache(
self.fields.as_deref(),
&projection,
(&cache_projection, row_group_cache.clone()),
)?;

let reader = ParquetRecordBatchReader::new(array_reader, plan);
