diff --git a/parquet/benches/metadata.rs b/parquet/benches/metadata.rs index bb2eca0a75c1..3c293462a157 100644 --- a/parquet/benches/metadata.rs +++ b/parquet/benches/metadata.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use parquet::file::metadata::ParquetMetaDataReader; use rand::Rng; use thrift::protocol::TCompactOutputProtocol; @@ -198,19 +199,43 @@ fn criterion_benchmark(c: &mut Criterion) { }); let meta_data = get_footer_bytes(data.clone()); - c.bench_function("decode file metadata", |b| { + c.bench_function("decode parquet metadata", |b| { + b.iter(|| { + ParquetMetaDataReader::decode_metadata(&meta_data).unwrap(); + }) + }); + + c.bench_function("decode thrift file metadata", |b| { b.iter(|| { parquet::thrift::bench_file_metadata(&meta_data); }) }); - let buf = black_box(encoded_meta()).into(); - c.bench_function("decode file metadata (wide)", |b| { + c.bench_function("decode parquet metadata new", |b| { + b.iter(|| { + ParquetMetaDataReader::decode_file_metadata(&meta_data).unwrap(); + }) + }); + + let buf: Bytes = black_box(encoded_meta()).into(); + c.bench_function("decode parquet metadata (wide)", |b| { + b.iter(|| { + ParquetMetaDataReader::decode_metadata(&buf).unwrap(); + }) + }); + + c.bench_function("decode thrift file metadata (wide)", |b| { b.iter(|| { parquet::thrift::bench_file_metadata(&buf); }) }); + c.bench_function("decode parquet metadata new (wide)", |b| { + b.iter(|| { + ParquetMetaDataReader::decode_file_metadata(&buf).unwrap(); + }) + }); + // rewrite file with page statistics. then read page headers. 
#[cfg(feature = "arrow")] let (file_bytes, metadata) = rewrite_file(data.clone()); diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index 92d70a933a03..78d294acd577 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -25,7 +25,7 @@ use std::{fmt, str}; pub use crate::compression::{BrotliLevel, GzipLevel, ZstdLevel}; use crate::parquet_thrift::{FieldType, ThriftCompactInputProtocol}; -use crate::{thrift_enum, thrift_private_struct, thrift_union_all_empty}; +use crate::{thrift_enum, thrift_struct, thrift_union_all_empty}; use crate::errors::{ParquetError, Result}; @@ -210,14 +210,14 @@ union TimeUnit { // private structs for decoding logical type -thrift_private_struct!( +thrift_struct!( struct DecimalType { 1: required i32 scale 2: required i32 precision } ); -thrift_private_struct!( +thrift_struct!( struct TimestampType { 1: required bool is_adjusted_to_u_t_c 2: required TimeUnit unit @@ -227,14 +227,14 @@ struct TimestampType { // they are identical use TimestampType as TimeType; -thrift_private_struct!( +thrift_struct!( struct IntType { 1: required i8 bit_width 2: required bool is_signed } ); -thrift_private_struct!( +thrift_struct!( struct VariantType { // The version of the variant specification that the variant was // written with. 
@@ -242,69 +242,18 @@ struct VariantType { } ); -// TODO need macro for structs that need lifetime annotation +thrift_struct!( struct GeometryType<'a> { - crs: Option<&'a str>, -} - -impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for GeometryType<'a> { - type Error = ParquetError; - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result { - let mut crs: Option<&str> = None; - prot.read_struct_begin()?; - loop { - let field_ident = prot.read_field_begin()?; - if field_ident.field_type == FieldType::Stop { - break; - } - match field_ident.id { - 1 => { - let val = prot.read_string()?; - crs = Some(val); - } - _ => { - prot.skip(field_ident.field_type)?; - } - }; - } - Ok(Self { crs }) - } + 1: optional string<'a> crs; } +); +thrift_struct!( struct GeographyType<'a> { - crs: Option<&'a str>, - algorithm: Option, -} - -impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for GeographyType<'a> { - type Error = ParquetError; - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result { - let mut crs: Option<&str> = None; - let mut algorithm: Option = None; - prot.read_struct_begin()?; - loop { - let field_ident = prot.read_field_begin()?; - if field_ident.field_type == FieldType::Stop { - break; - } - match field_ident.id { - 1 => { - let val = prot.read_string()?; - crs = Some(val); - } - 2 => { - let val = EdgeInterpolationAlgorithm::try_from(&mut *prot)?; - algorithm = Some(val); - } - - _ => { - prot.skip(field_ident.field_type)?; - } - }; - } - Ok(Self { crs, algorithm }) - } + 1: optional string<'a> crs; + 2: optional EdgeInterpolationAlgorithm algorithm; } +); /// Logical types used by version 2.4.0+ of the Parquet format. /// @@ -971,7 +920,7 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for ColumnOrder { } let ret = match field_ident.id { 1 => { - // TODO: the sort order needs to be set correctly after parsing. + // NOTE: the sort order needs to be set correctly after parsing. 
prot.skip_empty_struct()?; Self::TYPE_DEFINED_ORDER(SortOrder::SIGNED) } diff --git a/parquet/src/encryption/decrypt.rs b/parquet/src/encryption/decrypt.rs index d9b9ff0326b4..d285f6a1237c 100644 --- a/parquet/src/encryption/decrypt.rs +++ b/parquet/src/encryption/decrypt.rs @@ -142,13 +142,13 @@ impl CryptoContext { column_ordinal: usize, ) -> Result { let (data_decryptor, metadata_decryptor) = match column_crypto_metadata { - ColumnCryptoMetaData::EncryptionWithFooterKey => { + ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY => { // TODO: In GCM-CTR mode will this need to be a non-GCM decryptor? let data_decryptor = file_decryptor.get_footer_decryptor()?; let metadata_decryptor = file_decryptor.get_footer_decryptor()?; (data_decryptor, metadata_decryptor) } - ColumnCryptoMetaData::EncryptionWithColumnKey(column_key_encryption) => { + ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(column_key_encryption) => { let key_metadata = &column_key_encryption.key_metadata; let full_column_name; let column_name = if column_key_encryption.path_in_schema.len() == 1 { diff --git a/parquet/src/encryption/encrypt.rs b/parquet/src/encryption/encrypt.rs index c8d3ffc0eef4..1a241bf7b170 100644 --- a/parquet/src/encryption/encrypt.rs +++ b/parquet/src/encryption/encrypt.rs @@ -421,14 +421,14 @@ pub(crate) fn get_column_crypto_metadata( ) -> Option { if properties.column_keys.is_empty() { // Uniform encryption - Some(ColumnCryptoMetaData::EncryptionWithFooterKey) + Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) } else { properties .column_keys .get(&column.path().string()) .map(|encryption_key| { // Column is encrypted with a column specific key - ColumnCryptoMetaData::EncryptionWithColumnKey(EncryptionWithColumnKey { + ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(EncryptionWithColumnKey { path_in_schema: column.path().parts().to_vec(), key_metadata: encryption_key.key_metadata.clone(), }) diff --git a/parquet/src/file/column_crypto_metadata.rs 
b/parquet/src/file/column_crypto_metadata.rs index af670e675fcd..95cbc65cf716 100644 --- a/parquet/src/file/column_crypto_metadata.rs +++ b/parquet/src/file/column_crypto_metadata.rs @@ -17,30 +17,36 @@ //! Column chunk encryption metadata -use crate::errors::Result; +use crate::errors::{ParquetError, Result}; use crate::format::{ ColumnCryptoMetaData as TColumnCryptoMetaData, EncryptionWithColumnKey as TEncryptionWithColumnKey, EncryptionWithFooterKey as TEncryptionWithFooterKey, }; +use crate::parquet_thrift::{FieldType, ThriftCompactInputProtocol}; +use crate::{thrift_struct, thrift_union}; -/// ColumnCryptoMetadata for a column chunk -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum ColumnCryptoMetaData { - /// The column is encrypted with the footer key - EncryptionWithFooterKey, - /// The column is encrypted with a column-specific key - EncryptionWithColumnKey(EncryptionWithColumnKey), -} +// define this and ColumnCryptoMetadata here so they're only defined when +// the encryption feature is enabled +thrift_struct!( /// Encryption metadata for a column chunk encrypted with a column-specific key -#[derive(Clone, Debug, PartialEq, Eq)] pub struct EncryptionWithColumnKey { - /// Path to the column in the Parquet schema - pub path_in_schema: Vec, - /// Metadata required to retrieve the column encryption key - pub key_metadata: Option>, + /// Path to the column in the Parquet schema + 1: required list path_in_schema + + /// Metadata required to retrieve the column encryption key + 2: optional binary key_metadata +} +); + +thrift_union!( +/// ColumnCryptoMetadata for a column chunk +union ColumnCryptoMetaData { + 1: ENCRYPTION_WITH_FOOTER_KEY + 2: (EncryptionWithColumnKey) ENCRYPTION_WITH_COLUMN_KEY } +); /// Converts Thrift definition into `ColumnCryptoMetadata`. 
pub fn try_from_thrift( @@ -48,10 +54,10 @@ pub fn try_from_thrift( ) -> Result { let crypto_metadata = match thrift_column_crypto_metadata { TColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_) => { - ColumnCryptoMetaData::EncryptionWithFooterKey + ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY } TColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(encryption_with_column_key) => { - ColumnCryptoMetaData::EncryptionWithColumnKey(EncryptionWithColumnKey { + ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(EncryptionWithColumnKey { path_in_schema: encryption_with_column_key.path_in_schema.clone(), key_metadata: encryption_with_column_key.key_metadata.clone(), }) @@ -63,10 +69,10 @@ pub fn try_from_thrift( /// Converts `ColumnCryptoMetadata` into Thrift definition. pub fn to_thrift(column_crypto_metadata: &ColumnCryptoMetaData) -> TColumnCryptoMetaData { match column_crypto_metadata { - ColumnCryptoMetaData::EncryptionWithFooterKey => { + ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY => { TColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(TEncryptionWithFooterKey {}) } - ColumnCryptoMetaData::EncryptionWithColumnKey(encryption_with_column_key) => { + ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(encryption_with_column_key) => { TColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(TEncryptionWithColumnKey { path_in_schema: encryption_with_column_key.path_in_schema.clone(), key_metadata: encryption_with_column_key.key_metadata.clone(), @@ -81,14 +87,14 @@ mod tests { #[test] fn test_encryption_with_footer_key_from_thrift() { - let metadata = ColumnCryptoMetaData::EncryptionWithFooterKey; + let metadata = ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY; assert_eq!(try_from_thrift(&to_thrift(&metadata)).unwrap(), metadata); } #[test] fn test_encryption_with_column_key_from_thrift() { - let metadata = ColumnCryptoMetaData::EncryptionWithColumnKey(EncryptionWithColumnKey { + let metadata = ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(EncryptionWithColumnKey { path_in_schema: 
vec!["abc".to_owned(), "def".to_owned()], key_metadata: Some(vec![0, 1, 2, 3, 4, 5]), }); diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 193b70d9dd4a..d8102761186c 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -93,9 +93,9 @@ //! ``` mod memory; pub(crate) mod reader; +pub(crate) mod thrift_gen; mod writer; -use crate::basic::{ColumnOrder, Compression, Encoding, Type}; #[cfg(feature = "encryption")] use crate::encryption::{ decrypt::FileDecryptor, @@ -104,7 +104,6 @@ use crate::encryption::{ #[cfg(feature = "encryption")] use crate::file::column_crypto_metadata::{self, ColumnCryptoMetaData}; pub(crate) use crate::file::metadata::memory::HeapSize; -use crate::file::page_index::index::{Index, NativeIndex}; use crate::file::{ page_encoding_stats::{self, PageEncodingStats}, page_index::offset_index::PageLocation, @@ -124,9 +123,17 @@ use crate::{ basic::BoundaryOrder, errors::{ParquetError, Result}, }; +use crate::{ + basic::{ColumnOrder, Compression, Encoding, Type}, + parquet_thrift::{FieldType, ThriftCompactInputProtocol}, +}; use crate::{ data_type::private::ParquetValueType, file::page_index::offset_index::OffsetIndexMetaData, }; +use crate::{ + file::page_index::index::{Index, NativeIndex}, + thrift_struct, +}; pub use reader::{FooterTail, ParquetMetaDataReader}; use std::ops::Range; use std::sync::Arc; @@ -423,14 +430,14 @@ impl From for ParquetMetaDataBuilder { } } +// TODO: should this move to thrift_gen? +thrift_struct!( /// A key-value pair for [`FileMetaData`]. -#[derive(Clone, Debug, Eq, PartialEq)] pub struct KeyValue { - /// The key. - pub key: String, - /// An optional value. - pub value: Option, + 1: required string key + 2: optional string value } +); impl KeyValue { /// Create a new key value pair @@ -546,17 +553,21 @@ impl FileMetaData { } } +// TODO: should this move to thrift_gen? 
+thrift_struct!( /// Sort order within a RowGroup of a leaf column -#[derive(Clone, Debug, Eq, PartialEq)] pub struct SortingColumn { - /// The ordinal position of the column (in this row group) * - pub column_idx: i32, - /// If true, indicates this column is sorted in descending order. * - pub descending: bool, - /// If true, nulls will come before non-null values, otherwise, - /// nulls go at the end. - pub nulls_first: bool, + /// The ordinal position of the column (in this row group) + 1: required i32 column_idx + + /// If true, indicates this column is sorted in descending order. + 2: required bool descending + + /// If true, nulls will come before non-null values, otherwise, + /// nulls go at the end. + 3: required bool nulls_first } +); impl From<&crate::format::SortingColumn> for SortingColumn { fn from(value: &crate::format::SortingColumn) -> Self { @@ -2203,9 +2214,9 @@ mod tests { .build(); #[cfg(not(feature = "encryption"))] - let base_expected_size = 2312; + let base_expected_size = 2280; #[cfg(feature = "encryption")] - let base_expected_size = 2648; + let base_expected_size = 2616; assert_eq!(parquet_meta.memory_size(), base_expected_size); @@ -2233,9 +2244,9 @@ mod tests { .build(); #[cfg(not(feature = "encryption"))] - let bigger_expected_size = 2816; + let bigger_expected_size = 2784; #[cfg(feature = "encryption")] - let bigger_expected_size = 3152; + let bigger_expected_size = 3120; // more set fields means more memory usage assert!(bigger_expected_size > base_expected_size); diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 53ae01221976..4905469737a0 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -17,30 +17,33 @@ use std::{io::Read, ops::Range, sync::Arc}; +use crate::{ + basic::ColumnOrder, + file::metadata::{FileMetaData, KeyValue}, + parquet_thrift::ThriftCompactInputProtocol, +}; #[cfg(feature = "encryption")] -use crate::encryption::{ - 
decrypt::{FileDecryptionProperties, FileDecryptor}, - modules::create_footer_aad, +use crate::{ + encryption::{ + decrypt::{CryptoContext, FileDecryptionProperties, FileDecryptor}, + modules::create_footer_aad, + }, + format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData}, }; -use crate::{basic::ColumnOrder, file::metadata::KeyValue}; use bytes::Bytes; use crate::errors::{ParquetError, Result}; -use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData}; +use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, RowGroupMetaData}; use crate::file::page_index::index::Index; use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index}; use crate::file::reader::ChunkReader; use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER}; -#[cfg(feature = "encryption")] -use crate::format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData}; use crate::schema::types; use crate::schema::types::SchemaDescriptor; use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; #[cfg(all(feature = "async", feature = "arrow"))] use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch}; -#[cfg(feature = "encryption")] -use crate::encryption::decrypt::CryptoContext; use crate::file::page_index::offset_index::OffsetIndexMetaData; /// Reads the [`ParquetMetaData`] from a byte stream. @@ -1040,6 +1043,12 @@ impl ParquetMetaDataReader { Ok(ParquetMetaData::new(file_metadata, row_groups)) } + /// create meta data from thrift encoded bytes + pub fn decode_file_metadata(buf: &[u8]) -> Result { + let mut prot = ThriftCompactInputProtocol::new(buf); + ParquetMetaData::try_from(&mut prot) + } + /// Parses column orders from Thrift definition. /// If no column orders are defined, returns `None`. 
fn parse_column_orders( @@ -1106,6 +1115,7 @@ fn get_file_decryptor( mod tests { use super::*; use bytes::Bytes; + use zstd::zstd_safe::WriteBuf; use crate::basic::SortOrder; use crate::basic::Type; @@ -1309,6 +1319,27 @@ mod tests { "EOF: Parquet file too small. Size is 1728 but need 1729" ); } + + #[test] + fn test_new_decoder() { + let file = get_test_file("alltypes_tiny_pages.parquet"); + let len = file.len(); + + // read entire file + let bytes = file.get_bytes(0, len as usize).unwrap(); + let mut footer = [0u8; FOOTER_SIZE]; + footer.copy_from_slice(bytes.slice(len as usize - FOOTER_SIZE..).as_slice()); + let tail = ParquetMetaDataReader::decode_footer_tail(&footer).unwrap(); + let meta_len = tail.metadata_length(); + let metadata_bytes = bytes.slice(len as usize - FOOTER_SIZE - meta_len..); + + // get ParquetMetaData + let m = ParquetMetaDataReader::decode_file_metadata(&metadata_bytes).unwrap(); + let m2 = ParquetMetaDataReader::decode_metadata(&metadata_bytes).unwrap(); + + // check that metadatas are equivalent + assert_eq!(m, m2); + } } #[cfg(all(feature = "async", feature = "arrow", test))] diff --git a/parquet/src/file/metadata/thrift_gen.rs b/parquet/src/file/metadata/thrift_gen.rs new file mode 100644 index 000000000000..3888d247df1c --- /dev/null +++ b/parquet/src/file/metadata/thrift_gen.rs @@ -0,0 +1,509 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// a collection of generated structs used to parse thrift metadata + +use std::sync::Arc; + +#[cfg(feature = "encryption")] +use crate::file::column_crypto_metadata::ColumnCryptoMetaData; +use crate::{ + basic::{ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, Type}, + data_type::{ByteArray, FixedLenByteArray, Int96}, + errors::{ParquetError, Result}, + file::{ + metadata::{ + ColumnChunkMetaData, KeyValue, LevelHistogram, ParquetMetaData, RowGroupMetaData, + SortingColumn, + }, + page_encoding_stats::PageEncodingStats, + statistics::ValueStatistics, + }, + parquet_thrift::{FieldType, ThriftCompactInputProtocol}, + schema::types::{parquet_schema_from_array, ColumnDescriptor, SchemaDescriptor}, + thrift_struct, + util::bit_util::FromBytes, +}; + +// this needs to be visible to the schema conversion code +thrift_struct!( +pub(crate) struct SchemaElement<'a> { + /** Data type for this field. 
Not set if the current element is a non-leaf node */ + 1: optional Type type_; + 2: optional i32 type_length; + 3: optional Repetition repetition_type; + 4: required string<'a> name; + 5: optional i32 num_children; + 6: optional ConvertedType converted_type; + 7: optional i32 scale + 8: optional i32 precision + 9: optional i32 field_id; + 10: optional LogicalType logical_type +} +); + +// the following are only used internally so are private +thrift_struct!( +struct FileMetaData<'a> { + /** Version of this file **/ + 1: required i32 version + 2: required list<'a> schema; + 3: required i64 num_rows + 4: required list<'a> row_groups + 5: optional list key_value_metadata + 6: optional string created_by + 7: optional list column_orders; + //8: optional EncryptionAlgorithm encryption_algorithm + //9: optional binary footer_signing_key_metadata +} +); + +thrift_struct!( +struct RowGroup<'a> { + 1: required list<'a> columns + 2: required i64 total_byte_size + 3: required i64 num_rows + 4: optional list sorting_columns + 5: optional i64 file_offset + // we don't expose total_compressed_size so skip + //6: optional i64 total_compressed_size + 7: optional i16 ordinal +} +); + +#[cfg(feature = "encryption")] +thrift_struct!( +struct ColumnChunk<'a> { + 1: optional string<'a> file_path + 2: required i64 file_offset = 0 + 3: optional ColumnMetaData<'a> meta_data + 4: optional i64 offset_index_offset + 5: optional i32 offset_index_length + 6: optional i64 column_index_offset + 7: optional i32 column_index_length + 8: optional ColumnCryptoMetaData crypto_metadata + 9: optional binary<'a> encrypted_column_metadata +} +); +#[cfg(not(feature = "encryption"))] +thrift_struct!( +struct ColumnChunk<'a> { + 1: optional string file_path + 2: required i64 file_offset = 0 + 3: optional ColumnMetaData<'a> meta_data + 4: optional i64 offset_index_offset + 5: optional i32 offset_index_length + 6: optional i64 column_index_offset + 7: optional i32 column_index_length +} +); + +type 
CompressionCodec = Compression; +thrift_struct!( +struct ColumnMetaData<'a> { + 1: required Type type_ + 2: required list encodings + // we don't expose path_in_schema so skip + //3: required list path_in_schema + 4: required CompressionCodec codec + 5: required i64 num_values + 6: required i64 total_uncompressed_size + 7: required i64 total_compressed_size + // we don't expose key_value_metadata so skip + //8: optional list key_value_metadata + 9: required i64 data_page_offset + 10: optional i64 index_page_offset + 11: optional i64 dictionary_page_offset + 12: optional Statistics<'a> statistics + 13: optional list encoding_stats; + 14: optional i64 bloom_filter_offset; + 15: optional i32 bloom_filter_length; + 16: optional SizeStatistics size_statistics; + 17: optional GeospatialStatistics geospatial_statistics; +} +); + +thrift_struct!( +struct BoundingBox { + 1: required double xmin; + 2: required double xmax; + 3: required double ymin; + 4: required double ymax; + 5: optional double zmin; + 6: optional double zmax; + 7: optional double mmin; + 8: optional double mmax; +} +); + +thrift_struct!( +struct GeospatialStatistics { + /** A bounding box of geospatial instances */ + 1: optional BoundingBox bbox; + /** Geospatial type codes of all instances, or an empty list if not known */ + 2: optional list geospatial_types; +} +); + +thrift_struct!( +struct SizeStatistics { + 1: optional i64 unencoded_byte_array_data_bytes; + 2: optional list repetition_level_histogram; + 3: optional list definition_level_histogram; +} +); + +thrift_struct!( +struct Statistics<'a> { + 1: optional binary<'a> max; + 2: optional binary<'a> min; + 3: optional i64 null_count; + 4: optional i64 distinct_count; + 5: optional binary<'a> max_value; + 6: optional binary<'a> min_value; + 7: optional bool is_max_value_exact; + 8: optional bool is_min_value_exact; +} +); + +// convert collection of thrift RowGroups into RowGroupMetaData +fn convert_row_groups( + mut row_groups: Vec, + schema_descr: 
Arc, +) -> Result> { + let mut res: Vec = Vec::with_capacity(row_groups.len()); + for rg in row_groups.drain(0..) { + res.push(convert_row_group(rg, schema_descr.clone())?); + } + + Ok(res) +} + +fn convert_row_group( + row_group: RowGroup, + schema_descr: Arc, +) -> Result { + let num_rows = row_group.num_rows; + let sorting_columns = row_group.sorting_columns; + let total_byte_size = row_group.total_byte_size; + let file_offset = row_group.file_offset; + let ordinal = row_group.ordinal; + + let columns = convert_columns(row_group.columns, schema_descr.clone())?; + + Ok(RowGroupMetaData { + columns, + num_rows, + sorting_columns, + total_byte_size, + schema_descr, + file_offset, + ordinal, + }) +} + +fn convert_columns( + mut columns: Vec, + schema_descr: Arc, +) -> Result> { + let mut res: Vec = Vec::with_capacity(columns.len()); + for (c, d) in columns.drain(0..).zip(schema_descr.columns()) { + res.push(convert_column(c, d.clone())?); + } + + Ok(res) +} + +fn convert_column( + column: ColumnChunk, + column_descr: Arc, +) -> Result { + if column.meta_data.is_none() { + return Err(general_err!("Expected to have column metadata")); + } + let col_metadata = column.meta_data.unwrap(); + let column_type = col_metadata.type_; + let encodings = col_metadata.encodings; + let compression = col_metadata.codec; + let file_path = column.file_path.map(|v| v.to_owned()); + let file_offset = column.file_offset; + let num_values = col_metadata.num_values; + let total_compressed_size = col_metadata.total_compressed_size; + let total_uncompressed_size = col_metadata.total_uncompressed_size; + let data_page_offset = col_metadata.data_page_offset; + let index_page_offset = col_metadata.index_page_offset; + let dictionary_page_offset = col_metadata.dictionary_page_offset; + let statistics = convert_stats(column_type, col_metadata.statistics)?; + let encoding_stats = col_metadata.encoding_stats; + let bloom_filter_offset = col_metadata.bloom_filter_offset; + let bloom_filter_length = 
col_metadata.bloom_filter_length; + let offset_index_offset = column.offset_index_offset; + let offset_index_length = column.offset_index_length; + let column_index_offset = column.column_index_offset; + let column_index_length = column.column_index_length; + let (unencoded_byte_array_data_bytes, repetition_level_histogram, definition_level_histogram) = + if let Some(size_stats) = col_metadata.size_statistics { + ( + size_stats.unencoded_byte_array_data_bytes, + size_stats.repetition_level_histogram, + size_stats.definition_level_histogram, + ) + } else { + (None, None, None) + }; + + let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from); + let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from); + + // FIXME: need column crypto + + let result = ColumnChunkMetaData { + column_descr, + encodings, + file_path, + file_offset, + num_values, + compression, + total_compressed_size, + total_uncompressed_size, + data_page_offset, + index_page_offset, + dictionary_page_offset, + statistics, + encoding_stats, + bloom_filter_offset, + bloom_filter_length, + offset_index_offset, + offset_index_length, + column_index_offset, + column_index_length, + unencoded_byte_array_data_bytes, + repetition_level_histogram, + definition_level_histogram, + #[cfg(feature = "encryption")] + column_crypto_metadata: column.crypto_metadata, + }; + Ok(result) +} + +fn convert_stats( + physical_type: Type, + thrift_stats: Option, +) -> Result> { + use crate::file::statistics::Statistics as FStatistics; + Ok(match thrift_stats { + Some(stats) => { + // Number of nulls recorded, when it is not available, we just mark it as 0. + // TODO this should be `None` if there is no information about NULLS. 
+ // see https://github.com/apache/arrow-rs/pull/6216/files + let null_count = stats.null_count.unwrap_or(0); + + if null_count < 0 { + return Err(ParquetError::General(format!( + "Statistics null count is negative {null_count}", + ))); + } + + // Generic null count. + let null_count = Some(null_count as u64); + // Generic distinct count (count of distinct values occurring) + let distinct_count = stats.distinct_count.map(|value| value as u64); + // Whether or not statistics use deprecated min/max fields. + let old_format = stats.min_value.is_none() && stats.max_value.is_none(); + // Generic min value as bytes. + let min = if old_format { + stats.min + } else { + stats.min_value + }; + // Generic max value as bytes. + let max = if old_format { + stats.max + } else { + stats.max_value + }; + + fn check_len(min: &Option<&[u8]>, max: &Option<&[u8]>, len: usize) -> Result<()> { + if let Some(min) = min { + if min.len() < len { + return Err(ParquetError::General( + "Insufficient bytes to parse min statistic".to_string(), + )); + } + } + if let Some(max) = max { + if max.len() < len { + return Err(ParquetError::General( + "Insufficient bytes to parse max statistic".to_string(), + )); + } + } + Ok(()) + } + + match physical_type { + Type::BOOLEAN => check_len(&min, &max, 1), + Type::INT32 | Type::FLOAT => check_len(&min, &max, 4), + Type::INT64 | Type::DOUBLE => check_len(&min, &max, 8), + Type::INT96 => check_len(&min, &max, 12), + _ => Ok(()), + }?; + + // Values are encoded using PLAIN encoding definition, except that + // variable-length byte arrays do not include a length prefix. + // + // Instead of using actual decoder, we manually convert values. 
+ let res = match physical_type { + Type::BOOLEAN => FStatistics::boolean( + min.map(|data| data[0] != 0), + max.map(|data| data[0] != 0), + distinct_count, + null_count, + old_format, + ), + Type::INT32 => FStatistics::int32( + min.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())), + max.map(|data| i32::from_le_bytes(data[..4].try_into().unwrap())), + distinct_count, + null_count, + old_format, + ), + Type::INT64 => FStatistics::int64( + min.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())), + max.map(|data| i64::from_le_bytes(data[..8].try_into().unwrap())), + distinct_count, + null_count, + old_format, + ), + Type::INT96 => { + // INT96 statistics may not be correct, because comparison is signed + let min = if let Some(data) = min { + assert_eq!(data.len(), 12); + Some(Int96::try_from_le_slice(data)?) + } else { + None + }; + let max = if let Some(data) = max { + assert_eq!(data.len(), 12); + Some(Int96::try_from_le_slice(data)?) + } else { + None + }; + FStatistics::int96(min, max, distinct_count, null_count, old_format) + } + Type::FLOAT => FStatistics::float( + min.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())), + max.map(|data| f32::from_le_bytes(data[..4].try_into().unwrap())), + distinct_count, + null_count, + old_format, + ), + Type::DOUBLE => FStatistics::double( + min.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())), + max.map(|data| f64::from_le_bytes(data[..8].try_into().unwrap())), + distinct_count, + null_count, + old_format, + ), + Type::BYTE_ARRAY => FStatistics::ByteArray( + ValueStatistics::new( + min.map(ByteArray::from), + max.map(ByteArray::from), + distinct_count, + null_count, + old_format, + ) + .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false)) + .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)), + ), + Type::FIXED_LEN_BYTE_ARRAY => FStatistics::FixedLenByteArray( + ValueStatistics::new( + min.map(ByteArray::from).map(FixedLenByteArray::from), + 
max.map(ByteArray::from).map(FixedLenByteArray::from), + distinct_count, + null_count, + old_format, + ) + .with_max_is_exact(stats.is_max_value_exact.unwrap_or(false)) + .with_min_is_exact(stats.is_min_value_exact.unwrap_or(false)), + ), + }; + + Some(res) + } + None => None, + }) +} + +/// Create ParquetMetaData from thrift input. Note that this only decodes the file metadata in +/// the Parquet footer. Page indexes will need to be added later. +impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for ParquetMetaData { + type Error = ParquetError; + fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result { + let file_meta = super::thrift_gen::FileMetaData::try_from(prot)?; + + let version = file_meta.version; + let num_rows = file_meta.num_rows; + let row_groups = file_meta.row_groups; + let created_by = file_meta.created_by.map(|c| c.to_owned()); + let key_value_metadata = file_meta.key_value_metadata; + + let val = parquet_schema_from_array(file_meta.schema)?; + let schema_descr = Arc::new(SchemaDescriptor::new(val)); + + // need schema_descr to get final RowGroupMetaData + let row_groups = convert_row_groups(row_groups, schema_descr.clone())?; + + // need to map read column orders to actual values based on the schema + if file_meta + .column_orders + .as_ref() + .is_some_and(|cos| cos.len() != schema_descr.num_columns()) + { + return Err(general_err!("Column order length mismatch")); + } + + let column_orders = file_meta.column_orders.map(|cos| { + let mut res = Vec::with_capacity(cos.len()); + for (i, column) in schema_descr.columns().iter().enumerate() { + match cos[i] { + ColumnOrder::TYPE_DEFINED_ORDER(_) => { + let sort_order = ColumnOrder::get_sort_order( + column.logical_type(), + column.converted_type(), + column.physical_type(), + ); + res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order)); + } + _ => res.push(cos[i]), + } + } + res + }); + + let fmd = crate::file::metadata::FileMetaData::new( + version, + num_rows, + created_by, + 
key_value_metadata, + schema_descr, + column_orders, + ); + + Ok(ParquetMetaData::new(fmd, row_groups)) + } +} diff --git a/parquet/src/file/page_encoding_stats.rs b/parquet/src/file/page_encoding_stats.rs index 67ca2a3e4c71..281954d939dd 100644 --- a/parquet/src/file/page_encoding_stats.rs +++ b/parquet/src/file/page_encoding_stats.rs @@ -18,18 +18,19 @@ //! Per-page encoding information. use crate::basic::{Encoding, PageType}; -use crate::errors::Result; +use crate::errors::{ParquetError, Result}; +use crate::parquet_thrift::{FieldType, ThriftCompactInputProtocol}; +use crate::thrift_struct; +// TODO: This should probably all be moved to thrift_gen +thrift_struct!( /// PageEncodingStats for a column chunk and data page. -#[derive(Clone, Debug, PartialEq, Eq)] pub struct PageEncodingStats { - /// the page type (data/dic/...) - pub page_type: PageType, - /// encoding of the page - pub encoding: Encoding, - /// number of pages of this type with this encoding - pub count: i32, + 1: required PageType page_type; + 2: required Encoding encoding; + 3: required i32 count; } +); /// Converts Thrift definition into `PageEncodingStats`. pub fn try_from_thrift( diff --git a/parquet/src/parquet_macros.rs b/parquet/src/parquet_macros.rs index 40e148a75e72..ebd86d861507 100644 --- a/parquet/src/parquet_macros.rs +++ b/parquet/src/parquet_macros.rs @@ -72,37 +72,6 @@ macro_rules! thrift_enum { } } -#[macro_export] -#[allow(clippy::crate_in_macro_def)] -/// macro to generate rust enums for empty thrift structs used in unions -macro_rules! 
thrift_empty_struct { - ($identifier: ident) => { - #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] - pub struct $identifier {} - - impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for $identifier { - type Error = ParquetError; - fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result { - prot.skip_empty_struct()?; - Ok(Self {}) - } - } - - // TODO: remove when we finally get rid of the format module - impl From for $identifier { - fn from(_: $crate::format::$identifier) -> Self { - Self {} - } - } - - impl From<$identifier> for crate::format::$identifier { - fn from(_: $identifier) -> Self { - Self {} - } - } - }; -} - /// macro to generate rust enums for thrift unions where all fields are typed with empty structs #[macro_export] #[allow(clippy::crate_in_macro_def)] @@ -166,22 +135,73 @@ macro_rules! thrift_union_all_empty { } } -/// macro to generate rust structs from a thrift struct definition +/// macro to generate rust enums for thrift unions where all variants are a mix of unit and tuple types. +/// this requires modifying the thrift IDL. For variants with empty structs as their type, +/// delete the typename (i.e. "1: EmptyStruct Var1;" => "1: Var1"). For variants with a non-empty +/// type, put the typename in parens (e.g" "1: Type Var1;" => "1: (Type) Var1;"). #[macro_export] -macro_rules! thrift_private_struct { - ($(#[$($def_attrs:tt)*])* struct $identifier:ident { $($(#[$($field_attrs:tt)*])* $field_id:literal : $required_or_optional:ident $field_type:ident $(< $element_type:ident >)? $field_name:ident $(= $default_value:literal)? $(;)?)* }) => { +#[allow(clippy::crate_in_macro_def)] +macro_rules! thrift_union { + ($(#[$($def_attrs:tt)*])* union $identifier:ident { $($(#[$($field_attrs:tt)*])* $field_id:literal : $( ( $field_type:ident $(< $element_type:ident >)? ) )? 
$field_name:ident $(;)?)* }) => { $(#[cfg_attr(not(doctest), $($def_attrs)*)])* - #[derive(Clone, Debug, PartialEq)] + #[derive(Clone, Debug, Eq, PartialEq)] #[allow(non_camel_case_types)] #[allow(non_snake_case)] - struct $identifier { - $($(#[cfg_attr(not(doctest), $($field_attrs)*)])* pub $field_name: $crate::__thrift_required_or_optional!($required_or_optional $crate::__thrift_field_type!($field_type $($element_type)?))),* + #[allow(missing_docs)] + pub enum $identifier { + $($(#[cfg_attr(not(doctest), $($field_attrs)*)])* $field_name $( ( $crate::__thrift_union_type!{$field_type $($element_type)?} ) )?),* } impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for $identifier { type Error = ParquetError; + + fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result { + prot.read_struct_begin()?; + let field_ident = prot.read_field_begin()?; + if field_ident.field_type == FieldType::Stop { + return Err(general_err!("Received empty union from remote {}", stringify!($identifier))); + } + let ret = match field_ident.id { + $($field_id => { + let val = $crate::__thrift_read_variant!(prot, $field_name $($field_type $($element_type)?)?); + val + })* + _ => { + return Err(general_err!("Unexpected {} {}", stringify!($identifier), field_ident.id)); + } + }; + let field_ident = prot.read_field_begin()?; + if field_ident.field_type != FieldType::Stop { + return Err(general_err!( + concat!("Received multiple fields for union from remote {}", stringify!($identifier)) + )); + } + prot.read_struct_end()?; + Ok(ret) + } + } + } +} + +/// macro to generate rust structs from a thrift struct definition +/// unlike enum and union, this macro will allow for visibility specifier +/// can also take optional lifetime for struct and elements within it (need e.g.) +#[macro_export] +macro_rules! thrift_struct { + ($(#[$($def_attrs:tt)*])* $vis:vis struct $identifier:ident $(< $lt:lifetime >)? 
{ $($(#[$($field_attrs:tt)*])* $field_id:literal : $required_or_optional:ident $field_type:ident $(< $field_lt:lifetime >)? $(< $element_type:ident >)? $field_name:ident $(= $default_value:literal)? $(;)?)* }) => { + $(#[cfg_attr(not(doctest), $($def_attrs)*)])* + #[derive(Clone, Debug, Eq, PartialEq)] + #[allow(non_camel_case_types)] + #[allow(non_snake_case)] + #[allow(missing_docs)] + $vis struct $identifier $(<$lt>)? { + $($(#[cfg_attr(not(doctest), $($field_attrs)*)])* $vis $field_name: $crate::__thrift_required_or_optional!($required_or_optional $crate::__thrift_field_type!($field_type $($field_lt)? $($element_type)?))),* + } + + impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for $identifier $(<$lt>)? { + type Error = ParquetError; fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result { - $(let mut $field_name: Option<$field_type> = None;)* + $(let mut $field_name: Option<$crate::__thrift_field_type!($field_type $($field_lt)? $($element_type)?)> = None;)* prot.read_struct_begin()?; loop { let field_ident = prot.read_field_begin()?; @@ -190,7 +210,7 @@ macro_rules! thrift_private_struct { } match field_ident.id { $($field_id => { - let val = $crate::__thrift_read_field!(prot $field_type); + let val = $crate::__thrift_read_field!(prot, $field_type $($field_lt)? $($element_type)?); $field_name = Some(val); })* _ => { @@ -198,14 +218,24 @@ macro_rules! thrift_private_struct { } }; } + prot.read_struct_end()?; + $($crate::__thrift_result_required_or_optional!($required_or_optional $field_name);)* Ok(Self { - $($field_name: $crate::__thrift_result_required_or_optional!($required_or_optional $field_name)),* + $($field_name),* }) } } } } +/// macro to use when decoding struct fields +#[macro_export] +macro_rules! thrift_read_field { + ($field_name:ident, $prot:tt, $field_type:ident) => { + $field_name = Some($crate::__thrift_read_field!($prot, $field_type)); + }; +} + #[doc(hidden)] #[macro_export] macro_rules! 
__thrift_required_or_optional { @@ -217,35 +247,50 @@ macro_rules! __thrift_required_or_optional { #[macro_export] macro_rules! __thrift_result_required_or_optional { (required $field_name:ident) => { - $field_name.expect(&format!( - "Required field {} not present", - stringify!($field_name) - )) - }; - (optional $field_name:ident) => { - $field_name + let $field_name = $field_name.expect(concat!( + "Required field ", + stringify!($field_name), + " is missing", + )); }; + (optional $field_name:ident) => {}; } #[doc(hidden)] #[macro_export] macro_rules! __thrift_read_field { - ($prot:tt bool) => { - $prot.read_bool()? + ($prot:tt, list $lt:lifetime binary) => { + Vec::<&'a [u8]>::try_from(&mut *$prot)? + }; + ($prot:tt, list $lt:lifetime $element_type:ident) => { + Vec::<$element_type>::try_from(&mut *$prot)? }; - ($prot:tt i8) => { - $prot.read_i8()? + ($prot:tt, list string) => { + Vec::::try_from(&mut *$prot)? + }; + ($prot:tt, list $element_type:ident) => { + Vec::<$element_type>::try_from(&mut *$prot)? + }; + ($prot:tt, string $lt:lifetime) => { + <&$lt str>::try_from(&mut *$prot)? + }; + ($prot:tt, binary $lt:lifetime) => { + <&$lt [u8]>::try_from(&mut *$prot)? + }; + ($prot:tt, $field_type:ident $lt:lifetime) => { + $field_type::try_from(&mut *$prot)? }; - ($prot:tt i32) => { - $prot.read_i32()? + ($prot:tt, string) => { + String::try_from(&mut *$prot)? }; - ($prot:tt i64) => { - $prot.read_i64()? + ($prot:tt, binary) => { + // this one needs to not conflict with `list` + $prot.read_bytes()?.to_vec() }; - ($prot:tt string) => { - $prot.read_string()? + ($prot:tt, double) => { + $crate::parquet_thrift::OrderedF64::try_from(&mut *$prot)? }; - ($prot:tt $field_type:ident) => { + ($prot:tt, $field_type:ident) => { $field_type::try_from(&mut *$prot)? }; } @@ -253,12 +298,36 @@ macro_rules! __thrift_read_field { #[doc(hidden)] #[macro_export] macro_rules! 
__thrift_field_type { + (binary $lt:lifetime) => { &$lt [u8] }; + (string $lt:lifetime) => { &$lt str }; + ($field_type:ident $lt:lifetime) => { $field_type<$lt> }; + (list $lt:lifetime $element_type:ident) => { Vec< $crate::__thrift_field_type!($element_type $lt) > }; + (list string) => { Vec }; (list $element_type:ident) => { Vec< $crate::__thrift_field_type!($element_type) > }; - (set $element_type:ident) => { Vec< $crate::__thrift_field_type!($element_type) > }; (binary) => { Vec }; (string) => { String }; + (double) => { $crate::parquet_thrift::OrderedF64 }; ($field_type:ty) => { $field_type }; - (Box $element_type:ident) => { std::boxed::Box< $crate::field_type!($element_type) > }; - (Rc $element_type:ident) => { std::rc::Rc< $crate::__thrift_field_type!($element_type) > }; - (Arc $element_type:ident) => { std::sync::Arc< $crate::__thrift_field_type!($element_type) > }; +} + +#[doc(hidden)] +#[macro_export] +macro_rules! __thrift_union_type { + ($field_type:ident) => { $field_type }; + (list $field_type:ident) => { Vec<$field_type> }; +} + +#[doc(hidden)] +#[macro_export] +macro_rules! __thrift_read_variant { + ($prot:tt, $field_name:ident $field_type:ident) => { + Self::$field_name($field_type::try_from(&mut *$prot)?) + }; + ($prot:tt, $field_name:ident list $field_type:ident) => { + Self::$field_name(Vec::<$field_type>::try_from(&mut *$prot)?) + }; + ($prot:tt, $field_name:ident) => {{ + $prot.skip_empty_struct()?; + Self::$field_name + }}; } diff --git a/parquet/src/parquet_thrift.rs b/parquet/src/parquet_thrift.rs index 2514f3fc70ff..7f5fe475217f 100644 --- a/parquet/src/parquet_thrift.rs +++ b/parquet/src/parquet_thrift.rs @@ -20,8 +20,37 @@ // to not allocate byte arrays or strings. #![allow(dead_code)] +use std::cmp::Ordering; + use crate::errors::{ParquetError, Result}; +// Couldn't implement thrift structs with f64 do to lack of Eq +// for f64. 
This is a hacky workaround for now...there are other +// wrappers out there that should probably be used instead. +// thrift seems to re-export an impl from ordered-float +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct OrderedF64(f64); + +impl From<OrderedF64> for f64 { + fn from(value: OrderedF64) -> Self { + value.0 + } +} + +impl Eq for OrderedF64 {} // Marker trait, requires PartialEq + +impl Ord for OrderedF64 { + fn cmp(&self, other: &Self) -> Ordering { + self.0.total_cmp(&other.0) + } +} + +impl PartialOrd for OrderedF64 { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } +} + // Thrift compact protocol types for struct fields. #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub(crate) enum FieldType { @@ -350,6 +379,11 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> { } } + fn skip_binary(&mut self) -> Result<()> { + let len = self.read_vlq()? as usize; + self.skip_bytes(len) + } + /// Skip a field with type `field_type` recursively until the default /// maximum skip depth is reached. pub(crate) fn skip(&mut self, field_type: FieldType) -> Result<()> { @@ -381,10 +415,7 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> { FieldType::I32 => self.skip_vlq().map(|_| ()), FieldType::I64 => self.skip_vlq().map(|_| ()), FieldType::Double => self.skip_bytes(8).map(|_| ()), - FieldType::Binary => { - let len = self.read_vlq()?
as usize; - self.skip_bytes(len) - } + FieldType::Binary => self.skip_binary().map(|_| ()), FieldType::Struct => { self.read_struct_begin()?; loop { @@ -413,3 +444,85 @@ impl<'b, 'a: 'b> ThriftCompactInputProtocol<'a> { fn eof_error() -> ParquetError { eof_err!("Unexpected EOF") } + +impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for bool { + type Error = ParquetError; + fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { + prot.read_bool() + } +} + +impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for i8 { + type Error = ParquetError; + fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { + prot.read_i8() + } +} + +impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for i16 { + type Error = ParquetError; + fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { + prot.read_i16() + } +} + +impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for i32 { + type Error = ParquetError; + fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { + prot.read_i32() + } +} + +impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for i64 { + type Error = ParquetError; + fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { + prot.read_i64() + } +} + +impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for OrderedF64 { + type Error = ParquetError; + fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { + Ok(OrderedF64(prot.read_double()?)) + } +} + +impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for &'a str { + type Error = ParquetError; + fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { + prot.read_string() + } +} + +impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for String { + type Error = ParquetError; + fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { + Ok(prot.read_string()?.to_owned()) + } +} + +impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for &'a [u8] { + type Error = ParquetError; + fn try_from(prot: &mut
ThriftCompactInputProtocol<'a>) -> Result<Self> { + prot.read_bytes() + } +} + +impl<'a, T> TryFrom<&mut ThriftCompactInputProtocol<'a>> for Vec<T> +where + T: for<'b> TryFrom<&'b mut ThriftCompactInputProtocol<'a>>, + ParquetError: for<'b> From<<T as TryFrom<&'b mut ThriftCompactInputProtocol<'a>>>::Error>, +{ + type Error = ParquetError; + + fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> { + let list_ident = prot.read_list_begin()?; + let mut res = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let val = T::try_from(prot)?; + res.push(val); + } + + Ok(res) + } +} diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs index 8cda0e0d1841..1406295c3a4f 100644 --- a/parquet/src/schema/types.rs +++ b/parquet/src/schema/types.rs @@ -17,8 +17,10 @@ //! Contains structs and methods to build Parquet schema and schema descriptors. +use std::vec::IntoIter; use std::{collections::HashMap, fmt, sync::Arc}; +use crate::file::metadata::thrift_gen::SchemaElement; use crate::file::metadata::HeapSize; use crate::basic::{ @@ -1026,11 +1028,14 @@ impl HeapSize for SchemaDescriptor { impl SchemaDescriptor { /// Creates new schema descriptor from Parquet schema.
pub fn new(tp: TypePtr) -> Self { + const INIT_SCHEMA_DEPTH: usize = 16; assert!(tp.is_group(), "SchemaDescriptor should take a GroupType"); - let mut leaves = vec![]; - let mut leaf_to_base = Vec::new(); + let n_leaves = num_leaves(&tp); + let mut leaves = Vec::with_capacity(n_leaves); + let mut leaf_to_base = Vec::with_capacity(n_leaves); + let mut path = Vec::with_capacity(INIT_SCHEMA_DEPTH); for (root_idx, f) in tp.get_fields().iter().enumerate() { - let mut path = vec![]; + path.clear(); build_tree(f, root_idx, 0, 0, &mut leaves, &mut leaf_to_base, &mut path); } @@ -1109,6 +1114,26 @@ impl SchemaDescriptor { } } +// do a quick walk of the tree to get proper sizing for SchemaDescriptor arrays +fn num_leaves(tp: &TypePtr) -> usize { + let mut n_leaves = 0usize; + for f in tp.get_fields().iter() { + count_leaves(f, &mut n_leaves); + } + n_leaves +} + +fn count_leaves(tp: &TypePtr, n_leaves: &mut usize) { + match tp.as_ref() { + Type::PrimitiveType { .. } => *n_leaves += 1, + Type::GroupType { ref fields, .. } => { + for f in fields { + count_leaves(f, n_leaves); + } + } + } +} + fn build_tree<'a>( tp: &'a TypePtr, root_idx: usize, @@ -1287,7 +1312,7 @@ fn from_thrift_helper( .map(Repetition::try_from) .transpose()?; - let mut fields = vec![]; + let mut fields = Vec::with_capacity(n as usize); let mut next_index = index + 1; for _ in 0..n { let child_result = from_thrift_helper(elements, next_index)?; @@ -1402,6 +1427,151 @@ fn to_thrift_helper(schema: &Type, elements: &mut Vec`. + +// convert thrift decoded array of `SchemaElement` into this crate's representation of +// parquet types. this function consumes `elements`. 
+pub(crate) fn parquet_schema_from_array<'a>(elements: Vec<SchemaElement<'a>>) -> Result<TypePtr> { + let mut index = 0; + let num_elements = elements.len(); + let mut schema_nodes = Vec::with_capacity(1); // there should only be one element when done + + // turn into iterator so we can take ownership of elements of the vector + let mut elements = elements.into_iter(); + + while index < num_elements { + let t = schema_from_array_helper(&mut elements, num_elements, index)?; + index = t.0; + schema_nodes.push(t.1); + } + if schema_nodes.len() != 1 { + return Err(general_err!( + "Expected exactly one root node, but found {}", + schema_nodes.len() + )); + } + + if !schema_nodes[0].is_group() { + return Err(general_err!("Expected root node to be a group type")); + } + + Ok(schema_nodes.remove(0)) +} + +// recursive helper function for schema conversion +fn schema_from_array_helper<'a>( + elements: &mut IntoIter<SchemaElement<'a>>, + num_elements: usize, + index: usize, +) -> Result<(usize, TypePtr)> { + // Whether or not the current node is root (message type). + // There is only one message type node in the schema tree. + let is_root_node = index == 0; + + if index >= num_elements { + return Err(general_err!( + "Index out of bound, index = {}, len = {}", + index, + num_elements + )); + } + let element = elements.next().expect("schema vector should not be empty"); + + // Check for empty schema + if let (true, None | Some(0)) = (is_root_node, element.num_children) { + let builder = Type::group_type_builder(element.name); + return Ok((index + 1, Arc::new(builder.build().unwrap()))); + } + + let converted_type = element.converted_type.unwrap_or(ConvertedType::NONE); + + // LogicalType is preferred to ConvertedType, but both may be present. + let logical_type = element.logical_type; + + check_logical_type(&logical_type)?; + + let field_id = element.field_id; + match element.num_children { + // From parquet-format: + // The children count is used to construct the nested relationship.
+ // This field is not set when the element is a primitive type + // Sometimes parquet-cpp sets num_children field to 0 for primitive types, so we + // have to handle this case too. + None | Some(0) => { + // primitive type + if element.repetition_type.is_none() { + return Err(general_err!( + "Repetition level must be defined for a primitive type" + )); + } + let repetition = element.repetition_type.unwrap(); + if let Some(type_) = element.type_ { + let physical_type = type_; + let length = element.type_length.unwrap_or(-1); + let scale = element.scale.unwrap_or(-1); + let precision = element.precision.unwrap_or(-1); + let name = element.name; + let builder = Type::primitive_type_builder(name, physical_type) + .with_repetition(repetition) + .with_converted_type(converted_type) + .with_logical_type(logical_type) + .with_length(length) + .with_precision(precision) + .with_scale(scale) + .with_id(field_id); + Ok((index + 1, Arc::new(builder.build()?))) + } else { + let mut builder = Type::group_type_builder(element.name) + .with_converted_type(converted_type) + .with_logical_type(logical_type) + .with_id(field_id); + if !is_root_node { + // Sometimes parquet-cpp and parquet-mr set repetition level REQUIRED or + // REPEATED for root node. + // + // We only set repetition for group types that are not top-level message + // type. According to parquet-format: + // Root of the schema does not have a repetition_type. + // All other types must have one. 
+ builder = builder.with_repetition(repetition); + } + Ok((index + 1, Arc::new(builder.build().unwrap()))) + } + } + Some(n) => { + let repetition = element.repetition_type; + + let mut fields = Vec::with_capacity(n as usize); + let mut next_index = index + 1; + for _ in 0..n { + let child_result = schema_from_array_helper(elements, num_elements, next_index)?; + next_index = child_result.0; + fields.push(child_result.1); + } + + let mut builder = Type::group_type_builder(element.name) + .with_converted_type(converted_type) + .with_logical_type(logical_type) + .with_fields(fields) + .with_id(field_id); + if let Some(rep) = repetition { + // Sometimes parquet-cpp and parquet-mr set repetition level REQUIRED or + // REPEATED for root node. + // + // We only set repetition for group types that are not top-level message + // type. According to parquet-format: + // Root of the schema does not have a repetition_type. + // All other types must have one. + if !is_root_node { + builder = builder.with_repetition(rep); + } + } + Ok((next_index, Arc::new(builder.build().unwrap()))) + } + } +} + #[cfg(test)] mod tests { use super::*;