decorators and policy added
SuyogSoti committed Jul 23, 2019
commit 4a63b84947811a685cce071dd5d761cb5ae434b0
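
This commit threads distributed tracing through the parallel upload helpers: the chunk-upload entry points and the per-chunk worker methods gain an @distributed_trace decorator, and callables handed to the ThreadPoolExecutor are wrapped with tracing_context.with_current_context so the tracing context active on the submitting thread is carried into the worker threads. A minimal sketch of that pattern follows (not the SDK code itself): the azure.core.tracing names are taken from the diff, while do_work and upload_all are hypothetical helpers for illustration.

# Sketch only: illustrates the wrapping pattern applied in this commit.
import concurrent.futures

from azure.core.tracing.context import tracing_context
from azure.core.tracing.decorator import distributed_trace


@distributed_trace
def do_work(item):
    # Runs on a worker thread; the decorator is meant to record a span per call.
    return item * 2


def upload_all(items, max_connections=4):
    with concurrent.futures.ThreadPoolExecutor(max_connections) as executor:
        # with_current_context captures the calling thread's tracing context
        # and re-activates it in the worker before do_work runs, so spans
        # created there are not orphaned from the caller's span.
        wrapped = tracing_context.with_current_context(do_work)
        return list(executor.map(wrapped, items))
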
@@ -5,34 +5,45 @@
# --------------------------------------------------------------------------
# pylint: disable=no-self-use

from io import (BytesIO, IOBase, SEEK_CUR, SEEK_END, SEEK_SET, UnsupportedOperation)
from io import BytesIO, IOBase, SEEK_CUR, SEEK_END, SEEK_SET, UnsupportedOperation
from threading import Lock

from math import ceil

import six

from .models import ModifiedAccessConditions
from .utils import (
encode_base64,
url_quote,
get_length,
return_response_headers)
from .utils import encode_base64, url_quote, get_length, return_response_headers
from .encryption import _get_blob_encryptor_and_padder
from azure.core.tracing.context import tracing_context
from azure.core.tracing.decorator import distributed_trace


_LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024
_ERROR_VALUE_SHOULD_BE_SEEKABLE_STREAM = '{0} should be a seekable file-like/io.IOBase type stream object.'
_ERROR_VALUE_SHOULD_BE_SEEKABLE_STREAM = "{0} should be a seekable file-like/io.IOBase type stream object."


def upload_blob_chunks(blob_service, blob_size, block_size, stream, max_connections, validate_content, # pylint: disable=too-many-locals
access_conditions, uploader_class, append_conditions=None, modified_access_conditions=None,
timeout=None, content_encryption_key=None, initialization_vector=None, **kwargs):
@distributed_trace
def upload_blob_chunks(
blob_service,
blob_size,
block_size,
stream,
max_connections,
validate_content, # pylint: disable=too-many-locals
access_conditions,
uploader_class,
append_conditions=None,
modified_access_conditions=None,
timeout=None,
content_encryption_key=None,
initialization_vector=None,
**kwargs
):

encryptor, padder = _get_blob_encryptor_and_padder(
content_encryption_key,
initialization_vector,
uploader_class is not PageBlobChunkUploader)
content_encryption_key, initialization_vector, uploader_class is not PageBlobChunkUploader
)

uploader = uploader_class(
blob_service,
@@ -79,7 +90,7 @@ def upload_blob_chunks(blob_service, blob_size, block_size, stream, max_connecti
running_futures.remove(f)

chunk_throttler.acquire()
future = executor.submit(uploader.process_chunk, chunk)
future = executor.submit(tracing_context.with_current_context(uploader.process_chunk), chunk)

# Calls callback upon completion (even if the callback was added after the Future task is done).
future.add_done_callback(lambda x: chunk_throttler.release())
@@ -96,9 +107,21 @@ def upload_blob_chunks(blob_service, blob_size, block_size, stream, max_connecti
return uploader.response_headers


def upload_blob_substream_blocks(blob_service, blob_size, block_size, stream, max_connections,
validate_content, access_conditions, uploader_class,
append_conditions=None, modified_access_conditions=None, timeout=None, **kwargs):
@distributed_trace
def upload_blob_substream_blocks(
blob_service,
blob_size,
block_size,
stream,
max_connections,
validate_content,
access_conditions,
uploader_class,
append_conditions=None,
modified_access_conditions=None,
timeout=None,
**kwargs
):

uploader = uploader_class(
blob_service,
@@ -123,18 +146,35 @@ def upload_blob_substream_blocks(blob_service, blob_size, block_size, stream, ma

if max_connections > 1:
import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor(max_connections)
range_ids = list(executor.map(uploader.process_substream_block, uploader.get_substream_blocks()))
range_ids = list(
executor.map(
tracing_context.with_current_context(uploader.process_substream_block), uploader.get_substream_blocks()
)
)
else:
range_ids = [uploader.process_substream_block(result) for result in uploader.get_substream_blocks()]

return range_ids


class _BlobChunkUploader(object): # pylint: disable=too-many-instance-attributes

def __init__(self, blob_service, blob_size, chunk_size, stream, parallel, validate_content,
access_conditions, append_conditions, timeout, encryptor, padder, **kwargs):
def __init__(
self,
blob_service,
blob_size,
chunk_size,
stream,
parallel,
validate_content,
access_conditions,
append_conditions,
timeout,
encryptor,
padder,
**kwargs
):
self.blob_service = blob_service
self.blob_size = blob_size
self.chunk_size = chunk_size
@@ -159,7 +199,7 @@ def __init__(self, blob_service, blob_size, chunk_size, stream, parallel, valida
def get_chunk_streams(self):
index = 0
while True:
data = b''
data = b""
read_size = self.chunk_size

# Buffer until we either reach the end of the stream or get a whole chunk.
@@ -168,12 +208,12 @@ def get_chunk_streams(self):
read_size = min(self.chunk_size - len(data), self.blob_size - (index + len(data)))
temp = self.stream.read(read_size)
if not isinstance(temp, six.binary_type):
raise TypeError('Blob data should be of type bytes.')
raise TypeError("Blob data should be of type bytes.")
data += temp or b""

# We have read an empty string and so are at the end
# of the buffer or we have read a full chunk.
if temp == b'' or len(data) == self.chunk_size:
if temp == b"" or len(data) == self.chunk_size:
break

if len(data) == self.chunk_size:
@@ -192,6 +232,7 @@ def get_chunk_streams(self):
break
index += len(data)

@distributed_trace
def process_chunk(self, chunk_data):
chunk_bytes = chunk_data[1]
chunk_offset = chunk_data[0]
@@ -226,10 +267,14 @@ def get_substream_blocks(self):
last_block_size = self.chunk_size if blob_length % self.chunk_size == 0 else blob_length % self.chunk_size

for i in range(blocks):
yield ('BlockId{}'.format("%05d" % i),
_SubStream(self.stream, i * self.chunk_size, last_block_size if i == blocks - 1 else self.chunk_size,
lock))
yield (
"BlockId{}".format("%05d" % i),
_SubStream(
self.stream, i * self.chunk_size, last_block_size if i == blocks - 1 else self.chunk_size, lock
),
)

@distributed_trace
def process_substream_block(self, block_data):
return self._upload_substream_block_with_progress(block_data[0], block_data[1])

@@ -246,10 +291,9 @@ def set_response_properties(self, resp):


class BlockBlobChunkUploader(_BlobChunkUploader):

def _upload_chunk(self, chunk_offset, chunk_data):
# TODO: This is incorrect, but works with recording.
block_id = encode_base64(url_quote(encode_base64('{0:032d}'.format(chunk_offset))))
block_id = encode_base64(url_quote(encode_base64("{0:032d}".format(chunk_offset))))
self.blob_service.stage_block(
block_id,
len(chunk_data),
@@ -259,7 +303,8 @@ def _upload_chunk(self, chunk_offset, chunk_data):
validate_content=self.validate_content,
data_stream_total=self.blob_size,
upload_stream_current=self.progress_total,
**self.request_options)
**self.request_options
)
return block_id

def _upload_substream_block(self, block_id, block_stream):
@@ -273,27 +318,27 @@ def _upload_substream_block(self, block_id, block_stream):
timeout=self.timeout,
data_stream_total=self.blob_size,
upload_stream_current=self.progress_total,
**self.request_options)
**self.request_options
)
finally:
block_stream.close()
return block_id


class PageBlobChunkUploader(_BlobChunkUploader): # pylint: disable=abstract-method

def _is_chunk_empty(self, chunk_data):
# read until non-zero byte is encountered
# if reached the end without returning, then chunk_data is all 0's
for each_byte in chunk_data:
if each_byte not in [0, b'\x00']:
if each_byte not in [0, b"\x00"]:
return False
return True

def _upload_chunk(self, chunk_offset, chunk_data):
# avoid uploading the empty pages
if not self._is_chunk_empty(chunk_data):
chunk_end = chunk_offset + len(chunk_data) - 1
content_range = 'bytes={0}-{1}'.format(chunk_offset, chunk_end)
content_range = "bytes={0}-{1}".format(chunk_offset, chunk_end)
computed_md5 = None
self.response_headers = self.blob_service.upload_pages(
chunk_data,
@@ -307,15 +352,14 @@ def _upload_chunk(self, chunk_offset, chunk_data):
cls=return_response_headers,
data_stream_total=self.blob_size,
upload_stream_current=self.progress_total,
**self.request_options)
**self.request_options
)

if not self.parallel:
self.modified_access_conditions = ModifiedAccessConditions(
if_match=self.response_headers['etag'])
self.modified_access_conditions = ModifiedAccessConditions(if_match=self.response_headers["etag"])


class AppendBlobChunkUploader(_BlobChunkUploader): # pylint: disable=abstract-method

def __init__(self, *args, **kwargs):
super(AppendBlobChunkUploader, self).__init__(*args, **kwargs)
self.current_length = None
@@ -335,7 +379,7 @@ def _upload_chunk(self, chunk_offset, chunk_data):
upload_stream_current=self.progress_total,
**self.request_options
)
self.current_length = int(self.response_headers['blob_append_offset'])
self.current_length = int(self.response_headers["blob_append_offset"])
else:
self.append_conditions.append_position = self.current_length + chunk_offset
self.response_headers = self.blob_service.append_block(
@@ -373,8 +417,9 @@ def __init__(self, wrapped_stream, stream_begin_index, length, lockObj):

# we must avoid buffering more than necessary, and also not use up too much memory
# so the max buffer size is capped at 4MB
self._max_buffer_size = length if length < _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE \
else _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE
self._max_buffer_size = (
length if length < _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE else _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE
)
self._current_buffer_start = 0
self._current_buffer_size = 0
super(_SubStream, self).__init__()
@@ -404,7 +449,7 @@ def read(self, n):

# return fast
if n == 0 or self._buffer.closed:
return b''
return b""

# attempt first read from the read buffer and update position
read_buffer = self._buffer.read(n)
@@ -460,7 +505,7 @@ def seek(self, offset, whence=0):
start_index = self._position
elif whence is SEEK_END:
start_index = self._length
offset = - offset
offset = -offset
else:
raise ValueError("Invalid argument for the 'whence' parameter.")

@@ -503,10 +548,11 @@ class IterStreamer(object):
"""
File-like streaming iterator.
"""
def __init__(self, generator, encoding='UTF-8'):

def __init__(self, generator, encoding="UTF-8"):
self.generator = generator
self.iterator = iter(generator)
self.leftover = b''
self.leftover = b""
self.encoding = encoding

def __len__(self):
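
For context on the _SubStream helper touched above: it exposes a fixed byte range of a shared, seekable parent stream as its own file-like object, serializing access with a lock so several block uploads can read from the same parent in parallel, with its read buffer capped at 4 MB. Below is a rough standalone sketch of that idea, simplified and without the buffering; the RangeReader name is hypothetical and not part of the diff.

# Sketch only: a file-like window over a byte range of a shared stream.
import io
import threading


class RangeReader(io.RawIOBase):
    """Expose bytes [begin, begin + length) of a shared seekable stream."""

    def __init__(self, wrapped_stream, begin, length, lock):
        super(RangeReader, self).__init__()
        self._stream = wrapped_stream
        self._begin = begin
        self._length = length
        self._lock = lock
        self._pos = 0

    def readable(self):
        return True

    def read(self, n=-1):
        remaining = self._length - self._pos
        if n is None or n < 0 or n > remaining:
            n = remaining
        if n == 0:
            return b""
        with self._lock:
            # Only one substream may reposition the shared parent at a time.
            self._stream.seek(self._begin + self._pos)
            data = self._stream.read(n)
        self._pos += len(data)
        return data


# Usage: two 5-byte windows over one shared buffer.
shared = io.BytesIO(b"0123456789")
lock = threading.Lock()
first, second = RangeReader(shared, 0, 5, lock), RangeReader(shared, 5, 5, lock)
print(first.read(), second.read())  # b'01234' b'56789'
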