Merged
33 commits
d4ca653
s/"main"/MAIN_BRANCH
kevinjqliu Jan 27, 2024
0b7aaaf
replace string literals
kevinjqliu Jan 27, 2024
23f04ec
default writes to main branch
kevinjqliu Jan 27, 2024
af6ff9a
Added some more methods for branches
vinjai Jul 18, 2024
6fbf3f1
s/"main"/MAIN_BRANCH
kevinjqliu Jan 27, 2024
8ce1509
replace string literals
kevinjqliu Jan 27, 2024
6daf29e
default writes to main branch
kevinjqliu Jan 27, 2024
09321cd
Added some more methods for branches
vinjai Jul 18, 2024
60fef31
Merged with master
vinjai Oct 12, 2024
45b01a6
Updated entries for branches
vinjai Oct 12, 2024
917108b
Resolved Merge Conflict
vinjai Oct 12, 2024
917b044
Fixed some bugs
vinjai Oct 14, 2024
398f6c0
Fixed bugs in delete and overwrite
vinjai Oct 15, 2024
b7b8ba0
Added tests and some refactoring
vinjai Oct 16, 2024
ee591b4
Added another integration test
vinjai Oct 16, 2024
e81907d
Fixed bug: concurrent same name branch and tag writes
vinjai Oct 16, 2024
4cf9198
Merge with main branch
vinjai Nov 13, 2024
bc6fb68
Added integration tests with spark
vinjai Nov 14, 2024
82e65e1
Fixed comments for AssertSnapshotRef
vinjai Feb 23, 2025
82e5b90
Fixed comments and linter issues
vinjai Feb 23, 2025
84d0971
Fixed comments
vinjai Feb 23, 2025
3efe53c
Fixed comments
vinjai Feb 23, 2025
dfedc63
Fixed a bug in tests
vinjai Feb 24, 2025
076a6d5
Fixed some more tests
vinjai Feb 24, 2025
53a7f84
Merge branch 'main' into feature/write-to-branch
vinjai May 25, 2025
e4463df
Fixed linter and code errors
vinjai May 25, 2025
49f75b4
Fixed bug for empty tables
vinjai May 26, 2025
4ed0607
Fixed bugs and added more tests
vinjai May 27, 2025
958aac4
changed design context for branch writes
vinjai May 27, 2025
a0aae4d
Merge branch 'main' into feature/write-to-branch
vinjai Jun 3, 2025
76249e9
Merge branch 'main' into feature/write-to-branch
vinjai Jun 23, 2025
079802a
Fixed linter, comments and other bugs
vinjai Jun 24, 2025
f45df8b
Usage of builder pattern
vinjai Jun 24, 2025
Merge branch 'main' into feature/write-to-branch
vinjai committed May 25, 2025
commit 53a7f845ad3096c09c11f64ffec26b3a9c4b0480
1 change: 0 additions & 1 deletion pyiceberg/cli/console.py
@@ -34,7 +34,6 @@
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError
from pyiceberg.table import TableProperties
from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
from pyiceberg.utils.deprecated import deprecated
from pyiceberg.utils.properties import property_as_int


106 changes: 84 additions & 22 deletions pyiceberg/table/__init__.py
@@ -398,7 +398,7 @@ def _build_partition_predicate(self, partition_records: Set[Record]) -> BooleanE
expr = Or(expr, match_partition_expression)
return expr

def _append_snapshot_producer(self, snapshot_properties: Dict[str, str]) -> _FastAppendFiles:
def _append_snapshot_producer(self, snapshot_properties: Dict[str, str], branch: str) -> _FastAppendFiles:
"""Determine the append type based on table properties.

Args:
@@ -411,7 +411,7 @@ def _append_snapshot_producer(self, snapshot_properties: Dict[str, str]) -> _Fas
TableProperties.MANIFEST_MERGE_ENABLED,
TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
)
update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch)
return update_snapshot.merge_append() if manifest_merge_enabled else update_snapshot.fast_append()

def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
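
Note for reviewers: `_append_snapshot_producer` now owns the manifest-merge check that previously lived inline in `append`. A rough sketch of that check, outside this diff and assuming a loaded `Table` object named `table`, would be:

```python
from pyiceberg.table import TableProperties
from pyiceberg.utils.properties import property_as_bool

# Illustrative only: the toggle that decides between merge_append() (manifest
# merging enabled) and fast_append() (manifest merging disabled).
manifest_merge_enabled = property_as_bool(
    table.metadata.properties,
    TableProperties.MANIFEST_MERGE_ENABLED,
    TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
)
```
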
@@ -439,6 +439,15 @@ def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT, bran
"""
return UpdateSnapshot(self, io=self._table.io, branch=branch, snapshot_properties=snapshot_properties)

def update_statistics(self) -> UpdateStatistics:
"""
Create a new UpdateStatistics to update the statistics of the table.

Returns:
A new UpdateStatistics
"""
return UpdateStatistics(transaction=self)

def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> None:
"""
Shorthand API for appending a PyArrow table to a table transaction.
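
To make the new `branch` plumbing concrete, here is a hedged usage sketch (the catalog, table name, and branch name are illustrative, not taken from this PR):

```python
import pyarrow as pa

# Assumes an already-configured catalog; "default.events" and "audit" are made up.
table = catalog.load_table("default.events")
df = pa.Table.from_pylist([{"id": 1}, {"id": 2}])

with table.transaction() as tx:
    # Omitting `branch` keeps the old behaviour of writing to MAIN_BRANCH.
    tx.append(df, branch="audit")
```
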
@@ -469,15 +478,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
)

manifest_merge_enabled = property_as_bool(
self.table_metadata.properties,
TableProperties.MANIFEST_MERGE_ENABLED,
TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
)
update_snapshot = self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties)
append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append

with append_method() as append_files:
with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = list(
@@ -549,6 +550,7 @@ def overwrite(
df: pa.Table,
overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
case_sensitive: bool = True,
branch: str = MAIN_BRANCH,
) -> None:
"""
@@ -564,8 +566,8 @@
df: The Arrow dataframe that will be used to overwrite the table
overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
or a boolean expression in case of a partial overwrite
case_sensitive: A bool that determines if the provided `overwrite_filter` is case-sensitive
snapshot_properties: Custom properties to be added to the snapshot summary
case_sensitive: A bool that determines if the provided `overwrite_filter` is case-sensitive
branch: Branch Reference to run the overwrite operation
"""
try:
@@ -589,9 +591,11 @@
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
)

self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch)
if overwrite_filter != AlwaysFalse():
# Only delete when the filter is != AlwaysFalse
self.delete(delete_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties, branch=branch)

with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(
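
The overwrite path now skips the delete step entirely when the filter is `AlwaysFalse`, and both the delete and the append honour `case_sensitive` and `branch`. A sketch of a partial overwrite against a named branch (table, column, and branch names are illustrative):

```python
from pyiceberg.expressions import EqualTo

# Replace only the EU rows on the "staging" branch; other rows are untouched.
table.overwrite(
    df,
    overwrite_filter=EqualTo("region", "EU"),
    case_sensitive=True,
    branch="staging",
)
```
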
@@ -604,6 +608,7 @@ def delete(
self,
delete_filter: Union[str, BooleanExpression],
snapshot_properties: Dict[str, str] = EMPTY_DICT,
case_sensitive: bool = True,
branch: str = MAIN_BRANCH,
) -> None:
"""
@@ -617,6 +622,7 @@
Args:
delete_filter: A boolean expression to delete rows from a table
snapshot_properties: Custom properties to be added to the snapshot summary
case_sensitive: A bool that determines if the provided `delete_filter` is case-sensitive
branch: Branch Reference to run the delete operation
"""
from pyiceberg.io.pyarrow import (
@@ -634,15 +640,15 @@
if isinstance(delete_filter, str):
delete_filter = _parse_row_filter(delete_filter)

with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).delete() as delete_snapshot:
delete_snapshot.delete_by_predicate(delete_filter)
with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() as delete_snapshot:
delete_snapshot.delete_by_predicate(delete_filter, case_sensitive)

# Check if there are any files that require an actual rewrite of a data file
if delete_snapshot.rewrites_needed is True:
bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive)
preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter)

files = self._scan(row_filter=delete_filter).use_ref(branch).plan_files()
files = self._scan(row_filter=delete_filter, case_sensitive=case_sensitive).use_ref(branch).plan_files()

commit_uuid = uuid.uuid4()
counter = itertools.count(0)
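
For the delete path, `case_sensitive` is now threaded through `delete_by_predicate`, the filter binding, and the rewrite scan. A minimal usage sketch (string filters are parsed by `_parse_row_filter`, as shown above; the branch name is illustrative):

```python
# Remove rows matching the predicate from the "audit" branch only.
table.delete(delete_filter="id < 100", case_sensitive=True, branch="audit")
```
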
@@ -684,9 +690,8 @@ def delete(
)

if len(replaced_files) > 0:
with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).overwrite(
commit_uuid=commit_uuid
) as overwrite_snapshot:
with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).overwrite() as overwrite_snapshot:
overwrite_snapshot.commit_uuid = commit_uuid
for original_data_file, replaced_data_files in replaced_files:
overwrite_snapshot.delete_data_file(original_data_file)
for replaced_data_file in replaced_data_files:
@@ -1233,6 +1238,57 @@ def name_mapping(self) -> Optional[NameMapping]:
"""Return the table's field-id NameMapping."""
return self.metadata.name_mapping()

def upsert(
self,
df: pa.Table,
join_cols: Optional[List[str]] = None,
when_matched_update_all: bool = True,
when_not_matched_insert_all: bool = True,
case_sensitive: bool = True,
) -> UpsertResult:
"""Shorthand API for performing an upsert to an iceberg table.

Args:

df: The input dataframe to upsert with the table's data.
join_cols: Columns to join on, if not provided, it will use the identifier-field-ids.
when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
case_sensitive: Bool indicating if the match should be case-sensitive

To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids

Example Use Cases:
Case 1: Both Parameters = True (Full Upsert)
Existing row found → Update it
New row found → Insert it

Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
Existing row found → Do nothing (no updates)
New row found → Insert it

Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
Existing row found → Update it
New row found → Do nothing (no inserts)

Case 4: Both Parameters = False (No Merge Effect)
Existing row found → Do nothing
New row found → Do nothing
(Function effectively does nothing)


Returns:
An UpsertResult class (contains details of rows updated and inserted)
"""
with self.transaction() as tx:
return tx.upsert(
df=df,
join_cols=join_cols,
when_matched_update_all=when_matched_update_all,
when_not_matched_insert_all=when_not_matched_insert_all,
case_sensitive=case_sensitive,
)
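
A hedged usage sketch of `upsert` (the table, columns, and data are illustrative; the result is assumed to expose `rows_updated` / `rows_inserted` counters):

```python
import pyarrow as pa

df = pa.Table.from_pylist(
    [
        {"id": 1, "name": "updated"},    # matches an existing row -> updated
        {"id": 3, "name": "brand-new"},  # no match -> inserted
    ]
)

# join_cols falls back to the table's identifier-field-ids when omitted.
result = table.upsert(df, join_cols=["id"])
print(result.rows_updated, result.rows_inserted)
```
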

def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH) -> None:
"""
Shorthand API for appending a PyArrow table to the table.
@@ -1261,6 +1317,7 @@ def overwrite(
df: pa.Table,
overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
case_sensitive: bool = True,
branch: str = MAIN_BRANCH,
) -> None:
"""
@@ -1277,15 +1334,19 @@
overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
or a boolean expression in case of a partial overwrite
snapshot_properties: Custom properties to be added to the snapshot summary
case_sensitive: A bool that determines if the provided `overwrite_filter` is case-sensitive
branch: Branch Reference to run the overwrite operation
"""
with self.transaction() as tx:
tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch)
tx.overwrite(
df=df, overwrite_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties, branch=branch
)

def delete(
self,
delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
case_sensitive: bool = True,
branch: str = MAIN_BRANCH,
) -> None:
"""
@@ -1294,10 +1355,11 @@
Args:
delete_filter: The predicate that used to remove rows
snapshot_properties: Custom properties to be added to the snapshot summary
case_sensitive: A bool that determines if the provided `delete_filter` is case-sensitive
branch: Branch Reference to run the delete operation
"""
with self.transaction() as tx:
tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)
tx.delete(delete_filter=delete_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties, branch=branch)

def add_files(
self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
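
Since `Table.append`, `Table.overwrite`, and `Table.delete` all forward to the same transaction methods, several branch-targeted operations can also be staged atomically in one transaction. A sketch (branch name and filter are illustrative):

```python
with table.transaction() as tx:
    tx.delete(delete_filter="region = 'EU'", branch="staging")
    tx.append(df, branch="staging")
```
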
124 changes: 112 additions & 12 deletions tests/integration/test_writes/test_writes.py
@@ -1221,10 +1221,11 @@ def test_table_write_schema_with_valid_nullability_diff(
table_schema = Schema(
NestedField(field_id=1, name="long", field_type=LongType(), required=False),
)
other_schema = pa.schema((
pa.field("long", pa.int64(), nullable=False),
# can support writing required pyarrow field to optional Iceberg field
))
other_schema = pa.schema(
(
pa.field("long", pa.int64(), nullable=False), # can support writing required pyarrow field to optional Iceberg field
)
)
arrow_table = pa.Table.from_pydict(
{
"long": [1, 9],
@@ -1265,14 +1266,15 @@ def test_table_write_schema_with_valid_upcast(
# table's long field should cast to long on read
written_arrow_table = tbl.scan().to_arrow()
assert written_arrow_table == pyarrow_table_with_promoted_types.cast(
pa.schema((
pa.field("long", pa.int64(), nullable=True),
pa.field("list", pa.large_list(pa.int64()), nullable=False),
pa.field("map", pa.map_(pa.large_string(), pa.int64()), nullable=False),
pa.field("double", pa.float64(), nullable=True), # can support upcasting float to double
pa.field("uuid", pa.binary(length=16), nullable=True),
# can UUID is read as fixed length binary of length 16
))
pa.schema(
(
pa.field("long", pa.int64(), nullable=True),
pa.field("list", pa.list_(pa.int64()), nullable=False),
pa.field("map", pa.map_(pa.string(), pa.int64()), nullable=False),
pa.field("double", pa.float64(), nullable=True), # can support upcasting float to double
pa.field("uuid", pa.binary(length=16), nullable=True), # can UUID is read as fixed length binary of length 16
)
)
)
lhs = spark.table(f"{identifier}").toPandas()
rhs = written_arrow_table.to_pandas()
@@ -1744,6 +1746,104 @@ def test_abort_table_transaction_on_exception(
assert len(tbl.scan().to_pandas()) == table_size # type: ignore


@pytest.mark.integration
def test_write_optional_list(session_catalog: Catalog) -> None:
identifier = "default.test_write_optional_list"
schema = Schema(
NestedField(field_id=1, name="name", field_type=StringType(), required=False),
NestedField(
field_id=3,
name="my_list",
field_type=ListType(element_id=45, element=StringType(), element_required=False),
required=False,
),
)
session_catalog.create_table_if_not_exists(identifier, schema)

df_1 = pa.Table.from_pylist(
[
{"name": "one", "my_list": ["test"]},
{"name": "another", "my_list": ["test"]},
]
)
session_catalog.load_table(identifier).append(df_1)

assert len(session_catalog.load_table(identifier).scan().to_arrow()) == 2

df_2 = pa.Table.from_pylist(
[
{"name": "one"},
{"name": "another"},
]
)
session_catalog.load_table(identifier).append(df_2)

assert len(session_catalog.load_table(identifier).scan().to_arrow()) == 4


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_evolve_and_write(
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
identifier = "default.test_evolve_and_write"
tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}, schema=Schema())
other_table = session_catalog.load_table(identifier)

numbers = pa.array([1, 2, 3, 4], type=pa.int32())

with tbl.update_schema() as upd:
# This is not known by other_table
upd.add_column("id", IntegerType())

with other_table.transaction() as tx:
# Refreshes the underlying metadata, and the schema
other_table.refresh()
tx.append(
pa.Table.from_arrays(
[
numbers,
],
schema=pa.schema(
[
pa.field("id", pa.int32(), nullable=True),
]
),
)
)

assert session_catalog.load_table(identifier).scan().to_arrow().column(0).combine_chunks() == numbers


@pytest.mark.integration
def test_read_write_decimals(session_catalog: Catalog) -> None:
"""Roundtrip decimal types to make sure that we correctly write them as ints"""
identifier = "default.test_read_write_decimals"

arrow_table = pa.Table.from_pydict(
{
"decimal8": pa.array([Decimal("123.45"), Decimal("678.91")], pa.decimal128(8, 2)),
"decimal16": pa.array([Decimal("12345679.123456"), Decimal("67891234.678912")], pa.decimal128(16, 6)),
"decimal19": pa.array([Decimal("1234567890123.123456"), Decimal("9876543210703.654321")], pa.decimal128(19, 6)),
},
)

tbl = _create_table(
session_catalog,
identifier,
properties={"format-version": 2},
schema=Schema(
NestedField(1, "decimal8", DecimalType(8, 2)),
NestedField(2, "decimal16", DecimalType(16, 6)),
NestedField(3, "decimal19", DecimalType(19, 6)),
),
)

tbl.append(arrow_table)

assert tbl.scan().to_arrow() == arrow_table


@pytest.mark.integration
def test_append_to_non_existing_branch(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.test_non_existing_branch"
You are viewing a condensed version of this merge commit.