Merged
Merge branch 'main' into kevinjqliu/bin-pack-write
kevinjqliu committed Mar 28, 2024
commit 8cd71606381eac36a93bfc96f17929ca72e093d1
15 changes: 8 additions & 7 deletions pyiceberg/io/pyarrow.py
@@ -1762,7 +1762,7 @@ def data_file_statistics_from_parquet_metadata(
 
 def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
     schema = table_metadata.schema()
-    arrow_file_schema = schema_to_pyarrow(schema)
+    arrow_file_schema = schema.as_arrow()
     parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
 
     row_group_size = PropertyUtil.property_as_int(
@@ -1778,6 +1778,11 @@ def write_parquet(task: WriteTask) -> DataFile:
             with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
                 writer.write(pa.Table.from_batches(task.record_batches), row_group_size=row_group_size)
 
+        statistics = data_file_statistics_from_parquet_metadata(
+            parquet_metadata=writer.writer.metadata,
+            stats_columns=compute_statistics_plan(schema, table_metadata.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(schema),
+        )
         data_file = DataFile(
             content=DataFileContent.DATA,
             file_path=file_path,
@@ -1792,13 +1797,9 @@ def write_parquet(task: WriteTask) -> DataFile:
             spec_id=table_metadata.default_spec_id,
             equality_ids=None,
             key_metadata=None,
+            **statistics.to_serialized_dict(),
         )
-        fill_parquet_file_metadata(
-            data_file=data_file,
-            parquet_metadata=writer.writer.metadata,
-            stats_columns=compute_statistics_plan(schema, table_metadata.properties),
-            parquet_column_mapping=parquet_path_to_id_mapping(schema),
-        )
 
         return data_file
 
     executor = ExecutorFactory.get_or_create()
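For context, the hunks above replace the old post-hoc mutation, where fill_parquet_file_metadata filled statistics into an already-constructed DataFile, with a statistics-first flow: the Parquet writer's file metadata is converted into a statistics object up front (data_file_statistics_from_parquet_metadata) and splatted into the DataFile constructor via **statistics.to_serialized_dict(). Below is a minimal sketch of that pattern using plain pyarrow rather than the pyiceberg helpers; the file name, toy table, and the stats dictionary are illustrative assumptions, not part of this commit.

# Sketch of the statistics-first pattern: write a file, then derive stats
# from the writer's Parquet metadata before building any catalog record.
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]})

with pq.ParquetWriter("example.parquet", schema=table.schema) as writer:
    writer.write(table)

# After the context manager closes, the Parquet footer is complete; pyiceberg
# reads it the same way (writer.writer.metadata) to compute column statistics
# before constructing the DataFile.
metadata = writer.writer.metadata
stats = {
    "record_count": metadata.num_rows,
    "column_min": {},  # per-column lower bounds, analogous to DataFile lower_bounds
}
for rg in range(metadata.num_row_groups):
    row_group = metadata.row_group(rg)
    for col in range(row_group.num_columns):
        column = row_group.column(col)
        if column.statistics is not None and column.statistics.has_min_max:
            path = column.path_in_schema
            current = stats["column_min"].get(path)
            new_min = column.statistics.min
            stats["column_min"][path] = new_min if current is None else min(current, new_min)

print(stats)

Computing statistics before construction keeps DataFile immutable after creation, which is the design motivation visible in the removed fill_parquet_file_metadata call.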