diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index 96d60ffc8943..9541fcc0efa2 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -122,11 +122,14 @@ async def _send_event_data(self, timeout=None): error = OperationTimeoutError("send operation timed out") log.info("%r send operation timed out. (%r)", self.name, error) raise error + self._handler._msg_timeout = remaining_time # pylint: disable=protected-access self._handler.queue_message(*self.unsent_events) await self._handler.wait_async() self.unsent_events = self._handler.pending_messages - if self._outcome != constants.MessageSendResult.Ok: - _error(self._outcome, self._condition) + if self._outcome != constants.MessageSendResult.Ok: + if self._outcome == constants.MessageSendResult.Timeout: + self._condition = OperationTimeoutError("send operation timed out") + _error(self._outcome, self._condition) return except Exception as exception: last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time) @@ -155,6 +158,9 @@ async def send(self, event_data, partition_key=None, timeout=None): :param partition_key: With the given partition_key, event data will land to a particular partition of the Event Hub decided by the service. :type partition_key: str + :param timeout: The maximum wait time to send the event data. + If not specified, the default wait time specified when the producer was created will be used. + :type timeout:float :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError :return: None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 0a839463e4fb..9ea370bc6aef 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -129,10 +129,13 @@ def _send_event_data(self, timeout=None): error = OperationTimeoutError("send operation timed out") log.info("%r send operation timed out. (%r)", self.name, error) raise error + self._handler._msg_timeout = remaining_time # pylint: disable=protected-access self._handler.queue_message(*self.unsent_events) self._handler.wait() self.unsent_events = self._handler.pending_messages if self._outcome != constants.MessageSendResult.Ok: + if self._outcome == constants.MessageSendResult.Timeout: + self._condition = OperationTimeoutError("send operation timed out") _error(self._outcome, self._condition) return except Exception as exception: @@ -162,6 +165,9 @@ def send(self, event_data, partition_key=None, timeout=None): :param partition_key: With the given partition_key, event data will land to a particular partition of the Event Hub decided by the service. :type partition_key: str + :param timeout: The maximum wait time to send the event data. + If not specified, the default wait time specified when the producer was created will be used. + :type timeout:float :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError