Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Improve send timeout
  • Loading branch information
yunhaoling committed Jul 24, 2019
commit d78abe6bf4868a3e39ff23b56046d77634874ead
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,14 @@ async def _send_event_data(self, timeout=None):
error = OperationTimeoutError("send operation timed out")
log.info("%r send operation timed out. (%r)", self.name, error)
raise error
self._handler._msg_timeout = remaining_time # pylint: disable=protected-access
self._handler.queue_message(*self.unsent_events)
await self._handler.wait_async()
self.unsent_events = self._handler.pending_messages
if self._outcome != constants.MessageSendResult.Ok:
_error(self._outcome, self._condition)
if self._outcome != constants.MessageSendResult.Ok:
if self._outcome == constants.MessageSendResult.Timeout:
self._condition = OperationTimeoutError("send operation timed out")
_error(self._outcome, self._condition)
return
except Exception as exception:
last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time)
Expand Down Expand Up @@ -155,6 +158,9 @@ async def send(self, event_data, partition_key=None, timeout=None):
:param partition_key: With the given partition_key, event data will land to
a particular partition of the Event Hub decided by the service.
:type partition_key: str
:param timeout: The maximum wait time to send the event data.
If not specified, the default wait time specified when the producer was created will be used.
:type timeout:float
:raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError,
~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError
:return: None
Expand Down
6 changes: 6 additions & 0 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,13 @@ def _send_event_data(self, timeout=None):
error = OperationTimeoutError("send operation timed out")
log.info("%r send operation timed out. (%r)", self.name, error)
raise error
self._handler._msg_timeout = remaining_time # pylint: disable=protected-access
self._handler.queue_message(*self.unsent_events)
self._handler.wait()
self.unsent_events = self._handler.pending_messages
if self._outcome != constants.MessageSendResult.Ok:
if self._outcome == constants.MessageSendResult.Timeout:
self._condition = OperationTimeoutError("send operation timed out")
_error(self._outcome, self._condition)
return
except Exception as exception:
Expand Down Expand Up @@ -162,6 +165,9 @@ def send(self, event_data, partition_key=None, timeout=None):
:param partition_key: With the given partition_key, event data will land to
a particular partition of the Event Hub decided by the service.
:type partition_key: str
:param timeout: The maximum wait time to send the event data.
If not specified, the default wait time specified when the producer was created will be used.
:type timeout:float
:raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError,
~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError

Expand Down