Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,6 @@
log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
def wrapped_func(self, *args, **kwargs): # pylint:disable=unused-argument # TODO: to refactor
timeout = kwargs.pop("timeout", 100000)
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
while True:
try:
return to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time) # pylint:disable=protected-access
retry_count += 1
return wrapped_func


class ConsumerProducerMixin(object):
def __init__(self):
self.client = None
Expand All @@ -55,9 +37,9 @@ def _redirect(self, redirect):
self.running = False
self._close_connection()

def _open(self, timeout_time=None): # pylint:disable=unused-argument # TODO: to refactor
def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
Open the EventHubConsumer/EventHubProducer using the supplied connection.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.

Expand Down Expand Up @@ -91,12 +73,36 @@ def _close_connection(self):
self._close_handler()
self.client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access

def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
def _handle_exception(self, exception):
if not self.running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return _handle_exception(exception, retry_count, max_retries, self, timeout_time)
return _handle_exception(exception, self)

return _handle_exception(exception, self)

def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# pylint:disable=protected-access
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + timeout
retried_times = 0
last_exception = kwargs.pop('last_exception', None)
operation_need_param = kwargs.pop('operation_need_param', True)

while retried_times <= self.client.config.max_retries:
try:
if operation_need_param:
return operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
else:
return operation()
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception)
self.client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self.name)
retried_times += 1

return _handle_exception(exception, retry_count, max_retries, self, timeout_time)
log.info("%r has exhausted retry. Exception still occurs (%r)", self.name, last_exception)
raise last_exception

def close(self, exception=None):
# type:(Exception) -> None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,6 @@
log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
async def wrapped_func(self, *args, **kwargs): # pylint:disable=unused-argument # TODO: to refactor
timeout = kwargs.pop("timeout", 100000)
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
while True:
try:
return await to_be_wrapped_func(
self, timeout_time=timeout_time, last_exception=last_exception, **kwargs
)
except Exception as exception: # pylint:disable=broad-except
last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time) # pylint:disable=protected-access
retry_count += 1
return wrapped_func


class ConsumerProducerMixin(object):

def __init__(self):
Expand All @@ -58,7 +38,7 @@ async def _redirect(self, redirect):
self.running = False
await self._close_connection()

async def _open(self, timeout_time=None): # pylint:disable=unused-argument # TODO: to refactor
async def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
If the handler has previously been redirected, the redirect
Expand Down Expand Up @@ -94,12 +74,36 @@ async def _close_connection(self):
await self._close_handler()
await self.client._conn_manager.reset_connection_if_broken() # pylint:disable=protected-access

async def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
async def _handle_exception(self, exception):
if not self.running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return await _handle_exception(exception, retry_count, max_retries, self, timeout_time)
return await _handle_exception(exception, self)

return await _handle_exception(exception, self)

async def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# pylint:disable=protected-access
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + timeout
retried_times = 0
last_exception = kwargs.pop('last_exception', None)
operation_need_param = kwargs.pop('operation_need_param', True)

while retried_times <= self.client.config.max_retries:
try:
if operation_need_param:
return await operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
else:
return await operation()
except Exception as exception: # pylint:disable=broad-except
last_exception = await self._handle_exception(exception)
await self.client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self.name)
retried_times += 1

return await _handle_exception(exception, retry_count, max_retries, self, timeout_time)
log.info("%r has exhausted retry. Exception still occurs (%r)", self.name, last_exception)
raise last_exception

async def close(self, exception=None):
# type: (Exception) -> None
Expand Down
32 changes: 20 additions & 12 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# --------------------------------------------------------------------------------------------
import logging
import datetime
import time
import functools
import asyncio

Expand Down Expand Up @@ -100,23 +101,29 @@ def _create_auth(self, username=None, password=None):
get_jwt_token, http_proxy=http_proxy,
transport_type=transport_type)

