diff --git a/backend/entityservice/database/selections.py b/backend/entityservice/database/selections.py
index 8141ce12..ad8bdc85 100644
--- a/backend/entityservice/database/selections.py
+++ b/backend/entityservice/database/selections.py
@@ -155,7 +155,7 @@ def get_run(db, run_id):
 
 
 def get_project_column(db, project_id, column):
-    assert column in {'notes', 'schema', 'parties', 'result_type', 'deleted', 'encoding_size'}
+    assert column in {'notes', 'schema', 'parties', 'result_type', 'deleted', 'encoding_size', 'uses_blocking'}
     sql_query = """
         SELECT {}
         FROM projects
diff --git a/backend/entityservice/views/auth_checks.py b/backend/entityservice/views/auth_checks.py
index fe4469b3..e739fa4f 100644
--- a/backend/entityservice/views/auth_checks.py
+++ b/backend/entityservice/views/auth_checks.py
@@ -59,6 +59,37 @@ def abort_if_invalid_results_token(resource_id, results_token):
         safe_fail_request(403, message=INVALID_ACCESS_MSG)
 
 
+def abort_if_inconsistent_upload(uses_blocking, clk_json):
+    """
+    Check that the uploaded JSON is consistent with the project's blocking setting.
+    Exactly one of the following combinations is accepted:
+    - uses_blocking is False AND the upload JSON contains a non-empty 'clks' element
+    - uses_blocking is True AND the upload JSON contains a non-empty 'clknblocks' element
+
+    :param uses_blocking: boolean indicating whether the project uses blocking
+    :param clk_json: the uploaded JSON as a dict
+    :raises: aborts the request with a 400 response via safe_fail_request if the checks fail
+    """
+    is_valid_clks = not uses_blocking and 'clks' in clk_json
+    is_valid_clknblocks = uses_blocking and 'clknblocks' in clk_json
+    if not (is_valid_clks or is_valid_clknblocks):
+        # fail condition 1 - uses_blocking is True but the uploaded element is 'clks'
+        if uses_blocking and 'clks' in clk_json:
+            safe_fail_request(400, message='Uploaded element is "clks" while expecting "clknblocks"')
+        # fail condition 2 - uses_blocking is False but the uploaded element is 'clknblocks'
+        if not uses_blocking and 'clknblocks' in clk_json:
+            safe_fail_request(400, message='Uploaded element is "clknblocks" while expecting "clks"')
+        # fail condition 3 - unknown upload element in the JSON
+        if 'clks' not in clk_json and 'clknblocks' not in clk_json:
+            safe_fail_request(400, message='Unknown upload element - expect "clks" or "clknblocks"')
+    # fail condition 4 - 'clks' element exists but contains no data
+    if 'clks' in clk_json and len(clk_json['clks']) < 1:
+        safe_fail_request(400, message='Missing CLKs information')
+    # fail condition 5 - 'clknblocks' element exists but contains no data
+    if 'clknblocks' in clk_json and len(clk_json['clknblocks']) < 1:
+        safe_fail_request(400, message='Missing CLK and Blocks information')
+
+
 def dataprovider_id_if_authorize(resource_id, receipt_token):
     logger.debug("checking authorization token to fetch mask data")
     if not is_receipt_token_valid(resource_id, receipt_token):
diff --git a/backend/entityservice/views/project.py b/backend/entityservice/views/project.py
index 86d6d509..890cfdcf 100644
--- a/backend/entityservice/views/project.py
+++ b/backend/entityservice/views/project.py
@@ -1,4 +1,6 @@
+import json
 from io import BytesIO
+import tempfile
 
 import minio
 from flask import request
@@ -11,9 +13,9 @@ from entityservice.tracing import serialize_span
 from entityservice.utils import safe_fail_request, get_json, generate_code, get_stream, \
     clks_uploaded_to_project, fmt_bytes
 
-from entityservice.database import DBConn
+from entityservice.database import DBConn, get_project_column
 from entityservice.views.auth_checks import abort_if_project_doesnt_exist, abort_if_invalid_dataprovider_token, \
-    abort_if_invalid_results_token, get_authorization_token_type_or_abort
+    abort_if_invalid_results_token, get_authorization_token_type_or_abort, abort_if_inconsistent_upload
 from entityservice import models
 from entityservice.object_store import connect_to_object_store
 from entityservice.serialization import binary_format
@@ -198,6 +200,8 @@ def project_clks_post(project_id):
         dp_id = db.get_dataprovider_id(conn, token)
         project_encoding_size = db.get_project_schema_encoding_size(conn, project_id)
         upload_state_updated = db.is_dataprovider_allowed_to_upload_and_lock(conn, dp_id)
+        # fetch the project's uses_blocking flag from the projects table
+        uses_blocking = get_project_column(conn, project_id, 'uses_blocking')
 
     if not upload_state_updated:
         return safe_fail_request(403, "This token has already been used to upload clks.")
@@ -216,8 +220,7 @@ def project_clks_post(project_id):
         # However, as connexion is very, very strict about input validation when it comes to json, it will always
         # consume the stream first to validate it against the spec. Thus the backflip to fully reading the CLks as
         # json into memory. -> issue #184
-
-        receipt_token, raw_file = upload_json_clk_data(dp_id, get_json(), span)
+        receipt_token, raw_file = upload_json_clk_data(dp_id, get_json(), uses_blocking, parent_span=span)
         # Schedule a task to deserialize the hashes, and carry
         # out a pop count.
         handle_raw_upload.delay(project_id, dp_id, receipt_token, parent_span=serialize_span(span))
@@ -347,40 +350,43 @@ def upload_clk_data_binary(project_id, dp_id, raw_stream, count, size=128):
     return receipt_token
 
 
-def upload_json_clk_data(dp_id, clk_json, parent_span):
+def upload_json_clk_data(dp_id, clk_json, uses_blocking, parent_span):
     """
-    Convert user provided encodings from json array of base64 data into
-    a newline separated file of base64 data.
+    Take the user provided encodings as a json dict and save them, as-is, to the object store.
 
     Note this implementation is non-streaming.
     """
-    if 'clks' not in clk_json or len(clk_json['clks']) < 1:
-        safe_fail_request(400, message="Missing CLKs information")
+    abort_if_inconsistent_upload(uses_blocking, clk_json)
+
+    # after validation the element name follows directly from the blocking flag
+    element = 'clknblocks' if uses_blocking else 'clks'
 
     receipt_token = generate_code()
     filename = Config.RAW_FILENAME_FMT.format(receipt_token)
-    logger.info("Storing user {} supplied clks from json".format(dp_id))
+    logger.info("Storing user {} supplied {} from json".format(dp_id, element))
 
     with opentracing.tracer.start_span('splitting-json-clks', child_of=parent_span) as span:
-        count = len(clk_json['clks'])
-        span.set_tag("clks", count)
-        data = b''.join(''.join(clk.split('\n')).encode() + b'\n' for clk in clk_json['clks'])
+        count = len(clk_json[element])
+        span.set_tag(element, count)
+
+        logger.info(f"Received {count} encodings.")
 
-        num_bytes = len(data)
-        span.set_tag("num_bytes", num_bytes)
-        buffer = BytesIO(data)
+    # write clk_json to a temporary file; close it so the buffered data is flushed before upload
+    tmp = tempfile.NamedTemporaryFile(mode='w', delete=False)
+    json.dump(clk_json, tmp)
+    tmp.close()
 
-    logger.info(f"Received {count} encodings. Uploading {fmt_bytes(num_bytes)} to object store")
     with opentracing.tracer.start_span('save-clk-file-to-quarantine', child_of=parent_span) as span:
        span.set_tag('filename', filename)
         mc = connect_to_object_store()
-        mc.put_object(
+        mc.fput_object(
             Config.MINIO_BUCKET,
             filename,
-            data=buffer,
-            length=num_bytes
+            tmp.name,
+            content_type='application/json'
         )
+    logger.info('Saved uploaded {} JSON to file {} in object store.'.format(element.upper(), filename))
 
     with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span):
         with DBConn() as conn:
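
A minimal sketch of how the new validation in abort_if_inconsistent_upload behaves, assuming it runs inside a Flask request context (safe_fail_request aborts the request, so outside one these calls simply raise). The payload values are placeholders, and the nested shape shown for 'clknblocks' (a base64 CLK followed by its block identifiers) is an assumption, not something this diff defines:

    from entityservice.views.auth_checks import abort_if_inconsistent_upload

    # Accepted: non-blocking project uploading a non-empty 'clks' element.
    abort_if_inconsistent_upload(False, {'clks': ['BASE64CLK']})

    # Accepted: blocking project uploading a non-empty 'clknblocks' element.
    abort_if_inconsistent_upload(True, {'clknblocks': [['BASE64CLK', 'block-1']]})

    # Each of the following aborts the request with a 400 response:
    abort_if_inconsistent_upload(True, {'clks': ['BASE64CLK']})     # element does not match the blocking flag
    abort_if_inconsistent_upload(False, {'clks': []})               # expected element present but empty
    abort_if_inconsistent_upload(False, {'hashes': ['BASE64CLK']})  # unknown upload element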