From 065b11aa0d621b4cafda5e39cc3fbfaf7be1a63a Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Wed, 4 Mar 2020 14:27:17 +1300 Subject: [PATCH 01/27] Add function to fetch block ids and sizes from db --- backend/entityservice/database/selections.py | 25 +++++++++++++++++-- .../dbtests/test_insertions.py | 11 ++++++-- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/backend/entityservice/database/selections.py b/backend/entityservice/database/selections.py index 2a801e1a..5b9ebb9b 100644 --- a/backend/entityservice/database/selections.py +++ b/backend/entityservice/database/selections.py @@ -288,16 +288,37 @@ def get_encodingblock_ids(db, dp_id, block_name=None): # Specifying a name for the cursor creates a server-side cursor, which prevents all of the # records from being downloaded at once. cur = db.cursor(f'encodingblockfetcher-{dp_id}') - args = (dp_id, block_name) if block_name else (dp_id,) + cur.execute(sql_query, args) + yield from iterate_cursor_results(cur) + +def get_block_metadata(db, dp_id): + """Yield block id and counts for a given data provider.""" + sql_query = """ + SELECT block_name, count + FROM blocks + WHERE dp = %s + """ + # Specifying a name for the cursor creates a server-side cursor, which prevents all of the + # records from being downloaded at once. + cur = db.cursor(f'blockfetcher-{dp_id}') + args = (dp_id,) cur.execute(sql_query, args) + for block_name, count in iterate_cursor_results(cur, one=False): + yield block_name.strip(), count + + +def iterate_cursor_results(cur, one=True): while True: rows = cur.fetchmany(10_000) if not rows: break for row in rows: - yield row[0] + if one: + yield row[0] + else: + yield row def get_encodings_by_id_range(db, dp_id, encoding_id_min=None, encoding_id_max=None): diff --git a/backend/entityservice/integrationtests/dbtests/test_insertions.py b/backend/entityservice/integrationtests/dbtests/test_insertions.py index c62a3cf0..bab6e894 100644 --- a/backend/entityservice/integrationtests/dbtests/test_insertions.py +++ b/backend/entityservice/integrationtests/dbtests/test_insertions.py @@ -5,7 +5,7 @@ from pytest import raises from entityservice.database import insert_dataprovider, insert_encodings_into_blocks, insert_blocking_metadata, \ - get_project, get_encodingblock_ids, get_encodings_by_id_range + get_project, get_encodingblock_ids, get_encodings_by_id_range, get_block_metadata from entityservice.models import Project from entityservice.tests.util import generate_bytes from entityservice.utils import generate_code @@ -30,7 +30,7 @@ def _create_project_and_dp(self): conn, cur = self._get_conn_and_cursor() # create a default block - insert_blocking_metadata(conn, dp_id, {'1': 99}) + insert_blocking_metadata(conn, dp_id, {'1': 10000}) conn.commit() assert len(dp_auth_token) == 48 @@ -90,3 +90,10 @@ def test_insert_many_clks(self): assert len(stored_encodings) == num_entities for stored_encoding, original_encoding in zip(stored_encodings, encodings): assert stored_encoding == original_encoding + + block_names, block_sizes = zip(*list(get_block_metadata(conn, dp_id))) + + assert len(block_names) == 1 + assert len(block_sizes) == 1 + assert block_names[0] == '1' + assert block_sizes[0] == 10_000 From 6f7c75c0b949cb532a1ce2bac19d0b665da14e60 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Wed, 4 Mar 2020 14:27:44 +1300 Subject: [PATCH 02/27] Retrieve blocking info in create_comparison_jobs task --- backend/entityservice/tasks/comparing.py | 61 ++++++++++++++++++------ 1 file changed, 47 insertions(+), 14 
deletions(-) diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py index 4a35f7f3..88ccee10 100644 --- a/backend/entityservice/tasks/comparing.py +++ b/backend/entityservice/tasks/comparing.py @@ -1,5 +1,6 @@ import array import heapq +import itertools import operator import time @@ -14,15 +15,14 @@ from entityservice.cache.encodings import remove_from_cache from entityservice.cache.progress import save_current_progress from entityservice.encoding_storage import get_encoding_chunk -from entityservice.errors import RunDeleted +from entityservice.errors import RunDeleted, InactiveRun from entityservice.database import ( check_project_exists, check_run_exists, DBConn, get_dataprovider_ids, - get_filter_metadata, get_project_column, get_project_dataset_sizes, + get_project_column, get_project_dataset_sizes, get_project_encoding_size, get_run, insert_similarity_score_file, - update_run_mark_failure) + update_run_mark_failure, get_block_metadata) from entityservice.models.run import progress_run_stage as progress_stage from entityservice.object_store import connect_to_object_store -from entityservice.serialization import get_chunk_from_object_store from entityservice.settings import Config from entityservice.tasks.base_task import TracedTask, celery_bug_fix, on_chord_error from entityservice.tasks.solver import solver_task @@ -31,26 +31,54 @@ from entityservice.utils import generate_code, iterable_to_stream +def check_run_active(conn, project_id, run_id): + """Raises InactiveRun if the project or run has been deleted from the database. + """ + if not check_project_exists(conn, project_id) or not check_run_exists(conn, project_id, run_id): + raise InactiveRun("Skipping as project or run not found in database.") + + @celery.task(base=TracedTask, ignore_result=True, args_as_tags=('project_id', 'run_id')) def create_comparison_jobs(project_id, run_id, parent_span=None): + """Schedule all the entity comparisons as sub tasks for a run. + + At a high level this task: + - checks if the project and run have been deleted and if so aborts. + - retrieves metadata: the number and size of the datasets, the encoding size, + and the number and size of blocks. + - splits the work into independent "chunks" and schedules them to run in celery + - schedules the follow up task to run after all the comparisons have been computed. + """ log = logger.bind(pid=project_id, run_id=run_id) + current_span = create_comparison_jobs.span with DBConn() as conn: + check_run_active(conn, project_id, run_id) dp_ids = get_dataprovider_ids(conn, project_id) - assert len(dp_ids) >= 2, "Expected at least 2 data providers" - log.info(f"Starting comparison of CLKs from data provider ids: " + number_of_datasets = len(dp_ids) + assert number_of_datasets >= 2, "Expected at least 2 data providers" + log.info(f"Scheduling comparison of CLKs from data provider ids: " f"{', '.join(map(str, dp_ids))}") - current_span = create_comparison_jobs.span - - if not check_project_exists(conn, project_id) or not check_run_exists(conn, project_id, run_id): - log.info("Skipping as project or run not found in database.") - return - - run_info = get_run(conn, run_id) - threshold = run_info['threshold'] + # Retrieve required metadata dataset_sizes = get_project_dataset_sizes(conn, project_id) + # {dp_id -> {block_id -> block_size}} + # e.g. 
{33: {'1': 100}, 34: {'1': 100}, 35: {'1': 100}} + dp_block_sizes = {} + for dp_id in dp_ids: + dp_block_sizes[dp_id] = dict(get_block_metadata(conn, dp_id)) + + log.info("Finding blocks in common between dataproviders") + # block_id -> List(pairs of dp ids) + # e.g. {'1': [(26, 27), (26, 28), (27, 28)]} + blocks = {} + for dp1, dp2 in itertools.combinations(dp_ids, 2): + # Get the intersection of blocks between these two dataproviders + common_block_ids = set(dp_block_sizes[dp1]).intersection(set(dp_block_sizes[dp2])) + for block_id in common_block_ids: + blocks.setdefault(block_id, []).append((dp1, dp2)) + if len(dataset_sizes) < 2: log.warning("Unexpected number of dataset sizes in db. Stopping") update_run_mark_failure(conn, run_id) @@ -62,6 +90,9 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): f"{' x '.join(map(str, dataset_sizes))} entities") current_span.log_kv({"event": 'get-dataset-sizes', 'sizes': dataset_sizes}) + # Pass the threshold to the comparison tasks to minimize their db lookups + threshold = get_run(conn, run_id)['threshold'] + log.debug("Chunking computation task") chunk_infos = tuple(anonlink.concurrency.split_to_chunks( @@ -78,6 +109,8 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): current_span.log_kv({"event": "chunking", 'num_chunks': len(chunk_infos)}) span_serialized = create_comparison_jobs.get_serialized_span() + + # Prepare the Celery Chord that will compute all the similarity scores: scoring_tasks = [compute_filter_similarity.si( chunk_info, From ebcb2485c83bd20bbbd77a521e64ffaa6d57a6f8 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Wed, 4 Mar 2020 16:13:09 +1300 Subject: [PATCH 03/27] WIP - identify blocks that need to be broken up further --- backend/entityservice/tasks/comparing.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py index 88ccee10..58b1a627 100644 --- a/backend/entityservice/tasks/comparing.py +++ b/backend/entityservice/tasks/comparing.py @@ -86,6 +86,30 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): encoding_size = get_project_encoding_size(conn, project_id) + # Now look at the sizes of the blocks, some may have to be broken into + # chunks, others will directly be compared as a full block. In either case + # we create a CandidatePairSubset object which describes the required + # comparisons. 
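+    # Illustrative arithmetic (values assumed, not taken from config): if
+    # Config.CHUNK_SIZE_AIM were 1_000_000, a common block of 5_000 x 3_000
+    # encodings (15M comparisons) would be split across several tasks, while
+    # a 400 x 500 block (200k comparisons) would be compared as a single chunk.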
+ chunks = [] + for block_id in blocks: + # Note there might be many pairs of datasets in this block + for dp1, dp2 in blocks[block_id]: + size1 = dp_block_sizes[dp1][block_id] + size2 = dp_block_sizes[dp2][block_id] + + comparisons = size1 * size2 + if comparisons > Config.CHUNK_SIZE_AIM: + log.warning("Need to chunk it up") + # A task will request a range of encodings for a block + # First cut can probably use postgres' LIMIT OFFSET + raise NotImplemented + else: + # Add the full block as a chunk + # TODO do we provide a range (I think it should be optional) + chunk_info_1 = {"dataproviderId": dp1, "range": (0, size1), "block_id": block_id} + chunk_info_2 = {"dataproviderId": dp2, "range": (0, size2), "block_id": block_id} + chunks.append((chunk_info_1, chunk_info_2)) + log.info(f"Computing similarity for " f"{' x '.join(map(str, dataset_sizes))} entities") current_span.log_kv({"event": 'get-dataset-sizes', 'sizes': dataset_sizes}) From a066ccc28dda7f578efc6601cf35abe30506595e Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Fri, 6 Mar 2020 17:15:07 +1300 Subject: [PATCH 04/27] Query for getting encodings in a block --- backend/entityservice/database/selections.py | 73 +++++++++++++------ .../dbtests/test_insertions.py | 30 +++++++- 2 files changed, 76 insertions(+), 27 deletions(-) diff --git a/backend/entityservice/database/selections.py b/backend/entityservice/database/selections.py index 5b9ebb9b..87a186a7 100644 --- a/backend/entityservice/database/selections.py +++ b/backend/entityservice/database/selections.py @@ -275,20 +275,22 @@ def get_uploads_columns(db, dp_id, columns): return [result[column] for column in columns] -def get_encodingblock_ids(db, dp_id, block_name=None): +def get_encodingblock_ids(db, dp_id, block_name=None, offset=0, limit=None): """Yield all encoding ids in either a single block, or all blocks for a given data provider.""" sql_query = """ SELECT encoding_id FROM encodingblocks - WHERE dp = %s + WHERE dp = %(dp_id)s {} ORDER BY - encoding_ID ASC - """.format("AND block_id = %s" if block_name else "") + encoding_ID ASC + OFFSET %(offset)s + LIMIT %(limit)s + """.format("AND block_id = %(block_id)s" if block_name else "") # Specifying a name for the cursor creates a server-side cursor, which prevents all of the # records from being downloaded at once. cur = db.cursor(f'encodingblockfetcher-{dp_id}') - args = (dp_id, block_name) if block_name else (dp_id,) + args = {'dp_id': dp_id, 'block_id': block_name, 'offset': offset, 'limit': limit} cur.execute(sql_query, args) yield from iterate_cursor_results(cur) @@ -309,9 +311,9 @@ def get_block_metadata(db, dp_id): yield block_name.strip(), count -def iterate_cursor_results(cur, one=True): +def iterate_cursor_results(cur, one=True, page_size=4096): while True: - rows = cur.fetchmany(10_000) + rows = cur.fetchmany(page_size) if not rows: break for row in rows: @@ -320,27 +322,50 @@ def iterate_cursor_results(cur, one=True): else: yield row +# +# def get_block_of_encodings(db, dp_id, block_id, offset=0, limit=None): +# """Yield a block of all encodings - optionally with an offset and limit - for a data provider. +# +# I don't understand why this is slower than fetching all encoding_ids to Python and then fetching +# all the encoding in the below version. 
+# """ +# sql_query = """ +# SELECT encoding +# FROM encodings, +# encodingblocks +# WHERE encodings.dp = %(dp_id)s +# AND encodingblocks.dp = %(dp_id)s +# and encodingblocks.block_id = %(block_id)s +# AND encodingblocks.encoding_id = encodings.encoding_id +# ORDER BY encodingblocks.encoding_id ASC +# OFFSET %(offset)s +# LIMIT %(limit)s +# """ +# +# cur = db.cursor(f"dp{dp_id}-block{block_id}-{offset}-{limit}") +# cur.execute(sql_query, {'dp_id': dp_id, 'block_id': block_id, 'offset': offset, 'limit': limit}) +# # Note encoding is returned as a memoryview +# for encoding in iterate_cursor_results(cur, one=True): +# yield encoding + + +def get_chunk_of_encodings(db, dp_id, encoding_ids): + """Yield a chunk of encodings for a data provider given the encoding ids. -def get_encodings_by_id_range(db, dp_id, encoding_id_min=None, encoding_id_max=None): - """Yield all encodings in a given range for a given data provider.""" + """ sql_query = """ - SELECT encoding + SELECT encoding FROM encodings - WHERE dp = %s - {} - {} - ORDER BY - encoding_id ASC - """.format( - f"AND encoding_id >= {encoding_id_min}" if encoding_id_min else "", - f"AND encoding_id < {encoding_id_max}" if encoding_id_max else "", - ) + WHERE encodings.dp = %(dp_id)s + AND encodings.encoding_id in ({}) + ORDER BY encoding_id ASC + """.format(','.join(map(str, encoding_ids))) + cur = db.cursor() - cur.execute(sql_query, (dp_id,)) - rows = cur.fetchall() - for row in rows: - # Note row[0] is a memoryview - yield bytes(row[0]) + cur.execute(sql_query, {'dp_id': dp_id}) + # Note encoding is returned as a memoryview + for encoding in iterate_cursor_results(cur, one=True): + yield encoding def get_filter_metadata(db, dp_id): diff --git a/backend/entityservice/integrationtests/dbtests/test_insertions.py b/backend/entityservice/integrationtests/dbtests/test_insertions.py index bab6e894..e54364e7 100644 --- a/backend/entityservice/integrationtests/dbtests/test_insertions.py +++ b/backend/entityservice/integrationtests/dbtests/test_insertions.py @@ -5,7 +5,7 @@ from pytest import raises from entityservice.database import insert_dataprovider, insert_encodings_into_blocks, insert_blocking_metadata, \ - get_project, get_encodingblock_ids, get_encodings_by_id_range, get_block_metadata + get_project, get_encodingblock_ids, get_block_metadata, get_chunk_of_encodings from entityservice.models import Project from entityservice.tests.util import generate_bytes from entityservice.utils import generate_code @@ -79,6 +79,7 @@ def test_insert_many_clks(self): encoding_ids=list(range(num_entities)), encodings=encodings ) + conn.commit() stored_encoding_ids = list(get_encodingblock_ids(conn, dp_id, '1')) @@ -86,10 +87,11 @@ def test_insert_many_clks(self): for stored_encoding_id, original_id in zip(stored_encoding_ids, range(num_entities)): assert stored_encoding_id == original_id - stored_encodings = list(get_encodings_by_id_range(conn, dp_id)) + stored_encodings = list(get_chunk_of_encodings(conn, dp_id, stored_encoding_ids)) + assert len(stored_encodings) == num_entities for stored_encoding, original_encoding in zip(stored_encodings, encodings): - assert stored_encoding == original_encoding + assert bytes(stored_encoding) == original_encoding block_names, block_sizes = zip(*list(get_block_metadata(conn, dp_id))) @@ -97,3 +99,25 @@ def test_insert_many_clks(self): assert len(block_sizes) == 1 assert block_names[0] == '1' assert block_sizes[0] == 10_000 + + def test_fetch_chunk(self): + data = [generate_bytes(128) for _ in range(100)] + project_id, 
project_auth_token, dp_id, dp_auth_token = self._create_project_and_dp() + conn, cur = self._get_conn_and_cursor() + num_entities = 10_000 + blocks = [['1'] for _ in range(num_entities)] + encodings = [data[i % 100] for i in range(num_entities)] + + insert_encodings_into_blocks(conn, dp_id, + block_ids=blocks, + encoding_ids=list(range(num_entities)), + encodings=encodings + ) + conn.commit() + + stored_encoding_ids = list(get_encodingblock_ids(conn, dp_id, '1', offset=10, limit=20)) + + assert len(stored_encoding_ids) == 20 + for i, stored_encoding_id in enumerate(stored_encoding_ids): + assert stored_encoding_id == i + 10 + From fb550de0768e41fc23da5bbf993035b9efb3ac78 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Fri, 6 Mar 2020 17:17:17 +1300 Subject: [PATCH 05/27] Split tasks into chunks using blocking information --- backend/entityservice/encoding_storage.py | 9 ++- backend/entityservice/init-db-schema.sql | 5 +- backend/entityservice/tasks/comparing.py | 97 +++++++++++++++-------- 3 files changed, 74 insertions(+), 37 deletions(-) diff --git a/backend/entityservice/encoding_storage.py b/backend/entityservice/encoding_storage.py index 081e4e82..51319162 100644 --- a/backend/entityservice/encoding_storage.py +++ b/backend/entityservice/encoding_storage.py @@ -4,7 +4,8 @@ import ijson -from entityservice.database import insert_encodings_into_blocks, get_encodings_by_id_range +from entityservice.database import insert_encodings_into_blocks, get_encodingblock_ids, \ + get_chunk_of_encodings from entityservice.serialization import deserialize_bytes, binary_format, binary_unpack_filters @@ -102,8 +103,10 @@ def _estimate_group_size(encoding_size): def get_encoding_chunk(conn, chunk_info, encoding_size=128): chunk_range_start, chunk_range_stop = chunk_info['range'] dataprovider_id = chunk_info['dataproviderId'] - - encoding_data_stream = get_encodings_by_id_range(conn, dataprovider_id, chunk_range_start, chunk_range_stop) + block_id = chunk_info['block_id'] + limit = chunk_range_stop - chunk_range_start + encoding_ids = get_encodingblock_ids(conn, dataprovider_id, block_id, chunk_range_start, limit) + encoding_data_stream = get_chunk_of_encodings(conn, dataprovider_id, encoding_ids) chunk_data = binary_unpack_filters(encoding_data_stream, encoding_size=encoding_size) return chunk_data, len(chunk_data) diff --git a/backend/entityservice/init-db-schema.sql b/backend/entityservice/init-db-schema.sql index 42a48e7c..dcaba3ee 100644 --- a/backend/entityservice/init-db-schema.sql +++ b/backend/entityservice/init-db-schema.sql @@ -188,8 +188,9 @@ CREATE TABLE encodingblocks ( FOREIGN KEY (dp, block_id) REFERENCES blocks (dp, block_name) ); - - +-- TODO index to accelerate ordering/filtering operations? +CREATE INDEX ON encodingblocks (dp, block_id); +CREATE INDEX ON encodingblocks (encoding_id); CREATE TABLE run_results ( -- Just the table index diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py index 58b1a627..9f34c158 100644 --- a/backend/entityservice/tasks/comparing.py +++ b/backend/entityservice/tasks/comparing.py @@ -9,7 +9,7 @@ import opentracing import psycopg2 from celery import chord - +import numpy as np from entityservice.async_worker import celery, logger from entityservice.cache.encodings import remove_from_cache @@ -88,8 +88,9 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): # Now look at the sizes of the blocks, some may have to be broken into # chunks, others will directly be compared as a full block. 
In either case - # we create a CandidatePairSubset object which describes the required - # comparisons. + # we create a dict which describes the required comparisons. The dict will + # have required keys: dataproviderId, block_id, and range. + # And dataproviderIndex? chunks = [] for block_id in blocks: # Note there might be many pairs of datasets in this block @@ -99,16 +100,24 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): comparisons = size1 * size2 if comparisons > Config.CHUNK_SIZE_AIM: - log.warning("Need to chunk it up") + log.info("Block is too large for single task. Working out how to chunk it up") # A task will request a range of encodings for a block - # First cut can probably use postgres' LIMIT OFFSET - raise NotImplemented + # First cut can probably use postgres' LIMIT OFFSET (ordering by encoding_id?) + for chunk_info in anonlink.concurrency.split_to_chunks(Config.CHUNK_SIZE_AIM, + dataset_sizes=(size1, size2)): + # chunk_info's from anonlink already have datasetIndex of 0 or 1 and a range + # We need to correct the datasetIndex and add the database datasetId and add block_id. + add_dp_id_to_chunk_info(chunk_info, dp_ids, dp1, dp2) + add_block_id_to_chunk_info(chunk_info, block_id) + chunks.append(chunk_info) else: # Add the full block as a chunk - # TODO do we provide a range (I think it should be optional) - chunk_info_1 = {"dataproviderId": dp1, "range": (0, size1), "block_id": block_id} - chunk_info_2 = {"dataproviderId": dp2, "range": (0, size2), "block_id": block_id} - chunks.append((chunk_info_1, chunk_info_2)) + # TODO should we provide a range (it could be optional) + chunk_left = {"range": (0, size1), "block_id": block_id} + chunk_right = {"range": (0, size2), "block_id": block_id} + chunk_info = (chunk_left, chunk_right) + add_dp_id_to_chunk_info(chunk_info, dp_ids, dp1, dp2) + chunks.append(chunk_info) log.info(f"Computing similarity for " f"{' x '.join(map(str, dataset_sizes))} entities") @@ -119,22 +128,12 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): log.debug("Chunking computation task") - chunk_infos = tuple(anonlink.concurrency.split_to_chunks( - Config.CHUNK_SIZE_AIM, - dataset_sizes=dataset_sizes)) - - # Add the db ids to the chunk information. - for chunk_info in chunk_infos: - for chunk_dp_info in chunk_info: - chunk_dp_index = chunk_dp_info['datasetIndex'] - chunk_dp_info['dataproviderId'] = dp_ids[chunk_dp_index] + chunk_infos = chunks log.info(f"Chunking into {len(chunk_infos)} computation tasks") current_span.log_kv({"event": "chunking", 'num_chunks': len(chunk_infos)}) span_serialized = create_comparison_jobs.get_serialized_span() - - # Prepare the Celery Chord that will compute all the similarity scores: scoring_tasks = [compute_filter_similarity.si( chunk_info, @@ -148,11 +147,31 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): if len(scoring_tasks) == 1: scoring_tasks.append(celery_bug_fix.si()) - callback_task = aggregate_comparisons.s(project_id, run_id, parent_span=span_serialized).on_error( + callback_task = aggregate_comparisons.s(project_id=project_id, run_id=run_id, parent_span=span_serialized).on_error( on_chord_error.s(run_id=run_id)) future = chord(scoring_tasks)(callback_task) +def add_dp_id_to_chunk_info(chunk_info, dp_ids, dp_id_left, dp_id_right): + """ + Modify a chunk_info as returned by anonlink.concurrency.split_to_chunks + to use correct project dataset indicies and add the database dp identifier. 
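+
+    For example (illustrative ids), with dp_ids=[33, 34, 35], dp_id_left=34 and
+    dp_id_right=35, the left chunk dict gains dataproviderId=34, datasetIndex=1
+    and the right chunk dict gains dataproviderId=35, datasetIndex=2.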
+ """ + dpindx = dict(zip(dp_ids, range(len(dp_ids)))) + + left, right = chunk_info + left['dataproviderId'] = dp_id_left + left['datasetIndex'] = dpindx[dp_id_left] + + right['dataproviderId'] = dp_id_right + right['datasetIndex'] = dpindx[dp_id_right] + + +def add_block_id_to_chunk_info(chunk_info, block_id): + for chunk_dp_info in chunk_info: + chunk_dp_info['block_id'] = block_id + + @celery.task(base=TracedTask, args_as_tags=('project_id', 'run_id', 'threshold')) def compute_filter_similarity(chunk_info, project_id, run_id, threshold, encoding_size, parent_span=None): """Compute filter similarity between a chunk of filters in dataprovider 1, @@ -183,29 +202,39 @@ def new_child_span(name): chunk_info_dp1, chunk_info_dp2 = chunk_info with DBConn() as conn: - with new_child_span('fetching-encodings'): + with new_child_span('fetching-left-encodings'): log.debug("Fetching and deserializing chunk of filters for dataprovider 1") chunk_with_ids_dp1, chunk_dp1_size = get_encoding_chunk(conn, chunk_info_dp1, encoding_size) #TODO: use the entity ids! entity_ids_dp1, chunk_dp1 = zip(*chunk_with_ids_dp1) + with new_child_span('fetching-right-encodings'): log.debug("Fetching and deserializing chunk of filters for dataprovider 2") chunk_with_ids_dp2, chunk_dp2_size = get_encoding_chunk(conn, chunk_info_dp2, encoding_size) # TODO: use the entity ids! entity_ids_dp2, chunk_dp2 = zip(*chunk_with_ids_dp2) log.debug('Both chunks are fetched and deserialized') - task_span.log_kv({'size1': chunk_dp1_size, 'size2': chunk_dp2_size}) + task_span.log_kv({'size1': chunk_dp1_size, 'size2': chunk_dp2_size, 'chunk_info': chunk_info}) with new_child_span('comparing-encodings'): log.debug("Calculating filter similarity") try: - chunk_results = anonlink.concurrency.process_chunk( - chunk_info, - (chunk_dp1, chunk_dp2), - anonlink.similarities.dice_coefficient_accelerated, - threshold, + sims, (rec_is0, rec_is1) = anonlink.similarities.dice_coefficient_accelerated( + datasets=(chunk_dp1, chunk_dp2), + threshold=threshold, k=min(chunk_dp1_size, chunk_dp2_size)) + + # TODO check offset results are working... 
in rec_is0 and rec_is1 using encoding_ids + def offset(recordarray, encoding_id_list): + rec = np.frombuffer(recordarray, dtype=recordarray.typecode) + encoding_ids = np.array(encoding_id_list) + res_np = encoding_ids[rec] + return array.array('I', [i for i in res_np]) + + rec_is0 = offset(rec_is0, entity_ids_dp1) + rec_is1 = offset(rec_is1, entity_ids_dp2) + except NotImplementedError as e: log.warning("Encodings couldn't be compared using anonlink.") return @@ -216,9 +245,9 @@ def new_child_span(name): # Update the number of comparisons completed comparisons_computed = chunk_dp1_size * chunk_dp2_size save_current_progress(comparisons_computed, run_id) + log.info("Comparisons: {}, Links above threshold: {}".format(comparisons_computed, len(sims))) with new_child_span('save-comparison-results-to-minio'): - sims, _, _ = chunk_results num_results = len(sims) if num_results: @@ -227,6 +256,12 @@ def new_child_span(name): task_span.log_kv({"edges": num_results}) log.info("Writing {} intermediate results to file: {}".format(num_results, result_filename)) + # Make index arrays for serialization + index_1 = array.array('I', (chunk_info_dp1["datasetIndex"],)) * num_results + index_2 = array.array('I', (chunk_info_dp2["datasetIndex"],)) * num_results + + chunk_results = sims, (rec_is0, rec_is1), (index_1, index_2) + bytes_iter, file_size \ = anonlink.serialization.dump_candidate_pairs_iter(chunk_results) iter_stream = iterable_to_stream(bytes_iter) @@ -242,8 +277,6 @@ def new_child_span(name): result_filename = None file_size = None - log.info("Comparisons: {}, Links above threshold: {}".format(comparisons_computed, len(chunk_results))) - return num_results, file_size, result_filename From 610b3bbfed74b92e7c1f92e7ded5569cc3923bfa Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 9 Mar 2020 12:49:26 +1300 Subject: [PATCH 06/27] Refactor create comparison jobs function --- backend/entityservice/tasks/comparing.py | 160 +++++++++++++++-------- 1 file changed, 107 insertions(+), 53 deletions(-) diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py index 9f34c158..0e434875 100644 --- a/backend/entityservice/tasks/comparing.py +++ b/backend/entityservice/tasks/comparing.py @@ -61,23 +61,10 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): f"{', '.join(map(str, dp_ids))}") # Retrieve required metadata - dataset_sizes = get_project_dataset_sizes(conn, project_id) - - # {dp_id -> {block_id -> block_size}} - # e.g. {33: {'1': 100}, 34: {'1': 100}, 35: {'1': 100}} - dp_block_sizes = {} - for dp_id in dp_ids: - dp_block_sizes[dp_id] = dict(get_block_metadata(conn, dp_id)) + dataset_sizes, dp_block_sizes = _retrieve_blocked_dataset_sizes(conn, project_id, dp_ids) log.info("Finding blocks in common between dataproviders") - # block_id -> List(pairs of dp ids) - # e.g. {'1': [(26, 27), (26, 28), (27, 28)]} - blocks = {} - for dp1, dp2 in itertools.combinations(dp_ids, 2): - # Get the intersection of blocks between these two dataproviders - common_block_ids = set(dp_block_sizes[dp1]).intersection(set(dp_block_sizes[dp2])) - for block_id in common_block_ids: - blocks.setdefault(block_id, []).append((dp1, dp2)) + common_blocks = _get_common_blocks(dp_block_sizes, dp_ids) if len(dataset_sizes) < 2: log.warning("Unexpected number of dataset sizes in db. 
Stopping") @@ -86,38 +73,8 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): encoding_size = get_project_encoding_size(conn, project_id) - # Now look at the sizes of the blocks, some may have to be broken into - # chunks, others will directly be compared as a full block. In either case - # we create a dict which describes the required comparisons. The dict will - # have required keys: dataproviderId, block_id, and range. - # And dataproviderIndex? - chunks = [] - for block_id in blocks: - # Note there might be many pairs of datasets in this block - for dp1, dp2 in blocks[block_id]: - size1 = dp_block_sizes[dp1][block_id] - size2 = dp_block_sizes[dp2][block_id] - - comparisons = size1 * size2 - if comparisons > Config.CHUNK_SIZE_AIM: - log.info("Block is too large for single task. Working out how to chunk it up") - # A task will request a range of encodings for a block - # First cut can probably use postgres' LIMIT OFFSET (ordering by encoding_id?) - for chunk_info in anonlink.concurrency.split_to_chunks(Config.CHUNK_SIZE_AIM, - dataset_sizes=(size1, size2)): - # chunk_info's from anonlink already have datasetIndex of 0 or 1 and a range - # We need to correct the datasetIndex and add the database datasetId and add block_id. - add_dp_id_to_chunk_info(chunk_info, dp_ids, dp1, dp2) - add_block_id_to_chunk_info(chunk_info, block_id) - chunks.append(chunk_info) - else: - # Add the full block as a chunk - # TODO should we provide a range (it could be optional) - chunk_left = {"range": (0, size1), "block_id": block_id} - chunk_right = {"range": (0, size2), "block_id": block_id} - chunk_info = (chunk_left, chunk_right) - add_dp_id_to_chunk_info(chunk_info, dp_ids, dp1, dp2) - chunks.append(chunk_info) + # Create "chunks" of comparisons + chunks = _create_work_chunks(common_blocks, dp_block_sizes, dp_ids, log) log.info(f"Computing similarity for " f"{' x '.join(map(str, dataset_sizes))} entities") @@ -152,6 +109,107 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): future = chord(scoring_tasks)(callback_task) +def _create_work_chunks(blocks, dp_block_sizes, dp_ids, log): + """Create chunks of comparisons using blocking information. + + .. note:: + + Ideally the chunks would be similarly sized, however with blocking the best we + can do in this regard is to ensure a similar maximum size. Small blocks will + directly translate to small chunks. + + :todo Consider passing all dataproviders to anonlink's `split_to_chunks` for a given + large block instead of pairwise. + + :param blocks: + dict mapping block identifier to a list of all combinations of + data provider pairs containing this block. As created by :func:`_get_common_blocks` + :param dp_block_sizes: + A map from dataprovider id to a dict mapping + block id to the number of encodings from the dataprovider in the + block. As created by :func:`_retrieve_blocked_dataset_sizes` + :param dp_ids: list of data provider ids + + :returns + + a dict describing the required comparisons. Comprised of: + - dataproviderId - The dataprovider identifier in our database + - datasetIndex - The dataprovider index [0, 1 ...] + - block_id - The block of encodings this chunk is for + - range - The range of encodings within the block that make up this chunk. 
+ """ + chunks = [] + for block_id in blocks: + log.debug("Chunking block", block_id=block_id) + # Note there might be many pairs of datasets in this block + for dp1, dp2 in blocks[block_id]: + size1 = dp_block_sizes[dp1][block_id] + size2 = dp_block_sizes[dp2][block_id] + + comparisons = size1 * size2 + if comparisons > Config.CHUNK_SIZE_AIM: + log.debug("Block is too large for single task. Working out how to chunk it up") + for chunk_info in anonlink.concurrency.split_to_chunks(Config.CHUNK_SIZE_AIM, + dataset_sizes=(size1, size2)): + # chunk_info's from anonlink already have datasetIndex of 0 or 1 and a range + # We need to correct the datasetIndex and add the database datasetId and add block_id. + add_dp_id_to_chunk_info(chunk_info, dp_ids, dp1, dp2) + add_block_id_to_chunk_info(chunk_info, block_id) + chunks.append(chunk_info) + else: + log.debug("Add the full block as a single work chunk") + # Although the range "could" be optional we choose to make all chunk definitions consistent + chunk_left = {"range": (0, size1), "block_id": block_id} + chunk_right = {"range": (0, size2), "block_id": block_id} + chunk_info = (chunk_left, chunk_right) + add_dp_id_to_chunk_info(chunk_info, dp_ids, dp1, dp2) + chunks.append(chunk_info) + return chunks + + +def _get_common_blocks(dp_block_sizes, dp_ids): + """Return all pairs of non-empty blocks across dataproviders. + + :returns + dict mapping block identifier to a list of all combinations of + data provider pairs containing this block. + + block_id -> List(pairs of dp ids) + e.g. {'1': [(26, 27), (26, 28), (27, 28)]} + """ + blocks = {} + for dp1, dp2 in itertools.combinations(dp_ids, 2): + # Get the intersection of blocks between these two dataproviders + common_block_ids = set(dp_block_sizes[dp1]).intersection(set(dp_block_sizes[dp2])) + for block_id in common_block_ids: + blocks.setdefault(block_id, []).append((dp1, dp2)) + return blocks + + +def _retrieve_blocked_dataset_sizes(conn, project_id, dp_ids): + """Fetch encoding counts for each dataset by block. + + :param dp_ids: Iterable of dataprovider database identifiers. + :returns + A 2-tuple of: + + - dataset sizes: a tuple of the number of encodings in each dataset. + + - dp_block_sizes: A map from dataprovider id to a dict mapping + block id to the number of encodings from the dataprovider in the + block. + + {dp_id -> {block_id -> block_size}} + e.g. {33: {'1': 100}, 34: {'1': 100}, 35: {'1': 100}} + """ + dataset_sizes = get_project_dataset_sizes(conn, project_id) + + dp_block_sizes = {} + for dp_id in dp_ids: + dp_block_sizes[dp_id] = dict(get_block_metadata(conn, dp_id)) + return dataset_sizes, dp_block_sizes + + def add_dp_id_to_chunk_info(chunk_info, dp_ids, dp_id_left, dp_id_right): """ Modify a chunk_info as returned by anonlink.concurrency.split_to_chunks @@ -225,12 +283,8 @@ def new_child_span(name): threshold=threshold, k=min(chunk_dp1_size, chunk_dp2_size)) - # TODO check offset results are working... 
in rec_is0 and rec_is1 using encoding_ids def offset(recordarray, encoding_id_list): - rec = np.frombuffer(recordarray, dtype=recordarray.typecode) - encoding_ids = np.array(encoding_id_list) - res_np = encoding_ids[rec] - return array.array('I', [i for i in res_np]) + return array.array('I', [encoding_id_list[i] for i in recordarray]) rec_is0 = offset(rec_is0, entity_ids_dp1) rec_is1 = offset(rec_is1, entity_ids_dp2) @@ -260,7 +314,7 @@ def offset(recordarray, encoding_id_list): index_1 = array.array('I', (chunk_info_dp1["datasetIndex"],)) * num_results index_2 = array.array('I', (chunk_info_dp2["datasetIndex"],)) * num_results - chunk_results = sims, (rec_is0, rec_is1), (index_1, index_2) + chunk_results = sims, (index_1, index_2), (rec_is0, rec_is1), bytes_iter, file_size \ = anonlink.serialization.dump_candidate_pairs_iter(chunk_results) From d838fe411d2ed098e8cf135fa9389b3985025d91 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 9 Mar 2020 14:25:38 +1300 Subject: [PATCH 07/27] More refactoring of chunk creation --- backend/entityservice/models/project.py | 10 ++---- backend/entityservice/tasks/base_task.py | 2 +- backend/entityservice/tasks/comparing.py | 44 ++++++++---------------- backend/entityservice/tasks/run.py | 7 ++-- 4 files changed, 24 insertions(+), 39 deletions(-) diff --git a/backend/entityservice/models/project.py b/backend/entityservice/models/project.py index 5776b2f8..d54eecec 100644 --- a/backend/entityservice/models/project.py +++ b/backend/entityservice/models/project.py @@ -36,12 +36,6 @@ def __init__(self, result_type, schema, name, notes, parties, uses_blocking): # Order is important here self.update_tokens = [generate_code() for _ in range(parties)] - # TODO DELETE? - self.ready = False - self.status = 'not ready' - self.data = {} - self.result = {} - VALID_RESULT_TYPES = {'groups', 'permutations', 'similarity_scores'} @@ -60,12 +54,14 @@ def from_json(data): # Get optional fields from JSON data name = data.get('name', '') notes = data.get('notes', '') - parties = data.get('number_parties', 2) + parties = int(data.get('number_parties', 2)) uses_blocking = data.get('uses_blocking', False) if parties > 2 and result_type != 'groups': raise InvalidProjectParametersException( "Multi-party linkage requires result type 'groups'.") + if parties < 2: + raise InvalidProjectParametersException("Record linkage requires at least 2 parties!") return Project(result_type, schema, name, notes, parties, uses_blocking) diff --git a/backend/entityservice/tasks/base_task.py b/backend/entityservice/tasks/base_task.py index afe712cf..7c1a7493 100644 --- a/backend/entityservice/tasks/base_task.py +++ b/backend/entityservice/tasks/base_task.py @@ -104,7 +104,7 @@ def celery_bug_fix(*args, **kwargs): @celery.task(base=BaseTask, ignore_result=True) -def on_chord_error(*args, **kwargs): +def run_failed_handler(*args, **kwargs): """ Record that a task has encountered an error, mark the run as failed. 
diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py index 0e434875..b5c996f1 100644 --- a/backend/entityservice/tasks/comparing.py +++ b/backend/entityservice/tasks/comparing.py @@ -24,7 +24,7 @@ from entityservice.models.run import progress_run_stage as progress_stage from entityservice.object_store import connect_to_object_store from entityservice.settings import Config -from entityservice.tasks.base_task import TracedTask, celery_bug_fix, on_chord_error +from entityservice.tasks.base_task import TracedTask, celery_bug_fix, run_failed_handler from entityservice.tasks.solver import solver_task from entityservice.tasks import mark_run_complete from entityservice.tasks.assert_valid_run import assert_valid_run @@ -71,24 +71,21 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): update_run_mark_failure(conn, run_id) return + # We pass the encoding_size and threshold to the comparison tasks to minimize their db lookups encoding_size = get_project_encoding_size(conn, project_id) - - # Create "chunks" of comparisons - chunks = _create_work_chunks(common_blocks, dp_block_sizes, dp_ids, log) - - log.info(f"Computing similarity for " - f"{' x '.join(map(str, dataset_sizes))} entities") - current_span.log_kv({"event": 'get-dataset-sizes', 'sizes': dataset_sizes}) - - # Pass the threshold to the comparison tasks to minimize their db lookups threshold = get_run(conn, run_id)['threshold'] log.debug("Chunking computation task") + # Create "chunks" of comparisons + chunks = _create_work_chunks(common_blocks, dp_block_sizes, dp_ids, log) + + log.info(f"Computing similarity for " + f"{' x '.join(map(str, dataset_sizes))} entities") + current_span.log_kv({"event": 'get-dataset-sizes', 'sizes': dataset_sizes}) - chunk_infos = chunks - log.info(f"Chunking into {len(chunk_infos)} computation tasks") - current_span.log_kv({"event": "chunking", 'num_chunks': len(chunk_infos)}) + log.info(f"Chunking into {len(chunks)} computation tasks") + current_span.log_kv({"event": "chunking", 'num_chunks': len(chunks)}) span_serialized = create_comparison_jobs.get_serialized_span() # Prepare the Celery Chord that will compute all the similarity scores: @@ -99,13 +96,13 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): threshold, encoding_size, span_serialized - ) for chunk_info in chunk_infos] + ) for chunk_info in chunks] if len(scoring_tasks) == 1: scoring_tasks.append(celery_bug_fix.si()) callback_task = aggregate_comparisons.s(project_id=project_id, run_id=run_id, parent_span=span_serialized).on_error( - on_chord_error.s(run_id=run_id)) + run_failed_handler.s(run_id=run_id)) future = chord(scoring_tasks)(callback_task) @@ -380,18 +377,6 @@ def _merge_files(mc, log, file0, file1): return total_num, merged_file_size, merged_file_name -def _insert_similarity_into_db(db, log, run_id, merged_filename): - try: - result_id = insert_similarity_score_file( - db, run_id, merged_filename) - except psycopg2.IntegrityError: - log.info("Error saving similarity score filename to database. 
" - "The project may have been deleted.") - raise RunDeleted(run_id) - log.debug(f"Saved path to similarity scores file to db with id " - f"{result_id}") - - @celery.task( base=TracedTask, ignore_result=True, @@ -439,8 +424,9 @@ def aggregate_comparisons(similarity_result_files, project_id, run_id, parent_sp with DBConn() as db: result_type = get_project_column(db, project_id, 'result_type') - - _insert_similarity_into_db(db, log, run_id, merged_filename) + result_id = insert_similarity_score_file(db, run_id, merged_filename) + log.debug(f"Saved path to similarity scores file to db with id " + f"{result_id}") if result_type == "similarity_scores": # Post similarity computation cleanup diff --git a/backend/entityservice/tasks/run.py b/backend/entityservice/tasks/run.py index 18ef236c..bff98ed3 100644 --- a/backend/entityservice/tasks/run.py +++ b/backend/entityservice/tasks/run.py @@ -5,7 +5,7 @@ from entityservice.database import DBConn, check_project_exists, get_run, get_run_state_for_update from entityservice.database import update_run_set_started from entityservice.errors import RunDeleted, ProjectDeleted -from entityservice.tasks.base_task import TracedTask +from entityservice.tasks.base_task import TracedTask, run_failed_handler from entityservice.tasks.comparing import create_comparison_jobs from entityservice.async_worker import celery, logger @@ -47,5 +47,8 @@ def prerun_check(project_id, run_id, parent_span=None): set_run_state_active(run_id) progress_cache.save_current_progress(comparisons=0, run_id=run_id) - create_comparison_jobs.delay(project_id, run_id, prerun_check.get_serialized_span()) + create_comparison_jobs.apply_async( + kwargs={'project_id': project_id, 'run_id': run_id, 'parent_span': prerun_check.get_serialized_span()}, + link_error=run_failed_handler.s() + ) log.info("CLK similarity computation scheduled") From ec36e8dd70d096d624899c2c796d81ffd24981aa Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 9 Mar 2020 15:05:34 +1300 Subject: [PATCH 08/27] Add a few unit tests for chunking --- .../integrationtests/dbtests/__init__.py | 13 ++++ .../dbtests/test_insertions.py | 23 ++----- backend/entityservice/tasks/comparing.py | 9 ++- backend/entityservice/tests/test_chunking.py | 65 +++++++++++++++++++ 4 files changed, 91 insertions(+), 19 deletions(-) create mode 100644 backend/entityservice/tests/test_chunking.py diff --git a/backend/entityservice/integrationtests/dbtests/__init__.py b/backend/entityservice/integrationtests/dbtests/__init__.py index e69de29b..1aa25ef0 100644 --- a/backend/entityservice/integrationtests/dbtests/__init__.py +++ b/backend/entityservice/integrationtests/dbtests/__init__.py @@ -0,0 +1,13 @@ +import psycopg2 + +from entityservice.settings import Config as config + + +def _get_conn_and_cursor(): + db = config.DATABASE + host = config.DATABASE_SERVER + user = config.DATABASE_USER + password = config.DATABASE_PASSWORD + conn = psycopg2.connect(host=host, dbname=db, user=user, password=password) + cursor = conn.cursor() + return conn, cursor diff --git a/backend/entityservice/integrationtests/dbtests/test_insertions.py b/backend/entityservice/integrationtests/dbtests/test_insertions.py index e54364e7..5092c072 100644 --- a/backend/entityservice/integrationtests/dbtests/test_insertions.py +++ b/backend/entityservice/integrationtests/dbtests/test_insertions.py @@ -6,29 +6,20 @@ from entityservice.database import insert_dataprovider, insert_encodings_into_blocks, insert_blocking_metadata, \ get_project, get_encodingblock_ids, 
get_block_metadata, get_chunk_of_encodings +from entityservice.integrationtests.dbtests import _get_conn_and_cursor from entityservice.models import Project from entityservice.tests.util import generate_bytes from entityservice.utils import generate_code -from entityservice.settings import Config as config class TestInsertions: - def _get_conn_and_cursor(self): - db = config.DATABASE - host = config.DATABASE_SERVER - user = config.DATABASE_USER - password = config.DATABASE_PASSWORD - conn = psycopg2.connect(host=host, dbname=db, user=user, password=password) - cursor = conn.cursor() - return conn, cursor - def _create_project_and_dp(self): project, dp_ids = self._create_project() dp_id = dp_ids[0] dp_auth_token = project.update_tokens[0] - conn, cur = self._get_conn_and_cursor() + conn, cur = _get_conn_and_cursor() # create a default block insert_blocking_metadata(conn, dp_id, {'1': 10000}) conn.commit() @@ -38,7 +29,7 @@ def _create_project_and_dp(self): def _create_project(self): project = Project('groups', {}, name='', notes='', parties=2, uses_blocking=False) - conn, cur = self._get_conn_and_cursor() + conn, cur = _get_conn_and_cursor() dp_ids = project.save(conn) return project, dp_ids @@ -47,7 +38,7 @@ def test_insert_project(self): project, _ = self._create_project() assert len(project.result_token) == 48 # check we can fetch the inserted project back from the database - conn, cur = self._get_conn_and_cursor() + conn, cur = _get_conn_and_cursor() project_response = get_project(conn, project.project_id) assert 'time_added' in project_response assert project_response['time_added'] - before >= datetime.timedelta(seconds=0) @@ -61,7 +52,7 @@ def test_insert_project(self): assert project_response['encoding_size'] is None def test_insert_dp_no_project_fails(self): - conn, cur = self._get_conn_and_cursor() + conn, cur = _get_conn_and_cursor() project_id = generate_code() dp_auth = generate_code() with raises(psycopg2.errors.ForeignKeyViolation): @@ -70,7 +61,7 @@ def test_insert_dp_no_project_fails(self): def test_insert_many_clks(self): data = [generate_bytes(128) for _ in range(100)] project_id, project_auth_token, dp_id, dp_auth_token = self._create_project_and_dp() - conn, cur = self._get_conn_and_cursor() + conn, cur = _get_conn_and_cursor() num_entities = 10_000 blocks = [['1'] for _ in range(num_entities)] encodings = [data[i % 100] for i in range(num_entities)] @@ -103,7 +94,7 @@ def test_insert_many_clks(self): def test_fetch_chunk(self): data = [generate_bytes(128) for _ in range(100)] project_id, project_auth_token, dp_id, dp_auth_token = self._create_project_and_dp() - conn, cur = self._get_conn_and_cursor() + conn, cur = _get_conn_and_cursor() num_entities = 10_000 blocks = [['1'] for _ in range(num_entities)] encodings = [data[i % 100] for i in range(num_entities)] diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py index b5c996f1..fa59fbfd 100644 --- a/backend/entityservice/tasks/comparing.py +++ b/backend/entityservice/tasks/comparing.py @@ -106,7 +106,7 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): future = chord(scoring_tasks)(callback_task) -def _create_work_chunks(blocks, dp_block_sizes, dp_ids, log): +def _create_work_chunks(blocks, dp_block_sizes, dp_ids, log, chunk_size_aim=Config.CHUNK_SIZE_AIM): """Create chunks of comparisons using blocking information. .. 
note:: @@ -126,6 +126,8 @@ def _create_work_chunks(blocks, dp_block_sizes, dp_ids, log): block id to the number of encodings from the dataprovider in the block. As created by :func:`_retrieve_blocked_dataset_sizes` :param dp_ids: list of data provider ids + :param log: A logger instance + :param chunk_size_aim: The desired number of comparisons per chunk. :returns @@ -144,9 +146,10 @@ def _create_work_chunks(blocks, dp_block_sizes, dp_ids, log): size2 = dp_block_sizes[dp2][block_id] comparisons = size1 * size2 - if comparisons > Config.CHUNK_SIZE_AIM: + + if comparisons > chunk_size_aim: log.debug("Block is too large for single task. Working out how to chunk it up") - for chunk_info in anonlink.concurrency.split_to_chunks(Config.CHUNK_SIZE_AIM, + for chunk_info in anonlink.concurrency.split_to_chunks(chunk_size_aim, dataset_sizes=(size1, size2)): # chunk_info's from anonlink already have datasetIndex of 0 or 1 and a range # We need to correct the datasetIndex and add the database datasetId and add block_id. diff --git a/backend/entityservice/tests/test_chunking.py b/backend/entityservice/tests/test_chunking.py new file mode 100644 index 00000000..99730385 --- /dev/null +++ b/backend/entityservice/tests/test_chunking.py @@ -0,0 +1,65 @@ +from structlog import get_logger + +from entityservice.tasks.comparing import _get_common_blocks, _create_work_chunks +log = get_logger() + + +class TestCommonBlocks: + # Unit test for _get_common_blocks + + def test_2p_get_common_blocks(self): + dp_ids = [33, 34] + dp_block_sizes = {33: {'1': 100}, 34: {'1': 100, '2': 100}} + common_blocks = _get_common_blocks(dp_block_sizes, dp_ids) + assert common_blocks == {"1": [(33, 34)]} + + def test_3p_get_common_blocks(self): + dp_ids = [1, 2, 3] + dp_block_sizes = {1: {'1': 100}, 2: {'1': 100, '2': 100}, 3: {'1': 100, '2': 100}} + common_blocks = _get_common_blocks(dp_block_sizes, dp_ids) + assert '1' in common_blocks + assert len(common_blocks['1']) == 3 + block_1_set = set(common_blocks['1']) + # Should have (1, 2), (1, 3), (2, 3) + for dpcombo in [(1, 2), (1, 3), (2, 3)]: + assert dpcombo in block_1_set + + assert '2' in common_blocks + assert len(common_blocks['2']) == 1 + assert common_blocks['2'][0] == (2, 3) + + +class TestChunkingBlocks: + + def test_2p_single_chunked_block(self): + dp_ids = [1, 2] + dp_block_sizes = { + 1: {'1': 100}, + 2: {'1': 100, '2': 100}} + blocks = {"1": [(1, 2)]} + + chunks = _create_work_chunks(blocks, dp_block_sizes, dp_ids, log, 100) + assert len(chunks) == 100 + for chunk_pair in chunks: + for c in chunk_pair: + assert "range" in c + lower, upper = c['range'] + assert lower < upper + assert upper - lower <= 10 + assert "block_id" in c + assert "datasetIndex" in c + assert "dataproviderId" in c + + def test_basic_3p(self): + dp_ids = [1, 2, 3] + dp_block_sizes = { + 1: {'1': 100}, + 2: {'1': 100, '2': 100}, + 3: {'1': 100, '2': 100}, + } + blocks = _get_common_blocks(dp_block_sizes, dp_ids) + chunks = _create_work_chunks(blocks, dp_block_sizes, dp_ids, log, 100) + # Block 1 should create 100 chunks between dp combinations: 1:2, 1:3, and 2:3 for 300 chunks + # Block 2 should create 100 chunks between 2:3 + assert len(chunks) == 300 + 100 + From ddcbcc3d840f089af6447a423711e4fcdefbdea9 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Tue, 10 Mar 2020 10:36:58 +1300 Subject: [PATCH 09/27] Add database index on encodings table --- backend/entityservice/init-db-schema.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/entityservice/init-db-schema.sql 
b/backend/entityservice/init-db-schema.sql index dcaba3ee..8798322c 100644 --- a/backend/entityservice/init-db-schema.sql +++ b/backend/entityservice/init-db-schema.sql @@ -175,6 +175,7 @@ CREATE TABLE encodings ( PRIMARY KEY (dp, encoding_id) ); +CREATE INDEX ON encodings (dp, encoding_id); -- Table mapping blocks to encodings CREATE TABLE encodingblocks ( @@ -188,7 +189,6 @@ CREATE TABLE encodingblocks ( FOREIGN KEY (dp, block_id) REFERENCES blocks (dp, block_name) ); --- TODO index to accelerate ordering/filtering operations? CREATE INDEX ON encodingblocks (dp, block_id); CREATE INDEX ON encodingblocks (encoding_id); From 4ab16e6b192267361db3018f2ecbfbac9556c22f Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Tue, 10 Mar 2020 14:33:44 +1300 Subject: [PATCH 10/27] clknblocks not clksnblocks and other minor cleanup --- backend/entityservice/database/selections.py | 26 ------------------- backend/entityservice/encoding_storage.py | 14 +++++----- backend/entityservice/tasks/comparing.py | 10 +++---- .../tests/test_encoding_storage.py | 4 +-- .../tests/testdata/test_encoding.json | 2 +- backend/entityservice/views/project.py | 4 +-- benchmarking/benchmark.py | 9 ++++--- 7 files changed, 21 insertions(+), 48 deletions(-) diff --git a/backend/entityservice/database/selections.py b/backend/entityservice/database/selections.py index 87a186a7..9f9ee68f 100644 --- a/backend/entityservice/database/selections.py +++ b/backend/entityservice/database/selections.py @@ -322,32 +322,6 @@ def iterate_cursor_results(cur, one=True, page_size=4096): else: yield row -# -# def get_block_of_encodings(db, dp_id, block_id, offset=0, limit=None): -# """Yield a block of all encodings - optionally with an offset and limit - for a data provider. -# -# I don't understand why this is slower than fetching all encoding_ids to Python and then fetching -# all the encoding in the below version. -# """ -# sql_query = """ -# SELECT encoding -# FROM encodings, -# encodingblocks -# WHERE encodings.dp = %(dp_id)s -# AND encodingblocks.dp = %(dp_id)s -# and encodingblocks.block_id = %(block_id)s -# AND encodingblocks.encoding_id = encodings.encoding_id -# ORDER BY encodingblocks.encoding_id ASC -# OFFSET %(offset)s -# LIMIT %(limit)s -# """ -# -# cur = db.cursor(f"dp{dp_id}-block{block_id}-{offset}-{limit}") -# cur.execute(sql_query, {'dp_id': dp_id, 'block_id': block_id, 'offset': offset, 'limit': limit}) -# # Note encoding is returned as a memoryview -# for encoding in iterate_cursor_results(cur, one=True): -# yield encoding - def get_chunk_of_encodings(db, dp_id, encoding_ids): """Yield a chunk of encodings for a data provider given the encoding ids. diff --git a/backend/entityservice/encoding_storage.py b/backend/entityservice/encoding_storage.py index 51319162..fc19e837 100644 --- a/backend/entityservice/encoding_storage.py +++ b/backend/entityservice/encoding_storage.py @@ -15,7 +15,7 @@ def stream_json_clksnblocks(f): the following structure: { - "clksnblocks": [ + "clknblocks": [ ["BASE64 ENCODED ENCODING 1", blockid1, blockid2, ...], ["BASE64 ENCODED ENCODING 2", blockid1, ...], ... @@ -26,7 +26,7 @@ def stream_json_clksnblocks(f): :return: Generator of (entity_id, base64 encoding, list of blocks) """ # At some point the user may supply the entity id. For now we use the order of uploaded encodings. 
- for i, obj in enumerate(ijson.items(f, 'clksnblocks.item')): + for i, obj in enumerate(ijson.items(f, 'clknblocks.item')): b64_encoding, *blocks = obj yield i, deserialize_bytes(b64_encoding), blocks @@ -100,13 +100,15 @@ def _estimate_group_size(encoding_size): return math.ceil(network_transaction_size / ((blocks_per_record_estimate * 64) + (encoding_size + 4))) -def get_encoding_chunk(conn, chunk_info, encoding_size=128): +def get_encoding_chunk(conn, chunk_info, encoding_size=128, s=None): chunk_range_start, chunk_range_stop = chunk_info['range'] dataprovider_id = chunk_info['dataproviderId'] block_id = chunk_info['block_id'] limit = chunk_range_stop - chunk_range_start - encoding_ids = get_encodingblock_ids(conn, dataprovider_id, block_id, chunk_range_start, limit) - encoding_data_stream = get_chunk_of_encodings(conn, dataprovider_id, encoding_ids) - chunk_data = binary_unpack_filters(encoding_data_stream, encoding_size=encoding_size) + with s.span.tracer.start_span("get_encodingblock_ids"): + encoding_ids = get_encodingblock_ids(conn, dataprovider_id, block_id, chunk_range_start, limit) + with s.span.tracer.start_span('get_chunk_of_encodings'): + encoding_data_stream = get_chunk_of_encodings(conn, dataprovider_id, encoding_ids) + chunk_data = binary_unpack_filters(encoding_data_stream, encoding_size=encoding_size) return chunk_data, len(chunk_data) diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py index fa59fbfd..9c29191a 100644 --- a/backend/entityservice/tasks/comparing.py +++ b/backend/entityservice/tasks/comparing.py @@ -6,16 +6,14 @@ import anonlink import minio -import opentracing -import psycopg2 from celery import chord -import numpy as np + from entityservice.async_worker import celery, logger from entityservice.cache.encodings import remove_from_cache from entityservice.cache.progress import save_current_progress from entityservice.encoding_storage import get_encoding_chunk -from entityservice.errors import RunDeleted, InactiveRun +from entityservice.errors import InactiveRun from entityservice.database import ( check_project_exists, check_run_exists, DBConn, get_dataprovider_ids, get_project_column, get_project_dataset_sizes, @@ -248,9 +246,7 @@ def compute_filter_similarity(chunk_info, project_id, run_id, threshold, encodin task_span = compute_filter_similarity.span def new_child_span(name): - return compute_filter_similarity.tracer.start_active_span( - name, - child_of=compute_filter_similarity.span) + return compute_filter_similarity.tracer.start_active_span(name, child_of=compute_filter_similarity.span) log.debug("Computing similarity for a chunk of filters") diff --git a/backend/entityservice/tests/test_encoding_storage.py b/backend/entityservice/tests/test_encoding_storage.py index 174c93af..13fd7436 100644 --- a/backend/entityservice/tests/test_encoding_storage.py +++ b/backend/entityservice/tests/test_encoding_storage.py @@ -20,7 +20,7 @@ def test_convert_encodings_from_json_to_binary_simple(self): def test_convert_encodings_from_json_to_binary_empty(self): empty = io.BytesIO(b'''{ - "clksnblocks": [] + "clknblocks": [] }''') with pytest.raises(StopIteration): @@ -28,7 +28,7 @@ def test_convert_encodings_from_json_to_binary_empty(self): def test_convert_encodings_from_json_to_binary_short(self): d = serialize_bytes(b'abcdabcd') - json_data = io.BytesIO(b'{' + f'''"clksnblocks": [["{d}", "02"]]'''.encode() + b'}') + json_data = io.BytesIO(b'{' + f'''"clknblocks": [["{d}", "02"]]'''.encode() + b'}') encoding_ids, 
encodings, blocks = list(zip(*stream_json_clksnblocks(json_data))) assert len(encodings[0]) == 8 diff --git a/backend/entityservice/tests/testdata/test_encoding.json b/backend/entityservice/tests/testdata/test_encoding.json index faea6279..8adbdce7 100644 --- a/backend/entityservice/tests/testdata/test_encoding.json +++ b/backend/entityservice/tests/testdata/test_encoding.json @@ -1,5 +1,5 @@ { - "clksnblocks": [ + "clknblocks": [ ["PNfT5qAAlAgFyQyoUhokyohywAAOYMJgdwPCRWBQOyCIsSEgePCo2CnRaON+FUog07AHTDUDARsUcJiSaYKNDiCAEeICbGYSZFhCVALQAylxDSAtSR4CsgiCmBjDiAEOGMfEi7ABkydqyKhIIFrBQQvFAUTDakAg0RyFYAWg8nE=", "1"], ["WvjX1rZYiFCRQAyqptwAS7jAaACEhFApFxB9RSeYPTBLkYkIUjKacAIleYLoNUooU/AHYCQ2FSkUYCzZMiClDIGEIBKQXKZAogCAUpvOKikUKUI+CRgI+AkgGsqAJYkdZcHEAAWDEidgHbYgIjBpMgpUU2wCwgCoXwCCWCVAsPM=", "1"], ["UPLX+gCgkbnVQhg4wBA8wogCUNA0QEbhPyHBPCBAKwSOmzJFWDGqWAZY/NsQX01Aw7A1UWBKAt5GPJib4BpLSHKQBesZbjsjYhTAQBBDZ7uQHKGNyRhCMhiWsAjaVEBcQlVkiCHzMhEimEgERlB/AQvFBwRjQHRywRQQQUuggnw=", "1", "2"], diff --git a/backend/entityservice/views/project.py b/backend/entityservice/views/project.py index 32582d6b..d7d4b656 100644 --- a/backend/entityservice/views/project.py +++ b/backend/entityservice/views/project.py @@ -386,8 +386,8 @@ def upload_json_clk_data(dp_id, clk_json, uses_blocking, parent_span): if element == 'clks': logger.info("Rewriting provided json into clknsblocks format") - clk_json = {'clksnblocks': [[encoding, '1'] for encoding in clk_json['clks']]} - element = 'clksnblocks' + clk_json = {'clknblocks': [[encoding, '1'] for encoding in clk_json['clks']]} + element = 'clknblocks' logger.info("Counting block sizes and number of blocks") # {'clknblocks': [['UG9vcA==', '001', '211'], [...]]} diff --git a/benchmarking/benchmark.py b/benchmarking/benchmark.py index e0d16533..b5aec047 100644 --- a/benchmarking/benchmark.py +++ b/benchmarking/benchmark.py @@ -130,19 +130,20 @@ def upload_data(participant, auth_token, clk_length): file_name = os.path.join(data_path, "{}Parties".format(len(sizes)), "clk_{}_{}_v2.bin".format(participant, clk_length)) with open(file_name, 'rb') as f: - facs_data = f.read() - assert len(facs_data) % SIZE_PER_CLK == 0 + encoding_data = f.read() + assert len(encoding_data) % SIZE_PER_CLK == 0 try: r = requests.post( server + '/api/v1/projects/{}/binaryclks'.format(credentials['project_id']), headers={ 'Authorization': auth_token, 'Content-Type': 'application/octet-stream', - 'Hash-Count': str(len(facs_data) // SIZE_PER_CLK), + 'Hash-Count': str(len(encoding_data) // SIZE_PER_CLK), 'Hash-Size': '128' }, - data=facs_data + data=encoding_data ) + logger.info(f"Upload status: {r.status_code}") logger.debug('upload result: {}'.format(r.json())) except Exception as e: logger.warning('oh no...\n{}'.format(e)) From d66bf58aa642c82e226257a8e6dfb1b592c8cbfc Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Wed, 11 Mar 2020 11:50:19 +1300 Subject: [PATCH 11/27] cleanup --- backend/entityservice/encoding_storage.py | 10 ++++------ backend/entityservice/tasks/comparing.py | 4 +--- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/backend/entityservice/encoding_storage.py b/backend/entityservice/encoding_storage.py index fc19e837..16da0951 100644 --- a/backend/entityservice/encoding_storage.py +++ b/backend/entityservice/encoding_storage.py @@ -100,15 +100,13 @@ def _estimate_group_size(encoding_size): return math.ceil(network_transaction_size / ((blocks_per_record_estimate * 64) + (encoding_size + 4))) -def get_encoding_chunk(conn, chunk_info, encoding_size=128, s=None): +def 
get_encoding_chunk(conn, chunk_info, encoding_size=128): chunk_range_start, chunk_range_stop = chunk_info['range'] dataprovider_id = chunk_info['dataproviderId'] block_id = chunk_info['block_id'] limit = chunk_range_stop - chunk_range_start - with s.span.tracer.start_span("get_encodingblock_ids"): - encoding_ids = get_encodingblock_ids(conn, dataprovider_id, block_id, chunk_range_start, limit) - with s.span.tracer.start_span('get_chunk_of_encodings'): - encoding_data_stream = get_chunk_of_encodings(conn, dataprovider_id, encoding_ids) - chunk_data = binary_unpack_filters(encoding_data_stream, encoding_size=encoding_size) + encoding_ids = get_encodingblock_ids(conn, dataprovider_id, block_id, chunk_range_start, limit) + encoding_data_stream = get_chunk_of_encodings(conn, dataprovider_id, encoding_ids) + chunk_data = binary_unpack_filters(encoding_data_stream, encoding_size=encoding_size) return chunk_data, len(chunk_data) diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py index 9c29191a..4b7914cd 100644 --- a/backend/entityservice/tasks/comparing.py +++ b/backend/entityservice/tasks/comparing.py @@ -249,7 +249,6 @@ def new_child_span(name): return compute_filter_similarity.tracer.start_active_span(name, child_of=compute_filter_similarity.span) log.debug("Computing similarity for a chunk of filters") - log.debug("Checking that the resource exists (in case of run being canceled/deleted)") assert_valid_run(project_id, run_id, log) @@ -259,13 +258,11 @@ def new_child_span(name): with new_child_span('fetching-left-encodings'): log.debug("Fetching and deserializing chunk of filters for dataprovider 1") chunk_with_ids_dp1, chunk_dp1_size = get_encoding_chunk(conn, chunk_info_dp1, encoding_size) - #TODO: use the entity ids! entity_ids_dp1, chunk_dp1 = zip(*chunk_with_ids_dp1) with new_child_span('fetching-right-encodings'): log.debug("Fetching and deserializing chunk of filters for dataprovider 2") chunk_with_ids_dp2, chunk_dp2_size = get_encoding_chunk(conn, chunk_info_dp2, encoding_size) - # TODO: use the entity ids! entity_ids_dp2, chunk_dp2 = zip(*chunk_with_ids_dp2) log.debug('Both chunks are fetched and deserialized') @@ -279,6 +276,7 @@ def new_child_span(name): threshold=threshold, k=min(chunk_dp1_size, chunk_dp2_size)) + # Map results from "index in chunk" to encoding id. def offset(recordarray, encoding_id_list): return array.array('I', [encoding_id_list[i] for i in recordarray]) From 1e5151f6a95c3f81262b6be3f531157c219fe489 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Fri, 13 Mar 2020 14:16:53 +1300 Subject: [PATCH 12/27] Add blocking concept to docs --- docs/concepts.rst | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/docs/concepts.rst b/docs/concepts.rst index b4bb232f..ae808ddb 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -56,8 +56,8 @@ characters. .. _schema: -Schema ------- +Linkage Schema +-------------- It is important that participating organisations agree on how personally identifiable information is processed to create the :ref:`clks `. We call the configuration for creating CLKs @@ -84,6 +84,19 @@ although this may become a configurable option in the future. .. _result-types: +Blocking +-------- + +Blocking is a technique that makes large-scale record linkage practical. Blocking partitions datasets +into groups, called blocks and only the records in corresponding blocks are compared. 
This can massively +reduce the total number of comparisons that need to be conducted to find matching records. + +In the *Anonlink Entity Service* blocking is optional, and is carried out by the client e.g., using the +`blocklib `_ library. See the +`blocklib documentation `_ for more information including +tutorials. + + Output Types ------------ From aec9b5c437b0dc13e558a75124932ccc2e5e2f10 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 16 Mar 2020 11:13:27 +1300 Subject: [PATCH 13/27] Deduplicate candidate pairs before solving --- backend/entityservice/tasks/solver.py | 20 +++++++++++++++++++- backend/entityservice/tests/test_utils.py | 12 +++++++++--- backend/entityservice/utils.py | 16 ++++++++++++++++ 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/backend/entityservice/tasks/solver.py b/backend/entityservice/tasks/solver.py index c1bd9352..901c99b2 100644 --- a/backend/entityservice/tasks/solver.py +++ b/backend/entityservice/tasks/solver.py @@ -1,3 +1,5 @@ +from array import array + import anonlink from entityservice.object_store import connect_to_object_store @@ -5,6 +7,7 @@ from entityservice.settings import Config as config from entityservice.tasks.base_task import TracedTask from entityservice.tasks.permutation import save_and_permute +from entityservice.utils import deduplicate_sorted_iter @celery.task(base=TracedTask, ignore_result=True, args_as_tags=('project_id', 'run_id')) @@ -15,7 +18,22 @@ def solver_task(similarity_scores_filename, project_id, run_id, dataset_sizes, p 'filename': similarity_scores_filename}) score_file = mc.get_object(config.MINIO_BUCKET, similarity_scores_filename) log.debug("Creating python sparse matrix from bytes data") - candidate_pairs = anonlink.serialization.load_candidate_pairs(score_file) + candidate_pairs_with_duplicates = anonlink.serialization.load_candidate_pairs(score_file) + similarity_scores, (dset_is0, dset_is1), (rec_is0, rec_is1) = candidate_pairs_with_duplicates + float_typecode = similarity_scores.typecode + int_typecode = dset_is0.typecode + log.info(f"Number of candidate pairs before deduplication: {len(candidate_pairs_with_duplicates[0])}") + dedup_iter = deduplicate_sorted_iter(zip(similarity_scores, dset_is0, dset_is1, rec_is0, rec_is1)) + # Convert back to typed arrays before solving + similarity_scores, dset_is0, dset_is1, rec_is0, rec_is1 = zip(*dedup_iter) + similarity_scores, dset_is0, dset_is1, rec_is0, rec_is1 = array(float_typecode, similarity_scores), \ + array(int_typecode, dset_is0), \ + array(int_typecode, dset_is1), \ + array(int_typecode, rec_is0), \ + array(int_typecode, rec_is1) + candidate_pairs = similarity_scores, (dset_is0, dset_is1), (rec_is0, rec_is1) + log.info(f"Number of candidate pairs after deduplication: {len(candidate_pairs[0])}") + log.info("Calculating the optimal mapping from similarity matrix") groups = anonlink.solving.greedy_solve(candidate_pairs) diff --git a/backend/entityservice/tests/test_utils.py b/backend/entityservice/tests/test_utils.py index 665f7d1d..acb6bef2 100644 --- a/backend/entityservice/tests/test_utils.py +++ b/backend/entityservice/tests/test_utils.py @@ -3,7 +3,7 @@ import pytest from entityservice.errors import InvalidConfiguration -from entityservice.utils import load_yaml_config +from entityservice.utils import load_yaml_config, deduplicate_sorted_iter from entityservice.tests.util import generate_bytes, temp_file_containing @@ -17,7 +17,7 @@ def test_empty(self): def test_list(self): with temp_file_containing(b'[1,2,3]') as fp: filename = fp.name 
- assert [1,2,3] == load_yaml_config(filename) + assert [1, 2, 3] == load_yaml_config(filename) def test_missing_file(self): filename = 'unlikely a valid file' @@ -40,7 +40,7 @@ def test_valid_yaml(self): """) self._check_valid_yaml(yamldata) - def _check_valid_yaml(self, yamldata:str): + def _check_valid_yaml(self, yamldata: str): with temp_file_containing(yamldata.encode()) as fp: filename = fp.name loaded = load_yaml_config(filename) @@ -62,3 +62,9 @@ def test_valid_yaml_with_comments(self): loaded = self._check_valid_yaml(yamldata) assert 'host' not in loaded['api']['ingress'] + +def test_deduplicate_sorted_iter(): + assert list(deduplicate_sorted_iter(iter([1, 2, 2, 2, 3]))) == [1, 2, 3] + + res = list(deduplicate_sorted_iter(zip(['a','a','a'], [1, 1, 1], [1,1,2]))) + assert res == [('a', 1, 1), ('a', 1, 2)] \ No newline at end of file diff --git a/backend/entityservice/utils.py b/backend/entityservice/utils.py index e06f22e5..b14b08b6 100644 --- a/backend/entityservice/utils.py +++ b/backend/entityservice/utils.py @@ -189,3 +189,19 @@ def convert_mapping_to_list(permutation): defined by the keys. """ return [permutation[i] for i in range(len(permutation))] + + +def deduplicate_sorted_iter(iterable): + """ + Remove duplicates from a sorted iterable. + + :param iterable: + :return: + """ + previous = next(iterable) + yield previous + + for current in iterable: + if current != previous: + yield current + previous = current From f30c8193b8a976f2582cfce2b030d7f737bfc0a9 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 16 Mar 2020 12:03:12 +1300 Subject: [PATCH 14/27] Catch the empty candidate pair case --- backend/entityservice/tasks/solver.py | 31 +++++++++++++++------------ 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/backend/entityservice/tasks/solver.py b/backend/entityservice/tasks/solver.py index 901c99b2..2fd2a66e 100644 --- a/backend/entityservice/tasks/solver.py +++ b/backend/entityservice/tasks/solver.py @@ -23,20 +23,23 @@ def solver_task(similarity_scores_filename, project_id, run_id, dataset_sizes, p float_typecode = similarity_scores.typecode int_typecode = dset_is0.typecode log.info(f"Number of candidate pairs before deduplication: {len(candidate_pairs_with_duplicates[0])}") - dedup_iter = deduplicate_sorted_iter(zip(similarity_scores, dset_is0, dset_is1, rec_is0, rec_is1)) - # Convert back to typed arrays before solving - similarity_scores, dset_is0, dset_is1, rec_is0, rec_is1 = zip(*dedup_iter) - similarity_scores, dset_is0, dset_is1, rec_is0, rec_is1 = array(float_typecode, similarity_scores), \ - array(int_typecode, dset_is0), \ - array(int_typecode, dset_is1), \ - array(int_typecode, rec_is0), \ - array(int_typecode, rec_is1) - candidate_pairs = similarity_scores, (dset_is0, dset_is1), (rec_is0, rec_is1) - log.info(f"Number of candidate pairs after deduplication: {len(candidate_pairs[0])}") - - log.info("Calculating the optimal mapping from similarity matrix") - - groups = anonlink.solving.greedy_solve(candidate_pairs) + if len(candidate_pairs_with_duplicates[0]) > 0: + dedup_iter = deduplicate_sorted_iter(zip(similarity_scores, dset_is0, dset_is1, rec_is0, rec_is1)) + # Convert back to typed arrays before solving + similarity_scores, dset_is0, dset_is1, rec_is0, rec_is1 = zip(*dedup_iter) + similarity_scores, dset_is0, dset_is1, rec_is0, rec_is1 = array(float_typecode, similarity_scores), \ + array(int_typecode, dset_is0), \ + array(int_typecode, dset_is1), \ + array(int_typecode, rec_is0), \ + array(int_typecode, rec_is1) + candidate_pairs = 
similarity_scores, (dset_is0, dset_is1), (rec_is0, rec_is1) + log.info(f"Number of candidate pairs after deduplication: {len(candidate_pairs[0])}") + + log.info("Calculating the optimal mapping from similarity matrix") + + groups = anonlink.solving.greedy_solve(candidate_pairs) + else: + groups = [] log.info("Entity groups have been computed") From 9dc59e19e34f2e81e3391a25102c4e2a4f85edbd Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Tue, 17 Mar 2020 10:24:09 +1300 Subject: [PATCH 15/27] Simplify solver task by using anonlink's _merge_similarities function --- backend/entityservice/tasks/solver.py | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/backend/entityservice/tasks/solver.py b/backend/entityservice/tasks/solver.py index 2fd2a66e..600a51b3 100644 --- a/backend/entityservice/tasks/solver.py +++ b/backend/entityservice/tasks/solver.py @@ -1,13 +1,11 @@ -from array import array - import anonlink +from anonlink.candidate_generation import _merge_similarities from entityservice.object_store import connect_to_object_store from entityservice.async_worker import celery, logger from entityservice.settings import Config as config from entityservice.tasks.base_task import TracedTask from entityservice.tasks.permutation import save_and_permute -from entityservice.utils import deduplicate_sorted_iter @celery.task(base=TracedTask, ignore_result=True, args_as_tags=('project_id', 'run_id')) @@ -20,23 +18,15 @@ def solver_task(similarity_scores_filename, project_id, run_id, dataset_sizes, p log.debug("Creating python sparse matrix from bytes data") candidate_pairs_with_duplicates = anonlink.serialization.load_candidate_pairs(score_file) similarity_scores, (dset_is0, dset_is1), (rec_is0, rec_is1) = candidate_pairs_with_duplicates - float_typecode = similarity_scores.typecode - int_typecode = dset_is0.typecode + log.info(f"Number of candidate pairs before deduplication: {len(candidate_pairs_with_duplicates[0])}") if len(candidate_pairs_with_duplicates[0]) > 0: - dedup_iter = deduplicate_sorted_iter(zip(similarity_scores, dset_is0, dset_is1, rec_is0, rec_is1)) - # Convert back to typed arrays before solving - similarity_scores, dset_is0, dset_is1, rec_is0, rec_is1 = zip(*dedup_iter) - similarity_scores, dset_is0, dset_is1, rec_is0, rec_is1 = array(float_typecode, similarity_scores), \ - array(int_typecode, dset_is0), \ - array(int_typecode, dset_is1), \ - array(int_typecode, rec_is0), \ - array(int_typecode, rec_is1) - candidate_pairs = similarity_scores, (dset_is0, dset_is1), (rec_is0, rec_is1) + # TODO use public interface when available + # https://github.com/data61/anonlink/issues/271 + candidate_pairs = _merge_similarities([zip(similarity_scores, dset_is0, dset_is1, rec_is0, rec_is1)], k=None) log.info(f"Number of candidate pairs after deduplication: {len(candidate_pairs[0])}") log.info("Calculating the optimal mapping from similarity matrix") - groups = anonlink.solving.greedy_solve(candidate_pairs) else: groups = [] From 6219e44bea4e2e94399e111af4db42a0e71f30a9 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Tue, 17 Mar 2020 10:24:25 +1300 Subject: [PATCH 16/27] Update celery --- base/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/base/requirements.txt b/base/requirements.txt index cc848a39..390ddaee 100644 --- a/base/requirements.txt +++ b/base/requirements.txt @@ -1,6 +1,6 @@ anonlink==0.12.5 bitmath==1.3.1.2 -celery==4.4.0 +celery==4.4.1 clkhash==0.15.1 colorama==0.4.3 # required for structlog 
connexion==2.6.0 From 0b6a4c2596ba538f5be1651561c4bdd7e5a9e338 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Wed, 18 Mar 2020 14:55:50 +1300 Subject: [PATCH 17/27] Address code review feedback --- backend/entityservice/database/selections.py | 6 +++--- backend/entityservice/tasks/comparing.py | 21 ++++++-------------- backend/entityservice/tests/test_utils.py | 7 +------ backend/entityservice/utils.py | 16 --------------- 4 files changed, 10 insertions(+), 40 deletions(-) diff --git a/backend/entityservice/database/selections.py b/backend/entityservice/database/selections.py index 9f9ee68f..7de3919b 100644 --- a/backend/entityservice/database/selections.py +++ b/backend/entityservice/database/selections.py @@ -275,7 +275,7 @@ def get_uploads_columns(db, dp_id, columns): return [result[column] for column in columns] -def get_encodingblock_ids(db, dp_id, block_name=None, offset=0, limit=None): +def get_encodingblock_ids(db, dp_id, block_id=None, offset=0, limit=None): """Yield all encoding ids in either a single block, or all blocks for a given data provider.""" sql_query = """ SELECT encoding_id @@ -286,11 +286,11 @@ def get_encodingblock_ids(db, dp_id, block_name=None, offset=0, limit=None): encoding_ID ASC OFFSET %(offset)s LIMIT %(limit)s - """.format("AND block_id = %(block_id)s" if block_name else "") + """.format("AND block_id = %(block_id)s" if block_id else "") # Specifying a name for the cursor creates a server-side cursor, which prevents all of the # records from being downloaded at once. cur = db.cursor(f'encodingblockfetcher-{dp_id}') - args = {'dp_id': dp_id, 'block_id': block_name, 'offset': offset, 'limit': limit} + args = {'dp_id': dp_id, 'block_id': block_id, 'offset': offset, 'limit': limit} cur.execute(sql_query, args) yield from iterate_cursor_results(cur) diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py index 4b7914cd..905f70a8 100644 --- a/backend/entityservice/tasks/comparing.py +++ b/backend/entityservice/tasks/comparing.py @@ -64,11 +64,6 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): log.info("Finding blocks in common between dataproviders") common_blocks = _get_common_blocks(dp_block_sizes, dp_ids) - if len(dataset_sizes) < 2: - log.warning("Unexpected number of dataset sizes in db. 
Stopping") - update_run_mark_failure(conn, run_id) - return - # We pass the encoding_size and threshold to the comparison tasks to minimize their db lookups encoding_size = get_project_encoding_size(conn, project_id) threshold = get_run(conn, run_id)['threshold'] @@ -77,13 +72,8 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): # Create "chunks" of comparisons chunks = _create_work_chunks(common_blocks, dp_block_sizes, dp_ids, log) - log.info(f"Computing similarity for " - f"{' x '.join(map(str, dataset_sizes))} entities") - current_span.log_kv({"event": 'get-dataset-sizes', 'sizes': dataset_sizes}) - - log.info(f"Chunking into {len(chunks)} computation tasks") - current_span.log_kv({"event": "chunking", 'num_chunks': len(chunks)}) + current_span.log_kv({"event": "chunking", 'num_chunks': len(chunks), 'dataset-sizes': dataset_sizes}) span_serialized = create_comparison_jobs.get_serialized_span() # Prepare the Celery Chord that will compute all the similarity scores: @@ -101,6 +91,7 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): callback_task = aggregate_comparisons.s(project_id=project_id, run_id=run_id, parent_span=span_serialized).on_error( run_failed_handler.s(run_id=run_id)) + log.info(f"Scheduling comparison tasks") future = chord(scoring_tasks)(callback_task) @@ -276,12 +267,12 @@ def new_child_span(name): threshold=threshold, k=min(chunk_dp1_size, chunk_dp2_size)) - # Map results from "index in chunk" to encoding id. - def offset(recordarray, encoding_id_list): + def reindex_using_encoding_ids(recordarray, encoding_id_list): + # Map results from "index in chunk" to encoding id. return array.array('I', [encoding_id_list[i] for i in recordarray]) - rec_is0 = offset(rec_is0, entity_ids_dp1) - rec_is1 = offset(rec_is1, entity_ids_dp2) + rec_is0 = reindex_using_encoding_ids(rec_is0, entity_ids_dp1) + rec_is1 = reindex_using_encoding_ids(rec_is1, entity_ids_dp2) except NotImplementedError as e: log.warning("Encodings couldn't be compared using anonlink.") diff --git a/backend/entityservice/tests/test_utils.py b/backend/entityservice/tests/test_utils.py index acb6bef2..b927a8b3 100644 --- a/backend/entityservice/tests/test_utils.py +++ b/backend/entityservice/tests/test_utils.py @@ -3,7 +3,7 @@ import pytest from entityservice.errors import InvalidConfiguration -from entityservice.utils import load_yaml_config, deduplicate_sorted_iter +from entityservice.utils import load_yaml_config from entityservice.tests.util import generate_bytes, temp_file_containing @@ -63,8 +63,3 @@ def test_valid_yaml_with_comments(self): assert 'host' not in loaded['api']['ingress'] -def test_deduplicate_sorted_iter(): - assert list(deduplicate_sorted_iter(iter([1, 2, 2, 2, 3]))) == [1, 2, 3] - - res = list(deduplicate_sorted_iter(zip(['a','a','a'], [1, 1, 1], [1,1,2]))) - assert res == [('a', 1, 1), ('a', 1, 2)] \ No newline at end of file diff --git a/backend/entityservice/utils.py b/backend/entityservice/utils.py index b14b08b6..e06f22e5 100644 --- a/backend/entityservice/utils.py +++ b/backend/entityservice/utils.py @@ -189,19 +189,3 @@ def convert_mapping_to_list(permutation): defined by the keys. """ return [permutation[i] for i in range(len(permutation))] - - -def deduplicate_sorted_iter(iterable): - """ - Remove duplicates from a sorted iterable. 
- - :param iterable: - :return: - """ - previous = next(iterable) - yield previous - - for current in iterable: - if current != previous: - yield current - previous = current From 5467362eac837c06529e06ba393cfcb33ea25f83 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Wed, 18 Mar 2020 16:50:51 +1300 Subject: [PATCH 18/27] Bump version to beta2 --- backend/entityservice/VERSION | 2 +- deployment/entity-service/Chart.yaml | 2 +- deployment/entity-service/values.yaml | 14 +++++--------- docs/changelog.rst | 6 ++++++ docs/conf.py | 2 +- frontend/VERSION | 2 +- 6 files changed, 15 insertions(+), 13 deletions(-) diff --git a/backend/entityservice/VERSION b/backend/entityservice/VERSION index 30817b33..4d518a80 100644 --- a/backend/entityservice/VERSION +++ b/backend/entityservice/VERSION @@ -1 +1 @@ -v1.13.0-beta +v1.13.0-beta2 diff --git a/deployment/entity-service/Chart.yaml b/deployment/entity-service/Chart.yaml index d2c8a242..6871d04d 100644 --- a/deployment/entity-service/Chart.yaml +++ b/deployment/entity-service/Chart.yaml @@ -1,5 +1,5 @@ name: entity-service -appVersion: 1.13.0-beta +appVersion: 1.13.0-beta2 version: 1.13.1 description: Privacy preserving record linkage service sources: diff --git a/deployment/entity-service/values.yaml b/deployment/entity-service/values.yaml index ab2062c3..fec7ef2e 100644 --- a/deployment/entity-service/values.yaml +++ b/deployment/entity-service/values.yaml @@ -38,7 +38,7 @@ api: image: repository: data61/anonlink-nginx - tag: "v1.4.6-beta" + tag: "v1.4.6-beta2" # pullPolicy: Always pullPolicy: IfNotPresent @@ -55,7 +55,7 @@ api: app: image: repository: data61/anonlink-app - tag: "v1.13.0-beta" + tag: "v1.13.0-beta2" # pullPolicy: IfNotPresent pullPolicy: Always @@ -78,7 +78,7 @@ api: ## It cannot be updated! So we have a separate image + tag image: repository: data61/anonlink-app - tag: "v1.13.0-beta" + tag: "v1.13.0-beta2" ## Ref: http://kubernetes.io/docs/user-guide/compute-resources/ resources: @@ -149,7 +149,7 @@ workers: image: repository: "data61/anonlink-app" - tag: "v1.13.0-beta" + tag: "v1.13.0-beta2" pullPolicy: Always ## The initial number of workers for this deployment @@ -204,9 +204,7 @@ workers: ## our tasks are usually quite "long". PREFETCH_MULTIPLIER: "1" ## Maximum number of tasks a pool worker process can execute before it’s replaced with a new one - ## Low number is recommended, otherwise the celery workers may exhaust the available memory and threads. - ## Cf issue https://github.com/data61/anonlink-entity-service/issues/410 - MAX_TASKS_PER_CHILD: "30" + MAX_TASKS_PER_CHILD: "2048" ## Late ack means the task messages will be acknowledged after the task has been executed, not just before. ACKS_LATE: "true" @@ -262,8 +260,6 @@ postgresql: global: - global: - storageClass: "default" postgresql: postgresqlDatabase: postgres diff --git a/docs/changelog.rst b/docs/changelog.rst index c399d70d..a1f42744 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -8,6 +8,12 @@ Next Version ------------ +Version 1.13.0-beta2 +------------------- + +- Encodings are now stored in Postgres database instead of files in an object store. +- Initial support for user supplied blocking implemented. + Version 1.13.0-beta ------------------- diff --git a/docs/conf.py b/docs/conf.py index d718d1bd..b3b61157 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -68,7 +68,7 @@ # The short X.Y version. version = '1.13' # The full version, including alpha/beta/rc tags. 
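The worker tuning values above (prefetch multiplier, max tasks per child, late acks) reach the backend as environment variables, with fallback defaults visible in entityservice/settings.py below. A small sketch of that env-to-config mapping, shown only for orientation and mirroring the names and defaults used in the settings module:

import os

# Mirrors how the backend reads its Celery tuning knobs from the environment,
# falling back to the defaults defined in entityservice.settings.Config.
CELERYD_PREFETCH_MULTIPLIER = int(os.getenv('CELERYD_PREFETCH_MULTIPLIER', '1'))
CELERYD_MAX_TASKS_PER_CHILD = int(os.getenv('CELERYD_MAX_TASKS_PER_CHILD', '4'))
CELERYD_CONCURRENCY = int(os.getenv('CELERYD_CONCURRENCY', '2'))  # concurrent worker processes
CELERY_ACKS_LATE = os.getenv('CELERY_ACKS_LATE', 'false') == 'true'
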
-release = '1.13.0-beta' +release = '1.13.0-beta2' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/frontend/VERSION b/frontend/VERSION index 9deb7ce3..87a4e2fc 100644 --- a/frontend/VERSION +++ b/frontend/VERSION @@ -1 +1 @@ -v1.4.6-beta +v1.4.6-beta2 From f342d5ad5aca4098760ec71c98fe28ea2e035825 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Thu, 19 Mar 2020 15:00:01 +1300 Subject: [PATCH 19/27] Celery concurrency defaults --- backend/entityservice/settings.py | 3 ++- tools/docker-compose.yml | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/backend/entityservice/settings.py b/backend/entityservice/settings.py index 5c99bf29..46d56289 100644 --- a/backend/entityservice/settings.py +++ b/backend/entityservice/settings.py @@ -73,7 +73,8 @@ class Config(object): CELERYD_PREFETCH_MULTIPLIER = int(os.getenv('CELERYD_PREFETCH_MULTIPLIER', '1')) CELERYD_MAX_TASKS_PER_CHILD = int(os.getenv('CELERYD_MAX_TASKS_PER_CHILD', '4')) - CELERYD_CONCURRENCY = int(os.getenv("CELERYD_CONCURRENCY", '0')) + # number of concurrent worker processes/threads, executing tasks + CELERYD_CONCURRENCY = int(os.getenv("CELERYD_CONCURRENCY", '2')) CELERY_ACKS_LATE = os.getenv('CELERY_ACKS_LATE', 'false') == 'true' # Number of comparisons per chunk (on average). diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml index 639c9172..3b9ed5a4 100644 --- a/tools/docker-compose.yml +++ b/tools/docker-compose.yml @@ -70,7 +70,7 @@ services: - MINIO_SECRET_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY - CELERY_ACKS_LATE=true - REDIS_USE_SENTINEL=false - - CELERYD_MAX_TASKS_PER_CHILD=30 + - CELERYD_MAX_TASKS_PER_CHILD=2048 #- CHUNK_SIZE_AIM=300_000_000 - CELERY_DB_MIN_CONNECTIONS=1 - CELERY_DB_MAX_CONNECTIONS=3 From 2add5ef4acfee4f253996c52b8ed69d8d4d5eac1 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Thu, 19 Mar 2020 15:00:45 +1300 Subject: [PATCH 20/27] Add another layer of tracing into the comparison task --- backend/entityservice/tasks/base_task.py | 3 +- backend/entityservice/tasks/comparing.py | 47 +++++++++++++----------- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/backend/entityservice/tasks/base_task.py b/backend/entityservice/tasks/base_task.py index 7c1a7493..ce0c06e4 100644 --- a/backend/entityservice/tasks/base_task.py +++ b/backend/entityservice/tasks/base_task.py @@ -112,7 +112,8 @@ def run_failed_handler(*args, **kwargs): :param kwargs: Keyword arguments to the task e.g. 
{'run_id': '...', } """ task_id = args[0] - logger.bind(run_id=kwargs['run_id']) + if 'run_id' in kwargs: + logger.bind(run_id=kwargs['run_id']) logger.info("An error occurred while processing task", task_id=task_id) with DBConn() as db: diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py index 905f70a8..fe463519 100644 --- a/backend/entityservice/tasks/comparing.py +++ b/backend/entityservice/tasks/comparing.py @@ -236,8 +236,10 @@ def compute_filter_similarity(chunk_info, project_id, run_id, threshold, encodin log = logger.bind(pid=project_id, run_id=run_id) task_span = compute_filter_similarity.span - def new_child_span(name): - return compute_filter_similarity.tracer.start_active_span(name, child_of=compute_filter_similarity.span) + def new_child_span(name, parent_scope=None): + if parent_scope is None: + parent_scope = compute_filter_similarity + return compute_filter_similarity.tracer.start_active_span(name, child_of=parent_scope.span) log.debug("Computing similarity for a chunk of filters") log.debug("Checking that the resource exists (in case of run being canceled/deleted)") @@ -246,27 +248,34 @@ def new_child_span(name): chunk_info_dp1, chunk_info_dp2 = chunk_info with DBConn() as conn: - with new_child_span('fetching-left-encodings'): - log.debug("Fetching and deserializing chunk of filters for dataprovider 1") - chunk_with_ids_dp1, chunk_dp1_size = get_encoding_chunk(conn, chunk_info_dp1, encoding_size) - entity_ids_dp1, chunk_dp1 = zip(*chunk_with_ids_dp1) + with new_child_span('fetching-encodings') as parent_scope: + with new_child_span('fetching-left-encodings', parent_scope): + log.debug("Fetching and deserializing chunk of filters for dataprovider 1") + chunk_with_ids_dp1, chunk_dp1_size = get_encoding_chunk(conn, chunk_info_dp1, encoding_size) + entity_ids_dp1, chunk_dp1 = zip(*chunk_with_ids_dp1) - with new_child_span('fetching-right-encodings'): - log.debug("Fetching and deserializing chunk of filters for dataprovider 2") - chunk_with_ids_dp2, chunk_dp2_size = get_encoding_chunk(conn, chunk_info_dp2, encoding_size) - entity_ids_dp2, chunk_dp2 = zip(*chunk_with_ids_dp2) + with new_child_span('fetching-right-encodings', parent_scope): + log.debug("Fetching and deserializing chunk of filters for dataprovider 2") + chunk_with_ids_dp2, chunk_dp2_size = get_encoding_chunk(conn, chunk_info_dp2, encoding_size) + entity_ids_dp2, chunk_dp2 = zip(*chunk_with_ids_dp2) log.debug('Both chunks are fetched and deserialized') task_span.log_kv({'size1': chunk_dp1_size, 'size2': chunk_dp2_size, 'chunk_info': chunk_info}) - with new_child_span('comparing-encodings'): - log.debug("Calculating filter similarity") - try: - sims, (rec_is0, rec_is1) = anonlink.similarities.dice_coefficient_accelerated( - datasets=(chunk_dp1, chunk_dp2), - threshold=threshold, - k=min(chunk_dp1_size, chunk_dp2_size)) + with new_child_span('comparing-encodings') as parent_scope: + log.debug("Calculating filter similarity") + with new_child_span('dice-call', parent_scope): + try: + sims, (rec_is0, rec_is1) = anonlink.similarities.dice_coefficient_accelerated( + datasets=(chunk_dp1, chunk_dp2), + threshold=threshold, + k=min(chunk_dp1_size, chunk_dp2_size)) + except NotImplementedError as e: + log.warning("Encodings couldn't be compared using anonlink.") + return + + with new_child_span('reindex-call', parent_scope): def reindex_using_encoding_ids(recordarray, encoding_id_list): # Map results from "index in chunk" to encoding id. 
return array.array('I', [encoding_id_list[i] for i in recordarray]) @@ -274,10 +283,6 @@ def reindex_using_encoding_ids(recordarray, encoding_id_list): rec_is0 = reindex_using_encoding_ids(rec_is0, entity_ids_dp1) rec_is1 = reindex_using_encoding_ids(rec_is1, entity_ids_dp2) - except NotImplementedError as e: - log.warning("Encodings couldn't be compared using anonlink.") - return - log.debug('Encoding similarities calculated') with new_child_span('update-comparison-progress'): From e2ebe990afcc560d6c9fcad5505c67d783313703 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Fri, 20 Mar 2020 11:29:02 +1300 Subject: [PATCH 21/27] Update task names in celery routing --- backend/entityservice/settings.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/backend/entityservice/settings.py b/backend/entityservice/settings.py index 46d56289..c2a55a5c 100644 --- a/backend/entityservice/settings.py +++ b/backend/entityservice/settings.py @@ -63,12 +63,12 @@ class Config(object): } CELERY_ROUTES = { - 'async_worker.calculate_mapping': {'queue': 'celery'}, - 'async_worker.compute_similarity': {'queue': 'compute'}, - 'async_worker.aggregate_filter_chunks': {'queue': 'highmemory'}, - 'async_worker.solver_task': {'queue': 'highmemory'}, - 'async_worker.save_and_permute': {'queue': 'highmemory'}, - 'async_worker.handle_raw_upload': {'queue': 'celery'} + 'entityservice.tasks.comparing.create_comparison_jobs': {'queue': 'celery'}, + 'entityservice.tasks.comparing.compute_filter_similarity': {'queue': 'compute'}, + 'entityservice.tasks.comparing.aggregate_comparisons': {'queue': 'highmemory'}, + 'entityservice.tasks.solver.solver_task': {'queue': 'highmemory'}, + 'entityservice.tasks.permutation.save_and_permute': {'queue': 'highmemory'}, + 'entityservice.tasks.encoding_uploading.handle_raw_upload': {'queue': 'celery'} } CELERYD_PREFETCH_MULTIPLIER = int(os.getenv('CELERYD_PREFETCH_MULTIPLIER', '1')) From 38b624fc3f6be8ceaee4f666a0c42dd5dda414ba Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 23 Mar 2020 10:11:48 +1300 Subject: [PATCH 22/27] Faster encoding retrieval by using COPY. --- backend/entityservice/database/selections.py | 53 ++++++++++++++----- backend/entityservice/encoding_storage.py | 6 +-- .../dbtests/test_insertions.py | 18 ++++--- 3 files changed, 55 insertions(+), 22 deletions(-) diff --git a/backend/entityservice/database/selections.py b/backend/entityservice/database/selections.py index 7de3919b..e652fec0 100644 --- a/backend/entityservice/database/selections.py +++ b/backend/entityservice/database/selections.py @@ -1,3 +1,4 @@ +import io import itertools from entityservice.database.util import query_db, logger @@ -323,23 +324,49 @@ def iterate_cursor_results(cur, one=True, page_size=4096): yield row -def get_chunk_of_encodings(db, dp_id, encoding_ids): - """Yield a chunk of encodings for a data provider given the encoding ids. +def get_chunk_of_encodings(db, dp_id, encoding_ids, stored_binary_size=132): + """Yields raw byte encodings for a data provider given the encoding ids. + :param dp_id: Fetch encodings from this dataprovider (encoding ids are not unique across data providers). + :param encoding_ids: List of ints of the encoding ids to include. + :param stored_binary_size: Size of each encoding stored in the database. Including encoding ids. 
""" - sql_query = """ - SELECT encoding - FROM encodings - WHERE encodings.dp = %(dp_id)s - AND encodings.encoding_id in ({}) - ORDER BY encoding_id ASC - """.format(','.join(map(str, encoding_ids))) cur = db.cursor() - cur.execute(sql_query, {'dp_id': dp_id}) - # Note encoding is returned as a memoryview - for encoding in iterate_cursor_results(cur, one=True): - yield encoding + + sql_query = """COPY + ( + SELECT encoding + FROM encodings + WHERE encodings.dp = {} + AND encodings.encoding_id in ({}) + ORDER BY encoding_id ASC) + TO STDOUT WITH binary + """.format( + dp_id, + ','.join(map(str, encoding_ids)) + ) + + stream = io.BytesIO() + cur.copy_expert(sql_query, stream) + raw_data = stream.getvalue() + + # Need to read/remove the Postgres Binary Header, Trailer, and the per tuple info + # https://www.postgresql.org/docs/current/sql-copy.html + ignored_header = raw_data[:15] + header_extension = raw_data[16:20] + assert header_extension == b'\x00\x00\x00\x00', "Need to implement skipping postgres binary header extension" + binary_trailer = raw_data[-2:] + assert binary_trailer == b'\xff\xff', "Corrupt COPY of binary data from postgres" + raw_data = raw_data[19:-2] + + size = stored_binary_size + 2 + 4 + for i in range(0, len(raw_data), size): + # Skip the first 6 bytes - tuple field count and field length + start_index = i + 6 + end_index = start_index + stored_binary_size + + yield raw_data[start_index: end_index] def get_filter_metadata(db, dp_id): diff --git a/backend/entityservice/encoding_storage.py b/backend/entityservice/encoding_storage.py index 16da0951..fecfcb0c 100644 --- a/backend/entityservice/encoding_storage.py +++ b/backend/entityservice/encoding_storage.py @@ -53,7 +53,7 @@ def generator(first_i, first_encoding_data, first_blocks): def _grouper(iterable, n, fillvalue=None): - "Collect data into fixed-length chunks or blocks" + "Collect data into fixed-length chunks or blocks from an iterable" # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx" args = [iter(iterable)] * n return zip_longest(*args, fillvalue=fillvalue) @@ -106,7 +106,7 @@ def get_encoding_chunk(conn, chunk_info, encoding_size=128): block_id = chunk_info['block_id'] limit = chunk_range_stop - chunk_range_start encoding_ids = get_encodingblock_ids(conn, dataprovider_id, block_id, chunk_range_start, limit) - encoding_data_stream = get_chunk_of_encodings(conn, dataprovider_id, encoding_ids) - chunk_data = binary_unpack_filters(encoding_data_stream, encoding_size=encoding_size) + encoding_iter = get_chunk_of_encodings(conn, dataprovider_id, encoding_ids) + chunk_data = binary_unpack_filters(encoding_iter, encoding_size=encoding_size) return chunk_data, len(chunk_data) diff --git a/backend/entityservice/integrationtests/dbtests/test_insertions.py b/backend/entityservice/integrationtests/dbtests/test_insertions.py index 5092c072..6deb50dc 100644 --- a/backend/entityservice/integrationtests/dbtests/test_insertions.py +++ b/backend/entityservice/integrationtests/dbtests/test_insertions.py @@ -1,13 +1,14 @@ import datetime -import time import psycopg2 from pytest import raises from entityservice.database import insert_dataprovider, insert_encodings_into_blocks, insert_blocking_metadata, \ get_project, get_encodingblock_ids, get_block_metadata, get_chunk_of_encodings + from entityservice.integrationtests.dbtests import _get_conn_and_cursor from entityservice.models import Project +from entityservice.serialization import binary_format from entityservice.tests.util import generate_bytes from entityservice.utils import 
generate_code @@ -59,12 +60,17 @@ def test_insert_dp_no_project_fails(self): insert_dataprovider(cur, auth_token=dp_auth, project_id=project_id) def test_insert_many_clks(self): - data = [generate_bytes(128) for _ in range(100)] - project_id, project_auth_token, dp_id, dp_auth_token = self._create_project_and_dp() - conn, cur = _get_conn_and_cursor() num_entities = 10_000 + encoding_size = 2048 # non default encoding size + binary_formatter = binary_format(encoding_size) + + raw_data = [generate_bytes(encoding_size) for i in range(100)] + encodings = [binary_formatter.pack(i, raw_data[i % 100]) for i in range(num_entities)] blocks = [['1'] for _ in range(num_entities)] - encodings = [data[i % 100] for i in range(num_entities)] + + project_id, project_auth_token, dp_id, dp_auth_token = self._create_project_and_dp() + conn, cur = _get_conn_and_cursor() + insert_encodings_into_blocks(conn, dp_id, block_ids=blocks, encoding_ids=list(range(num_entities)), @@ -78,7 +84,7 @@ def test_insert_many_clks(self): for stored_encoding_id, original_id in zip(stored_encoding_ids, range(num_entities)): assert stored_encoding_id == original_id - stored_encodings = list(get_chunk_of_encodings(conn, dp_id, stored_encoding_ids)) + stored_encodings = list(get_chunk_of_encodings(conn, dp_id, stored_encoding_ids, stored_binary_size=(encoding_size+4))) assert len(stored_encodings) == num_entities for stored_encoding, original_encoding in zip(stored_encodings, encodings): From 7ec7feffdd13ad33e571f7c291872c49dcb4393c Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 23 Mar 2020 10:59:29 +1300 Subject: [PATCH 23/27] Pass on stored size when retrieving encodings from DB --- backend/entityservice/encoding_storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/entityservice/encoding_storage.py b/backend/entityservice/encoding_storage.py index fecfcb0c..7b4a5e91 100644 --- a/backend/entityservice/encoding_storage.py +++ b/backend/entityservice/encoding_storage.py @@ -106,7 +106,7 @@ def get_encoding_chunk(conn, chunk_info, encoding_size=128): block_id = chunk_info['block_id'] limit = chunk_range_stop - chunk_range_start encoding_ids = get_encodingblock_ids(conn, dataprovider_id, block_id, chunk_range_start, limit) - encoding_iter = get_chunk_of_encodings(conn, dataprovider_id, encoding_ids) + encoding_iter = get_chunk_of_encodings(conn, dataprovider_id, encoding_ids, stored_binary_size=(encoding_size+4)) chunk_data = binary_unpack_filters(encoding_iter, encoding_size=encoding_size) return chunk_data, len(chunk_data) From 24caa797b7d954400f9742a32ebd00ac40a427de Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 23 Mar 2020 11:08:57 +1300 Subject: [PATCH 24/27] Increase time on test --- backend/entityservice/tests/test_project_uploads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/entityservice/tests/test_project_uploads.py b/backend/entityservice/tests/test_project_uploads.py index 99a45acb..775c855a 100644 --- a/backend/entityservice/tests/test_project_uploads.py +++ b/backend/entityservice/tests/test_project_uploads.py @@ -185,7 +185,7 @@ def test_project_json_data_upload_with_too_large_encoded_size( max_rep = 10 while not project_description['error'] and rep < max_rep: rep += 1 - time.sleep(1) + time.sleep(2) project_description = requests.get( url + '/projects/{}'.format(new_project_data['project_id']), headers={'Authorization': new_project_data['result_token']} From dc1983b11fbbd59776787f4cfa2533dabfa43d71 Mon Sep 17 00:00:00 2001 From: Brian 
Thorne Date: Tue, 24 Mar 2020 11:48:45 +1300 Subject: [PATCH 25/27] Refactor binary copy into own function for easier reuse and testing --- backend/entityservice/database/selections.py | 59 +++++++++++--------- 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/backend/entityservice/database/selections.py b/backend/entityservice/database/selections.py index e652fec0..302e168b 100644 --- a/backend/entityservice/database/selections.py +++ b/backend/entityservice/database/selections.py @@ -324,6 +324,37 @@ def iterate_cursor_results(cur, one=True, page_size=4096): yield row +def copy_binary_column_from_select_query(cur, select_query, stored_binary_size=132): + """Yields raw bytes from postgres given a query returning a column containing fixed size bytea data. + + :param select_query: An sql query that select's a single binary column. Include ordering the results. + :param stored_binary_size: Fixed size of each bytea data. + """ + + copy_to_stream_query = """COPY ({}) TO STDOUT WITH binary""".format(select_query) + stream = io.BytesIO() + cur.copy_expert(copy_to_stream_query, stream) + # TODO: It may be more efficient to introduce a buffered binary stream instead of getting the entire stream at once + raw_data = stream.getvalue() + + # Need to read/remove the Postgres Binary Header, Trailer, and the per tuple info + # https://www.postgresql.org/docs/current/sql-copy.html + _ignored_header = raw_data[:15] + header_extension = raw_data[16:20] + assert header_extension == b'\x00\x00\x00\x00', "Need to implement skipping postgres binary header extension" + binary_trailer = raw_data[-2:] + assert binary_trailer == b'\xff\xff', "Corrupt COPY of binary data from postgres" + raw_data = raw_data[19:-2] + + # The first 6 bytes of each row contains: tuple field count and field length + per_row_header_size = 6 + size = stored_binary_size + per_row_header_size + for i in range(0, len(raw_data), size): + start_index = i + per_row_header_size + end_index = start_index + stored_binary_size + yield raw_data[start_index: end_index] + + def get_chunk_of_encodings(db, dp_id, encoding_ids, stored_binary_size=132): """Yields raw byte encodings for a data provider given the encoding ids. 
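The binary COPY framing handled by copy_binary_column_from_select_query above can be exercised without a database connection. A small self-contained sketch that builds a fake single-column COPY stream and strips the same 19-byte header, 6-byte per-row prefix and 2-byte trailer (the constants follow the PostgreSQL COPY binary format; the payload itself is made up):

import struct

PGCOPY_SIGNATURE = b'PGCOPY\n\xff\r\n\x00'             # 11-byte signature
HEADER = PGCOPY_SIGNATURE + struct.pack('!II', 0, 0)   # + flags + extension length = 19 bytes
TRAILER = b'\xff\xff'


def fake_copy_stream(rows):
    # Each tuple: 16-bit field count (1 here), then a 32-bit field length and the data.
    body = b''.join(struct.pack('!hi', 1, len(row)) + row for row in rows)
    return HEADER + body + TRAILER


def parse_fixed_size_rows(raw_data, stored_binary_size):
    assert raw_data[:11] == PGCOPY_SIGNATURE and raw_data[-2:] == TRAILER
    raw_data = raw_data[19:-2]
    step = 6 + stored_binary_size                       # per-row header + fixed-size value
    for i in range(0, len(raw_data), step):
        yield raw_data[i + 6:i + 6 + stored_binary_size]


rows = [bytes([i]) * 4 for i in range(3)]
assert list(parse_fixed_size_rows(fake_copy_stream(rows), stored_binary_size=4)) == rows
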
@@ -334,39 +365,17 @@ def get_chunk_of_encodings(db, dp_id, encoding_ids, stored_binary_size=132): cur = db.cursor() - sql_query = """COPY - ( + sql_query = """ SELECT encoding FROM encodings WHERE encodings.dp = {} AND encodings.encoding_id in ({}) - ORDER BY encoding_id ASC) - TO STDOUT WITH binary + ORDER BY encoding_id ASC """.format( dp_id, ','.join(map(str, encoding_ids)) ) - - stream = io.BytesIO() - cur.copy_expert(sql_query, stream) - raw_data = stream.getvalue() - - # Need to read/remove the Postgres Binary Header, Trailer, and the per tuple info - # https://www.postgresql.org/docs/current/sql-copy.html - ignored_header = raw_data[:15] - header_extension = raw_data[16:20] - assert header_extension == b'\x00\x00\x00\x00', "Need to implement skipping postgres binary header extension" - binary_trailer = raw_data[-2:] - assert binary_trailer == b'\xff\xff', "Corrupt COPY of binary data from postgres" - raw_data = raw_data[19:-2] - - size = stored_binary_size + 2 + 4 - for i in range(0, len(raw_data), size): - # Skip the first 6 bytes - tuple field count and field length - start_index = i + 6 - end_index = start_index + stored_binary_size - - yield raw_data[start_index: end_index] + yield from copy_binary_column_from_select_query(cur, sql_query, stored_binary_size=stored_binary_size) def get_filter_metadata(db, dp_id): From 8bae410912638602bae59947fe0e857fb38c88bb Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Tue, 24 Mar 2020 15:28:30 +1300 Subject: [PATCH 26/27] Add more detailed tracing around binary encoding insertions. --- backend/entityservice/database/insertions.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/backend/entityservice/database/insertions.py b/backend/entityservice/database/insertions.py index cb91cad8..4cd917f6 100644 --- a/backend/entityservice/database/insertions.py +++ b/backend/entityservice/database/insertions.py @@ -1,5 +1,6 @@ from typing import List +import opentracing import psycopg2 import psycopg2.extras @@ -95,13 +96,15 @@ def insert_encodings_into_blocks(db, dp_id: int, block_ids: List[List[str]], enc def block_data_generator(encoding_ids, block_ids): for eid, block_ids in zip(encoding_ids, block_ids): for block_id in block_ids: - yield (dp_id, eid, block_id) + yield dp_id, eid, block_id with db.cursor() as cur: - psycopg2.extras.execute_values(cur, encodings_insertion_query, encoding_data, page_size=page_size) - psycopg2.extras.execute_values(cur, + with opentracing.tracer.start_span('insert-encodings-to-db'): + psycopg2.extras.execute_values(cur, encodings_insertion_query, encoding_data, page_size=page_size) + with opentracing.tracer.start_span('insert-encodingblocks-to-db'): + psycopg2.extras.execute_values(cur, blocks_insertion_query, - block_data_generator(encoding_ids, block_ids), + list(block_data_generator(encoding_ids, block_ids)), page_size=page_size) From 88e968d2c2a0afbe1a05cd9e1600cddb2d150255 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Tue, 24 Mar 2020 20:08:07 +1300 Subject: [PATCH 27/27] Add tests for binary copy function --- backend/entityservice/database/selections.py | 5 +- .../integrationtests/dbtests/conftest.py | 57 +++++++++++++++++++ .../dbtests/test_insertions.py | 28 +++++++-- 3 files changed, 82 insertions(+), 8 deletions(-) create mode 100644 backend/entityservice/integrationtests/dbtests/conftest.py diff --git a/backend/entityservice/database/selections.py b/backend/entityservice/database/selections.py index 302e168b..4ce1d35c 100644 --- 
a/backend/entityservice/database/selections.py +++ b/backend/entityservice/database/selections.py @@ -329,18 +329,19 @@ def copy_binary_column_from_select_query(cur, select_query, stored_binary_size=1 :param select_query: An sql query that select's a single binary column. Include ordering the results. :param stored_binary_size: Fixed size of each bytea data. + :raises AssertionError if the database implements an unhandled extension or the EOF is corrupt. """ copy_to_stream_query = """COPY ({}) TO STDOUT WITH binary""".format(select_query) stream = io.BytesIO() cur.copy_expert(copy_to_stream_query, stream) - # TODO: It may be more efficient to introduce a buffered binary stream instead of getting the entire stream at once + raw_data = stream.getvalue() # Need to read/remove the Postgres Binary Header, Trailer, and the per tuple info # https://www.postgresql.org/docs/current/sql-copy.html _ignored_header = raw_data[:15] - header_extension = raw_data[16:20] + header_extension = raw_data[15:19] assert header_extension == b'\x00\x00\x00\x00', "Need to implement skipping postgres binary header extension" binary_trailer = raw_data[-2:] assert binary_trailer == b'\xff\xff', "Corrupt COPY of binary data from postgres" diff --git a/backend/entityservice/integrationtests/dbtests/conftest.py b/backend/entityservice/integrationtests/dbtests/conftest.py new file mode 100644 index 00000000..fa6c8597 --- /dev/null +++ b/backend/entityservice/integrationtests/dbtests/conftest.py @@ -0,0 +1,57 @@ +import pytest +import psycopg2 + +from entityservice.settings import Config as config + + +@pytest.fixture +def conn(): + db = config.DATABASE + host = config.DATABASE_SERVER + user = config.DATABASE_USER + password = config.DATABASE_PASSWORD + conn = psycopg2.connect(host=host, dbname=db, user=user, password=password) + yield conn + conn.close() + +@pytest.fixture +def cur(conn): + return conn.cursor() + + + + +@pytest.fixture() +def prepopulated_binary_test_data(conn, cur, num_bytes=4, num_rows=100): + creation_sql = """ + DROP TABLE IF EXISTS binary_test; + CREATE TABLE binary_test + ( + id integer not null, + encoding bytea not null + );""" + cur.execute(creation_sql) + conn.commit() + + # Add data using execute_values + data = [(i, bytes([i % 128] * num_bytes)) for i in range(num_rows)] + psycopg2.extras.execute_values(cur, """ + INSERT INTO binary_test (id, encoding) VALUES %s + """, data) + + conn.commit() + + # quick check data is there + cur.execute("select count(*) from binary_test") + res = cur.fetchone()[0] + assert res == num_rows + + cur.execute("select encoding from binary_test where id = 1") + assert bytes(cur.fetchone()[0]) == data[1][1] + + yield data + + # delete test table + deletion_sql = "drop table if exists binary_test cascade;" + cur.execute(deletion_sql) + conn.commit() diff --git a/backend/entityservice/integrationtests/dbtests/test_insertions.py b/backend/entityservice/integrationtests/dbtests/test_insertions.py index 6deb50dc..19017119 100644 --- a/backend/entityservice/integrationtests/dbtests/test_insertions.py +++ b/backend/entityservice/integrationtests/dbtests/test_insertions.py @@ -4,7 +4,7 @@ from pytest import raises from entityservice.database import insert_dataprovider, insert_encodings_into_blocks, insert_blocking_metadata, \ - get_project, get_encodingblock_ids, get_block_metadata, get_chunk_of_encodings + get_project, get_encodingblock_ids, get_block_metadata, get_chunk_of_encodings, copy_binary_column_from_select_query from entityservice.integrationtests.dbtests import 
_get_conn_and_cursor from entityservice.models import Project @@ -13,8 +13,29 @@ from entityservice.utils import generate_code +class TestBinaryCopy: + + def test_copy_binary_column_from_select_query(self, conn, cur, prepopulated_binary_test_data): + query = "select encoding from binary_test where id >= 10 and id < 20" + res = list(copy_binary_column_from_select_query(cur, query, stored_binary_size=4)) + assert len(res) == 10 + for (i, original), stored in zip(prepopulated_binary_test_data[10:20], res): + assert original == stored + + def test_copy_binary_column_from_select_query_empty(self, conn, cur, prepopulated_binary_test_data): + query = "select encoding from binary_test where id < 0" + res = list(copy_binary_column_from_select_query(cur, query, stored_binary_size=4)) + assert len(res) == 0 + + class TestInsertions: + def _create_project(self): + project = Project('groups', {}, name='', notes='', parties=2, uses_blocking=False) + conn, cur = _get_conn_and_cursor() + dp_ids = project.save(conn) + return project, dp_ids + def _create_project_and_dp(self): project, dp_ids = self._create_project() dp_id = dp_ids[0] @@ -28,11 +49,6 @@ def _create_project_and_dp(self): assert len(dp_auth_token) == 48 return project.project_id, project.result_token, dp_id, dp_auth_token - def _create_project(self): - project = Project('groups', {}, name='', notes='', parties=2, uses_blocking=False) - conn, cur = _get_conn_and_cursor() - dp_ids = project.save(conn) - return project, dp_ids def test_insert_project(self): before = datetime.datetime.now()
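
One detail worth keeping in mind when reading these tests: the stored_binary_size=(encoding_size + 4) arithmetic reflects that each stored row is the fixed-size encoding plus a 4-byte encoding id packed by entityservice.serialization.binary_format. The exact byte layout is not shown in this series, so the following stand-in is only an assumption made to illustrate the size bookkeeping (field order and endianness are guesses):

import struct

ENCODING_SIZE = 128


def pack_encoding(encoding_id: int, encoding: bytes) -> bytes:
    # Hypothetical stand-in for binary_format(ENCODING_SIZE).pack():
    # a 4-byte unsigned encoding id followed by the fixed-size encoding.
    assert len(encoding) == ENCODING_SIZE
    return struct.pack('!I', encoding_id) + encoding


packed = pack_encoding(7, b'\x00' * ENCODING_SIZE)
assert len(packed) == ENCODING_SIZE + 4   # matches stored_binary_size = encoding_size + 4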