Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Disallow writing empty Avro files/blocks
Raising an exception when doing this might look extreme, but
there is no good reason to allow it.
  • Loading branch information
Fokko committed Jul 3, 2024
commit 7ae8f5fa897a9ff1af338ca743bfb91dc9a82c98
16 changes: 16 additions & 0 deletions pyiceberg/avro/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class AvroOutputFile(Generic[D]):
encoder: BinaryEncoder
sync_bytes: bytes
writer: Writer
records_written: int

def __init__(
self,
Expand All @@ -247,6 +248,7 @@ def __init__(
else resolve_writer(record_schema=record_schema, file_schema=self.file_schema)
)
self.metadata = metadata
self.records_written = 0

def __enter__(self) -> AvroOutputFile[D]:
"""
Expand All @@ -266,6 +268,12 @@ def __exit__(
self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
) -> None:
"""Perform cleanup when exiting the scope of a 'with' statement."""
if self.records_written == 0:
# This is very opinionated, as for Iceberg we should not write empty metadata.
# The `write_block` method should be called at least once to make sure that we
# write the number of blocks and more.
raise ValueError("No records have been written for this Avro file.")

self.output_stream.close()

def _write_header(self) -> None:
Expand All @@ -277,8 +285,16 @@ def _write_header(self) -> None:
def write_block(self, objects: List[D]) -> None:
in_memory = io.BytesIO()
block_content_encoder = BinaryEncoder(output_stream=in_memory)

records_written_in_block = 0
for obj in objects:
self.writer.write(block_content_encoder, obj)
records_written_in_block += 1

if records_written_in_block == 0:
raise ValueError("No records have been written in this block.")

self.records_written += records_written_in_block
block_content = in_memory.getvalue()

self.encoder.write_int(len(objects))
Expand Down
15 changes: 15 additions & 0 deletions tests/avro/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,18 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
for idx, field in enumerate(all_primitives_schema.as_struct()):
assert record[idx] == avro_entry[idx], f"Invalid {field}"
assert record[idx] == avro_entry_read_with_fastavro[idx], f"Invalid {field} read with fastavro"


def test_forbid_writing_empty_file() -> None:
    """Writing zero records must fail.

    Two behaviors are pinned here: passing an empty list to ``write_block``
    raises immediately, and closing an ``AvroOutputFile`` that never wrote a
    record raises when the context manager exits.
    """
    with TemporaryDirectory() as tmp_dir:
        out_path = f"{tmp_dir}/manifest_entry.avro"

        # The file-level error is raised from __exit__, so the outer
        # pytest.raises must wrap the whole `with` block.
        with pytest.raises(ValueError, match="No records have been written for this Avro file."):
            avro_file = avro.AvroOutputFile[ManifestEntry](
                output_file=PyArrowFileIO().new_output(out_path),
                file_schema=MANIFEST_ENTRY_SCHEMAS[1],
                schema_name="manifest_entry",
                record_schema=MANIFEST_ENTRY_SCHEMAS[2],
            )
            with avro_file as writer:
                # An empty block is rejected before anything is flushed.
                with pytest.raises(ValueError, match="No records have been written in this block."):
                    writer.write_block([])