Skip to content
Prev Previous commit
Next Next commit
fix: formatting
  • Loading branch information
felixscherz committed Aug 5, 2024
commit 7d369ee5439ea0ffbc47de982d284de226f76964
156 changes: 51 additions & 105 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,22 @@
from copy import copy
from enum import Enum
from types import TracebackType
from typing import Any, Generator
from typing import Callable
from typing import Dict
from typing import Iterator
from typing import List
from typing import Literal
from typing import Optional
from typing import Type
from typing import (
Any,
Dict,
Iterator,
List,
Literal,
Optional,
Type,
)

from pydantic_core import to_json

from pyiceberg.avro.file import AvroFile, AvroOutputFile
from pyiceberg.conversions import to_bytes
from pyiceberg.exceptions import ValidationError
from pyiceberg.io import FileIO
from pyiceberg.io import InputFile
from pyiceberg.io import OutputFile
from pyiceberg.io import FileIO, InputFile, OutputFile
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.typedef import Record, TableVersion
Expand All @@ -53,7 +52,6 @@
StringType,
StructType,
)
from pyiceberg.typedef import EMPTY_DICT

UNASSIGNED_SEQ = -1
DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024
Expand Down Expand Up @@ -103,9 +101,7 @@ def __repr__(self) -> str:

DATA_FILE_TYPE: Dict[int, StructType] = {
1: StructType(
NestedField(
field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"
),
NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
NestedField(
field_id=101,
name="file_format",
Expand All @@ -120,15 +116,9 @@ def __repr__(self) -> str:
required=True,
doc="Partition data tuple, schema based on the partition spec",
),
NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"),
NestedField(
field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"
),
NestedField(
field_id=104,
name="file_size_in_bytes",
field_type=LongType(),
required=True,
doc="Total file size in bytes",
field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes"
),
NestedField(
field_id=105,
Expand Down Expand Up @@ -181,11 +171,7 @@ def __repr__(self) -> str:
doc="Map of column id to upper bound",
),
NestedField(
field_id=131,
name="key_metadata",
field_type=BinaryType(),
required=False,
doc="Encryption key metadata blob",
field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob"
),
NestedField(
field_id=132,
Expand All @@ -205,9 +191,7 @@ def __repr__(self) -> str:
doc="File format name: avro, orc, or parquet",
initial_default=DataFileContent.DATA,
),
NestedField(
field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"
),
NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
NestedField(
field_id=101,
name="file_format",
Expand All @@ -222,15 +206,9 @@ def __repr__(self) -> str:
required=True,
doc="Partition data tuple, schema based on the partition spec",
),
NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"),
NestedField(
field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"
),
NestedField(
field_id=104,
name="file_size_in_bytes",
field_type=LongType(),
required=True,
doc="Total file size in bytes",
field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes"
),
NestedField(
field_id=108,
Expand Down Expand Up @@ -275,11 +253,7 @@ def __repr__(self) -> str:
doc="Map of column id to upper bound",
),
NestedField(
field_id=131,
name="key_metadata",
field_type=BinaryType(),
required=False,
doc="Encryption key metadata blob",
field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob"
),
NestedField(
field_id=132,
Expand Down Expand Up @@ -307,34 +281,28 @@ def __repr__(self) -> str:


def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType:
    """Return the data-file struct for ``format_version`` with a concrete partition field.

    Args:
        partition_type: Struct whose fields describe the partition tuple.
        format_version: Key into ``DATA_FILE_TYPE`` selecting the base data-file struct.

    Returns:
        A new ``StructType`` holding the fields of ``DATA_FILE_TYPE[format_version]``,
        where the field with id 102 is replaced by a required ``partition`` field
        whose type is rebuilt from ``partition_type``.
    """
    # Rebuild every partition field from its id, name, type and required flag only;
    # no other attribute of the incoming fields is passed along.
    data_file_partition_type = StructType(*[
        NestedField(
            field_id=field.field_id,
            name=field.name,
            field_type=field.field_type,
            required=field.required,
        )
        for field in partition_type.fields
    ])

    # Only the field with id 102 is swapped out; all other fields are reused as-is.
    return StructType(*[
        (
            NestedField(
                field_id=102,
                name="partition",
                field_type=data_file_partition_type,
                required=True,
                doc="Partition data tuple, schema based on the partition spec",
            )
            if field.field_id == 102
            else field
        )
        for field in DATA_FILE_TYPE[format_version].fields
    ])


class DataFile(Record):
Expand Down Expand Up @@ -415,18 +383,14 @@ def __eq__(self, other: Any) -> bool:
),
}

# Struct form (``schema.as_struct()``) of each manifest-entry schema, keyed by format version.
MANIFEST_ENTRY_SCHEMAS_STRUCT = {format_version: schema.as_struct() for format_version, schema in MANIFEST_ENTRY_SCHEMAS.items()}


def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file: StructType) -> Schema:
    """Return the manifest-entry schema for ``format_version`` using ``data_file`` as its data-file struct.

    Args:
        format_version: Key into ``MANIFEST_ENTRY_SCHEMAS`` selecting the base schema.
        data_file: Struct type to use for the ``data_file`` field.

    Returns:
        A new ``Schema`` with the fields of ``MANIFEST_ENTRY_SCHEMAS[format_version]``,
        where the field with id 2 is replaced by a required ``data_file`` field of
        type ``data_file``.
    """
    # Only the field with id 2 is swapped out; all other fields are reused as-is.
    return Schema(*[
        NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field
        for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields
    ])


