Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
revert changes to load API
  • Loading branch information
etseidl committed Sep 28, 2024
commit 8068a31e3536730d8606f82a4151d19d817f0869
14 changes: 2 additions & 12 deletions parquet/src/arrow/async_reader/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,6 @@ where
}
}

impl<'a, F, Fut> MetadataFetch for &'a mut MetadataFetchFn<F>
where
F: FnMut(Range<usize>) -> Fut + Send,
Fut: Future<Output = Result<Bytes>> + Send,
{
fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
async move { self.0(range).await }.boxed()
}
}

/// Fetches parquet metadata
///
/// Parameters:
Expand All @@ -249,10 +239,10 @@ where
F: FnMut(Range<usize>) -> Fut + Send,
Fut: Future<Output = Result<Bytes>> + Send,
{
let mut fetch = MetadataFetchFn(fetch);
let fetch = MetadataFetchFn(fetch);
ParquetMetaDataReader::new()
.with_prefetch_hint(prefetch)
.load_and_finish(&mut fetch, file_size)
.load_and_finish(fetch, file_size)
.await
}

Expand Down
3 changes: 2 additions & 1 deletion parquet/src/arrow/async_reader/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,12 @@ impl AsyncFileReader for ParquetObjectReader {

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let file_size = self.meta.size;
let metadata = ParquetMetaDataReader::new()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so-beautiful

.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
.with_prefetch_hint(self.metadata_size_hint)
.load_and_finish(self, self.meta.size)
.load_and_finish(self, file_size)
.await?;
Ok(Arc::new(metadata))
})
Expand Down
43 changes: 16 additions & 27 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,11 @@ impl ParquetMetaDataReader {
/// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
/// performed by this function.
#[cfg(feature = "async")]
pub async fn load_and_finish<F>(
pub async fn load_and_finish<F: MetadataFetch>(
mut self,
fetch: &mut F,
fetch: F,
file_size: usize,
) -> Result<ParquetMetaData>
where
for<'a> &'a mut F: MetadataFetch,
{
) -> Result<ParquetMetaData> {
self.try_load(fetch, file_size).await?;
self.finish()
}
Expand All @@ -317,12 +314,13 @@ impl ParquetMetaDataReader {
/// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
/// performed by this function.
#[cfg(feature = "async")]
pub async fn try_load<F>(&mut self, fetch: &mut F, file_size: usize) -> Result<()>
where
for<'a> &'a mut F: MetadataFetch,
{
pub async fn try_load<F: MetadataFetch>(
&mut self,
mut fetch: F,
file_size: usize,
) -> Result<()> {
let (metadata, remainder) =
Self::load_metadata(fetch, file_size, self.get_prefetch_size()).await?;
Self::load_metadata(&mut fetch, file_size, self.get_prefetch_size()).await?;

self.metadata = Some(metadata);

Expand All @@ -337,22 +335,16 @@ impl ParquetMetaDataReader {
/// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
/// been obtained. See [`Self::new_with_metadata()`].
#[cfg(feature = "async")]
pub async fn load_page_index<F>(&mut self, fetch: &mut F) -> Result<()>
where
for<'a> &'a mut F: MetadataFetch,
{
pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change will be breaking if in 52.2.0. I'll revert this if need be. I found having remainder in the public API confusing.

self.load_page_index_with_remainder(fetch, None).await
}

#[cfg(feature = "async")]
async fn load_page_index_with_remainder<F>(
async fn load_page_index_with_remainder<F: MetadataFetch>(
&mut self,
mut fetch: &mut F,
mut fetch: F,
remainder: Option<(usize, Bytes)>,
) -> Result<()>
where
for<'a> &'a mut F: MetadataFetch,
{
) -> Result<()> {
if self.metadata.is_none() {
return Err(general_err!("Footer metadata is not present"));
}
Expand Down Expand Up @@ -507,14 +499,11 @@ impl ParquetMetaDataReader {
}

#[cfg(feature = "async")]
async fn load_metadata<F>(
mut fetch: &mut F,
async fn load_metadata<F: MetadataFetch>(
fetch: &mut F,
file_size: usize,
prefetch: usize,
) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)>
where
for<'a> &'a mut F: MetadataFetch,
{
) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
if file_size < FOOTER_SIZE {
return Err(eof_err!("file size of {} is less than footer", file_size));
}
Expand Down