Merged (changes from 10 commits)
32 changes: 26 additions & 6 deletions backend/entityservice/database/insertions.py
@@ -1,7 +1,7 @@
import psycopg2
import psycopg2.extras

from entityservice.database.util import execute_returning_id, logger
from entityservice.database.util import execute_returning_id, logger, query_db
from entityservice.errors import RunDeleted


@@ -34,9 +34,9 @@ def insert_new_run(db, run_id, project_id, threshold, name, type, notes=''):
def insert_dataprovider(cur, auth_token, project_id):
sql_query = """
INSERT INTO dataproviders
(project, token)
(project, token, uploaded)
VALUES
(%s, %s)
(%s, %s, 'not_started')
RETURNING id
"""
return execute_returning_id(cur, sql_query, [project_id, auth_token])
@@ -54,10 +54,8 @@ def insert_encoding_metadata(db, clks_filename, dp_id, receipt_token, count):
with db.cursor() as cur:
cur.execute(sql_insertion_query, [dp_id, receipt_token, clks_filename, count, 'pending'])

set_dataprovider_upload_state(db, dp_id, True)


def set_dataprovider_upload_state(db, dp_id, state=True):
def set_dataprovider_upload_state(db, dp_id, state='error'):
logger.debug("Setting dataprovider {} upload state to {}".format(dp_id, state))
sql_update = """
UPDATE dataproviders
@@ -261,3 +259,25 @@ def get_created_runs_and_queue(db, project_id):
if res is None:
res = []
return res


def get_and_set_dataprovider_upload_state_in_progress(db, dp_id):
"""
This method returns true if it was able to update the uploaded status of this dataprovider from false to true.
It returns false otherwise (i.e. the state was already set to true).
"""

Collaborator: the status of the dataproviders is not true/false any more.

Contributor Author: Well seen. I started by using the boolean value, but added more states to have an in_progress one. But I forgot the comments...

logger.debug("Setting dataprovider {} upload state to True".format(dp_id))

Collaborator: same here

Contributor Author: I re-phrased the whole docstring.

sql_update = """
UPDATE dataproviders
SET uploaded = 'in_progress'
WHERE id = %s and uploaded != 'done' and uploaded != 'in_progress'
RETURNING id, uploaded
"""
query_response = query_db(db, sql_update, [dp_id])
print(query_response)
length = len(query_response)
if length < 1:
return False
elif length > 1:
raise ValueError("Houston, we have a problem!!! This dataprovider can upload its clks multiple times.")
return True
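
A quick usage sketch of the guard above (illustrative only: it assumes db is an open psycopg2 connection to the entity service database and that the dataproviders row for dp_id is still in the 'not_started' state):

# The first caller flips 'not_started' to 'in_progress' and gets True back.
assert get_and_set_dataprovider_upload_state_in_progress(db, dp_id)
# Any further caller finds the row already 'in_progress' (or later 'done') and gets False.
assert not get_and_set_dataprovider_upload_state_in_progress(db, dp_id)
# A failed upload is reset to 'error', which the WHERE clause does not exclude...
set_dataprovider_upload_state(db, dp_id, state='error')
# ...so the same dataprovider can try the upload again.
assert get_and_set_dataprovider_upload_state_in_progress(db, dp_id)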
6 changes: 3 additions & 3 deletions backend/entityservice/database/selections.py
@@ -32,7 +32,7 @@ def get_dataprovider_ids(db, project_id):
FROM dataproviders
WHERE
dataproviders.project = %s AND
dataproviders.uploaded = TRUE
dataproviders.uploaded = 'done'
ORDER BY dataproviders.id
"""
query_result = query_db(db, sql_query, [project_id])
@@ -65,7 +65,7 @@ def get_number_parties_uploaded(db, project_id):
WHERE
dataproviders.project = %s AND
bloomingdata.dp = dataproviders.id AND
dataproviders.uploaded = TRUE
dataproviders.uploaded = 'done'
"""
query_result = query_db(db, sql_query, [project_id], one=True)
return query_result['count']
@@ -93,7 +93,7 @@ def get_number_parties_ready(db, resource_id):
WHERE
dataproviders.project = %s AND
bloomingdata.dp = dataproviders.id AND
dataproviders.uploaded = TRUE AND
dataproviders.uploaded = 'done' AND
bloomingdata.state = 'ready'
"""
query_result = query_db(db, sql_query, [resource_id], one=True)
8 changes: 7 additions & 1 deletion backend/entityservice/init-db-schema.sql
@@ -85,6 +85,12 @@ RETURNS bool AS $$
SELECT $1.state = 'completed'
$$ STABLE LANGUAGE SQL;

CREATE TYPE UPLOADEDSTATE AS ENUM (
'not_started',
'in_progress',
'done',
'error'
);

Collaborator: wow, now we have UPLOADEDSTATE and UPLOADSTATE. This is very confusing.
After looking at those for a bit, I think we should rename UPLOADSTATE to PROCESSEDSTATE.
It would also be nice to add a bit of an explanation of what all those states are for.
In my view,
UPLOADSTATE describes the state of the processing of the uploaded clks. This is only set after CLKs have been uploaded: 'pending' means we are waiting for some processing to be done, 'ready' means done, and 'error' if something fails.
UPLOADEDSTATE describes the state of the CLK upload.

Contributor Author: That was indeed a pickle. I will add comments on the UPLOADEDSTATE.
I didn't want to touch the UPLOADSTATE as I wasn't sure what it was made for (as you mentioned, this is not well documented). But I'll also add some documentation around it and rename it. In which case I will also rename UPLOADEDSTATE to UPLOADSTATE, which sounds better :)

Contributor Author: I'll finally keep UPLOADEDSTATE instead of renaming it because the value in the table is uploaded. But I renamed UPLOADSTATE to PROCESSEDSTATE and added some documentation.

Collaborator: nice


CREATE TABLE dataproviders (
id SERIAL PRIMARY KEY,
@@ -93,7 +99,7 @@ CREATE TABLE dataproviders (
token CHAR(48) NOT NULL UNIQUE,

-- Set after the bloom filter data has been added
uploaded BOOL NOT NULL DEFAULT FALSE,
uploaded UPLOADEDSTATE NOT NULL,

project CHAR(48) REFERENCES projects (project_id) on DELETE CASCADE
);
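
To summarise the state semantics discussed in the review thread above, here is a hypothetical Python-side mirror of the two Postgres enums (not part of the diff; the service itself passes raw strings such as 'in_progress' and 'ready'):

from enum import Enum


class UploadedState(Enum):
    """Mirrors UPLOADEDSTATE (dataproviders.uploaded): the state of the CLK upload itself."""
    NOT_STARTED = 'not_started'   # token issued, nothing uploaded yet
    IN_PROGRESS = 'in_progress'   # an upload with this token is currently being handled
    DONE = 'done'                 # upload finished; the token cannot be reused
    ERROR = 'error'               # upload failed; the token may be used again


class ProcessedState(Enum):
    """Mirrors UPLOADSTATE (proposed rename: PROCESSEDSTATE): processing of already-uploaded CLKs."""
    PENDING = 'pending'           # waiting for the uploaded CLKs to be processed
    READY = 'ready'               # processing finished successfully
    ERROR = 'error'               # something failed during processing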
67 changes: 63 additions & 4 deletions backend/entityservice/tests/test_project_uploads.py
@@ -206,15 +206,15 @@ def test_project_binary_data_invalid_buffer_size(
pid = new_project_data['project_id']
file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'testdata/clks_128B_1k.bin')

upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], -1, status=400)
upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], -1, expected_status_code=400)

# Now try upload with valid hash-count but doesn't match actual size:
upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 1000000, status=400)
upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 3, status=400)
upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 1000000, expected_status_code=400)
upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 3, expected_status_code=400)

# Now try the minimum upload size (1 clk)
file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'testdata/single_clk.bin')
upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 1, status=201)
upload_binary_data_from_file(requests, file_path, pid, new_project_data['update_tokens'][0], 1, expected_status_code=201)


def test_project_single_party_empty_data_upload(
@@ -235,3 +235,62 @@ def test_project_single_party_empty_data_upload(
)
assert r.status_code == 400


def test_project_upload_wrong_authentication(requests, valid_project_params):

Collaborator: you can be more specific here. You are testing that you cannot upload twice.

Contributor Author: 👍

"""
Test that a token cannot be re-used to upload clks.
So first, create a project, upload clks with a token (which should work), and then re-upload clks using the same
token, which should return a 403 error.
"""
expected_number_parties = get_expected_number_parties(valid_project_params)
if expected_number_parties < 2:
# The test does not apply to fewer than two parties
return

new_project_data = requests.post(url + '/projects',
json={
'schema': {},
**valid_project_params
}).json()
update_tokens = new_project_data['update_tokens']

assert len(update_tokens) == expected_number_parties

small_file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'testdata/clks_128B_1k.bin')
token_to_reuse = update_tokens[0]
upload_binary_data_from_file(
requests,
small_file_path, new_project_data['project_id'], token_to_reuse, 1000)

upload_binary_data_from_file(
requests,
small_file_path, new_project_data['project_id'], token_to_reuse, 1000, expected_status_code=403)


def test_project_upload_fail_then_works(requests, valid_project_params):

Collaborator: that's a great name for this test

Contributor Author: I was inspired :)

"""
Test that a token can be re-used to upload clks after the upload failed.
So first, create a project, upload clks with a token (which should fail with a 400 error),
and then re-upload clks using the same token, which should work.
"""
expected_number_parties = get_expected_number_parties(valid_project_params)

new_project_data = requests.post(url + '/projects',
json={
'schema': {},
**valid_project_params
}).json()
update_tokens = new_project_data['update_tokens']

assert len(update_tokens) == expected_number_parties

small_file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'testdata/clks_128B_1k.bin')
token_to_reuse = update_tokens[0]
# This should fail as we are not providing the correct count.
upload_binary_data_from_file(
requests,
small_file_path, new_project_data['project_id'], token_to_reuse, 2000, expected_status_code=400)

upload_binary_data_from_file(
requests,
small_file_path, new_project_data['project_id'], token_to_reuse, 1000)
4 changes: 2 additions & 2 deletions backend/entityservice/tests/util.py
@@ -434,9 +434,9 @@ def upload_binary_data(requests, data, project_id, token, count, size=128, expec
return upload_response


def upload_binary_data_from_file(requests, file_path, project_id, token, count, size=128, status=201):
def upload_binary_data_from_file(requests, file_path, project_id, token, count, size=128, expected_status_code=201):
with open(file_path, 'rb') as f:
return upload_binary_data(requests, f, project_id, token, count, size, status)
return upload_binary_data(requests, f, project_id, token, count, size, expected_status_code)


def get_expected_number_parties(project_params):
101 changes: 56 additions & 45 deletions backend/entityservice/views/project.py
@@ -121,57 +121,69 @@ def project_clks_post(project_id):
with DBConn() as conn:
dp_id = db.get_dataprovider_id(conn, token)
project_encoding_size = db.get_project_schema_encoding_size(conn, project_id)
upload_state_updated = db.get_and_set_dataprovider_upload_state_in_progress(conn, dp_id)

Collaborator: this is a bit cryptic. The function get_and_set_dataprovider_upload_state_in_progress mixes two tasks.
How about if we split this to make it clearer:

if not db.dataprovider_allowed_to_upload(conn, project_id, dp_id, ...):
  return safe_fail_request(403, "This token has already been used to upload clks.")
db.set_dp_upload_state_in_progress(...)
...

Contributor Author: The main point is to do both actions at once so as not to have race conditions. Otherwise we would first check it and then set it, which should return an error if already set. I thus renamed the method to is_dataprovider_allowed_to_upload_and_lock.

Collaborator: I see...

if not upload_state_updated:
return safe_fail_request(403, "This token has already been used to upload clks.")
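
To make the race-condition argument from the review thread above concrete, here is a sketch of the atomic pattern (roughly what the renamed helper might look like; the implementation in this commit is get_and_set_dataprovider_upload_state_in_progress, and the split helper names in the comment are the reviewer's hypothetical ones):

# Split variant (NOT what the diff does): between the check and the update, a
# concurrent request carrying the same token can pass the same check, so both
# uploads would be accepted:
#
#   if not db.dataprovider_allowed_to_upload(conn, project_id, dp_id):
#       return safe_fail_request(403, "This token has already been used to upload clks.")
#   db.set_dp_upload_state_in_progress(conn, dp_id)
#
# Atomic variant: the check and the state change happen in one UPDATE. Postgres
# row-locks the matched row, so of two concurrent attempts only one gets a row
# back from RETURNING; the other re-evaluates the WHERE clause once the first
# commits, matches nothing, and is rejected with a 403.
def is_dataprovider_allowed_to_upload_and_lock(db, dp_id):
    sql_update = """
        UPDATE dataproviders
        SET uploaded = 'in_progress'
        WHERE id = %s and uploaded != 'done' and uploaded != 'in_progress'
        RETURNING id
    """
    with db.cursor() as cur:
        cur.execute(sql_update, [dp_id])
        # No row returned means another request already locked or finished this upload.
        return cur.fetchone() is not None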

log = log.bind(dp_id=dp_id)
log.info("Receiving CLK data.")
receipt_token = None

with opentracing.tracer.start_span('upload-clk-data', child_of=parent_span) as span:
span.set_tag("project_id", project_id)
if headers['Content-Type'] == "application/json":
span.set_tag("content-type", 'json')
# TODO: Previously, we were accessing the CLKs in a streaming fashion to avoid parsing the json in one hit. This
# enables running the web frontend with less memory.
# However, as connexion is very, very strict about input validation when it comes to json, it will always
# consume the stream first to validate it against the spec. Thus the backflip to fully reading the CLKs as
# json into memory. -> issue #184

receipt_token, raw_file = upload_json_clk_data(dp_id, get_json(), span)
# Schedule a task to deserialize the hashes, and carry
# out a pop count.
handle_raw_upload.delay(project_id, dp_id, receipt_token, parent_span=serialize_span(span))
log.info("Job scheduled to handle user uploaded hashes")
elif headers['Content-Type'] == "application/octet-stream":
span.set_tag("content-type", 'binary')
log.info("Handling binary CLK upload")
try:
count, size = check_binary_upload_headers(headers)
log.info(f"Headers tell us to expect {count} encodings of {size} bytes")
span.log_kv({'count': count, 'size': size})
except Exception:
log.warning("Upload failed due to problem with headers in binary upload")
raise
# Check against project level encoding size (if it has been set)
if project_encoding_size is not None and size != project_encoding_size:
# fail fast - we haven't stored the encoded data yet
return safe_fail_request(400, "Upload 'Hash-Size' doesn't match project settings")

# TODO actually stream the upload data straight to Minio. Currently we can't because
# connexion has already read the data before our handler is called!
# https://github.com/zalando/connexion/issues/592
# stream = get_stream()
stream = BytesIO(request.data)
expected_bytes = binary_format(size).size * count
log.debug(f"Stream size is {len(request.data)} B, and we expect {expected_bytes} B")
if len(request.data) != expected_bytes:
safe_fail_request(400, "Uploaded data did not match the expected size. Check request headers are correct")
try:
receipt_token = upload_clk_data_binary(project_id, dp_id, stream, count, size)
except ValueError:
safe_fail_request(400, "Uploaded data did not match the expected size. Check request headers are correct.")
else:
safe_fail_request(400, "Content Type not supported")

try:
if headers['Content-Type'] == "application/json":
span.set_tag("content-type", 'json')
# TODO: Previously, we were accessing the CLKs in a streaming fashion to avoid parsing the json in one hit. This
# enables running the web frontend with less memory.
# However, as connexion is very, very strict about input validation when it comes to json, it will always
# consume the stream first to validate it against the spec. Thus the backflip to fully reading the CLKs as
# json into memory. -> issue #184

receipt_token, raw_file = upload_json_clk_data(dp_id, get_json(), span)
# Schedule a task to deserialize the hashes, and carry
# out a pop count.
handle_raw_upload.delay(project_id, dp_id, receipt_token, parent_span=serialize_span(span))
log.info("Job scheduled to handle user uploaded hashes")
elif headers['Content-Type'] == "application/octet-stream":
span.set_tag("content-type", 'binary')
log.info("Handling binary CLK upload")
try:
count, size = check_binary_upload_headers(headers)
log.info(f"Headers tell us to expect {count} encodings of {size} bytes")
span.log_kv({'count': count, 'size': size})
except Exception:
log.warning("Upload failed due to problem with headers in binary upload")
raise
# Check against project level encoding size (if it has been set)
if project_encoding_size is not None and size != project_encoding_size:
# fail fast - we haven't stored the encoded data yet
return safe_fail_request(400, "Upload 'Hash-Size' doesn't match project settings")

# TODO actually stream the upload data straight to Minio. Currently we can't because
# connexion has already read the data before our handler is called!
# https://github.com/zalando/connexion/issues/592
# stream = get_stream()
stream = BytesIO(request.data)
expected_bytes = binary_format(size).size * count
log.debug(f"Stream size is {len(request.data)} B, and we expect {expected_bytes} B")
if len(request.data) != expected_bytes:
safe_fail_request(400, "Uploaded data did not match the expected size. Check request headers are correct")
try:
receipt_token = upload_clk_data_binary(project_id, dp_id, stream, count, size)
except ValueError:
safe_fail_request(400, "Uploaded data did not match the expected size. Check request headers are correct.")
else:
safe_fail_request(400, "Content Type not supported")
except Exception:
log.info("The dataprovider was not able to upload her clks,"
" re-enable the corresponding upload token to be used.")
with DBConn() as conn:
db.set_dataprovider_upload_state(conn, dp_id, state='error')
raise
with DBConn() as conn:
db.set_dataprovider_upload_state(conn, dp_id, state='done')
return {'message': 'Updated', 'receipt_token': receipt_token}, 201


@@ -250,7 +262,6 @@ def upload_clk_data_binary(project_id, dp_id, raw_stream, count, size=128):
with opentracing.tracer.start_span('update-database-with-metadata', child_of=parent_span):
with DBConn() as conn:
db.update_encoding_metadata(conn, filename, dp_id, 'ready')
db.set_dataprovider_upload_state(conn, dp_id, True)

# Now work out if all parties have added their data
if clks_uploaded_to_project(project_id):
2 changes: 2 additions & 0 deletions docs/changelog.rst
@@ -12,6 +12,7 @@ Version 1.13.0-alpha

- fixed bug where invalid state changes could occur when starting a run (#459)
- ``matching`` output type has been removed as redundant with the ``groups`` output with 2 parties. (#458)
- fixed a bug where a dataprovider could upload their clks multiple times in a project using the same upload token (#463)

- Update dependencies:

@@ -21,6 +22,7 @@ Breaking Change
~~~~~~~~~~~~~~~

- ``matching`` output type is not available anymore. (#458)
- the ``dataproviders`` table ``uploaded`` field has been modified from a BOOL to an ENUM type (#463)


Version 1.12.0