Merged
27 commits
065b11a - Add function to fetch block ids and sizes from db (hardbyte, Mar 4, 2020)
6f7c75c - Retrieve blocking info in create_comparison_jobs task (hardbyte, Mar 4, 2020)
ebcb248 - WIP - identify blocks that need to be broken up further (hardbyte, Mar 4, 2020)
a066ccc - Query for getting encodings in a block (hardbyte, Mar 6, 2020)
fb550de - Split tasks into chunks using blocking information (hardbyte, Mar 6, 2020)
610b3bb - Refactor create comparison jobs function (hardbyte, Mar 8, 2020)
d838fe4 - More refactoring of chunk creation (hardbyte, Mar 9, 2020)
ec36e8d - Add a few unit tests for chunking (hardbyte, Mar 9, 2020)
ddcbcc3 - Add database index on encodings table (hardbyte, Mar 9, 2020)
4ab16e6 - clknblocks not clksnblocks and other minor cleanup (hardbyte, Mar 10, 2020)
d66bf58 - cleanup (hardbyte, Mar 10, 2020)
1e5151f - Add blocking concept to docs (hardbyte, Mar 13, 2020)
aec9b5c - Deduplicate candidate pairs before solving (hardbyte, Mar 15, 2020)
f30c819 - Catch the empty candidate pair case (hardbyte, Mar 15, 2020)
9dc59e1 - Simplify solver task by using anonlink's _merge_similarities function (hardbyte, Mar 16, 2020)
6219e44 - Update celery (hardbyte, Mar 16, 2020)
0b6a4c2 - Address code review feedback (hardbyte, Mar 18, 2020)
5467362 - Bump version to beta2 (hardbyte, Mar 18, 2020)
f342d5a - Celery concurrency defaults (hardbyte, Mar 19, 2020)
2add5ef - Add another layer of tracing into the comparison task (hardbyte, Mar 19, 2020)
e2ebe99 - Update task names in celery routing (hardbyte, Mar 19, 2020)
38b624f - Faster encoding retrieval by using COPY. (hardbyte, Mar 22, 2020)
7ec7fef - Pass on stored size when retrieving encodings from DB (hardbyte, Mar 22, 2020)
24caa79 - Increase time on test (hardbyte, Mar 22, 2020)
dc1983b - Refactor binary copy into own function for easier reuse and testing (hardbyte, Mar 23, 2020)
8bae410 - Add more detailed tracing around binary encoding insertions. (hardbyte, Mar 24, 2020)
88e968d - Add tests for binary copy function (hardbyte, Mar 24, 2020)
2 changes: 1 addition & 1 deletion backend/entityservice/VERSION
@@ -1 +1 @@
v1.13.0-beta
v1.13.0-beta2
11 changes: 7 additions & 4 deletions backend/entityservice/database/insertions.py
@@ -1,5 +1,6 @@
from typing import List

import opentracing
import psycopg2
import psycopg2.extras

@@ -95,13 +96,15 @@ def insert_encodings_into_blocks(db, dp_id: int, block_ids: List[List[str]], enc
def block_data_generator(encoding_ids, block_ids):
for eid, block_ids in zip(encoding_ids, block_ids):
for block_id in block_ids:
yield (dp_id, eid, block_id)
yield dp_id, eid, block_id

with db.cursor() as cur:
psycopg2.extras.execute_values(cur, encodings_insertion_query, encoding_data, page_size=page_size)
psycopg2.extras.execute_values(cur,
with opentracing.tracer.start_span('insert-encodings-to-db'):
psycopg2.extras.execute_values(cur, encodings_insertion_query, encoding_data, page_size=page_size)
with opentracing.tracer.start_span('insert-encodingblocks-to-db'):
psycopg2.extras.execute_values(cur,
blocks_insertion_query,
block_data_generator(encoding_ids, block_ids),
list(block_data_generator(encoding_ids, block_ids)),
page_size=page_size)


107 changes: 82 additions & 25 deletions backend/entityservice/database/selections.py
@@ -1,3 +1,4 @@
import io
import itertools

from entityservice.database.util import query_db, logger
@@ -275,51 +276,107 @@ def get_uploads_columns(db, dp_id, columns):
return [result[column] for column in columns]


def get_encodingblock_ids(db, dp_id, block_name=None):
def get_encodingblock_ids(db, dp_id, block_id=None, offset=0, limit=None):
"""Yield all encoding ids in either a single block, or all blocks for a given data provider."""
sql_query = """
SELECT encoding_id
FROM encodingblocks
WHERE dp = %s
WHERE dp = %(dp_id)s
{}
ORDER BY
encoding_ID ASC
""".format("AND block_id = %s" if block_name else "")
encoding_ID ASC
OFFSET %(offset)s
LIMIT %(limit)s
""".format("AND block_id = %(block_id)s" if block_id else "")
# Specifying a name for the cursor creates a server-side cursor, which prevents all of the
# records from being downloaded at once.
cur = db.cursor(f'encodingblockfetcher-{dp_id}')
args = {'dp_id': dp_id, 'block_id': block_id, 'offset': offset, 'limit': limit}
cur.execute(sql_query, args)
yield from iterate_cursor_results(cur)

args = (dp_id, block_name) if block_name else (dp_id,)

def get_block_metadata(db, dp_id):
"""Yield block id and counts for a given data provider."""
sql_query = """
SELECT block_name, count
FROM blocks
WHERE dp = %s
"""
# Specifying a name for the cursor creates a server-side cursor, which prevents all of the
# records from being downloaded at once.
cur = db.cursor(f'blockfetcher-{dp_id}')
args = (dp_id,)
cur.execute(sql_query, args)
for block_name, count in iterate_cursor_results(cur, one=False):
yield block_name.strip(), count


def iterate_cursor_results(cur, one=True, page_size=4096):
Review comment (Collaborator):
the 'one' argument is a bit confusing. From the name alone it is not obvious what it does.
Do we really need that?
Wouldn't it be cleaner to always yield the full results? That's what the function name says.
We could just call it like this:
for block_name, _ in iterate_cursor_results(cur):

while True:
rows = cur.fetchmany(10_000)
rows = cur.fetchmany(page_size)
if not rows:
break
for row in rows:
yield row[0]
if one:
yield row[0]
else:
yield row
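For reference, a minimal sketch of the simplification suggested in that comment, assuming callers unpack the row tuples themselves (this signature is illustrative and not part of the PR):

def iterate_cursor_results(cur, page_size=4096):
    """Yield full result rows from a (server-side) cursor, fetching in pages."""
    while True:
        rows = cur.fetchmany(page_size)
        if not rows:
            break
        yield from rows

# Callers that only need the first column unpack it at the call site, e.g.
# for block_name, _ in iterate_cursor_results(cur): ...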


def copy_binary_column_from_select_query(cur, select_query, stored_binary_size=132):
"""Yields raw bytes from postgres given a query returning a column containing fixed size bytea data.

:param select_query: An SQL query that selects a single binary column. It should include ordering of the results.
:param stored_binary_size: Fixed size of each bytea value.
:raises AssertionError if the database implements an unhandled extension or the EOF is corrupt.
"""

copy_to_stream_query = """COPY ({}) TO STDOUT WITH binary""".format(select_query)
stream = io.BytesIO()
cur.copy_expert(copy_to_stream_query, stream)

raw_data = stream.getvalue()

# Need to read/remove the Postgres Binary Header, Trailer, and the per tuple info
# https://www.postgresql.org/docs/current/sql-copy.html
_ignored_header = raw_data[:15]
header_extension = raw_data[15:19]
assert header_extension == b'\x00\x00\x00\x00', "Need to implement skipping postgres binary header extension"
binary_trailer = raw_data[-2:]
assert binary_trailer == b'\xff\xff', "Corrupt COPY of binary data from postgres"
raw_data = raw_data[19:-2]

# The first 6 bytes of each row contain the tuple field count and the field length
per_row_header_size = 6
size = stored_binary_size + per_row_header_size
for i in range(0, len(raw_data), size):
start_index = i + per_row_header_size
end_index = start_index + stored_binary_size
yield raw_data[start_index: end_index]
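A rough usage sketch of this helper follows; the table, filter value, and size here are hypothetical, the only requirement being that the query selects a single fixed-size bytea column:

# Hypothetical usage: stream 132-byte stored encodings for one data provider.
select_query = "SELECT encoding FROM encodings WHERE dp = 1 ORDER BY encoding_id ASC"
with conn.cursor() as cur:
    for raw in copy_binary_column_from_select_query(cur, select_query, stored_binary_size=132):
        assert len(raw) == 132  # each yielded value is the raw bytea payload of one row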


def get_chunk_of_encodings(db, dp_id, encoding_ids, stored_binary_size=132):
"""Yields raw byte encodings for a data provider given the encoding ids.

:param dp_id: Fetch encodings from this dataprovider (encoding ids are not unique across data providers).
:param encoding_ids: List of ints of the encoding ids to include.
:param stored_binary_size: Size of each encoding as stored in the database, including the encoding id.
"""

cur = db.cursor()

def get_encodings_by_id_range(db, dp_id, encoding_id_min=None, encoding_id_max=None):
"""Yield all encodings in a given range for a given data provider."""
sql_query = """
SELECT encoding
FROM encodings
WHERE dp = %s
{}
{}
ORDER BY
encoding_id ASC
""".format(
f"AND encoding_id >= {encoding_id_min}" if encoding_id_min else "",
f"AND encoding_id < {encoding_id_max}" if encoding_id_max else "",
SELECT encoding
FROM encodings
WHERE encodings.dp = {}
AND encodings.encoding_id in ({})
ORDER BY encoding_id ASC
""".format(
dp_id,
','.join(map(str, encoding_ids))
)
cur = db.cursor()
cur.execute(sql_query, (dp_id,))
rows = cur.fetchall()
for row in rows:
# Note row[0] is a memoryview
yield bytes(row[0])
yield from copy_binary_column_from_select_query(cur, sql_query, stored_binary_size=stored_binary_size)


def get_filter_metadata(db, dp_id):
17 changes: 10 additions & 7 deletions backend/entityservice/encoding_storage.py
@@ -4,7 +4,8 @@

import ijson

from entityservice.database import insert_encodings_into_blocks, get_encodings_by_id_range
from entityservice.database import insert_encodings_into_blocks, get_encodingblock_ids, \
get_chunk_of_encodings
from entityservice.serialization import deserialize_bytes, binary_format, binary_unpack_filters


@@ -14,7 +15,7 @@ def stream_json_clksnblocks(f):
the following structure:

{
"clksnblocks": [
"clknblocks": [
["BASE64 ENCODED ENCODING 1", blockid1, blockid2, ...],
["BASE64 ENCODED ENCODING 2", blockid1, ...],
...
@@ -25,7 +26,7 @@
:return: Generator of (entity_id, base64 encoding, list of blocks)
"""
# At some point the user may supply the entity id. For now we use the order of uploaded encodings.
for i, obj in enumerate(ijson.items(f, 'clksnblocks.item')):
for i, obj in enumerate(ijson.items(f, 'clknblocks.item')):
b64_encoding, *blocks = obj
yield i, deserialize_bytes(b64_encoding), blocks

@@ -52,7 +53,7 @@ def generator(first_i, first_encoding_data, first_blocks):


def _grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
"Collect data into fixed-length chunks or blocks from an iterable"
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
@@ -102,8 +103,10 @@ def _estimate_group_size(encoding_size):
def get_encoding_chunk(conn, chunk_info, encoding_size=128):
chunk_range_start, chunk_range_stop = chunk_info['range']
dataprovider_id = chunk_info['dataproviderId']

encoding_data_stream = get_encodings_by_id_range(conn, dataprovider_id, chunk_range_start, chunk_range_stop)
chunk_data = binary_unpack_filters(encoding_data_stream, encoding_size=encoding_size)
block_id = chunk_info['block_id']
limit = chunk_range_stop - chunk_range_start
encoding_ids = get_encodingblock_ids(conn, dataprovider_id, block_id, chunk_range_start, limit)
encoding_iter = get_chunk_of_encodings(conn, dataprovider_id, encoding_ids, stored_binary_size=(encoding_size+4))
chunk_data = binary_unpack_filters(encoding_iter, encoding_size=encoding_size)
return chunk_data, len(chunk_data)
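For illustration only, a chunk_info mapping of the shape this function expects might look like the following; all field values here are hypothetical:

# Hypothetical chunk description: encodings 0-999 of block 'block-1' for data provider 33.
chunk_info = {
    'range': (0, 1000),
    'dataproviderId': 33,
    'block_id': 'block-1',
}
# chunk_data, chunk_size = get_encoding_chunk(conn, chunk_info, encoding_size=128)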

5 changes: 3 additions & 2 deletions backend/entityservice/init-db-schema.sql
@@ -175,6 +175,7 @@ CREATE TABLE encodings (
PRIMARY KEY (dp, encoding_id)
);

CREATE INDEX ON encodings (dp, encoding_id);

-- Table mapping blocks to encodings
CREATE TABLE encodingblocks (
@@ -188,8 +189,8 @@ CREATE TABLE encodingblocks (
FOREIGN KEY (dp, block_id) REFERENCES blocks (dp, block_name)
);



CREATE INDEX ON encodingblocks (dp, block_id);
CREATE INDEX ON encodingblocks (encoding_id);

CREATE TABLE run_results (
-- Just the table index
13 changes: 13 additions & 0 deletions backend/entityservice/integrationtests/dbtests/__init__.py
@@ -0,0 +1,13 @@
import psycopg2

from entityservice.settings import Config as config


def _get_conn_and_cursor():
db = config.DATABASE
host = config.DATABASE_SERVER
user = config.DATABASE_USER
password = config.DATABASE_PASSWORD
conn = psycopg2.connect(host=host, dbname=db, user=user, password=password)
cursor = conn.cursor()
return conn, cursor
57 changes: 57 additions & 0 deletions backend/entityservice/integrationtests/dbtests/conftest.py
@@ -0,0 +1,57 @@
import pytest
import psycopg2
import psycopg2.extras

from entityservice.settings import Config as config


@pytest.fixture
def conn():
db = config.DATABASE
host = config.DATABASE_SERVER
user = config.DATABASE_USER
password = config.DATABASE_PASSWORD
conn = psycopg2.connect(host=host, dbname=db, user=user, password=password)
yield conn
conn.close()

@pytest.fixture
def cur(conn):
return conn.cursor()




@pytest.fixture()
def prepopulated_binary_test_data(conn, cur, num_bytes=4, num_rows=100):
creation_sql = """
DROP TABLE IF EXISTS binary_test;
CREATE TABLE binary_test
(
id integer not null,
encoding bytea not null
);"""
cur.execute(creation_sql)
conn.commit()

# Add data using execute_values
data = [(i, bytes([i % 128] * num_bytes)) for i in range(num_rows)]
psycopg2.extras.execute_values(cur, """
INSERT INTO binary_test (id, encoding) VALUES %s
""", data)

conn.commit()

# quick check data is there
cur.execute("select count(*) from binary_test")
res = cur.fetchone()[0]
assert res == num_rows

cur.execute("select encoding from binary_test where id = 1")
assert bytes(cur.fetchone()[0]) == data[1][1]

yield data

# delete test table
deletion_sql = "drop table if exists binary_test cascade;"
cur.execute(deletion_sql)
conn.commit()
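The new dbtests themselves are truncated from this view. As a hedged sketch (test name and import path are assumptions, not the PR's actual test), the fixture and the binary COPY helper might be exercised together like this:

from entityservice.database.selections import copy_binary_column_from_select_query


def test_copy_binary_column_roundtrip(conn, prepopulated_binary_test_data):
    # Stream the bytea column back out via binary COPY and compare with what the fixture inserted.
    # stored_binary_size matches the fixture's num_bytes=4.
    cur = conn.cursor()
    select_query = "SELECT encoding FROM binary_test ORDER BY id ASC"
    retrieved = list(copy_binary_column_from_select_query(cur, select_query, stored_binary_size=4))
    assert len(retrieved) == len(prepopulated_binary_test_data)
    for (_, original), copied in zip(prepopulated_binary_test_data, retrieved):
        assert copied == original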