Peek at actual encoding size
hardbyte committed Mar 2, 2020
commit b03bf1f5f8822bb5c4115d26e7bbd7bef6f674f6
23 changes: 13 additions & 10 deletions backend/entityservice/encoding_storage.py
@@ -1,3 +1,4 @@
+import math
 from itertools import zip_longest
 from typing import Iterator, List, Tuple
 
@@ -35,17 +36,19 @@ def convert_encodings_from_base64_to_binary(encodings: Iterator[Tuple[str, str,
     :return: a tuple comprising:
         (entity_id, binary encoding, list of blocks)
     """
-    # Default which is ignored but makes IDE/typechecker happier
-    bit_packing_struct = binary_format(128)
-    encoding_size = None
+    # Peek at the first element to extract the encoding size
+    i, encoding_data, blocks = next(encodings)
+    encoding_size = len(encoding_data)
+    bit_packing_struct = binary_format(encoding_size)
 
-    for i, encoding_data, blocks in encodings:
-        if encoding_size is None:
-            encoding_size = len(encoding_data)
-            bit_packing_struct = binary_format(encoding_size)
-        binary_packed_encoding = bit_packing_struct.pack(i, encoding_data)
-        yield i, binary_packed_encoding, blocks
+    def generator(first_i, first_encoding_data, first_blocks):
+        binary_packed_encoding = bit_packing_struct.pack(first_i, first_encoding_data)
+        yield first_i, binary_packed_encoding, first_blocks
+        for i, encoding_data, blocks in encodings:
+            binary_packed_encoding = bit_packing_struct.pack(i, encoding_data)
+            yield i, binary_packed_encoding, blocks
 
+    return encoding_size, generator(i, encoding_data, blocks)
 
 def _grouper(iterable, n, fillvalue=None):
     "Collect data into fixed-length chunks or blocks"
@@ -92,7 +95,7 @@ def _estimate_group_size(encoding_size):
     """
     network_transaction_size = 104857600 # 100MiB
     blocks_per_record_estimate = 50
-    return network_transaction_size / ((blocks_per_record_estimate * 64) + (encoding_size + 4))
+    return math.ceil(network_transaction_size / ((blocks_per_record_estimate * 64) + (encoding_size + 4)))
 
 
 def convert_encodings_from_json_to_binary(f):
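
To make the math.ceil change concrete, here is the estimate evaluated for a hypothetical 128-byte encoding; before this commit the function returned a float rather than a whole number of records per group:

import math

network_transaction_size = 104857600   # 100MiB per network transaction
blocks_per_record_estimate = 50
encoding_size = 128                    # assumed encoding size in bytes

denominator = (blocks_per_record_estimate * 64) + (encoding_size + 4)
print(network_transaction_size / denominator)             # 31469.87..., the old float result
print(math.ceil(network_transaction_size / denominator))  # 31470, now a usable int group size
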
7 changes: 3 additions & 4 deletions backend/entityservice/tasks/encoding_uploading.py
@@ -31,9 +31,8 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None):
         return
     # Get number of blocks + total number of encodings from database
     expected_count, block_count = get_encoding_metadata(db, dp_id)
-    encoding_size = get_project_encoding_size(db, project_id)
 
-    log.info(f"Expecting to handle {expected_count} encodings of size {encoding_size} in {block_count} blocks")
+    log.info(f"Expecting to handle {expected_count} encodings in {block_count} blocks")
     mc = connect_to_object_store()
     raw_file = Config.RAW_FILENAME_FMT.format(receipt_token)
     raw_data = mc.get_object(Config.MINIO_BUCKET, raw_file)
@@ -42,11 +41,11 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None):
     # stream encodings with block ids from uploaded file
     # convert each encoding to our internal binary format
     # output into database for each block (temp or direct to minio?)
-    pipeline = convert_encodings_from_base64_to_binary(stream_json_clksnblocks(raw_data))
+    encoding_size, pipeline = convert_encodings_from_base64_to_binary(stream_json_clksnblocks(raw_data))
+    log.info(f"Starting pipeline to store {encoding_size}B sized encodings in database")
     with DBConn() as db:
         store_encodings_in_db(db, dp_id, pipeline, encoding_size)
 
-
     #### GLUE CODE - TODO remove me once moved away from storing encodings in files
     # Note we open the stream a second time
     raw_data = mc.get_object(Config.MINIO_BUCKET, raw_file)
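
A runnable sketch of the new call-site contract (convert_stub below is a simplified stand-in for convert_encodings_from_base64_to_binary that skips the binary packing): the function now returns a (size, generator) pair, so the separate get_project_encoding_size lookup is no longer needed, and an empty upload fails fast with StopIteration at the call itself rather than later in the pipeline:

from itertools import chain

def convert_stub(encodings):
    first = next(encodings)  # peek; raises StopIteration on an empty stream
    # chain() re-attaches the peeked element in front of the remaining stream
    return len(first[1]), chain([first], encodings)

encoding_size, pipeline = convert_stub(
    iter([(0, b'\xaa' * 128, ['block-a']),
          (1, b'\xbb' * 128, ['block-a', 'block-b'])]))
assert encoding_size == 128      # measured from the data, not a project DB row
assert len(list(pipeline)) == 2  # peeked row is re-emitted first, order preserved
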