Merged

35 commits
40a999f  Refactored blob shared utils (annatisch, Jul 9, 2019)
438b101  Refactored file shared utils (annatisch, Jul 9, 2019)
81fe88e  Refactored queue shared utils (annatisch, Jul 9, 2019)
7663070  Refactored downloads (annatisch, Jul 10, 2019)
42a4579  Refactored file downloads (annatisch, Jul 10, 2019)
0dcd679  Started async downloads (annatisch, Jul 11, 2019)
5d1ddbd  Async Files API (annatisch, Jul 15, 2019)
70854c4  Flatten copy polling (annatisch, Jul 15, 2019)
2337824  Renamed uploads (annatisch, Jul 15, 2019)
8db1a24  Fixes samples based on vendor feedback (#6357) (iscai-msft, Jul 15, 2019)
6ee03c9  Upload refactor (annatisch, Jul 15, 2019)
1dd0602  Release approval docs (#6361) (scbedd, Jul 16, 2019)
aacf90a  Updated async pipeline (annatisch, Jul 16, 2019)
26a7f15  Avoid surprising aiohttp with unexpected kwargs (#6355) (chlowell, Jul 16, 2019)
f69bdbe  Add challenge authentication to azure-keyvault-keys (#6244) (chlowell, Jul 17, 2019)
be84bd3  Add decorator (#6299) (SuyogSoti, Jul 17, 2019)
a62e09c  Added async file tests (annatisch, Jul 17, 2019)
1964544  Consolidate Key Vault shared code (#6384) (chlowell, Jul 17, 2019)
fb46ff2  Download tests (annatisch, Jul 17, 2019)
f261414  Service property tests (annatisch, Jul 17, 2019)
8508816  No recordings (annatisch, Jul 17, 2019)
e197b59  Add credential wrapping MSAL ConfidentialClientApplication (#6358) (chlowell, Jul 17, 2019)
47c24b5  Add policy (#6379) (SuyogSoti, Jul 18, 2019)
337db3c  adding dockerfile (#6393) (Jul 18, 2019)
932cf73  Update cheatsheet.md (Jul 18, 2019)
62aa8e9  Async share tests (annatisch, Jul 18, 2019)
04fafe9  Async directory tests (annatisch, Jul 18, 2019)
91d3678  Fixed some tests (annatisch, Jul 18, 2019)
b427140  aiohttp socket timeout (annatisch, Jul 18, 2019)
d02d82b  Merge remote-tracking branch 'upstream/master' (annatisch, Jul 18, 2019)
a9cc2bc  Patch azure core (annatisch, Jul 18, 2019)
1ce4e64  CI fixes (annatisch, Jul 18, 2019)
bee9ee4  Fix async tests for py35 (annatisch, Jul 18, 2019)
dae678a  Python 3.5 support (annatisch, Jul 18, 2019)
d68d48c  Clean pylint (annatisch, Jul 18, 2019)

Refactored downloads
annatisch committed Jul 10, 2019
commit 76630709ddac1d8ef618ea155f4dd90be1ef7aba
@@ -14,13 +14,13 @@
 from .lease import LeaseClient
 from .polling import CopyStatusPoller
 from ._shared.policies import ExponentialRetry, LinearRetry, NoRetry
+from ._shared.download_chunking import StorageStreamDownloader
 from ._shared.models import(
     LocationMode,
     ResourceTypes,
     AccountPermissions,
     StorageErrorCode
 )
-from ._blob_utils import StorageStreamDownloader
 from .models import (
     BlobType,
     BlockState,
sdk/storage/azure-storage-blob/azure/storage/blob/_blob_utils.py (299 changes: 5 additions & 294 deletions)
@@ -6,7 +6,7 @@
 # pylint: disable=no-self-use
 
 import sys
-from io import BytesIO, SEEK_SET, UnsupportedOperation
+from io import SEEK_SET, UnsupportedOperation
 from typing import Optional, Union, Any, TypeVar, TYPE_CHECKING  # pylint: disable=unused-import
 
 import six
@@ -25,13 +25,7 @@
     BlockBlobChunkUploader,
     PageBlobChunkUploader,
     AppendBlobChunkUploader)
-from ._shared.download_chunking import (
-    process_content,
-    process_range_and_offset,
-    ParallelBlobChunkDownloader,
-    SequentialBlobChunkDownloader
-)
-from ._shared.encryption import _generate_blob_encryption_data, _encrypt_blob
+from ._shared.encryption import generate_blob_encryption_data, encrypt_blob
 from ._generated.models import (
     StorageErrorException,
     BlockLookupList,
@@ -100,7 +94,7 @@ def get_modification_conditions(
             if_match=if_match,
             if_none_match=if_none_match
         )
-    return None
+    return ModifiedAccessConditions()
 
 
 def upload_block_blob(  # pylint: disable=too-many-locals
@@ -137,7 +131,7 @@ def upload_block_blob(  # pylint: disable=too-many-locals
         except AttributeError:
             pass
         if key_encryption_key:
-            encryption_data, data = _encrypt_blob(data, key_encryption_key)
+            encryption_data, data = encrypt_blob(data, key_encryption_key)
             headers['x-ms-meta-encryptiondata'] = encryption_data
         return client.upload(
             data,
@@ -162,7 +156,7 @@
 
     if use_original_upload_path:
         if key_encryption_key:
-            cek, iv, encryption_data = _generate_blob_encryption_data(key_encryption_key)
+            cek, iv, encryption_data = generate_blob_encryption_data(key_encryption_key)
             headers['x-ms-meta-encryptiondata'] = encryption_data
         block_ids = upload_blob_chunks(
             blob_service=client,
@@ -413,286 +407,3 @@ def deserialize_container_properties(response, obj, headers):
         **headers
     )
     return container_properties
-
-
-class StorageStreamDownloader(object):  # pylint: disable=too-many-instance-attributes
-    """A streaming object to download a blob.
-
-    The stream downloader can be iterated, or downloaded to an open file or
-    stream, over multiple threads.
-    """
-
-    def __init__(
-            self, name, container, service, config, offset, length, validate_content,
-            access_conditions, mod_conditions, timeout,
-            require_encryption, key_encryption_key, key_resolver_function, **kwargs
-    ):
-        self.service = service
-        self.config = config
-        self.offset = offset
-        self.length = length
-        self.timeout = timeout
-        self.validate_content = validate_content
-        self.access_conditions = access_conditions
-        self.mod_conditions = mod_conditions
-        self.require_encryption = require_encryption
-        self.key_encryption_key = key_encryption_key
-        self.key_resolver_function = key_resolver_function
-        self.request_options = kwargs
-        self.location_mode = None
-        self._download_complete = False
-
-        # The service only provides transactional MD5s for chunks under 4MB.
-        # If validate_content is on, get only self.config.max_chunk_get_size for the first
-        # chunk so a transactional MD5 can be retrieved.
-        self.first_get_size = self.config.max_single_get_size if not self.validate_content \
-            else self.config.max_chunk_get_size
-        initial_request_start = self.offset if self.offset is not None else 0
-        if self.length is not None and self.length - self.offset < self.first_get_size:
-            initial_request_end = self.length
-        else:
-            initial_request_end = initial_request_start + self.first_get_size - 1
-
-        self.initial_range, self.initial_offset = process_range_and_offset(
-            initial_request_start,
-            initial_request_end,
-            self.length,
-            self.key_encryption_key,
-            self.key_resolver_function)
-
-        self.download_size = None
-        self.blob_size = None
-        self.blob = self._initial_request()
-        self.properties = self.blob.properties
-        self.properties.name = name
-        self.properties.container = container
-        # Set the content length to the download size instead of the size of
-        # the last range
-        self.properties.size = self.download_size
-
-        # Overwrite the content range to the user requested range
-        self.properties.content_range = 'bytes {0}-{1}/{2}'.format(self.offset, self.length, self.blob_size)
-
-        # Overwrite the content MD5 as it is the MD5 for the last range instead
-        # of the stored MD5
-        # TODO: Set to the stored MD5 when the service returns this
-        self.properties.content_md5 = None
-
-    def __len__(self):
-        return self.download_size
-
-    def __iter__(self):
-        if self.download_size == 0:
-            content = b""
-        else:
-            content = process_content(
-                self.blob,
-                self.initial_offset[0],
-                self.initial_offset[1],
-                self.require_encryption,
-                self.key_encryption_key,
-                self.key_resolver_function)
-
-        if content is not None:
-            yield content
-        if self._download_complete:
-            return
-
-        end_blob = self.blob_size
-        if self.length is not None:
-            # Use the length unless it is over the end of the blob
-            end_blob = min(self.blob_size, self.length + 1)
-
-        downloader = SequentialBlobChunkDownloader(
-            blob_service=self.service,
-            download_size=self.download_size,
-            chunk_size=self.config.max_chunk_get_size,
-            progress=self.first_get_size,
-            start_range=self.initial_range[1] + 1,  # start where the first download ended
-            end_range=end_blob,
-            stream=None,
-            validate_content=self.validate_content,
-            access_conditions=self.access_conditions,
-            mod_conditions=self.mod_conditions,
-            timeout=self.timeout,
-            require_encryption=self.require_encryption,
-            key_encryption_key=self.key_encryption_key,
-            key_resolver_function=self.key_resolver_function,
-            use_location=self.location_mode,
-            cls=deserialize_blob_stream,
-            **self.request_options)
-
-        for chunk in downloader.get_chunk_offsets():
-            yield downloader.yield_chunk(chunk)
-
-    def _initial_request(self):
-        range_header, range_validation = validate_and_format_range_headers(
-            self.initial_range[0],
-            self.initial_range[1],
-            start_range_required=False,
-            end_range_required=False,
-            check_content_md5=self.validate_content)
-
-        try:
-            location_mode, blob = self.service.download(
-                timeout=self.timeout,
-                range=range_header,
-                range_get_content_md5=range_validation,
-                lease_access_conditions=self.access_conditions,
-                modified_access_conditions=self.mod_conditions,
-                validate_content=self.validate_content,
-                cls=deserialize_blob_stream,
-                data_stream_total=None,
-                download_stream_current=0,
-                **self.request_options)
-
-            # Check the location we read from to ensure we use the same one
-            # for subsequent requests.
-            self.location_mode = location_mode
-
-            # Parse the total blob size and adjust the download size if ranges
-            # were specified
-            self.blob_size = parse_length_from_content_range(blob.properties.content_range)
-            if self.length is not None:
-                # Use the length unless it is over the end of the blob
-                self.download_size = min(self.blob_size, self.length - self.offset + 1)
-            elif self.offset is not None:
-                self.download_size = self.blob_size - self.offset
-            else:
-                self.download_size = self.blob_size
-
-        except StorageErrorException as error:
-            if self.offset is None and error.response.status_code == 416:
-                # Get range will fail on an empty blob. If the user did not
-                # request a range, do a regular get request in order to get
-                # any properties.
-                try:
-                    _, blob = self.service.download(
-                        timeout=self.timeout,
-                        lease_access_conditions=self.access_conditions,
-                        modified_access_conditions=self.mod_conditions,
-                        validate_content=self.validate_content,
-                        cls=deserialize_blob_stream,
-                        data_stream_total=0,
-                        download_stream_current=0,
-                        **self.request_options)
-                except StorageErrorException as error:
-                    process_storage_error(error)
-
-                # Set the download size to empty
-                self.download_size = 0
-                self.blob_size = 0
-            else:
-                process_storage_error(error)
-
-        # If the blob is small, the download is complete at this point.
-        # If blob size is large, download the rest of the blob in chunks.
-        if blob.properties.size != self.download_size:
-            # Lock on the etag. This can be overridden by the user by specifying '*'
-            if not self.mod_conditions:
-                self.mod_conditions = ModifiedAccessConditions()
-            if not self.mod_conditions.if_match:
-                self.mod_conditions.if_match = blob.properties.etag
-        else:
-            self._download_complete = True
-
-        return blob
-
-
-    def content_as_bytes(self, max_connections=1):
-        """Download the contents of this blob.
-
-        This operation is blocking until all data is downloaded.
-
-        :param int max_connections:
-            The number of parallel connections with which to download.
-        :rtype: bytes
-        """
-        stream = BytesIO()
-        self.download_to_stream(stream, max_connections=max_connections)
-        return stream.getvalue()
-
-    def content_as_text(self, max_connections=1, encoding='UTF-8'):
-        """Download the contents of this blob, and decode as text.
-
-        This operation is blocking until all data is downloaded.
-
-        :param int max_connections:
-            The number of parallel connections with which to download.
-        :rtype: str
-        """
-        content = self.content_as_bytes(max_connections=max_connections)
-        return content.decode(encoding)
-
-    def download_to_stream(self, stream, max_connections=1):
-        """Download the contents of this blob to a stream.
-
-        :param stream:
-            The stream to download to. This can be an open file-handle,
-            or any writable stream. The stream must be seekable if the download
-            uses more than one parallel connection.
-        :returns: The properties of the downloaded blob.
-        :rtype: ~azure.storage.blob.models.BlobProperties
-        """
-        # The stream must be seekable if parallel download is required
-        if max_connections > 1:
-            error_message = "Target stream handle must be seekable."
-            if sys.version_info >= (3,) and not stream.seekable():
-                raise ValueError(error_message)
-
-            try:
-                stream.seek(stream.tell())
-            except (NotImplementedError, AttributeError):
-                raise ValueError(error_message)
-
-        if self.download_size == 0:
-            content = b""
-        else:
-            content = process_content(
-                self.blob,
-                self.initial_offset[0],
-                self.initial_offset[1],
-                self.require_encryption,
-                self.key_encryption_key,
-                self.key_resolver_function)
-        # Write the content to the user stream
-        # Clear blob content since output has been written to user stream
-        if content is not None:
-            stream.write(content)
-        if self._download_complete:
-            return self.properties
-
-        end_blob = self.blob_size
-        if self.length is not None:
-            # Use the length unless it is over the end of the blob
-            end_blob = min(self.blob_size, self.length + 1)
-
-        downloader_class = ParallelBlobChunkDownloader if max_connections > 1 else SequentialBlobChunkDownloader
-        downloader = downloader_class(
-            blob_service=self.service,
-            download_size=self.download_size,
-            chunk_size=self.config.max_chunk_get_size,
-            progress=self.first_get_size,
-            start_range=self.initial_range[1] + 1,  # start where the first download ended
-            end_range=end_blob,
-            stream=stream,
-            validate_content=self.validate_content,
-            access_conditions=self.access_conditions,
-            mod_conditions=self.mod_conditions,
-            timeout=self.timeout,
-            require_encryption=self.require_encryption,
-            key_encryption_key=self.key_encryption_key,
-            key_resolver_function=self.key_resolver_function,
-            use_location=self.location_mode,
-            cls=deserialize_blob_stream,
-            **self.request_options)
-
-        if max_connections > 1:
-            import concurrent.futures
-            executor = concurrent.futures.ThreadPoolExecutor(max_connections)
-            list(executor.map(downloader.process_chunk, downloader.get_chunk_offsets()))
-        else:
-            for chunk in downloader.get_chunk_offsets():
-                downloader.process_chunk(chunk)
-
-        return self.properties
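
For context on the surface this commit relocates: the class deleted above is what a download call handed back in the July 2019 preview SDK. A minimal usage sketch, assuming a `BlobClient.from_connection_string` constructor and a `download_blob()` method that returns this `StorageStreamDownloader` (the connection string and container/blob names are placeholders, not values from this PR):

```python
# Hedged sketch of the downloader surface shown above; assumes the
# preview-era API where BlobClient.download_blob() returns StorageStreamDownloader.
from azure.storage.blob import BlobClient

# Placeholder connection details.
blob_client = BlobClient.from_connection_string(
    "<connection-string>", container="mycontainer", blob="myblob")

downloader = blob_client.download_blob()

# Fetch the whole blob as bytes over four parallel connections.
data = downloader.content_as_bytes(max_connections=4)

# Or stream it into a file handle; per download_to_stream's docstring,
# the handle must be seekable when max_connections > 1.
with open("myblob.bin", "wb") as handle:
    properties = downloader.download_to_stream(handle, max_connections=4)
```

`content_as_text` wraps the same bytes path and decodes with the given encoding.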
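The seekable-stream requirement in `download_to_stream` follows from how parallel chunk downloads land their data: each worker positions the shared stream at its chunk's offset before writing. A generic illustration of that pattern, not the SDK's actual internals (`fetch_chunk` is a hypothetical callable returning the bytes for one chunk offset):

```python
# Generic sketch of the parallel-chunk pattern used by download_to_stream above.
import concurrent.futures
import threading


def parallel_download(fetch_chunk, offsets, stream, max_connections):
    lock = threading.Lock()  # serialize seek+write on the shared stream

    def process_chunk(offset):
        data = fetch_chunk(offset)  # network I/O runs in parallel
        with lock:                  # writes are serialized
            stream.seek(offset)     # this is why the target must be seekable
            stream.write(data)

    with concurrent.futures.ThreadPoolExecutor(max_connections) as executor:
        # list() drains the lazy map iterator so worker exceptions propagate here,
        # mirroring the list(executor.map(...)) call in the class above.
        list(executor.map(process_chunk, offsets))
```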