diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py index 96479a6ef31b..282a8c574088 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py @@ -13,24 +13,6 @@ log = logging.getLogger(__name__) -def _retry_decorator(to_be_wrapped_func): - def wrapped_func(self, *args, **kwargs): # pylint:disable=unused-argument # TODO: to refactor - timeout = kwargs.pop("timeout", 100000) - if not timeout: - timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number. - timeout_time = time.time() + timeout - max_retries = self.client.config.max_retries - retry_count = 0 - last_exception = None - while True: - try: - return to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs) - except Exception as exception: # pylint:disable=broad-except - last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time) # pylint:disable=protected-access - retry_count += 1 - return wrapped_func - - class ConsumerProducerMixin(object): def __init__(self): self.client = None @@ -55,9 +37,9 @@ def _redirect(self, redirect): self.running = False self._close_connection() - def _open(self, timeout_time=None): # pylint:disable=unused-argument # TODO: to refactor + def _open(self): """ - Open the EventHubConsumer using the supplied connection. + Open the EventHubConsumer/EventHubProducer using the supplied connection. If the handler has previously been redirected, the redirect context will be used to create a new handler before opening it. @@ -91,12 +73,36 @@ def _close_connection(self): self._close_handler() self.client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access - def _handle_exception(self, exception, retry_count, max_retries, timeout_time): + def _handle_exception(self, exception): if not self.running and isinstance(exception, compat.TimeoutException): exception = errors.AuthenticationException("Authorization timeout.") - return _handle_exception(exception, retry_count, max_retries, self, timeout_time) + return _handle_exception(exception, self) + + return _handle_exception(exception, self) + + def _do_retryable_operation(self, operation, timeout=None, **kwargs): + # pylint:disable=protected-access + if not timeout: + timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number. + timeout_time = time.time() + timeout + retried_times = 0 + last_exception = kwargs.pop('last_exception', None) + operation_need_param = kwargs.pop('operation_need_param', True) + + while retried_times <= self.client.config.max_retries: + try: + if operation_need_param: + return operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs) + else: + return operation() + except Exception as exception: # pylint:disable=broad-except + last_exception = self._handle_exception(exception) + self.client._try_delay(retried_times=retried_times, last_exception=last_exception, + timeout_time=timeout_time, entity_name=self.name) + retried_times += 1 - return _handle_exception(exception, retry_count, max_retries, self, timeout_time) + log.info("%r has exhausted retry. Exception still occurs (%r)", self.name, last_exception) + raise last_exception def close(self, exception=None): # type:(Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py index 53624c36f649..64873f843dc4 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py @@ -13,26 +13,6 @@ log = logging.getLogger(__name__) -def _retry_decorator(to_be_wrapped_func): - async def wrapped_func(self, *args, **kwargs): # pylint:disable=unused-argument # TODO: to refactor - timeout = kwargs.pop("timeout", 100000) - if not timeout: - timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number. - timeout_time = time.time() + timeout - max_retries = self.client.config.max_retries - retry_count = 0 - last_exception = None - while True: - try: - return await to_be_wrapped_func( - self, timeout_time=timeout_time, last_exception=last_exception, **kwargs - ) - except Exception as exception: # pylint:disable=broad-except - last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time) # pylint:disable=protected-access - retry_count += 1 - return wrapped_func - - class ConsumerProducerMixin(object): def __init__(self): @@ -58,7 +38,7 @@ async def _redirect(self, redirect): self.running = False await self._close_connection() - async def _open(self, timeout_time=None): # pylint:disable=unused-argument # TODO: to refactor + async def _open(self): """ Open the EventHubConsumer using the supplied connection. If the handler has previously been redirected, the redirect @@ -94,12 +74,36 @@ async def _close_connection(self): await self._close_handler() await self.client._conn_manager.reset_connection_if_broken() # pylint:disable=protected-access - async def _handle_exception(self, exception, retry_count, max_retries, timeout_time): + async def _handle_exception(self, exception): if not self.running and isinstance(exception, compat.TimeoutException): exception = errors.AuthenticationException("Authorization timeout.") - return await _handle_exception(exception, retry_count, max_retries, self, timeout_time) + return await _handle_exception(exception, self) + + return await _handle_exception(exception, self) + + async def _do_retryable_operation(self, operation, timeout=None, **kwargs): + # pylint:disable=protected-access + if not timeout: + timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number. + timeout_time = time.time() + timeout + retried_times = 0 + last_exception = kwargs.pop('last_exception', None) + operation_need_param = kwargs.pop('operation_need_param', True) + + while retried_times <= self.client.config.max_retries: + try: + if operation_need_param: + return await operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs) + else: + return await operation() + except Exception as exception: # pylint:disable=broad-except + last_exception = await self._handle_exception(exception) + await self.client._try_delay(retried_times=retried_times, last_exception=last_exception, + timeout_time=timeout_time, entity_name=self.name) + retried_times += 1 - return await _handle_exception(exception, retry_count, max_retries, self, timeout_time) + log.info("%r has exhausted retry. Exception still occurs (%r)", self.name, last_exception) + raise last_exception async def close(self, exception=None): # type: (Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py index 00b136b8bed5..10756da08701 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py @@ -4,6 +4,7 @@ # -------------------------------------------------------------------------------------------- import logging import datetime +import time import functools import asyncio @@ -100,23 +101,29 @@ def _create_auth(self, username=None, password=None): get_jwt_token, http_proxy=http_proxy, transport_type=transport_type) - async def _handle_exception(self, exception, retry_count, max_retries): - await _handle_exception(exception, retry_count, max_retries, self) - async def _close_connection(self): await self._conn_manager.reset_connection_if_broken() - async def _management_request(self, mgmt_msg, op_type): - if self._is_iothub and not self._iothub_redirect_info: - await self._iothub_redirect() + async def _try_delay(self, retried_times, last_exception, timeout_time=None, entity_name=None): + entity_name = entity_name or self.container_id + backoff = self.config.backoff_factor * 2 ** retried_times + if backoff <= self.config.backoff_max and ( + timeout_time is None or time.time() + backoff <= timeout_time): # pylint:disable=no-else-return + asyncio.sleep(backoff) + log.info("%r has an exception (%r). Retrying...", format(entity_name), last_exception) + else: + log.info("%r operation has timed out. Last exception before timeout is (%r)", + entity_name, last_exception) + raise last_exception + async def _management_request(self, mgmt_msg, op_type): alt_creds = { "username": self._auth_config.get("iot_username"), "password": self._auth_config.get("iot_password") } - max_retries = self.config.max_retries - retry_count = 0 - while True: + + retried_times = 0 + while retried_times <= self.config.max_retries: mgmt_auth = self._create_auth(**alt_creds) mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.config.network_tracing) try: @@ -130,8 +137,9 @@ async def _management_request(self, mgmt_msg, op_type): description_fields=b'status-description') return response except Exception as exception: # pylint:disable=broad-except - await self._handle_exception(exception, retry_count, max_retries) - retry_count += 1 + last_exception = await _handle_exception(exception, self) + await self._try_delay(retried_times=retried_times, last_exception=last_exception) + retried_times += 1 finally: await mgmt_client.close_async() @@ -144,7 +152,7 @@ async def _iothub_redirect(self): event_position=EventPosition('-1'), operation='/messages/events') async with self._redirect_consumer: - await self._redirect_consumer._open_with_retry(timeout=self.config.receive_timeout) # pylint: disable=protected-access + await self._redirect_consumer._open_with_retry() # pylint: disable=protected-access self._redirect_consumer = None async def get_properties(self): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 49e85fe2e811..f26853e32cac 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -13,7 +13,7 @@ from azure.eventhub import EventData, EventPosition from azure.eventhub.error import EventHubError, ConnectError, _error_handler -from ._consumer_producer_mixin_async import ConsumerProducerMixin, _retry_decorator +from ._consumer_producer_mixin_async import ConsumerProducerMixin log = logging.getLogger(__name__) @@ -92,9 +92,8 @@ def __aiter__(self): return self async def __anext__(self): - max_retries = self.client.config.max_retries - retry_count = 0 - while True: + retried_times = 0 + while retried_times < self.client.config.max_retries: try: await self._open() if not self.messages_iter: @@ -102,11 +101,13 @@ async def __anext__(self): message = await self.messages_iter.__anext__() event_data = EventData._from_message(message) # pylint:disable=protected-access self.offset = EventPosition(event_data.offset, inclusive=False) - retry_count = 0 + retried_times = 0 return event_data except Exception as exception: # pylint:disable=broad-except - await self._handle_exception(exception, retry_count, max_retries, timeout_time=None) - retry_count += 1 + last_exception = await self._handle_exception(exception) + await self.client._try_delay(retried_times=retried_times, last_exception=last_exception, + entity_name=self.name) + retried_times += 1 def _create_handler(self): alt_creds = { @@ -136,7 +137,7 @@ async def _redirect(self, redirect): self.messages_iter = None await super(EventHubConsumer, self)._redirect(redirect) - async def _open(self, timeout_time=None, **kwargs): + async def _open(self): """ Open the EventHubConsumer using the supplied connection. If the handler has previously been redirected, the redirect @@ -149,17 +150,16 @@ async def _open(self, timeout_time=None, **kwargs): if not self.running and self.redirected: self.client._process_redirect_uri(self.redirected) self.source = self.redirected.address - await super(EventHubConsumer, self)._open(timeout_time) + await super(EventHubConsumer, self)._open() - @_retry_decorator - async def _open_with_retry(self, timeout_time=None, **kwargs): - return await self._open(timeout_time=timeout_time, **kwargs) + async def _open_with_retry(self): + return await self._do_retryable_operation(self._open, operation_need_param=False) async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): last_exception = kwargs.get("last_exception") data_batch = kwargs.get("data_batch") - await self._open(timeout_time) + await self._open() remaining_time = timeout_time - time.time() if remaining_time <= 0.0: if last_exception: @@ -177,9 +177,9 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): data_batch.append(event_data) return data_batch - @_retry_decorator - async def _receive_with_try(self, timeout_time=None, max_batch_size=None, **kwargs): - return await self._receive(timeout_time=timeout_time, max_batch_size=max_batch_size, **kwargs) + async def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs): + return await self._do_retryable_operation(self._receive, timeout=timeout, + max_batch_size=max_batch_size, **kwargs) @property def queue_size(self): @@ -227,7 +227,7 @@ async def receive(self, *, max_batch_size=None, timeout=None): max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch) data_batch = [] # type: List[EventData] - return await self._receive_with_try(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch) + return await self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch) async def close(self, exception=None): # type: (Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py index 5d0cff3ebc1d..58ecee91ad1b 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py @@ -36,7 +36,7 @@ def _create_eventhub_exception(exception): return error -async def _handle_exception(exception, retry_count, max_retries, closable, timeout_time=None): # pylint:disable=too-many-branches, too-many-statements +async def _handle_exception(exception, closable): # pylint:disable=too-many-branches, too-many-statements if isinstance(exception, asyncio.CancelledError): raise exception try: @@ -45,10 +45,10 @@ async def _handle_exception(exception, retry_count, max_retries, closable, timeo name = closable.container_id if isinstance(exception, KeyboardInterrupt): # pylint:disable=no-else-raise log.info("%r stops due to keyboard interrupt", name) - closable.close() + await closable.close() raise exception elif isinstance(exception, EventHubError): - closable.close() + await closable.close() raise exception elif isinstance(exception, ( errors.MessageAccepted, @@ -65,10 +65,6 @@ async def _handle_exception(exception, retry_count, max_retries, closable, timeo log.info("%r Event data send error (%r)", name, exception) error = EventDataSendError(str(exception), exception) raise error - elif retry_count >= max_retries: - error = _create_eventhub_exception(exception) - log.info("%r has exhausted retry. Exception still occurs (%r)", name, exception) - raise error else: if isinstance(exception, errors.AuthenticationException): if hasattr(closable, "_close_connection"): @@ -95,20 +91,4 @@ async def _handle_exception(exception, retry_count, max_retries, closable, timeo else: if hasattr(closable, "_close_connection"): await closable._close_connection() # pylint:disable=protected-access - # start processing retry delay - try: - backoff_factor = closable.client.config.backoff_factor - backoff_max = closable.client.config.backoff_max - except AttributeError: - backoff_factor = closable.config.backoff_factor - backoff_max = closable.config.backoff_max - backoff = backoff_factor * 2 ** retry_count - if backoff <= backoff_max and (timeout_time is None or time.time() + backoff <= timeout_time): # pylint:disable=no-else-return - await asyncio.sleep(backoff) - log.info("%r has an exception (%r). Retrying...", format(name), exception) - return _create_eventhub_exception(exception) - else: - error = _create_eventhub_exception(exception) - log.info("%r operation has timed out. Last exception before timeout is (%r)", name, error) - raise error - # end of processing retry delay + return _create_eventhub_exception(exception) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index 5cf04d26af28..f9fb32420e81 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -14,8 +14,7 @@ from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError from ..producer import _error, _set_partition_key -from ._consumer_producer_mixin_async import ConsumerProducerMixin, _retry_decorator - +from ._consumer_producer_mixin_async import ConsumerProducerMixin log = logging.getLogger(__name__) @@ -98,7 +97,7 @@ def _create_handler(self): self.client.config.user_agent), loop=self.loop) - async def _open(self, timeout_time=None, **kwargs): # pylint:disable=arguments-differ, unused-argument # TODO: to refactor + async def _open(self): """ Open the EventHubProducer using the supplied connection. If the handler has previously been redirected, the redirect @@ -108,15 +107,14 @@ async def _open(self, timeout_time=None, **kwargs): # pylint:disable=arguments- if not self.running and self.redirected: self.client._process_redirect_uri(self.redirected) # pylint: disable=protected-access self.target = self.redirected.address - await super(EventHubProducer, self)._open(timeout_time) + await super(EventHubProducer, self)._open() - @_retry_decorator - async def _open_with_retry(self, timeout_time=None, **kwargs): - return await self._open(timeout_time=timeout_time, **kwargs) + async def _open_with_retry(self): + return await self._do_retryable_operation(self._open, operation_need_param=False) async def _send_event_data(self, timeout_time=None, last_exception=None): if self.unsent_events: - await self._open(timeout_time) + await self._open() remaining_time = timeout_time - time.time() if remaining_time <= 0.0: if last_exception: @@ -135,9 +133,8 @@ async def _send_event_data(self, timeout_time=None, last_exception=None): _error(self._outcome, self._condition) return - @_retry_decorator - async def _send_event_data_with_retry(self, timeout_time=None, last_exception=None): - return await self._send_event_data(timeout_time=timeout_time, last_exception=last_exception) + async def _send_event_data_with_retry(self, timeout=None): + return await self._do_retryable_operation(self._send_event_data, timeout=timeout) def _on_outcome(self, outcome, condition): """ @@ -176,7 +173,7 @@ async def create_batch(self, max_size=None, partition_key=None): """ if not self._max_message_size_on_link: - await self._open_with_retry(timeout=self.client.config.send_timeout) + await self._open_with_retry() if max_size and max_size > self._max_message_size_on_link: raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py index f0e4eca2ac47..347b1263be2c 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py @@ -6,6 +6,7 @@ import logging import datetime +import time import functools import threading @@ -102,21 +103,29 @@ def _create_auth(self, username=None, password=None): get_jwt_token, http_proxy=http_proxy, transport_type=transport_type) - def _handle_exception(self, exception, retry_count, max_retries): - _handle_exception(exception, retry_count, max_retries, self) - def _close_connection(self): self._conn_manager.reset_connection_if_broken() + def _try_delay(self, retried_times, last_exception, timeout_time=None, entity_name=None): + entity_name = entity_name or self.container_id + backoff = self.config.backoff_factor * 2 ** retried_times + if backoff <= self.config.backoff_max and ( + timeout_time is None or time.time() + backoff <= timeout_time): # pylint:disable=no-else-return + time.sleep(backoff) + log.info("%r has an exception (%r). Retrying...", format(entity_name), last_exception) + else: + log.info("%r operation has timed out. Last exception before timeout is (%r)", + entity_name, last_exception) + raise last_exception + def _management_request(self, mgmt_msg, op_type): alt_creds = { "username": self._auth_config.get("iot_username"), "password": self._auth_config.get("iot_password") } - max_retries = self.config.max_retries - retry_count = 0 - while retry_count <= self.config.max_retries: + retried_times = 0 + while retried_times <= self.config.max_retries: mgmt_auth = self._create_auth(**alt_creds) mgmt_client = uamqp.AMQPClient(self.mgmt_target) try: @@ -130,8 +139,9 @@ def _management_request(self, mgmt_msg, op_type): description_fields=b'status-description') return response except Exception as exception: # pylint: disable=broad-except - self._handle_exception(exception, retry_count, max_retries) - retry_count += 1 + last_exception = _handle_exception(exception, self) + self._try_delay(retried_times=retried_times, last_exception=last_exception) + retried_times += 1 finally: mgmt_client.close() @@ -144,7 +154,7 @@ def _iothub_redirect(self): event_position=EventPosition('-1'), operation='/messages/events') with self._redirect_consumer: - self._redirect_consumer._open_with_retry(timeout=self.config.receive_timeout) # pylint: disable=protected-access + self._redirect_consumer._open_with_retry() # pylint: disable=protected-access self._redirect_consumer = None def get_properties(self): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 6306c750260a..82550bf3b9e5 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -14,7 +14,7 @@ from azure.eventhub.common import EventData, EventPosition from azure.eventhub.error import _error_handler -from ._consumer_producer_mixin import ConsumerProducerMixin, _retry_decorator +from ._consumer_producer_mixin import ConsumerProducerMixin log = logging.getLogger(__name__) @@ -88,9 +88,8 @@ def __iter__(self): return self def __next__(self): - max_retries = self.client.config.max_retries - retry_count = 0 - while True: + retried_times = 0 + while retried_times < self.client.config.max_retries: try: self._open() if not self.messages_iter: @@ -98,11 +97,13 @@ def __next__(self): message = next(self.messages_iter) event_data = EventData._from_message(message) # pylint:disable=protected-access self.offset = EventPosition(event_data.offset, inclusive=False) - retry_count = 0 + retried_times = 0 return event_data except Exception as exception: # pylint:disable=broad-except - self._handle_exception(exception, retry_count, max_retries, timeout_time=None) - retry_count += 1 + last_exception = self._handle_exception(exception) + self.client._try_delay(retried_times=retried_times, last_exception=last_exception, + entity_name=self.name) + retried_times += 1 def _create_handler(self): alt_creds = { @@ -131,7 +132,7 @@ def _redirect(self, redirect): self.messages_iter = None super(EventHubConsumer, self)._redirect(redirect) - def _open(self, timeout_time=None, **kwargs): + def _open(self): """ Open the EventHubConsumer using the supplied connection. If the handler has previously been redirected, the redirect @@ -144,17 +145,16 @@ def _open(self, timeout_time=None, **kwargs): if not self.running and self.redirected: self.client._process_redirect_uri(self.redirected) self.source = self.redirected.address - super(EventHubConsumer, self)._open(timeout_time) + super(EventHubConsumer, self)._open() - @_retry_decorator - def _open_with_retry(self, timeout_time=None, **kwargs): - return self._open(timeout_time=timeout_time, **kwargs) + def _open_with_retry(self): + return self._do_retryable_operation(self._open, operation_need_param=False) def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): last_exception = kwargs.get("last_exception") data_batch = kwargs.get("data_batch") - self._open(timeout_time) + self._open() remaining_time = timeout_time - time.time() if remaining_time <= 0.0: if last_exception: @@ -171,9 +171,9 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): data_batch.append(event_data) return data_batch - @_retry_decorator - def _receive_with_try(self, timeout_time=None, max_batch_size=None, **kwargs): - return self._receive(timeout_time=timeout_time, max_batch_size=max_batch_size, **kwargs) + def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs): + return self._do_retryable_operation(self._receive, timeout=timeout, + max_batch_size=max_batch_size, **kwargs) @property def queue_size(self): @@ -221,7 +221,7 @@ def receive(self, max_batch_size=None, timeout=None): max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch) data_batch = [] # type: List[EventData] - return self._receive_with_try(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch) + return self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch) def close(self, exception=None): # type:(Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py index 72b11f5478ad..6db54e5977de 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py @@ -157,10 +157,10 @@ def _create_eventhub_exception(exception): return error -def _handle_exception(exception, retry_count, max_retries, closable, timeout_time=None): # pylint:disable=too-many-branches, too-many-statements - try: +def _handle_exception(exception, closable): # pylint:disable=too-many-branches, too-many-statements + try: # closable is a producer/consumer object name = closable.name - except AttributeError: + except AttributeError: # closable is an client object name = closable.container_id if isinstance(exception, KeyboardInterrupt): # pylint:disable=no-else-raise log.info("%r stops due to keyboard interrupt", name) @@ -184,10 +184,6 @@ def _handle_exception(exception, retry_count, max_retries, closable, timeout_tim log.info("%r Event data send error (%r)", name, exception) error = EventDataSendError(str(exception), exception) raise error - elif retry_count >= max_retries: - error = _create_eventhub_exception(exception) - log.info("%r has exhausted retry. Exception still occurs (%r)", name, exception) - raise error else: if isinstance(exception, errors.AuthenticationException): if hasattr(closable, "_close_connection"): @@ -214,20 +210,4 @@ def _handle_exception(exception, retry_count, max_retries, closable, timeout_tim else: if hasattr(closable, "_close_connection"): closable._close_connection() # pylint:disable=protected-access - # start processing retry delay - try: - backoff_factor = closable.client.config.backoff_factor - backoff_max = closable.client.config.backoff_max - except AttributeError: - backoff_factor = closable.config.backoff_factor - backoff_max = closable.config.backoff_max - backoff = backoff_factor * 2 ** retry_count - if backoff <= backoff_max and (timeout_time is None or time.time() + backoff <= timeout_time): #pylint:disable=no-else-return - time.sleep(backoff) - log.info("%r has an exception (%r). Retrying...", format(name), exception) - return _create_eventhub_exception(exception) - else: - error = _create_eventhub_exception(exception) - log.info("%r operation has timed out. Last exception before timeout is (%r)", name, error) - raise error - # end of processing retry delay + return _create_eventhub_exception(exception) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index a36541475ceb..c019a30ee7b8 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -14,7 +14,7 @@ from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError -from ._consumer_producer_mixin import ConsumerProducerMixin, _retry_decorator +from ._consumer_producer_mixin import ConsumerProducerMixin log = logging.getLogger(__name__) @@ -104,26 +104,24 @@ def _create_handler(self): link_properties=self._link_properties, properties=self.client._create_properties(self.client.config.user_agent)) # pylint: disable=protected-access - def _open(self, timeout_time=None, **kwargs): # pylint:disable=unused-argument, arguments-differ # TODO:To refactor + def _open(self): """ Open the EventHubProducer using the supplied connection. If the handler has previously been redirected, the redirect context will be used to create a new handler before opening it. """ - if not self.running and self.redirected: self.client._process_redirect_uri(self.redirected) # pylint: disable=protected-access self.target = self.redirected.address - super(EventHubProducer, self)._open(timeout_time) + super(EventHubProducer, self)._open() - @_retry_decorator - def _open_with_retry(self, timeout_time=None, **kwargs): - return self._open(timeout_time=timeout_time, **kwargs) + def _open_with_retry(self): + return self._do_retryable_operation(self._open, operation_need_param=False) def _send_event_data(self, timeout_time=None, last_exception=None): if self.unsent_events: - self._open(timeout_time) + self._open() remaining_time = timeout_time - time.time() if remaining_time <= 0.0: if last_exception: @@ -141,9 +139,8 @@ def _send_event_data(self, timeout_time=None, last_exception=None): self._condition = OperationTimeoutError("send operation timed out") _error(self._outcome, self._condition) - @_retry_decorator - def _send_event_data_with_retry(self, timeout_time=None, last_exception=None): - return self._send_event_data(timeout_time=timeout_time, last_exception=last_exception) + def _send_event_data_with_retry(self, timeout=None): + return self._do_retryable_operation(self._send_event_data, timeout=timeout) def _on_outcome(self, outcome, condition): """ @@ -182,7 +179,7 @@ def create_batch(self, max_size=None, partition_key=None): """ if not self._max_message_size_on_link: - self._open_with_retry(timeout=self.client.config.send_timeout) + self._open_with_retry() if max_size and max_size > self._max_message_size_on_link: raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' @@ -237,7 +234,7 @@ def send(self, event_data, partition_key=None, timeout=None): wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self.unsent_events = [wrapper_event_data.message] - self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO:to refactor + self._send_event_data_with_retry(timeout=timeout) def close(self, exception=None): # pylint:disable=useless-super-delegation # type:(Exception) -> None