Merged

27 commits
741bbf6
bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight` (#6041)
BugenZhao Jul 16, 2024
8f76248
Remove `impl<T: AsRef<[u8]>> From<T> for Buffer` that easily acciden…
XiangpengHao Jul 16, 2024
bb5f12b
Make display of interval types more pretty (#6006)
Rachelint Jul 16, 2024
756b1fb
Update snafu (#5930)
Jesse-Bakker Jul 16, 2024
fe04e09
Update Parquet thrift generated structures (#6045)
etseidl Jul 16, 2024
2e7f7ef
Revert "Revert "Write Bloom filters between row groups instead of the…
alamb Jul 16, 2024
effccc1
Revert "Update snafu (#5930)" (#6069)
alamb Jul 16, 2024
649d09d
Update pyo3 requirement from 0.21.1 to 0.22.1 (fixed) (#6075)
crepererum Jul 17, 2024
05e681d
remove repeated codes to make the codes more concise. (#6080)
Rachelint Jul 18, 2024
e40b311
Add `unencoded_byte_array_data_bytes` to `ParquetMetaData` (#6068)
etseidl Jul 19, 2024
81c34ac
Update pyo3 requirement from 0.21.1 to 0.22.2 (#6085)
dependabot[bot] Jul 23, 2024
3bc9987
Deprecate read_page_locations() and simplify offset index in `ParquetMetaData` (#6095)
etseidl Jul 23, 2024
095130f
Merge remote-tracking branch 'apache/master' into 53.0.0-dev
alamb Jul 25, 2024
a6353d1
Update parquet/src/column/writer/mod.rs
alamb Jul 25, 2024
eeccaca
Upgrade protobuf definitions to flightsql 17.0 (#6133)
djanderson Jul 27, 2024
b07d057
Add `ParquetMetadataWriter` allow ad-hoc encoding of `ParquetMetadata`
adriangb Jul 24, 2024
e2be8d3
fix loading in test by etseidl
adriangb Jul 31, 2024
0175d53
add rough equivalence test
etseidl Jul 31, 2024
f188bf8
one more check
etseidl Jul 31, 2024
57b85d7
make clippy happy
etseidl Jul 31, 2024
1f3eb0b
Merge pull request #1 from etseidl/pr_6000_ets
adriangb Jul 31, 2024
4d1651c
separate tests that require arrow into a separate module
etseidl Jul 31, 2024
8691903
Merge remote-tracking branch 'origin/master' into test_merge5
etseidl Aug 1, 2024
241ee02
add histograms to to_thrift()
etseidl Aug 1, 2024
0b53d55
Merge pull request #2 from etseidl/fix_compile_check
adriangb Aug 5, 2024
4d7158f
Merge pull request #3 from etseidl/fix_checks_and_merge
adriangb Aug 5, 2024
590c4ed
Merge remote-tracking branch 'apache/master' into add-encode_metadata
alamb Aug 6, 2024
Deprecate read_page_locations() and simplify offset index in `ParquetMetaData` (#6095)

* deprecate read_page_locations

* add to_thrift() to OffsetIndexMetaData
etseidl authored Jul 23, 2024
commit 3bc998792b19ba20285ca82765991f03c7fa845a
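
In practice, the migration this commit asks for looks roughly like the sketch below. It is a minimal, hedged example (the `print_page_counts` helper and the file path are hypothetical), built only from names that appear in the diff: `parse_metadata`, `index_reader::read_offset_indexes`, and `OffsetIndexMetaData::page_locations()`.

```rust
use std::fs::File;

use parquet::errors::Result;
use parquet::file::footer::parse_metadata;
use parquet::file::page_index::index_reader;

// Each column chunk now yields an OffsetIndexMetaData wrapping its page
// locations (plus the optional unencoded_byte_array_data_bytes from #6068),
// replacing the deprecated read_pages_locations() -> Vec<Vec<PageLocation>>.
fn print_page_counts(path: &str) -> Result<()> {
    let file = File::open(path).expect("open parquet file");
    let metadata = parse_metadata(&file)?;
    for rg in metadata.row_groups() {
        let offset_indexes = index_reader::read_offset_indexes(&file, rg.columns())?;
        for (i, idx) in offset_indexes.iter().enumerate() {
            println!("column {i}: {} pages", idx.page_locations().len());
        }
    }
    Ok(())
}
```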
4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -394,7 +394,7 @@ impl ArrowReaderMetadata {
let offset_index = metadata
.row_groups()
.iter()
-.map(|rg| index_reader::read_pages_locations(reader, rg.columns()))
+.map(|rg| index_reader::read_offset_indexes(reader, rg.columns()))
.collect::<Result<Vec<_>>>()?;

metadata.set_offset_index(Some(offset_index))
@@ -689,7 +689,7 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
// To avoid `i[rg_idx][self.column_idx]` panic, we need to filter out empty `i[rg_idx]`.
let page_locations = offset_index
.filter(|i| !i[rg_idx].is_empty())
-.map(|i| i[rg_idx][self.column_idx].clone());
+.map(|i| i[rg_idx][self.column_idx].page_locations.clone());
let total_rows = rg.num_rows() as usize;
let reader = self.reader.clone();

14 changes: 10 additions & 4 deletions parquet/src/arrow/arrow_reader/statistics.rs
@@ -1349,7 +1349,9 @@ impl<'a> StatisticsConverter<'a> {
let iter = row_group_indices.into_iter().map(|rg_index| {
let column_page_index_per_row_group_per_column =
&column_page_index[*rg_index][parquet_index];
-let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
+let num_data_pages = &column_offset_index[*rg_index][parquet_index]
+.page_locations()
+.len();

(*num_data_pages, column_page_index_per_row_group_per_column)
});
@@ -1378,7 +1380,9 @@
let iter = row_group_indices.into_iter().map(|rg_index| {
let column_page_index_per_row_group_per_column =
&column_page_index[*rg_index][parquet_index];
-let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
+let num_data_pages = &column_offset_index[*rg_index][parquet_index]
+.page_locations()
+.len();

(*num_data_pages, column_page_index_per_row_group_per_column)
});
@@ -1408,7 +1412,9 @@
let iter = row_group_indices.into_iter().map(|rg_index| {
let column_page_index_per_row_group_per_column =
&column_page_index[*rg_index][parquet_index];
-let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
+let num_data_pages = &column_offset_index[*rg_index][parquet_index]
+.page_locations()
+.len();

(*num_data_pages, column_page_index_per_row_group_per_column)
});
@@ -1450,7 +1456,7 @@ impl<'a> StatisticsConverter<'a> {

let mut row_count_total = Vec::new();
for rg_idx in row_group_indices {
-let page_locations = &column_offset_index[*rg_idx][parquet_index];
+let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations();

let row_count_per_page = page_locations
.windows(2)
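
The row counts above come from pairing adjacent page locations: each page spans from its `first_row_index` to the next page's, and the last page runs to the end of the row group. A self-contained sketch of that `windows(2)` pattern (the `row_counts` helper is hypothetical; `PageLocation` is the thrift-generated struct used throughout this diff):

```rust
use parquet::format::PageLocation;

// Per-page row counts: difference of consecutive first_row_index values,
// with the final page extending to the row group's total row count.
fn row_counts(page_locations: &[PageLocation], num_rows: i64) -> Vec<i64> {
    let mut counts: Vec<i64> = page_locations
        .windows(2)
        .map(|w| w[1].first_row_index - w[0].first_row_index)
        .collect();
    if let Some(last) = page_locations.last() {
        counts.push(num_rows - last.first_row_index);
    }
    counts
}
```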
14 changes: 7 additions & 7 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -1096,7 +1096,7 @@ mod tests {
use crate::data_type::AsBytes;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index::Index;
-use crate::file::page_index::index_reader::read_pages_locations;
+use crate::file::page_index::index_reader::read_offset_indexes;
use crate::file::properties::{
BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
};
@@ -1669,16 +1669,16 @@
"Expected a dictionary page"
);

-let page_locations = read_pages_locations(&file, column).unwrap();
+let offset_indexes = read_offset_indexes(&file, column).unwrap();

-let offset_index = page_locations[0].clone();
+let page_locations = offset_indexes[0].page_locations.clone();

// We should fallback to PLAIN encoding after the first row and our max page size is 1 bytes
// so we expect one dictionary encoded page and then a page per row thereafter.
assert_eq!(
-offset_index.len(),
+page_locations.len(),
10,
"Expected 9 pages but got {offset_index:#?}"
"Expected 9 pages but got {page_locations:#?}"
);
}

@@ -3020,8 +3020,8 @@

assert_eq!(index.len(), 1);
assert_eq!(index[0].len(), 2); // 2 columns
-assert_eq!(index[0][0].len(), 1); // 1 page
-assert_eq!(index[0][1].len(), 1); // 1 page
+assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
+assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
}

#[test]
8 changes: 2 additions & 6 deletions parquet/src/arrow/async_reader/metadata.rs
@@ -20,9 +20,7 @@ use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index::Index;
-use crate::file::page_index::index_reader::{
-acc_range, decode_column_index, decode_page_locations,
-};
+use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
@@ -179,9 +177,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
x.columns()
.iter()
.map(|c| match c.offset_index_range() {
-Some(r) => {
-decode_page_locations(&data[r.start - offset..r.end - offset])
-}
+Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
None => Err(general_err!("missing offset index")),
})
.collect::<Result<Vec<_>>>()
24 changes: 13 additions & 11 deletions parquet/src/arrow/async_reader/mod.rs
@@ -106,9 +106,10 @@ use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::file::FOOTER_SIZE;
-use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, PageLocation};
+use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};

mod metadata;
pub use metadata::*;
@@ -489,7 +490,7 @@
// TODO: calling build_array multiple times is wasteful

let meta = self.metadata.row_group(row_group_idx);
-let page_locations = self
+let offset_index = self
.metadata
.offset_index()
.map(|x| x[row_group_idx].as_slice());
@@ -499,7 +500,7 @@
// schema: meta.schema_descr_ptr(),
row_count: meta.num_rows() as usize,
column_chunks: vec![None; meta.columns().len()],
-page_locations,
+offset_index,
};

if let Some(filter) = self.filter.as_mut() {
@@ -703,7 +704,7 @@
/// An in-memory collection of column chunks
struct InMemoryRowGroup<'a> {
metadata: &'a RowGroupMetaData,
-page_locations: Option<&'a [Vec<PageLocation>]>,
+offset_index: Option<&'a [OffsetIndexMetaData]>,
column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
row_count: usize,
}
@@ -716,7 +717,7 @@ impl<'a> InMemoryRowGroup<'a> {
projection: &ProjectionMask,
selection: Option<&RowSelection>,
) -> Result<()> {
-if let Some((selection, page_locations)) = selection.zip(self.page_locations) {
+if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
// If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
// `RowSelection`
let mut page_start_offsets: Vec<Vec<usize>> = vec![];
@@ -734,14 +735,14 @@
// then we need to also fetch a dictionary page.
let mut ranges = vec![];
let (start, _len) = chunk_meta.byte_range();
-match page_locations[idx].first() {
+match offset_index[idx].page_locations.first() {
Some(first) if first.offset as u64 != start => {
ranges.push(start as usize..first.offset as usize);
}
_ => (),
}

-ranges.extend(selection.scan_ranges(&page_locations[idx]));
+ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
page_start_offsets.push(ranges.iter().map(|range| range.start).collect());

ranges
@@ -812,7 +813,9 @@
"Invalid column index {i}, column was not fetched"
))),
Some(data) => {
-let page_locations = self.page_locations.map(|index| index[i].clone());
+let page_locations = self
+.offset_index
+.map(|index| index[i].page_locations.clone());
let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
data.clone(),
self.metadata.column(i),
@@ -1529,7 +1532,7 @@ mod tests {
let metadata = parse_metadata(&data).unwrap();

let offset_index =
-index_reader::read_pages_locations(&data, metadata.row_group(0).columns())
+index_reader::read_offset_indexes(&data, metadata.row_group(0).columns())
.expect("reading offset index");

let row_group_meta = metadata.row_group(0).clone();
@@ -1538,7 +1541,6 @@
vec![row_group_meta],
None,
Some(vec![offset_index.clone()]),
-None,
);

let metadata = Arc::new(metadata);
@@ -1575,7 +1577,7 @@
};

let mut skip = true;
-let mut pages = offset_index[0].iter().peekable();
+let mut pages = offset_index[0].page_locations.iter().peekable();

// Setup `RowSelection` so that we can skip every other page, selecting the last page
let mut selectors = vec![];
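
The fetch path above turns the offset index into byte ranges to read: if the first data page does not start at the column chunk's byte offset, the gap at the front holds a dictionary page and is fetched too. A simplified sketch of that range computation (the `chunk_ranges` helper is hypothetical, and the `RowSelection` narrowing done in `InMemoryRowGroup::fetch` is omitted for brevity):

```rust
use std::ops::Range;

use parquet::file::metadata::ColumnChunkMetaData;
use parquet::file::page_index::offset_index::OffsetIndexMetaData;

// Byte ranges to fetch for one column chunk: an optional leading dictionary
// page, then one range per data page recorded in the offset index.
fn chunk_ranges(
    chunk_meta: &ColumnChunkMetaData,
    offset_index: &OffsetIndexMetaData,
) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    let (start, _len) = chunk_meta.byte_range();
    if let Some(first) = offset_index.page_locations().first() {
        if first.offset as u64 != start {
            // A dictionary page sits between the chunk start and the first data page.
            ranges.push(start as usize..first.offset as usize);
        }
    }
    for loc in offset_index.page_locations() {
        let page_start = loc.offset as usize;
        ranges.push(page_start..page_start + loc.compressed_page_size as usize);
    }
    ranges
}
```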
12 changes: 7 additions & 5 deletions parquet/src/bin/parquet-index.rs
@@ -37,6 +37,7 @@
use clap::Parser;
use parquet::errors::{ParquetError, Result};
use parquet::file::page_index::index::{Index, PageIndex};
+use parquet::file::page_index::offset_index::OffsetIndexMetaData;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::serialized_reader::ReadOptionsBuilder;
use parquet::format::PageLocation;
@@ -93,7 +94,8 @@ impl Args {
))
})?;

-let row_counts = compute_row_counts(offset_index, row_group.num_rows());
+let row_counts =
+compute_row_counts(offset_index.page_locations.as_slice(), row_group.num_rows());
match &column_indices[column_idx] {
Index::NONE => println!("NO INDEX"),
Index::BOOLEAN(v) => print_index(&v.indexes, offset_index, &row_counts)?,
@@ -131,20 +133,20 @@ fn compute_row_counts(offset_index: &[PageLocation], rows: i64) -> Vec<i64> {
/// Prints index information for a single column chunk
fn print_index<T: std::fmt::Display>(
column_index: &[PageIndex<T>],
-offset_index: &[PageLocation],
+offset_index: &OffsetIndexMetaData,
row_counts: &[i64],
) -> Result<()> {
-if column_index.len() != offset_index.len() {
+if column_index.len() != offset_index.page_locations.len() {
return Err(ParquetError::General(format!(
"Index length mismatch, got {} and {}",
column_index.len(),
-offset_index.len()
+offset_index.page_locations.len()
)));
}

for (idx, ((c, o), row_count)) in column_index
.iter()
-.zip(offset_index)
+.zip(offset_index.page_locations())
.zip(row_counts)
.enumerate()
{
7 changes: 7 additions & 0 deletions parquet/src/file/metadata/memory.rs
@@ -23,6 +23,7 @@ use crate::data_type::private::ParquetValueType;
use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, KeyValue, RowGroupMetaData};
use crate::file::page_encoding_stats::PageEncodingStats;
use crate::file::page_index::index::{Index, NativeIndex, PageIndex};
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::statistics::{Statistics, ValueStatistics};
use crate::format::{BoundaryOrder, PageLocation, SortingColumn};
use std::sync::Arc;
@@ -144,6 +145,12 @@ impl HeapSize for Statistics {
}
}

+impl HeapSize for OffsetIndexMetaData {
+fn heap_size(&self) -> usize {
+self.page_locations.heap_size() + self.unencoded_byte_array_data_bytes.heap_size()
+}
+}

impl HeapSize for Index {
fn heap_size(&self) -> usize {
match self {
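
The new impl charges the offset index's two fields to the metadata memory accounting. Since `HeapSize` is internal to the parquet crate, here is a self-contained sketch of the composition pattern it follows; the `Vec` impl charging `capacity * size_of::<T>()` plus per-element heap use is an assumption based on the crate's conventions, for illustration only.

```rust
// Self-contained sketch of the crate-internal HeapSize pattern: a composite
// type's heap size is the sum of its fields' heap allocations.
trait HeapSize {
    fn heap_size(&self) -> usize;
}

impl HeapSize for i64 {
    fn heap_size(&self) -> usize {
        0 // plain scalar: no heap allocation of its own
    }
}

impl<T: HeapSize> HeapSize for Vec<T> {
    fn heap_size(&self) -> usize {
        // Charge the vector's reserved buffer plus each element's own heap use.
        self.capacity() * std::mem::size_of::<T>()
            + self.iter().map(|e| e.heap_size()).sum::<usize>()
    }
}

fn main() {
    let first_row_indexes: Vec<i64> = vec![0, 100, 250];
    // At least 3 * 8 bytes of buffer; the i64 elements add nothing further.
    println!("{}", first_row_indexes.heap_size());
}
```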