Skip to content
Closed
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
cc6dd14
update
XiangpengHao Sep 4, 2024
5837fc7
update
XiangpengHao Dec 22, 2024
fec6313
update
XiangpengHao Dec 28, 2024
948db87
update
XiangpengHao Dec 29, 2024
8c50d90
poc reader
XiangpengHao Dec 30, 2024
f5422ce
update
XiangpengHao Dec 31, 2024
dfdc1b6
avoid recreating new buffers
XiangpengHao Dec 31, 2024
3c526f8
update
XiangpengHao Dec 31, 2024
53f5fad
bug fix
XiangpengHao Dec 31, 2024
56980de
selective cache
XiangpengHao Jan 1, 2025
4dd1b6b
clean up changes
XiangpengHao Jan 1, 2025
f8f983e
clean up more and format
XiangpengHao Jan 1, 2025
882aaf1
cleanup and add docs
XiangpengHao Jan 1, 2025
c8bdbcf
switch to mutex instead of rwlock
XiangpengHao Jan 2, 2025
cdb1d85
revert irrelevant changes
XiangpengHao Jan 2, 2025
69720e5
submodule
XiangpengHao Jan 3, 2025
a9550ab
update
XiangpengHao Jan 3, 2025
be1435f
rebase
XiangpengHao Jan 6, 2025
e4d9eb7
Merge remote-tracking branch 'upstream/main' into better-decoder
XiangpengHao Jan 8, 2025
21e015b
remove unrelated changes
XiangpengHao Jan 8, 2025
bbc3595
Merge remote-tracking branch 'upstream/main' into better-decoder
XiangpengHao Jan 10, 2025
547fb46
fix clippy
XiangpengHao Jan 10, 2025
05c8c8f
make various ci improvements
XiangpengHao Jan 10, 2025
314fda1
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 21, 2025
c895dd2
whitespace
alamb Mar 21, 2025
3cf0a98
Reduce some ugliness, avoid unwrap
alamb Mar 21, 2025
7b72f9d
more factory
alamb Mar 21, 2025
5bdf51a
lint
alamb Mar 22, 2025
a77e1e7
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 26, 2025
90a55d5
Isolate reader cache more
alamb Mar 26, 2025
9ffa81c
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 27, 2025
7c10b4a
Merge remote-tracking branch 'apache/main' into better-decoder
alamb Mar 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Reduce some ugliness, avoid unwrap
  • Loading branch information
alamb committed Mar 21, 2025
commit 3cf0a986810c5087af34bfb52c36d252d8f9e3ef
17 changes: 10 additions & 7 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
projection: self.projection,
selection: self.selection,
schema,
reader: Some(reader),
reader_factory: Some(reader),
state: StreamState::Init,
})
}
Expand Down Expand Up @@ -780,7 +780,7 @@ pub struct ParquetRecordBatchStream<T> {
selection: Option<RowSelection>,

/// This is an option so it can be moved into a future
reader: Option<ReaderFactory<T>>,
reader_factory: Option<ReaderFactory<T>>,

state: StreamState<T>,
}
Expand Down Expand Up @@ -842,7 +842,7 @@ where

let selection = self.selection.as_mut().map(|s| s.split_off(row_count));

let reader_factory = self.reader.take().expect("lost reader");
let reader_factory = self.reader_factory.take().expect("lost reader");

let (reader_factory, maybe_reader) = reader_factory
.read_row_group(
Expand All @@ -856,7 +856,7 @@ where
self.state = StreamState::Error;
err
})?;
self.reader = Some(reader_factory);
self.reader_factory = Some(reader_factory);

if let Some(reader) = maybe_reader {
return Ok(Some(reader));
Expand Down Expand Up @@ -891,7 +891,10 @@ where
None => {
// this is ugly, but works for now.
let filter = batch_reader.take_filter();
self.reader.as_mut().unwrap().filter = filter;
let Some(reader_factory) = self.reader_factory.as_mut() else {
return Poll::Ready(Some(Err(ParquetError::General("Internal: Unexpected state".into()))))
};
reader_factory.filter = filter;
self.state = StreamState::Init
}
},
Expand All @@ -901,7 +904,7 @@ where
None => return Poll::Ready(None),
};

let reader = self.reader.take().expect("lost reader");
let reader = self.reader_factory.take().expect("lost reader");

let row_count = self.metadata.row_group(row_group_idx).num_rows() as usize;

Expand All @@ -920,7 +923,7 @@ where
}
StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
Ok((reader_factory, maybe_reader)) => {
self.reader = Some(reader_factory);
self.reader_factory = Some(reader_factory);
match maybe_reader {
// Read records from [`ParquetRecordBatchReader`]
Some(reader) => self.state = StreamState::Decoding(reader),
Expand Down