Skip to content
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Retain previous flow when column mapping disabled
  • Loading branch information
Jonatan Martens committed Nov 26, 2025
commit e7df46944f72dcc34b8b7cc7d7ff65be02d175dc
69 changes: 35 additions & 34 deletions crates/core/src/delta_datafusion/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,46 +61,58 @@ impl SchemaAdapter for DeltaSchemaAdapter {
let mut projection = Vec::with_capacity(self.projected_table_schema.fields().len());
let mut logical_fields = Vec::new();

for (proj_idx, table_field) in self.projected_table_schema.fields().iter().enumerate() {
let file_idx = self
.resolve_column(proj_idx, table_field, file_schema)
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal(format!(
"Column '{}' not found in file schema",
table_field.name()
))
})?;

projection.push(file_idx);
logical_fields.push(self.strip_metadata(table_field));
if self.column_mapping_mode == ColumnMappingMode::None {
for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
if self
.projected_table_schema
.fields
.find(file_field.name())
.is_some()
{
projection.push(file_idx)
}
}
} else {
for table_field in self.projected_table_schema.fields().iter() {
let file_idx = self
.resolve_column(table_field, file_schema)
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal(format!(
"Column '{}' not found in file schema",
table_field.name(),
))
})?;

projection.push(file_idx);
logical_fields.push(self.strip_metadata(table_field));
}
}

let fields: Vec<_> = logical_fields.iter().map(|f| (**f).clone()).collect();
let logical_schema = Arc::new(Schema::new(fields));
let logical_schema = if self.column_mapping_mode == ColumnMappingMode::None {
self.projected_table_schema.clone()
} else {
let fields: Vec<_> = logical_fields.iter().map(|f| (**f).clone()).collect();
Arc::new(Schema::new(fields))
};

Ok((Arc::new(SchemaMapping { logical_schema }), projection))
}
}

impl DeltaSchemaAdapter {
fn resolve_column(
&self,
proj_idx: usize,
table_field: &Field,
file_schema: &Schema,
) -> Option<usize> {
fn resolve_column(&self, table_field: &Field, file_schema: &Schema) -> Option<usize> {
if let Some((file_idx, _)) = file_schema.fields.find(table_field.name()) {
return Some(file_idx);
}

let physical_name = self.get_physical_name(proj_idx, table_field)?;
let physical_name = self.get_physical_name(table_field)?;
file_schema
.fields
.find(physical_name.as_str())
.map(|(idx, _)| idx)
}

fn get_physical_name(&self, proj_idx: usize, table_field: &Field) -> Option<String> {
fn get_physical_name(&self, table_field: &Field) -> Option<String> {
let key = match self.column_mapping_mode {
ColumnMappingMode::None => return None,
ColumnMappingMode::Name => "delta.columnMapping.physicalName",
Expand Down Expand Up @@ -133,18 +145,7 @@ pub(crate) struct SchemaMapping {

impl SchemaMapper for SchemaMapping {
fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
// Create a target schema using logical field names and types
// Only for columns that actually exist in the batch
let fields: Vec<_> = self
.logical_schema
.fields()
.iter()
.take(batch.num_columns())
.map(|f| f.clone())
.collect();
let target_schema = Arc::new(Schema::new(fields));

let record_batch = cast_record_batch(&batch, target_schema, false, true)?;
let record_batch = cast_record_batch(&batch, self.logical_schema.clone(), false, true)?;
Ok(record_batch)
}

Expand Down