diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 8d95c9a205ee..4b1611636923 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -715,6 +715,11 @@ where
 
     /// Compute which columns are used in filters and the final (output) projection
     fn compute_cache_projection(&self, projection: &ProjectionMask) -> Option<ProjectionMask> {
+        // Do not compute the projection mask if the predicate cache is disabled
+        if self.max_predicate_cache_size == 0 {
+            return None;
+        }
+
         let filters = self.filter.as_ref()?;
         let mut cache_projection = filters.predicates.first()?.projection().clone();
         for predicate in filters.predicates.iter() {
@@ -2611,4 +2616,96 @@ mod tests {
         // error we want to reproduce.
         let _result: Vec<_> = stream.try_collect().await.unwrap();
     }
+
+    #[tokio::test]
+    async fn test_predicate_cache_disabled() {
+        let k = Int32Array::from_iter_values(0..10);
+        let data = RecordBatch::try_from_iter([("k", Arc::new(k) as ArrayRef)]).unwrap();
+
+        let mut buf = Vec::new();
+        // both the page row limit and batch size are set to 1 to create one page per row
+        let props = WriterProperties::builder()
+            .set_data_page_row_count_limit(1)
+            .set_write_batch_size(1)
+            .set_max_row_group_size(10)
+            .set_write_page_header_statistics(true)
+            .build();
+        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
+        writer.write(&data).unwrap();
+        writer.close().unwrap();
+
+        let data = Bytes::from(buf);
+        let metadata = ParquetMetaDataReader::new()
+            .with_page_index_policy(PageIndexPolicy::Required)
+            .parse_and_finish(&data)
+            .unwrap();
+        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
+
+        // the filter is not clone-able, so we use a closure to build it for each reader
+        let build_filter = || {
+            let scalar = Int32Array::from_iter_values([5]);
+            let predicate = ArrowPredicateFn::new(
+                ProjectionMask::leaves(&parquet_schema, vec![0]),
+                move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
+            );
+            RowFilter::new(vec![Box::new(predicate)])
+        };
+
+        // select only one of the pages
+        let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(1)]);
+
+        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
+        let reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
+
+        // using the predicate cache (default)
+        let reader_with_cache = TestReader::new(data.clone());
+        let requests_with_cache = reader_with_cache.requests.clone();
+        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
+            reader_with_cache,
+            reader_metadata.clone(),
+        )
+        .with_batch_size(1000)
+        .with_row_selection(selection.clone())
+        .with_row_filter(build_filter())
+        .build()
+        .unwrap();
+        let batches_with_cache: Vec<_> = stream.try_collect().await.unwrap();
+
+        // disabling the predicate cache
+        let reader_without_cache = TestReader::new(data);
+        let requests_without_cache = reader_without_cache.requests.clone();
+        let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
+            reader_without_cache,
+            reader_metadata,
+        )
+        .with_batch_size(1000)
+        .with_row_selection(selection)
+        .with_row_filter(build_filter())
+        .with_max_predicate_cache_size(0) // disabling it by setting the limit to 0
+        .build()
+        .unwrap();
+        let batches_without_cache: Vec<_> = stream.try_collect().await.unwrap();
+
+        assert_eq!(batches_with_cache, batches_without_cache);
+
+        let requests_with_cache = requests_with_cache.lock().unwrap();
+        let requests_without_cache = requests_without_cache.lock().unwrap();
+
+        // fewer requests are made without the predicate cache
+        assert_eq!(requests_with_cache.len(), 11);
+        assert_eq!(requests_without_cache.len(), 2);
+
+        // fewer bytes are retrieved without the predicate cache
+        assert_eq!(
+            requests_with_cache.iter().map(|r| r.len()).sum::<usize>(),
+            433
+        );
+        assert_eq!(
+            requests_without_cache
+                .iter()
+                .map(|r| r.len())
+                .sum::<usize>(),
+            92
+        );
+    }
 }
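
For reference, a minimal sketch of how an application might disable the predicate cache through the builder option exercised above. This is not part of the diff: the `data.parquet` path is hypothetical, and it assumes the `parquet` crate's `async`/`arrow` features plus `tokio` and `futures` as dependencies.

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical input file; any AsyncFileReader (e.g. tokio::fs::File) works.
    let file = tokio::fs::File::open("data.parquet").await?;

    let stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_batch_size(1000)
        // Setting the limit to 0 disables the predicate cache entirely,
        // mirroring what test_predicate_cache_disabled does above.
        .with_max_predicate_cache_size(0)
        .build()?;

    let _batches: Vec<_> = stream.try_collect().await?;
    Ok(())
}
```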