From 39cfe946b01d9c36a57ed99992fdc61186a9e327 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Tue, 3 Mar 2020 14:14:15 +1300 Subject: [PATCH 01/13] Configure jaeger for docker-compose --- tools/ci.yml | 2 ++ tools/docker-compose.yml | 2 ++ 2 files changed, 4 insertions(+) diff --git a/tools/ci.yml b/tools/ci.yml index 73eb7112..ccfc10a4 100644 --- a/tools/ci.yml +++ b/tools/ci.yml @@ -7,6 +7,8 @@ services: environment: - SERVER=http://nginx:8851 - INITIAL_DELAY=20 + - JAEGER_AGENT_HOST=jaeger + - JAEGER_SERVICE_NAME=e2etests entrypoint: /bin/sh -c "dockerize -wait tcp://nginx:8851/v1/status -timeout 1m python -m pytest -n 16 entityservice/tests --junitxml=testResults.xml -x" depends_on: - db diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml index eb977d19..639c9172 100644 --- a/tools/docker-compose.yml +++ b/tools/docker-compose.yml @@ -35,6 +35,7 @@ services: - MINIO_SECRET_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY - FLASK_DB_MIN_CONNECTIONS=1 - FLASK_DB_MAX_CONNECTIONS=10 + - JAEGER_AGENT_HOST=jaeger depends_on: - db - db_init @@ -73,6 +74,7 @@ services: #- CHUNK_SIZE_AIM=300_000_000 - CELERY_DB_MIN_CONNECTIONS=1 - CELERY_DB_MAX_CONNECTIONS=3 + - JAEGER_AGENT_HOST=jaeger nginx: From 1540c0f12461d5a4f2681b89fb850a21ee214d0b Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Tue, 3 Mar 2020 17:10:31 +1300 Subject: [PATCH 02/13] Tweak to allow debugging of tests from docker-compose --- tools/ci.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tools/ci.yml b/tools/ci.yml index ccfc10a4..c002fcb8 100644 --- a/tools/ci.yml +++ b/tools/ci.yml @@ -8,8 +8,7 @@ services: - SERVER=http://nginx:8851 - INITIAL_DELAY=20 - JAEGER_AGENT_HOST=jaeger - - JAEGER_SERVICE_NAME=e2etests - entrypoint: /bin/sh -c "dockerize -wait tcp://nginx:8851/v1/status -timeout 1m python -m pytest -n 16 entityservice/tests --junitxml=testResults.xml -x" + command: /bin/sh -c "dockerize -wait tcp://nginx:8851/v1/status -timeout 1m python -m pytest -n 4 entityservice/tests --junitxml=testResults.xml -x" depends_on: - db - redis From db51552a63d4c7a4244d883eaf36f955ecbaebc7 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 2 Mar 2020 16:16:50 +1300 Subject: [PATCH 03/13] Transaction size for encoding upload will depend on encoding_size. 
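The batch size used when inserting uploaded encodings is derived from the
encoding size rather than being a fixed row count. A minimal sketch of that
sizing, mirroring the _estimate_group_size helper in encoding_storage.py
(the 100 kB transaction target and the one-block-per-record estimate are
assumed values for illustration, not taken from this patch):

    import math

    def estimate_group_size(encoding_size):
        # Assumed target size in bytes for one insert transaction.
        network_transaction_size = 100_000
        # Assumed average number of blocks each record belongs to.
        blocks_per_record_estimate = 1
        # Each row stores roughly 64 bytes of block id per block, plus the
        # encoding itself and a 4 byte entity id.
        return math.ceil(
            network_transaction_size /
            ((blocks_per_record_estimate * 64) + (encoding_size + 4)))

    estimate_group_size(128)  # a 128 byte encoding gives ~511 rows per batch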
--- backend/entityservice/tasks/encoding_uploading.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/entityservice/tasks/encoding_uploading.py b/backend/entityservice/tasks/encoding_uploading.py index f0444f91..22235def 100644 --- a/backend/entityservice/tasks/encoding_uploading.py +++ b/backend/entityservice/tasks/encoding_uploading.py @@ -32,7 +32,7 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None): # Get number of blocks + total number of encodings from database expected_count, block_count = get_encoding_metadata(db, dp_id) - log.info(f"Expecting to handle {expected_count} encodings of in {block_count} blocks") + log.info(f"Expecting to handle {expected_count} encodings in {block_count} blocks") mc = connect_to_object_store() raw_file = Config.RAW_FILENAME_FMT.format(receipt_token) raw_data = mc.get_object(Config.MINIO_BUCKET, raw_file) From e73597e41b95e5b7862a1785cfae6a203cc8785b Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 2 Mar 2020 15:10:12 +1300 Subject: [PATCH 04/13] Pull encoding data from postgres --- backend/entityservice/database/selections.py | 21 ++- backend/entityservice/encoding_storage.py | 13 +- backend/entityservice/tasks/comparing.py | 143 +++++++++---------- 3 files changed, 99 insertions(+), 78 deletions(-) diff --git a/backend/entityservice/database/selections.py b/backend/entityservice/database/selections.py index 6f3c353c..0a10708a 100644 --- a/backend/entityservice/database/selections.py +++ b/backend/entityservice/database/selections.py @@ -285,7 +285,7 @@ def get_encodingblock_ids(db, dp_id, block_name=None): """.format("AND block_id = %s" if block_name else "") # Specifying a name for the cursor creates a server-side cursor, which prevents all of the # records from being downloaded at once. - cur = db.cursor(f'encodingfetcher-{dp_id}') + cur = db.cursor(f'encodingblockfetcher-{dp_id}') args = (dp_id, block_name) if block_name else (dp_id,) @@ -298,6 +298,25 @@ def get_encodingblock_ids(db, dp_id, block_name=None): yield row[0] +def get_encodings_by_id_range(db, dp_id, encoding_id_min=None, encoding_id_max=None): + """Yield all encodings in a given range for a given data provider.""" + sql_query = """ + SELECT encoding + FROM encodings + WHERE dp = %s + {} + {} + """.format( + f"AND encoding_id >= {encoding_id_min}" if encoding_id_min else "", + f"AND encoding_id < {encoding_id_max}" if encoding_id_max else "", + ) + cur = db.cursor() + cur.execute(sql_query, (dp_id,)) + rows = cur.fetchall() + for row in rows: + yield row[0] + + def get_filter_metadata(db, dp_id): """ :return: The filename and the encoding size of the raw clks. 
diff --git a/backend/entityservice/encoding_storage.py b/backend/entityservice/encoding_storage.py index 6174eb18..73318bc2 100644 --- a/backend/entityservice/encoding_storage.py +++ b/backend/entityservice/encoding_storage.py @@ -4,8 +4,8 @@ import ijson -from entityservice.database import insert_encodings_into_blocks -from entityservice.serialization import deserialize_bytes, binary_format +from entityservice.database import insert_encodings_into_blocks, get_encodings_by_id_range, get_dataprovider_ids +from entityservice.serialization import deserialize_bytes, binary_format, binary_unpack_filters def stream_json_clksnblocks(f): @@ -98,6 +98,15 @@ def _estimate_group_size(encoding_size): return math.ceil(network_transaction_size / ((blocks_per_record_estimate * 64) + (encoding_size + 4))) +def get_encoding_chunk(conn, chunk_info, encoding_size=128): + chunk_range_start, chunk_range_stop = chunk_info['range'] + dataprovider_id = chunk_info['dataproviderId'] + + encoding_data_stream = get_encodings_by_id_range(conn, dataprovider_id, chunk_range_start, chunk_range_stop) + chunk_data = binary_unpack_filters(encoding_data_stream, encoding_size=encoding_size) + return chunk_data, len(chunk_data) + + def convert_encodings_from_json_to_binary(f): """ Temp helper function diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py index e2f598c8..5cddf21f 100644 --- a/backend/entityservice/tasks/comparing.py +++ b/backend/entityservice/tasks/comparing.py @@ -5,6 +5,7 @@ import anonlink import minio +import opentracing import psycopg2 from celery import chord @@ -12,6 +13,7 @@ from entityservice.async_worker import celery, logger from entityservice.cache.encodings import remove_from_cache from entityservice.cache.progress import save_current_progress +from entityservice.encoding_storage import get_encoding_chunk from entityservice.errors import RunDeleted from entityservice.database import ( check_project_exists, check_run_exists, DBConn, get_dataprovider_ids, @@ -29,7 +31,7 @@ from entityservice.utils import generate_code, iterable_to_stream -@celery.task(base=TracedTask, ignore_result=True, args_as_tags=('project_id', 'run_id', 'threshold')) +@celery.task(base=TracedTask, ignore_result=True, args_as_tags=('project_id', 'run_id')) def create_comparison_jobs(project_id, run_id, parent_span=None): log = logger.bind(pid=project_id, run_id=run_id) with DBConn() as conn: @@ -60,21 +62,17 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): f"{' x '.join(map(str, dataset_sizes))} entities") current_span.log_kv({"event": 'get-dataset-sizes'}) - filters_object_filenames = tuple( - get_filter_metadata(conn, dp_id)[0] for dp_id in dp_ids) - current_span.log_kv({"event": 'get-metadata'}) - log.debug("Chunking computation task") + log.debug("Chunking computation task") chunk_infos = tuple(anonlink.concurrency.split_to_chunks( Config.CHUNK_SIZE_AIM, dataset_sizes=dataset_sizes)) - # Save filenames with chunk information. + # Add the db ids to the chunk information. 
for chunk_info in chunk_infos: for chunk_dp_info in chunk_info: chunk_dp_index = chunk_dp_info['datasetIndex'] - chunk_dp_store_filename = filters_object_filenames[chunk_dp_index] - chunk_dp_info['storeFilename'] = chunk_dp_store_filename + chunk_dp_info['dataproviderId'] = dp_ids[chunk_dp_index] log.info(f"Chunking into {len(chunk_infos)} computation tasks") current_span.log_kv({"event": "chunking", 'num_chunks': len(chunk_infos)}) @@ -103,15 +101,14 @@ def compute_filter_similarity(chunk_info, project_id, run_id, threshold, encodin """Compute filter similarity between a chunk of filters in dataprovider 1, and a chunk of filters in dataprovider 2. - :param chunk_info: - Chunk info returned by ``anonlink.concurrency.split_to_chunks``. - Additionally, "storeFilename" is added to each dataset chunk. + :param dict chunk_info: + A chunk returned by ``anonlink.concurrency.split_to_chunks``. :param project_id: :param run_id: :param threshold: :param encoding_size: The size in bytes of each encoded entry :param parent_span: A serialized opentracing span context. - @returns A 2-tuple: (num_results, results_filename_in_object_store) + :returns A 3-tuple: (num_results, result size in bytes, results_filename_in_object_store, ) """ log = logger.bind(pid=project_id, run_id=run_id) log.debug("Computing similarity for a chunk of filters") @@ -121,72 +118,68 @@ def compute_filter_similarity(chunk_info, project_id, run_id, threshold, encodin chunk_info_dp1, chunk_info_dp2 = chunk_info - t0 = time.time() - log.debug("Fetching and deserializing chunk of filters for dataprovider 1") - chunk_dp1, chunk_dp1_size = get_chunk_from_object_store(chunk_info_dp1, encoding_size) - #TODO: use the entity ids! - entity_ids_dp1, chunk_dp1 = zip(*chunk_dp1) - t1 = time.time() - log.debug("Fetching and deserializing chunk of filters for dataprovider 2") - chunk_dp2, chunk_dp2_size = get_chunk_from_object_store(chunk_info_dp2, encoding_size) - # TODO: use the entity ids! - entity_ids_dp2, chunk_dp2 = zip(*chunk_dp2) - t2 = time.time() - span.log_kv({'event': 'chunks are fetched and deserialized'}) - log.debug("Calculating filter similarity") + with DBConn() as conn: + with opentracing.tracer.start_span('fetching-encodings', child_of=span) as fetch_span: + with opentracing.tracer.start_span('chunk-1', child_of=fetch_span): + log.debug("Fetching and deserializing chunk of filters for dataprovider 1") + chunk_dp1, chunk_dp1_size = get_encoding_chunk(conn, chunk_info_dp1, encoding_size) + #TODO: use the entity ids! + entity_ids_dp1, chunk_dp1 = zip(*chunk_dp1) + with opentracing.tracer.start_span('chunk-2', child_of=fetch_span): + log.debug("Fetching and deserializing chunk of filters for dataprovider 2") + chunk_dp2, chunk_dp2_size = get_encoding_chunk(conn, chunk_info_dp2, encoding_size) + # TODO: use the entity ids! 
+ entity_ids_dp2, chunk_dp2 = zip(*chunk_dp2) + + log.debug('Both chunks are fetched and deserialized') span.log_kv({'size1': chunk_dp1_size, 'size2': chunk_dp2_size}) - try: - chunk_results = anonlink.concurrency.process_chunk( - chunk_info, - (chunk_dp1, chunk_dp2), - anonlink.similarities.dice_coefficient_accelerated, - threshold, - k=min(chunk_dp1_size, chunk_dp2_size)) - except NotImplementedError as e: - log.warning("Encodings couldn't be compared using anonlink.") - return - t3 = time.time() - span.log_kv({'event': 'similarities calculated'}) - - # Update the number of comparisons completed - comparisons_computed = chunk_dp1_size * chunk_dp2_size - save_current_progress(comparisons_computed, run_id) - - t4 = time.time() - - sims, _, _ = chunk_results - num_results = len(sims) - - if num_results: - result_filename = Config.SIMILARITY_SCORES_FILENAME_FMT.format( - generate_code(12)) - log.info("Writing {} intermediate results to file: {}".format(num_results, result_filename)) - bytes_iter, file_size \ - = anonlink.serialization.dump_candidate_pairs_iter(chunk_results) - iter_stream = iterable_to_stream(bytes_iter) - - mc = connect_to_object_store() + with opentracing.tracer.start_span('comparing-encodings', child_of=span): + log.debug("Calculating filter similarity") try: - mc.put_object( - Config.MINIO_BUCKET, result_filename, iter_stream, file_size) - except minio.ResponseError as err: - log.warning("Failed to store result in minio") - raise - else: - result_filename = None - file_size = None - t5 = time.time() - - log.info("run={} Comparisons: {}, Links above threshold: {}".format(run_id, comparisons_computed, len(chunk_results))) - log.info("Prep: {:.3f} + {:.3f}, Solve: {:.3f}, Progress: {:.3f}, Save: {:.3f}, Total: {:.3f}".format( - t1 - t0, - t2 - t1, - t3 - t2, - t4 - t3, - t5 - t4, - t5 - t0) - ) + chunk_results = anonlink.concurrency.process_chunk( + chunk_info, + (chunk_dp1, chunk_dp2), + anonlink.similarities.dice_coefficient_accelerated, + threshold, + k=min(chunk_dp1_size, chunk_dp2_size)) + except NotImplementedError as e: + log.warning("Encodings couldn't be compared using anonlink.") + return + + log.debug('Encoding similarities calculated') + + with opentracing.tracer.start_span('save-comparison-progress', child_of=span): + # Update the number of comparisons completed + comparisons_computed = chunk_dp1_size * chunk_dp2_size + save_current_progress(comparisons_computed, run_id) + + with opentracing.tracer.start_span('save-comparison-results', child_of=span): + sims, _, _ = chunk_results + num_results = len(sims) + + if num_results: + result_filename = Config.SIMILARITY_SCORES_FILENAME_FMT.format( + generate_code(12)) + log.info("Writing {} intermediate results to file: {}".format(num_results, result_filename)) + + bytes_iter, file_size \ + = anonlink.serialization.dump_candidate_pairs_iter(chunk_results) + iter_stream = iterable_to_stream(bytes_iter) + + mc = connect_to_object_store() + try: + mc.put_object( + Config.MINIO_BUCKET, result_filename, iter_stream, file_size) + except minio.ResponseError as err: + log.warning("Failed to store result in minio") + raise + else: + result_filename = None + file_size = None + + log.info("Comparisons: {}, Links above threshold: {}".format(comparisons_computed, len(chunk_results))) + return num_results, file_size, result_filename From 1bdf1e86b9d892b58465e0b21f1d5ce110145965 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Tue, 3 Mar 2020 14:15:50 +1300 Subject: [PATCH 05/13] User provided binary packed uploads now go straight 
into db --- backend/entityservice/init-db-schema.sql | 2 +- backend/entityservice/views/project.py | 46 ++++++++++++------------ 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/backend/entityservice/init-db-schema.sql b/backend/entityservice/init-db-schema.sql index c1ac8974..42a48e7c 100644 --- a/backend/entityservice/init-db-schema.sql +++ b/backend/entityservice/init-db-schema.sql @@ -131,7 +131,7 @@ CREATE TABLE uploads ( token CHAR(48) NOT NULL UNIQUE, -- Filename for the raw unprocessed uploaded data - file CHAR(64) NOT NULL, + file CHAR(64) NULL, state PROCESSEDSTATE NOT NULL, diff --git a/backend/entityservice/views/project.py b/backend/entityservice/views/project.py index 88827ebe..b463f362 100644 --- a/backend/entityservice/views/project.py +++ b/backend/entityservice/views/project.py @@ -2,17 +2,16 @@ import json import tempfile -import minio from flask import request from flask import g from structlog import get_logger import opentracing import entityservice.database as db +from entityservice.encoding_storage import store_encodings_in_db from entityservice.tasks import handle_raw_upload, check_for_executable_runs, remove_project from entityservice.tracing import serialize_span -from entityservice.utils import safe_fail_request, get_json, generate_code, get_stream, \ - clks_uploaded_to_project, fmt_bytes, iterable_to_stream +from entityservice.utils import safe_fail_request, get_json, generate_code, clks_uploaded_to_project, fmt_bytes from entityservice.database import DBConn, get_project_column from entityservice.views.auth_checks import abort_if_project_doesnt_exist, abort_if_invalid_dataprovider_token, \ abort_if_invalid_results_token, get_authorization_token_type_or_abort, abort_if_inconsistent_upload @@ -24,6 +23,8 @@ logger = get_logger() +DEFAULT_BLOCK_ID = '1' + def projects_get(): logger.info("Getting list of all projects") @@ -156,26 +157,27 @@ def project_binaryclks_post(project_id): stream = BytesIO(request.data) binary_formatter = binary_format(size) - def entity_id_injector(filter_stream): + def encoding_iterator(filter_stream): + # Assumes encoding id and block info not provided (yet) for entity_id in range(count): - yield binary_formatter.pack(entity_id, filter_stream.read(size)) + yield str(entity_id), binary_formatter.pack(entity_id, filter_stream.read(size)), [DEFAULT_BLOCK_ID] - data_with_ids = b''.join(entity_id_injector(stream)) expected_bytes = size * count log.debug(f"Stream size is {len(request.data)} B, and we expect {expected_bytes} B") if len(request.data) != expected_bytes: safe_fail_request(400, "Uploaded data did not match the expected size. Check request headers are correct") try: - receipt_token = upload_clk_data_binary(project_id, dp_id, BytesIO(data_with_ids), count, size) + receipt_token = upload_clk_data_binary(project_id, dp_id, encoding_iterator(stream), count, size) except ValueError: safe_fail_request(400, "Uploaded data did not match the expected size. 
Check request headers are correct.") else: safe_fail_request(400, "Content Type not supported") except Exception: - log.info("The dataprovider was not able to upload her clks," - " re-enable the corresponding upload token to be used.") + log.warning("The dataprovider was not able to upload their clks," + " re-enable the corresponding upload token to be used.") + with DBConn() as conn: db.set_dataprovider_upload_state(conn, dp_id, state='error') raise @@ -317,13 +319,13 @@ def authorise_get_request(project_id): return dp_id, project_object -def upload_clk_data_binary(project_id, dp_id, raw_stream, count, size=128): +def upload_clk_data_binary(project_id, dp_id, encoding_iter, count, size=128): """ - Save the user provided raw CLK data. + Save the user provided binary-packed CLK data. """ receipt_token = generate_code() - filename = Config.BIN_FILENAME_FMT.format(receipt_token) + filename = None # Set the state to 'pending' in the uploads table with DBConn() as conn: db.insert_encoding_metadata(conn, filename, dp_id, receipt_token, encoding_count=count, block_count=1) @@ -334,20 +336,18 @@ def upload_clk_data_binary(project_id, dp_id, raw_stream, count, size=128): logger.debug("Directly storing binary file with index, base64 encoded CLK, popcount") - # Upload to object store - logger.info(f"Uploading {count} binary encodings to object store. Total size: {fmt_bytes(num_bytes)}") + # Upload to database + logger.info(f"Uploading {count} binary encodings to database. Total size: {fmt_bytes(num_bytes)}") parent_span = g.flask_tracer.get_span() - with opentracing.tracer.start_span('save-to-minio', child_of=parent_span): - mc = connect_to_object_store() - try: - mc.put_object(Config.MINIO_BUCKET, filename, data=raw_stream, length=num_bytes) - except (minio.error.InvalidSizeError, minio.error.InvalidArgumentError, minio.error.ResponseError): - logger.info("Mismatch between expected stream length and header info") - raise ValueError("Mismatch between expected stream length and header info") + with DBConn() as conn: + with opentracing.tracer.start_span('create-default-block-in-db', child_of=parent_span): + db.insert_blocking_metadata(conn, dp_id, {DEFAULT_BLOCK_ID: count}) - with opentracing.tracer.start_span('update-database-with-metadata', child_of=parent_span): - with DBConn() as conn: + with opentracing.tracer.start_span('upload-encodings-to-db', child_of=parent_span): + store_encodings_in_db(conn, dp_id, encoding_iter, size) + + with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span): db.update_encoding_metadata(conn, filename, dp_id, 'ready') # Now work out if all parties have added their data From 0ff7bf07a8fa1fa749144429a9f063c301bd9e55 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Tue, 3 Mar 2020 14:30:15 +1300 Subject: [PATCH 06/13] Stop storing uploaded encodings in minio Fix up opentracing spans --- .../entityservice/tasks/encoding_uploading.py | 56 ++++--------------- 1 file changed, 11 insertions(+), 45 deletions(-) diff --git a/backend/entityservice/tasks/encoding_uploading.py b/backend/entityservice/tasks/encoding_uploading.py index 22235def..0ee0e7db 100644 --- a/backend/entityservice/tasks/encoding_uploading.py +++ b/backend/entityservice/tasks/encoding_uploading.py @@ -24,7 +24,7 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None): """ log = logger.bind(pid=project_id, dp_id=dp_id) log.info("Handling user provided base64 encodings") - + new_child_span = lambda name: handle_raw_upload.tracer.start_active_span(name, 
child_of=handle_raw_upload.span) with DBConn() as db: if not check_project_exists(db, project_id): log.info("Project deleted, stopping immediately") @@ -34,10 +34,10 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None): log.info(f"Expecting to handle {expected_count} encodings in {block_count} blocks") mc = connect_to_object_store() - raw_file = Config.RAW_FILENAME_FMT.format(receipt_token) - raw_data = mc.get_object(Config.MINIO_BUCKET, raw_file) + input_filename = Config.RAW_FILENAME_FMT.format(receipt_token) + raw_data = mc.get_object(Config.MINIO_BUCKET, input_filename) - with opentracing.tracer.start_span('upload-encodings-to-db', child_of=parent_span): + with new_child_span('upload-encodings-to-db'): # stream encodings with block ids from uploaded file # convert each encoding to our internal binary format # output into database for each block (temp or direct to minio?) @@ -46,55 +46,21 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None): with DBConn() as db: store_encodings_in_db(db, dp_id, pipeline, encoding_size) - #### GLUE CODE - TODO remove me once moved away from storing encodings in files - # Note we open the stream a second time - raw_data = mc.get_object(Config.MINIO_BUCKET, raw_file) - blocked_binary_data, encoding_size = convert_encodings_from_json_to_binary(raw_data) - assert block_count == len(blocked_binary_data) log.info(f"Converted uploaded encodings of size {encoding_size} bytes into internal binary format. Number of blocks: {block_count}") - if block_count == 0: - log.warning("No uploaded encoding blocks, stopping processing.") - # TODO mark run as failure? - return - elif block_count > 1: - raise NotImplementedError('Currently handle single block encodings - check back soon') - - #for block_id in blocked_binary_data: - block_id = list(blocked_binary_data.keys())[0] - actual_count = len(blocked_binary_data[block_id]) - log.info(f"{block_id=}, number of encodings: {actual_count}") - - # We peek at the first element as we need the encoding size - # for the rest of our processing pipeline. Note we now add 4 bytes to the encoding - # for the entity's identifier. - uploaded_encoding_size = len(blocked_binary_data[block_id][0]) - 4 - - # This is the first time we've seen the encoding size from this data provider + # As this is the first time we've seen the encoding size actually uploaded from this data provider + # We check it complies with the project encoding size. try: - check_dataproviders_encoding(project_id, uploaded_encoding_size) + check_dataproviders_encoding(project_id, encoding_size) except InvalidEncodingError as e: log.warning(e.args[0]) handle_invalid_encoding_data(project_id, dp_id) - with DBConn() as db: - # Save the encoding size as metadata - update_encoding_metadata_set_encoding_size(db, dp_id, uploaded_encoding_size) - - # Output file is our custom binary packed file - filename = Config.BIN_FILENAME_FMT.format(receipt_token) - bit_packed_element_size = binary_format(uploaded_encoding_size).size - num_bytes = actual_count * bit_packed_element_size - - with opentracing.tracer.start_span('process-encodings-in-quarantine', child_of=parent_span) as span: - packed_filter_stream = io.BytesIO(b''.join(blocked_binary_data[block_id])) - # Upload to object store - log.info(f"Uploading {expected_count} encodings of size {uploaded_encoding_size} " + - f"to object store. 
Total Size: {fmt_bytes(num_bytes)}") - mc.put_object(Config.MINIO_BUCKET, filename, data=packed_filter_stream, length=num_bytes) - with DBConn() as conn: - update_encoding_metadata(conn, filename, dp_id, 'ready') + with new_child_span('save-encoding-metadata'): + # Save the encoding size as metadata for this data provider + update_encoding_metadata_set_encoding_size(conn, dp_id, encoding_size) + update_encoding_metadata(conn, None, dp_id, 'ready') # Now work out if all parties have added their data if clks_uploaded_to_project(project_id, check_data_ready=True): From 07e2371c644a15b1351d9ffa2d0b73e8fcd672c6 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Tue, 3 Mar 2020 14:31:23 +1300 Subject: [PATCH 07/13] Fixup tracing in comparison task --- backend/entityservice/tasks/comparing.py | 41 ++++++++++++++---------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py index 5cddf21f..4a35f7f3 100644 --- a/backend/entityservice/tasks/comparing.py +++ b/backend/entityservice/tasks/comparing.py @@ -60,7 +60,7 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): log.info(f"Computing similarity for " f"{' x '.join(map(str, dataset_sizes))} entities") - current_span.log_kv({"event": 'get-dataset-sizes'}) + current_span.log_kv({"event": 'get-dataset-sizes', 'sizes': dataset_sizes}) log.debug("Chunking computation task") @@ -111,30 +111,36 @@ def compute_filter_similarity(chunk_info, project_id, run_id, threshold, encodin :returns A 3-tuple: (num_results, result size in bytes, results_filename_in_object_store, ) """ log = logger.bind(pid=project_id, run_id=run_id) + task_span = compute_filter_similarity.span + + def new_child_span(name): + return compute_filter_similarity.tracer.start_active_span( + name, + child_of=compute_filter_similarity.span) + log.debug("Computing similarity for a chunk of filters") - span = compute_filter_similarity.span + log.debug("Checking that the resource exists (in case of run being canceled/deleted)") assert_valid_run(project_id, run_id, log) chunk_info_dp1, chunk_info_dp2 = chunk_info with DBConn() as conn: - with opentracing.tracer.start_span('fetching-encodings', child_of=span) as fetch_span: - with opentracing.tracer.start_span('chunk-1', child_of=fetch_span): - log.debug("Fetching and deserializing chunk of filters for dataprovider 1") - chunk_dp1, chunk_dp1_size = get_encoding_chunk(conn, chunk_info_dp1, encoding_size) - #TODO: use the entity ids! - entity_ids_dp1, chunk_dp1 = zip(*chunk_dp1) - with opentracing.tracer.start_span('chunk-2', child_of=fetch_span): - log.debug("Fetching and deserializing chunk of filters for dataprovider 2") - chunk_dp2, chunk_dp2_size = get_encoding_chunk(conn, chunk_info_dp2, encoding_size) - # TODO: use the entity ids! - entity_ids_dp2, chunk_dp2 = zip(*chunk_dp2) + with new_child_span('fetching-encodings'): + log.debug("Fetching and deserializing chunk of filters for dataprovider 1") + chunk_with_ids_dp1, chunk_dp1_size = get_encoding_chunk(conn, chunk_info_dp1, encoding_size) + #TODO: use the entity ids! + entity_ids_dp1, chunk_dp1 = zip(*chunk_with_ids_dp1) + + log.debug("Fetching and deserializing chunk of filters for dataprovider 2") + chunk_with_ids_dp2, chunk_dp2_size = get_encoding_chunk(conn, chunk_info_dp2, encoding_size) + # TODO: use the entity ids! 
+ entity_ids_dp2, chunk_dp2 = zip(*chunk_with_ids_dp2) log.debug('Both chunks are fetched and deserialized') - span.log_kv({'size1': chunk_dp1_size, 'size2': chunk_dp2_size}) + task_span.log_kv({'size1': chunk_dp1_size, 'size2': chunk_dp2_size}) - with opentracing.tracer.start_span('comparing-encodings', child_of=span): + with new_child_span('comparing-encodings'): log.debug("Calculating filter similarity") try: chunk_results = anonlink.concurrency.process_chunk( @@ -149,18 +155,19 @@ def compute_filter_similarity(chunk_info, project_id, run_id, threshold, encodin log.debug('Encoding similarities calculated') - with opentracing.tracer.start_span('save-comparison-progress', child_of=span): + with new_child_span('update-comparison-progress'): # Update the number of comparisons completed comparisons_computed = chunk_dp1_size * chunk_dp2_size save_current_progress(comparisons_computed, run_id) - with opentracing.tracer.start_span('save-comparison-results', child_of=span): + with new_child_span('save-comparison-results-to-minio'): sims, _, _ = chunk_results num_results = len(sims) if num_results: result_filename = Config.SIMILARITY_SCORES_FILENAME_FMT.format( generate_code(12)) + task_span.log_kv({"edges": num_results}) log.info("Writing {} intermediate results to file: {}".format(num_results, result_filename)) bytes_iter, file_size \ From c692bb23e827af4dec14802f1fbf5d21137d6aac Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Tue, 3 Mar 2020 16:09:38 +1300 Subject: [PATCH 08/13] Extract logging and span setup from views --- backend/entityservice/views/project.py | 40 +++++++++------------- backend/entityservice/views/run/list.py | 10 +++--- backend/entityservice/views/run/results.py | 5 +-- backend/entityservice/views/run/status.py | 4 +-- backend/entityservice/views/util.py | 15 ++++++++ 5 files changed, 41 insertions(+), 33 deletions(-) create mode 100644 backend/entityservice/views/util.py diff --git a/backend/entityservice/views/project.py b/backend/entityservice/views/project.py index b463f362..7cc3cbf1 100644 --- a/backend/entityservice/views/project.py +++ b/backend/entityservice/views/project.py @@ -20,6 +20,7 @@ from entityservice.serialization import binary_format from entityservice.settings import Config from entityservice.views.serialization import ProjectList, NewProjectResponse, ProjectDescription +from entityservice.views.util import bind_log_and_span logger = get_logger() @@ -105,20 +106,9 @@ def project_binaryclks_post(project_id): """ Update a project to provide encoded PII data. """ - log = logger.bind(pid=project_id) + log, parent_span = bind_log_and_span(project_id) headers = request.headers - - parent_span = g.flask_tracer.get_span() - - with opentracing.tracer.start_span('check-auth', child_of=parent_span) as span: - abort_if_project_doesnt_exist(project_id) - if headers is None or 'Authorization' not in headers: - safe_fail_request(401, message="Authentication token required") - - token = headers['Authorization'] - - # Check the caller has valid token -> otherwise 403 - abort_if_invalid_dataprovider_token(token) + token = precheck_encoding_upload(project_id, headers, parent_span) with DBConn() as conn: dp_id = db.get_dataprovider_id(conn, token) @@ -186,25 +176,29 @@ def encoding_iterator(filter_stream): return {'message': 'Updated', 'receipt_token': receipt_token}, 201 -def project_clks_post(project_id): - """ - Update a project to provide encoded PII data. 
- """ - log = logger.bind(pid=project_id) - headers = request.headers - - parent_span = g.flask_tracer.get_span() - +def precheck_encoding_upload(project_id, headers, parent_span): with opentracing.tracer.start_span('check-auth', child_of=parent_span) as span: abort_if_project_doesnt_exist(project_id) if headers is None or 'Authorization' not in headers: safe_fail_request(401, message="Authentication token required") token = headers['Authorization'] - #span.set_tag("headers", headers) # Check the caller has valid token -> otherwise 403 abort_if_invalid_dataprovider_token(token) + return token + + +def project_clks_post(project_id): + """ + Update a project to provide encoded PII data. + """ + + headers = request.headers + + log, parent_span = bind_log_and_span(project_id) + + token = precheck_encoding_upload(project_id, headers, parent_span) with DBConn() as conn: dp_id = db.get_dataprovider_id(conn, token) diff --git a/backend/entityservice/views/run/list.py b/backend/entityservice/views/run/list.py index 95b28b52..2404832a 100644 --- a/backend/entityservice/views/run/list.py +++ b/backend/entityservice/views/run/list.py @@ -6,6 +6,7 @@ from entityservice.database import get_runs from entityservice.models.run import Run from entityservice.utils import safe_fail_request +from entityservice.views import bind_log_and_span from entityservice.views.auth_checks import abort_if_project_doesnt_exist, abort_if_invalid_results_token, \ abort_if_project_in_error_state from entityservice.views.serialization import RunList, RunDescription @@ -15,8 +16,8 @@ def get(project_id): - log = logger.bind(pid=project_id) - log.info("Listing runs for project: {}".format(project_id)) + log, parent_span = bind_log_and_span(project_id) + log.info("Listing runs for project") authorize_run_listing(project_id) @@ -28,7 +29,7 @@ def get(project_id): def post(project_id, run): - log = logger.bind(pid=project_id) + log, span = bind_log_and_span(project_id) log.debug("Processing request to add a new run", run=run) # Check the resource exists abort_if_project_doesnt_exist(project_id) @@ -45,9 +46,6 @@ def post(project_id, run): with db.DBConn() as db_conn: run_model.save(db_conn) - span = g.flask_tracer.get_span() - span.set_tag("run_id", run_model.run_id) - span.set_tag("project_id", run_model.project_id) check_for_executable_runs.delay(project_id, serialize_span(span)) return RunDescription().dump(run_model), 201 diff --git a/backend/entityservice/views/run/results.py b/backend/entityservice/views/run/results.py index de161ebc..f8db0562 100644 --- a/backend/entityservice/views/run/results.py +++ b/backend/entityservice/views/run/results.py @@ -5,15 +5,16 @@ from entityservice import database as db from entityservice.serialization import get_similarity_scores from entityservice.utils import safe_fail_request +from entityservice.views import bind_log_and_span from entityservice.views.auth_checks import abort_if_run_doesnt_exist, get_authorization_token_type_or_abort logger = get_logger() def get(project_id, run_id): - log = logger.bind(pid=project_id, rid=run_id) + log, parent_span = bind_log_and_span(project_id, run_id) log.info("Checking for results of run.") - parent_span = g.flask_tracer.get_span() + with opentracing.tracer.start_span('check-auth', child_of=parent_span) as span: # Check the project and run resources exist abort_if_run_doesnt_exist(project_id, run_id) diff --git a/backend/entityservice/views/run/status.py b/backend/entityservice/views/run/status.py index 693e3af6..ff5e50ea 100644 --- 
a/backend/entityservice/views/run/status.py +++ b/backend/entityservice/views/run/status.py @@ -4,6 +4,7 @@ from entityservice.cache import progress as progress_cache from entityservice import database as db +from entityservice.views import bind_log_and_span from entityservice.views.auth_checks import abort_if_run_doesnt_exist, get_authorization_token_type_or_abort from entityservice.views.serialization import completed, running, error from entityservice.models.run import RUN_TYPES @@ -12,8 +13,7 @@ def get(project_id, run_id): - log = logger.bind(pid=project_id, rid=run_id) - parent_span = g.flask_tracer.get_span() + log, parent_span = bind_log_and_span(project_id, run_id) log.debug("request run status") with opentracing.tracer.start_span('check-auth', child_of=parent_span) as span: # Check the project and run resources exist diff --git a/backend/entityservice/views/util.py b/backend/entityservice/views/util.py new file mode 100644 index 00000000..a3763282 --- /dev/null +++ b/backend/entityservice/views/util.py @@ -0,0 +1,15 @@ +from flask import g +from structlog import get_logger + +logger = get_logger() + + +def bind_log_and_span(project_id, run_id=None): + + log = logger.bind(pid=project_id) + parent_span = g.flask_tracer.get_span() + parent_span.set_tag("project_id", project_id) + if run_id is not None: + log = log.bind(rid=run_id) + parent_span.set_tag("run_id", run_id) + return log, parent_span From 40dd27f8054aa281c13838a76d9817df437b2de4 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Tue, 3 Mar 2020 16:10:03 +1300 Subject: [PATCH 09/13] Handle null filename Remove unused ijson import --- backend/entityservice/database/selections.py | 6 ++++-- backend/entityservice/error_checking.py | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/backend/entityservice/database/selections.py b/backend/entityservice/database/selections.py index 0a10708a..532ed827 100644 --- a/backend/entityservice/database/selections.py +++ b/backend/entityservice/database/selections.py @@ -319,10 +319,12 @@ def get_encodings_by_id_range(db, dp_id, encoding_id_min=None, encoding_id_max=N def get_filter_metadata(db, dp_id): """ - :return: The filename and the encoding size of the raw clks. + :return: The filename (which could be None), and the encoding size of the raw clks. """ filename, encoding_size = get_uploads_columns(db, dp_id, ['file', 'encoding_size']) - return filename.strip(), encoding_size + if filename is not None: + filename = filename.strip() + return filename, encoding_size def get_encoding_metadata(db, dp_id): diff --git a/backend/entityservice/error_checking.py b/backend/entityservice/error_checking.py index 6a120419..8b2b4138 100644 --- a/backend/entityservice/error_checking.py +++ b/backend/entityservice/error_checking.py @@ -32,4 +32,5 @@ def handle_invalid_encoding_data(project_id, dp_id): with DBConn() as conn: filename, _ = get_filter_metadata(conn, dp_id) update_encoding_metadata(conn, 'DELETED', dp_id, state='error') - delete_minio_objects.delay([filename], project_id) + if filename is not None: + delete_minio_objects.delay([filename], project_id) From d5f35780e80202ded54cde9c150d465260c6e669 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Tue, 3 Mar 2020 16:48:46 +1300 Subject: [PATCH 10/13] Minor cleanup to imports and unused functions Remove unused ijson import Remove convert_encodings_from_json_to_binary and unused serialization. 
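Code that previously went through convert_encodings_from_json_to_binary can
consume stream_json_clksnblocks directly, as the updated test does. A minimal
sketch of that usage (the file name is only an illustrative placeholder):

    from entityservice.encoding_storage import stream_json_clksnblocks

    # Each yielded item is (entity_id, encoding, list of block ids).
    with open('clksnblocks.json', 'rb') as f:
        encoding_ids, encodings, blocks = zip(*stream_json_clksnblocks(f))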
--- backend/entityservice/__init__.py | 4 -- backend/entityservice/encoding_storage.py | 30 +-------------- backend/entityservice/object_store.py | 3 -- backend/entityservice/serialization.py | 38 +------------------ .../entityservice/tasks/encoding_uploading.py | 3 +- .../tests/test_encoding_storage.py | 32 ++++++++-------- 6 files changed, 21 insertions(+), 89 deletions(-) diff --git a/backend/entityservice/__init__.py b/backend/entityservice/__init__.py index e2551905..35969185 100644 --- a/backend/entityservice/__init__.py +++ b/backend/entityservice/__init__.py @@ -4,10 +4,6 @@ from flask import g, request import structlog from tenacity import RetryError -try: - import ijson.backends.yajl2_cffi as ijson -except ImportError: - import ijson from entityservice.logger_setup import setup_logging diff --git a/backend/entityservice/encoding_storage.py b/backend/entityservice/encoding_storage.py index 73318bc2..081e4e82 100644 --- a/backend/entityservice/encoding_storage.py +++ b/backend/entityservice/encoding_storage.py @@ -4,7 +4,7 @@ import ijson -from entityservice.database import insert_encodings_into_blocks, get_encodings_by_id_range, get_dataprovider_ids +from entityservice.database import insert_encodings_into_blocks, get_encodings_by_id_range from entityservice.serialization import deserialize_bytes, binary_format, binary_unpack_filters @@ -50,6 +50,7 @@ def generator(first_i, first_encoding_data, first_blocks): return encoding_size, generator(i, encoding_data, blocks) + def _grouper(iterable, n, fillvalue=None): "Collect data into fixed-length chunks or blocks" # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx" @@ -106,30 +107,3 @@ def get_encoding_chunk(conn, chunk_info, encoding_size=128): chunk_data = binary_unpack_filters(encoding_data_stream, encoding_size=encoding_size) return chunk_data, len(chunk_data) - -def convert_encodings_from_json_to_binary(f): - """ - Temp helper function - - :param f: File-like object containing `clksnblocks` json. - :return: a tuple comprising: - - dict mapping blocks to lists of bytes (each encoding in our internal encoding file format) - - the size of the first encoding in bytes (excluding entity ID info) - """ - # Each block index contains a set of base64 encodings. 
- encodings_by_block = {} - - # Default which is ignored but makes IDE/typechecker happier - bit_packing_struct = binary_format(128) - encoding_size = None - - for i, encoding_data, blocks in stream_json_clksnblocks(f): - if encoding_size is None: - encoding_size = len(encoding_data) - bit_packing_struct = binary_format(encoding_size) - binary_packed_encoding = bit_packing_struct.pack(i, encoding_data) - for block in blocks: - encodings_by_block.setdefault(block, []).append(binary_packed_encoding) - - return encodings_by_block, encoding_size - diff --git a/backend/entityservice/object_store.py b/backend/entityservice/object_store.py index ef2ff324..8a0b4602 100644 --- a/backend/entityservice/object_store.py +++ b/backend/entityservice/object_store.py @@ -1,9 +1,6 @@ import minio from structlog import get_logger -import psycopg2 -from entityservice.database import insert_similarity_score_file -from entityservice.errors import RunDeleted from entityservice.settings import Config as config logger = get_logger('objectstore') diff --git a/backend/entityservice/serialization.py b/backend/entityservice/serialization.py index c5a558e7..acc4ba80 100644 --- a/backend/entityservice/serialization.py +++ b/backend/entityservice/serialization.py @@ -1,8 +1,7 @@ -import io import typing import urllib3 -from bitarray import bitarray + import base64 import struct @@ -12,7 +11,7 @@ from entityservice.object_store import connect_to_object_store from entityservice.settings import Config as config -from entityservice.utils import chunks, safe_fail_request, iterable_to_stream +from entityservice.utils import chunks, safe_fail_request import concurrent.futures @@ -103,39 +102,6 @@ def binary_unpack_filters(data_iterable, max_bytes=None, encoding_size=None): return filters -def deserialize_filters(filters): - """ - Deserialize iterable of base64 encoded clks. - - Carrying out the popcount and adding the index as we go. - """ - res = [] - for i, f in enumerate(filters): - ba = deserialize_bitarray(f) - res.append((ba, i, ba.count())) - return res - - -def deserialize_filters_concurrent(filters): - """ - Deserialize iterable of base64 encoded clks. - - Carrying out the popcount and adding the index as we go. 
- """ - res = [] - chunk_size = int(100000) - with concurrent.futures.ProcessPoolExecutor() as executor: - futures = [] - for i, chunk in enumerate(chunks(filters, chunk_size)): - future = executor.submit(deserialize_filters, chunk) - futures.append(future) - - for future in futures: - res.extend(future.result()) - - return res - - def generate_scores(candidate_pair_stream: typing.BinaryIO): """ Processes a TextIO stream of candidate pair similarity scores into diff --git a/backend/entityservice/tasks/encoding_uploading.py b/backend/entityservice/tasks/encoding_uploading.py index 0ee0e7db..9a06f567 100644 --- a/backend/entityservice/tasks/encoding_uploading.py +++ b/backend/entityservice/tasks/encoding_uploading.py @@ -4,11 +4,10 @@ from entityservice.database import * from entityservice.encoding_storage import stream_json_clksnblocks, convert_encodings_from_base64_to_binary, \ - convert_encodings_from_json_to_binary, store_encodings_in_db + store_encodings_in_db from entityservice.error_checking import check_dataproviders_encoding, handle_invalid_encoding_data, \ InvalidEncodingError from entityservice.object_store import connect_to_object_store -from entityservice.serialization import binary_format from entityservice.settings import Config from entityservice.async_worker import celery, logger from entityservice.tasks.base_task import TracedTask diff --git a/backend/entityservice/tests/test_encoding_storage.py b/backend/entityservice/tests/test_encoding_storage.py index b34048ad..174c93af 100644 --- a/backend/entityservice/tests/test_encoding_storage.py +++ b/backend/entityservice/tests/test_encoding_storage.py @@ -1,36 +1,36 @@ from pathlib import Path import io -from entityservice.encoding_storage import convert_encodings_from_json_to_binary +import pytest + +from entityservice.encoding_storage import stream_json_clksnblocks from entityservice.tests.util import serialize_bytes class TestEncodingStorage: def test_convert_encodings_from_json_to_binary_simple(self): filename = Path(__file__).parent / 'testdata' / 'test_encoding.json' - with open(filename) as f: - binary_dict, encoding_length = convert_encodings_from_json_to_binary(f) - assert encoding_length == 128 - assert len(binary_dict) == 2 - assert len(binary_dict['1']) == 3 - assert len(binary_dict['2']) == 2 - assert len(set(binary_dict['1']).intersection(binary_dict['2'])) == 1 + with open(filename, 'rb') as f: + # stream_json_clksnblocks produces a generator of (entity_id, base64 encoding, list of blocks) + encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(f))) + assert len(encoding_ids) == 4 + assert len(encodings) == 4 + assert len(blocks[0]) == 1 + assert blocks[0][0] == '1' def test_convert_encodings_from_json_to_binary_empty(self): empty = io.BytesIO(b'''{ "clksnblocks": [] }''') - binary_dict, encoding_length = convert_encodings_from_json_to_binary(empty) - assert binary_dict == {} - assert encoding_length is None + with pytest.raises(StopIteration): + next(stream_json_clksnblocks(empty)) def test_convert_encodings_from_json_to_binary_short(self): d = serialize_bytes(b'abcdabcd') json_data = io.BytesIO(b'{' + f'''"clksnblocks": [["{d}", "02"]]'''.encode() + b'}') - binary_dict, encoding_length = convert_encodings_from_json_to_binary(json_data) - assert encoding_length == 8 - assert "02" in binary_dict - assert len(binary_dict["02"]) == 1 - assert len(binary_dict["02"][0]) == 8 + 4 # size of encoding + id + encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(json_data))) + + assert 
len(encodings[0]) == 8 + assert "02" in blocks[0] From 631aa6713db64d815415ca474c014bfd9d9d78a2 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Wed, 4 Mar 2020 11:29:00 +1300 Subject: [PATCH 11/13] Add test to check stored encodings in db Also ensure encodings are returned in order --- backend/entityservice/database/selections.py | 7 ++++++- .../dbtests/test_insertions.py | 18 ++++++++++-------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/backend/entityservice/database/selections.py b/backend/entityservice/database/selections.py index 532ed827..2a801e1a 100644 --- a/backend/entityservice/database/selections.py +++ b/backend/entityservice/database/selections.py @@ -282,6 +282,8 @@ def get_encodingblock_ids(db, dp_id, block_name=None): FROM encodingblocks WHERE dp = %s {} + ORDER BY + encoding_ID ASC """.format("AND block_id = %s" if block_name else "") # Specifying a name for the cursor creates a server-side cursor, which prevents all of the # records from being downloaded at once. @@ -306,6 +308,8 @@ def get_encodings_by_id_range(db, dp_id, encoding_id_min=None, encoding_id_max=N WHERE dp = %s {} {} + ORDER BY + encoding_id ASC """.format( f"AND encoding_id >= {encoding_id_min}" if encoding_id_min else "", f"AND encoding_id < {encoding_id_max}" if encoding_id_max else "", @@ -314,7 +318,8 @@ def get_encodings_by_id_range(db, dp_id, encoding_id_min=None, encoding_id_max=N cur.execute(sql_query, (dp_id,)) rows = cur.fetchall() for row in rows: - yield row[0] + # Note row[0] is a memoryview + yield bytes(row[0]) def get_filter_metadata(db, dp_id): diff --git a/backend/entityservice/integrationtests/dbtests/test_insertions.py b/backend/entityservice/integrationtests/dbtests/test_insertions.py index 05793e9c..5a430d72 100644 --- a/backend/entityservice/integrationtests/dbtests/test_insertions.py +++ b/backend/entityservice/integrationtests/dbtests/test_insertions.py @@ -4,8 +4,8 @@ import psycopg2 from pytest import raises -from entityservice.database import insert_dataprovider, insert_new_project, \ - insert_encodings_into_blocks, insert_blocking_metadata, get_project, get_encodingblock_ids +from entityservice.database import insert_dataprovider, insert_encodings_into_blocks, insert_blocking_metadata, \ + get_project, get_encodingblock_ids, get_encodings_by_id_range from entityservice.models import Project from entityservice.tests.util import generate_bytes from entityservice.utils import generate_code @@ -88,12 +88,14 @@ def test_insert_many_clks(self): assert elapsed_time < 2 stored_encoding_ids = list(get_encodingblock_ids(conn, dp_id, '1')) - fetch_time = time.perf_counter() - end_time + id_fetch_time = time.perf_counter() - end_time # retrieval of encoding ids should be much faster than insertion - assert fetch_time < elapsed_time - + assert id_fetch_time < elapsed_time assert len(stored_encoding_ids) == num_entities - for stored_encoding_id, original in zip(stored_encoding_ids, range(num_entities)): - assert stored_encoding_id == original + for stored_encoding_id, original_id in zip(stored_encoding_ids, range(num_entities)): + assert stored_encoding_id == original_id - # TODO fetch binary encodings and verify against uploaded + stored_encodings = list(get_encodings_by_id_range(conn, dp_id)) + assert len(stored_encodings) == num_entities + for stored_encoding, original_encoding in zip(stored_encodings, encodings): + assert stored_encoding == original_encoding From ecd6cfe52b0d304fb2d0b5ed461c80c69216d792 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 16 Mar 
2020 12:57:46 +1300 Subject: [PATCH 12/13] Remove timing requirements from insertion test --- .../integrationtests/dbtests/test_insertions.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/backend/entityservice/integrationtests/dbtests/test_insertions.py b/backend/entityservice/integrationtests/dbtests/test_insertions.py index 5a430d72..c62a3cf0 100644 --- a/backend/entityservice/integrationtests/dbtests/test_insertions.py +++ b/backend/entityservice/integrationtests/dbtests/test_insertions.py @@ -74,23 +74,14 @@ def test_insert_many_clks(self): num_entities = 10_000 blocks = [['1'] for _ in range(num_entities)] encodings = [data[i % 100] for i in range(num_entities)] - start_time = time.perf_counter() insert_encodings_into_blocks(conn, dp_id, block_ids=blocks, encoding_ids=list(range(num_entities)), encodings=encodings ) - end_time = time.perf_counter() - elapsed_time = end_time - start_time - # This takes ~0.5s using docker compose on a ~5yo desktop. - # If the database is busy - e.g. if you're running integration - # tests and e2e tests at the same time, this assertion could fail. - assert elapsed_time < 2 stored_encoding_ids = list(get_encodingblock_ids(conn, dp_id, '1')) - id_fetch_time = time.perf_counter() - end_time - # retrieval of encoding ids should be much faster than insertion - assert id_fetch_time < elapsed_time + assert len(stored_encoding_ids) == num_entities for stored_encoding_id, original_id in zip(stored_encoding_ids, range(num_entities)): assert stored_encoding_id == original_id From 23e78d43f55823b9c402baf8de00101345d29849 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 16 Mar 2020 12:59:34 +1300 Subject: [PATCH 13/13] Add docstring to precheck_encoding_upload function --- backend/entityservice/views/project.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/backend/entityservice/views/project.py b/backend/entityservice/views/project.py index 7cc3cbf1..32582d6b 100644 --- a/backend/entityservice/views/project.py +++ b/backend/entityservice/views/project.py @@ -177,6 +177,10 @@ def encoding_iterator(filter_stream): def precheck_encoding_upload(project_id, headers, parent_span): + """ + Raise a `ProblemException` if the project doesn't exist or the + authentication token passed in the headers isn't valid. + """ with opentracing.tracer.start_span('check-auth', child_of=parent_span) as span: abort_if_project_doesnt_exist(project_id) if headers is None or 'Authorization' not in headers: