Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
use decorator to implement retry logic and update some tests
  • Loading branch information
yunhaoling committed Jul 30, 2019
commit 702747ca5f3fc7f6feb377722a0468ca99b38b22
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,30 @@
import logging
import time

from uamqp import errors
from uamqp import errors, constants
from azure.eventhub.error import EventHubError, _handle_exception

log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
def wrapped_func(*args, **kwargs):
timeout = kwargs.get("timeout", None)
if not timeout:
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
timeout_time = time.time() + timeout
max_retries = args[0].client.config.max_retries
retry_count = 0
last_exception = None
while True:
try:
return to_be_wrapped_func(args[0], timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception:
last_exception = args[0]._handle_exception(exception, retry_count, max_retries, timeout_time)
retry_count += 1
return wrapped_func


class ConsumerProducerMixin(object):
def __init__(self):
self.client = None
Expand Down Expand Up @@ -61,6 +79,8 @@ def _open(self, timeout_time=None):
if timeout_time and time.time() >= timeout_time:
return
time.sleep(0.05)
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access
self.running = True

def _close_handler(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,31 @@
import logging
import time

from uamqp import errors
from uamqp import errors, constants
from azure.eventhub.error import EventHubError, ConnectError
from ..aio.error_async import _handle_exception

log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
async def wrapped_func(*args, **kwargs):
timeout = kwargs.get("timeout", None)
if not timeout:
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
timeout_time = time.time() + timeout
max_retries = args[0].client.config.max_retries
retry_count = 0
last_exception = None
while True:
try:
return await to_be_wrapped_func(args[0], timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception:
last_exception = await args[0]._handle_exception(exception, retry_count, max_retries, timeout_time)
retry_count += 1
return wrapped_func


class ConsumerProducerMixin(object):

def __init__(self):
Expand Down Expand Up @@ -62,6 +80,8 @@ async def _open(self, timeout_time=None):
if timeout_time and time.time() >= timeout_time:
return
await asyncio.sleep(0.05)
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access
self.running = True

async def _close_handler(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from azure.eventhub import EventData, EventPosition
from azure.eventhub.error import EventHubError, AuthenticationError, ConnectError, ConnectionLostError, _error_handler
from ..aio.error_async import _handle_exception
from ._consumer_producer_mixin_async import ConsumerProducerMixin
from ._consumer_producer_mixin_async import ConsumerProducerMixin, _retry_decorator

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -159,11 +159,72 @@ def queue_size(self):
return self._handler._received_messages.qsize()
return 0

@_retry_decorator
async def _receive(self, **kwargs):
    """Single receive attempt; retried by ``_retry_decorator``.

    Keyword arguments (supplied by the decorator and by ``receive``):
      timeout_time: absolute deadline (epoch seconds) for the operation.
      last_exception: exception raised by the previous attempt, or None.
      max_batch_size: maximum total number of events to return.
      data_batch: list accumulating events across retry attempts.

    Returns ``data_batch`` (possibly unchanged when the deadline passed
    with no prior error).
    """
    timeout_time = kwargs.get("timeout_time")
    last_exception = kwargs.get("last_exception")
    max_batch_size = kwargs.get("max_batch_size")
    data_batch = kwargs.get("data_batch")

    await self._open(timeout_time)
    remaining_time = timeout_time - time.time()
    if remaining_time <= 0.0:
        if last_exception:
            log.info("%r receive operation timed out. (%r)", self.name, last_exception)
            raise last_exception
        return data_batch

    remaining_time_ms = 1000 * remaining_time
    # Request only the events still missing from data_batch so that a retry
    # after a partial batch cannot exceed max_batch_size (keeps parity with
    # the synchronous consumer implementation).
    message_batch = await self._handler.receive_message_batch_async(
        max_batch_size=max_batch_size - (len(data_batch) if data_batch else 0),
        timeout=remaining_time_ms)
    for message in message_batch:
        event_data = EventData(message=message)
        self.offset = EventPosition(event_data.offset)
        data_batch.append(event_data)
    return data_batch

async def receive(self, **kwargs):
    # type: (...) -> List[EventData]
    """
    Receive events asynchronously from the EventHub.

    :param max_batch_size: Receive a batch of events. Batch size will
     be up to the maximum specified, but will return as soon as service
     returns no new events. If combined with a timeout and no events are
     retrieved before the time, the result will be empty. If no batch
     size is supplied, the prefetch size will be the maximum.
    :type max_batch_size: int
    :param timeout: The maximum wait time to build up the requested message count for the batch.
     If not specified, the default wait time specified when the consumer was created will be used.
    :type timeout: float
    :rtype: list[~azure.eventhub.common.EventData]
    :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError,
     ~azure.eventhub.EventHubError

    Example:
        .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
            :start-after: [START eventhub_client_async_receive]
            :end-before: [END eventhub_client_async_receive]
            :language: python
            :dedent: 4
            :caption: Receives events asynchronously

    """
    self._check_closed()

    max_batch_size = kwargs.get("max_batch_size", None)
    # Fall back to the client-wide receive timeout when none is given.
    timeout = kwargs.get("timeout", None) or self.client.config.receive_timeout
    # Cap the batch at the configured maximum / prefetch when unspecified.
    max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size
    data_batch = []  # type: List[EventData]

    # _receive is wrapped by _retry_decorator, which converts `timeout` into
    # an absolute deadline and retries transient failures.
    return await self._receive(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)

async def _legacy_receive(self, **kwargs):
# type: (int, float) -> List[EventData]
"""
Receive events asynchronously from the EventHub.

:param max_batch_size: Receive a batch of events. Batch size will
be up to the maximum specified, but will return as soon as service
returns no new events. If combined with a timeout and no events are
Expand Down
40 changes: 35 additions & 5 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from azure.eventhub.common import EventData, EventDataBatch
from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError
from ..producer import _error, _set_partition_key
from ._consumer_producer_mixin_async import ConsumerProducerMixin
from ._consumer_producer_mixin_async import ConsumerProducerMixin, _retry_decorator


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -98,7 +98,7 @@ def _create_handler(self):
self.client.config.user_agent), # pylint: disable=protected-access
loop=self.loop)

async def _open(self, timeout_time=None):
async def _open(self, timeout_time=None, **kwargs):
"""
Open the EventHubProducer using the supplied connection.
If the handler has previously been redirected, the redirect
Expand All @@ -110,7 +110,32 @@ async def _open(self, timeout_time=None):
self.target = self.redirected.address
await super(EventHubProducer, self)._open(timeout_time)

async def _send_event_data(self, timeout=None):
@_retry_decorator
async def _send_event_data(self, **kwargs):
    """Single attempt at flushing ``self.unsent_events``; retried by
    ``_retry_decorator``.

    Keyword arguments (supplied by the decorator):
      timeout_time: absolute deadline (epoch seconds) for the operation.
      last_exception: exception raised by the previous attempt, or None.
    """
    deadline = kwargs.get("timeout_time")
    prior_error = kwargs.get("last_exception")

    # Nothing queued -> nothing to do.
    if not self.unsent_events:
        return

    await self._open(deadline)
    time_left = deadline - time.time()
    if time_left <= 0.0:
        # Prefer surfacing the last transient error over a generic timeout.
        error = prior_error or OperationTimeoutError("send operation timed out")
        log.info("%r send operation timed out. (%r)", self.name, error)
        raise error

    self._handler._msg_timeout = time_left  # pylint: disable=protected-access
    self._handler.queue_message(*self.unsent_events)
    await self._handler.wait_async()
    # Anything uamqp could not deliver stays queued for the next attempt.
    self.unsent_events = self._handler.pending_messages
    if self._outcome != constants.MessageSendResult.Ok:
        if self._outcome == constants.MessageSendResult.Timeout:
            self._condition = OperationTimeoutError("send operation timed out")
        _error(self._outcome, self._condition)

async def _legacy_send_event_data(self, timeout=None):
timeout = timeout or self.client.config.send_timeout
if not timeout:
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
Expand Down Expand Up @@ -170,14 +195,19 @@ async def create_batch(self, **kwargs):
"""
max_size = kwargs.get("max_size", None)
partition_key = kwargs.get("partition_key", None)

@_retry_decorator
async def wrapped_open(*args, **kwargs):
await self._open(**kwargs)

if not self._max_message_size_on_link:
await self._open()
await wrapped_open(self, timeout=self.client.config.send_timeout)

if max_size and max_size > self._max_message_size_on_link:
raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
.format(max_size, self._max_message_size_on_link))

return EventDataBatch(max_size or self._max_message_size_on_link, partition_key)
return EventDataBatch(max_size=(max_size or self._max_message_size_on_link), partition_key=partition_key)

async def send(self, event_data, **kwargs):
# type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes]) -> None
Expand Down
72 changes: 65 additions & 7 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from azure.eventhub.common import EventData, EventPosition
from azure.eventhub.error import _error_handler, EventHubError
from ._consumer_producer_mixin import ConsumerProducerMixin
from ._consumer_producer_mixin import ConsumerProducerMixin, _retry_decorator

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -152,7 +152,65 @@ def queue_size(self):
return self._handler._received_messages.qsize()
return 0

@_retry_decorator
def _receive(self, **kwargs):
    """Single receive attempt; retried by ``_retry_decorator``.

    Keyword arguments (supplied by the decorator and by ``receive``):
      timeout_time: absolute deadline (epoch seconds) for the operation.
      last_exception: exception raised by the previous attempt, or None.
      max_batch_size: maximum total number of events to return.
      data_batch: list accumulating events across retry attempts.
    """
    deadline = kwargs.get("timeout_time")
    prior_error = kwargs.get("last_exception")
    batch_limit = kwargs.get("max_batch_size")
    collected = kwargs.get("data_batch")

    self._open(deadline)
    time_left = deadline - time.time()
    if time_left <= 0.0:
        if prior_error:
            log.info("%r receive operation timed out. (%r)", self.name, prior_error)
            raise prior_error
        return collected

    # Only ask for the events still missing so a retry after a partial
    # batch never exceeds batch_limit in total.
    still_needed = batch_limit - (len(collected) if collected else 0)
    messages = self._handler.receive_message_batch(
        max_batch_size=still_needed,
        timeout=1000 * time_left)
    for message in messages:
        event = EventData(message=message)
        self.offset = EventPosition(event.offset)
        collected.append(event)
    return collected

def receive(self, **kwargs):
    # type: (...) -> List[EventData]
    """
    Receive events from the EventHub.

    :param max_batch_size: Receive a batch of events. Batch size will
     be up to the maximum specified, but will return as soon as service
     returns no new events. If combined with a timeout and no events are
     retrieved before the time, the result will be empty. If no batch
     size is supplied, the prefetch size will be the maximum.
    :type max_batch_size: int
    :param timeout: The maximum wait time to build up the requested message count for the batch.
     If not specified, the default wait time specified when the consumer was created will be used.
    :type timeout: float
    :rtype: list[~azure.eventhub.common.EventData]
    :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError,
     ~azure.eventhub.EventHubError
    Example:
        .. literalinclude:: ../examples/test_examples_eventhub.py
            :start-after: [START eventhub_client_sync_receive]
            :end-before: [END eventhub_client_sync_receive]
            :language: python
            :dedent: 4
            :caption: Receive events from the EventHub.

    """
    self._check_closed()

    max_batch_size = kwargs.get("max_batch_size", None)
    # Fall back to the client-wide receive timeout when none is given.
    timeout = kwargs.get("timeout", None) or self.client.config.receive_timeout
    # Cap the batch at the configured maximum / prefetch when unspecified.
    max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size
    data_batch = []  # type: List[EventData]

    # _receive is wrapped by _retry_decorator, which converts `timeout` into
    # an absolute deadline and retries transient failures.
    return self._receive(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)

def _legacy_receive(self, **kwargs):
# type:(int, float) -> List[EventData]
"""
Receive events from the EventHub.
Expand Down Expand Up @@ -182,17 +240,19 @@ def receive(self, **kwargs):
timeout = kwargs.get("timeout", None)

self._check_closed()

max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size
data_batch = [] # type: List[EventData]

timeout = self.client.config.receive_timeout if timeout is None else timeout
if not timeout:
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout

data_batch = [] # type: List[EventData]
start_time = time.time()
timeout_time = start_time + timeout
timeout_time = time.time() + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None

self._receive()
while True:
try:
self._open(timeout_time)
Expand All @@ -211,8 +271,6 @@ def receive(self, **kwargs):
self.offset = EventPosition(event_data.offset)
data_batch.append(event_data)
return data_batch
except EventHubError:
raise
except Exception as exception:
last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time)
retry_count += 1
Expand Down
Loading