Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Retry refactor
  • Loading branch information
yunhaoling committed Aug 30, 2019
commit f473a06488460336ac8a3931902e99a5d3985bae
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,14 @@

import logging
import time
from enum import Enum

from uamqp import errors, constants, compat # type: ignore
from azure.eventhub.error import EventHubError, _handle_exception

log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
def wrapped_func(self, *args, **kwargs): # pylint:disable=unused-argument # TODO: to refactor
timeout = kwargs.pop("timeout", 100000)
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
while True:
try:
return to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time) # pylint:disable=protected-access
retry_count += 1
return wrapped_func


class ConsumerProducerMixin(object):
def __init__(self):
self.client = None
Expand All @@ -55,9 +38,9 @@ def _redirect(self, redirect):
self.running = False
self._close_connection()

def _open(self, timeout_time=None): # pylint:disable=unused-argument # TODO: to refactor
def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
Open the EventHubConsumer/EventHubProducer using the supplied connection.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.

Expand Down Expand Up @@ -98,6 +81,26 @@ def _handle_exception(self, exception, retry_count, max_retries, timeout_time):

return _handle_exception(exception, retry_count, max_retries, self, timeout_time)

def _do_retryable_operation(self, operation_type, timeout=None, **kwargs):
    # pylint:disable=protected-access
    """Run the selected operation, retrying on failure until it succeeds,
    the retry budget is used up, or the deadline passes.

    :param operation_type: an ``_OperationType`` member choosing between
        open, send, and receive.
    :param timeout: seconds allowed for the whole operation; a falsy value
        means "no timeout" and is emulated with a very large deadline.
    """
    if not timeout:
        timeout = 100000  # timeout of 0/None means no timeout; use a large number
    deadline = time.time() + timeout
    retries_allowed = self.client.config.max_retries
    attempt = 0
    last_exception = kwargs.pop('last_exception', None)
    while True:
        try:
            if operation_type == _OperationType.OPEN:
                return self._open()
            if operation_type == _OperationType.SEND:
                return self._send_event_data(timeout_time=deadline, last_exception=last_exception)
            if operation_type == _OperationType.RECEIVE:
                return self._receive(timeout_time=deadline, last_exception=last_exception, **kwargs)
        except Exception as exception:  # pylint:disable=broad-except
            # _handle_exception re-raises once retries/deadline are exhausted,
            # otherwise hands back the exception for the next attempt.
            last_exception = self._handle_exception(exception, attempt, retries_allowed, deadline)
            attempt += 1

