Closed
Changes from 1 commit
Commits (32)
cc6dd14
update
XiangpengHao Sep 4, 2024
5837fc7
update
XiangpengHao Dec 22, 2024
fec6313
update
XiangpengHao Dec 28, 2024
948db87
update
XiangpengHao Dec 29, 2024
8c50d90
poc reader
XiangpengHao Dec 30, 2024
f5422ce
update
XiangpengHao Dec 31, 2024
dfdc1b6
avoid recreating new buffers
XiangpengHao Dec 31, 2024
3c526f8
update
XiangpengHao Dec 31, 2024
53f5fad
bug fix
XiangpengHao Dec 31, 2024
56980de
selective cache
XiangpengHao Jan 1, 2025
4dd1b6b
clean up changes
XiangpengHao Jan 1, 2025
f8f983e
clean up more and format
XiangpengHao Jan 1, 2025
882aaf1
cleanup and add docs
XiangpengHao Jan 1, 2025
c8bdbcf
switch to mutex instead of rwlock
XiangpengHao Jan 2, 2025
cdb1d85
revert irrelevant changes
XiangpengHao Jan 2, 2025
69720e5
submodule
XiangpengHao Jan 3, 2025
a9550ab
update
XiangpengHao Jan 3, 2025
be1435f
rebase
XiangpengHao Jan 6, 2025
e4d9eb7
Merge remote-tracking branch 'upstream/main' into better-decoder
XiangpengHao Jan 8, 2025
21e015b
remove unrelated changes
XiangpengHao Jan 8, 2025
bbc3595
Merge remote-tracking branch 'upstream/main' into better-decoder
XiangpengHao Jan 10, 2025
547fb46
fix clippy
XiangpengHao Jan 10, 2025
05c8c8f
make various ci improvements
XiangpengHao Jan 10, 2025
314fda1
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 21, 2025
c895dd2
whitespace
alamb Mar 21, 2025
3cf0a98
Reduce some ugliness, avoid unwrap
alamb Mar 21, 2025
7b72f9d
more factory
alamb Mar 21, 2025
5bdf51a
lint
alamb Mar 22, 2025
a77e1e7
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 26, 2025
90a55d5
Isolate reader cache more
alamb Mar 26, 2025
9ffa81c
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 27, 2025
7c10b4a
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 28, 2025
poc reader
XiangpengHao committed Jan 7, 2025
commit 8c50d90b98a10419c57b362621f68a8db147e70c
4 changes: 0 additions & 4 deletions parquet/src/arrow/arrow_reader/selection.rs
@@ -441,10 +441,6 @@ impl RowSelection {
pub fn skipped_row_count(&self) -> usize {
self.iter().filter(|s| s.skip).map(|s| s.row_count).sum()
}

pub(crate) fn extend(&mut self, other: Self) {
self.selectors.extend(other.selectors);
}
}

impl From<Vec<RowSelector>> for RowSelection {
86 changes: 53 additions & 33 deletions parquet/src/arrow/async_reader/arrow_reader.rs
@@ -19,7 +19,8 @@ use std::collections::HashMap;
use std::sync::RwLock;
use std::{collections::VecDeque, sync::Arc};

use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader, StructArray};
use arrow_array::ArrayRef;
use arrow_array::{cast::AsArray, Array, RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType, Schema, SchemaRef};
use arrow_select::filter::prep_null_mask_filter;

@@ -45,7 +46,7 @@ pub struct FilteredParquetRecordBatchReader {
fn read_selection(
reader: &mut dyn ArrayReader,
selection: &RowSelection,
) -> Result<StructArray, ParquetError> {
) -> Result<ArrayRef, ParquetError> {
for selector in selection.iter() {
if selector.skip {
let skipped = reader.skip_records(selector.row_count)?;
@@ -55,11 +56,7 @@ fn read_selection(
debug_assert_eq!(read_records, selector.row_count, "failed to read rows");
}
}
let array = reader.consume_batch()?;
let struct_array = array
.as_struct_opt()
.ok_or_else(|| general_err!("Struct array reader should return struct array"))?;
Ok(struct_array.clone())
reader.consume_batch()
}

/// Take the next selection from the selection queue, and return the selection
@@ -142,7 +139,9 @@ impl FilteredParquetRecordBatchReader {
.zip(self.predicate_readers.iter_mut())
{
let array = read_selection(reader.as_mut(), &selection)?;
let batch = RecordBatch::from(array);
let batch = RecordBatch::from(array.as_struct_opt().ok_or_else(|| {
general_err!("Struct array reader should return struct array")
})?);
let input_rows = batch.num_rows();
let predicate_filter = predicate.evaluate(batch)?;
if predicate_filter.len() != input_rows {
@@ -178,7 +177,6 @@ impl Iterator for FilteredParquetRecordBatchReader {
// It boils down to leveraging array_reader's ability to collect large batches natively,
// rather than concatenating multiple small batches.

let mut selection = RowSelection::default();
let mut selected = 0;
while let Some(cur_selection) =
take_next_selection(&mut self.selection, self.batch_size - selected)
@@ -187,21 +185,29 @@
Ok(selection) => selection,
Err(e) => return Some(Err(e)),
};

for selector in filtered_selection.iter() {
if selector.skip {
self.array_reader.skip_records(selector.row_count).ok()?;

Review comment: Hi, Can we kindly add error checks for these two cases - debugging an issue was hard without it.

Reply (Contributor): Thanks @bharath-techie -- perhaps you could propose some changes in a PR? I think @XiangpengHao is busy for a while. It would be great to help push this PR along.

} else {
self.array_reader.read_records(selector.row_count).ok()?;
}
}
selected += filtered_selection.row_count();
selection.extend(filtered_selection);
if selected >= (self.batch_size / 4 * 3) {
break;
}
}
if !selection.selects_any() {
if selected == 0 {
return None;
}

let rt = read_selection(&mut *self.array_reader, &selection);
match rt {
Ok(array) => Some(Ok(RecordBatch::from(array))),
Err(e) => Some(Err(e.into())),
}
let array = self.array_reader.consume_batch().ok()?;
let struct_array = array
.as_struct_opt()
.ok_or_else(|| general_err!("Struct array reader should return struct array"))
.ok()?;
Some(Ok(RecordBatch::from(struct_array.clone())))
}
}
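The review thread above asks for error handling around the two skip_records/read_records calls, whose failures are currently discarded by .ok()?. A minimal sketch of one possible approach follows; the helper name apply_selection and its placement are assumptions for illustration only (not part of this PR), and it uses only the ArrayReader, RowSelection, and ParquetError types already in scope in this file.

// Hypothetical helper (name and placement are illustrative, not part of this
// PR): apply a RowSelection to an ArrayReader, propagating errors instead of
// silently dropping them with `.ok()?`.
fn apply_selection(
    reader: &mut dyn ArrayReader,
    selection: &RowSelection,
) -> Result<(), ParquetError> {
    for selector in selection.iter() {
        if selector.skip {
            // Propagate the error; the returned skipped-row count is not needed here.
            reader.skip_records(selector.row_count)?;
        } else {
            reader.read_records(selector.row_count)?;
        }
    }
    Ok(())
}

Inside next(), the loop could then call if let Err(e) = apply_selection(&mut *self.array_reader, &filtered_selection) { return Some(Err(e.into())); }, mirroring how the existing read_selection helper already surfaces ParquetError.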

@@ -212,11 +218,11 @@ impl RecordBatchReader for FilteredParquetRecordBatchReader {
}

struct PageCacheInner {
queue: VecDeque<usize>,
pages: HashMap<usize, Page>,
pages: HashMap<usize, (usize, Page)>, // col_id -> (offset, page)
}

/// A simple FIFO cache for pages.
/// A simple cache for decompressed pages.
/// We cache only one page per column
pub(crate) struct PageCache {
inner: RwLock<PageCacheInner>,
}
Expand All @@ -227,36 +233,49 @@ impl PageCache {
pub(crate) fn new() -> Self {
Self {
inner: RwLock::new(PageCacheInner {
queue: VecDeque::with_capacity(Self::CAPACITY),
pages: HashMap::with_capacity(Self::CAPACITY),
}),
}
}

pub(crate) fn get_page(&self, offset: usize) -> Option<Page> {
pub(crate) fn get_page(&self, col_id: usize, offset: usize) -> Option<Page> {
let read_lock = self.inner.read().unwrap();
read_lock.pages.get(&offset).cloned()
read_lock
.pages
.get(&col_id)
.and_then(|(cached_offset, page)| {
if *cached_offset == offset {
Some(page)
} else {
None
}
})
.cloned()
}

pub(crate) fn insert_page(&self, offset: usize, page: Page) {
pub(crate) fn insert_page(&self, col_id: usize, offset: usize, page: Page) {
let mut write_lock = self.inner.write().unwrap();
if write_lock.pages.len() >= Self::CAPACITY {
let oldest_offset = write_lock.queue.pop_front().unwrap();
write_lock.pages.remove(&oldest_offset).unwrap();
}
write_lock.pages.insert(offset, page);
write_lock.queue.push_back(offset);
write_lock.pages.insert(col_id, (offset, page));
}
}

pub(crate) struct CachedPageReader<R: ChunkReader> {
inner: SerializedPageReader<R>,
cache: Arc<PageCache>,
col_id: usize,
}

impl<R: ChunkReader> CachedPageReader<R> {
pub(crate) fn new(inner: SerializedPageReader<R>, cache: Arc<PageCache>) -> Self {
Self { inner, cache }
pub(crate) fn new(
inner: SerializedPageReader<R>,
cache: Arc<PageCache>,
col_id: usize,
) -> Self {
Self {
inner,
cache,
col_id,
}
}
}

Expand All @@ -277,7 +296,7 @@ impl<R: ChunkReader> PageReader for CachedPageReader<R> {
return Ok(None);
};

let page = self.cache.get_page(offset);
let page = self.cache.get_page(self.col_id, offset);
if let Some(page) = page {
self.inner.skip_next_page()?;
Ok(Some(page))
Expand All @@ -286,7 +305,8 @@ impl<R: ChunkReader> PageReader for CachedPageReader<R> {
let Some(inner_page) = inner_page else {
return Ok(None);
};
self.cache.insert_page(offset, inner_page.clone());
self.cache
.insert_page(self.col_id, offset, inner_page.clone());
Ok(Some(inner_page))
}
}
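For reference, a small test-style sketch of the per-column cache semantics described above: one decompressed page per column, keyed by col_id, with a lookup hitting only when the requested offset matches. This is illustrative only; some_page() stands in for an assumed helper that constructs an arbitrary Page and is not defined in this PR.

// Hypothetical test sketch of PageCache behaviour; `some_page()` is an
// assumed helper, not part of this PR.
#[test]
fn page_cache_keeps_one_page_per_column() {
    let cache = PageCache::new();
    assert!(cache.get_page(0, 100).is_none());

    cache.insert_page(0, 100, some_page());
    assert!(cache.get_page(0, 100).is_some()); // same column and offset: hit
    assert!(cache.get_page(0, 200).is_none()); // same column, different offset: miss

    cache.insert_page(0, 200, some_page()); // replaces the column's single entry
    assert!(cache.get_page(0, 100).is_none());
    assert!(cache.get_page(0, 200).is_some());
}

Compared to the FIFO queue it replaces, this design needs no eviction bookkeeping and bounds memory to at most one decompressed page per column.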
30 changes: 18 additions & 12 deletions parquet/src/arrow/async_reader/mod.rs
@@ -911,22 +911,28 @@ impl RowGroups for InMemoryRowGroup<'_> {
.filter(|index| !index.is_empty())
.map(|index| index[i].page_locations.clone());

// let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
// data.clone(),
// self.metadata.column(i),
// self.row_count,
// page_locations,
// )?);

let page_reader: Box<dyn PageReader> = Box::new(CachedPageReader::new(
SerializedPageReader::new(
let page_reader: Box<dyn PageReader> = if std::env::var("CACHE_PAGES")
.map(|v| v == "1")
.unwrap_or(false)
{
Box::new(CachedPageReader::new(
SerializedPageReader::new(
data.clone(),
self.metadata.column(i),
self.row_count,
page_locations,
)?,
self.cache.clone(),
i,
))
} else {
Box::new(SerializedPageReader::new(
data.clone(),
self.metadata.column(i),
self.row_count,
page_locations,
)?,
self.cache.clone(),
));
)?)
};

Ok(Box::new(ColumnChunkIterator {
reader: Some(Ok(page_reader)),
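Note that the cached path is opt-in: it is taken only when the CACHE_PAGES environment variable is set to "1"; otherwise the plain SerializedPageReader is used as before. A hedged sketch of toggling it in-process (the surrounding benchmark or test code is assumed, not part of this PR):

fn main() {
    // Opt in to the CachedPageReader path added in this commit; any other value,
    // or leaving CACHE_PAGES unset, falls back to SerializedPageReader.
    std::env::set_var("CACHE_PAGES", "1");

    // ...then build the async Parquet reader as usual (for example via
    // ParquetRecordBatchStreamBuilder) and run the query or benchmark...
}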