Commit 5f23688

Transaction size for encoding upload will depend on encoding_size.

1 parent 8a1187f
3 files changed: +21 -11 lines changed

backend/entityservice/database/insertions.py

Lines changed: 6 additions & 2 deletions

@@ -78,11 +78,15 @@ def insert_encoding_metadata(db, clks_filename, dp_id, receipt_token, encoding_c
 
 
 def insert_encodings_into_blocks(db, dp_id: int, block_ids: List[List[str]], encoding_ids: List[int],
-                                 encodings: List[bytes], page_size=4096):
+                                 encodings: List[bytes], page_size: int = 4096):
     """
     Bulk load blocking and encoding data into the database.
-
     See https://hakibenita.com/fast-load-data-python-postgresql#copy-data-from-a-string-iterator-with-buffer-size
+
+    :param page_size:
+        Maximum number of rows to fetch in a given sql statement/network transfer. A larger page size
+        will require more local memory, but could be faster due to fewer network transfers.
+
     """
     encodings_insertion_query = "INSERT INTO encodings (dp, encoding_id, encoding) VALUES %s"
     blocks_insertion_query = "INSERT INTO encodingblocks (dp, encoding_id, block_id) VALUES %s"
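Both queries use the `VALUES %s` placeholder style consumed by psycopg2's `execute_values`, which is the likely home of the new `page_size` parameter: it batches the row list into pages of `page_size` and sends one `INSERT` statement per page. A minimal sketch under that assumption (the real body of `insert_encodings_into_blocks` is not shown in this hunk; `_bulk_insert_sketch` is a hypothetical stand-in):

    # Sketch only: assumes psycopg2's extras.execute_values is what consumes
    # the VALUES %s queries above, one INSERT statement per page_size rows.
    from psycopg2 import extras

    def _bulk_insert_sketch(db, dp_id, block_ids, encoding_ids, encodings, page_size=4096):
        encodings_insertion_query = "INSERT INTO encodings (dp, encoding_id, encoding) VALUES %s"
        blocks_insertion_query = "INSERT INTO encodingblocks (dp, encoding_id, block_id) VALUES %s"
        with db.cursor() as cur:
            extras.execute_values(
                cur, encodings_insertion_query,
                [(dp_id, eid, enc) for eid, enc in zip(encoding_ids, encodings)],
                page_size=page_size)
            # An encoding can belong to several blocks: one row per (encoding, block) pair.
            extras.execute_values(
                cur, blocks_insertion_query,
                [(dp_id, eid, bid)
                 for eid, blocks in zip(encoding_ids, block_ids)
                 for bid in blocks],
                page_size=page_size)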

backend/entityservice/encoding_storage.py

Lines changed: 12 additions & 8 deletions

@@ -47,7 +47,6 @@ def convert_encodings_from_base64_to_binary(encodings: Iterator[Tuple[str, str,
         yield i, binary_packed_encoding, blocks
 
 
-
 def _grouper(iterable, n, fillvalue=None):
     "Collect data into fixed-length chunks or blocks"
     # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
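The body of `_grouper` is truncated by this hunk; its docstring and example comment match the standard itertools grouper recipe, so it presumably reads roughly as follows (a sketch, not the verbatim source):

    from itertools import zip_longest

    def _grouper(iterable, n, fillvalue=None):
        "Collect data into fixed-length chunks or blocks"
        # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
        args = [iter(iterable)] * n
        return zip_longest(*args, fillvalue=fillvalue)

Note the recipe pads the final group with `fillvalue`, so the caller (or `_transpose`) has to discard the trailing padding.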
@@ -73,23 +72,28 @@ def _transpose(group):
     return a, b, c
 
 
-def store_encodings_in_db(conn, dp_id, encodings: Iterator[Tuple[str, bytes, List[str]]]):
+def store_encodings_in_db(conn, dp_id, encodings: Iterator[Tuple[str, bytes, List[str]]], encoding_size: int = 128):
     """
     Group encodings + blocks into database transactions and execute.
-
-    Assuming default encoding size of 128 B, n encodings each with their own
-    4 B encoding id, and assuming `k` multiple unique blocks of 64 B will be a transaction
-    of approximately k*64 + 132 * n. For k = 10 and n = 100_000 this gives a transaction
-    size under 100MiB.
     """
 
-    for group in _grouper(encodings, n=100_000):
+    for group in _grouper(encodings, n=_estimate_group_size(encoding_size)):
         encoding_ids, encodings, blocks = _transpose(group)
         assert len(blocks) == len(encodings)
         assert len(encoding_ids) == len(encodings)
         insert_encodings_into_blocks(conn, dp_id, block_ids=blocks, encoding_ids=encoding_ids, encodings=encodings)
 
 
+def _estimate_group_size(encoding_size):
+    """
+    Given an encoding size (e.g. 128 B), estimate the number of encodings that will likely
+    be under 100MiB in data including blocks. Note this is hopefully very conservative
+    in estimating the average number of blocks each record is in.
+    """
+    network_transaction_size = 104857600  # 100MiB
+    blocks_per_record_estimate = 50
+    return network_transaction_size // ((blocks_per_record_estimate * 64) + (encoding_size + 4))
+
 
 def convert_encodings_from_json_to_binary(f):
     """

backend/entityservice/tasks/encoding_uploading.py

Lines changed: 3 additions & 1 deletion

@@ -32,6 +32,8 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None):
     # Get number of blocks + total number of encodings from database
     expected_count, block_count = get_encoding_metadata(db, dp_id)
 
+    encoding_size = get_uploads_columns(db, dp_id, ['encoding_size'])[0]
+
     log.info(f"Expecting to handle {expected_count} encodings in {block_count} blocks")
     mc = connect_to_object_store()
     raw_file = Config.RAW_FILENAME_FMT.format(receipt_token)
@@ -43,7 +45,7 @@
     # output into database for each block (temp or direct to minio?)
     pipeline = convert_encodings_from_base64_to_binary(stream_json_clksnblocks(raw_data))
     with DBConn() as db:
-        store_encodings_in_db(db, dp_id, pipeline)
+        store_encodings_in_db(db, dp_id, pipeline, encoding_size)
 
 
 #### GLUE CODE - TODO remove me once moved away from storing encodings in files
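`get_uploads_columns` is defined elsewhere in the service; from the call site it returns the requested columns of a data provider's upload row, with `[0]` picking out `encoding_size`. A hedged sketch of that contract (the table name and schema here are assumptions, not part of this commit):

    # Assumed shape of get_uploads_columns, inferred only from the call site above.
    def get_uploads_columns(db, dp_id, columns):
        assert all(col.isidentifier() for col in columns)  # identifiers come from trusted code
        sql = f"SELECT {', '.join(columns)} FROM uploads WHERE dp = %s"
        with db.cursor() as cur:
            cur.execute(sql, (dp_id,))
            return cur.fetchone()  # one row, in the requested column order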
