From f51d7de6288e907c3d21676a043bee411c8ba94b Mon Sep 17 00:00:00 2001 From: Jonatan Martens Date: Mon, 24 Nov 2025 13:48:11 +0200 Subject: [PATCH 1/7] Remove column mapping feature check on read --- crates/core/src/kernel/transaction/protocol.rs | 2 +- python/deltalake/table.py | 9 --------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/crates/core/src/kernel/transaction/protocol.rs b/crates/core/src/kernel/transaction/protocol.rs index 72dbf1b039..b6541ab353 100644 --- a/crates/core/src/kernel/transaction/protocol.rs +++ b/crates/core/src/kernel/transaction/protocol.rs @@ -221,7 +221,7 @@ impl ProtocolChecker { pub static INSTANCE: LazyLock = LazyLock::new(|| { let mut reader_features = HashSet::new(); reader_features.insert(ReaderFeature::TimestampWithoutTimezone); - // reader_features.insert(ReaderFeature::ColumnMapping); + reader_features.insert(ReaderFeature::ColumnMapping); let mut writer_features = HashSet::new(); writer_features.insert(WriterFeature::AppendOnly); diff --git a/python/deltalake/table.py b/python/deltalake/table.py index d6cc196eb9..62ab5dd9b6 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -922,15 +922,6 @@ def to_pyarrow_dataset( "but these are not yet supported by the deltalake reader." ) - if ( - table_protocol.reader_features - and "columnMapping" in table_protocol.reader_features - ): - raise DeltaProtocolError( - "The table requires reader feature 'columnMapping' " - "but this is not supported using pyarrow Datasets." - ) - if ( table_protocol.reader_features and "deletionVectors" in table_protocol.reader_features From 0c1a56db3b061d22d7a960fde3725bcf5599638d Mon Sep 17 00:00:00 2001 From: Jonatan Martens Date: Tue, 25 Nov 2025 16:37:34 +0200 Subject: [PATCH 2/7] Add support for reading column mapping --- .../src/delta_datafusion/schema_adapter.rs | 130 ++++++++++++++---- .../src/delta_datafusion/table_provider.rs | 50 +++++-- 2 files changed, 148 insertions(+), 32 deletions(-) diff --git a/crates/core/src/delta_datafusion/schema_adapter.rs b/crates/core/src/delta_datafusion/schema_adapter.rs index 392964870c..2f9ea59111 100644 --- a/crates/core/src/delta_datafusion/schema_adapter.rs +++ b/crates/core/src/delta_datafusion/schema_adapter.rs @@ -1,16 +1,21 @@ +use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; use crate::kernel::schema::cast_record_batch; -use arrow_array::RecordBatch; -use arrow_schema::{Schema, SchemaRef}; +use arrow_array::{RecordBatch, RecordBatchOptions}; +use arrow_cast::{cast_with_options, CastOptions}; +use arrow_schema::{Field, Schema, SchemaRef}; use datafusion::common::{not_impl_err, ColumnStatistics, Result}; use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; +use delta_kernel::table_features::ColumnMappingMode; /// A Schema Adapter Factory which provides casting record batches from parquet to meet /// delta lake conventions. #[derive(Debug)] -pub(crate) struct DeltaSchemaAdapterFactory {} +pub(crate) struct DeltaSchemaAdapterFactory { + pub column_mapping_mode: ColumnMappingMode, +} impl SchemaAdapterFactory for DeltaSchemaAdapterFactory { fn create( @@ -21,6 +26,7 @@ impl SchemaAdapterFactory for DeltaSchemaAdapterFactory { Box::new(DeltaSchemaAdapter { projected_table_schema, table_schema, + column_mapping_mode: self.column_mapping_mode, }) } } @@ -31,45 +37,123 @@ pub(crate) struct DeltaSchemaAdapter { projected_table_schema: SchemaRef, /// Schema for the table table_schema: SchemaRef, + column_mapping_mode: ColumnMappingMode, } impl SchemaAdapter for DeltaSchemaAdapter { fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option { let field = self.table_schema.field(index); - Some(file_schema.fields.find(field.name())?.0) + + let physical_key_name = match self.column_mapping_mode { + ColumnMappingMode::None => return Some(file_schema.fields.find(field.name())?.0), + ColumnMappingMode::Name => "delta.columnMapping.physicalName", + ColumnMappingMode::Id => "delta.columnMapping.id", + }; + + field + .metadata() + .get(physical_key_name) + .and_then(|physical_name| file_schema.fields.find(physical_name.as_str())) + .map(|(field_id, _)| field_id) } fn map_schema(&self, file_schema: &Schema) -> Result<(Arc, Vec)> { - let mut projection = Vec::with_capacity(file_schema.fields().len()); - - for (file_idx, file_field) in file_schema.fields.iter().enumerate() { - if self - .projected_table_schema - .fields() - .find(file_field.name()) - .is_some() - { - projection.push(file_idx); - } + let mut projection = Vec::with_capacity(self.projected_table_schema.fields().len()); + let mut logical_fields = Vec::new(); + + for (proj_idx, table_field) in self.projected_table_schema.fields().iter().enumerate() { + let file_idx = self + .resolve_column(proj_idx, table_field, file_schema) + .ok_or_else(|| { + datafusion::error::DataFusionError::Internal(format!( + "Column '{}' not found in file schema", + table_field.name() + )) + })?; + + projection.push(file_idx); + logical_fields.push(self.strip_metadata(table_field)); } - Ok(( - Arc::new(SchemaMapping { - projected_schema: self.projected_table_schema.clone(), - }), - projection, - )) + let fields: Vec<_> = logical_fields.iter().map(|f| (**f).clone()).collect(); + let logical_schema = Arc::new(Schema::new(fields)); + + Ok((Arc::new(SchemaMapping { logical_schema }), projection)) + } +} + +impl DeltaSchemaAdapter { + fn resolve_column( + &self, + proj_idx: usize, + table_field: &Field, + file_schema: &Schema, + ) -> Option { + // Try direct name match first + if let Some((file_idx, _)) = file_schema.fields.find(table_field.name()) { + return Some(file_idx); + } + + let physical_name = self.get_physical_name(proj_idx, table_field)?; + // Try resolving via column mapping + file_schema + .fields + .find(physical_name.as_str()) + .map(|(idx, _)| idx) + } + + fn get_physical_name(&self, proj_idx: usize, table_field: &Field) -> Option { + let key = match self.column_mapping_mode { + ColumnMappingMode::None => return None, + ColumnMappingMode::Name => "delta.columnMapping.physicalName", + ColumnMappingMode::Id => "delta.columnMapping.id", + }; + + // Check the projected field's metadata + if let Some(physical_name) = table_field.metadata().get(key) { + return Some(physical_name.clone()); + } + + // Fallback: check original field at same position (for aliased columns) + if proj_idx < self.table_schema.fields().len() { + let original_field = &self.table_schema.fields()[proj_idx]; + return original_field.metadata().get(key).cloned(); + } + + None + } + + fn strip_metadata(&self, field: &Field) -> Arc { + let stripped_metadata: HashMap = field + .metadata() + .iter() + .filter(|(k, _)| !k.starts_with("delta.columnMapping")) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + + Arc::new(field.clone().with_metadata(stripped_metadata)) } } #[derive(Debug)] pub(crate) struct SchemaMapping { - projected_schema: SchemaRef, + logical_schema: SchemaRef, } impl SchemaMapper for SchemaMapping { fn map_batch(&self, batch: RecordBatch) -> Result { - let record_batch = cast_record_batch(&batch, self.projected_schema.clone(), false, true)?; + // Create a target schema using logical field names and types + // Only for columns that actually exist in the batch + let fields: Vec<_> = self + .logical_schema + .fields() + .iter() + .take(batch.num_columns()) + .map(|f| f.clone()) + .collect(); + let target_schema = Arc::new(Schema::new(fields)); + + let record_batch = cast_record_batch(&batch, target_schema, false, true)?; Ok(record_batch) } diff --git a/crates/core/src/delta_datafusion/table_provider.rs b/crates/core/src/delta_datafusion/table_provider.rs index 7a1ac88781..e1abde70a7 100644 --- a/crates/core/src/delta_datafusion/table_provider.rs +++ b/crates/core/src/delta_datafusion/table_provider.rs @@ -43,6 +43,8 @@ use datafusion::{ prelude::Expr, scalar::ScalarValue, }; +use delta_kernel::engine::arrow_conversion::TryIntoArrow as _; +use delta_kernel::table_features::ColumnMappingMode; use delta_kernel::table_properties::DataSkippingNumIndexedCols; use futures::StreamExt as _; use itertools::Itertools; @@ -437,6 +439,8 @@ impl<'a> DeltaScanBuilder<'a> { Some(schema.clone()), )?; + let column_mapping_mode = self.snapshot.table_configuration().column_mapping_mode(); + let logical_schema = if let Some(used_columns) = self.projection { let mut fields = Vec::with_capacity(used_columns.len()); for idx in used_columns { @@ -591,14 +595,40 @@ impl<'a> DeltaScanBuilder<'a> { .push(part); } - let file_schema = Arc::new(Schema::new( - schema - .fields() - .iter() - .filter(|f| !table_partition_cols.contains(f.name())) - .cloned() - .collect::>(), - )); + let physical_delta_schema = if column_mapping_mode != ColumnMappingMode::None { + delta_kernel::schema::StructType::try_new( + self.snapshot + .snapshot() + .schema() + .fields() + .map(|field| field.make_physical(column_mapping_mode)), + )? + } else { + (*self.snapshot.snapshot().schema()).clone() + }; + let physical_arrow_schema: SchemaRef = Arc::new((&physical_delta_schema).try_into_arrow()?); + + let file_schema = if column_mapping_mode != ColumnMappingMode::None { + // Use the logical schema (which has logical names and physical name metadata) + Arc::new(Schema::new( + logical_schema + .fields() + .iter() + .filter(|f| !table_partition_cols.contains(f.name())) + .cloned() + .collect::>(), + )) + } else { + // Use the physical schema for non-mapped tables + Arc::new(Schema::new( + physical_arrow_schema + .fields() + .iter() + .filter(|f| !table_partition_cols.contains(f.name())) + .cloned() + .collect::>(), + )) + }; let mut table_partition_cols = table_partition_cols .iter() @@ -660,7 +690,9 @@ impl<'a> DeltaScanBuilder<'a> { } }; let file_source = - file_source.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}))?; + file_source.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory { + column_mapping_mode, + }))?; let file_scan_config = FileScanConfigBuilder::new(self.log_store.object_store_url(), file_schema, file_source) From c8e7876c015f6c3d937c0f1bec571a8cb609022d Mon Sep 17 00:00:00 2001 From: Jonatan Martens Date: Tue, 25 Nov 2025 16:43:19 +0200 Subject: [PATCH 3/7] Add columnMapping as supported feature in python layer --- python/deltalake/table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 62ab5dd9b6..a825ae2d57 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -61,7 +61,7 @@ MAX_SUPPORTED_READER_VERSION = 3 NOT_SUPPORTED_READER_VERSION = 2 -SUPPORTED_READER_FEATURES = {"timestampNtz"} +SUPPORTED_READER_FEATURES = {"timestampNtz", "columnMapping"} FSCK_METRICS_FILES_REMOVED_LABEL = "files_removed" From 25db9e96ebe481f8e9aab9e96b988c17e2d958f2 Mon Sep 17 00:00:00 2001 From: Jonatan Martens Date: Tue, 25 Nov 2025 18:22:59 +0200 Subject: [PATCH 4/7] Fix wrong schema usage --- .../src/delta_datafusion/table_provider.rs | 31 +++++++++---------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/crates/core/src/delta_datafusion/table_provider.rs b/crates/core/src/delta_datafusion/table_provider.rs index e1abde70a7..0e74cdc908 100644 --- a/crates/core/src/delta_datafusion/table_provider.rs +++ b/crates/core/src/delta_datafusion/table_provider.rs @@ -461,7 +461,17 @@ impl<'a> DeltaScanBuilder<'a> { logical_schema }; - let df_schema = logical_schema.clone().to_dfschema()?; + let logical_schema_for_scan = if column_mapping_mode != ColumnMappingMode::None { + df_logical_schema( + self.snapshot, + &config.file_column_name, + Some(schema.clone()), + )? + } else { + logical_schema.clone() + }; + + let df_schema = logical_schema_for_scan.clone().to_dfschema()?; let logical_filter = self .filter @@ -595,23 +605,10 @@ impl<'a> DeltaScanBuilder<'a> { .push(part); } - let physical_delta_schema = if column_mapping_mode != ColumnMappingMode::None { - delta_kernel::schema::StructType::try_new( - self.snapshot - .snapshot() - .schema() - .fields() - .map(|field| field.make_physical(column_mapping_mode)), - )? - } else { - (*self.snapshot.snapshot().schema()).clone() - }; - let physical_arrow_schema: SchemaRef = Arc::new((&physical_delta_schema).try_into_arrow()?); - let file_schema = if column_mapping_mode != ColumnMappingMode::None { // Use the logical schema (which has logical names and physical name metadata) Arc::new(Schema::new( - logical_schema + logical_schema_for_scan .fields() .iter() .filter(|f| !table_partition_cols.contains(f.name())) @@ -619,9 +616,9 @@ impl<'a> DeltaScanBuilder<'a> { .collect::>(), )) } else { - // Use the physical schema for non-mapped tables + // Use the original schema for non-mapped tables Arc::new(Schema::new( - physical_arrow_schema + schema .fields() .iter() .filter(|f| !table_partition_cols.contains(f.name())) From 5b4b776cc73b6a93b664487b4611ba81a0d30b5f Mon Sep 17 00:00:00 2001 From: Jonatan Martens Date: Wed, 26 Nov 2025 10:35:49 +0200 Subject: [PATCH 5/7] Remove unnecessary code --- crates/core/src/delta_datafusion/schema_adapter.rs | 9 --------- crates/core/src/delta_datafusion/table_provider.rs | 2 -- 2 files changed, 11 deletions(-) diff --git a/crates/core/src/delta_datafusion/schema_adapter.rs b/crates/core/src/delta_datafusion/schema_adapter.rs index 2f9ea59111..aacc1b1dda 100644 --- a/crates/core/src/delta_datafusion/schema_adapter.rs +++ b/crates/core/src/delta_datafusion/schema_adapter.rs @@ -89,13 +89,11 @@ impl DeltaSchemaAdapter { table_field: &Field, file_schema: &Schema, ) -> Option { - // Try direct name match first if let Some((file_idx, _)) = file_schema.fields.find(table_field.name()) { return Some(file_idx); } let physical_name = self.get_physical_name(proj_idx, table_field)?; - // Try resolving via column mapping file_schema .fields .find(physical_name.as_str()) @@ -109,17 +107,10 @@ impl DeltaSchemaAdapter { ColumnMappingMode::Id => "delta.columnMapping.id", }; - // Check the projected field's metadata if let Some(physical_name) = table_field.metadata().get(key) { return Some(physical_name.clone()); } - // Fallback: check original field at same position (for aliased columns) - if proj_idx < self.table_schema.fields().len() { - let original_field = &self.table_schema.fields()[proj_idx]; - return original_field.metadata().get(key).cloned(); - } - None } diff --git a/crates/core/src/delta_datafusion/table_provider.rs b/crates/core/src/delta_datafusion/table_provider.rs index 0e74cdc908..d600fe5143 100644 --- a/crates/core/src/delta_datafusion/table_provider.rs +++ b/crates/core/src/delta_datafusion/table_provider.rs @@ -606,7 +606,6 @@ impl<'a> DeltaScanBuilder<'a> { } let file_schema = if column_mapping_mode != ColumnMappingMode::None { - // Use the logical schema (which has logical names and physical name metadata) Arc::new(Schema::new( logical_schema_for_scan .fields() @@ -616,7 +615,6 @@ impl<'a> DeltaScanBuilder<'a> { .collect::>(), )) } else { - // Use the original schema for non-mapped tables Arc::new(Schema::new( schema .fields() From e7df46944f72dcc34b8b7cc7d7ff65be02d175dc Mon Sep 17 00:00:00 2001 From: Jonatan Martens Date: Wed, 26 Nov 2025 12:01:43 +0200 Subject: [PATCH 6/7] Retain previous flow when column mapping disabled --- .../src/delta_datafusion/schema_adapter.rs | 69 ++++++++++--------- 1 file changed, 35 insertions(+), 34 deletions(-) diff --git a/crates/core/src/delta_datafusion/schema_adapter.rs b/crates/core/src/delta_datafusion/schema_adapter.rs index aacc1b1dda..b7aa35f95e 100644 --- a/crates/core/src/delta_datafusion/schema_adapter.rs +++ b/crates/core/src/delta_datafusion/schema_adapter.rs @@ -61,46 +61,58 @@ impl SchemaAdapter for DeltaSchemaAdapter { let mut projection = Vec::with_capacity(self.projected_table_schema.fields().len()); let mut logical_fields = Vec::new(); - for (proj_idx, table_field) in self.projected_table_schema.fields().iter().enumerate() { - let file_idx = self - .resolve_column(proj_idx, table_field, file_schema) - .ok_or_else(|| { - datafusion::error::DataFusionError::Internal(format!( - "Column '{}' not found in file schema", - table_field.name() - )) - })?; - - projection.push(file_idx); - logical_fields.push(self.strip_metadata(table_field)); + if self.column_mapping_mode == ColumnMappingMode::None { + for (file_idx, file_field) in file_schema.fields.iter().enumerate() { + if self + .projected_table_schema + .fields + .find(file_field.name()) + .is_some() + { + projection.push(file_idx) + } + } + } else { + for table_field in self.projected_table_schema.fields().iter() { + let file_idx = self + .resolve_column(table_field, file_schema) + .ok_or_else(|| { + datafusion::error::DataFusionError::Internal(format!( + "Column '{}' not found in file schema", + table_field.name(), + )) + })?; + + projection.push(file_idx); + logical_fields.push(self.strip_metadata(table_field)); + } } - let fields: Vec<_> = logical_fields.iter().map(|f| (**f).clone()).collect(); - let logical_schema = Arc::new(Schema::new(fields)); + let logical_schema = if self.column_mapping_mode == ColumnMappingMode::None { + self.projected_table_schema.clone() + } else { + let fields: Vec<_> = logical_fields.iter().map(|f| (**f).clone()).collect(); + Arc::new(Schema::new(fields)) + }; Ok((Arc::new(SchemaMapping { logical_schema }), projection)) } } impl DeltaSchemaAdapter { - fn resolve_column( - &self, - proj_idx: usize, - table_field: &Field, - file_schema: &Schema, - ) -> Option { + fn resolve_column(&self, table_field: &Field, file_schema: &Schema) -> Option { if let Some((file_idx, _)) = file_schema.fields.find(table_field.name()) { return Some(file_idx); } - let physical_name = self.get_physical_name(proj_idx, table_field)?; + let physical_name = self.get_physical_name(table_field)?; file_schema .fields .find(physical_name.as_str()) .map(|(idx, _)| idx) } - fn get_physical_name(&self, proj_idx: usize, table_field: &Field) -> Option { + fn get_physical_name(&self, table_field: &Field) -> Option { let key = match self.column_mapping_mode { ColumnMappingMode::None => return None, ColumnMappingMode::Name => "delta.columnMapping.physicalName", @@ -133,18 +145,7 @@ pub(crate) struct SchemaMapping { impl SchemaMapper for SchemaMapping { fn map_batch(&self, batch: RecordBatch) -> Result { - // Create a target schema using logical field names and types - // Only for columns that actually exist in the batch - let fields: Vec<_> = self - .logical_schema - .fields() - .iter() - .take(batch.num_columns()) - .map(|f| f.clone()) - .collect(); - let target_schema = Arc::new(Schema::new(fields)); - - let record_batch = cast_record_batch(&batch, target_schema, false, true)?; + let record_batch = cast_record_batch(&batch, self.logical_schema.clone(), false, true)?; Ok(record_batch) } From 4a53eccdf3e188f84eec29a5222c884d02ae3d3c Mon Sep 17 00:00:00 2001 From: Jonatan Martens Date: Thu, 27 Nov 2025 14:35:09 +0200 Subject: [PATCH 7/7] Fix map_batch --- .../src/delta_datafusion/schema_adapter.rs | 37 ++++++++++++++++++- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/crates/core/src/delta_datafusion/schema_adapter.rs b/crates/core/src/delta_datafusion/schema_adapter.rs index b7aa35f95e..dc497860a3 100644 --- a/crates/core/src/delta_datafusion/schema_adapter.rs +++ b/crates/core/src/delta_datafusion/schema_adapter.rs @@ -145,8 +145,41 @@ pub(crate) struct SchemaMapping { impl SchemaMapper for SchemaMapping { fn map_batch(&self, batch: RecordBatch) -> Result { - let record_batch = cast_record_batch(&batch, self.logical_schema.clone(), false, true)?; - Ok(record_batch) + // Cast individual columns to match the target schema types + // This handles type differences like timestamp precision mismatches + let batch_columns = batch.columns().to_vec(); + let mut casted_columns = Vec::with_capacity(batch_columns.len()); + + for (col, target_field) in batch_columns + .iter() + .zip(self.logical_schema.fields().iter()) + { + if col.data_type() == target_field.data_type() { + // Types match, use column as-is + casted_columns.push(col.clone()); + } else { + // Types differ, cast the column + match cast_with_options(col, target_field.data_type(), &CastOptions::default()) { + Ok(casted) => casted_columns.push(casted), + Err(e) => { + return Err(datafusion::error::DataFusionError::Internal(format!( + "Failed to cast column '{}' from {} to {}: {}", + target_field.name(), + col.data_type(), + target_field.data_type(), + e + ))) + } + } + } + } + + RecordBatch::try_new_with_options( + self.logical_schema.clone(), + casted_columns, + &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())), + ) + .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None)) } fn map_column_statistics(