diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 05557069aa7..f601ac7cefd 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -78,6 +78,7 @@ base64 = { version = "0.22", default-features = false, features = ["std"] } criterion = { version = "0.5", default-features = false, features = ["async_futures"] } snap = { version = "1.0", default-features = false } tempfile = { version = "3.0", default-features = false } +insta = "1.43.1" brotli = { version = "8.0", default-features = false, features = ["std"] } flate2 = { version = "1.0", default-features = false, features = ["rust_backend"] } lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"] } diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs index 7e2b149ad3f..61af21a68ec 100644 --- a/parquet/src/file/reader.rs +++ b/parquet/src/file/reader.rs @@ -48,11 +48,12 @@ pub trait Length { /// Generates [`Read`]ers to read chunks of a Parquet data source. /// /// The Parquet reader uses [`ChunkReader`] to access Parquet data, allowing -/// multiple decoders to read concurrently from different locations in the same file. +/// multiple decoders to read concurrently from different locations in the same +/// file. /// -/// The trait provides: -/// * random access (via [`Self::get_bytes`]) -/// * sequential (via [`Self::get_read`]) +/// The trait functions both as a reader and a factory for readers. +/// * random access via [`Self::get_bytes`] +/// * sequential access via the reader returned by the factory method [`Self::get_read`] /// /// # Provided Implementations /// * [`File`] for reading from local file system diff --git a/parquet/tests/arrow_reader/io/async_reader.rs b/parquet/tests/arrow_reader/io/async_reader.rs new file mode 100644 index 00000000000..f2d3ce07234 --- /dev/null +++ b/parquet/tests/arrow_reader/io/async_reader.rs @@ -0,0 +1,430 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Tests for the async reader ([`ParquetRecordBatchStreamBuilder`]) + +use crate::io::{ + filter_a_175_b_625, filter_b_575_625, filter_b_false, test_file, test_options, LogEntry, + OperationLog, TestParquetFile, +}; +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::{FutureExt, StreamExt}; +use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, RowSelector}; +use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::errors::Result; +use parquet::file::metadata::ParquetMetaData; +use std::ops::Range; +use std::sync::Arc; + +#[tokio::test] +async fn test_read_entire_file() { + // read entire file without any filtering or projection + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 0, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 0, column 'c': MultiPage(dictionary_page: true, data_pages: [0, 1]) (7346 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 1, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 1, column 'c': MultiPage(dictionary_page: true, data_pages: [0, 1]) (7456 bytes, 1 requests) [data]", + ] + "#); +} + +#[tokio::test] +async fn test_read_single_group() { + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()) + .await + // read only second row group + .with_row_groups(vec![1]); + + // Expect to see only IO for Row Group 1. Should see no IO for Row Group 0. + insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 1, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 1, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 1, column 'c': MultiPage(dictionary_page: true, data_pages: [0, 1]) (7456 bytes, 1 requests) [data]", + ] + "#); +} + +#[tokio::test] +async fn test_read_single_column() { + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + let builder = builder.with_projection(ProjectionMask::columns(&schema_descr, ["b"])); + // Expect to see only IO for column "b". Should see no IO for columns "a" or "c". 
+ insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + ] + "#); +} + +#[tokio::test] +async fn test_read_row_selection() { + // There are 400 total rows spread across 4 data pages (100 rows each) + // select rows 175..225 (i.e. DataPage(1) of row group 0 and DataPage(0) of row group 1) + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"])) + .with_row_selection(RowSelection::from(vec![ + RowSelector::skip(175), + RowSelector::select(50), + ])); + + // Expect to see only data IO for one page for each column for each row group + insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 0, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + " Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 1, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + " Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + ] + "#); +} + +#[tokio::test] +async fn test_read_limit() { + // There are 400 total rows spread across 4 data pages (100 rows each) + // a limit of 125 rows should only fetch the first two data pages (DataPage(0) and DataPage(1)) from row group 0 + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a"])) + .with_limit(125); + + insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 0, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + " Row Group 0, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + ] + "#); +} + +#[tokio::test] +async fn test_read_single_row_filter() { + // Values from column "b" range 400..799 + // filter "b" > 575 and < than 625 + // (last data page in Row Group 0 and first DataPage in Row Group 1) + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"])) + .with_row_filter(filter_b_575_625(&schema_descr)); + + // Expect to see I/O for column b in both row groups to evaluate filter, + // then a single pages for the "a" column in each row group + 
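// Because the page index is enabled, the reader can skip the "a" pages whose rows are + // entirely filtered out (compare with the no_page_index variant of this test below). +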
insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 0, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 1, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + ] + "#); +} + +#[tokio::test] +async fn test_read_single_row_filter_no_page_index() { + // Values from column "b" range 400..799 + // Apply a filter "b" > 575 and than 625 + // (last data page in Row Group 0 and first DataPage in Row Group 1) + let test_file = test_file(); + let options = test_options().with_page_index(false); + let builder = async_builder(&test_file, options).await; + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"])) + .with_row_filter(filter_b_575_625(&schema_descr)); + + // Since we don't have the page index, expect to see: + // 1. I/O for all pages of column b to evaluate the filter + // 2. IO for all pages of column a as the reader doesn't know where the page + // boundaries are so needs to scan them. + insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 0, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + ] + "#); +} + +#[tokio::test] +async fn test_read_multiple_row_filter() { + // Values in column "a" range 0..399 + // Values in column "b" range 400..799 + // First filter: "a" > 175 (last data page in Row Group 0) + // Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1) + // Read column "c" + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["c"])) + .with_row_filter(filter_a_175_b_625(&schema_descr)); + + // Expect that we will see + // 1. IO for all pages of column A (to evaluate the first filter) + // 2. IO for pages of column b that passed the first filter (to evaluate the second filter) + // 3. 
IO after reader is built only for column c for the rows that passed both filters + insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Read Multi:", + " Row Group 0, column 'c': DictionaryPage (7107 bytes, 1 requests) [data]", + " Row Group 0, column 'c': DataPage(1) (126 bytes , 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + " Row Group 1, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'c': DictionaryPage (7217 bytes, 1 requests) [data]", + " Row Group 1, column 'c': DataPage(0) (113 bytes , 1 requests) [data]", + ] + "#); +} + +#[tokio::test] +async fn test_read_single_row_filter_all() { + // Apply a filter that filters out all rows + + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"])) + .with_row_filter(filter_b_false(&schema_descr)); + + // Expect to see reads for column "b" to evaluate the filter, but no reads + // for column "a" as no rows pass the filter + insta::assert_debug_snapshot!(run( + &test_file, + builder).await, @r#" + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + ] + "#); +} + +/// Return a [`ParquetRecordBatchStreamBuilder`] for reading this file +async fn async_builder( + test_file: &TestParquetFile, + options: ArrowReaderOptions, +) -> ParquetRecordBatchStreamBuilder { + let parquet_meta_data = if options.page_index() { + Arc::clone(test_file.parquet_metadata()) + } else { + // strip out the page index from the metadata + let metadata = test_file + .parquet_metadata() + .as_ref() + .clone() + .into_builder() + .set_column_index(None) + .set_offset_index(None) + .build(); + Arc::new(metadata) + }; + + let reader = RecordingAsyncFileReader { + bytes: test_file.bytes().clone(), + ops: Arc::clone(test_file.ops()), + parquet_meta_data, + }; + + ParquetRecordBatchStreamBuilder::new_with_options(reader, options) + .await + .unwrap() +} + +/// Build the reader from the specified builder and read all batches from it, +/// and return the operations log. 
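+/// +/// Hypothetical usage sketch (mirrors how the tests above call this helper): +/// ```text +/// let builder = async_builder(&test_file, test_options()).await; +/// let log = run(&test_file, builder).await; +/// insta::assert_debug_snapshot!(log); +/// ```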
+async fn run( + test_file: &TestParquetFile, + builder: ParquetRecordBatchStreamBuilder, +) -> Vec { + let ops = test_file.ops(); + ops.add_entry(LogEntry::event("Builder Configured")); + let mut stream = builder.build().unwrap(); + ops.add_entry(LogEntry::event("Reader Built")); + while let Some(batch) = stream.next().await { + match batch { + Ok(_) => {} + Err(e) => panic!("Error reading batch: {e}"), + } + } + ops.snapshot() +} + +struct RecordingAsyncFileReader { + bytes: Bytes, + ops: Arc, + parquet_meta_data: Arc, +} + +impl AsyncFileReader for RecordingAsyncFileReader { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { + let ops = Arc::clone(&self.ops); + let data = self + .bytes + .slice(range.start as usize..range.end as usize) + .clone(); + + // translate to usize from u64 + let logged_range = Range { + start: range.start as usize, + end: range.end as usize, + }; + async move { + ops.add_entry_for_range(&logged_range); + Ok(data) + } + .boxed() + } + + fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { + let ops = Arc::clone(&self.ops); + let datas = ranges + .iter() + .map(|range| { + self.bytes + .slice(range.start as usize..range.end as usize) + .clone() + }) + .collect::>(); + // translate to usize from u64 + let logged_ranges = ranges + .into_iter() + .map(|r| Range { + start: r.start as usize, + end: r.end as usize, + }) + .collect::>(); + + async move { + ops.add_entry_for_ranges(&logged_ranges); + Ok(datas) + } + .boxed() + } + + fn get_metadata<'a>( + &'a mut self, + _options: Option<&'a ArrowReaderOptions>, + ) -> BoxFuture<'a, Result>> { + let ops = Arc::clone(&self.ops); + let parquet_meta_data = Arc::clone(&self.parquet_meta_data); + async move { + ops.add_entry(LogEntry::GetProvidedMetadata); + Ok(parquet_meta_data) + } + .boxed() + } +} diff --git a/parquet/tests/arrow_reader/io/mod.rs b/parquet/tests/arrow_reader/io/mod.rs new file mode 100644 index 00000000000..b31f295755b --- /dev/null +++ b/parquet/tests/arrow_reader/io/mod.rs @@ -0,0 +1,703 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for IO read patterns in the Parquet Reader +//! +//! Each test: +//! 1. Creates a temporary Parquet file with a known row group structure +//! 2. Reads data from that file using the Arrow Parquet Reader, recording the IO operations +//! 3. Asserts the expected IO patterns based on the read operations +//! +//! Note this module contains test infrastructure only. The actual tests are in the +//! sub-modules [`sync_reader`] and [`async_reader`]. +//! +//! Key components: +//! - [`TestParquetFile`] - Represents a Parquet file and its layout +//! - [`OperationLog`] - Records IO operations performed on the file +//! 
- [`LogEntry`] - Represents a single IO operation in the log + +mod sync_reader; + +#[cfg(feature = "async")] +mod async_reader; + +use arrow::compute::and; +use arrow::compute::kernels::cmp::{gt, lt}; +use arrow_array::cast::AsArray; +use arrow_array::types::Int64Type; +use arrow_array::{ArrayRef, BooleanArray, Int64Array, RecordBatch, StringViewArray}; +use bytes::Bytes; +use parquet::arrow::arrow_reader::{ + ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowFilter, +}; +use parquet::arrow::{ArrowWriter, ProjectionMask}; +use parquet::data_type::AsBytes; +use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetOffsetIndex}; +use parquet::file::properties::WriterProperties; +use parquet::file::FOOTER_SIZE; +use parquet::format::PageLocation; +use parquet::schema::types::SchemaDescriptor; +use std::collections::BTreeMap; +use std::fmt::Display; +use std::ops::Range; +use std::sync::{Arc, LazyLock, Mutex}; + +/// Create a new `TestParquetFile` with: +/// 3 columns: "a", "b", "c" +/// +/// 2 row groups, each with 200 rows +/// each data page has 100 rows +/// +/// Values of column "a" are 0..399 +/// Values of column "b" are 400..799 +/// Values of column "c" are alternating strings of length 12 and longer +fn test_file() -> TestParquetFile { + TestParquetFile::new(TEST_FILE_DATA.clone()) +} + +/// Default options for tests +/// +/// Note these tests use the PageIndex to reduce IO +fn test_options() -> ArrowReaderOptions { + ArrowReaderOptions::default().with_page_index(true) +} + +/// Return a row filter that evaluates "b > 575" AND "b < 625" +/// +/// last data page in Row Group 0 and first DataPage in Row Group 1 +fn filter_b_575_625(schema_descr: &SchemaDescriptor) -> RowFilter { + // "b" > 575 and "b" < 625 + let predicate = ArrowPredicateFn::new( + ProjectionMask::columns(schema_descr, ["b"]), + |batch: RecordBatch| { + let scalar_575 = Int64Array::new_scalar(575); + let scalar_625 = Int64Array::new_scalar(625); + let column = batch.column(0).as_primitive::(); + and(>(column, &scalar_575)?, <(column, &scalar_625)?) + }, + ); + RowFilter::new(vec![Box::new(predicate)]) +} + +/// Filter a > 175 and b < 625 +/// First filter: "a" > 175 (last data page in Row Group 0) +/// Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1) +fn filter_a_175_b_625(schema_descr: &SchemaDescriptor) -> RowFilter { + // "a" > 175 and "b" < 625 + let predicate_a = ArrowPredicateFn::new( + ProjectionMask::columns(schema_descr, ["a"]), + |batch: RecordBatch| { + let scalar_175 = Int64Array::new_scalar(175); + let column = batch.column(0).as_primitive::(); + gt(column, &scalar_175) + }, + ); + + let predicate_b = ArrowPredicateFn::new( + ProjectionMask::columns(schema_descr, ["b"]), + |batch: RecordBatch| { + let scalar_625 = Int64Array::new_scalar(625); + let column = batch.column(0).as_primitive::(); + lt(column, &scalar_625) + }, + ); + + RowFilter::new(vec![Box::new(predicate_a), Box::new(predicate_b)]) +} + +/// Filter FALSE (no rows) with b +/// Entirely filters out both row groups +/// Note it selects "b" +fn filter_b_false(schema_descr: &SchemaDescriptor) -> RowFilter { + // "false" + let predicate = ArrowPredicateFn::new( + ProjectionMask::columns(schema_descr, ["b"]), + |batch: RecordBatch| { + let result = + BooleanArray::from_iter(std::iter::repeat_n(Some(false), batch.num_rows())); + Ok(result) + }, + ); + RowFilter::new(vec![Box::new(predicate)]) +} + +/// Create a parquet file in memory for testing. 
See [`test_file`] for details. +static TEST_FILE_DATA: LazyLock = LazyLock::new(|| { + // Input batch has 400 rows, with 3 columns: "a", "b", "c" + // Note c is a different types (so the data page sizes will be different) + let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400)); + let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800)); + let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| { + if i % 2 == 0 { + format!("string_{i}") + } else { + format!("A string larger than 12 bytes and thus not inlined {i}") + } + }))); + + let input_batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + let mut output = Vec::new(); + + let writer_options = WriterProperties::builder() + .set_max_row_group_size(200) + .set_data_page_row_count_limit(100) + .build(); + let mut writer = + ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap(); + + // since the limits are only enforced on batch boundaries, write the input + // batch in chunks of 50 + let mut row_remain = input_batch.num_rows(); + while row_remain > 0 { + let chunk_size = row_remain.min(50); + let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size); + writer.write(&chunk).unwrap(); + row_remain -= chunk_size; + } + writer.close().unwrap(); + Bytes::from(output) +}); + +/// A test parquet file and its layout. +struct TestParquetFile { + bytes: Bytes, + /// The operation log for IO operations performed on this file + ops: Arc, + /// The (pre-parsed) parquet metadata for this file + parquet_metadata: Arc, +} + +impl TestParquetFile { + /// Create a new `TestParquetFile` with the specified temporary directory and path + /// and determines the row group layout. + fn new(bytes: Bytes) -> Self { + // Read the parquet file to determine its layout + let builder = ParquetRecordBatchReaderBuilder::try_new_with_options( + bytes.clone(), + ArrowReaderOptions::default().with_page_index(true), + ) + .unwrap(); + + let parquet_metadata = Arc::clone(builder.metadata()); + + let offset_index = parquet_metadata + .offset_index() + .expect("Parquet metadata should have a page index"); + + let row_groups = TestRowGroups::new(&parquet_metadata, offset_index); + + // figure out the footer location in the file + let footer_location = bytes.len() - FOOTER_SIZE..bytes.len(); + let footer = bytes.slice(footer_location.clone()); + let footer: &[u8; FOOTER_SIZE] = footer + .as_bytes() + .try_into() // convert to a fixed size array + .unwrap(); + + // figure out the metadata location + let footer = ParquetMetaDataReader::decode_footer_tail(footer).unwrap(); + let metadata_len = footer.metadata_length(); + let metadata_location = footer_location.start - metadata_len..footer_location.start; + + let ops = Arc::new(OperationLog::new( + footer_location, + metadata_location, + row_groups, + )); + + TestParquetFile { + bytes, + ops, + parquet_metadata, + } + } + + /// Return the internal bytes of the parquet file + fn bytes(&self) -> &Bytes { + &self.bytes + } + + /// Return the operation log for this file + fn ops(&self) -> &Arc { + &self.ops + } + + /// Return the parquet metadata for this file + fn parquet_metadata(&self) -> &Arc { + &self.parquet_metadata + } +} + +/// Information about a column chunk +#[derive(Debug)] +struct TestColumnChunk { + /// The name of the column + name: String, + + /// The location of the entire column chunk in the file including dictionary pages + /// and data pages. 
+ location: Range, + + /// The offset of the start of of the dictionary page if any + dictionary_page_location: Option, + + /// The location of the data pages in the file + page_locations: Vec, +} + +/// Information about the pages in a single row group +#[derive(Debug)] +struct TestRowGroup { + /// Maps column_name -> Information about the column chunk + columns: BTreeMap, +} + +/// Information about all the row groups in a Parquet file, extracted from its metadata +#[derive(Debug)] +struct TestRowGroups { + /// List of row groups, each containing information about its columns and page locations + row_groups: Vec, +} + +impl TestRowGroups { + fn new(parquet_metadata: &ParquetMetaData, offset_index: &ParquetOffsetIndex) -> Self { + let row_groups = parquet_metadata + .row_groups() + .iter() + .enumerate() + .map(|(rg_index, rg_meta)| { + let columns = rg_meta + .columns() + .iter() + .enumerate() + .map(|(col_idx, col_meta)| { + let column_name = col_meta.column_descr().name().to_string(); + let page_locations = + offset_index[rg_index][col_idx].page_locations().to_vec(); + let dictionary_page_location = col_meta.dictionary_page_offset(); + + // We can find the byte range of the entire column chunk + let (start_offset, length) = col_meta.byte_range(); + let start_offset = start_offset as usize; + let end_offset = start_offset + length as usize; + + TestColumnChunk { + name: column_name.clone(), + location: start_offset..end_offset, + dictionary_page_location, + page_locations, + } + }) + .map(|test_column_chunk| { + // make key=value pairs to insert into the BTreeMap + (test_column_chunk.name.clone(), test_column_chunk) + }) + .collect::>(); + TestRowGroup { columns } + }) + .collect(); + + Self { row_groups } + } + + fn iter(&self) -> impl Iterator { + self.row_groups.iter() + } +} + +/// Type of data read +#[derive(Debug, PartialEq)] +enum PageType { + /// The data page with the specified index + Data { + data_page_index: usize, + }, + Dictionary, + /// Multiple pages read together + Multi { + /// Was the dictionary page included? 
+ dictionary_page: bool, + /// The data pages included + data_page_indices: Vec, + }, +} + +impl Display for PageType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PageType::Data { data_page_index } => { + write!(f, "DataPage({data_page_index})") + } + PageType::Dictionary => write!(f, "DictionaryPage"), + PageType::Multi { + dictionary_page, + data_page_indices, + } => { + let dictionary_page = if *dictionary_page { + "dictionary_page: true, " + } else { + "" + }; + write!( + f, + "MultiPage({dictionary_page}data_pages: {data_page_indices:?})", + ) + } + } + } +} + +/// Read single logical data object (data page or dictionary page) +/// in one or more requests +#[derive(Debug)] +struct ReadInfo { + row_group_index: usize, + column_name: String, + range: Range, + read_type: PageType, + /// Number of distinct requests (function calls) that were used + num_requests: usize, +} + +impl Display for ReadInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + row_group_index, + column_name, + range, + read_type, + num_requests, + } = self; + + // If the average read size is less than 10 bytes, assume it is the thrift + // decoder reading the page headers and add an annotation + let annotation = if (range.len() / num_requests) < 10 { + " [header]" + } else { + " [data]" + }; + + // align the read type to 20 characters for better readability, not sure why + // this does not work inline with write! macro below + write!( + f, + "Row Group {row_group_index}, column '{column_name}': {:15} ({:10}, {:8}){annotation}", + // convert to strings so alignment works + format!("{read_type}"), + format!("{} bytes", range.len()), + format!("{num_requests} requests"), + ) + } +} + +/// Store structured entries in the log to make it easier to combine multiple entries +#[derive(Debug)] +enum LogEntry { + /// Read the footer (last 8 bytes) of the parquet file + ReadFooter(Range), + /// Read the metadata of the parquet file + ReadMetadata(Range), + /// Access previously parsed metadata + GetProvidedMetadata, + /// Read a single logical data object + ReadData(ReadInfo), + /// Read one or more logical data objects in a single operation + ReadMultipleData(Vec), + /// Not known where the read came from + Unknown(Range), + /// A user defined event + Event(String), +} + +impl LogEntry { + fn event(event: impl Into) -> Self { + LogEntry::Event(event.into()) + } + + /// Appends a string representation of this log entry to the output vector + fn append_string(&self, output: &mut Vec, indent: usize) { + let indent_str = " ".repeat(indent); + match self { + LogEntry::ReadFooter(range) => { + output.push(format!("{indent_str}Footer: {} bytes", range.len())) + } + LogEntry::ReadMetadata(range) => { + output.push(format!("{indent_str}Metadata: {}", range.len())) + } + LogEntry::GetProvidedMetadata => { + output.push(format!("{indent_str}Get Provided Metadata")) + } + LogEntry::ReadData(read_info) => output.push(format!("{indent_str}{read_info}")), + LogEntry::ReadMultipleData(read_infos) => { + output.push(format!("{indent_str}Read Multi:")); + for read_info in read_infos { + let new_indent = indent + 2; + read_info.append_string(output, new_indent); + } + } + LogEntry::Unknown(range) => { + output.push(format!("{indent_str}UNKNOWN: {range:?} (maybe Page Index)")) + } + LogEntry::Event(event) => output.push(format!("Event: {event}")), + } + } +} + +#[derive(Debug)] +struct OperationLog { + /// The operations performed on the file + ops: Mutex>, + + /// 
Footer location in the parquet file + footer_location: Range, + + /// Metadata location in the parquet file + metadata_location: Range, + + /// Information about the row group layout in the parquet file, used to + /// translate read operations into human understandable IO operations + /// Path to the parquet file + row_groups: TestRowGroups, +} + +impl OperationLog { + fn new( + footer_location: Range, + metadata_location: Range, + row_groups: TestRowGroups, + ) -> Self { + OperationLog { + ops: Mutex::new(Vec::new()), + metadata_location, + footer_location, + row_groups, + } + } + + /// Add an operation to the log + fn add_entry(&self, entry: LogEntry) { + let mut ops = self.ops.lock().unwrap(); + ops.push(entry); + } + + /// Adds an entry to the operation log for the interesting object that is + /// accessed by the specified range + /// + /// This function checks the ranges in order against possible locations + /// and adds the appropriate operation to the log for the first match found. + fn add_entry_for_range(&self, range: &Range) { + self.add_entry(self.entry_for_range(range)); + } + + /// Adds entries to the operation log for each interesting object that is + /// accessed by the specified range + /// + /// It behaves the same as [`add_entry_for_range`] but for multiple ranges. + fn add_entry_for_ranges<'a>(&self, ranges: impl IntoIterator>) { + let entries = ranges + .into_iter() + .map(|range| self.entry_for_range(range)) + .collect::>(); + self.add_entry(LogEntry::ReadMultipleData(entries)); + } + + /// Create an appropriate LogEntry for the specified range + fn entry_for_range(&self, range: &Range) -> LogEntry { + let start = range.start as i64; + let end = range.end as i64; + + // figure out what logical part of the file this range corresponds to + if self.metadata_location.contains(&range.start) + || self.metadata_location.contains(&(range.end - 1)) + { + return LogEntry::ReadMetadata(range.clone()); + } + + if self.footer_location.contains(&range.start) + || self.footer_location.contains(&(range.end - 1)) + { + return LogEntry::ReadFooter(range.clone()); + } + + // Search for the location in each column chunk. + // + // The actual parquet reader must in general decode the page headers + // and determine the byte ranges of the pages. However, for this test + // we assume the following layout: + // + // ```text + // (Dictionary Page) + // (Data Page) + // ... + // (Data Page) + // ``` + // + // We also assume that `self.page_locations` holds the location of all + // data pages, so any read operation that overlaps with a data page + // location is considered a read of that page, and any other read must + // be a dictionary page read. + for (row_group_index, row_group) in self.row_groups.iter().enumerate() { + for (column_name, test_column_chunk) in &row_group.columns { + // Check if the range overlaps with any data page locations + let page_locations = test_column_chunk.page_locations.iter(); + + // What data pages does this range overlap with? 
+ let mut data_page_indices = vec![]; + + for (data_page_index, page_location) in page_locations.enumerate() { + let page_offset = page_location.offset; + let page_end = page_offset + page_location.compressed_page_size as i64; + + // if the range fully contains the page, consider it a read of that page + if start >= page_offset && end <= page_end { + let read_info = ReadInfo { + row_group_index, + column_name: column_name.clone(), + range: range.clone(), + read_type: PageType::Data { data_page_index }, + num_requests: 1, + }; + return LogEntry::ReadData(read_info); + } + + // if the range overlaps with the page, add it to the list of overlapping pages + if start < page_end && end > page_offset { + data_page_indices.push(data_page_index); + } + } + + // was the dictionary page read? + let mut dictionary_page = false; + + // Check if the range overlaps with the dictionary page location + if let Some(dict_page_offset) = test_column_chunk.dictionary_page_location { + let dict_page_end = dict_page_offset + test_column_chunk.location.len() as i64; + if start >= dict_page_offset && end < dict_page_end { + let read_info = ReadInfo { + row_group_index, + column_name: column_name.clone(), + range: range.clone(), + read_type: PageType::Dictionary, + num_requests: 1, + }; + + return LogEntry::ReadData(read_info); + } + + // if the range overlaps with the dictionary page, add it to the list of overlapping pages + if start < dict_page_end && end > dict_page_offset { + dictionary_page = true; + } + } + + // If we can't find a page, but the range overlaps with the + // column chunk location, use the column chunk location + let column_byte_range = &test_column_chunk.location; + if column_byte_range.contains(&range.start) + && column_byte_range.contains(&(range.end - 1)) + { + let read_data_entry = ReadInfo { + row_group_index, + column_name: column_name.clone(), + range: range.clone(), + read_type: PageType::Multi { + data_page_indices, + dictionary_page, + }, + num_requests: 1, + }; + + return LogEntry::ReadData(read_data_entry); + } + } + } + + // If we reach here, the range does not match any known logical part of the file + LogEntry::Unknown(range.clone()) + } + + // Combine entries in the log that are similar to reduce noise in the log. + fn coalesce_entries(&self) { + let mut ops = self.ops.lock().unwrap(); + + // Coalesce entries with the same read type + let prev_ops = std::mem::take(&mut *ops); + for entry in prev_ops { + let Some(last) = ops.last_mut() else { + ops.push(entry); + continue; + }; + + let LogEntry::ReadData(ReadInfo { + row_group_index: last_rg_index, + column_name: last_column_name, + range: last_range, + read_type: last_read_type, + num_requests: last_num_reads, + }) = last + else { + // If the last entry is not a ReadColumnChunk, just push it + ops.push(entry); + continue; + }; + + // If the entry is not a ReadColumnChunk, just push it + let LogEntry::ReadData(ReadInfo { + row_group_index, + column_name, + range, + read_type, + num_requests: num_reads, + }) = &entry + else { + ops.push(entry); + continue; + }; + + // Combine the entries if they are the same and this read is less than 10b. + // + // This heuristic is used to combine small reads (typically 1-2 + // byte) made by the thrift decoder when reading the data/dictionary + // page headers. 
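+ // For example, the run of tiny (1-2 byte) reads issued while decoding one page header is + // reported as a single entry such as "(20 bytes , 20 requests) [header]" in the + // no-page-index test snapshots, rather than as twenty separate log lines.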
+ if *row_group_index != *last_rg_index + || column_name != last_column_name + || read_type != last_read_type + || (range.start > last_range.end) + || (range.end < last_range.start) + || range.len() > 10 + { + ops.push(entry); + continue; + } + // combine + *last_range = last_range.start.min(range.start)..last_range.end.max(range.end); + *last_num_reads += num_reads; + } + } + + /// return a snapshot of the current operations in the log. + fn snapshot(&self) -> Vec { + self.coalesce_entries(); + let ops = self.ops.lock().unwrap(); + let mut actual = vec![]; + let indent = 0; + ops.iter() + .for_each(|s| s.append_string(&mut actual, indent)); + actual + } +} diff --git a/parquet/tests/arrow_reader/io/sync_reader.rs b/parquet/tests/arrow_reader/io/sync_reader.rs new file mode 100644 index 00000000000..685f251a9e2 --- /dev/null +++ b/parquet/tests/arrow_reader/io/sync_reader.rs @@ -0,0 +1,443 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for the sync reader - [`ParquetRecordBatchReaderBuilder`] + +use crate::io::{ + filter_a_175_b_625, filter_b_575_625, filter_b_false, test_file, test_options, LogEntry, + OperationLog, TestParquetFile, +}; + +use bytes::Bytes; +use parquet::arrow::arrow_reader::{ + ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector, +}; +use parquet::arrow::ProjectionMask; +use parquet::file::reader::{ChunkReader, Length}; +use std::io::Read; +use std::sync::Arc; + +#[test] +fn test_read_entire_file() { + // read entire file without any filtering or projection + let test_file = test_file(); + // Expect to see IO for all data pages for each row group and column + let builder = sync_builder(&test_file, test_options()); + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "UNKNOWN: 22230..22877 (maybe Page Index)", + "Event: Builder Configured", + "Event: Reader Built", + "Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 0, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DataPage(1) (126 
bytes , 1 requests) [data]", + "Row Group 0, column 'c': DictionaryPage (7107 bytes, 1 requests) [data]", + "Row Group 0, column 'c': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 0, column 'c': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'c': DictionaryPage (7217 bytes, 1 requests) [data]", + "Row Group 1, column 'c': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'c': DataPage(1) (126 bytes , 1 requests) [data]", + ] + "#); +} + +#[test] +fn test_read_single_group() { + let test_file = test_file(); + let builder = sync_builder(&test_file, test_options()).with_row_groups(vec![1]); // read only second row group + + // Expect to see only IO for Row Group 1. Should see no IO for Row Group 0. + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "UNKNOWN: 22230..22877 (maybe Page Index)", + "Event: Builder Configured", + "Event: Reader Built", + "Row Group 1, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'c': DictionaryPage (7217 bytes, 1 requests) [data]", + "Row Group 1, column 'c': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'c': DataPage(1) (126 bytes , 1 requests) [data]", + ] + "#); +} + +#[test] +fn test_read_single_column() { + let test_file = test_file(); + let builder = sync_builder(&test_file, test_options()); + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + let builder = builder.with_projection(ProjectionMask::columns(&schema_descr, ["b"])); + // Expect to see only IO for column "b". Should see no IO for columns "a" or "c". + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "UNKNOWN: 22230..22877 (maybe Page Index)", + "Event: Builder Configured", + "Event: Reader Built", + "Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + ] + "#); +} + +#[test] +fn test_read_single_column_no_page_index() { + let test_file = test_file(); + let options = test_options().with_page_index(false); + let builder = sync_builder(&test_file, options); + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + let builder = builder.with_projection(ProjectionMask::columns(&schema_descr, ["b"])); + // Expect to see only IO for column "b", should see no IO for columns "a" or "c". 
+ // + // Note that we need to read all data page headers to find the pages for column b + // so there are many more small reads than in the test_read_single_column test above + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "Event: Builder Configured", + "Event: Reader Built", + "Row Group 0, column 'b': DictionaryPage (17 bytes , 17 requests) [header]", + "Row Group 0, column 'b': DictionaryPage (1600 bytes, 1 requests) [data]", + "Row Group 0, column 'b': DataPage(0) (20 bytes , 20 requests) [header]", + "Row Group 0, column 'b': DataPage(0) (93 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DataPage(1) (20 bytes , 20 requests) [header]", + "Row Group 0, column 'b': DataPage(1) (106 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (17 bytes , 17 requests) [header]", + "Row Group 1, column 'b': DictionaryPage (1600 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (20 bytes , 20 requests) [header]", + "Row Group 1, column 'b': DataPage(0) (93 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DataPage(1) (20 bytes , 20 requests) [header]", + "Row Group 1, column 'b': DataPage(1) (106 bytes , 1 requests) [data]", + ] + "#); +} + +#[test] +fn test_read_row_selection() { + // There are 400 total rows spread across 4 data pages (100 rows each) + // select rows 175..225 (i.e. DataPage(1) of row group 0 and DataPage(0) of row group 1) + let test_file = test_file(); + let builder = sync_builder(&test_file, test_options()); + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + let builder = builder + .with_projection( + // read both "a" and "b" + ProjectionMask::columns(&schema_descr, ["a", "b"]), + ) + .with_row_selection(RowSelection::from(vec![ + RowSelector::skip(175), + RowSelector::select(50), + ])); + + // Expect to see only data IO for one page for each column for each row group + // Note the data page headers for all pages need to be read to find the correct pages + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "UNKNOWN: 22230..22877 (maybe Page Index)", + "Event: Builder Configured", + "Event: Reader Built", + "Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + ] + "#); +} + +#[test] +fn test_read_limit() { + // There are 400 total rows spread across 4 data pages (100 rows each) + // a limit of 125 rows should only fetch the first two data pages (DataPage(0) and DataPage(1)) from row group 0 + let test_file = test_file(); + let builder = sync_builder(&test_file, test_options()); + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a"])) + .with_limit(125); + + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "UNKNOWN: 22230..22877 (maybe Page Index)", + "Event: Builder 
Configured", + "Event: Reader Built", + "Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 0, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + ] + "#); +} + +#[test] +fn test_read_single_row_filter() { + // Values from column "b" range 400..799 + // filter "b" > 575 and < 625 + // (last data page in Row Group 0 and first DataPage in Row Group 1) + let test_file = test_file(); + let builder = sync_builder(&test_file, test_options()); + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + let builder = builder + .with_projection( + // read both "a" and "b" + ProjectionMask::columns(&schema_descr, ["a", "b"]), + ) + // "b" > 575 and "b" < 625 + .with_row_filter(filter_b_575_625(&schema_descr)); + + // Expect to see I/O for column b in both row groups and then reading just a + // single pages for a in each row group + // + // Note there is significant IO that happens during the construction of the + // reader (between "Builder Configured" and "Reader Built") + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "UNKNOWN: 22230..22877 (maybe Page Index)", + "Event: Builder Configured", + "Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Event: Reader Built", + "Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + ] + "#); +} + +#[test] +fn test_read_multiple_row_filter() { + // Values in column "a" range 0..399 + // Values in column "b" range 400..799 + // First filter: "a" > 175 (last data page in Row Group 0) + // Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1) + // Read column "c" + let test_file = test_file(); + let builder = sync_builder(&test_file, test_options()); + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + let builder = builder + .with_projection( + ProjectionMask::columns(&schema_descr, ["c"]), // read "c" + ) + // a > 175 and b < 625 + .with_row_filter(filter_a_175_b_625(&schema_descr)); + + // Expect that we will see + // 1. IO for all pages of column A + // 2. IO for pages of column b that passed 1. + // 3. 
IO after reader is built only for column c + // + // Note there is significant IO that happens during the construction of the + // reader (between "Builder Configured" and "Reader Built") + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "UNKNOWN: 22230..22877 (maybe Page Index)", + "Event: Builder Configured", + "Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 0, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Event: Reader Built", + "Row Group 0, column 'c': DictionaryPage (7107 bytes, 1 requests) [data]", + "Row Group 0, column 'c': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'c': DictionaryPage (7217 bytes, 1 requests) [data]", + "Row Group 1, column 'c': DataPage(0) (113 bytes , 1 requests) [data]", + ] + "#); +} + +#[test] +fn test_read_single_row_filter_all() { + // Apply a filter that entirely filters out rows based on a predicate from one column + // should not read any data pages for any other column + + let test_file = test_file(); + let builder = sync_builder(&test_file, test_options()); + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"])) + .with_row_filter(filter_b_false(&schema_descr)); + + // Expect to see the Footer and Metadata, then I/O for column b + // in both row groups but then nothing for column "a" + // since the row filter entirely filters out all rows. 
+ // + // Note that all IO that happens during the construction of the reader + // (between "Builder Configured" and "Reader Built") + insta::assert_debug_snapshot!(run(&test_file, builder), + @r#" + [ + "Footer: 8 bytes", + "Metadata: 1162", + "UNKNOWN: 22230..22877 (maybe Page Index)", + "Event: Builder Configured", + "Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 0, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + "Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + "Row Group 1, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Event: Reader Built", + ] + "#); +} + +/// Return a [`ParquetRecordBatchReaderBuilder`] for reading this file +fn sync_builder( + test_file: &TestParquetFile, + options: ArrowReaderOptions, +) -> ParquetRecordBatchReaderBuilder { + let reader = RecordingChunkReader { + inner: test_file.bytes().clone(), + ops: Arc::clone(test_file.ops()), + }; + ParquetRecordBatchReaderBuilder::try_new_with_options(reader, options) + .expect("ParquetRecordBatchReaderBuilder") +} + +/// build the reader, and read all batches from it, returning the recorded IO operations +fn run( + test_file: &TestParquetFile, + builder: ParquetRecordBatchReaderBuilder, +) -> Vec { + let ops = test_file.ops(); + ops.add_entry(LogEntry::event("Builder Configured")); + let reader = builder.build().unwrap(); + ops.add_entry(LogEntry::event("Reader Built")); + for batch in reader { + match batch { + Ok(_) => {} + Err(e) => panic!("Error reading batch: {e}"), + } + } + ops.snapshot() +} + +/// Records IO operations on an in-memory chunk reader +struct RecordingChunkReader { + inner: Bytes, + ops: Arc, +} + +impl Length for RecordingChunkReader { + fn len(&self) -> u64 { + self.inner.len() as u64 + } +} + +impl ChunkReader for RecordingChunkReader { + type T = RecordingStdIoReader; + + fn get_read(&self, start: u64) -> parquet::errors::Result { + let reader = RecordingStdIoReader { + start: start as usize, + inner: self.inner.clone(), + ops: Arc::clone(&self.ops), + }; + Ok(reader) + } + + fn get_bytes(&self, start: u64, length: usize) -> parquet::errors::Result { + let start = start as usize; + let range = start..start + length; + self.ops.add_entry_for_range(&range); + Ok(self.inner.slice(start..start + length)) + } +} + +/// Wrapper around a `Bytes` object that implements `Read` +struct RecordingStdIoReader { + /// current offset in the inner `Bytes` that this reader is reading from + start: usize, + inner: Bytes, + ops: Arc, +} + +impl Read for RecordingStdIoReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let remain = self.inner.len() - self.start; + let start = self.start; + let read_length = buf.len().min(remain); + let read_range = start..start + read_length; + + self.ops.add_entry_for_range(&read_range); + + buf.copy_from_slice(self.inner.slice(read_range).as_ref()); + // Update the inner position + self.start += read_length; + Ok(read_length) + } +} diff --git a/parquet/tests/arrow_reader/mod.rs b/parquet/tests/arrow_reader/mod.rs index 8d72d1def17..510d6278607 100644 --- a/parquet/tests/arrow_reader/mod.rs +++ b/parquet/tests/arrow_reader/mod.rs @@ -42,6 +42,7 @@ mod bad_data; #[cfg(feature = "crc")] mod checksum; mod int96_stats_roundtrip; +mod io; #[cfg(feature = "async")] mod predicate_cache; mod statistics; @@ -336,9 +337,9 @@ fn 
make_uint_batches(start: u8, end: u8) -> RecordBatch { Field::new("u64", DataType::UInt64, true), ])); let v8: Vec<u8> = (start..end).collect(); - let v16: Vec<u16> = (start as _..end as _).collect(); - let v32: Vec<u32> = (start as _..end as _).collect(); - let v64: Vec<u64> = (start as _..end as _).collect(); + let v16: Vec<u16> = (start as _..end as u16).collect(); + let v32: Vec<u32> = (start as _..end as u32).collect(); + let v64: Vec<u64> = (start as _..end as u64).collect(); RecordBatch::try_new( schema, vec![