Skip to content

Commit 14180d9

Browse files
authored
use decorator to implement retry logic and update some tests (#6544)
1 parent aa56698 commit 14180d9

File tree

11 files changed

+299
-25
lines changed

11 files changed

+299
-25
lines changed

sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,30 @@
77
import logging
88
import time
99

10-
from uamqp import errors
10+
from uamqp import errors, constants
1111
from azure.eventhub.error import EventHubError, _handle_exception
1212

1313
log = logging.getLogger(__name__)
1414

1515

16+
def _retry_decorator(to_be_wrapped_func):
17+
def wrapped_func(*args, **kwargs):
18+
timeout = kwargs.get("timeout", None)
19+
if not timeout:
20+
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
21+
timeout_time = time.time() + timeout
22+
max_retries = args[0].client.config.max_retries
23+
retry_count = 0
24+
last_exception = None
25+
while True:
26+
try:
27+
return to_be_wrapped_func(args[0], timeout_time=timeout_time, last_exception=last_exception, **kwargs)
28+
except Exception as exception:
29+
last_exception = args[0]._handle_exception(exception, retry_count, max_retries, timeout_time)
30+
retry_count += 1
31+
return wrapped_func
32+
33+
1634
class ConsumerProducerMixin(object):
1735
def __init__(self):
1836
self.client = None
@@ -61,6 +79,8 @@ def _open(self, timeout_time=None):
6179
if timeout_time and time.time() >= timeout_time:
6280
return
6381
time.sleep(0.05)
82+
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
83+
or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access
6484
self.running = True
6585

6686
def _close_handler(self):

sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,31 @@
66
import logging
77
import time
88

9-
from uamqp import errors
9+
from uamqp import errors, constants
1010
from azure.eventhub.error import EventHubError, ConnectError
1111
from ..aio.error_async import _handle_exception
1212

1313
log = logging.getLogger(__name__)
1414

1515

16+
def _retry_decorator(to_be_wrapped_func):
17+
async def wrapped_func(*args, **kwargs):
18+
timeout = kwargs.get("timeout", None)
19+
if not timeout:
20+
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
21+
timeout_time = time.time() + timeout
22+
max_retries = args[0].client.config.max_retries
23+
retry_count = 0
24+
last_exception = None
25+
while True:
26+
try:
27+
return await to_be_wrapped_func(args[0], timeout_time=timeout_time, last_exception=last_exception, **kwargs)
28+
except Exception as exception:
29+
last_exception = await args[0]._handle_exception(exception, retry_count, max_retries, timeout_time)
30+
retry_count += 1
31+
return wrapped_func
32+
33+
1634
class ConsumerProducerMixin(object):
1735

1836
def __init__(self):
@@ -62,6 +80,8 @@ async def _open(self, timeout_time=None):
6280
if timeout_time and time.time() >= timeout_time:
6381
return
6482
await asyncio.sleep(0.05)
83+
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
84+
or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access
6585
self.running = True
6686

6787
async def _close_handler(self):

sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from azure.eventhub import EventData, EventPosition
1515
from azure.eventhub.error import EventHubError, AuthenticationError, ConnectError, ConnectionLostError, _error_handler
1616
from ..aio.error_async import _handle_exception
17-
from ._consumer_producer_mixin_async import ConsumerProducerMixin
17+
from ._consumer_producer_mixin_async import ConsumerProducerMixin, _retry_decorator
1818

1919
log = logging.getLogger(__name__)
2020

@@ -159,11 +159,72 @@ def queue_size(self):
159159
return self._handler._received_messages.qsize()
160160
return 0
161161

162+
@_retry_decorator
async def _receive(self, **kwargs):
    """Single receive attempt; driven and retried by ``_retry_decorator``.

    Keyword arguments (supplied by the decorator and by ``receive``):
    ``timeout_time`` — absolute deadline; ``last_exception`` — failure from
    the previous attempt, if any; ``max_batch_size`` — total events the
    caller asked for; ``data_batch`` — accumulator list, which may already
    hold events collected by an earlier attempt.

    :returns: ``data_batch`` with any newly received events appended.
    :raises: the previous attempt's exception when the deadline has passed.
    """
    timeout_time = kwargs.get("timeout_time")
    last_exception = kwargs.get("last_exception")
    max_batch_size = kwargs.get("max_batch_size")
    data_batch = kwargs.get("data_batch")

    await self._open(timeout_time)
    remaining_time = timeout_time - time.time()
    if remaining_time <= 0.0:
        # Deadline passed: surface the last failure if there was one,
        # otherwise return whatever was collected so far.
        if last_exception:
            log.info("%r receive operation timed out. (%r)", self.name, last_exception)
            raise last_exception
        return data_batch

    remaining_time_ms = 1000 * remaining_time
    # On a retry, data_batch may already contain events from a previous
    # attempt; only request the remainder so the total never exceeds
    # max_batch_size (keeps parity with the sync consumer implementation).
    message_batch = await self._handler.receive_message_batch_async(
        max_batch_size=max_batch_size - (len(data_batch) if data_batch else 0),
        timeout=remaining_time_ms)
    for message in message_batch:
        event_data = EventData(message=message)
        self.offset = EventPosition(event_data.offset)
        data_batch.append(event_data)
    return data_batch
186+
162187
async def receive(self, **kwargs):
    # type: (int, float) -> List[EventData]
    """
    Receive events asynchronously from the EventHub.

    :param max_batch_size: Receive a batch of events. Batch size will
     be up to the maximum specified, but will return as soon as service
     returns no new events. If combined with a timeout and no events are
     retrieved before the time, the result will be empty. If no batch
     size is supplied, the prefetch size will be the maximum.
    :type max_batch_size: int
    :param timeout: The maximum wait time to build up the requested message count for the batch.
     If not specified, the default wait time specified when the consumer was created will be used.
    :type timeout: float
    :rtype: list[~azure.eventhub.common.EventData]
    :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError,
     ~azure.eventhub.EventHubError

    Example:
        .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
            :start-after: [START eventhub_client_async_receive]
            :end-before: [END eventhub_client_async_receive]
            :language: python
            :dedent: 4
            :caption: Receives events asynchronously

    """
    self._check_closed()

    # Fall back to the client's configured receive timeout when the caller
    # passes no timeout (or 0).
    wait_time = kwargs.get("timeout") or self.client.config.receive_timeout
    batch_limit = kwargs.get("max_batch_size")
    if batch_limit is None:
        batch_limit = min(self.client.config.max_batch_size, self.prefetch)
    collected = []  # type: List[EventData]

    # _receive is wrapped by _retry_decorator, which handles deadlines and
    # retries; `collected` accumulates events across attempts.
    return await self._receive(timeout=wait_time, max_batch_size=batch_limit, data_batch=collected)
222+
223+
async def _legacy_receive(self, **kwargs):
224+
# type: (int, float) -> List[EventData]
225+
"""
226+
Receive events asynchronously from the EventHub.
227+
167228
:param max_batch_size: Receive a batch of events. Batch size will
168229
be up to the maximum specified, but will return as soon as service
169230
returns no new events. If combined with a timeout and no events are

sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from azure.eventhub.common import EventData, EventDataBatch
1515
from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError
1616
from ..producer import _error, _set_partition_key
17-
from ._consumer_producer_mixin_async import ConsumerProducerMixin
17+
from ._consumer_producer_mixin_async import ConsumerProducerMixin, _retry_decorator
1818

1919

2020
log = logging.getLogger(__name__)
@@ -98,7 +98,7 @@ def _create_handler(self):
9898
self.client.config.user_agent), # pylint: disable=protected-access
9999
loop=self.loop)
100100

101-
async def _open(self, timeout_time=None):
101+
async def _open(self, timeout_time=None, **kwargs):
102102
"""
103103
Open the EventHubProducer using the supplied connection.
104104
If the handler has previously been redirected, the redirect
@@ -110,7 +110,32 @@ async def _open(self, timeout_time=None):
110110
self.target = self.redirected.address
111111
await super(EventHubProducer, self)._open(timeout_time)
112112

113-
async def _send_event_data(self, timeout=None):
113+
@_retry_decorator
async def _send_event_data(self, **kwargs):
    """Single send attempt; driven and retried by ``_retry_decorator``.

    Keyword arguments (supplied by the decorator): ``timeout_time`` —
    absolute deadline; ``last_exception`` — failure from the previous
    attempt, if any.  Queues ``self.unsent_events`` on the uamqp handler,
    waits for the outcome, and raises on timeout or a non-Ok send result.
    """
    timeout_time = kwargs.get("timeout_time")
    last_exception = kwargs.get("last_exception")

    # Nothing pending — nothing to do.
    if not self.unsent_events:
        return

    await self._open(timeout_time)
    remaining_time = timeout_time - time.time()
    if remaining_time <= 0.0:
        # Deadline passed before we could send: re-raise the previous
        # failure, or report a plain timeout if this is the first attempt.
        error = last_exception if last_exception else OperationTimeoutError("send operation timed out")
        log.info("%r send operation timed out. (%r)", self.name, error)
        raise error

    self._handler._msg_timeout = remaining_time  # pylint: disable=protected-access
    self._handler.queue_message(*self.unsent_events)
    await self._handler.wait_async()
    # Anything uamqp could not deliver is retried on the next attempt.
    self.unsent_events = self._handler.pending_messages
    if self._outcome != constants.MessageSendResult.Ok:
        if self._outcome == constants.MessageSendResult.Timeout:
            self._condition = OperationTimeoutError("send operation timed out")
        _error(self._outcome, self._condition)
    return
137+
138+
async def _legacy_send_event_data(self, timeout=None):
114139
timeout = timeout or self.client.config.send_timeout
115140
if not timeout:
116141
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
@@ -170,14 +195,19 @@ async def create_batch(self, **kwargs):
170195
"""
171196
max_size = kwargs.get("max_size", None)
172197
partition_key = kwargs.get("partition_key", None)
198+
199+
@_retry_decorator
200+
async def wrapped_open(*args, **kwargs):
201+
await self._open(**kwargs)
202+
173203
if not self._max_message_size_on_link:
174-
await self._open()
204+
await wrapped_open(self, timeout=self.client.config.send_timeout)
175205

176206
if max_size and max_size > self._max_message_size_on_link:
177207
raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
178208
.format(max_size, self._max_message_size_on_link))
179209

180-
return EventDataBatch(max_size or self._max_message_size_on_link, partition_key)
210+
return EventDataBatch(max_size=(max_size or self._max_message_size_on_link), partition_key=partition_key)
181211

182212
async def send(self, event_data, **kwargs):
183213
# type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes]) -> None

sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py

Lines changed: 65 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from azure.eventhub.common import EventData, EventPosition
1616
from azure.eventhub.error import _error_handler, EventHubError
17-
from ._consumer_producer_mixin import ConsumerProducerMixin
17+
from ._consumer_producer_mixin import ConsumerProducerMixin, _retry_decorator
1818

1919
log = logging.getLogger(__name__)
2020

@@ -152,7 +152,65 @@ def queue_size(self):
152152
return self._handler._received_messages.qsize()
153153
return 0
154154

155+
@_retry_decorator
def _receive(self, **kwargs):
    """Single receive attempt; driven and retried by ``_retry_decorator``.

    Keyword arguments (supplied by the decorator and by ``receive``):
    ``timeout_time`` — absolute deadline; ``last_exception`` — failure from
    the previous attempt, if any; ``max_batch_size`` — total events the
    caller asked for; ``data_batch`` — accumulator list, which may already
    hold events collected by an earlier attempt.

    :returns: ``data_batch`` with any newly received events appended.
    :raises: the previous attempt's exception when the deadline has passed.
    """
    timeout_time = kwargs.get("timeout_time")
    last_exception = kwargs.get("last_exception")
    max_batch_size = kwargs.get("max_batch_size")
    data_batch = kwargs.get("data_batch")

    self._open(timeout_time)
    time_left = timeout_time - time.time()
    if time_left <= 0.0:
        # Deadline passed: surface the last failure if there was one,
        # otherwise return whatever was collected so far.
        if last_exception:
            log.info("%r receive operation timed out. (%r)", self.name, last_exception)
            raise last_exception
        return data_batch

    # data_batch may already hold events from a previous attempt; only
    # request the remainder so the total never exceeds max_batch_size.
    already_received = len(data_batch) if data_batch else 0
    message_batch = self._handler.receive_message_batch(
        max_batch_size=max_batch_size - already_received,
        timeout=1000 * time_left)
    for message in message_batch:
        event_data = EventData(message=message)
        self.offset = EventPosition(event_data.offset)
        data_batch.append(event_data)
    return data_batch
178+
155179
def receive(self, **kwargs):
    """
    Receive events from the EventHub.

    :param max_batch_size: Receive a batch of events. Batch size will
     be up to the maximum specified, but will return as soon as service
     returns no new events. If combined with a timeout and no events are
     retrieved before the time, the result will be empty. If no batch
     size is supplied, the prefetch size will be the maximum.
    :type max_batch_size: int
    :param timeout: The maximum wait time to build up the requested message count for the batch.
     If not specified, the default wait time specified when the consumer was created will be used.
    :type timeout: float
    :rtype: list[~azure.eventhub.common.EventData]
    :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError,
     ~azure.eventhub.EventHubError
    Example:
        .. literalinclude:: ../examples/test_examples_eventhub.py
            :start-after: [START eventhub_client_sync_receive]
            :end-before: [END eventhub_client_sync_receive]
            :language: python
            :dedent: 4
            :caption: Receive events from the EventHub.

    """
    self._check_closed()

    # Fall back to the client's configured receive timeout when the caller
    # passes no timeout (or 0).
    wait_time = kwargs.get("timeout") or self.client.config.receive_timeout
    batch_limit = kwargs.get("max_batch_size")
    if batch_limit is None:
        batch_limit = min(self.client.config.max_batch_size, self.prefetch)
    collected = []  # type: List[EventData]

    # _receive is wrapped by _retry_decorator, which handles deadlines and
    # retries; `collected` accumulates events across attempts.
    return self._receive(timeout=wait_time, max_batch_size=batch_limit, data_batch=collected)
212+
213+
def _legacy_receive(self, **kwargs):
156214
# type:(int, float) -> List[EventData]
157215
"""
158216
Receive events from the EventHub.
@@ -182,17 +240,19 @@ def receive(self, **kwargs):
182240
timeout = kwargs.get("timeout", None)
183241

184242
self._check_closed()
243+
185244
max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size
245+
data_batch = [] # type: List[EventData]
246+
186247
timeout = self.client.config.receive_timeout if timeout is None else timeout
187248
if not timeout:
188249
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
189-
190-
data_batch = [] # type: List[EventData]
191-
start_time = time.time()
192-
timeout_time = start_time + timeout
250+
timeout_time = time.time() + timeout
193251
max_retries = self.client.config.max_retries
194252
retry_count = 0
195253
last_exception = None
254+
255+
self._receive()
196256
while True:
197257
try:
198258
self._open(timeout_time)
@@ -211,8 +271,6 @@ def receive(self, **kwargs):
211271
self.offset = EventPosition(event_data.offset)
212272
data_batch.append(event_data)
213273
return data_batch
214-
except EventHubError:
215-
raise
216274
except Exception as exception:
217275
last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time)
218276
retry_count += 1

0 commit comments

Comments
 (0)