diff --git a/sdk/storage/azure-storage-file/azure/storage/file/_shared/uploads.py b/sdk/storage/azure-storage-file/azure/storage/file/_shared/uploads.py
index 2b269fb1d0ba..4f61476cde21 100644
--- a/sdk/storage/azure-storage-file/azure/storage/file/_shared/uploads.py
+++ b/sdk/storage/azure-storage-file/azure/storage/file/_shared/uploads.py
@@ -20,7 +20,7 @@
 _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024
-_ERROR_VALUE_SHOULD_BE_SEEKABLE_STREAM = '{0} should be a seekable file-like/io.IOBase type stream object.'
+_ERROR_VALUE_SHOULD_BE_SEEKABLE_STREAM = "{0} should be a seekable file-like/io.IOBase type stream object."
 
 
 def _parallel_uploads(executor, uploader, pending, running):
@@ -150,7 +150,7 @@ def __init__(self, service, total_size, chunk_size, stream, parallel, encryptor=
     def get_chunk_streams(self):
         index = 0
         while True:
-            data = b''
+            data = b""
             read_size = self.chunk_size
 
             # Buffer until we either reach the end of the stream or get a whole chunk.
@@ -159,12 +159,12 @@
                     read_size = min(self.chunk_size - len(data), self.total_size - (index + len(data)))
                 temp = self.stream.read(read_size)
                 if not isinstance(temp, six.binary_type):
-                    raise TypeError('Blob data should be of type bytes.')
+                    raise TypeError("Blob data should be of type bytes.")
                 data += temp or b""
 
                 # We have read an empty string and so are at the end
                 # of the buffer or we have read a full chunk.
-                if temp == b'' or len(data) == self.chunk_size:
+                if temp == b"" or len(data) == self.chunk_size:
                     break
 
             if len(data) == self.chunk_size:
@@ -247,7 +247,8 @@ def _upload_chunk(self, chunk_offset, chunk_data):
             chunk_data,
             data_stream_total=self.total_size,
             upload_stream_current=self.progress_total,
-            **self.request_options)
+            **self.request_options
+        )
         return block_id
 
     def _upload_substream_block(self, block_id, block_stream):
@@ -258,7 +259,8 @@
                 block_stream,
                 data_stream_total=self.total_size,
                 upload_stream_current=self.progress_total,
-                **self.request_options)
+                **self.request_options
+            )
         finally:
             block_stream.close()
         return block_id
@@ -270,7 +272,7 @@ def _is_chunk_empty(self, chunk_data):
         # read until non-zero byte is encountered
         # if reached the end without returning, then chunk_data is all 0's
         for each_byte in chunk_data:
-            if each_byte not in [0, b'\x00']:
+            if each_byte not in [0, b"\x00"]:
                 return False
         return True
 
@@ -278,7 +280,7 @@ def _upload_chunk(self, chunk_offset, chunk_data):
         # avoid uploading the empty pages
         if not self._is_chunk_empty(chunk_data):
             chunk_end = chunk_offset + len(chunk_data) - 1
-            content_range = 'bytes={0}-{1}'.format(chunk_offset, chunk_end)
+            content_range = "bytes={0}-{1}".format(chunk_offset, chunk_end)
             computed_md5 = None
             self.response_headers = self.service.upload_pages(
                 chunk_data,
@@ -288,7 +290,8 @@
                 cls=return_response_headers,
                 data_stream_total=self.total_size,
                 upload_stream_current=self.progress_total,
-                **self.request_options)
+                **self.request_options
+            )
 
             if not self.parallel and self.request_options.get('modified_access_conditions'):
                 self.request_options['modified_access_conditions'].if_match = self.response_headers['etag']
@@ -310,7 +313,7 @@
                 upload_stream_current=self.progress_total,
                 **self.request_options
             )
-            self.current_length = int(self.response_headers['blob_append_offset'])
+            self.current_length = int(self.response_headers["blob_append_offset"])
         else:
             self.request_options['append_position_access_conditions'].append_position = \
                 self.current_length + chunk_offset
@@ -360,8 +363,9 @@ def __init__(self, wrapped_stream, stream_begin_index, length, lockObj):
 
         # we must avoid buffering more than necessary, and also not use up too much memory
         # so the max buffer size is capped at 4MB
-        self._max_buffer_size = length if length < _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE \
-            else _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE
+        self._max_buffer_size = (
+            length if length < _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE else _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE
+        )
         self._current_buffer_start = 0
         self._current_buffer_size = 0
         super(SubStream, self).__init__()
@@ -391,7 +395,7 @@ def read(self, n):
 
         # return fast
         if n == 0 or self._buffer.closed:
-            return b''
+            return b""
 
         # attempt first read from the read buffer and update position
         read_buffer = self._buffer.read(n)
@@ -447,7 +451,7 @@ def seek(self, offset, whence=0):
             start_index = self._position
         elif whence is SEEK_END:
             start_index = self._length
-            offset = - offset
+            offset = -offset
         else:
             raise ValueError("Invalid argument for the 'whence' parameter.")
@@ -490,10 +494,11 @@ class IterStreamer(object):
     """
     File-like streaming iterator.
     """
-    def __init__(self, generator, encoding='UTF-8'):
+
+    def __init__(self, generator, encoding="UTF-8"):
         self.generator = generator
         self.iterator = iter(generator)
-        self.leftover = b''
+        self.leftover = b""
         self.encoding = encoding
 
     def __len__(self):
diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/base_client.py b/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/base_client.py
index 166d7e81a2ae..104d323d93df 100644
--- a/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/base_client.py
+++ b/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/base_client.py
@@ -20,6 +20,7 @@
 from azure.core import Configuration
 from azure.core.pipeline import Pipeline
 from azure.core.pipeline.transport import RequestsTransport
