Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet/examples/read_with_rowgroup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn main() -> Result<()> {
let mut file = File::open(&path).await.unwrap();

// The metadata could be cached in other places, this example only shows how to read
let metadata = file.get_metadata().await?;
let metadata = file.get_metadata(None).await?;

for rg in metadata.row_groups() {
let mut rowgroup = InMemoryRowGroup::create(rg.clone(), ProjectionMask::all());
Expand Down
91 changes: 33 additions & 58 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,11 @@ pub trait AsyncFileReader: Send {
/// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
/// allowing fine-grained control over how metadata is sourced, in particular allowing
/// for caching, pre-fetching, catalog metadata, etc...
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;

/// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
/// allowing fine-grained control over how metadata is sourced, in particular allowing
/// for caching, pre-fetching, catalog metadata, decrypting, etc...
///
/// By default calls `get_metadata()`
fn get_metadata_with_options<'a>(
/// ArrowReaderOptions may be provided to supply decryption parameters
fn get_metadata<'a>(
&'a mut self,
options: &'a ArrowReaderOptions,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
let _ = options;
self.get_metadata()
}
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>>;
}

/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
Expand All @@ -130,15 +121,11 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
self.as_mut().get_byte_ranges(ranges)
}

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
self.as_mut().get_metadata()
}

fn get_metadata_with_options<'a>(
fn get_metadata<'a>(
&'a mut self,
options: &'a ArrowReaderOptions,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
self.as_mut().get_metadata_with_options(options)
self.as_mut().get_metadata(options)
}
}

Expand All @@ -159,9 +146,9 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
.boxed()
}

fn get_metadata_with_options<'a>(
fn get_metadata<'a>(
&'a mut self,
options: &'a ArrowReaderOptions,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
async move {
Expand All @@ -172,37 +159,14 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {

let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?;
let metadata_len = footer.metadata_length();
self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
.await?;

let mut buf = Vec::with_capacity(metadata_len);
self.take(metadata_len as _).read_to_end(&mut buf).await?;

let metadata_reader = ParquetMetaDataReader::new();

#[cfg(feature = "encryption")]
let metadata_reader = metadata_reader
.with_decryption_properties(options.file_decryption_properties.as_ref());
let have_decryptor = options.is_some() &&
options.unwrap().file_decryption_properties.is_some();

let parquet_metadata = metadata_reader.decode_footer_metadata(&buf, &footer)?;
#[cfg(not(feature = "encryption"))]
let have_decryptor = options.is_some() && cfg!(feature = "encryption"); // always false

Ok(Arc::new(parquet_metadata))
}
.boxed()
}

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
async move {
self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;

let mut buf = [0_u8; FOOTER_SIZE];
self.read_exact(&mut buf).await?;

let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?;
let metadata_len = footer.metadata_length();

if footer.is_encrypted_footer() {
if footer.is_encrypted_footer() && !have_decryptor {
return Err(general_err!(
"Parquet file has an encrypted footer but decryption properties were not provided"
));
Expand All @@ -214,7 +178,22 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
let mut buf = Vec::with_capacity(metadata_len);
self.take(metadata_len as _).read_to_end(&mut buf).await?;

Ok(Arc::new(ParquetMetaDataReader::decode_metadata(&buf)?))
let metadata_reader = ParquetMetaDataReader::new();

#[cfg(feature = "encryption")]
let metadata_reader = match have_decryptor {
true => metadata_reader
.with_decryption_properties(options.unwrap().file_decryption_properties.as_ref()),
false => metadata_reader,
};


let parquet_metadata = match have_decryptor {
true => metadata_reader.decode_footer_metadata(&buf, &footer)?,
false => ParquetMetaDataReader::decode_metadata(&buf)?
};

Ok(Arc::new(parquet_metadata))
}
.boxed()
}
Expand All @@ -236,7 +215,7 @@ impl ArrowReaderMetadata {
) -> Result<Self> {
// TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata
// took an argument to fetch the page indexes.
let mut metadata = input.get_metadata_with_options(&options).await?;
let mut metadata = input.get_metadata(Some(&options)).await?;

if options.page_index
&& metadata.column_index().is_none()
Expand Down Expand Up @@ -1184,13 +1163,9 @@ mod tests {
futures::future::ready(Ok(self.data.slice(range))).boxed()
}

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
futures::future::ready(Ok(self.metadata.clone())).boxed()
}

fn get_metadata_with_options<'a>(
fn get_metadata<'a>(
&'a mut self,
_options: &'a ArrowReaderOptions,
_options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
futures::future::ready(Ok(self.metadata.clone())).boxed()
}
Expand Down
25 changes: 7 additions & 18 deletions parquet/src/arrow/async_reader/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,33 +163,22 @@ impl AsyncFileReader for ParquetObjectReader {
// an `impl MetadataFetch` and calls those methods to get data from it. Due to `Self`'s impl of
// `AsyncFileReader`, the calls to `MetadataFetch::fetch` are just delegated to
// `Self::get_bytes`.
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let file_size = self.meta.size;
let metadata = ParquetMetaDataReader::new()
.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
.with_prefetch_hint(self.metadata_size_hint)
.load_and_finish(self, file_size)
.await?;
Ok(Arc::new(metadata))
})
}

fn get_metadata_with_options<'a>(
fn get_metadata<'a>(
&'a mut self,
options: &'a ArrowReaderOptions,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let file_size = self.meta.size;
let metadata = ParquetMetaDataReader::new()
let mut metadata = ParquetMetaDataReader::new()
.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
.with_prefetch_hint(self.metadata_size_hint);

#[cfg(feature = "encryption")]
let metadata =
metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
if let Some(options) = options {
metadata = metadata
.with_decryption_properties(options.file_decryption_properties.as_ref());
}

let metadata = metadata.load_and_finish(self, file_size).await?;

Expand Down
4 changes: 2 additions & 2 deletions parquet/tests/arrow_reader/encryption_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ async fn test_decrypting_without_decryption_properties_fails() {
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"Parquet error: Parquet file has an encrypted footer but no decryption properties were provided"
"Parquet error: Parquet file has an encrypted footer but decryption properties were not provided"
);
}

Expand Down Expand Up @@ -277,7 +277,7 @@ async fn test_read_encrypted_file_from_object_store() {
let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);

let mut reader = ParquetObjectReader::new(store, meta);
let metadata = reader.get_metadata_with_options(&options).await.unwrap();
let metadata = reader.get_metadata(Some(&options)).await.unwrap();
let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
.await
.unwrap();
Expand Down
Loading