class ManifestEntry(Record):
Expand Down Expand Up @@ -534,9 +498,7 @@ def update(self, value: Any) -> None:
self._min = min(self._min, value)


def construct_partition_summaries(
spec: PartitionSpec, schema: Schema, partitions: List[Record]
) -> List[PartitionFieldSummary]:
def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partitions: List[Record]) -> List[PartitionFieldSummary]:
types = [field.field_type for field in spec.partition_type(schema).fields]
field_stats = [PartitionFieldStats(field_type) for field_type in types]
for partition_keys in partitions:
Expand All @@ -560,9 +522,7 @@ def construct_partition_summaries(
NestedField(512, "added_rows_count", LongType(), required=False),
NestedField(513, "existing_rows_count", LongType(), required=False),
NestedField(514, "deleted_rows_count", LongType(), required=False),
NestedField(
507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False
),
NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
NestedField(519, "key_metadata", BinaryType(), required=False),
),
2: Schema(
Expand All @@ -579,16 +539,12 @@ def construct_partition_summaries(
NestedField(512, "added_rows_count", LongType(), required=True),
NestedField(513, "existing_rows_count", LongType(), required=True),
NestedField(514, "deleted_rows_count", LongType(), required=True),
NestedField(
507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False
),
NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
NestedField(519, "key_metadata", BinaryType(), required=False),
),
}

# Struct form (``schema.as_struct()``) of each manifest-list file schema, keyed by format version.
MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()}


POSITIONAL_DELETE_SCHEMA = Schema(
Expand Down Expand Up @@ -712,9 +668,7 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani

# in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0
# in v2 tables, the file sequence number should be inherited iff the entry status is ADDED
if entry.file_sequence_number is None and (
manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED
):
if entry.file_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
# Only available in V2, always 0 in V1
entry.file_sequence_number = manifest.sequence_number

Expand Down Expand Up @@ -1069,11 +1023,7 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id
super().__init__(
format_version=1,
output_file=output_file,
meta={
"snapshot-id": str(snapshot_id),
"parent-snapshot-id": str(parent_snapshot_id),
"format-version": "1",
},
meta={"snapshot-id": str(snapshot_id), "parent-snapshot-id": str(parent_snapshot_id), "format-version": "1"},
)

def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
Expand All @@ -1086,9 +1036,7 @@ class ManifestListWriterV2(ManifestListWriter):
_commit_snapshot_id: int
_sequence_number: int

def __init__(
self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int
):
def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int):
super().__init__(
format_version=2,
output_file=output_file,
Expand Down Expand Up @@ -1140,5 +1088,3 @@ def write_manifest_list(
return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number)
else:
raise ValueError(f"Cannot write manifest list for table version: {format_version}")