Stop storing uploaded encodings in minio
Fix up opentracing spans
hardbyte committed Mar 3, 2020
commit 0ff7bf07a8fa1fa749144429a9f063c301bd9e55
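The "fix up opentracing spans" half of this commit replaces repeated `opentracing.tracer.start_span(name, child_of=parent_span)` calls with a one-line helper bound to the task's own tracer and span. A minimal runnable sketch of that pattern, using opentracing's `MockTracer` as a stand-in for the tracer and span that the real code takes from the Celery task object (`handle_raw_upload.tracer` / `handle_raw_upload.span`):

```python
# Sketch of the child-span helper pattern; MockTracer is a stand-in so the
# example runs standalone, not what the entity service uses in production.
from opentracing.mocktracer import MockTracer

tracer = MockTracer()

with tracer.start_active_span('handle-raw-upload') as scope:
    task_span = scope.span
    # One helper instead of repeating child_of=... at every call site.
    new_child_span = lambda name: tracer.start_active_span(name, child_of=task_span)

    with new_child_span('upload-encodings-to-db'):
        pass  # stream encodings into the database
    with new_child_span('save-encoding-metadata'):
        pass  # record encoding size and mark the upload 'ready'

# Child spans finish before the task span, all under one trace.
assert [s.operation_name for s in tracer.finished_spans()] == [
    'upload-encodings-to-db', 'save-encoding-metadata', 'handle-raw-upload']
```

Each `with new_child_span(...)` block becomes its own span parented to the task span, so every stage of the upload shows up in a single trace.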
backend/entityservice/tasks/encoding_uploading.py: 56 changes (11 additions & 45 deletions)
@@ -24,7 +24,7 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None):
     """
     log = logger.bind(pid=project_id, dp_id=dp_id)
     log.info("Handling user provided base64 encodings")
-
+    new_child_span = lambda name: handle_raw_upload.tracer.start_active_span(name, child_of=handle_raw_upload.span)
     with DBConn() as db:
         if not check_project_exists(db, project_id):
             log.info("Project deleted, stopping immediately")
@@ -34,10 +34,10 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None):
 
     log.info(f"Expecting to handle {expected_count} encodings in {block_count} blocks")
     mc = connect_to_object_store()
-    raw_file = Config.RAW_FILENAME_FMT.format(receipt_token)
-    raw_data = mc.get_object(Config.MINIO_BUCKET, raw_file)
+    input_filename = Config.RAW_FILENAME_FMT.format(receipt_token)
+    raw_data = mc.get_object(Config.MINIO_BUCKET, input_filename)
 
-    with opentracing.tracer.start_span('upload-encodings-to-db', child_of=parent_span):
+    with new_child_span('upload-encodings-to-db'):
         # stream encodings with block ids from uploaded file
         # convert each encoding to our internal binary format
         # output into database for each block (temp or direct to minio?)
@@ -46,55 +46,21 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None):
         with DBConn() as db:
             store_encodings_in_db(db, dp_id, pipeline, encoding_size)
 
-    #### GLUE CODE - TODO remove me once moved away from storing encodings in files
-    # Note we open the stream a second time
-    raw_data = mc.get_object(Config.MINIO_BUCKET, raw_file)
-    blocked_binary_data, encoding_size = convert_encodings_from_json_to_binary(raw_data)
-    assert block_count == len(blocked_binary_data)
-    log.info(f"Converted uploaded encodings of size {encoding_size} bytes into internal binary format. Number of blocks: {block_count}")
-
-    if block_count == 0:
-        log.warning("No uploaded encoding blocks, stopping processing.")
-        # TODO mark run as failure?
-        return
-    elif block_count > 1:
-        raise NotImplementedError('Currently handle single block encodings - check back soon')
-
-    #for block_id in blocked_binary_data:
-    block_id = list(blocked_binary_data.keys())[0]
-    actual_count = len(blocked_binary_data[block_id])
-    log.info(f"{block_id=}, number of encodings: {actual_count}")
-
-    # We peek at the first element as we need the encoding size
-    # for the rest of our processing pipeline. Note we now add 4 bytes to the encoding
-    # for the entity's identifier.
-    uploaded_encoding_size = len(blocked_binary_data[block_id][0]) - 4
-
-    # This is the first time we've seen the encoding size from this data provider
+    # As this is the first time we've seen the encoding size actually uploaded from this data provider
     # We check it complies with the project encoding size.
     try:
-        check_dataproviders_encoding(project_id, uploaded_encoding_size)
+        check_dataproviders_encoding(project_id, encoding_size)
     except InvalidEncodingError as e:
         log.warning(e.args[0])
         handle_invalid_encoding_data(project_id, dp_id)
 
-    with DBConn() as db:
-        # Save the encoding size as metadata
-        update_encoding_metadata_set_encoding_size(db, dp_id, uploaded_encoding_size)
-
-    # Output file is our custom binary packed file
-    filename = Config.BIN_FILENAME_FMT.format(receipt_token)
-    bit_packed_element_size = binary_format(uploaded_encoding_size).size
-    num_bytes = actual_count * bit_packed_element_size
-
-    with opentracing.tracer.start_span('process-encodings-in-quarantine', child_of=parent_span) as span:
-        packed_filter_stream = io.BytesIO(b''.join(blocked_binary_data[block_id]))
-        # Upload to object store
-        log.info(f"Uploading {expected_count} encodings of size {uploaded_encoding_size} " +
-                 f"to object store. Total Size: {fmt_bytes(num_bytes)}")
-        mc.put_object(Config.MINIO_BUCKET, filename, data=packed_filter_stream, length=num_bytes)
-
     with DBConn() as conn:
-        update_encoding_metadata(conn, filename, dp_id, 'ready')
+        with new_child_span('save-encoding-metadata'):
+            # Save the encoding size as metadata for this data provider
+            update_encoding_metadata_set_encoding_size(conn, dp_id, encoding_size)
+            update_encoding_metadata(conn, None, dp_id, 'ready')
 
     # Now work out if all parties have added their data
     if clks_uploaded_to_project(project_id, check_data_ready=True):
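The comments in the new `upload-encodings-to-db` span describe the replacement flow: stream the uploaded base64 encodings, convert each to the internal binary format, and write them to the database instead of back to MinIO. A hedged sketch of such a pipeline; the function name, the JSON layout (a top-level `clks` list), and the single hardcoded block are illustrative, not the service's actual `convert_encodings_from_json_to_binary` / `store_encodings_in_db` API:

```python
# Illustrative streaming pipeline: base64 JSON upload -> binary rows per block.
import base64
import io
import json
import struct

def encoding_pipeline(raw_stream, block_id='1'):
    """Yield (entity_id, block_id, packed_encoding) from an uploaded JSON file."""
    data = json.load(raw_stream)
    for entity_id, b64 in enumerate(data['clks']):   # 'clks' key is an assumption
        encoding = base64.b64decode(b64)
        # internal binary format assumed: 4-byte entity id prefix + raw encoding
        yield entity_id, block_id, struct.pack('!I', entity_id) + encoding

# Usage with an in-memory upload of two 8-byte encodings:
upload = io.BytesIO(json.dumps(
    {'clks': [base64.b64encode(bytes(8)).decode() for _ in range(2)]}).encode())
for row in encoding_pipeline(upload):
    print(row)  # each row would be inserted into the encodings table per block
```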
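The deleted glue code recovers the encoding size by subtracting the 4-byte entity identifier that the internal format prepends to each encoding (`len(...) - 4`), then sizes the MinIO upload from it. A small sketch of that arithmetic, assuming a big-endian unsigned-int identifier (`'!I'` is an assumption; the real layout lives in `binary_format`):

```python
# Packed element layout implied by the removed glue code:
# a 4-byte entity identifier followed by the encoding itself.
import struct

encoding = bytes(128)                        # hypothetical 128-byte encoding
packed = struct.pack('!I', 42) + encoding    # 4-byte id prefix (assumed '!I')

uploaded_encoding_size = len(packed) - 4     # the "- 4" from the deleted code
assert uploaded_encoding_size == len(encoding)

# Total upload size, as computed before the old mc.put_object call:
bit_packed_element_size = 4 + uploaded_encoding_size
num_bytes = 1000 * bit_packed_element_size   # e.g. actual_count = 1000
```

With encodings now stored in the database, none of this sizing is needed, which is why the new code passes `None` as the filename to `update_encoding_metadata`.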