diff --git a/parquet-geospatial/Cargo.toml b/parquet-geospatial/Cargo.toml
index aefff9462b01..0a28ed3c5bcd 100644
--- a/parquet-geospatial/Cargo.toml
+++ b/parquet-geospatial/Cargo.toml
@@ -33,7 +33,7 @@ rust-version = { workspace = true }
 [dependencies]
 arrow-schema = { workspace = true }
 geo-traits = { version = "0.3" }
-wkb = { version = "0.9" }
+wkb = { version = "0.9.1" }
 
 [dev-dependencies]
 wkt = { version = "0.14" }
diff --git a/parquet-geospatial/src/lib.rs b/parquet-geospatial/src/lib.rs
index 1929c00fd88a..2ec60c44cc16 100644
--- a/parquet-geospatial/src/lib.rs
+++ b/parquet-geospatial/src/lib.rs
@@ -29,3 +29,4 @@
 
 pub mod bounding;
 pub mod interval;
+pub mod testing;
diff --git a/parquet-geospatial/src/testing.rs b/parquet-geospatial/src/testing.rs
new file mode 100644
index 000000000000..2807a53ac9dc
--- /dev/null
+++ b/parquet-geospatial/src/testing.rs
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Testing utilities for geospatial Parquet types
+
+/// Build well-known binary representing a point with the given XY coordinate
+pub fn wkb_point_xy(x: f64, y: f64) -> Vec<u8> {
+    let mut item: [u8; 21] = [0; 21];
+    item[0] = 0x01;
+    item[1] = 0x01;
+    item[5..13].copy_from_slice(x.to_le_bytes().as_slice());
+    item[13..21].copy_from_slice(y.to_le_bytes().as_slice());
+    item.to_vec()
+}
+
+/// Build well-known binary representing a point with the given XYZM coordinate
+pub fn wkb_point_xyzm(x: f64, y: f64, z: f64, m: f64) -> Vec<u8> {
+    let mut item: [u8; 37] = [0; 37];
+    item[0] = 0x01;
+    item[1..5].copy_from_slice(3001_u32.to_le_bytes().as_slice());
+    item[5..13].copy_from_slice(x.to_le_bytes().as_slice());
+    item[13..21].copy_from_slice(y.to_le_bytes().as_slice());
+    item[21..29].copy_from_slice(z.to_le_bytes().as_slice());
+    item[29..37].copy_from_slice(m.to_le_bytes().as_slice());
+    item.to_vec()
+}
+
+#[cfg(test)]
+mod test {
+
+    use wkb::reader::Wkb;
+
+    use super::*;
+
+    #[test]
+    fn test_wkb_item() {
+        let bytes = wkb_point_xy(1.0, 2.0);
+        let geometry = Wkb::try_new(&bytes).unwrap();
+        let mut wkt = String::new();
+        wkt::to_wkt::write_geometry(&mut wkt, &geometry).unwrap();
+        assert_eq!(wkt, "POINT(1 2)");
+    }
+
+    #[test]
+    fn test_wkb_point_xyzm() {
+        let bytes = wkb_point_xyzm(1.0, 2.0, 3.0, 4.0);
+        let geometry = Wkb::try_new(&bytes).unwrap();
+        let mut wkt = String::new();
+        wkt::to_wkt::write_geometry(&mut wkt, &geometry).unwrap();
+        assert_eq!(wkt, "POINT ZM(1 2 3 4)");
+    }
+}
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 09b9a916917a..aa0071ca38e5 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -45,6 +45,7 @@ arrow-data = { workspace = true, optional = true }
 arrow-schema = { workspace = true, optional = true }
 arrow-select = { workspace = true, optional = true }
 arrow-ipc = { workspace = true, optional = true }
+parquet-geospatial = { workspace = true, optional = true }
 parquet-variant = { workspace = true, optional = true }
 parquet-variant-json = { workspace = true, optional = true }
 parquet-variant-compute = { workspace = true, optional = true }
@@ -131,6 +132,8 @@ flate2-rust_backened = ["flate2/rust_backend"]
 flate2-zlib-rs = ["flate2/zlib-rs"]
 # Enable parquet variant support
 variant_experimental = ["arrow", "parquet-variant", "parquet-variant-json", "parquet-variant-compute"]
+# Enable geospatial support
+geospatial = ["parquet-geospatial"]
 
 [[example]]
diff --git a/parquet/README.md b/parquet/README.md
index 5e087ac6a929..515cb037d6fb 100644
--- a/parquet/README.md
+++ b/parquet/README.md
@@ -65,6 +65,7 @@ The `parquet` crate provides the following features which may be enabled in your
 - `simdutf8` (default) - Use the [`simdutf8`] crate for SIMD-accelerated UTF-8 validation
 - `encryption` - support for reading / writing encrypted Parquet files
 - `variant_experimental` - ⚠️ Experimental [Parquet Variant] support, which may change, even between minor releases.
+- `geospatial` - ⚠️ Experimental geospatial support, which may change, even between minor releases.
 
 [`arrow`]: https://crates.io/crates/arrow
 [`simdutf8`]: https://crates.io/crates/simdutf8
diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index 2deb3c535a12..5cc920d01fe6 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -23,6 +23,8 @@ use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
 use crate::encodings::rle::RleEncoder;
 use crate::errors::{ParquetError, Result};
 use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
+use crate::geospatial::accumulator::{try_new_geo_stats_accumulator, GeoStatsAccumulator};
+use crate::geospatial::statistics::GeospatialStatistics;
 use crate::schema::types::ColumnDescPtr;
 use crate::util::bit_util::num_required_bits;
 use crate::util::interner::{Interner, Storage};
@@ -421,6 +423,7 @@ pub struct ByteArrayEncoder {
     min_value: Option<ByteArray>,
     max_value: Option<ByteArray>,
     bloom_filter: Option<Sbbf>,
+    geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
 }
 
 impl ColumnValueEncoder for ByteArrayEncoder {
@@ -447,6 +450,8 @@ impl ColumnValueEncoder for ByteArrayEncoder {
 
         let statistics_enabled = props.statistics_enabled(descr.path());
 
+        let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);
+
         Ok(Self {
             fallback,
             statistics_enabled,
@@ -454,6 +459,7 @@
             dict_encoder: dictionary,
             min_value: None,
             max_value: None,
+            geo_stats_accumulator,
         })
     }
@@ -536,6 +542,10 @@
             _ => self.fallback.flush_data_page(min_value, max_value),
         }
     }
+
+    fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
+        self.geo_stats_accumulator.as_mut().map(|a| a.finish())?
+    }
 }
 
 /// Encodes the provided `values` and `indices` to `encoder`
@@ -547,7 +557,9 @@ where
     T::Item: Copy + Ord + AsRef<[u8]>,
 {
     if encoder.statistics_enabled != EnabledStatistics::None {
-        if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
+        if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() {
+            update_geo_stats_accumulator(accumulator.as_mut(), values, indices.iter().cloned());
+        } else if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
             if encoder.min_value.as_ref().is_none_or(|m| m > &min) {
                 encoder.min_value = Some(min);
             }
@@ -595,3 +607,20 @@ where
     }
     Some((min.as_ref().to_vec().into(), max.as_ref().to_vec().into()))
 }
+
+/// Updates geospatial statistics for the provided array and indices
+fn update_geo_stats_accumulator<T>(
+    bounder: &mut dyn GeoStatsAccumulator,
+    array: T,
+    valid: impl Iterator<Item = usize>,
+) where
+    T: ArrayAccessor,
+    T::Item: Copy + Ord + AsRef<[u8]>,
+{
+    if bounder.is_valid() {
+        for idx in valid {
+            let val = array.value(idx);
+            bounder.update_wkb(val.as_ref());
+        }
+    }
+}
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 6b4dc87abba4..3923a03a8ebb 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -228,11 +228,18 @@ impl<W: Write + Send> ArrowWriter<W> {
         options: ArrowWriterOptions,
     ) -> Result<Self> {
         let mut props = options.properties;
-        let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
-        if let Some(schema_root) = &options.schema_root {
-            converter = converter.schema_root(schema_root);
-        }
-        let schema = converter.convert(&arrow_schema)?;
+
+        let schema = if let Some(parquet_schema) = options.schema_descr {
+            parquet_schema.clone()
+        } else {
+            let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
+            if let Some(schema_root) = &options.schema_root {
+                converter = converter.schema_root(schema_root);
+            }
+
+            converter.convert(&arrow_schema)?
+        };
+
         if !options.skip_arrow_metadata {
             // add serialized arrow schema
             add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
@@ -457,6 +464,7 @@ pub struct ArrowWriterOptions {
     properties: WriterProperties,
     skip_arrow_metadata: bool,
     schema_root: Option<String>,
+    schema_descr: Option<SchemaDescriptor>,
 }
 
 impl ArrowWriterOptions {
@@ -490,6 +498,26 @@ impl ArrowWriterOptions {
             ..self
         }
     }
+
+    /// Explicitly specify the Parquet schema to be used
+    ///
+    /// If omitted (the default), the [`ArrowSchemaConverter`] is used to compute the
+    /// Parquet [`SchemaDescriptor`]. This may be used when the [`SchemaDescriptor`] is
+    /// already known or must be calculated using custom logic.
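+    ///
+    /// A minimal sketch (`schema_descr` here is a hypothetical, previously constructed
+    /// [`SchemaDescriptor`]):
+    ///
+    /// ```ignore
+    /// let options = ArrowWriterOptions::new().with_parquet_schema(schema_descr);
+    /// let writer = ArrowWriter::try_new_with_options(buffer, arrow_schema, options)?;
+    /// ```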
+    pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
+        Self {
+            schema_descr: Some(schema_descr),
+            ..self
+        }
+    }
 }
 
 /// A single column chunk produced by [`ArrowColumnWriter`]
@@ -1513,7 +1533,7 @@ mod tests {
     use crate::file::page_index::column_index::ColumnIndexMetaData;
     use crate::file::reader::SerializedPageReader;
     use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
-    use crate::schema::types::ColumnPath;
+    use crate::schema::types::{ColumnPath, Type};
     use arrow::datatypes::ToByteSlice;
     use arrow::datatypes::{DataType, Schema};
     use arrow::error::Result as ArrowResult;
@@ -4135,6 +4155,69 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_arrow_writer_explicit_schema() {
+        // Write an int32 array using explicit int64 storage
+        let batch_schema = Arc::new(Schema::new(vec![Field::new(
+            "integers",
+            DataType::Int32,
+            true,
+        )]));
+        let parquet_schema = Type::group_type_builder("root")
+            .with_fields(vec![Type::primitive_type_builder(
+                "integers",
+                crate::basic::Type::INT64,
+            )
+            .build()
+            .unwrap()
+            .into()])
+            .build()
+            .unwrap();
+        let parquet_schema_descr = SchemaDescriptor::new(parquet_schema.into());
+
+        let batch = RecordBatch::try_new(
+            batch_schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
+        )
+        .unwrap();
+
+        let explicit_schema_options =
+            ArrowWriterOptions::new().with_parquet_schema(parquet_schema_descr);
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new_with_options(
+            &mut buf,
+            batch_schema.clone(),
+            explicit_schema_options,
+        )
+        .unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let bytes = Bytes::from(buf);
+        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+
+        let expected_schema = Arc::new(Schema::new(vec![Field::new(
+            "integers",
+            DataType::Int64,
+            true,
+        )]));
+        assert_eq!(reader_builder.schema(), &expected_schema);
+
+        let batches = reader_builder
+            .build()
+            .unwrap()
+            .collect::<Result<Vec<_>, ArrowError>>()
+            .unwrap();
+        assert_eq!(batches.len(), 1);
+
+        let expected_batch = RecordBatch::try_new(
+            expected_schema.clone(),
+            vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as _],
+        )
+        .unwrap();
+        assert_eq!(batches[0], expected_batch);
+    }
+
     #[test]
     fn mismatched_schemas() {
         let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs
index 7371c72a5896..2fd4ca272868 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -28,6 +28,8 @@ use crate::data_type::DataType;
 use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
 use crate::errors::{ParquetError, Result};
 use crate::file::properties::{EnabledStatistics, WriterProperties};
+use crate::geospatial::accumulator::{try_new_geo_stats_accumulator, GeoStatsAccumulator};
+use crate::geospatial::statistics::GeospatialStatistics;
 use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
 
 /// A collection of [`ParquetValueType`] encoded by a [`ColumnValueEncoder`]
@@ -121,6 +123,13 @@ pub trait ColumnValueEncoder {
     /// will *not* be tracked by the bloom filter as it is empty since. This should be called once
     /// near the end of encoding.
     fn flush_bloom_filter(&mut self) -> Option<Sbbf>;
+
+    /// Computes [`GeospatialStatistics`], if any, and resets internal state such that any internal
+    /// accumulator is prepared to accumulate statistics for the next column chunk.
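+    ///
+    /// Like [`ColumnValueEncoder::flush_bloom_filter`], this is expected to be called once
+    /// per column chunk, after the values for that chunk have been written.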
+    fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>>;
 }
 
 pub struct ColumnValueEncoderImpl<T: DataType> {
@@ -133,6 +139,7 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
     max_value: Option<T::T>,
     bloom_filter: Option<Sbbf>,
     variable_length_bytes: Option<i64>,
+    geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
 }
 
 impl<T: DataType> ColumnValueEncoderImpl<T> {
@@ -145,10 +152,12 @@ impl<T: DataType> ColumnValueEncoderImpl<T> {
 
     fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
         if self.statistics_enabled != EnabledStatistics::None
-            // INTERVAL has undefined sort order, so don't write min/max stats for it
+            // INTERVAL, Geometry, and Geography have undefined sort order, so don't write min/max stats for them
            && self.descr.converted_type() != ConvertedType::INTERVAL
         {
-            if let Some((min, max)) = self.min_max(slice, None) {
+            if let Some(accumulator) = self.geo_stats_accumulator.as_deref_mut() {
+                update_geo_stats_accumulator(accumulator, slice.iter());
+            } else if let Some((min, max)) = self.min_max(slice, None) {
                 update_min(&self.descr, &min, &mut self.min_value);
                 update_max(&self.descr, &max, &mut self.max_value);
             }
@@ -201,6 +210,8 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
             .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
             .transpose()?;
 
+        let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);
+
         Ok(Self {
             encoder,
             dict_encoder,
@@ -211,6 +222,7 @@
             min_value: None,
             max_value: None,
             variable_length_bytes: None,
+            geo_stats_accumulator,
         })
     }
@@ -307,6 +319,10 @@
             variable_length_bytes: self.variable_length_bytes.take(),
         })
     }
+
+    fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
+        self.geo_stats_accumulator.as_mut().map(|a| a.finish())?
+    }
 }
 
 fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(T, T)>
 where
@@ -367,3 +383,15 @@ fn replace_zero<T: ParquetValueType>(val: &T, descr: &ColumnDescriptor, replace:
         _ => val.clone(),
     }
 }
+
+fn update_geo_stats_accumulator<'a, T, I>(bounder: &mut dyn GeoStatsAccumulator, iter: I)
+where
+    T: ParquetValueType + 'a,
+    I: Iterator<Item = &'a T>,
+{
+    if bounder.is_valid() {
+        for val in iter {
+            bounder.update_wkb(val.as_bytes());
+        }
+    }
+}
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 0ff1ccc52569..5690db7d9c8f 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -1222,6 +1222,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                     .set_definition_level_histogram(
                         self.column_metrics.definition_level_histogram.take(),
                     );
+
+            if let Some(geo_stats) = self.encoder.flush_geospatial_statistics() {
+                builder = builder.set_geo_statistics(geo_stats);
+            }
         }
 
         builder = self.set_column_chunk_encryption_properties(builder);
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index a6c13cfa2cb0..a923fb8c6a17 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -723,6 +723,9 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
         if let Some(statistics) = metadata.statistics() {
             builder = builder.set_statistics(statistics.clone())
         }
+        if let Some(geo_statistics) = metadata.geo_statistics() {
+            builder = builder.set_geo_statistics(Box::new(geo_statistics.clone()))
+        }
         if let Some(page_encoding_stats) = metadata.page_encoding_stats() {
             builder = builder.set_page_encoding_stats(page_encoding_stats.clone())
         }
diff --git a/parquet/src/geospatial/accumulator.rs b/parquet/src/geospatial/accumulator.rs
new file mode 100644
index 000000000000..e86452fc8eb6
--- /dev/null
+++ b/parquet/src/geospatial/accumulator.rs
@@ -0,0 +1,396 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides implementations and traits for building [`GeospatialStatistics`]
+
+use std::sync::{Arc, OnceLock};
+
+use crate::{
+    basic::LogicalType, errors::ParquetError, geospatial::statistics::GeospatialStatistics,
+    schema::types::ColumnDescPtr,
+};
+
+/// Create a new [`GeoStatsAccumulator`] instance if `descr` represents a Geometry or
+/// Geography [`LogicalType`]
+///
+/// Returns a suitable [`GeoStatsAccumulator`], or `None` if `descr` represents a
+/// non-geospatial type.
+pub fn try_new_geo_stats_accumulator(
+    descr: &ColumnDescPtr,
+) -> Option<Box<dyn GeoStatsAccumulator>> {
+    if !matches!(
+        descr.logical_type(),
+        Some(LogicalType::Geometry { .. }) | Some(LogicalType::Geography { .. })
+    ) {
+        return None;
+    }
+
+    Some(
+        ACCUMULATOR_FACTORY
+            .get_or_init(|| Arc::new(DefaultGeoStatsAccumulatorFactory::default()))
+            .new_accumulator(descr),
+    )
+}
+
+/// Initialize the global [`GeoStatsAccumulatorFactory`]
+///
+/// This may only be done once, before any calls to [`try_new_geo_stats_accumulator`].
+/// Clients may use this to implement support for builds of the Parquet crate without
+/// geospatial support or to implement support for Geography bounding using external
+/// dependencies.
+pub fn init_geo_stats_accumulator_factory(
+    factory: Arc<dyn GeoStatsAccumulatorFactory>,
+) -> Result<(), ParquetError> {
+    if ACCUMULATOR_FACTORY.set(factory).is_err() {
+        Err(ParquetError::General(
+            "Global GeoStatsAccumulatorFactory already set".to_string(),
+        ))
+    } else {
+        Ok(())
+    }
+}
+
+/// Global accumulator factory instance
+static ACCUMULATOR_FACTORY: OnceLock<Arc<dyn GeoStatsAccumulatorFactory>> = OnceLock::new();
+
+/// Factory for [`GeospatialStatistics`] accumulators
+///
+/// The GeoStatsAccumulatorFactory is a trait implemented by the global factory that
+/// generates new instances of a [`GeoStatsAccumulator`] when constructing new
+/// encoders for a Geometry or Geography logical type.
+pub trait GeoStatsAccumulatorFactory: Send + Sync {
+    /// Create a new [`GeoStatsAccumulator`] appropriate for the logical type of a given
+    /// [`ColumnDescPtr`]
+    fn new_accumulator(&self, descr: &ColumnDescPtr) -> Box<dyn GeoStatsAccumulator>;
+}
+
+/// Dynamic [`GeospatialStatistics`] accumulator
+///
+/// The GeoStatsAccumulator is a trait whose implementors can ingest the (non-null)
+/// elements of a column and return compliant [`GeospatialStatistics`] (or `None`).
+/// When built with geospatial support this will usually be the
+/// [`ParquetGeoStatsAccumulator`].
+pub trait GeoStatsAccumulator: Send {
+    /// Returns true if this instance can return [`GeospatialStatistics`] from
+    /// [`GeoStatsAccumulator::finish`].
+    ///
+    /// This method returns false when this crate is built without geospatial support
+    /// (i.e., from the [`VoidGeoStatsAccumulator`]) or if the accumulator encountered
+    /// invalid or unsupported elements for which it cannot compute valid statistics.
+    fn is_valid(&self) -> bool;
+
+    /// Update the accumulated bounds with a single WKB-encoded value
+    ///
+    /// This method is infallible; however, in the event of improperly encoded values,
+    /// implementations must ensure that [`GeoStatsAccumulator::finish`] returns `None`.
+    fn update_wkb(&mut self, wkb: &[u8]);
+
+    /// Compute the final statistics and reset internal state
+    fn finish(&mut self) -> Option<Box<GeospatialStatistics>>;
+}
+
+/// Default accumulator factory for [`GeospatialStatistics`]
+///
+/// When this crate is built with geospatial support, this factory constructs a
+/// [`ParquetGeoStatsAccumulator`] that ensures Geometry columns are written with
+/// statistics when statistics for that column are enabled. Otherwise, this factory
+/// returns a [`VoidGeoStatsAccumulator`] that never adds any geospatial statistics.
+///
+/// Bounding for Geography columns is not currently implemented by parquet-geospatial,
+/// so this factory always returns a [`VoidGeoStatsAccumulator`] for them.
+#[derive(Debug, Default)]
+pub struct DefaultGeoStatsAccumulatorFactory {}
+
+impl GeoStatsAccumulatorFactory for DefaultGeoStatsAccumulatorFactory {
+    fn new_accumulator(&self, _descr: &ColumnDescPtr) -> Box<dyn GeoStatsAccumulator> {
+        #[cfg(feature = "geospatial")]
+        if let Some(crate::basic::LogicalType::Geometry { .. }) = _descr.logical_type() {
+            Box::new(ParquetGeoStatsAccumulator::default())
+        } else {
+            Box::new(VoidGeoStatsAccumulator::default())
+        }
+
+        #[cfg(not(feature = "geospatial"))]
+        return Box::new(VoidGeoStatsAccumulator::default());
+    }
+}
+
+/// A [`GeoStatsAccumulator`] that never computes any [`GeospatialStatistics`]
+#[derive(Debug, Default)]
+pub struct VoidGeoStatsAccumulator {}
+
+impl GeoStatsAccumulator for VoidGeoStatsAccumulator {
+    fn is_valid(&self) -> bool {
+        false
+    }
+
+    fn update_wkb(&mut self, _wkb: &[u8]) {}
+
+    fn finish(&mut self) -> Option<Box<GeospatialStatistics>> {
+        None
+    }
+}
+
+/// A [`GeoStatsAccumulator`] that uses the parquet-geospatial crate to compute Geometry statistics
+///
+/// Note that this accumulator only supports Geometry types and will return invalid statistics for
+/// non-point Geography input ([`GeoStatsAccumulatorFactory::new_accumulator`] is responsible
+/// for ensuring an appropriate accumulator based on the logical type).
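+///
+/// A minimal usage sketch (`wkb_point_xy` is the test helper from
+/// `parquet_geospatial::testing`):
+///
+/// ```ignore
+/// let mut acc = ParquetGeoStatsAccumulator::default();
+/// acc.update_wkb(&wkb_point_xy(1.0, 2.0));
+/// let stats = acc.finish(); // Some(boxed GeospatialStatistics); also resets state
+/// ```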
+#[cfg(feature = "geospatial")]
+#[derive(Debug)]
+pub struct ParquetGeoStatsAccumulator {
+    bounder: parquet_geospatial::bounding::GeometryBounder,
+    invalid: bool,
+}
+
+#[cfg(feature = "geospatial")]
+impl Default for ParquetGeoStatsAccumulator {
+    fn default() -> Self {
+        Self {
+            bounder: parquet_geospatial::bounding::GeometryBounder::empty(),
+            invalid: false,
+        }
+    }
+}
+
+#[cfg(feature = "geospatial")]
+impl GeoStatsAccumulator for ParquetGeoStatsAccumulator {
+    fn is_valid(&self) -> bool {
+        !self.invalid
+    }
+
+    fn update_wkb(&mut self, wkb: &[u8]) {
+        if self.bounder.update_wkb(wkb).is_err() {
+            self.invalid = true;
+        }
+    }
+
+    fn finish(&mut self) -> Option<Box<GeospatialStatistics>> {
+        use parquet_geospatial::interval::IntervalTrait;
+
+        use crate::geospatial::bounding_box::BoundingBox;
+
+        if self.invalid {
+            // Reset
+            self.invalid = false;
+            self.bounder = parquet_geospatial::bounding::GeometryBounder::empty();
+            return None;
+        }
+
+        let bbox = if self.bounder.x().is_empty() || self.bounder.y().is_empty() {
+            None
+        } else {
+            let mut bbox = BoundingBox::new(
+                self.bounder.x().lo(),
+                self.bounder.x().hi(),
+                self.bounder.y().lo(),
+                self.bounder.y().hi(),
+            );
+
+            if !self.bounder.z().is_empty() {
+                bbox = bbox.with_zrange(self.bounder.z().lo(), self.bounder.z().hi());
+            }
+
+            if !self.bounder.m().is_empty() {
+                bbox = bbox.with_mrange(self.bounder.m().lo(), self.bounder.m().hi());
+            }
+
+            Some(bbox)
+        };
+
+        let bounder_geometry_types = self.bounder.geometry_types();
+        let geometry_types = if bounder_geometry_types.is_empty() {
+            None
+        } else {
+            Some(bounder_geometry_types)
+        };
+
+        // Reset
+        self.bounder = parquet_geospatial::bounding::GeometryBounder::empty();
+
+        Some(Box::new(GeospatialStatistics::new(bbox, geometry_types)))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn test_void_accumulator() {
+        let mut accumulator = VoidGeoStatsAccumulator {};
+        assert!(!accumulator.is_valid());
+        accumulator.update_wkb(&[0x01, 0x02, 0x03]);
+        assert!(accumulator.finish().is_none());
+    }
+
+    #[cfg(feature = "geospatial")]
+    #[test]
+    fn test_default_accumulator_geospatial_factory() {
+        use std::sync::Arc;
+
+        use parquet_geospatial::testing::wkb_point_xy;
+
+        use crate::{
+            basic::LogicalType,
+            geospatial::bounding_box::BoundingBox,
+            schema::types::{ColumnDescriptor, ColumnPath, Type},
+        };
+
+        // Check that we have a working accumulator for Geometry
+        let parquet_type = Type::primitive_type_builder("geom", crate::basic::Type::BYTE_ARRAY)
+            .with_logical_type(Some(LogicalType::Geometry { crs: None }))
+            .build()
+            .unwrap();
+        let column_descr =
+            ColumnDescriptor::new(Arc::new(parquet_type), 0, 0, ColumnPath::new(vec![]));
+        let mut accumulator = try_new_geo_stats_accumulator(&Arc::new(column_descr)).unwrap();
+
+        assert!(accumulator.is_valid());
+        accumulator.update_wkb(&wkb_point_xy(1.0, 2.0));
+        accumulator.update_wkb(&wkb_point_xy(11.0, 12.0));
+        let stats = accumulator.finish().unwrap();
+        assert_eq!(
+            stats.bounding_box().unwrap(),
+            &BoundingBox::new(1.0, 11.0, 2.0, 12.0)
+        );
+
+        // Check that we have a void accumulator for Geography
+        let parquet_type = Type::primitive_type_builder("geom", crate::basic::Type::BYTE_ARRAY)
+            .with_logical_type(Some(LogicalType::Geography {
+                crs: None,
+                algorithm: None,
+            }))
+            .build()
+            .unwrap();
+        let column_descr =
+            ColumnDescriptor::new(Arc::new(parquet_type), 0, 0, ColumnPath::new(vec![]));
+        let mut accumulator = try_new_geo_stats_accumulator(&Arc::new(column_descr)).unwrap();
+
+        assert!(!accumulator.is_valid());
+        assert!(accumulator.finish().is_none());
+
+        // Check that we return None if the type is not geometry or geography
+        let parquet_type = Type::primitive_type_builder("geom", crate::basic::Type::BYTE_ARRAY)
+            .build()
+            .unwrap();
+        let column_descr =
+            ColumnDescriptor::new(Arc::new(parquet_type), 0, 0, ColumnPath::new(vec![]));
+        assert!(try_new_geo_stats_accumulator(&Arc::new(column_descr)).is_none());
+
+        // We should not be able to initialize the global accumulator factory after
+        // we've created at least one accumulator
+        assert!(init_geo_stats_accumulator_factory(Arc::new(
+            DefaultGeoStatsAccumulatorFactory::default()
+        ))
+        .is_err())
+    }
+
+    #[cfg(feature = "geospatial")]
+    #[test]
+    fn test_geometry_accumulator() {
+        use parquet_geospatial::testing::{wkb_point_xy, wkb_point_xyzm};
+
+        use crate::geospatial::bounding_box::BoundingBox;
+
+        let mut accumulator = ParquetGeoStatsAccumulator::default();
+
+        // A fresh instance should be able to bound input
+        assert!(accumulator.is_valid());
+        accumulator.update_wkb(&wkb_point_xy(1.0, 2.0));
+        accumulator.update_wkb(&wkb_point_xy(11.0, 12.0));
+        let stats = accumulator.finish().unwrap();
+        assert_eq!(stats.geospatial_types().unwrap(), &vec![1]);
+        assert_eq!(
+            stats.bounding_box().unwrap(),
+            &BoundingBox::new(1.0, 11.0, 2.0, 12.0)
+        );
+
+        // finish() should have reset the bounder such that the first values aren't
+        // included when computing the next round of statistics.
+        assert!(accumulator.is_valid());
+        accumulator.update_wkb(&wkb_point_xy(21.0, 22.0));
+        accumulator.update_wkb(&wkb_point_xy(31.0, 32.0));
+        let stats = accumulator.finish().unwrap();
+        assert_eq!(stats.geospatial_types().unwrap(), &vec![1]);
+        assert_eq!(
+            stats.bounding_box().unwrap(),
+            &BoundingBox::new(21.0, 31.0, 22.0, 32.0)
+        );
+
+        // When an accumulator encounters invalid input, it reports is_valid() false
+        // and does not compute subsequent statistics
+        assert!(accumulator.is_valid());
+        accumulator.update_wkb(&wkb_point_xy(41.0, 42.0));
+        accumulator.update_wkb("these bytes are not WKB".as_bytes());
+        assert!(!accumulator.is_valid());
+        assert!(accumulator.finish().is_none());
+
+        // Subsequent rounds of accumulation should work as expected
+        assert!(accumulator.is_valid());
+        accumulator.update_wkb(&wkb_point_xy(41.0, 42.0));
+        accumulator.update_wkb(&wkb_point_xy(51.0, 52.0));
+        let stats = accumulator.finish().unwrap();
+        assert_eq!(stats.geospatial_types().unwrap(), &vec![1]);
+        assert_eq!(
+            stats.bounding_box().unwrap(),
+            &BoundingBox::new(41.0, 51.0, 42.0, 52.0)
+        );
+
+        // When there was no input at all (as occurs in the all-null case), both geometry
+        // types and bounding box will be None. This is because Parquet Thrift statistics
+        // have no mechanism to communicate "empty". (The all-null situation can be
+        // determined from the null count in this case.)
+        assert!(accumulator.is_valid());
+        let stats = accumulator.finish().unwrap();
+        assert!(stats.geospatial_types().is_none());
+        assert!(stats.bounding_box().is_none());
+
+        // When there was 100% "empty" input (i.e., non-null geometries without
+        // coordinates), there should be statistics with geometry types but no
+        // bounding box.
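+        // (A point whose coordinates are all NaN is the conventional WKB encoding of
+        // an empty point, so it contributes a geometry type but no finite bounds.)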
+        assert!(accumulator.is_valid());
+        accumulator.update_wkb(&wkb_point_xy(f64::NAN, f64::NAN));
+        let stats = accumulator.finish().unwrap();
+        assert_eq!(stats.geospatial_types().unwrap(), &vec![1]);
+        assert!(stats.bounding_box().is_none());
+
+        // If Z and/or M are present, they should be reported in the bounding box
+        assert!(accumulator.is_valid());
+        accumulator.update_wkb(&wkb_point_xyzm(1.0, 2.0, 3.0, 4.0));
+        accumulator.update_wkb(&wkb_point_xyzm(5.0, 6.0, 7.0, 8.0));
+        let stats = accumulator.finish().unwrap();
+        assert_eq!(stats.geospatial_types().unwrap(), &vec![3001]);
+        assert_eq!(
+            stats.bounding_box().unwrap(),
+            &BoundingBox::new(1.0, 5.0, 2.0, 6.0)
+                .with_zrange(3.0, 7.0)
+                .with_mrange(4.0, 8.0)
+        );
+    }
+}
diff --git a/parquet/src/geospatial/mod.rs b/parquet/src/geospatial/mod.rs
index b6864cf8bd64..9d55fca89d46 100644
--- a/parquet/src/geospatial/mod.rs
+++ b/parquet/src/geospatial/mod.rs
@@ -31,7 +31,7 @@
 //! * mmin, mmax: (optional) the minimum and maximum linear reference values
 //!
 //! In 2D representation, where x are points:
-//! ```text
+//! ```text
 //! ymax +-----------------------+
 //!      |                     x |
 //!      |       x               |
@@ -46,5 +46,6 @@
 //! [parquet-geo-spec]: https://github.com/apache/parquet-format/blob/master/Geospatial.md
 //! [geo-types]: https://github.com/apache/parquet-format/blob/master/Geospatial.md#geospatial-types
 
+pub mod accumulator;
 pub mod bounding_box;
 pub mod statistics;
diff --git a/parquet/src/geospatial/statistics.rs b/parquet/src/geospatial/statistics.rs
index d3287412b143..245f012c50be 100644
--- a/parquet/src/geospatial/statistics.rs
+++ b/parquet/src/geospatial/statistics.rs
@@ -43,9 +43,7 @@ use crate::geospatial::bounding_box::BoundingBox;
 /// ```
 #[derive(Clone, Debug, PartialEq, Default)]
 pub struct GeospatialStatistics {
-    /// Optional bounding defining the spatial extent, where None represents a lack of information.
     bbox: Option<BoundingBox>,
-    /// Optional list of geometry type identifiers, where None represents lack of information
     geospatial_types: Option<Vec<i32>>,
 }
 
@@ -58,54 +56,13 @@ impl GeospatialStatistics {
         }
     }
 
-    /// Optional bounding defining the spatial extent, where `None` represents a lack of information.
-    pub fn bounding_box(&self) -> Option<&BoundingBox> {
-        self.bbox.as_ref()
-    }
-
-    /// Optional list of geometry type identifiers, where `None` represents a lack of information.
+    /// Optional list of geometry type identifiers, where `None` represents lack of information
     pub fn geospatial_types(&self) -> Option<&Vec<i32>> {
         self.geospatial_types.as_ref()
     }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    // TODO(ets): add round trip to/from parquet tests
-
-    #[test]
-    fn test_read_geospatial_statistics_from_file() {
-        use crate::file::reader::{FileReader, SerializedFileReader};
-        use std::fs::File;
-
-        let path = format!(
-            "{}/geospatial/geospatial.parquet",
-            arrow::util::test_util::parquet_test_data(),
-        );
-        let file = File::open(path).unwrap();
-        let reader = SerializedFileReader::try_from(file).unwrap();
-        let metadata = reader.metadata();
-
-        // geospatial.parquet schema:
-        //   optional binary field_id=-1 group (String);
-        //   optional binary field_id=-1 wkt (String);
-        //   optional binary field_id=-1 geometry (Geometry(crs=));
-        let geo_statistics = metadata.row_group(0).column(2).geo_statistics();
-        assert!(geo_statistics.is_some());
-
-        let expected_bbox = BoundingBox::new(10.0, 40.0, 10.0, 40.0)
-            .with_zrange(30.0, 80.0)
-            .with_mrange(200.0, 1600.0);
-        let expected_geospatial_types = vec![
-            1, 2, 3, 4, 5, 6, 7, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 2001, 2002, 2003, 2004,
-            2005, 2006, 2007, 3001, 3002, 3003, 3004, 3005, 3006, 3007,
-        ];
-        assert_eq!(
-            geo_statistics.unwrap().geospatial_types,
-            Some(expected_geospatial_types)
-        );
-        assert_eq!(geo_statistics.unwrap().bbox, Some(expected_bbox));
+
+    /// Optional bounding defining the spatial extent, where `None` represents a lack of information.
+    pub fn bounding_box(&self) -> Option<&BoundingBox> {
+        self.bbox.as_ref()
     }
 }
diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs
index 9629e17b4752..91a64adef75d 100644
--- a/parquet/src/schema/types.rs
+++ b/parquet/src/schema/types.rs
@@ -985,7 +985,7 @@ impl ColumnDescriptor {
 ///     )
 /// );
 /// ```
-#[derive(PartialEq)]
+#[derive(PartialEq, Clone)]
 pub struct SchemaDescriptor {
     /// The top-level logical schema (the "message" type).
     ///
diff --git a/parquet/tests/geospatial.rs b/parquet/tests/geospatial.rs
index b3de40491b30..53a947327e86 100644
--- a/parquet/tests/geospatial.rs
+++ b/parquet/tests/geospatial.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 //! Tests for Geometry and Geography logical types
+
 use parquet::{
     basic::{EdgeInterpolationAlgorithm, LogicalType},
     file::{
@@ -121,3 +122,318 @@ fn test_read_geospatial_statistics() {
     );
     assert_eq!(geo_statistics.unwrap().bounding_box(), Some(&expected_bbox));
 }
+
+#[cfg(all(feature = "arrow", feature = "geospatial"))]
+mod test {
+    //! Tests for Geometry and Geography logical types that require the arrow
+    //! and/or geospatial features enabled
+
+    use super::*;
+
+    use std::{iter::zip, sync::Arc};
+
+    use arrow_array::{create_array, ArrayRef, BinaryArray, RecordBatch};
+    use arrow_schema::{DataType, Field, Schema};
+    use bytes::Bytes;
+    use parquet::{
+        arrow::{arrow_writer::ArrowWriterOptions, ArrowWriter},
+        column::reader::ColumnReader,
+        data_type::{ByteArray, ByteArrayType},
+        file::{
+            metadata::RowGroupMetaData,
+            properties::{EnabledStatistics, WriterProperties},
+            reader::FileReader,
+            writer::SerializedFileWriter,
+        },
+        geospatial::statistics::GeospatialStatistics,
+        schema::types::{SchemaDescriptor, Type},
+    };
+    use parquet_geospatial::testing::wkb_point_xy;
+
+    fn read_row_group_metadata(b: Bytes) -> Vec<RowGroupMetaData> {
+        let reader = SerializedFileReader::new(b).unwrap();
+        reader.metadata().row_groups().to_vec()
+    }
+
+    fn read_geo_statistics(b: Bytes, column: usize) -> Vec<Option<GeospatialStatistics>> {
+        read_row_group_metadata(b)
+            .iter()
+            .map(|row_group| row_group.column(column).geo_statistics().cloned())
+            .collect()
+    }
+
+    #[test]
+    fn test_write_statistics_not_arrow() {
+        // Four row groups: one all non-null, one with invalid WKB, one with a null,
+        // and one with all nulls
+        let column_values = vec![
+            [wkb_point_xy(1.0, 2.0), wkb_point_xy(11.0, 12.0)].map(ByteArray::from),
+            ["this is not valid wkb".into(), wkb_point_xy(31.0, 32.0)].map(ByteArray::from),
+            [wkb_point_xy(21.0, 22.0), vec![]].map(ByteArray::from),
+            [ByteArray::new(), ByteArray::new()],
+        ];
+        let def_levels = [[1, 1], [1, 1], [1, 0], [0, 0]];
+
+        // Ensure that nulls are omitted, that completely empty stats are omitted,
+        // and that invalid WKB results in empty stats
+        let expected_geometry_types = [Some(vec![1]), None, Some(vec![1]), None];
+        let expected_bounding_box = [
+            Some(BoundingBox::new(1.0, 11.0, 2.0, 12.0)),
+            None,
+            Some(BoundingBox::new(21.0, 21.0, 22.0, 22.0)),
+            None,
+        ];
+
+        let root = parquet_schema_geometry();
+        let schema = SchemaDescriptor::new(root.into());
+        let props = WriterProperties::builder()
+            .set_statistics_enabled(EnabledStatistics::Chunk)
+            .build();
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer =
+            SerializedFileWriter::new(&mut buf, schema.root_schema_ptr(), Arc::new(props)).unwrap();
+
+        for (def_levels, values) in zip(&def_levels, &column_values) {
+            let mut rg = writer.next_row_group().unwrap();
+            let mut col = rg.next_column().unwrap().unwrap();
+            col.typed::<ByteArrayType>()
+                .write_batch(values, Some(def_levels), None)
+                .unwrap();
+            col.close().unwrap();
+            rg.close().unwrap();
+        }
+
+        writer.close().unwrap();
+
+        // Check geospatial statistics on file read
+        let buf_bytes = Bytes::from(buf);
+        let all_geo_stats = read_geo_statistics(buf_bytes.clone(), 0);
+        assert_eq!(all_geo_stats.len(), column_values.len());
+        assert_eq!(expected_geometry_types.len(), column_values.len());
+        assert_eq!(expected_bounding_box.len(), column_values.len());
+
+        for i in 0..column_values.len() {
+            if let Some(geo_stats) = all_geo_stats[i].as_ref() {
+                assert_eq!(
+                    geo_stats.geospatial_types(),
+                    expected_geometry_types[i].as_ref()
+                );
+                assert_eq!(geo_stats.bounding_box(), expected_bounding_box[i].as_ref());
+            } else {
+                assert!(expected_geometry_types[i].is_none());
+                assert!(expected_bounding_box[i].is_none());
+            }
+        }
+
+        for (i, rg) in read_row_group_metadata(buf_bytes).iter().enumerate() {
+            // We should have written Statistics with a null_count
+            let stats = rg.column(0).statistics().unwrap();
+            let expected_null_count: u64 = def_levels[i].iter().map(|l| (*l == 0) as u64).sum();
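+            // (definition level 0 marks a null value for this optional leaf column)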
+            assert_eq!(stats.null_count_opt(), Some(expected_null_count));
+
+            // ...but there should be no min or max value
+            assert!(stats.min_bytes_opt().is_none());
+            assert!(stats.max_bytes_opt().is_none());
+
+            // There should be no index for this column
+            assert!(rg.column(0).column_index_length().is_none());
+            assert!(rg.column(0).column_index_offset().is_none());
+        }
+    }
+
+    #[test]
+    fn test_write_statistics_arrow() {
+        let arrow_schema = Arc::new(Schema::new(vec![Field::new(
+            "geom",
+            DataType::Binary,
+            true,
+        )]));
+
+        // Check the same cases as for the non-arrow writer. These need checking again because
+        // the arrow writer uses a different encoder where the code path for skipping nulls
+        // is independent.
+        let column_values = [
+            wkb_array_xy([Some((1.0, 2.0)), Some((11.0, 12.0))]),
+            create_array!(
+                Binary,
+                [
+                    "this is not valid wkb".as_bytes(),
+                    &wkb_point_xy(31.0, 32.0)
+                ]
+            ),
+            wkb_array_xy([Some((21.0, 22.0)), None]),
+            wkb_array_xy([None, None]),
+        ];
+
+        let expected_geometry_types = [Some(vec![1]), None, Some(vec![1]), None];
+        let expected_bounding_box = [
+            Some(BoundingBox::new(1.0, 11.0, 2.0, 12.0)),
+            None,
+            Some(BoundingBox::new(21.0, 21.0, 22.0, 22.0)),
+            None,
+        ];
+
+        let root = parquet_schema_geometry();
+        let schema = SchemaDescriptor::new(root.into());
+
+        let props = WriterProperties::builder()
+            .set_statistics_enabled(EnabledStatistics::Chunk)
+            .build();
+        let options = ArrowWriterOptions::new()
+            .with_parquet_schema(schema)
+            .with_properties(props);
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut file_writer =
+            ArrowWriter::try_new_with_options(&mut buf, arrow_schema.clone(), options).unwrap();
+
+        for values in &column_values {
+            let batch = RecordBatch::try_new(arrow_schema.clone(), vec![values.clone()]).unwrap();
+            file_writer.write(&batch).unwrap();
+            file_writer.flush().unwrap();
+        }
+
+        file_writer.close().unwrap();
+
+        // Check statistics on file read
+        let buf_bytes = Bytes::from(buf);
+        let all_geo_stats = read_geo_statistics(buf_bytes.clone(), 0);
+        assert_eq!(all_geo_stats.len(), column_values.len());
+
+        for i in 0..column_values.len() {
+            if let Some(geo_stats) = all_geo_stats[i].as_ref() {
+                assert_eq!(
+                    geo_stats.geospatial_types(),
+                    expected_geometry_types[i].as_ref()
+                );
+                assert_eq!(geo_stats.bounding_box(), expected_bounding_box[i].as_ref());
+            } else {
+                assert!(expected_geometry_types[i].is_none());
+                assert!(expected_bounding_box[i].is_none());
+            }
+        }
+
+        for (i, rg) in read_row_group_metadata(buf_bytes).iter().enumerate() {
+            // We should have written Statistics with a null_count
+            let stats = rg.column(0).statistics().unwrap();
+            let expected_null_count = column_values[i].null_count();
+            assert_eq!(stats.null_count_opt(), Some(expected_null_count as u64));
+
+            // ...but there should be no min or max value
+            assert!(stats.min_bytes_opt().is_none());
+            assert!(stats.max_bytes_opt().is_none());
+
+            // There should be no index for this column
+            assert!(rg.column(0).column_index_length().is_none());
+            assert!(rg.column(0).column_index_offset().is_none());
+        }
+    }
+
+    #[test]
+    fn test_roundtrip_statistics_geospatial() {
+        let path = format!(
+            "{}/geospatial/geospatial.parquet",
+            arrow::util::test_util::parquet_test_data(),
+        );
+
+        test_roundtrip_statistics(&path, 2);
+    }
+
+    #[test]
+    fn test_roundtrip_geospatial_with_nan() {
+        let path = format!(
+            "{}/geospatial/geospatial-with-nan.parquet",
+            arrow::util::test_util::parquet_test_data(),
+        );
+
+        test_roundtrip_statistics(&path, 0);
+    }
+
+    #[test]
+    fn test_roundtrip_statistics_crs() {
+        let path = format!(
+            "{}/geospatial/crs-default.parquet",
+            arrow::util::test_util::parquet_test_data(),
+        );
+
+        test_roundtrip_statistics(&path, 0);
+    }
+
+    fn test_roundtrip_statistics(path: &str, column: usize) {
+        let file_bytes = Bytes::from(std::fs::read(path).unwrap());
+
+        let reader = SerializedFileReader::new(file_bytes.clone()).unwrap();
+        let mut values = Vec::new();
+        let mut def_levels = Vec::new();
+
+        let root = parquet_schema_geometry();
+        let schema = SchemaDescriptor::new(root.into());
+        let props = WriterProperties::builder()
+            .set_statistics_enabled(EnabledStatistics::Chunk)
+            .build();
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer =
+            SerializedFileWriter::new(&mut buf, schema.root_schema_ptr(), Arc::new(props)).unwrap();
+
+        for i in 0..reader.num_row_groups() {
+            let row_group = reader.get_row_group(i).unwrap();
+            values.truncate(0);
+            def_levels.truncate(0);
+
+            let mut row_group_out = writer.next_row_group().unwrap();
+
+            if let ColumnReader::ByteArrayColumnReader(mut reader) =
+                row_group.get_column_reader(column).unwrap()
+            {
+                reader
+                    .read_records(1000000, Some(&mut def_levels), None, &mut values)
+                    .unwrap();
+
+                let mut col = row_group_out.next_column().unwrap().unwrap();
+                col.typed::<ByteArrayType>()
+                    .write_batch(&values, Some(&def_levels), None)
+                    .unwrap();
+                col.close().unwrap();
+                row_group_out.close().unwrap();
+            } else {
+                panic!("Unexpected geometry column type");
+            }
+        }
+
+        writer.close().unwrap();
+
+        let actual_stats = read_geo_statistics(buf.into(), 0);
+        let expected_stats = read_geo_statistics(file_bytes.clone(), column);
+
+        assert_eq!(actual_stats.len(), expected_stats.len());
+        for i in 0..expected_stats.len() {
+            assert_eq!(actual_stats[i], expected_stats[i], "Row group {i}");
+        }
+    }
+
+    fn parquet_schema_geometry() -> Type {
+        Type::group_type_builder("root")
+            .with_fields(vec![Type::primitive_type_builder(
+                "geo",
+                parquet::basic::Type::BYTE_ARRAY,
+            )
+            .with_logical_type(Some(LogicalType::Geometry { crs: None }))
+            .build()
+            .unwrap()
+            .into()])
+            .build()
+            .unwrap()
+    }
+
+    fn wkb_array_xy(coords: impl IntoIterator<Item = Option<(f64, f64)>>) -> ArrayRef {
+        let array = BinaryArray::from_iter(
+            coords
+                .into_iter()
+                .map(|maybe_xy| maybe_xy.map(|(x, y)| wkb_point_xy(x, y))),
+        );
+        Arc::new(array)
+    }
+}