fix: Cast smaller integer types to int32/int64 on write for Spark compatibility

When writing PyArrow tables with smaller integer types (uint8, int8,
int16, uint16) to Iceberg tables with IntegerType columns, PyIceberg
preserves the original Arrow type in the Parquet file. This causes
Spark to fail with:

    java.lang.UnsupportedOperationException: Unsupported logical type: UINT_8

The fix casts smaller integer types to their canonical Iceberg
representation (int32 for IntegerType, int64 for LongType) during
write, ensuring cross-platform compatibility.

Only widening conversions are allowed; narrowing conversions (e.g.,
int64 to int32) continue to be rejected via the existing promote()
function.
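
As a standalone PyArrow sketch of the widening behavior (illustrative
only, not the PyIceberg write path itself):

    import pyarrow as pa

    # A uint8 column as it might arrive from the user.
    values = pa.array([1, 2, 3, None], type=pa.uint8())

    # Widening cast applied on write: uint8 (8 bits) -> int32 (32 bits),
    # matching Iceberg's IntegerType representation.
    widened = values.cast(pa.int32())
    assert widened.type == pa.int32()

    # Narrowing (e.g. int64 -> int32) is not performed here; it is still
    # rejected through the existing promote() check.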

Closes #2791
Somasundaram Sekar committed Dec 8, 2025
commit 9f8b7b7a44c0b0e9625c53c37948e5f368938887
9 changes: 9 additions & 0 deletions pyiceberg/io/pyarrow.py
@@ -1903,6 +1903,15 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}:
return values.cast(target_type)
raise ValueError(f"Unsupported schema projection from {values.type} to {target_type}")
elif isinstance(field.field_type, (IntegerType, LongType)):
# Cast smaller integer types to target type for cross-platform compatibility
# Only allow widening conversions (smaller bit width to larger)
# Narrowing conversions fall through to promote() handling below
if pa.types.is_integer(values.type):
source_width = values.type.bit_width
target_width = target_type.bit_width
if source_width < target_width:
return values.cast(target_type)

if field.field_type != file_field.field_type:
target_schema = schema_to_pyarrow(
32 changes: 32 additions & 0 deletions tests/io/test_pyarrow.py
@@ -2716,6 +2716,38 @@ def test__to_requested_schema_timestamps_without_downcast_raises_exception(
assert "Unsupported schema projection from timestamp[ns] to timestamp[us]" in str(exc_info.value)


@pytest.mark.parametrize(
"arrow_type,iceberg_type,expected_arrow_type",
[
(pa.uint8(), IntegerType(), pa.int32()),
(pa.int8(), IntegerType(), pa.int32()),
(pa.int16(), IntegerType(), pa.int32()),
(pa.uint16(), IntegerType(), pa.int32()),
(pa.uint32(), LongType(), pa.int64()),
(pa.int32(), LongType(), pa.int64()),
],
)
def test__to_requested_schema_integer_promotion(
arrow_type: pa.DataType,
iceberg_type: PrimitiveType,
expected_arrow_type: pa.DataType,
) -> None:
"""Test that smaller integer types are cast to target Iceberg type during write."""
requested_schema = Schema(NestedField(1, "col", iceberg_type, required=False))
file_schema = requested_schema

arrow_schema = pa.schema([pa.field("col", arrow_type)])
data = pa.array([1, 2, 3, None], type=arrow_type)
batch = pa.RecordBatch.from_arrays([data], schema=arrow_schema)

result = _to_requested_schema(
requested_schema, file_schema, batch, downcast_ns_timestamp_to_us=False, include_field_ids=False
)

assert result.schema[0].type == expected_arrow_type
assert result.column(0).to_pylist() == [1, 2, 3, None]


def test_pyarrow_file_io_fs_by_scheme_cache() -> None:
# It's better to set up multi-region minio servers for an integration test once `endpoint_url` argument becomes available for `resolve_s3_region`
# Refer to: https://github.com/apache/arrow/issues/43713