diff --git a/.azurePipeline/k8s_test_job.yaml.tmpl b/.azurePipeline/k8s_test_job.yaml.tmpl index 186f88fd..78ce6c91 100644 --- a/.azurePipeline/k8s_test_job.yaml.tmpl +++ b/.azurePipeline/k8s_test_job.yaml.tmpl @@ -25,12 +25,12 @@ spec: claimName: $PVC containers: - name: entitytester - image: $IMAGE_NAME_WITH_TAG + image: $TEST_E2E_IMAGE_NAME_WITH_TAG imagePullPolicy: Always env: - name: SERVER value: http://$SERVICE - command: ["dockerize", "-wait", "http://$SERVICE/api/v1/status", "-timeout", "5m", "python", "-m", "pytest", "-n", "4", "entityservice/tests", "-x", "--junit-xml=/mnt/results.xml"] + command: ["dockerize", "-wait", "http://$SERVICE/api/v1/status", "-timeout", "5m", "python", "-m", "pytest", "-n", "4", "e2etests/tests", "-x", "--junit-xml=/mnt/results.xml"] volumeMounts: - mountPath: /mnt name: results diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 05cbc838..98a48dde 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -21,6 +21,7 @@ variables: backendImageName: data61/anonlink-app + testE2EImageName: data61/anonlink-test frontendImageName: data61/anonlink-nginx tutorialImageName: data61/anonlink-docs-tutorials benchmarkingImageName: data61/anonlink-benchmark @@ -89,6 +90,26 @@ stages: imageName: data61/anonlink-app dockerBuildVersion: "$[dependencies.HashBaseDependencies.outputs['SetDockerBaseTag.DOCKER_BASE_TAG']]" +- stage: stage_docker_e2e_test_image_build + displayName: Build E2E Test Docker image + dependsOn: [stage_base_docker_image_build] + jobs: + # Why do we recompute the base hash? Because we can't pass variables between stages. + # https://github.com/microsoft/azure-pipelines-tasks/issues/4743 + - job: HashBaseDependencies + displayName: Hash Dependencies + pool: + vmImage: 'ubuntu-latest' + steps: + - template: .azurePipeline/templateSetVariableDockerBaseTag.yml + - template: .azurePipeline/templateDockerBuildPush.yml + parameters: + folder: './e2etests' + jobName: 'anonlink_e2e_test' + dependsOn: HashBaseDependencies + imageName: data61/anonlink-test + dockerBuildVersion: "$[dependencies.HashBaseDependencies.outputs['SetDockerBaseTag.DOCKER_BASE_TAG']]" + - stage: stage_docker_nginx_image_build displayName: Nginx Docker build dependsOn: [] @@ -219,6 +240,7 @@ stages: echo "##vso[task.setvariable variable=PVC]$(DEPLOYMENT)-test-results" echo "##vso[task.setvariable variable=SERVICE]$(DEPLOYMENT)-entity-service-server" echo $(backendImageName):$(DOCKER_TAG) | xargs -I@ echo "##vso[task.setvariable variable=IMAGE_NAME_WITH_TAG]@" + echo $(testE2EImageName):$(DOCKER_TAG) | xargs -I@ echo "##vso[task.setvariable variable=TEST_E2E_IMAGE_NAME_WITH_TAG]@" echo $(DEPLOYMENT)-tmppod | xargs -I@ echo "##vso[task.setvariable variable=POD_NAME]@" displayName: 'Set variables for service, test result volume and pod' - task: Kubernetes@1 @@ -304,7 +326,7 @@ stages: cat .azurePipeline/k8s_test_job.yaml.tmpl | \ sed 's|\$PVC'"|$(PVC)|g" | \ sed 's|\$DEPLOYMENT_NAME'"|$(DEPLOYMENT)|g" | \ - sed 's|\$IMAGE_NAME_WITH_TAG'"|$(IMAGE_NAME_WITH_TAG)|g" | \ + sed 's|\$TEST_E2E_IMAGE_NAME_WITH_TAG'"|$(TEST_E2E_IMAGE_NAME_WITH_TAG)|g" | \ sed 's|\$SERVICE'"|$(SERVICE)|g" > $(Build.ArtifactStagingDirectory)/k8s_test_job.yaml displayName: 'Prepare integration test job from template' diff --git a/backend/entityservice/tests/config.py b/backend/entityservice/tests/config.py deleted file mode 100644 index 5f5639f4..00000000 --- a/backend/entityservice/tests/config.py +++ /dev/null @@ -1,25 +0,0 @@ -import logging -import os - -initial_delay = 
float(os.environ.get("INITIAL_DELAY", "2")) -rate_limit_delay = float(os.environ.get("RATE_LIMIT_DELAY", "0.25")) -LOGLEVEL = getattr(logging, os.environ.get("LOGGING_LEVEL", "INFO")) -LOGFILE = os.environ.get("LOGGING_FILE", "es-test.log") -url = os.environ.get("SERVER", "http://localhost:8851") + "/api/v1/" - -logger = logging.getLogger('anonlink.test') -logger.setLevel(logging.DEBUG) -formatter = logging.Formatter('%(asctime)s.%(msecs)03d %(name)020s %(levelname)010s - %(message)s', "%H:%M:%S") -ch = logging.StreamHandler() -ch.setFormatter(formatter) -ch.setLevel(LOGLEVEL) - -fh = logging.FileHandler(LOGFILE) -fh.setLevel(logging.DEBUG) -fh.setFormatter(formatter) - -logger.propagate = False -logger.addHandler(ch) -logger.addHandler(fh) - -logger.info("Testing Anonlink Entity Service at URL: {}".format(url)) diff --git a/backend/entityservice/tests/conftest.py b/backend/entityservice/tests/conftest.py index 2f169394..e69de29b 100644 --- a/backend/entityservice/tests/conftest.py +++ b/backend/entityservice/tests/conftest.py @@ -1,183 +0,0 @@ -import os -import time -import pytest -import requests as requests_library -import itertools - -from entityservice.tests.util import create_project_upload_fake_data, delete_project, create_project_no_data - -THROTTLE_SLEEP = 0.2 - - -@pytest.fixture(scope='session') -def requests(): - """ - We inject the requests session. - For now we just add a small sleep after every request to ensure we don't get throttled when - tests run back to back. Note the rate limit in nginx is 10 requests per ip per second. - """ - def delay_next(r, *args, **kwargs): - time.sleep(THROTTLE_SLEEP) - - testing_session = requests_library.Session() - testing_session.hooks['response'].append(delay_next) - yield testing_session - - -# Parameterising on: -# -# - pairs of dataset sizes -# - overlap of the sizes -# - result_type for 2 parties in ['similarity_scores', 'permutations'] and for more parties in ['groups'] -# - threshold - -ENVVAR_NAME = 'ENTITY_SERVICE_RUN_SLOW_TESTS' -THRESHOLDS = [0.9, 1.0] -OVERLAPS = [0.0, 0.9] -ENCODING_SIZES = [8] -NUMBERS_PARTIES = [2, 3, 5] - -if os.getenv(ENVVAR_NAME): - ENCODING_SIZES.extend([64, 128, 512, 2048]) - OVERLAPS.extend([0.2, 0.5, 1.0]) - THRESHOLDS.extend([0.6, 0.8, 0.95]) - -FAST_SIZES_2P = tuple(itertools.product([1, 1000], repeat=2)) -FAST_SIZES_NP = tuple(itertools.chain( - FAST_SIZES_2P, - [(1, 1000, 1000), - (1000, 1, 1000), - (1000, 1000, 1), - (1000, 1000, 1000), - (1000, 1000, 1000, 1000, 1000)])) - -SLOW_SIZES_2P = tuple(itertools.combinations([1, 10000, 100000, 1000000], 2)) -SLOW_SIZES_NP = tuple(itertools.chain( - SLOW_SIZES_2P, - itertools.product( - [10000, 100000], [10000, 100000], [100000, 1000000]), - ((10000, 10000, 100000, 100000, 1000000),))) - -SIZES_2P = (tuple(itertools.chain(FAST_SIZES_2P, SLOW_SIZES_2P)) - if os.getenv(ENVVAR_NAME) - else FAST_SIZES_2P) -SIZES_NP = (tuple(itertools.chain(FAST_SIZES_NP, SLOW_SIZES_NP)) - if os.getenv(ENVVAR_NAME) - else FAST_SIZES_NP) - -PROJECT_PARAMS_2P = tuple( - itertools.product(SIZES_2P, OVERLAPS, ENCODING_SIZES)) -PROJECT_PARAMS_NP = tuple( - itertools.product(SIZES_NP, OVERLAPS, ENCODING_SIZES)) -PROJECT_RESULT_TYPES_2P = ['similarity_scores', 'permutations'] -PROJECT_RESULT_TYPES_NP = ['groups'] - - -def create_project_response(requests, size, overlap, result_type, encoding_size=128): - """ - Create a project with the given size, overlap and result_type. 
- - Tests that use one of these projects will get a dict like the following: - - { - "project_id": "ID", - "upload-mode": "BINARY" | "JSON", - "size": [size 1, size 2], - "encoding-size": int number of bytes in each encoding e.g. 128, - "overlap": float between 0 and 1, - "result_token": "TOKEN", - "upload_tokens": [TOKENS, ...], - "dp_1": - "dp_2": - } - """ - project, dp_responses = create_project_upload_fake_data( - requests, size, overlap=overlap, result_type=result_type, encoding_size=encoding_size) - project.update({ - 'size': size, - 'encoding-size': encoding_size, - 'upload-mode': 'JSON', - 'overlap': overlap, - 'dp_responses': dp_responses - }) - return project - - -@pytest.fixture(scope='function', params=PROJECT_PARAMS_2P) -def similarity_scores_project(request, requests): - size, overlap, encoding_size = request.param - prj = create_project_response(requests, size, overlap, 'similarity_scores', encoding_size) - yield prj - delete_project(requests, prj) - - -@pytest.fixture(scope='function', params=tuple(itertools.chain( - [(t, 2) for t in PROJECT_RESULT_TYPES_2P], - [(t, n) for t in PROJECT_RESULT_TYPES_NP for n in NUMBERS_PARTIES]))) -def result_type_number_parties(request): - yield request.param - - -@pytest.fixture(params=( - *[(t, n) for t in PROJECT_RESULT_TYPES_2P - for n in (None, 2)], - *[(t, n) for t in PROJECT_RESULT_TYPES_NP - for n in (None, *NUMBERS_PARTIES)])) -def result_type_number_parties_or_none(request): - yield request.param - - -@pytest.fixture -def valid_project_params(request, result_type_number_parties_or_none): - result_type, number_parties_or_none = result_type_number_parties_or_none - # None is what we use to test handling of default values - params_dict = {'result_type': result_type} - if number_parties_or_none is not None: - params_dict['number_parties'] = number_parties_or_none - return params_dict - - -@pytest.fixture(scope='function') -def project(request, requests, result_type_number_parties): - result_type, number_parties = result_type_number_parties - project = create_project_no_data( - requests, - result_type=result_type, - number_parties=number_parties) - yield project - # Release project resource - delete_project(requests, project) - - -@pytest.fixture(scope='function', params=ENCODING_SIZES) -def encoding_size(request): - yield request.param - - -@pytest.fixture(scope='function', params=THRESHOLDS) -def threshold(request): - yield request.param - - -@pytest.fixture(scope='function', params=PROJECT_PARAMS_2P) -def permutations_project(request, requests): - size, overlap, encoding_size = request.param - prj = create_project_response(requests, size, overlap, 'permutations', encoding_size) - yield prj - delete_project(requests, prj) - - -@pytest.fixture(scope='function', params=PROJECT_PARAMS_NP) -def groups_project(request, requests): - size, overlap, encoding_size = request.param - prj = create_project_response(requests, size, overlap, 'groups', encoding_size) - yield prj - delete_project(requests, prj) - - -@pytest.fixture( - params=itertools.chain( - itertools.product(PROJECT_RESULT_TYPES_2P, [1, 3, 4, 5]), - [(t, 1) for t in PROJECT_RESULT_TYPES_NP])) -def invalid_result_type_number_parties(request): - yield request.param diff --git a/backend/entityservice/tests/generate_test_data.py b/backend/entityservice/tests/generate_test_data.py deleted file mode 100644 index a340a51c..00000000 --- a/backend/entityservice/tests/generate_test_data.py +++ /dev/null @@ -1,68 +0,0 @@ -import os -import json -from clkhash import randomnames, bloomfilter 
- -from entityservice.serialization import * - - -def create_test_data(entities, crossover=0.8, save_raw=True): - """ - Uses the NameList data and schema and creates - local files for raw data and clk data: - - - e1_NUM_raw.csv - - e1_NUM.json - - e2_NUM_raw.csv - - e2_NUM.json - - :param bool save_raw: Set to False to skip saving raw files - """ - print("Generating random test data for {} individuals".format(entities)) - - from timeit import default_timer as timer - - t0 = timer() - - nl = randomnames.NameList(entities * 2) - s1, s2 = nl.generate_subsets(entities, crossover) - t1 = timer() - print("generated data in {:.3f} s".format(t1-t0)) - - def save_subset_data(s, f): - print(",".join(nl.schema), file=f) - for entity in s: - print(",".join(map(str, entity)), file=f) - - def save_filter_data(filters, f): - print("Serializing filters") - serialized_filters = serialize_filters(filters) - - json.dump(serialized_filters, f) - - - keys = ('something', 'secret') - - if save_raw: - with open("data/e1_{}_raw.csv".format(entities), "w") as f: - save_subset_data(s1, f) - - with open("data/e2_{}_raw.csv".format(entities), "w") as f: - save_subset_data(s2, f) - t2 = timer() - print("Saved raw data in {:.3f} s".format(t2-t1)) - print("Locally hashing identity data to create bloom filters") - - # Save serialized filters - with open("data/e1_{}.json".format(entities), 'w') as f1: - save_filter_data(bloomfilter.calculate_bloom_filters(s1, nl.schema, keys), f1) - - with open("data/e2_{}.json".format(entities), 'w') as f2: - save_filter_data(bloomfilter.calculate_bloom_filters(s2, nl.schema, keys), f2) - - t3 = timer() - print("Hashed and serialized data in {:.3f} s".format(t3-t2)) - - -if __name__ == "__main__": - size = int(os.environ.get("ENTITY_SERVICE_TEST_SIZE", "100")) - create_test_data(size) diff --git a/backend/entityservice/tests/test_serialization.py b/backend/entityservice/tests/test_serialization.py index f08c9505..b5db735b 100644 --- a/backend/entityservice/tests/test_serialization.py +++ b/backend/entityservice/tests/test_serialization.py @@ -35,21 +35,24 @@ def test_generate_scores_produces_json(self): (array('I', [1, 2, 5]), array('I', [2, 2, 5])) ) + json_obj = self._serialize_and_load_scores(sims_iter) + assert len(json_obj["similarity_scores"]) == 3 + for pair_and_score in json_obj["similarity_scores"]: + self.assertEqual(len(pair_and_score), 3) + a, b, score = pair_and_score + self.assertEqual(len(a), 2) + self.assertEqual(len(b), 2) + + def _serialize_and_load_scores(self, sims_iter): buffer = io.BytesIO() anonlink.serialization.dump_candidate_pairs(sims_iter, buffer) buffer.seek(0) json_iterator = generate_scores(buffer) - # Consume the whole iterator and ensure it is valid json json_str = ''.join(json_iterator) json_obj = json.loads(json_str) self.assertIn('similarity_scores', json_obj) - assert len(json_obj["similarity_scores"]) == 3 - for pair_and_score in json_obj["similarity_scores"]: - self.assertEqual(len(pair_and_score), 3) - a, b, score = pair_and_score - self.assertEqual(len(a), 2) - self.assertEqual(len(b), 2) + return json_obj def test_sims_to_json_empty(self): sims_iter = ( @@ -58,15 +61,7 @@ def test_sims_to_json_empty(self): (array('I', []), array('I', [])) ) - buffer = io.BytesIO() - anonlink.serialization.dump_candidate_pairs(sims_iter, buffer) - buffer.seek(0) - json_iterator = generate_scores(buffer) - - # Consume the whole iterator and ensure it is valid json - json_str = ''.join(json_iterator) - json_obj = json.loads(json_str) - 
self.assertIn('similarity_scores', json_obj) + json_obj = self._serialize_and_load_scores(sims_iter) assert len(json_obj["similarity_scores"]) == 0 def test_binary_pack_filters(self): diff --git a/backend/entityservice/tests/util.py b/backend/entityservice/tests/util.py index 85836bbb..088be25e 100644 --- a/backend/entityservice/tests/util.py +++ b/backend/entityservice/tests/util.py @@ -1,19 +1,7 @@ import base64 -import datetime -import itertools -import math -import os import random -import struct -import time import tempfile from contextlib import contextmanager -from enum import IntEnum - -from bitarray import bitarray -import iso8601 - -from entityservice.tests.config import url @contextmanager @@ -31,425 +19,6 @@ def serialize_bytes(hash_bytes): return base64.b64encode(hash_bytes).decode() -def serialize_filters(filters): - """Serialize filters as clients are required to.""" - return [ - serialize_bytes(f) for f in filters - ] - - def generate_bytes(length): return random.getrandbits(length * 8).to_bytes(length, 'big') - -def generate_clks(count, size): - """Generate random clks of given size. - - :param count: The number of clks to generate - :param size: The number of bytes per generated clk. - """ - res = [] - for i in range(count): - hash_bytes = generate_bytes(size) - res.append(hash_bytes) - return res - - -def generate_clks_with_id(count, size): - return zip(range(count), generate_clks(count, size)) - - -def generate_json_serialized_clks(count, size=128): - clks = generate_clks(count, size) - return [serialize_bytes(hash_bytes) for hash_bytes in clks] - - -def nonempty_powerset(iterable): - "nonempty_powerset([1,2,3]) --> (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)" - # Inspired by: - # https://docs.python.org/3/library/itertools.html#itertools-recipes - s = tuple(iterable) - return itertools.chain.from_iterable( - itertools.combinations(s, r) for r in range(1, len(s)+1)) - - -def generate_overlapping_clk_data( - dataset_sizes, overlap=0.9, encoding_size=128, seed=666): - """Generate random datsets with fixed overlap. - - Postcondition: - for all some_datasets in nonempty_powerset(datasets), - len(set.intersection(*some_datasets)) - == floor(min(map(len, some_datasets)) - * overlap ** (len(some_datasets) - 1)) - (in case of two datasets A and V this reduces to: - len(A & B) = floor(min(len(A), len(B)) * overlap) - - For some sets of parameters (particularly when dataset_sizes is - long), meeting this postcondition is impossible. In that case, we - raise ValueError. 
- """ - i = 0 - datasets_n = len(dataset_sizes) - dataset_record_is = tuple(set() for _ in dataset_sizes) - for overlapping_n in range(datasets_n, 0, -1): - for overlapping_datasets in itertools.combinations( - range(datasets_n), overlapping_n): - records_n = int(overlap ** (len(overlapping_datasets) - 1) - * min(map(dataset_sizes.__getitem__, - overlapping_datasets))) - current_records_n = len(set.intersection( - *map(dataset_record_is.__getitem__, overlapping_datasets))) - if records_n < current_records_n: - raise ValueError( - 'parameters make meeting postcondition impossible') - for j in overlapping_datasets: - dataset_record_is[j].update( - range(i, i + records_n - current_records_n)) - i += records_n - current_records_n - - # Sanity check - if __debug__: - for dataset, dataset_size in zip(dataset_record_is, dataset_sizes): - assert len(dataset) == dataset_size - for datasets_with_size in nonempty_powerset( - zip(dataset_record_is, dataset_sizes)): - some_datasets, sizes = zip(*datasets_with_size) - intersection = set.intersection(*some_datasets) - aim_size = int(min(sizes) - * overlap ** (len(datasets_with_size) - 1)) - assert len(intersection) == aim_size - - records = generate_json_serialized_clks(i, size=encoding_size) - datasets = tuple(list(map(records.__getitem__, record_is)) - for record_is in dataset_record_is) - - rng = random.Random(seed) - for dataset in datasets: - rng.shuffle(dataset) - - return datasets - - -def get_project_description(requests, new_project_data): - project_description_response = requests.get(url + '/projects/{}'.format(new_project_data['project_id']), - headers={'Authorization': new_project_data['result_token']}) - - assert project_description_response.status_code == 200 - return project_description_response.json() - - -def create_project_no_data(requests, - result_type='groups', number_parties=None): - number_parties_param = ( - {} if number_parties is None else {'number_parties': number_parties}) - new_project_response = requests.post(url + '/projects', - headers={'Authorization': 'invalid'}, - json={ - 'schema': {}, - 'result_type': result_type, - **number_parties_param - }) - assert new_project_response.status_code == 201, 'I received this instead: {}'.format(new_project_response.text) - return new_project_response.json() - - -@contextmanager -def temporary_blank_project(requests, result_type='groups'): - project = create_project_no_data(requests, result_type) - yield project - delete_project(requests, project) - - -def create_project_upload_fake_data( - requests, - sizes, overlap=0.75, - result_type='groups', encoding_size=128): - data = generate_overlapping_clk_data( - sizes, overlap=overlap, encoding_size=encoding_size) - new_project_data, json_responses = create_project_upload_data( - requests, data, result_type=result_type) - assert len(json_responses) == len(sizes) - return new_project_data, json_responses - - -def create_project_upload_data( - requests, data, result_type='groups', binary=False, hash_size=None): - if binary and hash_size is None: - raise ValueError('binary mode must specify a hash_size') - - number_parties = len(data) - new_project_data = create_project_no_data( - requests, result_type=result_type, number_parties=number_parties) - - upload_url = url + f'/projects/{new_project_data["project_id"]}/{"binary" if binary else ""}clks' - json_responses = [] - for clks, update_token in zip(data, new_project_data['update_tokens']): - if binary: - hash_count, mod = divmod(len(clks), hash_size) - assert mod == 0 - r = requests.post( - 
upload_url, - headers={'Authorization': update_token, - 'Content-Type': 'application/octet-stream', - 'Hash-Count': str(hash_count), - 'Hash-Size': str(hash_size)}, - data=clks) - else: - r = requests.post( - upload_url, - headers={'Authorization': update_token}, - json={'clks': clks}) - assert r.status_code == 201, f'Got this instead: {r.text}' - json_responses.append(r.json()) - - assert len(json_responses) == number_parties - - return new_project_data, json_responses - - -def _check_delete_response(r): - # Note we allow for a 403 because the project may already have been deleted - assert r.status_code in {204, 403}, 'I received this instead: {}'.format(r.text) - # Delete project & run are both asynchronous so we generously allow the server some time to - # delete resources before moving on (e.g. running the next test) - time.sleep(0.5) - - -def delete_project(requests, project): - project_id = project['project_id'] - result_token = project['result_token'] - r = requests.delete(url + '/projects/{}'.format(project_id), - headers={'Authorization': result_token}) - - _check_delete_response(r) - - -def delete_run(requests, project_id, run_id, result_token): - r = requests.delete(url + '/projects/{}/runs/{}'.format(project_id, run_id), - headers={'Authorization': result_token}) - _check_delete_response(r) - - -def get_run_status(requests, project, run_id, result_token = None): - project_id = project['project_id'] - result_token = project['result_token'] if result_token is None else result_token - r = requests.get(url + '/projects/{}/runs/{}/status'.format(project_id, run_id), - headers={'Authorization': result_token}) - - assert r.status_code == 200, 'I received this instead: {}'.format(r.text) - return r.json() - - -def wait_for_run(requests, project, run_id, ok_statuses, result_token=None, timeout=10): - """ - Poll project/run_id until its status is one of the ok_statuses. Raise a - TimeoutError if we've waited more than timeout seconds. - It also checks that the status are progressing normally, using the checks implemented in - `has_progressed_validly`. - """ - start_time = time.time() - old_status = None - while True: - status = get_run_status(requests, project, run_id, result_token) - if old_status: - if not has_progressed_validly(old_status, status): - raise Exception("The run has not progressed as expected. The old status " - "update was '{}' while the new one is '{}'.".format(old_status, status)) - if status['state'] in ok_statuses or status['state'] == 'error': - break - if time.time() - start_time > timeout: - raise TimeoutError('waited for {}s'.format(timeout)) - time.sleep(0.5) - old_status = status - return status - - -def wait_for_run_completion(requests, project, run_id, result_token, timeout=20): - completion_statuses = {'completed'} - return wait_for_run(requests, project, run_id, completion_statuses, result_token, timeout) - - -def wait_while_queued(requests, project, run_id, result_token=None, timeout=10): - not_queued_statuses = {'running', 'completed'} - return wait_for_run(requests, project, run_id, not_queued_statuses, result_token, timeout) - - -def post_run(requests, project, threshold): - project_id = project['project_id'] - result_token = project['result_token'] - - req = requests.post( - url + '/projects/{}/runs'.format(project_id), - headers={'Authorization': result_token}, - json={'threshold': threshold}) - assert req.status_code == 201 - run_id = req.json()['run_id'] - # Each time we post a new run, we also check that the run endpoint works well. 
- get_run(requests, project, run_id, expected_threshold=threshold) - return run_id - - -def get_runs(requests, project, result_token=None, expected_status=200): - project_id = project['project_id'] - result_token = project['result_token'] if result_token is None else result_token - - req = requests.get( - url + '/projects/{}/runs'.format(project_id), - headers={'Authorization': result_token}) - assert req.status_code == expected_status - return req.json() - - -def get_run(requests, project, run_id, expected_status=200, expected_threshold=None): - project_id = project['project_id'] - result_token = project['result_token'] - - req = requests.get( - url + '/projects/{}/runs/{}'.format(project_id, run_id), - headers={'Authorization': result_token}) - - assert req.status_code == expected_status - run = req.json() - if expected_status == 200: - # only check these assertions if all went well. - assert 'run_id' in run - assert run['run_id'] == run_id - assert 'notes' in run - assert 'threshold' in run - if expected_threshold: - assert expected_threshold == run['threshold'] - return run - - -def get_run_result(requests, project, run_id, result_token=None, expected_status=200, wait=True, timeout=60): - result_token = project['result_token'] if result_token is None else result_token - if wait: - # wait_for_run_completion also checks that the progress is in order. - final_status = wait_for_run_completion(requests, project, run_id, result_token, timeout) - state = final_status['state'] - assert state == 'completed', "Expected: 'completed', got: '{}'".format(state) - - project_id = project['project_id'] - r = requests.get(url + '/projects/{}/runs/{}/result'.format(project_id, run_id), - headers={'Authorization': result_token}) - assert r.status_code == expected_status - return r.json() - - -def _check_new_project_response_fields(new_project_data): - assert 'project_id' in new_project_data - assert 'update_tokens' in new_project_data - assert 'result_token' in new_project_data - assert len(new_project_data['update_tokens']) == 2 - - -class State(IntEnum): - created = -1 - queued = 0 - running = 1 - completed = 2 - - @staticmethod - def from_string(state): - if state == 'created': - return State.created - elif state == 'queued': - return State.queued - elif state == 'running': - return State.running - elif state == 'completed': - return State.completed - - -def has_progressed_validly(status_old, status_new): - """ - If there happened to be progress between the two statuses we check if it was valid. - Thus, we return False if there was invalid progress, and True otherwise. (even if there was no progress) - stage change counts as progress, but has to be done in the right order. - also if both runs are in the same stage, we compare progress. 
- - :param status_old: json describing a run status as returned from the '/projects/{project_id}/runs/{run_id}/status' - endpoint - :param status_new: same as above - :return: False if the progress was not valid - """ - old_stage = status_old['current_stage']['number'] - new_stage = status_new['current_stage']['number'] - - if old_stage < new_stage: - return True - elif old_stage > new_stage: - return False - # both in the same stage then - if 'progress' in status_old['current_stage'] and 'progress' in status_new['current_stage']: - assert 0 <= status_new['current_stage']['progress']['relative'] <= 1.0, "{} not between 0 and 1".format(status_new['current_stage']['progress']['relative']) - assert 0 <= status_old['current_stage']['progress']['relative'] <= 1.0, "{} not between 0 and 1".format(status_old['current_stage']['progress']['relative']) - - if status_new['current_stage']['progress']['relative'] < status_old['current_stage']['progress']['relative']: - return False - else: - return True - else: # same stage, no progress info - return True - - -def is_run_status(status): - assert 'state' in status - run_state = State.from_string(status['state']) - assert run_state in State - assert 'stages' in status - assert 'time_added' in status - assert 'current_stage' in status - cur_stage = status['current_stage'] - - if run_state == State.completed: - assert 'time_started' in status - assert 'time_completed' in status - elif run_state == State.running: - assert 'time_started' in status - - assert 'number' in cur_stage - if 'progress' in cur_stage: - assert 'absolute' in cur_stage['progress'] - assert 'relative' in cur_stage['progress'] - - -def upload_binary_data(requests, data, project_id, token, count, size=128, expected_status_code=201): - r = requests.post( - url + '/projects/{}/binaryclks'.format(project_id), - headers={ - 'Authorization': token, - 'Content-Type': 'application/octet-stream', - 'Hash-Count': str(count), - 'Hash-Size': str(size) - }, - data=data - ) - assert r.status_code == expected_status_code, 'I received this instead: {}'.format(r.text) - - upload_response = r.json() - if expected_status_code == 201: - assert 'receipt_token' in upload_response - return upload_response - - -def upload_binary_data_from_file(requests, file_path, project_id, token, count, size=128, expected_status_code=201): - with open(file_path, 'rb') as f: - return upload_binary_data(requests, f, project_id, token, count, size, expected_status_code) - - -def get_expected_number_parties(project_params): - return project_params.get('number_parties', 2) - - -def binary_upload_format(encoding_size): - bit_packing_fmt = f"!{encoding_size}s" - bit_packing_struct = struct.Struct(bit_packing_fmt) - return bit_packing_struct - - -def binary_pack_for_upload(filters, encoding_size): - bit_packing_struct = binary_upload_format(encoding_size) - for hash_bytes in filters: - yield bit_packing_struct.pack(hash_bytes) diff --git a/e2etests/Dockerfile b/e2etests/Dockerfile new file mode 100644 index 00000000..26f3651a --- /dev/null +++ b/e2etests/Dockerfile @@ -0,0 +1,12 @@ +ARG VERSION=latest +FROM data61/anonlink-base:${VERSION} + +WORKDIR /var/www +ADD . 
/var/www/e2etests + +RUN python -c "import anonlink; print('anonlink version:', anonlink.__version__)" && \ + python -c "import clkhash; print('clkhash version:', clkhash.__version__)" + +ENV SERVER http://nginx:8851 +ENV INITIAL_DELAY 20 +CMD python -m pytest -n 2 e2etests/tests --junitxml=testResults.xml -x diff --git a/e2etests/config.py b/e2etests/config.py new file mode 100644 index 00000000..3fbec7e2 --- /dev/null +++ b/e2etests/config.py @@ -0,0 +1,6 @@ +import os + + +initial_delay = float(os.environ.get("INITIAL_DELAY", "2")) +rate_limit_delay = float(os.environ.get("RATE_LIMIT_DELAY", "0.25")) +url = os.environ.get("SERVER", "http://localhost:8851") + "/api/v1/" diff --git a/e2etests/tests/__init__.py b/e2etests/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/e2etests/tests/conftest.py b/e2etests/tests/conftest.py new file mode 100644 index 00000000..50d20208 --- /dev/null +++ b/e2etests/tests/conftest.py @@ -0,0 +1,187 @@ +import os +import time +import pytest +import requests as requests_library +import itertools +from e2etests.config import initial_delay +from e2etests.util import create_project_upload_fake_data, delete_project, create_project_no_data + +THROTTLE_SLEEP = 0.2 + + +@pytest.fixture(scope='session') +def requests(): + """ + We inject the requests session. + For now we just add a small sleep after every request to ensure we don't get throttled when + tests run back to back. Note the rate limit in nginx is 10 requests per ip per second. + """ + def delay_next(r, *args, **kwargs): + time.sleep(THROTTLE_SLEEP) + + testing_session = requests_library.Session() + testing_session.hooks['response'].append(delay_next) + + # Add an initial delay as well + time.sleep(initial_delay) + yield testing_session + +# Parameterising on: +# +# - pairs of dataset sizes +# - overlap of the sizes +# - result_type for 2 parties in ['similarity_scores', 'permutations'] and for more parties in ['groups'] +# - threshold + +ENVVAR_NAME = 'ENTITY_SERVICE_RUN_SLOW_TESTS' +THRESHOLDS = [0.9, 1.0] +OVERLAPS = [0.0, 0.9] +ENCODING_SIZES = [8] +USES_BLOCKING = [True, False] +NUMBERS_PARTIES = [2, 3, 5] + +if os.getenv(ENVVAR_NAME): + ENCODING_SIZES.extend([64, 128, 512, 2048]) + OVERLAPS.extend([0.2, 0.5, 1.0]) + THRESHOLDS.extend([0.6, 0.8, 0.95]) + +FAST_SIZES_2P = tuple(itertools.product([1, 1000], repeat=2)) +FAST_SIZES_NP = tuple(itertools.chain( + FAST_SIZES_2P, + [(1, 1000, 1000), + (1000, 1, 1000), + (1000, 1000, 1), + (1000, 1000, 1000), + (1000, 1000, 1000, 1000, 1000)])) + +SLOW_SIZES_2P = tuple(itertools.combinations([1, 10000, 100000, 1000000], 2)) +SLOW_SIZES_NP = tuple(itertools.chain( + SLOW_SIZES_2P, + itertools.product( + [10000, 100000], [10000, 100000], [100000, 1000000]), + ((10000, 10000, 100000, 100000, 1000000),))) + +SIZES_2P = (tuple(itertools.chain(FAST_SIZES_2P, SLOW_SIZES_2P)) + if os.getenv(ENVVAR_NAME) + else FAST_SIZES_2P) +SIZES_NP = (tuple(itertools.chain(FAST_SIZES_NP, SLOW_SIZES_NP)) + if os.getenv(ENVVAR_NAME) + else FAST_SIZES_NP) + +PROJECT_PARAMS_2P = tuple( + itertools.product(SIZES_2P, OVERLAPS, ENCODING_SIZES)) +PROJECT_PARAMS_NP = tuple( + itertools.product(SIZES_NP, OVERLAPS, ENCODING_SIZES, USES_BLOCKING)) +PROJECT_RESULT_TYPES_2P = ['similarity_scores', 'permutations'] +PROJECT_RESULT_TYPES_NP = ['groups'] + + +def create_project_response(requests, size, overlap, result_type, encoding_size=128, uses_blocking=False): + """ + Create a project with the given size, overlap and result_type. 
+ + Tests that use one of these projects will get a dict like the following: + + { + "project_id": "ID", + "upload-mode": "BINARY" | "JSON", + "size": [size 1, size 2], + "encoding-size": int number of bytes in each encoding e.g. 128, + "overlap": float between 0 and 1, + "result_token": "TOKEN", + "upload_tokens": [TOKENS, ...], + "dp_1": + "dp_2": + } + """ + project, dp_responses = create_project_upload_fake_data( + requests, size, overlap=overlap, result_type=result_type, encoding_size=encoding_size) + project.update({ + 'size': size, + 'encoding-size': encoding_size, + 'upload-mode': 'JSON', + 'uses_blocking': uses_blocking, + 'overlap': overlap, + 'dp_responses': dp_responses + }) + return project + + +@pytest.fixture(scope='function', params=PROJECT_PARAMS_2P) +def similarity_scores_project(request, requests): + size, overlap, encoding_size = request.param + prj = create_project_response(requests, size, overlap, 'similarity_scores', encoding_size) + yield prj + delete_project(requests, prj) + + +@pytest.fixture(scope='function', params=tuple(itertools.chain( + [(t, 2) for t in PROJECT_RESULT_TYPES_2P], + [(t, n) for t in PROJECT_RESULT_TYPES_NP for n in NUMBERS_PARTIES]))) +def result_type_number_parties(request): + yield request.param + + +@pytest.fixture(params=( + *[(t, n) for t in PROJECT_RESULT_TYPES_2P + for n in (None, 2)], + *[(t, n) for t in PROJECT_RESULT_TYPES_NP + for n in (None, *NUMBERS_PARTIES)])) +def result_type_number_parties_or_none(request): + yield request.param + + +@pytest.fixture +def valid_project_params(request, result_type_number_parties_or_none): + result_type, number_parties_or_none = result_type_number_parties_or_none + # None is what we use to test handling of default values + params_dict = {'result_type': result_type} + if number_parties_or_none is not None: + params_dict['number_parties'] = number_parties_or_none + return params_dict + + +@pytest.fixture(scope='function') +def project(request, requests, result_type_number_parties): + result_type, number_parties = result_type_number_parties + project = create_project_no_data( + requests, + result_type=result_type, + number_parties=number_parties) + yield project + # Release project resource + delete_project(requests, project) + + +@pytest.fixture(scope='function', params=ENCODING_SIZES) +def encoding_size(request): + yield request.param + + +@pytest.fixture(scope='function', params=THRESHOLDS) +def threshold(request): + yield request.param + + +@pytest.fixture(scope='function', params=PROJECT_PARAMS_2P) +def permutations_project(request, requests): + size, overlap, encoding_size = request.param + prj = create_project_response(requests, size, overlap, 'permutations', encoding_size) + yield prj + delete_project(requests, prj) + + +@pytest.fixture(scope='function', params=PROJECT_PARAMS_NP) +def groups_project(request, requests): + size, overlap, encoding_size, uses_blocking = request.param + prj = create_project_response(requests, size, overlap, 'groups', encoding_size, uses_blocking) + yield prj + delete_project(requests, prj) + + +@pytest.fixture( + params=itertools.chain( + itertools.product(PROJECT_RESULT_TYPES_2P, [1, 3, 4, 5]), + [(t, 1) for t in PROJECT_RESULT_TYPES_NP])) +def invalid_result_type_number_parties(request): + yield request.param diff --git a/backend/entityservice/tests/test_admin.py b/e2etests/tests/test_admin.py similarity index 93% rename from backend/entityservice/tests/test_admin.py rename to e2etests/tests/test_admin.py index 4c18434a..0387a7b2 100644 --- 
a/backend/entityservice/tests/test_admin.py +++ b/e2etests/tests/test_admin.py @@ -1,6 +1,6 @@ import requests -from entityservice.tests.config import url +from e2etests.config import url def test_version(record_property): diff --git a/backend/entityservice/tests/test_deletes.py b/e2etests/tests/test_deletes.py similarity index 94% rename from backend/entityservice/tests/test_deletes.py rename to e2etests/tests/test_deletes.py index 7e85f774..b622f3e8 100644 --- a/backend/entityservice/tests/test_deletes.py +++ b/e2etests/tests/test_deletes.py @@ -1,4 +1,4 @@ -from entityservice.tests.util import post_run, delete_project, create_project_upload_fake_data, delete_run, \ +from e2etests.util import post_run, delete_project, create_project_upload_fake_data, delete_run, \ get_project_description, get_run, wait_for_run_completion import pytest diff --git a/backend/entityservice/tests/test_project.py b/e2etests/tests/test_project.py similarity index 98% rename from backend/entityservice/tests/test_project.py rename to e2etests/tests/test_project.py index 0a3fe251..c9a62931 100644 --- a/backend/entityservice/tests/test_project.py +++ b/e2etests/tests/test_project.py @@ -1,10 +1,8 @@ import itertools import time -import pytest - -from entityservice.tests.config import url -from entityservice.tests.util import ( +from e2etests.config import url +from e2etests.util import ( delete_project, generate_overlapping_clk_data, get_expected_number_parties, get_project_description) diff --git a/backend/entityservice/tests/test_project_run_description.py b/e2etests/tests/test_project_run_description.py similarity index 58% rename from backend/entityservice/tests/test_project_run_description.py rename to e2etests/tests/test_project_run_description.py index 3865a248..88f215fb 100644 --- a/backend/entityservice/tests/test_project_run_description.py +++ b/e2etests/tests/test_project_run_description.py @@ -1,4 +1,4 @@ -from entityservice.tests.util import create_project_upload_fake_data, post_run, get_run +from e2etests.util import create_project_upload_fake_data, post_run, get_run def test_run_description_missing_run(requests, project): diff --git a/backend/entityservice/tests/test_project_run_listing.py b/e2etests/tests/test_project_run_listing.py similarity index 85% rename from backend/entityservice/tests/test_project_run_listing.py rename to e2etests/tests/test_project_run_listing.py index e420c3cc..38775330 100644 --- a/backend/entityservice/tests/test_project_run_listing.py +++ b/e2etests/tests/test_project_run_listing.py @@ -1,5 +1,5 @@ -from entityservice.tests.config import url -from entityservice.tests.util import create_project_upload_fake_data, create_project_no_data, get_runs, post_run, \ +from e2etests.config import url +from e2etests.util import create_project_upload_fake_data, create_project_no_data, get_runs, post_run, \ temporary_blank_project diff --git a/backend/entityservice/tests/test_project_run_posting.py b/e2etests/tests/test_project_run_posting.py similarity index 92% rename from backend/entityservice/tests/test_project_run_posting.py rename to e2etests/tests/test_project_run_posting.py index ad6c3d42..ac606a35 100644 --- a/backend/entityservice/tests/test_project_run_posting.py +++ b/e2etests/tests/test_project_run_posting.py @@ -1,4 +1,4 @@ -from entityservice.tests.util import post_run, get_runs +from e2etests.util import post_run, get_runs def test_posting_run_before_data_upload(requests, project): diff --git a/backend/entityservice/tests/test_project_run_results.py 
b/e2etests/tests/test_project_run_results.py similarity index 96% rename from backend/entityservice/tests/test_project_run_results.py rename to e2etests/tests/test_project_run_results.py index 1b08b10f..6c9f0ba3 100644 --- a/backend/entityservice/tests/test_project_run_results.py +++ b/e2etests/tests/test_project_run_results.py @@ -1,4 +1,4 @@ -from entityservice.tests.util import create_project_no_data, post_run, get_run_result +from e2etests.util import create_project_no_data, post_run, get_run_result def test_run_similarity_score_results(requests, similarity_scores_project, threshold): diff --git a/backend/entityservice/tests/test_project_run_status.py b/e2etests/tests/test_project_run_status.py similarity index 78% rename from backend/entityservice/tests/test_project_run_status.py rename to e2etests/tests/test_project_run_status.py index 9c8c7c83..119bd2e3 100644 --- a/backend/entityservice/tests/test_project_run_status.py +++ b/e2etests/tests/test_project_run_status.py @@ -1,5 +1,5 @@ -from entityservice.tests.util import post_run, get_run_status, is_run_status, \ +from e2etests.util import post_run, get_run_status, is_run_status, \ create_project_no_data diff --git a/backend/entityservice/tests/test_project_uploads.py b/e2etests/tests/test_project_uploads.py similarity index 99% rename from backend/entityservice/tests/test_project_uploads.py rename to e2etests/tests/test_project_uploads.py index 775c855a..7edfb5c9 100644 --- a/backend/entityservice/tests/test_project_uploads.py +++ b/e2etests/tests/test_project_uploads.py @@ -2,8 +2,8 @@ import os import pytest -from entityservice.tests.config import url -from entityservice.tests.util import ( +from e2etests.config import url +from e2etests.util import ( create_project_upload_data, create_project_upload_fake_data, generate_clks, generate_json_serialized_clks, get_expected_number_parties, get_run_result, post_run, diff --git a/backend/entityservice/tests/test_results_correctness.py b/e2etests/tests/test_results_correctness.py similarity index 97% rename from backend/entityservice/tests/test_results_correctness.py rename to e2etests/tests/test_results_correctness.py index 37da082e..80698ecc 100644 --- a/backend/entityservice/tests/test_results_correctness.py +++ b/e2etests/tests/test_results_correctness.py @@ -4,7 +4,7 @@ import anonlink import pytest -from entityservice.tests.util import create_project_upload_data, post_run, get_run_result, delete_project +from e2etests.util import create_project_upload_data, post_run, get_run_result, delete_project # !!! We assume that anonlink computes the right results. 
diff --git a/backend/entityservice/tests/test_results_correctness_multiparty.py b/e2etests/tests/test_results_correctness_multiparty.py similarity index 97% rename from backend/entityservice/tests/test_results_correctness_multiparty.py rename to e2etests/tests/test_results_correctness_multiparty.py index 6a69303b..210980ac 100644 --- a/backend/entityservice/tests/test_results_correctness_multiparty.py +++ b/e2etests/tests/test_results_correctness_multiparty.py @@ -3,7 +3,7 @@ import anonlink -from entityservice.tests.util import ( +from e2etests.util import ( create_project_upload_data, delete_project, get_run_result, post_run, binary_pack_for_upload) DATA_FILENAME = 'test-multiparty-results-correctness-data.pkl' diff --git a/e2etests/tests/testdata/clks_128B_1k.bin b/e2etests/tests/testdata/clks_128B_1k.bin new file mode 100644 index 00000000..738d1b39 Binary files /dev/null and b/e2etests/tests/testdata/clks_128B_1k.bin differ diff --git a/e2etests/tests/testdata/febrl4_clks_and_truth.pkl b/e2etests/tests/testdata/febrl4_clks_and_truth.pkl new file mode 100644 index 00000000..35e13f65 Binary files /dev/null and b/e2etests/tests/testdata/febrl4_clks_and_truth.pkl differ diff --git a/e2etests/tests/testdata/single_clk.bin b/e2etests/tests/testdata/single_clk.bin new file mode 100644 index 00000000..2b294323 Binary files /dev/null and b/e2etests/tests/testdata/single_clk.bin differ diff --git a/e2etests/tests/testdata/test-multiparty-results-correctness-data.pkl b/e2etests/tests/testdata/test-multiparty-results-correctness-data.pkl new file mode 100644 index 00000000..ae0639e8 Binary files /dev/null and b/e2etests/tests/testdata/test-multiparty-results-correctness-data.pkl differ diff --git a/e2etests/tests/testdata/test_encoding.json b/e2etests/tests/testdata/test_encoding.json new file mode 100644 index 00000000..8adbdce7 --- /dev/null +++ b/e2etests/tests/testdata/test_encoding.json @@ -0,0 +1,8 @@ +{ + "clknblocks": [ + ["PNfT5qAAlAgFyQyoUhokyohywAAOYMJgdwPCRWBQOyCIsSEgePCo2CnRaON+FUog07AHTDUDARsUcJiSaYKNDiCAEeICbGYSZFhCVALQAylxDSAtSR4CsgiCmBjDiAEOGMfEi7ABkydqyKhIIFrBQQvFAUTDakAg0RyFYAWg8nE=", "1"], + ["WvjX1rZYiFCRQAyqptwAS7jAaACEhFApFxB9RSeYPTBLkYkIUjKacAIleYLoNUooU/AHYCQ2FSkUYCzZMiClDIGEIBKQXKZAogCAUpvOKikUKUI+CRgI+AkgGsqAJYkdZcHEAAWDEidgHbYgIjBpMgpUU2wCwgCoXwCCWCVAsPM=", "1"], + ["UPLX+gCgkbnVQhg4wBA8wogCUNA0QEbhPyHBPCBAKwSOmzJFWDGqWAZY/NsQX01Aw7A1UWBKAt5GPJib4BpLSHKQBesZbjsjYhTAQBBDZ7uQHKGNyRhCMhiWsAjaVEBcQlVkiCHzMhEimEgERlB/AQvFBwRjQHRywRQQQUuggnw=", "1", "2"], + ["MNXW4D9Gg5EYCQ2AQqGhBuAK+BiElCKJjyF1HjObORkqIDFgUTBZXQgEI4LAUc1k8tmMTkhijMynMKa44DiEWhDUAtAEphwapBCAFIpQIGgUAoIt0ZACOoxBEyBIFKIHRChFARbRunMh/GRi4hCRB2ZEJYZS1gTgAcAGXQGD++0=", "2"] + ] +} diff --git a/e2etests/util.py b/e2etests/util.py new file mode 100644 index 00000000..75d3a7c1 --- /dev/null +++ b/e2etests/util.py @@ -0,0 +1,453 @@ +import base64 +import datetime +import itertools +import math +import os +import random +import struct +import time +import tempfile +from contextlib import contextmanager +from enum import IntEnum + +from e2etests.config import url + + +@contextmanager +def temp_file_containing(data): + with tempfile.NamedTemporaryFile('wb') as fp: + fp.write(data) + fp.seek(0) + yield fp + + +def serialize_bytes(hash_bytes): + """ Serialize bloomfilter bytes + + """ + return base64.b64encode(hash_bytes).decode() + + +def serialize_filters(filters): + """Serialize filters as clients are required to.""" + return [ + serialize_bytes(f) for f in filters + ] + + +def generate_bytes(length): + return 
random.getrandbits(length * 8).to_bytes(length, 'big') + + +def generate_clks(count, size): + """Generate random clks of given size. + + :param count: The number of clks to generate + :param size: The number of bytes per generated clk. + """ + res = [] + for i in range(count): + hash_bytes = generate_bytes(size) + res.append(hash_bytes) + return res + + +def generate_clks_with_id(count, size): + return zip(range(count), generate_clks(count, size)) + + +def generate_json_serialized_clks(count, size=128): + clks = generate_clks(count, size) + return [serialize_bytes(hash_bytes) for hash_bytes in clks] + + +def nonempty_powerset(iterable): + "nonempty_powerset([1,2,3]) --> (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)" + # Inspired by: + # https://docs.python.org/3/library/itertools.html#itertools-recipes + s = tuple(iterable) + return itertools.chain.from_iterable( + itertools.combinations(s, r) for r in range(1, len(s)+1)) + + +def generate_overlapping_clk_data( + dataset_sizes, overlap=0.9, encoding_size=128, seed=666): + """Generate random datsets with fixed overlap. + + Postcondition: + for all some_datasets in nonempty_powerset(datasets), + len(set.intersection(*some_datasets)) + == floor(min(map(len, some_datasets)) + * overlap ** (len(some_datasets) - 1)) + (in case of two datasets A and V this reduces to: + len(A & B) = floor(min(len(A), len(B)) * overlap) + + For some sets of parameters (particularly when dataset_sizes is + long), meeting this postcondition is impossible. In that case, we + raise ValueError. + """ + i = 0 + datasets_n = len(dataset_sizes) + dataset_record_is = tuple(set() for _ in dataset_sizes) + for overlapping_n in range(datasets_n, 0, -1): + for overlapping_datasets in itertools.combinations( + range(datasets_n), overlapping_n): + records_n = int(overlap ** (len(overlapping_datasets) - 1) + * min(map(dataset_sizes.__getitem__, + overlapping_datasets))) + current_records_n = len(set.intersection( + *map(dataset_record_is.__getitem__, overlapping_datasets))) + if records_n < current_records_n: + raise ValueError( + 'parameters make meeting postcondition impossible') + for j in overlapping_datasets: + dataset_record_is[j].update( + range(i, i + records_n - current_records_n)) + i += records_n - current_records_n + + # Sanity check + if __debug__: + for dataset, dataset_size in zip(dataset_record_is, dataset_sizes): + assert len(dataset) == dataset_size + for datasets_with_size in nonempty_powerset( + zip(dataset_record_is, dataset_sizes)): + some_datasets, sizes = zip(*datasets_with_size) + intersection = set.intersection(*some_datasets) + aim_size = int(min(sizes) + * overlap ** (len(datasets_with_size) - 1)) + assert len(intersection) == aim_size + + records = generate_json_serialized_clks(i, size=encoding_size) + datasets = tuple(list(map(records.__getitem__, record_is)) + for record_is in dataset_record_is) + + rng = random.Random(seed) + for dataset in datasets: + rng.shuffle(dataset) + + return datasets + + +def get_project_description(requests, new_project_data): + project_description_response = requests.get(url + '/projects/{}'.format(new_project_data['project_id']), + headers={'Authorization': new_project_data['result_token']}) + + assert project_description_response.status_code == 200 + return project_description_response.json() + + +def create_project_no_data(requests, + result_type='groups', number_parties=None, uses_blocking=False): + number_parties_param = ( + {} if number_parties is None else {'number_parties': number_parties}) + new_project_response = 
requests.post(url + '/projects', + headers={'Authorization': 'invalid'}, + json={ + 'schema': {}, + 'result_type': result_type, + 'uses_blocking': uses_blocking, + **number_parties_param + }) + assert new_project_response.status_code == 201, 'I received this instead: {}'.format(new_project_response.text) + return new_project_response.json() + + +@contextmanager +def temporary_blank_project(requests, result_type='groups'): + project = create_project_no_data(requests, result_type) + yield project + delete_project(requests, project) + + +def create_project_upload_fake_data( + requests, + sizes, overlap=0.75, + result_type='groups', encoding_size=128, uses_blocking=False): + data = generate_overlapping_clk_data( + sizes, overlap=overlap, encoding_size=encoding_size) + new_project_data, json_responses = create_project_upload_data( + requests, data, result_type=result_type) + assert len(json_responses) == len(sizes) + return new_project_data, json_responses + + +def create_project_upload_data( + requests, data, result_type='groups', binary=False, hash_size=None, uses_blocking=False): + if binary and hash_size is None: + raise ValueError('binary mode must specify a hash_size') + + number_parties = len(data) + new_project_data = create_project_no_data( + requests, result_type=result_type, number_parties=number_parties, uses_blocking=False) + + upload_url = url + f'/projects/{new_project_data["project_id"]}/{"binary" if binary else ""}clks' + json_responses = [] + for clks, update_token in zip(data, new_project_data['update_tokens']): + if binary: + hash_count, mod = divmod(len(clks), hash_size) + assert mod == 0 + r = requests.post( + upload_url, + headers={'Authorization': update_token, + 'Content-Type': 'application/octet-stream', + 'Hash-Count': str(hash_count), + 'Hash-Size': str(hash_size)}, + data=clks) + else: + r = requests.post( + upload_url, + headers={'Authorization': update_token}, + json={'clks': clks}) + assert r.status_code == 201, f'Got this instead: {r.text}' + json_responses.append(r.json()) + + assert len(json_responses) == number_parties + + return new_project_data, json_responses + + +def _check_delete_response(r): + # Note we allow for a 403 because the project may already have been deleted + assert r.status_code in {204, 403}, 'I received this instead: {}'.format(r.text) + # Delete project & run are both asynchronous so we generously allow the server some time to + # delete resources before moving on (e.g. 
running the next test) + time.sleep(0.5) + + +def delete_project(requests, project): + project_id = project['project_id'] + result_token = project['result_token'] + r = requests.delete(url + '/projects/{}'.format(project_id), + headers={'Authorization': result_token}) + + _check_delete_response(r) + + +def delete_run(requests, project_id, run_id, result_token): + r = requests.delete(url + '/projects/{}/runs/{}'.format(project_id, run_id), + headers={'Authorization': result_token}) + _check_delete_response(r) + + +def get_run_status(requests, project, run_id, result_token = None): + project_id = project['project_id'] + result_token = project['result_token'] if result_token is None else result_token + r = requests.get(url + '/projects/{}/runs/{}/status'.format(project_id, run_id), + headers={'Authorization': result_token}) + + assert r.status_code == 200, 'I received this instead: {}'.format(r.text) + return r.json() + + +def wait_for_run(requests, project, run_id, ok_statuses, result_token=None, timeout=10): + """ + Poll project/run_id until its status is one of the ok_statuses. Raise a + TimeoutError if we've waited more than timeout seconds. + It also checks that the status are progressing normally, using the checks implemented in + `has_progressed_validly`. + """ + start_time = time.time() + old_status = None + while True: + status = get_run_status(requests, project, run_id, result_token) + if old_status: + if not has_progressed_validly(old_status, status): + raise Exception("The run has not progressed as expected. The old status " + "update was '{}' while the new one is '{}'.".format(old_status, status)) + if status['state'] in ok_statuses or status['state'] == 'error': + break + if time.time() - start_time > timeout: + raise TimeoutError('waited for {}s'.format(timeout)) + time.sleep(0.5) + old_status = status + return status + + +def wait_for_run_completion(requests, project, run_id, result_token, timeout=20): + completion_statuses = {'completed'} + return wait_for_run(requests, project, run_id, completion_statuses, result_token, timeout) + + +def wait_while_queued(requests, project, run_id, result_token=None, timeout=10): + not_queued_statuses = {'running', 'completed'} + return wait_for_run(requests, project, run_id, not_queued_statuses, result_token, timeout) + + +def post_run(requests, project, threshold): + project_id = project['project_id'] + result_token = project['result_token'] + + req = requests.post( + url + '/projects/{}/runs'.format(project_id), + headers={'Authorization': result_token}, + json={'threshold': threshold}) + assert req.status_code == 201 + run_id = req.json()['run_id'] + # Each time we post a new run, we also check that the run endpoint works well. 
+ get_run(requests, project, run_id, expected_threshold=threshold) + return run_id + + +def get_runs(requests, project, result_token=None, expected_status=200): + project_id = project['project_id'] + result_token = project['result_token'] if result_token is None else result_token + + req = requests.get( + url + '/projects/{}/runs'.format(project_id), + headers={'Authorization': result_token}) + assert req.status_code == expected_status + return req.json() + + +def get_run(requests, project, run_id, expected_status=200, expected_threshold=None): + project_id = project['project_id'] + result_token = project['result_token'] + + req = requests.get( + url + '/projects/{}/runs/{}'.format(project_id, run_id), + headers={'Authorization': result_token}) + + assert req.status_code == expected_status + run = req.json() + if expected_status == 200: + # only check these assertions if all went well. + assert 'run_id' in run + assert run['run_id'] == run_id + assert 'notes' in run + assert 'threshold' in run + if expected_threshold: + assert expected_threshold == run['threshold'] + return run + + +def get_run_result(requests, project, run_id, result_token=None, expected_status=200, wait=True, timeout=60): + result_token = project['result_token'] if result_token is None else result_token + if wait: + # wait_for_run_completion also checks that the progress is in order. + final_status = wait_for_run_completion(requests, project, run_id, result_token, timeout) + state = final_status['state'] + assert state == 'completed', "Expected: 'completed', got: '{}'".format(state) + + project_id = project['project_id'] + r = requests.get(url + '/projects/{}/runs/{}/result'.format(project_id, run_id), + headers={'Authorization': result_token}) + assert r.status_code == expected_status + return r.json() + + +def _check_new_project_response_fields(new_project_data): + assert 'project_id' in new_project_data + assert 'update_tokens' in new_project_data + assert 'result_token' in new_project_data + assert len(new_project_data['update_tokens']) == 2 + + +class State(IntEnum): + created = -1 + queued = 0 + running = 1 + completed = 2 + + @staticmethod + def from_string(state): + if state == 'created': + return State.created + elif state == 'queued': + return State.queued + elif state == 'running': + return State.running + elif state == 'completed': + return State.completed + + +def has_progressed_validly(status_old, status_new): + """ + If there happened to be progress between the two statuses we check if it was valid. + Thus, we return False if there was invalid progress, and True otherwise. (even if there was no progress) + stage change counts as progress, but has to be done in the right order. + also if both runs are in the same stage, we compare progress. 
+ + :param status_old: json describing a run status as returned from the '/projects/{project_id}/runs/{run_id}/status' + endpoint + :param status_new: same as above + :return: False if the progress was not valid + """ + old_stage = status_old['current_stage']['number'] + new_stage = status_new['current_stage']['number'] + + if old_stage < new_stage: + return True + elif old_stage > new_stage: + return False + # both in the same stage then + if 'progress' in status_old['current_stage'] and 'progress' in status_new['current_stage']: + assert 0 <= status_new['current_stage']['progress']['relative'] <= 1.0, "{} not between 0 and 1".format(status_new['current_stage']['progress']['relative']) + assert 0 <= status_old['current_stage']['progress']['relative'] <= 1.0, "{} not between 0 and 1".format(status_old['current_stage']['progress']['relative']) + + if status_new['current_stage']['progress']['relative'] < status_old['current_stage']['progress']['relative']: + return False + else: + return True + else: # same stage, no progress info + return True + + +def is_run_status(status): + assert 'state' in status + run_state = State.from_string(status['state']) + assert run_state in State + assert 'stages' in status + assert 'time_added' in status + assert 'current_stage' in status + cur_stage = status['current_stage'] + + if run_state == State.completed: + assert 'time_started' in status + assert 'time_completed' in status + elif run_state == State.running: + assert 'time_started' in status + + assert 'number' in cur_stage + if 'progress' in cur_stage: + assert 'absolute' in cur_stage['progress'] + assert 'relative' in cur_stage['progress'] + + +def upload_binary_data(requests, data, project_id, token, count, size=128, expected_status_code=201): + r = requests.post( + url + '/projects/{}/binaryclks'.format(project_id), + headers={ + 'Authorization': token, + 'Content-Type': 'application/octet-stream', + 'Hash-Count': str(count), + 'Hash-Size': str(size) + }, + data=data + ) + assert r.status_code == expected_status_code, 'I received this instead: {}'.format(r.text) + + upload_response = r.json() + if expected_status_code == 201: + assert 'receipt_token' in upload_response + return upload_response + + +def upload_binary_data_from_file(requests, file_path, project_id, token, count, size=128, expected_status_code=201): + with open(file_path, 'rb') as f: + return upload_binary_data(requests, f, project_id, token, count, size, expected_status_code) + + +def get_expected_number_parties(project_params): + return project_params.get('number_parties', 2) + + +def binary_upload_format(encoding_size): + bit_packing_fmt = f"!{encoding_size}s" + bit_packing_struct = struct.Struct(bit_packing_fmt) + return bit_packing_struct + + +def binary_pack_for_upload(filters, encoding_size): + bit_packing_struct = binary_upload_format(encoding_size) + for hash_bytes in filters: + yield bit_packing_struct.pack(hash_bytes) diff --git a/tools/ci.yml b/tools/ci.yml index c002fcb8..bae6bba6 100644 --- a/tools/ci.yml +++ b/tools/ci.yml @@ -1,17 +1,14 @@ version: '3.4' services: - tests: - image: data61/anonlink-app:${TAG:-latest} + image: data61/anonlink-test:${TAG:-latest} environment: - SERVER=http://nginx:8851 - INITIAL_DELAY=20 - JAEGER_AGENT_HOST=jaeger - command: /bin/sh -c "dockerize -wait tcp://nginx:8851/v1/status -timeout 1m python -m pytest -n 4 entityservice/tests --junitxml=testResults.xml -x" depends_on: - - db - - redis + - backend - worker - nginx diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml index 
3b9ed5a4..dd43f6ec 100644 --- a/tools/docker-compose.yml +++ b/tools/docker-compose.yml @@ -40,6 +40,7 @@ services: - db - db_init - redis + - minio # The application server can also setup the database @@ -53,6 +54,17 @@ services: depends_on: - db + # The dm admin + db_admin: + image: dpage/pgadmin4 + environment: + - PGADMIN_DEFAULT_PASSWORD=rX%QpV7Xgyrz + - PGADMIN_DEFAULT_EMAIL=brian@thorne.link + depends_on: + - db + ports: + - 5050:80 + # A celery worker worker: @@ -63,9 +75,7 @@ services: command: celery -A entityservice.async_worker worker --loglevel=info -O fair -Q celery,compute,highmemory environment: - DATABASE_PASSWORD=rX%QpV7Xgyrz - - DEBUG=true - - LOGGING_LEVEL=DEBUG - #- LOG_CFG=entityservice/verbose_logging.yaml + - LOG_CFG=entityservice/verbose_logging.yaml - MINIO_ACCESS_KEY=AKIAIOSFODNN7EXAMPLE - MINIO_SECRET_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY - CELERY_ACKS_LATE=true
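
For reference, a minimal sketch of how the relocated end-to-end suite might be exercised locally against the compose stack. The image, variable and service names are taken from the e2etests/Dockerfile, azure-pipelines.yml and tools/ci.yml hunks above; the invocation itself is an assumption (it is not part of this diff) and presumes a data61/anonlink-base image is already available for the build.

    # Build the relocated e2e test image; the "local" tag is illustrative only.
    docker build -t data61/anonlink-test:local ./e2etests

    # Start the stack plus the "tests" service from the ci.yml overlay.
    # TAG selects which data61/anonlink-test image the tests service runs,
    # per the ${TAG:-latest} default in tools/ci.yml.
    TAG=local docker-compose -f tools/docker-compose.yml -f tools/ci.yml up tests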