+from azure.core.pipeline.policies.distributed_tracing import DistributedTracingPolicy
 from azure.core.pipeline.policies import (
     RedirectPolicy,
     ContentDecodePolicy,
@@ -175,6 +176,7 @@ def _create_pipeline(self, credential, **kwargs):
             config.retry_policy,
             config.logging_policy,
             StorageResponseHook(**kwargs),
+            DistributedTracingPolicy(),
         ]
         return config, Pipeline(config.transport, policies=policies)
 
diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/base_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/base_client_async.py
index 567303a12521..90f22450e6c3 100644
--- a/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/base_client_async.py
+++ b/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/base_client_async.py
@@ -15,6 +15,7 @@
     from azure.core.pipeline.transport import AioHttpTransport as AsyncTransport
 except ImportError:
     from azure.core.pipeline.transport import AsyncioRequestsTransport as AsyncTransport
+from azure.core.pipeline.policies.distributed_tracing import DistributedTracingPolicy
 from azure.core.pipeline.policies import (
     ContentDecodePolicy,
     BearerTokenCredentialPolicy,
@@ -81,5 +82,6 @@ def _create_pipeline(self, credential, **kwargs):
             config.retry_policy,
             config.logging_policy,
             AsyncStorageResponseHook(**kwargs),
+            DistributedTracingPolicy(),
         ]
         return config, AsyncPipeline(config.transport, policies=policies)
diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/downloads.py b/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/downloads.py
index 1d46ffc95293..a35f45e43fc9 100644
--- a/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/downloads.py
+++ b/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/downloads.py
@@ -9,6 +9,7 @@
 from io import BytesIO
 
 from azure.core.exceptions import HttpResponseError
+from azure.core.tracing.context import tracing_context
 
 from .request_handlers import validate_and_format_range_headers
 from .response_handlers import process_storage_error, parse_length_from_content_range
@@ -17,7 +18,7 @@ def process_range_and_offset(start_range, end_range, length, encryption):
     start_offset, end_offset = 0, 0
-    if encryption.get('key') is not None or encryption.get('resolver') is not None:
+    if encryption.get("key") is not None or encryption.get("resolver") is not None:
         if start_range is not None:
             # Align the start of the range along a 16 byte block
             start_offset = start_range % 16
@@ -42,37 +43,36 @@ def process_content(data, start_offset, end_offset, encryption):
     if data is None:
         raise ValueError("Response cannot be None.")
     content = b"".join(list(data))
-    if content and encryption.get('key') is not None or encryption.get('resolver') is not None:
+    if content and encryption.get("key") is not None or encryption.get("resolver") is not None:
         try:
             return decrypt_blob(
-                encryption.get('required'),
-                encryption.get('key'),
-                encryption.get('resolver'),
+                encryption.get("required"),
+                encryption.get("key"),
+                encryption.get("resolver"),
                 content,
                 start_offset,
                 end_offset,
-                data.response.headers)
+                data.response.headers,
+            )
         except Exception as error:
-            raise HttpResponseError(
-                message="Decryption failed.",
-                response=data.response,
-                error=error)
+            raise HttpResponseError(message="Decryption failed.", response=data.response, error=error)
     return content
 
 
 class _ChunkDownloader(object):
-
     def __init__(
-            self, service=None,
-            total_size=None,
-            chunk_size=None,
-            current_progress=None,
-            start_range=None,
-            end_range=None,
-            stream=None,
-            validate_content=None,
-            encryption_options=None,
-            **kwargs):
+        self,
+        service=None,
+        total_size=None,
+        chunk_size=None,
+        current_progress=None,
+        start_range=None,
+        end_range=None,
+        stream=None,
+        validate_content=None,
+        encryption_options=None,
+        **kwargs
+    ):
         self.service = service
@@ -129,12 +129,10 @@ def _write_to_stream(self, chunk_data, chunk_start):
         pass
 
     def _download_chunk(self, chunk_start, chunk_end):
-        download_range, offset = process_range_and_offset(
-            chunk_start, chunk_end, chunk_end, self.encryption_options)
+        download_range, offset = process_range_and_offset(chunk_start, chunk_end, chunk_end, self.encryption_options)
         range_header, range_validation = validate_and_format_range_headers(
-            download_range[0],
-            download_range[1] - 1,
-            check_content_md5=self.validate_content)
+            download_range[0], download_range[1] - 1, check_content_md5=self.validate_content
+        )
 
         try:
             _, response = self.service.download(
@@ -143,7 +141,8 @@ def _download_chunk(self, chunk_start, chunk_end):
                 validate_content=self.validate_content,
                 data_stream_total=self.total_size,
                 download_stream_current=self.progress_total,
-                **self.request_options)
+                **self.request_options
+            )
         except HttpResponseError as error:
             process_storage_error(error)
 
@@ -151,25 +150,26 @@
 
         # This makes sure that if_match is set so that we can validate
        # that subsequent downloads are to an unmodified blob
-        if self.request_options.get('modified_access_conditions'):
-            self.request_options['modified_access_conditions'].if_match = response.properties.etag
+        if self.request_options.get("modified_access_conditions"):
+            self.request_options["modified_access_conditions"].if_match = response.properties.etag
 
         return chunk_data
 
 
 class ParallelChunkDownloader(_ChunkDownloader):
-
     def __init__(
-            self, service=None,
-            total_size=None,
-            chunk_size=None,
-            current_progress=None,
-            start_range=None,
-            end_range=None,
-            stream=None,
-            validate_content=None,
-            encryption_options=None,
-            **kwargs):
+        self,
+        service=None,
+        total_size=None,
+        chunk_size=None,
+        current_progress=None,
+        start_range=None,
+        end_range=None,
+        stream=None,
+        validate_content=None,
+        encryption_options=None,
+        **kwargs
+    ):
         super(ParallelChunkDownloader, self).__init__(
             service=service,
             total_size=total_size,
@@ -180,7 +180,8 @@ def __init__(
             stream=stream,
             validate_content=validate_content,
             encryption_options=encryption_options,
-            **kwargs)
+            **kwargs
+        )
 
         # for a parallel download, the stream is always seekable, so we note down the current position
         # in order to seek to the right place when out-of-order chunks come in
