[PageBlob][DownloadSparseBlob] Addressing comments
xiafu-msft committed Oct 3, 2019
commit e994161c49fc4cc0b00b80ab26e88e818a596c61
2 changes: 2 additions & 0 deletions sdk/storage/azure-storage-blob/azure/storage/blob/__init__.py
@@ -12,6 +12,7 @@
from .container_client import ContainerClient
from .blob_service_client import BlobServiceClient
from .lease import LeaseClient
from .download import StorageStreamDownloader
from ._shared.policies import ExponentialRetry, LinearRetry, NoRetry
from ._shared.models import(
LocationMode,
@@ -85,6 +86,7 @@
'BlobPermissions',
'ResourceTypes',
'AccountPermissions',
'StorageStreamDownloader',
]


11 changes: 11 additions & 0 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_deserialize.py
@@ -36,3 +36,14 @@ def deserialize_container_properties(response, obj, headers):
**headers
)
return container_properties


def get_page_ranges_result(ranges):
# type: (PageList) -> Tuple[List[Dict[str, int]], List[Dict[str, int]]]
page_range = [] # type: ignore
clear_range = [] # type: List
if ranges.page_range:
page_range = [{'start': b.start, 'end': b.end} for b in ranges.page_range] # type: ignore
if ranges.clear_range:
clear_range = [{'start': b.start, 'end': b.end} for b in ranges.clear_range]
return page_range, clear_range # type: ignore
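
A minimal sketch of the new helper's contract, for reviewers. `FakePageList` and `FakeRange` are hypothetical stand-ins for the autorest-generated `PageList`/`PageRange` models; only their `start`/`end` attributes matter here.

```python
from collections import namedtuple

from azure.storage.blob._deserialize import get_page_ranges_result

# Hypothetical stand-ins for the generated PageList/PageRange models.
FakeRange = namedtuple('FakeRange', ['start', 'end'])
FakePageList = namedtuple('FakePageList', ['page_range', 'clear_range'])

ranges = FakePageList(
    page_range=[FakeRange(0, 511), FakeRange(1024, 1535)],
    clear_range=[FakeRange(512, 1023)],
)

page_range, clear_range = get_page_ranges_result(ranges)
assert page_range == [{'start': 0, 'end': 511}, {'start': 1024, 'end': 1535}]
assert clear_range == [{'start': 512, 'end': 1023}]
```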
sdk/storage/azure-storage-blob/azure/storage/blob/_shared/parser.py
@@ -18,10 +18,3 @@ def _str(value):

def _to_utc_datetime(value):
return value.strftime('%Y-%m-%dT%H:%M:%SZ')


def get_empty_chunk(chunk_size):
empty_chunk = b''
for i in range(0, chunk_size): # pylint:disable=unused-variable
empty_chunk += b'\x00'
return empty_chunk
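
Context on the removal: `bytes` is immutable, so the `+=` in the loop re-copies the buffer on every iteration, making the helper quadratic in `chunk_size`. The replacement used throughout this commit allocates the zero-filled buffer in one step. A small sketch (the 4 MiB size is illustrative only):

```python
chunk_size = 4 * 1024 * 1024  # illustrative; the real value comes from the client config

# Old approach (removed above): byte-by-byte concatenation, quadratic time.
# New approach: a single allocation, linear time.
chunk_data = b"\x00" * chunk_size

assert len(chunk_data) == chunk_size
assert not any(chunk_data)  # every byte is zero
```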
sdk/storage/azure-storage-blob/azure/storage/blob/_shared/response_handlers.py
@@ -144,14 +144,3 @@ def parse_to_internal_user_delegation_key(service_user_delegation_key):
internal_user_delegation_key.signed_version = service_user_delegation_key.signed_version
internal_user_delegation_key.value = service_user_delegation_key.value
return internal_user_delegation_key


def get_page_ranges_result(ranges):
# type: (PageList) -> Tuple(List[Dict[str, int]], List[Dict[str, int]])
page_range = [] # type: ignore
clear_range = [] # type: List
if ranges.page_range:
page_range = [{'start': b.start, 'end': b.end} for b in ranges.page_range] # type: ignore
if ranges.clear_range:
clear_range = [{'start': b.start, 'end': b.end} for b in ranges.clear_range]
return page_range, clear_range # type: ignore
sdk/storage/azure-storage-blob/azure/storage/blob/aio/__init__.py
@@ -39,6 +39,7 @@
BlobPropertiesPaged,
BlobPrefix
)
from .download_async import StorageStreamDownloader
from .blob_client_async import BlobClient
from .container_client_async import ContainerClient
from .blob_service_client_async import BlobServiceClient
@@ -81,4 +82,5 @@
'BlobPermissions',
'ResourceTypes',
'AccountPermissions',
'StorageStreamDownloader',
]
sdk/storage/azure-storage-blob/azure/storage/blob/aio/blob_client_async.py
@@ -11,13 +11,13 @@
)

from azure.core.tracing.decorator_async import distributed_trace_async
from azure.storage.blob._generated.models import CpkInfo

from .._shared.base_client_async import AsyncStorageAccountHostsMixin
from .._shared.policies_async import ExponentialRetry
from .._shared.response_handlers import return_response_headers, process_storage_error, get_page_ranges_result
from .._shared.response_handlers import return_response_headers, process_storage_error
from .._deserialize import get_page_ranges_result
from .._generated.aio import AzureBlobStorage
from .._generated.models import ModifiedAccessConditions, StorageErrorException
from .._generated.models import ModifiedAccessConditions, StorageErrorException, CpkInfo
from .._deserialize import deserialize_blob_properties
from ..blob_client import BlobClient as BlobClientBase
from ._upload_helpers import (
sdk/storage/azure-storage-blob/azure/storage/blob/aio/download_async.py
@@ -11,10 +11,9 @@

from azure.core import HttpResponseError
from .._shared.encryption import decrypt_blob
from .._shared.parser import get_empty_chunk
from .._shared.request_handlers import validate_and_format_range_headers
from .._shared.response_handlers import process_storage_error, parse_length_from_content_range, \
get_page_ranges_result
from .._shared.response_handlers import process_storage_error, parse_length_from_content_range
from .._deserialize import get_page_ranges_result
from ..download import process_range_and_offset


@@ -144,7 +143,7 @@ async def _download_chunk(self, chunk_start, chunk_end):
chunk_start, chunk_end, chunk_end, self.encryption_options)

if self._do_optimize(download_range[0], download_range[1] - 1):
chunk_data = get_empty_chunk(self.chunk_size)
chunk_data = b"\x00" * self.chunk_size
else:
range_header, range_validation = validate_and_format_range_headers(
download_range[0],
@@ -179,15 +178,14 @@ class StorageStreamDownloader(object):  # pylint: disable=too-many-instance-attributes
"""

def __init__(
self, client=None,
self,
clients=None,
config=None,
offset=None,
length=None,
validate_content=None,
encryption_options=None,
**kwargs):
self.client = client
self.clients = clients
self.config = config
self.offset = offset
@@ -243,7 +241,7 @@ async def __anext__(self):
# Use the length unless it is over the end of the file
data_end = min(self.file_size, self.length + 1)
self._iter_downloader = _AsyncChunkDownloader(
client=self.client,
client=self.clients.blob,
non_empty_ranges=self.non_empty_ranges,
total_size=self.download_size,
chunk_size=self.config.max_chunk_get_size,
@@ -300,7 +298,7 @@ async def _initial_request(self):
check_content_md5=self.validate_content)

try:
location_mode, response = await self.client.download(
location_mode, response = await self.clients.blob.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self.validate_content,
@@ -329,7 +327,7 @@ async def _initial_request(self):
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = await self.client.download(
_, response = await self.clients.blob.download(
validate_content=self.validate_content,
data_stream_total=0,
download_stream_current=0,
@@ -430,7 +428,7 @@ async def download_to_stream(self, stream, max_concurrency=1):
data_end = min(self.file_size, self.length + 1)

downloader = _AsyncChunkDownloader(
client=self.client,
client=self.clients.blob,
non_empty_ranges=self.non_empty_ranges,
total_size=self.download_size,
chunk_size=self.config.max_chunk_get_size,
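
The async path mirrors the sync rework below. A hypothetical usage sketch of the now-exported async `StorageStreamDownloader`; the connection string, container, and blob names are placeholders, and the method names assume the 12.x client surface:

```python
import asyncio
from azure.storage.blob.aio import BlobClient

async def main():
    blob = BlobClient.from_connection_string(
        "<connection-string>", container_name="mycontainer", blob_name="myblob")
    # download_blob() returns the async StorageStreamDownloader shown above.
    stream = await blob.download_blob()
    with open("out.bin", "wb") as handle:
        await stream.download_to_stream(handle, max_concurrency=4)

asyncio.run(main())
```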
sdk/storage/azure-storage-blob/azure/storage/blob/blob_client.py
@@ -26,7 +26,8 @@
from ._shared.request_handlers import (
add_metadata_headers, get_length, read_length,
validate_and_format_range_headers)
from ._shared.response_handlers import return_response_headers, process_storage_error, get_page_ranges_result
from ._shared.response_handlers import return_response_headers, process_storage_error
from ._deserialize import get_page_ranges_result
from ._generated import AzureBlobStorage
from ._generated.models import ( # pylint: disable=unused-import
DeleteSnapshotsOptionType,
@@ -579,7 +580,6 @@ def _download_blob_options(self, offset=None, length=None, validate_content=False
encryption_algorithm=cpk.algorithm)

options = {
'client': self._client.blob,
'clients': self._client,
'config': self._config,
'offset': offset,
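
The `client` → `clients` swap above is the core plumbing change: the downloader now needs two operation groups from the generated `AzureBlobStorage` client rather than just `blob`. A hypothetical stub of the shape it relies on:

```python
# Hypothetical stub of the generated client now passed to the downloader whole.
class FakeAzureBlobStorage:
    class blob:                         # drives the ranged download GETs
        @staticmethod
        def download(**kwargs): ...

    class page_blob:                    # drives the one-time sparse-range probe
        @staticmethod
        def get_page_ranges(**kwargs): ...

# Inside StorageStreamDownloader, both groups hang off the single handle:
#   self.clients.blob.download(...)            # every chunk request
#   self.clients.page_blob.get_page_ranges()   # sparse-blob probe in _initial_request
```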
36 changes: 25 additions & 11 deletions sdk/storage/azure-storage-blob/azure/storage/blob/download.py
@@ -11,10 +11,9 @@
from azure.core import HttpResponseError
from azure.core.tracing.common import with_current_context
from ._shared.encryption import decrypt_blob
from ._shared.parser import get_empty_chunk
from ._shared.request_handlers import validate_and_format_range_headers
from ._shared.response_handlers import process_storage_error, parse_length_from_content_range, \
get_page_ranges_result
from ._shared.response_handlers import process_storage_error, parse_length_from_content_range
from ._deserialize import get_page_ranges_result


def process_range_and_offset(start_range, end_range, length, encryption):
@@ -133,26 +132,39 @@ def _write_to_stream(self, chunk_data, chunk_start):
pass

def _do_optimize(self, given_range_start, given_range_end):
# if we have no page range list stored, assume there's data everywhere for that
# page blob, or that it's a block blob or append blob
if self.non_empty_ranges is None:
return False

for source_range in self.non_empty_ranges:
# case 1: the range list is sorted, so once we reach a source_range that starts
# after the given range, every earlier source_range has already been checked
# without finding an overlap. The given range therefore contains no data, and
# the download optimization can be applied.
# given range: | |
# source range: | |
if given_range_end < source_range['start']: # pylint:disable=no-else-return
return True
# case 2: the given range comes after source_range, continue checking.
# given range: | |
# source range: | |
elif source_range['end'] < given_range_start:
pass
# case 3: source_range and the given range overlap, so the optimization cannot be applied.
else:
return False

# went through all source ranges without finding any overlap, so the optimization will be applied.
return True
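# Worked example with hypothetical ranges (list sorted by 'start'):
#   non_empty_ranges = [{'start': 512, 'end': 1023}, {'start': 2048, 'end': 2559}]
#   _do_optimize(0, 511)     -> True   (case 1: ends before the first source range)
#   _do_optimize(1024, 2047) -> True   (case 2 on the first range, then case 1 on the second)
#   _do_optimize(800, 1200)  -> False  (case 3: overlaps [512, 1023])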

def _download_chunk(self, chunk_start, chunk_end):
download_range, offset = process_range_and_offset(
chunk_start, chunk_end, chunk_end, self.encryption_options
)

# If the chunk contains no data, there's no need to download it from the server;
# when the optimization condition is met, create the empty chunk locally instead.
if self._do_optimize(download_range[0], download_range[1] - 1):
chunk_data = get_empty_chunk(self.chunk_size)
chunk_data = b"\x00" * self.chunk_size
else:
range_header, range_validation = validate_and_format_range_headers(
download_range[0], download_range[1] - 1, check_content_md5=self.validate_content
@@ -244,7 +256,6 @@ class StorageStreamDownloader(object):  # pylint: disable=too-many-instance-attributes

def __init__(
self,
client=None,
clients=None,
config=None,
offset=None,
@@ -254,7 +265,6 @@ def __init__(
extra_properties=None,
**kwargs
):
self.client = client
self.clients = clients
self.config = config
self.offset = offset
@@ -326,7 +336,7 @@ def __iter__(self):
data_end = min(self.file_size, self.length + 1)

downloader = SequentialChunkDownloader(
client=self.client,
client=self.clients.blob,
non_empty_ranges=self.non_empty_ranges,
total_size=self.download_size,
chunk_size=self.config.max_chunk_get_size,
@@ -353,7 +363,7 @@ def _initial_request(self):
)

try:
location_mode, response = self.client.download(
location_mode, response = self.clients.blob.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self.validate_content,
@@ -383,7 +393,7 @@ def _initial_request(self):
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = self.client.download(
_, response = self.clients.blob.download(
validate_content=self.validate_content,
data_stream_total=0,
download_stream_current=0,
@@ -403,6 +413,10 @@ def _initial_request(self):
try:
page_ranges = self.clients.page_blob.get_page_ranges()
self.non_empty_ranges = get_page_ranges_result(page_ranges)[0]
# According to the REST API documentation, a Get Page Ranges request on a highly
# fragmented page blob with a large number of writes can fail due to an internal
# server timeout. So if the page blob is not sparse, it's acceptable for this
# call to fail: we simply skip the download optimization.
except HttpResponseError:
pass

@@ -484,7 +498,7 @@ def download_to_stream(self, stream, max_concurrency=1):

downloader_class = ParallelChunkDownloader if max_concurrency > 1 else SequentialChunkDownloader
downloader = downloader_class(
client=self.client,
client=self.clients.blob,
non_empty_ranges=self.non_empty_ranges,
total_size=self.download_size,
chunk_size=self.config.max_chunk_get_size,
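
Taken together, the sync surface after this commit can be exercised as below. A hypothetical end-to-end sketch with placeholder names, assuming the 12.x client surface; the newly exported `StorageStreamDownloader` is the object `download_blob()` returns:

```python
from azure.storage.blob import BlobClient, StorageStreamDownloader

blob = BlobClient.from_connection_string(
    "<connection-string>", container_name="mycontainer", blob_name="mypageblob")

stream = blob.download_blob()  # type: StorageStreamDownloader
with open("sparse.bin", "wb") as handle:
    # For a sparse page blob, chunks that fall entirely inside cleared ranges
    # are synthesized locally as b"\x00" * chunk_size; only non-empty ranges
    # are actually fetched from the service.
    stream.download_to_stream(handle, max_concurrency=2)
```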