diff --git a/sdk/eventhub/azure-eventhubs/HISTORY.md b/sdk/eventhub/azure-eventhubs/HISTORY.md index c1b859b55f00..11ae91dc04fb 100644 --- a/sdk/eventhub/azure-eventhubs/HISTORY.md +++ b/sdk/eventhub/azure-eventhubs/HISTORY.md @@ -1,5 +1,20 @@ # Release History +## 5.0.0b2 (2019-08-06) + +**New features** + +- Added ability to create and send EventDataBatch object with limited data size. +- Added new configuration parameters for exponential delay among each retry operation. + - `retry_total`: The total number of attempts to redo the failed operation. + - `backoff_factor`: The delay time factor. + - `backoff_max`: The maximum delay time in total. + +**Breaking changes** + +- New `EventProcessor` design + - The `EventProcessorHost` was removed. + ## 5.0.0b1 (2019-06-25) Version 5.0.0b1 is a preview of our efforts to create a client library that is user friendly and idiomatic to the Python ecosystem. The reasons for most of the changes in this update can be found in the [Azure SDK Design Guidelines for Python](https://azuresdkspecs.z5.web.core.windows.net/PythonSpec.html). For more information, please visit https://aka.ms/azure-sdk-preview1-python. diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py index 95f6e908c404..9124ff261949 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py @@ -7,12 +7,30 @@ import logging import time -from uamqp import errors +from uamqp import errors, constants, compat from azure.eventhub.error import EventHubError, _handle_exception log = logging.getLogger(__name__) +def _retry_decorator(to_be_wrapped_func): + def wrapped_func(self, *args, **kwargs): + timeout = kwargs.pop("timeout", None) + if not timeout: + timeout = 100000 # timeout None or 0 mean no timeout. 
100000 seconds is equivalent to no timeout + timeout_time = time.time() + timeout + max_retries = self.client.config.max_retries + retry_count = 0 + last_exception = None + while True: + try: + return to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs) + except Exception as exception: + last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time) + retry_count += 1 + return wrapped_func + + class ConsumerProducerMixin(object): def __init__(self): self.client = None @@ -27,7 +45,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): def _check_closed(self): if self.error: - raise EventHubError("{} has been closed. Please create a new consumer to receive event data.".format(self.name)) + raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name)) def _create_handler(self): pass @@ -46,6 +64,8 @@ def _open(self, timeout_time=None): """ # pylint: disable=protected-access if not self.running: + if self._handler: + self._handler.close() if self.redirected: alt_creds = { "username": self.client._auth_config.get("iot_username"), @@ -58,9 +78,9 @@ def _open(self, timeout_time=None): self.client.get_auth(**alt_creds) )) while not self._handler.client_ready(): - if timeout_time and time.time() >= timeout_time: - return time.sleep(0.05) + self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \ + or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access self.running = True def _close_handler(self): @@ -72,6 +92,10 @@ def _close_connection(self): self.client._conn_manager.reset_connection_if_broken() def _handle_exception(self, exception, retry_count, max_retries, timeout_time): + if not self.running and isinstance(exception, compat.TimeoutException): + exception = errors.AuthenticationException("Authorization timeout.") + return _handle_exception(exception, retry_count, max_retries, self, timeout_time) + return 
_handle_exception(exception, retry_count, max_retries, self, timeout_time) def close(self, exception=None): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py index e6b35ad41ae4..a90198f42f54 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py @@ -6,13 +6,31 @@ import logging import time -from uamqp import errors +from uamqp import errors, constants, compat from azure.eventhub.error import EventHubError, ConnectError from ..aio.error_async import _handle_exception log = logging.getLogger(__name__) +def _retry_decorator(to_be_wrapped_func): + async def wrapped_func(self, *args, **kwargs): + timeout = kwargs.pop("timeout", None) + if not timeout: + timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout + timeout_time = time.time() + timeout + max_retries = self.client.config.max_retries + retry_count = 0 + last_exception = None + while True: + try: + return await to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs) + except Exception as exception: + last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time) + retry_count += 1 + return wrapped_func + + class ConsumerProducerMixin(object): def __init__(self): @@ -28,7 +46,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): def _check_closed(self): if self.error: - raise EventHubError("{} has been closed. Please create a new consumer to receive event data.".format(self.name)) + raise EventHubError("{} has been closed. 
Please create a new one to handle event data.".format(self.name)) def _create_handler(self): pass @@ -47,6 +65,8 @@ async def _open(self, timeout_time=None): """ # pylint: disable=protected-access if not self.running: + if self._handler: + await self._handler.close_async() if self.redirected: alt_creds = { "username": self.client._auth_config.get("iot_username"), @@ -59,9 +79,9 @@ async def _open(self, timeout_time=None): self.client.get_auth(**alt_creds) )) while not await self._handler.client_ready_async(): - if timeout_time and time.time() >= timeout_time: - return await asyncio.sleep(0.05) + self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \ + or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access self.running = True async def _close_handler(self): @@ -73,6 +93,10 @@ async def _close_connection(self): await self.client._conn_manager.reset_connection_if_broken() async def _handle_exception(self, exception, retry_count, max_retries, timeout_time): + if not self.running and isinstance(exception, compat.TimeoutException): + exception = errors.AuthenticationException("Authorization timeout.") + return await _handle_exception(exception, retry_count, max_retries, self, timeout_time) + return await _handle_exception(exception, retry_count, max_retries, self, timeout_time) async def close(self, exception=None): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py index 17479119ccad..381ed9cb5dd6 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py @@ -12,11 +12,8 @@ from uamqp import ( Message, AMQPClientAsync, - errors, ) -from uamqp import compat -from azure.eventhub.error import ConnectError from azure.eventhub.common import parse_sas_token, EventPosition, EventHubSharedKeyCredential, EventHubSASTokenCredential from 
..client_abstract import EventHubClientAbstract @@ -193,9 +190,8 @@ async def get_partition_properties(self, partition): output['is_empty'] = partition_info[b'is_partition_empty'] return output - def create_consumer( - self, consumer_group, partition_id, event_position, **kwargs): - # type: (str, str, EventPosition, int, str, int, asyncio.AbstractEventLoop) -> EventHubConsumer + def create_consumer(self, consumer_group, partition_id, event_position, **kwargs): + # type: (str, str, EventPosition) -> EventHubConsumer """ Create an async consumer to the client for a particular consumer group and partition. @@ -240,8 +236,7 @@ def create_consumer( prefetch=prefetch, loop=loop) return handler - def create_producer( - self, **kwargs): + def create_producer(self, *, partition_id=None, operation=None, send_timeout=None, loop=None): # type: (str, str, float, asyncio.AbstractEventLoop) -> EventHubProducer """ Create an async producer to send EventData object to an EventHub. @@ -268,10 +263,6 @@ def create_producer( :caption: Add an async producer to the client to send EventData. 
""" - partition_id = kwargs.get("partition_id", None) - operation = kwargs.get("operation", None) - send_timeout = kwargs.get("send_timeout", None) - loop = kwargs.get("loop", None) target = "amqps://{}{}".format(self.address.hostname, self.address.path) if operation: @@ -283,4 +274,4 @@ def create_producer( return handler async def close(self): - await self._conn_manager.close_connection() \ No newline at end of file + await self._conn_manager.close_connection() diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index dac2d0c0fa61..404fc23312f0 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -8,13 +8,12 @@ from typing import List import time -from uamqp import errors, types, compat +from uamqp import errors, types from uamqp import ReceiveClientAsync, Source from azure.eventhub import EventData, EventPosition -from azure.eventhub.error import EventHubError, AuthenticationError, ConnectError, ConnectionLostError, _error_handler -from ..aio.error_async import _handle_exception -from ._consumer_producer_mixin_async import ConsumerProducerMixin +from azure.eventhub.error import EventHubError, ConnectError, _error_handler +from ._consumer_producer_mixin_async import ConsumerProducerMixin, _retry_decorator log = logging.getLogger(__name__) @@ -81,6 +80,7 @@ def __init__( # pylint: disable=super-init-not-called self.error = None self._link_properties = {} partition = self.source.split('/')[-1] + self.partition = partition self.name = "EHReceiver-{}-partition{}".format(uuid.uuid4(), partition) if owner_level: self._link_properties[types.AMQPSymbol(self._epoch)] = types.AMQPLong(int(owner_level)) @@ -100,8 +100,9 @@ async def __anext__(self): if not self.messages_iter: self.messages_iter = self._handler.receive_messages_iter_async() message = await 
self.messages_iter.__anext__() - event_data = EventData(message=message) + event_data = EventData._from_message(message) self.offset = EventPosition(event_data.offset, inclusive=False) + retry_count = 0 return event_data except Exception as exception: await self._handle_exception(exception, retry_count, max_retries) @@ -146,6 +147,32 @@ async def _open(self, timeout_time=None): self.source = self.redirected.address await super(EventHubConsumer, self)._open(timeout_time) + async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): + last_exception = kwargs.get("last_exception") + data_batch = kwargs.get("data_batch") + + await self._open(timeout_time) + remaining_time = timeout_time - time.time() + if remaining_time <= 0.0: + if last_exception: + log.info("%r receive operation timed out. (%r)", self.name, last_exception) + raise last_exception + return data_batch + + remaining_time_ms = 1000 * remaining_time + message_batch = await self._handler.receive_message_batch_async( + max_batch_size=max_batch_size, + timeout=remaining_time_ms) + for message in message_batch: + event_data = EventData._from_message(message) + self.offset = EventPosition(event_data.offset) + data_batch.append(event_data) + return data_batch + + @_retry_decorator + async def _receive_with_try(self, timeout_time=None, max_batch_size=None, **kwargs): + return await self._receive(timeout_time=timeout_time, max_batch_size=max_batch_size, **kwargs) + @property def queue_size(self): # type: () -> int @@ -159,7 +186,7 @@ def queue_size(self): return self._handler._received_messages.qsize() return 0 - async def receive(self, **kwargs): + async def receive(self, *, max_batch_size=None, timeout=None): # type: (int, float) -> List[EventData] """ Receive events asynchronously from the EventHub. 
@@ -186,45 +213,13 @@ async def receive(self, **kwargs): :caption: Receives events asynchronously """ - max_batch_size = kwargs.get("max_batch_size", None) - timeout = kwargs.get("timeout", None) - self._check_closed() - max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size - timeout = self.client.config.receive_timeout if timeout is None else timeout - if not timeout: - timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout - - data_batch = [] - start_time = time.time() - timeout_time = start_time + timeout - max_retries = self.client.config.max_retries - retry_count = 0 - last_exception = None - while True: - try: - await self._open(timeout_time) - remaining_time = timeout_time - time.time() - if remaining_time <= 0.0: - if last_exception: - log.info("%r receive operation timed out. (%r)", self.name, last_exception) - raise last_exception - return data_batch - - remaining_time_ms = 1000 * remaining_time - message_batch = await self._handler.receive_message_batch_async( - max_batch_size=max_batch_size, - timeout=remaining_time_ms) - for message in message_batch: - event_data = EventData(message=message) - self.offset = EventPosition(event_data.offset) - data_batch.append(event_data) - return data_batch - except EventHubError: - raise - except Exception as exception: - last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time) - retry_count += 1 + + timeout = timeout or self.client.config.receive_timeout + max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch) + data_batch = [] # type: List[EventData] + + return await self._receive_with_try(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch) async def close(self, exception=None): # type: (Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py 
b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py index 957a3005662e..b44f8cb54a33 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py @@ -33,6 +33,8 @@ def _create_eventhub_exception(exception): async def _handle_exception(exception, retry_count, max_retries, closable, timeout_time=None): + if isinstance(exception, asyncio.CancelledError): + raise try: name = closable.name except AttributeError: diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index e326aef0a115..a231e12eb63c 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -14,7 +14,7 @@ from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError from ..producer import _error, _set_partition_key -from ._consumer_producer_mixin_async import ConsumerProducerMixin +from ._consumer_producer_mixin_async import ConsumerProducerMixin, _retry_decorator log = logging.getLogger(__name__) @@ -98,7 +98,7 @@ def _create_handler(self): self.client.config.user_agent), # pylint: disable=protected-access loop=self.loop) - async def _open(self, timeout_time=None): + async def _open(self, timeout_time=None, **kwargs): """ Open the EventHubProducer using the supplied connection. If the handler has previously been redirected, the redirect @@ -110,39 +110,34 @@ async def _open(self, timeout_time=None): self.target = self.redirected.address await super(EventHubProducer, self)._open(timeout_time) - async def _send_event_data(self, timeout=None): - timeout = timeout or self.client.config.send_timeout - if not timeout: - timeout = 100000 # timeout None or 0 mean no timeout. 
100000 seconds is equivalent to no timeout - start_time = time.time() - timeout_time = start_time + timeout - max_retries = self.client.config.max_retries - retry_count = 0 - last_exception = None - while True: - try: - if self.unsent_events: - await self._open(timeout_time) - remaining_time = timeout_time - time.time() - if remaining_time < 0.0: - if last_exception: - error = last_exception - else: - error = OperationTimeoutError("send operation timed out") - log.info("%r send operation timed out. (%r)", self.name, error) - raise error - self._handler._msg_timeout = remaining_time # pylint: disable=protected-access - self._handler.queue_message(*self.unsent_events) - await self._handler.wait_async() - self.unsent_events = self._handler.pending_messages - if self._outcome != constants.MessageSendResult.Ok: - if self._outcome == constants.MessageSendResult.Timeout: - self._condition = OperationTimeoutError("send operation timed out") - _error(self._outcome, self._condition) - return - except Exception as exception: - last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time) - retry_count += 1 + @_retry_decorator + async def _open_with_retry(self, timeout_time=None, **kwargs): + return await self._open(timeout_time=timeout_time, **kwargs) + + async def _send_event_data(self, timeout_time=None, last_exception=None): + if self.unsent_events: + await self._open(timeout_time) + remaining_time = timeout_time - time.time() + if remaining_time <= 0.0: + if last_exception: + error = last_exception + else: + error = OperationTimeoutError("send operation timed out") + log.info("%r send operation timed out. 
(%r)", self.name, error) + raise error + self._handler._msg_timeout = remaining_time # pylint: disable=protected-access + self._handler.queue_message(*self.unsent_events) + await self._handler.wait_async() + self.unsent_events = self._handler.pending_messages + if self._outcome != constants.MessageSendResult.Ok: + if self._outcome == constants.MessageSendResult.Timeout: + self._condition = OperationTimeoutError("send operation timed out") + _error(self._outcome, self._condition) + return + + @_retry_decorator + async def _send_event_data_with_retry(self, timeout_time=None, last_exception=None): + return await self._send_event_data(timeout_time=timeout_time, last_exception=last_exception) def _on_outcome(self, outcome, condition): """ @@ -156,7 +151,8 @@ def _on_outcome(self, outcome, condition): self._outcome = outcome self._condition = condition - async def create_batch(self, **kwargs): + async def create_batch(self, max_size=None, partition_key=None): + # type:(int, str) -> EventDataBatch """ Create an EventDataBatch object with max size being max_size. The max_size should be no greater than the max allowed message size defined by the service side. @@ -167,20 +163,28 @@ async def create_batch(self, **kwargs): :type partition_key: str :return: an EventDataBatch instance :rtype: ~azure.eventhub.EventDataBatch + + Example: + .. 
literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py + :start-after: [START eventhub_client_async_create_batch] + :end-before: [END eventhub_client_async_create_batch] + :language: python + :dedent: 4 + :caption: Create EventDataBatch object within limited size + """ - max_size = kwargs.get("max_size", None) - partition_key = kwargs.get("partition_key", None) + if not self._max_message_size_on_link: - await self._open() + await self._open_with_retry(timeout=self.client.config.send_timeout) if max_size and max_size > self._max_message_size_on_link: raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' .format(max_size, self._max_message_size_on_link)) - return EventDataBatch(max_size or self._max_message_size_on_link, partition_key) + return EventDataBatch(max_size=(max_size or self._max_message_size_on_link), partition_key=partition_key) - async def send(self, event_data, **kwargs): - # type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes]) -> None + async def send(self, event_data, *, partition_key=None, timeout=None): + # type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes], float) -> None """ Sends an event data and blocks until acknowledgement is received or operation times out. @@ -200,7 +204,7 @@ async def send(self, event_data, **kwargs): :rtype: None Example: - .. literalinclude:: ../examples/test_examples_eventhub.py + .. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py :start-after: [START eventhub_client_async_send] :end-before: [END eventhub_client_async_send] :language: python @@ -208,8 +212,6 @@ async def send(self, event_data, **kwargs): :caption: Sends an event data and blocks until acknowledgement is received or operation times out. 
""" - partition_key = kwargs.get("partition_key", None) - timeout = kwargs.get("timeout", None) self._check_closed() if isinstance(event_data, EventData): @@ -227,7 +229,7 @@ async def send(self, event_data, **kwargs): wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self.unsent_events = [wrapper_event_data.message] - await self._send_event_data(timeout) + await self._send_event_data_with_retry(timeout=timeout) async def close(self, exception=None): # type: (Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py index deda0ddc01fb..706db5498fe3 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client.py @@ -18,18 +18,16 @@ from uamqp import Message from uamqp import authentication from uamqp import constants -from uamqp import errors -from uamqp import compat from azure.eventhub.producer import EventHubProducer from azure.eventhub.consumer import EventHubConsumer from azure.eventhub.common import parse_sas_token, EventPosition -from azure.eventhub.error import ConnectError, EventHubError from .client_abstract import EventHubClientAbstract from .common import EventHubSASTokenCredential, EventHubSharedKeyCredential from ._connection_manager import get_connection_manager from .error import _handle_exception + log = logging.getLogger(__name__) @@ -199,10 +197,8 @@ def get_partition_properties(self, partition): output['is_empty'] = partition_info[b'is_partition_empty'] return output - def create_consumer( - self, consumer_group, partition_id, event_position, **kwargs - ): - # type: (str, str, EventPosition, int, str, int) -> EventHubConsumer + def create_consumer(self, consumer_group, partition_id, event_position, **kwargs): + # type: (str, str, EventPosition, ...) 
-> EventHubConsumer """ Create a consumer to the client for a particular consumer group and partition. @@ -245,8 +241,8 @@ def create_consumer( prefetch=prefetch) return handler - def create_producer(self, **kwargs): - # type: (str, str, float) -> EventHubProducer + def create_producer(self, partition_id=None, operation=None, send_timeout=None): + # type: (str, str, float, ...) -> EventHubProducer """ Create an producer to send EventData object to an EventHub. @@ -271,9 +267,6 @@ def create_producer(self, **kwargs): :caption: Add a producer to the client to send EventData. """ - partition_id = kwargs.get("partition_id", None) - operation = kwargs.get("operation", None) - send_timeout = kwargs.get("send_timeout", None) target = "amqps://{}{}".format(self.address.hostname, self.address.path) if operation: diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index 8c97797c01ff..d908d4702ac5 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -282,11 +282,9 @@ def from_connection_string(cls, conn_str, **kwargs): return cls._from_iothub_connection_string(conn_str, **kwargs) @abstractmethod - def create_consumer( - self, consumer_group, partition_id, event_position, **kwargs - ): + def create_consumer(self, consumer_group, partition_id, event_position, **kwargs): pass @abstractmethod - def create_producer(self, **kwargs): + def create_producer(self, partition_id=None, operation=None, send_timeout=None): pass diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index ea00d0aef5ff..701b45484d75 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -11,7 +11,7 @@ import logging from azure.eventhub.error import EventDataError -from uamqp import 
BatchMessage, Message, types, constants, errors +from uamqp import BatchMessage, Message, types, constants from uamqp.message import MessageHeader, MessageProperties log = logging.getLogger(__name__) @@ -57,7 +57,7 @@ class EventData(object): PROP_TIMESTAMP = b"x-opt-enqueued-time" PROP_DEVICE_ID = b"iothub-connection-device-id" - def __init__(self, body=None, **kwargs): + def __init__(self, body=None, to_device=None): """ Initialize EventData. @@ -67,11 +67,7 @@ def __init__(self, body=None, **kwargs): :type batch: Generator :param to_device: An IoT device to route to. :type to_device: str - :param message: The received message. - :type message: ~uamqp.message.Message """ - to_device = kwargs.get("to_device", None) - message = kwargs.get("message", None) self._partition_key = types.AMQPSymbol(EventData.PROP_PARTITION_KEY) self._annotations = {} @@ -79,20 +75,14 @@ def __init__(self, body=None, **kwargs): self.msg_properties = MessageProperties() if to_device: self.msg_properties.to = '/devices/{}/messages/devicebound'.format(to_device) - if message: - self.message = message - self.msg_properties = message.properties - self._annotations = message.annotations - self._app_properties = message.application_properties + if body and isinstance(body, list): + self.message = Message(body[0], properties=self.msg_properties) + for more in body[1:]: + self.message._body.append(more) # pylint: disable=protected-access + elif body is None: + raise ValueError("EventData cannot be None.") else: - if body and isinstance(body, list): - self.message = Message(body[0], properties=self.msg_properties) - for more in body[1:]: - self.message._body.append(more) # pylint: disable=protected-access - elif body is None: - raise ValueError("EventData cannot be None.") - else: - self.message = Message(body, properties=self.msg_properties) + self.message = Message(body, properties=self.msg_properties) def __str__(self): dic = { @@ -127,6 +117,15 @@ def _set_partition_key(self, value): 
self.message.header = header self._annotations = annotations + @staticmethod + def _from_message(message): + event_data = EventData(body='') + event_data.message = message + event_data.msg_properties = message.properties + event_data._annotations = message.annotations + event_data._app_properties = message.application_properties + return event_data + @property def sequence_number(self): """ @@ -215,7 +214,7 @@ def body(self): except TypeError: raise ValueError("Message data empty.") - def body_as_str(self, **kwargs): + def body_as_str(self, encoding='UTF-8'): """ The body of the event data as a string if the data is of a compatible type. @@ -224,7 +223,6 @@ def body_as_str(self, **kwargs): Default is 'UTF-8' :rtype: str or unicode """ - encoding = kwargs.get("encoding", 'UTF-8') data = self.body try: return "".join(b.decode(encoding) for b in data) @@ -237,7 +235,7 @@ def body_as_str(self, **kwargs): except Exception as e: raise TypeError("Message data is not compatible with string type: {}".format(e)) - def body_as_json(self, **kwargs): + def body_as_json(self, encoding='UTF-8'): """ The body of the event loaded as a JSON object is the data is compatible. @@ -245,7 +243,6 @@ def body_as_json(self, **kwargs): Default is 'UTF-8' :rtype: dict """ - encoding = kwargs.get("encoding", 'UTF-8') data_str = self.body_as_str(encoding=encoding) try: return json.loads(data_str) @@ -263,9 +260,7 @@ class EventDataBatch(object): Do not instantiate an EventDataBatch object directly. 
""" - def __init__(self, **kwargs): - max_size = kwargs.get("max_size", None) - partition_key = kwargs.get("partition_key", None) + def __init__(self, max_size=None, partition_key=None): self.max_size = max_size or constants.MAX_MESSAGE_LENGTH_BYTES self._partition_key = partition_key self.message = BatchMessage(data=[], multi_messages=False, properties=None) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index e59c440c7c88..85c09bf1a308 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -13,8 +13,9 @@ from uamqp import ReceiveClient, Source from azure.eventhub.common import EventData, EventPosition -from azure.eventhub.error import _error_handler, EventHubError -from ._consumer_producer_mixin import ConsumerProducerMixin +from azure.eventhub.error import _error_handler +from ._consumer_producer_mixin import ConsumerProducerMixin, _retry_decorator + log = logging.getLogger(__name__) @@ -75,6 +76,7 @@ def __init__(self, client, source, **kwargs): self.redirected = None self.error = None partition = self.source.split('/')[-1] + self.partition = partition self.name = "EHConsumer-{}-partition{}".format(uuid.uuid4(), partition) if owner_level: self._link_properties[types.AMQPSymbol(self._epoch)] = types.AMQPLong(int(owner_level)) @@ -94,8 +96,9 @@ def __next__(self): if not self.messages_iter: self.messages_iter = self._handler.receive_messages_iter() message = next(self.messages_iter) - event_data = EventData(message=message) + event_data = EventData._from_message(message) self.offset = EventPosition(event_data.offset, inclusive=False) + retry_count = 0 return event_data except Exception as exception: self._handle_exception(exception, retry_count, max_retries) @@ -139,6 +142,31 @@ def _open(self, timeout_time=None): self.source = self.redirected.address super(EventHubConsumer, self)._open(timeout_time) + def 
_receive(self, timeout_time=None, max_batch_size=None, **kwargs): + last_exception = kwargs.get("last_exception") + data_batch = kwargs.get("data_batch") + + self._open(timeout_time) + remaining_time = timeout_time - time.time() + if remaining_time <= 0.0: + if last_exception: + log.info("%r receive operation timed out. (%r)", self.name, last_exception) + raise last_exception + return data_batch + remaining_time_ms = 1000 * remaining_time + message_batch = self._handler.receive_message_batch( + max_batch_size=max_batch_size - (len(data_batch) if data_batch else 0), + timeout=remaining_time_ms) + for message in message_batch: + event_data = EventData._from_message(message) + self.offset = EventPosition(event_data.offset) + data_batch.append(event_data) + return data_batch + + @_retry_decorator + def _receive_with_try(self, timeout_time=None, max_batch_size=None, **kwargs): + return self._receive(timeout_time=timeout_time, max_batch_size=max_batch_size, **kwargs) + @property def queue_size(self): # type:() -> int @@ -152,8 +180,8 @@ def queue_size(self): return self._handler._received_messages.qsize() return 0 - def receive(self, **kwargs): - # type:(int, float) -> List[EventData] + def receive(self, max_batch_size=None, timeout=None): + # type: (int, float) -> List[EventData] """ Receive events from the EventHub. @@ -178,44 +206,13 @@ def receive(self, **kwargs): :caption: Receive events from the EventHub. """ - max_batch_size = kwargs.get("max_batch_size", None) - timeout = kwargs.get("timeout", None) - self._check_closed() - max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size - timeout = self.client.config.receive_timeout if timeout is None else timeout - if not timeout: - timeout = 100000 # timeout None or 0 mean no timeout. 
100000 seconds is equivalent to no timeout + timeout = timeout or self.client.config.receive_timeout + max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch) data_batch = [] # type: List[EventData] - start_time = time.time() - timeout_time = start_time + timeout - max_retries = self.client.config.max_retries - retry_count = 0 - last_exception = None - while True: - try: - self._open(timeout_time) - remaining_time = timeout_time - time.time() - if remaining_time <= 0.0: - if last_exception: - log.info("%r receive operation timed out. (%r)", self.name, last_exception) - raise last_exception - return data_batch - remaining_time_ms = 1000 * remaining_time - message_batch = self._handler.receive_message_batch( - max_batch_size=max_batch_size - (len(data_batch) if data_batch else 0), - timeout=remaining_time_ms) - for message in message_batch: - event_data = EventData(message=message) - self.offset = EventPosition(event_data.offset) - data_batch.append(event_data) - return data_batch - except EventHubError: - raise - except Exception as exception: - last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time) - retry_count += 1 + + return self._receive_with_try(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch) def close(self, exception=None): # type:(Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index da2a9ee95368..13ef7744772b 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -10,12 +10,11 @@ from typing import Iterable, Union from uamqp import types, constants, errors -from uamqp import compat from uamqp import SendClient from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError -from ._consumer_producer_mixin import 
ConsumerProducerMixin +from ._consumer_producer_mixin import ConsumerProducerMixin, _retry_decorator log = logging.getLogger(__name__) @@ -105,7 +104,7 @@ def _create_handler(self): link_properties=self._link_properties, properties=self.client._create_properties(self.client.config.user_agent)) # pylint: disable=protected-access - def _open(self, timeout_time=None): + def _open(self, timeout_time=None, **kwargs): """ Open the EventHubProducer using the supplied connection. If the handler has previously been redirected, the redirect @@ -118,39 +117,34 @@ def _open(self, timeout_time=None): self.target = self.redirected.address super(EventHubProducer, self)._open(timeout_time) - def _send_event_data(self, timeout=None): - timeout = timeout or self.client.config.send_timeout - if not timeout: - timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout - start_time = time.time() - timeout_time = start_time + timeout - max_retries = self.client.config.max_retries - retry_count = 0 - last_exception = None - while True: - try: - if self.unsent_events: - self._open(timeout_time) - remaining_time = timeout_time - time.time() - if remaining_time <= 0.0: - if last_exception: - error = last_exception - else: - error = OperationTimeoutError("send operation timed out") - log.info("%r send operation timed out. 
(%r)", self.name, error) - raise error - self._handler._msg_timeout = remaining_time # pylint: disable=protected-access - self._handler.queue_message(*self.unsent_events) - self._handler.wait() - self.unsent_events = self._handler.pending_messages - if self._outcome != constants.MessageSendResult.Ok: - if self._outcome == constants.MessageSendResult.Timeout: - self._condition = OperationTimeoutError("send operation timed out") - _error(self._outcome, self._condition) - return - except Exception as exception: - last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time) - retry_count += 1 + @_retry_decorator + def _open_with_retry(self, timeout_time=None, **kwargs): + return self._open(timeout_time=timeout_time, **kwargs) + + def _send_event_data(self, timeout_time=None, last_exception=None): + if self.unsent_events: + self._open(timeout_time) + remaining_time = timeout_time - time.time() + if remaining_time <= 0.0: + if last_exception: + error = last_exception + else: + error = OperationTimeoutError("send operation timed out") + log.info("%r send operation timed out. 
(%r)", self.name, error) + raise error + self._handler._msg_timeout = remaining_time # pylint: disable=protected-access + self._handler.queue_message(*self.unsent_events) + self._handler.wait() + self.unsent_events = self._handler.pending_messages + if self._outcome != constants.MessageSendResult.Ok: + if self._outcome == constants.MessageSendResult.Timeout: + self._condition = OperationTimeoutError("send operation timed out") + _error(self._outcome, self._condition) + return + + @_retry_decorator + def _send_event_data_with_retry(self, timeout_time=None, last_exception=None): + return self._send_event_data(timeout_time=timeout_time, last_exception=last_exception) def _on_outcome(self, outcome, condition): """ @@ -164,7 +158,8 @@ def _on_outcome(self, outcome, condition): self._outcome = outcome self._condition = condition - def create_batch(self, **kwargs): + def create_batch(self, max_size=None, partition_key=None): + # type:(int, str) -> EventDataBatch """ Create an EventDataBatch object with max size being max_size. The max_size should be no greater than the max allowed message size defined by the service side. @@ -175,19 +170,27 @@ def create_batch(self, **kwargs): :type partition_key: str :return: an EventDataBatch instance :rtype: ~azure.eventhub.EventDataBatch + + Example: + .. literalinclude:: ../examples/test_examples_eventhub.py + :start-after: [START eventhub_client_sync_create_batch] + :end-before: [END eventhub_client_sync_create_batch] + :language: python + :dedent: 4 + :caption: Create EventDataBatch object within limited size + """ - max_size = kwargs.get("max_size", None) - partition_key = kwargs.get("partition_key", None) + if not self._max_message_size_on_link: - self._open() + self._open_with_retry(timeout=self.client.config.send_timeout) if max_size and max_size > self._max_message_size_on_link: raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' 
.format(max_size, self._max_message_size_on_link)) - return EventDataBatch(max_size or self._max_message_size_on_link, partition_key) + return EventDataBatch(max_size=(max_size or self._max_message_size_on_link), partition_key=partition_key) - def send(self, event_data, **kwargs): + def send(self, event_data, partition_key=None, timeout=None): # type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes], float) -> None """ Sends an event data and blocks until acknowledgement is @@ -216,8 +219,6 @@ def send(self, event_data, **kwargs): :caption: Sends an event data and blocks until acknowledgement is received or operation times out. """ - partition_key = kwargs.get("partition_key", None) - timeout = kwargs.get("timeout", None) self._check_closed() if isinstance(event_data, EventData): @@ -231,11 +232,11 @@ def send(self, event_data, **kwargs): wrapper_event_data = event_data else: if partition_key: - event_data = self._set_partition_key(event_data, partition_key) + event_data = _set_partition_key(event_data, partition_key) wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self.unsent_events = [wrapper_event_data.message] - self._send_event_data(timeout=timeout) + self._send_event_data_with_retry(timeout=timeout) def close(self, exception=None): # type:(Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/examples/async_examples/test_examples_eventhub_async.py b/sdk/eventhub/azure-eventhubs/examples/async_examples/test_examples_eventhub_async.py index 048f5af1623c..896f2a007b21 100644 --- a/sdk/eventhub/azure-eventhubs/examples/async_examples/test_examples_eventhub_async.py +++ b/sdk/eventhub/azure-eventhubs/examples/async_examples/test_examples_eventhub_async.py @@ -23,7 +23,7 @@ async def test_example_eventhub_async_send_and_receive(live_eventhub_config): os.environ['EVENT_HUB_HOSTNAME'], 
os.environ['EVENT_HUB_SAS_POLICY'], os.environ['EVENT_HUB_SAS_KEY'], - os.environ['EVENT_HUB_NAME']) + os.environ['EVENT_HUB_NAME']) client = EventHubClient.from_connection_string(connection_str) # [END create_eventhub_client_async] @@ -49,6 +49,17 @@ async def test_example_eventhub_async_send_and_receive(live_eventhub_config): await consumer.receive(timeout=1) + # [START eventhub_client_async_create_batch] + event_data_batch = await producer.create_batch(max_size=10000) + while True: + try: + event_data_batch.try_add(EventData('Message inside EventBatchData')) + except ValueError: + # The EventDataBatch object reaches its max_size. + # You can send the full EventDataBatch object and create a new one here. + break + # [END eventhub_client_async_create_batch] + # [START eventhub_client_async_send] async with producer: event_data = EventData(b"A single event") diff --git a/sdk/eventhub/azure-eventhubs/examples/event_data_batch.py b/sdk/eventhub/azure-eventhubs/examples/event_data_batch.py new file mode 100644 index 000000000000..3cf6dc88f177 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs/examples/event_data_batch.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +An example to show creating and sending EventBatchData within limited size. 
+""" + +# pylint: disable=C0111 + +import logging +import time +import os + +from azure.eventhub import EventHubClient, EventData, EventHubSharedKeyCredential + +import examples +logger = examples.get_logger(logging.INFO) + + +HOSTNAME = os.environ.get('EVENT_HUB_HOSTNAME') # .servicebus.windows.net +EVENT_HUB = os.environ.get('EVENT_HUB_NAME') + +USER = os.environ.get('EVENT_HUB_SAS_POLICY') +KEY = os.environ.get('EVENT_HUB_SAS_KEY') + + +def create_batch_data(producer): + event_data_batch = producer.create_batch(max_size=10000) + while True: + try: + event_data_batch.try_add(EventData('Message inside EventBatchData')) + except ValueError: + # EventDataBatch object reaches max_size. + # New EventDataBatch object can be created here to send more data + break + return event_data_batch + + +try: + if not HOSTNAME: + raise ValueError("No EventHubs URL supplied.") + + client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY), + network_tracing=False) + producer = client.create_producer() + + try: + start_time = time.time() + with producer: + event_data_batch = create_batch_data(producer) + producer.send(event_data_batch) + except: + raise + finally: + end_time = time.time() + run_time = end_time - start_time + logger.info("Runtime: {} seconds".format(run_time)) + +except KeyboardInterrupt: + pass diff --git a/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py b/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py index d8483dc6c032..04e29d08256c 100644 --- a/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py +++ b/sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py @@ -86,6 +86,17 @@ def test_example_eventhub_sync_send_and_receive(live_eventhub_config): event_data = EventData(body=list_data) # [END create_event_data] + # [START eventhub_client_sync_create_batch] + event_data_batch = producer.create_batch(max_size=10000) + while True: + try: + 
event_data_batch.try_add(EventData('Message inside EventBatchData')) + except ValueError: + # The EventDataBatch object reaches its max_size. + # You can send the full EventDataBatch object and create a new one here. + break + # [END eventhub_client_sync_create_batch] + # [START eventhub_client_sync_send] with producer: event_data = EventData(b"A single event") diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py index 74c05d174e47..900612684001 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py @@ -44,12 +44,8 @@ def get_logger(filename, level=logging.INFO): return azure_logger -logger = get_logger("recv_test_async.log", logging.INFO) - -async def get_partitions(client): - eh_data = await client.get_properties() - return eh_data["partition_ids"] +logger = get_logger("recv_test_async.log", logging.INFO) async def pump(_pid, receiver, _args, _dl): @@ -76,9 +72,7 @@ async def pump(_pid, receiver, _args, _dl): total, batch[-1].sequence_number, batch[-1].offset)) - print("{}: total received {}".format( - _pid, - total)) + print("{}: Total received {}".format(receiver.partition, total)) except Exception as e: print("Partition {} receiver failed: {}".format(_pid, e)) raise @@ -127,11 +121,11 @@ async def test_long_running_receive_async(connection_str): receiver = client.create_consumer(consumer_group="$default", partition_id=pid, event_position=EventPosition(args.offset), - prefetch=50, + prefetch=300, loop=loop) pumps.append(pump(pid, receiver, args, args.duration)) await asyncio.gather(*pumps) if __name__ == '__main__': - asyncio.run(test_long_running_receive_async(os.environ.get('EVENT_HUB_CONNECTION_STR'))) + asyncio.run(test_long_running_receive_async(os.environ.get('EVENT_HUB_PERF_CONN_STR'))) diff --git 
a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_send_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_send_async.py index 809fa3430b59..00279d168d70 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_send_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_send_async.py @@ -122,4 +122,4 @@ async def test_long_running_partition_send_async(connection_str): if __name__ == '__main__': - asyncio.run(test_long_running_partition_send_async(os.environ.get('EVENT_HUB_CONNECTION_STR'))) + asyncio.run(test_long_running_partition_send_async(os.environ.get('EVENT_HUB_PERF_CONN_STR'))) diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py index 3d43942fe6c8..4406da855f59 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py @@ -30,6 +30,7 @@ async def test_send_with_invalid_hostname_async(invalid_hostname, connstr_receiv sender = client.create_producer() with pytest.raises(AuthenticationError): await sender.send(EventData("test data")) + await sender.close() @pytest.mark.liveTest @@ -39,6 +40,7 @@ async def test_receive_with_invalid_hostname_async(invalid_hostname): receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): await receiver.receive(timeout=3) + await receiver.close() @pytest.mark.liveTest @@ -49,6 +51,7 @@ async def test_send_with_invalid_key_async(invalid_key, connstr_receivers): sender = client.create_producer() with pytest.raises(AuthenticationError): await sender.send(EventData("test data")) + await sender.close() @pytest.mark.liveTest @@ -58,6 +61,7 @@ async def test_receive_with_invalid_key_async(invalid_key): receiver = client.create_consumer(consumer_group="$default", 
partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): await receiver.receive(timeout=3) + await receiver.close() @pytest.mark.liveTest @@ -68,6 +72,7 @@ async def test_send_with_invalid_policy_async(invalid_policy, connstr_receivers) sender = client.create_producer() with pytest.raises(AuthenticationError): await sender.send(EventData("test data")) + await sender.close() @pytest.mark.liveTest @@ -77,6 +82,7 @@ async def test_receive_with_invalid_policy_async(invalid_policy): receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): await receiver.receive(timeout=3) + await receiver.close() @pytest.mark.liveTest @@ -100,6 +106,7 @@ async def test_non_existing_entity_sender_async(connection_str): sender = client.create_producer(partition_id="1") with pytest.raises(AuthenticationError): await sender.send(EventData("test data")) + await sender.close() @pytest.mark.liveTest @@ -109,6 +116,7 @@ async def test_non_existing_entity_receiver_async(connection_str): receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): await receiver.receive(timeout=5) + await receiver.close() @pytest.mark.liveTest @@ -119,7 +127,7 @@ async def test_receive_from_invalid_partitions_async(connection_str): client = EventHubClient.from_connection_string(connection_str, network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id=p, event_position=EventPosition("-1")) with pytest.raises(ConnectError): - await receiver.receive(timeout=10) + await receiver.receive(timeout=5) await receiver.close() @@ -196,3 +204,40 @@ async def test_max_receivers_async(connstr_senders): failed = [o for o in outputs if isinstance(o, EventHubError)] assert len(failed) == 1 print(failed[0].message) + + +@pytest.mark.liveTest +@pytest.mark.asyncio 
+async def test_create_batch_with_invalid_hostname_async(invalid_hostname): + client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False) + sender = client.create_producer() + try: + with pytest.raises(AuthenticationError): + batch_event_data = await sender.create_batch(max_size=300, partition_key="key") + finally: + await sender.close() + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_create_batch_with_none_async(connection_str): + client = EventHubClient.from_connection_string(connection_str, network_tracing=False) + sender = client.create_producer() + batch_event_data = await sender.create_batch(max_size=300, partition_key="key") + try: + with pytest.raises(ValueError): + batch_event_data.try_add(EventData(None)) + finally: + await sender.close() + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_create_batch_with_too_large_size_async(connection_str): + client = EventHubClient.from_connection_string(connection_str, network_tracing=False) + sender = client.create_producer() + try: + with pytest.raises(ValueError): + batch_event_data = await sender.create_batch(max_size=5 * 1024 * 1024, partition_key="key") + finally: + await sender.close() diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py index ae696bf469b5..2a2e4836c2d5 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_receive_async.py @@ -185,7 +185,7 @@ async def test_exclusive_receiver_async(connstr_senders): await pump(receiver1) output2 = await pump(receiver2) with pytest.raises(ConnectionLostError): - await receiver1.receive(timeout=1) + await receiver1.receive(timeout=3) assert output2 == 1 finally: await receiver1.close() @@ -230,7 +230,7 @@ async def test_exclusive_receiver_after_non_exclusive_receiver_async(connstr_sen await pump(receiver1) output2 = await 
pump(receiver2) with pytest.raises(ConnectionLostError): - await receiver1.receive(timeout=1) + await receiver1.receive(timeout=3) assert output2 == 1 finally: await receiver1.close() @@ -248,7 +248,7 @@ async def test_non_exclusive_receiver_after_exclusive_receiver_async(connstr_sen receiver2 = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), prefetch=10) try: output1 = await pump(receiver1) - with pytest.raises(ConnectError): + with pytest.raises(ConnectionLostError): await pump(receiver2) assert output1 == 1 finally: diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_reconnect_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_reconnect_async.py index 91357e2553b9..05be713e2d8c 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_reconnect_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_reconnect_async.py @@ -37,7 +37,7 @@ async def test_send_with_long_interval_async(connstr_receivers, sleep): for r in receivers: if not sleep: # if sender sleeps, the receivers will be disconnected. 
destroy connection to simulate r._handler._connection._conn.destroy() - received.extend(r.receive(timeout=1)) + received.extend(r.receive(timeout=5)) assert len(received) == 2 assert list(received[0].body)[0] == b"A single event" diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py index 3d5fb70601ea..aa301bad3119 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_send_async.py @@ -246,3 +246,21 @@ async def test_send_over_websocket_async(connstr_receivers): for r in receivers: r.close() + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_send_with_create_event_batch_async(connstr_receivers): + connection_str, receivers = connstr_receivers + client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, network_tracing=False) + sender = client.create_producer() + + event_data_batch = await sender.create_batch(max_size=100000) + while True: + try: + event_data_batch.try_add(EventData('A single event data')) + except ValueError: + break + + await sender.send(event_data_batch) + await sender.close() diff --git a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py b/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py index bbd945d09e9b..47559b778af3 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py @@ -14,6 +14,7 @@ import time import os import sys +import threading import pytest from logging.handlers import RotatingFileHandler @@ -22,6 +23,7 @@ from azure.eventhub import EventHubClient from azure.eventhub import EventHubSharedKeyCredential + def get_logger(filename, level=logging.INFO): azure_logger = logging.getLogger("azure.eventhub") azure_logger.setLevel(level) @@ -47,38 +49,33 @@ def get_logger(filename, 
level=logging.INFO): logger = get_logger("recv_test.log", logging.INFO) -def get_partitions(args): - eh_data = args.get_properties() - return eh_data["partition_ids"] - - -def pump(receivers, duration): +def pump(receiver, duration): total = 0 iteration = 0 deadline = time.time() + duration - try: - while time.time() < deadline: - for pid, receiver in receivers.items(): + with receiver: + try: + while time.time() < deadline: batch = receiver.receive(timeout=5) size = len(batch) total += size iteration += 1 if size == 0: print("{}: No events received, queue size {}, delivered {}".format( - pid, + receiver.partition, receiver.queue_size, total)) - elif iteration >= 50: + elif iteration >= 5: iteration = 0 print("{}: total received {}, last sn={}, last offset={}".format( - pid, + receiver.partition, total, batch[-1].sequence_number, batch[-1].offset)) - print("Total received {}".format(total)) - except Exception as e: - print("EventHubConsumer failed: {}".format(e)) - raise + print("{}: Total received {}".format(receiver.partition, total)) + except Exception as e: + print("EventHubConsumer failed: {}".format(e)) + raise @pytest.mark.liveTest @@ -112,22 +109,23 @@ def test_long_running_receive(connection_str): except ImportError: raise ValueError("Must specify either '--conn-str' or '--address'") - try: - if not args.partitions: - partitions = get_partitions(client) - else: - partitions = args.partitions.split(",") - pumps = {} - for pid in partitions: - pumps[pid] = client.create_consumer(consumer_group="$default", - partition_id=pid, - event_position=EventPosition(args.offset), - prefetch=50) - pump(pumps, args.duration) - finally: - for pid in partitions: - pumps[pid].close() + if args.partitions: + partitions = args.partitions.split(",") + else: + partitions = client.get_partition_ids() + + threads = [] + for pid in partitions: + consumer = client.create_consumer(consumer_group="$default", + partition_id=pid, + event_position=EventPosition(args.offset), + 
prefetch=300) + thread = threading.Thread(target=pump, args=(consumer, args.duration)) + thread.start() + threads.append(thread) + for thread in threads: + thread.join() if __name__ == '__main__': - test_long_running_receive(os.environ.get('EVENT_HUB_CONNECTION_STR')) + test_long_running_receive(os.environ.get('EVENT_HUB_PERF_CONN_STR')) diff --git a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py b/sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py index e4826d05e3fa..e737ee6889d7 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py @@ -13,11 +13,12 @@ import time import os import sys +import threading import logging import pytest from logging.handlers import RotatingFileHandler -from azure.eventhub import EventHubClient, EventHubProducer, EventData, EventHubSharedKeyCredential +from azure.eventhub import EventHubClient, EventDataBatch, EventData, EventHubSharedKeyCredential def get_logger(filename, level=logging.INFO): @@ -42,34 +43,30 @@ def get_logger(filename, level=logging.INFO): return azure_logger -logger = get_logger("send_test.log", logging.INFO) - -def check_send_successful(outcome, condition): - if outcome.value != 0: - print("Send failed {}".format(condition)) +logger = get_logger("send_test.log", logging.INFO) -def main(client, args): - sender = client.create_producer() +def send(sender, args): + # sender = client.create_producer() deadline = time.time() + args.duration total = 0 - try: with sender: - event_list = [] + batch = sender.create_batch() while time.time() < deadline: data = EventData(body=b"D" * args.payload) - event_list.append(data) - total += 1 - if total % 100 == 0: - sender.send(event_list) - event_list = [] - print("Send total {}".format(total)) + try: + batch.try_add(data) + total += 1 + except ValueError: + sender.send(batch, timeout=0) + print("Sent total {} of partition {}".format(total, sender.partition)) + batch = 
sender.create_batch() except Exception as err: - print("Send failed {}".format(err)) + print("Partition {} send failed {}".format(sender.partition, err)) raise - print("Sent total {}".format(total)) + print("Sent total {} of partition {}".format(total, sender.partition)) @pytest.mark.liveTest @@ -105,10 +102,17 @@ def test_long_running_send(connection_str): raise ValueError("Must specify either '--conn-str' or '--address'") try: - main(client, args) + partition_ids = client.get_partition_ids() + threads = [] + for pid in partition_ids: + sender = client.create_producer(partition_id=pid) + thread = threading.Thread(target=send, args=(sender, args)) + thread.start() + threads.append(thread) + thread.join() except KeyboardInterrupt: pass if __name__ == '__main__': - test_long_running_send(os.environ.get('EVENT_HUB_CONNECTION_STR')) + test_long_running_send(os.environ.get('EVENT_HUB_PERF_CONN_STR')) diff --git a/sdk/eventhub/azure-eventhubs/tests/test_negative.py b/sdk/eventhub/azure-eventhubs/tests/test_negative.py index 4749df940d9c..a1fee7605818 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_negative.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_negative.py @@ -27,6 +27,7 @@ def test_send_with_invalid_hostname(invalid_hostname, connstr_receivers): sender = client.create_producer() with pytest.raises(AuthenticationError): sender.send(EventData("test data")) + sender.close() @pytest.mark.liveTest @@ -47,6 +48,7 @@ def test_send_with_invalid_key(invalid_key, connstr_receivers): sender.send(EventData("test data")) sender.close() + @pytest.mark.liveTest def test_receive_with_invalid_key_sync(invalid_key): client = EventHubClient.from_connection_string(invalid_key, network_tracing=False) @@ -96,19 +98,18 @@ def test_non_existing_entity_sender(connection_str): sender = client.create_producer(partition_id="1") with pytest.raises(AuthenticationError): sender.send(EventData("test data")) + sender.close() @pytest.mark.liveTest def 
test_non_existing_entity_receiver(connection_str): client = EventHubClient.from_connection_string(connection_str, event_hub_path="nemo", network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) - with pytest.raises(AuthenticationError): receiver.receive(timeout=5) receiver.close() - @pytest.mark.liveTest def test_receive_from_invalid_partitions_sync(connection_str): partitions = ["XYZ", "-1", "1000", "-"] @@ -117,7 +118,7 @@ def test_receive_from_invalid_partitions_sync(connection_str): receiver = client.create_consumer(consumer_group="$default", partition_id=p, event_position=EventPosition("-1")) try: with pytest.raises(ConnectError): - receiver.receive(timeout=10) + receiver.receive(timeout=5) finally: receiver.close() @@ -214,3 +215,31 @@ def test_message_body_types(connstr_senders): raise finally: receiver.close() + + +@pytest.mark.liveTest +def test_create_batch_with_invalid_hostname_sync(invalid_hostname): + client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False) + sender = client.create_producer() + with pytest.raises(AuthenticationError): + batch_event_data = sender.create_batch(max_size=300, partition_key="key") + sender.close() + + +@pytest.mark.liveTest +def test_create_batch_with_none_sync(connection_str): + client = EventHubClient.from_connection_string(connection_str, network_tracing=False) + sender = client.create_producer() + batch_event_data = sender.create_batch(max_size=300, partition_key="key") + with pytest.raises(ValueError): + batch_event_data.try_add(EventData(None)) + sender.close() + + +@pytest.mark.liveTest +def test_create_batch_with_too_large_size_sync(connection_str): + client = EventHubClient.from_connection_string(connection_str, network_tracing=False) + sender = client.create_producer() + with pytest.raises(ValueError): + batch_event_data = sender.create_batch(max_size=5 * 1024 * 1024, partition_key="key") + 
sender.close() diff --git a/sdk/eventhub/azure-eventhubs/tests/test_receive.py b/sdk/eventhub/azure-eventhubs/tests/test_receive.py index 35c5e39c992b..d241a8e6e585 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_receive.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_receive.py @@ -148,10 +148,10 @@ def test_receive_with_custom_datetime_sync(connstr_senders): receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition(offset)) with receiver: all_received = [] - received = receiver.receive(timeout=1) + received = receiver.receive(timeout=5) while received: all_received.extend(received) - received = receiver.receive(timeout=1) + received = receiver.receive(timeout=5) assert len(all_received) == 5 for received_event in all_received: diff --git a/sdk/eventhub/azure-eventhubs/tests/test_reconnect.py b/sdk/eventhub/azure-eventhubs/tests/test_reconnect.py index 223a759ea9c5..0796cee2178d 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_reconnect.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_reconnect.py @@ -32,7 +32,7 @@ def test_send_with_long_interval_sync(connstr_receivers, sleep): for r in receivers: if not sleep: r._handler._connection._conn.destroy() - received.extend(r.receive(timeout=3)) + received.extend(r.receive(timeout=5)) assert len(received) == 2 assert list(received[0].body)[0] == b"A single event" diff --git a/sdk/eventhub/azure-eventhubs/tests/test_send.py b/sdk/eventhub/azure-eventhubs/tests/test_send.py index f50ac702fb52..8499ff93b36d 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_send.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_send.py @@ -249,3 +249,20 @@ def test_send_over_websocket_sync(connstr_receivers): received.extend(r.receive(timeout=3)) assert len(received) == 20 + + +@pytest.mark.liveTest +def test_send_with_create_event_batch_sync(connstr_receivers): + connection_str, receivers = connstr_receivers + client = EventHubClient.from_connection_string(connection_str, 
transport_type=TransportType.AmqpOverWebsocket, network_tracing=False) + sender = client.create_producer() + + event_data_batch = sender.create_batch(max_size=100000) + while True: + try: + event_data_batch.try_add(EventData('A single event data')) + except ValueError: + break + + sender.send(event_data_batch) + sender.close()