Merged
Changes from 1 commit
Commits
33 commits
bcfb173
attachments are saved as intact files
dimitri-yatsenko Sep 13, 2019
18366e6
reform attachments and filepath
dimitri-yatsenko Sep 13, 2019
11d7d9b
complete implementation of external storage except for `clean`
dimitri-yatsenko Sep 16, 2019
22347ae
Merge branch 'dev' of https://github.com/datajoint/datajoint-python i…
dimitri-yatsenko Sep 16, 2019
7e7c183
refactor external storage
dimitri-yatsenko Sep 18, 2019
8aa1543
complete refactoring of external storage for version 0.12
dimitri-yatsenko Sep 20, 2019
a2d6f9a
rename attribute `basename` to `attachment_name` in external table
dimitri-yatsenko Sep 20, 2019
2c11a65
add __repr__ to ExternalMapping
dimitri-yatsenko Sep 20, 2019
3a1c6ab
external files are not copied if stage and store are the same
dimitri-yatsenko Sep 20, 2019
478b36a
make external tables require setting the `delete_external_files` argu…
dimitri-yatsenko Sep 20, 2019
bb1deab
update CHANGELOG
dimitri-yatsenko Sep 23, 2019
e2636ed
fix Python 3.4 compatibility
dimitri-yatsenko Sep 23, 2019
11a1f93
fix Python 3.4 and 3.5 compatibility
dimitri-yatsenko Sep 23, 2019
165f795
dropped support for Python 3.4
dimitri-yatsenko Sep 23, 2019
c18af74
Merge branch 'dev' of https://github.com/datajoint/datajoint-python i…
dimitri-yatsenko Sep 26, 2019
48147f1
minor changes in error messages
dimitri-yatsenko Oct 1, 2019
2e26c4a
Merge branch 'dev' of https://github.com/datajoint/datajoint-python i…
dimitri-yatsenko Oct 3, 2019
0c4fd1c
Update to pathlib in test init.
guzman-raphael Oct 3, 2019
6ba86e0
Update test_blob_migrate to be compatible for WIN10.
guzman-raphael Oct 3, 2019
43a5126
Fix WIN10 compatibility with KeyboardInterrupt and SystemExit excepti…
guzman-raphael Oct 3, 2019
c7ca34c
Merge pull request #4 from guzman-raphael/dimitri-attach
dimitri-yatsenko Oct 3, 2019
5c99e37
Fix WIN10 filepath to store as posix and fetch as user's platform.
guzman-raphael Oct 4, 2019
e2c3f23
Fix relpath for Python3.5.
guzman-raphael Oct 4, 2019
93aeefc
Fix copytree for Python3.5.
guzman-raphael Oct 4, 2019
20719ae
Fix typo.
guzman-raphael Oct 4, 2019
7ca0099
Fix for Python3.5.
guzman-raphael Oct 4, 2019
f3ffd63
Update coveralls.
guzman-raphael Oct 4, 2019
bb1b40f
Update coverall env vars.
guzman-raphael Oct 4, 2019
f05c50d
Merge pull request #5 from guzman-raphael/win-filepath
dimitri-yatsenko Oct 5, 2019
298efed
add environment variable DJ_SUPPORT_FILEPATH_MANAGEMENT to enable/dis…
dimitri-yatsenko Oct 7, 2019
e609fbe
Merge branch 'attach' of https://github.com/dimitri-yatsenko/datajoin…
dimitri-yatsenko Oct 7, 2019
6dda528
Update CHANGELOG.md
dimitri-yatsenko Oct 8, 2019
796dae6
Update CHANGELOG.md
dimitri-yatsenko Oct 8, 2019
attachments are saved as intact files
dimitri-yatsenko committed Sep 13, 2019
commit bcfb17364531ca5ce9e76af25f81d9c673a03e68
28 changes: 0 additions & 28 deletions datajoint/attach.py

This file was deleted.

146 changes: 77 additions & 69 deletions datajoint/external.py
@@ -70,6 +70,8 @@ def s3(self):
self._s3 = s3.Folder(**self.spec)
return self._s3

# --- BLOBS ----

def put(self, blob):
"""
put a binary string in external store
@@ -88,54 +90,14 @@ def put(self, blob):
tab=self.full_table_name, size=len(blob)), args=(uuid.bytes,))
return uuid

def fput(self, local_filepath):
"""
Raise an exception if an external entry already exists with a different contents checksum.
If an entry exists with the same checksum, no copying occurs.
Otherwise, copy the file (with overwrite) to the remote store.
"""
local_folder = os.path.dirname(local_filepath)
relative_filepath = os.path.relpath(local_filepath, start=self.spec['stage'])
if relative_filepath.startswith(os.path.pardir):
raise DataJointError('The path {path} is not in stage {stage}'.format(
path=local_folder, stage=self.spec['stage']))
uuid = uuid_from_buffer(init_string=relative_filepath)
contents_hash = uuid_from_file(local_filepath)

# check if the remote file already exists and verify that it matches
check_hash = (self & {'hash': uuid}).fetch('contents_hash')
if check_hash:
# the tracking entry exists, check that it's the same file as before
if contents_hash != check_hash[0]:
raise DataJointError(
"A different version of '{file}' has already been placed.".format(file=relative_filepath))
else:
# upload the file and create its tracking entry
if self.spec['protocol'] == 's3':
self.s3.fput(relative_filepath, local_filepath, contents_hash=str(contents_hash))
else:
remote_file = os.path.join(self.spec['location'], relative_filepath)
safe_copy(local_filepath, remote_file, overwrite=True)
self.connection.query(
"INSERT INTO {tab} (hash, size, filepath, contents_hash) VALUES (%s, {size}, '{filepath}', %s)".format(
tab=self.full_table_name, size=os.path.getsize(local_filepath),
filepath=relative_filepath), args=(uuid.bytes, contents_hash.bytes))
return uuid

def peek(self, blob_hash, bytes_to_peek=120):
return self.get(blob_hash, size=bytes_to_peek)

def get(self, blob_hash, *, size=-1):
def get(self, blob_hash):
"""
get an object from external store.
:param size: max number of bytes to retrieve. If size<0, retrieve entire blob
:param explicit_path: if given, then use it as relative path rather than the path derived from
"""

def read_file(filepath, size):
def read_file(filepath):
try:
with open(filepath, 'rb') as f:
blob = f.read(size)
blob = f.read()
except FileNotFoundError:
raise MissingExternalFile('Lost access to external blob %s.' % full_path) from None
return blob
@@ -146,26 +108,22 @@ def read_file(filepath, size):
# attempt to get object from cache
blob = None
cache_folder = config.get('cache', None)
blob_size = None
if cache_folder:
try:
cache_path = os.path.join(cache_folder, *subfold(blob_hash.hex, CACHE_SUBFOLDING))
cache_file = os.path.join(cache_path, blob_hash.hex)
with open(cache_file, 'rb') as f:
blob = f.read(size)
blob = f.read()
except FileNotFoundError:
pass
else:
if size > 0:
blob_size = os.path.getsize(cache_file)

# attempt to get object from store
if blob is None:
if self.spec['protocol'] == 'file':
subfolders = os.path.join(*subfold(blob_hash.hex, self.spec['subfolding']))
full_path = os.path.join(self.spec['location'], self.database, subfolders, blob_hash.hex)
try:
blob = read_file(full_path, size)
blob = read_file(full_path)
except MissingExternalFile:
if not SUPPORT_MIGRATED_BLOBS:
raise
@@ -175,33 +133,83 @@ def read_file(filepath, size):
if relative_filepath is None:
raise
blob = read_file(os.path.join(self.spec['location'], relative_filepath))
else:
if size > 0:
blob_size = os.path.getsize(full_path)
elif self.spec['protocol'] == 's3':
full_path = '/'.join(
(self.database,) + subfold(blob_hash.hex, self.spec['subfolding']) + (blob_hash.hex,))
if size < 0:
try:
blob = self.s3.get(full_path)
except MissingExternalFile:
if not SUPPORT_MIGRATED_BLOBS:
raise
relative_filepath, contents_hash = (self & {'hash': blob_hash}).fetch1(
'filepath', 'contents_hash')
if relative_filepath is None:
raise
blob = self.s3.get(relative_filepath)
else:
blob = self.s3.partial_get(full_path, 0, size)
blob_size = self.s3.get_size(full_path)

if cache_folder and size < 0:
try:
blob = self.s3.get(full_path)
except MissingExternalFile:
if not SUPPORT_MIGRATED_BLOBS:
raise
relative_filepath, contents_hash = (self & {'hash': blob_hash}).fetch1(
'filepath', 'contents_hash')
if relative_filepath is None:
raise
blob = self.s3.get(relative_filepath)
if cache_folder:
if not os.path.exists(cache_path):
os.makedirs(cache_path)
safe_write(os.path.join(cache_path, blob_hash.hex), blob)
return blob

# --- ATTACHMENTS ---
def get_attachment_filename(self, hash):
pass

def upload_attachment(self, local_filepath):
pass

def download_attachment(self, hash):

""" save attachment from memory buffer into the save_path """
rel_path, buffer = buffer.split(b'\0', 1)
file_path = os.path.abspath(os.path.join(save_path, rel_path.decode()))

if os.path.isfile(file_path):
# generate a new filename
file, ext = os.path.splitext(file_path)
file_path = next(f for f in ('%s_%04x%s' % (file, n, ext) for n in count())
if not os.path.isfile(f))

with open(file_path, mode='wb') as f:
f.write(buffer)
return file_path

# --- FILEPATH ---

def fput(self, local_filepath):
"""
Raise an exception if an external entry already exists with a different contents checksum.
If an entry exists with the same checksum, no copying occurs.
Otherwise, copy the file (with overwrite) to the remote store.
"""
local_folder = os.path.dirname(local_filepath)
relative_filepath = os.path.relpath(local_filepath, start=self.spec['stage'])
if relative_filepath.startswith(os.path.pardir):
raise DataJointError('The path {path} is not in stage {stage}'.format(
path=local_folder, stage=self.spec['stage']))
uuid = uuid_from_buffer(init_string=relative_filepath)
contents_hash = uuid_from_file(local_filepath)

return blob if size < 0 else (blob, blob_size)
# check if the remote file already exists and verify that it matches
check_hash = (self & {'hash': uuid}).fetch('contents_hash')
if check_hash:
# the tracking entry exists, check that it's the same file as before
if contents_hash != check_hash[0]:
raise DataJointError(
"A different version of '{file}' has already been placed.".format(file=relative_filepath))
else:
# upload the file and create its tracking entry
if self.spec['protocol'] == 's3':
self.s3.fput(relative_filepath, local_filepath, contents_hash=str(contents_hash))
else:
remote_file = os.path.join(self.spec['location'], relative_filepath)
safe_copy(local_filepath, remote_file, overwrite=True)
self.connection.query(
"INSERT INTO {tab} (hash, size, filepath, contents_hash) VALUES (%s, {size}, '{filepath}', %s)".format(
tab=self.full_table_name, size=os.path.getsize(local_filepath),
filepath=relative_filepath), args=(uuid.bytes, contents_hash.bytes))
return uuid

def fget(self, filepath_hash):
"""
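
Note on the external.py changes above: stored blobs are addressed by hash under a subfolded directory tree (see the subfold calls in get), and fput only copies a staged file when no entry with the same contents checksum is already tracked, raising if a different version of the same relative path was placed earlier. The following is a minimal standalone sketch of those two mechanisms. The names uuid_from_file, subfold, blob_path, put_file, the in-memory tracked dict, the MD5-based UUID checksum, and the (2, 2) subfolding are illustrative assumptions, not DataJoint's API; in the diff the tracking entry is a row in the external table.

import hashlib
import shutil
import uuid
from pathlib import Path


def uuid_from_file(filepath, init=""):
    # Assumed checksum scheme: an MD5-based UUID over an optional init string plus the file contents.
    md5 = hashlib.md5(init.encode())
    md5.update(Path(filepath).read_bytes())
    return uuid.UUID(md5.hexdigest())


def subfold(hex_string, folds=(2, 2)):
    # "abcd1234..." with folds (2, 2) -> ("ab", "cd"); keeps any one directory from growing too large
    parts, start = [], 0
    for width in folds:
        parts.append(hex_string[start:start + width])
        start += width
    return tuple(parts)


def blob_path(store_root, blob_hash_hex):
    # e.g. <store_root>/ab/cd/abcd1234..., the layout get() walks for the 'file' protocol
    return Path(store_root, *subfold(blob_hash_hex), blob_hash_hex)


def put_file(local_filepath, stage_root, store_root, tracked):
    # Copy a staged file to the store unless an identical copy is already tracked (cf. fput).
    local_filepath = Path(local_filepath)
    relative = local_filepath.relative_to(stage_root).as_posix()  # raises if outside the stage
    contents_hash = uuid_from_file(local_filepath)
    if relative in tracked:
        if tracked[relative] != contents_hash:
            raise ValueError("A different version of '%s' has already been placed." % relative)
        return tracked[relative]  # same contents: nothing to copy
    destination = Path(store_root) / relative
    destination.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(local_filepath, destination)
    tracked[relative] = contents_hash  # record the tracking entry after a successful copy
    return contents_hash

Under these assumptions, placing the same file twice is a no-op, while placing a modified file under the same relative path raises, which mirrors the contents_hash check in fput.
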
21 changes: 10 additions & 11 deletions datajoint/fetch.py
@@ -46,6 +46,7 @@ def _get(connection, attr, data, squeeze, download_path):

extern = connection.schemas[attr.database].external[attr.store] if attr.is_external else None

# apply attribute adapter if present
adapt = attr.adapter.get if attr.adapter else lambda x: x

if attr.is_filepath:
Expand All @@ -57,19 +58,17 @@ def _get(connection, attr, data, squeeze, download_path):
# 2. check if the file already exists at download_path, verify checksum
# 3. if exists and checksum passes then return the local filepath
# 4. Otherwise, download the remote file and return the new filepath
peek, size = extern.peek(uuid.UUID(bytes=data)) if attr.is_external else (data, len(data))
assert size is not None
filename = peek.split(b"\0", 1)[0].decode()
size -= len(filename) + 1
filepath = os.path.join(download_path, filename)
if os.path.isfile(filepath) and size == os.path.getsize(filepath):
local_checksum = hash.uuid_from_file(filepath, filename + '\0')
filename = (extern.get_attachment_filename(uuid.UUID(bytes=data))
if attr.is_external else data.split(b"\0", 1)[0].decode())
local_filepath = os.path.join(download_path, filename)
if os.path.isfile(local_filepath):
local_checksum = hash.uuid_from_file(local_filepath, filename + '\0')
remote_checksum = uuid.UUID(bytes=data) if attr.is_external else hash.uuid_from_buffer(data)
if local_checksum == remote_checksum:
return adapt(filepath) # the existing file is okay
return adapt(local_filepath) # no need to download again
# Download remote attachment
if attr.is_external:
data = extern.get(uuid.UUID(bytes=data))
data = extern.download_attachment(uuid.UUID(bytes=data), local_filepath)
return adapt(attach.save(data, download_path)) # download file from remote store

return adapt(uuid.UUID(bytes=data) if attr.uuid else (
@@ -104,8 +103,8 @@ def __init__(self, expression):
def __call__(self, *attrs, offset=None, limit=None, order_by=None, format=None, as_dict=None,
squeeze=False, download_path='.'):
"""
Fetches the expression results from the database into an np.array or list of dictionaries and unpacks blob attributes.

Fetches the expression results from the database into an np.array or list of dictionaries and
unpacks blob attributes.
:param attrs: zero or more attributes to fetch. If not provided, the call will return
all attributes of this relation. If provided, returns tuples with an entry for each attribute.
:param offset: the number of tuples to skip in the returned result
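
Note on the fetch.py changes above: an attachment or filepath is only downloaded when no verified local copy exists; the local file's checksum, computed over the filename, a NUL byte, and the contents, is compared against the stored hash first. A rough standalone sketch of that decision follows; the MD5-based UUID checksum and the fetch_remote callable are assumptions standing in for hash.uuid_from_file and the actual store download.

import hashlib
import uuid
from pathlib import Path


def checksum(filepath, prefix=""):
    # Assumed scheme: MD5 over a text prefix followed by the file contents, returned as a UUID.
    md5 = hashlib.md5(prefix.encode())
    md5.update(Path(filepath).read_bytes())
    return uuid.UUID(md5.hexdigest())


def local_attachment_or_download(filename, expected_checksum, download_path, fetch_remote):
    # Return a local path to the attachment, downloading only when no verified copy is present.
    local_path = Path(download_path) / filename
    if local_path.is_file() and checksum(local_path, filename + "\0") == expected_checksum:
        return local_path  # existing copy verified; no need to download again
    contents = fetch_remote()  # hypothetical stand-in for reading the object from the file or s3 store
    local_path.parent.mkdir(parents=True, exist_ok=True)
    local_path.write_bytes(contents)
    return local_path

Repeated fetches of an unchanged attachment then cost a stat and a local hash rather than a round trip to the store.
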
11 changes: 10 additions & 1 deletion datajoint/table.py
@@ -6,6 +6,7 @@
import pandas
import logging
import uuid
from os import path
from .settings import config
from .declare import declare, alter
from .expression import QueryExpression
@@ -270,8 +271,16 @@ def make_placeholder(name, value):
value = blob.pack(value)
value = self.external[attr.store].put(value).bytes if attr.is_external else value
elif attr.is_attachment:
# value is a local_path to the attachment
if attr.is_external:
value = self.external[attr.store].upload_attachment(value) # value is local filepath
else:
# if database blob, then insert the file contents
with open(value, mode='rb') as f:
contents = f.read()
value = str.encode(path.basename(value)) + b'\0' + contents
value = attach.load(value)
value = self.external[attr.store].put(value).bytes if attr.is_external else value
value = self.external[attr.store].upload_attachment(value).bytes if attr.is_external else value
elif attr.is_filepath:
value = self.external[attr.store].fput(value).bytes
elif attr.numeric:
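
Note on the table.py changes above: an attachment kept inside the database (no external store) is framed on insert as the file's basename, a NUL separator, and the raw bytes; on retrieval the buffer is split at the first NUL to recover the filename, and a numbered suffix is generated if a different file of that name already exists, as in download_attachment. A small round-trip sketch of that framing, with pack_attachment and unpack_attachment as illustrative names rather than functions in the codebase:

from itertools import count
from os import path


def pack_attachment(local_filepath):
    # filename + b'\0' + file contents, as built in make_placeholder above
    with open(local_filepath, mode='rb') as f:
        contents = f.read()
    return str.encode(path.basename(local_filepath)) + b'\0' + contents


def unpack_attachment(buffer, save_folder):
    # split on the first NUL byte to recover the original filename, then restore the file
    filename, contents = buffer.split(b'\0', 1)
    file_path = path.abspath(path.join(save_folder, filename.decode()))
    if path.isfile(file_path):
        # avoid clobbering an existing file by generating a numbered variant, as in download_attachment
        stem, ext = path.splitext(file_path)
        file_path = next(f for f in ('%s_%04x%s' % (stem, n, ext) for n in count())
                         if not path.isfile(f))
    with open(file_path, mode='wb') as f:
        f.write(contents)
    return file_path
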