-
Notifications
You must be signed in to change notification settings - Fork 3.2k
[Cosmos] Reconfigure retry policy #7544
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
eb8d30a
a584cb5
9710498
9ceedb2
e48fe4f
e8be3bf
c0575ba
28aa675
cf15fb3
bd2946f
f28f69f
63dcc00
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,9 @@ | |
|
|
||
| import time | ||
|
|
||
| from azure.core.exceptions import AzureError, ClientAuthenticationError | ||
| from azure.core.pipeline.policies import RetryPolicy | ||
|
|
||
| from . import errors | ||
| from . import _endpoint_discovery_retry_policy | ||
| from . import _resource_throttle_retry_policy | ||
|
|
@@ -64,6 +67,8 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): | |
| ) | ||
| while True: | ||
| try: | ||
| client_timeout = kwargs.get('timeout') | ||
| start_time = time.time() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be outside the loop?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks @johanste |
||
| if args: | ||
| result = ExecuteFunction(function, global_endpoint_manager, *args, **kwargs) | ||
| else: | ||
|
|
@@ -113,9 +118,92 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): | |
|
|
||
| # Wait for retry_after_in_milliseconds time before the next retry | ||
| time.sleep(retry_policy.retry_after_in_milliseconds / 1000.0) | ||
| if client_timeout: | ||
| kwargs['timeout'] = client_timeout - (time.time() - start_time) | ||
| if kwargs['timeout'] <= 0: | ||
| raise errors.CosmosClientTimeoutError() | ||
|
|
||
|
|
||
| def ExecuteFunction(function, *args, **kwargs): | ||
| """ Stub method so that it can be used for mocking purposes as well. | ||
| """ | ||
| return function(*args, **kwargs) | ||
|
|
||
|
|
||
| def _configure_timeout(request, absolute, per_request): | ||
| # type: (azure.core.pipeline.PipelineRequest, Optional[int], int) -> Optional[AzureError] | ||
| if absolute is not None: | ||
| if absolute <= 0: | ||
| raise errors.CosmosClientTimeoutError() | ||
| if per_request: | ||
| # Both socket timeout and client timeout have been provided - use the shortest value. | ||
| request.context.options['connection_timeout'] = min(per_request, absolute) | ||
| else: | ||
| # Only client timeout provided. | ||
| request.context.options['connection_timeout'] = absolute | ||
| elif per_request: | ||
| # Only socket timeout provided. | ||
| request.context.options['connection_timeout'] = per_request | ||
|
|
||
|
|
||
| class ConnectionRetryPolicy(RetryPolicy): | ||
|
|
||
| def __init__(self, **kwargs): | ||
| clean_kwargs = {k: v for k, v in kwargs.items() if v is not None} | ||
| super(ConnectionRetryPolicy, self).__init__(**clean_kwargs) | ||
|
|
||
| def send(self, request): | ||
| """Sends the PipelineRequest object to the next policy. Uses retry settings if necessary. | ||
| Also enforces an absolute client-side timeout that spans multiple retry attempts. | ||
|
|
||
| :param request: The PipelineRequest object | ||
| :type request: ~azure.core.pipeline.PipelineRequest | ||
| :return: Returns the PipelineResponse or raises error if maximum retries exceeded. | ||
| :rtype: ~azure.core.pipeline.PipelineResponse | ||
| :raises: ~azure.core.exceptions.AzureError if maximum retries exceeded. | ||
| :raises: ~azure.cosmos.CosmosClientTimeoutError if specified timeout exceeded. | ||
| :raises: ~azure.core.exceptions.ClientAuthenticationError if authentication | ||
| """ | ||
| absolute_timeout = request.context.options.pop('timeout', None) | ||
| per_request_timeout = request.context.options.pop('connection_timeout', 0) | ||
|
|
||
| retry_error = None | ||
| retry_active = True | ||
| response = None | ||
| retry_settings = self.configure_retries(request.context.options) | ||
| while retry_active: | ||
| try: | ||
| start_time = time.time() | ||
| _configure_timeout(request, absolute_timeout, per_request_timeout) | ||
|
|
||
| response = self.next.send(request) | ||
| if self.is_retry(retry_settings, response): | ||
| retry_active = self.increment(retry_settings, response=response) | ||
| if retry_active: | ||
| self.sleep(retry_settings, request.context.transport, response=response) | ||
| continue | ||
| break | ||
| except ClientAuthenticationError: # pylint:disable=try-except-raise | ||
| # the authentication policy failed such that the client's request can't | ||
| # succeed--we'll never have a response to it, so propagate the exception | ||
| raise | ||
| except errors.CosmosClientTimeoutError as timeout_error: | ||
| timeout_error.inner_exception = retry_error | ||
| timeout_error.response = response | ||
| timeout_error.history = retry_settings['history'] | ||
| raise | ||
| except AzureError as err: | ||
| retry_error = err | ||
| if self._is_method_retryable(retry_settings, request.http_request): | ||
| retry_active = self.increment(retry_settings, response=request, error=err) | ||
| if retry_active: | ||
| self.sleep(retry_settings, request.context.transport) | ||
| continue | ||
| raise err | ||
| finally: | ||
| end_time = time.time() | ||
| if absolute_timeout: | ||
| absolute_timeout -= (end_time - start_time) | ||
|
|
||
| self.update_context(response.context, retry_settings) | ||
| return response | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,2 @@ | ||
| -e ../../../tools/azure-sdk-tools | ||
| -e ../../core/azure-core |
Uh oh!
There was an error while loading. Please reload this page.