From 79b7828bd73997d46fcf5c3aa9c418a8847e6fc4 Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Thu, 18 Apr 2024 04:27:09 +0000 Subject: [PATCH 01/10] files metadata table --- pyiceberg/table/__init__.py | 52 +++++++++++++++++++++ tests/conftest.py | 2 +- tests/integration/test_inspect_table.py | 60 +++++++++++++++++++++++++ 3 files changed, 113 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 13186c42cc..dbad48a4dd 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -3537,6 +3537,58 @@ def update_partitions_map( schema=table_schema, ) + def files(self) -> "pa.Table": + import pyarrow as pa + + files_schema = pa.schema([ + pa.field('content', pa.int8(), nullable=False), + pa.field('file_path', pa.string(), nullable=False), + pa.field('file_format', pa.string(), nullable=False), + pa.field('record_count', pa.int64(), nullable=False), + pa.field('file_size_in_bytes', pa.int64(), nullable=False), + pa.field('column_sizes', pa.map_(pa.int32(), pa.int64()), nullable=True), + pa.field('value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True), + pa.field('null_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True), + pa.field('nan_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True), + pa.field('lower_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True), + pa.field('upper_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True), + pa.field('key_metadata', pa.binary(), nullable=True), + pa.field('split_offsets', pa.list_(pa.int64()), nullable=True), + pa.field('equality_ids', pa.list_(pa.int32()), nullable=True), + ]) + + files = [] + + snapshot = self.tbl.current_snapshot() + if not snapshot: + return pa.pylist([]) + + io = self.tbl.io + for manifest_list in snapshot.manifests(io): + for manifest_entry in manifest_list.fetch_manifest_entry(io): + data_file = manifest_entry.data_file + files.append({ + 'content': data_file.content, + 'file_path': data_file.file_path, + 'file_format': data_file.file_format, + 'record_count': data_file.record_count, + 'file_size_in_bytes': data_file.file_size_in_bytes, + 'column_sizes': dict(data_file.column_sizes), + 'value_counts': dict(data_file.value_counts), + 'null_value_counts': dict(data_file.null_value_counts), + 'nan_value_counts': dict(data_file.nan_value_counts), + 'lower_bounds': dict(data_file.lower_bounds), + 'upper_bounds': dict(data_file.upper_bounds), + 'key_metadata': data_file.key_metadata, + 'split_offsets': data_file.split_offsets, + 'equality_ids': data_file.equality_ids, + }) + + return pa.Table.from_pylist( + files, + schema=files_schema, + ) + @dataclass(frozen=True) class TablePartition: diff --git a/tests/conftest.py b/tests/conftest.py index 6679543694..99485ea7ab 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2060,7 +2060,7 @@ def spark() -> "SparkSession": .config("spark.sql.catalog.hive.warehouse", "s3://warehouse/hive/") .config("spark.sql.catalog.hive.s3.endpoint", "http://localhost:9000") .config("spark.sql.catalog.hive.s3.path-style-access", "true") - .config("spark.sql.execution.arrow.pyspark.enabled", "true") + .config("spark.sql.execution.arrow.pyspark.enabled", "false") .getOrCreate() ) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index a884f9d4c0..1dce32b9c2 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -445,3 +445,63 @@ def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> Non 
df = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id) spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION AS OF {snapshot.snapshot_id}") check_pyiceberg_df_equals_spark_df(df, spark_df) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_inspect_files( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.table_metadata_files" + tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) + + # write some data + tbl.append(arrow_table_with_null) + + df = tbl.refresh().inspect.files() + + assert df.column_names == [ + 'content', + 'file_path', + 'file_format', + 'record_count', + 'file_size_in_bytes', + 'column_sizes', + 'value_counts', + 'null_value_counts', + 'nan_value_counts', + 'lower_bounds', + 'upper_bounds', + 'key_metadata', + 'split_offsets', + 'equality_ids', + ] + + for file_size_in_bytes in df['file_size_in_bytes']: + assert isinstance(file_size_in_bytes.as_py(), int) + + for split_offsets in df['split_offsets']: + assert isinstance(split_offsets.as_py(), list) + + for file_format in df['file_format']: + assert file_format.as_py() == "PARQUET" + + for file_path in df['file_path']: + assert file_path.as_py().startswith("s3://") + + lhs = spark.table(f"{identifier}.files").toPandas() + rhs = df.to_pandas() + for column in df.column_names: + for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): + if column in [ + 'column_sizes', + 'value_counts', + 'null_value_counts', + 'nan_value_counts', + 'lower_bounds', + 'upper_bounds', + ]: + # Arrow returns a list of tuples, instead of a dict + right = dict(right) + + assert left == right, f"Difference in column {column}: {left} != {right}" From 0c05902ce3ed79b3efd623bab22ab2a4d416289e Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Thu, 18 Apr 2024 05:29:54 +0000 Subject: [PATCH 02/10] doc update --- mkdocs/docs/api.md | 62 +++++++++++++++++++++++++ tests/integration/test_inspect_table.py | 4 +- 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 0bc23fb0dc..a7fad9f7e9 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -606,6 +606,68 @@ min_snapshots_to_keep: [[null,10]] max_snapshot_age_in_ms: [[null,604800000]] ``` +### Files + +Inspect the data files in the current snapshot of the table: + +```python +table.inspect.files() +``` + +``` +pyarrow.Table +content: int8 not null +file_path: string not null +file_format: string not null +record_count: int64 not null +file_size_in_bytes: int64 not null +column_sizes: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: int64 +value_counts: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: int64 +null_value_counts: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: int64 +nan_value_counts: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: int64 +lower_bounds: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: binary +upper_bounds: map + child 0, entries: struct not null + child 0, key: int32 not null + child 1, value: binary +key_metadata: binary +split_offsets: list + child 0, item: int64 +equality_ids: list + child 0, item: int32 +---- +content: [[0,0]] +file_path: 
[["s3://warehouse/default/table_metadata_files/data/00000-0-9ea7d222-6457-467f-bad5-6fb125c9aa5f.parquet","s3://warehouse/default/table_metadata_files/data/00000-0-afa8893c-de71-4710-97c9-6b01590d0c44.parquet"]] +file_format: [["PARQUET","PARQUET"]] +record_count: [[3,3]] +file_size_in_bytes: [[5459,5459]] +column_sizes: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[49,78,128,94,118,...,118,118,94,78,109],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[49,78,128,94,118,...,118,118,94,78,109]]] +value_counts: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[3,3,3,3,3,...,3,3,3,3,3],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[3,3,3,3,3,...,3,3,3,3,3]]] +null_value_counts: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[1,1,1,1,1,...,1,1,1,1,1],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[1,1,1,1,1,...,1,1,1,1,1]]] +nan_value_counts: [[keys:[]values:[],keys:[]values:[]]] +lower_bounds: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000]]] +upper_bounds:[[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,61616161616161616161616161616161,01000000,0100000000000000,...,009B6ACA38F10500,009B6ACA38F10500,9E4B0000,01,00000000000000000000000000000000]]] +key_metadata: [[0100,0100]] +split_offsets:[[[],[]]] +equality_ids:[[[],[]]] + +``` + ## Add Files Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them. 
diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 1dce32b9c2..d5ae7ba1a1 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -455,7 +455,9 @@ def test_inspect_files( identifier = "default.table_metadata_files" tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) - # write some data + tbl.overwrite(arrow_table_with_null) + + # append more data tbl.append(arrow_table_with_null) df = tbl.refresh().inspect.files() From f7aee5e0d3cb737371c16c6002d8a4c218173d2b Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Mon, 29 Apr 2024 13:14:02 +0000 Subject: [PATCH 03/10] review comments --- mkdocs/docs/api.md | 71 ++++++++++++++++++++++++- pyiceberg/table/__init__.py | 54 +++++++++++++++++-- tests/conftest.py | 2 +- tests/integration/test_inspect_table.py | 56 ++++++++++++++++--- 4 files changed, 171 insertions(+), 12 deletions(-) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index a7fad9f7e9..4751939a8e 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -619,6 +619,7 @@ pyarrow.Table content: int8 not null file_path: string not null file_format: string not null +spec_id: int32 not null record_count: int64 not null file_size_in_bytes: int64 not null column_sizes: map @@ -650,10 +651,34 @@ split_offsets: list child 0, item: int64 equality_ids: list child 0, item: int32 +sort_order_id: int32 not null +readable_metrics: struct not null, lat: struct not null, long: struct not null> + child 0, city: struct not null + child 0, column_size: int64 + child 1, value_count: int64 + child 2, null_value_count: int64 + child 3, nan_value_count: int64 + child 4, lower_bound: string + child 5, upper_bound: string + child 1, lat: struct not null + child 0, column_size: int64 + child 1, value_count: int64 + child 2, null_value_count: int64 + child 3, nan_value_count: int64 + child 4, lower_bound: double + child 5, upper_bound: double + child 2, long: struct not null + child 0, column_size: int64 + child 1, value_count: int64 + child 2, null_value_count: int64 + child 3, nan_value_count: int64 + child 4, lower_bound: double + child 5, upper_bound: double ---- content: [[0,0]] file_path: [["s3://warehouse/default/table_metadata_files/data/00000-0-9ea7d222-6457-467f-bad5-6fb125c9aa5f.parquet","s3://warehouse/default/table_metadata_files/data/00000-0-afa8893c-de71-4710-97c9-6b01590d0c44.parquet"]] file_format: [["PARQUET","PARQUET"]] +spec_id: [[0,0]] record_count: [[3,3]] file_size_in_bytes: [[5459,5459]] column_sizes: [[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[49,78,128,94,118,...,118,118,94,78,109],keys:[1,2,3,4,5,...,8,9,10,11,12]values:[49,78,128,94,118,...,118,118,94,78,109]]] @@ -665,7 +690,51 @@ upper_bounds:[[keys:[1,2,3,4,5,...,8,9,10,11,12]values:[00,61,616161616161616161 key_metadata: [[0100,0100]] split_offsets:[[[],[]]] equality_ids:[[[],[]]] - +sort_order_id:[[[],[]]] +readable_metrics: [ + -- is_valid: all not null + -- child 0 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[140] + -- child 1 type: int64 +[4] + -- child 2 type: int64 +[0] + -- child 3 type: int64 +[null] + -- child 4 type: string +["Amsterdam"] + -- child 5 type: string +["San Francisco"] + -- child 1 type: struct + -- is_valid: all not null + -- child 0 type: int64 +[135] + -- child 1 type: int64 +[4] + -- child 2 type: int64 +[0] + -- child 3 type: int64 +[null] + -- child 4 type: double +[37.773972] + -- child 5 type: double +[53.11254] + -- child 2 type: struct + 
-- is_valid: all not null + -- child 0 type: int64 +[135] + -- child 1 type: int64 +[4] + -- child 2 type: int64 +[0] + -- child 3 type: int64 +[null] + -- child 4 type: double +[-122.431297] + -- child 5 type: double +[6.0989]] ``` ## Add Files diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index dbad48a4dd..ca509f113d 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -3537,13 +3537,35 @@ def update_partitions_map( schema=table_schema, ) - def files(self) -> "pa.Table": + def files(self, snapshot_id: Optional[int] = None) -> "pa.Table": import pyarrow as pa + from pyiceberg.io.pyarrow import schema_to_pyarrow + + schema = self.tbl.metadata.schema() + readable_metrics_struct = [] + + def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: + pa_bound_type = schema_to_pyarrow(bound_type) + return pa.struct([ + pa.field("column_size", pa.int64(), nullable=True), + pa.field("value_count", pa.int64(), nullable=True), + pa.field("null_value_count", pa.int64(), nullable=True), + pa.field("nan_value_count", pa.int64(), nullable=True), + pa.field("lower_bound", pa_bound_type, nullable=True), + pa.field("upper_bound", pa_bound_type, nullable=True), + ]) + + for field in self.tbl.metadata.schema().fields: + readable_metrics_struct.append( + pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False) + ) + files_schema = pa.schema([ pa.field('content', pa.int8(), nullable=False), pa.field('file_path', pa.string(), nullable=False), - pa.field('file_format', pa.string(), nullable=False), + pa.field('file_format', pa.dictionary(pa.int32(), pa.string()), nullable=False), + pa.field('spec_id', pa.int32(), nullable=False), pa.field('record_count', pa.int64(), nullable=False), pa.field('file_size_in_bytes', pa.int64(), nullable=False), pa.field('column_sizes', pa.map_(pa.int32(), pa.int64()), nullable=True), @@ -3555,11 +3577,13 @@ def files(self) -> "pa.Table": pa.field('key_metadata', pa.binary(), nullable=True), pa.field('split_offsets', pa.list_(pa.int64()), nullable=True), pa.field('equality_ids', pa.list_(pa.int32()), nullable=True), + pa.field('sort_order_id', pa.int32(), nullable=True), + pa.field('readable_metrics', pa.struct(readable_metrics_struct), nullable=True), ]) files = [] - snapshot = self.tbl.current_snapshot() + snapshot = self._get_snapshot(snapshot_id) if not snapshot: return pa.pylist([]) @@ -3567,10 +3591,32 @@ def files(self) -> "pa.Table": for manifest_list in snapshot.manifests(io): for manifest_entry in manifest_list.fetch_manifest_entry(io): data_file = manifest_entry.data_file + column_sizes = data_file.column_sizes or {} + value_counts = data_file.value_counts or {} + null_value_counts = data_file.null_value_counts or {} + nan_value_counts = data_file.nan_value_counts or {} + lower_bounds = data_file.lower_bounds or {} + upper_bounds = data_file.upper_bounds or {} + readable_metrics = { + schema.find_column_name(field.field_id): { + "column_size": column_sizes.get(field.field_id), + "value_count": value_counts.get(field.field_id), + "null_value_count": null_value_counts.get(field.field_id), + "nan_value_count": nan_value_counts.get(field.field_id), + "lower_bound": from_bytes(field.field_type, lower_bound) + if (lower_bound := lower_bounds.get(field.field_id)) + else None, + "upper_bound": from_bytes(field.field_type, upper_bound) + if (upper_bound := upper_bounds.get(field.field_id)) + else None, + } + for field in self.tbl.metadata.schema().fields + } 
files.append({ 'content': data_file.content, 'file_path': data_file.file_path, 'file_format': data_file.file_format, + 'spec_id': data_file.spec_id, 'record_count': data_file.record_count, 'file_size_in_bytes': data_file.file_size_in_bytes, 'column_sizes': dict(data_file.column_sizes), @@ -3582,6 +3628,8 @@ def files(self) -> "pa.Table": 'key_metadata': data_file.key_metadata, 'split_offsets': data_file.split_offsets, 'equality_ids': data_file.equality_ids, + 'sort_order_id': data_file.sort_order_id, + 'readable_metrics': readable_metrics, }) return pa.Table.from_pylist( diff --git a/tests/conftest.py b/tests/conftest.py index 99485ea7ab..6679543694 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2060,7 +2060,7 @@ def spark() -> "SparkSession": .config("spark.sql.catalog.hive.warehouse", "s3://warehouse/hive/") .config("spark.sql.catalog.hive.s3.endpoint", "http://localhost:9000") .config("spark.sql.catalog.hive.s3.path-style-access", "true") - .config("spark.sql.execution.arrow.pyspark.enabled", "false") + .config("spark.sql.execution.arrow.pyspark.enabled", "true") .getOrCreate() ) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index d5ae7ba1a1..d3e36a2114 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -453,6 +453,7 @@ def test_inspect_files( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: identifier = "default.table_metadata_files" + tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) tbl.overwrite(arrow_table_with_null) @@ -466,6 +467,7 @@ def test_inspect_files( 'content', 'file_path', 'file_format', + 'spec_id', 'record_count', 'file_size_in_bytes', 'column_sizes', @@ -477,10 +479,14 @@ def test_inspect_files( 'key_metadata', 'split_offsets', 'equality_ids', + 'sort_order_id', + 'readable_metrics', ] - for file_size_in_bytes in df['file_size_in_bytes']: - assert isinstance(file_size_in_bytes.as_py(), int) + # make sure the non-nullable fields are filled + for int_column in ['content', 'spec_id', 'record_count', 'file_size_in_bytes']: + for value in df[int_column]: + assert isinstance(value.as_py(), int) for split_offsets in df['split_offsets']: assert isinstance(split_offsets.as_py(), list) @@ -491,10 +497,13 @@ def test_inspect_files( for file_path in df['file_path']: assert file_path.as_py().startswith("s3://") - lhs = spark.table(f"{identifier}.files").toPandas() - rhs = df.to_pandas() + lhs = df.to_pandas() + rhs = spark.table(f"{identifier}.files").toPandas() for column in df.column_names: for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): + if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right): + # NaN != NaN in Python + continue if column in [ 'column_sizes', 'value_counts', @@ -504,6 +513,39 @@ def test_inspect_files( 'upper_bounds', ]: # Arrow returns a list of tuples, instead of a dict - right = dict(right) - - assert left == right, f"Difference in column {column}: {left} != {right}" + left = dict(left) + elif column == 'readable_metrics': + assert list(left.keys()) == [ + 'bool', + 'string', + 'string_long', + 'int', + 'long', + 'float', + 'double', + 'timestamp', + 'timestamptz', + 'date', + 'binary', + 'fixed', + ] + assert left.keys() == right.asDict().keys() + + for rm_column in left.keys(): + rm_lhs = left[rm_column] + rm_rhs = right[rm_column].asDict() + + assert rm_lhs['column_size'] == 
rm_rhs['column_size'] + assert rm_lhs['value_count'] == rm_rhs['value_count'] + assert rm_lhs['null_value_count'] == rm_rhs['null_value_count'] + assert rm_lhs['nan_value_count'] == rm_rhs['nan_value_count'] + + if rm_column == 'timestamptz': + # PySpark does not correctly set the timstamptz + rm_rhs['lower_bound'] = rm_rhs['lower_bound'].replace(tzinfo=pytz.utc) + rm_rhs['upper_bound'] = rm_rhs['upper_bound'].replace(tzinfo=pytz.utc) + + assert rm_lhs['lower_bound'] == rm_rhs['lower_bound'] + assert rm_lhs['upper_bound'] == rm_rhs['upper_bound'] + else: + assert left == right, f"Difference in column {column}: {left} != {right}" From c06a91ac45f0cc59ff5ebaae3a35756ac80eda4a Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Tue, 30 Apr 2024 21:49:53 +0000 Subject: [PATCH 04/10] remove dict conversion --- tests/integration/test_inspect_table.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index d3e36a2114..0895b80bdf 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -529,11 +529,11 @@ def test_inspect_files( 'binary', 'fixed', ] - assert left.keys() == right.asDict().keys() + assert left.keys() == right.keys() for rm_column in left.keys(): rm_lhs = left[rm_column] - rm_rhs = right[rm_column].asDict() + rm_rhs = right[rm_column] assert rm_lhs['column_size'] == rm_rhs['column_size'] assert rm_lhs['value_count'] == rm_rhs['value_count'] From b27638f7f7010b65205046294f330b0cc0a70d62 Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Sun, 12 May 2024 21:46:03 +0000 Subject: [PATCH 05/10] added assertion --- tests/integration/test_inspect_table.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 0895b80bdf..2c936c2ca9 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -512,8 +512,10 @@ def test_inspect_files( 'lower_bounds', 'upper_bounds', ]: - # Arrow returns a list of tuples, instead of a dict - left = dict(left) + if isinstance(right, dict): + left = dict(left) + assert left == right, f"Difference in column {column}: {left} != {right}" + elif column == 'readable_metrics': assert list(left.keys()) == [ 'bool', From 484dc9f05555cc9409d0703198ef05ee95036d2e Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Wed, 22 May 2024 21:10:35 +0000 Subject: [PATCH 06/10] remove unnecessary code --- pyiceberg/table/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index c6aba4e36e..2a7b4d9ab7 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -3594,8 +3594,6 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: files = [] snapshot = self._get_snapshot(snapshot_id) - if not snapshot: - return pa.pylist([]) io = self.tbl.io for manifest_list in snapshot.manifests(io): From 5580e40a5c9edfb1dcaea99d0b69c715cc70229f Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Thu, 27 Jun 2024 05:14:11 +0000 Subject: [PATCH 07/10] empty files for no snapshot --- pyiceberg/table/__init__.py | 80 +++++++------- tests/integration/test_inspect_table.py | 135 +++++++++++++++--------- 2 files changed, 127 insertions(+), 88 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b6305455b9..2eec4d3036 100644 --- a/pyiceberg/table/__init__.py +++ 
b/pyiceberg/table/__init__.py @@ -3760,6 +3760,7 @@ def update_partitions_map( def manifests(self) -> "pa.Table": import pyarrow as pa + from pyiceberg.conversions import from_bytes partition_summary_schema = pa.struct([ @@ -3905,8 +3906,8 @@ def history(self) -> "pa.Table": }) return pa.Table.from_pylist(history, schema=history_schema) - - def files(self, snapshot_id: Optional[int] = None) -> "pa.Table": + + def files(self, snapshot_id: Optional[int] = None) -> "pa.Table": import pyarrow as pa from pyiceberg.io.pyarrow import schema_to_pyarrow @@ -3931,27 +3932,32 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: ) files_schema = pa.schema([ - pa.field('content', pa.int8(), nullable=False), - pa.field('file_path', pa.string(), nullable=False), - pa.field('file_format', pa.dictionary(pa.int32(), pa.string()), nullable=False), - pa.field('spec_id', pa.int32(), nullable=False), - pa.field('record_count', pa.int64(), nullable=False), - pa.field('file_size_in_bytes', pa.int64(), nullable=False), - pa.field('column_sizes', pa.map_(pa.int32(), pa.int64()), nullable=True), - pa.field('value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True), - pa.field('null_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True), - pa.field('nan_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True), - pa.field('lower_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True), - pa.field('upper_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True), - pa.field('key_metadata', pa.binary(), nullable=True), - pa.field('split_offsets', pa.list_(pa.int64()), nullable=True), - pa.field('equality_ids', pa.list_(pa.int32()), nullable=True), - pa.field('sort_order_id', pa.int32(), nullable=True), - pa.field('readable_metrics', pa.struct(readable_metrics_struct), nullable=True), + pa.field("content", pa.int8(), nullable=False), + pa.field("file_path", pa.string(), nullable=False), + pa.field("file_format", pa.dictionary(pa.int32(), pa.string()), nullable=False), + pa.field("spec_id", pa.int32(), nullable=False), + pa.field("record_count", pa.int64(), nullable=False), + pa.field("file_size_in_bytes", pa.int64(), nullable=False), + pa.field("column_sizes", pa.map_(pa.int32(), pa.int64()), nullable=True), + pa.field("value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True), + pa.field("null_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True), + pa.field("nan_value_counts", pa.map_(pa.int32(), pa.int64()), nullable=True), + pa.field("lower_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True), + pa.field("upper_bounds", pa.map_(pa.int32(), pa.binary()), nullable=True), + pa.field("key_metadata", pa.binary(), nullable=True), + pa.field("split_offsets", pa.list_(pa.int64()), nullable=True), + pa.field("equality_ids", pa.list_(pa.int32()), nullable=True), + pa.field("sort_order_id", pa.int32(), nullable=True), + pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True), ]) - files = [] + files: list[dict[str, Any]] = [] + if not snapshot_id and not self.tbl.metadata.current_snapshot(): + return pa.Table.from_pylist( + files, + schema=files_schema, + ) snapshot = self._get_snapshot(snapshot_id) io = self.tbl.io @@ -3980,23 +3986,23 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: for field in self.tbl.metadata.schema().fields } files.append({ - 'content': data_file.content, - 'file_path': data_file.file_path, - 'file_format': data_file.file_format, - 'spec_id': data_file.spec_id, - 'record_count': 
data_file.record_count, - 'file_size_in_bytes': data_file.file_size_in_bytes, - 'column_sizes': dict(data_file.column_sizes), - 'value_counts': dict(data_file.value_counts), - 'null_value_counts': dict(data_file.null_value_counts), - 'nan_value_counts': dict(data_file.nan_value_counts), - 'lower_bounds': dict(data_file.lower_bounds), - 'upper_bounds': dict(data_file.upper_bounds), - 'key_metadata': data_file.key_metadata, - 'split_offsets': data_file.split_offsets, - 'equality_ids': data_file.equality_ids, - 'sort_order_id': data_file.sort_order_id, - 'readable_metrics': readable_metrics, + "content": data_file.content, + "file_path": data_file.file_path, + "file_format": data_file.file_format, + "spec_id": data_file.spec_id, + "record_count": data_file.record_count, + "file_size_in_bytes": data_file.file_size_in_bytes, + "column_sizes": dict(data_file.column_sizes), + "value_counts": dict(data_file.value_counts), + "null_value_counts": dict(data_file.null_value_counts), + "nan_value_counts": dict(data_file.nan_value_counts), + "lower_bounds": dict(data_file.lower_bounds), + "upper_bounds": dict(data_file.upper_bounds), + "key_metadata": data_file.key_metadata, + "split_offsets": data_file.split_offsets, + "equality_ids": data_file.equality_ids, + "sort_order_id": data_file.sort_order_id, + "readable_metrics": readable_metrics, }) return pa.Table.from_pylist( diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 262d2f39df..7042bd7a8e 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -637,7 +637,8 @@ def test_inspect_history(spark: SparkSession, session_catalog: Catalog, format_v # NaN != NaN in Python continue assert left == right, f"Difference in column {column}: {left} != {right}" - + + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def test_inspect_files( @@ -655,37 +656,37 @@ def test_inspect_files( df = tbl.refresh().inspect.files() assert df.column_names == [ - 'content', - 'file_path', - 'file_format', - 'spec_id', - 'record_count', - 'file_size_in_bytes', - 'column_sizes', - 'value_counts', - 'null_value_counts', - 'nan_value_counts', - 'lower_bounds', - 'upper_bounds', - 'key_metadata', - 'split_offsets', - 'equality_ids', - 'sort_order_id', - 'readable_metrics', + "content", + "file_path", + "file_format", + "spec_id", + "record_count", + "file_size_in_bytes", + "column_sizes", + "value_counts", + "null_value_counts", + "nan_value_counts", + "lower_bounds", + "upper_bounds", + "key_metadata", + "split_offsets", + "equality_ids", + "sort_order_id", + "readable_metrics", ] # make sure the non-nullable fields are filled - for int_column in ['content', 'spec_id', 'record_count', 'file_size_in_bytes']: + for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]: for value in df[int_column]: assert isinstance(value.as_py(), int) - for split_offsets in df['split_offsets']: + for split_offsets in df["split_offsets"]: assert isinstance(split_offsets.as_py(), list) - for file_format in df['file_format']: + for file_format in df["file_format"]: assert file_format.as_py() == "PARQUET" - for file_path in df['file_path']: + for file_path in df["file_path"]: assert file_path.as_py().startswith("s3://") lhs = df.to_pandas() @@ -696,31 +697,31 @@ def test_inspect_files( # NaN != NaN in Python continue if column in [ - 'column_sizes', - 'value_counts', - 'null_value_counts', - 'nan_value_counts', - 'lower_bounds', - 'upper_bounds', + "column_sizes", 
+ "value_counts", + "null_value_counts", + "nan_value_counts", + "lower_bounds", + "upper_bounds", ]: if isinstance(right, dict): left = dict(left) assert left == right, f"Difference in column {column}: {left} != {right}" - elif column == 'readable_metrics': + elif column == "readable_metrics": assert list(left.keys()) == [ - 'bool', - 'string', - 'string_long', - 'int', - 'long', - 'float', - 'double', - 'timestamp', - 'timestamptz', - 'date', - 'binary', - 'fixed', + "bool", + "string", + "string_long", + "int", + "long", + "float", + "double", + "timestamp", + "timestamptz", + "date", + "binary", + "fixed", ] assert left.keys() == right.keys() @@ -728,17 +729,49 @@ def test_inspect_files( rm_lhs = left[rm_column] rm_rhs = right[rm_column] - assert rm_lhs['column_size'] == rm_rhs['column_size'] - assert rm_lhs['value_count'] == rm_rhs['value_count'] - assert rm_lhs['null_value_count'] == rm_rhs['null_value_count'] - assert rm_lhs['nan_value_count'] == rm_rhs['nan_value_count'] + assert rm_lhs["column_size"] == rm_rhs["column_size"] + assert rm_lhs["value_count"] == rm_rhs["value_count"] + assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"] + assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"] - if rm_column == 'timestamptz': + if rm_column == "timestamptz": # PySpark does not correctly set the timstamptz - rm_rhs['lower_bound'] = rm_rhs['lower_bound'].replace(tzinfo=pytz.utc) - rm_rhs['upper_bound'] = rm_rhs['upper_bound'].replace(tzinfo=pytz.utc) + rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc) + rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc) - assert rm_lhs['lower_bound'] == rm_rhs['lower_bound'] - assert rm_lhs['upper_bound'] == rm_rhs['upper_bound'] + assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"] + assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"] else: - assert left == right, f"Difference in column {column}: {left} != {right}" \ No newline at end of file + assert left == right, f"Difference in column {column}: {left} != {right}" + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_inspect_files_no_snapshot(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + identifier = "default.table_metadata_files" + + tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) + + df = tbl.refresh().inspect.files() + + assert df.column_names == [ + "content", + "file_path", + "file_format", + "spec_id", + "record_count", + "file_size_in_bytes", + "column_sizes", + "value_counts", + "null_value_counts", + "nan_value_counts", + "lower_bounds", + "upper_bounds", + "key_metadata", + "split_offsets", + "equality_ids", + "sort_order_id", + "readable_metrics", + ] + + assert df.to_pandas().empty is True From edfd1b302fcec3430816bf398e112d0f42f68e21 Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Thu, 27 Jun 2024 12:50:32 +0000 Subject: [PATCH 08/10] assert frame equal --- tests/integration/test_inspect_table.py | 32 +++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 7042bd7a8e..79c9750ded 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -644,6 +644,8 @@ def test_inspect_history(spark: SparkSession, session_catalog: Catalog, format_v def test_inspect_files( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> 
None: + from pandas.testing import assert_frame_equal + identifier = "default.table_metadata_files" tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) @@ -691,6 +693,36 @@ def test_inspect_files( lhs = df.to_pandas() rhs = spark.table(f"{identifier}.files").toPandas() + + lhs_subset = lhs[ + [ + "content", + "file_path", + "file_format", + "spec_id", + "record_count", + "file_size_in_bytes", + "split_offsets", + "equality_ids", + "sort_order_id" + ] + ] + rhs_subset = rhs[ + [ + "content", + "file_path", + "file_format", + "spec_id", + "record_count", + "file_size_in_bytes", + "split_offsets", + "equality_ids", + "sort_order_id" + ] + ] + + assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False) + for column in df.column_names: for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right): From 78a39b945292a0af4b5be8715c3cf2cd65a27483 Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Thu, 27 Jun 2024 12:54:54 +0000 Subject: [PATCH 09/10] lint fix --- tests/integration/test_inspect_table.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 79c9750ded..834fe83d5f 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -704,7 +704,7 @@ def test_inspect_files( "file_size_in_bytes", "split_offsets", "equality_ids", - "sort_order_id" + "sort_order_id", ] ] rhs_subset = rhs[ @@ -717,7 +717,7 @@ def test_inspect_files( "file_size_in_bytes", "split_offsets", "equality_ids", - "sort_order_id" + "sort_order_id", ] ] From c703213bf3544a00f01f8a8040dc1e1652d6325c Mon Sep 17 00:00:00 2001 From: Gowthami B Date: Tue, 2 Jul 2024 12:36:22 +0000 Subject: [PATCH 10/10] update docs --- mkdocs/docs/api.md | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 6124258366..0e80b6eb5e 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -712,7 +712,7 @@ table.inspect.files() pyarrow.Table content: int8 not null file_path: string not null -file_format: string not null +file_format: dictionary not null spec_id: int32 not null record_count: int64 not null file_size_in_bytes: int64 not null @@ -745,15 +745,15 @@ split_offsets: list child 0, item: int64 equality_ids: list child 0, item: int32 -sort_order_id: int32 not null -readable_metrics: struct not null, lat: struct not null, long: struct not null> +sort_order_id: int32 +readable_metrics: struct not null, lat: struct not null, long: struct not null> child 0, city: struct not null child 0, column_size: int64 child 1, value_count: int64 child 2, null_value_count: int64 child 3, nan_value_count: int64 - child 4, lower_bound: string - child 5, upper_bound: string + child 4, lower_bound: large_string + child 5, upper_bound: large_string child 1, lat: struct not null child 0, column_size: int64 child 1, value_count: int64 @@ -787,7 +787,7 @@ equality_ids:[[[],[]]] sort_order_id:[[[],[]]] readable_metrics: [ -- is_valid: all not null - -- child 0 type: struct + -- child 0 type: struct -- is_valid: all not null -- child 0 type: int64 [140] @@ -797,9 +797,9 @@ readable_metrics: [ [0] -- child 3 type: int64 [null] - -- child 4 type: string + -- child 4 type: large_string ["Amsterdam"] - -- child 5 type: string + -- child 5 type: large_string ["San Francisco"] -- 
child 1 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
     -- is_valid: all not null
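For context, a minimal usage sketch of the metadata table this series adds. The catalog and table names below are placeholders, not taken from the patches; the optional `snapshot_id` argument is the one introduced to `InspectTable.files()` in PATCH 03/10.

```python
# Illustrative sketch only; assumes a configured catalog named "default"
# and an existing table "default.taxis" (both hypothetical).
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("default.taxis")

# Returns a pyarrow.Table describing the data files of the current snapshot,
# with the columns documented in mkdocs/docs/api.md above.
files_df = tbl.inspect.files()
print(files_df.column_names)  # content, file_path, file_format, spec_id, ...

# The method also accepts an optional snapshot id, mirroring the other
# inspect tables (e.g. partitions):
# files_at_snapshot = tbl.inspect.files(snapshot_id=some_snapshot_id)
```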