Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
first cut at reading straight to ParquetMetaData
  • Loading branch information
etseidl committed Aug 11, 2025
commit 502c679b7d5ff45052eb1736fc9addd05f2ca31e
27 changes: 26 additions & 1 deletion parquet/benches/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use parquet::file::metadata::ParquetMetaDataReader;
use rand::Rng;
use thrift::protocol::TCompactOutputProtocol;

Expand Down Expand Up @@ -198,19 +199,43 @@ fn criterion_benchmark(c: &mut Criterion) {
});

let meta_data = get_footer_bytes(data.clone());
c.bench_function("parquet metadata", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata(&meta_data).unwrap();
})
});

c.bench_function("decode file metadata", |b| {
b.iter(|| {
parquet::thrift::bench_file_metadata(&meta_data);
})
});

let buf = black_box(encoded_meta()).into();
c.bench_function("decode new", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_file_metadata(&meta_data).unwrap();
})
});

let buf: Bytes = black_box(encoded_meta()).into();
c.bench_function("parquet metadata (wide)", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata(&buf).unwrap();
})
});

c.bench_function("decode file metadata (wide)", |b| {
b.iter(|| {
parquet::thrift::bench_file_metadata(&buf);
})
});

c.bench_function("decode new (wide)", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_file_metadata(&buf).unwrap();
})
});

// rewrite file with page statistics. then read page headers.
#[cfg(feature = "arrow")]
let (file_bytes, metadata) = rewrite_file(data.clone());
Expand Down
77 changes: 13 additions & 64 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::{fmt, str};

pub use crate::compression::{BrotliLevel, GzipLevel, ZstdLevel};
use crate::parquet_thrift::{FieldType, ThriftCompactInputProtocol};
use crate::{thrift_enum, thrift_private_struct, thrift_union_all_empty};
use crate::{thrift_enum, thrift_struct, thrift_union_all_empty};

use crate::errors::{ParquetError, Result};

Expand Down Expand Up @@ -210,14 +210,14 @@ union TimeUnit {

// private structs for decoding logical type

thrift_private_struct!(
thrift_struct!(
struct DecimalType {
1: required i32 scale
2: required i32 precision
}
);

thrift_private_struct!(
thrift_struct!(
struct TimestampType {
1: required bool is_adjusted_to_u_t_c
2: required TimeUnit unit
Expand All @@ -227,84 +227,33 @@ struct TimestampType {
// they are identical
use TimestampType as TimeType;

thrift_private_struct!(
thrift_struct!(
struct IntType {
1: required i8 bit_width
2: required bool is_signed
}
);

thrift_private_struct!(
thrift_struct!(
struct VariantType {
// The version of the variant specification that the variant was
// written with.
1: optional i8 specification_version
}
);

// TODO need macro for structs that need lifetime annotation
thrift_struct!(
struct GeometryType<'a> {
crs: Option<&'a str>,
}

impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for GeometryType<'a> {
type Error = ParquetError;
fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
let mut crs: Option<&str> = None;
prot.read_struct_begin()?;
loop {
let field_ident = prot.read_field_begin()?;
if field_ident.field_type == FieldType::Stop {
break;
}
match field_ident.id {
1 => {
let val = prot.read_string()?;
crs = Some(val);
}
_ => {
prot.skip(field_ident.field_type)?;
}
};
}
Ok(Self { crs })
}
1: optional string<'a> crs;
}
);

thrift_struct!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a pretty sweet way of generating structs BTW

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jhorstmann! I agree! 😄

struct GeographyType<'a> {
crs: Option<&'a str>,
algorithm: Option<EdgeInterpolationAlgorithm>,
}

impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for GeographyType<'a> {
type Error = ParquetError;
fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
let mut crs: Option<&str> = None;
let mut algorithm: Option<EdgeInterpolationAlgorithm> = None;
prot.read_struct_begin()?;
loop {
let field_ident = prot.read_field_begin()?;
if field_ident.field_type == FieldType::Stop {
break;
}
match field_ident.id {
1 => {
let val = prot.read_string()?;
crs = Some(val);
}
2 => {
let val = EdgeInterpolationAlgorithm::try_from(&mut *prot)?;
algorithm = Some(val);
}

_ => {
prot.skip(field_ident.field_type)?;
}
};
}
Ok(Self { crs, algorithm })
}
1: optional string<'a> crs;
2: optional EdgeInterpolationAlgorithm algorithm;
}
);

/// Logical types used by version 2.4.0+ of the Parquet format.
///
Expand Down Expand Up @@ -971,7 +920,7 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for ColumnOrder {
}
let ret = match field_ident.id {
1 => {
// TODO: the sort order needs to be set correctly after parsing.
// NOTE: the sort order needs to be set correctly after parsing.
prot.skip_empty_struct()?;
Self::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
}
Expand Down
55 changes: 46 additions & 9 deletions parquet/src/file/column_crypto_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,66 @@

//! Column chunk encryption metadata

use crate::errors::Result;
use crate::errors::{ParquetError, Result};
use crate::format::{
ColumnCryptoMetaData as TColumnCryptoMetaData,
EncryptionWithColumnKey as TEncryptionWithColumnKey,
EncryptionWithFooterKey as TEncryptionWithFooterKey,
};
use crate::parquet_thrift::{FieldType, ThriftCompactInputProtocol};
use crate::thrift_struct;

thrift_struct!(
/// Encryption metadata for a column chunk encrypted with a column-specific key
pub struct EncryptionWithColumnKey {
/// Path to the column in the Parquet schema
1: required list<string> path_in_schema

/// Path to the column in the Parquet schema
2: optional binary key_metadata
}
);

/// ColumnCryptoMetadata for a column chunk
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq)]
pub enum ColumnCryptoMetaData {
/// The column is encrypted with the footer key
EncryptionWithFooterKey,
/// The column is encrypted with a column-specific key
EncryptionWithColumnKey(EncryptionWithColumnKey),
}

