32 commits
5eca2bb  [storage] Add async APIs for Files SDK (#6405)  (annatisch, Jul 18, 2019)
aba5f69  Async implementation for storage queues. (#6360)  (rakshith91, Jul 22, 2019)
323cc61  Merge remote-tracking branch 'origin/master' into storage-preview2  (annatisch, Jul 22, 2019)
7a737d5  Merge latest azure-core changes  (annatisch, Jul 22, 2019)
47d063a  Updated shared blob client  (annatisch, Jul 22, 2019)
c755392  Merge remote-tracking branch 'origin/master' into storage-preview2  (annatisch, Jul 25, 2019)
6296b96  add decorator and policy to storage_files and propagate context for i…  (SuyogSoti, Jul 25, 2019)
e08bfc6  Trace storage queue (#6449)  (SuyogSoti, Jul 25, 2019)
6ada100  Trace storage blob (#6478)  (SuyogSoti, Jul 25, 2019)
118d10f  New paging to storage preview2 branch (with async) (#6493)  (lmazuel, Jul 26, 2019)
3f070bb  Fix async tests  (lmazuel, Jul 26, 2019)
1c37b69  Fix continuation token bug  (lmazuel, Jul 26, 2019)
804112c  Merge remote-tracking branch 'upstream/master' into storage-preview2  (lmazuel, Jul 26, 2019)
c6b2151  Support for aiohttp records from vcrpy (#6552)  (lmazuel, Jul 30, 2019)
eb1051c  Async recording for Storage (#6560)  (lmazuel, Jul 31, 2019)
b5a2491  Merge remote-tracking branch 'upstream/master' into storage-preview2  (lmazuel, Jul 31, 2019)
4b64b40  Aiohttp is the only default for async clients (#6561)  (lmazuel, Jul 31, 2019)
03aa8f6  seed tests.yml (#6645)  (danieljurek, Aug 2, 2019)
3acf780  [storage] Blob async APIs (#6489)  (annatisch, Aug 3, 2019)
050fa0c  Merge branch 'master' into storage-preview2  (kristapratico, Aug 3, 2019)
48d2651  Storage Recordings For Queues and Files (#6629)  (Aug 4, 2019)
254a0d1  allowing specific project targeting for storage livetests  (scbedd, Aug 5, 2019)
ce11dcc  allowing BuildTargetingString to flow through for templates following…  (scbedd, Aug 5, 2019)
55190af  passing service directory to setup task  (scbedd, Aug 5, 2019)
aeec5c9  Merge remote-tracking branch 'origin/master' into storage-preview2  (annatisch, Aug 5, 2019)
17e3eb3  Support for Live storage tests (#6663)  (Aug 5, 2019)
a93bcdd  [storage] Preview2 updates (#6658)  (annatisch, Aug 5, 2019)
6458fa0  fix test (#6674)  (Aug 6, 2019)
ca9a0ac  Fix for queue models (#6681)  (annatisch, Aug 6, 2019)
1c44a9c  [storage] Readme tweaks (#6697)  (annatisch, Aug 6, 2019)
5665c17  Some final tweaks (#6687)  (Aug 6, 2019)
2e614ab  [storage] Better async import error message (#6700)  (annatisch, Aug 6, 2019)
Trace storage queue (#6449)
* decorators and policy added

* properly clear context

* don't decorate private stuff

* added policy

* decorated async

* get rid of those that don't make network calls

* propagate context
SuyogSoti authored Jul 25, 2019
commit e08bfc601f31b1d7cb53552c3ed7489a5f19155f
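
The commit message above is terse, so here is a rough, hypothetical sketch of the pattern it describes: a decorator that opens a span around a public client method, propagates that span to nested calls through a context variable, and clears the context again afterwards. None of the names below (Span, distributed_trace, _current_span, QueueClientSketch) come from azure-core or the storage SDK; they are stand-ins for illustration only.

# Hypothetical sketch -- not the azure-core tracing implementation.
import contextvars
import functools
import time

_current_span = contextvars.ContextVar("current_span", default=None)


class Span:
    """Minimal stand-in for a tracing span (illustrative, not an SDK type)."""

    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent
        self.start = time.monotonic()
        self.duration = None

    def finish(self):
        self.duration = time.monotonic() - self.start


def distributed_trace(func):
    """Wrap a public client method in a child span of the caller's span."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        parent = _current_span.get()
        span = Span(name=func.__qualname__, parent=parent)
        token = _current_span.set(span)  # propagate context to nested calls
        try:
            return func(*args, **kwargs)
        finally:
            _current_span.reset(token)  # properly clear the context afterwards
            span.finish()

    return wrapper


class QueueClientSketch:
    @distributed_trace
    def send_message(self, content):
        # A real client would make a network call here; the pipeline-level
        # tracing policy added in the diffs below would record that call as a
        # child span of the one opened by the decorator.
        return {"content": content}

Calling QueueClientSketch().send_message("hi") produces a span named QueueClientSketch.send_message; the DistributedTracingPolicy added to the pipelines in the diffs below plays the complementary role of recording each outgoing HTTP request under that span.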
First changed file (shared blob upload helpers):

@@ -22,7 +22,7 @@
 
 
 _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024
-_ERROR_VALUE_SHOULD_BE_SEEKABLE_STREAM = '{0} should be a seekable file-like/io.IOBase type stream object.'
+_ERROR_VALUE_SHOULD_BE_SEEKABLE_STREAM = "{0} should be a seekable file-like/io.IOBase type stream object."
 
 
 def _parallel_uploads(executor, uploader, pending, running):
@@ -152,7 +152,7 @@ def __init__(self, service, total_size, chunk_size, stream, parallel, encryptor=
     def get_chunk_streams(self):
         index = 0
         while True:
-            data = b''
+            data = b""
             read_size = self.chunk_size
 
             # Buffer until we either reach the end of the stream or get a whole chunk.
@@ -161,12 +161,12 @@ def get_chunk_streams(self):
                     read_size = min(self.chunk_size - len(data), self.total_size - (index + len(data)))
                 temp = self.stream.read(read_size)
                 if not isinstance(temp, six.binary_type):
-                    raise TypeError('Blob data should be of type bytes.')
+                    raise TypeError("Blob data should be of type bytes.")
                 data += temp or b""
 
                 # We have read an empty string and so are at the end
                 # of the buffer or we have read a full chunk.
-                if temp == b'' or len(data) == self.chunk_size:
+                if temp == b"" or len(data) == self.chunk_size:
                     break
 
             if len(data) == self.chunk_size:
@@ -249,7 +249,8 @@ def _upload_chunk(self, chunk_offset, chunk_data):
             chunk_data,
             data_stream_total=self.total_size,
             upload_stream_current=self.progress_total,
-            **self.request_options)
+            **self.request_options
+        )
         return block_id
 
     def _upload_substream_block(self, block_id, block_stream):
@@ -260,7 +261,8 @@ def _upload_substream_block(self, block_id, block_stream):
                 block_stream,
                 data_stream_total=self.total_size,
                 upload_stream_current=self.progress_total,
-                **self.request_options)
+                **self.request_options
+            )
         finally:
             block_stream.close()
         return block_id
@@ -272,15 +274,15 @@ def _is_chunk_empty(self, chunk_data):
         # read until non-zero byte is encountered
         # if reached the end without returning, then chunk_data is all 0's
         for each_byte in chunk_data:
-            if each_byte not in [0, b'\x00']:
+            if each_byte not in [0, b"\x00"]:
                 return False
         return True
 
     def _upload_chunk(self, chunk_offset, chunk_data):
         # avoid uploading the empty pages
         if not self._is_chunk_empty(chunk_data):
             chunk_end = chunk_offset + len(chunk_data) - 1
-            content_range = 'bytes={0}-{1}'.format(chunk_offset, chunk_end)
+            content_range = "bytes={0}-{1}".format(chunk_offset, chunk_end)
             computed_md5 = None
             self.response_headers = self.service.upload_pages(
                 chunk_data,
@@ -290,7 +292,8 @@ def _upload_chunk(self, chunk_offset, chunk_data):
                 cls=return_response_headers,
                 data_stream_total=self.total_size,
                 upload_stream_current=self.progress_total,
-                **self.request_options)
+                **self.request_options
+            )
 
             if not self.parallel and self.request_options.get('modified_access_conditions'):
                 self.request_options['modified_access_conditions'].if_match = self.response_headers['etag']
@@ -312,7 +315,7 @@ def _upload_chunk(self, chunk_offset, chunk_data):
                 upload_stream_current=self.progress_total,
                 **self.request_options
             )
-            self.current_length = int(self.response_headers['blob_append_offset'])
+            self.current_length = int(self.response_headers["blob_append_offset"])
         else:
             self.request_options['append_position_access_conditions'].append_position = \
                 self.current_length + chunk_offset
@@ -362,8 +365,9 @@ def __init__(self, wrapped_stream, stream_begin_index, length, lockObj):
 
         # we must avoid buffering more than necessary, and also not use up too much memory
        # so the max buffer size is capped at 4MB
-        self._max_buffer_size = length if length < _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE \
-            else _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE
+        self._max_buffer_size = (
+            length if length < _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE else _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE
+        )
         self._current_buffer_start = 0
         self._current_buffer_size = 0
         super(SubStream, self).__init__()
@@ -393,7 +397,7 @@ def read(self, n):
 
         # return fast
         if n == 0 or self._buffer.closed:
-            return b''
+            return b""
 
         # attempt first read from the read buffer and update position
         read_buffer = self._buffer.read(n)
@@ -449,7 +453,7 @@ def seek(self, offset, whence=0):
             start_index = self._position
         elif whence is SEEK_END:
             start_index = self._length
-            offset = - offset
+            offset = -offset
         else:
             raise ValueError("Invalid argument for the 'whence' parameter.")
 
@@ -492,10 +496,11 @@ class IterStreamer(object):
     """
     File-like streaming iterator.
     """
-    def __init__(self, generator, encoding='UTF-8'):
+
+    def __init__(self, generator, encoding="UTF-8"):
         self.generator = generator
         self.iterator = iter(generator)
-        self.leftover = b''
+        self.leftover = b""
         self.encoding = encoding
 
     def __len__(self):
Second changed file (synchronous storage client pipeline setup):

@@ -20,6 +20,7 @@
 from azure.core import Configuration
 from azure.core.pipeline import Pipeline
 from azure.core.pipeline.transport import RequestsTransport
+from azure.core.pipeline.policies.distributed_tracing import DistributedTracingPolicy
 from azure.core.pipeline.policies import (
     RedirectPolicy,
     ContentDecodePolicy,
@@ -175,6 +176,7 @@ def _create_pipeline(self, credential, **kwargs):
             config.retry_policy,
             config.logging_policy,
             StorageResponseHook(**kwargs),
+            DistributedTracingPolicy(),
         ]
         return config, Pipeline(config.transport, policies=policies)
 
Third changed file (asynchronous storage client pipeline setup):

@@ -15,6 +15,7 @@
     from azure.core.pipeline.transport import AioHttpTransport as AsyncTransport
 except ImportError:
     from azure.core.pipeline.transport import AsyncioRequestsTransport as AsyncTransport
+from azure.core.pipeline.policies.distributed_tracing import DistributedTracingPolicy
 from azure.core.pipeline.policies import (
     ContentDecodePolicy,
     BearerTokenCredentialPolicy,
@@ -81,5 +82,6 @@ def _create_pipeline(self, credential, **kwargs):
             config.retry_policy,
             config.logging_policy,
             AsyncStorageResponseHook(**kwargs),
+            DistributedTracingPolicy(),
         ]
         return config, AsyncPipeline(config.transport, policies=policies)
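
Both the synchronous and asynchronous pipelines above now end their policy lists with DistributedTracingPolicy(), placed after the storage response hook. As a rough idea of what a per-request tracing policy does, here is a hypothetical sketch that reuses the Span class and _current_span context variable from the earlier sketch; it is not the real azure-core class, and its method names are illustrative only.

# Hypothetical sketch of a per-request tracing policy; the real azure-core
# DistributedTracingPolicy differs in names and details.
class TracingPolicySketch:
    """Open one span per outgoing HTTP request and close it on the response."""

    def __init__(self):
        self._active = {}

    def on_request(self, request):
        # Start a child span of whatever span the decorator-style wrapper put
        # in the ambient context, and remember it for the matching response.
        parent = _current_span.get()
        span = Span(name="HTTP request", parent=parent)
        self._active[id(request)] = span

    def on_response(self, request, response):
        span = self._active.pop(id(request), None)
        if span is not None:
            span.finish()

Because the policy sits after config.retry_policy in the lists above, each retried request would get its own span under the same method-level parent, assuming the usual azure-core pipeline behavior where policies later in the list run once per attempt.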