async def _handle_exception(self, exception, retry_count, max_retries):
await _handle_exception(exception, retry_count, max_retries, self)

async def _close_connection(self):
await self._conn_manager.reset_connection_if_broken()

async def _management_request(self, mgmt_msg, op_type):
if self._is_iothub and not self._iothub_redirect_info:
await self._iothub_redirect()
async def _try_delay(self, retried_times, last_exception, timeout_time=None, entity_name=None):
entity_name = entity_name or self.container_id
backoff = self.config.backoff_factor * 2 ** retried_times
if backoff <= self.config.backoff_max and (
timeout_time is None or time.time() + backoff <= timeout_time): # pylint:disable=no-else-return
asyncio.sleep(backoff)
log.info("%r has an exception (%r). Retrying...", format(entity_name), last_exception)
else:
log.info("%r operation has timed out. Last exception before timeout is (%r)",
entity_name, last_exception)
raise last_exception

async def _management_request(self, mgmt_msg, op_type):
alt_creds = {
"username": self._auth_config.get("iot_username"),
"password": self._auth_config.get("iot_password")
}
max_retries = self.config.max_retries
retry_count = 0
while True:

retried_times = 0
while retried_times <= self.config.max_retries:
mgmt_auth = self._create_auth(**alt_creds)
mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.config.network_tracing)
try:
Expand All @@ -130,8 +137,9 @@ async def _management_request(self, mgmt_msg, op_type):
description_fields=b'status-description')
return response
except Exception as exception: # pylint:disable=broad-except
await self._handle_exception(exception, retry_count, max_retries)
retry_count += 1
last_exception = await _handle_exception(exception, self)
await self._try_delay(retried_times=retried_times, last_exception=last_exception)
retried_times += 1
finally:
await mgmt_client.close_async()

Expand All @@ -144,7 +152,7 @@ async def _iothub_redirect(self):
event_position=EventPosition('-1'),
operation='/messages/events')
async with self._redirect_consumer:
await self._redirect_consumer._open_with_retry(timeout=self.config.receive_timeout) # pylint: disable=protected-access
await self._redirect_consumer._open_with_retry() # pylint: disable=protected-access
self._redirect_consumer = None

async def get_properties(self):
Expand Down
34 changes: 17 additions & 17 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from azure.eventhub import EventData, EventPosition
from azure.eventhub.error import EventHubError, ConnectError, _error_handler
from ._consumer_producer_mixin_async import ConsumerProducerMixin, _retry_decorator
from ._consumer_producer_mixin_async import ConsumerProducerMixin

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -92,21 +92,22 @@ def __aiter__(self):
return self

async def __anext__(self):
max_retries = self.client.config.max_retries
retry_count = 0
while True:
retried_times = 0
while retried_times < self.client.config.max_retries:
try:
await self._open()
if not self.messages_iter:
self.messages_iter = self._handler.receive_messages_iter_async()
message = await self.messages_iter.__anext__()
event_data = EventData._from_message(message) # pylint:disable=protected-access
self.offset = EventPosition(event_data.offset, inclusive=False)
retry_count = 0
retried_times = 0
return event_data
except Exception as exception: # pylint:disable=broad-except
await self._handle_exception(exception, retry_count, max_retries, timeout_time=None)
retry_count += 1
last_exception = await self._handle_exception(exception)
await self.client._try_delay(retried_times=retried_times, last_exception=last_exception,
entity_name=self.name)
retried_times += 1

