diff --git a/parquet-variant-compute/benches/variant_kernels.rs b/parquet-variant-compute/benches/variant_kernels.rs index 8fd6af333fed..4f9624a536b6 100644 --- a/parquet-variant-compute/benches/variant_kernels.rs +++ b/parquet-variant-compute/benches/variant_kernels.rs @@ -17,6 +17,7 @@ use arrow::array::{Array, ArrayRef, StringArray}; use arrow::util::test_util::seedable_rng; +use arrow_schema::{DataType, Field, Fields}; use criterion::{criterion_group, criterion_main, Criterion}; use parquet_variant::{Variant, VariantBuilder}; use parquet_variant_compute::variant_get::{variant_get, GetOptions}; @@ -27,6 +28,14 @@ use rand::Rng; use rand::SeedableRng; use std::fmt::Write; use std::sync::Arc; + +fn unshredded_schema_fields() -> Fields { + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + + Fields::from(vec![metadata_field, value_field]) +} + fn benchmark_batch_json_string_to_variant(c: &mut Criterion) { let input_array = StringArray::from_iter_values(json_repeated_struct(8000)); let array_ref: ArrayRef = Arc::new(input_array); @@ -93,7 +102,7 @@ pub fn variant_get_bench(c: &mut Criterion) { }; c.bench_function("variant_get_primitive", |b| { - b.iter(|| variant_get(&input.clone(), options.clone())) + b.iter(|| variant_get(&input.clone(), options.clone(), unshredded_schema_fields())) }); } @@ -108,7 +117,7 @@ criterion_main!(benches); fn create_primitive_variant_array(size: usize) -> VariantArray { let mut rng = StdRng::seed_from_u64(42); - let mut variant_builder = VariantArrayBuilder::new(1); + let mut variant_builder = VariantArrayBuilder::try_new(1, unshredded_schema_fields()).unwrap(); for _ in 0..size { let mut builder = VariantBuilder::new(); diff --git a/parquet-variant-compute/src/from_json.rs b/parquet-variant-compute/src/from_json.rs index 05207d094a25..7f81bc2a9985 100644 --- a/parquet-variant-compute/src/from_json.rs +++ b/parquet-variant-compute/src/from_json.rs @@ -20,7 +20,7 @@ use crate::{VariantArray, VariantArrayBuilder}; use arrow::array::{Array, ArrayRef, StringArray}; -use arrow_schema::ArrowError; +use arrow_schema::{ArrowError, DataType, Field, Fields}; use parquet_variant_json::json_to_variant; /// Parse a batch of JSON strings into a batch of Variants represented as @@ -34,7 +34,14 @@ pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result Result { + let (metadata_idx, metadata_field) = fields + .iter() + .enumerate() + .find(|(_, f)| f.name() == METADATA) + .ok_or_else(|| { + ArrowError::InvalidArgumentError( + "Invalid VariantArray: StructArray must contain a 'metadata' field".to_string(), + ) + })?; + + if metadata_field.is_nullable() { + return Err(ArrowError::InvalidArgumentError( + "Invalid VariantArray: metadata field can not be nullable".to_string(), + )); + } + + if metadata_field.data_type() != &DataType::BinaryView { + return Err(ArrowError::NotYetImplemented(format!( + "VariantArray 'metadata' field must be BinaryView, got {}", + metadata_field.data_type() + ))); + } + + Ok(metadata_idx) + } + + /// Both `value` and `typed_value` are optional fields used together to encode a single value. + /// + /// Values in the two fields must be interpreted according to the following table: + /// + /// | `value` | `typed_value` | Meaning | + /// |----------|---------------|-------------------------------------------------------------| + /// | null | null | The value is missing; only valid for shredded object fields | + /// | non-null | null | The value is present and may be any type, including null | + /// | null | non-null | The value is present and is the shredded type | + /// | non-null | non-null | The value is present and is a partially shredded object | + fn validate_value_and_typed_value( + fields: &Fields, + inside_shredded_object: bool, + ) -> Result { + let value_field_res = fields.iter().enumerate().find(|(_, f)| f.name() == VALUE); + let typed_value_field_res = fields + .iter() + .enumerate() + .find(|(_, f)| f.name() == TYPED_VALUE); + + // validate types + if let Some((_, value_field)) = value_field_res { + if value_field.data_type() != &DataType::BinaryView { + return Err(ArrowError::NotYetImplemented(format!( + "VariantArray 'value' field must be BinaryView, got {}", + value_field.data_type() + ))); + } + } + + if let Some((_, typed_value_field)) = typed_value_field_res { + match typed_value_field.data_type() { + DataType::Boolean + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Float32 + | DataType::Float64 + | DataType::Decimal32(_, _) + | DataType::Decimal64(_, _) + | DataType::Decimal128(_, _) + | DataType::Date32 + | DataType::Date64 + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Timestamp(_, _) + | DataType::Utf8View + | DataType::BinaryView + | DataType::ListView(_) + | DataType::Struct(_) + | DataType::Dictionary(_, _) => {} + foreign => { + return Err(ArrowError::NotYetImplemented(format!( + "Unsupported VariantArray 'typed_value' field, got {foreign}" + ))) + } + } + } + + match (value_field_res, typed_value_field_res) { + (None, None) => { + if inside_shredded_object { + return Ok(ValueSchema::MissingValue); + } + + Err(ArrowError::InvalidArgumentError("Invalid VariantArray: StructArray must contain either `value` or `typed_value` fields or both.".to_string())) + } + (Some(value_field), None) => Ok(ValueSchema::Value(value_field.0)), + (None, Some(shredded_field)) => Ok(ValueSchema::ShreddedValue(shredded_field.0)), + (Some((value_idx, _)), Some((shredded_value_idx, shredded_field))) => { + match shredded_field.data_type() { + DataType::Struct(fields) => { + let _ = Self::validate_value_and_typed_value(fields, false)?; + + Ok(ValueSchema::PartiallyShredded { + value_idx, + shredded_value_idx, + }) + } + DataType::Dictionary(_key, shredded_schema) => { + if let DataType::Struct(fields) = shredded_schema.as_ref() { + let _ = Self::validate_value_and_typed_value(fields, true)?; + + Ok(ValueSchema::PartiallyShredded { + value_idx, + shredded_value_idx, + }) + } else { + Err(ArrowError::InvalidArgumentError( + "Invalid VariantArray: shredded fields must be of struct or list types".to_string(), + )) + } + } + _ => Err(ArrowError::InvalidArgumentError( + "Invalid VariantArray: shredded fields must be of struct or list types" + .to_string(), + )), + } + } + } + } + + pub fn try_new(fields: Fields) -> Result { + let metadata_idx = Self::validate_metadata(&fields)?; + let value_schema = Self::validate_value_and_typed_value(&fields, false)?; + + Ok(Self { + inner: fields.clone(), + metadata_idx, + value_schema, + }) + } + + pub fn inner(&self) -> &Fields { + &self.inner + } + + pub fn into_inner(self) -> Fields { + self.inner + } + + pub fn metadata_idx(&self) -> usize { + self.metadata_idx + } + + pub fn metadata(&self) -> &FieldRef { + self.inner.get(self.metadata_idx).unwrap() + } + + pub fn value_idx(&self) -> Option { + match self.value_schema { + ValueSchema::MissingValue => None, + ValueSchema::ShreddedValue(_) => None, + ValueSchema::Value(value_idx) => Some(value_idx), + ValueSchema::PartiallyShredded { value_idx, .. } => Some(value_idx), + } + } + + pub fn value(&self) -> Option<&FieldRef> { + self.value_idx().map(|i| self.inner.get(i).unwrap()) + } + + pub fn shredded_value_idx(&self) -> Option { + match self.value_schema { + ValueSchema::MissingValue => None, + ValueSchema::Value(_) => None, + ValueSchema::ShreddedValue(shredded_idx) => Some(shredded_idx), + ValueSchema::PartiallyShredded { + shredded_value_idx, .. + } => Some(shredded_value_idx), + } + } + + pub fn shredded_value(&self) -> Option<&FieldRef> { + self.shredded_value_idx() + .map(|i| self.inner.get(i).unwrap()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use arrow_schema::Field; + + #[test] + fn test_unshredded_variant_schema() { + // a regular variant schema + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + + let fields = Fields::from(vec![metadata_field, value_field]); + let variant_schema = VariantSchema::try_new(fields).unwrap(); + + assert_eq!(variant_schema.metadata_idx, 0); + assert_eq!(variant_schema.value_schema, ValueSchema::Value(1)); + } + + #[test] + fn test_unshredded_variant_schema_order_agnostic() { + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + + let fields = Fields::from(vec![value_field, metadata_field]); // note the order switch + let variant_schema = VariantSchema::try_new(fields).unwrap(); + + assert_eq!(variant_schema.value_schema, ValueSchema::Value(0)); + assert_eq!(variant_schema.metadata_idx, 1); + } + + #[test] + fn test_shredded_variant_schema() { + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let shredded_field = Field::new("typed_value", DataType::Int8, true); + + let fields = Fields::from(vec![metadata_field, shredded_field]); + let variant_schema = VariantSchema::try_new(fields).unwrap(); + + assert_eq!(variant_schema.metadata_idx, 0); + assert_eq!(variant_schema.value_schema, ValueSchema::ShreddedValue(1)); + } + + #[test] + fn test_regular_variant_schema_missing_metadata() { + let value_field = Field::new("value", DataType::BinaryView, false); + let schema = Fields::from(vec![value_field]); + + let err = VariantSchema::try_new(schema).unwrap_err(); + + assert_eq!( + err.to_string(), + "Invalid argument error: Invalid VariantArray: StructArray must contain a 'metadata' field" + ); + } + + #[test] + fn test_regular_variant_schema_nullable_metadata() { + let metadata_field = Field::new("metadata", DataType::BinaryView, true); + let value_field = Field::new("value", DataType::BinaryView, false); + + let schema = Fields::from(vec![metadata_field, value_field]); + + let err = VariantSchema::try_new(schema).unwrap_err(); + + assert_eq!( + err.to_string(), + "Invalid argument error: Invalid VariantArray: metadata field can not be nullable" + ); + } +} diff --git a/parquet-variant-compute/src/variant_array.rs b/parquet-variant-compute/src/variant_array.rs index 843352d1ff01..8488d3adea96 100644 --- a/parquet-variant-compute/src/variant_array.rs +++ b/parquet-variant-compute/src/variant_array.rs @@ -24,6 +24,8 @@ use parquet_variant::Variant; use std::any::Any; use std::sync::Arc; +use crate::shredding::VariantSchema; + /// An array of Parquet [`Variant`] values /// /// A [`VariantArray`] wraps an Arrow [`StructArray`] that stores the underlying @@ -60,11 +62,7 @@ pub struct VariantArray { /// int8. inner: StructArray, - /// Reference to the metadata column of inner - metadata_ref: ArrayRef, - - /// Reference to the value column of inner - value_ref: ArrayRef, + variant_schema: VariantSchema, } impl VariantArray { @@ -93,35 +91,12 @@ impl VariantArray { "Invalid VariantArray: requires StructArray as input".to_string(), )); }; - // Ensure the StructArray has a metadata field of BinaryView - let Some(metadata_field) = VariantArray::find_metadata_field(inner) else { - return Err(ArrowError::InvalidArgumentError( - "Invalid VariantArray: StructArray must contain a 'metadata' field".to_string(), - )); - }; - if metadata_field.data_type() != &DataType::BinaryView { - return Err(ArrowError::NotYetImplemented(format!( - "VariantArray 'metadata' field must be BinaryView, got {}", - metadata_field.data_type() - ))); - } - let Some(value_field) = VariantArray::find_value_field(inner) else { - return Err(ArrowError::InvalidArgumentError( - "Invalid VariantArray: StructArray must contain a 'value' field".to_string(), - )); - }; - if value_field.data_type() != &DataType::BinaryView { - return Err(ArrowError::NotYetImplemented(format!( - "VariantArray 'value' field must be BinaryView, got {}", - value_field.data_type() - ))); - } + let variant_schema = VariantSchema::try_new(inner.fields().clone())?; Ok(Self { inner: inner.clone(), - metadata_ref: metadata_field, - value_ref: value_field, + variant_schema, }) } @@ -137,34 +112,41 @@ impl VariantArray { /// Return the [`Variant`] instance stored at the given row /// - /// Panics if the index is out of bounds. + /// Panics if the index is out of bounds or value array does not exist. /// /// Note: Does not do deep validation of the [`Variant`], so it is up to the /// caller to ensure that the metadata and value were constructed correctly. + /// + /// Todo: reconstruct partially shredded or shredded variants pub fn value(&self, index: usize) -> Variant { let metadata = self.metadata_field().as_binary_view().value(index); - let value = self.value_field().as_binary_view().value(index); + let value = self + .value_field() + .expect("value field does not exist") + .as_binary_view() + .value(index); Variant::new(metadata, value) } - fn find_metadata_field(array: &StructArray) -> Option { - array.column_by_name("metadata").cloned() - } - - fn find_value_field(array: &StructArray) -> Option { - array.column_by_name("value").cloned() - } - /// Return a reference to the metadata field of the [`StructArray`] pub fn metadata_field(&self) -> &ArrayRef { - // spec says fields order is not guaranteed, so we search by name - &self.metadata_ref + let metadata_idx = self.variant_schema.metadata_idx(); + + self.inner.column(metadata_idx) } /// Return a reference to the value field of the `StructArray` - pub fn value_field(&self) -> &ArrayRef { - // spec says fields order is not guaranteed, so we search by name - &self.value_ref + pub fn value_field(&self) -> Option<&ArrayRef> { + self.variant_schema + .value_idx() + .map(|i| self.inner.column(i)) + } + + /// Return a reference to the shredded value field of the `StructArray` + pub fn shredded_value_field(&self) -> Option<&ArrayRef> { + self.variant_schema + .shredded_value_idx() + .map(|i| self.inner.column(i)) } } @@ -187,12 +169,9 @@ impl Array for VariantArray { fn slice(&self, offset: usize, length: usize) -> ArrayRef { let slice = self.inner.slice(offset, length); - let met = self.metadata_ref.slice(offset, length); - let val = self.value_ref.slice(offset, length); Arc::new(Self { inner: slice, - metadata_ref: met, - value_ref: val, + variant_schema: self.variant_schema.clone(), }) } @@ -258,14 +237,14 @@ mod test { let err = VariantArray::try_new(Arc::new(array)); assert_eq!( err.unwrap_err().to_string(), - "Invalid argument error: Invalid VariantArray: StructArray must contain a 'value' field" + "Invalid argument error: Invalid VariantArray: StructArray must contain either `value` or `typed_value` fields or both." ); } #[test] fn invalid_metadata_field_type() { let fields = Fields::from(vec![ - Field::new("metadata", DataType::Binary, true), // Not yet supported + Field::new("metadata", DataType::Binary, false), // Not yet supported Field::new("value", DataType::BinaryView, true), ]); let array = StructArray::new( @@ -283,7 +262,7 @@ mod test { #[test] fn invalid_value_field_type() { let fields = Fields::from(vec![ - Field::new("metadata", DataType::BinaryView, true), + Field::new("metadata", DataType::BinaryView, false), Field::new("value", DataType::Binary, true), // Not yet supported ]); let array = StructArray::new( diff --git a/parquet-variant-compute/src/variant_array_builder.rs b/parquet-variant-compute/src/variant_array_builder.rs index 6a8dba06f15d..58264e3d7b52 100644 --- a/parquet-variant-compute/src/variant_array_builder.rs +++ b/parquet-variant-compute/src/variant_array_builder.rs @@ -17,9 +17,9 @@ //! [`VariantArrayBuilder`] implementation -use crate::VariantArray; +use crate::{shredding::VariantSchema, VariantArray}; use arrow::array::{ArrayRef, BinaryViewArray, BinaryViewBuilder, NullBufferBuilder, StructArray}; -use arrow_schema::{DataType, Field, Fields}; +use arrow_schema::{ArrowError, Fields}; use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilder, VariantBuilderExt}; use std::sync::Arc; @@ -39,8 +39,14 @@ use std::sync::Arc; /// # use arrow::array::Array; /// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt}; /// # use parquet_variant_compute::VariantArrayBuilder; +/// # use arrow_schema::{DataType, Field, Fields}; /// // Create a new VariantArrayBuilder with a capacity of 100 rows -/// let mut builder = VariantArrayBuilder::new(100); +/// +/// let metadata_field = Field::new("metadata", DataType::BinaryView, false); +/// let value_field = Field::new("value", DataType::BinaryView, false); +/// let schema = Fields::from(vec![metadata_field, value_field]); +/// +/// let mut builder = VariantArrayBuilder::try_new(100, schema).unwrap(); /// // append variant values /// builder.append_variant(Variant::from(42)); /// // append a null row (note not a Variant::Null) @@ -81,26 +87,19 @@ pub struct VariantArrayBuilder { /// (offset, len) pairs for locations of values in the buffer value_locations: Vec<(usize, usize)>, /// The fields of the final `StructArray` - /// - /// TODO: 1) Add extension type metadata - /// TODO: 2) Add support for shredding - fields: Fields, + schema: VariantSchema, } impl VariantArrayBuilder { - pub fn new(row_capacity: usize) -> Self { - // The subfields are expected to be non-nullable according to the parquet variant spec. - let metadata_field = Field::new("metadata", DataType::BinaryView, false); - let value_field = Field::new("value", DataType::BinaryView, false); - - Self { + pub fn try_new(row_capacity: usize, schema: Fields) -> Result { + Ok(Self { nulls: NullBufferBuilder::new(row_capacity), metadata_buffer: Vec::new(), // todo allocation capacity metadata_locations: Vec::with_capacity(row_capacity), value_buffer: Vec::new(), value_locations: Vec::with_capacity(row_capacity), - fields: Fields::from(vec![metadata_field, value_field]), - } + schema: VariantSchema::try_new(schema)?, + }) } /// Build the final builder @@ -111,7 +110,7 @@ impl VariantArrayBuilder { metadata_locations, value_buffer, value_locations, - fields, + schema, } = self; let metadata_array = binary_view_array_from_buffers(metadata_buffer, metadata_locations); @@ -120,7 +119,7 @@ impl VariantArrayBuilder { // The build the final struct array let inner = StructArray::new( - fields, + schema.into_inner(), vec![ Arc::new(metadata_array) as ArrayRef, Arc::new(value_array) as ArrayRef, @@ -161,7 +160,13 @@ impl VariantArrayBuilder { /// ``` /// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt}; /// # use parquet_variant_compute::{VariantArray, VariantArrayBuilder}; - /// let mut array_builder = VariantArrayBuilder::new(10); + /// # use arrow_schema::{Field, Fields, DataType}; + /// + /// let metadata_field = Field::new("metadata", DataType::BinaryView, false); + /// let value_field = Field::new("value", DataType::BinaryView, false); + /// let schema = Fields::from(vec![metadata_field, value_field]); + /// + /// let mut array_builder = VariantArrayBuilder::try_new(10, schema).unwrap(); /// /// // First row has a string /// let mut variant_builder = array_builder.variant_builder(); @@ -359,11 +364,17 @@ fn binary_view_array_from_buffers( mod test { use super::*; use arrow::array::Array; + use arrow_schema::{DataType, Field}; /// Test that both the metadata and value buffers are non nullable #[test] fn test_variant_array_builder_non_nullable() { - let mut builder = VariantArrayBuilder::new(10); + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + + let schema = Fields::from(vec![metadata_field, value_field]); + + let mut builder = VariantArrayBuilder::try_new(10, schema).unwrap(); builder.append_null(); // should not panic builder.append_variant(Variant::from(42i32)); let variant_array = builder.build(); @@ -375,7 +386,7 @@ mod test { // the metadata and value fields of non shredded variants should not be null assert!(variant_array.metadata_field().nulls().is_none()); - assert!(variant_array.value_field().nulls().is_none()); + assert!(variant_array.value_field().unwrap().nulls().is_none()); let DataType::Struct(fields) = variant_array.data_type() else { panic!("Expected VariantArray to have Struct data type"); }; @@ -391,7 +402,13 @@ mod test { /// Test using sub builders to append variants #[test] fn test_variant_array_builder_variant_builder() { - let mut builder = VariantArrayBuilder::new(10); + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + + let schema = Fields::from(vec![metadata_field, value_field]); + + let mut builder = VariantArrayBuilder::try_new(10, schema).unwrap(); + builder.append_null(); // should not panic builder.append_variant(Variant::from(42i32)); @@ -431,7 +448,12 @@ mod test { /// Test using non-finished sub builders to append variants #[test] fn test_variant_array_builder_variant_builder_reset() { - let mut builder = VariantArrayBuilder::new(10); + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + + let schema = Fields::from(vec![metadata_field, value_field]); + + let mut builder = VariantArrayBuilder::try_new(10, schema).unwrap(); // make a sub-object in the first row let mut sub_builder = builder.variant_builder(); diff --git a/parquet-variant-compute/src/variant_get.rs b/parquet-variant-compute/src/variant_get.rs index b3a3d9e41f13..33e33d9898f4 100644 --- a/parquet-variant-compute/src/variant_get.rs +++ b/parquet-variant-compute/src/variant_get.rs @@ -21,7 +21,7 @@ use arrow::{ compute::CastOptions, error::Result, }; -use arrow_schema::{ArrowError, Field}; +use arrow_schema::{ArrowError, Field, Fields}; use parquet_variant::VariantPath; use crate::{VariantArray, VariantArrayBuilder}; @@ -32,7 +32,11 @@ use crate::{VariantArray, VariantArrayBuilder}; /// 1. `as_type: None`: a VariantArray is returned. The values in this new VariantArray will point /// to the specified path. /// 2. `as_type: Some()`: an array of the specified type is returned. -pub fn variant_get(input: &ArrayRef, options: GetOptions) -> Result { +pub fn variant_get( + input: &ArrayRef, + options: GetOptions, + shredded_schema: Fields, +) -> Result { let variant_array: &VariantArray = input.as_any().downcast_ref().ok_or_else(|| { ArrowError::InvalidArgumentError( "expected a VariantArray as the input for variant_get".to_owned(), @@ -45,7 +49,7 @@ pub fn variant_get(input: &ArrayRef, options: GetOptions) -> Result { ))); } - let mut builder = VariantArrayBuilder::new(variant_array.len()); + let mut builder = VariantArrayBuilder::try_new(variant_array.len(), shredded_schema)?; for i in 0..variant_array.len() { let new_variant = variant_array.value(i); // TODO: perf? @@ -90,6 +94,9 @@ mod test { use std::sync::Arc; use arrow::array::{Array, ArrayRef, StringArray}; + use arrow_schema::DataType; + use arrow_schema::Field; + use arrow_schema::Fields; use parquet_variant::VariantPath; use crate::batch_json_string_to_variant; @@ -103,8 +110,17 @@ mod test { let input_variant_array_ref: ArrayRef = Arc::new(batch_json_string_to_variant(&input_array_ref).unwrap()); - let result = - variant_get(&input_variant_array_ref, GetOptions::new_with_path(path)).unwrap(); + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + + let schema = Fields::from(vec![metadata_field, value_field]); + + let result = variant_get( + &input_variant_array_ref, + GetOptions::new_with_path(path), + schema, + ) + .unwrap(); // Create expected array from JSON string let expected_array_ref: ArrayRef = Arc::new(StringArray::from(vec![Some(expected_json)]));