diff --git a/README.md b/README.md index 9d8828cb..c9e54661 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,33 @@ # Anonlink Entity Service [![Documentation Status](https://readthedocs.org/projects/anonlink-entity-service/badge/?version=stable)](https://anonlink-entity-service.readthedocs.io/en/stable/?badge=stable) +[![Build Status](https://dev.azure.com/data61/Anonlink/_apis/build/status/data61.anonlink-entity-service?branchName=develop)](https://dev.azure.com/data61/Anonlink/_build/latest?definitionId=1&branchName=develop) -A service for performing privacy preserving record linkage. Allows organizations to carry out record linkage without disclosing personally identifiable information. +A REST service for performing privacy preserving record linkage. Allows organizations to carry out record linkage +without disclosing personally identifiable information. -Clients should use [anonlink-client](https://github.com/data61/anonlink-client/) or the [encoding-service](https://github.com/data61/anonlink-encoding-service/). +The *Anonlink Entity Service* is based on the concept of comparing *Anonymous Linking Codes* (ALC) - bit-arrays +representing an entity. -## Documentation +## Features -Project documentation including tutorials are hosted at https://anonlink-entity-service.readthedocs.io/en/stable/ +- Highly scalable architecture, able to distribute work to many machines at once. +- Optimized low level comparison code (provided by [anonlink](https://github.com/data61/anonlink)) +- Support for client side blocking (provided by [blocklib](https://github.com/data61/blocklib)) -The [docs](./docs) folder contains the documentation source. +Data providers wanting to link their records may want to consider using +[anonlink-client](https://github.com/data61/anonlink-client/) or the +[encoding-service](https://github.com/data61/anonlink-encoding-service/). + +## Documentation + +Project documentation including tutorials are hosted at +[anonlink-entity-service.readthedocs.io](https://anonlink-entity-service.readthedocs.io/en/stable/). 
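To make the description above concrete: each record is represented by a fixed-length bit array (128 bytes is the example size used in the API documentation), transported as a base64 string. The sketch below is illustrative only — real encodings are produced from PII with anonlink-client or clkhash, not from random bytes — and the payload shape shown is simply the most basic JSON upload format the service accepts.

```python
import base64
import os

# Illustrative stand-in only: a real CLK/ALC is derived from hashed PII via
# anonlink-client; random bytes are used here just to show the data shape.
ENCODING_SIZE = 128  # bytes; the example encoding size from the API docs


def fake_encoding(size: int = ENCODING_SIZE) -> str:
    """Return a base64-encoded bit array of `size` bytes."""
    return base64.b64encode(os.urandom(size)).decode()


# The simplest JSON upload payload accepted by the service.
payload = {"clks": [fake_encoding() for _ in range(3)]}
print(payload["clks"][0][:16], "...")
```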
+ +## Demo + +A demo deployment is available at [anonlink.easd.data61.xyz](https://anonlink.easd.data61.xyz/) ## Build @@ -31,7 +47,8 @@ Note docker images are pushed to Docker Hub, which can be used instead of buildi | Component | Docker Hub | |------------------|---------| | Backend/Worker | [data61/anonlink-app](https://hub.docker.com/r/data61/anonlink-app) | -| Nginx | [data61/anonlink-nginx](https://hub.docker.com/r/data61/anonlink-nginx) | +| E2E Tests | [data61/anonlink-test](https://hub.docker.com/r/data61/anonlink-test) | +| Nginx Proxy | [data61/anonlink-nginx](https://hub.docker.com/r/data61/anonlink-nginx) | | Benchmark | [data61/anonlink-benchmark](https://hub.docker.com/r/data61/anonlink-benchmark) | | Docs | [data61/anonlink-docs-builder](https://hub.docker.com/r/data61/anonlink-docs-builder) | @@ -43,27 +60,31 @@ See the docs for more complete deployment documentation: - [Local Deployment](./docs/local-deployment.rst) - [Production Deployment](./docs/production-deployment.rst) -To run locally with `docker-compose`: +To test locally with `docker-compose`: docker-compose -f tools/docker-compose.yml up ## Testing -A simple query with curl should tell you the status of the service: - - curl localhost:8851/api/v1/status - { - "number_mappings": 2, - "rate": 44051409, - "status": "ok" - } - - ### Testing with docker-compose An additional docker-compose config file can be found in `./tools/ci.yml`, this can be added to run tests along with the rest of the service: - docker-compose -f tools/docker-compose.yml -f tools/ci.yml -p entityservicetest up -d + docker-compose -f tools/docker-compose.yml -f tools/ci.yml up -d +## Citing + +The Anonlink Entity Service, and the wider Anonlink project is designed, developed and supported by +[CSIRO's Data61](https://www.data61.csiro.au/). If you use any part of this library in your research, please +cite it using the following BibTex entry:: + + @misc{Anonlink, + author = {CSIRO's Data61}, + title = {Anonlink Private Record Linkage System}, + year = {2017}, + publisher = {GitHub}, + journal = {GitHub Repository}, + howpublished = {\url{https://github.com/data61/anonlink-entity-service}}, + } diff --git a/backend/.dockerignore b/backend/.dockerignore index 2f5e4d8d..16d090b6 100644 --- a/backend/.dockerignore +++ b/backend/.dockerignore @@ -2,3 +2,4 @@ .git/ data .env +*.log diff --git a/backend/entityservice/api_def/openapi.yaml b/backend/entityservice/api_def/openapi.yaml index 11757d0f..34291bb7 100644 --- a/backend/entityservice/api_def/openapi.yaml +++ b/backend/entityservice/api_def/openapi.yaml @@ -321,16 +321,21 @@ paths: '/projects/{project_id}/clks': post: operationId: entityservice.views.project.project_clks_post - summary: Upload encoded PII data to a linkage project. + summary: Upload encoded data to a linkage project. tags: - Project description: | - Called by each of the data providers with their calculated `CLK` vectors. - The project must have been created, and the caller must have both the - `project_id` and a valid `upload_token` in order to contribute data. + Called by each data provider with their encodings and optional blocking + information. - The data uploaded must be of one of the following formats. - - CLKs only upload: An array of base64 encoded [CLKs](./concepts.html#cryptographic-longterm-keys), one per + The caller must have both the `project_id` and a valid `upload_token` in order to contribute data, + both of these are generated when a project is created. 
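As a concrete illustration (not part of the specification itself), a data provider holding an `upload_token` can push a small JSON payload of encodings as sketched below. The host name is a placeholder and the encodings are truncated dummies; the accepted payload formats are listed in the following paragraphs.

```python
import requests

# Placeholder values for illustration only.
BASE_URL = "https://anonlink.example.com/api/v1"
PROJECT_ID = "<project_id returned when the project was created>"
UPLOAD_TOKEN = "<this data provider's upload_token>"

# Base64 encoded CLKs, e.g. produced by anonlink-client (dummy values here).
encodings = ["ZW5jb2RpbmctMQ==", "ZW5jb2RpbmctMg=="]

resp = requests.post(
    f"{BASE_URL}/projects/{PROJECT_ID}/clks",
    headers={"Authorization": UPLOAD_TOKEN},
    json={"encodings": encodings},
)
resp.raise_for_status()
print(resp.json()["receipt_token"])  # a 201 response includes a receipt token
```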
+ This endpoint can directly accept uploads up to several hundred MiB, and can pull encoding data from + an object store for larger uploads. + + The data uploaded must be of one of the following formats: + + - Encodings only: An array of base64 encoded [CLKs](./concepts.html#cryptographic-longterm-keys), one per entity. - CLKs with blocking information upload: An array of base64 encoded CLKs with corresponding blocking information. One element in this array is an array with the first element being a base64 encoded CLK followed @@ -342,7 +347,7 @@ paths: The uploaded encodings must all have the same length in bytes. If the project's linkage schema specifes an encoding size it will be checked and enforced before any runs are computed. Note a minimum and maximum encoding size can be set at the server level at deployment time. - Currently anonlink requires this _encoding size_ to be a multiple of 8. An example value is 128 Bytes. + Currently anonlink requires this _encoding size_ to be a multiple of 8. A common value is `128 Bytes`. Note in the default deployment the maximum request size is set to `~10 GB`, which __should__ translate to just over 20 million entities. @@ -352,6 +357,12 @@ paths: This endpoint can be used with the Content-Type: application/json and uses the `CLKUpload` structure of a JSON array of base64 encoded strings. + ### Object Store Upload + + `encodings` and `blocks` can be pulled from an object store. `encodings` must be in the binary format + documented under the `/projects/{project_id}/binaryclks` endpoint. `blocks` must be a JSON file, comprising + a mapping of encoding identifiers to a list of block identifiers; both identifiers must be strings. + ### Binary Upload An additional api endpoint (/projects/{project_id}/binaryclks) has been added for uploading CLKs as a binary @@ -361,12 +372,13 @@ paths: - $ref: '#/components/parameters/project_id' - $ref: '#/components/parameters/token' requestBody: - description: the encoded PII + description: Data to upload required: true content: application/json: schema: oneOf: + - $ref: '#/components/schemas/EncodingUpload' - $ref: '#/components/schemas/CLKUpload' - $ref: '#/components/schemas/CLKnBlockUpload' responses: @@ -394,8 +406,8 @@ paths: tags: - Project description: | - An experimental api for uploading CLKs as a binary file. This is to allow for - faster and more efficient data transfer. + An experimental api for directly uploading CLKs as a binary file. You may instead want to + upload via an object store. Called by each of the data providers with their calculated `CLK` vectors. The project must have been created, and the caller must have both the `project_id` and a valid `upload_token` in order to contribute data. @@ -1081,17 +1093,92 @@ components: required: - number + EncodingUpload: + description: Object that contains one data provider's encodings + type: object + required: [encodings] + properties: + encodings: + oneOf: + - $ref: '#/components/schemas/EncodingArray' + - $ref: '#/components/schemas/ExternalData' + blocks: + oneOf: + - $ref: '#/components/schemas/BlockMap' + - $ref: '#/components/schemas/ExternalData' + EncodingArray: + description: Array of encodings, base64 encoded. + type: array + items: + - type: string + format: byte + description: Base64 encoded CLK data + + + BlockMap: + description: Blocking information for encodings. 
A mapping from encoding id (a string) to a list of block ids + type: object + additionalProperties: + type: array + items: + - type: string + description: Block ID + example: + "1": ["block1", "block2"] + "2": [] + "3": ["block1"] + + ExternalData: + description: A file in an object store. + type: object + required: [file] + properties: + credentials: + type: object + required: [AccessKeyId, SecretAccessKey] + description: | + Optional credentials to pull the file from the object store. + + Not required if using the Anonlink Entity Service's own object store. + properties: + AccessKeyId: + type: string + SecretAccessKey: + type: string + SessionToken: + type: string + file: + type: object + required: [bucket, path] + properties: + bucket: + type: string + example: anonlink-uploads + path: + type: string + description: The object name in the bucket. + example: project-foo/encodings.bin + endpoint: + type: string + description: | + Object store endpoint - usually a public endpoint for a MinIO as part of an Anonlink deployment e.g. + `minio.anonlink.easd.data61.xyz`, or a public (region specific) endpoint for AWS S3: + `s3.ap-southeast-2.amazonaws.com`. + + If not given the Anonlink Entity Service's own object store will be assumed. + example: s3.ap-southeast-2.amazonaws.com + secure: + type: boolean + default: true + description: If this object store should be connected to only over a secure connection. + CLKUpload: description: Object that contains this party's Bloom Filters type: object required: [clks] properties: clks: - type: array - items: - type: string - format: byte - description: Base64 encoded CLK data + $ref: '#/components/schemas/EncodingArray' CLKnBlockUpload: description: Object that contains this party's Bloom Filters including blocking information diff --git a/backend/entityservice/database/insertions.py b/backend/entityservice/database/insertions.py index 4cd917f6..3021e118 100644 --- a/backend/entityservice/database/insertions.py +++ b/backend/entityservice/database/insertions.py @@ -47,7 +47,7 @@ def insert_dataprovider(cur, auth_token, project_id): def insert_blocking_metadata(db, dp_id, blocks): """ - Insert a new entry into the blocks table. + Insert new entries into the blocks table. :param blocks: A dict mapping block id to the number of encodings per block. 
""" @@ -200,6 +200,24 @@ def update_encoding_metadata(db, clks_filename, dp_id, state): ]) +def update_blocks_state(db, dp_id, blocks, state): + assert state in {'pending', 'ready', 'error'} + sql_query = """ + UPDATE blocks + SET + state = %s + WHERE + dp = %s AND + block_name in %s + """ + + with db.cursor() as cur: + cur.execute(sql_query, [ + state, + dp_id, + tuple(blocks) + ]) + def update_encoding_metadata_set_encoding_size(db, dp_id, encoding_size): sql_query = """ UPDATE uploads @@ -209,7 +227,7 @@ def update_encoding_metadata_set_encoding_size(db, dp_id, encoding_size): dp = %s """ - logger.info("Updating database with info about encodings") + logger.info(f"Updating uploads table for dp {dp_id} with encoding size ({encoding_size})") with db.cursor() as cur: cur.execute(sql_query, [ encoding_size, @@ -278,6 +296,18 @@ def update_project_mark_all_runs_failed(conn, project_id): cur.execute(sql_query, [project_id]) +def update_dataprovider_uploaded_state(conn, project_id, dp_id, state): + with conn.cursor() as cur: + sql_query = """ + UPDATE dataproviders SET + uploaded = %s + WHERE + id = %s AND + project = %s + """ + cur.execute(sql_query, [state, dp_id, project_id]) + + def mark_project_deleted(db, project_id): with db.cursor() as cur: sql_query = """ @@ -328,10 +358,11 @@ def get_created_runs_and_queue(db, project_id): def is_dataprovider_allowed_to_upload_and_lock(db, dp_id): """ - This method returns true if the dataprovider is allowed to upload her clks. - A dataprovider is not allowed to upload clks if she has already uploaded them, or if the upload is in progress. + This method returns true if the data provider is allowed to upload their encodings. + + A dataprovider is not allowed to upload clks if they has already uploaded them, or if the upload is in progress. This method will lock the resource by setting the upload state to `in_progress` and returning `true`. - Note that the upload state can be `error`, in which case we are allowing the dataprovider to re-try uploading + Note that the upload state can be `error`, in which case we allow the dataprovider to re-try uploading her clks not to block a project if a failure occurred. """ logger.debug("Setting dataprovider {} upload state to `in_progress``".format(dp_id)) @@ -348,5 +379,5 @@ def is_dataprovider_allowed_to_upload_and_lock(db, dp_id): elif length > 1: logger.error("{} rows in the table `dataproviders` are associated to the same dataprovider id {}, while each" " dataprovider id should be unique.".format(length, dp_id)) - raise ValueError("Houston, we have a problem!!! This dataprovider can upload multiple times her clks.") + raise ValueError("Houston, we have a problem!!! 
This dataprovider has uploaded multiple times") return True diff --git a/backend/entityservice/encoding_storage.py b/backend/entityservice/encoding_storage.py index 7b4a5e91..d0c7ab11 100644 --- a/backend/entityservice/encoding_storage.py +++ b/backend/entityservice/encoding_storage.py @@ -3,10 +3,19 @@ from typing import Iterator, List, Tuple import ijson +import opentracing +from flask import g +from structlog import get_logger +from entityservice import database as db from entityservice.database import insert_encodings_into_blocks, get_encodingblock_ids, \ - get_chunk_of_encodings + get_chunk_of_encodings, DBConn from entityservice.serialization import deserialize_bytes, binary_format, binary_unpack_filters +from entityservice.utils import fmt_bytes + +logger = get_logger() + +DEFAULT_BLOCK_ID = '1' def stream_json_clksnblocks(f): @@ -110,3 +119,49 @@ def get_encoding_chunk(conn, chunk_info, encoding_size=128): chunk_data = binary_unpack_filters(encoding_iter, encoding_size=encoding_size) return chunk_data, len(chunk_data) + +def upload_clk_data_binary(project_id, dp_id, encoding_iter, receipt_token, count, size=128): + """ + Save the user provided binary-packed CLK data. + + """ + filename = None + # Set the state to 'pending' in the uploads table + with DBConn() as conn: + db.insert_encoding_metadata(conn, filename, dp_id, receipt_token, encoding_count=count, block_count=1) + db.update_encoding_metadata_set_encoding_size(conn, dp_id, size) + num_bytes = binary_format(size).size * count + + logger.debug("Directly storing binary file with index, base64 encoded CLK, popcount") + + # Upload to database + logger.info(f"Uploading {count} binary encodings to database. Total size: {fmt_bytes(num_bytes)}") + parent_span = g.flask_tracer.get_span() + + with DBConn() as conn: + db.update_encoding_metadata_set_encoding_size(conn, dp_id, size) + + with opentracing.tracer.start_span('create-default-block-in-db', child_of=parent_span): + db.insert_blocking_metadata(conn, dp_id, {DEFAULT_BLOCK_ID: count}) + + with opentracing.tracer.start_span('upload-encodings-to-db', child_of=parent_span): + store_encodings_in_db(conn, dp_id, encoding_iter, size) + + with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span): + db.update_encoding_metadata(conn, filename, dp_id, 'ready') + + +def include_encoding_id_in_binary_stream(stream, size, count): + """ + Inject an encoding_id and default block into a binary stream of encodings. + """ + + binary_formatter = binary_format(size) + + def encoding_iterator(filter_stream): + # Assumes encoding id and block info not provided (yet) + for entity_id in range(count): + yield str(entity_id), binary_formatter.pack(entity_id, filter_stream.read(size)), [DEFAULT_BLOCK_ID] + + return encoding_iterator(stream) + diff --git a/backend/entityservice/init-db-schema.sql b/backend/entityservice/init-db-schema.sql index 8798322c..d483d755 100644 --- a/backend/entityservice/init-db-schema.sql +++ b/backend/entityservice/init-db-schema.sql @@ -131,6 +131,7 @@ CREATE TABLE uploads ( token CHAR(48) NOT NULL UNIQUE, -- Filename for the raw unprocessed uploaded data + -- Set to null where this is skipped (e.g. 
external data) file CHAR(64) NULL, state PROCESSEDSTATE NOT NULL, diff --git a/backend/entityservice/object_store.py b/backend/entityservice/object_store.py index 83eb0f61..232b987f 100644 --- a/backend/entityservice/object_store.py +++ b/backend/entityservice/object_store.py @@ -1,16 +1,19 @@ import minio +from minio.credentials import Credentials, Static from structlog import get_logger +from entityservice.async_worker import logger from entityservice.settings import Config as config logger = get_logger('objectstore') -def connect_to_object_store(): +def connect_to_object_store(credentials=None): mc = minio.Minio( config.MINIO_SERVER, config.MINIO_ACCESS_KEY, config.MINIO_SECRET_KEY, + credentials=credentials, secure=False ) logger.debug("Connected to minio") @@ -45,3 +48,30 @@ def create_bucket(minio_client, bucket): minio_client.make_bucket(bucket) except minio.error.BucketAlreadyOwnedByYou: logger.info("The bucket {} was already created.".format(bucket)) + + +def stat_and_stream_object(bucket_name, object_name, credentials=None): + mc = connect_to_object_store(credentials) + logger.debug("Checking object exists in object store") + stat = mc.stat_object(bucket_name=bucket_name, object_name=object_name) + logger.debug("Retrieving file from object store") + response = mc.get_object(bucket_name=bucket_name, object_name=object_name) + return stat, response + + +def parse_minio_credentials(credentials): + if credentials: + access_key = credentials['AccessKeyId'] + secret_key = credentials['SecretAccessKey'] + session_token = credentials.get('SessionToken', None) + mc_credentials = Credentials(provider=Static(access_key, secret_key, session_token)) + else: + mc_credentials = None + return mc_credentials + + +def delete_object_store_folder(mc, bucket_name, path): + objects_to_delete = mc.list_objects(bucket_name, prefix=path, recursive=True) + objects_to_delete = [x.object_name for x in objects_to_delete] + for del_err in mc.remove_objects(bucket_name, objects_to_delete): + logger.warning("Deletion Error: {}".format(del_err)) \ No newline at end of file diff --git a/backend/entityservice/serialization.py b/backend/entityservice/serialization.py index acc4ba80..0e879cf7 100644 --- a/backend/entityservice/serialization.py +++ b/backend/entityservice/serialization.py @@ -91,7 +91,7 @@ def binary_unpack_filters(data_iterable, max_bytes=None, encoding_size=None): filters = [] bytes_consumed = 0 - logger.info(f"Iterating over encodings of size {encoding_size} - packed as {bit_packed_element_size}") + logger.debug(f"Iterating over encodings of size {encoding_size} - packed as {bit_packed_element_size}") for raw_bytes in data_iterable: filters.append(binary_unpack_one(raw_bytes, bit_packed_element)) diff --git a/backend/entityservice/tasks/__init__.py b/backend/entityservice/tasks/__init__.py index 5eff8e98..928a9f65 100644 --- a/backend/entityservice/tasks/__init__.py +++ b/backend/entityservice/tasks/__init__.py @@ -6,7 +6,7 @@ from entityservice.tasks.pre_run_check import check_for_executable_runs from entityservice.tasks.assert_valid_run import assert_valid_run from entityservice.tasks.run import prerun_check -from entityservice.tasks.encoding_uploading import handle_raw_upload +from entityservice.tasks.encoding_uploading import handle_raw_upload, pull_external_data_encodings_only, pull_external_data from entityservice.tasks.comparing import create_comparison_jobs, compute_filter_similarity, aggregate_comparisons from entityservice.tasks.permutation import save_and_permute, permute_mapping_data 
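The object store helpers above accept the same credential structure that the upload API hands out (`AccessKeyId`, `SecretAccessKey` and an optional `SessionToken`). A rough client-side equivalent using the `minio` Python package is sketched below; the endpoint, bucket and object names are the placeholder examples from the API schema, not real deployment values.

```python
from minio import Minio

# Hypothetical credentials, in the shape accepted by the upload API.
credentials = {
    "AccessKeyId": "<access key>",
    "SecretAccessKey": "<secret key>",
    "SessionToken": "<session token>",
}

mc = Minio(
    "s3.ap-southeast-2.amazonaws.com",   # placeholder endpoint from the API example
    access_key=credentials["AccessKeyId"],
    secret_key=credentials["SecretAccessKey"],
    session_token=credentials.get("SessionToken"),
    secure=True,
)

# Roughly what stat_and_stream_object does server side: confirm the object
# exists, then stream its contents.
stat = mc.stat_object("anonlink-uploads", "project-foo/encodings.bin")
response = mc.get_object("anonlink-uploads", "project-foo/encodings.bin")
print(f"{stat.size} bytes available to stream")
```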
from entityservice.tasks.solver import solver_task diff --git a/backend/entityservice/tasks/encoding_uploading.py b/backend/entityservice/tasks/encoding_uploading.py index 9a06f567..568205aa 100644 --- a/backend/entityservice/tasks/encoding_uploading.py +++ b/backend/entityservice/tasks/encoding_uploading.py @@ -1,19 +1,127 @@ -import io - -import opentracing +import json from entityservice.database import * from entityservice.encoding_storage import stream_json_clksnblocks, convert_encodings_from_base64_to_binary, \ - store_encodings_in_db + store_encodings_in_db, upload_clk_data_binary, include_encoding_id_in_binary_stream from entityservice.error_checking import check_dataproviders_encoding, handle_invalid_encoding_data, \ InvalidEncodingError -from entityservice.object_store import connect_to_object_store +from entityservice.object_store import connect_to_object_store, stat_and_stream_object, parse_minio_credentials +from entityservice.serialization import binary_format from entityservice.settings import Config -from entityservice.async_worker import celery, logger +from entityservice.async_worker import celery from entityservice.tasks.base_task import TracedTask from entityservice.tasks.pre_run_check import check_for_executable_runs +from entityservice.tracing import serialize_span from entityservice.utils import fmt_bytes, clks_uploaded_to_project +logger = get_logger() + + +@celery.task(base=TracedTask, ignore_result=True, args_as_tags=('project_id', 'dp_id')) +def pull_external_data(project_id, dp_id, + encoding_object_info, encoding_credentials, + blocks_object_info, blocks_credentials, + receipt_token, parent_span=None): + """ + Load encoding and blocking data from object store. + + - pull blocking map into memory, create blocks in db + - stream encodings into DB and add encoding + blocks from in memory dict. 
+ + """ + log = logger.bind(pid=project_id, dp_id=dp_id) + with DBConn() as conn: + if not check_project_exists(conn, project_id): + log.info("Project deleted, stopping immediately") + return + + mc = connect_to_object_store(parse_minio_credentials(blocks_credentials)) + + log.debug("Pulling blocking information from object store") + response = mc.get_object(bucket_name=blocks_object_info['bucket'], object_name=blocks_object_info['path']) + encoding_to_block_map = json.load(response) + + log.debug("Counting the blocks") + block_sizes = {} + for encoding_id in encoding_to_block_map: + _blocks = encoding_to_block_map[encoding_id] + for block_id in _blocks: + block_id = str(block_id) + block_sizes[block_id] = block_sizes.setdefault(block_id, 0) + 1 + + block_count = len(block_sizes) + log.debug(f"Processing {block_count} blocks") + + # stream the encodings + bucket_name = encoding_object_info['bucket'] + object_name = encoding_object_info['path'] + + stat, encodings_stream = stat_and_stream_object(bucket_name, object_name, parse_minio_credentials(encoding_credentials)) + count = int(stat.metadata['X-Amz-Meta-Hash-Count']) + size = int(stat.metadata['X-Amz-Meta-Hash-Size']) + log.debug(f"Processing {count} encodings of size {size}") + assert count == len(encoding_to_block_map), f"Expected {count} encodings in blocks got {len(encoding_to_block_map)}" + + with DBConn() as conn: + with opentracing.tracer.start_span('update-metadata-db', child_of=parent_span): + insert_encoding_metadata(conn, None, dp_id, receipt_token, encoding_count=count, block_count=block_count) + update_encoding_metadata_set_encoding_size(conn, dp_id, size) + with opentracing.tracer.start_span('create-block-entries-in-db', child_of=parent_span): + log.debug("Adding blocks to db") + insert_blocking_metadata(conn, dp_id, block_sizes) + + def encoding_iterator(encoding_stream): + binary_formatter = binary_format(size) + for encoding_id in range(count): + yield ( + str(encoding_id), + binary_formatter.pack(encoding_id, encoding_stream.read(size)), + encoding_to_block_map[str(encoding_id)] + ) + + with opentracing.tracer.start_span('upload-encodings-to-db', child_of=parent_span): + log.debug("Adding encodings and associated blocks to db") + try: + store_encodings_in_db(conn, dp_id, encoding_iterator(encodings_stream), size) + except Exception as e: + update_dataprovider_uploaded_state(conn, project_id, dp_id, 'error') + log.warning(e) + + with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span): + update_encoding_metadata(conn, None, dp_id, 'ready') + update_blocks_state(conn, dp_id, block_sizes.keys(), 'ready') + + +@celery.task(base=TracedTask, ignore_result=True, args_as_tags=('project_id', 'dp_id')) +def pull_external_data_encodings_only(project_id, dp_id, object_info, credentials, receipt_token, parent_span=None): + """ + + """ + log = logger.bind(pid=project_id, dp_id=dp_id) + + with DBConn() as conn: + if not check_project_exists(conn, project_id): + log.info("Project deleted, stopping immediately") + return + + bucket_name = object_info['bucket'] + object_name = object_info['path'] + + log.info("Pulling encoding data from an object store") + mc_credentials = parse_minio_credentials(credentials) + stat, stream = stat_and_stream_object(bucket_name, object_name, mc_credentials) + + count = int(stat.metadata['X-Amz-Meta-Hash-Count']) + size = int(stat.metadata['X-Amz-Meta-Hash-Size']) + converted_stream = include_encoding_id_in_binary_stream(stream, size, count) + upload_clk_data_binary(project_id, 
dp_id, converted_stream, receipt_token, count, size) + + # # Now work out if all parties have added their data + if clks_uploaded_to_project(project_id): + logger.info("All parties data present. Scheduling any queued runs") + check_for_executable_runs.delay(project_id, serialize_span(parent_span)) + + @celery.task(base=TracedTask, ignore_result=True, args_as_tags=('project_id', 'dp_id')) def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None): @@ -45,7 +153,7 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None): with DBConn() as db: store_encodings_in_db(db, dp_id, pipeline, encoding_size) - log.info(f"Converted uploaded encodings of size {encoding_size} bytes into internal binary format. Number of blocks: {block_count}") + log.info(f"Converted uploaded encodings of size {fmt_bytes(encoding_size)} into internal binary format. Number of blocks: {block_count}") # As this is the first time we've seen the encoding size actually uploaded from this data provider # We check it complies with the project encoding size. diff --git a/backend/entityservice/tasks/project_cleanup.py b/backend/entityservice/tasks/project_cleanup.py index de6e7483..43b44e0b 100644 --- a/backend/entityservice/tasks/project_cleanup.py +++ b/backend/entityservice/tasks/project_cleanup.py @@ -3,7 +3,7 @@ import entityservice.database as db from entityservice.cache.active_runs import set_run_state_deleted from entityservice.database import DBConn -from entityservice.object_store import connect_to_object_store +from entityservice.object_store import connect_to_object_store, delete_object_store_folder from entityservice.async_worker import celery, logger from entityservice.tasks.base_task import TracedTask from entityservice.settings import Config @@ -12,7 +12,7 @@ @celery.task(base=TracedTask, ignore_result=True, args_as_tags=('project_id',)) -def remove_project(project_id): +def remove_project(project_id, parent_span=None): """ """ @@ -28,20 +28,24 @@ def remove_project(project_id): db.delete_project_data(conn, project_id) log.debug("Getting object store files associated with project from database") object_store_files = db.get_all_objects_for_project(conn, project_id) - - log.debug(f"Removing {len(object_store_files)} object store files associated with project.") - delete_minio_objects.delay(object_store_files, project_id) + + delete_minio_objects.delay(object_store_files, project_id, parent_span) log.info("Project resources removed") @celery.task(base=TracedTask, ignore_result=True, args_as_tags=('project_id',)) -def delete_minio_objects(filenames, project_id): +def delete_minio_objects(filenames, project_id, parent_span=None): log = logger.bind(pid=project_id) mc = connect_to_object_store() log.info(f"Deleting {len(filenames)} files from object store") try: - mc.remove_objects(Config.MINIO_BUCKET, filenames) + for del_err in mc.remove_objects(Config.MINIO_BUCKET, filenames): + log.debug("Deletion error: {}".format(del_err)) except MinioError as e: log.warning(f"Error occurred while removing object {filenames}. 
Ignoring.") + + if Config.UPLOAD_OBJECT_STORE_ENABLED: + log.debug("Deleting everything uploaded to object store for project") + delete_object_store_folder(mc, Config.UPLOAD_OBJECT_STORE_BUCKET, f"{project_id}/") diff --git a/backend/entityservice/tasks/stats.py b/backend/entityservice/tasks/stats.py index fdfb4455..41eecd32 100644 --- a/backend/entityservice/tasks/stats.py +++ b/backend/entityservice/tasks/stats.py @@ -27,7 +27,7 @@ def calculate_comparison_rate(): rate = total_comparisons/total_time.total_seconds() logger.info("Total comparisons: {}".format(total_comparisons)) logger.info("Total time: {}".format(total_time.total_seconds())) - logger.info("Comparison rate: {}".format(rate)) + logger.info("Comparison rate: {:.0f}".format(rate)) with dbinstance.cursor() as cur: insert_comparison_rate(cur, rate) diff --git a/backend/entityservice/tests/test_utils.py b/backend/entityservice/tests/test_utils.py index b927a8b3..6f401add 100644 --- a/backend/entityservice/tests/test_utils.py +++ b/backend/entityservice/tests/test_utils.py @@ -5,6 +5,37 @@ from entityservice.errors import InvalidConfiguration from entityservice.utils import load_yaml_config from entityservice.tests.util import generate_bytes, temp_file_containing +from entityservice.views import convert_encoding_upload_to_clknblock + + +class TestEncodingConversionUtils: + + def test_convert_encoding_upload_to_clknblock_encodings_only(self): + + out = convert_encoding_upload_to_clknblock({ + "encodings": ['123', '456', '789'] + }) + + assert 'clknblocks' in out + assert len(out['clknblocks']) == 3 + assert out['clknblocks'][0] == ['123', '1'] + + def test_convert_encoding_upload_to_clknblock(self): + + out = convert_encoding_upload_to_clknblock({ + "encodings": ['123', '456', '789', '000'], + "blocks": { + '0': ['1', '2'], + '1': ['1'], + '2': [] + } + }) + + assert 'clknblocks' in out + assert len(out['clknblocks']) == 3 + assert out['clknblocks'][0] == ['123', '1', '2'] + assert out['clknblocks'][1] == ['456', '1'] + assert out['clknblocks'][2] == ['789', ] class TestYamlLoader: diff --git a/backend/entityservice/utils.py b/backend/entityservice/utils.py index e06f22e5..313735d2 100644 --- a/backend/entityservice/utils.py +++ b/backend/entityservice/utils.py @@ -146,7 +146,7 @@ def clks_uploaded_to_project(project_id, check_data_ready=False): """ See if the given project has had all parties contribute data. """ log = logger.bind(pid=project_id) - log.info("Counting contributing parties") + log.debug("Counting contributing parties") with DBConn() as conn: if check_data_ready: parties_contributed = get_number_parties_ready(conn, project_id) @@ -189,3 +189,7 @@ def convert_mapping_to_list(permutation): defined by the keys. 
""" return [permutation[i] for i in range(len(permutation))] + + +def object_store_upload_path(project_id, dp_id): + return f"{project_id}/{dp_id}" diff --git a/backend/entityservice/views/auth_checks.py b/backend/entityservice/views/auth_checks.py index dc65b02c..954d0493 100644 --- a/backend/entityservice/views/auth_checks.py +++ b/backend/entityservice/views/auth_checks.py @@ -32,10 +32,11 @@ def abort_if_run_doesnt_exist(project_id, run_id): def abort_if_invalid_dataprovider_token(update_token): - logger.debug("checking authorization token to update data") + logger.debug("checking authorization token to upload data") with DBConn() as conn: resource_exists = db.check_update_auth(conn, update_token) if not resource_exists: + logger.debug("authorization token invalid") safe_fail_request(403, message=INVALID_ACCESS_MSG) @@ -60,35 +61,37 @@ def abort_if_invalid_results_token(resource_id, results_token): def abort_if_inconsistent_upload(uses_blocking, clk_json): - """ - check if the following combinations are true - - uses_blocking is False AND 'clks' element in upload JSON - - uses_blocking if True AND 'clknblocks' element in upload JSON - otherwise, return safe_fail_request + """Check if the uploaded data is consistent with requirements imposed at the project + level. + + Specifically checks that one of these combinations is true: + + If the project uses_blocking: + - `clknblocks` element in upload JSON, OR + - `blocks` element in uploaded JSON + + If the project doesn't use blocking: + - No `blocks` element or `clksnblocks` elment in uploaded JSON. + + Finally checks that encodings are uploaded somehow by either `clksnblocks` or + `encodings` being present in the uploaded JSON. + + If these conditions are not met raise a ValueError exception. :param uses_blocking: Boolean that indicates if the project uses blocking - :param clk_json: a json dict - :return: safe_fail_request if conditions are not met + :param clk_json: a dict of the JSON from the client. + :raises ValueError: """ - is_valid_clks = not uses_blocking and 'clks' in clk_json - is_valid_clknblocks = uses_blocking and 'clknblocks' in clk_json - if not (is_valid_clks or is_valid_clknblocks): - # fail condition1 - uses_blocking is True but uploaded element is "clks" - if uses_blocking and 'clks' in clk_json: - raise ValueError('Uploaded element is "clks" while expecting "clknblocks"') - # fail condition2 - uses_blocking is False but uploaded element is "clknblocks" - if not uses_blocking and 'clknblocks' in clk_json: - raise ValueError('Uploaded element is "clknblocks" while expecting "clks"') - # fail condition3 - "clks" exist in JSON but there is no data - if 'clks' in clk_json and len(clk_json['clks']) < 1: - raise ValueError('Missing CLKs information') - # fail condition4 - "clknblocks" exist in JSON but there is no data - if 'clknblocks' in clk_json and len(clk_json['clknblocks']) < 1: - raise ValueError('Missing CLK and Blocks information') - # fail condition5 - unknown element in JSON - if 'clks' not in clk_json and 'clknblocks' not in clk_json: - raise ValueError('Unknown upload element - expect "clks" or "clknblocks"') + if uses_blocking: + if 'clknblocks' not in clk_json and 'blocks' not in clk_json: + raise ValueError('Expecting blocking information. 
Either in "blocks" or "clksnblocks"') + else: + if 'clknblocks' in clk_json or 'blocks' in clk_json: + _msg = 'Blocking has been disabled for this project, but received blocking information' + raise ValueError(_msg) + if 'clknblocks' not in clk_json and 'encodings' not in clk_json and 'clks' not in clk_json: + raise ValueError('Missing encoding information') def dataprovider_id_if_authorize(resource_id, receipt_token): diff --git a/backend/entityservice/views/general.py b/backend/entityservice/views/general.py index c2da5ab5..451425f3 100644 --- a/backend/entityservice/views/general.py +++ b/backend/entityservice/views/general.py @@ -1,9 +1,11 @@ import platform import anonlink +import psycopg2.errors from entityservice.cache import service_status import entityservice.database as db +from entityservice.utils import safe_fail_request from entityservice.version import __version__ @@ -15,9 +17,12 @@ def status_get(): if status is None: # We ensure we can connect to the database during the status check with db.DBConn() as conn: - number_of_mappings = db.query_db(conn, ''' - SELECT COUNT(*) FROM projects - ''', one=True)['count'] + try: + number_of_mappings = db.query_db(conn, ''' + SELECT COUNT(*) FROM projects + ''', one=True)['count'] + except psycopg2.errors.UndefinedTable: + safe_fail_request(500, "DB uninitialized") current_rate = db.get_latest_rate(conn) diff --git a/backend/entityservice/views/objectstore.py b/backend/entityservice/views/objectstore.py index 2e3a47b0..32eb3459 100644 --- a/backend/entityservice/views/objectstore.py +++ b/backend/entityservice/views/objectstore.py @@ -7,7 +7,7 @@ from entityservice.settings import Config as config import entityservice.database as db from entityservice.object_store import connect_to_upload_object_store -from entityservice.utils import safe_fail_request +from entityservice.utils import safe_fail_request, object_store_upload_path from entityservice.views import bind_log_and_span, precheck_upload_token from entityservice.views.serialization import ObjectStoreCredentials @@ -56,7 +56,7 @@ def authorize_external_upload(project_id): client.set_app_info("anonlink", "development version") bucket_name = config.UPLOAD_OBJECT_STORE_BUCKET - path = f"{project_id}/{dp_id}" + path = object_store_upload_path(project_id, dp_id) log.info(f"Retrieving temporary object store credentials for path: '{bucket_name}/{path}'") credentials_provider = AssumeRoleProvider(client, @@ -79,6 +79,6 @@ def authorize_external_upload(project_id): "endpoint": config.UPLOAD_OBJECT_STORE_SERVER, "secure": config.UPLOAD_OBJECT_STORE_SECURE, "bucket": bucket_name, - "path": f"{project_id}/{dp_id}" + "path": path } }, 201 diff --git a/backend/entityservice/views/project.py b/backend/entityservice/views/project.py index 0f38eb81..fcb664c8 100644 --- a/backend/entityservice/views/project.py +++ b/backend/entityservice/views/project.py @@ -2,16 +2,16 @@ import json import tempfile +from connexion import ProblemException from flask import request -from flask import g from structlog import get_logger import opentracing import entityservice.database as db -from entityservice.encoding_storage import store_encodings_in_db -from entityservice.tasks import handle_raw_upload, check_for_executable_runs, remove_project +from entityservice.encoding_storage import upload_clk_data_binary, include_encoding_id_in_binary_stream +from entityservice.tasks import handle_raw_upload, remove_project, pull_external_data_encodings_only, pull_external_data, check_for_executable_runs from 
entityservice.tracing import serialize_span -from entityservice.utils import safe_fail_request, get_json, generate_code, clks_uploaded_to_project, fmt_bytes +from entityservice.utils import safe_fail_request, get_json, generate_code, object_store_upload_path, clks_uploaded_to_project from entityservice.database import DBConn, get_project_column from entityservice.views.auth_checks import abort_if_project_doesnt_exist, abort_if_invalid_dataprovider_token, \ abort_if_invalid_results_token, get_authorization_token_type_or_abort, abort_if_inconsistent_upload @@ -20,11 +20,11 @@ from entityservice.serialization import binary_format from entityservice.settings import Config from entityservice.views.serialization import ProjectList, NewProjectResponse, ProjectDescription -from entityservice.views.util import bind_log_and_span +from entityservice.views.util import bind_log_and_span, convert_clks_to_clknblocks, \ + convert_encoding_upload_to_clknblock logger = get_logger() -DEFAULT_BLOCK_ID = '1' def projects_get(): @@ -61,7 +61,7 @@ def projects_post(project): def project_delete(project_id): - log = logger.bind(pid=project_id) + log, parent_span = bind_log_and_span(project_id) log.info('Request to delete project') # Check the resource exists and hasn't already been marked for deletion abort_if_project_doesnt_exist(project_id) @@ -74,7 +74,7 @@ def project_delete(project_id): db.mark_project_deleted(db_conn, project_id) log.info("Queuing authorized request to delete project resources") - remove_project.delay(project_id) + remove_project.delay(project_id, serialize_span(parent_span)) return '', 204 @@ -120,7 +120,7 @@ def project_binaryclks_post(project_id): log = log.bind(dp_id=dp_id) log.info("Receiving CLK data.") - receipt_token = None + receipt_token = generate_code() with opentracing.tracer.start_span('upload-clk-data', child_of=parent_span) as span: span.set_tag("project_id", project_id) @@ -145,12 +145,8 @@ def project_binaryclks_post(project_id): # https://github.com/zalando/connexion/issues/592 # stream = get_stream() stream = BytesIO(request.data) - binary_formatter = binary_format(size) - def encoding_iterator(filter_stream): - # Assumes encoding id and block info not provided (yet) - for entity_id in range(count): - yield str(entity_id), binary_formatter.pack(entity_id, filter_stream.read(size)), [DEFAULT_BLOCK_ID] + converted_stream = include_encoding_id_in_binary_stream(stream, size, count) expected_bytes = size * count log.debug(f"Stream size is {len(request.data)} B, and we expect {expected_bytes} B") @@ -158,7 +154,7 @@ def encoding_iterator(filter_stream): safe_fail_request(400, "Uploaded data did not match the expected size. Check request headers are correct") try: - receipt_token = upload_clk_data_binary(project_id, dp_id, encoding_iterator(stream), count, size) + upload_clk_data_binary(project_id, dp_id, converted_stream, receipt_token, count, size) except ValueError: safe_fail_request(400, "Uploaded data did not match the expected size. Check request headers are correct.") @@ -173,6 +169,13 @@ def encoding_iterator(filter_stream): raise with DBConn() as conn: db.set_dataprovider_upload_state(conn, dp_id, state='done') + + # Now work out if all parties have added their data + if clks_uploaded_to_project(project_id): + logger.info("All parties data present. 
Scheduling any queued runs") + check_for_executable_runs.delay(project_id, serialize_span(parent_span)) + + return {'message': 'Updated', 'receipt_token': receipt_token}, 201 @@ -201,9 +204,9 @@ def project_clks_post(project_id): headers = request.headers log, parent_span = bind_log_and_span(project_id) - + log.debug("Starting data upload request") token = precheck_upload_token(project_id, headers, parent_span) - + receipt_token = generate_code() with DBConn() as conn: dp_id = db.get_dataprovider_id(conn, token) project_encoding_size = db.get_project_schema_encoding_size(conn, project_id) @@ -216,7 +219,6 @@ def project_clks_post(project_id): log = log.bind(dp_id=dp_id) log.info("Receiving CLK data.") - receipt_token = None with opentracing.tracer.start_span('upload-clk-data', child_of=parent_span) as span: span.set_tag("project_id", project_id) @@ -228,11 +230,9 @@ def project_clks_post(project_id): # However, as connexion is very, very strict about input validation when it comes to json, it will always # consume the stream first to validate it against the spec. Thus the backflip to fully reading the CLks as # json into memory. -> issue #184 - receipt_token, raw_file = upload_json_clk_data(dp_id, get_json(), uses_blocking, parent_span=span) - # Schedule a task to deserialize the hashes, and carry - # out a pop count. - handle_raw_upload.delay(project_id, dp_id, receipt_token, parent_span=serialize_span(span)) - log.info("Job scheduled to handle user uploaded hashes") + handle_encoding_upload_json(project_id, dp_id, get_json(), receipt_token, uses_blocking, parent_span=span) + + log.info("Job scheduled to handle users upload") elif headers['Content-Type'] == "application/octet-stream": span.set_tag("content-type", 'binary') log.info("Handling binary CLK upload") @@ -258,19 +258,34 @@ def project_clks_post(project_id): if len(request.data) != expected_bytes: safe_fail_request(400, "Uploaded data did not match the expected size. Check request headers are correct") try: - receipt_token = upload_clk_data_binary(project_id, dp_id, stream, count, size) + upload_clk_data_binary(project_id, dp_id, stream, receipt_token, count, size) except ValueError: safe_fail_request(400, "Uploaded data did not match the expected size. Check request headers are correct.") else: safe_fail_request(400, "Content Type not supported") - except Exception: - log.info("The dataprovider was not able to upload her clks," - " re-enable the corresponding upload token to be used.") + except ProblemException as e: + # Have an exception that is safe for the user. We reset the upload state to + # allow the user to try upload again. + log.info(f"Problem occurred, returning status={e.status} - {e.detail}") with DBConn() as conn: - db.set_dataprovider_upload_state(conn, dp_id, state='error') + db.set_dataprovider_upload_state(conn, dp_id, state='not_started') raise + except Exception as e: + log.warning("Unhandled error occurred during data upload") + log.exception(e) + with DBConn() as conn: + db.set_dataprovider_upload_state(conn, dp_id, state='error') + safe_fail_request(500, "Sorry, the server couldn't handle that request") + + with DBConn() as conn: db.set_dataprovider_upload_state(conn, dp_id, state='done') + + # Now work out if all parties have added their data + if clks_uploaded_to_project(project_id): + logger.info("All parties data present. 
Scheduling any queued runs") + check_for_executable_runs.delay(project_id, serialize_span(parent_span)) + return {'message': 'Updated', 'receipt_token': receipt_token}, 201 @@ -317,65 +332,85 @@ def authorise_get_request(project_id): return dp_id, project_object -def upload_clk_data_binary(project_id, dp_id, encoding_iter, count, size=128): - """ - Save the user provided binary-packed CLK data. - +def handle_encoding_upload_json(project_id, dp_id, clk_json, receipt_token, uses_blocking, parent_span): """ - receipt_token = generate_code() - filename = None - # Set the state to 'pending' in the uploads table - with DBConn() as conn: - db.insert_encoding_metadata(conn, filename, dp_id, receipt_token, encoding_count=count, block_count=1) - db.update_encoding_metadata_set_encoding_size(conn, dp_id, size) - logger.info(f"Storing supplied binary clks of individual size {size} in file: {filename}") - - num_bytes = binary_format(size).size * count - - logger.debug("Directly storing binary file with index, base64 encoded CLK, popcount") - - # Upload to database - logger.info(f"Uploading {count} binary encodings to database. Total size: {fmt_bytes(num_bytes)}") - parent_span = g.flask_tracer.get_span() - - with DBConn() as conn: - with opentracing.tracer.start_span('create-default-block-in-db', child_of=parent_span): - db.insert_blocking_metadata(conn, dp_id, {DEFAULT_BLOCK_ID: count}) - - with opentracing.tracer.start_span('upload-encodings-to-db', child_of=parent_span): - store_encodings_in_db(conn, dp_id, encoding_iter, size) + Take user provided upload information - accepting multiple formats - and eventually + injest into the database. - with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span): - db.update_encoding_metadata(conn, filename, dp_id, 'ready') - - # Now work out if all parties have added their data - if clks_uploaded_to_project(project_id): - logger.info("All parties data present. Scheduling any queued runs") - check_for_executable_runs.delay(project_id, serialize_span(parent_span)) + Encodings uploaded directly in the JSON are first quarantined in the object store, + and a background task deserializes them. - return receipt_token - - -def upload_json_clk_data(dp_id, clk_json, uses_blocking, parent_span): - """ - Take user provided encodings as json dict and save them, as-is, to the object store. - - Note this implementation is non-streaming. + Encodings that are in an object store are streamed directly into the database by + a background task. """ + log = logger.bind(pid=project_id) + log.info("Checking json is consistent") try: abort_if_inconsistent_upload(uses_blocking, clk_json) except ValueError as e: - safe_fail_request(403, e) + safe_fail_request(403, e.args[0]) + + if "encodings" in clk_json and 'file' in clk_json['encodings']: + # external encodings + log.info("External encodings uploaded") + encoding_object_info = clk_json['encodings']['file'] + object_name = encoding_object_info['path'] + _check_object_path_allowed(project_id, dp_id, object_name, log) + + encoding_credentials = clk_json['encodings'].get('credentials') + # Schedule a background task to pull the encodings from the object store + # This background task updates the database with encoding metadata assuming + # that there are no blocks. 
+ if 'blocks' not in clk_json: + log.info("scheduling task to pull encodings from object store") + pull_external_data_encodings_only.delay( + project_id, + dp_id, + encoding_object_info, + encoding_credentials, + receipt_token, + parent_span=serialize_span(parent_span)) + else: + # Need to deal with both encodings and blocks + if 'file' in clk_json['blocks']: + object_name = clk_json['blocks']['file']['path'] + _check_object_path_allowed(project_id, dp_id, object_name, log) + # Blocks are in an external file + blocks_object_info = clk_json['blocks']['file'] + blocks_credentials = clk_json['blocks'].get('credentials') + log.info("scheduling task to pull both encodings and blocking data from object store") + pull_external_data.delay( + project_id, + dp_id, + encoding_object_info, + encoding_credentials, + blocks_object_info, + blocks_credentials, + receipt_token, + parent_span=serialize_span(parent_span)) + else: + raise NotImplementedError("Don't currently handle combination of external encodings and blocks") + + + return + + # Convert uploaded JSON to common schema. + # + # The original JSON API simply accepted "clks", then came a combined encoding and + # blocking API expecting the top level element "clknblocks". Finally an API that + # specifies both "encodings" and "blocks" independently at the top level. + # + # We rewrite all into the "clknblocks" format. + if "encodings" in clk_json: + logger.debug("converting from 'encodings' & 'blocks' format to 'clknblocks'") + clk_json = convert_encoding_upload_to_clknblock(clk_json) - # now we need to know element name - clks or clknblocks is_valid_clks = not uses_blocking and 'clks' in clk_json element = 'clks' if is_valid_clks else 'clknblocks' if len(clk_json[element]) < 1: safe_fail_request(400, message="Missing CLKs information") - receipt_token = generate_code() - filename = Config.RAW_FILENAME_FMT.format(receipt_token) logger.info("Storing user {} supplied {} from json".format(dp_id, element)) @@ -386,7 +421,7 @@ def upload_json_clk_data(dp_id, clk_json, uses_blocking, parent_span): if element == 'clks': logger.info("Rewriting provided json into clknsblocks format") - clk_json = {'clknblocks': [[encoding, '1'] for encoding in clk_json['clks']]} + clk_json = convert_clks_to_clknblocks(clk_json) element = 'clknblocks' logger.info("Counting block sizes and number of blocks") @@ -421,4 +456,11 @@ def upload_json_clk_data(dp_id, clk_json, uses_blocking, parent_span): db.insert_encoding_metadata(conn, filename, dp_id, receipt_token, encoding_count, block_count) db.insert_blocking_metadata(conn, dp_id, block_sizes) - return receipt_token, filename + # Schedule a task to deserialize the encodings + handle_raw_upload.delay(project_id, dp_id, receipt_token, parent_span=serialize_span(parent_span)) + + +def _check_object_path_allowed(project_id, dp_id, object_name, log): + if not object_name.startswith(object_store_upload_path(project_id, dp_id)): + log.warning(f"Attempt to upload to illegal path: {object_name}") + safe_fail_request(403, "Provided object store path is not allowed") diff --git a/backend/entityservice/views/util.py b/backend/entityservice/views/util.py index a3763282..f882cf94 100644 --- a/backend/entityservice/views/util.py +++ b/backend/entityservice/views/util.py @@ -13,3 +13,31 @@ def bind_log_and_span(project_id, run_id=None): log = log.bind(rid=run_id) parent_span.set_tag("run_id", run_id) return log, parent_span + + +def convert_encoding_upload_to_clknblock(clk_json): + """Convert from one upload schema to another. 
+ + The source schema specifies both "encodings" and "blocks" independently at the top level, + the target is the "clknblocks" format. + """ + result = [] + if "blocks" in clk_json: + # user has supplied blocking info as a mapping from encoding id to a list of block ids + # The "encodings" are also supplied indexed by encoding_id + # target format is ['UG9vcA==', '001', '211'], [...] + for encoding_id in clk_json['blocks']: + encoding = clk_json['encodings'][int(encoding_id)] + result.append( + [encoding] + clk_json['blocks'][encoding_id] + ) + else: + # user has only supplied encodings + for encoding in clk_json['encodings']: + result.append([encoding, '1']) + + return {'clknblocks': result} + + +def convert_clks_to_clknblocks(clk_json): + return {'clknblocks': [[encoding, '1'] for encoding in clk_json['clks']]} \ No newline at end of file diff --git a/deployment/entity-service/Chart.yaml b/deployment/entity-service/Chart.yaml index bfb82449..89bbd955 100644 --- a/deployment/entity-service/Chart.yaml +++ b/deployment/entity-service/Chart.yaml @@ -7,8 +7,7 @@ sources: - https://github.com/data61/anonlink - https://github.com/data61/clkhash maintainers: - - name: Brian Thorne - email: brian.thorne@data61.csiro.au + - name: Confidential Computing url: https://data61.csiro.au icon: https://s3-us-west-2.amazonaws.com/slack-files2/avatars/2016-04-11/33560836053_df0d62a81bf32f53df00_72.png apiVersion: v1 diff --git a/deployment/entity-service/requirements.yaml b/deployment/entity-service/requirements.yaml index 89564ece..44a2efa2 100644 --- a/deployment/entity-service/requirements.yaml +++ b/deployment/entity-service/requirements.yaml @@ -4,7 +4,7 @@ dependencies: repository: https://kubernetes-charts.storage.googleapis.com condition: provision.redis - name: minio - version: 5.0.22 + version: 5.0.24 repository: https://kubernetes-charts.storage.googleapis.com condition: provision.minio - name: postgresql diff --git a/deployment/entity-service/templates/highmemory-worker-deployment.yaml b/deployment/entity-service/templates/highmemory-worker-deployment.yaml index 13b23dd1..ad5617e9 100644 --- a/deployment/entity-service/templates/highmemory-worker-deployment.yaml +++ b/deployment/entity-service/templates/highmemory-worker-deployment.yaml @@ -16,6 +16,7 @@ spec: matchLabels: app: {{ template "es.appname" . }} component: {{ list (required "workers.name must be provided." .Values.workers.name) "highmemory" | join "-" | quote }} + release: {{ .Release.Name }} tier: backend {{- if .Values.workers.strategy }} strategy: diff --git a/deployment/entity-service/templates/worker-deployment.yaml b/deployment/entity-service/templates/worker-deployment.yaml index e7cc7d2a..5321ebfd 100644 --- a/deployment/entity-service/templates/worker-deployment.yaml +++ b/deployment/entity-service/templates/worker-deployment.yaml @@ -16,6 +16,7 @@ spec: matchLabels: app: {{ template "es.appname" . 
}} component: "{{ .Values.workers.name }}" + release: {{ .Release.Name }} tier: backend {{- if .Values.workers.strategy }} strategy: diff --git a/e2etests/config.py b/e2etests/config.py index 3fbec7e2..ad976bc8 100644 --- a/e2etests/config.py +++ b/e2etests/config.py @@ -4,3 +4,4 @@ initial_delay = float(os.environ.get("INITIAL_DELAY", "2")) rate_limit_delay = float(os.environ.get("RATE_LIMIT_DELAY", "0.25")) url = os.environ.get("SERVER", "http://localhost:8851") + "/api/v1/" +minio_host = os.environ.get("UPLOAD_OBJECT_STORE_SERVER") diff --git a/e2etests/tests/conftest.py b/e2etests/tests/conftest.py index 321ac74f..1c726f53 100644 --- a/e2etests/tests/conftest.py +++ b/e2etests/tests/conftest.py @@ -142,10 +142,26 @@ def valid_project_params(request, result_type_number_parties_or_none): @pytest.fixture(scope='function') def a_project(request, requests): + # a 2 party project with blocking disabled project = create_project_no_data( requests, result_type="groups", - number_parties=2) + number_parties=2, + uses_blocking=False + ) + yield project + # Release project resource + delete_project(requests, project) + +@pytest.fixture(scope='function') +def a_blocking_project(request, requests): + # a 2 party project with blocking disabled + project = create_project_no_data( + requests, + result_type="groups", + number_parties=2, + uses_blocking=True + ) yield project # Release project resource delete_project(requests, project) diff --git a/e2etests/tests/test_project_uploads.py b/e2etests/tests/test_project_uploads.py index 3dd13ded..4a7f7d94 100644 --- a/e2etests/tests/test_project_uploads.py +++ b/e2etests/tests/test_project_uploads.py @@ -1,9 +1,11 @@ +import io +import json import time import os import pytest from minio import Minio -from e2etests.config import url +from e2etests.config import url, minio_host from e2etests.util import ( create_project_upload_data, create_project_upload_fake_data, generate_clks, generate_json_serialized_clks, @@ -11,7 +13,7 @@ upload_binary_data, upload_binary_data_from_file, binary_pack_for_upload) -def test_project_single_party_data_uploaded(requests, valid_project_params): +def test_project_single_party_data_uploaded_clks_format(requests, valid_project_params): new_project_data = requests.post(url + '/projects', json={ 'schema': {}, @@ -29,15 +31,62 @@ def test_project_single_party_data_uploaded(requests, valid_project_params): assert 'receipt_token' in upload_response -def test_project_external_data_uploaded(requests, valid_project_params, binary_test_file_path): - new_project_data = requests.post(url + 'projects', +def test_project_single_party_data_uploaded_clknblocks_format(requests, a_blocking_project): + + clksnblocks = [[encoding, '1'] for encoding in generate_json_serialized_clks(10)] + + r = requests.post( + url + '/projects/{}/clks'.format(a_blocking_project['project_id']), + headers={'Authorization': a_blocking_project['update_tokens'][0]}, + json={ + 'clknblocks': clksnblocks + } + ) + assert r.status_code == 201, r.text + upload_response = r.json() + assert 'receipt_token' in upload_response + + +def test_project_single_party_data_uploaded_encodings_format(requests, valid_project_params): + new_project_data = requests.post(url + '/projects', json={ 'schema': {}, **valid_project_params }).json() - r = requests.get( - url + 'projects/{}/authorize-external-upload'.format(new_project_data['project_id']), + r = requests.post( + url + '/projects/{}/clks'.format(new_project_data['project_id']), headers={'Authorization': 
+        json={
+            'encodings': generate_json_serialized_clks(100)
+        }
+    )
+    assert r.status_code == 201, r.text
+    upload_response = r.json()
+    assert 'receipt_token' in upload_response
+
+
+def test_project_single_party_data_uploaded_encodings_and_blocks_format(requests, a_blocking_project):
+    encodings = generate_json_serialized_clks(10)
+    blocks = {encoding_id: ['1'] for encoding_id in range(len(encodings))}
+
+    r = requests.post(
+        url + '/projects/{}/clks'.format(a_blocking_project['project_id']),
+        headers={'Authorization': a_blocking_project['update_tokens'][0]},
+        json={
+            'encodings': encodings,
+            'blocks': blocks
+        }
+    )
+    assert r.status_code == 201, r.text
+    upload_response = r.json()
+    assert 'receipt_token' in upload_response
+
+
+def test_project_upload_external_encodings(requests, a_project, binary_test_file_path):
+
+    r = requests.get(
+        url + 'projects/{}/authorize-external-upload'.format(a_project['project_id']),
+        headers={'Authorization': a_project['update_tokens'][0]},
    )
    assert r.status_code == 201
    upload_response = r.json()
@@ -46,8 +95,9 @@
    upload_info = upload_response['upload']

    # Use Minio python client to upload data
+
    mc = Minio(
-        upload_info['endpoint'],
+        minio_host or upload_info['endpoint'],
        access_key=credentials['AccessKeyId'],
        secret_key=credentials['SecretAccessKey'],
        session_token=credentials['SessionToken'],
@@ -55,10 +105,145 @@
        secure=upload_info['secure']
    )

-    etag = mc.fput_object(upload_info['bucket'], upload_info['path'] + "/test", binary_test_file_path)
+    etag = mc.fput_object(
+        upload_info['bucket'],
+        upload_info['path'] + "/test",
+        binary_test_file_path,
+        metadata={
+            "hash-count": 1000,
+            "hash-size": 128
+        }
+    )
+
+    # Should be able to notify the service that we've uploaded data
+    res = requests.post(url + f"projects/{a_project['project_id']}/clks",
+                        headers={'Authorization': a_project['update_tokens'][0]},
+                        json={
+                            'encodings': {
+                                'file': {
+                                    'bucket': upload_info['bucket'],
+                                    'path': upload_info['path'] + "/test",
+                                }
+                            }
+                        }
+                        )
+    assert res.status_code == 201
+
+
+def test_project_upload_external_data(requests, a_blocking_project, binary_test_file_path):
+    project = a_blocking_project
+    blocking_data = json.dumps(
+        {str(encoding_id): [str(encoding_id % 2), str(encoding_id % 3)] for encoding_id in range(1000)}).encode()
+
+    mc, upload_info = get_temp_upload_client(project, requests, project['update_tokens'][0])
+
+    _upload_encodings_and_blocks(mc, upload_info, blocking_data, binary_test_file_path)
+
+    # Should be able to notify the service that we've uploaded data
+    res = requests.post(url + f"projects/{project['project_id']}/clks",
+                        headers={'Authorization': project['update_tokens'][0]},
+                        json={
+                            'encodings': {
+                                'file': {
+                                    'bucket': upload_info['bucket'],
+                                    'path': upload_info['path'] + "/encodings",
+                                }
+                            },
+                            'blocks': {
+                                'file': {
+                                    'bucket': upload_info['bucket'],
+                                    'path': upload_info['path'] + "/blocks",
+                                }
+                            }
+
+                        }
+                        )
+    assert res.status_code == 201
+
+    # If the second data provider uses the same path to upload data, that shouldn't work
+    res2 = requests.post(url + f"projects/{project['project_id']}/clks",
+                         headers={'Authorization': project['update_tokens'][1]},
+                         json={
+                             'encodings': {
+                                 'file': {
+                                     'bucket': upload_info['bucket'],
+                                     'path': upload_info['path'] + "/encodings",
+                                 }
+                             },
+                             'blocks': {
+                                 'file': {
+                                     'bucket': upload_info['bucket'],
+                                     'path': upload_info['path'] + "/blocks",
+                                 }
+                             }
+
+                         }
+                         )
+    assert res2.status_code == 403
+
+    mc2, upload_info2 = get_temp_upload_client(project, requests, project['update_tokens'][1])
+    _upload_encodings_and_blocks(mc2, upload_info2, blocking_data, binary_test_file_path)
+
+    # If the second data provider uses the correct path to upload data, that should work
+    res3 = requests.post(url + f"projects/{project['project_id']}/clks",
+                         headers={'Authorization': project['update_tokens'][1]},
+                         json={
+                             'encodings': {
+                                 'file': {
+                                     'bucket': upload_info2['bucket'],
+                                     'path': upload_info2['path'] + "/encodings",
+                                 }
+                             },
+                             'blocks': {
+                                 'file': {
+                                     'bucket': upload_info2['bucket'],
+                                     'path': upload_info2['path'] + "/blocks",
+                                 }
+                             }
+                         }
+                         )
+    assert res3.status_code == 201
+    run_id = post_run(requests, project, threshold=0.9)
+    result = get_run_result(requests, project, run_id, timeout=60)
+    assert 'groups' in result
+
+
+def _upload_encodings_and_blocks(mc, upload_info, blocking_data, binary_test_file_path):
+    mc.fput_object(
+        upload_info['bucket'],
+        upload_info['path'] + "/encodings",
+        binary_test_file_path,
+        metadata={
+            "hash-count": 1000,
+            "hash-size": 128
+        }
+    )
+    mc.put_object(
+        upload_info['bucket'],
+        upload_info['path'] + "/blocks",
+        io.BytesIO(blocking_data),
+        len(blocking_data)
+    )
+
-    # Later - once the upload endpoint is complete notify the server
-    # of the uploaded data
+def get_temp_upload_client(project, requests, update_token):
+    r = requests.get(
+        url + 'projects/{}/authorize-external-upload'.format(project['project_id']),
+        headers={'Authorization': update_token},
+    )
+    assert r.status_code == 201
+    upload_response = r.json()
+    credentials = upload_response['credentials']
+    upload_info = upload_response['upload']
+    mc = Minio(
+        minio_host or upload_info['endpoint'],
+        access_key=credentials['AccessKeyId'],
+        secret_key=credentials['SecretAccessKey'],
+        session_token=credentials['SessionToken'],
+        region='us-east-1',
+        secure=upload_info['secure']
+    )
+    return mc, upload_info


def test_project_binary_data_uploaded(requests, valid_project_params, binary_test_file_path):
diff --git a/e2etests/tests/test_uploads.py b/e2etests/tests/test_uploads.py
index 7c4131bc..5b8aeeff 100644
--- a/e2etests/tests/test_uploads.py
+++ b/e2etests/tests/test_uploads.py
@@ -4,13 +4,17 @@
import pytest

from e2etests.config import url
+from e2etests.util import generate_overlapping_clk_data


class TestAuthorizeExternalUpload:

    def test_get_auth_credentials(self, requests, a_project):
+        expected_number_parties = 2
+        datasets = generate_overlapping_clk_data(
+            [100] * expected_number_parties, overlap=0.8)

-        for dp_index in range(2):
+        for dp_index in range(expected_number_parties):
            pid = a_project['project_id']
            res = requests.get(url + f"projects/{pid}/authorize-external-upload",
                               headers={'Authorization': a_project['update_tokens'][dp_index]})
@@ -67,3 +71,11 @@ def test_get_auth_credentials(self, requests, a_project):
            with pytest.raises(minio.error.AccessDenied):
                list(restricted_mc_client.list_objects(bucket_name, prefix=allowed_path))

+            # Should be able to notify the service that we've uploaded data
+            res = requests.post(url + f"projects/{pid}/clks",
+                                headers={'Authorization': a_project['update_tokens'][dp_index]},
+                                json={
+                                    'encodings': datasets[dp_index]
+                                }
+                                )
+            assert res.status_code == 201
diff --git a/e2etests/util.py b/e2etests/util.py
index 75d3a7c1..b38ed922 100644
--- a/e2etests/util.py
+++ b/e2etests/util.py
@@ -140,7 +140,7 @@ def create_project_no_data(requests, result_type='groups', number_parties=None, uses_blocking=False):
    number_parties_param = (
        {} if number_parties is None else {'number_parties': number_parties})

-    new_project_response = requests.post(url + '/projects',
+    new_project_response = requests.post(url + 'projects',
                                         headers={'Authorization': 'invalid'},
                                         json={
                                             'schema': {},
@@ -217,14 +217,14 @@ def _check_delete_response(r):


def delete_project(requests, project):
    project_id = project['project_id']
    result_token = project['result_token']
-    r = requests.delete(url + '/projects/{}'.format(project_id),
+    r = requests.delete(url + 'projects/{}'.format(project_id),
                        headers={'Authorization': result_token})
    _check_delete_response(r)


def delete_run(requests, project_id, run_id, result_token):
-    r = requests.delete(url + '/projects/{}/runs/{}'.format(project_id, run_id),
+    r = requests.delete(url + 'projects/{}/runs/{}'.format(project_id, run_id),
                        headers={'Authorization': result_token})
    _check_delete_response(r)
@@ -232,7 +232,7 @@ def delete_run(requests, project_id, run_id, result_token):
def get_run_status(requests, project, run_id, result_token = None):
    project_id = project['project_id']
    result_token = project['result_token'] if result_token is None else result_token
-    r = requests.get(url + '/projects/{}/runs/{}/status'.format(project_id, run_id),
+    r = requests.get(url + 'projects/{}/runs/{}/status'.format(project_id, run_id),
                     headers={'Authorization': result_token})

    assert r.status_code == 200, 'I received this instead: {}'.format(r.text)
diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml
index 9e7b9cc2..07368587 100644
--- a/tools/docker-compose.yml
+++ b/tools/docker-compose.yml
@@ -102,6 +102,8 @@ services:
      - LOG_CFG=entityservice/verbose_logging.yaml
      - MINIO_ACCESS_KEY=AKIAIOSFODNN7EXAMPLE
      - MINIO_SECRET_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
+      - UPLOAD_OBJECT_STORE_BUCKET=uploads
+      - UPLOAD_OBJECT_STORE_SECURE=false
      - CELERY_ACKS_LATE=true
      - REDIS_USE_SENTINEL=false
      - CELERYD_MAX_TASKS_PER_CHILD=2048
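
For illustration only (not part of the patch): a minimal sketch of the inline payload shapes that the new e2e tests above send to `POST /projects/{project_id}/clks`. It assumes a local deployment at `http://localhost:8851` (the default used in `e2etests/config.py`), a `project_id` and `update_token` obtained at project creation, and a placeholder list of base64 encodings; the key names and the default block id `'1'` mirror the tests and the conversion helpers in this change set.

    import requests

    base = "http://localhost:8851/api/v1/"
    encodings = ["ZGVtbyBlbmNvZGluZw=="] * 3  # placeholder base64-encoded CLKs

    def upload(project_id, update_token, payload):
        # Each data provider posts one of the accepted payload shapes.
        r = requests.post(base + f"projects/{project_id}/clks",
                          headers={'Authorization': update_token},
                          json=payload)
        assert r.status_code == 201, r.text
        return r.json()['receipt_token']

    # 1. "clks" format: encodings only.
    clks_payload = {'clks': encodings}

    # 2. "clknblocks" format: each row is [encoding, block_id, ...].
    clknblocks_payload = {'clknblocks': [[e, '1'] for e in encodings]}

    # 3. "encodings" + "blocks": blocks maps an encoding index to its block ids.
    blocks_payload = {'encodings': encodings,
                      'blocks': {str(i): ['1'] for i in range(len(encodings))}}

    # Hypothetical usage, with project_id and update_token from project creation:
    # receipt_token = upload(project_id, update_token, clknblocks_payload)

For externally stored data (as exercised by `test_project_upload_external_data`), the same endpoint instead receives object-store references of the form `{'encodings': {'file': {'bucket': ..., 'path': ...}}}` after the files have been pushed with the temporary credentials returned by `authorize-external-upload`.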