Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def wrapped_func(self, *args, **kwargs):
kwargs.pop("timeout", None)
while True:
try:
return to_be_wrapped_func(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
return to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception:
last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time)
retry_count += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def wrapped_func(self, *args, **kwargs):
kwargs.pop("timeout", None)
while True:
try:
return await to_be_wrapped_func(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
return await to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception:
last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time)
retry_count += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,8 @@ async def _open(self, timeout_time=None):
self.source = self.redirected.address
await super(EventHubConsumer, self)._open(timeout_time)

async def _receive(self, **kwargs):
timeout_time = kwargs.get("timeout_time")
async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
last_exception = kwargs.get("last_exception")
max_batch_size = kwargs.get("max_batch_size")
data_batch = kwargs.get("data_batch")

await self._open(timeout_time)
Expand All @@ -171,6 +169,10 @@ async def _receive(self, **kwargs):
data_batch.append(event_data)
return data_batch

@_retry_decorator
async def _receive_with_try(self, timeout_time=None, max_batch_size=None, **kwargs):
return await self._receive(timeout_time=timeout_time, max_batch_size=max_batch_size, **kwargs)

@property
def queue_size(self):
# type: () -> int
Expand Down Expand Up @@ -217,8 +219,7 @@ async def receive(self, *, max_batch_size=None, timeout=None):
max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch)
data_batch = [] # type: List[EventData]

return await _retry_decorator(self._receive)(self, timeout=timeout,
max_batch_size=max_batch_size, data_batch=data_batch)
return await self._receive_with_try(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)

async def close(self, exception=None):
# type: (Exception) -> None
Expand Down
12 changes: 10 additions & 2 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ async def _open(self, timeout_time=None, **kwargs):
self.target = self.redirected.address
await super(EventHubProducer, self)._open(timeout_time)

@_retry_decorator
async def _open_with_retry(self, timeout_time=None, **kwargs):
return await self._open(timeout_time=timeout_time, **kwargs)

async def _send_event_data(self, timeout_time=None, last_exception=None):
if self.unsent_events:
await self._open(timeout_time)
Expand All @@ -131,6 +135,10 @@ async def _send_event_data(self, timeout_time=None, last_exception=None):
_error(self._outcome, self._condition)
return

@_retry_decorator
async def _send_event_data_with_retry(self, timeout_time=None, last_exception=None):
return await self._send_event_data(timeout_time=timeout_time, last_exception=last_exception)

def _on_outcome(self, outcome, condition):
"""
Called when the outcome is received for a delivery.
Expand Down Expand Up @@ -158,7 +166,7 @@ async def create_batch(self, max_size=None, partition_key=None):
"""

if not self._max_message_size_on_link:
await _retry_decorator(self._open)(self, timeout=self.client.config.send_timeout)
await self._open_with_retry(timeout=self.client.config.send_timeout)

if max_size and max_size > self._max_message_size_on_link:
raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
Expand Down Expand Up @@ -212,7 +220,7 @@ async def send(self, event_data, *, partition_key=None, timeout=None):
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access
wrapper_event_data.message.on_send_complete = self._on_outcome
self.unsent_events = [wrapper_event_data.message]
await _retry_decorator(self._send_event_data)(self, timeout=timeout)
await self._send_event_data_with_retry(timeout=timeout)

async def close(self, exception=None):
# type: (Exception) -> None
Expand Down
10 changes: 6 additions & 4 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,8 @@ def _open(self, timeout_time=None):
self.source = self.redirected.address
super(EventHubConsumer, self)._open(timeout_time)

def _receive(self, **kwargs):
timeout_time = kwargs.get("timeout_time")
def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
last_exception = kwargs.get("last_exception")
max_batch_size = kwargs.get("max_batch_size")
data_batch = kwargs.get("data_batch")

self._open(timeout_time)
Expand All @@ -165,6 +163,10 @@ def _receive(self, **kwargs):
data_batch.append(event_data)
return data_batch

@_retry_decorator
def _receive_with_try(self, timeout_time=None, max_batch_size=None, **kwargs):
return self._receive(timeout_time=timeout_time, max_batch_size=max_batch_size, **kwargs)

@property
def queue_size(self):
# type:() -> int
Expand Down Expand Up @@ -210,7 +212,7 @@ def receive(self, max_batch_size=None, timeout=None):
max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch)
data_batch = [] # type: List[EventData]

return _retry_decorator(self._receive)(self, timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)
return self._receive_with_try(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)

def close(self, exception=None):
# type:(Exception) -> None
Expand Down
12 changes: 10 additions & 2 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ def _open(self, timeout_time=None, **kwargs):
self.target = self.redirected.address
super(EventHubProducer, self)._open(timeout_time)

@_retry_decorator
def _open_with_retry(self, timeout_time=None, **kwargs):
return self._open(timeout_time=timeout_time, **kwargs)

def _send_event_data(self, timeout_time=None, last_exception=None):
if self.unsent_events:
self._open(timeout_time)
Expand All @@ -138,6 +142,10 @@ def _send_event_data(self, timeout_time=None, last_exception=None):
_error(self._outcome, self._condition)
return

@_retry_decorator
def _send_event_data_with_retry(self, timeout_time=None, last_exception=None):
return self._send_event_data(timeout_time=timeout_time, last_exception=last_exception)

def _on_outcome(self, outcome, condition):
"""
Called when the outcome is received for a delivery.
Expand Down Expand Up @@ -165,7 +173,7 @@ def create_batch(self, max_size=None, partition_key=None):
"""

if not self._max_message_size_on_link:
_retry_decorator(self._open)(self, timeout=self.client.config.send_timeout)
self._open_with_retry(timeout=self.client.config.send_timeout)

if max_size and max_size > self._max_message_size_on_link:
raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
Expand Down Expand Up @@ -219,7 +227,7 @@ def send(self, event_data, partition_key=None, timeout=None):
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access
wrapper_event_data.message.on_send_complete = self._on_outcome
self.unsent_events = [wrapper_event_data.message]
_retry_decorator(self._send_event_data)(self, timeout=timeout)
self._send_event_data_with_retry(timeout=timeout)

def close(self, exception=None):
# type:(Exception) -> None
Expand Down