Merged

33 commits (showing changes from 1 commit)
d4ca653
s/"main"/MAIN_BRANCH
kevinjqliu Jan 27, 2024
0b7aaaf
replace string literals
kevinjqliu Jan 27, 2024
23f04ec
default writes to main branch
kevinjqliu Jan 27, 2024
af6ff9a
Added some more methods for branches
vinjai Jul 18, 2024
6fbf3f1
s/"main"/MAIN_BRANCH
kevinjqliu Jan 27, 2024
8ce1509
replace string literals
kevinjqliu Jan 27, 2024
6daf29e
default writes to main branch
kevinjqliu Jan 27, 2024
09321cd
Added some more methods for branches
vinjai Jul 18, 2024
60fef31
Merged with master
vinjai Oct 12, 2024
45b01a6
Updated entries for branches
vinjai Oct 12, 2024
917108b
Resolved Merge Conflict
vinjai Oct 12, 2024
917b044
Fixed some bugs
vinjai Oct 14, 2024
398f6c0
Fixed bugs in delete and overwrite
vinjai Oct 15, 2024
b7b8ba0
Added tests and some refactoring
vinjai Oct 16, 2024
ee591b4
Added another integration test
vinjai Oct 16, 2024
e81907d
Fixed bug: concurrent same name branch and tag writes
vinjai Oct 16, 2024
4cf9198
Merge with main branch
vinjai Nov 13, 2024
bc6fb68
Added integration tests with spark
vinjai Nov 14, 2024
82e65e1
Fixed comments for AssertSnapshotRef
vinjai Feb 23, 2025
82e5b90
Fixed comments and linter issues
vinjai Feb 23, 2025
84d0971
Fixed comments
vinjai Feb 23, 2025
3efe53c
Fixed comments
vinjai Feb 23, 2025
dfedc63
Fixed a bug in tests
vinjai Feb 24, 2025
076a6d5
Fixed some more tests
vinjai Feb 24, 2025
53a7f84
Merge branch 'main' into feature/write-to-branch
vinjai May 25, 2025
e4463df
Fixed linter and code errors
vinjai May 25, 2025
49f75b4
Fixed bug for empty tables
vinjai May 26, 2025
4ed0607
Fixed bugs and added more tests
vinjai May 27, 2025
958aac4
changed design context for branch writes
vinjai May 27, 2025
a0aae4d
Merge branch 'main' into feature/write-to-branch
vinjai Jun 3, 2025
76249e9
Merge branch 'main' into feature/write-to-branch
vinjai Jun 23, 2025
079802a
Fixed linter, comments and other bugs
vinjai Jun 24, 2025
f45df8b
Usage of builder pattern
vinjai Jun 24, 2025
Fixed bug for empty tables
vinjai committed May 26, 2025
commit 49f75b4ec021e5e2cdcb058568c509a6089cd524
60 changes: 40 additions & 20 deletions pyiceberg/table/__init__.py
@@ -87,7 +87,7 @@
from pyiceberg.table.name_mapping import (
NameMapping,
)
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
from pyiceberg.table.refs import SnapshotRef
from pyiceberg.table.snapshots import (
Snapshot,
SnapshotLogEntry,
@@ -398,7 +398,7 @@ def _build_partition_predicate(self, partition_records: Set[Record]) -> BooleanE
expr = Or(expr, match_partition_expression)
return expr

def _append_snapshot_producer(self, snapshot_properties: Dict[str, str], branch: str) -> _FastAppendFiles:
def _append_snapshot_producer(self, snapshot_properties: Dict[str, str], branch: Optional[str]) -> _FastAppendFiles:
"""Determine the append type based on table properties.

Args:
@@ -431,7 +431,7 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
name_mapping=self.table_metadata.name_mapping(),
)

def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> UpdateSnapshot:
def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = None) -> UpdateSnapshot:
"""Create a new UpdateSnapshot to produce a new snapshot for the table.

Returns:
@@ -448,7 +448,7 @@ def update_statistics(self) -> UpdateStatistics:
"""
return UpdateStatistics(transaction=self)

def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> None:
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = None) -> None:
"""
Shorthand API for appending a PyArrow table to a table transaction.

@@ -490,7 +490,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
append_files.append_data_file(data_file)

def dynamic_partition_overwrite(
self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH
self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = None
) -> None:
"""
Shorthand for overwriting existing partitions with a PyArrow table.
@@ -554,7 +554,7 @@ def overwrite(
overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
case_sensitive: bool = True,
branch: str = MAIN_BRANCH,
branch: Optional[str] = None,
) -> None:
"""
Shorthand for adding a table overwrite with a PyArrow table to the transaction.
@@ -617,7 +617,7 @@ def delete(
delete_filter: Union[str, BooleanExpression],
snapshot_properties: Dict[str, str] = EMPTY_DICT,
case_sensitive: bool = True,
branch: str = MAIN_BRANCH,
branch: Optional[str] = None,
) -> None:
"""
Shorthand for deleting record from a table.
@@ -656,7 +656,10 @@ def delete(
bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive)
preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter)

files = self._scan(row_filter=delete_filter, case_sensitive=case_sensitive).use_ref(branch).plan_files()
if branch is None:
files = self._scan(row_filter=delete_filter, case_sensitive=case_sensitive).plan_files()
else:
files = self._scan(row_filter=delete_filter, case_sensitive=case_sensitive).use_ref(branch).plan_files()
Contributor review comment:
Maybe in a subsequent PR we can pass in the ref to the constructor 👍
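
For illustration, the reviewer's suggestion might look roughly like this. A sketch only: ref_name is a hypothetical constructor parameter that the current scan API does not have.

# Hypothetical sketch: pass the ref at construction instead of chaining .use_ref()
files = self._scan(
    row_filter=delete_filter,
    case_sensitive=case_sensitive,
    ref_name=branch,  # assumed parameter; None would fall back to the current table state
).plan_files()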


commit_uuid = uuid.uuid4()
counter = itertools.count(0)
@@ -717,6 +720,7 @@ def upsert(
when_matched_update_all: bool = True,
when_not_matched_insert_all: bool = True,
case_sensitive: bool = True,
branch: Optional[str] = None,
) -> UpsertResult:
"""Shorthand API for performing an upsert to an iceberg table.

@@ -727,6 +731,7 @@
when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
case_sensitive: Bool indicating if the match should be case-sensitive
branch: Branch Reference to run the upsert operation

To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids

@@ -789,12 +794,24 @@ def upsert(
matched_predicate = upsert_util.create_match_filter(df, join_cols)

# We must use Transaction.table_metadata for the scan. This includes all uncommitted - but relevant - changes.
matched_iceberg_table = DataScan(
table_metadata=self.table_metadata,
io=self._table.io,
row_filter=matched_predicate,
case_sensitive=case_sensitive,
).to_arrow()
if branch is None:
matched_iceberg_table = DataScan(
table_metadata=self.table_metadata,
io=self._table.io,
row_filter=matched_predicate,
case_sensitive=case_sensitive,
).to_arrow()
else:
matched_iceberg_table = (
DataScan(
table_metadata=self.table_metadata,
io=self._table.io,
row_filter=matched_predicate,
case_sensitive=case_sensitive,
)
.use_ref(branch)
.to_arrow()
)

update_row_cnt = 0
insert_row_cnt = 0
@@ -811,7 +828,7 @@
# build the match predicate filter
overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)

self.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate)
self.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate, branch=branch)

if when_not_matched_insert_all:
expr_match = upsert_util.create_match_filter(matched_iceberg_table, join_cols)
@@ -822,7 +839,7 @@
insert_row_cnt = len(rows_to_insert)

if insert_row_cnt > 0:
self.append(rows_to_insert)
self.append(rows_to_insert, branch=branch)

return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
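
With this change, a branch-scoped upsert inside a transaction might look like the following sketch. The catalog name, table name, and the pre-existing "audit" branch are illustrative assumptions.

import pyarrow as pa

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")        # assumes a configured catalog
table = catalog.load_table("db.events")  # hypothetical table

df = pa.table({"id": [1, 2], "value": ["a", "b"]})
with table.transaction() as tx:
    # Rows are matched against, and written to, the "audit" branch.
    result = tx.upsert(df, join_cols=["id"], branch="audit")
print(result.rows_updated, result.rows_inserted)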

@@ -1255,6 +1272,7 @@ def upsert(
when_matched_update_all: bool = True,
when_not_matched_insert_all: bool = True,
case_sensitive: bool = True,
branch: Optional[str] = None,
) -> UpsertResult:
"""Shorthand API for performing an upsert to an iceberg table.

@@ -1265,6 +1283,7 @@
when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
case_sensitive: Bool indicating if the match should be case-sensitive
branch: Branch Reference to run the upsert operation

To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids

@@ -1297,9 +1316,10 @@ def upsert(
when_matched_update_all=when_matched_update_all,
when_not_matched_insert_all=when_not_matched_insert_all,
case_sensitive=case_sensitive,
branch=branch,
)

def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> None:
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = None) -> None:
"""
Shorthand API for appending a PyArrow table to the table.

@@ -1312,7 +1332,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch)
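
Caller-side, the new parameter preserves the old default (a sketch, with table loaded as in the earlier example; "staging" is an illustrative, pre-existing branch):

import pyarrow as pa

df = pa.table({"id": [1]})
table.append(df)                    # unchanged: writes to the main branch
table.append(df, branch="staging")  # new: writes to the "staging" branch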

def dynamic_partition_overwrite(
self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH
self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = None
) -> None:
"""Shorthand for dynamic overwriting the table with a PyArrow table.

@@ -1331,7 +1351,7 @@ def overwrite(
overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
case_sensitive: bool = True,
branch: str = MAIN_BRANCH,
branch: Optional[str] = None,
) -> None:
"""
Shorthand for overwriting the table with a PyArrow table.
@@ -1364,7 +1384,7 @@ def delete(
delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
case_sensitive: bool = True,
branch: str = MAIN_BRANCH,
branch: Optional[str] = None,
) -> None:
"""
Shorthand for deleting rows from the table.
43 changes: 28 additions & 15 deletions pyiceberg/table/update/snapshot.py
@@ -105,30 +105,39 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
_added_data_files: List[DataFile]
_manifest_num_counter: itertools.count[int]
_deleted_data_files: Set[DataFile]
_branch: str

def __init__(
self,
operation: Operation,
transaction: Transaction,
io: FileIO,
branch: str,
commit_uuid: Optional[uuid.UUID] = None,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
branch: str = MAIN_BRANCH,
) -> None:
super().__init__(transaction)
self.commit_uuid = commit_uuid or uuid.uuid4()
self._io = io
self._operation = operation
self._snapshot_id = self._transaction.table_metadata.new_snapshot_id()
self._branch = branch
self._parent_snapshot_id = (
snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.snapshot_by_name(self._branch)) else None
)
self._added_data_files = []
self._deleted_data_files = set()
self.snapshot_properties = snapshot_properties
self._manifest_num_counter = itertools.count(0)
self._set_target_branch(branch=branch)
self._parent_snapshot_id = (
snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.snapshot_by_name(self._target_branch)) else None
)

def _set_target_branch(self, branch: str) -> None:
# Default is already set to MAIN_BRANCH. So branch name can't be None.
assert branch is not None, ValueError("Invalid branch name: null")
if branch in self._transaction.table_metadata.refs:
ref = self._transaction.table_metadata.refs[branch]
assert ref.snapshot_ref_type == SnapshotRefType.BRANCH, ValueError(
f"{branch} is a tag, not a branch. Tags cannot be targets for producing snapshots"
)
self._target_branch = branch
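
The effect of this check is that tags are rejected as snapshot targets when the producer is constructed. For example (a sketch; assume "audit" exists as a branch and "v1" as a tag on the table):

tx.update_snapshot(branch="audit").fast_append()  # accepted: "audit" is a branch
tx.update_snapshot(branch="v1").fast_append()     # fails: "v1" is a tag, not a branch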

def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
self._added_data_files.append(data_file)
@@ -276,16 +285,16 @@ def _commit(self) -> UpdatesAndRequirements:
SetSnapshotRefUpdate(
snapshot_id=self._snapshot_id,
parent_snapshot_id=self._parent_snapshot_id,
ref_name=self._branch,
ref_name=self._target_branch,
type=SnapshotRefType.BRANCH,
),
),
(
AssertRefSnapshotId(
snapshot_id=self._transaction.table_metadata.refs[self._branch].snapshot_id
if self._branch in self._transaction.table_metadata.refs
snapshot_id=self._transaction.table_metadata.refs[self._target_branch].snapshot_id
if self._target_branch in self._transaction.table_metadata.refs
else self._transaction.table_metadata.current_snapshot_id,
ref=self._branch,
ref=self._target_branch,
),
),
)
@@ -338,7 +347,7 @@ def __init__(
commit_uuid: Optional[uuid.UUID] = None,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
):
super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties)
super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch)
self._predicate = AlwaysFalse()
self._case_sensitive = True

@@ -503,7 +512,7 @@ def __init__(
) -> None:
from pyiceberg.table import TableProperties

super().__init__(operation, transaction, io, branch, commit_uuid, snapshot_properties)
super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch)
self._target_size_bytes = property_as_int(
self._transaction.table_metadata.properties,
TableProperties.MANIFEST_TARGET_SIZE_BYTES,
@@ -549,7 +558,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
"""Determine if there are any existing manifest files."""
existing_files = []

if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._branch):
if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch):
for manifest_file in snapshot.manifests(io=self._io):
entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True)
found_deleted_data_files = [entry.data_file for entry in entries if entry.data_file in self._deleted_data_files]
@@ -623,12 +632,16 @@ class UpdateSnapshot:
_snapshot_properties: Dict[str, str]

def __init__(
self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH
self,
transaction: Transaction,
io: FileIO,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
branch: Optional[str] = MAIN_BRANCH,
) -> None:
self._transaction = transaction
self._io = io
self._snapshot_properties = snapshot_properties
self._branch = branch
self._branch = branch if branch is not None else MAIN_BRANCH
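
At this layer a missing branch therefore resolves to main (a sketch):

tx.update_snapshot().fast_append()                    # branch=None resolves to "main"
tx.update_snapshot(branch=MAIN_BRANCH).fast_append()  # equivalent, explicit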

def fast_append(self) -> _FastAppendFiles:
return _FastAppendFiles(
10 changes: 5 additions & 5 deletions pyiceberg/utils/concurrent.py
@@ -25,6 +25,11 @@
class ExecutorFactory:
_instance: Optional[Executor] = None

@staticmethod
def max_workers() -> Optional[int]:
"""Return the max number of workers configured."""
return Config().get_int("max-workers")

@staticmethod
def get_or_create() -> Executor:
"""Return the same executor in each call."""
@@ -33,8 +38,3 @@ def get_or_create() -> Executor:
ExecutorFactory._instance = ThreadPoolExecutor(max_workers=max_workers)

return ExecutorFactory._instance

@staticmethod
def max_workers() -> Optional[int]:
"""Return the max number of workers configured."""
return Config().get_int("max-workers")
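
This hunk only moves max_workers() above get_or_create(); the behavior is unchanged. Usage stays as before (a sketch, assuming "max-workers" may be set in the PyIceberg config):

from pyiceberg.utils.concurrent import ExecutorFactory

executor = ExecutorFactory.get_or_create()  # shared ThreadPoolExecutor, sized by max_workers()
print(ExecutorFactory.max_workers())        # None unless "max-workers" is configured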