From dfd64350ea48d23ff6a2b9d5d349a3b757150a0b Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 7 Aug 2025 09:41:08 -0400
Subject: [PATCH 1/9] ParquetMetaDataPushDecoder

---
 parquet/src/errors.rs                     |   6 +
 parquet/src/file/metadata/mod.rs          |   7 +-
 parquet/src/file/metadata/push_decoder.rs | 553 ++++++++++++++++++++++
 parquet/src/file/metadata/reader.rs       |   2 +-
 parquet/src/lib.rs                        |  17 +
 parquet/src/util/mod.rs                   |   1 +
 parquet/src/util/push_buffers.rs          | 186 ++++++++
 7 files changed, 768 insertions(+), 4 deletions(-)
 create mode 100644 parquet/src/file/metadata/push_decoder.rs
 create mode 100644 parquet/src/util/push_buffers.rs

diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs
index 93b2c1b7e028..be08245e956c 100644
--- a/parquet/src/errors.rs
+++ b/parquet/src/errors.rs
@@ -52,6 +52,9 @@ pub enum ParquetError {
     /// Returned when a function needs more data to complete properly. The `usize` field indicates
     /// the total number of bytes required, not the number of additional bytes.
     NeedMoreData(usize),
+    /// Returned when a function needs more data to complete properly.
+    /// The `Range` indicates the range of bytes that are needed.
+    NeedMoreDataRange(std::ops::Range<u64>),
 }
 
 impl std::fmt::Display for ParquetError {
@@ -69,6 +72,9 @@ impl std::fmt::Display for ParquetError {
             }
             ParquetError::External(e) => write!(fmt, "External: {e}"),
             ParquetError::NeedMoreData(needed) => write!(fmt, "NeedMoreData: {needed}"),
+            ParquetError::NeedMoreDataRange(range) => {
+                write!(fmt, "NeedMoreDataRange: {}..{}", range.start, range.end)
+            }
         }
     }
 }
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 04129c6aa482..98ff94894629 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -40,11 +40,10 @@
 //! metadata into parquet files. To work with metadata directly,
 //! the following APIs are available:
 //!
-//! * [`ParquetMetaDataReader`] for reading
+//! * [`ParquetMetaDataReader`] for reading from a reader for I/O
+//! * [`ParquetMetaDataPushDecoder`] for decoding from bytes without I/O
 //! * [`ParquetMetaDataWriter`] for writing.
 //!
-//! [`ParquetMetaDataReader`]: https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html
-//! [`ParquetMetaDataWriter`]: https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataWriter.html
 //!
 //! # Examples
 //!
@@ -92,6 +91,7 @@
 //!   * Same name, different struct
 //! ```
 mod memory;
+mod push_decoder;
 pub(crate) mod reader;
 mod writer;
 
@@ -120,6 +120,7 @@ use crate::schema::types::{
 };
 #[cfg(feature = "encryption")]
 use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
+pub use push_decoder::ParquetMetaDataPushDecoder;
 pub use reader::{FooterTail, ParquetMetaDataReader};
 use std::ops::Range;
 use std::sync::Arc;
diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs
new file mode 100644
index 000000000000..48824c13fe01
--- /dev/null
+++ b/parquet/src/file/metadata/push_decoder.rs
@@ -0,0 +1,553 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::ParquetError;
+use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
+use crate::DecodeResult;
+use bytes::Bytes;
+use std::ops::Range;
+
+/// A push decoder for [`ParquetMetaData`].
+///
+/// This structure implements a push API based version of the [`ParquetMetaDataReader`], which
+/// decouples the IO from the metadata decoding logic.
+///
+/// You can use this decoder to customize your IO operations, as shown in the
+/// examples below for minimizing bytes read, prefetching data, or
+/// using async IO.
+///
+/// # Example
+///
+/// The most basic usage is to feed the decoder with the necessary byte ranges
+/// as requested, as shown below.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
+/// #
+/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
+/// # let file_bytes = {
+/// #   let mut buffer = vec![0];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// # // mimic IO by returning a function that returns the bytes for a given range
+/// # let get_range = |range: &Range<u64>| -> Bytes {
+/// #   let start = range.start as usize;
+/// #   let end = range.end as usize;
+/// #   file_bytes.slice(start..end)
+/// # };
+/// #
+/// # let file_len = file_bytes.len() as u64;
+/// // The `ParquetMetaDataPushDecoder` needs to know the file length.
+/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+/// // try to decode the metadata. If more data is needed, the decoder will tell you what ranges it needs.
+/// loop {
+///     match decoder.try_decode() {
+///         Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
+///         Ok(DecodeResult::NeedsData(ranges)) => {
+///             // The decoder needs more data
+///             //
+///             // In this example, we call a function that returns the bytes for each given range.
+///             // In a real application, you would likely read the data from a file or network.
+///             let data = ranges.iter().map(|range| get_range(range)).collect();
+///             // Push the data into the decoder and try to decode again on the next iteration.
+///             decoder.push_ranges(ranges, data).unwrap();
+///         }
+///         Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
+///         Err(e) => return Err(e),
+///     }
+/// }
+/// # }
+/// ```
+///
+/// # Example with "prefetching"
+///
+/// By default, the [`ParquetMetaDataPushDecoder`] will requests only the exact byte
+/// ranges it needs. This minimizes the number of bytes read, however it
+/// requires at least two IO operations to read the metadata - one to read the
+/// footer and then one to read the metadata.
+///
+/// If the file has a "Page Index" (see [Self::with_page_indexes]), three
+/// IO operations are required to read the metadata, as the page index is
+/// not part of the normal metadata footer.
+///
+/// To reduce the number of IO operations in systems with high per operation
+/// overhead (e.g. cloud storage), you can "prefetch" the data and then push
+/// the data into the decoder before calling [`Self::try_decode`]. If you do
+/// not push enough bytes, the decoder will return the ranges that are still
+/// needed.
+///
+/// This approach can also be used when you the entire file already in memory
+/// for other reasons.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
+/// #
+/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
+/// # let file_bytes = {
+/// #   let mut buffer = vec![0];
+/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// #
+/// let file_len = file_bytes.len() as u64;
+/// // for this example, we "prefetch" all the bytes which we have in memory,
+/// // but in a real application, you would likely read a chunk from the end
+/// // for example 1MB.
+/// let prefetched_bytes = file_bytes.clone();
+/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+/// // push the prefetched bytes into the decoder
+/// decoder.push_ranges(vec![0..file_len], vec![prefetched_bytes]).unwrap();
+/// // The decoder will now be able to decode the metadata. Note in a real application,
+/// // unless you can guarantee that the pushed data is enough to decode the metadata,
+/// // you still need to call `try_decode` in a loop until it returns `DecodeResult::Data`
+/// // as shown in the previous example
+/// match decoder.try_decode() {
+///   Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful]
+///   other @ _ => { panic!("expected DecodeResult::Data, got: {other:?}") }
+/// }
+/// # }
+/// ```
+///
+/// # Example using [`AsyncRead`]
+///
+/// [`ParquetMetaDataPushDecoder`] is designed to work with any data source that can
+/// provide byte ranges, including async IO sources. However, it does not
+/// implement async IO itself. To use async IO, you simply write an async
+/// wrapper around it that reads the required byte ranges and pushes them into the
+/// decoder.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
+/// #
+/// // this function decodes Parquet Metadata from anything that implements
+/// // [`AsyncRead`] and [`AsyncSeek`] such as a tokio::fs::File
+/// async fn decode_metadata(
+///     file_len: u64,
+///     mut async_source: impl AsyncRead + AsyncSeek + Unpin
+/// ) -> Result<ParquetMetaData, ParquetError> {
+///     // We need a ParquetMetaDataPushDecoder to decode the metadata.
+///     let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+///     loop {
+///         match decoder.try_decode() {
+///             Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
+///             Ok(DecodeResult::NeedsData(ranges)) => {
+///                 // The decoder needs more data
+///                 //
+///                 // In this example we use the AsyncRead and AsyncSeek traits to read the
+///                 // required ranges from the async source.
+///                 let mut data = Vec::with_capacity(ranges.len());
+///                 for range in &ranges {
+///                     let mut buffer = vec![0; (range.end - range.start) as usize];
+///                     async_source.seek(std::io::SeekFrom::Start(range.start)).await?;
+///                     async_source.read_exact(&mut buffer).await?;
+///                     data.push(Bytes::from(buffer));
+///                 }
+///                 // Push the data into the decoder and try to decode again on the next iteration.
+///                 decoder.push_ranges(ranges, data).unwrap();
+///             }
+///             Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
+///             Err(e) => return Err(e),
+///         }
+///     }
+/// }
+/// ```
+/// [`AsyncRead`]: tokio::io::AsyncRead
+#[derive(Debug)]
+pub struct ParquetMetaDataPushDecoder {
+    done: bool,
+    metadata_reader: ParquetMetaDataReader,
+    buffers: crate::util::push_buffers::PushBuffers,
+}
+
+impl ParquetMetaDataPushDecoder {
+    /// Create a new `ParquetMetaDataPushDecoder` with the given file length.
+    ///
+    /// By default, this will read page indexes and column indexes. See
+    /// [`ParquetMetaDataPushDecoder::with_page_indexes`] for more detail.
+    ///
+    /// See examples on [`ParquetMetaDataPushDecoder`].
+    pub fn try_new(file_len: u64) -> std::result::Result<Self, ParquetError> {
+        if file_len < 8 {
+            return Err(ParquetError::General(format!(
+                "Parquet files are at least 8 bytes long, but file length is {file_len}"
+            )));
+        };
+
+        let metadata_reader = ParquetMetaDataReader::new().with_page_indexes(true);
+
+        Ok(Self {
+            done: false,
+            metadata_reader,
+            buffers: crate::util::push_buffers::PushBuffers::new(file_len),
+        })
+    }
+
+    /// Enable or disable reading the page index structures described in
+    /// "[Parquet page index]: Layout to Support Page Skipping".
+    ///
+    /// Defaults to `true`
+    ///
+    /// This requires
+    /// 1. The Parquet file to have been written with page indexes
+    /// 2. Additional data to be pushed into the decoder (as the page indexes are not part of the thrift footer)
+    ///
+    /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+    pub fn with_page_indexes(mut self, val: bool) -> Self {
+        self.metadata_reader = self.metadata_reader.with_page_indexes(val);
+        self
+    }
+
+    /// Push the data into the decoder's buffer.
+    ///
+    /// The decoder does not immediately attempt to decode the metadata
+    /// after pushing data. Instead, it accumulates the pushed data until you
+    /// call [`Self::try_decode`].
+    ///
+    /// # Determining required data:
+    ///
+    /// To determine what ranges are required to decode the metadata, you can
+    /// either:
+    ///
+    /// 1. Call [`Self::try_decode`] first to get the exact ranges required (see
+    ///    example on [`Self`])
+    ///
+    /// 2. Speculatively push any data that you have available, which may
+    ///    include more than the footer data or requested bytes.
+    ///
+    /// Speculatively pushing data can be used when "prefetching" data. See
+    /// example on [`Self`]
+    pub fn push_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+        buffers: Vec<Bytes>,
+    ) -> std::result::Result<(), String> {
+        if self.done {
+            return Err(
+                "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
+                    .to_string(),
+            );
+        }
+        self.buffers.push_ranges(ranges, buffers);
+        Ok(())
+    }
+
+    /// Try to decode the metadata from the pushed data, returning the
+    /// decoded metadata or an error if not enough data is available.
+    pub fn try_decode(
+        &mut self,
+    ) -> std::result::Result<DecodeResult<ParquetMetaData>, ParquetError> {
+        if self.done {
+            return Ok(DecodeResult::Finished);
+        }
+
+        // need to have the last 8 bytes of the file to decode the metadata
+        let file_len = self.buffers.file_len();
+        if !self.buffers.has_range(&(file_len - 8..file_len)) {
+            #[expect(clippy::single_range_in_vec_init)]
+            return Ok(DecodeResult::NeedsData(vec![file_len - 8..file_len]));
+        }
+
+        // Try to parse the metadata from the buffers we have.
+        // If we don't have enough data, it will return a `ParquetError::NeedMoreData`
+        // with the number of bytes needed to complete the metadata parsing.
+        // If we have enough data, it will return `Ok(())` and we can
+        let maybe_metadata = self
+            .metadata_reader
+            .try_parse_sized(&self.buffers, self.buffers.file_len());
+
+        match maybe_metadata {
+            Ok(()) => {
+                // Metadata successfully parsed, proceed to decode the row groups
+                let metadata = self.metadata_reader.finish()?;
+                self.done = true;
+                Ok(DecodeResult::Data(metadata))
+            }
+
+            Err(ParquetError::NeedMoreData(needed)) => {
+                let needed = needed as u64;
+                let Some(start_offset) = file_len.checked_sub(needed) else {
+                    return Err(ParquetError::General(format!(
+                        "Parquet metadata reader needs at least {needed} bytes, but file length is only {file_len}"
+                    )));
+                };
+                let needed_range = start_offset..start_offset + needed;
+                // needs `needed_range` bytes at the end of the file
+                Ok(DecodeResult::NeedsData(vec![needed_range]))
+            }
+            Err(ParquetError::NeedMoreDataRange(range)) => Ok(DecodeResult::NeedsData(vec![range])),
+
+            Err(e) => Err(e), // some other error, pass back
+        }
+    }
+}
+
+// These tests use the arrow writer to create a parquet file in memory
+// so they need the arrow feature and the test feature
+#[cfg(all(test, feature = "arrow"))]
+mod tests {
+    use super::*;
+    use crate::arrow::ArrowWriter;
+    use crate::file::properties::WriterProperties;
+    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
+    use bytes::Bytes;
+    use std::fmt::Debug;
+    use std::ops::Range;
+    use std::sync::{Arc, LazyLock};
+
+    /// It is possible to decode the metadata from the entire file at once before being asked
+    #[test]
+    fn test_metadata_decoder_all_data() {
+        let file_len = test_file_len();
+        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+        // Push the entire file data into the metadata decoder
+        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
+
+        // should be able to decode the metadata without needing more data
+        let metadata = expect_data(metadata_decoder.try_decode());
+
+        assert_eq!(metadata.num_row_groups(), 2);
+        assert_eq!(metadata.row_group(0).num_rows(), 200);
+        assert_eq!(metadata.row_group(1).num_rows(), 200);
+        assert!(metadata.column_index().is_some());
+        assert!(metadata.offset_index().is_some());
+    }
+
+    /// It is possible to feed some, but not all, of the footer into the metadata decoder
+    /// before asked. This avoids multiple IO requests
+    #[test]
+    fn test_metadata_decoder_prefetch_success() {
+        let file_len = test_file_len();
+        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+        // simulate pre-fetching the last 2k bytes of the file without asking the decoder
+        let prefetch_range = (file_len - 2 * 1024)..file_len;
+        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);
+
+        // expect the decoder has enough data to decode the metadata
+        let metadata = expect_data(metadata_decoder.try_decode());
+        expect_finished(metadata_decoder.try_decode());
+        assert_eq!(metadata.num_row_groups(), 2);
+        assert_eq!(metadata.row_group(0).num_rows(), 200);
+        assert_eq!(metadata.row_group(1).num_rows(), 200);
+        assert!(metadata.column_index().is_some());
+        assert!(metadata.offset_index().is_some());
+    }
+
+    /// It is possible to pre-fetch some, but not all, of the necessary data
+    #[test]
+    fn test_metadata_decoder_prefetch_retry() {
+        let file_len = test_file_len();
+        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+        // simulate pre-fetching the last 1500 bytes of the file.
+        // this is enough to read the footer thrift metadata, but not the offset indexes
+        let prefetch_range = (file_len - 1500)..file_len;
+        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);
+
+        // expect another request is needed to read the offset indexes (note
+        // try_decode only returns NeedsData once, whereas without any prefetching it would
+        // return NeedsData three times)
+        let ranges = expect_needs_data(metadata_decoder.try_decode());
+        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
+
+        // expect the decoder has enough data to decode the metadata
+        let metadata = expect_data(metadata_decoder.try_decode());
+        expect_finished(metadata_decoder.try_decode());
+
+        assert_eq!(metadata.num_row_groups(), 2);
+        assert_eq!(metadata.row_group(0).num_rows(), 200);
+        assert_eq!(metadata.row_group(1).num_rows(), 200);
+        assert!(metadata.column_index().is_some());
+        assert!(metadata.offset_index().is_some());
+    }
+
+    /// Decode the metadata incrementally, simulating a scenario where exactly the data needed
+    /// is read in each step
+    #[test]
+    fn test_metadata_decoder_incremental() {
+        let file_len = TEST_FILE_DATA.len() as u64;
+        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+        let ranges = expect_needs_data(metadata_decoder.try_decode());
+        assert_eq!(ranges.len(), 1);
+        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
+        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
+
+        // expect the first request to read the footer
+        let ranges = expect_needs_data(metadata_decoder.try_decode());
+        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
+
+        // expect the second request to read the offset indexes
+        let ranges = expect_needs_data(metadata_decoder.try_decode());
+        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
+
+        // expect the third request to read the actual data
+        let metadata = expect_data(metadata_decoder.try_decode());
+        expect_finished(metadata_decoder.try_decode());
+
+        assert_eq!(metadata.num_row_groups(), 2);
+        assert_eq!(metadata.row_group(0).num_rows(), 200);
+        assert_eq!(metadata.row_group(1).num_rows(), 200);
+        assert!(metadata.column_index().is_some());
+        assert!(metadata.offset_index().is_some());
+    }
+
+    /// Decode the metadata incrementally, but without reading the page indexes
+    /// (so only two requests)
+    #[test]
+    fn test_metadata_decoder_incremental_no_page_index() {
+        let file_len = TEST_FILE_DATA.len() as u64;
+        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len)
+            .unwrap()
+            .with_page_indexes(false);
+        let ranges = expect_needs_data(metadata_decoder.try_decode());
+        assert_eq!(ranges.len(), 1);
+        assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
+        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
+
+        // expect the first request to read the footer
+        let ranges = expect_needs_data(metadata_decoder.try_decode());
+        push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
+
+        // expect NO second request to read the offset indexes, should just cough up the metadata
+        let metadata = expect_data(metadata_decoder.try_decode());
+        expect_finished(metadata_decoder.try_decode());
+
+        assert_eq!(metadata.num_row_groups(), 2);
+        assert_eq!(metadata.row_group(0).num_rows(), 200);
+        assert_eq!(metadata.row_group(1).num_rows(), 200);
+        assert!(metadata.column_index().is_none()); // of course, we did not read the column index
+        assert!(metadata.offset_index().is_none()); // or the offset index
+    }
+
+    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
+        // Input batch has 400 rows, with 3 columns: "a", "b", "c"
+        // Note c is a different type (so the data page sizes will be different)
+        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
+        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
+        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
+            if i % 2 == 0 {
+                format!("string_{i}")
+            } else {
+                format!("A string larger than 12 bytes and thus not inlined {i}")
+            }
+        })));
+
+        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
+    });
+
+    /// Create a parquet file in memory for testing. See [`test_file_range`] for details.
+    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
+        let input_batch = &TEST_BATCH;
+        let mut output = Vec::new();
+
+        let writer_options = WriterProperties::builder()
+            .set_max_row_group_size(200)
+            .set_data_page_row_count_limit(100)
+            .build();
+        let mut writer =
+            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();
+
+        // since the limits are only enforced on batch boundaries, write the input
+        // batch in chunks of 50
+        let mut row_remain = input_batch.num_rows();
+        while row_remain > 0 {
+            let chunk_size = row_remain.min(50);
+            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
+            writer.write(&chunk).unwrap();
+            row_remain -= chunk_size;
+        }
+        writer.close().unwrap();
+        Bytes::from(output)
+    });
+
+    /// Return the length of the test file in bytes
+    fn test_file_len() -> u64 {
+        TEST_FILE_DATA.len() as u64
+    }
+
+    /// Return the range of the entire test file
+    fn test_file_range() -> Range<u64> {
+        0..test_file_len()
+    }
+
+    /// Return a slice of the test file data from the given range
+    pub fn test_file_slice(range: Range<u64>) -> Bytes {
+        let start: usize = range.start.try_into().unwrap();
+        let end: usize = range.end.try_into().unwrap();
+        TEST_FILE_DATA.slice(start..end)
+    }
+
+    /// Push the given ranges to the metadata decoder, simulating reading from a file
+    fn push_ranges_to_metadata_decoder(
+        metadata_decoder: &mut ParquetMetaDataPushDecoder,
+        ranges: Vec<Range<u64>>,
+    ) {
+        let data = ranges
+            .iter()
+            .map(|range| test_file_slice(range.clone()))
+            .collect::<Vec<_>>();
+        metadata_decoder.push_ranges(ranges, data).unwrap();
+    }
+
+    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the corresponding element
+    fn expect_data<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) -> T {
+        match result.expect("Expected Ok(DecodeResult::Data(T))") {
+            DecodeResult::Data(data) => data,
+            result => panic!("Expected DecodeResult::Data, got {result:?}"),
+        }
+    }
+
+    /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the corresponding ranges
+    fn expect_needs_data<T: Debug>(
+        result: Result<DecodeResult<T>, ParquetError>,
+    ) -> Vec<Range<u64>> {
+        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
+            DecodeResult::NeedsData(ranges) => ranges,
+            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
+        }
+    }
+
+    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) {
+        match result.expect("Expected Ok(DecodeResult::Finished)") {
+            DecodeResult::Finished => {}
+            result => panic!("Expected DecodeResult::Finished, got {result:?}"),
+        }
+    }
+}
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index 356713837530..ad2f81a7357d 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -69,7 +69,7 @@ use crate::file::page_index::offset_index::OffsetIndexMetaData;
 /// assert!(metadata.column_index().is_some());
 /// assert!(metadata.offset_index().is_some());
 /// ```
-#[derive(Default)]
+#[derive(Default, Debug)]
 pub struct ParquetMetaDataReader {
     metadata: Option<ParquetMetaData>,
     column_index: bool,
diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs
index 07a673c295bc..5478fe002569 100644
--- a/parquet/src/lib.rs
+++ b/parquet/src/lib.rs
@@ -155,6 +155,8 @@ pub mod format;
 #[macro_use]
 pub mod data_type;
 
+use std::fmt::Debug;
+use std::ops::Range;
 // Exported for external use, such as benchmarks
 #[cfg(feature = "experimental")]
 #[doc(hidden)]
@@ -179,3 +181,18 @@ pub mod record;
 pub mod schema;
 
 pub mod thrift;
+
+/// What data is needed to read the next item from a decoder.
+///
+/// This is used to communicate between the decoder and the caller
+/// to indicate what data is needed next, or what the result of decoding is.
+#[derive(Debug)]
+pub enum DecodeResult<T: Debug> {
+    /// The ranges of data necessary to proceed
+    // TODO: distinguish between minimum needed to make progress and what could be used?
+    NeedsData(Vec<Range<u64>>),
+    /// The decoder produced an output item
+    Data(T),
+    /// The decoder finished processing
+    Finished,
+}
diff --git a/parquet/src/util/mod.rs b/parquet/src/util/mod.rs
index 1431132473e9..145cdd693e59 100644
--- a/parquet/src/util/mod.rs
+++ b/parquet/src/util/mod.rs
@@ -20,6 +20,7 @@ pub mod bit_util;
 mod bit_pack;
 pub(crate) mod interner;
+pub mod push_buffers;
 #[cfg(any(test, feature = "test_common"))]
 pub(crate) mod test_common;
 pub mod utf8;
diff --git a/parquet/src/util/push_buffers.rs b/parquet/src/util/push_buffers.rs
new file mode 100644
index 000000000000..dd7458a8611f
--- /dev/null
+++ b/parquet/src/util/push_buffers.rs
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::ParquetError;
+use crate::file::reader::{ChunkReader, Length};
+use bytes::Bytes;
+use std::fmt::Display;
+use std::ops::Range;
+
+/// Holds multiple buffers of data that have been requested by the ParquetDecoder
+///
+/// This is the in-memory buffer for the ParquetDecoder
+///
+/// Features it has:
+/// 1. Zero copy as much as possible
+/// 2. Keeps non contiguous ranges of bytes
+#[derive(Debug, Clone)]
+pub(crate) struct PushBuffers {
+    /// the virtual "offset" of this buffers (added to any request)
+    offset: u64,
+    /// The total length of the file being decoded
+    file_len: u64,
+    /// The ranges of data that are available for decoding (not adjusted for offset)
+    ranges: Vec<Range<u64>>,
+    /// The buffers of data that can be used to decode the Parquet file
+    buffers: Vec<Bytes>,
+}
+
+impl Display for PushBuffers {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        writeln!(
+            f,
+            "Buffers (offset: {}, file_len: {})",
+            self.offset, self.file_len
+        )?;
+        writeln!(f, "Available Ranges (w/ offset):")?;
+        for range in &self.ranges {
+            writeln!(
+                f,
+                "  {}..{} ({}..{}): {} bytes",
+                range.start,
+                range.end,
+                range.start + self.offset,
+                range.end + self.offset,
+                range.end - range.start
+            )?;
+        }
+
+        Ok(())
+    }
+}
+
+impl PushBuffers {
+    /// Create a new Buffers instance with the given file length
+    pub fn new(file_len: u64) -> Self {
+        Self {
+            offset: 0,
+            file_len,
+            ranges: Vec::new(),
+            buffers: Vec::new(),
+        }
+    }
+
+    /// Push all the ranges and buffers
+    pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
+        assert_eq!(
+            ranges.len(),
+            buffers.len(),
+            "Number of ranges must match number of buffers"
+        );
+        for (range, buffer) in ranges.into_iter().zip(buffers.into_iter()) {
+            self.push_range(range, buffer);
+        }
+    }
+
+    /// Push a new range and its associated buffer
+    pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) {
+        assert_eq!(
+            (range.end - range.start) as usize,
+            buffer.len(),
+            "Range length must match buffer length"
+        );
+        self.ranges.push(range);
+        self.buffers.push(buffer);
+    }
+
+    /// Returns true if the Buffers contains data for the given range
+    pub fn has_range(&self, range: &Range<u64>) -> bool {
+        self.ranges
+            .iter()
+            .any(|r| r.start <= range.start && r.end >= range.end)
+    }
+
+    fn iter(&self) -> impl Iterator<Item = (&Range<u64>, &Bytes)> {
+        self.ranges.iter().zip(self.buffers.iter())
+    }
+
+    /// return the file length of the Parquet file being read
+    pub fn file_len(&self) -> u64 {
+        self.file_len
+    }
+
+    /// Specify a new offset
+    pub fn with_offset(mut self, offset: u64) -> Self {
+        self.offset = offset;
+        self
+    }
+}
+
+impl Length for PushBuffers {
+    fn len(&self) -> u64 {
+        self.file_len
+    }
+}
+
+/// less efficinet implementation of Read for Buffers
+impl std::io::Read for PushBuffers {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        // Find the range that contains the start offset
+        let mut found = false;
+        for (range, data) in self.iter() {
+            if range.start <= self.offset && range.end >= self.offset + buf.len() as u64 {
+                // Found the range, figure out the starting offset in the buffer
+                let start_offset = (self.offset - range.start) as usize;
+                let end_offset = start_offset + buf.len();
+                let slice = data.slice(start_offset..end_offset);
+                buf.copy_from_slice(slice.as_ref());
+                found = true;
+            }
+        }
+        if found {
+            // If we found the range, we can return the number of bytes read
+            // advance our offset
+            self.offset += buf.len() as u64;
+            Ok(buf.len())
+        } else {
+            Err(std::io::Error::new(
+                std::io::ErrorKind::UnexpectedEof,
+                "No data available in Buffers",
+            ))
+        }
+    }
+}
+
+impl ChunkReader for PushBuffers {
+    type T = Self;
+
+    fn get_read(&self, start: u64) -> Result<Self::T, ParquetError> {
+        Ok(self.clone().with_offset(self.offset + start))
+    }
+
+    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes, ParquetError> {
+        if start > self.file_len {
+            return Err(ParquetError::General(format!(
+                "Requested start {start} is beyond the end of the file (file length: {})",
+                self.file_len
+            )));
+        }
+
+        // find the range that contains the start offset
+        for (range, data) in self.iter() {
+            if range.start <= start && range.end >= start + length as u64 {
+                // Found the range, figure out the starting offset in the buffer
+                let start_offset = (start - range.start) as usize;
+                return Ok(data.slice(start_offset..start_offset + length));
+            }
+        }
+        // Signal that we need more data
+        let requested_end = start + length as u64;
+        Err(ParquetError::NeedMoreDataRange(start..requested_end))
+    }
+}

From 5fc55da4925fc8a60d5915092d6243fbb6bd04f6 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 15 Aug 2025 12:38:24 -0400
Subject: [PATCH 2/9] Update with new PageIndexPolicy API

---
 parquet/src/file/metadata/push_decoder.rs | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs
index 48824c13fe01..4b35ecdbc5a9 100644
--- a/parquet/src/file/metadata/push_decoder.rs
+++ b/parquet/src/file/metadata/push_decoder.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::errors::ParquetError;
-use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
+use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
 use crate::DecodeResult;
 use bytes::Bytes;
 use std::ops::Range;
@@ -204,14 +204,15 @@ impl ParquetMetaDataPushDecoder {
     /// [`ParquetMetaDataPushDecoder::with_page_indexes`] for more detail.
     ///
     /// See examples on [`ParquetMetaDataPushDecoder`].
-    pub fn try_new(file_len: u64) -> std::result::Result<Self, ParquetError> {
+    pub fn try_new(file_len: u64) -> Result<Self, ParquetError> {
         if file_len < 8 {
             return Err(ParquetError::General(format!(
                 "Parquet files are at least 8 bytes long, but file length is {file_len}"
             )));
         };
 
-        let metadata_reader = ParquetMetaDataReader::new().with_page_indexes(true);
+        let metadata_reader =
+            ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Optional);
 
         Ok(Self {
             done: false,
@@ -221,17 +222,19 @@ impl ParquetMetaDataPushDecoder {
     }
 
     /// Enable or disable reading the page index structures described in
-    /// "[Parquet page index]: Layout to Support Page Skipping".
+    /// "[Parquet page index] Layout to Support Page Skipping".
    ///
-    /// Defaults to `true`
+    /// Defaults to [`PageIndexPolicy::Optional`]
     ///
     /// This requires
     /// 1. The Parquet file to have been written with page indexes
     /// 2. 
Additional data to be pushed into the decoder (as the page indexes are not part of the thrift footer) /// /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md - pub fn with_page_indexes(mut self, val: bool) -> Self { - self.metadata_reader = self.metadata_reader.with_page_indexes(val); + pub fn with_page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self { + self.metadata_reader = self + .metadata_reader + .with_page_index_policy(page_index_policy); self } @@ -435,7 +438,7 @@ mod tests { let file_len = TEST_FILE_DATA.len() as u64; let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len) .unwrap() - .with_page_indexes(false); + .with_page_index_policy(PageIndexPolicy::Skip); let ranges = expect_needs_data(metadata_decoder.try_decode()); assert_eq!(ranges.len(), 1); assert_eq!(ranges[0], test_file_len() - 8..test_file_len()); From 3c292b286e6b9ce3a0c3be328aafdd3d57fd7768 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 15 Aug 2025 12:42:04 -0400 Subject: [PATCH 3/9] Fix docs --- parquet/src/file/metadata/push_decoder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs index 4b35ecdbc5a9..af46f711f14d 100644 --- a/parquet/src/file/metadata/push_decoder.rs +++ b/parquet/src/file/metadata/push_decoder.rs @@ -88,7 +88,7 @@ use crate::DecodeResult; /// requires at least two IO operations to read the metadata - one to read the /// footer and then one to read the metadata. /// -/// If the file has a "Page Index" (see [Self::with_page_indexes]), three +/// If the file has a "Page Index" (see [Self::with_page_index_policy]), three /// IO operations are required to read the metadata, as the page index is /// not part of the normal metadata footer. /// @@ -201,7 +201,7 @@ impl ParquetMetaDataPushDecoder { /// Create a new `ParquetMetaDataPushDecoder` with the given file length. /// /// By default, this will read page indexes and column indexes. See - /// [`ParquetMetaDataPushDecoder::with_page_indexes`] for more detail. + /// [`ParquetMetaDataPushDecoder::with_page_index_policy`] for more detail. /// /// See examples on [`ParquetMetaDataPushDecoder`]. pub fn try_new(file_len: u64) -> Result { From 297fbfb14ec46f544e4546767b95dbacaaeca8bf Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 20 Aug 2025 14:16:08 -0400 Subject: [PATCH 4/9] Add coalescing discussion --- parquet/src/util/push_buffers.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/parquet/src/util/push_buffers.rs b/parquet/src/util/push_buffers.rs index dd7458a8611f..3b3398dd85dc 100644 --- a/parquet/src/util/push_buffers.rs +++ b/parquet/src/util/push_buffers.rs @@ -21,14 +21,24 @@ use bytes::Bytes; use std::fmt::Display; use std::ops::Range; -/// Holds multiple buffers of data that have been requested by the ParquetDecoder +/// Holds multiple buffers of data /// -/// This is the in-memory buffer for the ParquetDecoder +/// This is the in-memory buffer for the ParquetDecoder and ParquetMetadataDecoders /// -/// Features it has: -/// 1. Zero copy as much as possible -/// 2. Keeps non contiguous ranges of bytes -#[derive(Debug, Clone)] +/// Features: +/// 1. Zero copy +/// 2. non contiguous ranges of bytes +/// +/// # Non Coalescing +/// +/// This buffer does not coalesce (merging adjacent ranges of bytes into a +/// single range). 
Coalescing at this level would require copying the data but +/// the caller may already have the needed data in a single buffer which would +/// require no copying. +/// +/// Thus, the implementation defers to the caller to coalesce subsequent requests +/// if desired. +# [derive(Debug, Clone)] pub(crate) struct PushBuffers { /// the virtual "offset" of this buffers (added to any request) offset: u64, From 3f4ad09e2e035e0e2f89aace7d50ddcc3f6a2a81 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 20 Aug 2025 14:19:06 -0400 Subject: [PATCH 5/9] break when found --- parquet/src/util/push_buffers.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/parquet/src/util/push_buffers.rs b/parquet/src/util/push_buffers.rs index 3b3398dd85dc..7e643f840c42 100644 --- a/parquet/src/util/push_buffers.rs +++ b/parquet/src/util/push_buffers.rs @@ -150,6 +150,7 @@ impl std::io::Read for PushBuffers { let slice = data.slice(start_offset..end_offset); buf.copy_from_slice(slice.as_ref()); found = true; + break; } } if found { From be330afbfb6f0849b64dad4dca67d7a9741a6c88 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 20 Aug 2025 14:22:09 -0400 Subject: [PATCH 6/9] fmt --- parquet/src/util/push_buffers.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/util/push_buffers.rs b/parquet/src/util/push_buffers.rs index 7e643f840c42..b30f91a81b70 100644 --- a/parquet/src/util/push_buffers.rs +++ b/parquet/src/util/push_buffers.rs @@ -38,7 +38,7 @@ use std::ops::Range; /// /// Thus, the implementation defers to the caller to coalesce subsequent requests /// if desired. -# [derive(Debug, Clone)] +#[derive(Debug, Clone)] pub(crate) struct PushBuffers { /// the virtual "offset" of this buffers (added to any request) offset: u64, @@ -137,7 +137,7 @@ impl Length for PushBuffers { } } -/// less efficinet implementation of Read for Buffers +/// less efficient implementation of Read for Buffers impl std::io::Read for PushBuffers { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { // Find the range that contains the start offset From e2ed49125f54e8b67f64a381d5af3bdaedde5e37 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 21 Aug 2025 11:59:23 -0400 Subject: [PATCH 7/9] Apply suggestions from code review Co-authored-by: Ed Seidl --- parquet/src/file/metadata/push_decoder.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs index af46f711f14d..d4820129a3f9 100644 --- a/parquet/src/file/metadata/push_decoder.rs +++ b/parquet/src/file/metadata/push_decoder.rs @@ -98,7 +98,7 @@ use crate::DecodeResult; /// not push enough bytes, the decoder will return the ranges that are still /// needed. /// -/// This approach can also be used when you the entire file already in memory +/// This approach can also be used when you have the entire file already in memory /// for other reasons. /// /// ```rust @@ -121,7 +121,7 @@ use crate::DecodeResult; /// # }; /// # /// let file_len = file_bytes.len() as u64; -/// // for this example, we "prefetch" all the bytes which we have in memory, +/// // For this example, we "prefetch" all the bytes which we have in memory, /// // but in a real application, you would likely read a chunk from the end /// // for example 1MB. 
/// let prefetched_bytes = file_bytes.clone(); @@ -133,9 +133,9 @@ use crate::DecodeResult; /// // you still need to call `try_decode` in a loop until it returns `DecodeResult::Data` /// // as shown in the previous example /// match decoder.try_decode() { -/// Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful] -/// other @ _ => { panic!("expected DecodeResult::Data, got: {other:?}") } -/// } +/// Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful] +/// other @ _ => { panic!("expected DecodeResult::Data, got: {other:?}") } +/// } /// # } /// ``` /// @@ -157,7 +157,7 @@ use crate::DecodeResult; /// # use parquet::errors::ParquetError; /// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder}; /// # -/// // this function decodes Parquet Metadata from anything that implements +/// // This function decodes Parquet Metadata from anything that implements /// // [`AsyncRead`] and [`AsyncSeek`] such as a tokio::fs::File /// async fn decode_metadata( /// file_len: u64, @@ -178,7 +178,7 @@ use crate::DecodeResult; /// let mut buffer = vec![0; (range.end - range.start) as usize]; /// async_source.seek(std::io::SeekFrom::Start(range.start)).await?; /// async_source.read_exact(&mut buffer).await?; -/// data.push(Bytes::from(buffer)); +/// data.push(Bytes::from(buffer)); /// } /// // Push the data into the decoder and try to decode again on the next iteration. /// decoder.push_ranges(ranges, data).unwrap(); From d04d3c5e591dbb9d175933551fbdc5da41a54ba7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 21 Aug 2025 15:31:58 -0400 Subject: [PATCH 8/9] Improve comments --- parquet/src/file/metadata/push_decoder.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs index d4820129a3f9..25ff7959b9aa 100644 --- a/parquet/src/file/metadata/push_decoder.rs +++ b/parquet/src/file/metadata/push_decoder.rs @@ -83,7 +83,7 @@ use crate::DecodeResult; /// /// # Example with "prefetching" /// -/// By default, the [`ParquetMetaDataPushDecoder`] will requests only the exact byte +/// By default, the [`ParquetMetaDataPushDecoder`] will request only the exact byte /// ranges it needs. This minimizes the number of bytes read, however it /// requires at least two IO operations to read the metadata - one to read the /// footer and then one to read the metadata. @@ -289,9 +289,12 @@ impl ParquetMetaDataPushDecoder { } // Try to parse the metadata from the buffers we have. - // If we don't have enough data, it will return a `ParquetError::NeedMoreData` + // + // If we don't have enough data, returns a `ParquetError::NeedMoreData` // with the number of bytes needed to complete the metadata parsing. - // If we have enough data, it will return `Ok(())` and we can + // + // If we have enough data, returns `Ok(())` and we can complete + // the metadata parsing. 
let maybe_metadata = self .metadata_reader .try_parse_sized(&self.buffers, self.buffers.file_len()); From 52b5343023f6b73719c850fec1d74d678a5076f7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 10 Sep 2025 15:28:03 -0400 Subject: [PATCH 9/9] Update parquet/src/file/metadata/push_decoder.rs Co-authored-by: albertlockett --- parquet/src/file/metadata/push_decoder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs index 25ff7959b9aa..811caf4fd46c 100644 --- a/parquet/src/file/metadata/push_decoder.rs +++ b/parquet/src/file/metadata/push_decoder.rs @@ -133,8 +133,8 @@ use crate::DecodeResult; /// // you still need to call `try_decode` in a loop until it returns `DecodeResult::Data` /// // as shown in the previous example /// match decoder.try_decode() { -/// Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful] -/// other @ _ => { panic!("expected DecodeResult::Data, got: {other:?}") } +/// Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful +/// other => { panic!("expected DecodeResult::Data, got: {other:?}") } /// } /// # } /// ```
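
The series above leaves the IO strategy entirely to the caller. As a point of
reference, here is a minimal sketch of driving the final decoder API with plain
blocking file IO (`std::fs::File`). This is example code only, not part of the
patches: the `decode_metadata_from_file` helper name and the explicit error
mapping are illustrative, and it assumes only the public items the series adds
(`ParquetMetaDataPushDecoder`, `DecodeResult`).

use std::fs::File;
use std::io::{Read, Seek, SeekFrom};

use bytes::Bytes;
use parquet::errors::ParquetError;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
use parquet::DecodeResult;

/// Decode Parquet metadata from a local file by reading exactly the byte
/// ranges the push decoder asks for (illustrative helper, not crate API).
fn decode_metadata_from_file(path: &str) -> Result<ParquetMetaData, ParquetError> {
    let mut file = File::open(path).map_err(|e| ParquetError::External(Box::new(e)))?;
    let file_len = file
        .metadata()
        .map_err(|e| ParquetError::External(Box::new(e)))?
        .len();
    let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len)?;
    loop {
        match decoder.try_decode()? {
            // Decoding succeeded: hand the metadata back to the caller
            DecodeResult::Data(metadata) => return Ok(metadata),
            // The decoder reported which byte ranges it still needs:
            // read them with seek + read_exact and push them back in
            DecodeResult::NeedsData(ranges) => {
                let mut buffers = Vec::with_capacity(ranges.len());
                for range in &ranges {
                    let mut buf = vec![0u8; (range.end - range.start) as usize];
                    file.seek(SeekFrom::Start(range.start))
                        .map_err(|e| ParquetError::External(Box::new(e)))?;
                    file.read_exact(&mut buf)
                        .map_err(|e| ParquetError::External(Box::new(e)))?;
                    buffers.push(Bytes::from(buf));
                }
                // push_ranges returns Result<(), String>, so map the error
                decoder
                    .push_ranges(ranges, buffers)
                    .map_err(ParquetError::General)?;
            }
            // `Data` is returned before the decoder reports `Finished`,
            // so this arm is never reached on a successful decode
            DecodeResult::Finished => unreachable!("metadata already returned"),
        }
    }
}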