Merge branch 'main' into manifest_compaction
# Conflicts:
#	pyiceberg/table/__init__.py
#	tests/integration/test_writes/test_writes.py
HonahX committed Jul 10, 2024
commit 914d6ef59e573fdb43bbd0c975958436d7aae520
158 changes: 151 additions & 7 deletions pyiceberg/table/__init__.py
@@ -73,6 +73,7 @@
manifest_evaluator,
)
from pyiceberg.io import FileIO, OutputFile, load_file_io
from pyiceberg.io.pyarrow import _dataframe_to_data_files, expression_to_pyarrow, project_table
from pyiceberg.manifest import (
POSITIONAL_DELETE_SCHEMA,
DataFile,
@@ -3084,14 +3085,16 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List
yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths))


class _SnapshotProducer(UpdateTableMetadata["_SnapshotProducer"]):
class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
commit_uuid: uuid.UUID
_io: FileIO
_operation: Operation
_snapshot_id: int
_parent_snapshot_id: Optional[int]
_added_data_files: List[DataFile]
_manifest_num_counter: itertools.count[int]
_deleted_data_files: Set[DataFile]
_manifest_counter: itertools.count[int]

def __init__(
self,
@@ -3114,12 +3117,13 @@ def __init__(
self._deleted_data_files = set()
self.snapshot_properties = snapshot_properties
self._manifest_num_counter = itertools.count(0)
self._manifest_counter = itertools.count(0)

def append_data_file(self, data_file: DataFile) -> _SnapshotProducer:
def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
self._added_data_files.append(data_file)
return self

def delete_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer[U]:
def delete_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
self._deleted_data_files.add(data_file)
return self

@@ -3288,6 +3292,147 @@ def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = T
return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted)


class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]):
"""Will delete manifest entries from the current snapshot based on the predicate.

This will produce a DELETE snapshot:
Data files were removed and their contents logically deleted and/or delete
files were added to delete rows.

From the specification
"""

_predicate: BooleanExpression

def __init__(
self,
operation: Operation,
transaction: Transaction,
io: FileIO,
commit_uuid: Optional[uuid.UUID] = None,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
):
super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
self._predicate = AlwaysFalse()

def _commit(self) -> UpdatesAndRequirements:
# Only produce a commit when there is something to delete
if self.files_affected:
return super()._commit()
else:
return (), ()

def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
schema = self._transaction.table_metadata.schema()
spec = self._transaction.table_metadata.specs()[spec_id]
project = inclusive_projection(schema, spec)
return project(self._predicate)

@cached_property
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
return KeyDefaultDict(self._build_partition_projection)

def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
schema = self._transaction.table_metadata.schema()
spec = self._transaction.table_metadata.specs()[spec_id]
return manifest_evaluator(spec, schema, self.partition_filters[spec_id], case_sensitive=True)

def delete_by_predicate(self, predicate: BooleanExpression) -> None:
self._predicate = Or(self._predicate, predicate)

@cached_property
def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], bool]:
"""Computes all the delete operation and cache it when nothing changes.

Returns:
- List of existing manifests that are not affected by the delete operation.
- The manifest-entries that are deleted based on the metadata.
- Flag indicating that rewrites of data-files are needed.
"""
schema = self._transaction.table_metadata.schema()

def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry:
return ManifestEntry(
status=status,
snapshot_id=entry.snapshot_id,
data_sequence_number=entry.data_sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
)

manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval
inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval

existing_manifests = []
total_deleted_entries = []
partial_rewrites_needed = False
self._deleted_data_files = set()
if snapshot := self._transaction.table_metadata.current_snapshot():
for manifest_file in snapshot.manifests(io=self._io):
if manifest_file.content == ManifestContent.DATA:
if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
# If the manifest isn't relevant, we can just keep it in the manifest-list
existing_manifests.append(manifest_file)
else:
# It is relevant, let's check out the content
deleted_entries = []
existing_entries = []
for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH:
deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED))
self._deleted_data_files.add(entry.data_file)
elif inclusive_metrics_evaluator(entry.data_file) == ROWS_CANNOT_MATCH:
existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
else:
# Based on the metadata alone, we cannot tell whether the file can be deleted
partial_rewrites_needed = True

if len(deleted_entries) > 0:
total_deleted_entries += deleted_entries

# Rewrite the manifest
if len(existing_entries) > 0:
output_file_location = _new_manifest_path(
location=self._transaction.table_metadata.location,
num=next(self._manifest_counter),
commit_uuid=self.commit_uuid,
)
with write_manifest(
format_version=self._transaction.table_metadata.format_version,
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
schema=self._transaction.table_metadata.schema(),
output_file=self._io.new_output(output_file_location),
snapshot_id=self._snapshot_id,
) as writer:
for existing_entry in existing_entries:
writer.add_entry(existing_entry)
existing_manifests.append(writer.to_manifest_file())
# else:
# deleted_manifests.append()
else:
existing_manifests.append(manifest_file)
else:
existing_manifests.append(manifest_file)

return existing_manifests, total_deleted_entries, partial_rewrites_needed

def _existing_manifests(self) -> List[ManifestFile]:
return self._compute_deletes[0]

def _deleted_entries(self) -> List[ManifestEntry]:
return self._compute_deletes[1]

@property
def rewrites_needed(self) -> bool:
"""Indicate if data files need to be rewritten."""
return self._compute_deletes[2]

@property
def files_affected(self) -> bool:
"""Indicate if any manifest-entries can be dropped."""
return len(self._deleted_entries()) > 0

class FastAppendFiles(_SnapshotProducer):
def _existing_manifests(self) -> List[ManifestFile]:
"""To determine if there are any existing manifest files.
@@ -3366,9 +3511,8 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile
return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests


class OverwriteFiles(_SnapshotProducer):
def _existing_manifests(self) -> List[ManifestFile]:
"""To determine if there are any existing manifest files.
class OverwriteFiles(_SnapshotProducer["OverwriteFiles"]):
"""Overwrites data from the table. This will produce an OVERWRITE snapshot.

Data and delete files were added and removed in a logical overwrite operation.
"""
@@ -3469,7 +3613,7 @@ def merge_append(self) -> MergeAppendFiles:
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
)

def overwrite(self) -> OverwriteFiles:
def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles:
return OverwriteFiles(
commit_uuid=commit_uuid,
operation=Operation.OVERWRITE
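The new DeleteFiles producer above drops a data file whole when the strict metrics evaluator proves every row matches the predicate, keeps files the inclusive evaluator proves cannot match, and flags anything ambiguous through rewrites_needed. A minimal usage sketch follows, assuming a delete() factory on UpdateSnapshot analogous to the overwrite() and merge_append() factories in this diff; that factory name and the catalog/table/predicate values are illustrative, not part of this change:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo

# Illustrative only: catalog and table names are placeholders.
catalog = load_catalog("default")
tbl = catalog.load_table("default.cities")

with tbl.transaction() as txn:
    # Assumed factory, mirroring overwrite()/merge_append() shown in this diff.
    with txn.update_snapshot().delete() as delete:
        # Predicates accumulate; DeleteFiles OR-s them together internally.
        delete.delete_by_predicate(EqualTo("city", "Amsterdam"))
    # Files whose metrics prove every row matches are dropped whole; files that
    # provably cannot match are kept; anything in between sets rewrites_needed,
    # signalling that those data files still have to be rewritten by the caller.
```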
67 changes: 67 additions & 0 deletions tests/integration/test_writes/test_writes.py
@@ -966,6 +966,73 @@ def table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with_null
assert len(tbl.scan().to_arrow()) == len(arrow_table_without_some_columns) * 2


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_write_all_timestamp_precision(mocker: MockerFixture, session_catalog: Catalog, format_version: int) -> None:
identifier = "default.table_all_timestamp_precision"
arrow_table_schema_with_all_timestamp_precisions = pa.schema([
("timestamp_s", pa.timestamp(unit="s")),
("timestamptz_s", pa.timestamp(unit="s", tz="UTC")),
("timestamp_ms", pa.timestamp(unit="ms")),
("timestamptz_ms", pa.timestamp(unit="ms", tz="UTC")),
("timestamp_us", pa.timestamp(unit="us")),
("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
("timestamp_ns", pa.timestamp(unit="ns")),
("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
])
TEST_DATA_WITH_NULL = {
"timestamp_s": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
"timestamptz_s": [
datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc),
None,
datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc),
],
"timestamp_ms": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
"timestamptz_ms": [
datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc),
None,
datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc),
],
"timestamp_us": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
"timestamptz_us": [
datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc),
None,
datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc),
],
"timestamp_ns": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
"timestamptz_ns": [
datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc),
None,
datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc),
],
}
input_arrow_table = pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=arrow_table_schema_with_all_timestamp_precisions)
mocker.patch.dict(os.environ, values={"PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE": "True"})

tbl = _create_table(
session_catalog,
identifier,
{"format-version": format_version},
data=[input_arrow_table],
schema=arrow_table_schema_with_all_timestamp_precisions,
)
tbl.overwrite(input_arrow_table)
written_arrow_table = tbl.scan().to_arrow()

expected_schema_in_all_us = pa.schema([
("timestamp_s", pa.timestamp(unit="us")),
("timestamptz_s", pa.timestamp(unit="us", tz="UTC")),
("timestamp_ms", pa.timestamp(unit="us")),
("timestamptz_ms", pa.timestamp(unit="us", tz="UTC")),
("timestamp_us", pa.timestamp(unit="us")),
("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
("timestamp_ns", pa.timestamp(unit="us")),
("timestamptz_ns", pa.timestamp(unit="us", tz="UTC")),
])
assert written_arrow_table.schema == expected_schema_in_all_us
assert written_arrow_table == input_arrow_table.cast(expected_schema_in_all_us)


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_merge_manifests(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None:
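The final assertion in test_write_all_timestamp_precision reduces to a plain Arrow cast: Iceberg stores timestamps at microsecond precision, so every timestamp column round-trips as microseconds regardless of its input unit, and nanosecond input is only accepted because PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE is patched to True. A small standalone sketch of that comparison using pyarrow only; the column name is illustrative:

```python
from datetime import datetime

import pyarrow as pa

# Build a nanosecond-precision column, then cast to the microsecond schema the
# test expects back from tbl.scan().to_arrow().
src = pa.table(
    {"ts_ns": [datetime(2023, 1, 1, 19, 25), None]},
    schema=pa.schema([("ts_ns", pa.timestamp(unit="ns"))]),
)
expected = src.cast(pa.schema([("ts_ns", pa.timestamp(unit="us"))]))
assert expected.schema.field("ts_ns").type == pa.timestamp("us")
```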