diff --git a/crates/core/src/delta_datafusion/schema_adapter.rs b/crates/core/src/delta_datafusion/schema_adapter.rs
index 392964870c..dc497860a3 100644
--- a/crates/core/src/delta_datafusion/schema_adapter.rs
+++ b/crates/core/src/delta_datafusion/schema_adapter.rs
@@ -1,16 +1,21 @@
+use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
 
 use crate::kernel::schema::cast_record_batch;
-use arrow_array::RecordBatch;
-use arrow_schema::{Schema, SchemaRef};
+use arrow_array::{RecordBatch, RecordBatchOptions};
+use arrow_cast::{cast_with_options, CastOptions};
+use arrow_schema::{Field, Schema, SchemaRef};
 use datafusion::common::{not_impl_err, ColumnStatistics, Result};
 use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
+use delta_kernel::table_features::ColumnMappingMode;
 
 /// A Schema Adapter Factory which provides casting record batches from parquet to meet
 /// delta lake conventions.
 #[derive(Debug)]
-pub(crate) struct DeltaSchemaAdapterFactory {}
+pub(crate) struct DeltaSchemaAdapterFactory {
+    pub column_mapping_mode: ColumnMappingMode,
+}
 
 impl SchemaAdapterFactory for DeltaSchemaAdapterFactory {
     fn create(
@@ -21,6 +26,7 @@ impl SchemaAdapterFactory for DeltaSchemaAdapterFactory {
         Box::new(DeltaSchemaAdapter {
             projected_table_schema,
             table_schema,
+            column_mapping_mode: self.column_mapping_mode,
         })
     }
 }
@@ -31,46 +37,149 @@ pub(crate) struct DeltaSchemaAdapter {
     projected_table_schema: SchemaRef,
     /// Schema for the table
     table_schema: SchemaRef,
+    column_mapping_mode: ColumnMappingMode,
 }
 
 impl SchemaAdapter for DeltaSchemaAdapter {
     fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
         let field = self.table_schema.field(index);
-        Some(file_schema.fields.find(field.name())?.0)
+
+        let physical_key_name = match self.column_mapping_mode {
+            ColumnMappingMode::None => return Some(file_schema.fields.find(field.name())?.0),
+            ColumnMappingMode::Name => "delta.columnMapping.physicalName",
+            ColumnMappingMode::Id => "delta.columnMapping.id",
+        };
+
+        field
+            .metadata()
+            .get(physical_key_name)
+            .and_then(|physical_name| file_schema.fields.find(physical_name.as_str()))
+            .map(|(field_id, _)| field_id)
     }
 
     fn map_schema(&self, file_schema: &Schema) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-        let mut projection = Vec::with_capacity(file_schema.fields().len());
-
-        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
-            if self
-                .projected_table_schema
-                .fields()
-                .find(file_field.name())
-                .is_some()
-            {
+        let mut projection = Vec::with_capacity(self.projected_table_schema.fields().len());
+        let mut logical_fields = Vec::new();
+
+        if self.column_mapping_mode == ColumnMappingMode::None {
+            for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+                if self
+                    .projected_table_schema
+                    .fields
+                    .find(file_field.name())
+                    .is_some()
+                {
+                    projection.push(file_idx)
+                }
+            }
+        } else {
+            for table_field in self.projected_table_schema.fields().iter() {
+                let file_idx = self
+                    .resolve_column(table_field, file_schema)
+                    .ok_or_else(|| {
+                        datafusion::error::DataFusionError::Internal(format!(
+                            "Column '{}' not found in file schema",
+                            table_field.name(),
+                        ))
+                    })?;
+                projection.push(file_idx);
+                logical_fields.push(self.strip_metadata(table_field));
             }
         }
 
-        Ok((
-            Arc::new(SchemaMapping {
-                projected_schema: self.projected_table_schema.clone(),
-            }),
-            projection,
-        ))
+        let logical_schema = if self.column_mapping_mode == ColumnMappingMode::None {
+            self.projected_table_schema.clone()
+        } else {
+            let fields: Vec<_> = logical_fields.iter().map(|f| (**f).clone()).collect();
+            Arc::new(Schema::new(fields))
+        };
+
+        Ok((Arc::new(SchemaMapping { logical_schema }), projection))
+    }
+}
+
+impl DeltaSchemaAdapter {
+    fn resolve_column(&self, table_field: &Field, file_schema: &Schema) -> Option<usize> {
+        if let Some((file_idx, _)) = file_schema.fields.find(table_field.name()) {
+            return Some(file_idx);
+        }
+
+        let physical_name = self.get_physical_name(table_field)?;
+        file_schema
+            .fields
+            .find(physical_name.as_str())
+            .map(|(idx, _)| idx)
+    }
+
+    fn get_physical_name(&self, table_field: &Field) -> Option<String> {
+        let key = match self.column_mapping_mode {
+            ColumnMappingMode::None => return None,
+            ColumnMappingMode::Name => "delta.columnMapping.physicalName",
+            ColumnMappingMode::Id => "delta.columnMapping.id",
+        };
+
+        if let Some(physical_name) = table_field.metadata().get(key) {
+            return Some(physical_name.clone());
+        }
+
+        None
+    }
+
+    fn strip_metadata(&self, field: &Field) -> Arc<Field> {
+        let stripped_metadata: HashMap<String, String> = field
+            .metadata()
+            .iter()
+            .filter(|(k, _)| !k.starts_with("delta.columnMapping"))
+            .map(|(k, v)| (k.clone(), v.clone()))
+            .collect();
+
+        Arc::new(field.clone().with_metadata(stripped_metadata))
     }
 }
 
 #[derive(Debug)]
 pub(crate) struct SchemaMapping {
-    projected_schema: SchemaRef,
+    logical_schema: SchemaRef,
 }
 
 impl SchemaMapper for SchemaMapping {
     fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
-        let record_batch = cast_record_batch(&batch, self.projected_schema.clone(), false, true)?;
-        Ok(record_batch)
+        // Cast individual columns to match the target schema types
+        // This handles type differences like timestamp precision mismatches
+        let batch_columns = batch.columns().to_vec();
+        let mut casted_columns = Vec::with_capacity(batch_columns.len());
+
+        for (col, target_field) in batch_columns
+            .iter()
+            .zip(self.logical_schema.fields().iter())
+        {
+            if col.data_type() == target_field.data_type() {
+                // Types match, use column as-is
+                casted_columns.push(col.clone());
+            } else {
+                // Types differ, cast the column
+                match cast_with_options(col, target_field.data_type(), &CastOptions::default()) {
+                    Ok(casted) => casted_columns.push(casted),
+                    Err(e) => {
+                        return Err(datafusion::error::DataFusionError::Internal(format!(
+                            "Failed to cast column '{}' from {} to {}: {}",
+                            target_field.name(),
+                            col.data_type(),
+                            target_field.data_type(),
+                            e
+                        )))
+                    }
+                }
+            }
+        }
+
+        RecordBatch::try_new_with_options(
+            self.logical_schema.clone(),
+            casted_columns,
+            &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())),
+        )
+        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
     }
 
     fn map_column_statistics(
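Note on the schema adapter change above: with `name` (or `id`) column mapping, the logical Delta field carries the physical parquet column name in its Arrow field metadata, while the parquet file only knows the opaque physical name. A minimal standalone sketch of that resolution, mirroring what `resolve_column`/`get_physical_name` do (the field name, the UUID-style physical name, and the `main` wrapper are illustrative, not part of this patch):

```rust
use std::collections::HashMap;

use arrow_schema::{DataType, Field, Schema};

fn main() {
    // Logical field as the Delta schema exposes it: the user-facing name plus
    // column-mapping metadata pointing at the physical parquet column.
    let logical = Field::new("user_id", DataType::Int64, true).with_metadata(HashMap::from([(
        "delta.columnMapping.physicalName".to_string(),
        "col-6a4796a7-28ab-4b74-b315-cbbcd0f5e695".to_string(),
    )]));

    // File schema as written to parquet: only the opaque physical name appears.
    let file_schema = Schema::new(vec![Field::new(
        "col-6a4796a7-28ab-4b74-b315-cbbcd0f5e695",
        DataType::Int64,
        true,
    )]);

    // Resolution: try the logical name first, then fall back to the physical
    // name recorded in the metadata -- the same order resolve_column uses.
    let physical = logical
        .metadata()
        .get("delta.columnMapping.physicalName")
        .expect("column-mapping metadata is present");
    let (file_idx, _) = file_schema.fields().find(physical).unwrap();
    assert_eq!(file_idx, 0);
    println!("logical '{}' -> file column {}", logical.name(), file_idx);
}
```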
diff --git a/crates/core/src/delta_datafusion/table_provider.rs b/crates/core/src/delta_datafusion/table_provider.rs
index 7a1ac88781..d600fe5143 100644
--- a/crates/core/src/delta_datafusion/table_provider.rs
+++ b/crates/core/src/delta_datafusion/table_provider.rs
@@ -43,6 +43,8 @@ use datafusion::{
     prelude::Expr,
     scalar::ScalarValue,
 };
+use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
+use delta_kernel::table_features::ColumnMappingMode;
 use delta_kernel::table_properties::DataSkippingNumIndexedCols;
 use futures::StreamExt as _;
 use itertools::Itertools;
@@ -437,6 +439,8 @@ impl<'a> DeltaScanBuilder<'a> {
             Some(schema.clone()),
         )?;
 
+        let column_mapping_mode = self.snapshot.table_configuration().column_mapping_mode();
+
         let logical_schema = if let Some(used_columns) = self.projection {
             let mut fields = Vec::with_capacity(used_columns.len());
             for idx in used_columns {
@@ -457,7 +461,17 @@ impl<'a> DeltaScanBuilder<'a> {
             logical_schema
         };
 
-        let df_schema = logical_schema.clone().to_dfschema()?;
+        let logical_schema_for_scan = if column_mapping_mode != ColumnMappingMode::None {
+            df_logical_schema(
+                self.snapshot,
+                &config.file_column_name,
+                Some(schema.clone()),
+            )?
+        } else {
+            logical_schema.clone()
+        };
+
+        let df_schema = logical_schema_for_scan.clone().to_dfschema()?;
 
         let logical_filter = self
             .filter
@@ -591,14 +605,25 @@ impl<'a> DeltaScanBuilder<'a> {
                 .push(part);
         }
 
-        let file_schema = Arc::new(Schema::new(
-            schema
-                .fields()
-                .iter()
-                .filter(|f| !table_partition_cols.contains(f.name()))
-                .cloned()
-                .collect::<Vec<_>>(),
-        ));
+        let file_schema = if column_mapping_mode != ColumnMappingMode::None {
+            Arc::new(Schema::new(
+                logical_schema_for_scan
+                    .fields()
+                    .iter()
+                    .filter(|f| !table_partition_cols.contains(f.name()))
+                    .cloned()
+                    .collect::<Vec<_>>(),
+            ))
+        } else {
+            Arc::new(Schema::new(
+                schema
+                    .fields()
+                    .iter()
+                    .filter(|f| !table_partition_cols.contains(f.name()))
+                    .cloned()
+                    .collect::<Vec<_>>(),
+            ))
+        };
 
         let mut table_partition_cols = table_partition_cols
             .iter()
@@ -660,7 +685,9 @@ impl<'a> DeltaScanBuilder<'a> {
             }
         };
         let file_source =
-            file_source.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}))?;
+            file_source.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {
+                column_mapping_mode,
+            }))?;
 
         let file_scan_config =
             FileScanConfigBuilder::new(self.log_store.object_store_url(), file_schema, file_source)
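For context on the per-column casting introduced in `map_batch` in the schema adapter diff above: instead of `cast_record_batch` over the whole batch, each column whose type differs from the logical schema is cast individually, which covers cases like a parquet file storing nanosecond timestamps for a table declared with microsecond precision. A minimal sketch of that cast using `arrow_cast` directly, outside the adapter (the sample value is illustrative):

```rust
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, TimestampNanosecondArray};
use arrow_cast::{cast_with_options, CastOptions};
use arrow_schema::{DataType, TimeUnit};

fn main() {
    // File column: nanosecond timestamps, as parquet frequently stores them.
    let file_col: ArrayRef =
        Arc::new(TimestampNanosecondArray::from(vec![1_700_000_000_000_000_000_i64]));

    // The table schema expects microsecond precision, so the types differ
    // and the mapper falls into the cast branch.
    let target = DataType::Timestamp(TimeUnit::Microsecond, None);
    assert_ne!(file_col.data_type(), &target);

    let casted = cast_with_options(&file_col, &target, &CastOptions::default())
        .expect("nanosecond -> microsecond cast succeeds");
    assert_eq!(casted.data_type(), &target);
}
```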
diff --git a/crates/core/src/kernel/transaction/protocol.rs b/crates/core/src/kernel/transaction/protocol.rs
index 72dbf1b039..b6541ab353 100644
--- a/crates/core/src/kernel/transaction/protocol.rs
+++ b/crates/core/src/kernel/transaction/protocol.rs
@@ -221,7 +221,7 @@ impl ProtocolChecker {
 pub static INSTANCE: LazyLock<ProtocolChecker> = LazyLock::new(|| {
     let mut reader_features = HashSet::new();
     reader_features.insert(ReaderFeature::TimestampWithoutTimezone);
-    // reader_features.insert(ReaderFeature::ColumnMapping);
+    reader_features.insert(ReaderFeature::ColumnMapping);
 
     let mut writer_features = HashSet::new();
     writer_features.insert(WriterFeature::AppendOnly);
diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index d6cc196eb9..a825ae2d57 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -61,7 +61,7 @@
 
 MAX_SUPPORTED_READER_VERSION = 3
 NOT_SUPPORTED_READER_VERSION = 2
-SUPPORTED_READER_FEATURES = {"timestampNtz"}
+SUPPORTED_READER_FEATURES = {"timestampNtz", "columnMapping"}
 
 FSCK_METRICS_FILES_REMOVED_LABEL = "files_removed"
 
@@ -922,15 +922,6 @@ def to_pyarrow_dataset(
                 "but these are not yet supported by the deltalake reader."
             )
 
-        if (
-            table_protocol.reader_features
-            and "columnMapping" in table_protocol.reader_features
-        ):
-            raise DeltaProtocolError(
-                "The table requires reader feature 'columnMapping' "
-                "but this is not supported using pyarrow Datasets."
-            )
-
         if (
             table_protocol.reader_features
            and "deletionVectors" in table_protocol.reader_features
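The last two hunks widen the same gate on both sides of the bindings: the Rust `ProtocolChecker` and the Python `SUPPORTED_READER_FEATURES` set now both accept `columnMapping`. Conceptually the check is set containment: every reader feature the table's protocol requires must be in the reader's supported set. A sketch of that check with hypothetical feature sets (not the actual `ProtocolChecker` API):

```rust
use std::collections::HashSet;

fn main() {
    // Reader features this build supports, mirroring the updated sets above.
    let supported: HashSet<&str> = HashSet::from(["timestampNtz", "columnMapping"]);

    // Reader features a hypothetical table's protocol requires.
    let required: HashSet<&str> = HashSet::from(["columnMapping"]);

    // The scan may proceed only if nothing required is unsupported.
    let unsupported: Vec<_> = required.difference(&supported).collect();
    assert!(
        unsupported.is_empty(),
        "table requires unsupported reader features: {unsupported:?}"
    );
}
```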