Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
8f85d8c
Updated blobs shared code
annatisch Jul 19, 2019
a3aff08
Started blob refactor
annatisch Jul 19, 2019
90b9640
Refactoring upload
annatisch Jul 22, 2019
6d33776
Merge remote-tracking branch 'upstream/storage-preview2'
annatisch Jul 22, 2019
de42b24
Updated shared code
annatisch Jul 22, 2019
d8ec863
Started fixing tests
annatisch Jul 22, 2019
8d9bd39
Merge remote-tracking branch 'upstream/storage-preview2'
annatisch Jul 22, 2019
3a231d8
Refactored sync blobs
annatisch Jul 23, 2019
d527688
Added blob async APIs
annatisch Jul 24, 2019
76b9e3c
Some fixes
annatisch Jul 24, 2019
9a8fb9d
Append blob async tests
annatisch Jul 24, 2019
900e07b
blob access async tests
annatisch Jul 24, 2019
c8668c9
Blob client async tests
annatisch Jul 24, 2019
05b9020
encryption async tests
annatisch Jul 24, 2019
bee7e3f
Patch for azure core exception
annatisch Jul 24, 2019
dda031b
blob retry async
annatisch Jul 24, 2019
99bc35d
Retry async tests
annatisch Jul 24, 2019
4fd5b84
Get blob async tests
annatisch Jul 25, 2019
1eb8192
Bug fix for clear page operation
annatisch Jul 25, 2019
24dd430
More async tests + upload fix
annatisch Jul 26, 2019
30cbc1b
Merged from preview 2
annatisch Jul 26, 2019
076d062
Merged blobs
annatisch Jul 26, 2019
fa808bb
Updated Files shared code
annatisch Jul 26, 2019
38f6fc5
Updated queue shared code
annatisch Jul 26, 2019
e6d9544
Merge from upstream
annatisch Jul 26, 2019
704d8bb
async tests pass except 2 common blob tests
kristapratico Jul 26, 2019
9bcd94a
adds async paging to blobs and some async tests (not all pass)
kristapratico Jul 29, 2019
8a6df99
Merge pull request #2 from annatisch/async-blob-tests
kristapratico Jul 29, 2019
d28961c
initial commit
rakshith91 Jul 30, 2019
375323e
block_blob_tests
rakshith91 Jul 30, 2019
35261e9
page blob tests
rakshith91 Jul 30, 2019
0b5eaed
Merge pull request #3 from annatisch/async_tests
Jul 30, 2019
bdd8396
Merge branch 'storage-preview2' into master
kristapratico Jul 31, 2019
37fb8e3
fix for special chars, some tests, and recordings
kristapratico Jul 31, 2019
4bcbca7
add to shared storage and fix import
kristapratico Jul 31, 2019
a984b39
adding more tests/recordings
kristapratico Jul 31, 2019
90a7c61
more tests/recordings
kristapratico Aug 1, 2019
36155a8
Merge pull request #4 from annatisch/async-blob-tests
kristapratico Aug 1, 2019
5fb77f5
rerecord tests, fix imports
kristapratico Aug 1, 2019
e97af55
Merge pull request #5 from annatisch/async-blob-tests
kristapratico Aug 1, 2019
4687f62
fix import again
kristapratico Aug 1, 2019
7552cdb
Merge pull request #6 from annatisch/async-blob-tests
kristapratico Aug 1, 2019
753006e
blacklist azure-servicemanagement-legacy
kristapratico Aug 1, 2019
748237e
Merge pull request #7 from annatisch/async-blob-tests
kristapratico Aug 1, 2019
dfabb8b
get CI to run
kristapratico Aug 1, 2019
0ad6851
Merge pull request #8 from annatisch/async-blob-tests
kristapratico Aug 1, 2019
634a74a
rerecord all async tests
kristapratico Aug 2, 2019
2848c33
Merge pull request #9 from annatisch/async-blob-tests
kristapratico Aug 2, 2019
c607dde
testing
kristapratico Aug 2, 2019
b3b67fc
Merge pull request #10 from annatisch/async-blob-tests
kristapratico Aug 2, 2019
4df2eef
add variable indirection for storage live tests. this is a temporary …
danieljurek Aug 2, 2019
615fc42
newline
danieljurek Aug 2, 2019
f51dfb4
Merge pull request #11 from danieljurek/async-blob-tests
kristapratico Aug 2, 2019
7edbc6e
print envar
kristapratico Aug 2, 2019
ace7ea7
Merge pull request #12 from annatisch/async-blob-tests
kristapratico Aug 2, 2019
85490ca
remove testing
kristapratico Aug 3, 2019
d48e917
Merge pull request #13 from annatisch/async-blob-tests
kristapratico Aug 3, 2019
20adbbc
adjust pypy testing
kristapratico Aug 3, 2019
2b1a0ae
Merge pull request #14 from annatisch/async-blob-tests
kristapratico Aug 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Updated blobs shared code
  • Loading branch information
