Merged
59 commits
8f85d8c  Updated blobs shared code (annatisch, Jul 19, 2019)
a3aff08  Started blob refactor (annatisch, Jul 19, 2019)
90b9640  Refactoring upload (annatisch, Jul 22, 2019)
6d33776  Merge remote-tracking branch 'upstream/storage-preview2' (annatisch, Jul 22, 2019)
de42b24  Updated shared code (annatisch, Jul 22, 2019)
d8ec863  Started fixing tests (annatisch, Jul 22, 2019)
8d9bd39  Merge remote-tracking branch 'upstream/storage-preview2' (annatisch, Jul 22, 2019)
3a231d8  Refactored sync blobs (annatisch, Jul 23, 2019)
d527688  Added blob async APIs (annatisch, Jul 24, 2019)
76b9e3c  Some fixes (annatisch, Jul 24, 2019)
9a8fb9d  Append blob async tests (annatisch, Jul 24, 2019)
900e07b  blob access async tests (annatisch, Jul 24, 2019)
c8668c9  Blob client async tests (annatisch, Jul 24, 2019)
05b9020  encryption async tests (annatisch, Jul 24, 2019)
bee7e3f  Patch for azure core exception (annatisch, Jul 24, 2019)
dda031b  blob retry async (annatisch, Jul 24, 2019)
99bc35d  Retry async tests (annatisch, Jul 24, 2019)
4fd5b84  Get blob async tests (annatisch, Jul 25, 2019)
1eb8192  Bug fix for clear page operation (annatisch, Jul 25, 2019)
24dd430  More async tests + upload fix (annatisch, Jul 26, 2019)
30cbc1b  Merged from preview 2 (annatisch, Jul 26, 2019)
076d062  Merged blobs (annatisch, Jul 26, 2019)
fa808bb  Updated Files shared code (annatisch, Jul 26, 2019)
38f6fc5  Updated queue shared code (annatisch, Jul 26, 2019)
e6d9544  Merge from upstream (annatisch, Jul 26, 2019)
704d8bb  async tests pass except 2 common blob tests (kristapratico, Jul 26, 2019)
9bcd94a  adds async paging to blobs and some async tests (not all pass) (kristapratico, Jul 29, 2019)
8a6df99  Merge pull request #2 from annatisch/async-blob-tests (kristapratico, Jul 29, 2019)
d28961c  initial commit (rakshith91, Jul 30, 2019)
375323e  block_blob_tests (rakshith91, Jul 30, 2019)
35261e9  page blob tests (rakshith91, Jul 30, 2019)
0b5eaed  Merge pull request #3 from annatisch/async_tests (Jul 30, 2019)
bdd8396  Merge branch 'storage-preview2' into master (kristapratico, Jul 31, 2019)
37fb8e3  fix for special chars, some tests, and recordings (kristapratico, Jul 31, 2019)
4bcbca7  add to shared storage and fix import (kristapratico, Jul 31, 2019)
a984b39  adding more tests/recordings (kristapratico, Jul 31, 2019)
90a7c61  more tests/recordings (kristapratico, Aug 1, 2019)
36155a8  Merge pull request #4 from annatisch/async-blob-tests (kristapratico, Aug 1, 2019)
5fb77f5  rerecord tests, fix imports (kristapratico, Aug 1, 2019)
e97af55  Merge pull request #5 from annatisch/async-blob-tests (kristapratico, Aug 1, 2019)
4687f62  fix import again (kristapratico, Aug 1, 2019)
7552cdb  Merge pull request #6 from annatisch/async-blob-tests (kristapratico, Aug 1, 2019)
753006e  blacklist azure-servicemanagement-legacy (kristapratico, Aug 1, 2019)
748237e  Merge pull request #7 from annatisch/async-blob-tests (kristapratico, Aug 1, 2019)
dfabb8b  get CI to run (kristapratico, Aug 1, 2019)
0ad6851  Merge pull request #8 from annatisch/async-blob-tests (kristapratico, Aug 1, 2019)
634a74a  rerecord all async tests (kristapratico, Aug 2, 2019)
2848c33  Merge pull request #9 from annatisch/async-blob-tests (kristapratico, Aug 2, 2019)
c607dde  testing (kristapratico, Aug 2, 2019)
b3b67fc  Merge pull request #10 from annatisch/async-blob-tests (kristapratico, Aug 2, 2019)
4df2eef  add variable indirection for storage live tests. this is a temporary … (danieljurek, Aug 2, 2019)
615fc42  newline (danieljurek, Aug 2, 2019)
f51dfb4  Merge pull request #11 from danieljurek/async-blob-tests (kristapratico, Aug 2, 2019)
7edbc6e  print envar (kristapratico, Aug 2, 2019)
ace7ea7  Merge pull request #12 from annatisch/async-blob-tests (kristapratico, Aug 2, 2019)
85490ca  remove testing (kristapratico, Aug 3, 2019)
d48e917  Merge pull request #13 from annatisch/async-blob-tests (kristapratico, Aug 3, 2019)
20adbbc  adjust pypy testing (kristapratico, Aug 3, 2019)
2b1a0ae  Merge pull request #14 from annatisch/async-blob-tests (kristapratico, Aug 3, 2019)
Refactored sync blobs
annatisch committed Jul 23, 2019
commit 3a231d8a1113a76203c25988e7056377c5d686df
@@ -34,7 +34,7 @@ def _parallel_uploads(executor, uploader, pending, running):
         except StopIteration:
             break
         else:
-            running.add(executor.submit(uploader.process_chunk, next_chunk))
+            running.add(executor.submit(uploader, next_chunk))

     # Wait for the remaining uploads to finish
     done, _running = futures.wait(running)
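Note: `_parallel_uploads` now receives the callable to run (the caller passes `uploader.process_chunk` or `uploader.process_substream_block`) instead of the uploader object, so one helper drives both upload styles. A minimal sketch of the keep-N-futures-in-flight pattern it implements, using illustrative names that are not SDK internals:

# Sketch of the bounded-parallelism pattern; `run` and `parallel_uploads`
# are illustrative stand-ins, not the SDK's API.
from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait
from itertools import islice

def parallel_uploads(executor, fn, pending, running):
    results = []
    while True:
        # Wait for one task to finish before scheduling the next, so at
        # most max_connections futures are ever in flight.
        done, running = wait(running, return_when=FIRST_COMPLETED)
        results.extend(f.result() for f in done)
        try:
            next_item = next(pending)
        except StopIteration:
            break
        else:
            running.add(executor.submit(fn, next_item))
    done, _ = wait(running)
    results.extend(f.result() for f in done)
    return results

def run(fn, work, max_connections=4):
    executor = ThreadPoolExecutor(max_connections)
    tasks = iter(work)
    running = {executor.submit(fn, t) for t in islice(tasks, max_connections)}
    return parallel_uploads(executor, fn, tasks, running)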
@@ -55,7 +55,7 @@ def upload_data_chunks(

     if encryption_options:
         encryptor, padder = get_blob_encryptor_and_padder(
-            encryption_options.get('key'),
+            encryption_options.get('cek'),
             encryption_options.get('vector'),
             uploader_class is not PageBlobChunkUploader)
         kwargs['encryptor'] = encryptor
@@ -74,20 +74,18 @@
             parallel=parallel,
             validate_content=validate_content,
             **kwargs)

     if parallel:
         executor = futures.ThreadPoolExecutor(max_connections)
         upload_tasks = uploader.get_chunk_streams()
         running_futures = [
             executor.submit(uploader.process_chunk, u)
             for u in islice(upload_tasks, 0, max_connections)
         ]
-        range_ids = _parallel_uploads(executor, uploader, upload_tasks, running_futures)
+        range_ids = _parallel_uploads(executor, uploader.process_chunk, upload_tasks, running_futures)
     else:
         range_ids = [uploader.process_chunk(result) for result in uploader.get_chunk_streams()]

     if any(range_ids):
-        return range_ids
+        return [r[1] for r in sorted(range_ids, key=lambda r: r[0])]
     return uploader.response_headers


@@ -118,8 +116,10 @@ def upload_substream_blocks(
             executor.submit(uploader.process_substream_block, u)
             for u in islice(upload_tasks, 0, max_connections)
         ]
-        return _parallel_uploads(executor, uploader, upload_tasks, running_futures)
-    return [uploader.process_substream_block(b) for b in uploader.get_substream_blocks()]
+        range_ids = _parallel_uploads(executor, uploader.process_substream_block, upload_tasks, running_futures)
+    else:
+        range_ids = [uploader.process_substream_block(b) for b in uploader.get_substream_blocks()]
+    return sorted(range_ids)


class _ChunkUploader(object): # pylint: disable=too-many-instance-attributes
@@ -229,6 +229,7 @@ def _upload_substream_block(self, block_id, block_stream):

     def _upload_substream_block_with_progress(self, block_id, block_stream):
         range_id = self._upload_substream_block(block_id, block_stream)
+        self._update_progress(len(block_stream))
         return range_id

def set_response_properties(self, resp):
@@ -238,17 +239,23 @@ def set_response_properties(self, resp):


 class BlockBlobChunkUploader(_ChunkUploader):

+    def __init__(self, *args, **kwargs):
+        kwargs.pop('modified_access_conditions', None)
+        super(BlockBlobChunkUploader, self).__init__(*args, **kwargs)
+        self.current_length = None
+
     def _upload_chunk(self, chunk_offset, chunk_data):
-        # TODO: This is incorrect, but works with recording.
-        block_id = encode_base64(url_quote(encode_base64('{0:032d}'.format(chunk_offset))))
+        index = '{0:032d}'.format(chunk_offset)
+        block_id = encode_base64(url_quote(encode_base64(index)))
         self.service.stage_block(
             block_id,
             len(chunk_data),
             chunk_data,
             data_stream_total=self.total_size,
             upload_stream_current=self.progress_total,
             **self.request_options)
-        return block_id
+        return index, block_id

     def _upload_substream_block(self, block_id, block_stream):
         try:
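The other half of this change is that `_upload_chunk` now returns an `(index, block_id)` pair: the zero-padded offset sorts lexicographically in numeric order, so `upload_data_chunks` can commit blocks in stream order even when parallel uploads finish out of order. A sketch of the encoding, with stdlib calls standing in for the SDK's `encode_base64`/`url_quote` helpers:

# Sketch of the block-id scheme; base64/urllib stand in for the SDK's
# encode_base64 and url_quote helpers.
from base64 import b64encode
from urllib.parse import quote

def make_block_id(chunk_offset):
    index = '{0:032d}'.format(chunk_offset)  # zero-padded: lexicographic == numeric order
    block_id = b64encode(quote(b64encode(index.encode())).encode()).decode()
    return index, block_id

# Futures may complete in any order; sorting on the index restores it.
range_ids = [make_block_id(o) for o in (4194304, 0, 8388608)]
ordered_block_ids = [r[1] for r in sorted(range_ids, key=lambda r: r[0])]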
@@ -56,7 +56,7 @@ async def upload_data_chunks(

     if encryption_options:
         encryptor, padder = get_blob_encryptor_and_padder(
-            encryption_options.get('key'),
+            encryption_options.get('cek'),
             encryption_options.get('vector'),
             uploader_class is not PageBlobChunkUploader)
         kwargs['encryptor'] = encryptor
112 changes: 42 additions & 70 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_upload_helpers.py
@@ -51,24 +51,13 @@ def _convert_mod_error(error):
     raise overwrite_error


-def _create_append_blob(
-        client,
-        blob_headers,
-        timeout,
-        access_conditions,
-        mod_conditions,
-        headers,
-        **kwargs):
-    created = client.create(
-        content_length=0,
-        blob_http_headers=blob_headers,
-        timeout=timeout,
-        lease_access_conditions=access_conditions,
-        modified_access_conditions=mod_conditions,
-        cls=return_response_headers,
-        headers=headers,
-        **kwargs)
-    ModifiedAccessConditions(if_match=created['etag'])  # TODO: Not working...
+def _any_conditions(modified_access_conditions=None, **kwargs):
+    return any([
+        modified_access_conditions.if_modified_since,
+        modified_access_conditions.if_unmodified_since,
+        modified_access_conditions.if_none_match,
+        modified_access_conditions.if_match
+    ])
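`_any_conditions` lets the upload helpers distinguish "caller passed no preconditions" from "caller passed an empty conditions object": only in the former case is `if_none_match='*'` injected, which makes the service reject the upload when the blob already exists and `overwrite=False`. A hedged sketch of the guard in isolation, where `StubConditions` stands in for the generated `ModifiedAccessConditions` model:

# Hedged sketch of the overwrite guard; StubConditions is a stand-in for
# the generated ModifiedAccessConditions model.
class StubConditions(object):
    def __init__(self, if_modified_since=None, if_unmodified_since=None,
                 if_match=None, if_none_match=None):
        self.if_modified_since = if_modified_since
        self.if_unmodified_since = if_unmodified_since
        self.if_match = if_match
        self.if_none_match = if_none_match

def _any_conditions(modified_access_conditions=None, **kwargs):
    return any([
        modified_access_conditions.if_modified_since,
        modified_access_conditions.if_unmodified_since,
        modified_access_conditions.if_none_match,
        modified_access_conditions.if_match,
    ])

kwargs = {'modified_access_conditions': StubConditions()}
overwrite = False
if not overwrite and not _any_conditions(**kwargs):
    # ETag '*' matches any existing blob, so the PUT fails if one exists.
    kwargs['modified_access_conditions'].if_none_match = '*'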


@@ -84,8 +73,8 @@ def upload_block_blob(  # pylint: disable=too-many-locals
         encryption_options=None,
         **kwargs):
     try:
-        if not overwrite and not kwargs.get('modified_access_conditions'):
-            kwargs['modified_access_conditions'] = ModifiedAccessConditions(if_none_match='*')
+        if not overwrite and not _any_conditions(**kwargs):
+            kwargs['modified_access_conditions'].if_none_match = '*'
         adjusted_count = length
         if (encryption_options.get('key') is not None) and (adjusted_count is not None):
             adjusted_count += (16 - (length % 16))
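The `adjusted_count` arithmetic accounts for PKCS7 padding under AES-CBC client-side encryption: the ciphertext is the plaintext length rounded up to the next 16-byte boundary, and a full extra block is added when the length is already a multiple of 16. A small worked check:

# PKCS7/AES-CBC length adjustment, matching the formula in the diff.
for length in (100, 112):
    adjusted_count = length + (16 - (length % 16))
    print(length, '->', adjusted_count)   # 100 -> 112, 112 -> 128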
@@ -123,7 +112,7 @@ def upload_block_blob(  # pylint: disable=too-many-locals
         if encryption_options.get('key'):
             cek, iv, encryption_data = generate_blob_encryption_data(encryption_options['key'])
             headers['x-ms-meta-encryptiondata'] = encryption_data
-            encryption_options['key'] = cek
+            encryption_options['cek'] = cek
             encryption_options['vector'] = iv
         block_ids = upload_data_chunks(
             service=client,
@@ -145,7 +134,6 @@
             max_connections=max_connections,
             stream=stream,
             validate_content=validate_content,
-            encryption_options=encryption_options,
             **kwargs
         )

@@ -179,8 +167,8 @@ def upload_page_blob(
         encryption_options=None,
         **kwargs):
     try:
-        if not overwrite and not kwargs.get('modified_access_conditions'):
-            kwargs['modified_access_conditions'] = ModifiedAccessConditions(if_none_match='*')
+        if not overwrite and not _any_conditions(**kwargs):
+            kwargs['modified_access_conditions'].if_none_match = '*'
         if length is None or length < 0:
             raise ValueError("A content length must be specified for a Page Blob.")
         if length % 512 != 0:


def upload_append_blob(
client,
stream,
length,
overwrite,
headers,
blob_headers,
access_conditions,
mod_conditions,
maxsize_condition,
validate_content,
timeout,
max_connections,
blob_settings,
**kwargs
):
client=None,
stream=None,
length=None,
overwrite=None,
headers=None,
validate_content=None,
max_connections=None,
blob_settings=None,
encryption_options=None,
**kwargs):
try:
if length == 0:
return {}
blob_headers = kwargs.pop('blob_headers', None)
append_conditions = AppendPositionAccessConditions(
max_size=maxsize_condition,
max_size=kwargs.pop('maxsize_condition', None),
append_position=None)
try:
if overwrite:
_create_append_blob(
client,
blob_headers,
timeout,
access_conditions,
mod_conditions,
headers,
client.create(
content_length=0,
blob_http_headers=blob_headers,
headers=headers,
**kwargs)
return upload_data_chunks(
blob_service=client,
blob_size=length,
block_size=blob_settings.max_block_size,
service=client,
uploader_class=AppendBlobChunkUploader,
total_size=length,
chunk_size=blob_settings.max_block_size,
stream=stream,
append_conditions=append_conditions,
max_connections=max_connections,
validate_content=validate_content,
access_conditions=access_conditions,
uploader_class=AppendBlobChunkUploader,
modified_access_conditions=mod_conditions,
timeout=timeout,
append_position_access_conditions=append_conditions,
**kwargs)
except StorageErrorException as error:
if error.response.status_code != 404:
@@ -282,26 +260,20 @@ def upload_append_blob(
             except UnsupportedOperation:
                 # if body is not seekable, then retry would not work
                 raise error
-            _create_append_blob(
-                client,
-                blob_headers,
-                timeout,
-                access_conditions,
-                mod_conditions,
-                headers,
+            client.create(
+                content_length=0,
+                blob_http_headers=blob_headers,
+                headers=headers,
                 **kwargs)
             return upload_data_chunks(
-                blob_service=client,
-                blob_size=length,
-                block_size=blob_settings.max_block_size,
+                service=client,
+                uploader_class=AppendBlobChunkUploader,
+                total_size=length,
+                chunk_size=blob_settings.max_block_size,
                 stream=stream,
-                append_conditions=append_conditions,
                 max_connections=max_connections,
                 validate_content=validate_content,
-                access_conditions=access_conditions,
-                uploader_class=AppendBlobChunkUploader,
-                modified_access_conditions=mod_conditions,
-                timeout=timeout,
+                append_position_access_conditions=append_conditions,
                 **kwargs)
except StorageErrorException as error:
process_storage_error(error)
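The rewritten `upload_append_blob` inlines blob creation and uses a create-on-miss pattern: with `overwrite=False` it optimistically appends, and only if the service returns 404 does it rewind the stream, create the blob, and retry once. A hedged sketch of that control flow, where `StorageError` and `upload_chunks` are illustrative stand-ins for the SDK internals:

# Hedged sketch of the create-on-miss flow; StorageError and upload_chunks
# are stand-ins, not the SDK's actual names.
from io import UnsupportedOperation

class StorageError(Exception):
    def __init__(self, status_code):
        super(StorageError, self).__init__(status_code)
        self.status_code = status_code

def append_with_create_on_miss(client, stream, overwrite, upload_chunks, **kwargs):
    if overwrite:
        client.create(content_length=0, **kwargs)      # start from an empty blob
        return upload_chunks(client, stream, **kwargs)
    try:
        # Optimistically append; succeeds when the blob already exists.
        return upload_chunks(client, stream, **kwargs)
    except StorageError as error:
        if error.status_code != 404:
            raise
        try:
            stream.seek(0)       # a non-seekable body cannot be retried
        except UnsupportedOperation:
            raise error
        client.create(content_length=0, **kwargs)      # blob was missing: create, then retry
        return upload_chunks(client, stream, **kwargs)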
34 changes: 15 additions & 19 deletions sdk/storage/azure-storage-blob/azure/storage/blob/blob_client.py
@@ -41,7 +41,7 @@
 from ._deserialize import deserialize_blob_properties, deserialize_blob_stream
 from ._upload_helpers import (
     upload_block_blob,
-    # upload_append_blob,
+    upload_append_blob,
     upload_page_blob)
 from .models import BlobType, BlobBlock
 from .lease import LeaseClient, get_access_conditions
@@ -495,24 +495,20 @@ def upload_blob(  # pylint: disable=too-many-locals
                 blob_settings=self._config,
                 encryption_options=encryption_options,
                 **kwargs)
-        # if blob_type == BlobType.AppendBlob:
-        #     if self.require_encryption or (self.key_encryption_key is not None):
-        #         raise ValueError(_ERROR_UNSUPPORTED_METHOD_FOR_ENCRYPTION)
-        #     return upload_append_blob(
-        #         self._client.append_blob,
-        #         stream,
-        #         length,
-        #         overwrite,
-        #         headers,
-        #         blob_headers,
-        #         access_conditions,
-        #         mod_conditions,
-        #         maxsize_condition,
-        #         validate_content,
-        #         timeout,
-        #         max_connections,
-        #         self._config,
-        #         **kwargs)
+        if blob_type == BlobType.AppendBlob:
+            if self.require_encryption or (self.key_encryption_key is not None):
+                raise ValueError(_ERROR_UNSUPPORTED_METHOD_FOR_ENCRYPTION)
+            return upload_append_blob(
+                client=self._client.append_blob,
+                stream=stream,
+                length=length,
+                overwrite=overwrite,
+                headers=headers,
+                validate_content=validate_content,
+                timeout=timeout,
+                max_connections=max_connections,
+                blob_settings=self._config,
+                **kwargs)
         raise ValueError("Unsupported BlobType: {}".format(blob_type))

def download_blob(
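With `upload_append_blob` re-enabled, append blobs go through the same public entry point as block and page blobs. A hedged usage sketch, assuming the v12-style `BlobClient.from_connection_string` factory; the connection string and names are placeholders:

# Hedged usage sketch; conn_str and the names are placeholders.
from azure.storage.blob import BlobClient, BlobType

blob = BlobClient.from_connection_string(
    conn_str, container_name="logs", blob_name="app.log")
with open("app.log", "rb") as data:
    # Appends to the blob; client-side encryption is rejected for
    # append blobs, per the check above.
    blob.upload_blob(data, blob_type=BlobType.AppendBlob, max_connections=2)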
@@ -103,7 +103,7 @@ def test_soft_delete_and_undelete_blob(self):
         blob_service_client.set_service_properties(delete_retention_policy=delete_retention_policy)

         # Instantiate a ContainerClient
-        container_client = blob_service_client.get_container_client("containerforblobs")
+        container_client = blob_service_client.get_container_client("containerfordeletedblobs")

         # Create new Container
         try:
@@ -114,10 +114,7 @@ def test_soft_delete_and_undelete_blob(self):

         # Upload a blob to the container
         with open(SOURCE_FILE, "rb") as data:
-            container_client.upload_blob(name="my_blob", data=data)
-
-        # Get the blob client
-        blob_client = blob_service_client.get_blob_client("containerforblobs", "my_blob")
+            blob_client = container_client.upload_blob(name="my_blob", data=data)

         # Soft delete blob in the container (blob can be recovered with undelete)
         blob_client.delete_blob()
@@ -134,7 +131,7 @@ def test_soft_delete_and_undelete_blob(self):
         assert properties is not None

         # Delete container
-        blob_service_client.delete_container("containerforblobs")
+        blob_service_client.delete_container("containerfordeletedblobs")

     @record
     def test_acquire_lease_on_blob(self):
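The sample simplification relies on `ContainerClient.upload_blob` returning a `BlobClient` for the blob it just created, so the separate `get_blob_client` lookup is no longer needed:

# upload_blob returns a BlobClient for the new blob (per the sample above),
# so soft delete and undelete can chain directly off the return value.
with open(SOURCE_FILE, "rb") as data:
    blob_client = container_client.upload_blob(name="my_blob", data=data)
blob_client.delete_blob()      # soft delete (retention policy must be enabled)
blob_client.undelete_blob()    # recover within the retention window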
10 changes: 5 additions & 5 deletions sdk/storage/azure-storage-blob/tests/test_common_blob.py
@@ -1474,7 +1474,7 @@ def test_download_to_file_with_credential(self):
         download_blob_from_url(
             source_blob.url, FILE_PATH,
             max_connections=2,
-            credential=self.settings.STORAGE_ACCOUNT_KEY)
+            credential=self.settings.REMOTE_STORAGE_ACCOUNT_KEY)

         # Assert
         with open(FILE_PATH, 'rb') as stream:
@@ -1495,7 +1495,7 @@ def test_download_to_stream_with_credential(self):
         download_blob_from_url(
             source_blob.url, stream,
             max_connections=2,
-            credential=self.settings.STORAGE_ACCOUNT_KEY)
+            credential=self.settings.REMOTE_STORAGE_ACCOUNT_KEY)

         # Assert
         with open(FILE_PATH, 'rb') as stream:
@@ -1514,7 +1514,7 @@ def test_download_to_file_with_existing_file(self):
         # Act
         download_blob_from_url(
             source_blob.url, FILE_PATH,
-            credential=self.settings.STORAGE_ACCOUNT_KEY)
+            credential=self.settings.REMOTE_STORAGE_ACCOUNT_KEY)

         with self.assertRaises(ValueError):
             download_blob_from_url(source_blob.url, FILE_PATH)
@@ -1536,13 +1536,13 @@ def test_download_to_file_with_existing_file_overwrite(self):
         # Act
         download_blob_from_url(
             source_blob.url, FILE_PATH,
-            credential=self.settings.STORAGE_ACCOUNT_KEY)
+            credential=self.settings.REMOTE_STORAGE_ACCOUNT_KEY)

         data2 = b'ABCDEFGH' * 1024 * 1024
         source_blob = self._create_remote_block_blob(blob_data=data2)
         download_blob_from_url(
             source_blob.url, FILE_PATH, overwrite=True,
-            credential=self.settings.STORAGE_ACCOUNT_KEY)
+            credential=self.settings.REMOTE_STORAGE_ACCOUNT_KEY)

         # Assert
         with open(FILE_PATH, 'rb') as stream:
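These tests download from a second ("remote") storage account, so they must authenticate with that account's key rather than the primary one. A hedged sketch of the call shape; the URL and key variables are placeholders:

# Hedged sketch: downloading by URL needs the key of the account that
# hosts the blob; source_url and remote_account_key are placeholders.
from azure.storage.blob import download_blob_from_url

download_blob_from_url(
    source_url,
    'downloaded.bin',
    max_connections=2,
    credential=remote_account_key)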