Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
checkpoint2
  • Loading branch information
sungwy committed May 5, 2024
commit 96e55334d95f7f3d7aea6f6d8c220b2d1a7aa73d
78 changes: 60 additions & 18 deletions pyiceberg/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from abc import ABC, abstractmethod
from enum import IntEnum
from functools import singledispatch
from typing import Any, Callable, Generic, Optional, TypeVar
from typing import TYPE_CHECKING, Any, Callable, Generic, Optional, TypeVar
from typing import Literal as LiteralType
from uuid import UUID

Expand Down Expand Up @@ -82,6 +82,9 @@
from pyiceberg.utils.parsing import ParseNumberFromBrackets
from pyiceberg.utils.singleton import Singleton

if TYPE_CHECKING:
import pyarrow as pa

S = TypeVar("S")
T = TypeVar("T")

Expand Down Expand Up @@ -391,6 +394,21 @@ def __repr__(self) -> str:
"""Return the string representation of the YearTransform class."""
return "YearTransform()"

def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
import pyarrow as pa
import pyarrow.compute as pc

if isinstance(source, DateType):
epoch = datetime.EPOCH_DATE
elif isinstance(source, TimestampType):
epoch = datetime.EPOCH_TIMESTAMP
elif isinstance(source, TimestamptzType):
epoch = datetime.EPOCH_TIMESTAMPTZ
else:
raise ValueError(f"Cannot apply year transform for type: {source}")

return lambda v: pc.years_between(pa.scalar(epoch), v) if v is not None else None


class MonthTransform(TimeTransform[S]):
"""Transforms a datetime value into a month value.
Expand Down Expand Up @@ -433,29 +451,25 @@ def __repr__(self) -> str:
"""Return the string representation of the MonthTransform class."""
return "MonthTransform()"

def pyarrow_transform(self, source: IcebergType) -> Callable:
def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
import pyarrow as pa
import pyarrow.compute as pc

if isinstance(source, DateType):

def month_func(v: Any) -> int:
return pc.add(
pc.multiply(pc.years_between(pa.scalar(date(1970, 1, 1)), v), pa.scalar(12)),
pc.add(pc.month(v), pa.scalar(-1)),
)

elif isinstance(source, (TimestampType, TimestamptzType)):

def month_func(v: Any) -> int:
return pc.add(
pc.multiply(pc.years_between(pa.scalar(datetime(1970, 1, 1)), pc.local_timestamp(v)), pa.scalar(12)),
pc.add(pc.month(v), pa.scalar(-1)),
)

if isinstance(source, DateType):
epoch = datetime.EPOCH_DATE
elif isinstance(source, TimestampType):
epoch = datetime.EPOCH_TIMESTAMP
elif isinstance(source, TimestamptzType):
epoch = datetime.EPOCH_TIMESTAMPTZ
else:
raise ValueError(f"Cannot apply month transform for type: {source}")

def month_func(v: pa.Array) -> pa.Array:
return pc.add(
pc.multiply(pc.years_between(pa.scalar(epoch), v), pa.scalar(12)),
pc.add(pc.month(v), pa.scalar(-1)),
)

return lambda v: month_func(v) if v is not None else None


Expand Down Expand Up @@ -503,6 +517,21 @@ def __repr__(self) -> str:
"""Return the string representation of the DayTransform class."""
return "DayTransform()"

def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
import pyarrow as pa
import pyarrow.compute as pc

if isinstance(source, DateType):
epoch = datetime.EPOCH_DATE
elif isinstance(source, TimestampType):
epoch = datetime.EPOCH_TIMESTAMP
elif isinstance(source, TimestamptzType):
epoch = datetime.EPOCH_TIMESTAMPTZ
else:
raise ValueError(f"Cannot apply day transform for type: {source}")

return lambda v: pc.days_between(pa.scalar(epoch), v) if v is not None else None


class HourTransform(TimeTransform[S]):
"""Transforms a datetime value into a hour value.
Expand Down Expand Up @@ -540,6 +569,19 @@ def __repr__(self) -> str:
"""Return the string representation of the HourTransform class."""
return "HourTransform()"

def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
import pyarrow as pa
import pyarrow.compute as pc

if isinstance(source, TimestampType):
epoch = datetime.EPOCH_TIMESTAMP
elif isinstance(source, TimestamptzType):
epoch = datetime.EPOCH_TIMESTAMPTZ
else:
raise ValueError(f"Cannot apply month transform for type: {source}")

return lambda v: pc.hours_between(pa.scalar(epoch), v) if v is not None else None


def _base64encode(buffer: bytes) -> str:
"""Convert bytes to base64 string."""
Expand Down
76 changes: 12 additions & 64 deletions tests/integration/test_writes/test_partitioned_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
# under the License.
# pylint:disable=redefined-outer-name