def _create_handler(self):
alt_creds = {
Expand Down Expand Up @@ -136,7 +137,7 @@ async def _redirect(self, redirect):
self.messages_iter = None
await super(EventHubConsumer, self)._redirect(redirect)

async def _open(self, timeout_time=None, **kwargs):
async def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
If the handler has previously been redirected, the redirect
Expand All @@ -149,17 +150,16 @@ async def _open(self, timeout_time=None, **kwargs):
if not self.running and self.redirected:
self.client._process_redirect_uri(self.redirected)
self.source = self.redirected.address
await super(EventHubConsumer, self)._open(timeout_time)
await super(EventHubConsumer, self)._open()

@_retry_decorator
async def _open_with_retry(self, timeout_time=None, **kwargs):
return await self._open(timeout_time=timeout_time, **kwargs)
async def _open_with_retry(self):
return await self._do_retryable_operation(self._open, operation_need_param=False)

async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
last_exception = kwargs.get("last_exception")
data_batch = kwargs.get("data_batch")

await self._open(timeout_time)
await self._open()
remaining_time = timeout_time - time.time()
if remaining_time <= 0.0:
if last_exception:
Expand All @@ -177,9 +177,9 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
data_batch.append(event_data)
return data_batch

@_retry_decorator
async def _receive_with_try(self, timeout_time=None, max_batch_size=None, **kwargs):
return await self._receive(timeout_time=timeout_time, max_batch_size=max_batch_size, **kwargs)
async def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs):
return await self._do_retryable_operation(self._receive, timeout=timeout,
max_batch_size=max_batch_size, **kwargs)

@property
def queue_size(self):
Expand Down Expand Up @@ -227,7 +227,7 @@ async def receive(self, *, max_batch_size=None, timeout=None):
max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch)
data_batch = [] # type: List[EventData]

return await self._receive_with_try(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)
return await self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)

async def close(self, exception=None):
# type: (Exception) -> None
Expand Down
28 changes: 4 additions & 24 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def _create_eventhub_exception(exception):
return error


async def _handle_exception(exception, retry_count, max_retries, closable, timeout_time=None): # pylint:disable=too-many-branches, too-many-statements
async def _handle_exception(exception, closable): # pylint:disable=too-many-branches, too-many-statements
if isinstance(exception, asyncio.CancelledError):
raise exception
try:
Expand All @@ -45,10 +45,10 @@ async def _handle_exception(exception, retry_count, max_retries, closable, timeo
name = closable.container_id
if isinstance(exception, KeyboardInterrupt): # pylint:disable=no-else-raise
log.info("%r stops due to keyboard interrupt", name)
closable.close()
await closable.close()
raise exception
elif isinstance(exception, EventHubError):
closable.close()
await closable.close()
raise exception
elif isinstance(exception, (
errors.MessageAccepted,
Expand All @@ -65,10 +65,6 @@ async def _handle_exception(exception, retry_count, max_retries, closable, timeo
log.info("%r Event data send error (%r)", name, exception)
error = EventDataSendError(str(exception), exception)
raise error
elif retry_count >= max_retries:
error = _create_eventhub_exception(exception)
log.info("%r has exhausted retry. Exception still occurs (%r)", name, exception)
raise error
else:
if isinstance(exception, errors.AuthenticationException):
if hasattr(closable, "_close_connection"):
Expand All @@ -95,20 +91,4 @@ async def _handle_exception(exception, retry_count, max_retries, closable, timeo
else:
if hasattr(closable, "_close_connection"):
await closable._close_connection() # pylint:disable=protected-access
# start processing retry delay
try:
backoff_factor = closable.client.config.backoff_factor
backoff_max = closable.client.config.backoff_max
except AttributeError:
backoff_factor = closable.config.backoff_factor
backoff_max = closable.config.backoff_max
backoff = backoff_factor * 2 ** retry_count
if backoff <= backoff_max and (timeout_time is None or time.time() + backoff <= timeout_time): # pylint:disable=no-else-return
await asyncio.sleep(backoff)
log.info("%r has an exception (%r). Retrying...", format(name), exception)
return _create_eventhub_exception(exception)
else:
error = _create_eventhub_exception(exception)
log.info("%r operation has timed out. Last exception before timeout is (%r)", name, error)
raise error
# end of processing retry delay
return _create_eventhub_exception(exception)
Loading