Merged
Changes from 1 commit
Commits
59 commits
fccb74b
test
mattmartin14 Jan 14, 2025
7298589
unit testing
mattmartin14 Jan 14, 2025
25bc9cf
adding unit tests
mattmartin14 Jan 14, 2025
af6c868
adding unit tests
mattmartin14 Jan 14, 2025
94be807
adding unit tests
mattmartin14 Jan 14, 2025
269d9f5
adding unit tests
mattmartin14 Jan 15, 2025
f44c61a
adding unit tests
mattmartin14 Jan 15, 2025
a96fdf9
finished unit tests
mattmartin14 Jan 16, 2025
fa5ab35
removed unnecesary return
mattmartin14 Jan 16, 2025
cfa2277
updated poetry manifest list for datafusion package dependency
mattmartin14 Jan 17, 2025
35f29be
added license headers, cleaned up dead code
mattmartin14 Jan 22, 2025
6c68d0d
updated the merge function to use bools for matched and not matched rows
mattmartin14 Jan 29, 2025
2d1e8ae
incorporated changes for boolExpression. It simplified the filters a lot
mattmartin14 Jan 31, 2025
f988f25
moved the filter build function to a separate function to accomodate …
mattmartin14 Jan 31, 2025
43393b4
removed unneccessary comment
mattmartin14 Jan 31, 2025
9a561b4
removed test files
mattmartin14 Jan 31, 2025
9ef39a6
bug fixes and removed some more dependency on datafusion
mattmartin14 Feb 3, 2025
2ba1ed6
updated various items including adding a dataclass return result
mattmartin14 Feb 4, 2025
a42eecd
updated merge_rows to remove dependency from datafusion! wahoo
mattmartin14 Feb 4, 2025
1305f58
renamed merge_rows to upsert, removed unnecessary code. will put in f…
mattmartin14 Feb 5, 2025
b2be3db
adding params to unit testing for pytest; having some errors
mattmartin14 Feb 5, 2025
f5688ad
fixed bugs on unit testing; added context wrapper for txn; fixed vari…
mattmartin14 Feb 5, 2025
7d55a4e
bug fixes
mattmartin14 Feb 6, 2025
2e14767
updated some error throwing items
mattmartin14 Feb 6, 2025
85c5848
moved datafusion to just a dev dependency in poetry toml
mattmartin14 Feb 6, 2025
6472071
updated UpsertRow class to be recognized in the return statement
mattmartin14 Feb 6, 2025
51c34da
removed some spaces and streamlined assert statements in unit testing
mattmartin14 Feb 6, 2025
862a69a
updated test cases to use an InMemory catalog
mattmartin14 Feb 7, 2025
3731b86
updated some formatting; added more commentary on the rows_to_update …
mattmartin14 Feb 7, 2025
bbb35d6
rebased poetry lock file and pyproject.toml file; removed sf repo info
mattmartin14 Feb 10, 2025
c8189c9
Merge branch 'main' into main
mattmartin14 Feb 10, 2025
02af4d4
updated equality checks with not instead of == false
mattmartin14 Feb 10, 2025
cc75192
ran ruff check --fix
mattmartin14 Feb 10, 2025
998d98b
manually added lint fixes and updated poetry toml and lock files. tha…
mattmartin14 Feb 11, 2025
513c839
added formatting fices
mattmartin14 Feb 11, 2025
0fd6446
remove the node_modules
mattmartin14 Feb 11, 2025
5fc3478
updated code for another round of fixes
mattmartin14 Feb 11, 2025
6cef789
removed npm uneeded files
mattmartin14 Feb 11, 2025
40b69b8
fixed formatting on upsert function for docs build
mattmartin14 Feb 12, 2025
804c526
Merge branch 'main' into main
mattmartin14 Feb 12, 2025
09e0347
rebased for poetry lock files
mattmartin14 Feb 12, 2025
ca2d904
updated lock files. thanks kevin
mattmartin14 Feb 12, 2025
77375fb
fixed other changes
mattmartin14 Feb 12, 2025
ba4db49
fixed gitignore file
mattmartin14 Feb 12, 2025
622e66c
no whitespace
mattmartin14 Feb 12, 2025
9e79dad
fixed vendor fb file from kevins changes
mattmartin14 Feb 12, 2025
4cbf3e3
reverting vendor changes
mattmartin14 Feb 12, 2025
5333a1e
removing node modules
mattmartin14 Feb 12, 2025
11a25be
updating vendor files
mattmartin14 Feb 12, 2025
03a8d10
Update vendor/fb303/FacebookService.py
mattmartin14 Feb 12, 2025
8a2143c
updated vendor files
mattmartin14 Feb 12, 2025
e719cf8
updated vendor files
mattmartin14 Feb 12, 2025
245b4a9
attempting to update poetry files
mattmartin14 Feb 12, 2025
e3e9611
Merge branch 'main' into main
mattmartin14 Feb 12, 2025
e575b3c
restore vendor/
kevinjqliu Feb 13, 2025
e4e530f
resetore pyproject.toml
kevinjqliu Feb 13, 2025
2ff2083
poetry lock
kevinjqliu Feb 13, 2025
8585d2d
add datafusion to tool.mypy.overrides
kevinjqliu Feb 13, 2025
f673b70
Merge remote-tracking branch 'apache/main' into StateFarmIns/main
kevinjqliu Feb 13, 2025
updated various items including adding a dataclass return result
mattmartin14 committed Feb 4, 2025
commit 2ba1ed68ef772d6d1ba4705b1291200de9a33c1e
16 changes: 13 additions & 3 deletions pyiceberg/table/__init__.py
@@ -137,6 +137,8 @@
from pyiceberg.utils.config import Config
from pyiceberg.utils.properties import property_as_bool

from dataclasses import dataclass

if TYPE_CHECKING:
import daft
import pandas as pd
@@ -1065,10 +1067,18 @@ def name_mapping(self) -> Optional[NameMapping]:
"""Return the table's field-id NameMapping."""
return self.metadata.name_mapping()

@dataclass
class MergeResult:
"""docstring"""
rows_updated: int
rows_inserted: int
info_msgs: str
error_msgs: str

def merge_rows(self, df: pa.Table, join_cols: list
, when_matched_update_all: bool = True
, when_not_matched_insert_all: bool = True
) -> Dict:
) -> MergeResult:
"""
Shorthand API for performing an upsert/merge to an iceberg table.

@@ -1098,7 +1108,7 @@ def merge_rows(self, df: pa.Table, join_cols: list
target_table_name = "target"

if when_matched_update_all == False and when_not_matched_insert_all == False:
return {'rows_updated': 0, 'rows_inserted': 0, 'msg': 'no merge options selected...exiting'}
return {'rows_updated': 0, 'rows_inserted': 0, 'info_msgs': 'no merge options selected...exiting'}

missing_columns = merge_rows_util.do_join_columns_exist(df, self, join_cols)

@@ -1143,9 +1153,9 @@ def merge_rows(self, df: pa.Table, join_cols: list

txn.overwrite(update_recs, overwrite_filter=overwrite_filter)

# Insert the new records

if when_not_matched_insert_all:

insert_recs_sql = merge_rows_util.get_rows_to_insert_sql(source_table_name, target_table_name, join_cols, source_col_list, target_col_list)

insert_recs = ctx.sql(insert_recs_sql).to_arrow_table()
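For context, a minimal usage sketch of merge_rows as it stands in this commit. The catalog name, table identifier, and source data below are hypothetical, and the result is indexed dict-style because that is how the tests in this PR read it; the declared return type is the MergeResult dataclass added above.

```python
# Hypothetical usage sketch of the merge_rows API from this commit.
# Catalog name, table identifier, and source rows are made up for illustration.
import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")          # assumes a configured catalog
tbl = catalog.load_table("db.orders")      # hypothetical target Iceberg table

source = pa.table({
    "order_id": pa.array([1, 2, 3], type=pa.int64()),
    "status": ["shipped", "pending", "new"],
})

# Update rows that match on order_id and insert the ones that don't.
res = tbl.merge_rows(source, join_cols=["order_id"])
print(res["rows_updated"], res["rows_inserted"])
```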
8 changes: 5 additions & 3 deletions pyiceberg/table/merge_rows_util.py
@@ -48,10 +48,10 @@ def get_filter_list(df: pyarrow_table, join_cols: list) -> BooleanExpression:
return pred


def get_table_column_list_pa(df: pyarrow_table) -> list:
def get_table_column_list_pa(df: pyarrow_table) -> set:
return set(col for col in df.column_names)

def get_table_column_list_iceberg(table: pyiceberg_table) -> list:
def get_table_column_list_iceberg(table: pyiceberg_table) -> set:
return set(col for col in table.schema().column_names)

def dups_check_in_source(df: pyarrow_table, join_cols: list) -> bool:
@@ -69,7 +69,6 @@ def dups_check_in_source(df: pyarrow_table, join_cols: list) -> bool:

return source_dup_count > 0


def do_join_columns_exist(source_df: pyarrow_table, target_iceberg_table: pyiceberg_table, join_cols: list) -> bool:

"""
@@ -89,6 +88,8 @@ def do_join_columns_exist(source_df: pyarrow_table, target_iceberg_table: pyiceb

return missing_columns



def get_rows_to_update_sql(source_table_name: str, target_table_name: str
, join_cols: list
, source_cols_list: set
@@ -99,6 +100,7 @@ def get_rows_to_update_sql(source_table_name: str, target_table_name: str
"""

# Determine non-join columns that exist in both tables


non_join_cols = source_cols_list.intersection(target_cols_list) - set(join_cols)

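get_filter_list returns a pyiceberg BooleanExpression that merge_rows passes as the overwrite_filter, but its body is not part of this hunk. A rough sketch of how such a predicate could be assembled from the source's join-key values, using pyiceberg's expression classes, purely to illustrate the idea:

```python
# Illustrative sketch only: OR together one key-equality predicate per source row,
# using an AND across columns when the key is composite. The actual get_filter_list
# implementation in this PR may differ.
import pyarrow as pa
from pyiceberg.expressions import And, Or, EqualTo, AlwaysFalse, BooleanExpression


def build_overwrite_filter(df: pa.Table, join_cols: list) -> BooleanExpression:
    pred: BooleanExpression = AlwaysFalse()
    for row in df.select(join_cols).to_pylist():
        row_pred: BooleanExpression = EqualTo(join_cols[0], row[join_cols[0]])
        for col in join_cols[1:]:
            row_pred = And(row_pred, EqualTo(col, row[col]))
        pred = Or(pred, row_pred)
    return pred
```

A filter of this shape lets the transaction's overwrite call delete exactly the target rows whose keys appear in the source before the updated records are written.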
49 changes: 16 additions & 33 deletions tests/table/test_merge_rows.py
@@ -85,11 +85,8 @@ def gen_source_dataset(start_row: int, end_row: int, composite_key: bool, add_du
{dup_row}
"""

#print(sql)

df = ctx.sql(sql).to_arrow_table()


return df

def gen_target_iceberg_table(start_row: int, end_row: int, composite_key: bool, ctx: SessionContext):
@@ -110,7 +107,7 @@ def gen_target_iceberg_table(start_row: int, end_row: int, composite_key: bool,

return table

def test_merge_scenario_1a_simple():
def test_merge_scenario_single_ins_upd():

"""
tests a single insert and update
@@ -130,9 +127,8 @@ def test_merge_scenario_1a_simple():
assert res['rows_inserted'] == rows_inserted_should_be, f"rows inserted should be {rows_inserted_should_be}, but got {res['rows_inserted']}"

purge_warehouse()
print('merge rows: test scenario 1a pass')

def test_merge_scenario_1b_simple():
def test_merge_scenario_skip_upd_row():

"""
tests a single insert and update; skips a row that does not need to be updated
@@ -168,10 +164,8 @@ def test_merge_scenario_1b_simple():
assert res['rows_inserted'] == rows_inserted_should_be, f"rows inserted should be {rows_inserted_should_be}, but got {res['rows_inserted']}"

purge_warehouse()
print('merge rows: test scenario 1b (skip 1 row) pass')


def test_merge_scenario_1c_simple():
def test_merge_scenario_date_as_key():

"""
tests a single insert and update; primary key is a date column
@@ -207,9 +201,8 @@ def test_merge_scenario_1c_simple():
assert res['rows_inserted'] == rows_inserted_should_be, f"rows inserted should be {rows_inserted_should_be}, but got {res['rows_inserted']}"

purge_warehouse()
print('merge rows: test scenario 1c (date as key column) pass')

def test_merge_scenario_1d_simple():
def test_merge_scenario_string_as_key():

"""
tests a single insert and update; primary key is a string column
@@ -245,9 +238,8 @@ def test_merge_scenario_1d_simple():
assert res['rows_inserted'] == rows_inserted_should_be, f"rows inserted should be {rows_inserted_should_be}, but got {res['rows_inserted']}"

purge_warehouse()
print('merge rows: test scenario 1d (string as key column) pass')

def test_merge_scenario_2_10k_rows():
def test_merge_scenario_10k_rows():

"""
tests merging 10000 rows on a single key to simulate larger workload
@@ -268,9 +260,8 @@ def test_merge_scenario_2_10k_rows():
assert res['rows_inserted'] == rows_inserted_should_be, f"rows inserted should be {rows_inserted_should_be}, but got {res['rows_inserted']}"

purge_warehouse()
print('merge rows: test scenario 2 pass')

def test_merge_scenario_3_composite_key():
def test_merge_scenario_composite_key():

"""
tests merging 200 rows with a composite key
@@ -291,7 +282,6 @@ def test_merge_scenario_3_composite_key():
assert res['rows_inserted'] == rows_inserted_should_be, f"rows inserted should be {rows_inserted_should_be}, but got {res['rows_inserted']}"

purge_warehouse()
print('merge rows: composite keys test pass')

def test_merge_update_only():

@@ -313,7 +303,6 @@ def test_merge_update_only():
assert res['rows_inserted'] == rows_inserted_should_be, f"rows inserted should be {rows_inserted_should_be}, but got {res['rows_inserted']}"

purge_warehouse()
print('merge rows: update only pass')

def test_merge_insert_only():
"""
@@ -334,7 +323,6 @@ def test_merge_insert_only():
assert res['rows_inserted'] == rows_inserted_should_be, f"rows inserted should be {rows_inserted_should_be}, but got {res['rows_inserted']}"

purge_warehouse()
print('merge rows: insert only pass')

def test_merge_source_dups():

@@ -354,7 +342,6 @@ def test_merge_source_dups():
assert 'Duplicate rows found in source dataset' in error_msgs, f"error message should contain 'Duplicate rows found in source dataset', but got {error_msgs}"

purge_warehouse()
print('merge rows: source dups test pass')

def test_key_cols_misaligned():

@@ -381,17 +368,13 @@ def test_key_cols_misaligned():

purge_warehouse()

print('merge rows: key cols misaligned test pass')

if __name__ == "__main__":

test_merge_scenario_1a_simple()
test_merge_scenario_1b_simple()
test_merge_scenario_1c_simple()
test_merge_scenario_1d_simple()
test_merge_scenario_2_10k_rows()
test_merge_scenario_3_composite_key()
test_merge_update_only()
test_merge_insert_only()
test_merge_source_dups()
test_key_cols_misaligned()
test_merge_scenario_single_ins_upd
test_merge_scenario_skip_upd_row
test_merge_scenario_date_as_key
test_merge_scenario_string_as_key
test_merge_scenario_10k_rows()
test_merge_scenario_composite_key()
test_merge_update_only()
test_merge_insert_only()
test_merge_source_dups()
test_key_cols_misaligned()
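As committed, the first four calls in the renamed __main__ block above drop their parentheses, and the assertions still index the result like a dict even though the declared return type is now MergeResult. A hedged sketch of what the block could look like once the dataclass is returned on every code path:

```python
# Hypothetical tidy-up, assuming merge_rows eventually returns MergeResult
# on every code path: invoke each test and use attribute access on the result.
if __name__ == "__main__":
    test_merge_scenario_single_ins_upd()
    test_merge_scenario_skip_upd_row()
    test_merge_scenario_date_as_key()
    test_merge_scenario_string_as_key()
    test_merge_scenario_10k_rows()
    test_merge_scenario_composite_key()
    test_merge_update_only()
    test_merge_insert_only()
    test_merge_source_dups()
    test_key_cols_misaligned()

# Inside a test, attribute access would then replace dict indexing, e.g.:
#   res = table.merge_rows(source_df, join_cols=["order_id"])  # hypothetical key column
#   assert res.rows_updated == rows_updated_should_be
#   assert res.rows_inserted == rows_inserted_should_be
```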