def close(self, exception=None):
# type:(Exception) -> None
"""
Expand Down Expand Up @@ -131,3 +134,9 @@ def close(self, exception=None):
self.error = EventHubError("{} handler is closed.".format(self.name))
if self._handler:
self._handler.close() # this will close link if sharing connection. Otherwise close connection


class _OperationType(Enum):
OPEN = 0
SEND = 1
RECEIVE = 2
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,11 @@
from uamqp import errors, constants, compat # type: ignore
from azure.eventhub.error import EventHubError, ConnectError
from ..aio.error_async import _handle_exception
from .._consumer_producer_mixin import _OperationType

log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
async def wrapped_func(self, *args, **kwargs): # pylint:disable=unused-argument # TODO: to refactor
timeout = kwargs.pop("timeout", 100000)
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
while True:
try:
return await to_be_wrapped_func(
self, timeout_time=timeout_time, last_exception=last_exception, **kwargs
)
except Exception as exception: # pylint:disable=broad-except
last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time) # pylint:disable=protected-access
retry_count += 1
return wrapped_func


class ConsumerProducerMixin(object):

def __init__(self):
Expand All @@ -58,7 +39,7 @@ async def _redirect(self, redirect):
self.running = False
await self._close_connection()

async def _open(self, timeout_time=None): # pylint:disable=unused-argument # TODO: to refactor
async def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
If the handler has previously been redirected, the redirect
Expand Down Expand Up @@ -101,6 +82,27 @@ async def _handle_exception(self, exception, retry_count, max_retries, timeout_t

return await _handle_exception(exception, retry_count, max_retries, self, timeout_time)

async def _do_retryable_operation(self, operation_type, timeout=None, **kwargs):
    # pylint:disable=protected-access
    """Run the selected operation, retrying on failure until it succeeds,
    the retry budget is used up, or the deadline passes.

    :param operation_type: an ``_OperationType`` member choosing between
        open, send, and receive.
    :param timeout: seconds allowed for the whole operation; a falsy value
        means "no timeout" and is emulated with a very large deadline.
    """
    if not timeout:
        timeout = 100000  # timeout equals to 0 means no timeout, set the value to be a large number.
    timeout_time = time.time() + timeout
    max_retries = self.client.config.max_retries
    retry_count = 0
    last_exception = kwargs.pop('last_exception', None)
    while True:
        # fix: removed leftover debug print of the retry count; library code
        # must not write to stdout — use the module logger if visibility is needed.
        try:
            if operation_type == _OperationType.OPEN:
                return await self._open()
            elif operation_type == _OperationType.SEND:
                return await self._send_event_data(timeout_time=timeout_time, last_exception=last_exception)
            elif operation_type == _OperationType.RECEIVE:
                return await self._receive(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
        except Exception as exception:  # pylint:disable=broad-except
            # _handle_exception re-raises once retries/deadline are exhausted.
            last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time)
            retry_count += 1

async def close(self, exception=None):
# type: (Exception) -> None
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

from azure.eventhub import EventData, EventPosition
from azure.eventhub.error import EventHubError, ConnectError, _error_handler
from ._consumer_producer_mixin_async import ConsumerProducerMixin, _retry_decorator
from ._consumer_producer_mixin_async import ConsumerProducerMixin
from .._consumer_producer_mixin import _OperationType

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -134,7 +135,7 @@ async def _redirect(self, redirect):
self.messages_iter = None
await super(EventHubConsumer, self)._redirect(redirect)

async def _open(self, timeout_time=None):
async def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
If the handler has previously been redirected, the redirect
Expand All @@ -145,13 +146,13 @@ async def _open(self, timeout_time=None):
if not self.running and self.redirected:
self.client._process_redirect_uri(self.redirected)
self.source = self.redirected.address
await super(EventHubConsumer, self)._open(timeout_time)
await super(EventHubConsumer, self)._open()

async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
last_exception = kwargs.get("last_exception")
data_batch = kwargs.get("data_batch")

await self._open(timeout_time)
await self._open()
remaining_time = timeout_time - time.time()
if remaining_time <= 0.0:
if last_exception:
Expand All @@ -169,9 +170,9 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
data_batch.append(event_data)
return data_batch

@_retry_decorator
async def _receive_with_try(self, timeout_time=None, max_batch_size=None, **kwargs):
return await self._receive(timeout_time=timeout_time, max_batch_size=max_batch_size, **kwargs)
async def _receive_with_try(self, timeout=None, max_batch_size=None, **kwargs):
    """Receive events with the mixin's retry policy applied.

    Delegates to ``_do_retryable_operation`` with ``_OperationType.RECEIVE``.
    ``timeout`` is the overall budget in seconds (falsy means no timeout,
    per ``_do_retryable_operation``).
    """
    return await self._do_retryable_operation(_OperationType.RECEIVE, timeout=timeout,
                                              max_batch_size=max_batch_size, **kwargs)

@property
def queue_size(self):
Expand Down
22 changes: 10 additions & 12 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from azure.eventhub.common import EventData, EventDataBatch
from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError
from ..producer import _error, _set_partition_key
from ._consumer_producer_mixin_async import ConsumerProducerMixin, _retry_decorator

from ._consumer_producer_mixin_async import ConsumerProducerMixin
from .._consumer_producer_mixin import _OperationType

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -98,7 +98,7 @@ def _create_handler(self):
self.client.config.user_agent),
loop=self.loop)

async def _open(self, timeout_time=None, **kwargs): # pylint:disable=arguments-differ, unused-argument # TODO: to refactor
async def _open(self):
"""
Open the EventHubProducer using the supplied connection.
If the handler has previously been redirected, the redirect
Expand All @@ -108,15 +108,14 @@ async def _open(self, timeout_time=None, **kwargs): # pylint:disable=arguments-
if not self.running and self.redirected:
self.client._process_redirect_uri(self.redirected) # pylint: disable=protected-access
self.target = self.redirected.address
await super(EventHubProducer, self)._open(timeout_time)
await super(EventHubProducer, self)._open()

@_retry_decorator
async def _open_with_retry(self, timeout_time=None, **kwargs):
return await self._open(timeout_time=timeout_time, **kwargs)
async def _open_with_retry(self):
    """Open the producer with the mixin's retry policy applied
    (delegates to ``_do_retryable_operation`` with ``_OperationType.OPEN``)."""
    return await self._do_retryable_operation(_OperationType.OPEN)

async def _send_event_data(self, timeout_time=None, last_exception=None):
if self.unsent_events:
await self._open(timeout_time)
await self._open()
remaining_time = timeout_time - time.time()
if remaining_time <= 0.0:
if last_exception:
Expand All @@ -135,9 +134,8 @@ async def _send_event_data(self, timeout_time=None, last_exception=None):
_error(self._outcome, self._condition)
return

@_retry_decorator
async def _send_event_data_with_retry(self, timeout_time=None, last_exception=None):
return await self._send_event_data(timeout_time=timeout_time, last_exception=last_exception)
async def _send_event_data_with_retry(self, timeout=None):
    """Send the pending unsent events with the mixin's retry policy applied.

    :param timeout: overall budget in seconds; falsy means no timeout
        (per ``_do_retryable_operation``).
    """
    return await self._do_retryable_operation(_OperationType.SEND, timeout=timeout)

def _on_outcome(self, outcome, condition):
"""
Expand Down Expand Up @@ -176,7 +174,7 @@ async def create_batch(self, max_size=None, partition_key=None):
"""

if not self._max_message_size_on_link:
await self._open_with_retry(timeout=self.client.config.send_timeout)
await self._open_with_retry()

