Merged
Changes from 1 commit
Commits (150)
02b7f95
Bump app version to 1.13.0-beta, frontend version to 1.4.6-beta
hardbyte Feb 10, 2020
881a6e1
Update worker autoscaler configuration
hardbyte Feb 10, 2020
21d20e0
Update changelog for 1.13.0-beta release
hardbyte Feb 10, 2020
81d6850
Feature openapi3 with blocking data upload support (#479)
wilko77 Feb 12, 2020
52fd307
Allow user to provide arbitrary config
hardbyte Feb 11, 2020
7688035
Allow user to provide arbitrary deployment annotations
hardbyte Feb 11, 2020
f508dee
Try use default tracing connection
hardbyte Feb 11, 2020
37182ee
Use default tracing connection for api too
hardbyte Feb 11, 2020
1ceea55
Extract load_yaml_config from setup_logging and add tests
hardbyte Feb 11, 2020
8c64217
K8s: Add a config file for tracing. Always mount all config files in …
hardbyte Feb 11, 2020
2cb9a3e
Remove redundant helper function from test_serialization
hardbyte Feb 11, 2020
7ccfda4
Remove individual tracing setting in favor of passing in a file path.
hardbyte Feb 11, 2020
d160540
Use tracing config file or default
hardbyte Feb 11, 2020
78b5603
Strings to yaml not objects
hardbyte Feb 12, 2020
ff633c8
Add config volume to db init job
hardbyte Feb 12, 2020
7938529
Try to use jaeger during CI testing for api
hardbyte Feb 12, 2020
0771973
Use default tracing connection for workers
hardbyte Feb 12, 2020
9a4c8bc
Try enable tracing
hardbyte Feb 12, 2020
f0481d5
Use default tracing connection for api too
hardbyte Feb 12, 2020
016a3c8
Ok enough of azure, just enable the tracing annotation by default
hardbyte Feb 12, 2020
d13eb79
Use default logging config in k8s
hardbyte Feb 12, 2020
1920a94
fix filename typo
hardbyte Feb 12, 2020
bac1025
cleanup after review
hardbyte Feb 12, 2020
381acc4
Only log celery container from worker pod
hardbyte Feb 12, 2020
ff024d4
Remove a stray print statement
hardbyte Feb 13, 2020
d39f530
Minor tweaks to improve logging
hardbyte Feb 13, 2020
34e7c68
Add docs on debugging locally (#504)
hardbyte Feb 19, 2020
88c3705
Feature extend binary format (#505)
wilko77 Feb 19, 2020
d5787ee
Refactor to use a separate base image in Docker
hardbyte Sep 26, 2019
7c6e4af
CI building of base image
hardbyte Feb 17, 2020
a95b6fb
Build Anonlink app using tag for base image
hardbyte Feb 17, 2020
a8195fe
Document base image
hardbyte Feb 17, 2020
6e23392
Update a few dependencies in base image
hardbyte Feb 17, 2020
e0b71a9
Update alpine to 3.11.3 and pin all c dependencies
hardbyte Feb 19, 2020
aad76e5
Update structlog
hardbyte Feb 19, 2020
b37ba6d
Update devops docs
hardbyte Feb 19, 2020
e381a10
Bump pytest from 5.1.3 to 5.3.5 in /base (#508)
dependabot-preview[bot] Feb 19, 2020
318fef1
Bump tornado from 4.5.3 to 6.0.3 in /base
dependabot-preview[bot] Feb 19, 2020
7a927a5
Detect invalid encoding size earlier (#507)
hardbyte Feb 20, 2020
b5a7705
Feature blocking (#510)
hardbyte Feb 23, 2020
296ee5a
Bump ijson from 2.3 to 2.6.1 in /base
dependabot-preview[bot] Feb 24, 2020
9c362c5
Update base image to use Python 3.8.2 (#518)
hardbyte Mar 1, 2020
3e62b93
CI: Skip building base image if not required
hardbyte Mar 1, 2020
40b5a33
CI system now automatically uses latest base image (#519)
hardbyte Mar 1, 2020
96c82dc
Store uploaded encodings in database
hardbyte Feb 24, 2020
b831449
Bulk upload encodings to database
hardbyte Feb 27, 2020
04576db
Clean up comments
hardbyte Feb 28, 2020
4f2f4b5
More efficient fetching from encodingblocks table
hardbyte Feb 28, 2020
cad57d1
Transaction size for encoding upload will depend on encoding_size.
hardbyte Mar 2, 2020
79a7e89
Peek at actual encoding size
hardbyte Mar 2, 2020
3b0905f
Bump redis from 3.2.1 to 3.4.1 in /base
dependabot-preview[bot] Mar 2, 2020
69c19c5
Add some integration tests that have access to the database
hardbyte Feb 27, 2020
2b16ddf
CI can run integration tests using docker-compose.
hardbyte Feb 28, 2020
34818e6
Insertion tests should also fetch data afterwards
hardbyte Feb 28, 2020
e69b411
Update docs regarding integration testing
hardbyte Mar 1, 2020
2a894b8
Basic integration tests for redis
hardbyte Mar 2, 2020
61459bf
Update integration tests following code review
hardbyte Mar 3, 2020
39cfe94
Configure jaeger for docker-compose
hardbyte Mar 3, 2020
1540c0f
Tweak to allow debugging of tests from docker-compose
hardbyte Mar 3, 2020
db51552
Transaction size for encoding upload will depend on encoding_size.
hardbyte Mar 2, 2020
e73597e
Pull encoding data from postgres
hardbyte Mar 2, 2020
1bdf1e8
User provided binary packed uploads now go straight into db
hardbyte Mar 3, 2020
0ff7bf0
Stop storing uploaded encodings in minio
hardbyte Mar 3, 2020
07e2371
Fixup tracing in comparison task
hardbyte Mar 3, 2020
c692bb2
Extract logging and span setup from views
hardbyte Mar 3, 2020
40dd27f
Handle null filename
hardbyte Mar 3, 2020
d5f3578
Minor cleanup to imports and unused functions
hardbyte Mar 3, 2020
631aa67
Add test to check stored encodings in db
hardbyte Mar 3, 2020
9c8304a
Fix local jaeger (#523)
hardbyte Mar 4, 2020
ecd6cfe
Remove timing requirements from insertion test
hardbyte Mar 15, 2020
23e78d4
Add docstring to precheck_encoding_upload function
hardbyte Mar 15, 2020
f8c32c3
Merge branch 'develop' into feature-use-encodings-from-db
hardbyte Mar 16, 2020
1a37280
Merge pull request #522 from data61/feature-use-encodings-from-db
hardbyte Mar 16, 2020
560e414
Update kubernetes deployment and CI (#529)
hardbyte Mar 18, 2020
4dd360a
Add function to fetch block ids and sizes from db
hardbyte Mar 4, 2020
3c6776b
Retrieve blocking info in create_comparison_jobs task
hardbyte Mar 4, 2020
8c96ca8
WIP - identify blocks that need to be broken up further
hardbyte Mar 4, 2020
2077fe2
Query for getting encodings in a block
hardbyte Mar 6, 2020
3002978
Split tasks into chunks using blocking information
hardbyte Mar 6, 2020
79bfcbf
Refactor create comparison jobs function
hardbyte Mar 8, 2020
5e18c12
More refactoring of chunk creation
hardbyte Mar 9, 2020
c1cc6eb
Add a few unit tests for chunking
hardbyte Mar 9, 2020
54460be
Add database index on encodings table
hardbyte Mar 9, 2020
0669620
clknblocks not clksnblocks and other minor cleanup
hardbyte Mar 10, 2020
e946e2c
cleanup
hardbyte Mar 10, 2020
4958d15
Add blocking concept to docs
hardbyte Mar 13, 2020
c260291
Deduplicate candidate pairs before solving
hardbyte Mar 15, 2020
37faa25
Catch the empty candidate pair case
hardbyte Mar 15, 2020
12525bc
Simplify solver task by using anonlink's _merge_similarities function
hardbyte Mar 16, 2020
9b8fcff
Update celery
hardbyte Mar 16, 2020
a3ab400
Address code review feedback
hardbyte Mar 18, 2020
5e123ff
Bump version to beta2
hardbyte Mar 18, 2020
1caa1a5
Celery concurrency defaults
hardbyte Mar 19, 2020
70794a5
Add another layer of tracing into the comparison task
hardbyte Mar 19, 2020
07d4291
Update task names in celery routing
hardbyte Mar 19, 2020
3e70f57
Faster encoding retrieval by using COPY.
hardbyte Mar 22, 2020
d6c8751
Pass on stored size when retrieving encodings from DB
hardbyte Mar 22, 2020
71e254d
Increase time on test
hardbyte Mar 22, 2020
50b6357
Refactor binary copy into own function for easier reuse and testing
hardbyte Mar 23, 2020
4c4d198
Add more detailed tracing around binary encoding insertions.
hardbyte Mar 24, 2020
7dbcf2d
Add tests for binary copy function
hardbyte Mar 24, 2020
5d94af8
Bump celery from 4.4.0 to 4.4.2 in /base
dependabot-preview[bot] Mar 24, 2020
00c715d
Split tests into unit and e2e
hardbyte Mar 11, 2020
b611b17
Add uses blocking to e2e test config
hardbyte Mar 12, 2020
b66b262
Build and use separate e2e test image on azure
hardbyte Mar 12, 2020
4d279c0
Remove duplicate config and utils from unit tests
hardbyte Mar 24, 2020
0ed4bc2
Cleanup docker compose (#533)
hardbyte Mar 26, 2020
cfe6549
Use local benchmark cache (#531)
hardbyte Mar 26, 2020
5130de7
Increase CELERYD_MAX_TASKS_PER_CHILD default (#534)
hardbyte Mar 27, 2020
446e3f9
Update tutorials to use anonlinkclient instead of clkhash (#536)
hardbyte Apr 6, 2020
f27e2d3
Bump flower from 0.9.3 to 0.9.4 in /base
dependabot-preview[bot] Apr 6, 2020
f69bbc4
Bump alpine from 3.11.3 to 3.11.5 in /base
dependabot-preview[bot] Apr 6, 2020
86a40d1
Update minio deployment (#539)
hardbyte Apr 8, 2020
bc66664
Bump flask from 1.1.1 to 1.1.2 in /base
dependabot-preview[bot] Apr 8, 2020
a994be4
Bump requests from 2.22.0 to 2.23.0 in /base
dependabot-preview[bot] Apr 13, 2020
012d9a0
Feature blocky benchmark (#541)
wilko77 Apr 17, 2020
23370ee
Fix calculation of total number of comparisons (#543)
hardbyte Apr 19, 2020
f17f336
Design doc for pull feature. (#537)
hardbyte Apr 19, 2020
ad5076e
Bump minio from 5.0.7 to 5.0.10 in /base
dependabot-preview[bot] Apr 20, 2020
4e2653d
Temporary object store credentials endpoint (#544)
hardbyte Apr 21, 2020
4298031
Use docker-compose without having to set the --project-directory (#547)
hardbyte Apr 22, 2020
e01a584
Expose minio (#548)
hardbyte Apr 27, 2020
52b9355
Modify upload endpoint to take information on external encodings.
hardbyte Apr 23, 2020
6902939
Minor: Fix api url in e2e test helper, ignore created log files when …
hardbyte Apr 23, 2020
33c5e98
Simplify abort_if_inconsistent_upload and add support for external en…
hardbyte Apr 23, 2020
efec190
Add utility functions to convert to common clknblocks format.
hardbyte Apr 23, 2020
91a52c8
Catch most common error in status check during startup
hardbyte Apr 23, 2020
edbc169
Modify upload endpoint to take external data, add background task to …
hardbyte Apr 23, 2020
fadc3cc
Use correct endpoint for testing object store uploads
hardbyte Apr 24, 2020
19240b6
Explicitly throw a notimplemented error for blocks + external encodings
hardbyte Apr 24, 2020
9356d41
Add test for pulling external block data from object store
hardbyte Apr 27, 2020
0feb5a1
Update OpenAPI to support external blocking info
hardbyte Apr 27, 2020
0b883ad
Extract object store path template
hardbyte Apr 27, 2020
1a3ab15
Task and backend support added for external blocks
hardbyte Apr 27, 2020
91d4599
Move object store credential parsing into object store module
hardbyte Apr 27, 2020
9b72ecd
Include release in k8s selector for workers
hardbyte Apr 29, 2020
8bc502f
Check for executable runs after handling data upload
hardbyte Apr 28, 2020
fbebda9
Upgrade to newer minio chart. Update open api doc
hardbyte Apr 29, 2020
50df1ff
Add to e2e upload test, update readme.
hardbyte Apr 29, 2020
602c2e6
Extend test using external blocking data to include a run
hardbyte Apr 29, 2020
a6ff398
Edit and add log statements throughout upload path
hardbyte Apr 29, 2020
191b9c6
Skip cleaning up object store files, when there are no files
hardbyte Apr 29, 2020
d2bad47
Fix tracing on project cleanup
hardbyte Apr 29, 2020
91b678f
Set the encoding size on upload
hardbyte Apr 29, 2020
0705541
Log expected and unexpected problems during upload with different sev…
hardbyte Apr 29, 2020
8c7cd8c
Remove uploaded data from object store during cleanup
hardbyte Apr 29, 2020
53ecdba
Update benchmark version
hardbyte Apr 30, 2020
b788ac4
Add changelog for v1.13.0-beta2
hardbyte Apr 30, 2020
7d12e4c
Remove deprecated debug flag from k8s config
hardbyte Apr 30, 2020
ade1124
Increase chance of a flaky test passing
hardbyte Apr 30, 2020
Task and backend support added for external blocks
hardbyte committed Apr 30, 2020
commit 1a3ab15a99ec7e7e0f0df2e8a4ba8fa9204ecf9e
32 changes: 31 additions & 1 deletion backend/entityservice/database/insertions.py
@@ -47,7 +47,7 @@ def insert_dataprovider(cur, auth_token, project_id):

def insert_blocking_metadata(db, dp_id, blocks):
    """
    Insert new entries into the blocks table.

    :param blocks: A dict mapping block id to the number of encodings per block.
    """
@@ -200,6 +200,24 @@ def update_encoding_metadata(db, clks_filename, dp_id, state):
        ])


def update_blocks_state(db, dp_id, blocks, state):
    assert state in {'pending', 'ready', 'error'}
    sql_query = """
        UPDATE blocks
        SET
            state = %s
        WHERE
            dp = %s AND
            block_name in %s
        """

    with db.cursor() as cur:
        cur.execute(sql_query, [
            state,
            dp_id,
            tuple(blocks)
        ])


def update_encoding_metadata_set_encoding_size(db, dp_id, encoding_size):
    sql_query = """
        UPDATE uploads
@@ -278,6 +296,18 @@ def update_project_mark_all_runs_failed(conn, project_id):
        cur.execute(sql_query, [project_id])


def update_dataprovider_uploaded_state(conn, project_id, dp_id, state):
    with conn.cursor() as cur:
        sql_query = """
            UPDATE dataproviders SET
                uploaded = %s
            WHERE
                id = %s AND
                project = %s
        """
        cur.execute(sql_query, [state, dp_id, project_id])


def mark_project_deleted(db, project_id):
    with db.cursor() as cur:
        sql_query = """
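For orientation, a minimal sketch of how the new insertion helpers fit together. The dp_id and block names are invented for illustration, and it assumes both helpers are re-exported from entityservice.database, as the tasks later in this diff import them via a star import:

# Sketch only: register blocks, ingest encodings, then mark the blocks ready.
from entityservice.database import DBConn, insert_blocking_metadata, update_blocks_state

dp_id = 1                                        # hypothetical data provider id
block_sizes = {"block-a": 120, "block-b": 75}    # block id -> encodings per block

with DBConn() as conn:
    insert_blocking_metadata(conn, dp_id, block_sizes)
    # ... store the encodings for this data provider ...
    update_blocks_state(conn, dp_id, block_sizes.keys(), 'ready')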
12 changes: 11 additions & 1 deletion backend/entityservice/object_store.py
@@ -6,11 +6,12 @@
logger = get_logger('objectstore')


def connect_to_object_store(credentials=None):
    mc = minio.Minio(
        config.MINIO_SERVER,
        config.MINIO_ACCESS_KEY,
        config.MINIO_SECRET_KEY,
        credentials=credentials,
        secure=False
    )
    logger.debug("Connected to minio")
@@ -45,3 +46,12 @@ def create_bucket(minio_client, bucket):
        minio_client.make_bucket(bucket)
    except minio.error.BucketAlreadyOwnedByYou:
        logger.info("The bucket {} was already created.".format(bucket))


def stat_and_stream_object(bucket_name, object_name, credentials=None):
    mc = connect_to_object_store(credentials)
    logger.debug("Checking object exists in object store")
    stat = mc.stat_object(bucket_name=bucket_name, object_name=object_name)
    logger.debug("Retrieving file from object store")
    response = mc.get_object(bucket_name=bucket_name, object_name=object_name)
    return stat, response
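A short usage sketch for the new helper. The bucket and object names are placeholders; the metadata keys mirror the ones the upload tasks in this PR read:

# Sketch only: stat an uploaded encoding object and stream its bytes.
from entityservice.object_store import stat_and_stream_object

stat, response = stat_and_stream_object('uploads', 'some/path/encodings.bin')

count = int(stat.metadata['X-Amz-Meta-Hash-Count'])   # number of encodings
size = int(stat.metadata['X-Amz-Meta-Hash-Size'])     # bytes per encoding
first_encoding = response.read(size)                  # encodings are fixed-width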
2 changes: 1 addition & 1 deletion backend/entityservice/tasks/__init__.py
@@ -6,7 +6,7 @@
from entityservice.tasks.pre_run_check import check_for_executable_runs
from entityservice.tasks.assert_valid_run import assert_valid_run
from entityservice.tasks.run import prerun_check
from entityservice.tasks.encoding_uploading import handle_raw_upload, pull_external_data_encodings_only, pull_external_data
from entityservice.tasks.comparing import create_comparison_jobs, compute_filter_similarity, aggregate_comparisons
from entityservice.tasks.permutation import save_and_permute, permute_mapping_data
from entityservice.tasks.solver import solver_task
118 changes: 103 additions & 15 deletions backend/entityservice/tasks/encoding_uploading.py
@@ -1,46 +1,134 @@
import io
import json

import opentracing
from minio.credentials import Credentials, Static

from entityservice.database import *
from entityservice.encoding_storage import stream_json_clksnblocks, convert_encodings_from_base64_to_binary, \
    store_encodings_in_db, upload_clk_data_binary, include_encoding_id_in_binary_stream
from entityservice.error_checking import check_dataproviders_encoding, handle_invalid_encoding_data, \
    InvalidEncodingError
from entityservice.object_store import connect_to_object_store, stat_and_stream_object
from entityservice.serialization import binary_format
from entityservice.settings import Config
from entityservice.async_worker import celery, logger
from entityservice.tasks.base_task import TracedTask
from entityservice.tasks.pre_run_check import check_for_executable_runs
from entityservice.utils import fmt_bytes, clks_uploaded_to_project, object_store_upload_path

logger = get_logger()


@celery.task(base=TracedTask, ignore_result=True, args_as_tags=('project_id', 'dp_id'))
def pull_external_data(project_id, dp_id,
                       encoding_object_info, encoding_credentials,
                       blocks_object_info, blocks_credentials,
                       receipt_token, parent_span=None):
    """
    Pull the blocking map into memory and create the corresponding block
    entries in the database, then stream the encodings into the database,
    associating each encoding with its blocks via the in-memory map.
    """
    log = logger.bind(pid=project_id, dp_id=dp_id)
    log.info("Pulling data from an object store")
    with DBConn() as conn:
        if not check_project_exists(conn, project_id):
            log.info("Project deleted, stopping immediately")
            return

    mc = connect_to_object_store(parse_minio_credentials(blocks_credentials))

    log.debug("Pulling blocking information from object store")
    response = mc.get_object(bucket_name=blocks_object_info['bucket'], object_name=blocks_object_info['path'])
    encoding_to_block_map = json.load(response)

    log.debug("Counting the blocks")
    block_sizes = {}
    for encoding_id in encoding_to_block_map:
        _blocks = encoding_to_block_map[encoding_id]
        for block_id in _blocks:
            block_id = str(block_id)
            block_sizes[block_id] = block_sizes.setdefault(block_id, 0) + 1

    block_count = len(block_sizes)
    log.debug(f"Processing {block_count} blocks")

    # stream the encodings
    bucket_name = encoding_object_info['bucket']
    object_name = encoding_object_info['path']

    stat, encodings_stream = stat_and_stream_object(bucket_name, object_name,
                                                    parse_minio_credentials(encoding_credentials))
    count = int(stat.metadata['X-Amz-Meta-Hash-Count'])
    size = int(stat.metadata['X-Amz-Meta-Hash-Size'])
    log.debug(f"Processing {count} encodings")
    assert count == len(encoding_to_block_map), f"Expected {count} encodings in blocks. Got {len(encoding_to_block_map)}"

    with DBConn() as conn:
        with opentracing.tracer.start_span('update-metadata-db', child_of=parent_span):
            update_encoding_metadata_set_encoding_size(conn, dp_id, size)
            insert_encoding_metadata(conn, None, dp_id, receipt_token, encoding_count=count, block_count=block_count)
        with opentracing.tracer.start_span('create-block-entries-in-db', child_of=parent_span):
            log.debug("Adding blocks to db")
            insert_blocking_metadata(conn, dp_id, block_sizes)

        def encoding_iterator(encoding_stream):
            binary_formatter = binary_format(size)
            for encoding_id in range(count):
                yield (
                    str(encoding_id),
                    binary_formatter.pack(encoding_id, encoding_stream.read(size)),
                    encoding_to_block_map[str(encoding_id)]
                )

        with opentracing.tracer.start_span('upload-encodings-to-db', child_of=parent_span):
            log.debug("Adding encodings and associated blocks to db")
            try:
                store_encodings_in_db(conn, dp_id, encoding_iterator(encodings_stream), size)
            except Exception as e:
                update_dataprovider_uploaded_state(conn, project_id, dp_id, 'error')
                log.warning(e)
                # Bail out so a failed ingest is not marked 'ready' below.
                return

        with opentracing.tracer.start_span('update-encoding-metadata', child_of=parent_span):
            update_encoding_metadata(conn, None, dp_id, 'ready')
            update_blocks_state(conn, dp_id, block_sizes.keys(), 'ready')


@celery.task(base=TracedTask, ignore_result=True, args_as_tags=('project_id', 'dp_id'))
def pull_external_data_encodings_only(project_id, dp_id, object_info, credentials, receipt_token, parent_span=None):
"""

"""
    log = logger.bind(pid=project_id, dp_id=dp_id)

    with DBConn() as conn:
        if not check_project_exists(conn, project_id):
            log.info("Project deleted, stopping immediately")
            return

    bucket_name = object_info['bucket']
    object_name = object_info['path']

    log.info("Pulling encoding data from an object store")
    mc_credentials = parse_minio_credentials(credentials)
    stat, stream = stat_and_stream_object(bucket_name, object_name, mc_credentials)

    count = int(stat.metadata['X-Amz-Meta-Hash-Count'])
    size = int(stat.metadata['X-Amz-Meta-Hash-Size'])
    converted_stream = include_encoding_id_in_binary_stream(stream, size, count)
    upload_clk_data_binary(project_id, dp_id, converted_stream, receipt_token, count, size)


def parse_minio_credentials(credentials):
    if credentials:
        access_key = credentials['AccessKeyId']
        secret_key = credentials['SecretAccessKey']
        session_token = credentials.get('SessionToken', None)
        mc_credentials = Credentials(provider=Static(access_key, secret_key, session_token))
    else:
        mc_credentials = None
    return mc_credentials


@celery.task(base=TracedTask, ignore_result=True, args_as_tags=('project_id', 'dp_id'))
def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None):
"""
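To make the data flow above concrete, this is the shape of blocking file pull_external_data expects — a JSON map from encoding id to the list of block ids that encoding belongs to. The block names are invented; the values in the comments follow from the counting loop above:

import json

# A three-encoding upload where encoding "1" belongs to two blocks.
encoding_to_block_map = json.loads(
    '{"0": ["block-a"], "1": ["block-a", "block-b"], "2": ["block-b"]}')

# The counting loop in pull_external_data would derive:
#   block_sizes == {"block-a": 2, "block-b": 2}
#   block_count == 2
# and the assert requires exactly 3 encodings in the uploaded object.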
41 changes: 31 additions & 10 deletions backend/entityservice/views/project.py
@@ -8,9 +8,9 @@

import entityservice.database as db
from entityservice.encoding_storage import upload_clk_data_binary, include_encoding_id_in_binary_stream
from entityservice.tasks import handle_raw_upload, remove_project, pull_external_data_encodings_only, pull_external_data
from entityservice.tracing import serialize_span
from entityservice.utils import safe_fail_request, get_json, generate_code, object_store_upload_path
from entityservice.database import DBConn, get_project_column
from entityservice.views.auth_checks import abort_if_project_doesnt_exist, abort_if_invalid_dataprovider_token, \
    abort_if_invalid_results_token, get_authorization_token_type_or_abort, abort_if_inconsistent_upload
@@ -119,7 +119,7 @@ def project_binaryclks_post(project_id):

    log = log.bind(dp_id=dp_id)
    log.info("Receiving CLK data.")
    receipt_token = generate_code()

    with opentracing.tracer.start_span('upload-clk-data', child_of=parent_span) as span:
        span.set_tag("project_id", project_id)
@@ -328,22 +328,43 @@ def handle_encoding_upload_json(project_id, dp_id, clk_json, receipt_token, uses
if "encodings" in clk_json and 'file' in clk_json['encodings']:
# external encodings
logger.info("External encodings uploaded")
object_info = clk_json['encodings']['file']
credentials = clk_json['encodings'].get('credentials')
encoding_object_info = clk_json['encodings']['file']
object_name = encoding_object_info['path']
if not object_name.startswith(object_store_upload_path(project_id, dp_id)):
safe_fail_request(403, "Provided object store path is not allowed")

encoding_credentials = clk_json['encodings'].get('credentials')
# Schedule a background task to pull the encodings from the object store
# This background task updates the database with encoding metadata assuming
# that there are no blocks.
if 'blocks' not in clk_json:
handle_external_data_pull.delay(
pull_external_data_encodings_only.delay(
project_id,
dp_id,
object_info,
credentials,
encoding_object_info,
encoding_credentials,
receipt_token,
parent_span=serialize_span(parent_span))
else:
# TODO need to deal with the optional blocks
raise NotImplementedError("Don't currently handle combination of external encodings and blocks")
# Need to deal with both encodings and blocks
if 'file' in clk_json['blocks']:
object_name = clk_json['blocks']['file']['path']
if not object_name.startswith(object_store_upload_path(project_id, dp_id)):
safe_fail_request(403, "Provided object store path is not allowed")
# Blocks are in an external file
blocks_object_info = clk_json['blocks']['file']
blocks_credentials = clk_json['blocks'].get('credentials')
pull_external_data.delay(
project_id,
dp_id,
encoding_object_info,
encoding_credentials,
blocks_object_info,
blocks_credentials,
receipt_token,
parent_span=serialize_span(parent_span))
else:
raise NotImplementedError("Don't currently handle combination of external encodings and blocks")


return
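Finally, a sketch of the upload body that drives the new branch in handle_encoding_upload_json. The bucket names, paths and key values are invented, and both paths must start with the caller's object_store_upload_path prefix or the request fails with a 403:

# Hypothetical request body with external encodings and external blocks.
clk_json = {
    "encodings": {
        "file": {"bucket": "entityservice-uploads", "path": "<upload-prefix>/encodings.bin"},
        "credentials": {
            "AccessKeyId": "<temporary-key>",
            "SecretAccessKey": "<temporary-secret>",
            "SessionToken": "<session-token>",  # optional; see parse_minio_credentials
        },
    },
    "blocks": {
        "file": {"bucket": "entityservice-uploads", "path": "<upload-prefix>/blocks.json"},
        "credentials": {"AccessKeyId": "<key>", "SecretAccessKey": "<secret>"},
    },
}
# With both present, pull_external_data.delay(...) is scheduled; with only
# "encodings", pull_external_data_encodings_only.delay(...) handles the upload.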