Merged
Changes from 1 commit
33 commits
bcfb173
attachments are saved as intact files
dimitri-yatsenko Sep 13, 2019
18366e6
reform attachments and filepath
dimitri-yatsenko Sep 13, 2019
11d7d9b
complete implementation of external storage except for `clean`
dimitri-yatsenko Sep 16, 2019
22347ae
Merge branch 'dev' of https://github.com/datajoint/datajoint-python i…
dimitri-yatsenko Sep 16, 2019
7e7c183
refactor external storage
dimitri-yatsenko Sep 18, 2019
8aa1543
complete refactoring of external storage for version 0.12
dimitri-yatsenko Sep 20, 2019
a2d6f9a
rename attribute `basename` to `attachment_name` in external table
dimitri-yatsenko Sep 20, 2019
2c11a65
add __repr__ to ExternalMapping
dimitri-yatsenko Sep 20, 2019
3a1c6ab
external files are not copied if stage and store are the same
dimitri-yatsenko Sep 20, 2019
478b36a
make external tables require setting the `delete_external_files` argu…
dimitri-yatsenko Sep 20, 2019
bb1deab
update CHANGELOG
dimitri-yatsenko Sep 23, 2019
e2636ed
fix Python 3.4 compatibility
dimitri-yatsenko Sep 23, 2019
11a1f93
fix Python 3.4 and 3.5 compatibility
dimitri-yatsenko Sep 23, 2019
165f795
dropped support for Python 3.4
dimitri-yatsenko Sep 23, 2019
c18af74
Merge branch 'dev' of https://github.com/datajoint/datajoint-python i…
dimitri-yatsenko Sep 26, 2019
48147f1
minor changes in error messages
dimitri-yatsenko Oct 1, 2019
2e26c4a
Merge branch 'dev' of https://github.com/datajoint/datajoint-python i…
dimitri-yatsenko Oct 3, 2019
0c4fd1c
Update to pathlib in test init.
guzman-raphael Oct 3, 2019
6ba86e0
Update test_blob_migrate to be compatible for WIN10.
guzman-raphael Oct 3, 2019
43a5126
Fix WIN10 compatibility with KeyboardInterrupt and SystemExit excepti…
guzman-raphael Oct 3, 2019
c7ca34c
Merge pull request #4 from guzman-raphael/dimitri-attach
dimitri-yatsenko Oct 3, 2019
5c99e37
Fix WIN10 filepath to store as posix and fetch as user's platform.
guzman-raphael Oct 4, 2019
e2c3f23
Fix relpath for Python3.5.
guzman-raphael Oct 4, 2019
93aeefc
Fix copytree for Python3.5.
guzman-raphael Oct 4, 2019
20719ae
Fix typo.
guzman-raphael Oct 4, 2019
7ca0099
Fix for Python3.5.
guzman-raphael Oct 4, 2019
f3ffd63
Update coveralls.
guzman-raphael Oct 4, 2019
bb1b40f
Update coverall env vars.
guzman-raphael Oct 4, 2019
f05c50d
Merge pull request #5 from guzman-raphael/win-filepath
dimitri-yatsenko Oct 5, 2019
298efed
add environment variable DJ_SUPPORT_FILEPATH_MANAGEMENT to enable/dis…
dimitri-yatsenko Oct 7, 2019
e609fbe
Merge branch 'attach' of https://github.com/dimitri-yatsenko/datajoin…
dimitri-yatsenko Oct 7, 2019
6dda528
Update CHANGELOG.md
dimitri-yatsenko Oct 8, 2019
796dae6
Update CHANGELOG.md
dimitri-yatsenko Oct 8, 2019
refactor external storage
dimitri-yatsenko committed Sep 18, 2019
commit 7e7c18392e2df438e107224c56921e9f5c778e8e
266 changes: 142 additions & 124 deletions datajoint/external.py
@@ -1,6 +1,6 @@
import itertools
from pathlib import Path
from pathlib import Path, PurePosixPath
from collections import Mapping
from tqdm import tqdm
from .settings import config
from .errors import DataJointError, MissingExternalFile
from .hash import uuid_from_buffer, uuid_from_file
@@ -45,7 +45,6 @@ def __init__(self, connection, store=None, database=None):
self._connection = connection
if not self.is_declared:
self.declare()

self._s3 = None

@property
@@ -55,7 +54,8 @@ def definition(self):
hash : uuid # hash of contents (blob), of filename + contents (attach), or relative filepath (filepath)
---
size :bigint unsigned # size of object in bytes
filepath=null : varchar(1000) # relative filepath used in the filepath datatype
basename=null : varchar(255) # the filename of an attachment
filepath=null : varchar(1000) # relative filepath or attachment filename
contents_hash=null : uuid # used for the filepath datatype
timestamp=CURRENT_TIMESTAMP :timestamp # automatic timestamp
"""
@@ -70,124 +70,135 @@ def s3(self):
self._s3 = s3.Folder(**self.spec)
return self._s3

def upload_file(self, local_path, remote_path, metadata=None):
# - low-level operations - private

def _make_external_filepath(self, relative_filepath):
return PurePosixPath(self.spec['location'], relative_filepath)

def _make_uuid_path(self, uuid, suffix=''):
return PurePosixPath(
self.spec['location'], self.database,
'/'.join(subfold(uuid.hex, self.spec['subfolding'])), uuid.hex).with_suffix(suffix)

def _upload_file(self, local_path, external_path, metadata=None):
if self.spec['protocol'] == 's3':
self.s3.fput(local_path, external_path, metadata)
elif self.spec['protocol'] == 'file':
safe_copy(local_path, external_path, overwrite=True)
else:
assert False

def _download_file(self, external_path, download_path):
if self.spec['protocol'] == 's3':
self.s3.fget(external_path, download_path)
elif self.spec['protocol'] == 'file':
safe_copy(external_path, download_path)
else:
assert False

def _upload_buffer(self, buffer, external_path):
if self.spec['protocol'] == 's3':
self.s3.fput(remote_path, local_path, metadata)
self.s3.put(external_path, buffer)
elif self.spec['protocol'] == 'file':
safe_write(external_path, buffer)
else:
safe_copy(local_path, Path(self.spec['location']) / remote_path, overwrite=True)
assert False

def _download_buffer(self, external_path):
if self.spec['protocol'] == 's3':
return self.s3.get(external_path)
if self.spec['protocol'] == 'file':
return Path(external_path).read_bytes()
assert False

def _remove_external_file(self, external_path):
if self.spec['protocol'] == 's3':
self.s3.remove_object(external_path)
elif self.spec['protocol'] == 'file':
Path(external_path).unlink()

def exists(self, external_filepath):
"""
:return: True if the external file is accessible
"""
if self.spec['protocol'] == 's3':
return self.s3.exists(external_filepath)
if self.spec['protocol'] == 'file':
return Path(external_filepath).is_file()
assert False

# --- BLOBS ----

def put(self, blob):
"""
put a binary string in external store
put a binary string (blob) in external store
"""
uuid = uuid_from_buffer(blob)
if self.spec['protocol'] == 's3':
self.s3.put('/'.join((self.database, '/'.join(subfold(uuid.hex, self.spec['subfolding'])), uuid.hex)), blob)
else:
remote_file = Path(Path(
self.spec['location'], self.database, *subfold(uuid.hex, self.spec['subfolding'])), uuid.hex)
safe_write(remote_file, blob)
self._upload_buffer(blob, self._make_uuid_path(uuid))
# insert tracking info
self.connection.query(
"INSERT INTO {tab} (hash, size) VALUES (%s, {size}) ON DUPLICATE KEY "
"UPDATE timestamp=CURRENT_TIMESTAMP".format(
tab=self.full_table_name, size=len(blob)), args=(uuid.bytes,))
return uuid

def get(self, blob_hash):
def get(self, uuid):
"""
get an object from external store.
"""
if blob_hash is None:
if uuid is None:
return None
# attempt to get object from cache
blob = None
cache_folder = config.get('cache', None)
if cache_folder:
try:
cache_path = Path(cache_folder, *subfold(blob_hash.hex, CACHE_SUBFOLDING))
cache_file = Path(cache_path, blob_hash.hex)
cache_path = Path(cache_folder, *subfold(uuid.hex, CACHE_SUBFOLDING))
cache_file = Path(cache_path, uuid.hex)
blob = cache_file.read_bytes()
except FileNotFoundError:
pass # not cached
# attempt to get object from store
# download blob from external store
if blob is None:
if self.spec['protocol'] == 'file':
subfolders = Path(*subfold(blob_hash.hex, self.spec['subfolding']))
full_path = Path(self.spec['location'], self.database, subfolders, blob_hash.hex)
try:
blob = full_path.read_bytes()
except MissingExternalFile:
if not SUPPORT_MIGRATED_BLOBS:
raise MissingExternalFile("Missing blob file " + full_path) from None
# migrated blobs from 0.11
relative_filepath, contents_hash = (self & {'hash': blob_hash}).fetch1(
'filepath', 'contents_hash')
if relative_filepath is None:
raise MissingExternalFile("Missing blob file " + full_path) from None
stored_path = Path(self.spec['location'], relative_filepath)
try:
blob = stored_path.read_bytes()
except FileNotFoundError:
raise MissingExternalFile("Missing blob file " + stored_path)
elif self.spec['protocol'] == 's3':
full_path = '/'.join(
(self.database,) + subfold(blob_hash.hex, self.spec['subfolding']) + (blob_hash.hex,))
try:
blob = self.s3.get(full_path)
except MissingExternalFile:
if not SUPPORT_MIGRATED_BLOBS:
raise
relative_filepath, contents_hash = (self & {'hash': blob_hash}).fetch1(
'filepath', 'contents_hash')
if relative_filepath is None:
raise
blob = self.s3.get(relative_filepath)
try:
blob = self._download_buffer(self._make_uuid_path(uuid))
except MissingExternalFile:
if not SUPPORT_MIGRATED_BLOBS:
raise
# blobs migrated from datajoint 0.11 are stored at explicitly defined filepaths
relative_filepath, contents_hash = (self & {'hash': uuid}).fetch1('filepath', 'contents_hash')
if relative_filepath is None:
raise
blob = self._download_buffer(relative_filepath)
if cache_folder:
cache_path.mkdir(parents=True, exist_ok=True)
safe_write(cache_path / blob_hash.hex, blob)
safe_write(cache_path / uuid.hex, blob)
return blob

# --- ATTACHMENTS ---

def upload_attachment(self, local_path):
basename = Path(local_path).name
uuid = uuid_from_file(local_path, init_string=basename + '\0')
remote_path = '/'.join((
self.database, '/'.join(subfold(uuid.hex, self.spec['subfolding'])), uuid.hex + '-' + basename))
self.upload_file(local_path, remote_path)
external_path = self._make_uuid_path(uuid, '.' + basename)
self._upload_file(local_path, external_path)
# insert tracking info
self.connection.query(
"INSERT INTO {tab} (hash, size) VALUES (%s, {size}) ON DUPLICATE KEY "
"UPDATE timestamp=CURRENT_TIMESTAMP".format(
tab=self.full_table_name, size=Path(local_path).stat().st_size), args=[uuid.bytes])
self.connection.query("""
INSERT INTO {tab} (hash, size, basename)
VALUES (%s, {size}, "{basename}")
ON DUPLICATE KEY UPDATE timestamp=CURRENT_TIMESTAMP""".format(
tab=self.full_table_name,
size=Path(local_path).stat().st_size,
basename=basename), args=[uuid.bytes])
return uuid

def get_attachment_basename(self, uuid):
"""
get the original filename, stripping the checksum
"""
remote_path = '/'.join((self.database, '/'.join(subfold(uuid.hex, self.spec['subfolding']))))
name_generator = (
Path(self.spec['location'], remote_path).glob(uuid.hex + '-*') if self.spec['protocol'] == 'file'
else (obj.object_name for obj in self.s3.list_objects(remote_path) if uuid.hex in obj.object_name))
try:
attachment_filename = next(name_generator)
except StopIteration:
raise MissingExternalFile('Missing attachment {protocol}://{path}'.format(
path=remote_path + '/' + uuid.hex + '-*', **self.spec))
return attachment_filename.split(uuid.hex + '-')[-1]
return (self & {'hash': uuid}).fetch1('basename')

def download_attachment(self, uuid, basename, download_path):
""" save attachment from memory buffer into the save_path """
remote_path = '/'.join([
self.database, '/'.join(subfold(uuid.hex, self.spec['subfolding'])), uuid.hex + '-' + basename])
if self.spec['protocol'] == 's3':
self.s3.fget(remote_path, download_path)
else:
safe_copy(Path(self.spec['location']) / remote_path, download_path)
external_path = self._make_uuid_path(uuid, '.' + basename)
self._download_file(external_path, download_path)

# --- FILEPATH ---

@@ -198,12 +209,12 @@ def upload_filepath(self, local_filepath):
If an external entry exists with the same checksum, then no copying should occur
"""
local_filepath = Path(local_filepath)
local_folder = local_filepath.parent
try:
relative_filepath = str(local_filepath.relative_to(self.spec['stage']))
except:
raise DataJointError('The path {path} is not in stage {stage}'.format(path=local_folder, **self.spec))
uuid = uuid_from_buffer(init_string=relative_filepath)
except ValueError:
raise DataJointError('The path {path} is not in stage {stage}'.format(
path=local_filepath.parent, **self.spec)) from None
uuid = uuid_from_buffer(init_string=relative_filepath) # hash relative path, not contents
contents_hash = uuid_from_file(local_filepath)

# check if the remote file already exists and verify that it matches
@@ -215,7 +226,8 @@ def upload_filepath(self, local_filepath):
"A different version of '{file}' has already been placed.".format(file=relative_filepath))
else:
# upload the file and create its tracking entry
self.upload_file(local_filepath, relative_filepath, metadata={'contents_hash': str(contents_hash)})
self._upload_file(local_filepath, self._make_external_filepath(relative_filepath),
metadata={'contents_hash': str(contents_hash)})
self.connection.query(
"INSERT INTO {tab} (hash, size, filepath, contents_hash) VALUES (%s, {size}, '{filepath}', %s)".format(
tab=self.full_table_name, size=Path(local_filepath).stat().st_size,
@@ -230,19 +242,18 @@ def download_filepath(self, filepath_hash):
"""
if filepath_hash is not None:
relative_filepath, contents_hash = (self & {'hash': filepath_hash}).fetch1('filepath', 'contents_hash')
local_filepath = Path(self.spec['stage']).absolute() / Path(relative_filepath)
external_path = self._make_external_filepath(relative_filepath)
local_filepath = Path(self.spec['stage']).absolute() / relative_filepath
file_exists = Path(local_filepath).is_file() and uuid_from_file(local_filepath) == contents_hash
if not file_exists:
if self.spec['protocol'] == 's3':
checksum = s3.Folder(**self.spec).fget(relative_filepath, local_filepath)
else:
remote_file = Path(self.spec['location'], relative_filepath)
safe_copy(remote_file, local_filepath)
checksum = uuid_from_file(local_filepath)
self._download_file(external_path, local_filepath)
checksum = uuid_from_file(local_filepath)
if checksum != contents_hash: # this should never happen without outside interference
raise DataJointError("'{file}' downloaded but did not pass checksum'".format(file=local_filepath))
return local_filepath, contents_hash

# --- UTILITIES ---

@property
def references(self):
"""
@@ -254,48 +265,55 @@ def references(self):
WHERE referenced_table_name="{tab}" and referenced_table_schema="{db}"
""".format(tab=self.table_name, db=self.database), as_dict=True)

def delete_quick(self):
raise DataJointError('The external table does not support delete_quick. Please use delete instead.')

def delete(self):
def fetch_filepaths(self, **fetch_kwargs):
"""
Delete items that are no longer referenced.
This operation is safe to perform at any time but may reduce performance of queries while in progress.
generate complete external filepaths from the query.
Each element is a tuple: (uuid, path)
:param fetch_kwargs: keyword arguments to pass to fetch
"""
self.connection.query(
"DELETE FROM `{db}`.`{tab}` WHERE ".format(tab=self.table_name, db=self.database) + (
" AND ".join(
'hash NOT IN (SELECT `{column_name}` FROM {referencing_table})'.format(**ref)
for ref in self.references) or "TRUE"))
print('Deleted %d items' % self.connection.query("SELECT ROW_COUNT()").fetchone()[0])

def get_untracked_external_files(self, *, limit=None,
include_blobs=True, include_attachments=True, include_filepaths=True):
fetch_kwargs.update(as_dict=True)
for item in self.fetch('hash', 'basename', 'filepath', **fetch_kwargs):
if item['basename']:
yield item['hash'], self._make_uuid_path(item['hash'], '.' + item['basename'])
elif item['filepath']:
yield item['hash'], self._make_external_filepath(item['filepath'])
else:
yield item['hash'], self._make_uuid_path(item['hash'])  # plain blobs are stored under their uuid path

def unused(self):
"""
:return: generate the absolute paths to external blobs, attachments, and filepaths that are no longer
tracked by this external table.
Caution: when multiple schemas manage the same filepath location or if multiple servers use the same
external location for blobs and attachments, then it is not safe to assume that untracked external files
are no longer needed by other schemas. Delete with caution. The safest approach is to ensure that each external
store is tracked by one database server only and that filepath locations are tracked by only one schema.
query expression for unused hashes
:return: self restricted to elements that are not in use by any tables in the schema
"""
raise NotImplementedError
return self - [
"hash IN (SELECT `{column_name}` FROM {referencing_table})".format(**ref)
for ref in self.references]

def clean(self, *, limit=None, verbose=True,
delete_blobs=True, delete_attachments=True, delete_filepaths=True):
def used(self):
"""
remove blobs, attachments, and
:param verbose: if True, print information about deleted files
:param: limit: max number of items to delete. None=delete all
:param include_{blobs, attachments, filepaths}: if True, delete blobs, attachments, filepaths
query expression for used hashes
:return: self restricted to elements that in use by tables in the schema
"""
delete_list = self.get_untracked_external_files(
limit=limit,
include_blobs=delete_blobs, include_attachments=delete_attachments, include_filepaths=delete_filepaths)
if verbose:
print('Deleting %i items:' % len(delete_list))
for item in delete_list:
print(item)
return self - [
"hash IN (SELECT `{column_name}` FROM {referencing_table})".format(**ref)
for ref in self.references]

def delete(self, limit=None, display_progress=True):
items = self.unused().fetch_filepaths(limit=limit)
if display_progress:
items = tqdm(items)
# delete items one by one, close to transaction-safe
for uuid, external_path in items:
try:
count = (self & {'hash': uuid}).delete_quick(get_count=True)
except Exception:
pass # if delete failed, do not remove the external file
else:
assert count in (0, 1)
try:
self._remove_external_file(external_path)
except Exception as error:
yield uuid, external_path, str(error)


class ExternalMapping(Mapping):
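For context, a minimal usage sketch of the store-backed datatypes that this refactor serves. The store name 'shared', the schema and table names, and all paths below are illustrative assumptions, not part of this commit; note also that a later commit in this PR makes a `delete_external_files` argument required for external deletes.

import datajoint as dj

# configure an external store; 'shared' and the paths are placeholder values
dj.config['stores'] = {
    'shared': dict(
        protocol='file',            # or 's3' with endpoint, bucket, and credentials
        location='/data/external',  # where blobs, attachments, and filepaths are kept
        stage='/data/stage',        # local staging area used by the filepath datatype
    )
}
dj.config['cache'] = '/tmp/dj-cache'  # optional local cache consulted by get() before downloading

schema = dj.schema('my_pipeline')

@schema
class Scan(dj.Manual):
    definition = """
    scan_id : int
    ---
    signal   : blob@shared      # serialized object stored externally, keyed by content hash
    raw_file : attach@shared    # attachment; the original basename is kept in the external table
    movie    : filepath@shared  # file managed in place relative to the stage directory
    """

# remove unused tracking entries and their external files;
# in this commit delete() is a generator of (uuid, path, error) tuples for failed removals,
# so it must be consumed to drive the deletion
errors = list(schema.external['shared'].delete(display_progress=True))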
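The uuid-keyed layout produced by `_make_uuid_path` can be illustrated with a small stand-alone sketch; the `subfold` helper is re-implemented here for illustration only (the real one ships with the datajoint package), and the uuid, store location, and basename are made-up values.

from uuid import UUID
from pathlib import PurePosixPath

def subfold(name, folds):
    # split a hex digest into nested subfolders, e.g. folds=(2, 2) -> ('1d', '75')
    return (name[:folds[0]],) + subfold(name[folds[0]:], folds[1:]) if folds else ()

uuid = UUID('1d751e2e74cd4f3f9a1d3f9c4b2a8e77')  # hash of a stored object
location, database, subfolding = '/data/external', 'my_pipeline', (2, 2)

# blob:       <location>/<schema>/<subfolders from uuid.hex>/<uuid.hex>
# attachment: the same path with '.<basename>' appended as a suffix
blob_path = PurePosixPath(location, database, *subfold(uuid.hex, subfolding), uuid.hex)
attach_path = blob_path.with_suffix('.' + 'scan.tif')
print(blob_path)    # /data/external/my_pipeline/1d/75/1d751e2e74cd4f3f9a1d3f9c4b2a8e77
print(attach_path)  # .../1d751e2e74cd4f3f9a1d3f9c4b2a8e77.scan.tif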
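The filepath datatype hashes the path relative to the configured stage (not the file contents) as the entry's identity, while contents_hash only detects a conflicting version or a stale local copy. A sketch of the intended round trip, assuming the same hypothetical 'shared' store and schema as in the configuration sketch above:

from pathlib import Path
import datajoint as dj

@schema
class Recording(dj.Manual):  # hypothetical table, for illustration only
    definition = """
    recording_id : int
    ---
    movie : filepath@shared
    """

stage = Path(dj.config['stores']['shared']['stage'])
src = stage / 'session42' / 'movie.avi'  # the file must already exist under the stage

Recording.insert1(dict(recording_id=1, movie=str(src)))
# upload_filepath copies the file from the stage into the store location, keyed by
# uuid_from_buffer(init_string=relative_path); re-inserting the identical file does not
# copy it again, while a modified file under the same relative path raises DataJointError

local_path = (Recording & 'recording_id=1').fetch1('movie')
# download_filepath restores the file under the stage only if it is missing
# or its checksum no longer matches contents_hash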