Skip to content
Closed
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
cc6dd14
update
XiangpengHao Sep 4, 2024
5837fc7
update
XiangpengHao Dec 22, 2024
fec6313
update
XiangpengHao Dec 28, 2024
948db87
update
XiangpengHao Dec 29, 2024
8c50d90
poc reader
XiangpengHao Dec 30, 2024
f5422ce
update
XiangpengHao Dec 31, 2024
dfdc1b6
avoid recreating new buffers
XiangpengHao Dec 31, 2024
3c526f8
update
XiangpengHao Dec 31, 2024
53f5fad
bug fix
XiangpengHao Dec 31, 2024
56980de
selective cache
XiangpengHao Jan 1, 2025
4dd1b6b
clean up changes
XiangpengHao Jan 1, 2025
f8f983e
clean up more and format
XiangpengHao Jan 1, 2025
882aaf1
cleanup and add docs
XiangpengHao Jan 1, 2025
c8bdbcf
switch to mutex instead of rwlock
XiangpengHao Jan 2, 2025
cdb1d85
revert irrelevant changes
XiangpengHao Jan 2, 2025
69720e5
submodule
XiangpengHao Jan 3, 2025
a9550ab
update
XiangpengHao Jan 3, 2025
be1435f
rebase
XiangpengHao Jan 6, 2025
e4d9eb7
Merge remote-tracking branch 'upstream/main' into better-decoder
XiangpengHao Jan 8, 2025
21e015b
remove unrelated changes
XiangpengHao Jan 8, 2025
bbc3595
Merge remote-tracking branch 'upstream/main' into better-decoder
XiangpengHao Jan 10, 2025
547fb46
fix clippy
XiangpengHao Jan 10, 2025
05c8c8f
make various ci improvements
XiangpengHao Jan 10, 2025
314fda1
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 21, 2025
c895dd2
whitespace
alamb Mar 21, 2025
3cf0a98
Reduce some ugliness, avoid unwrap
alamb Mar 21, 2025
7b72f9d
more factory
alamb Mar 21, 2025
5bdf51a
lint
alamb Mar 22, 2025
a77e1e7
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 26, 2025
90a55d5
Isolate reader cache more
alamb Mar 26, 2025
9ffa81c
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 27, 2025
7c10b4a
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
selective cache
  • Loading branch information
XiangpengHao committed Jan 7, 2025
commit 56980defc5f52fd315d452d7f1f1c1df1a9e1d84
35 changes: 28 additions & 7 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,13 +512,30 @@ where
.filter(|index| index.first().map(|v| !v.is_empty()).unwrap_or(false))
.map(|x| x[row_group_idx].as_slice());

let mut predicate_projection: Option<ProjectionMask> = None;
if let Some(filter) = self.filter.as_mut() {
for predicate in filter.predicates.iter_mut() {
let p_projection = predicate.projection();
if let Some(ref mut p) = predicate_projection {
p.union(&p_projection);
} else {
predicate_projection = Some(p_projection.clone());
}
}
}
let projection_to_cache = predicate_projection.map(|mut p| {
p.intersect(&projection);
p
});

let mut row_group = InMemoryRowGroup {
metadata: meta,
// schema: meta.schema_descr_ptr(),
row_count: meta.num_rows() as usize,
column_chunks: vec![None; meta.columns().len()],
offset_index,
cache: Arc::new(PageCache::new()),
projection_to_cache,
};

let mut selection =
Expand All @@ -531,13 +548,13 @@ where
return Ok((self, None));
}

let predicate_projection = predicate.projection();
let p_projection = predicate.projection();
row_group
.fetch(&mut self.input, predicate_projection, Some(&selection))
.fetch(&mut self.input, p_projection, Some(&selection))
.await?;

let array_reader =
build_array_reader(self.fields.as_deref(), predicate_projection, &row_group)?;
build_array_reader(self.fields.as_deref(), p_projection, &row_group)?;
filter_readers.push(array_reader);
}
}
Expand Down Expand Up @@ -799,6 +816,7 @@ struct InMemoryRowGroup<'a> {
column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
row_count: usize,
cache: Arc<PageCache>,
projection_to_cache: Option<ProjectionMask>,
}

impl InMemoryRowGroup<'_> {
Expand Down Expand Up @@ -911,10 +929,13 @@ impl RowGroups for InMemoryRowGroup<'_> {
.filter(|index| !index.is_empty())
.map(|index| index[i].page_locations.clone());

let page_reader: Box<dyn PageReader> = if std::env::var("CACHE_PAGES")
.map(|v| v == "1")
.unwrap_or(false)
{
let cached_reader = if let Some(projection_to_cache) = &self.projection_to_cache {
projection_to_cache.leaf_included(i)
} else {
false
};

let page_reader: Box<dyn PageReader> = if cached_reader {
Box::new(CachedPageReader::new(
SerializedPageReader::new(
data.clone(),
Expand Down