Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Go back to fully decoding the thrift data first, and then transforming
it into our own metadata structs. This reduces the amount of
non-macro-generated code.
  • Loading branch information
etseidl committed Aug 14, 2025
commit b24b358cd2d2d7c67b84c206869017183f1aa3c7
94 changes: 33 additions & 61 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use crate::{
statistics::ValueStatistics,
},
parquet_thrift::{FieldType, ThriftCompactInputProtocol},
schema::types::{schema_from_thrift_input, ColumnDescriptor},
thrift_read_field, thrift_struct,
schema::types::{parquet_schema_from_array, ColumnDescriptor, SchemaElement},
thrift_struct,
util::bit_util::FromBytes,
};
use bytes::Bytes;
Expand All @@ -41,7 +41,7 @@ use bytes::Bytes;
use crate::file::column_crypto_metadata::ColumnCryptoMetaData;

use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData};
use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, RowGroupMetaData};
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
use crate::file::reader::ChunkReader;
Expand Down Expand Up @@ -1000,7 +1000,7 @@ impl ParquetMetaDataReader {
.collect::<Vec<KeyValue>>()
});

let file_metadata = FileMetaData::new(
let file_metadata = crate::file::metadata::FileMetaData::new(
t_file_metadata.version,
t_file_metadata.num_rows,
t_file_metadata.created_by,
Expand Down Expand Up @@ -1043,7 +1043,7 @@ impl ParquetMetaDataReader {
.collect::<Vec<KeyValue>>()
});

let file_metadata = FileMetaData::new(
let file_metadata = crate::file::metadata::FileMetaData::new(
t_file_metadata.version,
t_file_metadata.num_rows,
t_file_metadata.created_by,
Expand All @@ -1059,74 +1059,30 @@ impl ParquetMetaDataReader {
pub fn decode_file_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
let mut prot = ThriftCompactInputProtocol::new(buf);

// components of the FileMetaData
let mut version: Option<i32> = None;
let mut schema_descr: Option<Arc<SchemaDescriptor>> = None;
let mut num_rows: Option<i64> = None;
let mut row_groups: Option<Vec<RowGroup>> = None;
let mut key_value_metadata: Option<Vec<KeyValue>> = None;
let mut created_by: Option<String> = None;
let mut column_orders: Option<Vec<ColumnOrder>> = None;

// begin decoding to intermediates
prot.read_struct_begin()?;
loop {
let field_ident = prot.read_field_begin()?;
if field_ident.field_type == FieldType::Stop {
break;
}
let prot = &mut prot;
let file_meta = FileMetaData::try_from(&mut prot)?;

match field_ident.id {
1 => {
thrift_read_field!(version, prot, i32);
}
2 => {
let val = schema_from_thrift_input(prot)?;
schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
}
3 => {
thrift_read_field!(num_rows, prot, i64);
}
4 => {
// need to get temp struct here and then translate
let val = Vec::<RowGroup>::try_from(prot)?;
row_groups = Some(val);
}
5 => {
let val = Vec::<KeyValue>::try_from(prot)?;
key_value_metadata = Some(val);
}
6 => {
thrift_read_field!(created_by, prot, string);
}
7 => {
let val = Vec::<ColumnOrder>::try_from(prot)?;
column_orders = Some(val);
}
_ => {
prot.skip(field_ident.field_type)?;
}
}
}
let version = file_meta.version;
let num_rows = file_meta.num_rows;
let row_groups = file_meta.row_groups;
let created_by = file_meta.created_by.map(|c| c.to_owned());
let key_value_metadata = file_meta.key_value_metadata;

let version = version.expect("Required field version is missing");
let num_rows = num_rows.expect("Required field num_rows is missing");
let row_groups = row_groups.expect("Required field row_groups is missing");
let schema_descr = schema_descr.expect("Required field schema is missing");
let val = parquet_schema_from_array(&file_meta.schema)?;
let schema_descr = Arc::new(SchemaDescriptor::new(val));

// need schema_descr to get final RowGroupMetaData
let row_groups = convert_row_groups(row_groups, schema_descr.clone())?;

// need to map read column orders to actual values based on the schema
if column_orders
if file_meta
.column_orders
.as_ref()
.is_some_and(|cos| cos.len() != schema_descr.num_columns())
{
return Err(general_err!("Column order length mismatch"));
}

let column_orders = column_orders.map(|cos| {
let column_orders = file_meta.column_orders.map(|cos| {
let mut res = Vec::with_capacity(cos.len());
for (i, column) in schema_descr.columns().iter().enumerate() {
match cos[i] {
Expand All @@ -1144,7 +1100,7 @@ impl ParquetMetaDataReader {
res
});

let fmd = FileMetaData::new(
let fmd = crate::file::metadata::FileMetaData::new(
version,
num_rows,
created_by,
Expand Down Expand Up @@ -1219,6 +1175,22 @@ fn get_file_decryptor(
}

// temp structs used to construct RowGroupMetaData
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there some way to avoid reading into a temp structure and directly construct a RowGroupMetaData?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above about FileMetaData 😄

I actually went down that road and it was slower. :( That may have been my incompetence, or it may be that the code got too bloated and the optimizer gave up.

// TODO: move these to a separate module
// Thrift-decoded, intermediate form of the file footer metadata. `decode_file_metadata`
// reads this struct from the input protocol in a single `try_from` call and then converts
// its fields into the crate's own metadata types (`crate::file::metadata::FileMetaData`,
// `RowGroupMetaData`, ...). It shares its name with `crate::file::metadata::FileMetaData`,
// which is why callers in this file spell out the full path for the latter.
thrift_struct!(
struct FileMetaData<'a> {
/** Version of this file **/
1: required i32 version
// schema elements as decoded from thrift; `decode_file_metadata` hands this list to
// `parquet_schema_from_array` to build the schema descriptor
2: required list<'a><SchemaElement> schema;
3: required i64 num_rows
// temporary `RowGroup` structs; converted via `convert_row_groups` once the schema
// descriptor is available
4: required list<'a><RowGroup> row_groups
5: optional list<KeyValue> key_value_metadata
6: optional string created_by
// when present, `decode_file_metadata` requires one entry per schema column and
// returns a "Column order length mismatch" error otherwise
7: optional list<ColumnOrder> column_orders;
// Fields 8 and 9 are left commented out, so they are not members of this struct.
// NOTE(review): presumably encryption metadata is handled elsewhere — confirm.
//8: optional EncryptionAlgorithm encryption_algorithm
//9: optional binary footer_signing_key_metadata
}
);

thrift_struct!(
struct RowGroup<'a> {
1: required list<'a><ColumnChunk> columns
Expand Down
85 changes: 52 additions & 33 deletions parquet/src/schema/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1312,7 +1312,7 @@ fn from_thrift_helper(
.map(Repetition::try_from)
.transpose()?;

let mut fields = vec![];
let mut fields = Vec::with_capacity(n as usize);
let mut next_index = index + 1;
for _ in 0..n {
let child_result = from_thrift_helper(elements, next_index)?;
Expand Down Expand Up @@ -1429,7 +1429,7 @@ fn to_thrift_helper(schema: &Type, elements: &mut Vec<crate::format::SchemaEleme

// temporary struct that lives only long enough to create `TypePtr` schema
thrift_struct!(
struct SchemaElement<'a> {
pub(crate) struct SchemaElement<'a> {
/** Data type for this field. Not set if the current element is a non-leaf node */
1: optional PhysicalType type_;
2: optional i32 type_length;
Expand All @@ -1444,67 +1444,86 @@ struct SchemaElement<'a> {
}
);

fn next_schema_element<'a>(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<SchemaElement<'a>> {
SchemaElement::try_from(prot)
}

pub(crate) fn schema_from_thrift_input(prot: &mut ThriftCompactInputProtocol) -> Result<TypePtr> {
// need to be at the start of a list<SchemaElement>. read the list header.
let list_ident = prot.read_list_begin()?;
// This is a copy of `from_thrift` above, but rather than `format::SchemaElement` it takes
// the `SchemaElement` currently defined in this module.

let t = from_thrift_input_helper(prot, 0)?;
if t.0 != list_ident.size as usize {
return Err(general_err!("Expected exactly one root node"));
// convert thrift decoded array of `SchemaElement` into this crate's representation of
// parquet types
pub(crate) fn parquet_schema_from_array<'a>(elements: &'a [SchemaElement<'a>]) -> Result<TypePtr> {
let mut index = 0;
let mut schema_nodes = Vec::with_capacity(1); // there should only be one element when done
while index < elements.len() {
let t = schema_from_array_helper(elements, index)?;
index = t.0;
schema_nodes.push(t.1);
}
if schema_nodes.len() != 1 {
return Err(general_err!(
"Expected exactly one root node, but found {}",
schema_nodes.len()
));
}

if !t.1.is_group() {
if !schema_nodes[0].is_group() {
return Err(general_err!("Expected root node to be a group type"));
}

Ok(t.1)
Ok(schema_nodes.remove(0))
}

fn from_thrift_input_helper(
prot: &mut ThriftCompactInputProtocol,
// recursive helper function for schema conversion
fn schema_from_array_helper<'a>(
elements: &[SchemaElement<'a>],
index: usize,
) -> Result<(usize, TypePtr)> {
// Whether or not the current node is root (message type).
// There is only one message type node in the schema tree.
let is_root_node = index == 0;
let element = next_schema_element(prot)?;

if index >= elements.len() {
return Err(general_err!(
"Index out of bound, index = {}, len = {}",
index,
elements.len()
));
}
let element = &elements[index];

// Check for empty schema
if let (true, None | Some(0)) = (is_root_node, element.num_children) {
let builder = Type::group_type_builder(element.name);
let builder = Type::group_type_builder(&element.name);
return Ok((index + 1, Arc::new(builder.build().unwrap())));
}

let converted_type = element.converted_type.unwrap_or(ConvertedType::NONE);
let logical_type = element.logical_type;

// LogicalType is only present in v2 Parquet files. ConvertedType is always
// populated, regardless of the version of the file (v1 or v2).
let logical_type = element.logical_type.clone();

check_logical_type(&logical_type)?;

let field_id = element.field_id;
match element.num_children {
let field_id = elements[index].field_id;
match elements[index].num_children {
// From parquet-format:
// The children count is used to construct the nested relationship.
// This field is not set when the element is a primitive type
// Sometimes parquet-cpp sets num_children field to 0 for primitive types, so we
// have to handle this case too.
None | Some(0) => {
// primitive type
if element.repetition_type.is_none() {
if elements[index].repetition_type.is_none() {
return Err(general_err!(
"Repetition level must be defined for a primitive type"
));
}
let repetition = element.repetition_type.unwrap();
if let Some(type_) = element.type_ {
let repetition = elements[index].repetition_type.unwrap();
if let Some(type_) = elements[index].type_ {
let physical_type = type_;
let length = element.type_length.unwrap_or(-1);
let scale = element.scale.unwrap_or(-1);
let precision = element.precision.unwrap_or(-1);
let name = element.name;
let length = elements[index].type_length.unwrap_or(-1);
let scale = elements[index].scale.unwrap_or(-1);
let precision = elements[index].precision.unwrap_or(-1);
let name = &elements[index].name;
let builder = Type::primitive_type_builder(name, physical_type)
.with_repetition(repetition)
.with_converted_type(converted_type)
Expand All @@ -1515,7 +1534,7 @@ fn from_thrift_input_helper(
.with_id(field_id);
Ok((index + 1, Arc::new(builder.build()?)))
} else {
let mut builder = Type::group_type_builder(element.name)
let mut builder = Type::group_type_builder(&elements[index].name)
.with_converted_type(converted_type)
.with_logical_type(logical_type)
.with_id(field_id);
Expand All @@ -1533,17 +1552,17 @@ fn from_thrift_input_helper(
}
}
Some(n) => {
let repetition = element.repetition_type;
let repetition = elements[index].repetition_type;

let mut fields = vec![];
let mut fields = Vec::with_capacity(n as usize);
let mut next_index = index + 1;
for _ in 0..n {
let child_result = from_thrift_input_helper(prot, next_index)?;
let child_result = schema_from_array_helper(elements, next_index)?;
next_index = child_result.0;
fields.push(child_result.1);
}

let mut builder = Type::group_type_builder(element.name)
let mut builder = Type::group_type_builder(&elements[index].name)
.with_converted_type(converted_type)
.with_logical_type(logical_type)
.with_fields(fields)
Expand Down
Loading