1 change: 1 addition & 0 deletions backend/.dockerignore
@@ -2,3 +2,4 @@
.git/
data
.env
*.log
106 changes: 93 additions & 13 deletions backend/entityservice/api_def/openapi.yaml
@@ -321,16 +321,21 @@ paths:
'/projects/{project_id}/clks':
post:
operationId: entityservice.views.project.project_clks_post
summary: Upload encoded PII data to a linkage project.
summary: Upload encoded data to a linkage project.
tags:
- Project
description: |
Called by each of the data providers with their calculated `CLK` vectors.
The project must have been created, and the caller must have both the
`project_id` and a valid `upload_token` in order to contribute data.
Called by each data provider with their encodings and optional blocking
information.

The data uploaded must be of one of the following formats.
- CLKs only upload: An array of base64 encoded [CLKs](./concepts.html#cryptographic-longterm-keys), one per
The caller must have both the `project_id` and a valid `upload_token` in order to contribute data;
both of these are generated when a project is created.
This endpoint can directly accept uploads up to several hundred MiB, and can pull encoding data from
an object store for larger uploads.

Contributor comment: Nice 💯

The data uploaded must be of one of the following formats:

- Encodings only: An array of base64 encoded [CLKs](./concepts.html#cryptographic-longterm-keys), one per
entity.
- CLKs with blocking information upload: An array of base64 encoded CLKs with corresponding blocking
information. One element in this array is an array with the first element being a base64 encoded CLK followed
@@ -342,7 +347,7 @@
The uploaded encodings must all have the same length in bytes. If the project's linkage schema
specifies an encoding size, it will be checked and enforced before any runs are computed. Note that a
minimum and maximum encoding size can be set at the server level at deployment time.
Currently anonlink requires this _encoding size_ to be a multiple of 8. An example value is 128 Bytes.
Currently anonlink requires this _encoding size_ to be a multiple of 8. A common value is `128 Bytes`.

Note in the default deployment the maximum request size is set to `~10 GB`, which __should__
translate to just over 20 million entities.
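
As an illustration (not part of the spec), a direct upload of the first format could be driven as in the sketch below. The base URL, identifiers and the use of the Authorization header to carry the upload token are placeholders and assumptions, not values from this API definition.

import requests

# Hypothetical values: project_id and upload_token come from the project
# creation response; the header used for the upload token is an assumption.
base_url = "https://anonlink.example.com/api/v1"
project_id = "my-project-id"
upload_token = "my-upload-token"

body = {
    "encodings": ["q83vEjRWeJ...", "ZUrN76gTsw..."],        # base64 encoded CLKs
    "blocks": {"0": ["block1"], "1": ["block1", "block2"]},  # optional
}

response = requests.post(
    f"{base_url}/projects/{project_id}/clks",
    json=body,
    headers={"Authorization": upload_token},
)
print(response.status_code, response.json())
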
@@ -361,12 +366,13 @@
- $ref: '#/components/parameters/project_id'
- $ref: '#/components/parameters/token'
requestBody:
description: the encoded PII
description: Data to upload
required: true
content:
application/json:
schema:
oneOf:
- $ref: '#/components/schemas/EncodingUpload'
- $ref: '#/components/schemas/CLKUpload'
- $ref: '#/components/schemas/CLKnBlockUpload'
responses:
@@ -1081,17 +1087,91 @@ components:
required:
- number

EncodingUpload:
description: Object that contains one data provider's encodings
type: object
required: [encodings]
properties:
encodings:
oneOf:
- $ref: '#/components/schemas/EncodingArray'
- $ref: '#/components/schemas/ExternalData'
blocks:
oneOf:
- $ref: '#/components/schemas/BlockMap'
## TODO may be useful to handle external blocking data too

Collaborator comment: definitely. With very small blocks, the blocking info is in the same order of size as the encodings.

Collaborator (author) reply: This will be part of a follow-up PR.

#- $ref: '#/ExternalData'

EncodingArray:
description: Array of encodings, base64 encoded.
type: array
items:
type: string
format: byte
description: Base64 encoded CLK data

BlockMap:

Contributor comment: Is BlockMap compulsory? What if there are only encodings from an external object, would BlockMap become arrays of the same block id?

Collaborator (author, @hardbyte, Apr 28, 2020) reply: No, it isn't required. See above where it is referenced; only the encodings are required:

EncodingUpload:
      description: Object that contains one data provider's encodings
      type: object
      required: [encodings]
      properties:
        encodings:
          ...
        blocks:
          oneOf:
            - $ref: '#/components/schemas/BlockMap'

As for encodings pulled from an external object: blocks (schema/type BlockMap) would be the same regardless of whether the encodings are externally provided or directly provided. The blocks are a map from encoding id to block ids:

{
  "1": ["block1", "block2"],
  "2": ["block2"]
}

description: Blocking information for encodings. A mapping from encoding id (a string) to a list of block ids
type: object
additionalProperties:
type: array
items:
type: string
description: Block ID
example:
"1": ["block1", "block2"]
"2": []
"3": ["block1"]
ExternalData:
description: A file in an object store.
type: object
required: [file]
properties:
credentials:
type: object
description: |
Optional credentials to pull the file from the object store.

Not required if using the Anonlink Entity Service's own object store.
properties:
AccessKeyId:
type: string
SecretAccessKey:
type: string
SessionToken:
type: string
file:
type: object
required: [bucket, path]
properties:
bucket:
type: string
example: anonlink-uploads
path:
type: string
description: The object name in the bucket.
example: project-foo/encodings.bin
endpoint:
type: string
description: |
Object store endpoint - usually the public endpoint of the MinIO instance deployed as part of an Anonlink deployment, e.g.
`minio.anonlink.easd.data61.xyz`, or a public (region-specific) endpoint for AWS S3:
`s3.ap-southeast-2.amazonaws.com`.

If not given the Anonlink Entity Service's own object store will be assumed.
example: s3.ap-southeast-2.amazonaws.com
secure:
type: boolean
default: true
description: Whether connections to this object store should only be made over a secure (TLS) connection.
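
As a rough sketch, an upload body that points at encodings already sitting in an object store, following the ExternalData schema above, might look like this; every value is a made-up placeholder:

external_upload = {
    "encodings": {
        "credentials": {  # optional; omit when using the service's own object store
            "AccessKeyId": "EXAMPLEKEYID",
            "SecretAccessKey": "EXAMPLESECRET",
            "SessionToken": "EXAMPLETOKEN",
        },
        "file": {
            "bucket": "anonlink-uploads",
            "path": "project-foo/encodings.bin",
            "endpoint": "s3.ap-southeast-2.amazonaws.com",
            "secure": True,
        },
    }
}
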

CLKUpload:
description: Object that contains this party's Bloom Filters
type: object
required: [clks]
properties:
clks:
type: array
items:
type: string
format: byte
description: Base64 encoded CLK data
$ref: '#/components/schemas/EncodingArray'

CLKnBlockUpload:
description: Object that contains this party's Bloom Filters including blocking information
55 changes: 54 additions & 1 deletion backend/entityservice/encoding_storage.py
@@ -3,10 +3,19 @@
from typing import Iterator, List, Tuple

import ijson
import opentracing
from flask import g
from structlog import get_logger

from entityservice import database as db
from entityservice.database import insert_encodings_into_blocks, get_encodingblock_ids, \
get_chunk_of_encodings
get_chunk_of_encodings, DBConn
from entityservice.serialization import deserialize_bytes, binary_format, binary_unpack_filters
from entityservice.utils import fmt_bytes

logger = get_logger()

DEFAULT_BLOCK_ID = '1'


def stream_json_clksnblocks(f):
@@ -110,3 +119,47 @@ def get_encoding_chunk(conn, chunk_info, encoding_size=128):
chunk_data = binary_unpack_filters(encoding_iter, encoding_size=encoding_size)
return chunk_data, len(chunk_data)


def upload_clk_data_binary(project_id, dp_id, encoding_iter, receipt_token, count, size=128):
"""
Save the user-provided binary-packed CLK data.

"""
filename = None
# Set the state to 'pending' in the uploads table
with DBConn() as conn:
db.insert_encoding_metadata(conn, filename, dp_id, receipt_token, encoding_count=count, block_count=1)

Collaborator comment: what's with the filename = None business?

Collaborator (author) reply: It is just a way of keeping the column in the uploads table set to NULL.

db.update_encoding_metadata_set_encoding_size(conn, dp_id, size)
num_bytes = binary_format(size).size * count

logger.debug("Directly storing binary file with index, base64 encoded CLK, popcount")

# Upload to database
logger.info(f"Uploading {count} binary encodings to database. Total size: {fmt_bytes(num_bytes)}")
parent_span = g.flask_tracer.get_span()

with DBConn() as conn:
with opentracing.tracer.start_span('create-default-block-in-db', child_of=parent_span):
db.insert_blocking_metadata(conn, dp_id, {DEFAULT_BLOCK_ID: count})

with opentracing.tracer.start_span('upload-encodings-to-db', child_of=parent_span):
store_encodings_in_db(conn, dp_id, encoding_iter, size)

with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span):
db.update_encoding_metadata(conn, filename, dp_id, 'ready')


def include_encoding_id_in_binary_stream(stream, size, count):
"""
Inject an encoding_id and default block into a binary stream of encodings.
"""

binary_formatter = binary_format(size)

def encoding_iterator(filter_stream):
# Assumes encoding id and block info not provided (yet)
for entity_id in range(count):
yield str(entity_id), binary_formatter.pack(entity_id, filter_stream.read(size)), [DEFAULT_BLOCK_ID]

return encoding_iterator(stream)
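
For intuition, here is a rough stand-in for the packed record produced above, using struct instead of the service's own binary_format helper. The layout (a 4-byte little-endian entity id followed by the fixed-size encoding bytes) is an illustrative assumption; the real layout is defined in entityservice.serialization and may differ.

import struct

def pack_record(entity_id: int, encoding: bytes) -> bytes:
    # Assumed layout: 4-byte little-endian entity id, then the encoding bytes.
    return struct.pack("<I", entity_id) + encoding

record = pack_record(0, b"\x00" * 128)
assert len(record) == 4 + 128
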

1 change: 1 addition & 0 deletions backend/entityservice/init-db-schema.sql
@@ -131,6 +131,7 @@ CREATE TABLE uploads (
token CHAR(48) NOT NULL UNIQUE,

-- Filename for the raw unprocessed uploaded data
-- Set to null where this is skipped (e.g. external data)
file CHAR(64) NULL,

state PROCESSEDSTATE NOT NULL,
2 changes: 1 addition & 1 deletion backend/entityservice/tasks/__init__.py
@@ -6,7 +6,7 @@
from entityservice.tasks.pre_run_check import check_for_executable_runs
from entityservice.tasks.assert_valid_run import assert_valid_run
from entityservice.tasks.run import prerun_check
from entityservice.tasks.encoding_uploading import handle_raw_upload
from entityservice.tasks.encoding_uploading import handle_raw_upload, handle_external_data_pull
from entityservice.tasks.comparing import create_comparison_jobs, compute_filter_similarity, aggregate_comparisons
from entityservice.tasks.permutation import save_and_permute, permute_mapping_data
from entityservice.tasks.solver import solver_task
37 changes: 35 additions & 2 deletions backend/entityservice/tasks/encoding_uploading.py
@@ -4,17 +4,50 @@

from entityservice.database import *
from entityservice.encoding_storage import stream_json_clksnblocks, convert_encodings_from_base64_to_binary, \
store_encodings_in_db
store_encodings_in_db, upload_clk_data_binary, include_encoding_id_in_binary_stream
from entityservice.error_checking import check_dataproviders_encoding, handle_invalid_encoding_data, \
InvalidEncodingError
from entityservice.object_store import connect_to_object_store
from entityservice.settings import Config
from entityservice.async_worker import celery, logger
from entityservice.tasks.base_task import TracedTask
from entityservice.tasks.pre_run_check import check_for_executable_runs
from entityservice.tracing import serialize_span
from entityservice.utils import fmt_bytes, clks_uploaded_to_project


@celery.task(base=TracedTask, ignore_result=True, args_as_tags=('project_id', 'dp_id'))
def handle_external_data_pull(project_id, dp_id, object_info, credentials, receipt_token, parent_span=None):
"""

"""
log = logger.bind(pid=project_id, dp_id=dp_id)
log.info("Pulling data from an object store")

with DBConn() as db:
if not check_project_exists(db, project_id):
log.info("Project deleted, stopping immediately")
return

mc = connect_to_object_store()
log.info("Retrieving metadata from object store")
stat = mc.stat_object(bucket_name=object_info['bucket'], object_name=object_info['path'])
logger.info(stat.metadata)
count = int(stat.metadata['X-Amz-Meta-Hash-Count'])
size = int(stat.metadata['X-Amz-Meta-Hash-Size'])
log.info("Retrieving file from object store")
response = mc.get_object(bucket_name=object_info['bucket'], object_name=object_info['path'])
stream = response.stream()
converted_stream = include_encoding_id_in_binary_stream(stream, size, count)
upload_clk_data_binary(project_id, dp_id, converted_stream, receipt_token, count, size)

# Now work out if all parties have added their data
if clks_uploaded_to_project(project_id):
logger.info("All parties data present. Scheduling any queued runs")
check_for_executable_runs.delay(project_id, serialize_span(parent_span))
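
For context, here is a sketch of how a client might place such an object so this task can read it back. The bucket, path, credentials and metadata key names are assumptions that mirror the stat_object calls above; MinIO is assumed to expose the metadata as X-Amz-Meta-Hash-Count and X-Amz-Meta-Hash-Size.

import io
from minio import Minio

encoding_size, count = 128, 1000
payload = b"\x00" * (encoding_size * count)   # placeholder binary-packed encodings

client = Minio("minio.example.com", access_key="EXAMPLEKEY",
               secret_key="EXAMPLESECRET", secure=True)
client.put_object(
    bucket_name="anonlink-uploads",
    object_name="project-foo/encodings.bin",
    data=io.BytesIO(payload),
    length=len(payload),
    # Read back by handle_external_data_pull via stat_object
    metadata={"hash-count": str(count), "hash-size": str(encoding_size)},
)
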



@celery.task(base=TracedTask, ignore_result=True, args_as_tags=('project_id', 'dp_id'))
def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None):
"""
@@ -45,7 +78,7 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None):
with DBConn() as db:
store_encodings_in_db(db, dp_id, pipeline, encoding_size)

log.info(f"Converted uploaded encodings of size {encoding_size} bytes into internal binary format. Number of blocks: {block_count}")
log.info(f"Converted uploaded encodings of size {fmt_bytes(encoding_size)} into internal binary format. Number of blocks: {block_count}")

# As this is the first time we've seen the encoding size actually uploaded from this data provider
# We check it complies with the project encoding size.
31 changes: 31 additions & 0 deletions backend/entityservice/tests/test_utils.py
@@ -5,6 +5,37 @@
from entityservice.errors import InvalidConfiguration
from entityservice.utils import load_yaml_config
from entityservice.tests.util import generate_bytes, temp_file_containing
from entityservice.views import convert_encoding_upload_to_clknblock


class TestEncodingConversionUtils:

def test_convert_encoding_upload_to_clknblock_encodings_only(self):

out = convert_encoding_upload_to_clknblock({
"encodings": ['123', '456', '789']
})

assert 'clknblocks' in out
assert len(out['clknblocks']) == 3
assert out['clknblocks'][0] == ['123', '1']

def test_convert_encoding_upload_to_clknblock(self):

out = convert_encoding_upload_to_clknblock({
"encodings": ['123', '456', '789', '000'],
"blocks": {
'0': ['1', '2'],
'1': ['1'],
'2': []
}
})

assert 'clknblocks' in out
assert len(out['clknblocks']) == 3
assert out['clknblocks'][0] == ['123', '1', '2']
assert out['clknblocks'][1] == ['456', '1']
assert out['clknblocks'][2] == ['789', ]
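
A minimal sketch of what the helper under test might look like, inferred purely from these assertions; the real implementation in entityservice.views may differ:

def convert_encoding_upload_to_clknblock(upload):
    # Inferred behaviour: without blocks, every encoding goes into default block "1";
    # with blocks, each listed encoding index is paired with its block ids.
    encodings = upload["encodings"]
    blocks = upload.get("blocks")
    if blocks is None:
        return {"clknblocks": [[enc, "1"] for enc in encodings]}
    return {"clknblocks": [[encodings[int(i)], *block_ids]
                           for i, block_ids in blocks.items()]}
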


class TestYamlLoader:
2 changes: 1 addition & 1 deletion backend/entityservice/utils.py
@@ -146,7 +146,7 @@ def clks_uploaded_to_project(project_id, check_data_ready=False):
""" See if the given project has had all parties contribute data.
"""
log = logger.bind(pid=project_id)
log.info("Counting contributing parties")
log.debug("Counting contributing parties")
with DBConn() as conn:
if check_data_ready:
parties_contributed = get_number_parties_ready(conn, project_id)