def history(self) -> "pa.Table":
    """Return the table's history as a PyArrow table.

    Mirrors the Spark ``<table>.history`` metadata table: one row per
    snapshot-log entry, recording when each snapshot became current, its
    id, its parent snapshot id (null for a root snapshot), and whether it
    lies on the ancestry chain of the current snapshot (rolled-back
    snapshots do not).

    Returns:
        pa.Table with columns ``made_current_at`` (timestamp[ms], not null),
        ``snapshot_id`` (int64, not null), ``parent_id`` (int64, nullable)
        and ``is_current_ancestor`` (bool, not null).
    """
    from datetime import timezone

    import pyarrow as pa

    history_schema = pa.schema([
        pa.field("made_current_at", pa.timestamp(unit="ms"), nullable=False),
        pa.field("snapshot_id", pa.int64(), nullable=False),
        pa.field("parent_id", pa.int64(), nullable=True),
        pa.field("is_current_ancestor", pa.bool_(), nullable=False),
    ])

    # Snapshot ids on the current snapshot's ancestry chain; log entries
    # outside this set belong to rolled-back branches.
    ancestors_ids = {snapshot.snapshot_id for snapshot in ancestors_of(self.tbl.current_snapshot(), self.tbl.metadata)}

    history = []
    metadata = self.tbl.metadata

    for snapshot_entry in metadata.snapshot_log:
        snapshot = metadata.snapshot_by_id(snapshot_entry.snapshot_id)

        history.append({
            # Naive UTC datetime. datetime.utcfromtimestamp is deprecated
            # since Python 3.12, so build an aware UTC datetime and strip
            # the tzinfo — the resulting value is identical.
            "made_current_at": datetime.fromtimestamp(
                snapshot_entry.timestamp_ms / 1000.0, tz=timezone.utc
            ).replace(tzinfo=None),
            "snapshot_id": snapshot_entry.snapshot_id,
            # The snapshot may have been expired; guard the metadata lookup.
            "parent_id": snapshot.parent_snapshot_id if snapshot else None,
            "is_current_ancestor": snapshot_entry.snapshot_id in ancestors_ids,
        })

    return pa.Table.from_pylist(history, schema=history_schema)
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_history(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
    """Verify inspect.history() agrees with Spark's <table>.history metadata table."""
    identifier = "default.table_history"

    # Start from a clean slate: drop any leftover table from a prior run.
    try:
        session_catalog.drop_table(identifier=identifier)
    except NoSuchTableError:
        pass

    spark.sql(
        f"""
        CREATE TABLE {identifier} (
            id int,
            data string
        )
        PARTITIONED BY (data)
        """
    )

    spark.sql(
        f"""
        INSERT INTO {identifier} VALUES (1, "a")
        """
    )

    table = session_catalog.load_table(identifier)
    first_snapshot = table.current_snapshot()
    snapshot_id = first_snapshot.snapshot_id if first_snapshot else None

    spark.sql(
        f"""
        INSERT INTO {identifier} VALUES (2, "b")
        """
    )

    # Roll back to the first snapshot so the log contains a non-ancestor entry.
    spark.sql(
        f"""
        CALL integration.system.rollback_to_snapshot('{identifier}', {snapshot_id})
        """
    )

    spark.sql(
        f"""
        INSERT INTO {identifier} VALUES (3, "c")
        """
    )

    table.refresh()

    df = table.inspect.history()

    assert df.column_names == [
        "made_current_at",
        "snapshot_id",
        "parent_id",
        "is_current_ancestor",
    ]

    expected = spark.table(f"{identifier}.history").toPandas()
    actual = df.to_pandas()
    for column in df.column_names:
        for left, right in zip(expected[column].to_list(), actual[column].to_list()):
            # NaN != NaN in Python, so a NaN-on-both-sides cell counts as equal.
            both_nan = (
                isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right)
            )
            if both_nan:
                continue
            assert left == right, f"Difference in column {column}: {left} != {right}"