if max_size and max_size > self._max_message_size_on_link:
raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
Expand Down
14 changes: 7 additions & 7 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from azure.eventhub.common import EventData, EventPosition
from azure.eventhub.error import _error_handler
from ._consumer_producer_mixin import ConsumerProducerMixin, _retry_decorator
from ._consumer_producer_mixin import ConsumerProducerMixin, _OperationType


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -129,7 +129,7 @@ def _redirect(self, redirect):
self.messages_iter = None
super(EventHubConsumer, self)._redirect(redirect)

def _open(self, timeout_time=None):
def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
If the handler has previously been redirected, the redirect
Expand All @@ -140,13 +140,13 @@ def _open(self, timeout_time=None):
if not self.running and self.redirected:
self.client._process_redirect_uri(self.redirected)
self.source = self.redirected.address
super(EventHubConsumer, self)._open(timeout_time)
super(EventHubConsumer, self)._open()

def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
last_exception = kwargs.get("last_exception")
data_batch = kwargs.get("data_batch")

self._open(timeout_time)
self._open()
remaining_time = timeout_time - time.time()
if remaining_time <= 0.0:
if last_exception:
Expand All @@ -163,9 +163,9 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
data_batch.append(event_data)
return data_batch

@_retry_decorator
def _receive_with_try(self, timeout_time=None, max_batch_size=None, **kwargs):
return self._receive(timeout_time=timeout_time, max_batch_size=max_batch_size, **kwargs)
def _receive_with_try(self, timeout=None, max_batch_size=None, **kwargs):
    """Receive events with the mixin's retry policy applied.

    Delegates to ``_do_retryable_operation`` with ``_OperationType.RECEIVE``.
    ``timeout`` is the overall budget in seconds (falsy means no timeout,
    per ``_do_retryable_operation``).
    """
    return self._do_retryable_operation(_OperationType.RECEIVE, timeout=timeout,
                                        max_batch_size=max_batch_size, **kwargs)

@property
def queue_size(self):
Expand Down
23 changes: 10 additions & 13 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from azure.eventhub.common import EventData, EventDataBatch
from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError
from ._consumer_producer_mixin import ConsumerProducerMixin, _retry_decorator
from ._consumer_producer_mixin import ConsumerProducerMixin, _OperationType


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -104,26 +104,24 @@ def _create_handler(self):
link_properties=self._link_properties,
properties=self.client._create_properties(self.client.config.user_agent)) # pylint: disable=protected-access

def _open(self, timeout_time=None, **kwargs): # pylint:disable=unused-argument, arguments-differ # TODO:To refactor
def _open(self):
    """
    Open the EventHubProducer using the supplied connection.
    If the handler has previously been redirected, the redirect
    context will be used to create a new handler before opening it.

    """

    if not self.running and self.redirected:
        # Adopt the redirect context before the mixin (re)creates and opens
        # the underlying handler against the new address.
        self.client._process_redirect_uri(self.redirected)  # pylint: disable=protected-access
        self.target = self.redirected.address
    super(EventHubProducer, self)._open()

@_retry_decorator
def _open_with_retry(self, timeout_time=None, **kwargs):
return self._open(timeout_time=timeout_time, **kwargs)
def _open_with_retry(self):
    """Open the producer with the mixin's retry policy applied
    (delegates to ``_do_retryable_operation`` with ``_OperationType.OPEN``)."""
    return self._do_retryable_operation(_OperationType.OPEN)

def _send_event_data(self, timeout_time=None, last_exception=None):
if self.unsent_events:
self._open(timeout_time)
self._open()
remaining_time = timeout_time - time.time()
if remaining_time <= 0.0:
if last_exception:
Expand All @@ -141,9 +139,8 @@ def _send_event_data(self, timeout_time=None, last_exception=None):
self._condition = OperationTimeoutError("send operation timed out")
_error(self._outcome, self._condition)

@_retry_decorator
def _send_event_data_with_retry(self, timeout_time=None, last_exception=None):
return self._send_event_data(timeout_time=timeout_time, last_exception=last_exception)
def _send_event_data_with_retry(self, timeout=None):
    """Send the pending unsent events with the mixin's retry policy applied.

    :param timeout: overall budget in seconds; falsy means no timeout
        (per ``_do_retryable_operation``).
    """
    return self._do_retryable_operation(_OperationType.SEND, timeout=timeout)

def _on_outcome(self, outcome, condition):
"""
Expand Down Expand Up @@ -182,7 +179,7 @@ def create_batch(self, max_size=None, partition_key=None):
"""

if not self._max_message_size_on_link:
self._open_with_retry(timeout=self.client.config.send_timeout)
self._open_with_retry()

if max_size and max_size > self._max_message_size_on_link:
raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
Expand Down Expand Up @@ -237,7 +234,7 @@ def send(self, event_data, partition_key=None, timeout=None):
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access
wrapper_event_data.message.on_send_complete = self._on_outcome
self.unsent_events = [wrapper_event_data.message]
self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO:to refactor
self._send_event_data_with_retry(timeout=timeout)

def close(self, exception=None): # pylint:disable=useless-super-delegation
# type:(Exception) -> None
Expand Down