from datetime import date, datetime, timezone

import pyarrow as pa
import pytest
from pyspark.sql import SparkSession
from typing import Any

from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NoSuchTableError
Expand All @@ -31,61 +31,14 @@
HourTransform,
IdentityTransform,
MonthTransform,
Transform,
TruncateTransform,
YearTransform,
)
from tests.conftest import TEST_DATA_WITH_NULL
from utils import TABLE_SCHEMA, _create_table


@pytest.fixture(scope="session")
def arrow_table_dates() -> pa.Table:
"""Pyarrow table with only null values."""
TEST_DATES = [date(2023, 12, 31), date(2024, 1, 1), date(2024, 1, 31), date(2024, 2, 1)]
return pa.Table.from_pydict(
{"dates": TEST_DATES},
schema=pa.schema([
("dates", pa.date32()),
]),
)


@pytest.fixture(scope="session")
def arrow_table_timestamp() -> pa.Table:
"""Pyarrow table with only null values."""
TEST_DATETIMES = [
datetime(2023, 12, 31, 0, 0, 0),
datetime(2024, 1, 1, 0, 0, 0),
datetime(2024, 1, 31, 0, 0, 0),
datetime(2024, 2, 1, 0, 0, 0),
datetime(2024, 2, 1, 6, 0, 0),
]
return pa.Table.from_pydict(
{"dates": TEST_DATETIMES},
schema=pa.schema([
("timestamp", pa.timestamp(unit="us")),
]),
)


@pytest.fixture(scope="session")
def arrow_table_timestamptz() -> pa.Table:
"""Pyarrow table with only null values."""
TEST_DATETIMES_WITH_TZ = [
datetime(2023, 12, 31, 0, 0, 0, tzinfo=timezone.utc),
datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
datetime(2024, 1, 31, 0, 0, 0, tzinfo=timezone.utc),
datetime(2024, 2, 1, 0, 0, 0, tzinfo=timezone.utc),
datetime(2024, 2, 1, 6, 0, 0, tzinfo=timezone.utc),
]
return pa.Table.from_pydict(
{"dates": TEST_DATETIMES_WITH_TZ},
schema=pa.schema([
("timestamptz", pa.timestamp(unit="us", tz="UTC")),
]),
)


