diff --git a/backend/entityservice/cache/helpers.py b/backend/entityservice/cache/helpers.py index a3a05239..381fff3a 100644 --- a/backend/entityservice/cache/helpers.py +++ b/backend/entityservice/cache/helpers.py @@ -1,3 +1,16 @@ def _get_run_hash_key(run_id): key = f'run:{run_id}' return key + + +def _get_project_hash_key(project_id): + key = f'project:{project_id}' + return key + + +def _convert_redis_result_to_int(res): + # redis returns bytes, and None if not present + if res is not None: + return int(res) + else: + return None \ No newline at end of file diff --git a/backend/entityservice/cache/progress.py b/backend/entityservice/cache/progress.py index 10615be1..a18cab62 100644 --- a/backend/entityservice/cache/progress.py +++ b/backend/entityservice/cache/progress.py @@ -2,30 +2,45 @@ from entityservice.settings import Config as globalconfig from entityservice.cache.connection import connect_to_redis -from entityservice.cache.helpers import _get_run_hash_key +from entityservice.cache.helpers import _get_run_hash_key, _get_project_hash_key, _convert_redis_result_to_int +import entityservice.database as db logger = structlog.get_logger() +def get_total_number_of_comparisons(project_id): + r = connect_to_redis(read_only=True) + key = _get_project_hash_key(project_id) + res = r.hget(key, 'total_comparisons') + # hget returns None if missing key/name, and bytes if present + if res: + return _convert_redis_result_to_int(res) + else: + # Calculate the number of comparisons + with db.DBConn() as conn: + total_comparisons = db.get_total_comparisons_for_project(conn, project_id) + # get a writable connection to redis + r = connect_to_redis() + res = r.hset(key, 'total_comparisons', total_comparisons) + r.expire(key, 60*60) + return total_comparisons + def save_current_progress(comparisons, run_id, config=None): if config is None: config = globalconfig logger.debug(f"Updating progress. Compared {comparisons} CLKS", run_id=run_id) - r = connect_to_redis() - key = _get_run_hash_key(run_id) - r.hincrby(key, 'progress', comparisons) - r.expire(key, config.CACHE_EXPIRY) + if comparisons > 0: + r = connect_to_redis() + key = _get_run_hash_key(run_id) + r.hincrby(key, 'progress', comparisons) + r.expire(key, config.CACHE_EXPIRY) def get_progress(run_id): r = connect_to_redis(read_only=True) key = _get_run_hash_key(run_id) res = r.hget(key, 'progress') - # redis returns bytes, and None if not present - if res is not None: - return int(res) - else: - return None + return _convert_redis_result_to_int(res) def clear_progress(run_id): diff --git a/backend/entityservice/database/selections.py b/backend/entityservice/database/selections.py index 4ce1d35c..0d8d9120 100644 --- a/backend/entityservice/database/selections.py +++ b/backend/entityservice/database/selections.py @@ -229,27 +229,35 @@ def get_smaller_dataset_size_for_project(db, project_id): def get_total_comparisons_for_project(db, project_id): """ + Returns the number of comparisons that a project requires. + + For each block: + for each pairwise combination of data providers in each block: + multiply the block sizes together + then sum number of comparisons for each block + Sum the number of comparisons in all blocks together. + :return total number of comparisons for this project """ - expected_datasets = get_project_column(db, project_id, 'parties') + + # The full computation is *hard* to do in postgres, so we return an array of sizes + # for each block and then use Python to find the pairwise combinations. sql_query = """ - SELECT uploads.count as rows - from dataproviders, uploads - where - uploads.dp=dataproviders.id AND - dataproviders.project=%s + select block_name, array_agg(count) as counts + from blocks + where dp in ( + select id from dataproviders where project = %s + ) + group by block_name """ + query_results = query_db(db, sql_query, [project_id]) - if len(query_results) < expected_datasets: - return 'NA' - elif len(query_results) == expected_datasets: - counts = (qr['rows'] for qr in query_results) - total_comparisons = sum( - c0 * c1 for c0, c1 in itertools.combinations(counts, 2)) - return total_comparisons - else: - raise ValueError(f'expected at most {expected_datasets} ' - f'datasets, got {len(query_results)}') + total_comparisons = 0 + for block in query_results: + num_comparisons_in_block = sum(c0 * c1 for c0, c1 in itertools.combinations(block['counts'], 2)) + total_comparisons += num_comparisons_in_block + + return total_comparisons def get_dataprovider_id(db, update_token): diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py index fe463519..10af0312 100644 --- a/backend/entityservice/tasks/comparing.py +++ b/backend/entityservice/tasks/comparing.py @@ -262,6 +262,9 @@ def new_child_span(name, parent_scope=None): log.debug('Both chunks are fetched and deserialized') task_span.log_kv({'size1': chunk_dp1_size, 'size2': chunk_dp2_size, 'chunk_info': chunk_info}) + assert chunk_dp1_size > 0, "Zero sized chunk in dp1" + assert chunk_dp2_size > 0, "Zero sized chunk in dp2" + with new_child_span('comparing-encodings') as parent_scope: log.debug("Calculating filter similarity") @@ -285,11 +288,12 @@ def reindex_using_encoding_ids(recordarray, encoding_id_list): log.debug('Encoding similarities calculated') - with new_child_span('update-comparison-progress'): + with new_child_span('update-comparison-progress') as scope: # Update the number of comparisons completed comparisons_computed = chunk_dp1_size * chunk_dp2_size save_current_progress(comparisons_computed, run_id) - log.info("Comparisons: {}, Links above threshold: {}".format(comparisons_computed, len(sims))) + scope.span.log_kv({'comparisons': comparisons_computed, 'num_similar': len(sims)}) + log.debug("Comparisons: {}, Links above threshold: {}".format(comparisons_computed, len(sims))) with new_child_span('save-comparison-results-to-minio'): num_results = len(sims) diff --git a/backend/entityservice/tasks/run.py b/backend/entityservice/tasks/run.py index bff98ed3..c6c36e5c 100644 --- a/backend/entityservice/tasks/run.py +++ b/backend/entityservice/tasks/run.py @@ -45,7 +45,6 @@ def prerun_check(project_id, run_id, parent_span=None): log.debug("Updating redis cache for run") set_run_state_active(run_id) - progress_cache.save_current_progress(comparisons=0, run_id=run_id) create_comparison_jobs.apply_async( kwargs={'project_id': project_id, 'run_id': run_id, 'parent_span': prerun_check.get_serialized_span()}, diff --git a/backend/entityservice/views/run/status.py b/backend/entityservice/views/run/status.py index ff5e50ea..beca1ebd 100644 --- a/backend/entityservice/views/run/status.py +++ b/backend/entityservice/views/run/status.py @@ -51,8 +51,8 @@ def get(project_id, run_id): # Computing similarity abs_val = progress_cache.get_progress(run_id) if abs_val is not None: - with db.DBConn() as conn: - max_val = db.get_total_comparisons_for_project(conn, project_id) + max_val = progress_cache.get_total_number_of_comparisons(project_id) + logger.debug(f"total comparisons: {max_val}") else: # Solving for mapping (no progress) abs_val = None