diff --git a/backend/entityservice/database/insertions.py b/backend/entityservice/database/insertions.py
index 6c123be7..cb91cad8 100644
--- a/backend/entityservice/database/insertions.py
+++ b/backend/entityservice/database/insertions.py
@@ -1,3 +1,5 @@
+from typing import List
+
 import psycopg2
 import psycopg2.extras
 
@@ -42,17 +44,65 @@ def insert_dataprovider(cur, auth_token, project_id):
     return execute_returning_id(cur, sql_query, [project_id, auth_token])
 
 
-def insert_encoding_metadata(db, clks_filename, dp_id, receipt_token, count):
+def insert_blocking_metadata(db, dp_id, blocks):
+    """
+    Insert new entries into the blocks table.
+
+    :param blocks: A dict mapping block id to the number of encodings in that block.
+    """
+    logger.info("Adding blocking metadata to database")
+    sql_insertion_query = """
+        INSERT INTO blocks
+        (dp, block_name, count, state)
+        VALUES %s
+        """
+
+    logger.info("Preparing SQL for bulk insert of blocks")
+    values = [(dp_id, block_id, blocks[block_id], 'pending') for block_id in blocks]
+
+    with db.cursor() as cur:
+        psycopg2.extras.execute_values(cur, sql_insertion_query, values)
+
+
+def insert_encoding_metadata(db, clks_filename, dp_id, receipt_token, encoding_count, block_count):
     logger.info("Adding metadata on encoded entities to database")
     sql_insertion_query = """
-        INSERT INTO bloomingdata
-        (dp, token, file, count, state)
+        INSERT INTO uploads
+        (dp, token, file, count, block_count, state)
         VALUES
-        (%s, %s, %s, %s, %s)
+        (%s, %s, %s, %s, %s, %s)
         """
     with db.cursor() as cur:
-        cur.execute(sql_insertion_query, [dp_id, receipt_token, clks_filename, count, 'pending'])
+        cur.execute(sql_insertion_query, [dp_id, receipt_token, clks_filename, encoding_count, block_count, 'pending'])
+
+
+def insert_encodings_into_blocks(db, dp_id: int, block_ids: List[List[str]], encoding_ids: List[int],
+                                 encodings: List[bytes], page_size: int = 4096):
+    """
+    Bulk load blocking and encoding data into the database.
+    See https://hakibenita.com/fast-load-data-python-postgresql#copy-data-from-a-string-iterator-with-buffer-size
+
+    :param page_size:
+        Maximum number of rows to send in a single SQL statement/network transfer. A larger page size
+        requires more local memory but can be faster as fewer network transfers are needed.
+    """
+    encodings_insertion_query = "INSERT INTO encodings (dp, encoding_id, encoding) VALUES %s"
+    blocks_insertion_query = "INSERT INTO encodingblocks (dp, encoding_id, block_id) VALUES %s"
+    encoding_data = ((dp_id, eid, encoding) for eid, encoding in zip(encoding_ids, encodings))
+
+    def block_data_generator(encoding_ids, block_ids):
+        # The inner loop variable is named so it doesn't shadow the block_ids argument.
+        for eid, blocks_for_encoding in zip(encoding_ids, block_ids):
+            for block_id in blocks_for_encoding:
+                yield (dp_id, eid, block_id)
+
+    with db.cursor() as cur:
+        psycopg2.extras.execute_values(cur, encodings_insertion_query, encoding_data, page_size=page_size)
+        psycopg2.extras.execute_values(cur,
+                                       blocks_insertion_query,
+                                       block_data_generator(encoding_ids, block_ids),
+                                       page_size=page_size)
 
 
 def set_dataprovider_upload_state(db, dp_id, state='error'):
@@ -130,7 +180,7 @@ def insert_permutation_mask(conn, project_id, run_id, mask_list):
 
 def update_encoding_metadata(db, clks_filename, dp_id, state):
     sql_query = """
-        UPDATE bloomingdata
+        UPDATE uploads
         SET
           state = %s,
          file = %s
@@ -149,7 +199,7 @@ def update_encoding_metadata(db, clks_filename, dp_id, state):
 
 def update_encoding_metadata_set_encoding_size(db, dp_id, encoding_size):
     sql_query = """
-        UPDATE bloomingdata
+        UPDATE uploads
         SET
          encoding_size = %s
        WHERE
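A quick, hedged sketch of the data shapes the two new insertion helpers expect (the values below are made up for illustration and are not part of the patch): `insert_blocking_metadata` takes a dict of block name to block size, while `insert_encodings_into_blocks` takes three parallel sequences, one entry per encoding.

```python
# Illustrative only: argument shapes for the new helpers.
dp_id = 7
blocks = {"1": 2, "2": 1}                 # block name -> number of encodings in the block
# insert_blocking_metadata(db, dp_id, blocks) inserts one 'pending' row per block.

encoding_ids = [0, 1, 2]                  # parallel sequences, one entry per encoding
encodings = [b"\x01" * 128, b"\x02" * 128, b"\x03" * 128]
block_ids = [["1"], ["1"], ["2"]]         # the blocks each encoding belongs to
# insert_encodings_into_blocks(db, dp_id, block_ids, encoding_ids, encodings)
# writes three rows into `encodings` and three rows into `encodingblocks`.
```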
+ + """ + encodings_insertion_query = "INSERT INTO encodings (dp, encoding_id, encoding) VALUES %s" + blocks_insertion_query = "INSERT INTO encodingblocks (dp, encoding_id, block_id) VALUES %s" + encoding_data = ((dp_id, eid, encoding) for eid, encoding in zip(encoding_ids, encodings)) + + def block_data_generator(encoding_ids, block_ids): + for eid, block_ids in zip(encoding_ids, block_ids): + for block_id in block_ids: + yield (dp_id, eid, block_id) + + with db.cursor() as cur: + psycopg2.extras.execute_values(cur, encodings_insertion_query, encoding_data, page_size=page_size) + psycopg2.extras.execute_values(cur, + blocks_insertion_query, + block_data_generator(encoding_ids, block_ids), + page_size=page_size) def set_dataprovider_upload_state(db, dp_id, state='error'): @@ -130,7 +180,7 @@ def insert_permutation_mask(conn, project_id, run_id, mask_list): def update_encoding_metadata(db, clks_filename, dp_id, state): sql_query = """ - UPDATE bloomingdata + UPDATE uploads SET state = %s, file = %s @@ -149,7 +199,7 @@ def update_encoding_metadata(db, clks_filename, dp_id, state): def update_encoding_metadata_set_encoding_size(db, dp_id, encoding_size): sql_query = """ - UPDATE bloomingdata + UPDATE uploads SET encoding_size = %s WHERE diff --git a/backend/entityservice/database/selections.py b/backend/entityservice/database/selections.py index 51083829..6f3c353c 100644 --- a/backend/entityservice/database/selections.py +++ b/backend/entityservice/database/selections.py @@ -11,11 +11,11 @@ def select_dataprovider_id(db, project_id, receipt_token): Returns None if token is incorrect. """ sql_query = """ - SELECT dp from dataproviders, bloomingdata + SELECT dp from dataproviders, uploads WHERE - bloomingdata.dp = dataproviders.id AND + uploads.dp = dataproviders.id AND dataproviders.project = %s AND - bloomingdata.token = %s + uploads.token = %s """ query_result = query_db(db, sql_query, [project_id, receipt_token], one=True) logger.debug("Looking up data provider with auth. 
{}".format(query_result)) @@ -61,10 +61,10 @@ def check_run_exists(db, project_id, run_id): def get_number_parties_uploaded(db, project_id): sql_query = """ SELECT COUNT(*) - FROM dataproviders, bloomingdata + FROM dataproviders, uploads WHERE dataproviders.project = %s AND - bloomingdata.dp = dataproviders.id AND + uploads.dp = dataproviders.id AND dataproviders.uploaded = 'done' """ query_result = query_db(db, sql_query, [project_id], one=True) @@ -77,11 +77,11 @@ def get_encoding_error_count(db, project_id): """ sql_query = """ SELECT count(*) - FROM dataproviders, bloomingdata + FROM dataproviders, uploads WHERE dataproviders.project = %s AND - bloomingdata.dp = dataproviders.id AND - bloomingdata.state = 'error' + uploads.dp = dataproviders.id AND + uploads.state = 'error' """ return query_db(db, sql_query, [project_id], one=True)['count'] @@ -89,12 +89,12 @@ def get_encoding_error_count(db, project_id): def get_number_parties_ready(db, resource_id): sql_query = """ SELECT COUNT(*) - FROM dataproviders, bloomingdata + FROM dataproviders, uploads WHERE dataproviders.project = %s AND - bloomingdata.dp = dataproviders.id AND + uploads.dp = dataproviders.id AND dataproviders.uploaded = 'done' AND - bloomingdata.state = 'ready' + uploads.state = 'ready' """ query_result = query_db(db, sql_query, [resource_id], one=True) return query_result['count'] @@ -187,11 +187,12 @@ def get_run_result(db, resource_id): def get_project_dataset_sizes(db, project_id): + """Returns the number of encodings in a dataset.""" sql_query = """ - SELECT bloomingdata.count - FROM dataproviders, bloomingdata + SELECT uploads.count + FROM dataproviders, uploads WHERE - bloomingdata.dp=dataproviders.id AND + uploads.dp=dataproviders.id AND dataproviders.project=%s ORDER BY dataproviders.id """ @@ -203,9 +204,9 @@ def get_project_dataset_sizes(db, project_id): def get_uploaded_encoding_sizes(db, project_id): sql_query = """ SELECT dp, encoding_size - FROM dataproviders, bloomingdata + FROM dataproviders, uploads WHERE - bloomingdata.dp=dataproviders.id AND + uploads.dp=dataproviders.id AND dataproviders.project=%s ORDER BY dataproviders.id """ @@ -215,10 +216,10 @@ def get_uploaded_encoding_sizes(db, project_id): def get_smaller_dataset_size_for_project(db, project_id): sql_query = """ - SELECT MIN(bloomingdata.count) as smaller - FROM dataproviders, bloomingdata + SELECT MIN(uploads.count) as smaller + FROM dataproviders, uploads WHERE - bloomingdata.dp=dataproviders.id AND + uploads.dp=dataproviders.id AND dataproviders.project=%s """ query_result = query_db(db, sql_query, [project_id], one=True) @@ -231,10 +232,10 @@ def get_total_comparisons_for_project(db, project_id): """ expected_datasets = get_project_column(db, project_id, 'parties') sql_query = """ - SELECT bloomingdata.count as rows - from dataproviders, bloomingdata + SELECT uploads.count as rows + from dataproviders, uploads where - bloomingdata.dp=dataproviders.id AND + uploads.dp=dataproviders.id AND dataproviders.project=%s """ query_results = query_db(db, sql_query, [project_id]) @@ -260,12 +261,12 @@ def get_dataprovider_id(db, update_token): return query_db(db, sql_query, [update_token], one=True)['id'] -def get_bloomingdata_columns(db, dp_id, columns): +def get_uploads_columns(db, dp_id, columns): for column in columns: - assert column in {'ts', 'token', 'file', 'state', 'count', 'encoding_size'} + assert column in {'ts', 'token', 'file', 'state', 'block_count', 'count', 'encoding_size'} sql_query = """ SELECT {} - FROM bloomingdata + FROM uploads 
diff --git a/backend/entityservice/encoding_storage.py b/backend/entityservice/encoding_storage.py
new file mode 100644
index 00000000..6174eb18
--- /dev/null
+++ b/backend/entityservice/encoding_storage.py
@@ -0,0 +1,126 @@
+import math
+from itertools import zip_longest
+from typing import Iterator, List, Tuple
+
+import ijson
+
+from entityservice.database import insert_encodings_into_blocks
+from entityservice.serialization import deserialize_bytes, binary_format
+
+
+def stream_json_clksnblocks(f):
+    """
+    The provided file will contain encodings and blocking information with
+    the following structure:
+
+    {
+        "clksnblocks": [
+            ["BASE64 ENCODED ENCODING 1", blockid1, blockid2, ...],
+            ["BASE64 ENCODED ENCODING 2", blockid1, ...],
+            ...
+        ]
+    }
+
+    :param f: JSON file containing clksnblocks data.
+    :return: Generator of (entity_id, encoding bytes, list of block ids)
+    """
+    # At some point the user may supply the entity id. For now we use the order of uploaded encodings.
+    for i, obj in enumerate(ijson.items(f, 'clksnblocks.item')):
+        b64_encoding, *blocks = obj
+        yield i, deserialize_bytes(b64_encoding), blocks
+
+
+def convert_encodings_from_base64_to_binary(encodings: Iterator[Tuple[int, bytes, List[str]]]):
+    """
+    :param encodings: Iterable of tuples of (entity_id, encoding bytes, list of blocks)
+    :return: a tuple comprising:
+        (encoding size in bytes, generator of (entity_id, binary packed encoding, list of blocks))
+    """
+    # Peek at the first element to extract the encoding size
+    i, encoding_data, blocks = next(encodings)
+    encoding_size = len(encoding_data)
+    bit_packing_struct = binary_format(encoding_size)
+
+    def generator(first_i, first_encoding_data, first_blocks):
+        binary_packed_encoding = bit_packing_struct.pack(first_i, first_encoding_data)
+        yield first_i, binary_packed_encoding, first_blocks
+        for i, encoding_data, blocks in encodings:
+            binary_packed_encoding = bit_packing_struct.pack(i, encoding_data)
+            yield i, binary_packed_encoding, blocks
+
+    return encoding_size, generator(i, encoding_data, blocks)
+
+
+def _grouper(iterable, n, fillvalue=None):
+    "Collect data into fixed-length chunks or blocks"
+    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
+    args = [iter(iterable)] * n
+    return zip_longest(*args, fillvalue=fillvalue)
+
+
+def _transpose(group):
+    """
+    Given a list of 3-tuples from _grouper, return 3 lists.
+    Also filter out possible None fill values from _grouper.
+    """
+    a, b, c = [], [], []
+    for g in group:
+        # g can be None (the fill value added by _grouper)
+        if g is not None:
+            x, y, z = g
+            a.append(x)
+            b.append(y)
+            c.append(z)
+    return a, b, c
+
+
+def store_encodings_in_db(conn, dp_id, encodings: Iterator[Tuple[int, bytes, List[str]]], encoding_size: int = 128):
+    """
+    Group encodings + blocks into database transactions and execute.
+    """
+    for group in _grouper(encodings, n=_estimate_group_size(encoding_size)):
+        # Local names chosen so the `encodings` argument isn't rebound mid-iteration.
+        encoding_ids, encoding_data, block_lists = _transpose(group)
+        assert len(block_lists) == len(encoding_data)
+        assert len(encoding_ids) == len(encoding_data)
+        insert_encodings_into_blocks(conn, dp_id, block_ids=block_lists, encoding_ids=encoding_ids, encodings=encoding_data)
+
+
+def _estimate_group_size(encoding_size):
+    """
+    Given an encoding size (e.g. 128 B), estimate how many encodings (including their
+    block information) will stay under roughly 100MiB per transaction. Note this is
+    hopefully very conservative in estimating the average number of blocks each record is in.
+    """
+    network_transaction_size = 104857600  # 100MiB
+    blocks_per_record_estimate = 50
+    return math.ceil(network_transaction_size / ((blocks_per_record_estimate * 64) + (encoding_size + 4)))
+
+
+def convert_encodings_from_json_to_binary(f):
+    """
+    Temp helper function
+
+    :param f: File-like object containing `clksnblocks` json.
+    :return: a tuple comprising:
+        - dict mapping blocks to lists of bytes (each encoding in our internal encoding file format)
+        - the size of the first encoding in bytes (excluding entity ID info)
+    """
+    # Each block id maps to a list of binary packed encodings.
+    encodings_by_block = {}
+
+    # Default which is ignored but makes IDE/typechecker happier
+    bit_packing_struct = binary_format(128)
+    encoding_size = None
+
+    for i, encoding_data, blocks in stream_json_clksnblocks(f):
+        if encoding_size is None:
+            encoding_size = len(encoding_data)
+            bit_packing_struct = binary_format(encoding_size)
+        binary_packed_encoding = bit_packing_struct.pack(i, encoding_data)
+        for block in blocks:
+            encodings_by_block.setdefault(block, []).append(binary_packed_encoding)
+
+    return encodings_by_block, encoding_size
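To make the batching in `store_encodings_in_db` concrete, here is a small worked example (the data is made up) of how `_grouper` and `_transpose` split the stream, plus what `_estimate_group_size` evaluates to for 128-byte encodings:

```python
from entityservice.encoding_storage import _grouper, _transpose

# Illustrative batching walk-through for store_encodings_in_db.
rows = [(0, b"e0", ["1"]), (1, b"e1", ["1"]), (2, b"e2", ["1", "2"])]

groups = list(_grouper(rows, n=2))
# -> [((0, b"e0", ["1"]), (1, b"e1", ["1"])), ((2, b"e2", ["1", "2"]), None)]

ids, encs, blocks = _transpose(groups[1])  # the None fill value is dropped
# ids == [2], encs == [b"e2"], blocks == [["1", "2"]]

# For 128 B encodings _estimate_group_size gives
# ceil(104857600 / (50 * 64 + 128 + 4)) == 31470 encodings per transaction.
```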
diff --git a/backend/entityservice/init-db-schema.sql b/backend/entityservice/init-db-schema.sql
index 11d46995..c1ac8974 100644
--- a/backend/entityservice/init-db-schema.sql
+++ b/backend/entityservice/init-db-schema.sql
@@ -1,16 +1,18 @@
 DROP TABLE IF EXISTS
-projects, runs, dataproviders, bloomingdata, run_results,
-similarity_scores, permutations, permutation_masks, metrics
+blocks, encodings, encodingblocks, dataproviders, metrics, permutations, permutation_masks,
+projects, runs, run_results, similarity_scores, uploads
 CASCADE;
 
 DROP TYPE IF EXISTS MAPPINGRESULT;
+DROP TYPE IF EXISTS RUNSTATE;
+DROP TYPE IF EXISTS UPLOADEDSTATE;
+DROP TYPE IF EXISTS PROCESSEDSTATE;
 
 CREATE TYPE MAPPINGRESULT AS ENUM (
   'groups',
   'permutations',
   'similarity_scores'
-
   );
 
 -- The table of entity matching jobs
@@ -117,8 +119,8 @@ CREATE TYPE PROCESSEDSTATE AS ENUM (
   'error'      -- an error occurred during the processing
 );
 
--- The encoded PII data for each dataprovider
-CREATE TABLE bloomingdata (
+-- The PII data for each dataprovider
+CREATE TABLE uploads (
   id             SERIAL PRIMARY KEY,
 
   ts             TIMESTAMP DEFAULT current_timestamp,
@@ -128,19 +130,67 @@ CREATE TABLE bloomingdata (
   -- The receipt token for this data
   token          CHAR(48)       NOT NULL UNIQUE,
 
-  -- Store the raw CLK data in a file
+  -- Filename for the raw unprocessed uploaded data
   file           CHAR(64)       NOT NULL,
 
   state          PROCESSEDSTATE NOT NULL,
 
-  -- Size in bytes of the uploaded encoding
+  -- Size in bytes of the uploaded encodings
   encoding_size  INT            NULL,
 
-  -- Number of uploaded entries
-  count          INT            NOT NULL
+  -- Number of uploaded encodings
+  count          INT            NOT NULL,
+
+  -- Number of blocks uploaded
+  block_count    INT            NOT NULL DEFAULT 1
 );
 
+CREATE TABLE blocks
+(
+  dp          INT REFERENCES dataproviders (id) on DELETE CASCADE,
+
+  -- User supplied block name
+  block_name  CHAR(64) NOT NULL,
+
+  -- Number of encodings in block
+  count       INT NOT NULL,
+
+  -- State of the block
+  state       PROCESSEDSTATE NOT NULL,
+
+  PRIMARY KEY (dp, block_name)
+);
+
+CREATE INDEX ON blocks (dp, block_name);
+
+CREATE TABLE encodings (
+  dp           INT REFERENCES dataproviders (id) on DELETE CASCADE,
+
+  -- user supplied encoding id
+  encoding_id  INT NOT NULL,
+
+  encoding     bytea NOT NULL,
+
+  PRIMARY KEY (dp, encoding_id)
+);
+
+
+-- Table mapping blocks to encodings
+CREATE TABLE encodingblocks (
+  dp           INT REFERENCES dataproviders (id) on DELETE CASCADE,
+
+  encoding_id  INT,
+
+  block_id     CHAR(64),
+
+  FOREIGN KEY (dp, encoding_id) REFERENCES encodings (dp, encoding_id),
+  FOREIGN KEY (dp, block_id) REFERENCES blocks (dp, block_name)
+);
+
+
+
 CREATE TABLE run_results (
   -- Just the table index
   id       SERIAL PRIMARY KEY,
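As a sanity check against the new tables, a hypothetical helper (not part of the patch) that counts stored encodings per block for a single data provider, using only the columns defined above:

```python
def count_encodings_per_block(db, dp_id):
    """Hypothetical helper: number of stored encodings in each block for one data provider."""
    with db.cursor() as cur:
        cur.execute("""
            SELECT block_id, COUNT(*)
            FROM encodingblocks
            WHERE dp = %s
            GROUP BY block_id
            """, [dp_id])
        return {block_id: count for block_id, count in cur.fetchall()}
```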
diff --git a/backend/entityservice/tasks/encoding_uploading.py b/backend/entityservice/tasks/encoding_uploading.py
index bc13dc6d..f0444f91 100644
--- a/backend/entityservice/tasks/encoding_uploading.py
+++ b/backend/entityservice/tasks/encoding_uploading.py
@@ -1,23 +1,27 @@
 import io
-import json
-from entityservice.cache import encodings as encoding_cache
+import opentracing
 from entityservice.database import *
+from entityservice.encoding_storage import stream_json_clksnblocks, convert_encodings_from_base64_to_binary, \
+    convert_encodings_from_json_to_binary, store_encodings_in_db
 from entityservice.error_checking import check_dataproviders_encoding, handle_invalid_encoding_data, \
     InvalidEncodingError
 from entityservice.object_store import connect_to_object_store
-from entityservice.serialization import binary_pack_filters, deserialize_bytes, binary_format
+from entityservice.serialization import binary_format
 from entityservice.settings import Config
 from entityservice.async_worker import celery, logger
 from entityservice.tasks.base_task import TracedTask
 from entityservice.tasks.pre_run_check import check_for_executable_runs
-from entityservice.utils import iterable_to_stream, fmt_bytes, clks_uploaded_to_project
+from entityservice.utils import fmt_bytes, clks_uploaded_to_project
 
 
 @celery.task(base=TracedTask, ignore_result=True, args_as_tags=('project_id', 'dp_id'))
 def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None):
-    # User has uploaded base64 encodings as JSON
+    """
+    User has uploaded base64 encodings as JSON; this task copies the data into
+    our internal binary format.
+    """
     log = logger.bind(pid=project_id, dp_id=dp_id)
     log.info("Handling user provided base64 encodings")
@@ -25,42 +29,46 @@
         if not check_project_exists(db, project_id):
             log.info("Project deleted, stopping immediately")
             return
-        expected_count = get_number_of_hashes(db, dp_id)
+        # Get number of blocks + total number of encodings from database
+        expected_count, block_count = get_encoding_metadata(db, dp_id)
 
-    log.info(f"Expecting to handle {expected_count} encodings")
+    log.info(f"Expecting to handle {expected_count} encodings in {block_count} blocks")
 
     mc = connect_to_object_store()
-
-    #### GLUE CODE
     raw_file = Config.RAW_FILENAME_FMT.format(receipt_token)
     raw_data = mc.get_object(Config.MINIO_BUCKET, raw_file)
-    data = json.loads(raw_data.data.decode('utf-8'))
-    if 'clks' not in data:
-        raise ValueError('can only handle CLKs at the moment.')
-    binary_data = b'\n'.join(''.join(clk.split('\n')).encode() for clk in data['clks']) + b'\n'
-    buffer = io.BytesIO(binary_data)
-    #### END GLUE
-
-    # Set up streaming processing pipeline
-    buffered_stream = iterable_to_stream(buffer)
-    text_stream = io.TextIOWrapper(buffered_stream, newline='\n')
-
-    first_hash_bytes = deserialize_bytes(next(text_stream))
-    uploaded_encoding_size = len(first_hash_bytes)
-
-    def filter_generator():
-        i = 0
-        yield i, first_hash_bytes
-        for i, line in enumerate(text_stream, start=1):
-            hash_bytes = deserialize_bytes(line)
-            if len(hash_bytes) != uploaded_encoding_size:
-                raise ValueError("Encodings were not all the same size")
-            yield i, hash_bytes
-
-        log.info(f"Processed {i + 1} hashes")
+
+    with opentracing.tracer.start_span('upload-encodings-to-db', child_of=parent_span):
+        # stream encodings with block ids from uploaded file
+        # convert each encoding to our internal binary format
+        # output into database for each block (temp or direct to minio?)
+        encoding_size, pipeline = convert_encodings_from_base64_to_binary(stream_json_clksnblocks(raw_data))
+        log.info(f"Starting pipeline to store {encoding_size}B sized encodings in database")
+        with DBConn() as db:
+            store_encodings_in_db(db, dp_id, pipeline, encoding_size)
+
+    #### GLUE CODE - TODO remove me once moved away from storing encodings in files
+    # Note we open the stream a second time
+    raw_data = mc.get_object(Config.MINIO_BUCKET, raw_file)
+    blocked_binary_data, encoding_size = convert_encodings_from_json_to_binary(raw_data)
+    assert block_count == len(blocked_binary_data)
+    log.info(f"Converted uploaded encodings of size {encoding_size} bytes into internal binary format. Number of blocks: {block_count}")
+
+    if block_count == 0:
+        log.warning("No uploaded encoding blocks, stopping processing.")
+        # TODO mark run as failure?
+        return
+    elif block_count > 1:
+        raise NotImplementedError('Currently only single block encodings are handled - check back soon')
+
+    # TODO: iterate over every block once multi-block uploads are supported
+    block_id = list(blocked_binary_data.keys())[0]
+    actual_count = len(blocked_binary_data[block_id])
+    log.info(f"{block_id=}, number of encodings: {actual_count}")
 
     # We peek at the first element as we need the encoding size
-    # for the ret of our processing pipeline
-    python_filters = filter_generator()
+    # for the rest of our processing pipeline. Note we now add 4 bytes to the encoding
+    # for the entity's identifier.
+    uploaded_encoding_size = len(blocked_binary_data[block_id][0]) - 4
 
     # This is the first time we've seen the encoding size from this data provider
     try:
@@ -76,23 +84,14 @@ def filter_generator():
     # Output file is our custom binary packed file
     filename = Config.BIN_FILENAME_FMT.format(receipt_token)
     bit_packed_element_size = binary_format(uploaded_encoding_size).size
-    num_bytes = expected_count * bit_packed_element_size
-
-    # If small enough preload the data into our redis cache
-    if expected_count < Config.ENTITY_CACHE_THRESHOLD:
-        log.info("Caching pickled clk data")
-        python_filters = list(python_filters)
-        encoding_cache.set_deserialized_filter(dp_id, python_filters)
-    else:
-        log.info("Not caching clk data as it is too large")
-
-    packed_filters = binary_pack_filters(python_filters, uploaded_encoding_size)
-    packed_filter_stream = iterable_to_stream(packed_filters)
-
-    # Upload to object store
-    log.info(f"Uploading {expected_count} encodings of size {uploaded_encoding_size} " +
-             f"to object store. Total Size: {fmt_bytes(num_bytes)}")
-    mc.put_object(Config.MINIO_BUCKET, filename, data=packed_filter_stream, length=num_bytes)
+    num_bytes = actual_count * bit_packed_element_size
+
+    with opentracing.tracer.start_span('process-encodings-in-quarantine', child_of=parent_span) as span:
+        packed_filter_stream = io.BytesIO(b''.join(blocked_binary_data[block_id]))
+        # Upload to object store
+        log.info(f"Uploading {expected_count} encodings of size {uploaded_encoding_size} " +
+                 f"to object store. Total Size: {fmt_bytes(num_bytes)}")
+        mc.put_object(Config.MINIO_BUCKET, filename, data=packed_filter_stream, length=num_bytes)
 
     with DBConn() as conn:
         update_encoding_metadata(conn, filename, dp_id, 'ready')
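The `- 4` above relies on the internal binary format adding a 4-byte entity id to every encoding, which also matches the `8 + 4` expectation in the new test below. A rough illustration (assuming `binary_format` behaves exactly as it is used in this patch):

```python
from entityservice.serialization import binary_format

# Sizes only: packing an encoding adds 4 bytes of entity id, which is why
# handle_raw_upload subtracts 4 to recover the raw encoding size.
fmt = binary_format(128)
record = fmt.pack(42, b"\x00" * 128)
assert len(record) == 128 + 4 == fmt.size
```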
diff --git a/backend/entityservice/tests/test_encoding_storage.py b/backend/entityservice/tests/test_encoding_storage.py
new file mode 100644
index 00000000..b34048ad
--- /dev/null
+++ b/backend/entityservice/tests/test_encoding_storage.py
@@ -0,0 +1,36 @@
+from pathlib import Path
+import io
+
+from entityservice.encoding_storage import convert_encodings_from_json_to_binary
+from entityservice.tests.util import serialize_bytes
+
+
+class TestEncodingStorage:
+    def test_convert_encodings_from_json_to_binary_simple(self):
+        filename = Path(__file__).parent / 'testdata' / 'test_encoding.json'
+        with open(filename) as f:
+            binary_dict, encoding_length = convert_encodings_from_json_to_binary(f)
+            assert encoding_length == 128
+            assert len(binary_dict) == 2
+            assert len(binary_dict['1']) == 3
+            assert len(binary_dict['2']) == 2
+            assert len(set(binary_dict['1']).intersection(binary_dict['2'])) == 1
+
+    def test_convert_encodings_from_json_to_binary_empty(self):
+        empty = io.BytesIO(b'''{
+            "clksnblocks": []
+        }''')
+
+        binary_dict, encoding_length = convert_encodings_from_json_to_binary(empty)
+        assert binary_dict == {}
+        assert encoding_length is None
+
+    def test_convert_encodings_from_json_to_binary_short(self):
+        d = serialize_bytes(b'abcdabcd')
+        json_data = io.BytesIO(b'{' + f'''"clksnblocks": [["{d}", "02"]]'''.encode() + b'}')
+        binary_dict, encoding_length = convert_encodings_from_json_to_binary(json_data)
+        assert encoding_length == 8
+        assert "02" in binary_dict
+        assert len(binary_dict["02"]) == 1
+        assert len(binary_dict["02"][0]) == 8 + 4  # size of encoding + id
+
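For anyone extending these tests, a sketch of building a `clksnblocks` payload programmatically rather than from the fixture file (values are arbitrary; `serialize_bytes` is the same helper the tests above use):

```python
import io
import json

from entityservice.tests.util import serialize_bytes

# Arbitrary example payload in the clksnblocks format exercised above.
encoding = serialize_bytes(b"\x01" * 128)
payload = {"clksnblocks": [[encoding, "1"], [encoding, "1", "2"]]}
stream = io.BytesIO(json.dumps(payload).encode())
# `stream` can now be handed to convert_encodings_from_json_to_binary
# or stream_json_clksnblocks.
```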
diff --git a/backend/entityservice/tests/testdata/test_encoding.json b/backend/entityservice/tests/testdata/test_encoding.json
new file mode 100644
index 00000000..faea6279
--- /dev/null
+++ b/backend/entityservice/tests/testdata/test_encoding.json
@@ -0,0 +1,8 @@
+{
+  "clksnblocks": [
+    ["PNfT5qAAlAgFyQyoUhokyohywAAOYMJgdwPCRWBQOyCIsSEgePCo2CnRaON+FUog07AHTDUDARsUcJiSaYKNDiCAEeICbGYSZFhCVALQAylxDSAtSR4CsgiCmBjDiAEOGMfEi7ABkydqyKhIIFrBQQvFAUTDakAg0RyFYAWg8nE=", "1"],
+    ["WvjX1rZYiFCRQAyqptwAS7jAaACEhFApFxB9RSeYPTBLkYkIUjKacAIleYLoNUooU/AHYCQ2FSkUYCzZMiClDIGEIBKQXKZAogCAUpvOKikUKUI+CRgI+AkgGsqAJYkdZcHEAAWDEidgHbYgIjBpMgpUU2wCwgCoXwCCWCVAsPM=", "1"],
+    ["UPLX+gCgkbnVQhg4wBA8wogCUNA0QEbhPyHBPCBAKwSOmzJFWDGqWAZY/NsQX01Aw7A1UWBKAt5GPJib4BpLSHKQBesZbjsjYhTAQBBDZ7uQHKGNyRhCMhiWsAjaVEBcQlVkiCHzMhEimEgERlB/AQvFBwRjQHRywRQQQUuggnw=", "1", "2"],
+    ["MNXW4D9Gg5EYCQ2AQqGhBuAK+BiElCKJjyF1HjObORkqIDFgUTBZXQgEI4LAUc1k8tmMTkhijMynMKa44DiEWhDUAtAEphwapBCAFIpQIGgUAoIt0ZACOoxBEyBIFKIHRChFARbRunMh/GRi4hCRB2ZEJYZS1gTgAcAGXQGD++0=", "2"]
+  ]
+}
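The view change below rewrites a plain 'clks' upload into the blocked format before any counting happens; in effect every encoding is assigned to a single default block named '1' (the sample data here is illustrative):

```python
# Illustrative: how upload_json_clk_data rewrites a plain 'clks' payload
# into clksnblocks form, assigning every encoding to default block "1".
clk_json = {"clks": ["enc0-base64", "enc1-base64"]}
clk_json = {"clksnblocks": [[encoding, "1"] for encoding in clk_json["clks"]]}
# -> {"clksnblocks": [["enc0-base64", "1"], ["enc1-base64", "1"]]}
```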
diff --git a/backend/entityservice/views/project.py b/backend/entityservice/views/project.py
index 1d9b1d42..88827ebe 100644
--- a/backend/entityservice/views/project.py
+++ b/backend/entityservice/views/project.py
@@ -1,5 +1,5 @@
-import json
 from io import BytesIO
+import json
 import tempfile
 
 import minio
@@ -324,9 +324,9 @@ def upload_clk_data_binary(project_id, dp_id, raw_stream, count, size=128):
     """
     receipt_token = generate_code()
     filename = Config.BIN_FILENAME_FMT.format(receipt_token)
-    # Set the state to 'pending' in the bloomingdata table
+    # Set the state to 'pending' in the uploads table
     with DBConn() as conn:
-        db.insert_encoding_metadata(conn, filename, dp_id, receipt_token, count)
+        db.insert_encoding_metadata(conn, filename, dp_id, receipt_token, encoding_count=count, block_count=1)
         db.update_encoding_metadata_set_encoding_size(conn, dp_id, size)
     logger.info(f"Storing supplied binary clks of individual size {size} in file: {filename}")
@@ -386,17 +386,22 @@ def upload_json_clk_data(dp_id, clk_json, uses_blocking, parent_span):
         span.set_tag(element, encoding_count)
         logger.debug(f"Received {encoding_count} {element}")
 
-    if element == 'clksnblocks':
-        # Note the format of encoding + blocks.
-        # {'clknblocks': [['UG9vcA==', '001', '211'], [...]]}
-        blocks = set()
-        for _, *elements_blocks in clk_json[element]:
-            blocks.update(elements_blocks)
-        block_count = len(blocks)
-    else:
-        block_count = 1
+    if element == 'clks':
+        logger.info("Rewriting provided json into clksnblocks format")
+        clk_json = {'clksnblocks': [[encoding, '1'] for encoding in clk_json['clks']]}
+        element = 'clksnblocks'
+
+    logger.info("Counting block sizes and number of blocks")
+    # {'clksnblocks': [['UG9vcA==', '001', '211'], [...]]}
+    block_sizes = {}
+    for _, *elements_blocks in clk_json[element]:
+        for el_block in elements_blocks:
+            block_sizes[el_block] = block_sizes.setdefault(el_block, 0) + 1
+    block_count = len(block_sizes)
 
     logger.info(f"Received {encoding_count} encodings in {block_count} blocks")
+    for block in block_sizes:
+        logger.info(f"Block {block} has {block_sizes[block]} elements")
 
     # write clk_json into a temp file
     tmp = tempfile.NamedTemporaryFile(mode='w')
@@ -415,6 +420,7 @@ def upload_json_clk_data(dp_id, clk_json, uses_blocking, parent_span):
 
     with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span):
         with DBConn() as conn:
-            db.insert_encoding_metadata(conn, filename, dp_id, receipt_token, encoding_count)
+            db.insert_encoding_metadata(conn, filename, dp_id, receipt_token, encoding_count, block_count)
+            db.insert_blocking_metadata(conn, dp_id, block_sizes)
 
     return receipt_token, filename
diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml
index 1ff6258e..eb977d19 100644
--- a/tools/docker-compose.yml
+++ b/tools/docker-compose.yml
@@ -59,7 +59,7 @@ services:
     depends_on:
       - redis
      - db
-    entrypoint: celery -A entityservice.async_worker worker --loglevel=info -O fair -Q celery,compute,highmemory
+    command: celery -A entityservice.async_worker worker --loglevel=info -O fair -Q celery,compute,highmemory
     environment:
      - DATABASE_PASSWORD=rX%QpV7Xgyrz
      - DEBUG=true
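The block-size counting loop in `upload_json_clk_data` uses `dict.setdefault`; an equivalent formulation with `collections.Counter` is shown below only as a possible simplification — `Counter` is a `dict` subclass, so it could be passed straight to `insert_blocking_metadata`.

```python
from collections import Counter

# Equivalent sketch of the block_sizes computation in upload_json_clk_data.
block_sizes = Counter(
    block
    for _, *encoding_blocks in clk_json["clksnblocks"]
    for block in encoding_blocks
)
block_count = len(block_sizes)
```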