Merged
37 changes: 31 additions & 6 deletions backend/entityservice/database/insertions.py
@@ -1,7 +1,7 @@
import psycopg2
import psycopg2.extras

from entityservice.database.util import execute_returning_id, logger
from entityservice.database.util import execute_returning_id, logger, query_db
from entityservice.errors import RunDeleted


@@ -34,9 +34,9 @@ def insert_new_run(db, run_id, project_id, threshold, name, type, notes=''):
def insert_dataprovider(cur, auth_token, project_id):
sql_query = """
INSERT INTO dataproviders
(project, token)
(project, token, uploaded)
VALUES
(%s, %s)
(%s, %s, 'not_started')
RETURNING id
"""
return execute_returning_id(cur, sql_query, [project_id, auth_token])
@@ -54,10 +54,8 @@ def insert_encoding_metadata(db, clks_filename, dp_id, receipt_token, count):
with db.cursor() as cur:
cur.execute(sql_insertion_query, [dp_id, receipt_token, clks_filename, count, 'pending'])

set_dataprovider_upload_state(db, dp_id, True)


def set_dataprovider_upload_state(db, dp_id, state=True):
def set_dataprovider_upload_state(db, dp_id, state='error'):
logger.debug("Setting dataprovider {} upload state to {}".format(dp_id, state))
sql_update = """
UPDATE dataproviders
@@ -261,3 +259,30 @@ def get_created_runs_and_queue(db, project_id):
if res is None:
res = []
return res


def is_dataprovider_allowed_to_upload_and_lock(db, dp_id):
"""
This method returns true if the dataprovider is allowed to upload her clks.
A dataprovider is not allowed to upload clks if she has already uploaded them, or if the upload is in progress.
This method will lock the resource by setting the upload state to `in_progress` and returning `true`.
Note that the upload state can be `error`, in which case we are allowing the dataprovider to re-try uploading
her clks not to block a project if a failure occurred.
"""
logger.debug("Setting dataprovider {} upload state to `in_progress`".format(dp_id))
sql_update = """
UPDATE dataproviders
SET uploaded = 'in_progress'
WHERE id = %s and uploaded != 'done' and uploaded != 'in_progress'
RETURNING id, uploaded
"""
query_response = query_db(db, sql_update, [dp_id])
length = len(query_response)
if length < 1:
return False
elif length > 1:
logger.error("{} rows in the table `dataproviders` are associated with the same dataprovider id {}, while each"
" dataprovider id should be unique.".format(length, dp_id))
raise ValueError("This dataprovider could upload her clks multiple times.")
Collaborator: it would be good to write a message to the logger at high severity level as well here.

return True
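
For context, the conditional UPDATE ... RETURNING above doubles as a lightweight lock: the row only matches while `uploaded` is neither `done` nor `in_progress`, so of several concurrent upload attempts at most one gets a row back. A minimal standalone sketch of the same pattern, using plain psycopg2 rather than the service's `query_db` wrapper (the helper name `try_lock_upload` is illustrative, not part of the codebase):

import psycopg2

def try_lock_upload(conn, dp_id):
    """Return True iff this caller flipped the dataprovider's row to 'in_progress'."""
    with conn.cursor() as cur:
        # Only a row that is not already locked or finished matches the WHERE clause,
        # so concurrent callers race on the same row and at most one sees a result.
        cur.execute(
            """
            UPDATE dataproviders
            SET uploaded = 'in_progress'
            WHERE id = %s AND uploaded NOT IN ('done', 'in_progress')
            RETURNING id
            """,
            [dp_id])
        return cur.fetchone() is not None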
6 changes: 3 additions & 3 deletions backend/entityservice/database/selections.py
@@ -32,7 +32,7 @@ def get_dataprovider_ids(db, project_id):
FROM dataproviders
WHERE
dataproviders.project = %s AND
dataproviders.uploaded = TRUE
dataproviders.uploaded = 'done'
ORDER BY dataproviders.id
"""
query_result = query_db(db, sql_query, [project_id])
@@ -65,7 +65,7 @@ def get_number_parties_uploaded(db, project_id):
WHERE
dataproviders.project = %s AND
bloomingdata.dp = dataproviders.id AND
dataproviders.uploaded = TRUE
dataproviders.uploaded = 'done'
"""
query_result = query_db(db, sql_query, [project_id], one=True)
return query_result['count']
@@ -93,7 +93,7 @@ def get_number_parties_ready(db, resource_id):
WHERE
dataproviders.project = %s AND
bloomingdata.dp = dataproviders.id AND
dataproviders.uploaded = TRUE AND
dataproviders.uploaded = 'done' AND
bloomingdata.state = 'ready'
"""
query_result = query_db(db, sql_query, [resource_id], one=True)
20 changes: 14 additions & 6 deletions backend/entityservice/init-db-schema.sql
@@ -85,6 +85,13 @@ RETURNS bool AS $$
SELECT $1.state = 'completed'
$$ STABLE LANGUAGE SQL;

-- Describes the state of the upload of the clks to the entity-service.
CREATE TYPE UPLOADEDSTATE AS ENUM (
Collaborator: wow, now we have UPLOADEDSTATE and UPLOADSTATE. This is very confusing.
After looking at those for a bit, I think we should rename UPLOADSTATE to PROCESSEDSTATE.
It would also be nice to add a bit of an explanation of what all those states are for. In my view:
UPLOADSTATE describes the state of the processing of the uploaded CLKs. This is only set after CLKs have been uploaded: 'pending' means we are waiting for some processing to be done, 'ready' means done, and 'error' means something failed.
UPLOADEDSTATE describes the state of the CLK upload itself.

Contributor (author): That was indeed a pickle. I will add comments on the UPLOADEDSTATE.
I didn't want to touch the UPLOADSTATE as I wasn't sure what it was made for (as you mentioned, this is not well documented), but I'll also add some documentation around it and rename it. In that case I will also rename UPLOADEDSTATE to UPLOADSTATE, which sounds better :)

Contributor (author): I'll finally keep UPLOADEDSTATE instead of renaming it, because the column in the table is `uploaded`. But I renamed UPLOADSTATE to PROCESSEDSTATE and added some documentation.

'not_started', -- default state: the dataprovider has not yet tried to upload her clks
'in_progress', -- an upload is in progress, so no-one else can upload at the same time
'done', -- the clks have been uploaded; this is a terminal state
'error' -- an error occurred during the upload; this state allows the dataprovider to try again
);
Collaborator: nice


CREATE TABLE dataproviders (
id SERIAL PRIMARY KEY,
@@ -93,18 +100,19 @@ CREATE TABLE dataproviders (
token CHAR(48) NOT NULL UNIQUE,

-- Set after the bloom filter data has been added
uploaded BOOL NOT NULL DEFAULT FALSE,
uploaded UPLOADEDSTATE NOT NULL,

project CHAR(48) REFERENCES projects (project_id) on DELETE CASCADE
);

CREATE INDEX ON dataproviders (project);
CREATE INDEX ON dataproviders (uploaded);

CREATE TYPE UPLOADSTATE AS ENUM (
'pending',
'ready',
'error'
-- Describes the state of the processing of the uploaded clks.
CREATE TYPE PROCESSEDSTATE AS ENUM (
'pending', -- waiting for some processing to be done
'ready', -- processing done
'error' -- an error occurred during the processing
);

-- The encoded PII data for each dataprovider
@@ -121,7 +129,7 @@ CREATE TABLE bloomingdata (
-- Store the raw CLK data in a file
file CHAR(64) NOT NULL,

state UPLOADSTATE NOT NULL,
state PROCESSEDSTATE NOT NULL,

-- Size in bytes of the uploaded encoding
encoding_size INT NULL,
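
The UPLOADEDSTATE comments above describe a small state machine for dataproviders.uploaded. A purely illustrative Python sketch of the intended transitions (not service code; the names are hypothetical):

# Allowed transitions for dataproviders.uploaded, following the enum comments.
ALLOWED_TRANSITIONS = {
    'not_started': {'in_progress'},    # a first upload attempt takes the lock
    'in_progress': {'done', 'error'},  # the attempt either succeeds or fails
    'error': {'in_progress'},          # a failed upload may be retried
    'done': set(),                     # terminal: the clks stay uploaded
}

def can_transition(current_state, new_state):
    return new_state in ALLOWED_TRANSITIONS[current_state]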
67 changes: 63 additions & 4 deletions backend/entityservice/tests/test_project_uploads.py
@@ -206,15 +206,15 @@ def test_project_binary_data_invalid_buffer_size(
pid = new_project_data['project_id']
file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'testdata/clks_128B_1k.bin')

upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], -1, status=400)
upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], -1, expected_status_code=400)

# Now try upload with valid hash-count but doesn't match actual size:
upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 1000000, status=400)
upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 3, status=400)
upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 1000000, expected_status_code=400)
upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 3, expected_status_code=400)

# Now try the minimum upload size (1 clk)
file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'testdata/single_clk.bin')
upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 1, status=201)
upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 1, expected_status_code=201)


def test_project_single_party_empty_data_upload(
@@ -235,3 +235,62 @@ def test_project_single_party_empty_data_upload(
)
assert r.status_code == 400


def test_project_upload_using_twice_same_authentication(requests, valid_project_params):
"""
Test that a token cannot be re-used to upload clks.
So first, create a project, upload clks with a token (which should work), and then re-upload clks using the same
token which should return a 403 error.
"""
expected_number_parties = get_expected_number_parties(valid_project_params)
if expected_number_parties < 2:
# This test requires at least two parties
return

new_project_data = requests.post(url + '/projects',
json={
'schema': {},
**valid_project_params
}).json()
update_tokens = new_project_data['update_tokens']

assert len(update_tokens) == expected_number_parties

small_file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'testdata/clks_128B_1k.bin')
token_to_reuse = update_tokens[0]
upload_binary_data_from_file(
requests,
small_file_path, new_project_data['project_id'], token_to_reuse, 1000)

upload_binary_data_from_file(
requests,
small_file_path, new_project_data['project_id'], token_to_reuse, 1000, expected_status_code=403)


def test_project_upload_invalid_clks_then_valid_clks_same_authentication(requests, valid_project_params):
"""
Test that a token can be re-used to upload clks after the upload failed.
So first, create a project, upload clks with a token (which should NOT work with a 400 error),
and then re-upload clks using the same token which should work.
"""
expected_number_parties = get_expected_number_parties(valid_project_params)

new_project_data = requests.post(url + '/projects',
json={
'schema': {},
**valid_project_params
}).json()
update_tokens = new_project_data['update_tokens']

assert len(update_tokens) == expected_number_parties

small_file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'testdata/clks_128B_1k.bin')
token_to_reuse = update_tokens[0]
# This should fail as we are not providing the correct count.
upload_binary_data_from_file(
requests,
small_file_path, new_project_data['project_id'], token_to_reuse, 2000, expected_status_code=400)

upload_binary_data_from_file(
requests,
small_file_path, new_project_data['project_id'], token_to_reuse, 1000)
4 changes: 2 additions & 2 deletions backend/entityservice/tests/util.py
@@ -434,9 +434,9 @@ def upload_binary_data(requests, data, project_id, token, count, size=128, expec
return upload_response


def upload_binary_data_from_file(requests, file_path, project_id, token, count, size=128, status=201):
def upload_binary_data_from_file(requests, file_path, project_id, token, count, size=128, expected_status_code=201):
with open(file_path, 'rb') as f:
return upload_binary_data(requests, f, project_id, token, count, size, status)
return upload_binary_data(requests, f, project_id, token, count, size, expected_status_code)


def get_expected_number_parties(project_params):
101 changes: 56 additions & 45 deletions backend/entityservice/views/project.py
@@ -121,57 +121,69 @@ def project_clks_post(project_id):
with DBConn() as conn:
dp_id = db.get_dataprovider_id(conn, token)
project_encoding_size = db.get_project_schema_encoding_size(conn, project_id)
upload_state_updated = db.is_dataprovider_allowed_to_upload_and_lock(conn, dp_id)

if not upload_state_updated:
return safe_fail_request(403, "This token has already been used to upload clks.")

log = log.bind(dp_id=dp_id)
log.info("Receiving CLK data.")
receipt_token = None

with opentracing.tracer.start_span('upload-clk-data', child_of=parent_span) as span:
span.set_tag("project_id", project_id)
if headers['Content-Type'] == "application/json":
span.set_tag("content-type", 'json')
# TODO: Previously, we were accessing the CLKs in a streaming fashion to avoid parsing the json in one hit. This
# enables running the web frontend with less memory.
# However, as connexion is very, very strict about input validation when it comes to json, it will always
# consume the stream first to validate it against the spec. Thus the backflip to fully reading the CLks as
# json into memory. -> issue #184

receipt_token, raw_file = upload_json_clk_data(dp_id, get_json(), span)
# Schedule a task to deserialize the hashes, and carry
# out a pop count.
handle_raw_upload.delay(project_id, dp_id, receipt_token, parent_span=serialize_span(span))
log.info("Job scheduled to handle user uploaded hashes")
elif headers['Content-Type'] == "application/octet-stream":
span.set_tag("content-type", 'binary')
log.info("Handling binary CLK upload")
try:
count, size = check_binary_upload_headers(headers)
log.info(f"Headers tell us to expect {count} encodings of {size} bytes")
span.log_kv({'count': count, 'size': size})
except Exception:
log.warning("Upload failed due to problem with headers in binary upload")
raise
# Check against project level encoding size (if it has been set)
if project_encoding_size is not None and size != project_encoding_size:
# fail fast - we haven't stored the encoded data yet
return safe_fail_request(400, "Upload 'Hash-Size' doesn't match project settings")

# TODO actually stream the upload data straight to Minio. Currently we can't because
# connexion has already read the data before our handler is called!
# https://github.com/zalando/connexion/issues/592
# stream = get_stream()
stream = BytesIO(request.data)
expected_bytes = binary_format(size).size * count
log.debug(f"Stream size is {len(request.data)} B, and we expect {expected_bytes} B")
if len(request.data) != expected_bytes:
safe_fail_request(400, "Uploaded data did not match the expected size. Check request headers are correct")
try:
receipt_token = upload_clk_data_binary(project_id, dp_id, stream, count, size)
except ValueError:
safe_fail_request(400, "Uploaded data did not match the expected size. Check request headers are correct.")
else:
safe_fail_request(400, "Content Type not supported")

try:
if headers['Content-Type'] == "application/json":
span.set_tag("content-type", 'json')
# TODO: Previously, we were accessing the CLKs in a streaming fashion to avoid parsing the json in one hit. This
# enables running the web frontend with less memory.
# However, as connexion is very, very strict about input validation when it comes to json, it will always
# consume the stream first to validate it against the spec. Thus the backflip to fully reading the CLks as
# json into memory. -> issue #184

receipt_token, raw_file = upload_json_clk_data(dp_id, get_json(), span)
# Schedule a task to deserialize the hashes, and carry
# out a pop count.
handle_raw_upload.delay(project_id, dp_id, receipt_token, parent_span=serialize_span(span))
log.info("Job scheduled to handle user uploaded hashes")
elif headers['Content-Type'] == "application/octet-stream":
span.set_tag("content-type", 'binary')
log.info("Handling binary CLK upload")
try:
count, size = check_binary_upload_headers(headers)
log.info(f"Headers tell us to expect {count} encodings of {size} bytes")
span.log_kv({'count': count, 'size': size})
except Exception:
log.warning("Upload failed due to problem with headers in binary upload")
raise
# Check against project level encoding size (if it has been set)
if project_encoding_size is not None and size != project_encoding_size:
# fail fast - we haven't stored the encoded data yet
return safe_fail_request(400, "Upload 'Hash-Size' doesn't match project settings")

# TODO actually stream the upload data straight to Minio. Currently we can't because
# connexion has already read the data before our handler is called!
# https://github.com/zalando/connexion/issues/592
# stream = get_stream()
stream = BytesIO(request.data)
expected_bytes = binary_format(size).size * count
log.debug(f"Stream size is {len(request.data)} B, and we expect {expected_bytes} B")
if len(request.data) != expected_bytes:
safe_fail_request(400, "Uploaded data did not match the expected size. Check request headers are correct")
try:
receipt_token = upload_clk_data_binary(project_id, dp_id, stream, count, size)
except ValueError:
safe_fail_request(400, "Uploaded data did not match the expected size. Check request headers are correct.")
else:
safe_fail_request(400, "Content Type not supported")
except Exception:
log.info("The dataprovider was not able to upload her clks;"
" re-enabling the corresponding upload token so it can be used again.")
with DBConn() as conn:
db.set_dataprovider_upload_state(conn, dp_id, state='error')
raise
with DBConn() as conn:
db.set_dataprovider_upload_state(conn, dp_id, state='done')
return {'message': 'Updated', 'receipt_token': receipt_token}, 201


@@ -250,7 +262,6 @@ def upload_clk_data_binary(project_id, dp_id, raw_stream, count, size=128):
with opentracing.tracer.start_span('update-database-with-metadata', child_of=parent_span):
with DBConn() as conn:
db.update_encoding_metadata(conn, filename, dp_id, 'ready')
db.set_dataprovider_upload_state(conn, dp_id, True)

# Now work out if all parties have added their data
if clks_uploaded_to_project(project_id):
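
Condensed, the new control flow in project_clks_post is: take the per-dataprovider lock, attempt the upload, and record the outcome so that a failure re-enables the token. A simplified sketch of that flow (the do_upload callable is hypothetical and connection handling is glossed over):

def handle_clks_upload(conn, dp_id, do_upload):
    # Refuse the upload unless we can atomically move the dataprovider to 'in_progress'.
    if not db.is_dataprovider_allowed_to_upload_and_lock(conn, dp_id):
        return safe_fail_request(403, "This token has already been used to upload clks.")
    try:
        receipt_token = do_upload()  # either the JSON or the binary branch above
    except Exception:
        # Mark the attempt as failed so the same token can be used to retry.
        db.set_dataprovider_upload_state(conn, dp_id, state='error')
        raise
    db.set_dataprovider_upload_state(conn, dp_id, state='done')
    return {'message': 'Updated', 'receipt_token': receipt_token}, 201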
2 changes: 2 additions & 0 deletions docs/changelog.rst
@@ -12,6 +12,7 @@ Version 1.13.0-alpha

- fixed bug where invalid state changes could occur when starting a run (#459)
- ``matching`` output type has been removed as redundant with the ``groups`` output with 2 parties. (#458)
- fixed a bug where a dataprovider could upload her clks multiple times in a project using the same upload token (#463)

- Update dependencies:

@@ -21,6 +22,7 @@ Breaking Change
~~~~~~~~~~~~~~~

- ``matching`` output type is not available anymore. (#458)
- the ``uploaded`` field of the ``dataproviders`` table has been changed from a BOOL to an ENUM type (#463)


Version 1.12.0
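
The breaking change noted above only updates init-db-schema.sql, so an existing deployment would need a migration from the BOOL column to the new enum. A hypothetical sketch (not part of this PR), assuming TRUE maps to 'done', FALSE maps to 'not_started', and the existing UPLOADSTATE type is renamed to PROCESSEDSTATE as in the new schema:

import psycopg2

MIGRATION_SQL = """
CREATE TYPE UPLOADEDSTATE AS ENUM ('not_started', 'in_progress', 'done', 'error');
ALTER TABLE dataproviders
    ALTER COLUMN uploaded DROP DEFAULT,
    ALTER COLUMN uploaded TYPE UPLOADEDSTATE
        USING (CASE WHEN uploaded THEN 'done' ELSE 'not_started' END)::UPLOADEDSTATE;
ALTER TYPE UPLOADSTATE RENAME TO PROCESSEDSTATE;
CREATE INDEX ON dataproviders (uploaded);
"""

def migrate(dsn):
    # Run the whole migration in one transaction; the default is dropped first because
    # the old BOOL default cannot be cast automatically to the new enum type.
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(MIGRATION_SQL)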