@pytest.mark.integration
@pytest.mark.parametrize(
"part_col", ['int', 'bool', 'string', "string_long", "long", "float", "double", "date", 'timestamp', 'timestamptz', 'binary']
Expand Down Expand Up @@ -437,39 +390,34 @@ def test_unsupported_transform(


@pytest.mark.integration
@pytest.mark.parametrize('transform', [YearTransform(), MonthTransform(), DayTransform()])
@pytest.mark.parametrize(
"part_col", ['int', 'bool', 'string', "string_long", "long", "float", "double", "date", "timestamptz", "timestamp", "binary"]
"part_col", ["date", "timestamp", "timestamptz"]
)
@pytest.mark.parametrize("format_version", [1, 2])
def test_append_time_transform_partitioned_table(
session_catalog: Catalog, spark: SparkSession, arrow_table_with_null: pa.Table, part_col: str, format_version: int
def test_append_ymd_transform_partitioned(
session_catalog: Catalog, spark: SparkSession, arrow_table_with_null: pa.Table, transform: Transform[Any, Any], part_col: str, format_version: int
) -> None:
# Given
identifier = f"default.arrow_table_v{format_version}_appended_with_null_partitioned_on_col_{part_col}"
identifier = f"default.arrow_table_v{format_version}_with_ymd_transform_partitioned_on_col_{part_col}"
nested_field = TABLE_SCHEMA.find_field(part_col)
partition_spec = PartitionSpec(
PartitionField(source_id=nested_field.field_id, field_id=1001, transform=IdentityTransform(), name=part_col)
PartitionField(source_id=nested_field.field_id, field_id=1001, transform=transform, name=part_col)
)

# When
tbl = _create_table(
session_catalog=session_catalog,
identifier=identifier,
properties={"format-version": str(format_version)},
data=[],
data=[arrow_table_with_null],
partition_spec=partition_spec,
)
# Append with arrow_table_1 with lines [A,B,C] and then arrow_table_2 with lines[A,B,C,A,B,C]
tbl.append(arrow_table_with_null)
tbl.append(pa.concat_tables([arrow_table_with_null, arrow_table_with_null]))

# Then
assert tbl.format_version == format_version, f"Expected v{format_version}, got: v{tbl.format_version}"
df = spark.table(identifier)
assert df.count() == 3, f"Expected 3 total rows for {identifier}"
for col in TEST_DATA_WITH_NULL.keys():
df = spark.table(identifier)
assert df.where(f"{col} is not null").count() == 6, f"Expected 6 non-null rows for {col}"
assert df.where(f"{col} is null").count() == 3, f"Expected 3 null rows for {col}"
# expecting 6 files: first append with [A], [B], [C], second append with [A, A], [B, B], [C, C]
rows = spark.sql(f"select partition from {identifier}.files").collect()
assert len(rows) == 6
assert df.where(f"{col} is not null").count() == 2, f"Expected 2 non-null rows for {col}"
assert df.where(f"{col} is null").count() == 1, f"Expected 1 null row for {col} is null"
71 changes: 69 additions & 2 deletions tests/test_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
# specific language governing permissions and limitations
# under the License.
# pylint: disable=eval-used,protected-access,redefined-outer-name
from datetime import date
from datetime import date, datetime, timezone
from decimal import Decimal
from typing import Any, Callable, Optional
from typing import TYPE_CHECKING, Any, Callable, Optional
from uuid import UUID

import mmh3 as mmh3
Expand Down Expand Up @@ -69,6 +69,7 @@
TimestampLiteral,
literal,
)
from pyiceberg.partitioning import _to_partition_representation
from pyiceberg.schema import Accessor
from pyiceberg.transforms import (
BucketTransform,
Expand Down Expand Up @@ -111,6 +112,9 @@
timestamptz_to_micros,
)

if TYPE_CHECKING:
import pyarrow as pa


@pytest.mark.parametrize(
"test_input,test_type,expected",
Expand Down Expand Up @@ -1808,3 +1812,66 @@ def test_strict_binary(bound_reference_binary: BoundReference[str]) -> None:
_test_projection(
lhs=transform.strict_project(name="name", pred=BoundIn(term=bound_reference_binary, literals=set_of_literals)), rhs=None
)


@pytest.fixture(scope="session")
def arrow_table_date_timestamps() -> "pa.Table":
"""Pyarrow table with only date, timestamp and timestamptz values."""
import pyarrow as pa

return pa.Table.from_pydict(
{
"date": [date(2023, 12, 31), date(2024, 1, 1), date(2024, 1, 31), date(2024, 2, 1), date(2024, 2, 1), None],
"timestamp": [
datetime(2023, 12, 31, 0, 0, 0),
datetime(2024, 1, 1, 0, 0, 0),
datetime(2024, 1, 31, 0, 0, 0),
datetime(2024, 2, 1, 0, 0, 0),
datetime(2024, 2, 1, 6, 0, 0),
None,
],
"timestamptz": [
datetime(2023, 12, 31, 0, 0, 0, tzinfo=timezone.utc),
datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
datetime(2024, 1, 31, 0, 0, 0, tzinfo=timezone.utc),
datetime(2024, 2, 1, 0, 0, 0, tzinfo=timezone.utc),
datetime(2024, 2, 1, 6, 0, 0, tzinfo=timezone.utc),
None,
],
},
schema=pa.schema([
("date", pa.date32()),
("timestamp", pa.timestamp(unit="us")),
("timestamptz", pa.timestamp(unit="us", tz="UTC")),
]),
)


@pytest.mark.parametrize('transform', [YearTransform(), MonthTransform(), DayTransform()])
@pytest.mark.parametrize(
"source_col, source_type", [("date", DateType()), ("timestamp", TimestampType()), ("timestamptz", TimestamptzType())]
)
def test_ymd_pyarrow_transforms(
arrow_table_date_timestamps: "pa.Table",
source_col: str,
source_type: PrimitiveType,
transform: Transform[Any, Any],
) -> None:
assert transform.pyarrow_transform(source_type)(arrow_table_date_timestamps[source_col]).to_pylist() == [
transform.transform(source_type)(_to_partition_representation(source_type, v))
for v in arrow_table_date_timestamps[source_col].to_pylist()
]


@pytest.mark.parametrize("source_col, source_type", [("timestamp", TimestampType()), ("timestamptz", TimestamptzType())])
def test_hour_pyarrow_transforms(arrow_table_date_timestamps: "pa.Table", source_col: str, source_type: PrimitiveType) -> None:
assert HourTransform().pyarrow_transform(source_type)(arrow_table_date_timestamps[source_col]).to_pylist() == [
HourTransform().transform(source_type)(_to_partition_representation(source_type, v))
for v in arrow_table_date_timestamps[source_col].to_pylist()
]


def test_hour_pyarrow_transforms_throws_with_dates(arrow_table_date_timestamps: "pa.Table") -> None:
# HourTransform is not supported for DateType
with pytest.raises(ValueError):
HourTransform().pyarrow_transform(DateType())(arrow_table_date_timestamps["date"])