Merged
bin pack write
kevinjqliu committed Mar 9, 2024
commit 5a7c8f9a99f1435dfc302a352d1fbe7619f9acf8
81 changes: 42 additions & 39 deletions pyiceberg/io/pyarrow.py
@@ -1722,54 +1722,57 @@ def fill_parquet_file_metadata(


def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
-    task = next(tasks)
-
-    try:
-        _ = next(tasks)
-        # If there are more tasks, raise an exception
-        raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
-    except StopIteration:
-        pass
-
-    parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
-
-    file_path = f'{table_metadata.location}/data/{task.generate_data_file_filename("parquet")}'
    schema = table_metadata.schema()
    arrow_file_schema = schema_to_pyarrow(schema)
+    parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)

-    fo = io.new_output(file_path)
    row_group_size = PropertyUtil.property_as_int(
        properties=table_metadata.properties,
        property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
        default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
    )
-    with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
-            writer.write_table(task.df, row_group_size=row_group_size)
-
-    data_file = DataFile(
-        content=DataFileContent.DATA,
-        file_path=file_path,
-        file_format=FileFormat.PARQUET,
-        partition=Record(),
-        file_size_in_bytes=len(fo),
-        # After this has been fixed:
-        # https://github.com/apache/iceberg-python/issues/271
-        # sort_order_id=task.sort_order_id,
-        sort_order_id=None,
-        # Just copy these from the table for now
-        spec_id=table_metadata.default_spec_id,
-        equality_ids=None,
-        key_metadata=None,
-    )
+    data_files = []
+    for task in tasks:
+        file_path = f'{table_metadata.location}/data/{task.generate_data_file_filename("parquet")}'
+        fo = io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
+                for batch in task.record_batches:
+                    writer.write_batch(batch, row_group_size=row_group_size)
+
+        data_file = DataFile(
+            content=DataFileContent.DATA,
+            file_path=file_path,
+            file_format=FileFormat.PARQUET,
+            partition=Record(),
+            file_size_in_bytes=len(fo),
+            # After this has been fixed:
+            # https://github.com/apache/iceberg-python/issues/271
+            # sort_order_id=task.sort_order_id,
+            sort_order_id=None,
+            # Just copy these from the table for now
+            spec_id=table_metadata.default_spec_id,
+            equality_ids=None,
+            key_metadata=None,
+        )
+        fill_parquet_file_metadata(
+            data_file=data_file,
+            parquet_metadata=writer.writer.metadata,
+            stats_columns=compute_statistics_plan(schema, table_metadata.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(schema),
+        )
+        data_files.append(data_file)
+    return iter(data_files)

-    fill_parquet_file_metadata(
-        data_file=data_file,
-        parquet_metadata=writer.writer.metadata,
-        stats_columns=compute_statistics_plan(schema, table_metadata.properties),
-        parquet_column_mapping=parquet_path_to_id_mapping(schema),
-    )
-    return iter([data_file])

+def bin_pack_arrow_table(tbl: pa.Table) -> Iterator[List[pa.RecordBatch]]:
+    # bin-pack the table into 256 MB chunks
+    from pyiceberg.utils.bin_packing import PackingIterator
+
+    splits = tbl.to_batches()
+    target_weight = 2 << 27  # 256 MB
Contributor:

In Java we have the write.target-file-size-bytes configuration. In this case we're looking at the size in memory, not the file size. Converting between the two is tricky, since Parquet has some excellent encodings to reduce the size on disk. We might want to check the heuristic on the Java side. On the other hand, we also don't want to explode the memory when decoding a Parquet file.
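
A quick way to see the gap this comment describes is to compare an Arrow table's in-memory footprint with the size of the Parquet file it produces. This is only an illustration; the file path and example data are made up, and the ratio depends entirely on the data and on which encodings Parquet can apply:

import os

import pyarrow as pa
import pyarrow.parquet as pq

# Low-cardinality strings compress extremely well with dictionary/RLE encoding.
tbl = pa.table({"id": list(range(1_000_000)), "category": ["a", "b", "c", "d"] * 250_000})
pq.write_table(tbl, "/tmp/example.parquet")

in_memory = tbl.nbytes                             # size of the Arrow buffers in memory
on_disk = os.path.getsize("/tmp/example.parquet")  # size after Parquet encoding + compression
print(f"in memory: {in_memory:,} bytes, on disk: {on_disk:,} bytes")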

Contributor Author:

I think this is done in Java like so: #428 (comment)

The write is done row by row, and every 1000 rows the file size is checked against the desired size.
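
For reference, a rough Python sketch of that check-every-N-rows idea. This is not PyIceberg code: the 512 MB target, the 1000-row interval, and the helper names are placeholders, and the size check only sees bytes the writer has already flushed to the sink.

import pyarrow as pa
import pyarrow.parquet as pq

TARGET_FILE_SIZE = 512 * 1024 * 1024  # placeholder target, in bytes
CHECK_EVERY_ROWS = 1000               # Java checks roughly every 1000 rows

def write_with_rollover(batches, schema, path_for):
    """Write record batches, rolling over to a new file once the current one reaches the target."""
    index, rows_since_check = 0, 0
    sink = pa.OSFile(path_for(index), "wb")
    writer = pq.ParquetWriter(sink, schema)
    for batch in batches:
        writer.write_batch(batch)
        rows_since_check += batch.num_rows
        if rows_since_check >= CHECK_EVERY_ROWS and sink.tell() >= TARGET_FILE_SIZE:
            writer.close()  # finalize the current file and start a new one
            sink.close()
            index += 1
            rows_since_check = 0
            sink = pa.OSFile(path_for(index), "wb")
            writer = pq.ParquetWriter(sink, schema)
    writer.close()
    sink.close()
    return index + 1  # number of files written

Here path_for is any callable mapping a file index to a path, e.g. lambda i: f"/tmp/part-{i}.parquet".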

Contributor Author:

And the write.target-file-size-bytes configuration is just a target to aim for, not the exact size of the resulting file.

Based on this comment, it seems that even in Spark the resulting Parquet files can be smaller than the target file size.

Contributor Author:

For now, I propose we reuse the write.target-file-size-bytes option and default to 512 MB of Arrow data in memory.
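
A minimal sketch of what wiring that option through could look like, following the PropertyUtil pattern already used for the row-group size above. The constant names and the 512 MB default are assumptions taken from this comment, not the merged implementation:

from typing import Optional

from pyiceberg.table import PropertyUtil  # where PropertyUtil lives at the time of this PR
from pyiceberg.table.metadata import TableMetadata

# Hypothetical constants mirroring the TableProperties naming convention.
WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes"
WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024  # 512 MB of Arrow data in memory

def target_file_size(table_metadata: TableMetadata) -> Optional[int]:
    # Read the table property, falling back to the assumed 512 MB default.
    return PropertyUtil.property_as_int(
        properties=table_metadata.properties,
        property_name=WRITE_TARGET_FILE_SIZE_BYTES,
        default=WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
    )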

Contributor Author:

Here's a test run where we bin-packed 685.46 MB of Arrow memory into 256 MB chunks; we ended up with 3 Parquet files of roughly 80 MB each.

+    bin_packed = PackingIterator(splits, target_weight, lookback=2, weight_func=lambda x: x.nbytes, largest_bin_first=True)
+    return bin_packed


ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
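
To make the new helper's behaviour concrete, here is a usage sketch with made-up data. Note that PackingIterator groups whole record batches and never splits one, so the table must already be chunked (here via pa.concat_tables); a single batch larger than the target would still become its own oversized group.

import pyarrow as pa

from pyiceberg.io.pyarrow import bin_pack_arrow_table  # added in this commit

# ~7 MB per chunk, 50 chunks -> to_batches() yields ~50 record batches of ~7 MB each.
chunk = pa.table({"id": list(range(100_000)), "payload": ["x" * 64] * 100_000})
big_table = pa.concat_tables([chunk] * 50)

for i, batches in enumerate(bin_pack_arrow_table(big_table)):
    group_bytes = sum(b.nbytes for b in batches)
    print(f"group {i}: {len(batches)} batches, {group_bytes / 1024**2:.0f} MB in memory")

Each yielded group of batches then becomes one WriteTask, and hence one Parquet data file, in write_file above.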
10 changes: 7 additions & 3 deletions pyiceberg/table/__init__.py
@@ -2429,7 +2429,7 @@ def _add_and_move_fields(
class WriteTask:
    write_uuid: uuid.UUID
    task_id: int
-    df: pa.Table
+    record_batches: List[pa.RecordBatch]
    sort_order_id: Optional[int] = None

    # Later to be extended with partition information

@@ -2458,7 +2458,7 @@ def _dataframe_to_data_files(
    Returns:
        An iterable that supplies datafiles that represent the table.
    """
-    from pyiceberg.io.pyarrow import write_file
+    from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file

    if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 0]) > 0:
        raise ValueError("Cannot write to partitioned tables")
@@ -2468,7 +2468,11 @@

    # This is an iter, so we don't have to materialize everything every time
    # This will be more relevant when we start doing partitioned writes
-    yield from write_file(io=io, table_metadata=table_metadata, tasks=iter([WriteTask(write_uuid, next(counter), df)]))
+    yield from write_file(
+        io=io,
+        table_metadata=table_metadata,
+        tasks=iter([WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df)]),
+    )


class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
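
Putting the two files together, a large append now fans out into one WriteTask, and therefore one data file, per bin-packed group of record batches. A small sketch of that fan-out, mirroring the wiring in _dataframe_to_data_files above; the example table is arbitrary:

import itertools
import uuid

import pyarrow as pa

from pyiceberg.io.pyarrow import bin_pack_arrow_table
from pyiceberg.table import WriteTask  # now carries record batches instead of a whole table

df = pa.concat_tables([pa.table({"n": list(range(1_000_000))})] * 64)  # ~512 MB of int64 in 64 chunks

write_uuid = uuid.uuid4()
counter = itertools.count(0)
tasks = [WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df)]
print(f"{len(tasks)} write task(s) -> {len(tasks)} data file(s)")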