Skip to content
Prev Previous commit
Next Next commit
feat: extend OutputStream, AvroOutputFile and ManifestWriter wi…
…th `__len__` method
  • Loading branch information
felixscherz committed Aug 5, 2024
commit 1cc36a535b4a79b743a2f1a5c22880de9f16f1b9
9 changes: 9 additions & 0 deletions pyiceberg/avro/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class AvroOutputFile(Generic[D]):
encoder: BinaryEncoder
sync_bytes: bytes
writer: Writer
closed: bool

def __init__(
self,
Expand All @@ -247,6 +248,7 @@ def __init__(
else resolve_writer(record_schema=record_schema, file_schema=self.file_schema)
)
self.metadata = metadata
self.closed = False

def __enter__(self) -> AvroOutputFile[D]:
"""
Expand All @@ -267,6 +269,7 @@ def __exit__(
) -> None:
"""Perform cleanup when exiting the scope of a 'with' statement."""
self.output_stream.close()
self.closed = True

def _write_header(self) -> None:
json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name))
Expand All @@ -285,3 +288,9 @@ def write_block(self, objects: List[D]) -> None:
self.encoder.write_int(len(block_content))
self.encoder.write(block_content)
self.encoder.write(self.sync_bytes)

def __len__(self) -> int:
"""Returns the total number number of bytes written."""
if self.closed:
return len(self.output_file)
return len(self.output_stream)
4 changes: 4 additions & 0 deletions pyiceberg/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ def __exit__(
) -> None:
"""Perform cleanup when exiting the scope of a 'with' statement."""

@abstractmethod
def __len__(self) -> int:
"""Returns the total number number of bytes written to the stream."""


class InputFile(ABC):
"""A base class for InputFile implementations.
Expand Down
7 changes: 6 additions & 1 deletion pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,11 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter:
return self


def __len__(self) -> int:
"""Returns the total number number of bytes written."""
return len(self._writer)


class RollingManifestWriter:
closed: bool
_supplier: Generator[ManifestWriter, None, None]
Expand Down Expand Up @@ -934,7 +939,7 @@ def _should_roll_to_new_file(self) -> bool:
return False
return (
self._current_file_rows >= self._target_number_of_rows
or len(self._current_writer._output_file) >= self._target_file_size_in_bytes
or len(self._current_writer) >= self._target_file_size_in_bytes
)

def _close_current_writer(self):
Expand Down
4 changes: 2 additions & 2 deletions tests/utils/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,8 @@ def test_write_manifest(
[
(19514, 388873, 1), # should not roll over
(19513, 388873, 2), # should roll over due to target_rows
(19514, 388872, 2), # should roll over due target_bytes
(19513, 388872, 2), # should roll over due to target_rows and target_bytes
(4000, 388872, 2), # should roll over due target_bytes
(4000, 388872, 2), # should roll over due to target_rows and target_bytes
],
)
def test_rolling_manifest_writer(
Expand Down