@@ -202,7 +203,6 @@ def _write_to_stream(self, chunk_data, chunk_start):
 
 
 class SequentialChunkDownloader(_ChunkDownloader):
-
     def _update_progress(self, length):
         self.progress_total += length
 
@@ -219,14 +219,16 @@ class StorageStreamDownloader(object):  # pylint: disable=too-many-instance-attr
     """
 
     def __init__(
-            self, service=None,
-            config=None,
-            offset=None,
-            length=None,
-            validate_content=None,
-            encryption_options=None,
-            extra_properties=None,
-            **kwargs):
+        self,
+        service=None,
+        config=None,
+        offset=None,
+        length=None,
+        validate_content=None,
+        encryption_options=None,
+        extra_properties=None,
+        **kwargs
+    ):
         self.service = service
         self.config = config
         self.offset = offset
@@ -240,8 +242,9 @@ def __init__(
         # The service only provides transactional MD5s for chunks under 4MB.
         # If validate_content is on, get only self.MAX_CHUNK_GET_SIZE for the first
         # chunk so a transactional MD5 can be retrieved.
-        self.first_get_size = self.config.max_single_get_size if not self.validate_content \
-            else self.config.max_chunk_get_size
+        self.first_get_size = (
+            self.config.max_single_get_size if not self.validate_content else self.config.max_chunk_get_size
+        )
         initial_request_start = self.offset if self.offset is not None else 0
         if self.length is not None and self.length - self.offset < self.first_get_size:
             initial_request_end = self.length
@@ -249,7 +252,8 @@ def __init__(
             initial_request_end = initial_request_start + self.first_get_size - 1
 
         self.initial_range, self.initial_offset = process_range_and_offset(
-            initial_request_start, initial_request_end, self.length, self.encryption_options)
+            initial_request_start, initial_request_end, self.length, self.encryption_options
+        )
 
         self.download_size = None
         self.file_size = None
@@ -261,7 +265,7 @@ def __init__(
             self.properties.size = self.download_size
 
             # Overwrite the content range to the user requested range
-            self.properties.content_range = 'bytes {0}-{1}/{2}'.format(self.offset, self.length, self.file_size)
+            self.properties.content_range = "bytes {0}-{1}/{2}".format(self.offset, self.length, self.file_size)
 
             # Set additional properties according to download type
             if extra_properties:
@@ -281,7 +285,8 @@ def __iter__(self):
             content = b""
         else:
             content = process_content(
-                self.response, self.initial_offset[0], self.initial_offset[1], self.encryption_options)
+                self.response, self.initial_offset[0], self.initial_offset[1], self.encryption_options
+            )
 
         if content is not None:
             yield content
@@ -304,7 +309,8 @@ def __iter__(self):
                 validate_content=self.validate_content,
                 encryption_options=self.encryption_options,
                 use_location=self.location_mode,
-                **self.request_options)
+                **self.request_options
+            )
 
             for chunk in downloader.get_chunk_offsets():
                 yield downloader.yield_chunk(chunk)
@@ -315,7 +321,8 @@ def _initial_request(self):
             self.initial_range[1],
             start_range_required=False,
             end_range_required=False,
-            check_content_md5=self.validate_content)
+            check_content_md5=self.validate_content,
+        )
 
         try:
             location_mode, response = self.service.download(
@@ -324,7 +331,8 @@ def _initial_request(self):
                 validate_content=self.validate_content,
                 data_stream_total=None,
                 download_stream_current=0,
-                **self.request_options)
+                **self.request_options
+            )
 
             # Check the location we read from to ensure we use the same one
             # for subsequent requests.
@@ -351,7 +359,8 @@ def _initial_request(self):
                     validate_content=self.validate_content,
                     data_stream_total=0,
                     download_stream_current=0,
-                    **self.request_options)
+                    **self.request_options
+                )
             except HttpResponseError as error:
                 process_storage_error(error)
 
@@ -365,15 +374,14 @@ def _initial_request(self):
         # If file size is large, download the rest of the file in chunks.
         if response.properties.size != self.download_size:
             # Lock on the etag. This can be overriden by the user by specifying '*'
-            if self.request_options.get('modified_access_conditions'):
-                if not self.request_options['modified_access_conditions'].if_match:
-                    self.request_options['modified_access_conditions'].if_match = response.properties.etag
+            if self.request_options.get("modified_access_conditions"):
+                if not self.request_options["modified_access_conditions"].if_match:
+                    self.request_options["modified_access_conditions"].if_match = response.properties.etag
         else:
             self._download_complete = True
 
         return response
 
-
     def content_as_bytes(self, max_connections=1):
         """Download the contents of this file.
@@ -387,7 +395,7 @@ def content_as_bytes(self, max_connections=1):
         self.download_to_stream(stream, max_connections=max_connections)
         return stream.getvalue()
 
