Add entries metadata table
Fokko committed Mar 27, 2024
commit a8d1a04844d7be6e2a997bea84a80b481cd4f319
100 changes: 99 additions & 1 deletion pyiceberg/table/__init__.py
@@ -48,6 +48,7 @@

import pyiceberg.expressions.parser as parser
import pyiceberg.expressions.visitors as visitors
from pyiceberg.conversions import from_bytes
from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError
from pyiceberg.expressions import (
AlwaysTrue,
@@ -138,7 +139,6 @@

from pyiceberg.catalog import Catalog


ALWAYS_TRUE = AlwaysTrue()
TABLE_ROOT_ID = -1

@@ -3081,3 +3081,101 @@ def snapshots(self) -> "pa.Table":
snapshots,
schema=snapshots_schema,
)

def entries(self) -> "pa.Table":
import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow

schema = self.tbl.metadata.schema()

readable_metrics_struct = []

def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
pa_bound_type = schema_to_pyarrow(bound_type)
return pa.struct([
pa.field("column_size", pa.int64(), nullable=True),
pa.field("value_count", pa.int64(), nullable=True),
pa.field("null_value_count", pa.int64(), nullable=True),
pa.field("nan_value_count", pa.int64(), nullable=True),
pa.field("lower_bound", pa_bound_type, nullable=True),
pa.field("upper_bound", pa_bound_type, nullable=True),
])

for field in self.tbl.metadata.schema().fields:
readable_metrics_struct.append(
pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False)
)

pa_record_struct = schema_to_pyarrow(self.tbl.metadata.specs_struct())

entries_schema = pa.schema([
pa.field('status', pa.int8(), nullable=False),
pa.field('snapshot_id', pa.int64(), nullable=False),
pa.field('sequence_number', pa.int64(), nullable=False),
pa.field('file_sequence_number', pa.int64(), nullable=False),
pa.field(
'data_file',
pa.struct([
pa.field('content', pa.int8(), nullable=False),
pa.field('file_path', pa.string(), nullable=False),
pa.field('file_format', pa.string(), nullable=False),
pa.field('partition', pa_record_struct, nullable=False),
pa.field('record_count', pa.int64(), nullable=False),
pa.field('file_size_in_bytes', pa.int64(), nullable=False),
pa.field('column_sizes', pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field('value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field('null_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field('nan_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field('lower_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True),
pa.field('upper_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True),
pa.field('key_metadata', pa.binary(), nullable=True),
pa.field('split_offsets', pa.list_(pa.int64()), nullable=True),
pa.field('equality_ids', pa.list_(pa.int32()), nullable=True),
pa.field('sort_order_id', pa.int32(), nullable=True),
]),
nullable=False,
),
pa.field('readable_metrics', pa.struct(readable_metrics_struct), nullable=True),
])

entries = []
if snapshot := self.tbl.metadata.current_snapshot():
for manifests in snapshot.manifests(self.tbl.io):
for entry in manifests.fetch_manifest_entry(io=self.tbl.io):
column_sizes = entry.data_file.column_sizes or {}
value_counts = entry.data_file.value_counts or {}
null_value_counts = entry.data_file.null_value_counts or {}
nan_value_counts = entry.data_file.nan_value_counts or {}
lower_bounds = entry.data_file.lower_bounds or {}
upper_bounds = entry.data_file.upper_bounds or {}
readable_metrics = {
schema.find_column_name(field.field_id): {
"column_size": column_sizes.get(field.field_id),
"value_count": value_counts.get(field.field_id),
"null_value_count": null_value_counts.get(field.field_id),
"nan_value_count": nan_value_counts.get(field.field_id),
# Decode the raw bound bytes into readable, typed values
"lower_bound": from_bytes(field.field_type, lower_bound)
if (lower_bound := lower_bounds.get(field.field_id))
else None,
"upper_bound": from_bytes(field.field_type, upper_bound)
if (upper_bound := upper_bounds.get(field.field_id))
else None,
}
for field in self.tbl.metadata.schema().fields
}

entries.append({
'status': entry.status.value,
'snapshot_id': entry.snapshot_id,
'sequence_number': entry.data_sequence_number,
'file_sequence_number': entry.file_sequence_number,
'data_file': entry.data_file.__dict__,
'readable_metrics': readable_metrics,
})

return pa.Table.from_pylist(
entries,
schema=entries_schema,
)
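
For context, a minimal usage sketch of the new entries metadata table. This is not part of the diff: the catalog name, the table identifier, and the accessor through which entries() is reached are assumptions for illustration only.

# Hypothetical usage sketch; the names below are illustrative, not from this change.
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")                      # assumed catalog configuration
tbl = catalog.load_table("examples.taxi_dataset")      # assumed table identifier

entries = tbl.inspect.entries()                        # assumed accessor; returns a pyarrow.Table
print(entries.schema)                                  # status, snapshot_id, ..., data_file, readable_metrics
print(entries.column("readable_metrics")[0])           # per-column bounds decoded via from_bytes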
27 changes: 26 additions & 1 deletion pyiceberg/table/metadata.py
@@ -49,7 +49,7 @@
IcebergRootModel,
Properties,
)
from pyiceberg.types import transform_dict_value_to_str
from pyiceberg.types import NestedField, StructType, transform_dict_value_to_str
from pyiceberg.utils.config import Config
from pyiceberg.utils.datetime import datetime_to_millis

@@ -245,6 +245,31 @@ def specs(self) -> Dict[int, PartitionSpec]:
"""Return a dict the partition specs this table."""
return {spec.spec_id: spec for spec in self.partition_specs}

def specs_struct(self) -> StructType:
"""Produce a struct of all the combined PartitionSpecs.

The partition fields are marked optional: a partition field may be added later,
in which case older files do not carry a value for it, so the field may be null.

:return: A StructType that represents all the combined PartitionSpecs of the table
"""
specs = self.specs()

# Collect all the fields
struct_fields = {field.field_id: field for spec in specs.values() for field in spec.fields}

schema = self.schema()

nested_fields = []
# Sort them by field_id in order to get a deterministic output
for field_id in sorted(struct_fields):
field = struct_fields[field_id]
source_type = schema.find_type(field.source_id)
result_type = field.transform.result_type(source_type)
nested_fields.append(NestedField(field_id=field.field_id, name=field.name, type=result_type, required=False))

return StructType(*nested_fields)

def new_snapshot_id(self) -> int:
"""Generate a new snapshot-id that's not in use."""
snapshot_id = _generate_snapshot_id()
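
A hedged sketch of what specs_struct() produces for a table whose partition spec evolved; the field ids, names, and transforms below are invented for illustration and are not taken from this change.

# Hypothetical example: spec 0 partitioned by day(ts) (field id 1000); spec 1
# later added bucket[16](id) (field id 1001). specs_struct() merges the fields
# from both specs and marks every field optional, because files written under
# the older spec carry no value for the later-added field.
from pyiceberg.types import DateType, IntegerType, NestedField, StructType

merged = StructType(
    NestedField(field_id=1000, name="ts_day", type=DateType(), required=False),
    NestedField(field_id=1001, name="id_bucket", type=IntegerType(), required=False),
)
# table.metadata.specs_struct() would return a struct shaped like `merged`
# (`table` stands for an already-loaded Table; it is not defined here).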
20 changes: 20 additions & 0 deletions pyiceberg/typedef.py
@@ -18,6 +18,7 @@

from abc import abstractmethod
from decimal import Decimal
from enum import Enum
from functools import lru_cache
from typing import (
TYPE_CHECKING,
@@ -38,6 +39,8 @@

from pydantic import BaseModel, ConfigDict, RootModel

from pyiceberg.utils.lazydict import LazyDict

if TYPE_CHECKING:
from pyiceberg.types import StructType

@@ -159,6 +162,18 @@ def _get_struct_fields(struct_type: StructType) -> Tuple[str, ...]:
return tuple([field.name for field in struct_type.fields])


def _unwrap(r: Any) -> Any:
if isinstance(r, Record):
return r.__dict__
elif isinstance(r, Enum):
return r.value
elif isinstance(r, LazyDict):
# Arrow does not work well with the LazyDict
return dict(r)
else:
return r


class Record(StructProtocol):
__slots__ = ("_position_to_field_name",)
_position_to_field_name: Tuple[str, ...]
@@ -199,3 +214,8 @@ def __repr__(self) -> str:
def record_fields(self) -> List[str]:
"""Return values of all the fields of the Record class except those specified in skip_fields."""
return [self.__getattribute__(v) if hasattr(self, v) else None for v in self._position_to_field_name]

@property
def __dict__(self) -> Dict[str, Any]: # type: ignore
"""Returns a non-lazy dictionary of the Record class."""
return {v: _unwrap(self.__getattribute__(v)) if hasattr(self, v) else None for v in self._position_to_field_name}
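
A small, hypothetical sketch of the non-lazy __dict__ added above: nested Records are unwrapped into plain dictionaries, which is what pa.Table.from_pylist() in entries() relies on when serializing entry.data_file. The field names are made up for illustration.

# Illustrative only; the field names are hypothetical.
from pyiceberg.typedef import Record

partition = Record(ts_day=19358, id_bucket=3)          # nested Record
data_file = Record(file_path="s3://bucket/data/00000.parquet", partition=partition)

print(data_file.__dict__)
# {'file_path': 's3://bucket/data/00000.parquet', 'partition': {'ts_day': 19358, 'id_bucket': 3}}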
59 changes: 58 additions & 1 deletion tests/conftest.py
@@ -30,7 +30,7 @@
import socket
import string
import uuid
from datetime import datetime
from datetime import date, datetime
from pathlib import Path
from random import choice
from tempfile import TemporaryDirectory
@@ -1987,3 +1987,60 @@ def spark() -> SparkSession:
)

return spark


TEST_DATA_WITH_NULL = {
'bool': [False, None, True],
'string': ['a', None, 'z'],
# Go over the 16 bytes to kick in truncation
'string_long': ['a' * 22, None, 'z' * 22],
'int': [1, None, 9],
'long': [1, None, 9],
'float': [0.0, None, 0.9],
'double': [0.0, None, 0.9],
'timestamp': [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
'timestamptz': [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
'date': [date(2023, 1, 1), None, date(2023, 3, 1)],
# Not supported by Spark
# 'time': [time(1, 22, 0), None, time(19, 25, 0)],
# Not natively supported by Arrow
# 'uuid': [uuid.UUID('00000000-0000-0000-0000-000000000000').bytes, None, uuid.UUID('11111111-1111-1111-1111-111111111111').bytes],
'binary': [b'\01', None, b'\22'],
'fixed': [
uuid.UUID('00000000-0000-0000-0000-000000000000').bytes,
None,
uuid.UUID('11111111-1111-1111-1111-111111111111').bytes,
],
}


@pytest.fixture(scope="session")
def pa_schema() -> "pa.Schema":
import pyarrow as pa

return pa.schema([
("bool", pa.bool_()),
("string", pa.string()),
("string_long", pa.string()),
("int", pa.int32()),
("long", pa.int64()),
("float", pa.float32()),
("double", pa.float64()),
("timestamp", pa.timestamp(unit="us")),
("timestamptz", pa.timestamp(unit="us", tz="UTC")),
("date", pa.date32()),
# Not supported by Spark
# ("time", pa.time64("us")),
# Not natively supported by Arrow
# ("uuid", pa.fixed(16)),
("binary", pa.large_binary()),
("fixed", pa.binary(16)),
])


@pytest.fixture(scope="session")
def arrow_table_with_null(pa_schema: "pa.Schema") -> "pa.Table":
"""Pyarrow table with all kinds of columns."""
import pyarrow as pa

return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
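
To close the loop, a hypothetical integration-test sketch using these fixtures against the new entries table; the session_catalog fixture, the table identifier, and the inspect accessor are assumptions, not part of this diff.

# Hypothetical test sketch; `session_catalog` is an assumed fixture yielding a
# writable Catalog, and the accessor used to reach entries() is assumed.
import pytest

from pyiceberg.catalog import Catalog


@pytest.mark.integration
def test_inspect_entries(session_catalog: Catalog, arrow_table_with_null: "pa.Table") -> None:
    identifier = "default.test_inspect_entries"
    tbl = session_catalog.create_table(identifier, schema=arrow_table_with_null.schema)
    tbl.append(arrow_table_with_null)

    entries = tbl.inspect.entries()                 # assumed accessor; returns a pyarrow.Table
    assert entries["status"].to_pylist() == [1]     # 1 = ADDED
    assert entries["data_file"][0]["record_count"].as_py() == len(arrow_table_with_null)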