Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix prefetch of page index
  • Loading branch information
adriangb committed Jan 20, 2025
commit 2648dd1a20116cb2307edd4c79cf42c3f0530318
32 changes: 24 additions & 8 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ impl ParquetMetaDataReader {
mut fetch: F,
file_size: usize,
) -> Result<()> {
let (metadata, remainder) =
let (metadata, fetched) =
Self::load_metadata(&mut fetch, file_size, self.get_prefetch_size()).await?;

self.metadata = Some(metadata);
Expand All @@ -382,7 +382,7 @@ impl ParquetMetaDataReader {
return Ok(());
}

self.load_page_index_with_remainder(fetch, remainder).await
self.load_page_index_with_remainder(fetch, fetched).await
}

/// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
Expand All @@ -396,7 +396,7 @@ impl ParquetMetaDataReader {
async fn load_page_index_with_remainder<F: MetadataFetch>(
&mut self,
mut fetch: F,
remainder: Option<(usize, Bytes)>,
fetched: Option<(usize, Bytes)>,
) -> Result<()> {
if self.metadata.is_none() {
return Err(general_err!("Footer metadata is not present"));
Expand All @@ -409,10 +409,14 @@ impl ParquetMetaDataReader {
None => return Ok(()),
};

let bytes = match &remainder {
Some((remainder_start, remainder)) if *remainder_start <= range.start => {
let offset = range.start - *remainder_start;
remainder.slice(offset..range.end - *remainder_start + offset)
let bytes = match &fetched {
Some((fetched_start, fetched)) if *fetched_start <= range.start && (range.end <= fetched_start + fetched.len()) => {
// `fetched` holds the bytes spanning from `fetched_start` to the end of the file.
// We want to slice out the range we need from that data, so the range must first
// be rebased to be relative to `fetched_start`.
let fetched_start = *fetched_start;
let range = range.start - fetched_start..range.end - fetched_start;
fetched.slice(range)
}
// Note: this will potentially re-fetch data that is already in `fetched`; this keeps things simple
_ => fetch.fetch(range.start..range.end).await?,
Expand Down Expand Up @@ -582,7 +586,7 @@ impl ParquetMetaDataReader {
let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
Ok((
Self::decode_metadata(slice)?,
Some((footer_start, suffix.slice(..metadata_start))),
Some((footer_start, suffix)),
))
}
}
Expand Down Expand Up @@ -1052,5 +1056,17 @@ mod async_tests {
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch more than enough
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(len)) // prefetch entire file
.load_and_finish(f, len)
.await
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
}
}
Loading