annatisch committed Jul 19, 2019
commit 8f85d8c036514888802e5f48166465c4c34ed8a8
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ def _add_authorization_header(self, request, string_to_sign):
raise _wrap_exception(ex, AzureSigningError)

def on_request(self, request, **kwargs):
if not 'content-type' in request.http_request.headers:
request.http_request.headers['content-type'] = 'application/xml; charset=utf-8'

string_to_sign = \
self._get_verb(request) + \
self._get_headers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ def __init__(
self.require_encryption = kwargs.get('require_encryption', False)
self.key_encryption_key = kwargs.get('key_encryption_key')
self.key_resolver_function = kwargs.get('key_resolver_function')

self._config, self._pipeline = create_pipeline(
self.credential, storage_sdk=service, hosts=self._hosts, **kwargs)
self._config, self._pipeline = self._create_pipeline(self.credential, storage_sdk=service, **kwargs)

def __enter__(self):
self._client.__enter__()
Expand Down Expand Up @@ -145,6 +143,38 @@ def _format_query_string(self, sas_token, credential, snapshot=None, share_snaps
credential = None
return query_str.rstrip('?&'), credential

def _create_pipeline(self, credential, **kwargs):
    # type: (Any, **Any) -> Tuple[Configuration, Pipeline]
    """Build the synchronous HTTP pipeline for this client.

    :param credential: A token credential (anything with a ``get_token``
        method), a SharedKeyCredentialPolicy, or None for anonymous access.
    :returns: A (Configuration, Pipeline) tuple.
    :raises TypeError: If the credential type is not supported.
    """
    # Resolve the credential into the matching pipeline policy.
    if hasattr(credential, 'get_token'):
        auth_policy = BearerTokenCredentialPolicy(credential, STORAGE_OAUTH_SCOPE)
    elif isinstance(credential, SharedKeyCredentialPolicy):
        auth_policy = credential
    elif credential is None:
        auth_policy = None
    else:
        raise TypeError("Unsupported credential: {}".format(credential))

    config = kwargs.get('_configuration') or create_configuration(**kwargs)
    # Reuse an injected pipeline when one was supplied (e.g. by a parent client).
    if kwargs.get('_pipeline'):
        return config, kwargs['_pipeline']
    config.transport = kwargs.get('transport')  # type: HttpTransport
    if not config.transport:
        config.transport = RequestsTransport(config)
    pipeline_policies = [
        QueueMessagePolicy(),
        config.headers_policy,
        config.user_agent_policy,
        StorageContentValidation(),
        StorageRequestHook(**kwargs),
        auth_policy,
        ContentDecodePolicy(),
        RedirectPolicy(**kwargs),
        StorageHosts(hosts=self._hosts, **kwargs),
        config.retry_policy,
        config.logging_policy,
        StorageResponseHook(**kwargs),
    ]
    return config, Pipeline(config.transport, policies=pipeline_policies)


def format_shared_key_credential(account, credential):
if isinstance(credential, six.string_types):
Expand Down Expand Up @@ -219,7 +249,6 @@ def create_configuration(**kwargs):
config.headers_policy = StorageHeadersPolicy(**kwargs)
config.user_agent_policy = StorageUserAgentPolicy(**kwargs)
config.retry_policy = kwargs.get('retry_policy') or ExponentialRetry(**kwargs)
config.redirect_policy = RedirectPolicy(**kwargs)
config.logging_policy = StorageLoggingPolicy(**kwargs)
config.proxy_policy = ProxyPolicy(**kwargs)

Expand All @@ -244,39 +273,6 @@ def create_configuration(**kwargs):
return config


def create_pipeline(credential, **kwargs):
    # type: (Any, **Any) -> Tuple[Configuration, Pipeline]
    """Create a Configuration and synchronous Pipeline for a storage client.

    :param credential: A token credential (anything with a ``get_token``
        method), a SharedKeyCredentialPolicy, or None for anonymous access.
    :returns: A (Configuration, Pipeline) tuple.
    :raises TypeError: If the credential type is not supported.
    """
    # Map the credential onto the corresponding pipeline policy.
    if hasattr(credential, 'get_token'):
        auth_policy = BearerTokenCredentialPolicy(credential, STORAGE_OAUTH_SCOPE)
    elif isinstance(credential, SharedKeyCredentialPolicy):
        auth_policy = credential
    elif credential is None:
        auth_policy = None
    else:
        raise TypeError("Unsupported credential: {}".format(credential))

    config = kwargs.get('_configuration') or create_configuration(**kwargs)
    # Short-circuit when a pre-built pipeline was injected by the caller.
    if kwargs.get('_pipeline'):
        return config, kwargs['_pipeline']
    transport = kwargs.get('transport')  # type: HttpTransport
    if not transport:
        transport = RequestsTransport(config)
    pipeline_policies = [
        QueueMessagePolicy(),
        config.headers_policy,
        config.user_agent_policy,
        StorageContentValidation(),
        StorageRequestHook(**kwargs),
        auth_policy,
        ContentDecodePolicy(),
        config.redirect_policy,
        StorageHosts(**kwargs),
        config.retry_policy,
        config.logging_policy,
        StorageResponseHook(**kwargs),
    ]
    return config, Pipeline(transport, policies=pipeline_policies)


def parse_query(query_str):
sas_values = QueryStringConstants.to_list()
parsed_query = {k: v[0] for k, v in parse_qs(query_str).items()}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

from typing import ( # pylint: disable=unused-import
Union, Optional, Any, Iterable, Dict, List, Type, Tuple,
TYPE_CHECKING
)
import logging

from azure.core.pipeline import AsyncPipeline
try:
from azure.core.pipeline.transport import AioHttpTransport as AsyncTransport
except ImportError:
from azure.core.pipeline.transport import AsyncioRequestsTransport as AsyncTransport
from azure.core.pipeline.policies import (
ContentDecodePolicy,
BearerTokenCredentialPolicy,
AsyncRedirectPolicy)

from .constants import STORAGE_OAUTH_SCOPE, DEFAULT_SOCKET_TIMEOUT
from .authentication import SharedKeyCredentialPolicy
from .base_client import create_configuration
from .policies import (
StorageContentValidation,
StorageRequestHook,
StorageHosts,
QueueMessagePolicy)
from .policies_async import AsyncStorageResponseHook


_LOGGER = logging.getLogger(__name__)


class AsyncStorageAccountHostsMixin(object):
    """Mixin providing async context management and async pipeline
    construction for storage service clients.

    Expects the concrete client to provide ``self._client`` (the generated
    service client) and ``self._hosts`` (the primary/secondary host map).
    """

    def __enter__(self):
        # The async client must be used with 'async with', never plain 'with'.
        raise TypeError("Async client only supports 'async with'.")

    def __exit__(self, *args):
        pass

    async def __aenter__(self):
        await self._client.__aenter__()
        return self

    async def __aexit__(self, *args):
        await self._client.__aexit__(*args)

    def _create_pipeline(self, credential, **kwargs):
        # type: (Any, **Any) -> Tuple[Configuration, Pipeline]
        """Assemble the async transport pipeline for this client.

        :param credential: A token credential (anything with a ``get_token``
            method), a SharedKeyCredentialPolicy, or None for anonymous access.
        :returns: A (Configuration, AsyncPipeline) tuple.
        :raises TypeError: If the credential type is not supported.
        """
        # Resolve the credential into the matching pipeline policy.
        if hasattr(credential, 'get_token'):
            auth_policy = BearerTokenCredentialPolicy(credential, STORAGE_OAUTH_SCOPE)
        elif isinstance(credential, SharedKeyCredentialPolicy):
            auth_policy = credential
        elif credential is None:
            auth_policy = None
        else:
            raise TypeError("Unsupported credential: {}".format(credential))

        # Async transports need an explicit socket timeout default.
        kwargs.setdefault('connection_timeout', DEFAULT_SOCKET_TIMEOUT[0])
        config = kwargs.get('_configuration') or create_configuration(**kwargs)
        # Reuse an injected pipeline when one was supplied by the caller.
        if kwargs.get('_pipeline'):
            return config, kwargs['_pipeline']
        config.transport = kwargs.get('transport')  # type: HttpTransport
        if not config.transport:
            config.transport = AsyncTransport(config)
        pipeline_policies = [
            QueueMessagePolicy(),
            config.headers_policy,
            config.user_agent_policy,
            StorageContentValidation(),
            StorageRequestHook(**kwargs),
            auth_policy,
            ContentDecodePolicy(),
            AsyncRedirectPolicy(**kwargs),
            StorageHosts(hosts=self._hosts, **kwargs),
            config.retry_policy,
            config.logging_policy,
            AsyncStorageResponseHook(**kwargs),
        ]
        return config, AsyncPipeline(config.transport, policies=pipeline_policies)
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

from azure.core.exceptions import HttpResponseError

from .models import ModifiedAccessConditions
from .request_handlers import validate_and_format_range_headers
from .response_handlers import process_storage_error, parse_length_from_content_range
from .encryption import decrypt_blob
Expand Down Expand Up @@ -40,22 +39,25 @@ def process_range_and_offset(start_range, end_range, length, encryption):


def process_content(data, start_offset, end_offset, encryption):
    """Assemble downloaded chunks and decrypt them when client-side
    encryption is configured.

    :param data: Iterable of byte chunks from the download response; must
        expose a ``response`` attribute when decryption is required.
    :param start_offset: Offset trimmed from the start after decryption.
    :param end_offset: Offset trimmed from the end after decryption.
    :param encryption: Dict with optional 'required', 'key' and 'resolver'
        entries controlling decryption.
    :returns: The (possibly decrypted) content as bytes.
    :raises ValueError: If data is None.
    :raises HttpResponseError: If decryption fails.
    """
    if data is None:
        raise ValueError("Response cannot be None.")
    content = b"".join(list(data))
    # BUG FIX: the original condition was
    #   content and key is not None or resolver is not None
    # which, because 'and' binds tighter than 'or', attempted to decrypt
    # EMPTY content whenever a resolver was set. Parenthesize so empty
    # content always skips decryption.
    if content and (encryption.get('key') is not None or encryption.get('resolver') is not None):
        try:
            return decrypt_blob(
                encryption.get('required'),
                encryption.get('key'),
                encryption.get('resolver'),
                content,
                start_offset,
                end_offset,
                data.response.headers)
        except Exception as error:
            raise HttpResponseError(
                message="Decryption failed.",
                response=data.response,
                error=error)
    return content


class _ChunkDownloader(object):
Expand Down Expand Up @@ -209,7 +211,7 @@ def _write_to_stream(self, chunk_data, chunk_start):
self.stream.write(chunk_data)


class StorageStreamDownloader(object):
class StorageStreamDownloader(object): # pylint: disable=too-many-instance-attributes
"""A streaming object to download from Azure Storage.

The stream downloader can iterated, or download to open file or stream
Expand Down Expand Up @@ -291,14 +293,14 @@ def __iter__(self):
# Use the length unless it is over the end of the file
data_end = min(self.file_size, self.length + 1)

downloader = SequentialBlobChunkDownloader(
downloader = SequentialChunkDownloader(
service=self.service,
total_size=self.download_size,
chunk_size=self.config.max_chunk_get_size,
current_progress=self.first_get_size,
start_range=self.initial_range[1] + 1, # start where the first download ended
end_range=data_end,
stream=stream,
stream=None,
validate_content=self.validate_content,
encryption_options=self.encryption_options,
use_location=self.location_mode,
Expand Down
Loading