diff --git a/backend/entityservice/database/insertions.py b/backend/entityservice/database/insertions.py
index 63cd18d0..ce413f1f 100644
--- a/backend/entityservice/database/insertions.py
+++ b/backend/entityservice/database/insertions.py
@@ -1,7 +1,7 @@
 import psycopg2
 import psycopg2.extras
 
-from entityservice.database.util import execute_returning_id, logger
+from entityservice.database.util import execute_returning_id, logger, query_db
 from entityservice.errors import RunDeleted
 
 
@@ -34,9 +34,9 @@ def insert_new_run(db, run_id, project_id, threshold, name, type, notes=''):
 def insert_dataprovider(cur, auth_token, project_id):
     sql_query = """
         INSERT INTO dataproviders
-        (project, token)
+        (project, token, uploaded)
         VALUES
-        (%s, %s)
+        (%s, %s, 'not_started')
         RETURNING id
         """
     return execute_returning_id(cur, sql_query, [project_id, auth_token])
@@ -54,10 +54,8 @@ def insert_encoding_metadata(db, clks_filename, dp_id, receipt_token, count):
     with db.cursor() as cur:
         cur.execute(sql_insertion_query, [dp_id, receipt_token, clks_filename, count, 'pending'])
 
-    set_dataprovider_upload_state(db, dp_id, True)
-
 
-def set_dataprovider_upload_state(db, dp_id, state=True):
+def set_dataprovider_upload_state(db, dp_id, state='error'):
     logger.debug("Setting dataprovider {} upload state to {}".format(dp_id, state))
     sql_update = """
         UPDATE dataproviders
@@ -261,3 +259,30 @@ def get_created_runs_and_queue(db, project_id):
     if res is None:
         res = []
     return res
+
+
+def is_dataprovider_allowed_to_upload_and_lock(db, dp_id):
+    """
+    Return True if the dataprovider is allowed to upload their CLKs, and lock the upload.
+    A dataprovider is not allowed to upload CLKs if they have already been uploaded, or if an upload is in progress.
+    The method locks the resource by setting the upload state to `in_progress` before returning True.
+    Note that the upload state can also be `error`, in which case the dataprovider is allowed to retry the upload
+    so that a single failure does not block the project.
+    """
+    logger.debug("Setting dataprovider {} upload state to `in_progress`".format(dp_id))
+    sql_update = """
+        UPDATE dataproviders
+        SET uploaded = 'in_progress'
+        WHERE id = %s AND uploaded != 'done' AND uploaded != 'in_progress'
+        RETURNING id, uploaded
+    """
+    query_response = query_db(db, sql_update, [dp_id])
+    logger.debug("Locking the upload updated {} row(s)".format(len(query_response)))
+    length = len(query_response)
+    if length < 1:
+        return False
+    elif length > 1:
+        logger.error("{} rows in the table `dataproviders` are associated with the same dataprovider id {}, while each"
+                     " dataprovider id should be unique.".format(length, dp_id))
+        raise ValueError("Dataprovider id {} is not unique; its CLKs could be uploaded multiple times.".format(dp_id))
+    return True
diff --git a/backend/entityservice/database/selections.py b/backend/entityservice/database/selections.py
index 63a21900..8141ce12 100644
--- a/backend/entityservice/database/selections.py
+++ b/backend/entityservice/database/selections.py
@@ -32,7 +32,7 @@ def get_dataprovider_ids(db, project_id):
         FROM dataproviders
         WHERE
           dataproviders.project = %s AND
-          dataproviders.uploaded = TRUE
+          dataproviders.uploaded = 'done'
         ORDER BY dataproviders.id
         """
     query_result = query_db(db, sql_query, [project_id])
@@ -65,7 +65,7 @@ def get_number_parties_uploaded(db, project_id):
         WHERE
           dataproviders.project = %s AND
           bloomingdata.dp = dataproviders.id AND
-          dataproviders.uploaded = TRUE
+          dataproviders.uploaded = 'done'
         """
     query_result = query_db(db, sql_query, [project_id], one=True)
     return query_result['count']
@@ -93,7 +93,7 @@ def get_number_parties_ready(db, resource_id):
         WHERE
           dataproviders.project = %s AND
           bloomingdata.dp = dataproviders.id AND
-          dataproviders.uploaded = TRUE AND
+          dataproviders.uploaded = 'done' AND
           bloomingdata.state = 'ready'
         """
     query_result = query_db(db, sql_query, [resource_id], one=True)
diff --git a/backend/entityservice/init-db-schema.sql b/backend/entityservice/init-db-schema.sql
index 16825012..2f72990f 100644
--- a/backend/entityservice/init-db-schema.sql
+++ b/backend/entityservice/init-db-schema.sql
@@ -85,6 +85,13 @@ RETURNS bool AS $$
 SELECT $1.state = 'completed'
 $$ STABLE LANGUAGE SQL;
 
+-- Describes the state of a dataprovider's CLK upload to the entity service.
+CREATE TYPE UPLOADEDSTATE AS ENUM (
+  'not_started', -- default state, the dataprovider has not yet tried to upload their CLKs
+  'in_progress', -- an upload is in progress, so no other upload with the same token may start
+  'done',        -- the CLKs have been uploaded; the state should not change again
+  'error'        -- an upload was attempted but failed; this state allows the dataprovider to try again
+);
 
 CREATE TABLE dataproviders (
   id SERIAL PRIMARY KEY,
@@ -93,7 +100,7 @@ CREATE TABLE dataproviders (
   token CHAR(48) NOT NULL UNIQUE,
 
   -- Set after the bloom filter data has been added
-  uploaded BOOL NOT NULL DEFAULT FALSE,
+  uploaded UPLOADEDSTATE NOT NULL,
 
   project CHAR(48) REFERENCES projects (project_id) on DELETE CASCADE
 );
@@ -101,10 +108,11 @@ CREATE TABLE dataproviders (
 CREATE INDEX ON dataproviders (project);
 CREATE INDEX ON dataproviders (uploaded);
 
-CREATE TYPE UPLOADSTATE AS ENUM (
-  'pending',
-  'ready',
-  'error'
+-- Describes the processing state of the uploaded CLKs.
+CREATE TYPE PROCESSEDSTATE AS ENUM (
+  'pending', -- waiting for processing to be done
+  'ready',   -- processing done
+  'error'    -- an error occurred during processing
 );
 
 -- The encoded PII data for each dataprovider
@@ -121,7 +129,7 @@ CREATE TABLE bloomingdata (
   -- Store the raw CLK data in a file
   file CHAR(64) NOT NULL,
 
-  state UPLOADSTATE NOT NULL,
+  state PROCESSEDSTATE NOT NULL,
 
   -- Size in bytes of the uploaded encoding
   encoding_size INT NULL,
diff --git a/backend/entityservice/tests/test_project_uploads.py b/backend/entityservice/tests/test_project_uploads.py
index 921aee90..68b3107d 100644
--- a/backend/entityservice/tests/test_project_uploads.py
+++ b/backend/entityservice/tests/test_project_uploads.py
@@ -206,15 +206,15 @@ def test_project_binary_data_invalid_buffer_size(
     pid = new_project_data['project_id']
     file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'testdata/clks_128B_1k.bin')
 
-    upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], -1, status=400)
+    upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], -1, expected_status_code=400)
 
     # Now try upload with valid hash-count but doesn't match actual size:
-    upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 1000000, status=400)
-    upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 3, status=400)
+    upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 1000000, expected_status_code=400)
+    upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 3, expected_status_code=400)
 
     # Now try the minimum upload size (1 clk)
     file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'testdata/single_clk.bin')
-    upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 1, status=201)
+    upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 1, expected_status_code=201)
 
 
 def test_project_single_party_empty_data_upload(
@@ -235,3 +235,62 @@ def test_project_single_party_empty_data_upload(
     )
 
     assert r.status_code == 400
+
+def test_project_upload_using_twice_same_authentication(requests, valid_project_params):
+    """
+    Test that an upload token cannot be re-used to upload CLKs.
+    First create a project and upload CLKs with one of its tokens (which should succeed), then re-upload CLKs
+    with the same token, which should return a 403 error.
+    """
+    expected_number_parties = get_expected_number_parties(valid_project_params)
+    if expected_number_parties < 2:
+        # This test requires at least two parties
+        return
+
+    new_project_data = requests.post(url + '/projects',
+                                     json={
+                                         'schema': {},
+                                         **valid_project_params
+                                     }).json()
+    update_tokens = new_project_data['update_tokens']
+
+    assert len(update_tokens) == expected_number_parties
+
+    small_file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'testdata/clks_128B_1k.bin')
+    token_to_reuse = update_tokens[0]
+    upload_binary_data_from_file(
+        requests,
+        small_file_path, new_project_data['project_id'], token_to_reuse, 1000)
+
+    upload_binary_data_from_file(
+        requests,
+        small_file_path, new_project_data['project_id'], token_to_reuse, 1000, expected_status_code=403)
+
+
+def test_project_upload_invalid_clks_then_valid_clks_same_authentication(requests, valid_project_params):
+    """
+    Test that an upload token can be re-used to upload CLKs after a failed upload.
+    First create a project and attempt an upload that fails with a 400 error, then re-upload CLKs with the
+    same token, which should succeed.
+    """
+    expected_number_parties = get_expected_number_parties(valid_project_params)
+
+    new_project_data = requests.post(url + '/projects',
+                                     json={
+                                         'schema': {},
+                                         **valid_project_params
+                                     }).json()
+    update_tokens = new_project_data['update_tokens']
+
+    assert len(update_tokens) == expected_number_parties
+
+    small_file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'testdata/clks_128B_1k.bin')
+    token_to_reuse = update_tokens[0]
+    # This should fail because the provided count does not match the uploaded data.
+    upload_binary_data_from_file(
+        requests,
+        small_file_path, new_project_data['project_id'], token_to_reuse, 2000, expected_status_code=400)
+
+    upload_binary_data_from_file(
+        requests,
+        small_file_path, new_project_data['project_id'], token_to_reuse, 1000)
diff --git a/backend/entityservice/tests/util.py b/backend/entityservice/tests/util.py
index a5f4e6a1..228bf8d5 100644
--- a/backend/entityservice/tests/util.py
+++ b/backend/entityservice/tests/util.py
@@ -434,9 +434,9 @@ def upload_binary_data(requests, data, project_id, token, count, size=128, expec
     return upload_response
 
 
-def upload_binary_data_from_file(requests, file_path, project_id, token, count, size=128, status=201):
+def upload_binary_data_from_file(requests, file_path, project_id, token, count, size=128, expected_status_code=201):
     with open(file_path, 'rb') as f:
-        return upload_binary_data(requests, f, project_id, token, count, size, status)
+        return upload_binary_data(requests, f, project_id, token, count, size, expected_status_code)
 
 
 def get_expected_number_parties(project_params):
diff --git a/backend/entityservice/views/project.py b/backend/entityservice/views/project.py
index ac801f68..41e1b22e 100644
--- a/backend/entityservice/views/project.py
+++ b/backend/entityservice/views/project.py
@@ -121,6 +121,10 @@ def project_clks_post(project_id):
     with DBConn() as conn:
         dp_id = db.get_dataprovider_id(conn, token)
         project_encoding_size = db.get_project_schema_encoding_size(conn, project_id)
+        upload_state_updated = db.is_dataprovider_allowed_to_upload_and_lock(conn, dp_id)
+
+        if not upload_state_updated:
+            return safe_fail_request(403, "This token has already been used to upload CLKs, or an upload is in progress.")
 
     log = log.bind(dp_id=dp_id)
     log.info("Receiving CLK data.")
@@ -128,50 +132,58 @@ def project_clks_post(project_id):
     with opentracing.tracer.start_span('upload-clk-data',
                                        child_of=parent_span) as span:
         span.set_tag("project_id", project_id)
-        if headers['Content-Type'] == "application/json":
-            span.set_tag("content-type", 'json')
-            # TODO: Previously, we were accessing the CLKs in a streaming fashion to avoid parsing the json in one hit. This
-            # enables running the web frontend with less memory.
-            # However, as connexion is very, very strict about input validation when it comes to json, it will always
-            # consume the stream first to validate it against the spec. Thus the backflip to fully reading the CLks as
-            # json into memory. -> issue #184
-
-            receipt_token, raw_file = upload_json_clk_data(dp_id, get_json(), span)
-            # Schedule a task to deserialize the hashes, and carry
-            # out a pop count.
-            handle_raw_upload.delay(project_id, dp_id, receipt_token, parent_span=serialize_span(span))
-            log.info("Job scheduled to handle user uploaded hashes")
-        elif headers['Content-Type'] == "application/octet-stream":
-            span.set_tag("content-type", 'binary')
-            log.info("Handling binary CLK upload")
-            try:
-                count, size = check_binary_upload_headers(headers)
-                log.info(f"Headers tell us to expect {count} encodings of {size} bytes")
-                span.log_kv({'count': count, 'size': size})
-            except Exception:
-                log.warning("Upload failed due to problem with headers in binary upload")
-                raise
-            # Check against project level encoding size (if it has been set)
-            if project_encoding_size is not None and size != project_encoding_size:
-                # fail fast - we haven't stored the encoded data yet
-                return safe_fail_request(400, "Upload 'Hash-Size' doesn't match project settings")
-
-            # TODO actually stream the upload data straight to Minio. Currently we can't because
-            # connexion has already read the data before our handler is called!
-            # https://github.com/zalando/connexion/issues/592
-            # stream = get_stream()
-            stream = BytesIO(request.data)
-            expected_bytes = binary_format(size).size * count
-            log.debug(f"Stream size is {len(request.data)} B, and we expect {expected_bytes} B")
-            if len(request.data) != expected_bytes:
-                safe_fail_request(400, "Uploaded data did not match the expected size. Check request headers are correct")
-            try:
-                receipt_token = upload_clk_data_binary(project_id, dp_id, stream, count, size)
-            except ValueError:
-                safe_fail_request(400, "Uploaded data did not match the expected size. Check request headers are correct.")
-            else:
-                safe_fail_request(400, "Content Type not supported")
-
+        try:
+            if headers['Content-Type'] == "application/json":
+                span.set_tag("content-type", 'json')
+                # TODO: Previously, we were accessing the CLKs in a streaming fashion to avoid parsing the json in one hit. This
+                # enables running the web frontend with less memory.
+                # However, as connexion is very, very strict about input validation when it comes to json, it will always
+                # consume the stream first to validate it against the spec. Thus the backflip to fully reading the CLKs as
+                # json into memory. -> issue #184
+
+                receipt_token, raw_file = upload_json_clk_data(dp_id, get_json(), span)
+                # Schedule a task to deserialize the hashes, and carry
+                # out a pop count.
+                handle_raw_upload.delay(project_id, dp_id, receipt_token, parent_span=serialize_span(span))
+                log.info("Job scheduled to handle user uploaded hashes")
+            elif headers['Content-Type'] == "application/octet-stream":
+                span.set_tag("content-type", 'binary')
+                log.info("Handling binary CLK upload")
+                try:
+                    count, size = check_binary_upload_headers(headers)
+                    log.info(f"Headers tell us to expect {count} encodings of {size} bytes")
+                    span.log_kv({'count': count, 'size': size})
+                except Exception:
+                    log.warning("Upload failed due to problem with headers in binary upload")
+                    raise
+                # Check against project level encoding size (if it has been set)
+                if project_encoding_size is not None and size != project_encoding_size:
+                    # fail fast - we haven't stored the encoded data yet
+                    return safe_fail_request(400, "Upload 'Hash-Size' doesn't match project settings")
+
+                # TODO actually stream the upload data straight to Minio. Currently we can't because
+                # connexion has already read the data before our handler is called!
+                # https://github.com/zalando/connexion/issues/592
+                # stream = get_stream()
+                stream = BytesIO(request.data)
+                expected_bytes = binary_format(size).size * count
+                log.debug(f"Stream size is {len(request.data)} B, and we expect {expected_bytes} B")
+                if len(request.data) != expected_bytes:
+                    safe_fail_request(400, "Uploaded data did not match the expected size. Check request headers are correct")
+                try:
+                    receipt_token = upload_clk_data_binary(project_id, dp_id, stream, count, size)
+                except ValueError:
+                    safe_fail_request(400, "Uploaded data did not match the expected size. Check request headers are correct.")
+            else:
+                safe_fail_request(400, "Content Type not supported")
+        except Exception:
+            log.info("The dataprovider was not able to upload their CLKs;"
+                     " re-enabling the upload token so it can be used again.")
+            with DBConn() as conn:
+                db.set_dataprovider_upload_state(conn, dp_id, state='error')
+            raise
+        with DBConn() as conn:
+            db.set_dataprovider_upload_state(conn, dp_id, state='done')
 
     return {'message': 'Updated', 'receipt_token': receipt_token}, 201
 
@@ -250,7 +262,6 @@ def upload_clk_data_binary(project_id, dp_id, raw_stream, count, size=128):
     with opentracing.tracer.start_span('update-database-with-metadata', child_of=parent_span):
         with DBConn() as conn:
             db.update_encoding_metadata(conn, filename, dp_id, 'ready')
-            db.set_dataprovider_upload_state(conn, dp_id, True)
 
     # Now work out if all parties have added their data
     if clks_uploaded_to_project(project_id):
diff --git a/docs/changelog.rst b/docs/changelog.rst
index 95c1b1c8..605a456c 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -12,6 +12,7 @@ Version 1.13.0-alpha
 
 - fixed bug where invalid state changes could occur when starting a run (#459)
 - ``matching`` output type has been removed as redundant with the ``groups`` output with 2 parties. (#458)
+- fixed a bug where a dataprovider could upload their CLKs multiple times in a project using the same upload token (#463)
 
 - Update dependencies:
 
@@ -21,6 +22,7 @@ Breaking Change
 ~~~~~~~~~~~~~~~
 
 - ``matching`` output type is not available anymore. (#458)
+- the ``uploaded`` field of the ``dataproviders`` table has been changed from a ``BOOL`` to an ``ENUM`` type (#463)
 
 
 Version 1.12.0
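Note on the breaking schema change listed above: init-db-schema.sql only covers fresh databases, so an existing deployment would need a migration along these lines. The following is a hypothetical sketch, not part of this patch; it reuses the UPLOADEDSTATE type and the dataproviders.uploaded column introduced in the schema diff and maps the old boolean values onto the new enum (TRUE becomes 'done', FALSE becomes 'not_started'):

    -- Hypothetical migration sketch for existing databases; not included in this patch.
    CREATE TYPE UPLOADEDSTATE AS ENUM ('not_started', 'in_progress', 'done', 'error');
    ALTER TABLE dataproviders
        ALTER COLUMN uploaded DROP DEFAULT,
        ALTER COLUMN uploaded TYPE UPLOADEDSTATE
            USING (CASE WHEN uploaded THEN 'done' ELSE 'not_started' END)::UPLOADEDSTATE;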