Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Extend upload endpoint to accept both types - clks and clknblocks
  • Loading branch information
joyceyuu authored and hardbyte committed Feb 20, 2020
commit 1c9e562151cf66c18d6907aaf6b4b430505d7576
6 changes: 6 additions & 0 deletions backend/entityservice/database/selections.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ def check_project_exists(db, resource_id):
return query_result['count'] == 1


def get_uses_blocking(db, project_id):
    """
    Look up the ``uses_blocking`` column of a single project.

    :param db: database connection handed straight through to ``query_db``.
    :param project_id: identifier matched against ``projects.project_id``.
    :return: the value stored in the ``uses_blocking`` column for that project.
    """
    # NOTE(review): presumably query_db(..., one=True) yields None when no row
    # matches, in which case the subscript below raises - confirm that callers
    # verify the project exists first.
    row = query_db(
        db,
        'select uses_blocking from projects WHERE project_id = %s',
        [project_id],
        one=True,
    )
    return row['uses_blocking']


def check_run_exists(db, project_id, run_id):
sql_query = '''
SELECT count(*)
Expand Down
62 changes: 47 additions & 15 deletions backend/entityservice/views/project.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
from io import BytesIO
import tempfile

import minio
from flask import request
Expand Down Expand Up @@ -225,7 +227,11 @@ def project_clks_post(project_id):
# consume the stream first to validate it against the spec. Thus the backflip to fully reading the CLks as
# json into memory. -> issue #184

receipt_token, raw_file = upload_json_clk_data(dp_id, get_json(), span)
# get the uses_blocking flag from the projects table
with DBConn() as conn:
uses_blocking = db.get_uses_blocking(conn, project_id)

receipt_token, raw_file = upload_json_clk_data(dp_id, get_json(), span, uses_blocking)
# Schedule a task to deserialize the hashes, and carry
# out a pop count.
handle_raw_upload.delay(project_id, dp_id, receipt_token, parent_span=serialize_span(span))
Expand Down Expand Up @@ -355,39 +361,65 @@ def upload_clk_data_binary(project_id, dp_id, raw_stream, count, size=128):
return receipt_token


def upload_json_clk_data(dp_id, clk_json, parent_span):
def upload_json_clk_data(dp_id, clk_json, parent_span, uses_blocking):
"""
Convert user provided encodings from json array of base64 data into
a newline separated file of base64 data.

Note this implementation is non-streaming.
"""
if 'clks' not in clk_json or len(clk_json['clks']) < 1:
safe_fail_request(400, message="Missing CLKs information")
# check if the following combinations are true
# - uses_blocking is False AND 'clks' element in upload JSON
# - uses_blocking is True AND 'clknblocks' element in upload JSON
# otherwise, return safe_fail_request
hold_condition1 = not uses_blocking and 'clks' in clk_json
hold_condition2 = uses_blocking and 'clknblocks' in clk_json

if not (hold_condition1 or hold_condition2):
# fail condition1 - uses_blocking is True but uploaded element is "clks"
if uses_blocking and 'clks' in clk_json:
safe_fail_request(400, message='Uploaded element is "clks" while expecting "clknblocks"')
# fail condition2 - uses_blocking is False but uploaded element is "clknblocks"
if not uses_blocking and 'clknblocks' in clk_json:
safe_fail_request(400, message='Uploaded element is "clknblocks" while expecting "clks"')
# fail condition3 - "clks" exist in JSON but there is no data
if 'clks' in clk_json and len(clk_json['clks']) < 1:
safe_fail_request(400, message='Missing CLKs information')
# fail condition4 - "clknblocks" exist in JSON but there is no data
if 'clknblocks' in clk_json and len(clk_json['clknblocks']) < 1:
safe_fail_request(400, message='Missing CLK and Blocks information')
# fail condition5 - unknown element in JSON
if 'clks' not in clk_json and 'clknblocks' not in clk_json:
safe_fail_request(400, message='Unknown upload element - expect "clks" or "clknblocks"')

# now we need to know element name - clks or clknblocks
element = 'clks' if hold_condition1 else 'clknblocks'

receipt_token = generate_code()

filename = Config.RAW_FILENAME_FMT.format(receipt_token)
logger.info("Storing user {} supplied clks from json".format(dp_id))
logger.info("Storing user {} supplied {} from json".format(dp_id, element))

with opentracing.tracer.start_span('splitting-json-clks', child_of=parent_span) as span:
count = len(clk_json['clks'])
span.set_tag("clks", count)
data = b''.join(''.join(clk.split('\n')).encode() + b'\n' for clk in clk_json['clks'])
count = len(clk_json[element])
span.set_tag(element, count)

logger.info(f"Received {count} encodings.")

# write clk_json into a temp file
_, tmp_filename = tempfile.mkstemp()

num_bytes = len(data)
span.set_tag("num_bytes", num_bytes)
buffer = BytesIO(data)
logger.info('Writing uploaded {} JSON to {}'.format(element.upper(), tmp_filename))
with open(tmp_filename, 'w') as f:
json.dump(clk_json, f)

logger.info(f"Received {count} encodings. Uploading {fmt_bytes(num_bytes)} to object store")
with opentracing.tracer.start_span('save-clk-file-to-quarantine', child_of=parent_span) as span:
span.set_tag('filename', filename)
mc = connect_to_object_store()
mc.put_object(
mc.fput_object(
Config.MINIO_BUCKET,
filename,
data=buffer,
length=num_bytes
tmp_filename
)

with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span):
Expand Down