Merged
extend binary format to also include entity id.
wilko committed Feb 17, 2020
commit e77c624fe9f91a2717ba2e35da72df3375bb1e4c
29 changes: 20 additions & 9 deletions backend/entityservice/serialization.py
@@ -41,11 +41,12 @@ def binary_format(encoding_size):

The binary format string can be understood as:
- "!" Use network byte order (big-endian).
- "I" store the entity ID as an unsigned int
- "<encoding size>s" Store the n (e.g. 128) raw bytes of the bitarray

https://docs.python.org/3/library/struct.html
"""
bit_packing_fmt = f"!{encoding_size}s"
bit_packing_fmt = f"!I{encoding_size}s"
Collaborator:
We are just going to pretend the old format doesn't exist, aren't we? Should we take the opportunity to add a version byte to the format?

Collaborator Author:
It's an internal format, so yes, I pretend that it never existed. I'm old, I forget quickly.
A version byte for each filter would mean a lot of version bytes in the object store. This should rather be solved via a header. But again, since this is only used internally in the service, we will never have to differentiate between the versions anyway, and thus the versioning is redundant.

Collaborator:
I meant a header for the file rather than one for each encoding. But sure, not worth it right now.

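For illustration, the kind of file-level header being discussed might look like the sketch below. The magic marker, struct layout, and helper names are all invented for this example and are not part of the PR:

import io
import struct

# Hypothetical file-level header: a magic marker plus one version byte,
# written once per stored object rather than once per encoding.
FILE_HEADER = struct.Struct("!4sB")  # 4-byte magic, 1-byte version

def write_header(buffer, version=1):
    buffer.write(FILE_HEADER.pack(b"CLKS", version))

def read_header(buffer):
    magic, version = FILE_HEADER.unpack(buffer.read(FILE_HEADER.size))
    assert magic == b"CLKS", "not a recognised encodings object"
    return version

buf = io.BytesIO()
write_header(buf)
buf.seek(0)
assert read_header(buf) == 1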
bit_packing_struct = struct.Struct(bit_packing_fmt)
return bit_packing_struct

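As a quick sanity check of the new format, a sketch using the 128-byte encoding size that appears throughout this PR:

import struct

# Network byte order, one unsigned int for the entity ID, then the
# raw encoding bytes, matching what binary_format(128) produces.
fmt = struct.Struct("!I128s")
assert fmt.size == 4 + 128  # the entity ID adds 4 bytes per element

packed = fmt.pack(42, b"\x00" * 128)
entity_id, clk_bytes = fmt.unpack(packed)
assert (entity_id, clk_bytes) == (42, b"\x00" * 128)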
@@ -54,31 +55,41 @@ def binary_pack_filters(filters, encoding_size):
"""Efficient packing of bloomfilters.

:param filters:
- An iterable of bytes as produced by deserialize_bytes.
+ An iterable of tuples, where
+ - the first element is the entity ID as an unsigned int, and
+ - the second element is 'encoding_size' raw bytes, as produced by deserialize_bytes.

:return:
An iterable of bytes.
"""
bit_packing_struct = binary_format(encoding_size)

for hash_bytes in filters:
- yield bit_packing_struct.pack(hash_bytes)
+ yield bit_packing_struct.pack(*hash_bytes)
Collaborator:
⭐️
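A minimal usage sketch of the generator as defined above; the entity IDs and encoding bytes here are made up:

# Each input element is an (entity_id, encoding_bytes) tuple.
filters = [(0, b"\xaa" * 128), (1, b"\xbb" * 128)]
packed = b"".join(binary_pack_filters(filters, encoding_size=128))
assert len(packed) == 2 * (4 + 128)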



def binary_unpack_one(data, bit_packing_struct):
- clk_bytes, = bit_packing_struct.unpack(data)
- return clk_bytes
+ entity_id, clk_bytes = bit_packing_struct.unpack(data)
+ return entity_id, clk_bytes


- def binary_unpack_filters(streamable_data, max_bytes=None, encoding_size=None):
+ def binary_unpack_filters(data_iterable, max_bytes=None, encoding_size=None):
"""
Unpack filters that were packed with the 'binary_pack_filters' method.

+ :param data_iterable: an iterable of binary packed filters.
+ :param max_bytes: if present, only read up to 'max_bytes' bytes.
+ :param encoding_size: the encoding size of one filter, excluding the entity ID info
Collaborator:
Since we bounce between bytes and bits, I'd try to be explicit about the size.

Collaborator Author:
good point

+ :return: list of filters with their corresponding entity IDs as a list of tuples.
"""
assert encoding_size is not None
bit_packed_element = binary_format(encoding_size)
bit_packed_element_size = bit_packed_element.size
filters = []
bytes_consumed = 0

logger.info(f"Unpacking stream of encodings with size {encoding_size} - packed as {bit_packed_element_size}")
for raw_bytes in streamable_data.stream(bit_packed_element_size):
logger.info(f"Iterating over encodings of size {encoding_size} - packed as {bit_packed_element_size}")
for raw_bytes in data_iterable:
filters.append(binary_unpack_one(raw_bytes, bit_packed_element))

bytes_consumed += bit_packed_element_size
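A round-trip sketch of pack and unpack together. Note that encoding_size is measured in bytes: a 1024-bit Bloom filter encoding is 128 bytes, and each packed element is 4 + 128 bytes (this is the byte/bit ambiguity the thread above points out):

# Pack four (entity_id, encoding) pairs, then unpack them again.
filters = [(i, bytes([i]) * 128) for i in range(4)]
packed = list(binary_pack_filters(filters, encoding_size=128))
assert all(len(p) == 4 + 128 for p in packed)
assert binary_unpack_filters(packed, encoding_size=128) == filters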
@@ -185,6 +196,6 @@ def get_chunk_from_object_store(chunk_info, encoding_size=128):
bit_packed_element_size * chunk_range_start,
chunk_bytes)

- chunk_data = binary_unpack_filters(chunk_stream, chunk_bytes, encoding_size)
+ chunk_data = binary_unpack_filters(chunk_stream.stream(bit_packed_element_size), chunk_bytes, encoding_size)

return chunk_data, chunk_length
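One subtlety worth calling out: the chunk offset arithmetic above accounts for the extra ID bytes automatically, because it is based on the packed element size rather than the raw encoding size. With 128-byte encodings:

# Each packed element is now 4 + 128 = 132 bytes, so a chunk starting at
# element 1000 begins at byte offset 132 * 1000 = 132000, where it would
# have been 128 * 1000 = 128000 under the old format.
bit_packed_element_size = binary_format(128).size  # 132
offset = bit_packed_element_size * 1000            # 132000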
25 changes: 21 additions & 4 deletions backend/entityservice/tests/test_serialization.py
@@ -1,13 +1,13 @@
import io
- import unittest
- import random
+ import json
+ import random
+ import unittest
+ from array import array

import anonlink

- from entityservice.serialization import deserialize_bytes, generate_scores
+ from entityservice.serialization import deserialize_bytes, generate_scores, binary_pack_filters, \
+     binary_unpack_filters, binary_unpack_one, binary_format
from entityservice.tests.util import serialize_bytes, generate_bytes


@@ -69,6 +69,23 @@ def test_sims_to_json_empty(self):
self.assertIn('similarity_scores', json_obj)
assert len(json_obj["similarity_scores"]) == 0

+ def test_binary_pack_filters(self):
+     encoding_size = 128
+     filters = [(random.randint(0, 2 ** 32 - 1), generate_bytes(encoding_size)) for _ in range(10)]
+     packed_filters = binary_pack_filters(filters, encoding_size)
+     bin_format = binary_format(encoding_size)
+     for filter, packed_filter in zip(filters, packed_filters):
+         assert len(packed_filter) == encoding_size + 4
+         unpacked = binary_unpack_one(packed_filter, bin_format)
+         assert filter == unpacked
+
+ def test_binary_unpack_filters(self):
+     encoding_size = 128
+     filters = [(random.randint(0, 2 ** 32 - 1), generate_bytes(encoding_size)) for _ in range(10)]
+     laundered_filters = binary_unpack_filters(binary_pack_filters(filters, encoding_size),
+                                               encoding_size=encoding_size)
+     assert filters == laundered_filters


if __name__ == "__main__":
unittest.main()