143 changes: 143 additions & 0 deletions parquet/src/arrow/mod.rs
@@ -236,3 +236,146 @@ pub fn parquet_column<'a>(
.find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
Some((parquet_idx, field))
}

#[cfg(test)]
mod test {
use crate::arrow::ArrowWriter;
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
use crate::file::properties::{EnabledStatistics, WriterProperties};
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use bytes::Bytes;
use std::sync::Arc;

#[test]
fn test_metadata_read_write_roundtrip() {
let parquet_bytes = create_parquet_file();

// read the metadata from the file
let original_metadata = ParquetMetaDataReader::new()
.parse_and_finish(&parquet_bytes)
.unwrap();

        // serialize the metadata on its own; since it is a strict subset of
        // the parquet file, the lengths should differ
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        assert_ne!(
            metadata_bytes.len(),
            parquet_bytes.len(),
            "metadata is a subset of the parquet file"
        );

        // read the metadata back from the serialized bytes and ensure it is the same
let roundtrip_metadata = ParquetMetaDataReader::new()
.parse_and_finish(&metadata_bytes)
.unwrap();

assert_eq!(original_metadata, roundtrip_metadata);
}

#[test]
fn test_metadata_read_write_roundtrip_page_index() {
let parquet_bytes = create_parquet_file();

        // read the metadata from the file including the page index structures
        // (which are stored separately from the footer metadata)
let original_metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.parse_and_finish(&parquet_bytes)
.unwrap();

// read metadata back from the serialized bytes and ensure it is the same
let metadata_bytes = metadata_to_bytes(&original_metadata);
let roundtrip_metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.parse_and_finish(&metadata_bytes)
.unwrap();

        // Need to normalize the metadata first to remove the page index
        // offsets, which differ depending on how the structures were stored
let original_metadata = normalize_locations(original_metadata);
let roundtrip_metadata = normalize_locations(roundtrip_metadata);
assert_eq!(
format!("{original_metadata:#?}"),
format!("{roundtrip_metadata:#?}")
);
assert_eq!(original_metadata, roundtrip_metadata);
}

#[test]
// Reproducer for https://github.com/apache/arrow-rs/issues/6464 (this should eventually pass)
#[should_panic(expected = "missing required field ColumnIndex.null_pages")]
fn test_metadata_read_write_partial_offset() {
let parquet_bytes = create_parquet_file();

// read the metadata from the file WITHOUT the page index structures
let original_metadata = ParquetMetaDataReader::new()
.parse_and_finish(&parquet_bytes)
.unwrap();

        // read the metadata back from the serialized bytes, requesting the page indexes
let metadata_bytes = metadata_to_bytes(&original_metadata);
let roundtrip_metadata = ParquetMetaDataReader::new()
.with_page_indexes(true) // there are no page indexes in the metadata
.parse_and_finish(&metadata_bytes)
.unwrap();

        // Need to normalize the metadata first to remove the page index
        // offsets, which differ depending on how the structures were stored
let original_metadata = normalize_locations(original_metadata);
let roundtrip_metadata = normalize_locations(roundtrip_metadata);
assert_eq!(
format!("{original_metadata:#?}"),
format!("{roundtrip_metadata:#?}")
);
assert_eq!(original_metadata, roundtrip_metadata);
}

// TODO: test reading parquet bytes from serialized metadata
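
    /// A possible shape for the TODO above, sketched under the assumption
    /// that `ArrowReaderMetadata::try_new` and
    /// `ParquetRecordBatchReaderBuilder::new_with_metadata` (both in
    /// `crate::arrow::arrow_reader`) remain available; not wired up to a
    /// `#[test]` yet.
    #[allow(dead_code)]
    fn read_with_external_metadata(parquet_bytes: Bytes, metadata: ParquetMetaData) {
        use crate::arrow::arrow_reader::{
            ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
        };

        // wrap the deserialized metadata so the reader uses it directly
        // instead of re-parsing the footer from `parquet_bytes`
        let reader_metadata =
            ArrowReaderMetadata::try_new(Arc::new(metadata), ArrowReaderOptions::new()).unwrap();
        let reader =
            ParquetRecordBatchReaderBuilder::new_with_metadata(parquet_bytes, reader_metadata)
                .build()
                .unwrap();

        // decoding should yield the single batch written by `create_parquet_file`
        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(batches.len(), 1);
    }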

    /// Writes a parquet file into an in-memory buffer
fn create_parquet_file() -> Bytes {
let mut buf = vec![];
let data = vec![100, 200, 201, 300, 102, 33];
let array: ArrayRef = Arc::new(Int32Array::from(data));
let batch = RecordBatch::try_from_iter(vec![("id", array)]).unwrap();
let props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Page)
.build();

let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
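        // explicitly drop the writer to release its mutable borrow on `buf`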
drop(writer);

Bytes::from(buf)
}

    /// Serializes `ParquetMetaData` into an in-memory buffer, using `ParquetMetaDataWriter`
fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
let mut buf = vec![];
ParquetMetaDataWriter::new(&mut buf, metadata)
.finish()
.unwrap();
Bytes::from(buf)
}

/// Sets the page index offset locations in the metadata to `None`
///
    /// This is because the offsets record where the index structures are
    /// stored, and thus differ depending on how (and where) the metadata is
    /// serialized.
fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
let mut metadata_builder = metadata.into_builder();
for rg in metadata_builder.take_row_groups() {
let mut rg_builder = rg.into_builder();
for col in rg_builder.take_columns() {
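                // rebuild each column chunk with its page index offsets cleared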
rg_builder = rg_builder.add_column_metadata(
col.into_builder()
.set_offset_index_offset(None)
.set_index_page_offset(None)
.set_column_index_offset(None)
.build()
.unwrap(),
);
}
let rg = rg_builder.build().unwrap();
metadata_builder = metadata_builder.add_row_group(rg);
}
metadata_builder.build()
}
}