94 changes: 94 additions & 0 deletions mkdocs/docs/api.md
@@ -606,6 +606,100 @@ min_snapshots_to_keep: [[null,10]]
max_snapshot_age_in_ms: [[null,604800000]]
```

### Manifests

To show a table's current file manifests:

```python
table.inspect.manifests()
```

```
pyarrow.Table
content: int8 not null
path: string not null
length: int64 not null
partition_spec_id: int32 not null
added_snapshot_id: int64 not null
added_data_files_count: int32 not null
existing_data_files_count: int32 not null
deleted_data_files_count: int32 not null
added_delete_files_count: int32 not null
existing_delete_files_count: int32 not null
deleted_delete_files_count: int32 not null
partition_summaries: list<item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>> not null
  child 0, item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>
      child 0, contains_null: bool not null
      child 1, contains_nan: bool
      child 2, lower_bound: string
      child 3, upper_bound: string
----
content: [[0]]
path: [["s3://warehouse/default/table_metadata_manifests/metadata/3bf5b4c6-a7a4-4b43-a6ce-ca2b4887945a-m0.avro"]]
length: [[6886]]
partition_spec_id: [[0]]
added_snapshot_id: [[3815834705531553721]]
added_data_files_count: [[1]]
existing_data_files_count: [[0]]
deleted_data_files_count: [[0]]
added_delete_files_count: [[0]]
existing_delete_files_count: [[0]]
deleted_delete_files_count: [[0]]
partition_summaries: [[    -- is_valid: all not null
    -- child 0 type: bool
[false]
    -- child 1 type: bool
[false]
    -- child 2 type: string
["test"]
    -- child 3 type: string
["test"]]]
```
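
Because the result is a plain `pyarrow.Table`, it can be filtered with standard PyArrow operations. A minimal sketch, assuming the table from above (`content` is `0` for data manifests and `1` for delete manifests, per the Iceberg spec; `pyarrow.compute` is regular PyArrow, not part of PyIceberg):

```python
import pyarrow.compute as pc

manifests = table.inspect.manifests()

# Build a boolean mask and keep only delete manifests (content == 1).
delete_manifests = manifests.filter(pc.equal(manifests["content"], 1))
print(delete_manifests.num_rows)
```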

### Metadata Log Entries

To show table metadata log entries:

```python
table.inspect.metadata_log_entries()
```

```
pyarrow.Table
timestamp: timestamp[ms] not null
file: string not null
latest_snapshot_id: int64
latest_schema_id: int32
latest_sequence_number: int64
----
timestamp: [[2024-04-28 17:03:00.214,2024-04-28 17:03:00.352,2024-04-28 17:03:00.445,2024-04-28 17:03:00.498]]
file: [["s3://warehouse/default/table_metadata_log_entries/metadata/00000-0b3b643b-0f3a-4787-83ad-601ba57b7319.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00001-f74e4b2c-0f89-4f55-822d-23d099fd7d54.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00002-97e31507-e4d9-4438-aff1-3c0c5304d271.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00003-6c8b7033-6ad8-4fe4-b64d-d70381aeaddc.metadata.json"]]
latest_snapshot_id: [[null,3958871664825505738,1289234307021405706,7640277914614648349]]
latest_schema_id: [[null,0,0,0]]
latest_sequence_number: [[null,0,0,0]]
```
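
The entries are ordered oldest to newest, and the table's current metadata file is appended as the final row. A minimal sketch that reads off the active metadata location:

```python
entries = table.inspect.metadata_log_entries()

# The last row corresponds to the table's current metadata file.
current = entries.to_pylist()[-1]
print(current["file"], current["latest_snapshot_id"])
```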

### History

To show a table's history:

```python
table.inspect.history()
```

```
pyarrow.Table
made_current_at: timestamp[ms] not null
snapshot_id: int64 not null
parent_id: int64
is_current_ancestor: bool not null
----
made_current_at: [[2024-06-18 16:17:48.768,2024-06-18 16:17:49.240,2024-06-18 16:17:49.343,2024-06-18 16:17:49.511]]
snapshot_id: [[4358109269873137077,3380769165026943338,4358109269873137077,3089420140651211776]]
parent_id: [[null,4358109269873137077,null,4358109269873137077]]
is_current_ancestor: [[true,false,true,true]]
```
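
Rows where `is_current_ancestor` is `false` (like the second row above) are snapshots that were made current at some point but were later rolled back, so they are not ancestors of the current snapshot. A minimal sketch that lists them:

```python
history = table.inspect.history()

# Snapshots outside the current snapshot's ancestry were rolled back.
for row in history.to_pylist():
    if not row["is_current_ancestor"]:
        print(f"rolled back: {row['snapshot_id']}")
```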

### Files

Inspect the data files in the current snapshot of the table:
150 changes: 149 additions & 1 deletion pyiceberg/table/__init__.py
@@ -3758,7 +3758,155 @@ def update_partitions_map(
        schema=table_schema,
    )

def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
def manifests(self) -> "pa.Table":
    import pyarrow as pa
    from pyiceberg.conversions import from_bytes

    partition_summary_schema = pa.struct([
        pa.field("contains_null", pa.bool_(), nullable=False),
        pa.field("contains_nan", pa.bool_(), nullable=True),
        pa.field("lower_bound", pa.string(), nullable=True),
        pa.field("upper_bound", pa.string(), nullable=True),
    ])

    manifest_schema = pa.schema([
        pa.field("content", pa.int8(), nullable=False),
        pa.field("path", pa.string(), nullable=False),
        pa.field("length", pa.int64(), nullable=False),
        pa.field("partition_spec_id", pa.int32(), nullable=False),
        pa.field("added_snapshot_id", pa.int64(), nullable=False),
        pa.field("added_data_files_count", pa.int32(), nullable=False),
        pa.field("existing_data_files_count", pa.int32(), nullable=False),
        pa.field("deleted_data_files_count", pa.int32(), nullable=False),
        pa.field("added_delete_files_count", pa.int32(), nullable=False),
        pa.field("existing_delete_files_count", pa.int32(), nullable=False),
        pa.field("deleted_delete_files_count", pa.int32(), nullable=False),
        pa.field("partition_summaries", pa.list_(partition_summary_schema), nullable=False),
    ])

    def _partition_summaries_to_rows(
        spec: PartitionSpec, partition_summaries: List[PartitionFieldSummary]
    ) -> List[Dict[str, Any]]:
        rows = []
        for i, field_summary in enumerate(partition_summaries):
            field = spec.fields[i]
            partition_field_type = spec.partition_type(self.tbl.schema()).fields[i].field_type
            lower_bound = (
                field.transform.to_human_string(
                    partition_field_type, from_bytes(partition_field_type, field_summary.lower_bound)
                )
                if field_summary.lower_bound
                else None
            )
            upper_bound = (
                field.transform.to_human_string(
                    partition_field_type, from_bytes(partition_field_type, field_summary.upper_bound)
                )
                if field_summary.upper_bound
                else None
            )
            rows.append({
                "contains_null": field_summary.contains_null,
                "contains_nan": field_summary.contains_nan,
                "lower_bound": lower_bound,
                "upper_bound": upper_bound,
            })
        return rows

    specs = self.tbl.metadata.specs()
    manifests = []
    if snapshot := self.tbl.metadata.current_snapshot():
        for manifest in snapshot.manifests(self.tbl.io):
            # A manifest tracks either data files or delete files, so report
            # its counts under the matching columns and leave the others at 0.
            is_data_file = manifest.content == ManifestContent.DATA
            is_delete_file = manifest.content == ManifestContent.DELETES
            manifests.append({
                "content": manifest.content,
                "path": manifest.manifest_path,
                "length": manifest.manifest_length,
                "partition_spec_id": manifest.partition_spec_id,
                "added_snapshot_id": manifest.added_snapshot_id,
                "added_data_files_count": manifest.added_files_count if is_data_file else 0,
                "existing_data_files_count": manifest.existing_files_count if is_data_file else 0,
                "deleted_data_files_count": manifest.deleted_files_count if is_data_file else 0,
                "added_delete_files_count": manifest.added_files_count if is_delete_file else 0,
                "existing_delete_files_count": manifest.existing_files_count if is_delete_file else 0,
                "deleted_delete_files_count": manifest.deleted_files_count if is_delete_file else 0,
                "partition_summaries": _partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions)
                if manifest.partitions
                else [],
            })

    return pa.Table.from_pylist(
        manifests,
        schema=manifest_schema,
    )

def metadata_log_entries(self) -> "pa.Table":
    import pyarrow as pa

    from pyiceberg.table.snapshots import MetadataLogEntry

    table_schema = pa.schema([
        pa.field("timestamp", pa.timestamp(unit="ms"), nullable=False),
        pa.field("file", pa.string(), nullable=False),
        pa.field("latest_snapshot_id", pa.int64(), nullable=True),
        pa.field("latest_schema_id", pa.int32(), nullable=True),
        pa.field("latest_sequence_number", pa.int64(), nullable=True),
    ])

    def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any]:
        # The latest_* columns describe the most recent snapshot as of the
        # entry's timestamp, or are null if no snapshot existed yet.
        latest_snapshot = self.tbl.snapshot_as_of_timestamp(metadata_entry.timestamp_ms)
        return {
            "timestamp": metadata_entry.timestamp_ms,
            "file": metadata_entry.metadata_file,
            "latest_snapshot_id": latest_snapshot.snapshot_id if latest_snapshot else None,
            "latest_schema_id": latest_snapshot.schema_id if latest_snapshot else None,
            "latest_sequence_number": latest_snapshot.sequence_number if latest_snapshot else None,
        }

    # similar to MetadataLogEntriesTable in Java
    # https://github.com/apache/iceberg/blob/8a70fe0ff5f241aec8856f8091c77fdce35ad256/core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java#L62-L66
    metadata_log_entries = self.tbl.metadata.metadata_log + [
        MetadataLogEntry(metadata_file=self.tbl.metadata_location, timestamp_ms=self.tbl.metadata.last_updated_ms)
    ]

    return pa.Table.from_pylist(
        [metadata_log_entry_to_row(entry) for entry in metadata_log_entries],
        schema=table_schema,
    )

def history(self) -> "pa.Table":
    import pyarrow as pa

    history_schema = pa.schema([
        pa.field("made_current_at", pa.timestamp(unit="ms"), nullable=False),
        pa.field("snapshot_id", pa.int64(), nullable=False),
        pa.field("parent_id", pa.int64(), nullable=True),
        pa.field("is_current_ancestor", pa.bool_(), nullable=False),
    ])

    # Snapshot ids reachable from the current snapshot via parent links;
    # snapshot-log entries outside this set were rolled back.
    ancestors_ids = {snapshot.snapshot_id for snapshot in ancestors_of(self.tbl.current_snapshot(), self.tbl.metadata)}

    history = []
    metadata = self.tbl.metadata

    for snapshot_entry in metadata.snapshot_log:
        snapshot = metadata.snapshot_by_id(snapshot_entry.snapshot_id)

        history.append({
            "made_current_at": datetime.utcfromtimestamp(snapshot_entry.timestamp_ms / 1000.0),
            "snapshot_id": snapshot_entry.snapshot_id,
            "parent_id": snapshot.parent_snapshot_id if snapshot else None,
            "is_current_ancestor": snapshot_entry.snapshot_id in ancestors_ids,
        })

    return pa.Table.from_pylist(history, schema=history_schema)

def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
    import pyarrow as pa

    from pyiceberg.io.pyarrow import schema_to_pyarrow