Merged
4 changes: 0 additions & 4 deletions backend/entityservice/__init__.py
@@ -4,10 +4,6 @@
from flask import g, request
import structlog
from tenacity import RetryError
try:
import ijson.backends.yajl2_cffi as ijson
except ImportError:
import ijson

from entityservice.logger_setup import setup_logging

32 changes: 29 additions & 3 deletions backend/entityservice/database/selections.py
@@ -282,10 +282,12 @@ def get_encodingblock_ids(db, dp_id, block_name=None):
FROM encodingblocks
WHERE dp = %s
{}
ORDER BY
encoding_ID ASC
""".format("AND block_id = %s" if block_name else "")
# Specifying a name for the cursor creates a server-side cursor, which prevents all of the
# records from being downloaded at once.
cur = db.cursor(f'encodingfetcher-{dp_id}')
cur = db.cursor(f'encodingblockfetcher-{dp_id}')

args = (dp_id, block_name) if block_name else (dp_id,)

@@ -298,12 +300,36 @@ def get_encodingblock_ids(db, dp_id, block_name=None):
yield row[0]


def get_encodings_by_id_range(db, dp_id, encoding_id_min=None, encoding_id_max=None):
"""Yield all encodings in a given range for a given data provider."""
sql_query = """
SELECT encoding
FROM encodings
WHERE dp = %s
{}
{}
ORDER BY
encoding_id ASC
""".format(
f"AND encoding_id >= {encoding_id_min}" if encoding_id_min else "",
f"AND encoding_id < {encoding_id_max}" if encoding_id_max else "",
)
cur = db.cursor()
cur.execute(sql_query, (dp_id,))
rows = cur.fetchall()
for row in rows:
# Note row[0] is a memoryview
yield bytes(row[0])


def get_filter_metadata(db, dp_id):
"""
:return: The filename and the encoding size of the raw clks.
:return: The filename (which could be None), and the encoding size of the raw clks.
"""
filename, encoding_size = get_uploads_columns(db, dp_id, ['file', 'encoding_size'])
return filename.strip(), encoding_size
if filename is not None:
filename = filename.strip()
return filename, encoding_size


def get_encoding_metadata(db, dp_id):
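
For context, not part of the diff: giving db.cursor() a name (as in get_encodingblock_ids above) makes psycopg2 create a server-side cursor, so rows are streamed in batches rather than fetched all at once. A minimal sketch of that pattern, assuming an open psycopg2 connection; the function name, query and batch size are illustrative:

import psycopg2  # assumed driver; `conn` below is an open psycopg2 connection

def stream_encodings(conn, dp_id, batch_size=1000):
    # Passing a name creates a server-side cursor; itersize rows travel per round trip.
    with conn.cursor(name=f'example-encodingfetcher-{dp_id}') as cur:
        cur.itersize = batch_size
        cur.execute(
            "SELECT encoding FROM encodings WHERE dp = %s ORDER BY encoding_id ASC",
            (dp_id,))
        for (encoding,) in cur:
            yield bytes(encoding)  # each column value arrives as a memoryview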
35 changes: 9 additions & 26 deletions backend/entityservice/encoding_storage.py
@@ -4,8 +4,8 @@

import ijson

from entityservice.database import insert_encodings_into_blocks
from entityservice.serialization import deserialize_bytes, binary_format
from entityservice.database import insert_encodings_into_blocks, get_encodings_by_id_range
from entityservice.serialization import deserialize_bytes, binary_format, binary_unpack_filters


def stream_json_clksnblocks(f):
@@ -50,6 +50,7 @@ def generator(first_i, first_encoding_data, first_blocks):

return encoding_size, generator(i, encoding_data, blocks)


def _grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
@@ -98,29 +99,11 @@ def _estimate_group_size(encoding_size):
return math.ceil(network_transaction_size / ((blocks_per_record_estimate * 64) + (encoding_size + 4)))


def convert_encodings_from_json_to_binary(f):
"""
Temp helper function
def get_encoding_chunk(conn, chunk_info, encoding_size=128):
chunk_range_start, chunk_range_stop = chunk_info['range']
dataprovider_id = chunk_info['dataproviderId']

:param f: File-like object containing `clksnblocks` json.
:return: a tuple comprising:
- dict mapping blocks to lists of bytes (each encoding in our internal encoding file format)
- the size of the first encoding in bytes (excluding entity ID info)
"""
# Each block index contains a set of base64 encodings.
encodings_by_block = {}

# Default which is ignored but makes IDE/typechecker happier
bit_packing_struct = binary_format(128)
encoding_size = None

for i, encoding_data, blocks in stream_json_clksnblocks(f):
if encoding_size is None:
encoding_size = len(encoding_data)
bit_packing_struct = binary_format(encoding_size)
binary_packed_encoding = bit_packing_struct.pack(i, encoding_data)
for block in blocks:
encodings_by_block.setdefault(block, []).append(binary_packed_encoding)

return encodings_by_block, encoding_size
encoding_data_stream = get_encodings_by_id_range(conn, dataprovider_id, chunk_range_start, chunk_range_stop)
chunk_data = binary_unpack_filters(encoding_data_stream, encoding_size=encoding_size)
return chunk_data, len(chunk_data)

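A usage sketch for the new get_encoding_chunk helper, not taken from the PR: chunk_info carries the data-provider id and a half-open encoding-id range, and the helper returns the unpacked encodings together with their count. DBConn mirrors its use elsewhere in this PR; the import path, ids and sizes are assumptions:

from entityservice.database import DBConn  # assumed import path
from entityservice.encoding_storage import get_encoding_chunk

# Hypothetical chunk descriptor: encodings 0 (inclusive) to 10_000 (exclusive) for provider 33.
chunk_info = {'dataproviderId': 33, 'range': (0, 10_000)}

with DBConn() as conn:
    chunk_data, chunk_size = get_encoding_chunk(conn, chunk_info, encoding_size=128)
    assert chunk_size == len(chunk_data)
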
3 changes: 2 additions & 1 deletion backend/entityservice/error_checking.py
@@ -32,4 +32,5 @@ def handle_invalid_encoding_data(project_id, dp_id):
with DBConn() as conn:
filename, _ = get_filter_metadata(conn, dp_id)
update_encoding_metadata(conn, 'DELETED', dp_id, state='error')
delete_minio_objects.delay([filename], project_id)
if filename is not None:
delete_minio_objects.delay([filename], project_id)
2 changes: 1 addition & 1 deletion backend/entityservice/init-db-schema.sql
@@ -131,7 +131,7 @@ CREATE TABLE uploads (
token CHAR(48) NOT NULL UNIQUE,

-- Filename for the raw unprocessed uploaded data
file CHAR(64) NOT NULL,
file CHAR(64) NULL,

state PROCESSEDSTATE NOT NULL,

23 changes: 8 additions & 15 deletions backend/entityservice/integrationtests/dbtests/test_insertions.py
@@ -4,8 +4,8 @@
import psycopg2
from pytest import raises

from entityservice.database import insert_dataprovider, insert_new_project, \
insert_encodings_into_blocks, insert_blocking_metadata, get_project, get_encodingblock_ids
from entityservice.database import insert_dataprovider, insert_encodings_into_blocks, insert_blocking_metadata, \
get_project, get_encodingblock_ids, get_encodings_by_id_range
from entityservice.models import Project
from entityservice.tests.util import generate_bytes
from entityservice.utils import generate_code
@@ -74,26 +74,19 @@ def test_insert_many_clks(self):
num_entities = 10_000
blocks = [['1'] for _ in range(num_entities)]
encodings = [data[i % 100] for i in range(num_entities)]
start_time = time.perf_counter()
insert_encodings_into_blocks(conn, dp_id,
block_ids=blocks,
encoding_ids=list(range(num_entities)),
encodings=encodings
)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
# This takes ~0.5s using docker compose on a ~5yo desktop.
# If the database is busy - e.g. if you're running integration
# tests and e2e tests at the same time, this assertion could fail.
assert elapsed_time < 2

stored_encoding_ids = list(get_encodingblock_ids(conn, dp_id, '1'))
fetch_time = time.perf_counter() - end_time
# retrieval of encoding ids should be much faster than insertion
assert fetch_time < elapsed_time

assert len(stored_encoding_ids) == num_entities
for stored_encoding_id, original in zip(stored_encoding_ids, range(num_entities)):
assert stored_encoding_id == original
for stored_encoding_id, original_id in zip(stored_encoding_ids, range(num_entities)):
assert stored_encoding_id == original_id

# TODO fetch binary encodings and verify against uploaded
stored_encodings = list(get_encodings_by_id_range(conn, dp_id))
assert len(stored_encodings) == num_entities
for stored_encoding, original_encoding in zip(stored_encodings, encodings):
assert stored_encoding == original_encoding
3 changes: 0 additions & 3 deletions backend/entityservice/object_store.py
@@ -1,9 +1,6 @@
import minio
from structlog import get_logger
import psycopg2

from entityservice.database import insert_similarity_score_file
from entityservice.errors import RunDeleted
from entityservice.settings import Config as config

logger = get_logger('objectstore')
38 changes: 2 additions & 36 deletions backend/entityservice/serialization.py
@@ -1,8 +1,7 @@
import io

import typing
import urllib3
from bitarray import bitarray

import base64
import struct

@@ -12,7 +11,7 @@

from entityservice.object_store import connect_to_object_store
from entityservice.settings import Config as config
from entityservice.utils import chunks, safe_fail_request, iterable_to_stream
from entityservice.utils import chunks, safe_fail_request
import concurrent.futures


@@ -103,39 +102,6 @@ def binary_unpack_filters(data_iterable, max_bytes=None, encoding_size=None):
return filters


def deserialize_filters(filters):
"""
Deserialize iterable of base64 encoded clks.

Carrying out the popcount and adding the index as we go.
"""
res = []
for i, f in enumerate(filters):
ba = deserialize_bitarray(f)
res.append((ba, i, ba.count()))
return res


def deserialize_filters_concurrent(filters):
"""
Deserialize iterable of base64 encoded clks.

Carrying out the popcount and adding the index as we go.
"""
res = []
chunk_size = int(100000)
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = []
for i, chunk in enumerate(chunks(filters, chunk_size)):
future = executor.submit(deserialize_filters, chunk)
futures.append(future)

for future in futures:
res.extend(future.result())

return res


def generate_scores(candidate_pair_stream: typing.BinaryIO):
"""
Processes a TextIO stream of candidate pair similarity scores into
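
Background sketch, not part of the diff: the removed convert_encodings_from_json_to_binary packed each (entity id, encoding) pair with binary_format(encoding_size), and binary_unpack_filters reverses that packing when chunks are read back (as in get_encoding_chunk above). The snippet below shows the general fixed-size struct technique such packing relies on; the format string and sizes are assumptions, not the service's actual wire format:

import struct

def make_format(encoding_size):
    # Illustrative layout: 4-byte unsigned entity id followed by encoding_size raw bytes.
    return struct.Struct(f'!I{encoding_size}s')

fmt = make_format(128)
record = fmt.pack(42, b'\x01' * 128)      # pack (entity_id, encoding) into one fixed-size record
entity_id, encoding = fmt.unpack(record)  # unpack round-trips the pair
assert entity_id == 42 and len(encoding) == 128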