Relax the constraints a bit
Fokko committed Jul 4, 2024
commit efcd471e61074d02f3f7d35836321d306a881f86
16 changes: 0 additions & 16 deletions pyiceberg/avro/file.py
@@ -228,7 +228,6 @@ class AvroOutputFile(Generic[D]):
encoder: BinaryEncoder
sync_bytes: bytes
writer: Writer
records_written: int

def __init__(
self,
@@ -248,7 +247,6 @@ def __init__(
else resolve_writer(record_schema=record_schema, file_schema=self.file_schema)
)
self.metadata = metadata
self.records_written = 0

def __enter__(self) -> AvroOutputFile[D]:
"""
@@ -268,12 +266,6 @@ def __exit__(
self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
) -> None:
"""Perform cleanup when exiting the scope of a 'with' statement."""
if self.records_written == 0:
# This is very opinionated, as for Iceberg we should not write empty metadata.
# The `write_block` method should be called at least once to make sure that we
# write the number of blocks and more.
raise ValueError("No records have been written for this Avro file.")

self.output_stream.close()

def _write_header(self) -> None:
@@ -285,16 +277,8 @@ def _write_header(self) -> None:
def write_block(self, objects: List[D]) -> None:
in_memory = io.BytesIO()
block_content_encoder = BinaryEncoder(output_stream=in_memory)

records_written_in_block = 0
for obj in objects:
self.writer.write(block_content_encoder, obj)
records_written_in_block += 1

if records_written_in_block == 0:
raise ValueError("No records have been written in this block.")

self.records_written += records_written_in_block
block_content = in_memory.getvalue()

self.encoder.write_int(len(objects))
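With the records_written counters gone, AvroOutputFile no longer rejects empty blocks or empty files itself; that guard moves down to ManifestWriter (see pyiceberg/manifest.py below). A minimal sketch of the relaxed behaviour, reusing the constructor arguments from the deleted test further down; the exact import paths are an assumption, not part of this diff:

from tempfile import TemporaryDirectory

from pyiceberg.avro.file import AvroOutputFile
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.manifest import MANIFEST_ENTRY_SCHEMAS, ManifestEntry

with TemporaryDirectory() as tmpdir:
    tmp_avro_file = tmpdir + "/manifest_entry.avro"
    with AvroOutputFile[ManifestEntry](
        output_file=PyArrowFileIO().new_output(tmp_avro_file),
        file_schema=MANIFEST_ENTRY_SCHEMAS[1],
        schema_name="manifest_entry",
        record_schema=MANIFEST_ENTRY_SCHEMAS[2],
    ) as out:
        # Previously this raised "No records have been written in this block.";
        # after this commit an empty block (and an empty file) is accepted here.
        out.write_block([])
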
6 changes: 6 additions & 0 deletions pyiceberg/manifest.py
@@ -685,6 +685,10 @@ def __exit__(
traceback: Optional[TracebackType],
) -> None:
"""Close the writer."""
if (self._added_files + self._existing_files + self._deleted_files) == 0:
# This is just a guard to ensure that we don't write empty manifest files
raise ValueError("An empty manifest file has been written")

self.closed = True
self._writer.__exit__(exc_type, exc_value, traceback)

@@ -757,6 +761,8 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
elif entry.status == ManifestEntryStatus.DELETED:
self._deleted_files += 1
self._deleted_rows += entry.data_file.record_count
else:
raise ValueError(f"Unknown entry: {entry.status}")

self._partitions.append(entry.data_file.partition)

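The empty-file check now lives at the manifest level: ManifestWriter.__exit__ raises when no entries were added, and add_entry rejects an unrecognised status. A hedged sketch of how a caller is expected to use write_manifest after this change; the helper name write_entries and its untyped signature are illustrative, only write_manifest, add_entry and to_manifest_file come from pyiceberg.manifest:

from pyiceberg.manifest import write_manifest

def write_entries(format_version, spec, schema, output_file, snapshot_id, entries):
    # `entries` must contain at least one ManifestEntry whose status is
    # ADDED, EXISTING or DELETED; an empty iterable now fails on __exit__
    # with "An empty manifest file has been written".
    with write_manifest(
        format_version=format_version,
        spec=spec,
        schema=schema,
        output_file=output_file,
        snapshot_id=snapshot_id,
    ) as writer:
        for entry in entries:
            writer.add_entry(entry)  # ValueError for any other status
    return writer.to_manifest_file()
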
15 changes: 0 additions & 15 deletions tests/avro/test_file.py
@@ -394,18 +394,3 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
for idx, field in enumerate(all_primitives_schema.as_struct()):
assert record[idx] == avro_entry[idx], f"Invalid {field}"
assert record[idx] == avro_entry_read_with_fastavro[idx], f"Invalid {field} read with fastavro"


def test_forbid_writing_empty_file() -> None:
with TemporaryDirectory() as tmpdir:
tmp_avro_file = tmpdir + "/manifest_entry.avro"

with pytest.raises(ValueError, match="No records have been written for this Avro file."):
with avro.AvroOutputFile[ManifestEntry](
output_file=PyArrowFileIO().new_output(tmp_avro_file),
file_schema=MANIFEST_ENTRY_SCHEMAS[1],
schema_name="manifest_entry",
record_schema=MANIFEST_ENTRY_SCHEMAS[2],
) as out:
with pytest.raises(ValueError, match="No records have been written in this block."):
out.write_block([])
19 changes: 18 additions & 1 deletion tests/utils/test_manifest.py
@@ -35,7 +35,7 @@
write_manifest,
write_manifest_list,
)
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
from pyiceberg.transforms import IdentityTransform
@@ -306,6 +306,23 @@ def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None:
assert entry.status == ManifestEntryStatus.ADDED


def test_write_empty_manifest() -> None:
io = load_file_io()
test_schema = Schema(NestedField(1, "foo", IntegerType(), False))
with TemporaryDirectory() as tmpdir:
tmp_avro_file = tmpdir + "/test_write_manifest.avro"

with pytest.raises(ValueError, match="An empty manifest file has been written"):
with write_manifest(
format_version=1,
spec=UNPARTITIONED_PARTITION_SPEC,
schema=test_schema,
output_file=io.new_output(tmp_avro_file),
snapshot_id=8744736658442914487,
) as _:
pass


@pytest.mark.parametrize("format_version", [1, 2])
def test_write_manifest(
generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion