Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

__version__ = "5.0.0b1"

from azure.eventhub.common import EventData, EventPosition
from azure.eventhub.common import EventData, EventDataBatch, EventPosition
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \
AuthenticationError, EventDataSendError, ConnectionLostError
from azure.eventhub.client import EventHubClient
Expand All @@ -18,6 +18,7 @@

__all__ = [
"EventData",
"EventDataBatch",
"EventHubError",
"ConnectError",
"ConnectionLostError",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def _close_connection(self):
self.client._conn_manager.reset_connection_if_broken()

def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
_handle_exception(exception, retry_count, max_retries, self, timeout_time)
return _handle_exception(exception, retry_count, max_retries, self, timeout_time)

def close(self, exception=None):
# type:(Exception) -> None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ async def get_connection(self, host, auth):
async def close_connection(self):
pass

def reset_connection_if_broken(self):
async def reset_connection_if_broken(self):
pass


def get_connection_manager(**kwargs):
return _SharedConnectionManager(**kwargs)
return _SeparateConnectionManager(**kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def _close_connection(self):
await self.client._conn_manager.reset_connection_if_broken()

async def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
await _handle_exception(exception, retry_count, max_retries, self, timeout_time)
return await _handle_exception(exception, retry_count, max_retries, self, timeout_time)

async def close(self, exception=None):
# type: (Exception) -> None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __init__( # pylint: disable=super-init-not-called
self, client, source, **kwargs):
"""
Instantiate an async consumer. EventHubConsumer should be instantiated by calling the `create_consumer` method
in EventHubClient.
in EventHubClient.

:param client: The parent EventHubClientAsync.
:type client: ~azure.eventhub.aio.EventHubClientAsync
Expand Down Expand Up @@ -193,7 +193,7 @@ async def receive(self, **kwargs):
max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size
timeout = self.client.config.receive_timeout if timeout is None else timeout
if not timeout:
timeout = 100_000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout

data_batch = []
start_time = time.time()
Expand Down Expand Up @@ -226,7 +226,7 @@ async def receive(self, **kwargs):
last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time)
retry_count += 1

async def close(self, **kwargs):
async def close(self, exception=None):
# type: (Exception) -> None
"""
Close down the handler. If the handler has already closed,
Expand All @@ -246,7 +246,6 @@ async def close(self, **kwargs):
:caption: Close down the handler.

"""
exception = kwargs.get("exception", None)
self.running = False
if self.error:
return
Expand Down
56 changes: 42 additions & 14 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from uamqp import types, constants, errors
from uamqp import SendClientAsync

from azure.eventhub.common import EventData, _BatchSendEventData
from azure.eventhub.error import _error_handler, OperationTimeoutError
from azure.eventhub.common import EventData, EventDataBatch
from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError
from ..producer import _error, _set_partition_key
from ._consumer_producer_mixin_async import ConsumerProducerMixin

Expand All @@ -34,7 +34,7 @@ def __init__( # pylint: disable=super-init-not-called
self, client, target, **kwargs):
"""
Instantiate an async EventHubProducer. EventHubProducer should be instantiated by calling the `create_producer`
method in EventHubClient.
method in EventHubClient.

:param client: The parent EventHubClientAsync.
:type client: ~azure.eventhub.aio.EventHubClientAsync
Expand Down Expand Up @@ -62,6 +62,7 @@ def __init__( # pylint: disable=super-init-not-called

super(EventHubProducer, self).__init__()
self.loop = loop or asyncio.get_event_loop()
self._max_message_size_on_link = None
self.running = False
self.client = client
self.target = target
Expand Down Expand Up @@ -110,9 +111,9 @@ async def _open(self, timeout_time=None):
await super(EventHubProducer, self)._open(timeout_time)

async def _send_event_data(self, timeout=None):
timeout = self.client.config.send_timeout if timeout is None else timeout
timeout = timeout or self.client.config.send_timeout
if not timeout:
timeout = 100_000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
start_time = time.time()
timeout_time = start_time + timeout
max_retries = self.client.config.max_retries
Expand Down Expand Up @@ -155,16 +156,40 @@ def _on_outcome(self, outcome, condition):
self._outcome = outcome
self._condition = condition

async def create_batch(self, **kwargs):
    """
    Create an EventDataBatch object whose capacity is limited to max_size bytes.

    The max_size should be no greater than the max allowed message size defined by the service side.

    :param max_size: The maximum size of bytes data that an EventDataBatch object can hold.
     If omitted (or 0), the maximum message size recorded for the link is used.
    :type max_size: int
    :param partition_key: With the given partition_key, event data will land on
     a particular partition of the Event Hub decided by the service.
    :type partition_key: str
    :return: an EventDataBatch instance
    :rtype: ~azure.eventhub.EventDataBatch
    :raises ValueError: If max_size is larger than the maximum message size recorded for the link.
    """
    max_size = kwargs.get("max_size", None)
    partition_key = kwargs.get("partition_key", None)

    # The link limit starts out as None; presumably _open() populates it once the
    # link is established -- TODO confirm against _open().
    if not self._max_message_size_on_link:
        await self._open()

    if max_size and max_size > self._max_message_size_on_link:
        raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
                         .format(max_size, self._max_message_size_on_link))

    # EventDataBatch.__init__ reads its options from keyword arguments, so they must be
    # passed by name; passing them positionally raises TypeError.
    return EventDataBatch(
        max_size=max_size or self._max_message_size_on_link,
        partition_key=partition_key)

async def send(self, event_data, **kwargs):
# type:(Union[EventData, Iterable[EventData]], Union[str, bytes]) -> None
# type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes]) -> None
"""
Sends an event data and blocks until acknowledgement is
received or operation times out.

:param event_data: The event to be sent. It can be an EventData object, or iterable of EventData objects
:type event_data: ~azure.eventhub.common.EventData, Iterator, Generator, list
:param partition_key: With the given partition_key, event data will land to
a particular partition of the Event Hub decided by the service.
a particular partition of the Event Hub decided by the service. partition_key
could be omitted if event_data is of type ~azure.eventhub.EventDataBatch.
:type partition_key: str
:param timeout: The maximum wait time to send the event data.
If not specified, the default wait time specified when the producer was created will be used.
Expand All @@ -189,18 +214,22 @@ async def send(self, event_data, **kwargs):
self._check_closed()
if isinstance(event_data, EventData):
if partition_key:
event_data._set_partition_key(partition_key)
event_data._set_partition_key(partition_key) # pylint: disable=protected-access
wrapper_event_data = event_data
else:
event_data_with_pk = _set_partition_key(event_data, partition_key)
wrapper_event_data = _BatchSendEventData(
event_data_with_pk,
partition_key=partition_key) if partition_key else _BatchSendEventData(event_data)
if isinstance(event_data, EventDataBatch):
if partition_key and not (partition_key == event_data._partition_key): # pylint: disable=protected-access
raise EventDataError('The partition_key does not match the one of the EventDataBatch')
wrapper_event_data = event_data
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that in the case of EventDataBatch we ignore the passed in partition key?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, EventDataBatch wins. It also has partition_key.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case we should raise an error if a user tries to pass in the partition_key alongside a Batch object. Otherwise they might think the value they pass in is being used when it's actually being ignored.

else:
if partition_key:
event_data = _set_partition_key(event_data, partition_key)
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access
wrapper_event_data.message.on_send_complete = self._on_outcome
self.unsent_events = [wrapper_event_data.message]
await self._send_event_data(timeout)

async def close(self, **kwargs):
async def close(self, exception=None):
# type: (Exception) -> None
"""
Close down the handler. If the handler has already closed,
Expand All @@ -220,5 +249,4 @@ async def close(self, **kwargs):
:caption: Close down the handler.

"""
exception = kwargs.get("exception", None)
await super(EventHubProducer, self).close(exception)
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ def from_connection_string(cls, conn_str, **kwargs):
host = address[left_slash_pos + 2:]
else:
host = address
kwargs.pop("event_hub_path", None)
return cls(host, entity, EventHubSharedKeyCredential(policy, key), **kwargs)
else:
return cls._from_iothub_connection_string(conn_str, **kwargs)
Expand Down
84 changes: 75 additions & 9 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@
import calendar
import json
import six
from enum import Enum
import logging

from uamqp import BatchMessage, Message, types
from azure.eventhub.error import EventDataError
from uamqp import BatchMessage, Message, types, constants, errors
from uamqp.message import MessageHeader, MessageProperties

log = logging.getLogger(__name__)

# event_data.encoded_size < 255, batch encode overhead is 5, >=256, overhead is 8 each
_BATCH_MESSAGE_OVERHEAD_COST = [5, 8]


def parse_sas_token(sas_token):
"""Parse a SAS token into its components.
Expand Down Expand Up @@ -51,7 +57,7 @@ class EventData(object):
PROP_TIMESTAMP = b"x-opt-enqueued-time"
PROP_DEVICE_ID = b"iothub-connection-device-id"

def __init__(self, **kwargs):
def __init__(self, body=None, **kwargs):
"""
Initialize EventData.

Expand All @@ -64,7 +70,6 @@ def __init__(self, **kwargs):
:param message: The received message.
:type message: ~uamqp.message.Message
"""
body = kwargs.get("body", None)
to_device = kwargs.get("to_device", None)
message = kwargs.get("message", None)

Expand Down Expand Up @@ -251,10 +256,40 @@ def encode_message(self):
return self.message.encode_message()


class _BatchSendEventData(EventData):
def __init__(self, batch_event_data, partition_key=None):
self.message = BatchMessage(data=batch_event_data, multi_messages=False, properties=None)
class EventDataBatch(object):
"""
The EventDataBatch class is a holder of a batch of event data within max size bytes.
Use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object.
Do not instantiate an EventDataBatch object directly.
"""

def __init__(self, **kwargs):
max_size = kwargs.get("max_size", None)
partition_key = kwargs.get("partition_key", None)
self.max_size = max_size or constants.MAX_MESSAGE_LENGTH_BYTES
self._partition_key = partition_key
self.message = BatchMessage(data=[], multi_messages=False, properties=None)

self._set_partition_key(partition_key)
self._size = self.message.gather()[0].get_message_encoded_size()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please double check the uAMQP code for .gather()? I have a suspicion that it's not a repeatable operation.... though I could be wrong....

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It works in our case

self._count = 0

def __len__(self):
return self._count

@property
def size(self):
    """
    The size of the batch in bytes, as tracked by the batch itself.

    :rtype: int
    """
    current_size = self._size
    return current_size

@staticmethod
def _from_batch(batch_data, partition_key=None):
    """
    Wrap an existing collection of event data in a new EventDataBatch.

    :param batch_data: The event data to use as the body of the batch message.
    :param partition_key: Optional partition key passed on to the new EventDataBatch.
    :type partition_key: str
    :return: A new EventDataBatch whose message body is batch_data.
    :rtype: ~azure.eventhub.EventDataBatch
    """
    wrapped = EventDataBatch(partition_key=partition_key)
    # Replace the (empty) body of the underlying message with the caller's data.
    wrapped.message._body_gen = batch_data  # pylint: disable=protected-access
    return wrapped

def _set_partition_key(self, value):
if value:
Expand All @@ -267,6 +302,38 @@ def _set_partition_key(self, value):
self.message.annotations = annotations
self.message.header = header

def try_add(self, event_data):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this function used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The try_add function is part of the API and is supposed to be used by customers, not by us.

"""
The message size is the sum of the sizes of the body, properties, header, etc.
:param event_data:
:return:
"""
if event_data is None:
log.warning("event_data is None when calling EventDataBatch.try_add. Ignored")
return
if not isinstance(event_data, EventData):
raise TypeError('event_data should be type of EventData')

if self._partition_key:
if event_data.partition_key and not (event_data.partition_key == self._partition_key):
raise EventDataError('The partition_key of event_data does not match the one of the EventDataBatch')
if not event_data.partition_key:
event_data._set_partition_key(self._partition_key)

event_data_size = event_data.message.get_message_encoded_size()

# For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that
# message into the BatchMessage would be 5 bytes, if >= 256, it would be 8 bytes.
size_after_add = self._size + event_data_size\
+ _BATCH_MESSAGE_OVERHEAD_COST[0 if (event_data_size < 256) else 1]

if size_after_add > self.max_size:
raise ValueError("EventDataBatch has reached its size limit {}".format(self.max_size))

self.message._body_gen.append(event_data) # pylint: disable=protected-access
self._size = size_after_add
self._count += 1


class EventPosition(object):
"""
Expand All @@ -286,7 +353,7 @@ class EventPosition(object):
>>> event_pos = EventPosition(1506968696002)
"""

def __init__(self, value, **kwargs):
def __init__(self, value, inclusive=False):
"""
Initialize EventPosition.

Expand All @@ -295,7 +362,6 @@ def __init__(self, value, **kwargs):
:param inclusive: Whether to include the supplied value as the start point.
:type inclusive: bool
"""
inclusive = kwargs.get("inclusive", False)
self.value = value if value is not None else "-1"
self.inclusive = inclusive

Expand Down
7 changes: 3 additions & 4 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class EventHubConsumer(ConsumerProducerMixin):
def __init__(self, client, source, **kwargs):
"""
Instantiate a consumer. EventHubConsumer should be instantiated by calling the `create_consumer` method
in EventHubClient.
in EventHubClient.

:param client: The parent EventHubClient.
:type client: ~azure.eventhub.client.EventHubClient
Expand Down Expand Up @@ -185,7 +185,7 @@ def receive(self, **kwargs):
max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size
timeout = self.client.config.receive_timeout if timeout is None else timeout
if not timeout:
timeout = 100_000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout

data_batch = [] # type: List[EventData]
start_time = time.time()
Expand Down Expand Up @@ -217,7 +217,7 @@ def receive(self, **kwargs):
last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time)
retry_count += 1

def close(self, **kwargs):
def close(self, exception=None):
# type:(Exception) -> None
"""
Close down the handler. If the handler has already closed,
Expand All @@ -237,7 +237,6 @@ def close(self, **kwargs):
:caption: Close down the handler.

"""
exception = kwargs.get("exception", None)
if self.messages_iter:
self.messages_iter.close()
self.messages_iter = None
Expand Down
3 changes: 1 addition & 2 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ class EventHubError(Exception):
:vartype details: dict[str, str]
"""

def __init__(self, message, **kwargs):
details = kwargs.get("details", None)
def __init__(self, message, details=None):
self.error = None
self.message = message
self.details = details
Expand Down
Loading