Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

__version__ = "5.0.0b1"

from azure.eventhub.common import EventData, EventPosition
from azure.eventhub.common import EventData, EventDataBatch, EventPosition
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \
AuthenticationError, EventDataSendError, ConnectionLostError
from azure.eventhub.client import EventHubClient
Expand All @@ -18,6 +18,7 @@

__all__ = [
"EventData",
"EventDataBatch",
"EventHubError",
"ConnectError",
"ConnectionLostError",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
class EventHubConsumer(object):
"""
A consumer responsible for reading EventData from a specific Event Hub
partition and as a member of a specific consumer group.
partition and as a member of a specific consumer group.

A consumer may be exclusive, which asserts ownership over the partition for the consumer
group to ensure that only one consumer from that group is reading from the partition.
These exclusive consumers are sometimes referred to as "Epoch Consumers."
group to ensure that only one consumer from that group is reading from the partition.
These exclusive consumers are sometimes referred to as "Epoch Consumers."

A consumer may also be non-exclusive, allowing multiple consumers from the same consumer
group to be actively reading events from the partition. These non-exclusive consumers are
sometimes referred to as "Non-Epoch Consumers."
group to be actively reading events from the partition. These non-exclusive consumers are
sometimes referred to as "Non-Epoch Consumers."

"""
timeout = 0
Expand All @@ -38,7 +38,7 @@ def __init__( # pylint: disable=super-init-not-called
keep_alive=None, auto_reconnect=True, loop=None):
"""
Instantiate an async consumer. EventHubConsumer should be instantiated by calling the `create_consumer` method
in EventHubClient.
in EventHubClient.

:param client: The parent EventHubClientAsync.
:type client: ~azure.eventhub.aio.EventHubClientAsync
Expand Down
46 changes: 35 additions & 11 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from uamqp import constants, errors, compat
from uamqp import SendClientAsync

from azure.eventhub.common import EventData, _BatchSendEventData
from azure.eventhub.common import EventData, EventDataBatch
from azure.eventhub.error import EventHubError, ConnectError, \
AuthenticationError, EventDataError, EventDataSendError, ConnectionLostError, _error_handler

Expand All @@ -20,9 +20,9 @@
class EventHubProducer(object):
"""
A producer responsible for transmitting EventData to a specific Event Hub,
grouped together in batches. Depending on the options specified at creation, the producer may
be created to allow event data to be automatically routed to an available partition or specific
to a partition.
grouped together in batches. Depending on the options specified at creation, the producer may
be created to allow event data to be automatically routed to an available partition or specific
to a partition.

"""

Expand All @@ -31,7 +31,7 @@ def __init__( # pylint: disable=super-init-not-called
keep_alive=None, auto_reconnect=True, loop=None):
"""
Instantiate an async EventHubProducer. EventHubProducer should be instantiated by calling the `create_producer`
method in EventHubClient.
method in EventHubClient.

:param client: The parent EventHubClientAsync.
:type client: ~azure.eventhub.aio.EventHubClientAsync
Expand All @@ -52,6 +52,7 @@ def __init__( # pylint: disable=super-init-not-called
:param loop: An event loop. If not specified the default event loop will be used.
"""
self.loop = loop or asyncio.get_event_loop()
self._max_message_size_on_link = None
self.running = False
self.client = client
self.target = target
Expand Down Expand Up @@ -110,6 +111,10 @@ async def _open(self):
await self._connect()
self.running = True

self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size if\
self._handler.message_handler._link.peer_max_message_size\
else constants.MAX_MESSAGE_LENGTH_BYTES

async def _connect(self):
connected = await self._build_connection()
if not connected:
Expand Down Expand Up @@ -301,6 +306,23 @@ def _set_partition_key(event_datas, partition_key):
ed._set_partition_key(partition_key)
yield ed

async def create_batch(self, max_message_size=None, partition_key=None):
    """
    Create an EventDataBatch object with the maximum encoded size set to max_message_size.

    The producer is opened first (if it is not already) so that the maximum message
    size negotiated on the link with the service is known.

    :param max_message_size: The maximum encoded size, in bytes, the batch may reach.
     Must be no greater than the maximum size allowed by the service (the value
     negotiated on the link). Defaults to the link's negotiated maximum.
    :type max_message_size: int
    :param partition_key: With the given partition_key, event data added to the batch
     will be sent to a particular partition of the Event Hub decided by the service.
    :type partition_key: str
    :return: An empty EventDataBatch; add events to it with its try_add method.
    :rtype: ~azure.eventhub.EventDataBatch
    :raises: ~azure.eventhub.EventDataError if max_message_size exceeds the
     service-allowed maximum.
    """
    # The negotiated max size is only known after the link has been opened.
    if not self._max_message_size_on_link:
        await self._open()

    if max_message_size and max_message_size > self._max_message_size_on_link:
        raise EventDataError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
                             .format(max_message_size, self._max_message_size_on_link))

    return EventDataBatch(max_message_size or self._max_message_size_on_link, partition_key)

async def send(self, event_data, partition_key=None):
# type:(Union[EventData, Union[List[EventData], Iterator[EventData], Generator[EventData]]], Union[str, bytes]) -> None
"""
Expand Down Expand Up @@ -329,13 +351,15 @@ async def send(self, event_data, partition_key=None):
self._check_closed()
if isinstance(event_data, EventData):
if partition_key:
event_data._set_partition_key(partition_key)
event_data._set_partition_key(partition_key) # pylint: disable=protected-access
wrapper_event_data = event_data
else:
event_data_with_pk = self._set_partition_key(event_data, partition_key)
wrapper_event_data = _BatchSendEventData(
event_data_with_pk,
partition_key=partition_key) if partition_key else _BatchSendEventData(event_data)
if isinstance(event_data, EventDataBatch):
wrapper_event_data = event_data
else:
if partition_key:
event_data = self._set_partition_key(event_data, partition_key)
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access
wrapper_event_data.message.on_send_complete = self._on_outcome
self.unsent_events = [wrapper_event_data.message]
await self._send_event_data()
Expand Down Expand Up @@ -373,4 +397,4 @@ async def close(self, exception=None):
self.error = EventHubError(str(exception))
else:
self.error = EventHubError("This send handler is now closed.")
await self._handler.close_async()
await self._handler.close_async()
56 changes: 52 additions & 4 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@
import json
import six

from uamqp import BatchMessage, Message, types
from azure.eventhub.error import EventDataError
from uamqp import BatchMessage, Message, types, constants, errors
from uamqp.message import MessageHeader, MessageProperties

# If event_data.encoded_size < 256, the overhead to encode it into a batch is 5 bytes; if >= 256, it is 8 bytes each
_BATCH_MESSAGE_OVERHEAD_COST = [5, 8]


def parse_sas_token(sas_token):
"""Parse a SAS token into its components.
Expand Down Expand Up @@ -244,10 +248,25 @@ def encode_message(self):
return self.message.encode_message()


class _BatchSendEventData(EventData):
def __init__(self, batch_event_data, partition_key=None):
self.message = BatchMessage(data=batch_event_data, multi_messages=False, properties=None)
class EventDataBatch(object):
    """
    The EventDataBatch class is a holder of a batch of event data within max message size bytes.
    Use the ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object.
    Do not instantiate an EventDataBatch object directly.
    """
    def __init__(self, max_message_size=None, partition_key=None):
        # type: (int, str) -> None
        # Upper bound, in bytes, on the encoded size of the whole batch; falls back
        # to the uamqp client-side default when no limit is supplied.
        self.max_message_size = max_message_size if max_message_size else constants.MAX_MESSAGE_LENGTH_BYTES
        self._partition_key = partition_key
        self.message = BatchMessage(data=[], multi_messages=False, properties=None)

        self._set_partition_key(partition_key)
        # Running encoded size of the batch, starting from the empty envelope's size.
        self._size = self.message.gather()[0].get_message_encoded_size()

@staticmethod
def _from_batch(batch_data, partition_key=None):
    """
    Wrap an existing iterable of events in an EventDataBatch, bypassing
    per-event size bookkeeping. Internal helper used by send().
    """
    batch = EventDataBatch(partition_key=partition_key)
    batch.message._body_gen = batch_data  # pylint: disable=protected-access
    return batch

def _set_partition_key(self, value):
if value:
Expand All @@ -260,6 +279,35 @@ def _set_partition_key(self, value):
self.message.annotations = annotations
self.message.header = header

def try_add(self, event_data):
    """
    Try to add an EventData to the batch.

    The tracked batch size is the sum of body, properties, header, etc. plus a
    per-message batch-encoding overhead. If adding event_data would push the
    batch past max_message_size, the event is NOT added and False is returned;
    otherwise it is appended and True is returned.

    :param event_data: The event to add to the batch.
    :type event_data: ~azure.eventhub.EventData
    :return: True if the event was added, False if it would overflow the batch.
    :rtype: bool
    :raises: ~azure.eventhub.EventDataError if event_data is not an EventData
     instance, or its partition_key conflicts with the batch's partition_key.
    """
    if not isinstance(event_data, EventData):
        raise EventDataError('event_data should be type of EventData')

    if self._partition_key:
        if event_data.partition_key and event_data.partition_key != self._partition_key:
            raise EventDataError('The partition_key of event_data does not match the one of the EventDataBatch')
        if not event_data.partition_key:
            event_data._set_partition_key(self._partition_key)  # pylint: disable=protected-access

    event_data_size = event_data.message.get_message_encoded_size()

    # For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that
    # message into the BatchMessage would be 5 bytes, if >= 256, it would be 8 bytes.
    size_after_add = (self._size + event_data_size
                      + _BATCH_MESSAGE_OVERHEAD_COST[0 if (event_data_size < 256) else 1])

    if size_after_add > self.max_message_size:
        return False

    self.message._body_gen.append(event_data)  # pylint: disable=protected-access
    self._size = size_after_add
    return True


class EventPosition(object):
"""
Expand Down
12 changes: 6 additions & 6 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
class EventHubConsumer(object):
"""
A consumer responsible for reading EventData from a specific Event Hub
partition and as a member of a specific consumer group.
partition and as a member of a specific consumer group.

A consumer may be exclusive, which asserts ownership over the partition for the consumer
group to ensure that only one consumer from that group is reading from the partition.
These exclusive consumers are sometimes referred to as "Epoch Consumers."
group to ensure that only one consumer from that group is reading from the partition.
These exclusive consumers are sometimes referred to as "Epoch Consumers."

A consumer may also be non-exclusive, allowing multiple consumers from the same consumer
group to be actively reading events from the partition. These non-exclusive consumers are
sometimes referred to as "Non-Epoch Consumers."
group to be actively reading events from the partition. These non-exclusive consumers are
sometimes referred to as "Non-Epoch Consumers."

"""
timeout = 0
Expand All @@ -41,7 +41,7 @@ def __init__(self, client, source, event_position=None, prefetch=300, owner_leve
keep_alive=None, auto_reconnect=True):
"""
Instantiate a consumer. EventHubConsumer should be instantiated by calling the `create_consumer` method
in EventHubClient.
in EventHubClient.

:param client: The parent EventHubClient.
:type client: ~azure.eventhub.client.EventHubClient
Expand Down
47 changes: 36 additions & 11 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from uamqp import compat
from uamqp import SendClient

from azure.eventhub.common import EventData, _BatchSendEventData
from azure.eventhub.common import EventData, EventDataBatch
from azure.eventhub.error import EventHubError, ConnectError, \
AuthenticationError, EventDataError, EventDataSendError, ConnectionLostError, _error_handler

Expand All @@ -23,16 +23,16 @@
class EventHubProducer(object):
"""
A producer responsible for transmitting EventData to a specific Event Hub,
grouped together in batches. Depending on the options specified at creation, the producer may
be created to allow event data to be automatically routed to an available partition or specific
to a partition.
grouped together in batches. Depending on the options specified at creation, the producer may
be created to allow event data to be automatically routed to an available partition or specific
to a partition.

"""

def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True):
"""
Instantiate an EventHubProducer. EventHubProducer should be instantiated by calling the `create_producer` method
in EventHubClient.
in EventHubClient.

:param client: The parent EventHubClient.
:type client: ~azure.eventhub.client.EventHubClient.
Expand All @@ -51,6 +51,7 @@ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=N
Default value is `True`.
:type auto_reconnect: bool
"""
self._max_message_size_on_link = None
self.running = False
self.client = client
self.target = target
Expand Down Expand Up @@ -109,6 +110,10 @@ def _open(self):
self._connect()
self.running = True

self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size if\
self._handler.message_handler._link.peer_max_message_size\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Under what circumstances there is no peer_max_message_size? When the link is not open?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about it. It seems that the peer_max_message_size should always be returned back from service side as it's part of the link negotiation in amqp protocol.
This piece of code at least gives a local default value.

else constants.MAX_MESSAGE_LENGTH_BYTES

def _connect(self):
connected = self._build_connection()
if not connected:
Expand Down Expand Up @@ -298,6 +303,23 @@ def _error(outcome, condition):
if outcome != constants.MessageSendResult.Ok:
raise condition

def create_batch(self, max_message_size=None, partition_key=None):
    """
    Create an EventDataBatch object with the maximum encoded size set to max_message_size.

    The producer is opened first (if it is not already) so that the maximum message
    size negotiated on the link with the service is known.

    :param max_message_size: The maximum encoded size, in bytes, the batch may reach.
     Must be no greater than the maximum size allowed by the service (the value
     negotiated on the link). Defaults to the link's negotiated maximum.
    :type max_message_size: int
    :param partition_key: With the given partition_key, event data added to the batch
     will be sent to a particular partition of the Event Hub decided by the service.
    :type partition_key: str
    :return: An empty EventDataBatch; add events to it with its try_add method.
    :rtype: ~azure.eventhub.EventDataBatch
    :raises: ~azure.eventhub.EventDataError if max_message_size exceeds the
     service-allowed maximum.
    """
    # The negotiated max size is only known after the link has been opened.
    if not self._max_message_size_on_link:
        self._open()

    if max_message_size and max_message_size > self._max_message_size_on_link:
        raise EventDataError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
                             .format(max_message_size, self._max_message_size_on_link))

    return EventDataBatch(max_message_size or self._max_message_size_on_link, partition_key)

def send(self, event_data, partition_key=None):
# type:(Union[EventData, Union[List[EventData], Iterator[EventData], Generator[EventData]]], Union[str, bytes]) -> None
"""
Expand All @@ -307,7 +329,8 @@ def send(self, event_data, partition_key=None):
:param event_data: The event to be sent. It can be an EventData object, or iterable of EventData objects
:type event_data: ~azure.eventhub.common.EventData, Iterator, Generator, list
:param partition_key: With the given partition_key, event data will land to
a particular partition of the Event Hub decided by the service.
a particular partition of the Event Hub decided by the service. partition_key
will be omitted if event_data is of type ~azure.eventhub.EventDataBatch.
:type partition_key: str
:raises: ~azure.eventhub.AuthenticationError, ~azure.eventhub.ConnectError, ~azure.eventhub.ConnectionLostError,
~azure.eventhub.EventDataError, ~azure.eventhub.EventDataSendError, ~azure.eventhub.EventHubError
Expand All @@ -327,13 +350,15 @@ def send(self, event_data, partition_key=None):
self._check_closed()
if isinstance(event_data, EventData):
if partition_key:
event_data._set_partition_key(partition_key)
event_data._set_partition_key(partition_key) # pylint: disable=protected-access
wrapper_event_data = event_data
else:
event_data_with_pk = self._set_partition_key(event_data, partition_key)
wrapper_event_data = _BatchSendEventData(
event_data_with_pk,
partition_key=partition_key) if partition_key else _BatchSendEventData(event_data)
if isinstance(event_data, EventDataBatch): # The partition_key in the param will be omitted.
wrapper_event_data = event_data
else:
if partition_key:
event_data = self._set_partition_key(event_data, partition_key)
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access
wrapper_event_data.message.on_send_complete = self._on_outcome
self.unsent_events = [wrapper_event_data.message]
self._send_event_data()
Expand Down