Merged
empty files for no snapshot
Gowthami03B committed Jun 27, 2024
commit 5580e40a5c9edfb1dcaea99d0b69c715cc70229f
80 changes: 43 additions & 37 deletions pyiceberg/table/__init__.py
@@ -3760,6 +3760,7 @@ def update_partitions_map(

    def manifests(self) -> "pa.Table":
        import pyarrow as pa
+
        from pyiceberg.conversions import from_bytes

        partition_summary_schema = pa.struct([
@@ -3905,8 +3906,8 @@ def history(self) -> "pa.Table":
            })

        return pa.Table.from_pylist(history, schema=history_schema)
-    def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":

+    def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
        import pyarrow as pa

        from pyiceberg.io.pyarrow import schema_to_pyarrow
@@ -3931,27 +3932,32 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
            )

        files_schema = pa.schema([
-            pa.field('content', pa.int8(), nullable=False),
-            pa.field('file_path', pa.string(), nullable=False),
-            pa.field('file_format', pa.dictionary(pa.int32(), pa.string()), nullable=False),
-            pa.field('spec_id', pa.int32(), nullable=False),
-            pa.field('record_count', pa.int64(), nullable=False),
-            pa.field('file_size_in_bytes', pa.int64(), nullable=False),
-            pa.field('column_sizes', pa.map_(pa.int32(), pa.int64()), nullable=True),
-            pa.field('value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
-            pa.field('null_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
-            pa.field('nan_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
-            pa.field('lower_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True),
-            pa.field('upper_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True),
-            pa.field('key_metadata', pa.binary(), nullable=True),
-            pa.field('split_offsets', pa.list_(pa.int64()), nullable=True),
-            pa.field('equality_ids', pa.list_(pa.int32()), nullable=True),
-            pa.field('sort_order_id', pa.int32(), nullable=True),
-            pa.field('readable_metrics', pa.struct(readable_metrics_struct), nullable=True),
+            pa.field("content", pa.int8(), nullable=False),
+            pa.field("file_path", pa.string(), nullable=False),
+            pa.field("file_format", pa.dictionary(pa.int32(), pa.string()), nullable=False),
+            pa.field("spec_id", pa.int32(), nullable=False),
+            pa.field("record_count", pa.int64(), nullable=False),
+            pa.field("file_size_in_bytes", pa.int64(), nullable=False),
+            pa.field("column_sizes", pa.map_(pa.int32(), pa.int64()), nullable=True),
+            pa.field("value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
+            pa.field("null_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
+            pa.field("nan_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True),
+            pa.field("lower_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True),
+            pa.field("upper_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True),
+            pa.field("key_metadata", pa.binary(), nullable=True),
+            pa.field("split_offsets", pa.list_(pa.int64()), nullable=True),
+            pa.field("equality_ids", pa.list_(pa.int32()), nullable=True),
+            pa.field("sort_order_id", pa.int32(), nullable=True),
+            pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True),
        ])

-        files = []
+        files: list[dict[str, Any]] = []

+        if not snapshot_id and not self.tbl.metadata.current_snapshot():
+            return pa.Table.from_pylist(
+                files,
+                schema=files_schema,
+            )
        snapshot = self._get_snapshot(snapshot_id)

        io = self.tbl.io
@@ -3980,23 +3986,23 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
                for field in self.tbl.metadata.schema().fields
            }
            files.append({
-                'content': data_file.content,
-                'file_path': data_file.file_path,
-                'file_format': data_file.file_format,
-                'spec_id': data_file.spec_id,
-                'record_count': data_file.record_count,
-                'file_size_in_bytes': data_file.file_size_in_bytes,
-                'column_sizes': dict(data_file.column_sizes),
-                'value_counts': dict(data_file.value_counts),
-                'null_value_counts': dict(data_file.null_value_counts),
-                'nan_value_counts': dict(data_file.nan_value_counts),
-                'lower_bounds': dict(data_file.lower_bounds),
-                'upper_bounds': dict(data_file.upper_bounds),
-                'key_metadata': data_file.key_metadata,
-                'split_offsets': data_file.split_offsets,
-                'equality_ids': data_file.equality_ids,
-                'sort_order_id': data_file.sort_order_id,
-                'readable_metrics': readable_metrics,
+                "content": data_file.content,
+                "file_path": data_file.file_path,
+                "file_format": data_file.file_format,
+                "spec_id": data_file.spec_id,
+                "record_count": data_file.record_count,
+                "file_size_in_bytes": data_file.file_size_in_bytes,
+                "column_sizes": dict(data_file.column_sizes),
+                "value_counts": dict(data_file.value_counts),
+                "null_value_counts": dict(data_file.null_value_counts),
+                "nan_value_counts": dict(data_file.nan_value_counts),
+                "lower_bounds": dict(data_file.lower_bounds),
+                "upper_bounds": dict(data_file.upper_bounds),
+                "key_metadata": data_file.key_metadata,
+                "split_offsets": data_file.split_offsets,
+                "equality_ids": data_file.equality_ids,
+                "sort_order_id": data_file.sort_order_id,
+                "readable_metrics": readable_metrics,
            })

        return pa.Table.from_pylist(
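The substantive change in this file is the early return in `files()`: when no `snapshot_id` is given and the table has no current snapshot, the method now returns an empty PyArrow table that still carries the full `files_schema`, rather than failing in `_get_snapshot`. A minimal sketch of that pattern, using only `pyarrow` (the three-field schema below is an illustrative stand-in for the PR's 17-field schema, not code from the PR):

```python
import pyarrow as pa

# Illustrative stand-in for the PR's files_schema, abbreviated to three fields.
files_schema = pa.schema([
    pa.field("content", pa.int8(), nullable=False),
    pa.field("file_path", pa.string(), nullable=False),
    pa.field("record_count", pa.int64(), nullable=False),
])

# from_pylist with an explicit schema yields a typed, zero-row table, so
# callers can rely on column names and types even before the first write.
empty = pa.Table.from_pylist([], schema=files_schema)
assert empty.num_rows == 0
assert empty.column_names == ["content", "file_path", "record_count"]
```

Returning a typed empty table instead of `None` or an exception means downstream Arrow and pandas consumers need no special casing for freshly created tables.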
135 changes: 84 additions & 51 deletions tests/integration/test_inspect_table.py
@@ -637,7 +637,8 @@ def test_inspect_history(spark: SparkSession, session_catalog: Catalog, format_v
                # NaN != NaN in Python
                continue
            assert left == right, f"Difference in column {column}: {left} != {right}"
-
+
+
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_files(
@@ -655,37 +656,37 @@ def test_inspect_files(
    df = tbl.refresh().inspect.files()

    assert df.column_names == [
-        'content',
-        'file_path',
-        'file_format',
-        'spec_id',
-        'record_count',
-        'file_size_in_bytes',
-        'column_sizes',
-        'value_counts',
-        'null_value_counts',
-        'nan_value_counts',
-        'lower_bounds',
-        'upper_bounds',
-        'key_metadata',
-        'split_offsets',
-        'equality_ids',
-        'sort_order_id',
-        'readable_metrics',
+        "content",
+        "file_path",
+        "file_format",
+        "spec_id",
+        "record_count",
+        "file_size_in_bytes",
+        "column_sizes",
+        "value_counts",
+        "null_value_counts",
+        "nan_value_counts",
+        "lower_bounds",
+        "upper_bounds",
+        "key_metadata",
+        "split_offsets",
+        "equality_ids",
+        "sort_order_id",
+        "readable_metrics",
    ]

    # make sure the non-nullable fields are filled
-    for int_column in ['content', 'spec_id', 'record_count', 'file_size_in_bytes']:
+    for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]:
        for value in df[int_column]:
            assert isinstance(value.as_py(), int)

-    for split_offsets in df['split_offsets']:
+    for split_offsets in df["split_offsets"]:
        assert isinstance(split_offsets.as_py(), list)

-    for file_format in df['file_format']:
+    for file_format in df["file_format"]:
        assert file_format.as_py() == "PARQUET"

-    for file_path in df['file_path']:
+    for file_path in df["file_path"]:
        assert file_path.as_py().startswith("s3://")

    lhs = df.to_pandas()
@@ -696,49 +697,81 @@ def test_inspect_files(
                # NaN != NaN in Python
                continue
            if column in [
-                'column_sizes',
-                'value_counts',
-                'null_value_counts',
-                'nan_value_counts',
-                'lower_bounds',
-                'upper_bounds',
+                "column_sizes",
+                "value_counts",
+                "null_value_counts",
+                "nan_value_counts",
+                "lower_bounds",
+                "upper_bounds",
            ]:
                if isinstance(right, dict):
                    left = dict(left)
                assert left == right, f"Difference in column {column}: {left} != {right}"

-            elif column == 'readable_metrics':
+            elif column == "readable_metrics":
                assert list(left.keys()) == [
-                    'bool',
-                    'string',
-                    'string_long',
-                    'int',
-                    'long',
-                    'float',
-                    'double',
-                    'timestamp',
-                    'timestamptz',
-                    'date',
-                    'binary',
-                    'fixed',
+                    "bool",
+                    "string",
+                    "string_long",
+                    "int",
+                    "long",
+                    "float",
+                    "double",
+                    "timestamp",
+                    "timestamptz",
+                    "date",
+                    "binary",
+                    "fixed",
                ]
                assert left.keys() == right.keys()

                for rm_column in left.keys():
                    rm_lhs = left[rm_column]
                    rm_rhs = right[rm_column]

-                    assert rm_lhs['column_size'] == rm_rhs['column_size']
-                    assert rm_lhs['value_count'] == rm_rhs['value_count']
-                    assert rm_lhs['null_value_count'] == rm_rhs['null_value_count']
-                    assert rm_lhs['nan_value_count'] == rm_rhs['nan_value_count']
+                    assert rm_lhs["column_size"] == rm_rhs["column_size"]
+                    assert rm_lhs["value_count"] == rm_rhs["value_count"]
+                    assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"]
+                    assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"]

-                    if rm_column == 'timestamptz':
+                    if rm_column == "timestamptz":
                        # PySpark does not correctly set the timestamptz
-                        rm_rhs['lower_bound'] = rm_rhs['lower_bound'].replace(tzinfo=pytz.utc)
-                        rm_rhs['upper_bound'] = rm_rhs['upper_bound'].replace(tzinfo=pytz.utc)
+                        rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc)
+                        rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc)

-                    assert rm_lhs['lower_bound'] == rm_rhs['lower_bound']
-                    assert rm_lhs['upper_bound'] == rm_rhs['upper_bound']
+                    assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"]
+                    assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"]
            else:
-                assert left == right, f"Difference in column {column}: {left} != {right}"
+                assert left == right, f"Difference in column {column}: {left} != {right}"
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_files_no_snapshot(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = "default.table_metadata_files"
+
+    tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
+
+    df = tbl.refresh().inspect.files()
+
+    assert df.column_names == [
+        "content",
+        "file_path",
+        "file_format",
+        "spec_id",
+        "record_count",
+        "file_size_in_bytes",
+        "column_sizes",
+        "value_counts",
+        "null_value_counts",
+        "nan_value_counts",
+        "lower_bounds",
+        "upper_bounds",
+        "key_metadata",
+        "split_offsets",
+        "equality_ids",
+        "sort_order_id",
+        "readable_metrics",
+    ]
+
+    assert df.to_pandas().empty is True
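From the caller's side, the new test pins down the contract: a freshly created table with no snapshot yields the full column list and an empty DataFrame. A hedged usage sketch (assuming a pyiceberg build containing this change and a configured catalog named "default"; the identifiers are illustrative, not mandated by the PR):

```python
from pyiceberg.catalog import load_catalog

# Assumptions: a catalog named "default" is configured, and the table exists
# but has had no data appended, so it has no current snapshot yet.
catalog = load_catalog("default")
tbl = catalog.load_table("default.table_metadata_files")

df = tbl.inspect.files()     # no snapshot -> zero rows, full files schema
print(df.num_rows)           # 0
print(df.to_pandas().empty)  # True
```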