Merged
27 commits
065b11a
Add function to fetch block ids and sizes from db
hardbyte Mar 4, 2020
6f7c75c
Retrieve blocking info in create_comparison_jobs task
hardbyte Mar 4, 2020
ebcb248
WIP - identify blocks that need to be broken up further
hardbyte Mar 4, 2020
a066ccc
Query for getting encodings in a block
hardbyte Mar 6, 2020
fb550de
Split tasks into chunks using blocking information
hardbyte Mar 6, 2020
610b3bb
Refactor create comparison jobs function
hardbyte Mar 8, 2020
d838fe4
More refactoring of chunk creation
hardbyte Mar 9, 2020
ec36e8d
Add a few unit tests for chunking
hardbyte Mar 9, 2020
ddcbcc3
Add database index on encodings table
hardbyte Mar 9, 2020
4ab16e6
clknblocks not clksnblocks and other minor cleanup
hardbyte Mar 10, 2020
d66bf58
cleanup
hardbyte Mar 10, 2020
1e5151f
Add blocking concept to docs
hardbyte Mar 13, 2020
aec9b5c
Deduplicate candidate pairs before solving
hardbyte Mar 15, 2020
f30c819
Catch the empty candidate pair case
hardbyte Mar 15, 2020
9dc59e1
Simplify solver task by using anonlink's _merge_similarities function
hardbyte Mar 16, 2020
6219e44
Update celery
hardbyte Mar 16, 2020
0b6a4c2
Address code review feedback
hardbyte Mar 18, 2020
5467362
Bump version to beta2
hardbyte Mar 18, 2020
f342d5a
Celery concurrency defaults
hardbyte Mar 19, 2020
2add5ef
Add another layer of tracing into the comparison task
hardbyte Mar 19, 2020
e2ebe99
Update task names in celery routing
hardbyte Mar 19, 2020
38b624f
Faster encoding retrieval by using COPY.
hardbyte Mar 22, 2020
7ec7fef
Pass on stored size when retrieving encodings from DB
hardbyte Mar 22, 2020
24caa79
Increase time on test
hardbyte Mar 22, 2020
dc1983b
Refactor binary copy into own function for easier reuse and testing
hardbyte Mar 23, 2020
8bae410
Add more detailed tracing around binary encoding insertions.
hardbyte Mar 24, 2020
88e968d
Add tests for binary copy function
hardbyte Mar 24, 2020
Add a few unit tests for chunking
hardbyte committed Mar 18, 2020
commit ec36e8dd70d096d624899c2c796d81ffd24981aa
13 changes: 13 additions & 0 deletions backend/entityservice/integrationtests/dbtests/__init__.py
@@ -0,0 +1,13 @@
import psycopg2

from entityservice.settings import Config as config


def _get_conn_and_cursor():
db = config.DATABASE
host = config.DATABASE_SERVER
user = config.DATABASE_USER
password = config.DATABASE_PASSWORD
conn = psycopg2.connect(host=host, dbname=db, user=user, password=password)
cursor = conn.cursor()
return conn, cursor
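As a usage note (not part of this diff), here is a minimal sketch of how a test can call the shared helper; the SELECT 1 smoke query and the test name are illustrative only:

from entityservice.integrationtests.dbtests import _get_conn_and_cursor


def test_connection_smoke():
    # Hypothetical smoke test: open a connection via the shared helper,
    # run a trivial query, then clean up.
    conn, cursor = _get_conn_and_cursor()
    try:
        cursor.execute("SELECT 1")
        assert cursor.fetchone() == (1,)
    finally:
        cursor.close()
        conn.close()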
23 changes: 7 additions & 16 deletions backend/entityservice/integrationtests/dbtests/test_insertions.py
@@ -6,29 +6,20 @@

from entityservice.database import insert_dataprovider, insert_encodings_into_blocks, insert_blocking_metadata, \
get_project, get_encodingblock_ids, get_block_metadata, get_chunk_of_encodings
from entityservice.integrationtests.dbtests import _get_conn_and_cursor
from entityservice.models import Project
from entityservice.tests.util import generate_bytes
from entityservice.utils import generate_code
from entityservice.settings import Config as config


class TestInsertions:

def _get_conn_and_cursor(self):
db = config.DATABASE
host = config.DATABASE_SERVER
user = config.DATABASE_USER
password = config.DATABASE_PASSWORD
conn = psycopg2.connect(host=host, dbname=db, user=user, password=password)
cursor = conn.cursor()
return conn, cursor

def _create_project_and_dp(self):
project, dp_ids = self._create_project()
dp_id = dp_ids[0]
dp_auth_token = project.update_tokens[0]

conn, cur = self._get_conn_and_cursor()
conn, cur = _get_conn_and_cursor()
# create a default block
insert_blocking_metadata(conn, dp_id, {'1': 10000})
conn.commit()
@@ -38,7 +29,7 @@ def _create_project_and_dp(self):

def _create_project(self):
project = Project('groups', {}, name='', notes='', parties=2, uses_blocking=False)
conn, cur = self._get_conn_and_cursor()
conn, cur = _get_conn_and_cursor()
dp_ids = project.save(conn)
return project, dp_ids

@@ -47,7 +38,7 @@ def test_insert_project(self):
project, _ = self._create_project()
assert len(project.result_token) == 48
# check we can fetch the inserted project back from the database
conn, cur = self._get_conn_and_cursor()
conn, cur = _get_conn_and_cursor()
project_response = get_project(conn, project.project_id)
assert 'time_added' in project_response
assert project_response['time_added'] - before >= datetime.timedelta(seconds=0)
@@ -61,7 +52,7 @@ def test_insert_project(self):
assert project_response['encoding_size'] is None

def test_insert_dp_no_project_fails(self):
conn, cur = self._get_conn_and_cursor()
conn, cur = _get_conn_and_cursor()
project_id = generate_code()
dp_auth = generate_code()
with raises(psycopg2.errors.ForeignKeyViolation):
@@ -70,7 +61,7 @@ def test_insert_dp_no_project_fails(self):
def test_insert_many_clks(self):
data = [generate_bytes(128) for _ in range(100)]
project_id, project_auth_token, dp_id, dp_auth_token = self._create_project_and_dp()
conn, cur = self._get_conn_and_cursor()
conn, cur = _get_conn_and_cursor()
num_entities = 10_000
blocks = [['1'] for _ in range(num_entities)]
encodings = [data[i % 100] for i in range(num_entities)]
@@ -103,7 +94,7 @@ def test_insert_many_clks(self):
def test_fetch_chunk(self):
data = [generate_bytes(128) for _ in range(100)]
project_id, project_auth_token, dp_id, dp_auth_token = self._create_project_and_dp()
conn, cur = self._get_conn_and_cursor()
conn, cur = _get_conn_and_cursor()
num_entities = 10_000
blocks = [['1'] for _ in range(num_entities)]
encodings = [data[i % 100] for i in range(num_entities)]
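For reference, the blocking structure these insertion tests use pairs each encoding with a list of block ids; restated as a standalone snippet (values copied from the tests above, the final assert is just for illustration):

from entityservice.tests.util import generate_bytes

# Each entity has an encoding (bytes) and a list of the block ids it belongs to.
# In the tests above every entity falls into the single default block '1'.
num_entities = 10_000
data = [generate_bytes(128) for _ in range(100)]
encodings = [data[i % 100] for i in range(num_entities)]
blocks = [['1'] for _ in range(num_entities)]

assert len(encodings) == len(blocks) == num_entities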
9 changes: 6 additions & 3 deletions backend/entityservice/tasks/comparing.py
@@ -106,7 +106,7 @@ def create_comparison_jobs(project_id, run_id, parent_span=None):
future = chord(scoring_tasks)(callback_task)


def _create_work_chunks(blocks, dp_block_sizes, dp_ids, log):
def _create_work_chunks(blocks, dp_block_sizes, dp_ids, log, chunk_size_aim=Config.CHUNK_SIZE_AIM):
"""Create chunks of comparisons using blocking information.

.. note::
@@ -126,6 +126,8 @@ def _create_work_chunks(blocks, dp_block_sizes, dp_ids, log):
block id to the number of encodings from the dataprovider in the
block. As created by :func:`_retrieve_blocked_dataset_sizes`
:param dp_ids: list of data provider ids
:param log: A logger instance
:param chunk_size_aim: The desired number of comparisons per chunk.

:returns

@@ -144,9 +146,10 @@ def _create_work_chunks(blocks, dp_block_sizes, dp_ids, log):
size2 = dp_block_sizes[dp2][block_id]

comparisons = size1 * size2
if comparisons > Config.CHUNK_SIZE_AIM:

if comparisons > chunk_size_aim:
log.debug("Block is too large for single task. Working out how to chunk it up")
for chunk_info in anonlink.concurrency.split_to_chunks(Config.CHUNK_SIZE_AIM,
for chunk_info in anonlink.concurrency.split_to_chunks(chunk_size_aim,
dataset_sizes=(size1, size2)):
# chunk_info's from anonlink already have datasetIndex of 0 or 1 and a range
# We need to correct the datasetIndex and add the database datasetId and add block_id.
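To make the splitting step concrete outside the task, a rough sketch (dataset sizes, data provider ids and block id are placeholder values; it assumes, as the comment in the diff describes, that each chunk info from anonlink is a pair of dicts carrying a datasetIndex and a range):

import anonlink.concurrency

chunk_size_aim = 100               # desired number of comparisons per chunk
size1, size2 = 100, 100            # encodings per data provider in this block
dp1, dp2, block_id = 33, 34, '1'   # placeholder database ids and block id

comparisons = size1 * size2
if comparisons > chunk_size_aim:
    for chunk_pair in anonlink.concurrency.split_to_chunks(chunk_size_aim,
                                                           dataset_sizes=(size1, size2)):
        # Attach the database data provider id and block id to each half of the
        # chunk, mirroring what _create_work_chunks does with the anonlink output.
        for chunk, dp_id in zip(chunk_pair, (dp1, dp2)):
            chunk['dataproviderId'] = dp_id
            chunk['block_id'] = block_id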
65 changes: 65 additions & 0 deletions backend/entityservice/tests/test_chunking.py
@@ -0,0 +1,65 @@
from structlog import get_logger

from entityservice.tasks.comparing import _get_common_blocks, _create_work_chunks
log = get_logger()


class TestCommonBlocks:
# Unit test for _get_common_blocks

def test_2p_get_common_blocks(self):
dp_ids = [33, 34]
dp_block_sizes = {33: {'1': 100}, 34: {'1': 100, '2': 100}}
common_blocks = _get_common_blocks(dp_block_sizes, dp_ids)
assert common_blocks == {"1": [(33, 34)]}

def test_3p_get_common_blocks(self):
dp_ids = [1, 2, 3]
dp_block_sizes = {1: {'1': 100}, 2: {'1': 100, '2': 100}, 3: {'1': 100, '2': 100}}
common_blocks = _get_common_blocks(dp_block_sizes, dp_ids)
assert '1' in common_blocks
assert len(common_blocks['1']) == 3
block_1_set = set(common_blocks['1'])
# Should have (1, 2), (1, 3), (2, 3)
for dpcombo in [(1, 2), (1, 3), (2, 3)]:
assert dpcombo in block_1_set

assert '2' in common_blocks
assert len(common_blocks['2']) == 1
assert common_blocks['2'][0] == (2, 3)


class TestChunkingBlocks:

def test_2p_single_chunked_block(self):
dp_ids = [1, 2]
dp_block_sizes = {
1: {'1': 100},
2: {'1': 100, '2': 100}}
blocks = {"1": [(1, 2)]}

chunks = _create_work_chunks(blocks, dp_block_sizes, dp_ids, log, 100)
assert len(chunks) == 100
for chunk_pair in chunks:
for c in chunk_pair:
assert "range" in c
lower, upper = c['range']
assert lower < upper
assert upper - lower <= 10
assert "block_id" in c
assert "datasetIndex" in c
assert "dataproviderId" in c

def test_basic_3p(self):
dp_ids = [1, 2, 3]
dp_block_sizes = {
1: {'1': 100},
2: {'1': 100, '2': 100},
3: {'1': 100, '2': 100},
}
blocks = _get_common_blocks(dp_block_sizes, dp_ids)
chunks = _create_work_chunks(blocks, dp_block_sizes, dp_ids, log, 100)
# Block '1' should create 100 chunks for each dp combination (1:2, 1:3 and 2:3) -> 300 chunks
# Block '2' should create 100 chunks for the single combination 2:3
assert len(chunks) == 300 + 100
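A quick check of the chunk counts asserted above (plain arithmetic, assuming every 100 x 100 block pairing is split into chunks of at most chunk_size_aim comparisons):

chunk_size_aim = 100

# test_2p_single_chunked_block: one shared block of 100 x 100 encodings.
comparisons = 100 * 100
assert comparisons // chunk_size_aim == 100   # 100 chunks, consistent with the
                                              # `upper - lower <= 10` range check

# test_basic_3p: block '1' is shared by pairs (1,2), (1,3) and (2,3);
# block '2' only by (2,3); each 100 x 100 pairing contributes 100 chunks.
assert 3 * 100 + 1 * 100 == 400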