diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py index 616107e86125..c4faad253c16 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py @@ -5,7 +5,7 @@ __version__ = "5.0.0b1" -from azure.eventhub.common import EventData, EventPosition +from azure.eventhub.common import EventData, EventDataBatch, EventPosition from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \ AuthenticationError, EventDataSendError, ConnectionLostError from azure.eventhub.client import EventHubClient @@ -18,6 +18,7 @@ __all__ = [ "EventData", + "EventDataBatch", "EventHubError", "ConnectError", "ConnectionLostError", diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py index 9ac6fb468945..95f6e908c404 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py @@ -72,7 +72,7 @@ def _close_connection(self): self.client._conn_manager.reset_connection_if_broken() def _handle_exception(self, exception, retry_count, max_retries, timeout_time): - _handle_exception(exception, retry_count, max_retries, self, timeout_time) + return _handle_exception(exception, retry_count, max_retries, self, timeout_time) def close(self, exception=None): # type:(Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_connection_manager_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_connection_manager_async.py index 3178e1fb72a7..618359192ffe 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_connection_manager_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_connection_manager_async.py @@ -70,9 +70,9 @@ async def get_connection(self, host, auth): async def 
close_connection(self): pass - def reset_connection_if_broken(self): + async def reset_connection_if_broken(self): pass def get_connection_manager(**kwargs): - return _SharedConnectionManager(**kwargs) + return _SeparateConnectionManager(**kwargs) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py index 5a0f0d9eaa4d..e6b35ad41ae4 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py @@ -73,7 +73,7 @@ async def _close_connection(self): await self.client._conn_manager.reset_connection_if_broken() async def _handle_exception(self, exception, retry_count, max_retries, timeout_time): - await _handle_exception(exception, retry_count, max_retries, self, timeout_time) + return await _handle_exception(exception, retry_count, max_retries, self, timeout_time) async def close(self, exception=None): # type: (Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index d4d4143810af..dac2d0c0fa61 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -41,7 +41,7 @@ def __init__( # pylint: disable=super-init-not-called self, client, source, **kwargs): """ Instantiate an async consumer. EventHubConsumer should be instantiated by calling the `create_consumer` method - in EventHubClient. + in EventHubClient. :param client: The parent EventHubClientAsync. 
:type client: ~azure.eventhub.aio.EventHubClientAsync @@ -193,7 +193,7 @@ async def receive(self, **kwargs): max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size timeout = self.client.config.receive_timeout if timeout is None else timeout if not timeout: - timeout = 100_000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout + timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout data_batch = [] start_time = time.time() @@ -226,7 +226,7 @@ async def receive(self, **kwargs): last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time) retry_count += 1 - async def close(self, **kwargs): + async def close(self, exception=None): # type: (Exception) -> None """ Close down the handler. If the handler has already closed, @@ -246,7 +246,6 @@ async def close(self, **kwargs): :caption: Close down the handler. """ - exception = kwargs.get("exception", None) self.running = False if self.error: return diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index 9612b4156327..e326aef0a115 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -11,8 +11,8 @@ from uamqp import types, constants, errors from uamqp import SendClientAsync -from azure.eventhub.common import EventData, _BatchSendEventData -from azure.eventhub.error import _error_handler, OperationTimeoutError +from azure.eventhub.common import EventData, EventDataBatch +from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError from ..producer import _error, _set_partition_key from ._consumer_producer_mixin_async import ConsumerProducerMixin @@ -34,7 +34,7 @@ def __init__( # pylint: disable=super-init-not-called self, client, target, 
**kwargs): """ Instantiate an async EventHubProducer. EventHubProducer should be instantiated by calling the `create_producer` - method in EventHubClient. + method in EventHubClient. :param client: The parent EventHubClientAsync. :type client: ~azure.eventhub.aio.EventHubClientAsync @@ -62,6 +62,7 @@ def __init__( # pylint: disable=super-init-not-called super(EventHubProducer, self).__init__() self.loop = loop or asyncio.get_event_loop() + self._max_message_size_on_link = None self.running = False self.client = client self.target = target @@ -110,9 +111,9 @@ async def _open(self, timeout_time=None): await super(EventHubProducer, self)._open(timeout_time) async def _send_event_data(self, timeout=None): - timeout = self.client.config.send_timeout if timeout is None else timeout + timeout = timeout or self.client.config.send_timeout if not timeout: - timeout = 100_000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout + timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout start_time = time.time() timeout_time = start_time + timeout max_retries = self.client.config.max_retries @@ -155,8 +156,31 @@ def _on_outcome(self, outcome, condition): self._outcome = outcome self._condition = condition + async def create_batch(self, **kwargs): + """ + Create an EventDataBatch object with max size being max_size. + The max_size should be no greater than the max allowed message size defined by the service side. + :param max_size: The maximum size of bytes data that an EventDataBatch object can hold. + :type max_size: int + :param partition_key: With the given partition_key, event data will land to + a particular partition of the Event Hub decided by the service. 
+ :type partition_key: str + :return: an EventDataBatch instance + :rtype: ~azure.eventhub.EventDataBatch + """ + max_size = kwargs.get("max_size", None) + partition_key = kwargs.get("partition_key", None) + if not self._max_message_size_on_link: + await self._open() + + if max_size and max_size > self._max_message_size_on_link: + raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' + .format(max_size, self._max_message_size_on_link)) + + return EventDataBatch(max_size=(max_size or self._max_message_size_on_link), partition_key=partition_key) + async def send(self, event_data, **kwargs): - # type:(Union[EventData, Iterable[EventData]], Union[str, bytes]) -> None + # type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes]) -> None """ Sends an event data and blocks until acknowledgement is received or operation times out. @@ -164,7 +188,8 @@ :param event_data: The event to be sent. It can be an EventData object, or iterable of EventData objects :type event_data: ~azure.eventhub.common.EventData, Iterator, Generator, list :param partition_key: With the given partition_key, event data will land to - a particular partition of the Event Hub decided by the service. + a particular partition of the Event Hub decided by the service. partition_key + could be omitted if event_data is of type ~azure.eventhub.EventDataBatch. :type partition_key: str :param timeout: The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used. 
@@ -189,18 +214,22 @@ async def send(self, event_data, **kwargs): self._check_closed() if isinstance(event_data, EventData): if partition_key: - event_data._set_partition_key(partition_key) + event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data else: - event_data_with_pk = _set_partition_key(event_data, partition_key) - wrapper_event_data = _BatchSendEventData( - event_data_with_pk, - partition_key=partition_key) if partition_key else _BatchSendEventData(event_data) + if isinstance(event_data, EventDataBatch): + if partition_key and not (partition_key == event_data._partition_key): # pylint: disable=protected-access + raise EventDataError('The partition_key does not match the one of the EventDataBatch') + wrapper_event_data = event_data + else: + if partition_key: + event_data = _set_partition_key(event_data, partition_key) + wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self.unsent_events = [wrapper_event_data.message] await self._send_event_data(timeout) - async def close(self, **kwargs): + async def close(self, exception=None): # type: (Exception) -> None """ Close down the handler. If the handler has already closed, @@ -220,5 +249,4 @@ async def close(self, **kwargs): :caption: Close down the handler. 
""" - exception = kwargs.get("exception", None) await super(EventHubProducer, self).close(exception) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index 7f6afb51c7fc..8c97797c01ff 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -276,6 +276,7 @@ def from_connection_string(cls, conn_str, **kwargs): host = address[left_slash_pos + 2:] else: host = address + kwargs.pop("event_hub_path", None) return cls(host, entity, EventHubSharedKeyCredential(policy, key), **kwargs) else: return cls._from_iothub_connection_string(conn_str, **kwargs) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 8faded746a74..ea00d0aef5ff 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -8,11 +8,17 @@ import calendar import json import six -from enum import Enum +import logging -from uamqp import BatchMessage, Message, types +from azure.eventhub.error import EventDataError +from uamqp import BatchMessage, Message, types, constants, errors from uamqp.message import MessageHeader, MessageProperties +log = logging.getLogger(__name__) + +# event_data.encoded_size < 255, batch encode overhead is 5, >=256, overhead is 8 each +_BATCH_MESSAGE_OVERHEAD_COST = [5, 8] + def parse_sas_token(sas_token): """Parse a SAS token into its components. @@ -51,7 +57,7 @@ class EventData(object): PROP_TIMESTAMP = b"x-opt-enqueued-time" PROP_DEVICE_ID = b"iothub-connection-device-id" - def __init__(self, **kwargs): + def __init__(self, body=None, **kwargs): """ Initialize EventData. @@ -64,7 +70,6 @@ def __init__(self, **kwargs): :param message: The received message. 
:type message: ~uamqp.message.Message """ - body = kwargs.get("body", None) to_device = kwargs.get("to_device", None) message = kwargs.get("message", None) @@ -251,10 +256,40 @@ def encode_message(self): return self.message.encode_message() -class _BatchSendEventData(EventData): - def __init__(self, batch_event_data, partition_key=None): - self.message = BatchMessage(data=batch_event_data, multi_messages=False, properties=None) +class EventDataBatch(object): + """ + The EventDataBatch class is a holder of a batch of event data within max size bytes. + Use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object. + Do not instantiate an EventDataBatch object directly. + """ + + def __init__(self, **kwargs): + max_size = kwargs.get("max_size", None) + partition_key = kwargs.get("partition_key", None) + self.max_size = max_size or constants.MAX_MESSAGE_LENGTH_BYTES + self._partition_key = partition_key + self.message = BatchMessage(data=[], multi_messages=False, properties=None) + self._set_partition_key(partition_key) + self._size = self.message.gather()[0].get_message_encoded_size() + self._count = 0 + + def __len__(self): + return self._count + + @property + def size(self): + """The size in bytes + + :return: int + """ + return self._size + + @staticmethod + def _from_batch(batch_data, partition_key=None): + batch_data_instance = EventDataBatch(partition_key=partition_key) + batch_data_instance.message._body_gen = batch_data + return batch_data_instance def _set_partition_key(self, value): if value: @@ -267,6 +302,38 @@ def _set_partition_key(self, value): self.message.annotations = annotations self.message.header = header + def try_add(self, event_data): + """ + The message size is a sum up of body, properties, header, etc. + :param event_data: + :return: + """ + if event_data is None: + log.warning("event_data is None when calling EventDataBatch.try_add. 
Ignored") + return + if not isinstance(event_data, EventData): + raise TypeError('event_data should be type of EventData') + + if self._partition_key: + if event_data.partition_key and not (event_data.partition_key == self._partition_key): + raise EventDataError('The partition_key of event_data does not match the one of the EventDataBatch') + if not event_data.partition_key: + event_data._set_partition_key(self._partition_key) + + event_data_size = event_data.message.get_message_encoded_size() + + # For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that + # message into the BatchMessage would be 5 bytes, if >= 256, it would be 8 bytes. + size_after_add = self._size + event_data_size\ + + _BATCH_MESSAGE_OVERHEAD_COST[0 if (event_data_size < 256) else 1] + + if size_after_add > self.max_size: + raise ValueError("EventDataBatch has reached its size limit {}".format(self.max_size)) + + self.message._body_gen.append(event_data) # pylint: disable=protected-access + self._size = size_after_add + self._count += 1 + class EventPosition(object): """ @@ -286,7 +353,7 @@ class EventPosition(object): >>> event_pos = EventPosition(1506968696002) """ - def __init__(self, value, **kwargs): + def __init__(self, value, inclusive=False): """ Initialize EventPosition. @@ -295,7 +362,6 @@ def __init__(self, value, **kwargs): :param inclusive: Whether to include the supplied value as the start point. 
:type inclusive: bool """ - inclusive = kwargs.get("inclusive", False) self.value = value if value is not None else "-1" self.inclusive = inclusive diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 855e6cf97d53..e59c440c7c88 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -40,7 +40,7 @@ class EventHubConsumer(ConsumerProducerMixin): def __init__(self, client, source, **kwargs): """ Instantiate a consumer. EventHubConsumer should be instantiated by calling the `create_consumer` method - in EventHubClient. + in EventHubClient. :param client: The parent EventHubClient. :type client: ~azure.eventhub.client.EventHubClient @@ -185,7 +185,7 @@ def receive(self, **kwargs): max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size timeout = self.client.config.receive_timeout if timeout is None else timeout if not timeout: - timeout = 100_000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout + timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout data_batch = [] # type: List[EventData] start_time = time.time() @@ -217,7 +217,7 @@ def receive(self, **kwargs): last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time) retry_count += 1 - def close(self, **kwargs): + def close(self, exception=None): # type:(Exception) -> None """ Close down the handler. If the handler has already closed, @@ -237,7 +237,6 @@ def close(self, **kwargs): :caption: Close down the handler. 
""" - exception = kwargs.get("exception", None) if self.messages_iter: self.messages_iter.close() self.messages_iter = None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py index 0fb6933e3015..cbe6a8a04946 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/error.py @@ -57,8 +57,7 @@ class EventHubError(Exception): :vartype details: dict[str, str] """ - def __init__(self, message, **kwargs): - details = kwargs.get("details", None) + def __init__(self, message, details=None): self.error = None self.message = message self.details = details diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 465fbf45d9ef..da2a9ee95368 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -13,10 +13,11 @@ from uamqp import compat from uamqp import SendClient -from azure.eventhub.common import EventData, _BatchSendEventData -from azure.eventhub.error import OperationTimeoutError, _error_handler +from azure.eventhub.common import EventData, EventDataBatch +from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError from ._consumer_producer_mixin import ConsumerProducerMixin + log = logging.getLogger(__name__) @@ -45,7 +46,7 @@ class EventHubProducer(ConsumerProducerMixin): def __init__(self, client, target, **kwargs): """ Instantiate an EventHubProducer. EventHubProducer should be instantiated by calling the `create_producer` method - in EventHubClient. + in EventHubClient. :param client: The parent EventHubClient. :type client: ~azure.eventhub.client.EventHubClient. 
@@ -70,6 +71,7 @@ def __init__(self, client, target, **kwargs): auto_reconnect = kwargs.get("auto_reconnect", True) super(EventHubProducer, self).__init__() + self._max_message_size_on_link = None self.running = False self.client = client self.target = target @@ -117,9 +119,9 @@ def _open(self, timeout_time=None): super(EventHubProducer, self)._open(timeout_time) def _send_event_data(self, timeout=None): - timeout = self.client.config.send_timeout if timeout is None else timeout + timeout = timeout or self.client.config.send_timeout if not timeout: - timeout = 100_000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout + timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout start_time = time.time() timeout_time = start_time + timeout max_retries = self.client.config.max_retries @@ -162,8 +164,31 @@ def _on_outcome(self, outcome, condition): self._outcome = outcome self._condition = condition + def create_batch(self, **kwargs): + """ + Create an EventDataBatch object with max size being max_size. + The max_size should be no greater than the max allowed message size defined by the service side. + :param max_size: The maximum size of bytes data that an EventDataBatch object can hold. + :type max_size: int + :param partition_key: With the given partition_key, event data will land to + a particular partition of the Event Hub decided by the service. + :type partition_key: str + :return: an EventDataBatch instance + :rtype: ~azure.eventhub.EventDataBatch + """ + max_size = kwargs.get("max_size", None) + partition_key = kwargs.get("partition_key", None) + if not self._max_message_size_on_link: + self._open() + + if max_size and max_size > self._max_message_size_on_link: + raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' 
+ .format(max_size, self._max_message_size_on_link)) + + return EventDataBatch(max_size=(max_size or self._max_message_size_on_link), partition_key=partition_key) + def send(self, event_data, **kwargs): - # type:(Union[EventData, Iterable[EventData]], Union[str, bytes], float) -> None + # type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes], float) -> None """ Sends an event data and blocks until acknowledgement is received or operation times out. @@ -171,14 +196,14 @@ :param event_data: The event to be sent. It can be an EventData object, or iterable of EventData objects :type event_data: ~azure.eventhub.common.EventData, Iterator, Generator, list :param partition_key: With the given partition_key, event data will land to - a particular partition of the Event Hub decided by the service. + a particular partition of the Event Hub decided by the service. partition_key + could be omitted if event_data is of type ~azure.eventhub.EventDataBatch. :type partition_key: str :param timeout: The maximum wait time to send the event data. If not specified, the default wait time specified when the producer was created will be used. 
:type timeout:float :raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError, ~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError - :return: None :rtype: None @@ -197,18 +222,22 @@ self._check_closed() if isinstance(event_data, EventData): if partition_key: - event_data._set_partition_key(partition_key) + event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data else: - event_data_with_pk = _set_partition_key(event_data, partition_key) - wrapper_event_data = _BatchSendEventData( - event_data_with_pk, - partition_key=partition_key) if partition_key else _BatchSendEventData(event_data) + if isinstance(event_data, EventDataBatch): # The partition_key in the param will be omitted. + if partition_key and not (partition_key == event_data._partition_key): # pylint: disable=protected-access + raise EventDataError('The partition_key does not match the one of the EventDataBatch') + wrapper_event_data = event_data + else: + if partition_key: + event_data = _set_partition_key(event_data, partition_key) + wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self.unsent_events = [wrapper_event_data.message] self._send_event_data(timeout=timeout) - def close(self, **kwargs): + def close(self, exception=None): # type:(Exception) -> None """ Close down the handler. If the handler has already closed, @@ -228,5 +257,4 @@ :caption: Close down the handler. 
""" - exception = kwargs.get("exception", None) super(EventHubProducer, self).close(exception) diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_auth_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_auth_async.py index 0a1ef23e1dea..d0a0d0912868 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_auth_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_auth_async.py @@ -31,7 +31,7 @@ async def test_client_secret_credential_async(aad_credential, live_eventhub): async with receiver: - received = await receiver.receive(timeout=1) + received = await receiver.receive(timeout=3) assert len(received) == 0 async with sender: @@ -40,7 +40,7 @@ async def test_client_secret_credential_async(aad_credential, live_eventhub): await asyncio.sleep(1) - received = await receiver.receive(timeout=1) + received = await receiver.receive(timeout=3) assert len(received) == 1 assert list(received[0].body)[0] == 'A single message'.encode('utf-8') diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py index f0666094618c..74c05d174e47 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_longrunning_receive_async.py @@ -60,7 +60,7 @@ async def pump(_pid, receiver, _args, _dl): try: async with receiver: while time.time() < deadline: - batch = await receiver.receive(timeout=1) + batch = await receiver.receive(timeout=3) size = len(batch) total += size iteration += 1 diff --git a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py index 27deb42c3aeb..3d43942fe6c8 100644 --- a/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py +++ b/sdk/eventhub/azure-eventhubs/tests/asynctests/test_negative_async.py @@ -29,16 +29,16 @@ async 
def test_send_with_invalid_hostname_async(invalid_hostname, connstr_receiv client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False) sender = client.create_producer() with pytest.raises(AuthenticationError): - await sender._open() + await sender.send(EventData("test data")) @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_with_invalid_hostname_async(invalid_hostname): client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False) - sender = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) + receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): - await sender._open() + await receiver.receive(timeout=3) @pytest.mark.liveTest @@ -48,16 +48,16 @@ async def test_send_with_invalid_key_async(invalid_key, connstr_receivers): client = EventHubClient.from_connection_string(invalid_key, network_tracing=False) sender = client.create_producer() with pytest.raises(AuthenticationError): - await sender._open() + await sender.send(EventData("test data")) @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_with_invalid_key_async(invalid_key): client = EventHubClient.from_connection_string(invalid_key, network_tracing=False) - sender = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) + receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): - await sender._open() + await receiver.receive(timeout=3) @pytest.mark.liveTest @@ -67,16 +67,16 @@ async def test_send_with_invalid_policy_async(invalid_policy, connstr_receivers) client = EventHubClient.from_connection_string(invalid_policy, network_tracing=False) sender = client.create_producer() with pytest.raises(AuthenticationError): - await 
sender._open() + await sender.send(EventData("test data")) @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_with_invalid_policy_async(invalid_policy): client = EventHubClient.from_connection_string(invalid_policy, network_tracing=False) - sender = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) + receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): - await sender._open() + await receiver.receive(timeout=3) @pytest.mark.liveTest @@ -88,7 +88,7 @@ async def test_send_partition_key_with_partition_async(connection_str): try: data = EventData(b"Data") with pytest.raises(ValueError): - await sender.send(data) + await sender.send(EventData("test data")) finally: await sender.close() @@ -99,7 +99,7 @@ async def test_non_existing_entity_sender_async(connection_str): client = EventHubClient.from_connection_string(connection_str, event_hub_path="nemo", network_tracing=False) sender = client.create_producer(partition_id="1") with pytest.raises(AuthenticationError): - await sender._open() + await sender.send(EventData("test data")) @pytest.mark.liveTest @@ -108,35 +108,31 @@ async def test_non_existing_entity_receiver_async(connection_str): client = EventHubClient.from_connection_string(connection_str, event_hub_path="nemo", network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): - await receiver._open() + await receiver.receive(timeout=5) @pytest.mark.liveTest @pytest.mark.asyncio async def test_receive_from_invalid_partitions_async(connection_str): - partitions = ["XYZ", "-1", "1000", "-" ] + partitions = ["XYZ", "-1", "1000", "-"] for p in partitions: client = EventHubClient.from_connection_string(connection_str, network_tracing=False) receiver = 
client.create_consumer(consumer_group="$default", partition_id=p, event_position=EventPosition("-1")) - try: - with pytest.raises(ConnectError): - await receiver.receive(timeout=10) - finally: - await receiver.close() + with pytest.raises(ConnectError): + await receiver.receive(timeout=10) + await receiver.close() @pytest.mark.liveTest @pytest.mark.asyncio async def test_send_to_invalid_partitions_async(connection_str): - partitions = ["XYZ", "-1", "1000", "-" ] + partitions = ["XYZ", "-1", "1000", "-"] for p in partitions: client = EventHubClient.from_connection_string(connection_str, network_tracing=False) sender = client.create_producer(partition_id=p) - try: - with pytest.raises(ConnectError): - await sender._open() - finally: - await sender.close() + with pytest.raises(ConnectError): + await sender.send(EventData("test data")) + await sender.close() @pytest.mark.liveTest diff --git a/sdk/eventhub/azure-eventhubs/tests/test_auth.py b/sdk/eventhub/azure-eventhubs/tests/test_auth.py index d5871971a5b4..0f0c78794884 100644 --- a/sdk/eventhub/azure-eventhubs/tests/test_auth.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_auth.py @@ -26,7 +26,7 @@ def test_client_secret_credential(aad_credential, live_eventhub): receiver = client.create_consumer(consumer_group="$default", partition_id='0', event_position=EventPosition("@latest")) with receiver: - received = receiver.receive(timeout=1) + received = receiver.receive(timeout=3) assert len(received) == 0 with sender: @@ -34,7 +34,7 @@ def test_client_secret_credential(aad_credential, live_eventhub): sender.send(event) time.sleep(1) - received = receiver.receive(timeout=1) + received = receiver.receive(timeout=3) assert len(received) == 1 assert list(received[0].body)[0] == 'A single message'.encode('utf-8') diff --git a/sdk/eventhub/azure-eventhubs/tests/test_negative.py b/sdk/eventhub/azure-eventhubs/tests/test_negative.py index ac19a01f76c9..4749df940d9c 100644 --- 
a/sdk/eventhub/azure-eventhubs/tests/test_negative.py +++ b/sdk/eventhub/azure-eventhubs/tests/test_negative.py @@ -34,7 +34,8 @@ def test_receive_with_invalid_hostname_sync(invalid_hostname): client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): - receiver.receive(timeout=3) + receiver.receive(timeout=5) + receiver.close() @pytest.mark.liveTest @@ -44,14 +45,16 @@ def test_send_with_invalid_key(invalid_key, connstr_receivers): sender = client.create_producer() with pytest.raises(AuthenticationError): sender.send(EventData("test data")) - + sender.close() @pytest.mark.liveTest def test_receive_with_invalid_key_sync(invalid_key): client = EventHubClient.from_connection_string(invalid_key, network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) + with pytest.raises(AuthenticationError): - receiver.receive(timeout=3) + receiver.receive(timeout=10) + receiver.close() @pytest.mark.liveTest @@ -61,6 +64,7 @@ def test_send_with_invalid_policy(invalid_policy, connstr_receivers): sender = client.create_producer() with pytest.raises(AuthenticationError): sender.send(EventData("test data")) + sender.close() @pytest.mark.liveTest @@ -68,7 +72,8 @@ def test_receive_with_invalid_policy_sync(invalid_policy): client = EventHubClient.from_connection_string(invalid_policy, network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) with pytest.raises(AuthenticationError): - receiver.receive(timeout=3) + receiver.receive(timeout=5) + receiver.close() @pytest.mark.liveTest @@ -97,13 +102,16 @@ def test_non_existing_entity_sender(connection_str): def test_non_existing_entity_receiver(connection_str): client = 
EventHubClient.from_connection_string(connection_str, event_hub_path="nemo", network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1")) + with pytest.raises(AuthenticationError): - receiver.receive(timeout=3) + receiver.receive(timeout=5) + receiver.close() + @pytest.mark.liveTest def test_receive_from_invalid_partitions_sync(connection_str): - partitions = ["XYZ", "-1", "1000", "-" ] + partitions = ["XYZ", "-1", "1000", "-"] for p in partitions: client = EventHubClient.from_connection_string(connection_str, network_tracing=False) receiver = client.create_consumer(consumer_group="$default", partition_id=p, event_position=EventPosition("-1")) @@ -116,7 +124,7 @@ def test_receive_from_invalid_partitions_sync(connection_str): @pytest.mark.liveTest def test_send_to_invalid_partitions(connection_str): - partitions = ["XYZ", "-1", "1000", "-" ] + partitions = ["XYZ", "-1", "1000", "-"] for p in partitions: client = EventHubClient.from_connection_string(connection_str, network_tracing=False) sender = client.create_producer(partition_id=p)