From 7f2201cde0e5a3729e67da3ea4bf036c9dddba35 Mon Sep 17 00:00:00 2001
From: Dan Harris
Date: Mon, 8 Aug 2022 08:42:24 +0100
Subject: [PATCH 1/8] Add struct for in-memory row group with only selected
 pages

---
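Notes: this series teaches the async reader to use the Parquet offset index so
that only the data pages touched by a `RowSelection` are fetched, instead of
whole column chunks. Each `PageLocation` in the offset index records a page's
byte offset, compressed size and first row index, which is enough to map rows
to byte ranges. A minimal stand-alone illustration of that mapping (the helper
below is hypothetical, for exposition only, and is not part of this diff):

    use parquet_format::PageLocation;
    use std::ops::Range;

    /// Byte range of the page containing `row`, assuming `pages` is
    /// non-empty, sorted by `first_row_index`, and starts at row 0.
    fn page_range_for_row(pages: &[PageLocation], row: i64) -> Range<usize> {
        let idx = match pages.binary_search_by_key(&row, |p| p.first_row_index) {
            Ok(i) => i,      // `row` is the first row of page `i`
            Err(i) => i - 1, // `row` falls inside the preceding page
        };
        let start = pages[idx].offset as usize;
        start..start + pages[idx].compressed_page_size as usize
    }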
 object_store/src/local.rs                   |   1 +
 parquet/src/arrow/arrow_reader/selection.rs |   9 +-
 parquet/src/arrow/async_reader.rs           | 168 +++++++++++++++++++-
 parquet/src/column/page.rs                  |   1 +
 4 files changed, 175 insertions(+), 4 deletions(-)

diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index fd3c3592ab5..0f1a8821b86 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -1068,6 +1068,7 @@ mod tests {
         integration.head(&path).await.unwrap();
     }
 
+    #[ignore]
     #[tokio::test]
     async fn test_list_root() {
         let integration = LocalFileSystem::new();
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index 8e129f5667e..e3dc2e22d5d 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -21,8 +21,9 @@ use std::cmp::Ordering;
 use std::collections::VecDeque;
 use std::ops::Range;
 
-/// [`RowSelector`] represents a range of rows to scan from a parquet file
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
+/// scanning a parquet file
+#[derive(Debug, Clone, Copy)]
 pub struct RowSelector {
     /// The number of rows
     pub row_count: usize,
@@ -116,6 +117,10 @@ impl RowSelection {
         Self { selectors }
     }
 
+    pub fn selectors(&self) -> &[RowSelector] {
+        &self.selectors
+    }
+
     /// Splits off the first `row_count` from this [`RowSelection`]
     pub fn split_off(&mut self, row_count: usize) -> Self {
         let mut total_count = 0;
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 6c449bef49e..2922e3cbf4b 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -78,7 +78,7 @@
 use std::collections::VecDeque;
 use std::fmt::Formatter;
-use std::io::{Cursor, SeekFrom};
+use std::io::{Cursor, Read, SeekFrom};
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -88,7 +88,7 @@ use bytes::Bytes;
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
 use futures::stream::Stream;
-use parquet_format::{PageHeader, PageType};
+use parquet_format::{PageHeader, PageLocation, PageType};
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
 
 use arrow::datatypes::SchemaRef;
@@ -544,6 +544,170 @@ impl RowGroupCollection for InMemoryRowGroup {
     }
 }
 
+struct SparseInMemoryRowGroup {
+    schema: SchemaDescPtr,
+    column_chunks: Vec<Option<SparseInMemoryColumnChunk>>,
+    row_count: usize,
+}
+
+struct SparseInMemoryColumnChunk {
+    num_values: i64,
+    compression: Compression,
+    physical_type: crate::basic::Type,
+    pages: Vec<Option<Bytes>>,
+    page_locations: Vec<PageLocation>,
+}
+
+impl SparseInMemoryColumnChunk {
+    async fn fetch<T: AsyncFileReader>(
+        &mut self,
+        input: &mut T,
+        selection: &RowSelection,
+        page_locations: &[PageLocation],
+    ) -> Result<()> {
+        let num_pages = page_locations.len();
+
+        let mut fetch_ranges = Vec::with_capacity(num_pages);
+        let mut fetch_pages = Vec::with_capacity(num_pages);
+
+        let mut page_offset = 0;
+        let mut row_offset = 0;
+
+        for selector in selection.selectors() {
+            row_offset += selector.row_count;
+            if fetch_pages
+                .last()
+                .map(|idx| *idx == page_offset)
+                .unwrap_or(false)
+            {
+                page_offset += 1;
+            }
+
+            while page_offset < page_locations.len()
+                && page_locations[page_offset].first_row_index <= row_offset as i64
+            {
+                if !selector.skip {
+                    let range_start = page_locations[page_offset].offset;
+                    let range_end = range_start
+                        + page_locations[page_offset].compressed_page_size as i64;
+                    fetch_ranges.push(range_start as usize..range_end as usize);
+                    fetch_pages.push(page_offset);
+                }
+                page_offset += 1;
+            }
+        }
+
+        let chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+
+        for (idx, chunk) in fetch_pages.into_iter().zip(chunk_data) {
+            self.pages[idx] = Some(chunk);
+        }
+
+        Ok(())
+    }
+}
+
+struct SparseInMemoryColumnChunkReader {
+    page_set: SparseInMemoryColumnChunk,
+    decompressor: Option<Box<dyn Codec>>,
+    offset: usize,
+    seen_num_values: i64,
+}
+
+impl SparseInMemoryColumnChunkReader {
+    fn new(page_set: SparseInMemoryColumnChunk) -> Result<Self> {
+        let decompressor = create_codec(page_set.compression)?;
+        let result = Self {
+            page_set,
+            decompressor,
+            offset: 0,
+            seen_num_values: 0,
+        };
+        Ok(result)
+    }
+}
+
+impl Iterator for SparseInMemoryColumnChunkReader {
+    type Item = Result<Page>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.get_next_page().transpose()
+    }
+}
+
+impl PageReader for SparseInMemoryColumnChunkReader {
+    fn get_next_page(&mut self) -> Result<Option<Page>> {
+        self.page_set
+            .pages
+            .get(self.offset)
+            .map(|data| match data {
+                Some(data) => {
+                    let mut cursor = Cursor::new(data);
+
+                    let page_header = read_page_header(&mut cursor)?;
+                    self.offset += 1;
+
+                    let compressed_size = page_header.compressed_page_size as usize;
+
+                    let start_offset = cursor.position() as usize;
+                    let end_offset = start_offset + compressed_size;
+
+                    let buffer = cursor.into_inner().slice(start_offset..end_offset);
+
+                    let result = match page_header.type_ {
+                        PageType::DataPage | PageType::DataPageV2 => {
+                            let decoded = decode_page(
+                                page_header,
+                                buffer.into(),
+                                self.page_set.physical_type,
+                                self.decompressor.as_mut(),
+                            )?;
+                            self.seen_num_values += decoded.num_values() as i64;
+                            decoded
+                        }
+                        _ => {
+                            return Err(ParquetError::ArrowError(format!(
+                                "Error reading next page, page at index {} not a data page",
+                                self.offset
+                            )))
+                        }
+                    };
+
+                    Ok(result)
+                }
+                None => Err(ParquetError::ArrowError(format!(
+                    "Error reading next page, page at index {} not fetched",
+                    self.offset
+                ))),
+            })
+            .transpose()
+    }
+
+    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
+        if self.offset < self.page_set.page_locations.len() - 2 {
+            Ok(Some(PageMetadata {
+                num_rows: self.page_set.page_locations[self.offset + 1].first_row_index
+                    as usize
+                    - self.page_set.page_locations[self.offset].first_row_index as usize,
+                is_dict: false,
+            }))
+        } else if self.offset == self.page_set.page_locations.len() - 1 {
+            Ok(Some(PageMetadata {
+                num_rows: self.page_set.num_values as usize
+                    - self.page_set.page_locations[self.offset].first_row_index as usize,
+                is_dict: false,
+            }))
+        } else {
+            Ok(None)
+        }
+    }
+
+    fn skip_next_page(&mut self) -> Result<()> {
+        self.offset += 1;
+        Ok(())
+    }
+}
+
 /// Data for a single column chunk
 #[derive(Clone)]
 struct InMemoryColumnChunk {
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index 1658797cee7..ab2d885a23f 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -195,6 +195,7 @@ impl PageWriteSpec {
 }
 
 /// Contains metadata for a page
+#[derive(Clone)]
 pub struct PageMetadata {
     /// The number of rows in this page
     pub num_rows: usize,

From 314cb710dd0a232a3be79a7ddc471a4d49169824 Mon Sep 17 00:00:00 2001
From: Dan Harris
Date: Tue, 16 Aug 2022 12:45:42 -0400
Subject: [PATCH 2/8] Read only pages required for row selection

---
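Notes: `scan_ranges`/`page_mask` walk the run-length `RowSelector`s and the
page index in lockstep, splitting selectors at page boundaries. A rough
functional equivalent, with the selection pre-expanded into absolute selected
row ranges (that expansion helper is assumed here, not shown, and this sketch
is illustrative rather than the code added below):

    use parquet_format::PageLocation;
    use std::ops::Range;

    /// Byte ranges of the pages overlapped by any selected row range.
    fn ranges_for_selection(
        pages: &[PageLocation],
        selected: &[Range<i64>],
        total_rows: i64,
    ) -> Vec<Range<usize>> {
        let mut out = vec![];
        for (i, page) in pages.iter().enumerate() {
            let page_start = page.first_row_index;
            let page_end = pages
                .get(i + 1)
                .map(|p| p.first_row_index)
                .unwrap_or(total_rows);
            // Half-open interval overlap test between page rows and selection
            if selected.iter().any(|r| r.start < page_end && page_start < r.end) {
                let start = page.offset as usize;
                out.push(start..start + page.compressed_page_size as usize);
            }
        }
        out
    }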
 parquet/src/arrow/arrow_reader/selection.rs | 222 +++++++++++++++++++-
 parquet/src/arrow/async_reader.rs           | 565 ++++++++++++--------
 2 files changed, 565 insertions(+), 222 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index e3dc2e22d5d..10ae119f28b 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -17,13 +17,14 @@
 
 use arrow::array::{Array, BooleanArray};
 use arrow::compute::SlicesIterator;
+use parquet_format::PageLocation;
 use std::cmp::Ordering;
 use std::collections::VecDeque;
 use std::ops::Range;
 
 /// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
 /// scanning a parquet file
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
 pub struct RowSelector {
     /// The number of rows
     pub row_count: usize,
@@ -117,6 +118,108 @@ impl RowSelection {
         Self { selectors }
     }
 
+    /// Given an offset index, return the ranges for data pages selected by `self`
+    pub fn scan_ranges(&self, page_locations: &[PageLocation]) -> Vec<Range<usize>> {
+        let mut ranges = vec![];
+        let mut row_offset = 0;
+
+        let mut pages = page_locations.iter().peekable();
+        let mut selectors = self.selectors.iter().cloned();
+        let mut current_selector = selectors.next();
+        let mut current_page = pages.next();
+
+        let mut current_page_included = false;
+
+        while let Some((selector, page)) = current_selector.as_mut().zip(current_page) {
+            if !selector.skip && !current_page_included {
+                let start = page.offset as usize;
+                let end = start + page.compressed_page_size as usize;
+                ranges.push(start..end);
+                current_page_included = true;
+            }
+
+            if let Some(next_page) = pages.peek() {
+                if row_offset + selector.row_count > next_page.first_row_index as usize {
+                    let remaining_in_page =
+                        next_page.first_row_index as usize - row_offset;
+                    selector.row_count -= remaining_in_page;
+                    row_offset += remaining_in_page;
+                    current_page_included = false;
+                    current_page = pages.next();
+
+                    continue;
+                } else {
+                    if row_offset + selector.row_count
+                        == next_page.first_row_index as usize
+                    {
+                        current_page_included = false;
+                        current_page = pages.next();
+                    }
+                    row_offset += selector.row_count;
+                    current_selector = selectors.next();
+                }
+            } else {
+                break;
+            }
+        }
+
+        ranges
+    }
+
+    /// Given an offset index, return a mask indicating which pages are selected along with their locations by `self`
+    pub fn page_mask(
+        &self,
+        page_locations: &[PageLocation],
+    ) -> (Vec<bool>, Vec<Range<usize>>) {
+        let mut mask = vec![false; page_locations.len()];
+        let mut ranges = vec![];
+        let mut row_offset = 0;
+
+        let mut pages = page_locations.iter().enumerate().peekable();
+        let mut selectors = self.selectors.iter().cloned();
+        let mut current_selector = selectors.next();
+        let mut current_page = pages.next();
+
+        let mut current_page_included = false;
+
+        while let Some((selector, (mut page_idx, page))) =
+            current_selector.as_mut().zip(current_page)
+        {
+            if !selector.skip && !current_page_included {
+                if !mask[page_idx] {
+                    mask[page_idx] = true;
+                    let start = page.offset as usize;
+                    let end = start + page.compressed_page_size as usize;
+                    ranges.push(start..end);
+                }
+            }
+
+            if let Some((_, next_page)) = pages.peek() {
+                if row_offset + selector.row_count > next_page.first_row_index as usize {
+                    let remaining_in_page =
+                        next_page.first_row_index as usize - row_offset;
+                    selector.row_count -= remaining_in_page;
+                    row_offset += remaining_in_page;
+                    current_page = pages.next();
+
+                    continue;
+                } else {
+                    if row_offset + selector.row_count
+                        == next_page.first_row_index as usize
+                    {
+                        current_page = pages.next();
+                    }
+                    row_offset += selector.row_count;
+                    current_selector = selectors.next();
+                }
+            } else {
+                break;
+            }
+        }
+
+        (mask, ranges)
+    }
+
     pub fn selectors(&self) -> &[RowSelector] {
         &self.selectors
     }
@@ -167,7 +270,7 @@ impl RowSelection {
     /// self:      NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY
     /// other:     YYYYYNNNNYYYYYYYYYYYYY          YYNNN
     ///
-    /// returned:  NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYYNNYNNNN
+    /// returned:  NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYNNNYYNNN
     ///
     ///
     pub fn and_then(&self, other: &Self) -> Self {
@@ -428,4 +531,119 @@ mod tests {
             assert_eq!(a.and_then(&b), expected);
         }
     }
+
+    #[test]
+    fn test_scan_ranges() {
+        let selection = RowSelection::from(vec![
+            RowSelector::skip(10),
+            RowSelector::select(3),
+            RowSelector::skip(3),
+            RowSelector::select(4),
+            RowSelector::skip(5),
+            RowSelector::select(5),
+            RowSelector::skip(12),
+            RowSelector::select(12),
+            RowSelector::skip(12),
+        ]);
+
+        let index = vec![
+            PageLocation {
+                offset: 0,
+                compressed_page_size: 10,
+                first_row_index: 0,
+            },
+            PageLocation {
+                offset: 10,
+                compressed_page_size: 10,
+                first_row_index: 10,
+            },
+            PageLocation {
+                offset: 20,
+                compressed_page_size: 10,
+                first_row_index: 20,
+            },
+            PageLocation {
+                offset: 30,
+                compressed_page_size: 10,
+                first_row_index: 30,
+            },
+            PageLocation {
+                offset: 40,
+                compressed_page_size: 10,
+                first_row_index: 40,
+            },
+            PageLocation {
+                offset: 50,
+                compressed_page_size: 10,
+                first_row_index: 50,
+            },
+            PageLocation {
+                offset: 60,
+                compressed_page_size: 10,
+                first_row_index: 60,
+            },
+        ];
+
+        let ranges = selection.scan_ranges(&index);
+
+        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]);
+    }
+
+    #[test]
+    fn test_page_mask() {
+        let selection = RowSelection::from(vec![
+            RowSelector::skip(10),
+            RowSelector::select(3),
+            RowSelector::skip(3),
+            RowSelector::select(4),
+            RowSelector::skip(5),
+            RowSelector::select(5),
+            RowSelector::skip(12),
+            RowSelector::select(12),
+            RowSelector::skip(12),
+        ]);
+
+        let index = vec![
+            PageLocation {
+                offset: 0,
+                compressed_page_size: 10,
+                first_row_index: 0,
+            },
+            PageLocation {
+                offset: 10,
+                compressed_page_size: 10,
+                first_row_index: 10,
+            },
+            PageLocation {
+                offset: 20,
+                compressed_page_size: 10,
+                first_row_index: 20,
+            },
+            PageLocation {
+                offset: 30,
+                compressed_page_size: 10,
+                first_row_index: 30,
+            },
+            PageLocation {
+                offset: 40,
+                compressed_page_size: 10,
+                first_row_index: 40,
+            },
+            PageLocation {
+                offset: 50,
+                compressed_page_size: 10,
+                first_row_index: 50,
+            },
+            PageLocation {
+                offset: 60,
+                compressed_page_size: 10,
+                first_row_index: 60,
+            },
+        ];
+
+        let (mask, ranges) = selection.page_mask(&index);
+
+        assert_eq!(mask, vec![false, true, true, false, true, true, false]);
+        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]);
+    }
 }
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 2922e3cbf4b..9cbb61adfa3 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -84,7 +84,7 @@
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
 use futures::stream::Stream;
@@ -484,42 +484,97 @@ impl InMemoryRowGroup {
         input: &mut T,
         metadata: &RowGroupMetaData,
         projection: &ProjectionMask,
-        _selection: Option<&RowSelection>,
+        selection: Option<&RowSelection>,
     ) -> Result<()> {
-        // TODO: Use OffsetIndex and selection to prune pages
-
-        let fetch_ranges = self
-            .column_chunks
-            .iter()
-            .enumerate()
-            .into_iter()
-            .filter_map(|(idx, chunk)| {
-                (chunk.is_none() && projection.leaf_included(idx)).then(|| {
-                    let column = metadata.column(idx);
-                    let (start, length) = column.byte_range();
-                    start as usize..(start + length) as usize
+        if let Some((selection, page_locations)) =
+            selection.zip(metadata.page_offset_index().as_ref())
+        {
+            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
+            // `RowSelection`
+            let mut fetch_ranges = vec![];
+            let mut page_count = vec![];
+
+            let page_masks: Vec<Vec<bool>> = self
+                .column_chunks
+                .iter()
+                .enumerate()
+                .into_iter()
+                .filter_map(|(idx, chunk)| {
+                    (chunk.is_none() && projection.leaf_included(idx)).then(|| {
+                        let (mask, ranges) = selection.page_mask(&page_locations[idx]);
+                        page_count.push(ranges.len());
+                        fetch_ranges.extend(ranges);
+                        mask
+                    })
                 })
-            })
-            .collect();
+                .collect();
 
-        let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut page_count = page_count.into_iter();
+            let mut page_masks = page_masks.into_iter();
 
-        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
-            if chunk.is_some() || !projection.leaf_included(idx) {
-                continue;
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                let column = metadata.column(idx);
+
+                if let Some(mask) = page_masks.next() {
+                    let page_count = page_count.next().unwrap();
+
+                    let mut data = Vec::with_capacity(page_count);
+                    for _ in 0..page_count {
+                        data.push(chunk_data.next().unwrap());
+                    }
+
+                    *chunk = Some(InMemoryColumnChunk {
+                        num_values: column.num_values(),
+                        compression: column.compression(),
+                        physical_type: column.column_type(),
+                        data: ColumnChunkData::Sparse {
+                            data,
+                            index: page_locations[idx].clone(),
+                            mask,
+                        },
+                    });
+                }
             }
+        } else {
+            let fetch_ranges = self
+                .column_chunks
+                .iter()
+                .enumerate()
+                .into_iter()
+                .filter_map(|(idx, chunk)| {
+                    (chunk.is_none() && projection.leaf_included(idx)).then(|| {
+                        let column = metadata.column(idx);
+                        let (start, length) = column.byte_range();
+                        start as usize..(start + length) as usize
+                    })
+                })
+                .collect();
 
-            let column = metadata.column(idx);
+            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
 
-            if let Some(data) = chunk_data.next() {
-                *chunk = Some(InMemoryColumnChunk {
-                    num_values: column.num_values(),
-                    compression: column.compression(),
-                    physical_type: column.column_type(),
-                    data,
-                });
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                let column = metadata.column(idx);
+
+                if let Some(data) = chunk_data.next() {
+                    *chunk = Some(InMemoryColumnChunk {
+                        num_values: column.num_values(),
+                        compression: column.compression(),
+                        physical_type: column.column_type(),
+                        data: ColumnChunkData::Dense(data),
+                    });
+                }
             }
         }
+
         Ok(())
     }
 }
@@ -544,168 +599,16 @@ impl RowGroupCollection for InMemoryRowGroup {
     }
 }
 
-struct SparseInMemoryRowGroup {
-    schema: SchemaDescPtr,
-    column_chunks: Vec<Option<SparseInMemoryColumnChunk>>,
-    row_count: usize,
-}
-
-struct SparseInMemoryColumnChunk {
-    num_values: i64,
-    compression: Compression,
-    physical_type: crate::basic::Type,
-    pages: Vec<Option<Bytes>>,
-    page_locations: Vec<PageLocation>,
-}
-
-impl SparseInMemoryColumnChunk {
-    async fn fetch<T: AsyncFileReader>(
-        &mut self,
-        input: &mut T,
-        selection: &RowSelection,
-        page_locations: &[PageLocation],
-    ) -> Result<()> {
-        let num_pages = page_locations.len();
-
-        let mut fetch_ranges = Vec::with_capacity(num_pages);
-        let mut fetch_pages = Vec::with_capacity(num_pages);
-
-        let mut page_offset = 0;
-        let mut row_offset = 0;
-
-        for selector in selection.selectors() {
-            row_offset += selector.row_count;
-            if fetch_pages
-                .last()
-                .map(|idx| *idx == page_offset)
-                .unwrap_or(false)
-            {
-                page_offset += 1;
-            }
-
-            while page_offset < page_locations.len()
-                && page_locations[page_offset].first_row_index <= row_offset as i64
-            {
-                if !selector.skip {
-                    let range_start = page_locations[page_offset].offset;
-                    let range_end = range_start
-                        + page_locations[page_offset].compressed_page_size as i64;
-                    fetch_ranges.push(range_start as usize..range_end as usize);
-                    fetch_pages.push(page_offset);
-                }
-                page_offset += 1;
-            }
-        }
-
-        let chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
-
-        for (idx, chunk) in fetch_pages.into_iter().zip(chunk_data) {
-            self.pages[idx] = Some(chunk);
-        }
-
-        Ok(())
-    }
-}
-
-struct SparseInMemoryColumnChunkReader {
-    page_set: SparseInMemoryColumnChunk,
-    decompressor: Option<Box<dyn Codec>>,
-    offset: usize,
-    seen_num_values: i64,
-}
-
-impl SparseInMemoryColumnChunkReader {
-    fn new(page_set: SparseInMemoryColumnChunk) -> Result<Self> {
-        let decompressor = create_codec(page_set.compression)?;
-        let result = Self {
-            page_set,
-            decompressor,
-            offset: 0,
-            seen_num_values: 0,
-        };
-        Ok(result)
-    }
-}
-
-impl Iterator for SparseInMemoryColumnChunkReader {
-    type Item = Result<Page>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.get_next_page().transpose()
-    }
-}
-
-impl PageReader for SparseInMemoryColumnChunkReader {
-    fn get_next_page(&mut self) -> Result<Option<Page>> {
-        self.page_set
-            .pages
-            .get(self.offset)
-            .map(|data| match data {
-                Some(data) => {
-                    let mut cursor = Cursor::new(data);
-
-                    let page_header = read_page_header(&mut cursor)?;
-                    self.offset += 1;
-
-                    let compressed_size = page_header.compressed_page_size as usize;
-
-                    let start_offset = cursor.position() as usize;
-                    let end_offset = start_offset + compressed_size;
-
-                    let buffer = cursor.into_inner().slice(start_offset..end_offset);
-
-                    let result = match page_header.type_ {
-                        PageType::DataPage | PageType::DataPageV2 => {
-                            let decoded = decode_page(
-                                page_header,
-                                buffer.into(),
-                                self.page_set.physical_type,
-                                self.decompressor.as_mut(),
-                            )?;
-                            self.seen_num_values += decoded.num_values() as i64;
-                            decoded
-                        }
-                        _ => {
-                            return Err(ParquetError::ArrowError(format!(
-                                "Error reading next page, page at index {} not a data page",
-                                self.offset
-                            )))
-                        }
-                    };
-
-                    Ok(result)
-                }
-                None => Err(ParquetError::ArrowError(format!(
-                    "Error reading next page, page at index {} not fetched",
-                    self.offset
-                ))),
-            })
-            .transpose()
-    }
-
-    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
-        if self.offset < self.page_set.page_locations.len() - 2 {
-            Ok(Some(PageMetadata {
-                num_rows: self.page_set.page_locations[self.offset + 1].first_row_index
-                    as usize
-                    - self.page_set.page_locations[self.offset].first_row_index as usize,
-                is_dict: false,
-            }))
-        } else if self.offset == self.page_set.page_locations.len() - 1 {
-            Ok(Some(PageMetadata {
-                num_rows: self.page_set.num_values as usize
-                    - self.page_set.page_locations[self.offset].first_row_index as usize,
-                is_dict: false,
-            }))
-        } else {
-            Ok(None)
-        }
-    }
-
-    fn skip_next_page(&mut self) -> Result<()> {
-        self.offset += 1;
-        Ok(())
-    }
+#[derive(Clone)]
+enum ColumnChunkData {
+    /// Column chunk data representing only a subset of data pages
+    Sparse {
+        data: Vec<Bytes>,
+        index: Vec<PageLocation>,
+        mask: Vec<bool>,
+    },
+    /// Full column chunk
+    Dense(Bytes),
 }
 
 /// Data for a single column chunk
@@ -714,33 +617,62 @@ struct InMemoryColumnChunk {
     num_values: i64,
     compression: Compression,
     physical_type: crate::basic::Type,
-    data: Bytes,
+    data: ColumnChunkData,
 }
 
 impl InMemoryColumnChunk {
     fn pages(&self) -> Result<Box<dyn PageReader>> {
-        let page_reader = InMemoryColumnChunkReader::new(self.clone())?;
-        Ok(Box::new(page_reader))
+        match &self.data {
+            ColumnChunkData::Dense(bytes) => {
+                let page_reader = DenseColumnChunkReader::new(
+                    bytes.clone(),
+                    self.compression,
+                    self.num_values,
+                    self.physical_type,
+                )?;
+                Ok(Box::new(page_reader))
+            }
+            ColumnChunkData::Sparse { data, index, mask } => {
+                let page_reader = SparseColumnChunkReader::new(
+                    data.clone(),
+                    index.clone(),
+                    mask.clone(),
+                    self.compression,
+                    self.num_values,
+                    self.physical_type,
+                )?;
+                Ok(Box::new(page_reader))
+            }
+        }
     }
 }
 
 // A serialized implementation for Parquet [`PageReader`].
-struct InMemoryColumnChunkReader {
-    chunk: InMemoryColumnChunk,
+struct DenseColumnChunkReader {
+    chunk: Bytes,
     decompressor: Option<Box<dyn Codec>>,
+    num_values: i64,
+    physical_type: crate::basic::Type,
     offset: usize,
     seen_num_values: i64,
     // If the next page header has already been "peeked", we will cache it here
     next_page_header: Option<PageHeader>,
 }
 
-impl InMemoryColumnChunkReader {
+impl DenseColumnChunkReader {
     /// Creates a new serialized page reader from file source.
-    fn new(chunk: InMemoryColumnChunk) -> Result<Self> {
-        let decompressor = create_codec(chunk.compression)?;
+    fn new(
+        chunk: Bytes,
+        compression: Compression,
+        num_values: i64,
+        physical_type: crate::basic::Type,
+    ) -> Result<Self> {
+        let decompressor = create_codec(compression)?;
         let result = Self {
             chunk,
             decompressor,
+            num_values,
+            physical_type,
             offset: 0,
             seen_num_values: 0,
             next_page_header: None,
@@ -749,7 +681,7 @@ impl InMemoryColumnChunkReader {
     }
 }
 
-impl Iterator for InMemoryColumnChunkReader {
+impl Iterator for DenseColumnChunkReader {
     type Item = Result<Page>;
 
     fn next(&mut self) -> Option<Self::Item> {
@@ -757,10 +689,10 @@ impl Iterator for InMemoryColumnChunkReader {
     }
 }
 
-impl PageReader for InMemoryColumnChunkReader {
+impl PageReader for DenseColumnChunkReader {
     fn get_next_page(&mut self) -> Result<Option<Page>> {
-        while self.seen_num_values < self.chunk.num_values {
-            let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
+        while self.seen_num_values < self.num_values {
+            let mut cursor = Cursor::new(&self.chunk.as_ref()[self.offset..]);
             let page_header = if let Some(page_header) = self.next_page_header.take() {
                 // The next page header has already been peeked, so use the cached value
                 page_header
@@ -776,14 +708,14 @@ impl PageReader for InMemoryColumnChunkReader {
             let end_offset = self.offset + compressed_size;
             self.offset = end_offset;
 
-            let buffer = self.chunk.data.slice(start_offset..end_offset);
+            let buffer = self.chunk.slice(start_offset..end_offset);
 
             let result = match page_header.type_ {
                 PageType::DataPage | PageType::DataPageV2 => {
                     let decoded = decode_page(
                         page_header,
                         buffer.into(),
-                        self.chunk.physical_type,
+                        self.physical_type,
                         self.decompressor.as_mut(),
                     )?;
                     self.seen_num_values += decoded.num_values() as i64;
@@ -792,7 +724,7 @@ impl PageReader for InMemoryColumnChunkReader {
                 PageType::DictionaryPage => decode_page(
                     page_header,
                     buffer.into(),
-                    self.chunk.physical_type,
+                    self.physical_type,
                     self.decompressor.as_mut(),
                 )?,
                 _ => {
@@ -809,7 +741,7 @@ impl PageReader for InMemoryColumnChunkReader {
     }
 
     fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
-        while self.seen_num_values < self.chunk.num_values {
+        while self.seen_num_values < self.num_values {
             return if let Some(buffered_header) = self.next_page_header.as_ref() {
                 if let Ok(page_metadata) = buffered_header.try_into() {
                     Ok(Some(page_metadata))
                 } else {
                     // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                     self.next_page_header = None;
                     continue;
                 }
             } else {
-                let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
+                let mut cursor = Cursor::new(&self.chunk.as_ref()[self.offset..]);
                 let page_header = read_page_header(&mut cursor)?;
                 self.offset += cursor.position() as usize;
 
                 let page_metadata = if let Ok(page_metadata) = (&page_header).try_into() {
                     Ok(Some(page_metadata))
                 } else {
                     // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                     continue;
                 };
 
                 self.next_page_header = Some(page_header);
                 page_metadata
             };
         }
 
         Ok(None)
     }
 
@@ -843,7 +775,7 @@ impl PageReader for InMemoryColumnChunkReader {
     fn skip_next_page(&mut self) -> Result<()> {
         if let Some(buffered_header) = self.next_page_header.take() {
             // The next page header has already been peeked, so just advance the offset
             self.offset += buffered_header.compressed_page_size as usize;
         } else {
-            let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
+            let mut cursor = Cursor::new(&self.chunk.as_ref()[self.offset..]);
             let page_header = read_page_header(&mut cursor)?;
             self.offset += cursor.position() as usize;
             self.offset += page_header.compressed_page_size as usize;
@@ -853,6 +785,126 @@ impl PageReader for InMemoryColumnChunkReader {
     }
 }
 
+struct SparseColumnChunkReader {
+    data: Vec<Bytes>,
+    index: Vec<PageLocation>,
+    mask: Vec<bool>,
+    decompressor: Option<Box<dyn Codec>>,
+    num_values: i64,
+    physical_type: crate::basic::Type,
+    offset: usize,
+    seen_num_pages: usize,
+}
+
+impl SparseColumnChunkReader {
+    fn new(
+        data: Vec<Bytes>,
+        index: Vec<PageLocation>,
+        mask: Vec<bool>,
+        compression: Compression,
+        num_values: i64,
+        physical_type: crate::basic::Type,
+    ) -> Result<Self> {
+        let decompressor = create_codec(compression)?;
+        let result = Self {
+            data,
+            index,
+            mask,
+            decompressor,
+            num_values,
+            physical_type,
+            offset: 0,
+            seen_num_pages: 0,
+        };
+        Ok(result)
+    }
+}
+
+impl Iterator for SparseColumnChunkReader {
+    type Item = Result<Page>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.get_next_page().transpose()
+    }
+}
+
+impl PageReader for SparseColumnChunkReader {
+    fn get_next_page(&mut self) -> Result<Option<Page>> {
+        if self.offset == self.index.len() - 1 {
+            return Ok(None);
+        }
+
+        if self.mask[self.offset] {
+            let data = &self.data[self.seen_num_pages + 1];
+            let mut cursor = Cursor::new(data);
+
+            let page_header = read_page_header(&mut cursor)?;
+            self.offset += 1;
+
+            let compressed_size = page_header.compressed_page_size as usize;
+
+            let start_offset = cursor.position() as usize;
+            let end_offset = start_offset + compressed_size;
+
+            let buffer = cursor.into_inner().slice(start_offset..end_offset);
+
+            let result = match page_header.type_ {
+                PageType::DataPage | PageType::DataPageV2 => {
+                    let decoded = decode_page(
+                        page_header,
+                        buffer.into(),
+                        self.physical_type,
+                        self.decompressor.as_mut(),
+                    )?;
+                    decoded
+                }
+                _ => {
+                    return Err(ParquetError::ArrowError(format!(
+                        "Error reading next page, page at index {} not a data page",
+                        self.offset
+                    )))
+                }
+            };
+
+            self.offset += 1;
+            self.seen_num_pages += 1;
+
+            Ok(Some(result))
+        } else {
+            Err(ParquetError::ArrowError(format!(
+                "Error reading next page, page at index {} not fetched",
+                self.offset
+            )))
+        }
+    }
+
+    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
+        if self.offset < self.index.len() - 2 {
+            Ok(Some(PageMetadata {
+                num_rows: self.index[self.offset + 1].first_row_index as usize
+                    - self.index[self.offset].first_row_index as usize,
+                is_dict: false,
+            }))
+        } else if self.offset == self.index.len() - 1 {
+            Ok(Some(PageMetadata {
+                num_rows: self.num_values as usize
+                    - self.index[self.offset].first_row_index as usize,
+                is_dict: false,
+            }))
+        } else {
+            Ok(None)
+        }
+    }
+
+    fn skip_next_page(&mut self) -> Result<()> {
+        if self.mask[self.offset] {
+            self.seen_num_pages += 1;
+        }
+        self.offset += 1;
+        Ok(())
+    }
+}
+
 /// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
 struct ColumnChunkIterator {
     schema: SchemaDescPtr,
@@ -975,12 +1027,12 @@ mod tests {
 
         let column_data = data.slice(start as usize..(start + length) as usize);
 
-        let mut reader = InMemoryColumnChunkReader::new(InMemoryColumnChunk {
-            num_values: column_metadata.num_values(),
-            compression: column_metadata.compression(),
-            physical_type: column_metadata.column_type(),
-            data: column_data,
-        })
+        let mut reader = DenseColumnChunkReader::new(
+            column_data,
+            column_metadata.compression(),
+            column_metadata.num_values(),
+            column_metadata.column_type(),
+        )
         .expect("building reader");
 
         let first_page = reader
@@ -1041,12 +1093,12 @@ mod tests {
 
         let column_data = data.slice(start as usize..(start + length) as usize);
 
-        let mut reader = InMemoryColumnChunkReader::new(InMemoryColumnChunk {
-            num_values: column_metadata.num_values(),
-            compression: column_metadata.compression(),
-            physical_type: column_metadata.column_type(),
-            data: column_data,
-        })
+        let mut reader = DenseColumnChunkReader::new(
+            column_data,
+            column_metadata.compression(),
+            column_metadata.num_values(),
+            column_metadata.column_type(),
+        )
        .expect("building reader");
 
         reader.skip_next_page().expect("skipping first page");
@@ -1128,4 +1180,77 @@ mod tests {
         // Should only have made 3 requests
         assert_eq!(requests.lock().unwrap().len(), 3);
     }
+
+    #[tokio::test]
+    async fn test_row_filter_prune_io() {
+        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
+        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
+        let c = Int32Array::from_iter(0..6);
+        let data = RecordBatch::try_from_iter([
+            ("a", Arc::new(a) as ArrayRef),
+            ("b", Arc::new(b) as ArrayRef),
+            ("c", Arc::new(c) as ArrayRef),
+        ])
+        .unwrap();
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
+        writer.write(&data).unwrap();
+        writer.close().unwrap();
+
+        let data: Bytes = buf.into();
+
+        let mut metadata = parse_metadata(&data).unwrap();
+
+        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
+
+        let test = TestReader {
+            data,
+            metadata: Arc::new(metadata),
+            requests: Default::default(),
+        };
+        let requests = test.requests.clone();
+
+        let a_filter = ArrowPredicateFn::new(
+            ProjectionMask::leaves(&parquet_schema, vec![0]),
+            |batch| arrow::compute::eq_dyn_utf8_scalar(batch.column(0), "b"),
+        );
+
+        let b_filter = ArrowPredicateFn::new(
+            ProjectionMask::leaves(&parquet_schema, vec![1]),
+            |batch| arrow::compute::eq_dyn_utf8_scalar(batch.column(0), "4"),
+        );
+
+        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
+
+        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
+        let stream = ParquetRecordBatchStreamBuilder::new(test)
+            .await
+            .unwrap()
+            .with_projection(mask.clone())
+            .with_batch_size(1024)
+            .with_row_filter(filter)
+            .build()
+            .unwrap();
+
+        let batches: Vec<_> = stream.try_collect().await.unwrap();
+        assert_eq!(batches.len(), 1);
+
+        let batch = &batches[0];
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 2);
+
+        let col = batch.column(0);
+        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
+        assert_eq!(val, "b");
+
+        let col = batch.column(1);
+        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
+        assert_eq!(val, 3);
+
+        // Should only have made 3 requests
+        assert_eq!(requests.lock().unwrap().len(), 3);
+    }
 }

From c4848b0cac2e9aa402ff1f5bb07e454f0b28c24d Mon Sep 17 00:00:00 2001
From: Dan Harris
Date: Tue, 16 Aug 2022 17:49:39 -0400
Subject: [PATCH 3/8] Remove InMemoryColumnChunk and prune IO for row selection

---
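Notes: rather than keeping bespoke dense/sparse page readers, the column chunk
data now implements the existing `ChunkReader` and `Length` abstractions so
the standard `SerializedPageReader` can be reused, driven by the offset index.
For the sparse case the interesting piece is translating an absolute file
offset into one of the individually fetched page buffers; a self-contained
sketch of that lookup (simplified from the `get_read` implementation in this
diff, with a hypothetical helper name):

    use bytes::Bytes;

    /// Locate the fetched page buffer containing absolute file offset
    /// `start`. `data` holds `(file_offset, page_bytes)` pairs for the
    /// pages that were fetched.
    fn find_page(data: &[(usize, Bytes)], start: usize) -> Option<&Bytes> {
        data.iter()
            .find(|(offset, bytes)| *offset <= start && start - *offset < bytes.len())
            .map(|(_, bytes)| bytes)
    }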
 parquet/src/arrow/arrow_reader/selection.rs | 117 +---
 parquet/src/arrow/async_reader.rs           | 661 +++++---------------
 parquet/src/file/page_index/index_reader.rs |   2 +-
 3 files changed, 165 insertions(+), 615 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index 10ae119f28b..e90181e82bd 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -118,54 +118,6 @@ impl RowSelection {
         Self { selectors }
     }
 
-    /// Given an offset index, return the ranges for data pages selected by `self`
-    pub fn scan_ranges(&self, page_locations: &[PageLocation]) -> Vec<Range<usize>> {
-        let mut ranges = vec![];
-        let mut row_offset = 0;
-
-        let mut pages = page_locations.iter().peekable();
-        let mut selectors = self.selectors.iter().cloned();
-        let mut current_selector = selectors.next();
-        let mut current_page = pages.next();
-
-        let mut current_page_included = false;
-
-        while let Some((selector, page)) = current_selector.as_mut().zip(current_page) {
-            if !selector.skip && !current_page_included {
-                let start = page.offset as usize;
-                let end = start + page.compressed_page_size as usize;
-                ranges.push(start..end);
-                current_page_included = true;
-            }
-
-            if let Some(next_page) = pages.peek() {
-                if row_offset + selector.row_count > next_page.first_row_index as usize {
-                    let remaining_in_page =
-                        next_page.first_row_index as usize - row_offset;
-                    selector.row_count -= remaining_in_page;
-                    row_offset += remaining_in_page;
-                    current_page_included = false;
-                    current_page = pages.next();
-
-                    continue;
-                } else {
-                    if row_offset + selector.row_count
-                        == next_page.first_row_index as usize
-                    {
-                        current_page_included = false;
-                        current_page = pages.next();
-                    }
-                    row_offset += selector.row_count;
-                    current_selector = selectors.next();
-                }
-            } else {
-                break;
-            }
-        }
-
-        ranges
-    }
-
     /// Given an offset index, return a mask indicating which pages are selected along with their locations by `self`
     pub fn page_mask(
         &self,
@@ -185,13 +137,11 @@ impl RowSelection {
         while let Some((selector, (mut page_idx, page))) =
             current_selector.as_mut().zip(current_page)
         {
-            if !selector.skip && !current_page_included {
-                if !mask[page_idx] {
-                    mask[page_idx] = true;
-                    let start = page.offset as usize;
-                    let end = start + page.compressed_page_size as usize;
-                    ranges.push(start..end);
-                }
+            if !selector.skip && !current_page_included && !mask[page_idx] {
+                mask[page_idx] = true;
+                let start = page.offset as usize;
+                let end = start + page.compressed_page_size as usize;
+                ranges.push(start..end);
             }
 
             if let Some((_, next_page)) = pages.peek() {
@@ -532,63 +482,6 @@ mod tests {
         }
     }
 
-    #[test]
-    fn test_scan_ranges() {
-        let selection = RowSelection::from(vec![
-            RowSelector::skip(10),
-            RowSelector::select(3),
-            RowSelector::skip(3),
-            RowSelector::select(4),
-            RowSelector::skip(5),
-            RowSelector::select(5),
-            RowSelector::skip(12),
-            RowSelector::select(12),
-            RowSelector::skip(12),
-        ]);
-
-        let index = vec![
-            PageLocation {
-                offset: 0,
-                compressed_page_size: 10,
-                first_row_index: 0,
-            },
-            PageLocation {
-                offset: 10,
-                compressed_page_size: 10,
-                first_row_index: 10,
-            },
-            PageLocation {
-                offset: 20,
-                compressed_page_size: 10,
-                first_row_index: 20,
-            },
-            PageLocation {
-                offset: 30,
-                compressed_page_size: 10,
-                first_row_index: 30,
-            },
-            PageLocation {
-                offset: 40,
-                compressed_page_size: 10,
-                first_row_index: 40,
-            },
-            PageLocation {
-                offset: 50,
-                compressed_page_size: 10,
-                first_row_index: 50,
-            },
-            PageLocation {
-                offset: 60,
-                compressed_page_size: 10,
-                first_row_index: 60,
-            },
-        ];
-
-        let ranges = selection.scan_ranges(&index);
-
-        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]);
-    }
-
     #[test]
     fn test_page_mask() {
         let selection = RowSelection::from(vec![
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 9cbb61adfa3..3418f6e3ef0 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -78,7 +78,7 @@
 use std::collections::VecDeque;
 use std::fmt::Formatter;
-use std::io::{Cursor, Read, SeekFrom};
+use std::io::SeekFrom;
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -88,7 +88,7 @@ use bytes::{Buf, Bytes};
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
 use futures::stream::Stream;
-use parquet_format::{PageHeader, PageLocation, PageType};
+
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
 
 use arrow::datatypes::SchemaRef;
@@ -100,14 +100,16 @@ use crate::arrow::arrow_reader::{
     RowFilter, RowSelection,
 };
 use crate::arrow::ProjectionMask;
-use crate::basic::Compression;
-use crate::column::page::{Page, PageIterator, PageMetadata, PageReader};
-use crate::compression::{create_codec, Codec};
+
+use crate::column::page::{PageIterator, PageReader};
+
 use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
-use crate::file::serialized_reader::{decode_page, read_page_header};
+use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
+
 use crate::file::FOOTER_SIZE;
+
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
 
 /// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
@@ -286,7 +288,8 @@ where
         let meta = self.metadata.row_group(row_group_idx);
         let mut row_group = InMemoryRowGroup {
-            schema: meta.schema_descr_ptr(),
+            metadata: meta.clone(),
+            // schema: meta.schema_descr_ptr(),
             row_count: meta.num_rows() as usize,
             column_chunks: vec![None; meta.columns().len()],
         };
@@ -299,12 +302,7 @@ where
             let predicate_projection = predicate.projection().clone();
 
             row_group
-                .fetch(
-                    &mut self.input,
-                    meta,
-                    &predicate_projection,
-                    selection.as_ref(),
-                )
+                .fetch(&mut self.input, &predicate_projection, selection.as_ref())
                 .await?;
 
             let array_reader = build_array_reader(
@@ -327,7 +325,7 @@ where
         }
 
         row_group
-            .fetch(&mut self.input, meta, &projection, selection.as_ref())
+            .fetch(&mut self.input, &projection, selection.as_ref())
             .await?;
 
         let reader = ParquetRecordBatchReader::new(
@@ -472,8 +470,8 @@ where
 /// An in-memory collection of column chunks
 struct InMemoryRowGroup {
-    schema: SchemaDescPtr,
-    column_chunks: Vec<Option<InMemoryColumnChunk>>,
+    metadata: RowGroupMetaData,
+    column_chunks: Vec<Option<ColumnChunkData>>,
     row_count: usize,
 }
 
@@ -482,62 +480,49 @@ impl InMemoryRowGroup {
     async fn fetch(
         &mut self,
         input: &mut T,
-        metadata: &RowGroupMetaData,
         projection: &ProjectionMask,
         selection: Option<&RowSelection>,
     ) -> Result<()> {
         if let Some((selection, page_locations)) =
-            selection.zip(metadata.page_offset_index().as_ref())
+            selection.zip(self.metadata.page_offset_index().as_ref())
         {
             // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
             // `RowSelection`
-            let mut fetch_ranges = vec![];
-            let mut page_count = vec![];
+            let mut offsets: Vec<Vec<usize>> = vec![];
 
-            let page_masks: Vec<Vec<bool>> = self
+            let fetch_ranges = self
                 .column_chunks
                 .iter()
                 .enumerate()
                 .into_iter()
                 .filter_map(|(idx, chunk)| {
                     (chunk.is_none() && projection.leaf_included(idx)).then(|| {
-                        let (mask, ranges) = selection.page_mask(&page_locations[idx]);
-                        page_count.push(ranges.len());
-                        fetch_ranges.extend(ranges);
-                        mask
+                        let (_mask, ranges) = selection.page_mask(&page_locations[idx]);
+                        offsets.push(ranges.iter().map(|range| range.start).collect());
+                        ranges
                     })
                 })
+                .flatten()
                 .collect();
 
             let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
-            let mut page_count = page_count.into_iter();
-            let mut page_masks = page_masks.into_iter();
+            let mut offsets = offsets.into_iter();
 
             for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                 if chunk.is_some() || !projection.leaf_included(idx) {
                     continue;
                 }
 
-                let column = metadata.column(idx);
-
-                if let Some(mask) = page_masks.next() {
-                    let page_count = page_count.next().unwrap();
-
-                    let mut data = Vec::with_capacity(page_count);
-                    for _ in 0..page_count {
-                        data.push(chunk_data.next().unwrap());
+                if let Some(page_offsets) = offsets.next() {
+                    let mut chunks = Vec::with_capacity(page_offsets.len());
+                    for _ in 0..page_offsets.len() {
+                        chunks.push(chunk_data.next().unwrap());
                    }
 
-                    *chunk = Some(InMemoryColumnChunk {
-                        num_values: column.num_values(),
-                        compression: column.compression(),
-                        physical_type: column.column_type(),
-                        data: ColumnChunkData::Sparse {
-                            data,
-                            index: page_locations[idx].clone(),
-                            mask,
-                        },
-                    });
+                    *chunk = Some(ColumnChunkData::Sparse {
+                        length: self.metadata.column(idx).byte_range().1 as usize,
+                        data: page_offsets.into_iter().zip(chunks.into_iter()).collect(),
+                    })
                 }
             }
         } else {
@@ -548,7 +533,7 @@ impl InMemoryRowGroup {
                 .into_iter()
                 .filter_map(|(idx, chunk)| {
                     (chunk.is_none() && projection.leaf_included(idx)).then(|| {
-                        let column = metadata.column(idx);
+                        let column = self.metadata.column(idx);
                         let (start, length) = column.byte_range();
                         start as usize..(start + length) as usize
                     })
@@ -562,14 +547,10 @@ impl InMemoryRowGroup {
                     continue;
                 }
 
-                let column = metadata.column(idx);
-
                 if let Some(data) = chunk_data.next() {
-                    *chunk = Some(InMemoryColumnChunk {
-                        num_values: column.num_values(),
-                        compression: column.compression(),
-                        physical_type: column.column_type(),
-                        data: ColumnChunkData::Dense(data),
+                    *chunk = Some(ColumnChunkData::Dense {
+                        offset: self.metadata.column(idx).byte_range().0 as usize,
+                        data,
                     });
                 }
             }
@@ -581,7 +562,7 @@ impl InMemoryRowGroup {
 
 impl RowGroupCollection for InMemoryRowGroup {
     fn schema(&self) -> SchemaDescPtr {
-        self.schema.clone()
+        self.metadata.schema_descr_ptr()
     }
 
     fn num_rows(&self) -> usize {
@@ -589,319 +570,80 @@ impl RowGroupCollection for InMemoryRowGroup {
     }
 
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
-        let page_reader = self.column_chunks[i].as_ref().unwrap().pages();
-
-        Ok(Box::new(ColumnChunkIterator {
-            schema: self.schema.clone(),
-            column_schema: self.schema.columns()[i].clone(),
-            reader: Some(page_reader),
-        }))
+        match &self.column_chunks[i] {
+            None => Err(ParquetError::General(format!(
+                "Invalid column index {}, column was not fetched",
+                i
+            ))),
+            Some(data) => {
+                let page_locations = self
+                    .metadata
+                    .page_offset_index()
+                    .as_ref()
+                    .map(|index| index[i].clone());
+                let page_reader: Box<dyn PageReader> =
+                    Box::new(SerializedPageReader::new(
+                        Arc::new(data.clone()),
+                        self.metadata.column(i),
+                        self.row_count,
+                        page_locations,
+                    )?);
+
+                Ok(Box::new(ColumnChunkIterator {
+                    schema: self.metadata.schema_descr_ptr(),
+                    column_schema: self.metadata.schema_descr_ptr().columns()[i].clone(),
+                    reader: Some(Ok(page_reader)),
+                }))
+            }
+        }
     }
 }
 
+/// An in-memory column chunk
 #[derive(Clone)]
 enum ColumnChunkData {
     /// Column chunk data representing only a subset of data pages
     Sparse {
-        data: Vec<Bytes>,
-        index: Vec<PageLocation>,
-        mask: Vec<bool>,
+        /// Length of the full column chunk
+        length: usize,
+        data: Vec<(usize, Bytes)>,
     },
     /// Full column chunk
-    Dense(Bytes),
+    Dense { offset: usize, data: Bytes },
 }
 
-/// Data for a single column chunk
-#[derive(Clone)]
-struct InMemoryColumnChunk {
-    num_values: i64,
-    compression: Compression,
-    physical_type: crate::basic::Type,
-    data: ColumnChunkData,
-}
-
-impl InMemoryColumnChunk {
-    fn pages(&self) -> Result<Box<dyn PageReader>> {
-        match &self.data {
-            ColumnChunkData::Dense(bytes) => {
-                let page_reader = DenseColumnChunkReader::new(
-                    bytes.clone(),
-                    self.compression,
-                    self.num_values,
-                    self.physical_type,
-                )?;
-                Ok(Box::new(page_reader))
-            }
-            ColumnChunkData::Sparse { data, index, mask } => {
-                let page_reader = SparseColumnChunkReader::new(
-                    data.clone(),
-                    index.clone(),
-                    mask.clone(),
-                    self.compression,
-                    self.num_values,
-                    self.physical_type,
-                )?;
-                Ok(Box::new(page_reader))
-            }
+impl Length for ColumnChunkData {
+    fn len(&self) -> u64 {
+        match &self {
+            ColumnChunkData::Sparse { length, .. } => *length as u64,
+            ColumnChunkData::Dense { data, .. } => data.len() as u64,
         }
     }
 }
 
-// A serialized implementation for Parquet [`PageReader`].
-struct DenseColumnChunkReader {
-    chunk: Bytes,
-    decompressor: Option<Box<dyn Codec>>,
-    num_values: i64,
-    physical_type: crate::basic::Type,
-    offset: usize,
-    seen_num_values: i64,
-    // If the next page header has already been "peeked", we will cache it here
-    next_page_header: Option<PageHeader>,
-}
-
-impl DenseColumnChunkReader {
-    /// Creates a new serialized page reader from file source.
-    fn new(
-        chunk: Bytes,
-        compression: Compression,
-        num_values: i64,
-        physical_type: crate::basic::Type,
-    ) -> Result<Self> {
-        let decompressor = create_codec(compression)?;
-        let result = Self {
-            chunk,
-            decompressor,
-            num_values,
-            physical_type,
-            offset: 0,
-            seen_num_values: 0,
-            next_page_header: None,
-        };
-        Ok(result)
-    }
-}
-
-impl Iterator for DenseColumnChunkReader {
-    type Item = Result<Page>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.get_next_page().transpose()
-    }
-}
-
-impl PageReader for DenseColumnChunkReader {
-    fn get_next_page(&mut self) -> Result<Option<Page>> {
-        while self.seen_num_values < self.num_values {
-            let mut cursor = Cursor::new(&self.chunk.as_ref()[self.offset..]);
-            let page_header = if let Some(page_header) = self.next_page_header.take() {
-                // The next page header has already been peeked, so use the cached value
-                page_header
-            } else {
-                let page_header = read_page_header(&mut cursor)?;
-                self.offset += cursor.position() as usize;
-                page_header
-            };
-
-            let compressed_size = page_header.compressed_page_size as usize;
-
-            let start_offset = self.offset;
-            let end_offset = self.offset + compressed_size;
-            self.offset = end_offset;
-
-            let buffer = self.chunk.slice(start_offset..end_offset);
-
-            let result = match page_header.type_ {
-                PageType::DataPage | PageType::DataPageV2 => {
-                    let decoded = decode_page(
-                        page_header,
-                        buffer.into(),
-                        self.physical_type,
-                        self.decompressor.as_mut(),
-                    )?;
-                    self.seen_num_values += decoded.num_values() as i64;
-                    decoded
-                }
-                PageType::DictionaryPage => decode_page(
-                    page_header,
-                    buffer.into(),
-                    self.physical_type,
-                    self.decompressor.as_mut(),
-                )?,
-                _ => {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-                    continue;
-                }
-            };
-
-            return Ok(Some(result));
-        }
-
-        // We are at the end of this column chunk and no more page left. Return None.
-        Ok(None)
-    }
-
-    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
-        while self.seen_num_values < self.num_values {
-            return if let Some(buffered_header) = self.next_page_header.as_ref() {
-                if let Ok(page_metadata) = buffered_header.try_into() {
-                    Ok(Some(page_metadata))
-                } else {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-                    self.next_page_header = None;
-                    continue;
-                }
-            } else {
-                let mut cursor = Cursor::new(&self.chunk.as_ref()[self.offset..]);
-                let page_header = read_page_header(&mut cursor)?;
-                self.offset += cursor.position() as usize;
-
-                let page_metadata = if let Ok(page_metadata) = (&page_header).try_into() {
-                    Ok(Some(page_metadata))
-                } else {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-                    continue;
-                };
-
-                self.next_page_header = Some(page_header);
-                page_metadata
-            };
-        }
-
-        Ok(None)
-    }
-
-    fn skip_next_page(&mut self) -> Result<()> {
-        if let Some(buffered_header) = self.next_page_header.take() {
-            // The next page header has already been peeked, so just advance the offset
-            self.offset += buffered_header.compressed_page_size as usize;
-        } else {
-            let mut cursor = Cursor::new(&self.chunk.as_ref()[self.offset..]);
-            let page_header = read_page_header(&mut cursor)?;
-            self.offset += cursor.position() as usize;
-            self.offset += page_header.compressed_page_size as usize;
-        }
-
-        Ok(())
-    }
-}
-
-struct SparseColumnChunkReader {
-    data: Vec<Bytes>,
-    index: Vec<PageLocation>,
-    mask: Vec<bool>,
-    decompressor: Option<Box<dyn Codec>>,
-    num_values: i64,
-    physical_type: crate::basic::Type,
-    offset: usize,
-    seen_num_pages: usize,
-}
-
-impl SparseColumnChunkReader {
-    fn new(
-        data: Vec<Bytes>,
-        index: Vec<PageLocation>,
-        mask: Vec<bool>,
-        compression: Compression,
-        num_values: i64,
-        physical_type: crate::basic::Type,
-    ) -> Result<Self> {
-        let decompressor = create_codec(compression)?;
-        let result = Self {
-            data,
-            index,
-            mask,
-            decompressor,
-            num_values,
-            physical_type,
-            offset: 0,
-            seen_num_pages: 0,
-        };
-        Ok(result)
-    }
-}
-
-impl Iterator for SparseColumnChunkReader {
-    type Item = Result<Page>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.get_next_page().transpose()
-    }
-}
-
-impl PageReader for SparseColumnChunkReader {
-    fn get_next_page(&mut self) -> Result<Option<Page>> {
-        if self.offset == self.index.len() - 1 {
-            return Ok(None);
-        }
-
-        if self.mask[self.offset] {
-            let data = &self.data[self.seen_num_pages + 1];
-            let mut cursor = Cursor::new(data);
-
-            let page_header = read_page_header(&mut cursor)?;
-            self.offset += 1;
-
-            let compressed_size = page_header.compressed_page_size as usize;
-
-            let start_offset = cursor.position() as usize;
-            let end_offset = start_offset + compressed_size;
-
-            let buffer = cursor.into_inner().slice(start_offset..end_offset);
-
-            let result = match page_header.type_ {
-                PageType::DataPage | PageType::DataPageV2 => {
-                    let decoded = decode_page(
-                        page_header,
-                        buffer.into(),
-                        self.physical_type,
-                        self.decompressor.as_mut(),
-                    )?;
-                    decoded
-                }
-                _ => {
-                    return Err(ParquetError::ArrowError(format!(
-                        "Error reading next page, page at index {} not a data page",
-                        self.offset
-                    )))
-                }
-            };
-
-            self.offset += 1;
-            self.seen_num_pages += 1;
-
-            Ok(Some(result))
-        } else {
-            Err(ParquetError::ArrowError(format!(
-                "Error reading next page, page at index {} not fetched",
-                self.offset
-            )))
-        }
-    }
-
-    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
-        if self.offset < self.index.len() - 2 {
-            Ok(Some(PageMetadata {
-                num_rows: self.index[self.offset + 1].first_row_index as usize
-                    - self.index[self.offset].first_row_index as usize,
-                is_dict: false,
-            }))
-        } else if self.offset == self.index.len() - 1 {
-            Ok(Some(PageMetadata {
-                num_rows: self.num_values as usize
-                    - self.index[self.offset].first_row_index as usize,
-                is_dict: false,
-            }))
-        } else {
-            Ok(None)
-        }
-    }
-
-    fn skip_next_page(&mut self) -> Result<()> {
-        if self.mask[self.offset] {
-            self.seen_num_pages += 1;
+impl ChunkReader for ColumnChunkData {
+    type T = bytes::buf::Reader<Bytes>;
+
+    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
+        match &self {
+            ColumnChunkData::Sparse { data, .. } => data
+                .iter()
+                .find(|(offset, bytes)| {
+                    *offset <= start as usize && (start as usize - *offset) < bytes.len()
+                })
+                .map(|(_, bytes)| bytes.slice(0..length).reader())
+                .ok_or_else(|| {
+                    ParquetError::General(format!(
+                        "Invalid offset in sparse column chunk data: {}",
+                        start
+                    ))
+                }),
+            ColumnChunkData::Dense { offset, data } => {
+                let start = start as usize - *offset;
+                let end = start + length;
+                Ok(data.slice(start..end).reader())
+            }
         }
-        self.offset += 1;
-        Ok(())
     }
 }
 
 /// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
 struct ColumnChunkIterator {
     schema: SchemaDescPtr,
@@ -933,11 +675,15 @@ impl PageIterator for ColumnChunkIterator {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder};
-    use crate::arrow::ArrowWriter;
+    use crate::arrow::arrow_reader::{
+        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
+    };
+    use crate::arrow::{parquet_to_arrow_schema, ArrowWriter};
     use crate::file::footer::parse_metadata;
+    use crate::file::page_index::index_reader;
     use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
     use arrow::error::Result as ArrowResult;
+    use futures::TryStreamExt;
 
     use std::sync::Mutex;
 
@@ -1013,105 +759,6 @@ mod tests {
         );
     }
 
-    #[tokio::test]
-    async fn test_in_memory_column_chunk_reader() {
-        let testdata = arrow::util::test_util::parquet_test_data();
-        let path = format!("{}/alltypes_plain.parquet", testdata);
-        let data = Bytes::from(std::fs::read(path).unwrap());
-
-        let metadata = crate::file::footer::parse_metadata(&data).unwrap();
-
-        let column_metadata = metadata.row_group(0).column(0);
-
-        let (start, length) = column_metadata.byte_range();
-
-        let column_data = data.slice(start as usize..(start + length) as usize);
-
-        let mut reader = DenseColumnChunkReader::new(
-            column_data,
-            column_metadata.compression(),
-            column_metadata.num_values(),
-            column_metadata.column_type(),
-        )
-        .expect("building reader");
-
-        let first_page = reader
-            .peek_next_page()
-            .expect("peeking first page")
-            .expect("first page is empty");
-
-        assert!(first_page.is_dict);
-        assert_eq!(first_page.num_rows, 0);
-
-        let first_page = reader
-            .get_next_page()
-            .expect("getting first page")
-            .expect("first page is empty");
-
-        assert_eq!(
-            first_page.page_type(),
-            crate::basic::PageType::DICTIONARY_PAGE
-        );
-        assert_eq!(first_page.num_values(), 8);
-
-        let second_page = reader
-            .peek_next_page()
-            .expect("peeking second page")
-            .expect("second page is empty");
-
-        assert!(!second_page.is_dict);
-        assert_eq!(second_page.num_rows, 8);
-
-        let second_page = reader
-            .get_next_page()
-            .expect("getting second page")
-            .expect("second page is empty");
-
-        assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
-        assert_eq!(second_page.num_values(), 8);
-
-        let third_page = reader.peek_next_page().expect("getting third page");
-
-        assert!(third_page.is_none());
-
-        let third_page = reader.get_next_page().expect("getting third page");
-
-        assert!(third_page.is_none());
-    }
-
-    #[tokio::test]
-    async fn test_in_memory_column_chunk_reader_skip_page() {
-        let testdata = arrow::util::test_util::parquet_test_data();
-        let path = format!("{}/alltypes_plain.parquet", testdata);
-        let data = Bytes::from(std::fs::read(path).unwrap());
-
-        let metadata = crate::file::footer::parse_metadata(&data).unwrap();
-
-        let column_metadata = metadata.row_group(0).column(0);
-
-        let (start, length) = column_metadata.byte_range();
-
-        let column_data = data.slice(start as usize..(start + length) as usize);
-
-        let mut reader = DenseColumnChunkReader::new(
-            column_data,
-            column_metadata.compression(),
-            column_metadata.num_values(),
-            column_metadata.column_type(),
-        )
-        .expect("building reader");
-
-        reader.skip_next_page().expect("skipping first page");
-
-        let second_page = reader
-            .get_next_page()
-            .expect("getting second page")
-            .expect("second page is empty");
-
-        assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
-        assert_eq!(second_page.num_values(), 8);
-    }
-
     #[tokio::test]
     async fn test_row_filter() {
         let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
@@ -1182,75 +829,85 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_row_filter_prune_io() {
-        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
-        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
-        let c = Int32Array::from_iter(0..6);
-        let data = RecordBatch::try_from_iter([
-            ("a", Arc::new(a) as ArrayRef),
-            ("b", Arc::new(b) as ArrayRef),
-            ("c", Arc::new(c) as ArrayRef),
-        ])
-        .unwrap();
+    async fn test_in_memory_row_group_sparse() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_tiny_pages.parquet", testdata);
+        let data = Bytes::from(std::fs::read(path).unwrap());
 
-        let mut buf = Vec::with_capacity(1024);
-        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
-        writer.write(&data).unwrap();
-        writer.close().unwrap();
+        let metadata = parse_metadata(&data).unwrap();
 
-        let data: Bytes = buf.into();
+        let offset_index =
+            index_reader::read_pages_locations(&data, metadata.row_group(0).columns())
+                .expect("reading offset index");
 
+        let mut row_group_meta = metadata.row_group(0).clone();
+        row_group_meta.set_page_offset(offset_index.clone());
+        let metadata =
+            ParquetMetaData::new(metadata.file_metadata().clone(), vec![row_group_meta]);
 
+        let metadata = Arc::new(metadata);
 
-        let mut metadata = parse_metadata(&data).unwrap();
+        let num_rows = metadata.row_group(0).num_rows();
 
-        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
+        assert_eq!(metadata.num_row_groups(), 1);
 
-        let test = TestReader {
-            data,
-            metadata: Arc::new(metadata),
-            requests: Default::default(),
-        };
-        let requests = test.requests.clone();
+        let async_reader = TestReader {
+            data: data.clone(),
+            metadata: metadata.clone(),
+            requests: Default::default(),
+        };
 
-        let a_filter = ArrowPredicateFn::new(
-            ProjectionMask::leaves(&parquet_schema, vec![0]),
-            |batch| arrow::compute::eq_dyn_utf8_scalar(batch.column(0), "b"),
-        );
+        let requests = async_reader.requests.clone();
+        let schema = Arc::new(
+            parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
+                .expect("building arrow schema"),
        );
 
-        let b_filter = ArrowPredicateFn::new(
-            ProjectionMask::leaves(&parquet_schema, vec![1]),
-            |batch| arrow::compute::eq_dyn_utf8_scalar(batch.column(0), "4"),
-        );
+        let _schema_desc = metadata.file_metadata().schema_descr();
 
-        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
+        let projection =
+            ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
 
-        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
-        let stream = ParquetRecordBatchStreamBuilder::new(test)
-            .await
-            .unwrap()
-            .with_projection(mask.clone())
-            .with_batch_size(1024)
-            .with_row_filter(filter)
-            .build()
-            .unwrap();
+        let reader_factory = ReaderFactory {
+            metadata,
+            schema,
+            input: async_reader,
+            filter: None,
+        };
 
-        let batches: Vec<_> = stream.try_collect().await.unwrap();
-        assert_eq!(batches.len(), 1);
+        let mut skip = true;
+        let mut pages = offset_index[0].iter().peekable();
 
-        let batch = &batches[0];
-        assert_eq!(batch.num_rows(), 1);
-        assert_eq!(batch.num_columns(), 2);
+        // Setup `RowSelection` so that we can skip every other page
+        let mut selectors = vec![];
+        let mut expected_page_requests: Vec<Range<usize>> = vec![];
+        while let Some(page) = pages.next() {
+            let num_rows = if let Some(next_page) = pages.peek() {
+                next_page.first_row_index - page.first_row_index
+            } else {
+                num_rows - page.first_row_index
+            };
 
-        let col = batch.column(0);
-        let val = col.as_any().downcast_ref::<StringArray>().unwrap().value(0);
-        assert_eq!(val, "b");
+            if skip {
+                selectors.push(RowSelector::skip(num_rows as usize));
+            } else {
+                selectors.push(RowSelector::select(num_rows as usize));
+                let start = page.offset as usize;
+                let end = start + page.compressed_page_size as usize;
+                expected_page_requests.push(start..end);
+            }
+            skip = !skip;
+        }
 
-        let col = batch.column(1);
-        let val = col.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
-        assert_eq!(val, 3);
+        let selection = RowSelection::from(selectors);
 
-        // Should only have made 3 requests
-        assert_eq!(requests.lock().unwrap().len(), 3);
+        let (_factory, _reader) = reader_factory
+            .read_row_group(0, Some(selection), projection, 48)
+            .await
+            .expect("reading row group");
+
+        let requests = requests.lock().unwrap();
+
+        assert_eq!(&requests[..], &expected_page_requests)
     }
 }
diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs
index 33499e7426a..e3f37fbc661 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -65,7 +65,7 @@ pub fn read_pages_locations(
     let (offset, total_length) = get_location_offset_and_total_length(chunks)?;
 
     //read all need data into buffer
-    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut reader = reader.get_read(offset, total_length)?;
     let mut data = vec![0; total_length];
     reader.read_exact(&mut data)?;

From 403f6d475575a17833aa46b6c3c4c8ced3fa980e Mon Sep 17 00:00:00 2001
From: Dan Harris
Date: Wed, 17 Aug 2022 07:12:17 -0400
Subject: [PATCH 4/8] Review comments

---
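Notes: review feedback folds `page_mask` into a single `scan_ranges` helper
that returns only the byte ranges; the boolean mask became redundant once
`SerializedPageReader` drives page iteration from the offset index. Expected
behaviour, assuming the 10-row-per-page `index` fixture used by the existing
tests (an illustrative snippet, not part of the diff):

    let selection = RowSelection::from(vec![
        RowSelector::skip(10),  // all of page 0 skipped -> not fetched
        RowSelector::select(3), // rows 10..13 -> fetch page 1
        RowSelector::skip(3),   // rows 13..16 -> still page 1
        RowSelector::select(4), // rows 16..20 -> page 1 already included
    ]);
    assert_eq!(selection.scan_ranges(&index), vec![10..20]);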
diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs
index 33499e7426a..e3f37fbc661 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -65,7 +65,7 @@ pub fn read_pages_locations(
     let (offset, total_length) = get_location_offset_and_total_length(chunks)?;
 
     //read all need data into buffer
-    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut reader = reader.get_read(offset, total_length)?;
     let mut data = vec![0; total_length];
     reader.read_exact(&mut data)?;
 

From 403f6d475575a17833aa46b6c3c4c8ced3fa980e Mon Sep 17 00:00:00 2001
From: Dan Harris
Date: Wed, 17 Aug 2022 07:12:17 -0400
Subject: [PATCH 4/8] Review comments

---
 parquet/src/arrow/arrow_reader/selection.rs | 79 ++++++++++++++-------
 parquet/src/arrow/async_reader.rs           | 30 ++++----
 2 files changed, 68 insertions(+), 41 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index e90181e82bd..165db331daf 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -118,12 +118,11 @@ impl RowSelection {
         Self { selectors }
     }
 
-    /// Given an offset index, return a mask indicating which pages are selected along with their locations by `self`
-    pub fn page_mask(
+    /// Given an offset index, return the offset ranges for all data pages selected by `self`
+    pub(crate) fn scan_ranges(
         &self,
         page_locations: &[PageLocation],
-    ) -> (Vec<bool>, Vec<Range<usize>>) {
-        let mut mask = vec![false; page_locations.len()];
+    ) -> Vec<Range<usize>> {
         let mut ranges = vec![];
         let mut row_offset = 0;
 
@@ -137,11 +136,11 @@ impl RowSelection {
         while let Some((selector, (mut page_idx, page))) =
             current_selector.as_mut().zip(current_page)
         {
-            if !selector.skip && !current_page_included && !mask[page_idx] {
-                mask[page_idx] = true;
+            if !(selector.skip || current_page_included) {
                 let start = page.offset as usize;
                 let end = start + page.compressed_page_size as usize;
                 ranges.push(start..end);
+                current_page_included = true;
             }
 
             if let Some((_, next_page)) = pages.peek() {
@@ -151,6 +150,7 @@ impl RowSelection {
                     selector.row_count -= remaining_in_page;
                     row_offset += remaining_in_page;
                     current_page = pages.next();
+                    current_page_included = false;
 
                     continue;
                 } else {
@@ -158,6 +158,7 @@ impl RowSelection {
                         == next_page.first_row_index as usize
                     {
                         current_page = pages.next();
+                        current_page_included = false;
                     }
                     row_offset += selector.row_count;
                     current_selector = selectors.next();
@@ -167,11 +168,7 @@ impl RowSelection {
             }
         }
 
-        (mask, ranges)
-    }
-
-    pub fn selectors(&self) -> &[RowSelector] {
-        &self.selectors
+        ranges
     }
 
     /// Splits off the first `row_count` from this [`RowSelection`]
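The rewritten `scan_ranges` above merges the selector stream with the page stream in a single pass, using `current_page_included` to avoid emitting a page twice. As a behavioral sketch (not the patch's algorithm), the same contract can be stated with a simple overlap test; `Selector` and `PageLoc` are local stand-ins, with field names matching `parquet_format::PageLocation`:

```rust
use std::ops::Range;

struct Selector {
    row_count: usize,
    skip: bool,
}

struct PageLoc {
    offset: i64,
    compressed_page_size: i32,
    first_row_index: i64,
}

/// For every selector that selects rows, emit the byte range of each page
/// overlapping those rows, without emitting any page twice.
fn scan_ranges(selectors: &[Selector], pages: &[PageLoc], total_rows: usize) -> Vec<Range<usize>> {
    let mut ranges: Vec<Range<usize>> = vec![];
    let mut row = 0;
    for selector in selectors {
        if !selector.skip {
            for (idx, page) in pages.iter().enumerate() {
                let page_rows = page.first_row_index as usize
                    ..pages.get(idx + 1).map(|p| p.first_row_index as usize).unwrap_or(total_rows);
                // Does this page overlap the selected rows [row, row + row_count)?
                if page_rows.start < row + selector.row_count && page_rows.end > row {
                    let start = page.offset as usize;
                    let end = start + page.compressed_page_size as usize;
                    // Pages are visited in order, so a repeat is always adjacent.
                    if ranges.last() != Some(&(start..end)) {
                        ranges.push(start..end);
                    }
                }
            }
        }
        row += selector.row_count;
    }
    ranges
}

fn main() {
    // Two 10-row, 10-byte pages; skip the first page exactly, then select the
    // second: only the second page's bytes should be fetched.
    let pages = vec![
        PageLoc { offset: 0, compressed_page_size: 10, first_row_index: 0 },
        PageLoc { offset: 10, compressed_page_size: 10, first_row_index: 10 },
    ];
    let selectors = vec![
        Selector { row_count: 10, skip: true },
        Selector { row_count: 10, skip: false },
    ];
    assert_eq!(scan_ranges(&selectors, &pages, 20), vec![10..20]);
}
```

The `main` above exercises the page-boundary case that the `row_offset + selector.row_count == next_page.first_row_index` branch in the patch exists for: a skip ending exactly on a boundary must not pull in the next page. The sketch is O(selectors × pages); the patched implementation gets the same result in one merged pass.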
@@ -483,19 +480,7 @@ mod tests {
     }
 
     #[test]
-    fn test_page_mask() {
-        let selection = RowSelection::from(vec![
-            RowSelector::skip(10),
-            RowSelector::select(3),
-            RowSelector::skip(3),
-            RowSelector::select(4),
-            RowSelector::skip(5),
-            RowSelector::select(5),
-            RowSelector::skip(12),
-            RowSelector::select(12),
-            RowSelector::skip(12),
-        ]);
-
+    fn test_scan_ranges() {
         let index = vec![
             PageLocation {
                 offset: 0,
                 compressed_page_size: 10,
                 first_row_index: 0,
             },
             PageLocation {
                 offset: 10,
                 compressed_page_size: 10,
                 first_row_index: 10,
             },
             PageLocation {
                 offset: 20,
                 compressed_page_size: 10,
                 first_row_index: 20,
             },
             PageLocation {
                 offset: 30,
                 compressed_page_size: 10,
                 first_row_index: 30,
             },
             PageLocation {
                 offset: 40,
                 compressed_page_size: 10,
                 first_row_index: 40,
             },
             PageLocation {
                 offset: 50,
                 compressed_page_size: 10,
                 first_row_index: 50,
             },
             PageLocation {
                 offset: 60,
                 compressed_page_size: 10,
                 first_row_index: 60,
             },
         ];
 
-        let (mask, ranges) = selection.page_mask(&index);
+        let selection = RowSelection::from(vec![
+            // Skip first page
+            RowSelector::skip(10),
+            // Multiple selects in same page
+            RowSelector::select(3),
+            RowSelector::skip(3),
+            RowSelector::select(4),
+            // Select to page boundary
+            RowSelector::skip(5),
+            RowSelector::select(5),
+            // Skip full page past page boundary
+            RowSelector::skip(12),
+            // Select across page boundaries
+            RowSelector::select(12),
+            // Skip final page
+            RowSelector::skip(12),
+        ]);
+
+        let ranges = selection.scan_ranges(&index);
 
-        assert_eq!(mask, vec![false, true, true, false, true, true, false]);
         assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]);
+
+        let selection = RowSelection::from(vec![
+            // Skip first page
+            RowSelector::skip(10),
+            // Multiple selects in same page
+            RowSelector::select(3),
+            RowSelector::skip(3),
+            RowSelector::select(4),
+            // Select to page boundary
+            RowSelector::skip(5),
+            RowSelector::select(5),
+            // Skip full page past page boundary
+            RowSelector::skip(12),
+            // Select across page boundaries
+            RowSelector::select(12),
+            RowSelector::skip(1),
+            // Select across page boundaries including final page
+            RowSelector::select(8),
+        ]);
+
+        let ranges = selection.scan_ranges(&index);
+
+        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
     }
 }
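The `async_reader.rs` hunk that follows consumes these ranges: it records each range's start as the page's absolute offset, issues one vectored read, and zips the offsets back with the returned buffers to build `ColumnChunkData::Sparse`. A sketch of that bookkeeping, with an in-memory `Vec<u8>` standing in for the file and a plain map standing in for the async `get_byte_ranges` call:

```rust
fn main() {
    // A 100-byte in-memory stand-in for the file.
    let file: Vec<u8> = (0..100).map(|i| i as u8).collect();

    // Byte ranges produced by scan_ranges for the selected pages.
    let fetch_ranges = vec![10usize..20, 40..50];

    // Remember each page's absolute start offset before the ranges are consumed.
    let page_start_offsets: Vec<usize> = fetch_ranges.iter().map(|r| r.start).collect();

    // Stand-in for `input.get_byte_ranges(fetch_ranges).await?`.
    let chunk_data: Vec<Vec<u8>> = fetch_ranges.iter().map(|r| file[r.clone()].to_vec()).collect();

    // Reassemble (page offset, page bytes) pairs, as stored in ColumnChunkData::Sparse.
    let data: Vec<(usize, Vec<u8>)> = page_start_offsets.into_iter().zip(chunk_data).collect();

    assert_eq!(data[0].0, 10);
    assert_eq!(data[1].1[0], 40); // first byte of the page fetched at offset 40
}
```

Because `get_byte_ranges` returns the buffers in request order, a positional zip is enough; no lookup table is needed to match pages back to offsets.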
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 3418f6e3ef0..db7b0f2cfa7 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -488,7 +488,7 @@ impl InMemoryRowGroup {
         {
             // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
             // `RowSelection`
-            let mut offsets: Vec<Vec<usize>> = vec![];
+            let mut page_start_offsets: Vec<Vec<usize>> = vec![];
 
             let fetch_ranges = self
                 .column_chunks
@@ -497,8 +497,9 @@ impl InMemoryRowGroup {
                 .into_iter()
                 .filter_map(|(idx, chunk)| {
                     (chunk.is_none() && projection.leaf_included(idx)).then(|| {
-                        let (_mask, ranges) = selection.page_mask(&page_locations[idx]);
-                        offsets.push(ranges.iter().map(|range| range.start).collect());
+                        let ranges = selection.scan_ranges(&page_locations[idx]);
+                        page_start_offsets
+                            .push(ranges.iter().map(|range| range.start).collect());
 
                         ranges
                     })
                 })
                 .collect();
 
             let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
-            let mut offsets = offsets.into_iter();
+            let mut page_start_offsets = page_start_offsets.into_iter();
 
             for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
                 if chunk.is_some() || !projection.leaf_included(idx) {
                     continue;
                 }
 
-                if let Some(page_offsets) = offsets.next() {
-                    let mut chunks = Vec::with_capacity(page_offsets.len());
-                    for _ in 0..page_offsets.len() {
+                if let Some(offsets) = page_start_offsets.next() {
+                    let mut chunks = Vec::with_capacity(offsets.len());
+                    for _ in 0..offsets.len() {
                         chunks.push(chunk_data.next().unwrap());
                     }
 
                     *chunk = Some(ColumnChunkData::Sparse {
                         length: self.metadata.column(idx).byte_range().1 as usize,
-                        data: page_offsets.into_iter().zip(chunks.into_iter()).collect(),
+                        data: offsets.into_iter().zip(chunks.into_iter()).collect(),
                     })
                 }
             }
@@ -606,9 +607,11 @@ enum ColumnChunkData {
     Sparse {
         /// Length of the full column chunk
        length: usize,
+        /// Set of data pages included in this sparse chunk. Each element is a tuple
+        /// of (page offset, page data)
         data: Vec<(usize, Bytes)>,
     },
-    /// Full column chunk
+    /// Full column chunk and its offset
     Dense { offset: usize, data: Bytes },
 }
 
@@ -627,12 +630,9 @@ impl ChunkReader for ColumnChunkData {
     fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
         match &self {
             ColumnChunkData::Sparse { data, .. } => data
-                .iter()
-                .find(|(offset, bytes)| {
-                    *offset <= start as usize && (start as usize - *offset) < bytes.len()
-                })
-                .map(|(_, bytes)| bytes.slice(0..length).reader())
-                .ok_or_else(|| {
+                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
+                .map(|idx| data[idx].1.slice(0..length).reader())
+                .map_err(|_| {
                     ParquetError::General(format!(
                         "Invalid offset in sparse column chunk data: {}",
                         start
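With every read now guaranteed to start at a page's first byte, the lookup above can demand an exact offset match. A self-contained sketch of the `binary_search_by_key` form, again with `Vec<u8>` standing in for `Bytes` and `String` for the crate's error type:

```rust
/// Exact-offset page lookup over sorted (offset, bytes) pairs, mirroring the
/// binary-search form above. Unlike the linear scan it replaces, a read must
/// start exactly at a page's first byte; anything else is an invalid offset.
fn get_page(data: &[(u64, Vec<u8>)], start: u64) -> Result<&[u8], String> {
    data.binary_search_by_key(&start, |(offset, _)| *offset)
        .map(|idx| data[idx].1.as_slice())
        .map_err(|_| format!("Invalid offset in sparse column chunk data: {}", start))
}

fn main() {
    let data = vec![(10u64, vec![1u8; 10]), (40, vec![2u8; 10])];
    assert!(get_page(&data, 40).is_ok()); // exact page start
    assert!(get_page(&data, 41).is_err()); // mid-page reads are no longer found
}
```

A miss is now an error rather than a scan, which suits the sparse `ChunkReader`: the page reader only ever seeks to offsets taken from the same offset index that produced the fetch ranges.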
Some(ColumnChunkData::Sparse { + *chunk = Some(Arc::new(ColumnChunkData::Sparse { length: self.metadata.column(idx).byte_range().1 as usize, data: offsets.into_iter().zip(chunks.into_iter()).collect(), - }) + })) } } } else { @@ -549,10 +549,10 @@ impl InMemoryRowGroup { } if let Some(data) = chunk_data.next() { - *chunk = Some(ColumnChunkData::Dense { + *chunk = Some(Arc::new(ColumnChunkData::Dense { offset: self.metadata.column(idx).byte_range().0 as usize, data, - }); + })); } } } @@ -561,7 +561,7 @@ impl InMemoryRowGroup { } } -impl RowGroupCollection for InMemoryRowGroup { +impl<'a> RowGroupCollection for InMemoryRowGroup<'a> { fn schema(&self) -> SchemaDescPtr { self.metadata.schema_descr_ptr() } @@ -584,7 +584,7 @@ impl RowGroupCollection for InMemoryRowGroup { .map(|index| index[i].clone()); let page_reader: Box = Box::new(SerializedPageReader::new( - Arc::new(data.clone()), + data.clone(), self.metadata.column(i), self.row_count, page_locations, From 45f85a82ea83dcb2d148c121c03571190ccecf7e Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Wed, 17 Aug 2022 08:41:23 -0400 Subject: [PATCH 7/8] Fix docs --- parquet/src/arrow/arrow_reader/selection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 165db331daf..c38276615f1 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -22,7 +22,7 @@ use std::cmp::Ordering; use std::collections::VecDeque; use std::ops::Range; -/// [`RowSelection`] is a collection of [`RowSelect`] used to skip rows when +/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when /// scanning a parquet file #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub struct RowSelector { From 2f49b151100cdd34fb1b7fde79651c3f38911c42 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Wed, 17 Aug 2022 08:51:33 -0400 Subject: [PATCH 8/8] Linting --- parquet/src/arrow/arrow_reader/selection.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index c38276615f1..72867e8916d 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -126,16 +126,14 @@ impl RowSelection { let mut ranges = vec![]; let mut row_offset = 0; - let mut pages = page_locations.iter().enumerate().peekable(); + let mut pages = page_locations.iter().peekable(); let mut selectors = self.selectors.iter().cloned(); let mut current_selector = selectors.next(); let mut current_page = pages.next(); let mut current_page_included = false; - while let Some((selector, (mut page_idx, page))) = - current_selector.as_mut().zip(current_page) - { + while let Some((selector, page)) = current_selector.as_mut().zip(current_page) { if !(selector.skip || current_page_included) { let start = page.offset as usize; let end = start + page.compressed_page_size as usize; @@ -143,7 +141,7 @@ impl RowSelection { current_page_included = true; } - if let Some((_, next_page)) = pages.peek() { + if let Some(next_page) = pages.peek() { if row_offset + selector.row_count > next_page.first_row_index as usize { let remaining_in_page = next_page.first_row_index as usize - row_offset;