test: test different rollover scenarios
felixscherz committed Aug 5, 2024
commit ff58a53d30c233d499f1efac18815599661faea3
4 changes: 2 additions & 2 deletions pyiceberg/manifest.py
@@ -927,7 +927,6 @@ def _get_current_writer(self) -> ManifestWriter:
return self._current_writer
if self._should_roll_to_new_file():
self._close_current_writer()
return self._get_current_writer()
return self._current_writer

def _should_roll_to_new_file(self) -> bool:
@@ -955,7 +954,6 @@ def add_entry(self, entry: ManifestEntry) -> RollingManifestWriter:
if self._closed:
raise RuntimeError("Cannot add entry to closed manifest writer")
self._get_current_writer().add_entry(entry)
self._current_file_rows += entry.data_file.record_count
return self


@@ -1137,3 +1135,5 @@ def write_manifest_list(
return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number)
else:
raise ValueError(f"Cannot write manifest list for table version: {format_version}")


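The hunks above show only fragments of the rollover logic, so here is a minimal, self-contained sketch of the flow they imply: a supplier generator hands out fresh writers, and the rolling writer switches to the next one once the current file reaches either the row or the byte threshold. Everything below (FakeWriter, SimpleRollingWriter, the thresholds, the two sample entries) is hypothetical and only mirrors the method names visible in the diff; it is not the pyiceberg implementation.

```python
from dataclasses import dataclass
from typing import Generator, Iterator, List, Optional


@dataclass
class FakeWriter:
    """Stand-in for a ManifestWriter: it only tracks what was written to it."""

    path: str
    rows: int = 0
    bytes_written: int = 0

    def add_entry(self, record_count: int, size_in_bytes: int) -> None:
        self.rows += record_count
        self.bytes_written += size_in_bytes


class SimpleRollingWriter:
    """Hypothetical rolling writer: rolls to a new file from the supplier once the
    current file reaches either the row or the byte threshold."""

    def __init__(self, supplier: Iterator[FakeWriter], target_rows: int, target_bytes: int) -> None:
        self._supplier = supplier
        self._target_rows = target_rows
        self._target_bytes = target_bytes
        self._current: Optional[FakeWriter] = None
        self._closed: List[FakeWriter] = []

    def _should_roll_to_new_file(self) -> bool:
        # Either threshold being reached triggers a rollover.
        return self._current.rows >= self._target_rows or self._current.bytes_written >= self._target_bytes

    def _get_current_writer(self) -> FakeWriter:
        if self._current is None:
            self._current = next(self._supplier)
        elif self._should_roll_to_new_file():
            self._closed.append(self._current)    # the full file is set aside ...
            self._current = next(self._supplier)  # ... and a fresh one is started
        return self._current

    def add_entry(self, record_count: int, size_in_bytes: int) -> "SimpleRollingWriter":
        self._get_current_writer().add_entry(record_count, size_in_bytes)
        return self

    def to_manifest_files(self) -> List[FakeWriter]:
        return self._closed + ([self._current] if self._current is not None else [])


def supplier() -> Generator[FakeWriter, None, None]:
    # Mirrors the supplier pattern used in the test below: one new file per rollover.
    i = 0
    while True:
        i += 1
        yield FakeWriter(path=f"manifest-{i}.avro")


# Two entries of 19513 rows / 388872 bytes each: with the row threshold at exactly
# 19513 the second entry lands in a new file, while 19514/388873 would keep one file.
writer = SimpleRollingWriter(supplier(), target_rows=19513, target_bytes=388873)
writer.add_entry(19513, 388872).add_entry(19513, 388872)
assert len(writer.to_manifest_files()) == 2
```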
166 changes: 28 additions & 138 deletions tests/utils/test_manifest.py
@@ -16,7 +16,7 @@
# under the License.
# pylint: disable=redefined-outer-name,arguments-renamed,fixme
from tempfile import TemporaryDirectory
from typing import Dict
from typing import Dict, Generator

import fastavro
import pytest
@@ -30,6 +30,7 @@
ManifestContent,
ManifestEntryStatus,
ManifestFile,
ManifestWriter,
PartitionFieldSummary,
RollingManifestWriter,
read_manifest_list,
@@ -495,8 +496,22 @@ def test_write_manifest(


@pytest.mark.parametrize("format_version", [1, 2])
@pytest.mark.parametrize(
"target_number_of_rows,target_file_size_in_bytes,expected_number_of_files",
[
(19514, 388873, 1), # should not roll over
(19513, 388873, 2), # should roll over due to target_rows
(19514, 388872, 2), # should roll over due to target_bytes
(19513, 388872, 2), # should roll over due to target_rows and target_bytes
],
)
def test_rolling_manifest_writer(
generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion
generated_manifest_file_file_v1: str,
generated_manifest_file_file_v2: str,
format_version: TableVersion,
target_number_of_rows: int,
target_file_size_in_bytes: int,
expected_number_of_files: int,
) -> None:
io = load_file_io()
snapshot = Snapshot(
@@ -518,13 +533,10 @@ def test_rolling_manifest_writer(
spec_id=demo_manifest_file.partition_spec_id,
)
with TemporaryDirectory() as tmpdir:
tmp_avro_file = tmpdir + "/test_write_manifest.avro"
tmp_avro_file = tmpdir + "/test_write_manifest-1.avro"
output = io.new_output(tmp_avro_file)
def supplier():

def supplier() -> Generator[ManifestWriter, None, None]:
i = 0
while True:
i += 1
tmp_avro_file = tmpdir + f"/test_write_manifest-{i}.avro"
output = io.new_output(tmp_avro_file)
yield write_manifest(
@@ -534,143 +546,21 @@ def supplier():
output_file=output,
snapshot_id=8744736658442914487,
)
with RollingManifestWriter(supplier=supplier(), target_file_size_in_bytes=388872 + 1, target_number_of_rows=20000) as writer:
i += 1

with RollingManifestWriter(
supplier=supplier(),
target_file_size_in_bytes=target_file_size_in_bytes,
target_number_of_rows=target_number_of_rows,
) as writer:
for entry in manifest_entries:
writer.add_entry(entry)
new_manifest = writer.to_manifest_files()[0]
manifest_files = writer.to_manifest_files()
assert len(manifest_files) == expected_number_of_files
with pytest.raises(RuntimeError):
# It is already closed
writer.add_entry(manifest_entries[0])

expected_metadata = {
"schema": test_schema.model_dump_json(),
"partition-spec": test_spec.model_dump_json(),
"partition-spec-id": str(test_spec.spec_id),
"format-version": str(format_version),
}
_verify_metadata_with_fastavro(
tmp_avro_file,
expected_metadata,
)
new_manifest_entries = new_manifest.fetch_manifest_entry(io)

manifest_entry = new_manifest_entries[0]

assert manifest_entry.status == ManifestEntryStatus.ADDED
assert manifest_entry.snapshot_id == 8744736658442914487
assert manifest_entry.data_sequence_number == -1 if format_version == 1 else 3
assert isinstance(manifest_entry.data_file, DataFile)

data_file = manifest_entry.data_file

assert data_file.content is DataFileContent.DATA
assert (
data_file.file_path
== "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
)
assert data_file.file_format == FileFormat.PARQUET
assert data_file.partition == Record(VendorID=1, tpep_pickup_datetime=1925)
assert data_file.record_count == 19513
assert data_file.file_size_in_bytes == 388872
assert data_file.column_sizes == {
1: 53,
2: 98153,
3: 98693,
4: 53,
5: 53,
6: 53,
7: 17425,
8: 18528,
9: 53,
10: 44788,
11: 35571,
12: 53,
13: 1243,
14: 2355,
15: 12750,
16: 4029,
17: 110,
18: 47194,
19: 2948,
}
assert data_file.value_counts == {
1: 19513,
2: 19513,
3: 19513,
4: 19513,
5: 19513,
6: 19513,
7: 19513,
8: 19513,
9: 19513,
10: 19513,
11: 19513,
12: 19513,
13: 19513,
14: 19513,
15: 19513,
16: 19513,
17: 19513,
18: 19513,
19: 19513,
}
assert data_file.null_value_counts == {
1: 19513,
2: 0,
3: 0,
4: 19513,
5: 19513,
6: 19513,
7: 0,
8: 0,
9: 19513,
10: 0,
11: 0,
12: 19513,
13: 0,
14: 0,
15: 0,
16: 0,
17: 0,
18: 0,
19: 0,
}
assert data_file.nan_value_counts == {16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0}
assert data_file.lower_bounds == {
2: b"2020-04-01 00:00",
3: b"2020-04-01 00:12",
7: b"\x03\x00\x00\x00",
8: b"\x01\x00\x00\x00",
10: b"\xf6(\\\x8f\xc2\x05S\xc0",
11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
15: b")\\\x8f\xc2\xf5(\x08\xc0",
16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
}
assert data_file.upper_bounds == {
2: b"2020-04-30 23:5:",
3: b"2020-05-01 00:41",
7: b"\t\x01\x00\x00",
8: b"\t\x01\x00\x00",
10: b"\xcd\xcc\xcc\xcc\xcc,_@",
11: b"\x1f\x85\xebQ\\\xe2\xfe@",
13: b"\x00\x00\x00\x00\x00\x00\x12@",
14: b"\x00\x00\x00\x00\x00\x00\xe0?",
15: b"q=\n\xd7\xa3\xf01@",
16: b"\x00\x00\x00\x00\x00`B@",
17: b"333333\xd3?",
18: b"\x00\x00\x00\x00\x00\x18b@",
19: b"\x00\x00\x00\x00\x00\x00\x04@",
}
assert data_file.key_metadata is None
assert data_file.split_offsets == [4]
assert data_file.equality_ids is None
assert data_file.sort_order_id == 0
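
The parametrize table above pairs thresholds with the expected file count; the arithmetic can be sanity-checked with a tiny stand-alone model of the rollover rule. The helper `count_manifest_files`, the test name, and the two sample entries are all hypothetical (the real test drives pyiceberg's RollingManifestWriter with entries read from the generated manifest fixtures); this is only a sketch of why lowering either threshold by one flips the expected count from 1 to 2.

```python
from typing import List, Tuple

import pytest


def count_manifest_files(entries: List[Tuple[int, int]], target_rows: int, target_bytes: int) -> int:
    """Count the files a rolling writer would produce for (record_count, size_in_bytes)
    entries, rolling once the current file reaches either threshold."""
    files, rows, size = 1, 0, 0
    for record_count, size_in_bytes in entries:
        if rows > 0 and (rows >= target_rows or size >= target_bytes):
            files += 1
            rows, size = 0, 0
        rows += record_count
        size += size_in_bytes
    return files


@pytest.mark.parametrize(
    "target_rows,target_bytes,expected_files",
    [
        (19514, 388873, 1),  # neither threshold reached after the first entry
        (19513, 388873, 2),  # row threshold reached
        (19514, 388872, 2),  # byte threshold reached
        (19513, 388872, 2),  # both thresholds reached
    ],
)
def test_rollover_arithmetic(target_rows: int, target_bytes: int, expected_files: int) -> None:
    # Two entries of 19513 rows / 388872 bytes each (an assumed stand-in for the fixture data).
    entries = [(19513, 388872), (19513, 388872)]
    assert count_manifest_files(entries, target_rows, target_bytes) == expected_files
```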


@pytest.mark.parametrize("format_version", [1, 2])
def test_write_manifest_list(