153 changes: 131 additions & 22 deletions crates/core/src/delta_datafusion/schema_adapter.rs
@@ -1,16 +1,21 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

use crate::kernel::schema::cast_record_batch;

[Collapsed: multiple GitHub Actions jobs annotate line 5 with the warning "unused import: `crate::kernel::schema::cast_record_batch`".]
use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use arrow_array::{RecordBatch, RecordBatchOptions};
use arrow_cast::{cast_with_options, CastOptions};
use arrow_schema::{Field, Schema, SchemaRef};
use datafusion::common::{not_impl_err, ColumnStatistics, Result};
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
use delta_kernel::table_features::ColumnMappingMode;

/// A Schema Adapter Factory which provides casting record batches from parquet to meet
/// delta lake conventions.
#[derive(Debug)]
pub(crate) struct DeltaSchemaAdapterFactory {}
pub(crate) struct DeltaSchemaAdapterFactory {
pub column_mapping_mode: ColumnMappingMode,
}

impl SchemaAdapterFactory for DeltaSchemaAdapterFactory {
fn create(
@@ -21,6 +26,7 @@
Box::new(DeltaSchemaAdapter {
projected_table_schema,
table_schema,
column_mapping_mode: self.column_mapping_mode,
})
}
}
@@ -31,46 +37,149 @@
projected_table_schema: SchemaRef,
/// Schema for the table
table_schema: SchemaRef,
column_mapping_mode: ColumnMappingMode,
}

impl SchemaAdapter for DeltaSchemaAdapter {
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
let field = self.table_schema.field(index);
Some(file_schema.fields.find(field.name())?.0)

let physical_key_name = match self.column_mapping_mode {
ColumnMappingMode::None => return Some(file_schema.fields.find(field.name())?.0),
ColumnMappingMode::Name => "delta.columnMapping.physicalName",
ColumnMappingMode::Id => "delta.columnMapping.id",
};

field
.metadata()
.get(physical_key_name)
.and_then(|physical_name| file_schema.fields.find(physical_name.as_str()))
.map(|(field_id, _)| field_id)
}

fn map_schema(&self, file_schema: &Schema) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
let mut projection = Vec::with_capacity(file_schema.fields().len());

for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
if self
.projected_table_schema
.fields()
.find(file_field.name())
.is_some()
{
let mut projection = Vec::with_capacity(self.projected_table_schema.fields().len());
let mut logical_fields = Vec::new();

if self.column_mapping_mode == ColumnMappingMode::None {
for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
if self
.projected_table_schema
.fields
.find(file_field.name())
.is_some()
{
projection.push(file_idx)
}
}
} else {
for table_field in self.projected_table_schema.fields().iter() {
let file_idx = self
.resolve_column(table_field, file_schema)
.ok_or_else(|| {
datafusion::error::DataFusionError::Internal(format!(
"Column '{}' not found in file schema",
table_field.name(),
))
})?;

projection.push(file_idx);
logical_fields.push(self.strip_metadata(table_field));
}
}

Ok((
Arc::new(SchemaMapping {
projected_schema: self.projected_table_schema.clone(),
}),
projection,
))
let logical_schema = if self.column_mapping_mode == ColumnMappingMode::None {
self.projected_table_schema.clone()
} else {
let fields: Vec<_> = logical_fields.iter().map(|f| (**f).clone()).collect();
Arc::new(Schema::new(fields))
};

Ok((Arc::new(SchemaMapping { logical_schema }), projection))
}
}

impl DeltaSchemaAdapter {
fn resolve_column(&self, table_field: &Field, file_schema: &Schema) -> Option<usize> {
if let Some((file_idx, _)) = file_schema.fields.find(table_field.name()) {
return Some(file_idx);
}

let physical_name = self.get_physical_name(table_field)?;
file_schema
.fields
.find(physical_name.as_str())
.map(|(idx, _)| idx)
}

fn get_physical_name(&self, table_field: &Field) -> Option<String> {
let key = match self.column_mapping_mode {
ColumnMappingMode::None => return None,
ColumnMappingMode::Name => "delta.columnMapping.physicalName",
ColumnMappingMode::Id => "delta.columnMapping.id",
};

if let Some(physical_name) = table_field.metadata().get(key) {
return Some(physical_name.clone());
}

None
}

fn strip_metadata(&self, field: &Field) -> Arc<Field> {
let stripped_metadata: HashMap<String, String> = field
.metadata()
.iter()
.filter(|(k, _)| !k.starts_with("delta.columnMapping"))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();

Arc::new(field.clone().with_metadata(stripped_metadata))
}
}

#[derive(Debug)]
pub(crate) struct SchemaMapping {
projected_schema: SchemaRef,
logical_schema: SchemaRef,
}

impl SchemaMapper for SchemaMapping {
fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
let record_batch = cast_record_batch(&batch, self.projected_schema.clone(), false, true)?;
Ok(record_batch)
// Cast individual columns to match the target schema types
// This handles type differences like timestamp precision mismatches
let batch_columns = batch.columns().to_vec();
let mut casted_columns = Vec::with_capacity(batch_columns.len());

for (col, target_field) in batch_columns
.iter()
.zip(self.logical_schema.fields().iter())
{
if col.data_type() == target_field.data_type() {
// Types match, use column as-is
casted_columns.push(col.clone());
} else {
// Types differ, cast the column
match cast_with_options(col, target_field.data_type(), &CastOptions::default()) {
Ok(casted) => casted_columns.push(casted),
Err(e) => {
return Err(datafusion::error::DataFusionError::Internal(format!(
"Failed to cast column '{}' from {} to {}: {}",
target_field.name(),
col.data_type(),
target_field.data_type(),
e
)))
}
}
}
}

RecordBatch::try_new_with_options(
self.logical_schema.clone(),
casted_columns,
&RecordBatchOptions::new().with_row_count(Some(batch.num_rows())),
)
.map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
}

fn map_column_statistics(
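The adapter above resolves logical table columns to physical Parquet columns via the `delta.columnMapping.physicalName` (or `delta.columnMapping.id`) field metadata. Below is a minimal, self-contained sketch of that resolution, separate from the PR itself; the field name `user_id` and physical name `col-7a3f` are made up for illustration.

```rust
// Illustrative sketch only (not part of this PR): resolving a logical table
// column to its physical Parquet column under name-based column mapping,
// mirroring what DeltaSchemaAdapter::resolve_column does above.
use std::collections::HashMap;

use arrow_schema::{DataType, Field, Schema};

fn main() {
    // Logical field as exposed to readers, carrying the physical name in its metadata.
    let logical = Field::new("user_id", DataType::Int64, true).with_metadata(HashMap::from([
        (
            "delta.columnMapping.physicalName".to_string(),
            "col-7a3f".to_string(),
        ),
        ("delta.columnMapping.id".to_string(), "1".to_string()),
    ]));

    // File schema as written by a column-mapping-enabled writer: physical names only.
    let file_schema = Schema::new(vec![Field::new("col-7a3f", DataType::Int64, true)]);

    // Try the logical name first, then fall back to the physical name from metadata.
    let resolved = file_schema
        .fields()
        .find(logical.name())
        .map(|(idx, _)| idx)
        .or_else(|| {
            logical
                .metadata()
                .get("delta.columnMapping.physicalName")
                .and_then(|physical| file_schema.fields().find(physical.as_str()))
                .map(|(idx, _)| idx)
        });

    assert_eq!(resolved, Some(0));
}
```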
47 changes: 37 additions & 10 deletions crates/core/src/delta_datafusion/table_provider.rs
@@ -43,6 +43,8 @@
prelude::Expr,
scalar::ScalarValue,
};
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;

[Collapsed: multiple GitHub Actions jobs annotate line 46 with the warning "unused import: `delta_kernel::engine::arrow_conversion::TryIntoArrow as _`".]
use delta_kernel::table_features::ColumnMappingMode;
use delta_kernel::table_properties::DataSkippingNumIndexedCols;
use futures::StreamExt as _;
use itertools::Itertools;
@@ -437,6 +439,8 @@
Some(schema.clone()),
)?;

let column_mapping_mode = self.snapshot.table_configuration().column_mapping_mode();

let logical_schema = if let Some(used_columns) = self.projection {
let mut fields = Vec::with_capacity(used_columns.len());
for idx in used_columns {
@@ -457,7 +461,17 @@
logical_schema
};

let df_schema = logical_schema.clone().to_dfschema()?;
let logical_schema_for_scan = if column_mapping_mode != ColumnMappingMode::None {
df_logical_schema(
self.snapshot,
&config.file_column_name,
Some(schema.clone()),
)?
} else {
logical_schema.clone()
};

let df_schema = logical_schema_for_scan.clone().to_dfschema()?;

let logical_filter = self
.filter
@@ -591,14 +605,25 @@
.push(part);
}

let file_schema = Arc::new(Schema::new(
schema
.fields()
.iter()
.filter(|f| !table_partition_cols.contains(f.name()))
.cloned()
.collect::<Vec<arrow::datatypes::FieldRef>>(),
));
let file_schema = if column_mapping_mode != ColumnMappingMode::None {
Arc::new(Schema::new(
logical_schema_for_scan
.fields()
.iter()
.filter(|f| !table_partition_cols.contains(f.name()))
.cloned()
.collect::<Vec<arrow::datatypes::FieldRef>>(),
))
} else {
Arc::new(Schema::new(
schema
.fields()
.iter()
.filter(|f| !table_partition_cols.contains(f.name()))
.cloned()
.collect::<Vec<arrow::datatypes::FieldRef>>(),
))
};

let mut table_partition_cols = table_partition_cols
.iter()
@@ -660,7 +685,9 @@
}
};
let file_source =
file_source.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}))?;
file_source.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {
column_mapping_mode,
}))?;

let file_scan_config =
FileScanConfigBuilder::new(self.log_store.object_store_url(), file_schema, file_source)
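Once the factory is wired into the Parquet file source with the table's column mapping mode, the resulting `SchemaMapping` casts any column whose Arrow type differs from the logical schema before rebuilding the batch. A standalone sketch of that per-column cast path follows; the column name and the Int32-to-Int64 type pair are assumptions chosen for the example.

```rust
// Illustrative sketch only (not part of this PR): casting a physical column to
// the logical schema's type before rebuilding the RecordBatch, as the new
// SchemaMapping::map_batch does.
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch, RecordBatchOptions};
use arrow_cast::{cast_with_options, CastOptions};
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Column as read from Parquet.
    let physical: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));

    // Logical schema expects Int64 for the same column.
    let logical_schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int64, false)]));
    let target = logical_schema.field(0).data_type();

    // Cast only when the types differ, as the adapter does per column.
    let column = if physical.data_type() == target {
        physical.clone()
    } else {
        cast_with_options(&physical, target, &CastOptions::default())?
    };

    // Rebuild the batch against the logical schema, preserving the row count.
    let batch = RecordBatch::try_new_with_options(
        logical_schema.clone(),
        vec![column],
        &RecordBatchOptions::new().with_row_count(Some(3)),
    )?;
    assert_eq!(batch.num_rows(), 3);
    Ok(())
}
```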
2 changes: 1 addition & 1 deletion crates/core/src/kernel/transaction/protocol.rs
@@ -221,7 +221,7 @@ impl ProtocolChecker {
pub static INSTANCE: LazyLock<ProtocolChecker> = LazyLock::new(|| {
let mut reader_features = HashSet::new();
reader_features.insert(ReaderFeature::TimestampWithoutTimezone);
// reader_features.insert(ReaderFeature::ColumnMapping);
reader_features.insert(ReaderFeature::ColumnMapping);

let mut writer_features = HashSet::new();
writer_features.insert(WriterFeature::AppendOnly);
11 changes: 1 addition & 10 deletions python/deltalake/table.py
@@ -61,7 +61,7 @@

MAX_SUPPORTED_READER_VERSION = 3
NOT_SUPPORTED_READER_VERSION = 2
SUPPORTED_READER_FEATURES = {"timestampNtz"}
SUPPORTED_READER_FEATURES = {"timestampNtz", "columnMapping"}

FSCK_METRICS_FILES_REMOVED_LABEL = "files_removed"

@@ -922,15 +922,6 @@ def to_pyarrow_dataset(
"but these are not yet supported by the deltalake reader."
)

if (
table_protocol.reader_features
and "columnMapping" in table_protocol.reader_features
):
raise DeltaProtocolError(
"The table requires reader feature 'columnMapping' "
"but this is not supported using pyarrow Datasets."
)

if (
Review thread on this deletion:

Collaborator: Why is this removed? The PR itself doesn't provide support for reading column mapping with pyarrow.

Author: My primary use case is loading Delta tables with DataFusion (in Python, not Rust), so I must have missed it. I will look into adding support for reading with pyarrow.

Author: I have opened a PR against Arrow (apache/arrow#48289); once that is merged, we can support column mapping in datasets.

Contributor: I think it's worthwhile to get this merged in and throw an error on pyarrow usage. I have no idea what share of users read through pyarrow, but it's likely considered legacy.

table_protocol.reader_features
and "deletionVectors" in table_protocol.reader_features