Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
checkpoint
  • Loading branch information
sungwy committed May 4, 2024
commit 0cad231cdeac7c9833e08289a2dca70a452fa2c8
25 changes: 25 additions & 0 deletions pyiceberg/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,31 @@ def __repr__(self) -> str:
"""Return the string representation of the MonthTransform class."""
return "MonthTransform()"

def pyarrow_transform(self, source: IcebergType) -> Callable:
import pyarrow as pa
import pyarrow.compute as pc

if isinstance(source, DateType):

def month_func(v: Any) -> int:
return pc.add(
pc.multiply(pc.years_between(pa.scalar(date(1970, 1, 1)), v), pa.scalar(12)),
pc.add(pc.month(v), pa.scalar(-1)),
)

elif isinstance(source, (TimestampType, TimestamptzType)):

def month_func(v: Any) -> int:
return pc.add(
pc.multiply(pc.years_between(pa.scalar(datetime(1970, 1, 1)), pc.local_timestamp(v)), pa.scalar(12)),
pc.add(pc.month(v), pa.scalar(-1)),
)

else:
raise ValueError(f"Cannot apply month transform for type: {source}")

return lambda v: month_func(v) if v is not None else None


class DayTransform(TimeTransform[S]):
"""Transforms a datetime value into a day value.
Expand Down
89 changes: 89 additions & 0 deletions tests/integration/test_writes/test_partitioned_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# under the License.
# pylint:disable=redefined-outer-name

from datetime import date, datetime, timezone

import pyarrow as pa
import pytest
from pyspark.sql import SparkSession
Expand All @@ -36,6 +38,54 @@
from utils import TABLE_SCHEMA, _create_table


@pytest.fixture(scope="session")
def arrow_table_dates() -> pa.Table:
"""Pyarrow table with only null values."""
TEST_DATES = [date(2023, 12, 31), date(2024, 1, 1), date(2024, 1, 31), date(2024, 2, 1)]
return pa.Table.from_pydict(
{"dates": TEST_DATES},
schema=pa.schema([
("dates", pa.date32()),
]),
)


@pytest.fixture(scope="session")
def arrow_table_timestamp() -> pa.Table:
"""Pyarrow table with only null values."""
TEST_DATETIMES = [
datetime(2023, 12, 31, 0, 0, 0),
datetime(2024, 1, 1, 0, 0, 0),
datetime(2024, 1, 31, 0, 0, 0),
datetime(2024, 2, 1, 0, 0, 0),
datetime(2024, 2, 1, 6, 0, 0),
]
return pa.Table.from_pydict(
{"dates": TEST_DATETIMES},
schema=pa.schema([
("timestamp", pa.timestamp(unit="us")),
]),
)


@pytest.fixture(scope="session")
def arrow_table_timestamptz() -> pa.Table:
"""Pyarrow table with only null values."""
TEST_DATETIMES_WITH_TZ = [
datetime(2023, 12, 31, 0, 0, 0, tzinfo=timezone.utc),
datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
datetime(2024, 1, 31, 0, 0, 0, tzinfo=timezone.utc),
datetime(2024, 2, 1, 0, 0, 0, tzinfo=timezone.utc),
datetime(2024, 2, 1, 6, 0, 0, tzinfo=timezone.utc),
]
return pa.Table.from_pydict(
{"dates": TEST_DATETIMES_WITH_TZ},
schema=pa.schema([
("timestamptz", pa.timestamp(unit="us", tz="UTC")),
]),
)


@pytest.mark.integration
@pytest.mark.parametrize(
"part_col", ['int', 'bool', 'string', "string_long", "long", "float", "double", "date", 'timestamp', 'timestamptz', 'binary']
Expand Down Expand Up @@ -384,3 +434,42 @@ def test_unsupported_transform(

with pytest.raises(ValueError, match="All transforms are not supported.*"):
tbl.append(arrow_table_with_null)


@pytest.mark.integration
@pytest.mark.parametrize(
"part_col", ['int', 'bool', 'string', "string_long", "long", "float", "double", "date", "timestamptz", "timestamp", "binary"]
)
@pytest.mark.parametrize("format_version", [1, 2])
def test_append_time_transform_partitioned_table(
session_catalog: Catalog, spark: SparkSession, arrow_table_with_null: pa.Table, part_col: str, format_version: int
) -> None:
# Given
identifier = f"default.arrow_table_v{format_version}_appended_with_null_partitioned_on_col_{part_col}"
nested_field = TABLE_SCHEMA.find_field(part_col)
partition_spec = PartitionSpec(
PartitionField(source_id=nested_field.field_id, field_id=1001, transform=IdentityTransform(), name=part_col)
)

# When
tbl = _create_table(
session_catalog=session_catalog,
identifier=identifier,
properties={"format-version": str(format_version)},
data=[],
partition_spec=partition_spec,
)
# Append with arrow_table_1 with lines [A,B,C] and then arrow_table_2 with lines[A,B,C,A,B,C]
tbl.append(arrow_table_with_null)
tbl.append(pa.concat_tables([arrow_table_with_null, arrow_table_with_null]))

# Then
assert tbl.format_version == format_version, f"Expected v{format_version}, got: v{tbl.format_version}"
df = spark.table(identifier)
for col in TEST_DATA_WITH_NULL.keys():
df = spark.table(identifier)
assert df.where(f"{col} is not null").count() == 6, f"Expected 6 non-null rows for {col}"
assert df.where(f"{col} is null").count() == 3, f"Expected 3 null rows for {col}"
# expecting 6 files: first append with [A], [B], [C], second append with [A, A], [B, B], [C, C]
rows = spark.sql(f"select partition from {identifier}.files").collect()
assert len(rows) == 6