Merged

33 commits
bcfb173
attachments are saved as intact files
dimitri-yatsenko Sep 13, 2019
18366e6
reform attachments and filepath
dimitri-yatsenko Sep 13, 2019
11d7d9b
complete implementation of external storage except for `clean`
dimitri-yatsenko Sep 16, 2019
22347ae
Merge branch 'dev' of https://github.com/datajoint/datajoint-python i…
dimitri-yatsenko Sep 16, 2019
7e7c183
refactor external storage
dimitri-yatsenko Sep 18, 2019
8aa1543
complete refactoring of external storage for version 0.12
dimitri-yatsenko Sep 20, 2019
a2d6f9a
rename attribute `basename` to `attachment_name` in external table
dimitri-yatsenko Sep 20, 2019
2c11a65
add __repr__ to ExternalMapping
dimitri-yatsenko Sep 20, 2019
3a1c6ab
external files are not copied if stage and store are the same
dimitri-yatsenko Sep 20, 2019
478b36a
make external tables require setting the `delete_external_files` argu…
dimitri-yatsenko Sep 20, 2019
bb1deab
update CHANGELOG
dimitri-yatsenko Sep 23, 2019
e2636ed
fix Python 3.4 compatibility
dimitri-yatsenko Sep 23, 2019
11a1f93
fix Python 3.4 and 3.5 compatibility
dimitri-yatsenko Sep 23, 2019
165f795
dropped support for Python 3.4
dimitri-yatsenko Sep 23, 2019
c18af74
Merge branch 'dev' of https://github.com/datajoint/datajoint-python i…
dimitri-yatsenko Sep 26, 2019
48147f1
minor changes in error messages
dimitri-yatsenko Oct 1, 2019
2e26c4a
Merge branch 'dev' of https://github.com/datajoint/datajoint-python i…
dimitri-yatsenko Oct 3, 2019
0c4fd1c
Update to pathlib in test init.
guzman-raphael Oct 3, 2019
6ba86e0
Update test_blob_migrate to be compatible for WIN10.
guzman-raphael Oct 3, 2019
43a5126
Fix WIN10 compatibility with KeyboardInterrupt and SystemExit excepti…
guzman-raphael Oct 3, 2019
c7ca34c
Merge pull request #4 from guzman-raphael/dimitri-attach
dimitri-yatsenko Oct 3, 2019
5c99e37
Fix WIN10 filepath to store as posix and fetch as user's platform.
guzman-raphael Oct 4, 2019
e2c3f23
Fix relpath for Python3.5.
guzman-raphael Oct 4, 2019
93aeefc
Fix copytree for Python3.5.
guzman-raphael Oct 4, 2019
20719ae
Fix typo.
guzman-raphael Oct 4, 2019
7ca0099
Fix for Python3.5.
guzman-raphael Oct 4, 2019
f3ffd63
Update coveralls.
guzman-raphael Oct 4, 2019
bb1b40f
Update coverall env vars.
guzman-raphael Oct 4, 2019
f05c50d
Merge pull request #5 from guzman-raphael/win-filepath
dimitri-yatsenko Oct 5, 2019
298efed
add environment variable DJ_SUPPORT_FILEPATH_MANAGEMENT to enable/dis…
dimitri-yatsenko Oct 7, 2019
e609fbe
Merge branch 'attach' of https://github.com/dimitri-yatsenko/datajoin…
dimitri-yatsenko Oct 7, 2019
6dda528
Update CHANGELOG.md
dimitri-yatsenko Oct 8, 2019
796dae6
Update CHANGELOG.md
dimitri-yatsenko Oct 8, 2019
complete implementation of external storage except for `clean`
dimitri-yatsenko committed Sep 16, 2019
commit 11d7d9b956b1df102338c4b7e3a111f96f2f5147
122 changes: 35 additions & 87 deletions datajoint/external.py
@@ -70,6 +70,12 @@ def s3(self):
self._s3 = s3.Folder(**self.spec)
return self._s3

def upload_file(self, local_path, remote_path, metadata=None):
if self.spec['protocol'] == 's3':
self.s3.fput(remote_path, local_path, metadata)
else:
safe_copy(local_path, Path(self.spec['location']) / remote_path, overwrite=True)

# --- BLOBS ----

def put(self, blob):
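The new `upload_file` helper folds the s3/file branching that was previously repeated at each call site into a single method. A minimal sketch of the pattern — `spec`, `safe_copy`, and the s3 object below are illustrative stand-ins, not DataJoint's actual implementations:

```python
import shutil
from pathlib import Path

def safe_copy(src, dest, overwrite=False):
    # stand-in for DataJoint's safe_copy: create parent folders, skip existing unless overwrite
    dest = Path(dest)
    if overwrite or not dest.is_file():
        dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(str(src), str(dest))

class StoreStub:
    def __init__(self, spec, s3_folder=None):
        self.spec = spec        # e.g. {'protocol': 'file', 'location': '/data/store'}
        self.s3 = s3_folder     # object exposing fput(remote_path, local_path, metadata)

    def upload_file(self, local_path, remote_path, metadata=None):
        # one entry point for both protocols, so callers never branch themselves
        if self.spec['protocol'] == 's3':
            self.s3.fput(remote_path, local_path, metadata)
        else:
            safe_copy(local_path, Path(self.spec['location']) / remote_path, overwrite=True)
```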
@@ -151,10 +157,7 @@ def upload_attachment(self, local_path):
uuid = uuid_from_file(local_path, init_string=basename + '\0')
remote_path = '/'.join((
self.database, '/'.join(subfold(uuid.hex, self.spec['subfolding'])), uuid.hex + '-' + basename))
if self.spec['protocol'] == 's3':
self.s3.fput(remote_path, local_path)
else:
safe_copy(local_path, Path(self.spec['location']) / remote_path)
self.upload_file(local_path, remote_path)
# insert tracking info
self.connection.query(
"INSERT INTO {tab} (hash, size) VALUES (%s, {size}) ON DUPLICATE KEY "
@@ -174,7 +177,7 @@ def get_attachment_basename(self, uuid):
attachment_filename = next(name_generator)
except StopIteration:
raise MissingExternalFile('Missing attachment {protocol}://{path}'.format(
path=remote_path + '/' + uuid.hex + '-*'), **self.spec)
path=remote_path + '/' + uuid.hex + '-*', **self.spec))
return attachment_filename.split(uuid.hex + '-')[-1]

def download_attachment(self, uuid, basename, download_path):
@@ -188,7 +191,7 @@ def download_attachment(self, uuid, basename, download_path):

# --- FILEPATH ---

def fput(self, local_filepath):
def upload_filepath(self, local_filepath):
"""
Raise exception if an external entry already exists with a different contents checksum.
Otherwise, copy (with overwrite) file to remote and
@@ -197,7 +200,7 @@ def fput(self, local_filepath):
local_filepath = Path(local_filepath)
local_folder = local_filepath.parent
try:
relative_filepath = local_filepath.relative_to(self.spec['stage'])
relative_filepath = str(local_filepath.relative_to(self.spec['stage']))
except ValueError:
raise DataJointError('The path {path} is not in stage {stage}'.format(path=local_folder, **self.spec))
uuid = uuid_from_buffer(init_string=relative_filepath)
@@ -212,26 +215,22 @@ def fput(self, local_filepath):
"A different version of '{file}' has already been placed.".format(file=relative_filepath))
else:
# upload the file and create its tracking entry
if self.spec['protocol'] == 's3':
self.s3.fput(relative_filepath, local_filepath, contents_hash=str(contents_hash))
else:
remote_file = Path(self.spec['location'], relative_filepath)
safe_copy(local_filepath, remote_file, overwrite=True)
self.upload_file(local_filepath, relative_filepath, metadata={'contents_hash': str(contents_hash)})
self.connection.query(
"INSERT INTO {tab} (hash, size, filepath, contents_hash) VALUES (%s, {size}, '{filepath}', %s)".format(
tab=self.full_table_name, size=Path(local_filepath).stat().st_size,
filepath=relative_filepath), args=(uuid.bytes, contents_hash.bytes))
return uuid

def fget(self, filepath_hash):
def download_filepath(self, filepath_hash):
"""
sync a file from external store to the local stage
:param filepath_hash: The hash (UUID) of the relative_path
:return: hash (UUID) of the contents of the downloaded file or None
"""
if filepath_hash is not None:
relative_filepath, contents_hash = (self & {'hash': filepath_hash}).fetch1('filepath', 'contents_hash')
local_filepath = Path(self.spec['stage']).absolute / Path(relative_filepath)
local_filepath = Path(self.spec['stage']).absolute() / Path(relative_filepath)
file_exists = Path(local_filepath).is_file() and uuid_from_file(local_filepath) == contents_hash
if not file_exists:
if self.spec['protocol'] == 's3':
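`upload_filepath` keys each entry on two hashes: a UUID derived from the relative path (identity) and a `contents_hash` derived from the file bytes (version), which is how re-inserting the same path with different contents gets caught. A self-contained sketch, with an assumed MD5-based `uuid_from_buffer` mirroring the calls in the diff:

```python
import hashlib
import tempfile
import uuid as uuid_module
from pathlib import Path

def uuid_from_buffer(buffer=b'', init_string=''):
    # assumed MD5-based helper, not DataJoint's exact implementation
    hasher = hashlib.md5(init_string.encode())
    hasher.update(buffer)
    return uuid_module.UUID(bytes=hasher.digest())

stage = Path(tempfile.mkdtemp())                      # stand-in for spec['stage']
local_filepath = stage / 'sessions' / 'scan.tif'
local_filepath.parent.mkdir(parents=True)
local_filepath.write_bytes(b'version 1')

relative_filepath = str(local_filepath.relative_to(stage))     # ValueError if outside the stage
path_uuid = uuid_from_buffer(init_string=relative_filepath)    # identity: where the file lives
contents_hash = uuid_from_buffer(local_filepath.read_bytes())  # version: what the file holds

# Re-placing the same path with different bytes keeps path_uuid but changes
# contents_hash -- triggering the "different version has already been placed" error.
local_filepath.write_bytes(b'version 2')
assert uuid_from_buffer(init_string=relative_filepath) == path_uuid
assert uuid_from_buffer(local_filepath.read_bytes()) != contents_hash
```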
@@ -270,84 +269,33 @@ def delete(self):
for ref in self.references) or "TRUE"))
print('Deleted %d items' % self.connection.query("SELECT ROW_COUNT()").fetchone()[0])

def get_untracked_filepaths(self):
def get_untracked_external_files(self, *, limit=None,
include_blobs=True, include_attachments=True, include_filepaths=True):
"""
:return: the collection of remote filepaths that are no longer tracked.
:return: a generator of the absolute paths to external blobs, attachments, and filepaths that are no longer
tracked by this external table.
Caution: when multiple schemas manage the same filepath location or if multiple servers use the same
external location for blobs and attachments, then it is not safe to assume that untracked external files
are no longer needed by other schemas. Delete with caution. The safest approach is to ensure that each external
store is tracked by one database server only and that filepath locations are tracked by only one schema.
"""
remote_path = self.spec['location']
if self.spec['protocol'] == 'file':
position = len(remote_path.rstrip('/')) + 1
generator = (Path(folder[position:], file)
for folder, dirs, files in os.walk(remote_path, topdown=False) for file in files)
else: # self.spec['protocol'] == 's3'
position = len(remote_path.rstrip('/')) + 1
generator = (x.object_name[position:] for x in s3.Folder(**self.spec).list_objects())
in_use = set((self & '`filepath` IS NOT NULL').fetch('filepath'))
yield from ('/'.join((remote_path, f)) for f in generator if f not in in_use)

def clean_filepaths(self, verbose=True):
raise NotImplementedError

def clean(self, *, limit=None, verbose=True,
delete_blobs=True, delete_attachments=True, delete_filepaths=True):
"""
Delete filepaths that are not tracked by this store in this schema.
Leaves empty subfolders.
remove blobs, attachments, and filepaths that are no longer tracked by this external table
:param verbose: if True, print information about deleted files
:param limit: max number of items to delete. None=delete all
:param include_{blobs, attachments, filepaths}: if True, delete blobs, attachments, filepaths
"""
delete_list = self.get_untracked_external_files(
limit=limit,
include_blobs=delete_blobs, include_attachments=delete_attachments, include_filepaths=delete_filepaths)
if verbose:
print('Finding untracked files...')
untracked_filepaths = self.get_untracked_filepaths()
print('Deleting...')
if self.spec['protocol'] == 's3':
self.s3.remove_objects(untracked_filepaths)
print('Done')
else: # self.spec['protocol'] == 'file'
count = 0
for f in untracked_filepaths:
not verbose or print(f)
os.remove(f)
count += 1
print('Deleted %d files' % count)

def clean_blobs(self, *, verbose=True):
"""
Remove unused blobs from the external storage repository.
This must be performed after external_table.delete() during low-usage periods to reduce risks of data loss.
"""
assert False
in_use = set(x.hex for x in (self & '`filepath` is NULL').fetch('hash'))
if self.spec['protocol'] == 'file':
count = itertools.count()
print('Deleting...')
deleted_folders = set()
for folder, dirs, files in Path(self.spec['location'], self.database).rglob('*'):
if dirs and files:
raise DataJointError(
'Invalid repository with files in non-terminal folder %s' % folder)
dirs = set(d for d in dirs if Path(folder, d) not in deleted_folders)
if not dirs:
files_not_in_use = [f for f in files if f not in in_use]
for f in files_not_in_use:
filename = Path(folder, f)
next(count)
if verbose:
print(filename)
filename.unlink()
if len(files_not_in_use) == len(files):
os.rmdir(folder)
deleted_folders.add(folder)
print('Deleted %d objects' % next(count))
else: # self.spec['protocol'] == 's3'
count = itertools.count()

def names():
for x in self.s3.list_objects(self.database):
if x.object_name.split('/')[-1] not in in_use:
next(count)
if verbose:
print(x.object_name)
yield x.object_name

print('Deleting...')
failed_deletes = self.s3.remove_objects(names())
total = next(count)
print(' Deleted: %i S3 objects; %i failed.' % (total - len(failed_deletes), len(failed_deletes)))
print('Deleting %i items:' % len(delete_list))
for item in delete_list:
print(item)


class ExternalMapping(Mapping):
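Per the commit title, the deletion step inside `clean` is still pending: in this version it enumerates and prints the untracked items. A hedged usage sketch of the new flow — the schema and store names are hypothetical, and a configured connection and `dj.config` stores are assumed:

```python
import datajoint as dj

schema = dj.schema('my_schema')      # hypothetical schema
ext = schema.external['raw']         # hypothetical store name

# review what would be removed before removing anything
for path in ext.get_untracked_external_files(limit=20):
    print(path)

# in this commit, clean() only lists the items; actual deletion comes later
ext.clean(limit=20)
```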
6 changes: 3 additions & 3 deletions datajoint/fetch.py
@@ -50,7 +50,7 @@ def _get(connection, attr, data, squeeze, download_path):
adapt = attr.adapter.get if attr.adapter else lambda x: x

if attr.is_filepath:
return adapt(extern.fget(uuid.UUID(bytes=data))[0])
return adapt(extern.download_filepath(uuid.UUID(bytes=data))[0])

if attr.is_attachment:
# Steps:
@@ -68,7 +68,7 @@ def _get(connection, attr, data, squeeze, download_path):
return adapt(local_filepath) # checksum passed, no need to download again
# generate the next available alias filename
for n in itertools.count():
f = local_filepath.parent / ('_%04x' % n + local_filepath.suffix)
f = local_filepath.parent / (local_filepath.stem + '_%04x' % n + local_filepath.suffix)
if not f.is_file():
local_filepath = f
break
@@ -79,7 +79,7 @@ def _get(connection, attr, data, squeeze, download_path):
extern.download_attachment(_uuid, basename, local_filepath)
else:
# write from buffer
safe_write(local_filepath, data.split(b"\0")[1])
safe_write(local_filepath, data.split(b"\0", 1)[1])
return adapt(local_filepath) # download file from remote store

return adapt(uuid.UUID(bytes=data) if attr.uuid else (
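Two small but consequential fixes in `fetch.py`: collision aliases now keep the original stem (`attach1_0000.img` rather than `_0000.img`), and the buffer is split on only the first null byte, since attachment contents are binary and may themselves embed `\0`. Both behaviors in miniature (the download path is a placeholder):

```python
import itertools
from pathlib import Path

# 1. alias naming: keep the stem when resolving filename collisions
local_filepath = Path('/tmp/downloads/attach1.img')      # hypothetical download target
for n in itertools.count():
    f = local_filepath.parent / (local_filepath.stem + '_%04x' % n + local_filepath.suffix)
    if not f.is_file():
        break
print(f.name)                                            # attach1_0000.img, not _0000.img

# 2. attachment blobs are `name\0contents`; contents may embed null bytes
payload = b'image.bin\0' + bytes([0, 1, 2, 0, 3])
name, contents = payload.split(b'\0', 1)                 # maxsplit=1 keeps the payload intact
assert contents == bytes([0, 1, 2, 0, 3])                # split(b'\0')[1] would yield b'' here
```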
4 changes: 2 additions & 2 deletions datajoint/s3.py
@@ -25,9 +25,9 @@ def put(self, relative_name, buffer):
return self.client.put_object(
self.bucket, '/'.join((self.root_path, relative_name)), BytesIO(buffer), length=len(buffer))

def fput(self, relative_name, local_file, **meta):
def fput(self, relative_name, local_file, metadata=None):
return self.client.fput_object(
self.bucket, '/'.join((self.root_path, relative_name)), local_file, metadata=meta or None)
self.bucket, '/'.join((self.root_path, relative_name)), str(local_file), metadata=metadata)

def get(self, relative_name):
try:
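The `fput` change replaces the open-ended `**meta` with an explicit `metadata=None` and coerces the local path with `str()`, since `pathlib.Path` arguments can trip older minio clients. A sketch against the minio API — endpoint, bucket, credentials, and paths below are placeholders:

```python
from pathlib import Path
from minio import Minio

client = Minio('s3.example.com', access_key='KEY', secret_key='SECRET')   # placeholder creds
local_file = Path('/data/stage/scan.tif')                                 # placeholder path
client.fput_object(
    'my-bucket',                               # bucket
    'schema/ab/cd/scan.tif',                   # object name
    str(local_file),                           # Path -> str for older minio versions
    metadata={'contents_hash': 'deadbeef'})    # explicit dict; None means no metadata
```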
2 changes: 1 addition & 1 deletion datajoint/table.py
@@ -279,7 +279,7 @@ def make_placeholder(name, value):
# value is filename + contents
value = str.encode(attachment_path.name) + b'\0' + attachment_path.read_bytes()
elif attr.is_filepath:
value = self.external[attr.store].fput(value).bytes
value = self.external[attr.store].upload_filepath(value).bytes
elif attr.numeric:
value = str(int(value) if isinstance(value, bool) else value)
return name, placeholder, value
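On the insert side (`make_placeholder`), an attachment travels as `basename\0raw_bytes`; the fetch-side `split(b'\0', 1)` above is its exact inverse. The round trip in miniature, with a throwaway file:

```python
import tempfile
from pathlib import Path

attachment_path = Path(tempfile.mkdtemp()) / 'notes.bin'
attachment_path.write_bytes(b'hello\0world')               # contents may embed \0
value = str.encode(attachment_path.name) + b'\0' + attachment_path.read_bytes()

basename, contents = value.split(b'\0', 1)                 # fetch-side inverse
assert basename == b'notes.bin' and contents == b'hello\0world'
```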
22 changes: 9 additions & 13 deletions tests/test_attach.py
@@ -1,7 +1,7 @@
from nose.tools import assert_true, assert_equal, assert_not_equal
from numpy.testing import assert_array_equal
import tempfile
import filecmp
from pathlib import Path
import os

from .schema_external import Attach
@@ -13,27 +13,23 @@ def test_attach_attributes():
table = Attach()
source_folder = tempfile.mkdtemp()
for i in range(2):
attach1 = os.path.join(source_folder, 'attach1.img')
attach1 = Path(source_folder, 'attach1.img')
data1 = os.urandom(100)
with open(attach1, 'wb') as f:
f.write(data1)
attach2 = os.path.join(source_folder, 'attach2.txt')
attach1.write_bytes(data1)
attach2 = Path(source_folder, 'attach2.txt')
data2 = os.urandom(200)
with open(attach2, 'wb') as f:
f.write(data2)
attach2.write_bytes(data2)
table.insert1(dict(attach=i, img=attach1, txt=attach2))

download_folder = tempfile.mkdtemp()
download_folder = Path(tempfile.mkdtemp())
keys, path1, path2 = table.fetch("KEY", 'img', 'txt', download_path=download_folder, order_by="KEY")

# verify that different attachments are renamed if their filenames collide
assert_not_equal(path1[0], path2[0])
assert_not_equal(path1[0], path1[1])
assert_equal(os.path.split(path1[0])[0], download_folder)
with open(path1[-1], 'rb') as f:
check1 = f.read()
with open(path2[-1], 'rb') as f:
check2 = f.read()
assert_equal(path1[0].parent, download_folder)
check1 = path1[-1].read_bytes()
check2 = path2[-1].read_bytes()
assert_equal(data1, check1)
assert_equal(data2, check2)

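The test refactor is a straight swap of `os.path` + `open()` calls for `pathlib` equivalents, which also sidesteps the WIN10 path-separator issues addressed later in this PR. The same idiom in isolation:

```python
import os
import tempfile
from pathlib import Path

source_folder = Path(tempfile.mkdtemp())
attach1 = source_folder / 'attach1.img'       # replaces os.path.join(...)
data1 = os.urandom(100)
attach1.write_bytes(data1)                    # replaces open(..., 'wb') + write
assert attach1.read_bytes() == data1          # replaces open(..., 'rb') + read
assert attach1.parent == source_folder        # replaces os.path.split(...)[0]
```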
6 changes: 3 additions & 3 deletions tests/test_external_class.py
@@ -9,7 +9,7 @@ def setUp(self):


def test_heading():
heading = modu.Simple.heading
heading = modu.Simple().heading
assert_true('item' in heading)
assert_true(heading['item'].is_external)

@@ -41,15 +41,15 @@ def test_populate():
image = modu.Image()
image.populate()
remaining, total = image.progress()
image.external['raw'].clean_blobs()
image.external['raw'].clean()
assert_true(total == len(modu.Dimension() * modu.Seed()) and remaining == 0)
for img, neg, dimensions in zip(*(image * modu.Dimension()).fetch('img', 'neg', 'dimensions')):
assert_list_equal(list(img.shape), list(dimensions))
assert_almost_equal(img, -neg)
image.delete()
for external_table in image.external.values():
external_table.delete()
external_table.clean_blobs()
external_table.clean()


