Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ impl ParquetMetaDataReader {
mut fetch: F,
file_size: usize,
) -> Result<()> {
let (metadata, remainder) =
let (metadata, fetched) =
Self::load_metadata(&mut fetch, file_size, self.get_prefetch_size()).await?;

self.metadata = Some(metadata);
Expand All @@ -382,7 +382,7 @@ impl ParquetMetaDataReader {
return Ok(());
}

self.load_page_index_with_remainder(fetch, remainder).await
self.load_page_index_with_remainder(fetch, fetched).await
}

/// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
Expand All @@ -396,7 +396,7 @@ impl ParquetMetaDataReader {
async fn load_page_index_with_remainder<F: MetadataFetch>(
&mut self,
mut fetch: F,
remainder: Option<(usize, Bytes)>,
fetched: Option<(usize, Bytes)>,
) -> Result<()> {
if self.metadata.is_none() {
return Err(general_err!("Footer metadata is not present"));
Expand All @@ -409,10 +409,21 @@ impl ParquetMetaDataReader {
None => return Ok(()),
};

let bytes = match &remainder {
Some((remainder_start, remainder)) if *remainder_start <= range.start => {
let offset = range.start - *remainder_start;
remainder.slice(offset..range.end - *remainder_start + offset)
let bytes = match &fetched {
Some((fetched_start, fetched)) if *fetched_start <= range.start => {
// `fetched` is an amount of data spanning from fetched_start to the end of the file.
// We want to slice out the range we need from that data, but need to adjust the
// range we are looking for to be relative to fetched_start.
let fetched_start = *fetched_start;
let range = range.start - fetched_start..range.end - fetched_start;
// sanity check: `fetched` should always go until the end of the file
// so if our range is beyond that, something is wrong!
assert!(
range.end <= fetched_start + fetched.len(),
"range: {range:?}, fetched: {}, fetched_start: {fetched_start}",
fetched.len()
);
fetched.slice(range)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let bytes = match &fetched {
Some((fetched_start, fetched)) if *fetched_start <= range.start => {
// `fetched` is an amount of data spanning from fetched_start to the end of the file.
// We want to slice out the range we need from that data, but need to adjust the
// range we are looking for to be relative to fetched_start.
let fetched_start = *fetched_start;
let range = range.start - fetched_start..range.end - fetched_start;
// sanity check: `fetched` should always go until the end of the file
// so if our range is beyond that, something is wrong!
assert!(
range.end <= fetched_start + fetched.len(),
"range: {range:?}, fetched: {}, fetched_start: {fetched_start}",
fetched.len()
);
fetched.slice(range)
let offset = range.start - *remainder_start;
let end = offset + range.end - range.start;
assert!(end <= remainder.len());
remainder.slice(offset..end)

Instead of all the other changes, I think this will properly compute the end of the slice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done 😄

}
// Note: this will potentially fetch data already in remainder, but this keeps things simple
_ => fetch.fetch(range.start..range.end).await?,
Expand Down Expand Up @@ -580,10 +591,7 @@ impl ParquetMetaDataReader {
} else {
let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
Ok((
Self::decode_metadata(slice)?,
Some((footer_start, suffix.slice(..metadata_start))),
))
Ok((Self::decode_metadata(slice)?, Some((footer_start, suffix))))
}
}

Expand Down Expand Up @@ -1052,5 +1060,17 @@ mod async_tests {
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch more than enough
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(len)) // prefetch entire file
.load_and_finish(f, len)
.await
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
}
}
Loading