Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
PR comments
  • Loading branch information
rambleraptor committed Dec 12, 2025
commit 228263c37c46a5637ae45f17679859c379bcaf5c
28 changes: 19 additions & 9 deletions pyiceberg/table/puffin.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,27 +152,34 @@ def to_vector(self) -> dict[str, "pa.ChunkedArray"]:
class PuffinWriter:
_blobs: list[PuffinBlobMetadata]
_blob_payloads: list[bytes]
_created_by: str | None

def __init__(self) -> None:
def __init__(self, created_by: str | None = None) -> None:
self._blobs = []
self._blob_payloads = []
self._created_by = created_by

def add(
def set_blob(
self,
positions: Iterable[int],
referenced_data_file: str,
) -> None:
# We only support one blob at the moment
self._blobs = []
self._blob_payloads = []

# 1. Create bitmaps from positions
bitmaps: dict[int, BitMap] = {}
cardinality = 0
for pos in positions:
cardinality += 1
key = pos >> 32
low_bits = pos & 0xFFFFFFFF
if key not in bitmaps:
bitmaps[key] = BitMap()
bitmaps[key].add(low_bits)

# Calculate the cardinality from the bitmaps
cardinality = sum(len(bm) for bm in bitmaps.values())

# 2. Serialize bitmaps for the vector payload
vector_payload = _serialize_bitmaps(bitmaps)

Expand Down Expand Up @@ -204,13 +211,13 @@ def add(
self._blobs.append(
PuffinBlobMetadata(
type="deletion-vector-v1",
fields=[],
fields=[2147483645], # Java INT_MAX - 2, reserved field id for deletion vectors
snapshot_id=-1,
sequence_number=-1,
offset=0, # Will be set later
length=0, # Will be set later
offset=0, # TODO: Use DeleteFileIndex data
length=0, # TODO: Use DeleteFileIndex data
properties=properties,
compression_codec=None, # Explicitly None
compression_codec=None,
)
)

Expand All @@ -229,12 +236,15 @@ def finish(self) -> bytes:
updated_blobs_metadata.append(PuffinBlobMetadata(**original_metadata_dict))
current_offset += len(blob_payload)

footer = Footer(blobs=updated_blobs_metadata)
footer = Footer(
blobs=updated_blobs_metadata, properties={"created-by": self._created_by} if self._created_by else {}
)
footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8")

# Final assembly
out.write(MAGIC_BYTES)
out.write(payload_buffer.getvalue())
out.write(MAGIC_BYTES)
out.write(footer_payload_bytes)
out.write(len(footer_payload_bytes).to_bytes(4, "little"))
out.write((0).to_bytes(4, "little")) # flags
Expand Down
62 changes: 25 additions & 37 deletions tests/table/test_puffin.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,68 +74,55 @@ def test_map_high_vals() -> None:


def test_puffin_round_trip() -> None:
# Define some deletion positions for multiple files
deletions1 = [10, 20, 30]
deletions2 = [5, (1 << 32) + 1] # Test with a high-bit position
# Define some deletion positions for a file
deletions = [5, (1 << 32) + 1, 5] # Test with a high-bit position and duplicate

file1_path = "path/to/data1.parquet"
file2_path = "path/to/data2.parquet"
file_path = "path/to/data.parquet"

# Write the Puffin file
writer = PuffinWriter()
writer.add(positions=deletions1, referenced_data_file=file1_path)
writer.add(positions=deletions2, referenced_data_file=file2_path)
writer = PuffinWriter(created_by="my-test-app")
writer.set_blob(positions=deletions, referenced_data_file=file_path)
puffin_bytes = writer.finish()

# Read the Puffin file back
reader = PuffinFile(puffin_bytes)

# Assert footer metadata
assert len(reader.footer.blobs) == 2

blob1_meta = reader.footer.blobs[0]
assert blob1_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file1_path
assert blob1_meta.properties["cardinality"] == str(len(deletions1))
assert reader.footer.properties["created-by"] == "my-test-app"
assert len(reader.footer.blobs) == 1

blob2_meta = reader.footer.blobs[1]
assert blob2_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file2_path
assert blob2_meta.properties["cardinality"] == str(len(deletions2))
blob_meta = reader.footer.blobs[0]
assert blob_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file_path
assert blob_meta.properties["cardinality"] == str(len(set(deletions)))

# Assert the content of deletion vectors
read_vectors = reader.to_vector()

assert file1_path in read_vectors
assert file2_path in read_vectors

assert read_vectors[file1_path].to_pylist() == sorted(deletions1)
assert read_vectors[file2_path].to_pylist() == sorted(deletions2)
assert file_path in read_vectors
assert read_vectors[file_path].to_pylist() == sorted(list(set(deletions)))


def test_write_and_read_puffin_file() -> None:
writer = PuffinWriter()
writer.add(positions=[1, 2, 3], referenced_data_file="file1.parquet")
writer.add(positions=[4, 5, 6], referenced_data_file="file2.parquet")
writer.set_blob(positions=[1, 2, 3], referenced_data_file="file1.parquet")
writer.set_blob(positions=[4, 5, 6], referenced_data_file="file2.parquet")
puffin_bytes = writer.finish()

reader = PuffinFile(puffin_bytes)

assert len(reader.footer.blobs) == 2
blob1 = reader.footer.blobs[0]
blob2 = reader.footer.blobs[1]

assert blob1.properties["referenced-data-file"] == "file1.parquet"
assert blob1.properties["cardinality"] == "3"
assert blob1.type == "deletion-vector-v1"
assert blob1.snapshot_id == -1
assert blob1.sequence_number == -1
assert blob1.compression_codec is None
assert len(reader.footer.blobs) == 1
blob = reader.footer.blobs[0]

assert blob2.properties["referenced-data-file"] == "file2.parquet"
assert blob2.properties["cardinality"] == "3"
assert blob.properties["referenced-data-file"] == "file2.parquet"
assert blob.properties["cardinality"] == "3"
assert blob.type == "deletion-vector-v1"
assert blob.snapshot_id == -1
assert blob.sequence_number == -1
assert blob.compression_codec is None

vectors = reader.to_vector()
assert len(vectors) == 2
assert vectors["file1.parquet"].to_pylist() == [1, 2, 3]
assert len(vectors) == 1
assert "file1.parquet" not in vectors
assert vectors["file2.parquet"].to_pylist() == [4, 5, 6]


Expand All @@ -146,3 +133,4 @@ def test_puffin_file_with_no_blobs() -> None:
reader = PuffinFile(puffin_bytes)
assert len(reader.footer.blobs) == 0
assert len(reader.to_vector()) == 0
assert "created-by" not in reader.footer.properties