Commit e4679cb77e90338b2db7052a0d10232417768a4e: "merge conflict"
SuyogSoti committed Jul 24, 2019
1 change: 1 addition & 0 deletions .gitignore
@@ -78,3 +78,4 @@ code_reports
sdk/storage/azure-storage-blob/tests/settings_real.py
sdk/storage/azure-storage-queue/tests/settings_real.py
sdk/storage/azure-storage-file/tests/settings_real.py
*.code-workspace
@@ -350,7 +350,10 @@ def deserialize_from_http_generics(cls, response):
# Try to use content-type from headers if available
content_type = None
if response.content_type: # type: ignore
content_type = response.content_type[0].strip().lower() # type: ignore
try:
content_type = response.content_type.strip().lower() # type: ignore
except AttributeError:
content_type = response.content_type[0].strip().lower() # type: ignore

# Ouch, this server did not declare what it sent...
# Let's guess it's JSON...
...
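This change handles transports that surface content_type either as a single header string or as a list of header values. A minimal standalone sketch of the same normalization, using a namedtuple as a stand-in for the pipeline response:

from collections import namedtuple

def normalize_content_type(response):
    # Normalize content_type whether it arrives as a str or a list of str.
    content_type = None
    if response.content_type:
        try:
            # Single header string, e.g. "application/json; charset=utf-8"
            content_type = response.content_type.strip().lower()
        except AttributeError:
            # A list of header values: take the first entry.
            content_type = response.content_type[0].strip().lower()
    return content_type

Resp = namedtuple("Resp", "content_type")                  # minimal response stand-in
print(normalize_content_type(Resp("Application/JSON ")))   # -> "application/json"
print(normalize_content_type(Resp(["text/plain", "x"])))   # -> "text/plain"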
@@ -14,13 +14,13 @@
from .lease import LeaseClient
from .polling import CopyStatusPoller
from ._shared.policies import ExponentialRetry, LinearRetry, NoRetry
from ._shared.download_chunking import StorageStreamDownloader
from ._shared.models import(
LocationMode,
ResourceTypes,
AccountPermissions,
StorageErrorCode
)
from ._blob_utils import StorageStreamDownloader
from .models import (
BlobType,
BlockState,
...
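StorageStreamDownloader now comes from the blob package's own _blob_utils module rather than _shared.download_chunking; the public client surface is unchanged. A hedged usage sketch (the from_connection_string and download_blob entry points, and their argument names, are assumptions here, not confirmed by this diff):

from azure.storage.blob import BlobClient

conn_str = "<connection-string>"   # placeholder
blob = BlobClient.from_connection_string(conn_str, container_name="logs", blob_name="app.log")
stream = blob.download_blob()                       # assumed to return a StorageStreamDownloader
text = stream.content_as_text(max_connections=2)    # blocks until the blob is fully downloaded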
309 changes: 8 additions & 301 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_blob_utils.py
@@ -6,16 +6,17 @@
# pylint: disable=no-self-use

import sys
from io import BytesIO, SEEK_SET, UnsupportedOperation
from io import SEEK_SET, UnsupportedOperation
from typing import Optional, Union, Any, TypeVar, TYPE_CHECKING # pylint: disable=unused-import

import six
from azure.core.exceptions import ResourceExistsError, ResourceModifiedError

from ._shared.utils import (
from ._shared.request_handlers import validate_and_format_range_headers
from ._shared.response_handlers import (
process_storage_error,
validate_and_format_range_headers,
parse_length_from_content_range,
deserialize_metadata,
return_response_headers)
from ._shared.models import StorageErrorCode, ModifiedAccessConditions
from ._shared.upload_chunking import (
@@ -24,13 +24,7 @@
BlockBlobChunkUploader,
PageBlobChunkUploader,
AppendBlobChunkUploader)
from ._shared.download_chunking import (
process_content,
process_range_and_offset,
ParallelBlobChunkDownloader,
SequentialBlobChunkDownloader
)
from ._shared.encryption import _generate_blob_encryption_data, _encrypt_blob
from ._shared.encryption import generate_blob_encryption_data, encrypt_blob
from ._generated.models import (
StorageErrorException,
BlockLookupList,
@@ -99,7 +94,7 @@ def get_modification_conditions(
if_match=if_match,
if_none_match=if_none_match
)
return None
return ModifiedAccessConditions()


def upload_block_blob( # pylint: disable=too-many-locals
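The get_modification_conditions change above returns an empty ModifiedAccessConditions() instead of None, so callers can read or set attributes such as if_match without a None guard (the etag-locking code later in this file does exactly that). A self-contained sketch of the pattern, using a simplified stand-in for the generated model:

class ModifiedAccessConditions(object):
    # Simplified stand-in for the generated ModifiedAccessConditions model.
    def __init__(self, if_modified_since=None, if_unmodified_since=None,
                 if_match=None, if_none_match=None):
        self.if_modified_since = if_modified_since
        self.if_unmodified_since = if_unmodified_since
        self.if_match = if_match
        self.if_none_match = if_none_match

def get_modification_conditions(if_match=None, if_none_match=None):
    if if_match or if_none_match:
        return ModifiedAccessConditions(if_match=if_match, if_none_match=if_none_match)
    return ModifiedAccessConditions()   # empty model instead of None

conditions = get_modification_conditions()
conditions.if_match = '"0x8D4BCC2E4835CD0"'   # safe: no None check needed before locking on an etag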
@@ -136,7 +131,7 @@ def upload_block_blob( # pylint: disable=too-many-locals
except AttributeError:
pass
if key_encryption_key:
encryption_data, data = _encrypt_blob(data, key_encryption_key)
encryption_data, data = encrypt_blob(data, key_encryption_key)
headers['x-ms-meta-encryptiondata'] = encryption_data
return client.upload(
data,
@@ -161,7 +156,7 @@

if use_original_upload_path:
if key_encryption_key:
cek, iv, encryption_data = _generate_blob_encryption_data(key_encryption_key)
cek, iv, encryption_data = generate_blob_encryption_data(key_encryption_key)
headers['x-ms-meta-encryptiondata'] = encryption_data
block_ids = upload_blob_chunks(
blob_service=client,
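The encryption helpers lose their leading underscores but keep the same envelope flow: encrypt_blob returns (encryption_data, ciphertext) for the one-shot path, generate_blob_encryption_data returns (cek, iv, encryption_data) for the chunked path, and both attach the metadata via the x-ms-meta-encryptiondata header. A conceptual sketch of the metadata side, with hypothetical stubs rather than the real _shared.encryption implementation (the wrap_key call assumes the usual key-encryption-key interface, which this diff does not show):

import base64, json, os

def generate_blob_encryption_data(key_encryption_key):
    # Hypothetical stub: a fresh content-encryption key and IV per blob,
    # with the wrapped key serialized into the metadata envelope.
    cek = os.urandom(32)
    iv = os.urandom(16)
    encryption_data = json.dumps({
        "WrappedContentKey": base64.b64encode(key_encryption_key.wrap_key(cek)).decode(),
        "ContentEncryptionIV": base64.b64encode(iv).decode(),
    })
    return cek, iv, encryption_data

# Mirrors the diff above: the envelope rides along as blob metadata.
# cek, iv, encryption_data = generate_blob_encryption_data(kek)
# headers["x-ms-meta-encryptiondata"] = encryption_data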
@@ -385,11 +380,6 @@ def upload_append_blob(
process_storage_error(error)


def deserialize_metadata(response, _, headers): # pylint: disable=unused-argument
raw_metadata = {k: v for k, v in response.headers.items() if k.startswith("x-ms-meta-")}
return {k[10:]: v for k, v in raw_metadata.items()}
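This helper now lives in _shared.response_handlers (see the new import at the top of this diff). Its logic is unchanged: keep only headers prefixed with x-ms-meta- and strip the first 10 characters, i.e. len("x-ms-meta-"), from each key. A standalone version:

def deserialize_metadata(headers):
    # Same logic as the relocated helper, taking a plain dict of headers.
    raw_metadata = {k: v for k, v in headers.items() if k.startswith("x-ms-meta-")}
    return {k[10:]: v for k, v in raw_metadata.items()}   # 10 == len("x-ms-meta-")

print(deserialize_metadata({"x-ms-meta-category": "docs", "Content-Length": "42"}))
# -> {'category': 'docs'}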


def deserialize_blob_properties(response, obj, headers):
metadata = deserialize_metadata(response, obj, headers)
blob_properties = BlobProperties(
@@ -417,286 +407,3 @@ def deserialize_container_properties(response, obj, headers):
**headers
)
return container_properties


class StorageStreamDownloader(object): # pylint: disable=too-many-instance-attributes
"""A streaming object to download a blob.

The stream downloader can be iterated, or used to download into an open
file or stream, optionally across multiple threads.
"""

def __init__(
self, name, container, service, config, offset, length, validate_content,
access_conditions, mod_conditions, timeout,
require_encryption, key_encryption_key, key_resolver_function, **kwargs
):
self.service = service
self.config = config
self.offset = offset
self.length = length
self.timeout = timeout
self.validate_content = validate_content
self.access_conditions = access_conditions
self.mod_conditions = mod_conditions
self.require_encryption = require_encryption
self.key_encryption_key = key_encryption_key
self.key_resolver_function = key_resolver_function
self.request_options = kwargs
self.location_mode = None
self._download_complete = False

# The service only provides transactional MD5s for chunks under 4MB.
# If validate_content is on, get only self.config.max_chunk_get_size for
# the first chunk so a transactional MD5 can be retrieved.
self.first_get_size = self.config.max_single_get_size if not self.validate_content \
else self.config.max_chunk_get_size
initial_request_start = self.offset if self.offset is not None else 0
# Use the normalized start so a None offset cannot break the arithmetic.
if self.length is not None and self.length - initial_request_start < self.first_get_size:
initial_request_end = self.length
else:
initial_request_end = initial_request_start + self.first_get_size - 1
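# Worked example (assuming the library defaults of a 32MB max_single_get_size
# and a 4MB max_chunk_get_size, which this diff does not confirm): with
# offset=None, length=None and validate_content=False, the first request asks
# for bytes 0-33554431, and anything beyond that is left to the chunk
# downloaders below.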

self.initial_range, self.initial_offset = process_range_and_offset(
initial_request_start,
initial_request_end,
self.length,
self.key_encryption_key,
self.key_resolver_function)

self.download_size = None
self.blob_size = None
self.blob = self._initial_request()
self.properties = self.blob.properties
self.properties.name = name
self.properties.container = container
# Set the content length to the download size instead of the size of
# the last range
self.properties.size = self.download_size

# Overwrite the content range with the user-requested range
self.properties.content_range = 'bytes {0}-{1}/{2}'.format(self.offset, self.length, self.blob_size)

# Overwrite the content MD5 as it is the MD5 for the last range instead
# of the stored MD5
# TODO: Set to the stored MD5 when the service returns this
self.properties.content_md5 = None

def __len__(self):
return self.download_size

def __iter__(self):
if self.download_size == 0:
content = b""
else:
content = process_content(
self.blob,
self.initial_offset[0],
self.initial_offset[1],
self.require_encryption,
self.key_encryption_key,
self.key_resolver_function)

if content is not None:
yield content
if self._download_complete:
return

end_blob = self.blob_size
if self.length is not None:
# Use the length unless it is over the end of the blob
end_blob = min(self.blob_size, self.length + 1)

downloader = SequentialBlobChunkDownloader(
blob_service=self.service,
download_size=self.download_size,
chunk_size=self.config.max_chunk_get_size,
progress=self.first_get_size,
start_range=self.initial_range[1] + 1, # start where the first download ended
end_range=end_blob,
stream=None,
validate_content=self.validate_content,
access_conditions=self.access_conditions,
mod_conditions=self.mod_conditions,
timeout=self.timeout,
require_encryption=self.require_encryption,
key_encryption_key=self.key_encryption_key,
key_resolver_function=self.key_resolver_function,
use_location=self.location_mode,
cls=deserialize_blob_stream,
**self.request_options)

for chunk in downloader.get_chunk_offsets():
yield downloader.yield_chunk(chunk)

def _initial_request(self):
range_header, range_validation = validate_and_format_range_headers(
self.initial_range[0],
self.initial_range[1],
start_range_required=False,
end_range_required=False,
check_content_md5=self.validate_content)

try:
location_mode, blob = self.service.download(
timeout=self.timeout,
range=range_header,
range_get_content_md5=range_validation,
lease_access_conditions=self.access_conditions,
modified_access_conditions=self.mod_conditions,
validate_content=self.validate_content,
cls=deserialize_blob_stream,
data_stream_total=None,
download_stream_current=0,
**self.request_options)

# Check the location we read from to ensure we use the same one
# for subsequent requests.
self.location_mode = location_mode

# Parse the total blob size and adjust the download size if ranges
# were specified
self.blob_size = parse_length_from_content_range(blob.properties.content_range)
if self.length is not None:
# Use the length unless it is over the end of the blob
self.download_size = min(self.blob_size, self.length - (self.offset or 0) + 1)
elif self.offset is not None:
self.download_size = self.blob_size - self.offset
else:
self.download_size = self.blob_size
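# Worked example: length is treated as an inclusive end index, so for a
# 1000-byte blob with offset=100 and length=499 the download size is
# min(1000, 499 - 100 + 1) = 400 bytes; with only offset=100 it is 900.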

except StorageErrorException as error:
if self.offset is None and error.response.status_code == 416:
# Get range will fail on an empty blob. If the user did not
# request a range, do a regular get request in order to get
# any properties.
try:
_, blob = self.service.download(
timeout=self.timeout,
lease_access_conditions=self.access_conditions,
modified_access_conditions=self.mod_conditions,
validate_content=self.validate_content,
cls=deserialize_blob_stream,
data_stream_total=0,
download_stream_current=0,
**self.request_options)
except StorageErrorException as error:
process_storage_error(error)

# Set the download size to empty
self.download_size = 0
self.blob_size = 0
else:
process_storage_error(error)

# If the blob is small, the download is complete at this point.
# If blob size is large, download the rest of the blob in chunks.
if blob.properties.size != self.download_size:
# Lock on the etag. This can be overridden by the user by specifying '*'
if not self.mod_conditions:
self.mod_conditions = ModifiedAccessConditions()
if not self.mod_conditions.if_match:
self.mod_conditions.if_match = blob.properties.etag
else:
self._download_complete = True

return blob


def content_as_bytes(self, max_connections=1):
"""Download the contents of this blob.

This operation blocks until all data is downloaded.

:param int max_connections:
The number of parallel connections with which to download.
:rtype: bytes
"""
stream = BytesIO()
self.download_to_stream(stream, max_connections=max_connections)
return stream.getvalue()

def content_as_text(self, max_connections=1, encoding='UTF-8'):
"""Download the contents of this blob, and decode as text.

This operation blocks until all data is downloaded.

:param int max_connections:
The number of parallel connections with which to download.
:param str encoding:
The encoding with which to decode the downloaded bytes. Defaults to UTF-8.
:rtype: str
"""
content = self.content_as_bytes(max_connections=max_connections)
return content.decode(encoding)

def download_to_stream(self, stream, max_connections=1):
"""Download the contents of this blob to a stream.

:param stream:
The stream to download to. This can be an open file-handle,
or any writable stream. The stream must be seekable if the download
uses more than one parallel connection.
:param int max_connections:
The number of parallel connections with which to download.
:returns: The properties of the downloaded blob.
:rtype: ~azure.storage.blob.models.BlobProperties
"""
# the stream must be seekable if parallel download is required
if max_connections > 1:
error_message = "Target stream handle must be seekable."
if sys.version_info >= (3,) and not stream.seekable():
raise ValueError(error_message)

try:
stream.seek(stream.tell())
except (NotImplementedError, AttributeError):
raise ValueError(error_message)

if self.download_size == 0:
content = b""
else:
content = process_content(
self.blob,
self.initial_offset[0],
self.initial_offset[1],
self.require_encryption,
self.key_encryption_key,
self.key_resolver_function)
# Write the first chunk of content to the user stream
if content is not None:
stream.write(content)
if self._download_complete:
return self.properties

end_blob = self.blob_size
if self.length is not None:
# Use the length unless it is over the end of the blob
end_blob = min(self.blob_size, self.length + 1)

downloader_class = ParallelBlobChunkDownloader if max_connections > 1 else SequentialBlobChunkDownloader
downloader = downloader_class(
blob_service=self.service,
download_size=self.download_size,
chunk_size=self.config.max_chunk_get_size,
progress=self.first_get_size,
start_range=self.initial_range[1] + 1, # start where the first download ended
end_range=end_blob,
stream=stream,
validate_content=self.validate_content,
access_conditions=self.access_conditions,
mod_conditions=self.mod_conditions,
timeout=self.timeout,
require_encryption=self.require_encryption,
key_encryption_key=self.key_encryption_key,
key_resolver_function=self.key_resolver_function,
use_location=self.location_mode,
cls=deserialize_blob_stream,
**self.request_options)

if max_connections > 1:
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_connections)
list(executor.map(downloader.process_chunk, downloader.get_chunk_offsets()))
else:
for chunk in downloader.get_chunk_offsets():
downloader.process_chunk(chunk)

return self.properties
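When max_connections > 1 the remaining ranges are fanned out across a thread pool, which is why the target stream must be seekable: each worker writes its chunk at that chunk's own offset. A hedged usage sketch (obtaining the downloader through download_blob is an assumption, as in the earlier sketch):

with open("blob.bin", "wb") as handle:                  # regular files are seekable
    stream = blob.download_blob()                       # assumed StorageStreamDownloader
    props = stream.download_to_stream(handle, max_connections=4)
    print(props.size)                                   # size of the downloaded range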
You are viewing a condensed version of this merge commit.