diff --git a/backend/entityservice/VERSION b/backend/entityservice/VERSION index 30817b33..4d518a80 100644 --- a/backend/entityservice/VERSION +++ b/backend/entityservice/VERSION @@ -1 +1 @@ -v1.13.0-beta +v1.13.0-beta2 diff --git a/backend/entityservice/database/insertions.py b/backend/entityservice/database/insertions.py index cb91cad8..4cd917f6 100644 --- a/backend/entityservice/database/insertions.py +++ b/backend/entityservice/database/insertions.py @@ -1,5 +1,6 @@ from typing import List +import opentracing import psycopg2 import psycopg2.extras @@ -95,13 +96,15 @@ def insert_encodings_into_blocks(db, dp_id: int, block_ids: List[List[str]], enc def block_data_generator(encoding_ids, block_ids): for eid, block_ids in zip(encoding_ids, block_ids): for block_id in block_ids: - yield (dp_id, eid, block_id) + yield dp_id, eid, block_id with db.cursor() as cur: - psycopg2.extras.execute_values(cur, encodings_insertion_query, encoding_data, page_size=page_size) - psycopg2.extras.execute_values(cur, + with opentracing.tracer.start_span('insert-encodings-to-db'): + psycopg2.extras.execute_values(cur, encodings_insertion_query, encoding_data, page_size=page_size) + with opentracing.tracer.start_span('insert-encodingblocks-to-db'): + psycopg2.extras.execute_values(cur, blocks_insertion_query, - block_data_generator(encoding_ids, block_ids), + list(block_data_generator(encoding_ids, block_ids)), page_size=page_size) diff --git a/backend/entityservice/database/selections.py b/backend/entityservice/database/selections.py index 2a801e1a..4ce1d35c 100644 --- a/backend/entityservice/database/selections.py +++ b/backend/entityservice/database/selections.py @@ -1,3 +1,4 @@ +import io import itertools from entityservice.database.util import query_db, logger @@ -275,51 +276,107 @@ def get_uploads_columns(db, dp_id, columns): return [result[column] for column in columns] -def get_encodingblock_ids(db, dp_id, block_name=None): +def get_encodingblock_ids(db, dp_id, block_id=None, offset=0, limit=None): """Yield all encoding ids in either a single block, or all blocks for a given data provider.""" sql_query = """ SELECT encoding_id FROM encodingblocks - WHERE dp = %s + WHERE dp = %(dp_id)s {} ORDER BY - encoding_ID ASC - """.format("AND block_id = %s" if block_name else "") + encoding_ID ASC + OFFSET %(offset)s + LIMIT %(limit)s + """.format("AND block_id = %(block_id)s" if block_id else "") # Specifying a name for the cursor creates a server-side cursor, which prevents all of the # records from being downloaded at once. cur = db.cursor(f'encodingblockfetcher-{dp_id}') + args = {'dp_id': dp_id, 'block_id': block_id, 'offset': offset, 'limit': limit} + cur.execute(sql_query, args) + yield from iterate_cursor_results(cur) - args = (dp_id, block_name) if block_name else (dp_id,) +def get_block_metadata(db, dp_id): + """Yield block id and counts for a given data provider.""" + sql_query = """ + SELECT block_name, count + FROM blocks + WHERE dp = %s + """ + # Specifying a name for the cursor creates a server-side cursor, which prevents all of the + # records from being downloaded at once. 
+    cur = db.cursor(f'blockfetcher-{dp_id}')
+    args = (dp_id,)
     cur.execute(sql_query, args)
+    for block_name, count in iterate_cursor_results(cur, one=False):
+        yield block_name.strip(), count
+
+
+def iterate_cursor_results(cur, one=True, page_size=4096):
     while True:
-        rows = cur.fetchmany(10_000)
+        rows = cur.fetchmany(page_size)
         if not rows:
             break
         for row in rows:
-            yield row[0]
+            if one:
+                yield row[0]
+            else:
+                yield row
+
+
+def copy_binary_column_from_select_query(cur, select_query, stored_binary_size=132):
+    """Yields raw bytes from postgres given a query returning a column containing fixed size bytea data.
+
+    :param select_query: An SQL query that selects a single binary column. The query should include any required ordering.
+    :param stored_binary_size: Fixed size in bytes of each bytea value.
+    :raises AssertionError: if the COPY header includes an unhandled extension or the binary trailer is corrupt.
+    """
+
+    copy_to_stream_query = """COPY ({}) TO STDOUT WITH binary""".format(select_query)
+    stream = io.BytesIO()
+    cur.copy_expert(copy_to_stream_query, stream)
+    raw_data = stream.getvalue()
+
+    # Need to read/remove the Postgres Binary Header, Trailer, and the per tuple info
+    # https://www.postgresql.org/docs/current/sql-copy.html
+    _ignored_header = raw_data[:15]
+    header_extension = raw_data[15:19]
+    assert header_extension == b'\x00\x00\x00\x00', "Need to implement skipping postgres binary header extension"
+    binary_trailer = raw_data[-2:]
+    assert binary_trailer == b'\xff\xff', "Corrupt COPY of binary data from postgres"
+    raw_data = raw_data[19:-2]
+
+    # The first 6 bytes of each row contain the tuple field count and the field length
+    per_row_header_size = 6
+    size = stored_binary_size + per_row_header_size
+    for i in range(0, len(raw_data), size):
+        start_index = i + per_row_header_size
+        end_index = start_index + stored_binary_size
+        yield raw_data[start_index: end_index]
+
+
+def get_chunk_of_encodings(db, dp_id, encoding_ids, stored_binary_size=132):
+    """Yields raw byte encodings for a data provider given the encoding ids.
+
+    :param dp_id: Fetch encodings from this dataprovider (encoding ids are not unique across data providers).
+    :param encoding_ids: List of ints of the encoding ids to include.
+    :param stored_binary_size: Size in bytes of each encoding as stored in the database, including the encoding id.
+ """ + + cur = db.cursor() -def get_encodings_by_id_range(db, dp_id, encoding_id_min=None, encoding_id_max=None): - """Yield all encodings in a given range for a given data provider.""" sql_query = """ - SELECT encoding - FROM encodings - WHERE dp = %s - {} - {} - ORDER BY - encoding_id ASC - """.format( - f"AND encoding_id >= {encoding_id_min}" if encoding_id_min else "", - f"AND encoding_id < {encoding_id_max}" if encoding_id_max else "", + SELECT encoding + FROM encodings + WHERE encodings.dp = {} + AND encodings.encoding_id in ({}) + ORDER BY encoding_id ASC + """.format( + dp_id, + ','.join(map(str, encoding_ids)) ) - cur = db.cursor() - cur.execute(sql_query, (dp_id,)) - rows = cur.fetchall() - for row in rows: - # Note row[0] is a memoryview - yield bytes(row[0]) + yield from copy_binary_column_from_select_query(cur, sql_query, stored_binary_size=stored_binary_size) def get_filter_metadata(db, dp_id): diff --git a/backend/entityservice/encoding_storage.py b/backend/entityservice/encoding_storage.py index 081e4e82..7b4a5e91 100644 --- a/backend/entityservice/encoding_storage.py +++ b/backend/entityservice/encoding_storage.py @@ -4,7 +4,8 @@ import ijson -from entityservice.database import insert_encodings_into_blocks, get_encodings_by_id_range +from entityservice.database import insert_encodings_into_blocks, get_encodingblock_ids, \ + get_chunk_of_encodings from entityservice.serialization import deserialize_bytes, binary_format, binary_unpack_filters @@ -14,7 +15,7 @@ def stream_json_clksnblocks(f): the following structure: { - "clksnblocks": [ + "clknblocks": [ ["BASE64 ENCODED ENCODING 1", blockid1, blockid2, ...], ["BASE64 ENCODED ENCODING 2", blockid1, ...], ... @@ -25,7 +26,7 @@ def stream_json_clksnblocks(f): :return: Generator of (entity_id, base64 encoding, list of blocks) """ # At some point the user may supply the entity id. For now we use the order of uploaded encodings. 
- for i, obj in enumerate(ijson.items(f, 'clksnblocks.item')): + for i, obj in enumerate(ijson.items(f, 'clknblocks.item')): b64_encoding, *blocks = obj yield i, deserialize_bytes(b64_encoding), blocks @@ -52,7 +53,7 @@ def generator(first_i, first_encoding_data, first_blocks): def _grouper(iterable, n, fillvalue=None): - "Collect data into fixed-length chunks or blocks" + "Collect data into fixed-length chunks or blocks from an iterable" # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx" args = [iter(iterable)] * n return zip_longest(*args, fillvalue=fillvalue) @@ -102,8 +103,10 @@ def _estimate_group_size(encoding_size): def get_encoding_chunk(conn, chunk_info, encoding_size=128): chunk_range_start, chunk_range_stop = chunk_info['range'] dataprovider_id = chunk_info['dataproviderId'] - - encoding_data_stream = get_encodings_by_id_range(conn, dataprovider_id, chunk_range_start, chunk_range_stop) - chunk_data = binary_unpack_filters(encoding_data_stream, encoding_size=encoding_size) + block_id = chunk_info['block_id'] + limit = chunk_range_stop - chunk_range_start + encoding_ids = get_encodingblock_ids(conn, dataprovider_id, block_id, chunk_range_start, limit) + encoding_iter = get_chunk_of_encodings(conn, dataprovider_id, encoding_ids, stored_binary_size=(encoding_size+4)) + chunk_data = binary_unpack_filters(encoding_iter, encoding_size=encoding_size) return chunk_data, len(chunk_data) diff --git a/backend/entityservice/init-db-schema.sql b/backend/entityservice/init-db-schema.sql index 42a48e7c..8798322c 100644 --- a/backend/entityservice/init-db-schema.sql +++ b/backend/entityservice/init-db-schema.sql @@ -175,6 +175,7 @@ CREATE TABLE encodings ( PRIMARY KEY (dp, encoding_id) ); +CREATE INDEX ON encodings (dp, encoding_id); -- Table mapping blocks to encodings CREATE TABLE encodingblocks ( @@ -188,8 +189,8 @@ CREATE TABLE encodingblocks ( FOREIGN KEY (dp, block_id) REFERENCES blocks (dp, block_name) ); - - +CREATE INDEX ON encodingblocks (dp, block_id); +CREATE INDEX ON encodingblocks (encoding_id); CREATE TABLE run_results ( -- Just the table index diff --git a/backend/entityservice/integrationtests/dbtests/__init__.py b/backend/entityservice/integrationtests/dbtests/__init__.py index e69de29b..1aa25ef0 100644 --- a/backend/entityservice/integrationtests/dbtests/__init__.py +++ b/backend/entityservice/integrationtests/dbtests/__init__.py @@ -0,0 +1,13 @@ +import psycopg2 + +from entityservice.settings import Config as config + + +def _get_conn_and_cursor(): + db = config.DATABASE + host = config.DATABASE_SERVER + user = config.DATABASE_USER + password = config.DATABASE_PASSWORD + conn = psycopg2.connect(host=host, dbname=db, user=user, password=password) + cursor = conn.cursor() + return conn, cursor diff --git a/backend/entityservice/integrationtests/dbtests/conftest.py b/backend/entityservice/integrationtests/dbtests/conftest.py new file mode 100644 index 00000000..fa6c8597 --- /dev/null +++ b/backend/entityservice/integrationtests/dbtests/conftest.py @@ -0,0 +1,57 @@ +import pytest +import psycopg2 + +from entityservice.settings import Config as config + + +@pytest.fixture +def conn(): + db = config.DATABASE + host = config.DATABASE_SERVER + user = config.DATABASE_USER + password = config.DATABASE_PASSWORD + conn = psycopg2.connect(host=host, dbname=db, user=user, password=password) + yield conn + conn.close() + +@pytest.fixture +def cur(conn): + return conn.cursor() + + + + +@pytest.fixture() +def prepopulated_binary_test_data(conn, cur, num_bytes=4, num_rows=100): + creation_sql = """ 
+ DROP TABLE IF EXISTS binary_test; + CREATE TABLE binary_test + ( + id integer not null, + encoding bytea not null + );""" + cur.execute(creation_sql) + conn.commit() + + # Add data using execute_values + data = [(i, bytes([i % 128] * num_bytes)) for i in range(num_rows)] + psycopg2.extras.execute_values(cur, """ + INSERT INTO binary_test (id, encoding) VALUES %s + """, data) + + conn.commit() + + # quick check data is there + cur.execute("select count(*) from binary_test") + res = cur.fetchone()[0] + assert res == num_rows + + cur.execute("select encoding from binary_test where id = 1") + assert bytes(cur.fetchone()[0]) == data[1][1] + + yield data + + # delete test table + deletion_sql = "drop table if exists binary_test cascade;" + cur.execute(deletion_sql) + conn.commit() diff --git a/backend/entityservice/integrationtests/dbtests/test_insertions.py b/backend/entityservice/integrationtests/dbtests/test_insertions.py index c62a3cf0..19017119 100644 --- a/backend/entityservice/integrationtests/dbtests/test_insertions.py +++ b/backend/entityservice/integrationtests/dbtests/test_insertions.py @@ -1,53 +1,61 @@ import datetime -import time import psycopg2 from pytest import raises from entityservice.database import insert_dataprovider, insert_encodings_into_blocks, insert_blocking_metadata, \ - get_project, get_encodingblock_ids, get_encodings_by_id_range + get_project, get_encodingblock_ids, get_block_metadata, get_chunk_of_encodings, copy_binary_column_from_select_query + +from entityservice.integrationtests.dbtests import _get_conn_and_cursor from entityservice.models import Project +from entityservice.serialization import binary_format from entityservice.tests.util import generate_bytes from entityservice.utils import generate_code -from entityservice.settings import Config as config + + +class TestBinaryCopy: + + def test_copy_binary_column_from_select_query(self, conn, cur, prepopulated_binary_test_data): + query = "select encoding from binary_test where id >= 10 and id < 20" + res = list(copy_binary_column_from_select_query(cur, query, stored_binary_size=4)) + assert len(res) == 10 + for (i, original), stored in zip(prepopulated_binary_test_data[10:20], res): + assert original == stored + + def test_copy_binary_column_from_select_query_empty(self, conn, cur, prepopulated_binary_test_data): + query = "select encoding from binary_test where id < 0" + res = list(copy_binary_column_from_select_query(cur, query, stored_binary_size=4)) + assert len(res) == 0 class TestInsertions: - def _get_conn_and_cursor(self): - db = config.DATABASE - host = config.DATABASE_SERVER - user = config.DATABASE_USER - password = config.DATABASE_PASSWORD - conn = psycopg2.connect(host=host, dbname=db, user=user, password=password) - cursor = conn.cursor() - return conn, cursor + def _create_project(self): + project = Project('groups', {}, name='', notes='', parties=2, uses_blocking=False) + conn, cur = _get_conn_and_cursor() + dp_ids = project.save(conn) + return project, dp_ids def _create_project_and_dp(self): project, dp_ids = self._create_project() dp_id = dp_ids[0] dp_auth_token = project.update_tokens[0] - conn, cur = self._get_conn_and_cursor() + conn, cur = _get_conn_and_cursor() # create a default block - insert_blocking_metadata(conn, dp_id, {'1': 99}) + insert_blocking_metadata(conn, dp_id, {'1': 10000}) conn.commit() assert len(dp_auth_token) == 48 return project.project_id, project.result_token, dp_id, dp_auth_token - def _create_project(self): - project = Project('groups', {}, name='', notes='', 
parties=2, uses_blocking=False) - conn, cur = self._get_conn_and_cursor() - dp_ids = project.save(conn) - return project, dp_ids def test_insert_project(self): before = datetime.datetime.now() project, _ = self._create_project() assert len(project.result_token) == 48 # check we can fetch the inserted project back from the database - conn, cur = self._get_conn_and_cursor() + conn, cur = _get_conn_and_cursor() project_response = get_project(conn, project.project_id) assert 'time_added' in project_response assert project_response['time_added'] - before >= datetime.timedelta(seconds=0) @@ -61,24 +69,30 @@ def test_insert_project(self): assert project_response['encoding_size'] is None def test_insert_dp_no_project_fails(self): - conn, cur = self._get_conn_and_cursor() + conn, cur = _get_conn_and_cursor() project_id = generate_code() dp_auth = generate_code() with raises(psycopg2.errors.ForeignKeyViolation): insert_dataprovider(cur, auth_token=dp_auth, project_id=project_id) def test_insert_many_clks(self): - data = [generate_bytes(128) for _ in range(100)] - project_id, project_auth_token, dp_id, dp_auth_token = self._create_project_and_dp() - conn, cur = self._get_conn_and_cursor() num_entities = 10_000 + encoding_size = 2048 # non default encoding size + binary_formatter = binary_format(encoding_size) + + raw_data = [generate_bytes(encoding_size) for i in range(100)] + encodings = [binary_formatter.pack(i, raw_data[i % 100]) for i in range(num_entities)] blocks = [['1'] for _ in range(num_entities)] - encodings = [data[i % 100] for i in range(num_entities)] + + project_id, project_auth_token, dp_id, dp_auth_token = self._create_project_and_dp() + conn, cur = _get_conn_and_cursor() + insert_encodings_into_blocks(conn, dp_id, block_ids=blocks, encoding_ids=list(range(num_entities)), encodings=encodings ) + conn.commit() stored_encoding_ids = list(get_encodingblock_ids(conn, dp_id, '1')) @@ -86,7 +100,37 @@ def test_insert_many_clks(self): for stored_encoding_id, original_id in zip(stored_encoding_ids, range(num_entities)): assert stored_encoding_id == original_id - stored_encodings = list(get_encodings_by_id_range(conn, dp_id)) + stored_encodings = list(get_chunk_of_encodings(conn, dp_id, stored_encoding_ids, stored_binary_size=(encoding_size+4))) + assert len(stored_encodings) == num_entities for stored_encoding, original_encoding in zip(stored_encodings, encodings): - assert stored_encoding == original_encoding + assert bytes(stored_encoding) == original_encoding + + block_names, block_sizes = zip(*list(get_block_metadata(conn, dp_id))) + + assert len(block_names) == 1 + assert len(block_sizes) == 1 + assert block_names[0] == '1' + assert block_sizes[0] == 10_000 + + def test_fetch_chunk(self): + data = [generate_bytes(128) for _ in range(100)] + project_id, project_auth_token, dp_id, dp_auth_token = self._create_project_and_dp() + conn, cur = _get_conn_and_cursor() + num_entities = 10_000 + blocks = [['1'] for _ in range(num_entities)] + encodings = [data[i % 100] for i in range(num_entities)] + + insert_encodings_into_blocks(conn, dp_id, + block_ids=blocks, + encoding_ids=list(range(num_entities)), + encodings=encodings + ) + conn.commit() + + stored_encoding_ids = list(get_encodingblock_ids(conn, dp_id, '1', offset=10, limit=20)) + + assert len(stored_encoding_ids) == 20 + for i, stored_encoding_id in enumerate(stored_encoding_ids): + assert stored_encoding_id == i + 10 + diff --git a/backend/entityservice/models/project.py b/backend/entityservice/models/project.py index 5776b2f8..d54eecec 
100644 --- a/backend/entityservice/models/project.py +++ b/backend/entityservice/models/project.py @@ -36,12 +36,6 @@ def __init__(self, result_type, schema, name, notes, parties, uses_blocking): # Order is important here self.update_tokens = [generate_code() for _ in range(parties)] - # TODO DELETE? - self.ready = False - self.status = 'not ready' - self.data = {} - self.result = {} - VALID_RESULT_TYPES = {'groups', 'permutations', 'similarity_scores'} @@ -60,12 +54,14 @@ def from_json(data): # Get optional fields from JSON data name = data.get('name', '') notes = data.get('notes', '') - parties = data.get('number_parties', 2) + parties = int(data.get('number_parties', 2)) uses_blocking = data.get('uses_blocking', False) if parties > 2 and result_type != 'groups': raise InvalidProjectParametersException( "Multi-party linkage requires result type 'groups'.") + if parties < 2: + raise InvalidProjectParametersException("Record linkage requires at least 2 parties!") return Project(result_type, schema, name, notes, parties, uses_blocking) diff --git a/backend/entityservice/settings.py b/backend/entityservice/settings.py index 5c99bf29..c2a55a5c 100644 --- a/backend/entityservice/settings.py +++ b/backend/entityservice/settings.py @@ -63,17 +63,18 @@ class Config(object): } CELERY_ROUTES = { - 'async_worker.calculate_mapping': {'queue': 'celery'}, - 'async_worker.compute_similarity': {'queue': 'compute'}, - 'async_worker.aggregate_filter_chunks': {'queue': 'highmemory'}, - 'async_worker.solver_task': {'queue': 'highmemory'}, - 'async_worker.save_and_permute': {'queue': 'highmemory'}, - 'async_worker.handle_raw_upload': {'queue': 'celery'} + 'entityservice.tasks.comparing.create_comparison_jobs': {'queue': 'celery'}, + 'entityservice.tasks.comparing.compute_filter_similarity': {'queue': 'compute'}, + 'entityservice.tasks.comparing.aggregate_comparisons': {'queue': 'highmemory'}, + 'entityservice.tasks.solver.solver_task': {'queue': 'highmemory'}, + 'entityservice.tasks.permutation.save_and_permute': {'queue': 'highmemory'}, + 'entityservice.tasks.encoding_uploading.handle_raw_upload': {'queue': 'celery'} } CELERYD_PREFETCH_MULTIPLIER = int(os.getenv('CELERYD_PREFETCH_MULTIPLIER', '1')) CELERYD_MAX_TASKS_PER_CHILD = int(os.getenv('CELERYD_MAX_TASKS_PER_CHILD', '4')) - CELERYD_CONCURRENCY = int(os.getenv("CELERYD_CONCURRENCY", '0')) + # number of concurrent worker processes/threads, executing tasks + CELERYD_CONCURRENCY = int(os.getenv("CELERYD_CONCURRENCY", '2')) CELERY_ACKS_LATE = os.getenv('CELERY_ACKS_LATE', 'false') == 'true' # Number of comparisons per chunk (on average). diff --git a/backend/entityservice/tasks/base_task.py b/backend/entityservice/tasks/base_task.py index afe712cf..ce0c06e4 100644 --- a/backend/entityservice/tasks/base_task.py +++ b/backend/entityservice/tasks/base_task.py @@ -104,7 +104,7 @@ def celery_bug_fix(*args, **kwargs): @celery.task(base=BaseTask, ignore_result=True) -def on_chord_error(*args, **kwargs): +def run_failed_handler(*args, **kwargs): """ Record that a task has encountered an error, mark the run as failed. @@ -112,7 +112,8 @@ def on_chord_error(*args, **kwargs): :param kwargs: Keyword arguments to the task e.g. 
{'run_id': '...', } """ task_id = args[0] - logger.bind(run_id=kwargs['run_id']) + if 'run_id' in kwargs: + logger.bind(run_id=kwargs['run_id']) logger.info("An error occurred while processing task", task_id=task_id) with DBConn() as db: diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py index 4a35f7f3..fe463519 100644 --- a/backend/entityservice/tasks/comparing.py +++ b/backend/entityservice/tasks/comparing.py @@ -1,12 +1,11 @@ import array import heapq +import itertools import operator import time import anonlink import minio -import opentracing -import psycopg2 from celery import chord @@ -14,68 +13,67 @@ from entityservice.cache.encodings import remove_from_cache from entityservice.cache.progress import save_current_progress from entityservice.encoding_storage import get_encoding_chunk -from entityservice.errors import RunDeleted +from entityservice.errors import InactiveRun from entityservice.database import ( check_project_exists, check_run_exists, DBConn, get_dataprovider_ids, - get_filter_metadata, get_project_column, get_project_dataset_sizes, + get_project_column, get_project_dataset_sizes, get_project_encoding_size, get_run, insert_similarity_score_file, - update_run_mark_failure) + update_run_mark_failure, get_block_metadata) from entityservice.models.run import progress_run_stage as progress_stage from entityservice.object_store import connect_to_object_store -from entityservice.serialization import get_chunk_from_object_store from entityservice.settings import Config -from entityservice.tasks.base_task import TracedTask, celery_bug_fix, on_chord_error +from entityservice.tasks.base_task import TracedTask, celery_bug_fix, run_failed_handler from entityservice.tasks.solver import solver_task from entityservice.tasks import mark_run_complete from entityservice.tasks.assert_valid_run import assert_valid_run from entityservice.utils import generate_code, iterable_to_stream +def check_run_active(conn, project_id, run_id): + """Raises InactiveRun if the project or run has been deleted from the database. + """ + if not check_project_exists(conn, project_id) or not check_run_exists(conn, project_id, run_id): + raise InactiveRun("Skipping as project or run not found in database.") + + @celery.task(base=TracedTask, ignore_result=True, args_as_tags=('project_id', 'run_id')) def create_comparison_jobs(project_id, run_id, parent_span=None): + """Schedule all the entity comparisons as sub tasks for a run. + + At a high level this task: + - checks if the project and run have been deleted and if so aborts. + - retrieves metadata: the number and size of the datasets, the encoding size, + and the number and size of blocks. + - splits the work into independent "chunks" and schedules them to run in celery + - schedules the follow up task to run after all the comparisons have been computed. 
+ """ log = logger.bind(pid=project_id, run_id=run_id) + current_span = create_comparison_jobs.span with DBConn() as conn: + check_run_active(conn, project_id, run_id) dp_ids = get_dataprovider_ids(conn, project_id) - assert len(dp_ids) >= 2, "Expected at least 2 data providers" - log.info(f"Starting comparison of CLKs from data provider ids: " + number_of_datasets = len(dp_ids) + assert number_of_datasets >= 2, "Expected at least 2 data providers" + log.info(f"Scheduling comparison of CLKs from data provider ids: " f"{', '.join(map(str, dp_ids))}") - current_span = create_comparison_jobs.span - - if not check_project_exists(conn, project_id) or not check_run_exists(conn, project_id, run_id): - log.info("Skipping as project or run not found in database.") - return - - run_info = get_run(conn, run_id) - threshold = run_info['threshold'] - dataset_sizes = get_project_dataset_sizes(conn, project_id) + # Retrieve required metadata + dataset_sizes, dp_block_sizes = _retrieve_blocked_dataset_sizes(conn, project_id, dp_ids) - if len(dataset_sizes) < 2: - log.warning("Unexpected number of dataset sizes in db. Stopping") - update_run_mark_failure(conn, run_id) - return + log.info("Finding blocks in common between dataproviders") + common_blocks = _get_common_blocks(dp_block_sizes, dp_ids) + # We pass the encoding_size and threshold to the comparison tasks to minimize their db lookups encoding_size = get_project_encoding_size(conn, project_id) - - log.info(f"Computing similarity for " - f"{' x '.join(map(str, dataset_sizes))} entities") - current_span.log_kv({"event": 'get-dataset-sizes', 'sizes': dataset_sizes}) + threshold = get_run(conn, run_id)['threshold'] log.debug("Chunking computation task") + # Create "chunks" of comparisons + chunks = _create_work_chunks(common_blocks, dp_block_sizes, dp_ids, log) - chunk_infos = tuple(anonlink.concurrency.split_to_chunks( - Config.CHUNK_SIZE_AIM, - dataset_sizes=dataset_sizes)) - - # Add the db ids to the chunk information. - for chunk_info in chunk_infos: - for chunk_dp_info in chunk_info: - chunk_dp_index = chunk_dp_info['datasetIndex'] - chunk_dp_info['dataproviderId'] = dp_ids[chunk_dp_index] - - log.info(f"Chunking into {len(chunk_infos)} computation tasks") - current_span.log_kv({"event": "chunking", 'num_chunks': len(chunk_infos)}) + log.info(f"Chunking into {len(chunks)} computation tasks") + current_span.log_kv({"event": "chunking", 'num_chunks': len(chunks), 'dataset-sizes': dataset_sizes}) span_serialized = create_comparison_jobs.get_serialized_span() # Prepare the Celery Chord that will compute all the similarity scores: @@ -86,16 +84,141 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): threshold, encoding_size, span_serialized - ) for chunk_info in chunk_infos] + ) for chunk_info in chunks] if len(scoring_tasks) == 1: scoring_tasks.append(celery_bug_fix.si()) - callback_task = aggregate_comparisons.s(project_id, run_id, parent_span=span_serialized).on_error( - on_chord_error.s(run_id=run_id)) + callback_task = aggregate_comparisons.s(project_id=project_id, run_id=run_id, parent_span=span_serialized).on_error( + run_failed_handler.s(run_id=run_id)) + log.info(f"Scheduling comparison tasks") future = chord(scoring_tasks)(callback_task) +def _create_work_chunks(blocks, dp_block_sizes, dp_ids, log, chunk_size_aim=Config.CHUNK_SIZE_AIM): + """Create chunks of comparisons using blocking information. + + .. 
note::
+
+        Ideally the chunks would be similarly sized; however, with blocking the best we
+        can do in this regard is to ensure a similar maximum size. Small blocks will
+        directly translate to small chunks.
+
+    :todo Consider passing all dataproviders to anonlink's `split_to_chunks` for a given
+        large block instead of pairwise.
+
+    :param blocks:
+        dict mapping block identifier to a list of all combinations of
+        data provider pairs containing this block. As created by :func:`_get_common_blocks`
+    :param dp_block_sizes:
+        A map from dataprovider id to a dict mapping
+        block id to the number of encodings from the dataprovider in the
+        block. As created by :func:`_retrieve_blocked_dataset_sizes`
+    :param dp_ids: list of data provider ids
+    :param log: A logger instance
+    :param chunk_size_aim: The desired number of comparisons per chunk.
+
+    :returns:
+
+        a list of chunks, where each chunk is a pair of dicts (one per data provider) comprised of:
+        - dataproviderId - The dataprovider identifier in our database
+        - datasetIndex - The dataprovider index [0, 1 ...]
+        - block_id - The block of encodings this chunk is for
+        - range - The range of encodings within the block that make up this chunk.
+    """
+    chunks = []
+    for block_id in blocks:
+        log.debug("Chunking block", block_id=block_id)
+        # Note there might be many pairs of datasets in this block
+        for dp1, dp2 in blocks[block_id]:
+            size1 = dp_block_sizes[dp1][block_id]
+            size2 = dp_block_sizes[dp2][block_id]
+
+            comparisons = size1 * size2
+
+            if comparisons > chunk_size_aim:
+                log.debug("Block is too large for single task. Working out how to chunk it up")
+                for chunk_info in anonlink.concurrency.split_to_chunks(chunk_size_aim,
+                                                                       dataset_sizes=(size1, size2)):
+                    # chunk_info's from anonlink already have a datasetIndex of 0 or 1 and a range.
+                    # We need to correct the datasetIndex, add the database dataproviderId, and add the block_id.
+                    add_dp_id_to_chunk_info(chunk_info, dp_ids, dp1, dp2)
+                    add_block_id_to_chunk_info(chunk_info, block_id)
+                    chunks.append(chunk_info)
+            else:
+                log.debug("Add the full block as a single work chunk")
+                # Although the range "could" be optional we choose to make all chunk definitions consistent
+                chunk_left = {"range": (0, size1), "block_id": block_id}
+                chunk_right = {"range": (0, size2), "block_id": block_id}
+                chunk_info = (chunk_left, chunk_right)
+                add_dp_id_to_chunk_info(chunk_info, dp_ids, dp1, dp2)
+                chunks.append(chunk_info)
+    return chunks
+
+
+def _get_common_blocks(dp_block_sizes, dp_ids):
+    """Return, for each block, the pairs of data providers that share it.
+
+    :returns:
+        dict mapping block identifier to a list of all combinations of
+        data provider pairs containing this block.
+
+        block_id -> List(pairs of dp ids)
+        e.g. {'1': [(26, 27), (26, 28), (27, 28)]}
+    """
+    blocks = {}
+    for dp1, dp2 in itertools.combinations(dp_ids, 2):
+        # Get the intersection of blocks between these two dataproviders
+        common_block_ids = set(dp_block_sizes[dp1]).intersection(set(dp_block_sizes[dp2]))
+        for block_id in common_block_ids:
+            blocks.setdefault(block_id, []).append((dp1, dp2))
+    return blocks
+
+
+def _retrieve_blocked_dataset_sizes(conn, project_id, dp_ids):
+    """Fetch encoding counts for each dataset by block.
+
+    :param dp_ids: Iterable of dataprovider database identifiers.
+    :returns:
+        A 2-tuple of:
+
+        - dataset sizes: a tuple of the number of encodings in each dataset.
+
+        - dp_block_sizes: A map from dataprovider id to a dict mapping
+          block id to the number of encodings from the dataprovider in the
+          block.
+ + {dp_id -> {block_id -> block_size}} + e.g. {33: {'1': 100}, 34: {'1': 100}, 35: {'1': 100}} + """ + dataset_sizes = get_project_dataset_sizes(conn, project_id) + + dp_block_sizes = {} + for dp_id in dp_ids: + dp_block_sizes[dp_id] = dict(get_block_metadata(conn, dp_id)) + return dataset_sizes, dp_block_sizes + + +def add_dp_id_to_chunk_info(chunk_info, dp_ids, dp_id_left, dp_id_right): + """ + Modify a chunk_info as returned by anonlink.concurrency.split_to_chunks + to use correct project dataset indicies and add the database dp identifier. + """ + dpindx = dict(zip(dp_ids, range(len(dp_ids)))) + + left, right = chunk_info + left['dataproviderId'] = dp_id_left + left['datasetIndex'] = dpindx[dp_id_left] + + right['dataproviderId'] = dp_id_right + right['datasetIndex'] = dpindx[dp_id_right] + + +def add_block_id_to_chunk_info(chunk_info, block_id): + for chunk_dp_info in chunk_info: + chunk_dp_info['block_id'] = block_id + + @celery.task(base=TracedTask, args_as_tags=('project_id', 'run_id', 'threshold')) def compute_filter_similarity(chunk_info, project_id, run_id, threshold, encoding_size, parent_span=None): """Compute filter similarity between a chunk of filters in dataprovider 1, @@ -113,45 +236,52 @@ def compute_filter_similarity(chunk_info, project_id, run_id, threshold, encodin log = logger.bind(pid=project_id, run_id=run_id) task_span = compute_filter_similarity.span - def new_child_span(name): - return compute_filter_similarity.tracer.start_active_span( - name, - child_of=compute_filter_similarity.span) + def new_child_span(name, parent_scope=None): + if parent_scope is None: + parent_scope = compute_filter_similarity + return compute_filter_similarity.tracer.start_active_span(name, child_of=parent_scope.span) log.debug("Computing similarity for a chunk of filters") - log.debug("Checking that the resource exists (in case of run being canceled/deleted)") assert_valid_run(project_id, run_id, log) chunk_info_dp1, chunk_info_dp2 = chunk_info with DBConn() as conn: - with new_child_span('fetching-encodings'): - log.debug("Fetching and deserializing chunk of filters for dataprovider 1") - chunk_with_ids_dp1, chunk_dp1_size = get_encoding_chunk(conn, chunk_info_dp1, encoding_size) - #TODO: use the entity ids! - entity_ids_dp1, chunk_dp1 = zip(*chunk_with_ids_dp1) + with new_child_span('fetching-encodings') as parent_scope: + with new_child_span('fetching-left-encodings', parent_scope): + log.debug("Fetching and deserializing chunk of filters for dataprovider 1") + chunk_with_ids_dp1, chunk_dp1_size = get_encoding_chunk(conn, chunk_info_dp1, encoding_size) + entity_ids_dp1, chunk_dp1 = zip(*chunk_with_ids_dp1) - log.debug("Fetching and deserializing chunk of filters for dataprovider 2") - chunk_with_ids_dp2, chunk_dp2_size = get_encoding_chunk(conn, chunk_info_dp2, encoding_size) - # TODO: use the entity ids! 
- entity_ids_dp2, chunk_dp2 = zip(*chunk_with_ids_dp2) + with new_child_span('fetching-right-encodings', parent_scope): + log.debug("Fetching and deserializing chunk of filters for dataprovider 2") + chunk_with_ids_dp2, chunk_dp2_size = get_encoding_chunk(conn, chunk_info_dp2, encoding_size) + entity_ids_dp2, chunk_dp2 = zip(*chunk_with_ids_dp2) log.debug('Both chunks are fetched and deserialized') - task_span.log_kv({'size1': chunk_dp1_size, 'size2': chunk_dp2_size}) + task_span.log_kv({'size1': chunk_dp1_size, 'size2': chunk_dp2_size, 'chunk_info': chunk_info}) + + with new_child_span('comparing-encodings') as parent_scope: - with new_child_span('comparing-encodings'): log.debug("Calculating filter similarity") - try: - chunk_results = anonlink.concurrency.process_chunk( - chunk_info, - (chunk_dp1, chunk_dp2), - anonlink.similarities.dice_coefficient_accelerated, - threshold, - k=min(chunk_dp1_size, chunk_dp2_size)) - except NotImplementedError as e: - log.warning("Encodings couldn't be compared using anonlink.") - return + with new_child_span('dice-call', parent_scope): + try: + sims, (rec_is0, rec_is1) = anonlink.similarities.dice_coefficient_accelerated( + datasets=(chunk_dp1, chunk_dp2), + threshold=threshold, + k=min(chunk_dp1_size, chunk_dp2_size)) + except NotImplementedError as e: + log.warning("Encodings couldn't be compared using anonlink.") + return + + with new_child_span('reindex-call', parent_scope): + def reindex_using_encoding_ids(recordarray, encoding_id_list): + # Map results from "index in chunk" to encoding id. + return array.array('I', [encoding_id_list[i] for i in recordarray]) + + rec_is0 = reindex_using_encoding_ids(rec_is0, entity_ids_dp1) + rec_is1 = reindex_using_encoding_ids(rec_is1, entity_ids_dp2) log.debug('Encoding similarities calculated') @@ -159,9 +289,9 @@ def new_child_span(name): # Update the number of comparisons completed comparisons_computed = chunk_dp1_size * chunk_dp2_size save_current_progress(comparisons_computed, run_id) + log.info("Comparisons: {}, Links above threshold: {}".format(comparisons_computed, len(sims))) with new_child_span('save-comparison-results-to-minio'): - sims, _, _ = chunk_results num_results = len(sims) if num_results: @@ -170,6 +300,12 @@ def new_child_span(name): task_span.log_kv({"edges": num_results}) log.info("Writing {} intermediate results to file: {}".format(num_results, result_filename)) + # Make index arrays for serialization + index_1 = array.array('I', (chunk_info_dp1["datasetIndex"],)) * num_results + index_2 = array.array('I', (chunk_info_dp2["datasetIndex"],)) * num_results + + chunk_results = sims, (index_1, index_2), (rec_is0, rec_is1), + bytes_iter, file_size \ = anonlink.serialization.dump_candidate_pairs_iter(chunk_results) iter_stream = iterable_to_stream(bytes_iter) @@ -185,8 +321,6 @@ def new_child_span(name): result_filename = None file_size = None - log.info("Comparisons: {}, Links above threshold: {}".format(comparisons_computed, len(chunk_results))) - return num_results, file_size, result_filename @@ -236,18 +370,6 @@ def _merge_files(mc, log, file0, file1): return total_num, merged_file_size, merged_file_name -def _insert_similarity_into_db(db, log, run_id, merged_filename): - try: - result_id = insert_similarity_score_file( - db, run_id, merged_filename) - except psycopg2.IntegrityError: - log.info("Error saving similarity score filename to database. 
" - "The project may have been deleted.") - raise RunDeleted(run_id) - log.debug(f"Saved path to similarity scores file to db with id " - f"{result_id}") - - @celery.task( base=TracedTask, ignore_result=True, @@ -295,8 +417,9 @@ def aggregate_comparisons(similarity_result_files, project_id, run_id, parent_sp with DBConn() as db: result_type = get_project_column(db, project_id, 'result_type') - - _insert_similarity_into_db(db, log, run_id, merged_filename) + result_id = insert_similarity_score_file(db, run_id, merged_filename) + log.debug(f"Saved path to similarity scores file to db with id " + f"{result_id}") if result_type == "similarity_scores": # Post similarity computation cleanup diff --git a/backend/entityservice/tasks/run.py b/backend/entityservice/tasks/run.py index 18ef236c..bff98ed3 100644 --- a/backend/entityservice/tasks/run.py +++ b/backend/entityservice/tasks/run.py @@ -5,7 +5,7 @@ from entityservice.database import DBConn, check_project_exists, get_run, get_run_state_for_update from entityservice.database import update_run_set_started from entityservice.errors import RunDeleted, ProjectDeleted -from entityservice.tasks.base_task import TracedTask +from entityservice.tasks.base_task import TracedTask, run_failed_handler from entityservice.tasks.comparing import create_comparison_jobs from entityservice.async_worker import celery, logger @@ -47,5 +47,8 @@ def prerun_check(project_id, run_id, parent_span=None): set_run_state_active(run_id) progress_cache.save_current_progress(comparisons=0, run_id=run_id) - create_comparison_jobs.delay(project_id, run_id, prerun_check.get_serialized_span()) + create_comparison_jobs.apply_async( + kwargs={'project_id': project_id, 'run_id': run_id, 'parent_span': prerun_check.get_serialized_span()}, + link_error=run_failed_handler.s() + ) log.info("CLK similarity computation scheduled") diff --git a/backend/entityservice/tasks/solver.py b/backend/entityservice/tasks/solver.py index c1bd9352..600a51b3 100644 --- a/backend/entityservice/tasks/solver.py +++ b/backend/entityservice/tasks/solver.py @@ -1,4 +1,5 @@ import anonlink +from anonlink.candidate_generation import _merge_similarities from entityservice.object_store import connect_to_object_store from entityservice.async_worker import celery, logger @@ -15,10 +16,20 @@ def solver_task(similarity_scores_filename, project_id, run_id, dataset_sizes, p 'filename': similarity_scores_filename}) score_file = mc.get_object(config.MINIO_BUCKET, similarity_scores_filename) log.debug("Creating python sparse matrix from bytes data") - candidate_pairs = anonlink.serialization.load_candidate_pairs(score_file) - log.info("Calculating the optimal mapping from similarity matrix") + candidate_pairs_with_duplicates = anonlink.serialization.load_candidate_pairs(score_file) + similarity_scores, (dset_is0, dset_is1), (rec_is0, rec_is1) = candidate_pairs_with_duplicates - groups = anonlink.solving.greedy_solve(candidate_pairs) + log.info(f"Number of candidate pairs before deduplication: {len(candidate_pairs_with_duplicates[0])}") + if len(candidate_pairs_with_duplicates[0]) > 0: + # TODO use public interface when available + # https://github.com/data61/anonlink/issues/271 + candidate_pairs = _merge_similarities([zip(similarity_scores, dset_is0, dset_is1, rec_is0, rec_is1)], k=None) + log.info(f"Number of candidate pairs after deduplication: {len(candidate_pairs[0])}") + + log.info("Calculating the optimal mapping from similarity matrix") + groups = anonlink.solving.greedy_solve(candidate_pairs) + else: 
+ groups = [] log.info("Entity groups have been computed") diff --git a/backend/entityservice/tests/test_chunking.py b/backend/entityservice/tests/test_chunking.py new file mode 100644 index 00000000..99730385 --- /dev/null +++ b/backend/entityservice/tests/test_chunking.py @@ -0,0 +1,65 @@ +from structlog import get_logger + +from entityservice.tasks.comparing import _get_common_blocks, _create_work_chunks +log = get_logger() + + +class TestCommonBlocks: + # Unit test for _get_common_blocks + + def test_2p_get_common_blocks(self): + dp_ids = [33, 34] + dp_block_sizes = {33: {'1': 100}, 34: {'1': 100, '2': 100}} + common_blocks = _get_common_blocks(dp_block_sizes, dp_ids) + assert common_blocks == {"1": [(33, 34)]} + + def test_3p_get_common_blocks(self): + dp_ids = [1, 2, 3] + dp_block_sizes = {1: {'1': 100}, 2: {'1': 100, '2': 100}, 3: {'1': 100, '2': 100}} + common_blocks = _get_common_blocks(dp_block_sizes, dp_ids) + assert '1' in common_blocks + assert len(common_blocks['1']) == 3 + block_1_set = set(common_blocks['1']) + # Should have (1, 2), (1, 3), (2, 3) + for dpcombo in [(1, 2), (1, 3), (2, 3)]: + assert dpcombo in block_1_set + + assert '2' in common_blocks + assert len(common_blocks['2']) == 1 + assert common_blocks['2'][0] == (2, 3) + + +class TestChunkingBlocks: + + def test_2p_single_chunked_block(self): + dp_ids = [1, 2] + dp_block_sizes = { + 1: {'1': 100}, + 2: {'1': 100, '2': 100}} + blocks = {"1": [(1, 2)]} + + chunks = _create_work_chunks(blocks, dp_block_sizes, dp_ids, log, 100) + assert len(chunks) == 100 + for chunk_pair in chunks: + for c in chunk_pair: + assert "range" in c + lower, upper = c['range'] + assert lower < upper + assert upper - lower <= 10 + assert "block_id" in c + assert "datasetIndex" in c + assert "dataproviderId" in c + + def test_basic_3p(self): + dp_ids = [1, 2, 3] + dp_block_sizes = { + 1: {'1': 100}, + 2: {'1': 100, '2': 100}, + 3: {'1': 100, '2': 100}, + } + blocks = _get_common_blocks(dp_block_sizes, dp_ids) + chunks = _create_work_chunks(blocks, dp_block_sizes, dp_ids, log, 100) + # Block 1 should create 100 chunks between dp combinations: 1:2, 1:3, and 2:3 for 300 chunks + # Block 2 should create 100 chunks between 2:3 + assert len(chunks) == 300 + 100 + diff --git a/backend/entityservice/tests/test_encoding_storage.py b/backend/entityservice/tests/test_encoding_storage.py index 174c93af..13fd7436 100644 --- a/backend/entityservice/tests/test_encoding_storage.py +++ b/backend/entityservice/tests/test_encoding_storage.py @@ -20,7 +20,7 @@ def test_convert_encodings_from_json_to_binary_simple(self): def test_convert_encodings_from_json_to_binary_empty(self): empty = io.BytesIO(b'''{ - "clksnblocks": [] + "clknblocks": [] }''') with pytest.raises(StopIteration): @@ -28,7 +28,7 @@ def test_convert_encodings_from_json_to_binary_empty(self): def test_convert_encodings_from_json_to_binary_short(self): d = serialize_bytes(b'abcdabcd') - json_data = io.BytesIO(b'{' + f'''"clksnblocks": [["{d}", "02"]]'''.encode() + b'}') + json_data = io.BytesIO(b'{' + f'''"clknblocks": [["{d}", "02"]]'''.encode() + b'}') encoding_ids, encodings, blocks = list(zip(*stream_json_clksnblocks(json_data))) assert len(encodings[0]) == 8 diff --git a/backend/entityservice/tests/test_project_uploads.py b/backend/entityservice/tests/test_project_uploads.py index 99a45acb..775c855a 100644 --- a/backend/entityservice/tests/test_project_uploads.py +++ b/backend/entityservice/tests/test_project_uploads.py @@ -185,7 +185,7 @@ def 
test_project_json_data_upload_with_too_large_encoded_size( max_rep = 10 while not project_description['error'] and rep < max_rep: rep += 1 - time.sleep(1) + time.sleep(2) project_description = requests.get( url + '/projects/{}'.format(new_project_data['project_id']), headers={'Authorization': new_project_data['result_token']} diff --git a/backend/entityservice/tests/test_utils.py b/backend/entityservice/tests/test_utils.py index 665f7d1d..b927a8b3 100644 --- a/backend/entityservice/tests/test_utils.py +++ b/backend/entityservice/tests/test_utils.py @@ -17,7 +17,7 @@ def test_empty(self): def test_list(self): with temp_file_containing(b'[1,2,3]') as fp: filename = fp.name - assert [1,2,3] == load_yaml_config(filename) + assert [1, 2, 3] == load_yaml_config(filename) def test_missing_file(self): filename = 'unlikely a valid file' @@ -40,7 +40,7 @@ def test_valid_yaml(self): """) self._check_valid_yaml(yamldata) - def _check_valid_yaml(self, yamldata:str): + def _check_valid_yaml(self, yamldata: str): with temp_file_containing(yamldata.encode()) as fp: filename = fp.name loaded = load_yaml_config(filename) @@ -62,3 +62,4 @@ def test_valid_yaml_with_comments(self): loaded = self._check_valid_yaml(yamldata) assert 'host' not in loaded['api']['ingress'] + diff --git a/backend/entityservice/tests/testdata/test_encoding.json b/backend/entityservice/tests/testdata/test_encoding.json index faea6279..8adbdce7 100644 --- a/backend/entityservice/tests/testdata/test_encoding.json +++ b/backend/entityservice/tests/testdata/test_encoding.json @@ -1,5 +1,5 @@ { - "clksnblocks": [ + "clknblocks": [ ["PNfT5qAAlAgFyQyoUhokyohywAAOYMJgdwPCRWBQOyCIsSEgePCo2CnRaON+FUog07AHTDUDARsUcJiSaYKNDiCAEeICbGYSZFhCVALQAylxDSAtSR4CsgiCmBjDiAEOGMfEi7ABkydqyKhIIFrBQQvFAUTDakAg0RyFYAWg8nE=", "1"], ["WvjX1rZYiFCRQAyqptwAS7jAaACEhFApFxB9RSeYPTBLkYkIUjKacAIleYLoNUooU/AHYCQ2FSkUYCzZMiClDIGEIBKQXKZAogCAUpvOKikUKUI+CRgI+AkgGsqAJYkdZcHEAAWDEidgHbYgIjBpMgpUU2wCwgCoXwCCWCVAsPM=", "1"], ["UPLX+gCgkbnVQhg4wBA8wogCUNA0QEbhPyHBPCBAKwSOmzJFWDGqWAZY/NsQX01Aw7A1UWBKAt5GPJib4BpLSHKQBesZbjsjYhTAQBBDZ7uQHKGNyRhCMhiWsAjaVEBcQlVkiCHzMhEimEgERlB/AQvFBwRjQHRywRQQQUuggnw=", "1", "2"], diff --git a/backend/entityservice/views/project.py b/backend/entityservice/views/project.py index 32582d6b..d7d4b656 100644 --- a/backend/entityservice/views/project.py +++ b/backend/entityservice/views/project.py @@ -386,8 +386,8 @@ def upload_json_clk_data(dp_id, clk_json, uses_blocking, parent_span): if element == 'clks': logger.info("Rewriting provided json into clknsblocks format") - clk_json = {'clksnblocks': [[encoding, '1'] for encoding in clk_json['clks']]} - element = 'clksnblocks' + clk_json = {'clknblocks': [[encoding, '1'] for encoding in clk_json['clks']]} + element = 'clknblocks' logger.info("Counting block sizes and number of blocks") # {'clknblocks': [['UG9vcA==', '001', '211'], [...]]} diff --git a/base/requirements.txt b/base/requirements.txt index cc848a39..390ddaee 100644 --- a/base/requirements.txt +++ b/base/requirements.txt @@ -1,6 +1,6 @@ anonlink==0.12.5 bitmath==1.3.1.2 -celery==4.4.0 +celery==4.4.1 clkhash==0.15.1 colorama==0.4.3 # required for structlog connexion==2.6.0 diff --git a/benchmarking/benchmark.py b/benchmarking/benchmark.py index e0d16533..b5aec047 100644 --- a/benchmarking/benchmark.py +++ b/benchmarking/benchmark.py @@ -130,19 +130,20 @@ def upload_data(participant, auth_token, clk_length): file_name = os.path.join(data_path, "{}Parties".format(len(sizes)), "clk_{}_{}_v2.bin".format(participant, clk_length)) with 
open(file_name, 'rb') as f: - facs_data = f.read() - assert len(facs_data) % SIZE_PER_CLK == 0 + encoding_data = f.read() + assert len(encoding_data) % SIZE_PER_CLK == 0 try: r = requests.post( server + '/api/v1/projects/{}/binaryclks'.format(credentials['project_id']), headers={ 'Authorization': auth_token, 'Content-Type': 'application/octet-stream', - 'Hash-Count': str(len(facs_data) // SIZE_PER_CLK), + 'Hash-Count': str(len(encoding_data) // SIZE_PER_CLK), 'Hash-Size': '128' }, - data=facs_data + data=encoding_data ) + logger.info(f"Upload status: {r.status_code}") logger.debug('upload result: {}'.format(r.json())) except Exception as e: logger.warning('oh no...\n{}'.format(e)) diff --git a/deployment/entity-service/Chart.yaml b/deployment/entity-service/Chart.yaml index d2c8a242..6871d04d 100644 --- a/deployment/entity-service/Chart.yaml +++ b/deployment/entity-service/Chart.yaml @@ -1,5 +1,5 @@ name: entity-service -appVersion: 1.13.0-beta +appVersion: 1.13.0-beta2 version: 1.13.1 description: Privacy preserving record linkage service sources: diff --git a/deployment/entity-service/values.yaml b/deployment/entity-service/values.yaml index ab2062c3..fec7ef2e 100644 --- a/deployment/entity-service/values.yaml +++ b/deployment/entity-service/values.yaml @@ -38,7 +38,7 @@ api: image: repository: data61/anonlink-nginx - tag: "v1.4.6-beta" + tag: "v1.4.6-beta2" # pullPolicy: Always pullPolicy: IfNotPresent @@ -55,7 +55,7 @@ api: app: image: repository: data61/anonlink-app - tag: "v1.13.0-beta" + tag: "v1.13.0-beta2" # pullPolicy: IfNotPresent pullPolicy: Always @@ -78,7 +78,7 @@ api: ## It cannot be updated! So we have a separate image + tag image: repository: data61/anonlink-app - tag: "v1.13.0-beta" + tag: "v1.13.0-beta2" ## Ref: http://kubernetes.io/docs/user-guide/compute-resources/ resources: @@ -149,7 +149,7 @@ workers: image: repository: "data61/anonlink-app" - tag: "v1.13.0-beta" + tag: "v1.13.0-beta2" pullPolicy: Always ## The initial number of workers for this deployment @@ -204,9 +204,7 @@ workers: ## our tasks are usually quite "long". PREFETCH_MULTIPLIER: "1" ## Maximum number of tasks a pool worker process can execute before it’s replaced with a new one - ## Low number is recommended, otherwise the celery workers may exhaust the available memory and threads. - ## Cf issue https://github.com/data61/anonlink-entity-service/issues/410 - MAX_TASKS_PER_CHILD: "30" + MAX_TASKS_PER_CHILD: "2048" ## Late ack means the task messages will be acknowledged after the task has been executed, not just before. ACKS_LATE: "true" @@ -262,8 +260,6 @@ postgresql: global: - global: - storageClass: "default" postgresql: postgresqlDatabase: postgres diff --git a/docs/changelog.rst b/docs/changelog.rst index c399d70d..a1f42744 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -8,6 +8,12 @@ Next Version ------------ +Version 1.13.0-beta2 +------------------- + +- Encodings are now stored in Postgres database instead of files in an object store. +- Initial support for user supplied blocking implemented. + Version 1.13.0-beta ------------------- diff --git a/docs/concepts.rst b/docs/concepts.rst index b4bb232f..ae808ddb 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -56,8 +56,8 @@ characters. .. _schema: -Schema ------- +Linkage Schema +-------------- It is important that participating organisations agree on how personally identifiable information is processed to create the :ref:`clks `. 
We call the configuration for creating CLKs @@ -84,6 +84,19 @@ although this may become a configurable option in the future. .. _result-types: +Blocking +-------- + +Blocking is a technique that makes large-scale record linkage practical. Blocking partitions datasets +into groups, called blocks, and only the records in corresponding blocks are compared. This can massively +reduce the total number of comparisons that need to be conducted to find matching records. + +In the *Anonlink Entity Service* blocking is optional, and is carried out by the client, e.g. using the +`blocklib `_ library. See the +`blocklib documentation `_ for more information including +tutorials. + + Output Types ------------ diff --git a/docs/conf.py b/docs/conf.py index d718d1bd..b3b61157 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -68,7 +68,7 @@ # The short X.Y version. version = '1.13' # The full version, including alpha/beta/rc tags. -release = '1.13.0-beta' +release = '1.13.0-beta2' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/frontend/VERSION b/frontend/VERSION index 9deb7ce3..87a4e2fc 100644 --- a/frontend/VERSION +++ b/frontend/VERSION @@ -1 +1 @@ -v1.4.6-beta +v1.4.6-beta2 diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml index 639c9172..3b9ed5a4 100644 --- a/tools/docker-compose.yml +++ b/tools/docker-compose.yml @@ -70,7 +70,7 @@ services: - MINIO_SECRET_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY - CELERY_ACKS_LATE=true - REDIS_USE_SENTINEL=false - - CELERYD_MAX_TASKS_PER_CHILD=30 + - CELERYD_MAX_TASKS_PER_CHILD=2048 #- CHUNK_SIZE_AIM=300_000_000 - CELERY_DB_MIN_CONNECTIONS=1 - CELERY_DB_MAX_CONNECTIONS=3
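
Reviewer aid for `copy_binary_column_from_select_query` above: that function slices the raw output of `COPY ... TO STDOUT WITH binary` at fixed offsets (a 19-byte file header, 6 bytes of per-row framing, and a 2-byte trailer). The self-contained sketch below, which needs no database and uses made-up 4-byte values, builds such a byte stream following the PostgreSQL binary COPY layout and checks that the same offsets recover the original rows.

```python
import struct

# Fake payload for a single bytea column of three 4-byte values, framed the way
# `COPY ... TO STDOUT WITH binary` frames it.
rows = [b'\x01\x01\x01\x01', b'\x02\x02\x02\x02', b'\x03\x03\x03\x03']

payload = b'PGCOPY\n\xff\r\n\x00'             # 11-byte signature
payload += struct.pack('!i', 0)               # 4-byte flags field
payload += struct.pack('!i', 0)               # 4-byte header extension length -> b'\x00\x00\x00\x00'
for value in rows:
    payload += struct.pack('!h', 1)           # tuple field count: one column
    payload += struct.pack('!i', len(value))  # field length
    payload += value
payload += b'\xff\xff'                        # file trailer

# Same slicing as the new database helper.
stored_binary_size = 4
per_row_header_size = 6                       # 2-byte field count + 4-byte field length
body = payload[19:-2]                         # drop the 19-byte header and 2-byte trailer
row_stride = stored_binary_size + per_row_header_size
parsed = [body[i + per_row_header_size: i + per_row_header_size + stored_binary_size]
          for i in range(0, len(body), row_stride)]
assert parsed == rows
```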
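Similarly, a small usage sketch of the new block-aware scheduling helpers in `entityservice/tasks/comparing.py`, mirroring the new unit tests in `test_chunking.py`; the data provider ids and block sizes are invented for illustration.

```python
from structlog import get_logger

from entityservice.tasks.comparing import _get_common_blocks, _create_work_chunks

log = get_logger()

# Hypothetical block metadata for three data providers:
# {dp_id -> {block_id -> number of encodings in that block}}
dp_ids = [1, 2, 3]
dp_block_sizes = {
    1: {'1': 100},
    2: {'1': 100, '2': 100},
    3: {'1': 100, '2': 100},
}

# Block '1' is shared by all three pairs, block '2' only by (2, 3):
# {'1': [(1, 2), (1, 3), (2, 3)], '2': [(2, 3)]}
blocks = _get_common_blocks(dp_block_sizes, dp_ids)

# Each 100 x 100 block comparison is split into chunks of roughly chunk_size_aim
# comparisons; for these sizes the new tests expect 100 chunks per pair, 400 in total.
chunks = _create_work_chunks(blocks, dp_block_sizes, dp_ids, log, chunk_size_aim=100)

# Every chunk is a pair of dicts (one per data provider) carrying the database id,
# dataset index, block id and the encoding range to compare.
for left, right in chunks[:3]:
    print(left['dataproviderId'], left['block_id'], left['range'],
          right['dataproviderId'], right['block_id'], right['range'])
```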