Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
d4ca653
s/"main"/MAIN_BRANCH
kevinjqliu Jan 27, 2024
0b7aaaf
replace string literals
kevinjqliu Jan 27, 2024
23f04ec
default writes to main branch
kevinjqliu Jan 27, 2024
af6ff9a
Added some more methods for branches
vinjai Jul 18, 2024
6fbf3f1
s/"main"/MAIN_BRANCH
kevinjqliu Jan 27, 2024
8ce1509
replace string literals
kevinjqliu Jan 27, 2024
6daf29e
default writes to main branch
kevinjqliu Jan 27, 2024
09321cd
Added some more methods for branches
vinjai Jul 18, 2024
60fef31
Merged with master
vinjai Oct 12, 2024
45b01a6
Updated antries for branches
vinjai Oct 12, 2024
917108b
Resolved Merge Conflict
vinjai Oct 12, 2024
917b044
Fixed some bugs
vinjai Oct 14, 2024
398f6c0
Fixed bugs in delete and overwrite
vinjai Oct 15, 2024
b7b8ba0
Added tests and some refactoring
vinjai Oct 16, 2024
ee591b4
Added another integration test
vinjai Oct 16, 2024
e81907d
Fixed bug: concurrent same name branch and tag writes
vinjai Oct 16, 2024
4cf9198
Merge with main branch
vinjai Nov 13, 2024
bc6fb68
Added integration tests with spark
vinjai Nov 14, 2024
82e65e1
Fixed comments for AssertSnapshotRef
vinjai Feb 23, 2025
82e5b90
Fixed comments and linter issues
vinjai Feb 23, 2025
84d0971
Fixed comments
vinjai Feb 23, 2025
3efe53c
Fixed comments
vinjai Feb 23, 2025
dfedc63
Fixed a bug in tests
vinjai Feb 24, 2025
076a6d5
Fixed some more tests
vinjai Feb 24, 2025
53a7f84
Merge branch 'main' into feature/write-to-branch
vinjai May 25, 2025
e4463df
Fixed linter and code errors
vinjai May 25, 2025
49f75b4
Fixed bug for empty tables
vinjai May 26, 2025
4ed0607
Fixed bugs and added more tests
vinjai May 27, 2025
958aac4
changed design context for branch writes
vinjai May 27, 2025
a0aae4d
Merge branch 'main' into feature/write-to-branch
vinjai Jun 3, 2025
76249e9
Merge branch 'main' into feature/write-to-branch
vinjai Jun 23, 2025
079802a
Fixed linter, comments and other bugs
vinjai Jun 24, 2025
f45df8b
Usage of builder pattern
vinjai Jun 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Updated antries for branches
  • Loading branch information
vinjai committed Oct 12, 2024
commit 45b01a68e1d4572494082a09b49883ecdf00b329
32 changes: 20 additions & 12 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
from pyiceberg.table.name_mapping import (
NameMapping,
)
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
from pyiceberg.table.snapshots import (
Snapshot,
SnapshotLogEntry,
Expand Down Expand Up @@ -402,21 +402,22 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
name_mapping=self.table_metadata.name_mapping(),
)

def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot:
def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> UpdateSnapshot:
"""Create a new UpdateSnapshot to produce a new snapshot for the table.

Returns:
A new UpdateSnapshot
"""
return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties)
return UpdateSnapshot(self, io=self._table.io, branch=branch, snapshot_properties=snapshot_properties)

def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> None:
"""
Shorthand API for appending a PyArrow table to a table transaction.

Args:
df: The Arrow dataframe that will be appended to overwrite the table
snapshot_properties: Custom properties to be added to the snapshot summary
branch: Branch Reference to run the overwrite operation
"""
try:
import pyarrow as pa
Expand Down Expand Up @@ -444,7 +445,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
TableProperties.MANIFEST_MERGE_ENABLED,
TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
)
update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
update_snapshot = self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties)
append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append

with append_method() as append_files:
Expand All @@ -461,6 +462,7 @@ def overwrite(
df: pa.Table,
overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
branch: str = MAIN_BRANCH,
) -> None:
"""
Shorthand for adding a table overwrite with a PyArrow table to the transaction.
Expand All @@ -476,6 +478,7 @@ def overwrite(
overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
or a boolean expression in case of a partial overwrite
snapshot_properties: Custom properties to be added to the snapshot summary
branch: Branch Reference to run the overwrite operation
"""
try:
import pyarrow as pa
Expand All @@ -500,7 +503,7 @@ def overwrite(

self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties)

with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(
Expand All @@ -509,7 +512,12 @@ def overwrite(
for data_file in data_files:
update_snapshot.append_data_file(data_file)

def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
def delete(
self,
delete_filter: Union[str, BooleanExpression],
snapshot_properties: Dict[str, str] = EMPTY_DICT,
branch: str = MAIN_BRANCH,
) -> None:
"""
Shorthand for deleting record from a table.

Expand Down Expand Up @@ -537,7 +545,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti
if isinstance(delete_filter, str):
delete_filter = _parse_row_filter(delete_filter)

with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).delete() as delete_snapshot:
delete_snapshot.delete_by_predicate(delete_filter)

# Check if there are any files that require an actual rewrite of a data file
Expand Down Expand Up @@ -585,7 +593,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti
))

if len(replaced_files) > 0:
with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite(
with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).overwrite(
commit_uuid=commit_uuid
) as overwrite_snapshot:
for original_data_file, replaced_data_files in replaced_files:
Expand Down Expand Up @@ -1003,7 +1011,7 @@ def name_mapping(self) -> Optional[NameMapping]:
"""Return the table's field-id NameMapping."""
return self.metadata.name_mapping()

def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,branch: str = MAIN_BRANCH) -> None:
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> None:
"""
Shorthand API for appending a PyArrow table to the table.

Expand All @@ -1012,7 +1020,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
snapshot_properties: Custom properties to be added to the snapshot summary
"""
with self.transaction() as tx:
tx.append(df=df, snapshot_properties=snapshot_properties,branch=branch)
tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch)

