-
Notifications
You must be signed in to change notification settings - Fork 3.2k
create_batch feature implementation (#6256) #6324
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
2d22a1a
11b3333
4f310ea
47df21b
dfd4e39
7e58a0c
fc69d42
e4be05e
e702c76
aa56698
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,12 +5,12 @@ | |
| import uuid | ||
| import asyncio | ||
| import logging | ||
| from typing import Iterator, Generator, List, Union | ||
| from typing import Iterable | ||
|
|
||
| from uamqp import constants, errors, compat | ||
| from uamqp import SendClientAsync | ||
|
|
||
| from azure.eventhub.common import EventData, _BatchSendEventData | ||
| from azure.eventhub.common import EventData, EventDataBatch | ||
| from azure.eventhub.error import EventHubError, ConnectError, \ | ||
| AuthenticationError, EventDataError, EventDataSendError, ConnectionLostError, _error_handler | ||
|
|
||
|
|
@@ -20,9 +20,9 @@ | |
| class EventHubProducer(object): | ||
| """ | ||
| A producer responsible for transmitting EventData to a specific Event Hub, | ||
| grouped together in batches. Depending on the options specified at creation, the producer may | ||
| be created to allow event data to be automatically routed to an available partition or specific | ||
| to a partition. | ||
| grouped together in batches. Depending on the options specified at creation, the producer may | ||
| be created to allow event data to be automatically routed to an available partition or specific | ||
| to a partition. | ||
|
|
||
| """ | ||
|
|
||
|
|
@@ -31,7 +31,7 @@ def __init__( # pylint: disable=super-init-not-called | |
| keep_alive=None, auto_reconnect=True, loop=None): | ||
| """ | ||
| Instantiate an async EventHubProducer. EventHubProducer should be instantiated by calling the `create_producer` | ||
| method in EventHubClient. | ||
| method in EventHubClient. | ||
|
|
||
| :param client: The parent EventHubClientAsync. | ||
| :type client: ~azure.eventhub.aio.EventHubClientAsync | ||
|
|
@@ -52,6 +52,7 @@ def __init__( # pylint: disable=super-init-not-called | |
| :param loop: An event loop. If not specified the default event loop will be used. | ||
| """ | ||
| self.loop = loop or asyncio.get_event_loop() | ||
| self._max_message_size_on_link = None | ||
| self.running = False | ||
| self.client = client | ||
| self.target = target | ||
|
|
@@ -110,6 +111,10 @@ async def _open(self): | |
| await self._connect() | ||
| self.running = True | ||
|
|
||
| self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size if\ | ||
| self._handler.message_handler._link.peer_max_message_size\ | ||
| else constants.MAX_MESSAGE_LENGTH_BYTES | ||
|
|
||
| async def _connect(self): | ||
| connected = await self._build_connection() | ||
| if not connected: | ||
|
|
@@ -301,8 +306,25 @@ def _set_partition_key(event_datas, partition_key): | |
| ed._set_partition_key(partition_key) | ||
| yield ed | ||
|
|
||
| async def create_batch(self, max_size=None, partition_key=None): | ||
|
||
| """ | ||
| Create an EventDataBatch object with max message size being max_message_size. | ||
| The max_message_size should be no greater than the max allowed message size defined by the service side. | ||
| :param max_message_size: | ||
| :param partition_key: | ||
| :return: | ||
| """ | ||
| if not self._max_message_size_on_link: | ||
| await self._open() | ||
|
|
||
| if max_size and max_size > self._max_message_size_on_link: | ||
| raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.' | ||
| .format(max_size, self._max_message_size_on_link)) | ||
|
|
||
| return EventDataBatch(max_size or self._max_message_size_on_link, partition_key) | ||
|
|
||
| async def send(self, event_data, partition_key=None): | ||
| # type:(Union[EventData, Union[List[EventData], Iterator[EventData], Generator[EventData]]], Union[str, bytes]) -> None | ||
| # type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes]) -> None | ||
| """ | ||
| Sends an event data and blocks until acknowledgement is | ||
| received or operation times out. | ||
|
|
@@ -329,13 +351,15 @@ async def send(self, event_data, partition_key=None): | |
| self._check_closed() | ||
| if isinstance(event_data, EventData): | ||
| if partition_key: | ||
| event_data._set_partition_key(partition_key) | ||
| event_data._set_partition_key(partition_key) # pylint: disable=protected-access | ||
| wrapper_event_data = event_data | ||
| else: | ||
| event_data_with_pk = self._set_partition_key(event_data, partition_key) | ||
| wrapper_event_data = _BatchSendEventData( | ||
| event_data_with_pk, | ||
| partition_key=partition_key) if partition_key else _BatchSendEventData(event_data) | ||
| if isinstance(event_data, EventDataBatch): | ||
| wrapper_event_data = event_data | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this mean that in the case of EventDataBatch we ignore the passed in partition key?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, EventDataBatch wins. It also has partition_key.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In that case we should raise an error if a user tries to pass in the partition_key alongside a Batch object. Otherwise they might think the value they pass in is being used when it's actually being ignored. |
||
| else: | ||
| if partition_key: | ||
| event_data = self._set_partition_key(event_data, partition_key) | ||
| wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access | ||
| wrapper_event_data.message.on_send_complete = self._on_outcome | ||
| self.unsent_events = [wrapper_event_data.message] | ||
| await self._send_event_data() | ||
|
|
@@ -373,4 +397,4 @@ async def close(self, exception=None): | |
| self.error = EventHubError(str(exception)) | ||
| else: | ||
| self.error = EventHubError("This send handler is now closed.") | ||
| await self._handler.close_async() | ||
| await self._handler.close_async() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,10 +8,15 @@ | |
| import calendar | ||
| import json | ||
| import six | ||
| import logging | ||
|
|
||
| from uamqp import BatchMessage, Message, types | ||
| from azure.eventhub.error import EventDataError | ||
| from uamqp import BatchMessage, Message, types, constants, errors | ||
| from uamqp.message import MessageHeader, MessageProperties | ||
|
|
||
| # event_data.encoded_size < 255, batch encode overhead is 5, >=256, overhead is 8 each | ||
| _BATCH_MESSAGE_OVERHEAD_COST = [5, 8] | ||
|
|
||
|
|
||
| def parse_sas_token(sas_token): | ||
| """Parse a SAS token into its components. | ||
|
|
@@ -244,10 +249,40 @@ def encode_message(self): | |
| return self.message.encode_message() | ||
|
|
||
|
|
||
| class _BatchSendEventData(EventData): | ||
| def __init__(self, batch_event_data, partition_key=None): | ||
| self.message = BatchMessage(data=batch_event_data, multi_messages=False, properties=None) | ||
| class EventDataBatch(object): | ||
| """ | ||
| The EventDataBatch class is a holder of a batch of event data within max message size bytes. | ||
| Use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object. | ||
| Do not instantiate an EventDataBatch object directly. | ||
| """ | ||
|
|
||
| log = logging.getLogger(__name__) | ||
|
||
|
|
||
| def __init__(self, max_size=None, partition_key=None): | ||
| self.max_size = max_size if max_size else constants.MAX_MESSAGE_LENGTH_BYTES | ||
|
||
| self._partition_key = partition_key | ||
| self.message = BatchMessage(data=[], multi_messages=False, properties=None) | ||
|
|
||
| self._set_partition_key(partition_key) | ||
| self._size = self.message.gather()[0].get_message_encoded_size() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you please double check the uAMQP code for
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It works in our case |
||
| self._count = 0 | ||
|
|
||
| def __len__(self): | ||
| return self._count | ||
|
|
||
| @property | ||
| def size(self): | ||
| """The size in bytes | ||
|
|
||
| :return: int | ||
| """ | ||
| return self._size | ||
|
|
||
| @staticmethod | ||
| def _from_batch(batch_data, partition_key=None): | ||
| batch_data_instance = EventDataBatch(partition_key=partition_key) | ||
| batch_data_instance.message._body_gen = batch_data | ||
| return batch_data_instance | ||
|
|
||
| def _set_partition_key(self, value): | ||
| if value: | ||
|
|
@@ -260,6 +295,38 @@ def _set_partition_key(self, value): | |
| self.message.annotations = annotations | ||
| self.message.header = header | ||
|
|
||
| def try_add(self, event_data): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where is this function used?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The try_add function is part of the API and is supposed to be used by customers, not by us. |
||
| """ | ||
| The message size is a sum up of body, properties, header, etc. | ||
| :param event_data: | ||
| :return: | ||
| """ | ||
| if event_data is None: | ||
| self.log.warning("event_data is None when calling EventDataBatch.try_add. Ignored") | ||
|
||
| return | ||
| if not isinstance(event_data, EventData): | ||
| raise TypeError('event_data should be type of EventData') | ||
|
|
||
| if self._partition_key: | ||
| if event_data.partition_key and not (event_data.partition_key == self._partition_key): | ||
| raise EventDataError('The partition_key of event_data does not match the one of the EventDataBatch') | ||
| if not event_data.partition_key: | ||
| event_data._set_partition_key(self._partition_key) | ||
|
|
||
| event_data_size = event_data.message.get_message_encoded_size() | ||
|
|
||
| # For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that | ||
| # message into the BatchMessage would be 5 bytes, if >= 256, it would be 8 bytes. | ||
| size_after_add = self._size + event_data_size\ | ||
| + _BATCH_MESSAGE_OVERHEAD_COST[0 if (event_data_size < 256) else 1] | ||
|
|
||
| if size_after_add > self.max_size: | ||
| raise ValueError("EventDataBatch has reached its size limit {}".format(self.max_size)) | ||
|
|
||
| self.message._body_gen.append(event_data) # pylint: disable=protected-access | ||
| self._size = size_after_add | ||
| self._count += 1 | ||
|
|
||
|
|
||
| class EventPosition(object): | ||
| """ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: This statement is a bit of mouthful - I think it would be cleaner just to do it with an "or" statement:
Also I think it will need a pylint disable for protected-access.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good suggestion. Fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You fixed it in the sync class but not the async one :)