Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 146 additions & 3 deletions parquet/src/arrow/arrow_reader/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

use arrow::array::{Array, BooleanArray};
use arrow::compute::SlicesIterator;
use parquet_format::PageLocation;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::ops::Range;

/// [`RowSelector`] represents a range of rows to scan from a parquet file
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// [`RowSelection`] is a collection of [`RowSelect`] used to skip rows when
/// scanning a parquet file
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct RowSelector {
/// The number of rows
pub row_count: usize,
Expand Down Expand Up @@ -116,6 +118,59 @@ impl RowSelection {
Self { selectors }
}

/// Given an offset index, return the offset ranges for all data pages selected by `self`
pub(crate) fn scan_ranges(
&self,
page_locations: &[PageLocation],
) -> Vec<Range<usize>> {
let mut ranges = vec![];
let mut row_offset = 0;

let mut pages = page_locations.iter().enumerate().peekable();
let mut selectors = self.selectors.iter().cloned();
let mut current_selector = selectors.next();
let mut current_page = pages.next();

let mut current_page_included = false;

while let Some((selector, (mut page_idx, page))) =
current_selector.as_mut().zip(current_page)
{
if !(selector.skip || current_page_included) {
let start = page.offset as usize;
let end = start + page.compressed_page_size as usize;
ranges.push(start..end);
current_page_included = true;
}

if let Some((_, next_page)) = pages.peek() {
if row_offset + selector.row_count > next_page.first_row_index as usize {
let remaining_in_page =
next_page.first_row_index as usize - row_offset;
selector.row_count -= remaining_in_page;
row_offset += remaining_in_page;
current_page = pages.next();
current_page_included = false;

continue;
} else {
if row_offset + selector.row_count
== next_page.first_row_index as usize
{
current_page = pages.next();
current_page_included = false;
}
row_offset += selector.row_count;
current_selector = selectors.next();
}
} else {
break;
}
}

ranges
}

/// Splits off the first `row_count` from this [`RowSelection`]
pub fn split_off(&mut self, row_count: usize) -> Self {
let mut total_count = 0;
Expand Down Expand Up @@ -162,7 +217,7 @@ impl RowSelection {
/// self: NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY
/// other: YYYYYNNNNYYYYYYYYYYYYY YYNNN
///
/// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYYNNYNNNN
/// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYNNNYYNNN
///
///
pub fn and_then(&self, other: &Self) -> Self {
Expand Down Expand Up @@ -423,4 +478,92 @@ mod tests {
assert_eq!(a.and_then(&b), expected);
}
}

#[test]
fn test_scan_ranges() {
let index = vec![
PageLocation {
offset: 0,
compressed_page_size: 10,
first_row_index: 0,
},
PageLocation {
offset: 10,
compressed_page_size: 10,
first_row_index: 10,
},
PageLocation {
offset: 20,
compressed_page_size: 10,
first_row_index: 20,
},
PageLocation {
offset: 30,
compressed_page_size: 10,
first_row_index: 30,
},
PageLocation {
offset: 40,
compressed_page_size: 10,
first_row_index: 40,
},
PageLocation {
offset: 50,
compressed_page_size: 10,
first_row_index: 50,
},
PageLocation {
offset: 60,
compressed_page_size: 10,
first_row_index: 60,
},
];

let selection = RowSelection::from(vec![
// Skip first page
RowSelector::skip(10),
// Multiple selects in same page
RowSelector::select(3),
RowSelector::skip(3),
RowSelector::select(4),
// Select to page boundary
RowSelector::skip(5),
RowSelector::select(5),
// Skip full page past page boundary
RowSelector::skip(12),
// Select across page boundaries
RowSelector::select(12),
// Skip final page
RowSelector::skip(12),
]);

let ranges = selection.scan_ranges(&index);

// assert_eq!(mask, vec![false, true, true, false, true, true, false]);
assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we get a test where the final PageLocation is selected?


let selection = RowSelection::from(vec![
// Skip first page
RowSelector::skip(10),
// Multiple selects in same page
RowSelector::select(3),
RowSelector::skip(3),
RowSelector::select(4),
// Select to page boundary
RowSelector::skip(5),
RowSelector::select(5),
// Skip full page past page boundary
RowSelector::skip(12),
// Select across page boundaries
RowSelector::select(12),
RowSelector::skip(1),
// Select across page boundaries including final page
RowSelector::select(8),
]);

let ranges = selection.scan_ranges(&index);

// assert_eq!(mask, vec![false, true, true, false, true, true, true]);
assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
}
}
Loading