Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refacor EventDataBatch class
  • Loading branch information
yunhaoling committed Jul 10, 2019
commit 9779be04e139be8e3be2753fcb30651f74ab1a5d
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from uamqp import constants, errors, compat
from uamqp import SendClientAsync

from azure.eventhub.common import EventData, EventDataBatch, _BatchSendEventData
from azure.eventhub.common import EventData, EventDataBatch
from azure.eventhub.error import EventHubError, ConnectError, \
AuthenticationError, EventDataError, EventDataSendError, ConnectionLostError, _error_handler

Expand Down Expand Up @@ -357,10 +357,8 @@ async def send(self, event_data, partition_key=None):
wrapper_event_data = event_data
else:
if partition_key:
event_data_with_pk = self._set_partition_key(event_data, partition_key)
wrapper_event_data = _BatchSendEventData(event_data_with_pk, partition_key=partition_key)
else:
wrapper_event_data = _BatchSendEventData(event_data)
event_data = self._set_partition_key(event_data, partition_key)
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access
wrapper_event_data.message.on_send_complete = self._on_outcome
self.unsent_events = [wrapper_event_data.message]
await self._send_event_data()
Expand Down Expand Up @@ -398,4 +396,4 @@ async def close(self, exception=None):
self.error = EventHubError(str(exception))
else:
self.error = EventHubError("This send handler is now closed.")
await self._handler.close_async()
await self._handler.close_async()
36 changes: 18 additions & 18 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,25 @@ def encode_message(self):
return self.message.encode_message()


class _BatchSendEventData(EventData):
def __init__(self, batch_event_data, partition_key=None):
self.message = BatchMessage(data=batch_event_data, multi_messages=False, properties=None)
class EventDataBatch(object):
"""
The EventDataBatch class is a holder of a batch of event date within max message size bytes.
Use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object.
Do not instantiate an EventDataBatch object directly.
"""
def __init__(self, max_message_size=None, partition_key=None):
self.max_message_size = max_message_size if max_message_size else constants.MAX_MESSAGE_LENGTH_BYTES
self._partition_key = partition_key
self.message = BatchMessage(data=[], multi_messages=False, properties=None)

self._set_partition_key(partition_key)
self._size = self.message.gather()[0].get_message_encoded_size()

@staticmethod
def _from_batch(batch_data, partition_key=None):
batch_data_instance = EventDataBatch(partition_key=partition_key)
batch_data_instance.message._body_gen = batch_data
return batch_data_instance

def _set_partition_key(self, value):
if value:
Expand All @@ -264,21 +279,6 @@ def _set_partition_key(self, value):
self.message.annotations = annotations
self.message.header = header


class EventDataBatch(_BatchSendEventData):
"""
The EventDataBatch class is a holder of a batch of event date within max message size bytes.
Use ~azure.eventhub.Producer.create_batch method to create an EventDataBatch object.
Do not instantiate an EventDataBatch object directly.
"""
def __init__(self, max_message_size, partition_key=None):
self.max_message_size = max_message_size
self._partition_key = partition_key
self.message = BatchMessage(data=[], multi_messages=False, properties=None)

self._set_partition_key(partition_key)
self._size = self.message.gather()[0].get_message_encoded_size()

def try_add(self, event_data):
"""
The message size is a sum up of body, properties, header, etc.
Expand Down
12 changes: 4 additions & 8 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@
from uamqp import compat
from uamqp import SendClient

from azure.eventhub.common import EventData, _BatchSendEventData, EventDataBatch
from azure.eventhub.common import EventData, EventDataBatch
from azure.eventhub.error import EventHubError, ConnectError, \
AuthenticationError, EventDataError, EventDataSendError, ConnectionLostError, _error_handler

log = logging.getLogger(__name__)




class EventHubProducer(object):
"""
A producer responsible for transmitting EventData to a specific Event Hub,
Expand Down Expand Up @@ -354,14 +352,12 @@ def send(self, event_data, partition_key=None):
event_data._set_partition_key(partition_key) # pylint: disable=protected-access
wrapper_event_data = event_data
else:
if isinstance(event_data, EventDataBatch):
if isinstance(event_data, EventDataBatch): # The partition_key in the param will be omitted.
wrapper_event_data = event_data
else:
if partition_key:
event_data_with_pk = self._set_partition_key(event_data, partition_key)
wrapper_event_data = _BatchSendEventData(event_data_with_pk, partition_key=partition_key)
else:
wrapper_event_data = _BatchSendEventData(event_data)
event_data = self._set_partition_key(event_data, partition_key)
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access
wrapper_event_data.message.on_send_complete = self._on_outcome
self.unsent_events = [wrapper_event_data.message]
self._send_event_data()
Expand Down