Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
address review comments
  • Loading branch information
HonahX committed Jun 10, 2024
commit a7da318b4d1def07bd63f7768be58cfcb31cfa69
32 changes: 12 additions & 20 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3922,29 +3922,24 @@ def __init__(
self._merge_enabled = merge_enabled
self._snapshot_producer = snapshot_producer

def _group_by_spec(
self, first_manifest: ManifestFile, remaining_manifests: List[ManifestFile]
) -> Dict[int, List[ManifestFile]]:
def _group_by_spec(self, manifests: List[ManifestFile]) -> Dict[int, List[ManifestFile]]:
groups = defaultdict(list)
groups[first_manifest.partition_spec_id].append(first_manifest)
for manifest in remaining_manifests:
for manifest in manifests:
groups[manifest.partition_spec_id].append(manifest)
return groups

def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile]) -> ManifestFile:
with self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id)) as writer:
for manifest in manifest_bin:
for entry in self._snapshot_producer.fetch_manifest_entry(manifest=manifest, discard_deleted=False):
if entry.status == ManifestEntryStatus.DELETED:
# suppress deletes from previous snapshots. only files deleted by this snapshot
# should be added to the new manifest
if entry.snapshot_id == self._snapshot_producer.snapshot_id:
writer.delete(entry)
if entry.status == ManifestEntryStatus.DELETED and entry.snapshot_id == self._snapshot_producer.snapshot_id:
# only files deleted by this snapshot should be added to the new manifest
writer.delete(entry)
elif entry.status == ManifestEntryStatus.ADDED and entry.snapshot_id == self._snapshot_producer.snapshot_id:
# adds from this snapshot are still adds, otherwise they should be existing
# added entries from this snapshot are still added, otherwise they should be existing
writer.add(entry)
else:
# add all files from the old manifest as existing files
elif entry.status != ManifestEntryStatus.DELETED:
# add all non-deleted files from the old manifest as existing files
writer.existing(entry)

return writer.to_manifest_file()
Expand All @@ -3958,11 +3953,9 @@ def merge_bin(manifest_bin: List[ManifestFile]) -> List[ManifestFile]:
if len(manifest_bin) == 1:
output_manifests.append(manifest_bin[0])
elif first_manifest in manifest_bin and len(manifest_bin) < self._min_count_to_merge:
# if the bin has the first manifest (the new data files or an appended manifest file)
# then only merge it
# if the number of manifests is above the minimum count. this is applied only to bins
# with an in-memory
# manifest so that large manifests don't prevent merging older groups.
# if the bin has the first manifest (the new data files or an appended manifest file) then only
# merge it if the number of manifests is above the minimum count. this is applied only to bins
# with an in-memory manifest so that large manifests don't prevent merging older groups.
output_manifests.extend(manifest_bin)
else:
output_manifests.append(self._create_manifest(spec_id, manifest_bin))
Expand All @@ -3987,8 +3980,7 @@ def merge_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]:
return manifests

first_manifest = manifests[0]
remaining_manifests = manifests[1:]
groups = self._group_by_spec(first_manifest, remaining_manifests)
groups = self._group_by_spec(manifests)

merged_manifests = []
for spec_id in reversed(groups.keys()):
Expand Down