User provided binary packed uploads now go straight into db
hardbyte committed Mar 3, 2020
commit 1bdf1e86b9d892b58465e0b21f1d5ce110145965
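In short: rather than bundling a binary upload into a single file destined for the object store, each encoding is now written to the database as its own record. A minimal illustration of the record shape handed to the storage layer (values invented, not taken from the diff):

# One record per encoding: (encoding_id, packed encoding bytes, block ids).
# '1' is the default block assigned when the client supplies no blocking information.
example_record = ('0', b'\x00' * 128, ['1'])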
2 changes: 1 addition & 1 deletion backend/entityservice/init-db-schema.sql
@@ -131,7 +131,7 @@ CREATE TABLE uploads (
     token CHAR(48) NOT NULL UNIQUE,

     -- Filename for the raw unprocessed uploaded data
-    file CHAR(64) NOT NULL,
+    file CHAR(64) NULL,

     state PROCESSEDSTATE NOT NULL,
46 changes: 23 additions & 23 deletions backend/entityservice/views/project.py
@@ -2,17 +2,16 @@
 import json
 import tempfile

-import minio
 from flask import request
 from flask import g
 from structlog import get_logger
 import opentracing

 import entityservice.database as db
+from entityservice.encoding_storage import store_encodings_in_db
 from entityservice.tasks import handle_raw_upload, check_for_executable_runs, remove_project
 from entityservice.tracing import serialize_span
-from entityservice.utils import safe_fail_request, get_json, generate_code, get_stream, \
-    clks_uploaded_to_project, fmt_bytes, iterable_to_stream
+from entityservice.utils import safe_fail_request, get_json, generate_code, clks_uploaded_to_project, fmt_bytes
 from entityservice.database import DBConn, get_project_column
 from entityservice.views.auth_checks import abort_if_project_doesnt_exist, abort_if_invalid_dataprovider_token, \
     abort_if_invalid_results_token, get_authorization_token_type_or_abort, abort_if_inconsistent_upload
@@ -24,6 +23,8 @@

 logger = get_logger()

+DEFAULT_BLOCK_ID = '1'
+

 def projects_get():
     logger.info("Getting list of all projects")
@@ -156,26 +157,27 @@ def project_binaryclks_post(project_id):
             stream = BytesIO(request.data)
             binary_formatter = binary_format(size)

-            def entity_id_injector(filter_stream):
+            def encoding_iterator(filter_stream):
                 # Assumes encoding id and block info not provided (yet)
                 for entity_id in range(count):
-                    yield binary_formatter.pack(entity_id, filter_stream.read(size))
+                    yield str(entity_id), binary_formatter.pack(entity_id, filter_stream.read(size)), [DEFAULT_BLOCK_ID]

-            data_with_ids = b''.join(entity_id_injector(stream))
             expected_bytes = size * count
             log.debug(f"Stream size is {len(request.data)} B, and we expect {expected_bytes} B")
             if len(request.data) != expected_bytes:
                 safe_fail_request(400,
                                   "Uploaded data did not match the expected size. Check request headers are correct")
             try:
-                receipt_token = upload_clk_data_binary(project_id, dp_id, BytesIO(data_with_ids), count, size)
+                receipt_token = upload_clk_data_binary(project_id, dp_id, encoding_iterator(stream), count, size)
             except ValueError:
                 safe_fail_request(400,
                                   "Uploaded data did not match the expected size. Check request headers are correct.")
         else:
             safe_fail_request(400, "Content Type not supported")
     except Exception:
-        log.info("The dataprovider was not able to upload her clks,"
-                 " re-enable the corresponding upload token to be used.")
+        log.warning("The dataprovider was not able to upload their clks,"
+                    " re-enable the corresponding upload token to be used.")

         with DBConn() as conn:
             db.set_dataprovider_upload_state(conn, dp_id, state='error')
         raise
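The view no longer materialises data_with_ids as one large bytes object wrapped in a BytesIO; it hands upload_clk_data_binary a lazy generator of (encoding_id, packed_encoding, block_ids) tuples instead. A self-contained sketch of that generator follows; the stand-in binary_format here (a big-endian uint32 id followed by the raw encoding bytes) is an assumption for illustration, not necessarily the project's real packer.

import io
import struct

DEFAULT_BLOCK_ID = '1'  # mirrors the constant added in this commit

def binary_format(size):
    # Assumed packing for illustration: 4-byte big-endian entity id + raw encoding bytes.
    return struct.Struct(f'!I{size}s')

def encoding_iterator(filter_stream, count, size):
    # Yields one (encoding_id, packed_encoding, block_ids) tuple per encoding,
    # reading the upload stream lazily instead of building one large buffer.
    formatter = binary_format(size)
    for entity_id in range(count):
        yield str(entity_id), formatter.pack(entity_id, filter_stream.read(size)), [DEFAULT_BLOCK_ID]

# Tiny usage example: two 8-byte encodings.
stream = io.BytesIO(b'\x01' * 8 + b'\x02' * 8)
for encoding_id, packed, blocks in encoding_iterator(stream, count=2, size=8):
    print(encoding_id, len(packed), blocks)  # -> "0 12 ['1']" then "1 12 ['1']"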
@@ -317,13 +319,13 @@ def authorise_get_request(project_id):
     return dp_id, project_object


-def upload_clk_data_binary(project_id, dp_id, raw_stream, count, size=128):
+def upload_clk_data_binary(project_id, dp_id, encoding_iter, count, size=128):
     """
-    Save the user provided raw CLK data.
+    Save the user provided binary-packed CLK data.

     """
     receipt_token = generate_code()
-    filename = Config.BIN_FILENAME_FMT.format(receipt_token)
+    filename = None
     # Set the state to 'pending' in the uploads table
     with DBConn() as conn:
         db.insert_encoding_metadata(conn, filename, dp_id, receipt_token, encoding_count=count, block_count=1)
@@ -334,20 +336,18 @@ def upload_clk_data_binary(project_id, dp_id, raw_stream, count, size=128):

     logger.debug("Directly storing binary file with index, base64 encoded CLK, popcount")

-    # Upload to object store
-    logger.info(f"Uploading {count} binary encodings to object store. Total size: {fmt_bytes(num_bytes)}")
+    # Upload to database
+    logger.info(f"Uploading {count} binary encodings to database. Total size: {fmt_bytes(num_bytes)}")
     parent_span = g.flask_tracer.get_span()

-    with opentracing.tracer.start_span('save-to-minio', child_of=parent_span):
-        mc = connect_to_object_store()
-        try:
-            mc.put_object(Config.MINIO_BUCKET, filename, data=raw_stream, length=num_bytes)
-        except (minio.error.InvalidSizeError, minio.error.InvalidArgumentError, minio.error.ResponseError):
-            logger.info("Mismatch between expected stream length and header info")
-            raise ValueError("Mismatch between expected stream length and header info")
+    with DBConn() as conn:
+        with opentracing.tracer.start_span('create-default-block-in-db', child_of=parent_span):
+            db.insert_blocking_metadata(conn, dp_id, {DEFAULT_BLOCK_ID: count})

-    with opentracing.tracer.start_span('update-database-with-metadata', child_of=parent_span):
-        with DBConn() as conn:
+        with opentracing.tracer.start_span('upload-encodings-to-db', child_of=parent_span):
+            store_encodings_in_db(conn, dp_id, encoding_iter, size)
+
+        with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span):
             db.update_encoding_metadata(conn, filename, dp_id, 'ready')

     # Now work out if all parties have added their data
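For context, store_encodings_in_db comes from entityservice.encoding_storage and its body is not part of this diff. A rough, hypothetical sketch of what such a helper might do: batch the tuple stream into inserts against a psycopg2-style connection. The table and column names below are made up, not the project's real schema.

from itertools import islice

def store_encodings_in_db(conn, dp_id, encoding_iter, encoding_size, batch_size=10_000):
    # Hypothetical sketch: stream (encoding_id, encoding, block_ids) tuples into
    # the database in batches so a large upload never sits fully in memory.
    # encoding_size is available for validating record lengths if desired.
    with conn.cursor() as cur:
        while True:
            batch = list(islice(encoding_iter, batch_size))
            if not batch:
                break
            cur.executemany(
                # Illustrative table/columns only.
                "INSERT INTO encodings (dp_id, encoding_id, encoding, block_ids)"
                " VALUES (%s, %s, %s, %s)",
                [(dp_id, eid, enc, blocks) for eid, enc, blocks in batch],
            )

The DBConn context manager used in the diff presumably handles commit/rollback, which is why this sketch does not commit explicitly.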