Merged
27 commits
065b11a
Add function to fetch block ids and sizes from db
hardbyte Mar 4, 2020
6f7c75c
Retrieve blocking info in create_comparison_jobs task
hardbyte Mar 4, 2020
ebcb248
WIP - identify blocks that need to be broken up further
hardbyte Mar 4, 2020
a066ccc
Query for getting encodings in a block
hardbyte Mar 6, 2020
fb550de
Split tasks into chunks using blocking information
hardbyte Mar 6, 2020
610b3bb
Refactor create comparison jobs function
hardbyte Mar 8, 2020
d838fe4
More refactoring of chunk creation
hardbyte Mar 9, 2020
ec36e8d
Add a few unit tests for chunking
hardbyte Mar 9, 2020
ddcbcc3
Add database index on encodings table
hardbyte Mar 9, 2020
4ab16e6
clknblocks not clksnblocks and other minor cleanup
hardbyte Mar 10, 2020
d66bf58
cleanup
hardbyte Mar 10, 2020
1e5151f
Add blocking concept to docs
hardbyte Mar 13, 2020
aec9b5c
Deduplicate candidate pairs before solving
hardbyte Mar 15, 2020
f30c819
Catch the empty candidate pair case
hardbyte Mar 15, 2020
9dc59e1
Simplify solver task by using anonlink's _merge_similarities function
hardbyte Mar 16, 2020
6219e44
Update celery
hardbyte Mar 16, 2020
0b6a4c2
Address code review feedback
hardbyte Mar 18, 2020
5467362
Bump version to beta2
hardbyte Mar 18, 2020
f342d5a
Celery concurrency defaults
hardbyte Mar 19, 2020
2add5ef
Add another layer of tracing into the comparison task
hardbyte Mar 19, 2020
e2ebe99
Update task names in celery routing
hardbyte Mar 19, 2020
38b624f
Faster encoding retrieval by using COPY.
hardbyte Mar 22, 2020
7ec7fef
Pass on stored size when retrieving encodings from DB
hardbyte Mar 22, 2020
24caa79
Increase time on test
hardbyte Mar 22, 2020
dc1983b
Refactor binary copy into own function for easier reuse and testing
hardbyte Mar 23, 2020
8bae410
Add more detailed tracing around binary encoding insertions.
hardbyte Mar 24, 2020
88e968d
Add tests for binary copy function
hardbyte Mar 24, 2020
Refactor create comparison jobs function
hardbyte committed Mar 18, 2020
commit 610b3bbfed74b92e7c1f92e7ded5569cc3923bfa
160 changes: 107 additions & 53 deletions backend/entityservice/tasks/comparing.py
@@ -61,23 +61,10 @@ def create_comparison_jobs(project_id, run_id, parent_span=None):
f"{', '.join(map(str, dp_ids))}")

# Retrieve required metadata
dataset_sizes = get_project_dataset_sizes(conn, project_id)

# {dp_id -> {block_id -> block_size}}
# e.g. {33: {'1': 100}, 34: {'1': 100}, 35: {'1': 100}}
dp_block_sizes = {}
for dp_id in dp_ids:
dp_block_sizes[dp_id] = dict(get_block_metadata(conn, dp_id))
dataset_sizes, dp_block_sizes = _retrieve_blocked_dataset_sizes(conn, project_id, dp_ids)

log.info("Finding blocks in common between dataproviders")
# block_id -> List(pairs of dp ids)
# e.g. {'1': [(26, 27), (26, 28), (27, 28)]}
blocks = {}
for dp1, dp2 in itertools.combinations(dp_ids, 2):
# Get the intersection of blocks between these two dataproviders
common_block_ids = set(dp_block_sizes[dp1]).intersection(set(dp_block_sizes[dp2]))
for block_id in common_block_ids:
blocks.setdefault(block_id, []).append((dp1, dp2))
common_blocks = _get_common_blocks(dp_block_sizes, dp_ids)

if len(dataset_sizes) < 2:
log.warning("Unexpected number of dataset sizes in db. Stopping")
@@ -86,38 +73,8 @@ def create_comparison_jobs(project_id, run_id, parent_span=None):

encoding_size = get_project_encoding_size(conn, project_id)

# Now look at the sizes of the blocks, some may have to be broken into
# chunks, others will directly be compared as a full block. In either case
# we create a dict which describes the required comparisons. The dict will
# have required keys: dataproviderId, block_id, and range.
# And dataproviderIndex?
chunks = []
for block_id in blocks:
# Note there might be many pairs of datasets in this block
for dp1, dp2 in blocks[block_id]:
size1 = dp_block_sizes[dp1][block_id]
size2 = dp_block_sizes[dp2][block_id]

comparisons = size1 * size2
if comparisons > Config.CHUNK_SIZE_AIM:
log.info("Block is too large for single task. Working out how to chunk it up")
# A task will request a range of encodings for a block
# First cut can probably use postgres' LIMIT OFFSET (ordering by encoding_id?)
for chunk_info in anonlink.concurrency.split_to_chunks(Config.CHUNK_SIZE_AIM,
dataset_sizes=(size1, size2)):
# chunk_info's from anonlink already have datasetIndex of 0 or 1 and a range
# We need to correct the datasetIndex and add the database datasetId and add block_id.
add_dp_id_to_chunk_info(chunk_info, dp_ids, dp1, dp2)
add_block_id_to_chunk_info(chunk_info, block_id)
chunks.append(chunk_info)
else:
# Add the full block as a chunk
# TODO should we provide a range (it could be optional)
chunk_left = {"range": (0, size1), "block_id": block_id}
chunk_right = {"range": (0, size2), "block_id": block_id}
chunk_info = (chunk_left, chunk_right)
add_dp_id_to_chunk_info(chunk_info, dp_ids, dp1, dp2)
chunks.append(chunk_info)
# Create "chunks" of comparisons
chunks = _create_work_chunks(common_blocks, dp_block_sizes, dp_ids, log)

log.info(f"Computing similarity for "
f"{' x '.join(map(str, dataset_sizes))} entities")
@@ -152,6 +109,107 @@ def create_comparison_jobs(project_id, run_id, parent_span=None):
future = chord(scoring_tasks)(callback_task)


def _create_work_chunks(blocks, dp_block_sizes, dp_ids, log):
    """Create chunks of comparisons using blocking information.

    .. note::

        Ideally the chunks would be similarly sized, however with blocking the best we
        can do in this regard is to ensure a similar maximum size. Small blocks will
        directly translate to small chunks.

    :todo: Consider passing all dataproviders to anonlink's `split_to_chunks` for a given
        large block instead of splitting pairwise.

    :param blocks:
        dict mapping block identifier to a list of all combinations of
        data provider pairs containing this block, as created by :func:`_get_common_blocks`.
    :param dp_block_sizes:
        A map from dataprovider id to a dict mapping
        block id to the number of encodings from the dataprovider in the
        block, as created by :func:`_retrieve_blocked_dataset_sizes`.
    :param dp_ids: list of data provider ids
    :returns:
        A list of comparison chunks. Each chunk is a pair of dicts, one per
        dataprovider, comprising:

        - dataproviderId - The dataprovider identifier in our database
        - datasetIndex - The dataprovider index [0, 1 ...]
        - block_id - The block of encodings this chunk is for
        - range - The range of encodings within the block that make up this chunk.
    """
    chunks = []
    for block_id in blocks:
        log.debug("Chunking block", block_id=block_id)
        # Note there might be many pairs of datasets in this block
        for dp1, dp2 in blocks[block_id]:
            size1 = dp_block_sizes[dp1][block_id]
            size2 = dp_block_sizes[dp2][block_id]

            comparisons = size1 * size2
            if comparisons > Config.CHUNK_SIZE_AIM:
                log.debug("Block is too large for single task. Working out how to chunk it up")
                for chunk_info in anonlink.concurrency.split_to_chunks(Config.CHUNK_SIZE_AIM,
                                                                       dataset_sizes=(size1, size2)):
                    # chunk_info's from anonlink already have datasetIndex of 0 or 1 and a range
                    # We need to correct the datasetIndex and add the database datasetId and add block_id.
                    add_dp_id_to_chunk_info(chunk_info, dp_ids, dp1, dp2)
                    add_block_id_to_chunk_info(chunk_info, block_id)
                    chunks.append(chunk_info)
            else:
                log.debug("Add the full block as a single work chunk")
                # Although the range "could" be optional we choose to make all chunk definitions consistent
                chunk_left = {"range": (0, size1), "block_id": block_id}
                chunk_right = {"range": (0, size2), "block_id": block_id}
                chunk_info = (chunk_left, chunk_right)
                add_dp_id_to_chunk_info(chunk_info, dp_ids, dp1, dp2)
                chunks.append(chunk_info)
    return chunks


def _get_common_blocks(dp_block_sizes, dp_ids):
    """Return the non-empty blocks shared between pairs of dataproviders.

    :returns:
        dict mapping block identifier to a list of all combinations of
        data provider pairs containing this block.

        block_id -> List(pairs of dp ids)
        e.g. {'1': [(26, 27), (26, 28), (27, 28)]}
    """
    blocks = {}
    for dp1, dp2 in itertools.combinations(dp_ids, 2):
        # Get the intersection of blocks between these two dataproviders
        common_block_ids = set(dp_block_sizes[dp1]).intersection(set(dp_block_sizes[dp2]))
        for block_id in common_block_ids:
            blocks.setdefault(block_id, []).append((dp1, dp2))
    return blocks


def _retrieve_blocked_dataset_sizes(conn, project_id, dp_ids):
    """Fetch the overall encoding count for each dataset along with per-block counts.

    :param dp_ids: Iterable of dataprovider database identifiers.
    :returns:
        A 2-tuple of:

        - dataset_sizes: a tuple of the number of encodings in each dataset.
        - dp_block_sizes: a map from dataprovider id to a dict mapping
          block id to the number of encodings from the dataprovider in the
          block.

          {dp_id -> {block_id -> block_size}}
          e.g. {33: {'1': 100}, 34: {'1': 100}, 35: {'1': 100}}
    """
    dataset_sizes = get_project_dataset_sizes(conn, project_id)

    dp_block_sizes = {}
    for dp_id in dp_ids:
        dp_block_sizes[dp_id] = dict(get_block_metadata(conn, dp_id))
    return dataset_sizes, dp_block_sizes


def add_dp_id_to_chunk_info(chunk_info, dp_ids, dp_id_left, dp_id_right):
    """
    Modify a chunk_info as returned by anonlink.concurrency.split_to_chunks
@@ -225,12 +283,8 @@ def new_child_span(name):
threshold=threshold,
k=min(chunk_dp1_size, chunk_dp2_size))

# TODO check offset results are working... in rec_is0 and rec_is1 using encoding_ids
def offset(recordarray, encoding_id_list):
rec = np.frombuffer(recordarray, dtype=recordarray.typecode)
encoding_ids = np.array(encoding_id_list)
res_np = encoding_ids[rec]
return array.array('I', [i for i in res_np])
return array.array('I', [encoding_id_list[i] for i in recordarray])

rec_is0 = offset(rec_is0, entity_ids_dp1)
rec_is1 = offset(rec_is1, entity_ids_dp2)
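The simplified offset helper above just remaps block-local record indices back to the stored encoding ids; a standalone sketch with invented ids:

import array

def offset(recordarray, encoding_id_list):
    # recordarray holds indices into this block's encodings; translate them
    # back to the encoding ids stored in the database.
    return array.array('I', [encoding_id_list[i] for i in recordarray])

entity_ids_dp1 = [507, 512, 530, 544]     # invented encoding ids for one block
rec_is0 = array.array('I', [0, 2, 3])     # match results as block-local indices
print(offset(rec_is0, entity_ids_dp1))    # array('I', [507, 530, 544])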
@@ -260,7 +314,7 @@ def offset(recordarray, encoding_id_list):
index_1 = array.array('I', (chunk_info_dp1["datasetIndex"],)) * num_results
index_2 = array.array('I', (chunk_info_dp2["datasetIndex"],)) * num_results

chunk_results = sims, (rec_is0, rec_is1), (index_1, index_2)
chunk_results = sims, (index_1, index_2), (rec_is0, rec_is1),

bytes_iter, file_size \
= anonlink.serialization.dump_candidate_pairs_iter(chunk_results)
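The reordering of chunk_results above appears intended to match the (similarity scores, dataset indices, record indices) layout anonlink uses for candidate pairs; a minimal serialization sketch under that assumption, with invented values:

import array
import anonlink.serialization

sims = array.array('d', [0.93, 0.87])     # similarity scores, highest first
index_1 = array.array('I', [0, 0])        # dataset index of each left record
index_2 = array.array('I', [1, 1])        # dataset index of each right record
rec_is0 = array.array('I', [507, 512])    # left encoding ids (already offset)
rec_is1 = array.array('I', [811, 844])    # right encoding ids (already offset)

# Scores first, then the dataset-index arrays, then the record-id arrays.
chunk_results = sims, (index_1, index_2), (rec_is0, rec_is1)

bytes_iter, file_size = anonlink.serialization.dump_candidate_pairs_iter(chunk_results)
print(file_size)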