Remove file_length from the ParquetPushDecoderBuilder
alamb committed Nov 4, 2025
commit dd683bdcd4be0992a570dead942fbb936b7641ef
6 changes: 2 additions & 4 deletions parquet/src/arrow/async_reader/mod.rs
@@ -54,7 +54,7 @@ pub use metadata::*;
mod store;

use crate::DecodeResult;
-use crate::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
+use crate::arrow::push_decoder::{NoInput, ParquetPushDecoder, ParquetPushDecoderBuilder};
#[cfg(feature = "object_store")]
pub use store::*;

@@ -500,9 +500,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
let projected_schema = Arc::new(Schema::new(projected_fields));

let decoder = ParquetPushDecoderBuilder {
-// Async reader doesn't know the overall size of the input, but it
-// is not required for decoding as we will already have the metadata
-input: 0,
+input: NoInput,
metadata,
schema,
fields,
163 changes: 66 additions & 97 deletions parquet/src/arrow/push_decoder/mod.rs
@@ -81,7 +81,7 @@ use std::sync::Arc;
/// # let parquet_metadata = Arc::new(parquet_metadata);
/// // The file length and metadata are required to create the decoder
/// let mut decoder =
-/// ParquetPushDecoderBuilder::try_new_decoder(file_length, parquet_metadata)
+/// ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata)
/// .unwrap()
/// // Optionally configure the decoder, e.g. batch size
/// .with_batch_size(1024)
@@ -110,7 +110,19 @@ use std::sync::Arc;
/// }
/// }
/// ```
-pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<u64>; // u64 is the file length, if known
+pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<NoInput>;
+
+/// Type that represents "no input" for the [`ParquetPushDecoderBuilder`]
+///
+/// There is no "input" for the push decoder by design (the idea is that
+/// the caller pushes data to the decoder as needed).
+///
+/// However, [`ArrowReaderBuilder`] is shared with the sync and async readers,
+/// which DO have an `input`. To support reusing the same builder code for
+/// all three types of decoders, we define this `NoInput` for the push decoder
+/// to denote in the type system that there is no input.
+#[derive(Debug, Clone, Copy)]
+pub struct NoInput;
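
The doc comment above describes a small type-system pattern: a zero-sized marker type fills the builder's `input` slot so that the same generic builder can back the sync, async, and push decoders. Below is a minimal, self-contained sketch of that pattern; the names `ReaderBuilder`, `SyncReaderBuilder`, and `PushDecoderBuilder` are illustrative stand-ins, not the actual arrow-rs definitions.

```rust
/// Zero-sized marker meaning "this builder variant has no input source".
#[derive(Debug, Clone, Copy)]
pub struct NoInput;

/// One builder shared by every decoder flavor, generic over its input.
pub struct ReaderBuilder<T> {
    input: T,
    batch_size: usize,
}

impl<T> ReaderBuilder<T> {
    pub fn new(input: T) -> Self {
        Self { input, batch_size: 1024 }
    }
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }
}

/// The sync reader supplies a real input (here, a file)...
pub type SyncReaderBuilder = ReaderBuilder<std::fs::File>;
/// ...while the push decoder supplies only the marker.
pub type PushDecoderBuilder = ReaderBuilder<NoInput>;

fn main() {
    let builder = PushDecoderBuilder::new(NoInput).with_batch_size(8192);
    // Destructuring `input: NoInput` works just like `build()` in the diff.
    let ReaderBuilder { input: NoInput, batch_size } = builder;
    assert_eq!(batch_size, 8192);
    // The marker is zero-sized, so the unused slot costs nothing at runtime.
    assert_eq!(std::mem::size_of::<NoInput>(), 0);
}
```

Because `NoInput` is a unit struct, matching `input: NoInput` in a pattern is checked at compile time and erased at runtime, which is why `build()` can simply discard the field.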

/// Methods for building a ParquetDecoder. See the base [`ArrowReaderBuilder`] for
/// more options that can be configured.
@@ -122,15 +134,8 @@ impl ParquetPushDecoderBuilder {
/// [`ParquetMetadataDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
///
/// See example on [`ParquetPushDecoderBuilder`]
-pub fn try_new_decoder(
-file_len: u64,
-parquet_metadata: Arc<ParquetMetaData>,
-) -> Result<Self, ParquetError> {
-Self::try_new_decoder_with_options(
-file_len,
-parquet_metadata,
-ArrowReaderOptions::default(),
-)
+pub fn try_new_decoder(parquet_metadata: Arc<ParquetMetaData>) -> Result<Self, ParquetError> {

Review comment from alamb (Contributor, Author):

This function was introduced in

which has not been released yet -- and thus this is not a breaking API change. Likewise for the changes to ParquetPushDecoderBuilder.

+Self::try_new_decoder_with_options(parquet_metadata, ArrowReaderOptions::default())
}

/// Create a new `ParquetDecoderBuilder` for configuring a Parquet decoder for the given file
@@ -139,27 +144,26 @@ impl ParquetPushDecoderBuilder {
/// This is similar to [`Self::try_new_decoder`] but allows configuring
/// options such as Arrow schema
pub fn try_new_decoder_with_options(
-file_len: u64,
parquet_metadata: Arc<ParquetMetaData>,
arrow_reader_options: ArrowReaderOptions,
) -> Result<Self, ParquetError> {
let arrow_reader_metadata =
ArrowReaderMetadata::try_new(parquet_metadata, arrow_reader_options)?;
-Ok(Self::new_with_metadata(file_len, arrow_reader_metadata))
+Ok(Self::new_with_metadata(arrow_reader_metadata))
}

/// Create a new `ParquetDecoderBuilder` given [`ArrowReaderMetadata`].
///
/// See [`ArrowReaderMetadata::try_new`] for how to create the metadata from
/// the Parquet metadata and reader options.
-pub fn new_with_metadata(file_len: u64, arrow_reader_metadata: ArrowReaderMetadata) -> Self {
-Self::new_builder(file_len, arrow_reader_metadata)
+pub fn new_with_metadata(arrow_reader_metadata: ArrowReaderMetadata) -> Self {
+Self::new_builder(NoInput, arrow_reader_metadata)
}

/// Create a [`ParquetPushDecoder`] with the configured options
pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
let Self {
-input: file_len,
+input: NoInput,
metadata: parquet_metadata,
schema: _,
fields,
@@ -179,6 +183,7 @@ impl ParquetPushDecoderBuilder {
row_groups.unwrap_or_else(|| (0..parquet_metadata.num_row_groups()).collect());

// Prepare to build RowGroup readers
+let file_len = 0; // not used in push decoder
let buffers = PushBuffers::new(file_len);
let row_group_reader_builder = RowGroupReaderBuilder::new(
batch_size,
@@ -600,13 +605,10 @@ mod test {
/// available in memory
#[test]
fn test_decoder_all_data() {
-let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
-test_file_len(),
-test_file_parquet_metadata(),
-)
-.unwrap()
-.build()
-.unwrap();
+let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+.unwrap()
+.build()
+.unwrap();

decoder
.push_range(test_file_range(), TEST_FILE_DATA.clone())
@@ -629,13 +631,10 @@ mod test {
/// fetched as needed
#[test]
fn test_decoder_incremental() {
-let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
-test_file_len(),
-test_file_parquet_metadata(),
-)
-.unwrap()
-.build()
-.unwrap();
+let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+.unwrap()
+.build()
+.unwrap();

let mut results = vec![];

@@ -668,13 +667,10 @@ mod test {
/// Decode the entire file incrementally, simulating partial reads
#[test]
fn test_decoder_partial() {
-let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
-test_file_len(),
-test_file_parquet_metadata(),
-)
-.unwrap()
-.build()
-.unwrap();
+let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+.unwrap()
+.build()
+.unwrap();

// First row group, expect a single request for all data needed to read "a" and "b"
let ranges = expect_needs_data(decoder.try_decode());
@@ -712,11 +708,8 @@ mod test {
/// only a single request per row group
#[test]
fn test_decoder_selection_does_one_request() {
-let builder = ParquetPushDecoderBuilder::try_new_decoder(
-test_file_len(),
-test_file_parquet_metadata(),
-)
-.unwrap();
+let builder =
+ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

@@ -750,11 +743,8 @@ mod test {
/// of the data needed for the filter at a time simulating partial reads.
#[test]
fn test_decoder_single_filter_partial() {
-let builder = ParquetPushDecoderBuilder::try_new_decoder(
-test_file_len(),
-test_file_parquet_metadata(),
-)
-.unwrap();
+let builder =
+ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

// Values in column "a" range 0..399
// First filter: "a" > 250 (nothing in Row Group 0, both data pages in Row Group 1)
@@ -811,11 +801,8 @@ mod test {
/// Decode with a filter where we also skip one of the RowGroups via a RowSelection
#[test]
fn test_decoder_single_filter_and_row_selection() {
-let builder = ParquetPushDecoderBuilder::try_new_decoder(
-test_file_len(),
-test_file_parquet_metadata(),
-)
-.unwrap();
+let builder =
+ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

// Values in column "a" range 0..399
// First filter: "a" > 250 (nothing in Row Group 0, last data page in Row Group 1)
@@ -866,11 +853,8 @@ mod test {
#[test]
fn test_decoder_multi_filters() {
// Create a decoder for decoding parquet data (note it does not have any IO / readers)
-let builder = ParquetPushDecoderBuilder::try_new_decoder(
-test_file_len(),
-test_file_parquet_metadata(),
-)
-.unwrap();
+let builder =
+ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

// Values in column "a" range 0..399
// Values in column "b" range 400..799
@@ -951,11 +935,8 @@ mod test {
#[test]
fn test_decoder_reuses_filter_pages() {
// Create a decoder for decoding parquet data (note it does not have any IO / readers)
-let builder = ParquetPushDecoderBuilder::try_new_decoder(
-test_file_len(),
-test_file_parquet_metadata(),
-)
-.unwrap();
+let builder =
+ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

// Values in column "a" range 0..399
// First filter: "a" > 250 (nothing in Row Group 0, last data page in Row Group 1)
@@ -1002,11 +983,8 @@ mod test {

#[test]
fn test_decoder_empty_filters() {
-let builder = ParquetPushDecoderBuilder::try_new_decoder(
-test_file_len(),
-test_file_parquet_metadata(),
-)
-.unwrap();
+let builder =
+ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

// only read column "c", but with empty filters
@@ -1044,17 +1022,14 @@ mod test {

#[test]
fn test_decoder_offset_limit() {
-let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
-test_file_len(),
-test_file_parquet_metadata(),
-)
-.unwrap()
-// skip entire first row group (200 rows) and first 25 rows of second row group
-.with_offset(225)
-// and limit to 20 rows
-.with_limit(20)
-.build()
-.unwrap();
+let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+.unwrap()
+// skip entire first row group (200 rows) and first 25 rows of second row group
+.with_offset(225)
+// and limit to 20 rows
+.with_limit(20)
+.build()
+.unwrap();

// First row group should be skipped,

@@ -1073,14 +1048,11 @@ mod test {
#[test]
fn test_decoder_row_group_selection() {
// take only the second row group
-let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
-test_file_len(),
-test_file_parquet_metadata(),
-)
-.unwrap()
-.with_row_groups(vec![1])
-.build()
-.unwrap();
+let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+.unwrap()
+.with_row_groups(vec![1])
+.build()
+.unwrap();

// First row group should be skipped,

@@ -1099,17 +1071,14 @@ mod test {
#[test]
fn test_decoder_row_selection() {
// take only the second row group
-let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
-test_file_len(),
-test_file_parquet_metadata(),
-)
-.unwrap()
-.with_row_selection(RowSelection::from(vec![
-RowSelector::skip(225), // skip first row group and 25 rows of second
-RowSelector::select(20), // take 20 rows
-]))
-.build()
-.unwrap();
+let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+.unwrap()
+.with_row_selection(RowSelection::from(vec![
+RowSelector::skip(225), // skip first row group and 25 rows of second
+RowSelector::select(20), // take 20 rows
+]))
+.build()
+.unwrap();

// First row group should be skipped,

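
With these changes the decoder is built from metadata alone and the caller owns all IO. The following sketch shows one plausible way to drive it end to end, based only on the API surface visible in this diff (`try_new_decoder`, `with_batch_size`, `push_range`, `try_decode`, `DecodeResult`); the exact `DecodeResult` variants and method signatures are assumptions to verify against the `parquet` crate documentation.

```rust
use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use parquet::DecodeResult;
use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaData;

/// Decode a whole file with caller-controlled IO. `fetch` stands in for
/// whatever the application uses (local file, object store, network).
fn decode_all(
    metadata: Arc<ParquetMetaData>,
    mut fetch: impl FnMut(Range<u64>) -> Result<Bytes>,
) -> Result<()> {
    // No file length is needed any more -- the metadata is enough.
    let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata)?
        .with_batch_size(1024)
        .build()?;

    loop {
        match decoder.try_decode()? {
            // The decoder reports which byte ranges it needs next;
            // fetch each one and push it in.
            DecodeResult::NeedsData(ranges) => {
                for range in ranges {
                    let data = fetch(range.clone())?;
                    decoder.push_range(range, data)?;
                }
            }
            // A RecordBatch is ready.
            DecodeResult::Data(batch) => println!("decoded {} rows", batch.num_rows()),
            // All selected row groups have been decoded.
            DecodeResult::Finished => return Ok(()),
        }
    }
}
```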