diff --git a/sdk/core/azure-core/azure/core/exceptions.py b/sdk/core/azure-core/azure/core/exceptions.py index ae6ff7c53eb8..b69912108a40 100644 --- a/sdk/core/azure-core/azure/core/exceptions.py +++ b/sdk/core/azure-core/azure/core/exceptions.py @@ -114,6 +114,7 @@ def __init__(self, message=None, response=None, **kwargs): if response: self.reason = response.reason self.status_code = response.status_code + message = message or "Operation returned an invalid status '{}'".format(self.reason) try: try: diff --git a/sdk/core/azure-core/azure/core/pipeline/policies/universal.py b/sdk/core/azure-core/azure/core/pipeline/policies/universal.py index 79b48bb162ce..b9338950ff38 100644 --- a/sdk/core/azure-core/azure/core/pipeline/policies/universal.py +++ b/sdk/core/azure-core/azure/core/pipeline/policies/universal.py @@ -350,7 +350,7 @@ def deserialize_from_http_generics(cls, response): # Try to use content-type from headers if available content_type = None if response.content_type: # type: ignore - content_type = response.content_type[0].strip().lower() # type: ignore + content_type = response.content_type.split(";")[0].strip().lower() # type: ignore # Ouch, this server did not declare what it sent... # Let's guess it's JSON... diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/aiohttp.py index f6ee555cea1c..9fb2eb125a5c 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/aiohttp.py @@ -31,7 +31,7 @@ import aiohttp from azure.core.configuration import ConnectionConfiguration -from azure.core.exceptions import ServiceRequestError +from azure.core.exceptions import ServiceRequestError, ServiceResponseError from azure.core.pipeline import Pipeline from requests.exceptions import ( @@ -181,7 +181,8 @@ async def send(self, request: HttpRequest, **config: Any) -> Optional[AsyncHttpR await response.load_body() except aiohttp.client_exceptions.ClientConnectorError as err: error = ServiceRequestError(err, error=err) - + except asyncio.TimeoutError as err: + error = ServiceResponseError(err, error=err) if error: raise error return response @@ -191,19 +192,16 @@ class AioHttpStreamDownloadGenerator(AsyncIterator): """Streams the response body data. :param pipeline: The pipeline object - :param request: The request object :param response: The client response object. - :type response: aiohttp.ClientResponse :param block_size: block size of data sent over connection. :type block_size: int """ - def __init__(self, pipeline: Pipeline, request: HttpRequest, - response: aiohttp.ClientResponse, block_size: int) -> None: + def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.pipeline = pipeline - self.request = request + self.request = response.request self.response = response - self.block_size = block_size - self.content_length = int(response.headers.get('Content-Length', 0)) + self.block_size = response.block_size + self.content_length = int(response.internal_response.headers.get('Content-Length', 0)) self.downloaded = 0 def __len__(self): @@ -215,13 +213,13 @@ async def __anext__(self): retry_interval = 1000 while retry_active: try: - chunk = await self.response.content.read(self.block_size) + chunk = await self.response.internal_response.content.read(self.block_size) if not chunk: raise _ResponseStopIteration() self.downloaded += self.block_size return chunk except _ResponseStopIteration: - self.response.close() + self.response.internal_response.close() raise StopAsyncIteration() except (ChunkedEncodingError, ConnectionError): retry_total -= 1 @@ -233,7 +231,7 @@ async def __anext__(self): resp = self.pipeline.run(self.request, stream=True, headers=headers) if resp.status_code == 416: raise - chunk = await self.response.content.read(self.block_size) + chunk = await self.response.internal_response.content.read(self.block_size) if not chunk: raise StopIteration() self.downloaded += chunk @@ -243,7 +241,7 @@ async def __anext__(self): raise except Exception as err: _LOGGER.warning("Unable to stream download: %s", err) - self.response.close() + self.response.internal_response.close() raise class AioHttpTransportResponse(AsyncHttpResponse): @@ -282,4 +280,4 @@ def stream_download(self, pipeline) -> AsyncIteratorType[bytes]: :param pipeline: The pipeline object :type pipeline: azure.core.pipeline """ - return AioHttpStreamDownloadGenerator(pipeline, self.request, self.internal_response, self.block_size) + return AioHttpStreamDownloadGenerator(pipeline, self) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/requests_asyncio.py b/sdk/core/azure-core/azure/core/pipeline/transport/requests_asyncio.py index 1f302034a173..6093713c2928 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/requests_asyncio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/requests_asyncio.py @@ -78,6 +78,9 @@ async def __aenter__(self): async def __aexit__(self, *exc_details): # pylint: disable=arguments-differ return super(AsyncioRequestsTransport, self).__exit__() + async def sleep(self, duration): + await asyncio.sleep(duration) + async def send(self, request: HttpRequest, **kwargs: Any) -> AsyncHttpResponse: # type: ignore """Send the request using this HTTP sender. @@ -135,18 +138,16 @@ class AsyncioStreamDownloadGenerator(AsyncIterator): """Streams the response body data. :param pipeline: The pipeline object - :param request: The request object :param response: The response object. - :param int block_size: block size of data sent over connection. :param generator iter_content_func: Iterator for response data. :param int content_length: size of body in bytes. """ - def __init__(self, pipeline: Pipeline, request: HttpRequest, response: requests.Response, block_size: int) -> None: + def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.pipeline = pipeline - self.request = request + self.request = response.request self.response = response - self.block_size = block_size - self.iter_content_func = self.response.iter_content(self.block_size) + self.block_size = response.block_size + self.iter_content_func = self.response.internal_response.iter_content(self.block_size) self.content_length = int(response.headers.get('Content-Length', 0)) self.downloaded = 0 @@ -170,7 +171,7 @@ async def __anext__(self): self.downloaded += self.block_size return chunk except _ResponseStopIteration: - self.response.close() + self.response.internal_response.close() raise StopAsyncIteration() except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError): @@ -197,7 +198,7 @@ async def __anext__(self): raise except Exception as err: _LOGGER.warning("Unable to stream download: %s", err) - self.response.close() + self.response.internal_response.close() raise @@ -206,5 +207,4 @@ class AsyncioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportRespo """ def stream_download(self, pipeline) -> AsyncIteratorType[bytes]: # type: ignore """Generator for streaming request body data.""" - return AsyncioStreamDownloadGenerator(pipeline, self.request, - self.internal_response, self.block_size) # type: ignore + return AsyncioStreamDownloadGenerator(pipeline, self) # type: ignore diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/requests_basic.py index 8907937c9a5a..95757edb63dd 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/requests_basic.py @@ -65,9 +65,7 @@ def __init__(self, request, requests_response, block_size=None): self.status_code = requests_response.status_code self.headers = requests_response.headers self.reason = requests_response.reason - content_type = requests_response.headers.get('content-type') - if content_type: - self.content_type = content_type.split(";") + self.content_type = requests_response.headers.get('content-type') def body(self): return self.internal_response.content @@ -82,18 +80,16 @@ class StreamDownloadGenerator(object): """Generator for streaming response data. :param pipeline: The pipeline object - :param request: The request object :param response: The response object. - :param int block_size: Number of bytes to read into memory. :param generator iter_content_func: Iterator for response data. :param int content_length: size of body in bytes. """ - def __init__(self, pipeline, request, response, block_size): + def __init__(self, pipeline, response): self.pipeline = pipeline - self.request = request + self.request = response.request self.response = response - self.block_size = block_size - self.iter_content_func = self.response.iter_content(self.block_size) + self.block_size = response.block_size + self.iter_content_func = self.response.internal_response.iter_content(self.block_size) self.content_length = int(response.headers.get('Content-Length', 0)) self.downloaded = 0 @@ -115,7 +111,7 @@ def __next__(self): self.downloaded += self.block_size return chunk except StopIteration: - self.response.close() + self.response.internal_response.close() raise StopIteration() except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError): @@ -138,7 +134,7 @@ def __next__(self): raise except Exception as err: _LOGGER.warning("Unable to stream download: %s", err) - self.response.close() + self.response.internal_response.close() raise next = __next__ # Python 2 compatibility. @@ -149,7 +145,7 @@ class RequestsTransportResponse(HttpResponse, _RequestsTransportResponseBase): def stream_download(self, pipeline): # type: (PipelineType) -> Iterator[bytes] """Generator for streaming request body data.""" - return StreamDownloadGenerator(pipeline, self.request, self.internal_response, self.block_size) + return StreamDownloadGenerator(pipeline, self) class RequestsTransport(HttpTransport): diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/requests_trio.py b/sdk/core/azure-core/azure/core/pipeline/transport/requests_trio.py index c3a1c591587b..f27f034033ea 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/requests_trio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/requests_trio.py @@ -53,18 +53,16 @@ class TrioStreamDownloadGenerator(AsyncIterator): """Generator for streaming response data. :param pipeline: The pipeline object - :param request: The request object :param response: The response object. - :param int block_size: Number of bytes to read into memory. :param generator iter_content_func: Iterator for response data. :param int content_length: size of body in bytes. """ - def __init__(self, pipeline: Pipeline, request: HttpRequest, response: requests.Response, block_size: int) -> None: + def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.pipeline = pipeline - self.request = request + self.request = response.request self.response = response - self.block_size = block_size - self.iter_content_func = self.response.iter_content(self.block_size) + self.block_size = response.block_size + self.iter_content_func = self.response.internal_response.iter_content(self.block_size) self.content_length = int(response.headers.get('Content-Length', 0)) self.downloaded = 0 @@ -85,7 +83,7 @@ async def __anext__(self): self.downloaded += self.block_size return chunk except _ResponseStopIteration: - self.response.close() + self.response.internal_response.close() raise StopAsyncIteration() except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError): @@ -111,7 +109,7 @@ async def __anext__(self): raise except Exception as err: _LOGGER.warning("Unable to stream download: %s", err) - self.response.close() + self.response.internal_response.close() raise class TrioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse): # type: ignore @@ -120,8 +118,7 @@ class TrioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse def stream_download(self, pipeline) -> AsyncIteratorType[bytes]: # type: ignore """Generator for streaming response data. """ - return TrioStreamDownloadGenerator(pipeline, self.request, - self.internal_response, self.block_size) # type: ignore + return TrioStreamDownloadGenerator(pipeline, self) # type: ignore class TrioRequestsTransport(RequestsTransport, AsyncHttpTransport): # type: ignore diff --git a/sdk/core/azure-core/tests/azure_core_asynctests/test_pipeline.py b/sdk/core/azure-core/tests/azure_core_asynctests/test_pipeline.py index d344a9f90fff..bcefb9b4bb1a 100644 --- a/sdk/core/azure-core/tests/azure_core_asynctests/test_pipeline.py +++ b/sdk/core/azure-core/tests/azure_core_asynctests/test_pipeline.py @@ -119,6 +119,23 @@ async def test_basic_async_requests(): assert response.http_response.status_code == 200 +@pytest.mark.asyncio +async def test_async_transport_sleep(): + + async with AsyncioRequestsTransport() as transport: + await transport.sleep(1) + + async with AioHttpTransport() as transport: + await transport.sleep(1) + +def test_async_trio_transport_sleep(): + + async def do(): + async with TrioRequestsTransport() as transport: + await transport.sleep(1) + + response = trio.run(do) + @pytest.mark.asyncio async def test_conf_async_requests(): diff --git a/sdk/core/azure-core/tests/test_universal_pipeline.py b/sdk/core/azure-core/tests/test_universal_pipeline.py index 0049e2277374..cc783b0d4c74 100644 --- a/sdk/core/azure-core/tests/test_universal_pipeline.py +++ b/sdk/core/azure-core/tests/test_universal_pipeline.py @@ -123,9 +123,7 @@ class MockResponse(HttpResponse): def __init__(self, body, content_type): super(MockResponse, self).__init__(None, None) self._body = body - self.content_type = None - if content_type: - self.content_type = [content_type] + self.content_type = content_type def body(self): return self._body diff --git a/sdk/identity/azure-identity/tests/helpers.py b/sdk/identity/azure-identity/tests/helpers.py index 8acb3d5f646b..694a92df922a 100644 --- a/sdk/identity/azure-identity/tests/helpers.py +++ b/sdk/identity/azure-identity/tests/helpers.py @@ -41,7 +41,7 @@ def mock_response(status_code=200, headers={}, json_payload=None): if json_payload is not None: response.text = lambda: json.dumps(json_payload) response.headers["content-type"] = "application/json" - response.content_type = ["application/json"] + response.content_type = "application/json" return response diff --git a/sdk/identity/azure-identity/tests/test_authn_client.py b/sdk/identity/azure-identity/tests/test_authn_client.py index 39889f003627..1aca947b3d89 100644 --- a/sdk/identity/azure-identity/tests/test_authn_client.py +++ b/sdk/identity/azure-identity/tests/test_authn_client.py @@ -28,7 +28,7 @@ def test_authn_client_deserialization(): scope = "scope" mock_response = Mock( - headers={"content-type": "application/json"}, status_code=200, content_type=["application/json"] + headers={"content-type": "application/json"}, status_code=200, content_type="application/json" ) mock_send = Mock(return_value=mock_response) @@ -87,7 +87,7 @@ def test_caching_when_only_expires_in_set(): text=lambda: json.dumps({"access_token": access_token, "expires_in": expires_in, "token_type": "Bearer"}), headers={"content-type": "application/json"}, status_code=200, - content_type=["application/json"], + content_type="application/json", ) mock_send = Mock(return_value=mock_response) @@ -106,7 +106,7 @@ def test_expires_in_strings(): expected_token = "token" mock_response = Mock( - headers={"content-type": "application/json"}, status_code=200, content_type=["application/json"] + headers={"content-type": "application/json"}, status_code=200, content_type="application/json" ) mock_send = Mock(return_value=mock_response) @@ -133,7 +133,7 @@ def test_cache_expiry(): text=lambda: json.dumps(token_payload), headers={"content-type": "application/json"}, status_code=200, - content_type=["application/json"], + content_type="application/json", ) mock_send = Mock(return_value=mock_response) diff --git a/sdk/identity/azure-identity/tests/test_identity.py b/sdk/identity/azure-identity/tests/test_identity.py index c9756ad3b344..b017792595a8 100644 --- a/sdk/identity/azure-identity/tests/test_identity.py +++ b/sdk/identity/azure-identity/tests/test_identity.py @@ -191,7 +191,7 @@ def test_imds_credential_cache(): text=lambda: json.dumps(token_payload), headers={"content-type": "application/json"}, status_code=200, - content_type=["application/json"], + content_type="application/json", ) mock_send = Mock(return_value=mock_response) @@ -219,7 +219,7 @@ def test_imds_credential_retries(): mock_response = Mock( text=lambda: b"{}", headers={"content-type": "application/json", "Retry-After": "0"}, - content_type=["application/json"], + content_type="application/json", ) mock_send = Mock(return_value=mock_response) diff --git a/sdk/identity/azure-identity/tests/test_identity_async.py b/sdk/identity/azure-identity/tests/test_identity_async.py index ba203cd2eb59..03b53db9eccd 100644 --- a/sdk/identity/azure-identity/tests/test_identity_async.py +++ b/sdk/identity/azure-identity/tests/test_identity_async.py @@ -196,7 +196,7 @@ async def test_imds_credential_cache(): text=lambda: json.dumps(token_payload), headers={"content-type": "application/json"}, status_code=200, - content_type=["application/json"], + content_type="application/json", ) mock_send = Mock(return_value=mock_response) @@ -225,7 +225,7 @@ async def test_imds_credential_retries(): mock_response = Mock( text=lambda: b"{}", headers={"content-type": "application/json", "Retry-After": "0"}, - content_type=["application/json"], + content_type="application/json", ) mock_send = Mock(return_value=mock_response)