Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Misc fixes from code review
  • Loading branch information
yijxie committed Jul 17, 2019
commit 11b33332452438287797c7fc7a8f7c8bb9fc02e7
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import uuid
import asyncio
import logging
from typing import Iterator, Generator, List, Union
from typing import Iterable

from uamqp import constants, errors, compat
from uamqp import SendClientAsync
Expand Down Expand Up @@ -318,13 +318,13 @@ async def create_batch(self, max_message_size=None, partition_key=None):
await self._open()

if max_message_size and max_message_size > self._max_message_size_on_link:
raise EventDataError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
.format(max_message_size, self._max_message_size_on_link))

return EventDataBatch(max_message_size if max_message_size else self._max_message_size_on_link, partition_key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can also be done with an "or" statement:
EventDataBatch(max_message_size or self._max_message_size....)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to use "or"


async def send(self, event_data, partition_key=None):
# type:(Union[EventData, Union[List[EventData], Iterator[EventData], Generator[EventData]]], Union[str, bytes]) -> None
# type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes]) -> None
"""
Sends an event data and blocks until acknowledgement is
received or operation times out.
Expand Down
13 changes: 13 additions & 0 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,18 @@ def __init__(self, max_message_size=None, partition_key=None):

self._set_partition_key(partition_key)
self._size = self.message.gather()[0].get_message_encoded_size()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please double check the uAMQP code for .gather()? I have a suspicion that it's not a repeatable operation.... though I could be wrong....

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It works in our case

self._count = 0

def __len__(self):
return self._count

@property
def size(self):
"""The size in bytes

:return: int
"""
return self._size

@staticmethod
def _from_batch(batch_data, partition_key=None):
Expand Down Expand Up @@ -306,6 +318,7 @@ def try_add(self, event_data):

self.message._body_gen.append(event_data) # pylint: disable=protected-access
self._size = size_after_add
self._count += 1
return True


Expand Down
13 changes: 6 additions & 7 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import uuid
import logging
import time
from typing import Iterator, Generator, List, Union
from typing import Iterable, Union

from uamqp import constants, errors
from uamqp import compat
Expand Down Expand Up @@ -110,9 +110,8 @@ def _open(self):
self._connect()
self.running = True

self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size if\
self._handler.message_handler._link.peer_max_message_size\
else constants.MAX_MESSAGE_LENGTH_BYTES
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access

def _connect(self):
connected = self._build_connection()
Expand Down Expand Up @@ -315,13 +314,13 @@ def create_batch(self, max_message_size=None, partition_key=None):
self._open()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we change the behaviour for the async create_batch (to raise an error if the connection isn't open) we will need to do the same thing here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same to the above async one. We should open internally first to get the right peer link message size.


if max_message_size and max_message_size > self._max_message_size_on_link:
raise EventDataError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
.format(max_message_size, self._max_message_size_on_link))

return EventDataBatch(max_message_size if max_message_size else self._max_message_size_on_link, partition_key)
return EventDataBatch(max_message_size or self._max_message_size_on_link, partition_key)

def send(self, event_data, partition_key=None):
# type:(Union[EventData, Union[List[EventData], Iterator[EventData], Generator[EventData]]], Union[str, bytes]) -> None
# type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes]) -> None
"""
Sends an event data and blocks until acknowledgement is
received or operation times out.
Expand Down