/// Encryption metadata for a column chunk encrypted with a column-specific key
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EncryptionWithColumnKey {
/// Path to the column in the Parquet schema
pub path_in_schema: Vec<String>,
/// Metadata required to retrieve the column encryption key
pub key_metadata: Option<Vec<u8>>,
impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for ColumnCryptoMetaData {
type Error = ParquetError;
fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
prot.read_struct_begin()?;

let field_ident = prot.read_field_begin()?;
if field_ident.field_type == FieldType::Stop {
return Err(general_err!("received empty union from remote LogicalType"));
}
let ret = match field_ident.id {
1 => {
prot.skip_empty_struct()?;
Self::EncryptionWithFooterKey
}
2 => Self::EncryptionWithColumnKey(EncryptionWithColumnKey::try_from(&mut *prot)?),
_ => {
return Err(general_err!(
"Unexpected EncryptionWithColumnKey {}",
field_ident.id
));
}
};
let field_ident = prot.read_field_begin()?;
if field_ident.field_type != FieldType::Stop {
return Err(general_err!(
"Received multiple fields for union from remote LogicalType"
));
}
prot.read_struct_end()?;
Ok(ret)
}
}

/// Converts Thrift definition into `ColumnCryptoMetadata`.
Expand Down
77 changes: 76 additions & 1 deletion parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ mod memory;
pub(crate) mod reader;
mod writer;

use crate::basic::{ColumnOrder, Compression, Encoding, Type};
#[cfg(feature = "encryption")]
use crate::encryption::{
decrypt::FileDecryptor,
Expand Down Expand Up @@ -124,6 +123,10 @@ use crate::{
basic::BoundaryOrder,
errors::{ParquetError, Result},
};
use crate::{
basic::{ColumnOrder, Compression, Encoding, Type},
parquet_thrift::{FieldType, ThriftCompactInputProtocol},
};
use crate::{
data_type::private::ParquetValueType, file::page_index::offset_index::OffsetIndexMetaData,
};
Expand Down Expand Up @@ -445,6 +448,39 @@ impl KeyValue {
}
}

impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for KeyValue {
type Error = ParquetError;
fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
let mut key: Option<&str> = None;
let mut value: Option<&str> = None;
prot.read_struct_begin()?;
loop {
let field_ident = prot.read_field_begin()?;
if field_ident.field_type == FieldType::Stop {
break;
}
match field_ident.id {
1 => {
let val = prot.read_string()?;
key = Some(val);
}
2 => {
let val = prot.read_string()?;
value = Some(val);
}
_ => {
prot.skip(field_ident.field_type)?;
}
}
}
prot.read_struct_end()?;
Ok(Self {
key: key.expect("Required field key not present").to_owned(),
value: value.map(|v| v.to_owned()),
})
}
}

/// Reference counted pointer for [`FileMetaData`].
pub type FileMetaDataPtr = Arc<FileMetaData>;

Expand Down Expand Up @@ -558,6 +594,45 @@ pub struct SortingColumn {
pub nulls_first: bool,
}

impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for SortingColumn {
type Error = ParquetError;
fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
let mut column_idx: Option<i32> = None;
let mut descending: Option<bool> = None;
let mut nulls_first: Option<bool> = None;
prot.read_struct_begin()?;
loop {
let field_ident = prot.read_field_begin()?;
if field_ident.field_type == FieldType::Stop {
break;
}
match field_ident.id {
1 => {
let val = prot.read_i32()?;
column_idx = Some(val);
}
2 => {
let val = prot.read_bool()?;
descending = Some(val);
}
3 => {
let val = prot.read_bool()?;
nulls_first = Some(val);
}
_ => {
prot.skip(field_ident.field_type)?;
}
};
}
prot.read_struct_end()?;
Ok(Self {
column_idx: column_idx.expect("Required field column_idx not present"),
descending: descending.expect("Required field descending not present"),
nulls_first: nulls_first.expect("Required field nulls_first not present"),
})
}
}

impl From<&crate::format::SortingColumn> for SortingColumn {
fn from(value: &crate::format::SortingColumn) -> Self {
Self {
Expand Down
Loading