def overwrite(
self,
Expand All @@ -1037,7 +1045,7 @@ def overwrite(
snapshot_properties: Custom properties to be added to the snapshot summary
"""
with self.transaction() as tx:
tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties,branch=branch)
tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch)

def delete(
self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
Expand Down
4 changes: 2 additions & 2 deletions pyiceberg/table/update/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
from pyiceberg.table.snapshots import (
MetadataLogEntry,
Snapshot,
Expand Down Expand Up @@ -136,7 +136,7 @@ class AddSnapshotUpdate(IcebergBaseModel):
class SetSnapshotRefUpdate(IcebergBaseModel):
action: Literal["set-snapshot-ref"] = Field(default="set-snapshot-ref")
ref_name: str = Field(alias="ref-name")
type: Literal["tag", "branch"]
type: Literal[SnapshotRefType.TAG, SnapshotRefType.BRANCH]
snapshot_id: int = Field(alias="snapshot-id")
max_ref_age_ms: Annotated[Optional[int], Field(alias="max-ref-age-ms", default=None)]
max_snapshot_age_ms: Annotated[Optional[int], Field(alias="max-snapshot-age-ms", default=None)]
Expand Down
38 changes: 30 additions & 8 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from pyiceberg.partitioning import (
PartitionSpec,
)
from pyiceberg.table.refs import SnapshotRefType
from pyiceberg.table.snapshots import (
Operation,
Snapshot,
Expand Down Expand Up @@ -103,12 +104,14 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
_added_data_files: List[DataFile]
_manifest_num_counter: itertools.count[int]
_deleted_data_files: Set[DataFile]
_branch: str

def __init__(
self,
operation: Operation,
transaction: Transaction,
io: FileIO,
branch: str,
commit_uuid: Optional[uuid.UUID] = None,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
Expand All @@ -117,7 +120,7 @@ def __init__(
self._io = io
self._operation = operation
self._snapshot_id = self._transaction.table_metadata.new_snapshot_id()
# Since we only support the main branch for now
self._branch = branch
self._parent_snapshot_id = (
snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None
)
Expand Down Expand Up @@ -272,10 +275,13 @@ def _commit(self) -> UpdatesAndRequirements:
(
AddSnapshotUpdate(snapshot=snapshot),
SetSnapshotRefUpdate(
snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch"
snapshot_id=self._snapshot_id,
parent_snapshot_id=self._parent_snapshot_id,
ref_name=self._branch,
type=SnapshotRefType.BRANCH,
),
),
(AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),),
(AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref=self._branch),),
)

@property
Expand Down Expand Up @@ -324,10 +330,11 @@ def __init__(
operation: Operation,
transaction: Transaction,
io: FileIO,
branch: str,
commit_uuid: Optional[uuid.UUID] = None,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
):
super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties)
self._predicate = AlwaysFalse()

def _commit(self) -> UpdatesAndRequirements:
Expand Down Expand Up @@ -482,12 +489,13 @@ def __init__(
operation: Operation,
transaction: Transaction,
io: FileIO,
branch: str,
commit_uuid: Optional[uuid.UUID] = None,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
from pyiceberg.table import TableProperties

super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties)
self._target_size_bytes = property_as_int(
self._transaction.table_metadata.properties,
TableProperties.MANIFEST_TARGET_SIZE_BYTES,
Expand Down Expand Up @@ -603,21 +611,33 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
class UpdateSnapshot:
_transaction: Transaction
_io: FileIO
_branch: str
_snapshot_properties: Dict[str, str]

def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
def __init__(
self, transaction: Transaction, io: FileIO, branch: str, snapshot_properties: Dict[str, str] = EMPTY_DICT
) -> None:
self._transaction = transaction
self._io = io
self._snapshot_properties = snapshot_properties
self._branch = branch

def fast_append(self) -> _FastAppendFiles:
return _FastAppendFiles(
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
operation=Operation.APPEND,
transaction=self._transaction,
io=self._io,
branch=self._branch,
snapshot_properties=self._snapshot_properties,
)

def merge_append(self) -> _MergeAppendFiles:
return _MergeAppendFiles(
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
operation=Operation.APPEND,
transaction=self._transaction,
io=self._io,
branch=self._branch,
snapshot_properties=self._snapshot_properties,
)

def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles:
Expand All @@ -628,6 +648,7 @@ def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles:
else Operation.APPEND,
transaction=self._transaction,
io=self._io,
branch=self._branch,
snapshot_properties=self._snapshot_properties,
)

Expand All @@ -636,6 +657,7 @@ def delete(self) -> _DeleteFiles:
operation=Operation.DELETE,
transaction=self._transaction,
io=self._io,
branch=self._branch,
snapshot_properties=self._snapshot_properties,
)

Expand Down