-
Notifications
You must be signed in to change notification settings - Fork 407
Write Deletion Vectors #2822
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Write Deletion Vectors #2822
Changes from 3 commits
9a4b91d
4db1734
71dd925
3efd28e
228263c
36bb37f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,7 +14,10 @@ | |
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| import io | ||
| import math | ||
| import zlib | ||
| from collections.abc import Iterable | ||
| from typing import TYPE_CHECKING, Literal | ||
|
|
||
| from pydantic import Field | ||
|
|
@@ -27,6 +30,7 @@ | |
|
|
||
# Short for: Puffin Fratercula arctica, version 1
MAGIC_BYTES = b"PFA1"
# Magic bytes written at the start of every deletion-vector blob's content, ahead of the serialized bitmaps
DELETION_VECTOR_MAGIC = b"\xd1\xd3\x39\x64"
EMPTY_BITMAP = FrozenBitMap()
# Largest signed 32-bit integer (2**31 - 1); bitmap keys above this are rejected on write to stay compatible with the Java impl
MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
# Blob property key under which the path of the data file a deletion vector applies to is stored
PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"
|
|
@@ -62,6 +66,35 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]: | |
| return bitmaps | ||
|
|
||
|
|
||
def _serialize_bitmaps(bitmaps: dict[int, BitMap]) -> bytes:
    """
    Encode keyed bitmaps as a single contiguous byte string.

    Layout of the result:
    - 8 bytes: number of bitmaps (little-endian)
    - For each bitmap, in ascending key order:
        - 4 bytes: key (little-endian)
        - n bytes: whatever the bitmap's ``serialize()`` returns

    Raises:
        ValueError: If a key is negative or larger than ``MAX_JAVA_SIGNED``.
    """
    ordered_keys = sorted(bitmaps)

    # Header: how many bitmaps follow
    chunks = [len(ordered_keys).to_bytes(8, "little")]

    for key in ordered_keys:
        if key < 0:
            raise ValueError(f"Invalid unsigned key: {key}")
        if key > MAX_JAVA_SIGNED:
            raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")

        # Each entry is the key immediately followed by its bitmap
        chunks.append(key.to_bytes(4, "little"))
        chunks.append(bitmaps[key].serialize())

    return b"".join(chunks)
|
|
||
|
|
||
| class PuffinBlobMetadata(IcebergBaseModel): | ||
| type: Literal["deletion-vector-v1"] = Field() | ||
| fields: list[int] = Field() | ||
|
|
@@ -114,3 +147,97 @@ def __init__(self, puffin: bytes) -> None: | |
|
|
||
def to_vector(self) -> dict[str, "pa.ChunkedArray"]:
    """Return, for every path in the stored deletion vectors, its bitmaps converted to a chunked array."""
    vectors = {}
    for path, bitmaps in self._deletion_vectors.items():
        vectors[path] = _bitmaps_to_chunked_array(bitmaps)
    return vectors
|
|
||
|
|
||
class PuffinWriter:
    """Collect deletion-vector blobs in memory and assemble them into a Puffin file.

    Each call to :meth:`add` appends one ``deletion-vector-v1`` blob;
    :meth:`finish` returns the bytes of the complete file.
    """

    # Metadata of every blob added so far; offset/length are filled in by finish()
    _blobs: list[PuffinBlobMetadata]
    # Encoded blob bytes, index-aligned with _blobs
    _blob_payloads: list[bytes]

    def __init__(self) -> None:
        self._blobs = []
        self._blob_payloads = []

    def add(
        self,
        positions: Iterable[int],
        referenced_data_file: str,
    ) -> None:
        """Encode the deleted row positions of one data file as a deletion-vector blob.

        Args:
            positions: Row positions to mark as deleted. Duplicates are allowed
                and are counted once.
            referenced_data_file: Value stored under the ``referenced-data-file``
                blob property.

        Raises:
            ValueError: If a position is negative, or if its upper 32 bits exceed
                the key range accepted by ``_serialize_bitmaps``.
        """
        # 1. Split each 64-bit position into a key (upper 32 bits) and a value
        #    (lower 32 bits), keeping one bitmap per key.
        bitmaps: dict[int, BitMap] = {}
        for pos in positions:
            if pos < 0:
                raise ValueError(f"Invalid position: {pos}, positions must be non-negative")

            key = pos >> 32
            if key not in bitmaps:
                bitmaps[key] = BitMap()
            bitmaps[key].add(pos & 0xFFFFFFFF)

        # The cardinality is the number of distinct positions; counting loop
        # iterations would over-report when the input contains duplicates.
        cardinality = sum(len(bitmap) for bitmap in bitmaps.values())

        # 2. Serialize bitmaps for the vector payload
        vector_payload = _serialize_bitmaps(bitmaps)

        # 3. Construct the full blob payload for deletion-vector-v1.
        #    The CRC covers the DV magic bytes followed by the serialized vector.
        crc_content = DELETION_VECTOR_MAGIC + vector_payload
        crc32 = zlib.crc32(crc_content)

        full_blob = b"".join(
            (
                # Combined length of the vector and magic bytes stored as 4 bytes, big-endian
                len(crc_content).to_bytes(4, "big"),
                # The content (magic + vector)
                crc_content,
                # A CRC-32 checksum of the magic bytes and serialized vector as 4 bytes, big-endian
                crc32.to_bytes(4, "big"),
            )
        )

        # 4. Create blob metadata
        properties = {PROPERTY_REFERENCED_DATA_FILE: referenced_data_file, "cardinality": str(cardinality)}
        blob_metadata = PuffinBlobMetadata(
            type="deletion-vector-v1",
            fields=[],
            snapshot_id=-1,
            sequence_number=-1,
            offset=0,  # Will be set in finish()
            length=0,  # Will be set in finish()
            properties=properties,
            compression_codec=None,  # Explicitly None
        )

        # Record payload and metadata together, only after both were built
        # successfully, so the two lists can never get out of step.
        self._blob_payloads.append(full_blob)
        self._blobs.append(blob_metadata)

    def finish(self) -> bytes:
        """Assemble every added blob plus the footer and return the Puffin file bytes.

        File layout: ``Magic Blob1 ... BlobN Footer`` where the footer is
        ``Magic FooterPayload FooterPayloadSize Flags Magic``.
        """
        updated_blobs_metadata: list[PuffinBlobMetadata] = []
        current_offset = len(MAGIC_BYTES)  # Blobs start right after the file magic
        for blob_metadata, blob_payload in zip(self._blobs, self._blob_payloads):
            metadata_dict = blob_metadata.model_dump(by_alias=True, exclude_none=True)
            metadata_dict["offset"] = current_offset
            metadata_dict["length"] = len(blob_payload)
            updated_blobs_metadata.append(PuffinBlobMetadata(**metadata_dict))
            current_offset += len(blob_payload)

        footer = Footer(blobs=updated_blobs_metadata)
        footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8")

        return b"".join(
            (
                MAGIC_BYTES,
                *self._blob_payloads,
                # The footer itself opens with the magic, in addition to the one closing the file
                MAGIC_BYTES,
                footer_payload_bytes,
                len(footer_payload_bytes).to_bytes(4, "little"),
                (0).to_bytes(4, "little"),  # flags: footer payload is not compressed
                MAGIC_BYTES,
            )
        )
Uh oh!
There was an error while loading. Please reload this page.