Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions backend/entityservice/cache/helpers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
def _get_run_hash_key(run_id):
key = f'run:{run_id}'
return key


def _get_project_hash_key(project_id):
key = f'project:{project_id}'
return key


def _convert_redis_result_to_int(res):
# redis returns bytes, and None if not present
if res is not None:
return int(res)
else:
return None
35 changes: 25 additions & 10 deletions backend/entityservice/cache/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,45 @@

from entityservice.settings import Config as globalconfig
from entityservice.cache.connection import connect_to_redis
from entityservice.cache.helpers import _get_run_hash_key
from entityservice.cache.helpers import _get_run_hash_key, _get_project_hash_key, _convert_redis_result_to_int
import entityservice.database as db

logger = structlog.get_logger()


def get_total_number_of_comparisons(project_id):
    """Return the total number of comparisons for a project, using redis as a cache.

    The value is looked up in the ``'total_comparisons'`` field of the
    project's redis hash. When it is missing, it is computed with
    ``db.get_total_comparisons_for_project`` and written back to the hash.

    :param project_id: identifier of the project.
    :return: the cached value converted to ``int``, or the freshly
        computed value exactly as returned by the database layer.
    """
    key = _get_project_hash_key(project_id)

    reader = connect_to_redis(read_only=True)
    cached = reader.hget(key, 'total_comparisons')
    # hget returns None if missing key/name, and bytes if present
    if cached:
        return _convert_redis_result_to_int(cached)

    # Cache miss: calculate the number of comparisons from the database.
    with db.DBConn() as conn:
        total_comparisons = db.get_total_comparisons_for_project(conn, project_id)

    # The first connection was opened read-only, so get a writable one
    # to store the result.
    writer = connect_to_redis()
    writer.hset(key, 'total_comparisons', total_comparisons)
    # Expire the project hash after 60 * 60 seconds (one hour).
    writer.expire(key, 60 * 60)
    return total_comparisons

def save_current_progress(comparisons, run_id, config=None):
    """Add a number of completed comparisons to a run's progress counter in redis.

    The ``'progress'`` field of the run's redis hash is incremented by
    ``comparisons`` exactly once, and the hash's expiry is reset to
    ``config.CACHE_EXPIRY``.

    :param comparisons: number of comparisons to add to the counter.
    :param run_id: identifier of the run whose progress is updated.
    :param config: object providing ``CACHE_EXPIRY``; defaults to the
        global configuration when ``None``.
    """
    if config is None:
        config = globalconfig
    logger.debug(f"Updating progress. Compared {comparisons} CLKS", run_id=run_id)

    # Nothing to record when no comparisons were made, so redis is not
    # contacted at all in that case.
    if comparisons > 0:
        r = connect_to_redis()
        key = _get_run_hash_key(run_id)
        r.hincrby(key, 'progress', comparisons)
        r.expire(key, config.CACHE_EXPIRY)


def get_progress(run_id):
    """Return the number of comparisons recorded so far for a run.

    Reads the ``'progress'`` field of the run's redis hash over a
    read-only connection.

    :param run_id: identifier of the run.
    :return: the stored progress as an ``int``, or ``None`` if no
        progress has been recorded for the run.
    """
    r = connect_to_redis(read_only=True)
    key = _get_run_hash_key(run_id)
    res = r.hget(key, 'progress')
    # redis returns bytes, and None if not present; the shared helper
    # performs that conversion.
    return _convert_redis_result_to_int(res)


def clear_progress(run_id):
Expand Down
40 changes: 24 additions & 16 deletions backend/entityservice/database/selections.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,27 +229,35 @@ def get_smaller_dataset_size_for_project(db, project_id):

def get_total_comparisons_for_project(db, project_id):
    """
    Returns the number of comparisons that a project requires.

    For each block:
        for each pairwise combination of data providers in each block:
            multiply the block sizes together
        then sum number of comparisons for each block
    Sum the number of comparisons in all blocks together.

    :param db: database connection handed to ``query_db``.
    :param project_id: identifier of the project.
    :return: total number of comparisons for this project; ``0`` when
        the query returns no blocks.
    """
    # The full computation is *hard* to do in postgres, so we return an array of sizes
    # for each block and then use Python to find the pairwise combinations.
    sql_query = """
        select block_name, array_agg(count) as counts
        from blocks
        where dp in (
            select id from dataproviders where project = %s
        )
        group by block_name
        """

    query_results = query_db(db, sql_query, [project_id])

    # A block with fewer than two sizes has no pairs and contributes 0.
    return sum(
        c0 * c1
        for block in query_results
        for c0, c1 in itertools.combinations(block['counts'], 2)
    )


def get_dataprovider_id(db, update_token):
Expand Down
8 changes: 6 additions & 2 deletions backend/entityservice/tasks/comparing.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ def new_child_span(name, parent_scope=None):
log.debug('Both chunks are fetched and deserialized')
task_span.log_kv({'size1': chunk_dp1_size, 'size2': chunk_dp2_size, 'chunk_info': chunk_info})

assert chunk_dp1_size > 0, "Zero sized chunk in dp1"
assert chunk_dp2_size > 0, "Zero sized chunk in dp2"

with new_child_span('comparing-encodings') as parent_scope:

log.debug("Calculating filter similarity")
Expand All @@ -285,11 +288,12 @@ def reindex_using_encoding_ids(recordarray, encoding_id_list):

log.debug('Encoding similarities calculated')

with new_child_span('update-comparison-progress'):
with new_child_span('update-comparison-progress') as scope:
# Update the number of comparisons completed
comparisons_computed = chunk_dp1_size * chunk_dp2_size
save_current_progress(comparisons_computed, run_id)
log.info("Comparisons: {}, Links above threshold: {}".format(comparisons_computed, len(sims)))
scope.span.log_kv({'comparisons': comparisons_computed, 'num_similar': len(sims)})
log.debug("Comparisons: {}, Links above threshold: {}".format(comparisons_computed, len(sims)))

with new_child_span('save-comparison-results-to-minio'):
num_results = len(sims)
Expand Down
1 change: 0 additions & 1 deletion backend/entityservice/tasks/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def prerun_check(project_id, run_id, parent_span=None):

log.debug("Updating redis cache for run")
set_run_state_active(run_id)
progress_cache.save_current_progress(comparisons=0, run_id=run_id)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was why I'd seen log messages saying "0 comparisons".


create_comparison_jobs.apply_async(
kwargs={'project_id': project_id, 'run_id': run_id, 'parent_span': prerun_check.get_serialized_span()},
Expand Down
4 changes: 2 additions & 2 deletions backend/entityservice/views/run/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ def get(project_id, run_id):
# Computing similarity
abs_val = progress_cache.get_progress(run_id)
if abs_val is not None:
with db.DBConn() as conn:
max_val = db.get_total_comparisons_for_project(conn, project_id)
max_val = progress_cache.get_total_number_of_comparisons(project_id)
logger.debug(f"total comparisons: {max_val}")
else:
# Solving for mapping (no progress)
abs_val = None
Expand Down