-    def content_as_text(self, max_connections=1, encoding='UTF-8'):
+    def content_as_text(self, max_connections=1, encoding="UTF-8"):
         """Download the contents of this file, and decode as text.
 
         This operation is blocking until all data is downloaded.
@@ -424,7 +432,8 @@ def download_to_stream(self, stream, max_connections=1):
             content = b""
         else:
             content = process_content(
-                self.response, self.initial_offset[0], self.initial_offset[1], self.encryption_options)
+                self.response, self.initial_offset[0], self.initial_offset[1], self.encryption_options
+            )
 
         # Write the content to the user stream
         if content is not None:
@@ -449,12 +458,14 @@ def download_to_stream(self, stream, max_connections=1):
                 validate_content=self.validate_content,
                 encryption_options=self.encryption_options,
                 use_location=self.location_mode,
-                **self.request_options)
+                **self.request_options
+            )
 
             if max_connections > 1:
                 import concurrent.futures
+
                 executor = concurrent.futures.ThreadPoolExecutor(max_connections)
-                list(executor.map(downloader.process_chunk, downloader.get_chunk_offsets()))
+                list(executor.map(tracing_context.with_current_context(downloader.process_chunk), downloader.get_chunk_offsets()))
             else:
                 for chunk in downloader.get_chunk_offsets():
                     downloader.process_chunk(chunk)
diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/uploads.py b/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/uploads.py
index 2b269fb1d0ba..a8739136fdee 100644
--- a/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/uploads.py
+++ b/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/uploads.py
@@ -12,6 +12,8 @@
 from math import ceil
 
 import six
 
+from azure.core.tracing.context import tracing_context
+
 from . import encode_base64, url_quote
 from .request_handlers import get_length
@@ -34,7 +36,7 @@ def _parallel_uploads(executor, uploader, pending, running):
             except StopIteration:
                 break
             else:
-                running.add(executor.submit(uploader.process_chunk, next_chunk))
+                running.add(executor.submit(tracing_context.with_current_context(uploader.process_chunk), next_chunk))
 
     # Wait for the remaining uploads to finish
     done, _running = futures.wait(running)
@@ -79,7 +81,7 @@ def upload_data_chunks(
         executor = futures.ThreadPoolExecutor(max_connections)
         upload_tasks = uploader.get_chunk_streams()
         running_futures = [
-            executor.submit(uploader.process_chunk, u)
+            executor.submit(tracing_context.with_current_context(uploader.process_chunk), u)
             for u in islice(upload_tasks, 0, max_connections)
         ]
         range_ids = _parallel_uploads(executor, uploader, upload_tasks, running_futures)
@@ -115,7 +117,7 @@ def upload_substream_blocks(
         executor = futures.ThreadPoolExecutor(max_connections)
         upload_tasks = uploader.get_substream_blocks()
         running_futures = [
-            executor.submit(uploader.process_substream_block, u)
+            executor.submit(tracing_context.with_current_context(uploader.process_substream_block), u)
             for u in islice(upload_tasks, 0, max_connections)
         ]
         return _parallel_uploads(executor, uploader, upload_tasks, running_futures)
diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/queue_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/queue_client_async.py
index d1757d12bb1f..ea5f3b4ee031 100644
--- a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/queue_client_async.py
+++ b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/queue_client_async.py
@@ -6,23 +6,35 @@
 import functools
 from typing import (  # pylint: disable=unused-import
-    Union, Optional, Any, IO, Iterable, AnyStr, Dict, List, Tuple,
-    TYPE_CHECKING)
+    Union,
+    Optional,
+    Any,
+    IO,
+    Iterable,
+    AnyStr,
+    Dict,
+    List,
+    Tuple,
+    TYPE_CHECKING,
+)
+
 try:
-    from urllib.parse import urlparse, quote, unquote # pylint: disable=unused-import
+    from urllib.parse import urlparse, quote, unquote  # pylint: disable=unused-import
 except ImportError:
-    from urlparse import urlparse # type: ignore
-    from urllib2 import quote, unquote # type: ignore
+    from urlparse import urlparse  # type: ignore
+    from urllib2 import quote, unquote  # type: ignore
+
+from azure.core.tracing.decorator import distributed_trace
+from azure.core.tracing.decorator_async import distributed_trace_async
 
 from azure.storage.queue._shared.base_client_async import AsyncStorageAccountHostsMixin
 from azure.storage.queue._shared.request_handlers import add_metadata_headers, serialize_iso
 from azure.storage.queue._shared.response_handlers import (
     return_response_headers,
     process_storage_error,
-    return_headers_and_deserialized)
-from azure.storage.queue._deserialize import (
-    deserialize_queue_properties,
-    deserialize_queue_creation)
+    return_headers_and_deserialized,
+)
+from azure.storage.queue._deserialize import deserialize_queue_properties, deserialize_queue_creation
 from azure.storage.queue._generated.aio import AzureQueueStorage
 from azure.storage.queue._generated.models import StorageErrorException, SignedIdentifier
 from azure.storage.queue._generated.models import QueueMessage as GenQueueMessage
@@ -78,25 +90,23 @@ class QueueClient(AsyncStorageAccountHostsMixin, QueueClientBase):
             :dedent: 12
             :caption: Create the queue client with url and credential.
""" + def __init__( - self, queue_url, # type: str - queue=None, # type: Optional[Union[QueueProperties, str]] - credential=None, # type: Optional[Any] - loop=None, # type: Any - **kwargs # type: Any - ): + self, + queue_url, # type: str + queue=None, # type: Optional[Union[QueueProperties, str]] + credential=None, # type: Optional[Any] + loop=None, # type: Any + **kwargs # type: Any + ): # type: (...) -> None - kwargs['retry_policy'] = kwargs.get('retry_policy') or ExponentialRetry(**kwargs) - super(QueueClient, self).__init__( - queue_url, - queue=queue, - credential=credential, - loop=loop, - **kwargs) + kwargs["retry_policy"] = kwargs.get("retry_policy") or ExponentialRetry(**kwargs) + super(QueueClient, self).__init__(queue_url, queue=queue, credential=credential, loop=loop, **kwargs) self._client = AzureQueueStorage(self.url, pipeline=self._pipeline, loop=loop) # type: ignore self._loop = loop - async def create_queue(self, metadata=None, timeout=None, **kwargs): # type: ignore + @distributed_trace_async + async def create_queue(self, metadata=None, timeout=None, **kwargs): # type: ignore # type: (Optional[Dict[str, Any]], Optional[int], Optional[Any]) -> None """Creates a new queue in the storage account. @@ -122,19 +132,17 @@ async def create_queue(self, metadata=None, timeout=None, **kwargs): # type: ign :dedent: 8 :caption: Create a queue. """ - headers = kwargs.pop('headers', {}) - headers.update(add_metadata_headers(metadata)) # type: ignore + headers = kwargs.pop("headers", {}) + headers.update(add_metadata_headers(metadata)) # type: ignore try: - return await self._client.queue.create( # type: ignore - metadata=metadata, - timeout=timeout, - headers=headers, - cls=deserialize_queue_creation, - **kwargs) + return await self._client.queue.create( # type: ignore + metadata=metadata, timeout=timeout, headers=headers, cls=deserialize_queue_creation, **kwargs + ) except StorageErrorException as error: process_storage_error(error) - async def delete_queue(self, timeout=None, **kwargs): # type: ignore + @distributed_trace_async + async def delete_queue(self, timeout=None, **kwargs): # type: ignore # type: (Optional[int], Optional[Any]) -> None """Deletes the specified queue and any messages it contains. @@ -163,7 +171,8 @@ async def delete_queue(self, timeout=None, **kwargs): # type: ignore except StorageErrorException as error: process_storage_error(error) - async def get_queue_properties(self, timeout=None, **kwargs): # type: ignore + @distributed_trace_async + async def get_queue_properties(self, timeout=None, **kwargs): # type: ignore # type: (Optional[int], Optional[Any]) -> QueueProperties """Returns all user-defined metadata for the specified queue. @@ -184,15 +193,15 @@ async def get_queue_properties(self, timeout=None, **kwargs): # type: ignore """ try: response = await self._client.queue.get_properties( - timeout=timeout, - cls=deserialize_queue_properties, - **kwargs) + timeout=timeout, cls=deserialize_queue_properties, **kwargs + ) except StorageErrorException as error: process_storage_error(error) response.name = self.queue_name - return response # type: ignore + return response # type: ignore - async def set_queue_metadata(self, metadata=None, timeout=None, **kwargs): # type: ignore + @distributed_trace_async + async def set_queue_metadata(self, metadata=None, timeout=None, **kwargs): # type: ignore # type: (Optional[Dict[str, Any]], Optional[int], Optional[Any]) -> None """Sets user-defined metadata on the specified queue. 
@@ -213,18 +222,17 @@ async def set_queue_metadata(self, metadata=None, timeout=None, **kwargs): # typ
             :dedent: 12
             :caption: Set metadata on the queue.
         """
-        headers = kwargs.pop('headers', {})
-        headers.update(add_metadata_headers(metadata)) # type: ignore
+        headers = kwargs.pop("headers", {})
+        headers.update(add_metadata_headers(metadata))  # type: ignore
         try:
-            return (await self._client.queue.set_metadata( # type: ignore
-                timeout=timeout,
-                headers=headers,
-                cls=return_response_headers,
-                **kwargs))
+            return await self._client.queue.set_metadata(  # type: ignore
+                timeout=timeout, headers=headers, cls=return_response_headers, **kwargs
+            )
         except StorageErrorException as error:
             process_storage_error(error)
 
-    async def get_queue_access_policy(self, timeout=None, **kwargs): # type: ignore
+    @distributed_trace_async
+    async def get_queue_access_policy(self, timeout=None, **kwargs):  # type: ignore
         # type: (Optional[int], Optional[Any]) -> Dict[str, Any]
         """Returns details about any stored access policies specified on the
         queue that may be used with Shared Access Signatures.
@@ -236,14 +244,14 @@ async def get_queue_access_policy(self, timeout=None, **kwargs): # type: ignore
         """
         try:
             _, identifiers = await self._client.queue.get_access_policy(
-                timeout=timeout,
-                cls=return_headers_and_deserialized,
-                **kwargs)
+                timeout=timeout, cls=return_headers_and_deserialized, **kwargs
+            )
         except StorageErrorException as error:
             process_storage_error(error)
         return {s.id: s.access_policy or AccessPolicy() for s in identifiers}
 
-    async def set_queue_access_policy(self, signed_identifiers=None, timeout=None, **kwargs): # type: ignore
+    @distributed_trace_async
+    async def set_queue_access_policy(self, signed_identifiers=None, timeout=None, **kwargs):  # type: ignore
         # type: (Optional[Dict[str, Optional[AccessPolicy]]], Optional[int], Optional[Any]) -> None
         """Sets stored access policies for the queue that may be used with Shared
         Access Signatures.
@@ -278,30 +286,30 @@ async def set_queue_access_policy(self, signed_identifiers=None, timeout=None, *
         if signed_identifiers:
             if len(signed_identifiers) > 15:
                 raise ValueError(
-                    'Too many access policies provided. The server does not support setting '
-                    'more than 15 access policies on a single resource.')
+                    "Too many access policies provided. The server does not support setting "
+                    "more than 15 access policies on a single resource."
+                )
             identifiers = []
             for key, value in signed_identifiers.items():
                 if value:
                     value.start = serialize_iso(value.start)
                     value.expiry = serialize_iso(value.expiry)
                 identifiers.append(SignedIdentifier(id=key, access_policy=value))
-            signed_identifiers = identifiers # type: ignore
+            signed_identifiers = identifiers  # type: ignore
         try:
-            await self._client.queue.set_access_policy(
-                queue_acl=signed_identifiers or None,
-                timeout=timeout,
-                **kwargs)
+            await self._client.queue.set_access_policy(queue_acl=signed_identifiers or None, timeout=timeout, **kwargs)
         except StorageErrorException as error:
             process_storage_error(error)
 
-    async def enqueue_message( # type: ignore
-            self, content,  # type: Any
-            visibility_timeout=None,  # type: Optional[int]
-            time_to_live=None,  # type: Optional[int]
-            timeout=None,  # type: Optional[int]
-            **kwargs  # type: Optional[Any]
-        ):
+    @distributed_trace_async
+    async def enqueue_message(  # type: ignore
+        self,
+        content,  # type: Any
+        visibility_timeout=None,  # type: Optional[int]
+        time_to_live=None,  # type: Optional[int]
+        timeout=None,  # type: Optional[int]
+        **kwargs  # type: Optional[Any]
+    ):
         # type: (...) -> QueueMessage
         """Adds a new message to the back of the message queue.
@@ -348,9 +356,8 @@ async def enqueue_message(  # type: ignore
             :caption: Enqueue messages.
         """
         self._config.message_encode_policy.configure(
-            self.require_encryption,
-            self.key_encryption_key,
-            self.key_resolver_function)
+            self.require_encryption, self.key_encryption_key, self.key_resolver_function
+        )
         content = self._config.message_encode_policy(content)
         new_message = GenQueueMessage(message_text=content)
 
@@ -360,7 +367,8 @@ async def enqueue_message(  # type: ignore
                 visibilitytimeout=visibility_timeout,
                 message_time_to_live=time_to_live,
                 timeout=timeout,
-                **kwargs)
+                **kwargs
+            )
             queue_message = QueueMessage(content=new_message.message_text)
             queue_message.id = enqueued[0].message_id
             queue_message.insertion_time = enqueued[0].insertion_time
@@ -371,7 +379,8 @@ async def enqueue_message(  # type: ignore
         except StorageErrorException as error:
             process_storage_error(error)
 
-    def receive_messages(self, messages_per_page=None, visibility_timeout=None, timeout=None, **kwargs): # type: ignore
+    @distributed_trace
+    def receive_messages(self, messages_per_page=None, visibility_timeout=None, timeout=None, **kwargs):  # type: ignore
         # type: (Optional[int], Optional[int], Optional[int], Optional[Any]) -> QueueMessage
         """Removes one or more messages from the front of the queue.
@@ -411,9 +420,8 @@ def receive_messages(self, messages_per_page=None, visibility_timeout=None, time
             :caption: Receive messages from the queue.
""" self._config.message_decode_policy.configure( - self.require_encryption, - self.key_encryption_key, - self.key_resolver_function) + self.require_encryption, self.key_encryption_key, self.key_resolver_function + ) try: command = functools.partial( self._client.messages.dequeue, @@ -426,8 +434,16 @@ def receive_messages(self, messages_per_page=None, visibility_timeout=None, time except StorageErrorException as error: process_storage_error(error) - async def update_message(self, message, visibility_timeout=None, pop_receipt=None, # type: ignore - content=None, timeout=None, **kwargs): + @distributed_trace_async + async def update_message( + self, + message, + visibility_timeout=None, + pop_receipt=None, # type: ignore + content=None, + timeout=None, + **kwargs + ): # type: (Any, int, Optional[str], Optional[Any], Optional[int], Any) -> QueueMessage """Updates the visibility timeout of a message. You can also use this operation to update the contents of a message. @@ -492,13 +508,12 @@ async def update_message(self, message, visibility_timeout=None, pop_receipt=Non raise ValueError("pop_receipt must be present") if message_text is not None: self._config.message_encode_policy.configure( - self.require_encryption, - self.key_encryption_key, - self.key_resolver_function) + self.require_encryption, self.key_encryption_key, self.key_resolver_function + ) message_text = self._config.message_encode_policy(message_text) updated = GenQueueMessage(message_text=message_text) else: - updated = None # type: ignore + updated = None # type: ignore try: response = await self._client.message_id.update( queue_message=updated, @@ -507,19 +522,21 @@ async def update_message(self, message, visibility_timeout=None, pop_receipt=Non pop_receipt=receipt, cls=return_response_headers, queue_message_id=message_id, - **kwargs) + **kwargs + ) new_message = QueueMessage(content=message_text) new_message.id = message_id new_message.insertion_time = insertion_time new_message.expiration_time = expiration_time new_message.dequeue_count = dequeue_count - new_message.pop_receipt = response['popreceipt'] - new_message.time_next_visible = response['time_next_visible'] + new_message.pop_receipt = response["popreceipt"] + new_message.time_next_visible = response["time_next_visible"] return new_message except StorageErrorException as error: process_storage_error(error) - async def peek_messages(self, max_messages=None, timeout=None, **kwargs): # type: ignore + @distributed_trace_async + async def peek_messages(self, max_messages=None, timeout=None, **kwargs): # type: ignore # type: (Optional[int], Optional[int], Optional[Any]) -> List[QueueMessage] """Retrieves one or more messages from the front of the queue, but does not alter the visibility of the message. 
@@ -558,15 +575,12 @@ async def peek_messages(self, max_messages=None, timeout=None, **kwargs): # type
         if max_messages and not 1 <= max_messages <= 32:
             raise ValueError("Number of messages to peek should be between 1 and 32")
         self._config.message_decode_policy.configure(
-            self.require_encryption,
-            self.key_encryption_key,
-            self.key_resolver_function)
+            self.require_encryption, self.key_encryption_key, self.key_resolver_function
+        )
         try:
             messages = await self._client.messages.peek(
-                number_of_messages=max_messages,
-                timeout=timeout,
-                cls=self._config.message_decode_policy,
-                **kwargs)
+                number_of_messages=max_messages, timeout=timeout, cls=self._config.message_decode_policy, **kwargs
+            )
             wrapped_messages = []
             for peeked in messages:
                 wrapped_messages.append(QueueMessage._from_generated(peeked))  # pylint: disable=protected-access
@@ -574,7 +588,8 @@ async def peek_messages(self, max_messages=None, timeout=None, **kwargs): # type
         except StorageErrorException as error:
             process_storage_error(error)
 
-    async def clear_messages(self, timeout=None, **kwargs): # type: ignore
+    @distributed_trace_async
+    async def clear_messages(self, timeout=None, **kwargs):  # type: ignore
         # type: (Optional[int], Optional[Any]) -> None
         """Deletes all messages from the specified queue.
@@ -594,7 +609,8 @@ async def clear_messages(self, timeout=None, **kwargs): # type: ignore
         except StorageErrorException as error:
             process_storage_error(error)
 
-    async def delete_message(self, message, pop_receipt=None, timeout=None, **kwargs): # type: ignore
+    @distributed_trace_async
+    async def delete_message(self, message, pop_receipt=None, timeout=None, **kwargs):  # type: ignore
         # type: (Any, Optional[str], Optional[str], Optional[int]) -> None
         """Deletes the specified message.
@@ -635,10 +651,8 @@ async def delete_message(self, message, pop_receipt=None, timeout=None, **kwargs
             raise ValueError("pop_receipt must be present")
         try:
             await self._client.message_id.delete(
-                pop_receipt=receipt,
-                timeout=timeout,
-                queue_message_id=message_id,
-                **kwargs
+                pop_receipt=receipt, timeout=timeout, queue_message_id=message_id, **kwargs
             )
         except StorageErrorException as error:
             process_storage_error(error)
+
diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/queue_service_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/queue_service_client_async.py
index 47d767069933..a79a8ca9b874 100644
--- a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/queue_service_client_async.py
+++ b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/queue_service_client_async.py
@@ -13,6 +13,9 @@
 except ImportError:
     from urlparse import urlparse  # type: ignore
 
+from azure.core.tracing.decorator import distributed_trace
+from azure.core.tracing.decorator_async import distributed_trace_async
+
 from azure.storage.queue._shared.policies_async import ExponentialRetry
 from azure.storage.queue.queue_service_client import QueueServiceClient as QueueServiceClientBase
 from azure.storage.queue._shared.models import LocationMode
@@ -95,6 +98,7 @@ def __init__(
         self._client = AzureQueueStorage(url=self.url, pipeline=self._pipeline, loop=loop)  # type: ignore
         self._loop = loop
 
+    @distributed_trace_async
     async def get_service_stats(self, timeout=None, **kwargs):  # type: ignore
         # type: (Optional[int], Optional[Any]) -> Dict[str, Any]
         """Retrieves statistics related to replication for the Queue service.
@@ -126,6 +130,7 @@ async def get_service_stats(self, timeout=None, **kwargs):  # type: ignore
         except StorageErrorException as error:
             process_storage_error(error)
 
+    @distributed_trace_async
     async def get_service_properties(self, timeout=None, **kwargs):  # type: ignore
         # type: (Optional[int], Optional[Any]) -> Dict[str, Any]
         """Gets the properties of a storage account's Queue service, including
@@ -148,6 +153,7 @@ async def get_service_properties(self, timeout=None, **kwargs):  # type: ignore
         except StorageErrorException as error:
             process_storage_error(error)
 
+    @distributed_trace_async
     async def set_service_properties(  # type: ignore
             self, logging=None,  # type: Optional[Logging]
             hour_metrics=None,  # type: Optional[Metrics]
@@ -202,6 +208,7 @@ async def set_service_properties(  # type: ignore
         except StorageErrorException as error:
             process_storage_error(error)
 
+    @distributed_trace
     def list_queues(
             self, name_starts_with=None,  # type: Optional[str]
             include_metadata=False,  # type: Optional[bool]
@@ -253,6 +260,7 @@ def list_queues(
         return QueuePropertiesPaged(
             command, prefix=name_starts_with, results_per_page=results_per_page, marker=marker)
 
+    @distributed_trace_async
     async def create_queue(  # type: ignore
             self, name,  # type: str
             metadata=None,  # type: Optional[Dict[str, str]]
@@ -287,6 +295,7 @@ async def create_queue(  # type: ignore
             metadata=metadata, timeout=timeout, **kwargs)
         return queue
 
+    @distributed_trace_async
     async def delete_queue(  # type: ignore
             self, queue,  # type: Union[QueueProperties, str]
             timeout=None,  # type: Optional[int]
diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/queue_client.py b/sdk/storage/azure-storage-queue/azure/storage/queue/queue_client.py
index 8815bb61007a..40220c2805ad 100644
--- a/sdk/storage/azure-storage-queue/azure/storage/queue/queue_client.py
+++ b/sdk/storage/azure-storage-queue/azure/storage/queue/queue_client.py
@@ -28,6 +28,7 @@
 from ._generated import AzureQueueStorage
 from ._generated.models import StorageErrorException, SignedIdentifier
 from ._generated.models import QueueMessage as GenQueueMessage
+from azure.core.tracing.decorator import distributed_trace
 
 from .models import QueueMessage, AccessPolicy, MessagesPaged
@@ -229,6 +230,7 @@ def generate_shared_access_signature(
             protocol=protocol,
         )
 
+    @distributed_trace
     def create_queue(self, metadata=None, timeout=None, **kwargs):
         # type: (Optional[Dict[str, Any]], Optional[int], Optional[Any]) -> None
         """Creates a new queue in the storage account.
@@ -267,6 +269,7 @@ def create_queue(self, metadata=None, timeout=None, **kwargs):
         except StorageErrorException as error:
             process_storage_error(error)
 
+    @distributed_trace
     def delete_queue(self, timeout=None, **kwargs):
         # type: (Optional[int], Optional[Any]) -> None
         """Deletes the specified queue and any messages it contains.
@@ -296,6 +299,7 @@ def delete_queue(self, timeout=None, **kwargs):
         except StorageErrorException as error:
             process_storage_error(error)
 
+    @distributed_trace
     def get_queue_properties(self, timeout=None, **kwargs):
         # type: (Optional[int], Optional[Any]) -> QueueProperties
         """Returns all user-defined metadata for the specified queue.
@@ -325,6 +329,7 @@ def get_queue_properties(self, timeout=None, **kwargs):
         response.name = self.queue_name
         return response  # type: ignore
 
+    @distributed_trace
     def set_queue_metadata(self, metadata=None, timeout=None, **kwargs):
         # type: (Optional[Dict[str, Any]], Optional[int], Optional[Any]) -> None
         """Sets user-defined metadata on the specified queue.
@@ -357,6 +362,7 @@ def set_queue_metadata(self, metadata=None, timeout=None, **kwargs):
         except StorageErrorException as error:
             process_storage_error(error)
 
+    @distributed_trace
     def get_queue_access_policy(self, timeout=None, **kwargs):
         # type: (Optional[int], Optional[Any]) -> Dict[str, Any]
         """Returns details about any stored access policies specified on the
@@ -376,6 +382,7 @@ def get_queue_access_policy(self, timeout=None, **kwargs):
             process_storage_error(error)
         return {s.id: s.access_policy or AccessPolicy() for s in identifiers}
 
+    @distributed_trace
     def set_queue_access_policy(self, signed_identifiers=None, timeout=None, **kwargs):
         # type: (Optional[Dict[str, Optional[AccessPolicy]]], Optional[int], Optional[Any]) -> None
         """Sets stored access policies for the queue that may be used with Shared
@@ -428,6 +435,7 @@ def set_queue_access_policy(self, signed_identifiers=None, timeout=None, **kwarg
         except StorageErrorException as error:
             process_storage_error(error)
 
+    @distributed_trace
     def enqueue_message(  # type: ignore
             self, content,  # type: Any
             visibility_timeout=None,  # type: Optional[int]
@@ -504,6 +512,7 @@ def enqueue_message(  # type: ignore
         except StorageErrorException as error:
             process_storage_error(error)
 
+    @distributed_trace
     def receive_messages(self, messages_per_page=None, visibility_timeout=None, timeout=None, **kwargs):  # type: ignore
         # type: (Optional[int], Optional[int], Optional[int], Optional[Any]) -> QueueMessage
         """Removes one or more messages from the front of the queue.
@@ -559,6 +568,7 @@ def receive_messages(self, messages_per_page=None, visibility_timeout=None, time
         except StorageErrorException as error:
             process_storage_error(error)
 
+    @distributed_trace
     def update_message(self, message, visibility_timeout=None, pop_receipt=None,  # type: ignore
                        content=None, timeout=None, **kwargs):
         # type: (Any, int, Optional[str], Optional[Any], Optional[int], Any) -> QueueMessage
@@ -652,6 +662,7 @@ def update_message(self, message, visibility_timeout=None, pop_receipt=None,  # t
         except StorageErrorException as error:
             process_storage_error(error)
 
+    @distributed_trace
     def peek_messages(self, max_messages=None, timeout=None, **kwargs):  # type: ignore
         # type: (Optional[int], Optional[int], Optional[Any]) -> List[QueueMessage]
         """Retrieves one or more messages from the front of the queue, but does
@@ -707,6 +718,7 @@ def peek_messages(self, max_messages=None, timeout=None, **kwargs):  # type: igno
         except StorageErrorException as error:
             process_storage_error(error)
 
+    @distributed_trace
     def clear_messages(self, timeout=None, **kwargs):
         # type: (Optional[int], Optional[Any]) -> None
         """Deletes all messages from the specified queue.
@@ -727,6 +739,7 @@ def clear_messages(self, timeout=None, **kwargs):
         except StorageErrorException as error:
             process_storage_error(error)
 
+    @distributed_trace
     def delete_message(self, message, pop_receipt=None, timeout=None, **kwargs):
         # type: (Any, Optional[str], Optional[str], Optional[int]) -> None
         """Deletes the specified message.
diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/queue_service_client.py b/sdk/storage/azure-storage-queue/azure/storage/queue/queue_service_client.py
index c4eef8ff21e1..882325631350 100644
--- a/sdk/storage/azure-storage-queue/azure/storage/queue/queue_service_client.py
+++ b/sdk/storage/azure-storage-queue/azure/storage/queue/queue_service_client.py
@@ -22,6 +22,7 @@
 
 from .models import QueuePropertiesPaged
 from .queue_client import QueueClient
+from azure.core.tracing.decorator import distributed_trace
 
 if TYPE_CHECKING:
     from datetime import datetime
@@ -189,6 +190,7 @@ def generate_shared_access_signature(
         return sas.generate_account(
             Services.QUEUE, resource_types, permission, expiry, start=start, ip=ip, protocol=protocol)  # type: ignore
 
+    @distributed_trace
     def get_service_stats(self, timeout=None, **kwargs):  # type: ignore
         # type: (Optional[int], Optional[Any]) -> Dict[str, Any]
         """Retrieves statistics related to replication for the Queue service.
@@ -220,6 +222,7 @@ def get_service_stats(self, timeout=None, **kwargs):  # type: ignore
         except StorageErrorException as error:
             process_storage_error(error)
 
+    @distributed_trace
     def get_service_properties(self, timeout=None, **kwargs):  # type: ignore
         # type: (Optional[int], Optional[Any]) -> Dict[str, Any]
         """Gets the properties of a storage account's Queue service, including
@@ -242,6 +245,7 @@ def get_service_properties(self, timeout=None, **kwargs):  # type: ignore
         except StorageErrorException as error:
             process_storage_error(error)
 
+    @distributed_trace
     def set_service_properties(  # type: ignore
             self, logging=None,  # type: Optional[Logging]
             hour_metrics=None,  # type: Optional[Metrics]
@@ -296,6 +300,7 @@ def set_service_properties(  # type: ignore
         except StorageErrorException as error:
             process_storage_error(error)
 
+    @distributed_trace
     def list_queues(
             self, name_starts_with=None,  # type: Optional[str]
             include_metadata=False,  # type: Optional[bool]
@@ -347,6 +352,7 @@ def list_queues(
         return QueuePropertiesPaged(
             command, prefix=name_starts_with, results_per_page=results_per_page, marker=marker)
 
+    @distributed_trace
     def create_queue(
             self, name,  # type: str
             metadata=None,  # type: Optional[Dict[str, str]]
@@ -381,6 +387,7 @@ def create_queue(
             metadata=metadata, timeout=timeout, **kwargs)
         return queue
 
+    @distributed_trace
     def delete_queue(
             self, queue,  # type: Union[QueueProperties, str]
             timeout=None,  # type: Optional[int]