Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Update code according to the review (#6623)
* Wait longer for reconnect op

* Raise authentication error when open timeout

* Optimize retry decorator

* Update code according to review

* Small fix
  • Loading branch information
yunhaoling authored Aug 2, 2019
commit 5ad02559779ab23b8d9f94cba7fe569dbad2aac8
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,27 @@
import logging
import time

from uamqp import errors, constants
from uamqp import errors, constants, compat
from azure.eventhub.error import EventHubError, _handle_exception

log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
def wrapped_func(*args, **kwargs):
def wrapped_func(self, *args, **kwargs):
timeout = kwargs.get("timeout", None)
if not timeout:
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
timeout_time = time.time() + timeout
max_retries = args[0].client.config.max_retries
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
kwargs.pop("timeout", None)
while True:
try:
return to_be_wrapped_func(args[0], timeout_time=timeout_time, last_exception=last_exception, **kwargs)
return to_be_wrapped_func(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception:
last_exception = args[0]._handle_exception(exception, retry_count, max_retries, timeout_time)
last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time)
retry_count += 1
return wrapped_func

Expand Down Expand Up @@ -92,6 +93,10 @@ def _close_connection(self):
self.client._conn_manager.reset_connection_if_broken()

def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
    """Map link-open timeouts to auth errors, then delegate retry/raise logic.

    NOTE(review): assumes ``self.running`` stays False until the handler has
    finished opening, so a TimeoutException while not running means the
    open/authorization handshake timed out — confirm against _open().
    """
    if not self.running and isinstance(exception, compat.TimeoutException):
        exception = errors.AuthenticationException("Authorization timeout.")
    # Module-level _handle_exception decides whether to retry or re-raise.
    return _handle_exception(exception, retry_count, max_retries, self, timeout_time)

def close(self, exception=None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,28 @@
import logging
import time

from uamqp import errors, constants
from uamqp import errors, constants, compat
from azure.eventhub.error import EventHubError, ConnectError
from ..aio.error_async import _handle_exception

log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
async def wrapped_func(*args, **kwargs):
async def wrapped_func(self, *args, **kwargs):
timeout = kwargs.get("timeout", None)
if not timeout:
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
timeout_time = time.time() + timeout
max_retries = args[0].client.config.max_retries
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
kwargs.pop("timeout", None)
while True:
try:
return await to_be_wrapped_func(args[0], timeout_time=timeout_time, last_exception=last_exception, **kwargs)
return await to_be_wrapped_func(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception:
last_exception = await args[0]._handle_exception(exception, retry_count, max_retries, timeout_time)
last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time)
retry_count += 1
return wrapped_func

Expand Down Expand Up @@ -93,6 +94,10 @@ async def _close_connection(self):
await self.client._conn_manager.reset_connection_if_broken()

async def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
    """Map link-open timeouts to auth errors, then delegate retry/raise logic.

    NOTE(review): assumes ``self.running`` stays False until the handler has
    finished opening, so a TimeoutException while not running means the
    open/authorization handshake timed out — confirm against _open().
    """
    if not self.running and isinstance(exception, compat.TimeoutException):
        exception = errors.AuthenticationException("Authorization timeout.")
    # Module-level async _handle_exception decides whether to retry or re-raise.
    return await _handle_exception(exception, retry_count, max_retries, self, timeout_time)

async def close(self, exception=None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ async def get_partition_properties(self, partition):
return output

def create_consumer(self, consumer_group, partition_id, event_position, **kwargs):
# type: (str, str, EventPosition, int, str, int, asyncio.AbstractEventLoop) -> EventHubConsumer
# type: (str, str, EventPosition) -> EventHubConsumer
"""
Create an async consumer to the client for a particular consumer group and partition.

Expand Down Expand Up @@ -236,7 +236,7 @@ def create_consumer(self, consumer_group, partition_id, event_position, **kwargs
prefetch=prefetch, loop=loop)
return handler

def create_producer(self, partition_id=None, operation=None, send_timeout=None, loop=None):
def create_producer(self, *, partition_id=None, operation=None, send_timeout=None, loop=None):
# type: (str, str, float, asyncio.AbstractEventLoop) -> EventHubProducer
"""
Create an async producer to send EventData object to an EventHub.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ async def __anext__(self):
if not self.messages_iter:
self.messages_iter = self._handler.receive_messages_iter_async()
message = await self.messages_iter.__anext__()
event_data = EventData(message=message)
event_data = EventData._from_message(message)
self.offset = EventPosition(event_data.offset, inclusive=False)
retry_count = 0
return event_data
Expand Down Expand Up @@ -147,7 +147,6 @@ async def _open(self, timeout_time=None):
self.source = self.redirected.address
await super(EventHubConsumer, self)._open(timeout_time)

@_retry_decorator
async def _receive(self, **kwargs):
timeout_time = kwargs.get("timeout_time")
last_exception = kwargs.get("last_exception")
Expand All @@ -167,7 +166,7 @@ async def _receive(self, **kwargs):
max_batch_size=max_batch_size,
timeout=remaining_time_ms)
for message in message_batch:
event_data = EventData(message=message)
event_data = EventData._from_message(message)
self.offset = EventPosition(event_data.offset)
data_batch.append(event_data)
return data_batch
Expand All @@ -185,7 +184,7 @@ def queue_size(self):
return self._handler._received_messages.qsize()
return 0

async def receive(self, max_batch_size=None, timeout=None):
async def receive(self, *, max_batch_size=None, timeout=None):
# type: (int, float) -> List[EventData]
"""
Receive events asynchronously from the EventHub.
Expand Down Expand Up @@ -218,7 +217,8 @@ async def receive(self, max_batch_size=None, timeout=None):
max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch)
data_batch = [] # type: List[EventData]

return await self._receive(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)
return await _retry_decorator(self._receive)(self, timeout=timeout,
max_batch_size=max_batch_size, data_batch=data_batch)

async def close(self, exception=None):
# type: (Exception) -> None
Expand Down
16 changes: 4 additions & 12 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,7 @@ async def _open(self, timeout_time=None, **kwargs):
self.target = self.redirected.address
await super(EventHubProducer, self)._open(timeout_time)

@_retry_decorator
async def _send_event_data(self, **kwargs):
timeout_time = kwargs.get("timeout_time")
last_exception = kwargs.get("last_exception")

async def _send_event_data(self, timeout_time=None, last_exception=None):
if self.unsent_events:
await self._open(timeout_time)
remaining_time = timeout_time - time.time()
Expand Down Expand Up @@ -161,20 +157,16 @@ async def create_batch(self, max_size=None, partition_key=None):
:rtype: ~azure.eventhub.EventDataBatch
"""

@_retry_decorator
async def _wrapped_open(*args, **kwargs):
await self._open(**kwargs)

if not self._max_message_size_on_link:
await _wrapped_open(self, timeout=self.client.config.send_timeout)
await _retry_decorator(self._open)(self, timeout=self.client.config.send_timeout)

if max_size and max_size > self._max_message_size_on_link:
raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
.format(max_size, self._max_message_size_on_link))

return EventDataBatch(max_size=(max_size or self._max_message_size_on_link), partition_key=partition_key)

async def send(self, event_data, partition_key=None, timeout=None):
async def send(self, event_data, *, partition_key=None, timeout=None):
# type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes], float) -> None
"""
Sends an event data and blocks until acknowledgement is
Expand Down Expand Up @@ -220,7 +212,7 @@ async def send(self, event_data, partition_key=None, timeout=None):
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access
wrapper_event_data.message.on_send_complete = self._on_outcome
self.unsent_events = [wrapper_event_data.message]
await self._send_event_data(timeout)
await _retry_decorator(self._send_event_data)(self, timeout=timeout)

async def close(self, exception=None):
# type: (Exception) -> None
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def create_consumer(self, consumer_group, partition_id, event_position, **kwargs
return handler

def create_producer(self, partition_id=None, operation=None, send_timeout=None):
# type: (str, str, float) -> EventHubProducer
# type: (str, str, float, ...) -> EventHubProducer
"""
Create an producer to send EventData object to an EventHub.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,9 @@ def from_connection_string(cls, conn_str, **kwargs):
return cls._from_iothub_connection_string(conn_str, **kwargs)

@abstractmethod
def create_consumer(self, consumer_group, partition_id, event_position, **kwargs):
    """Abstract: concrete clients create a consumer for a particular
    consumer group and partition, starting at *event_position*."""
    pass

@abstractmethod
def create_producer(self, partition_id=None, operation=None, send_timeout=None):
    """Abstract: concrete clients create a producer that sends EventData,
    optionally bound to a specific partition."""
    pass
33 changes: 17 additions & 16 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class EventData(object):
PROP_TIMESTAMP = b"x-opt-enqueued-time"
PROP_DEVICE_ID = b"iothub-connection-device-id"

def __init__(self, body=None, to_device=None, message=None):
def __init__(self, body=None, to_device=None):
"""
Initialize EventData.

Expand All @@ -67,8 +67,6 @@ def __init__(self, body=None, to_device=None, message=None):
:type batch: Generator
:param to_device: An IoT device to route to.
:type to_device: str
:param message: The received message.
:type message: ~uamqp.message.Message
"""

self._partition_key = types.AMQPSymbol(EventData.PROP_PARTITION_KEY)
Expand All @@ -77,20 +75,14 @@ def __init__(self, body=None, to_device=None, message=None):
self.msg_properties = MessageProperties()
if to_device:
self.msg_properties.to = '/devices/{}/messages/devicebound'.format(to_device)
if message:
self.message = message
self.msg_properties = message.properties
self._annotations = message.annotations
self._app_properties = message.application_properties
if body and isinstance(body, list):
self.message = Message(body[0], properties=self.msg_properties)
for more in body[1:]:
self.message._body.append(more) # pylint: disable=protected-access
elif body is None:
raise ValueError("EventData cannot be None.")
else:
if body and isinstance(body, list):
self.message = Message(body[0], properties=self.msg_properties)
for more in body[1:]:
self.message._body.append(more) # pylint: disable=protected-access
elif body is None:
raise ValueError("EventData cannot be None.")
else:
self.message = Message(body, properties=self.msg_properties)
self.message = Message(body, properties=self.msg_properties)

def __str__(self):
dic = {
Expand Down Expand Up @@ -125,6 +117,15 @@ def _set_partition_key(self, value):
self.message.header = header
self._annotations = annotations

@staticmethod
def _from_message(message):
    """Build an EventData wrapping an already-received uamqp message.

    Constructs with a placeholder body to satisfy the constructor, then
    attaches the received message and its properties/annotations directly.
    """
    event = EventData(body='')
    event.message = message
    event.msg_properties = message.properties
    event._annotations = message.annotations
    event._app_properties = message.application_properties
    return event

@property
def sequence_number(self):
"""
Expand Down
7 changes: 3 additions & 4 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def __next__(self):
if not self.messages_iter:
self.messages_iter = self._handler.receive_messages_iter()
message = next(self.messages_iter)
event_data = EventData(message=message)
event_data = EventData._from_message(message)
self.offset = EventPosition(event_data.offset, inclusive=False)
retry_count = 0
return event_data
Expand Down Expand Up @@ -142,7 +142,6 @@ def _open(self, timeout_time=None):
self.source = self.redirected.address
super(EventHubConsumer, self)._open(timeout_time)

@_retry_decorator
def _receive(self, **kwargs):
timeout_time = kwargs.get("timeout_time")
last_exception = kwargs.get("last_exception")
Expand All @@ -161,7 +160,7 @@ def _receive(self, **kwargs):
max_batch_size=max_batch_size - (len(data_batch) if data_batch else 0),
timeout=remaining_time_ms)
for message in message_batch:
event_data = EventData(message=message)
event_data = EventData._from_message(message)
self.offset = EventPosition(event_data.offset)
data_batch.append(event_data)
return data_batch
Expand Down Expand Up @@ -211,7 +210,7 @@ def receive(self, max_batch_size=None, timeout=None):
max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch)
data_batch = [] # type: List[EventData]

return self._receive(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)
return _retry_decorator(self._receive)(self, timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)

def close(self, exception=None):
# type:(Exception) -> None
Expand Down
14 changes: 3 additions & 11 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,7 @@ def _open(self, timeout_time=None, **kwargs):
self.target = self.redirected.address
super(EventHubProducer, self)._open(timeout_time)

@_retry_decorator
def _send_event_data(self, **kwargs):
timeout_time = kwargs.get("timeout_time")
last_exception = kwargs.get("last_exception")

def _send_event_data(self, timeout_time=None, last_exception=None):
if self.unsent_events:
self._open(timeout_time)
remaining_time = timeout_time - time.time()
Expand Down Expand Up @@ -168,12 +164,8 @@ def create_batch(self, max_size=None, partition_key=None):
:rtype: ~azure.eventhub.EventDataBatch
"""

@_retry_decorator
def _wrapped_open(*args, **kwargs):
self._open(**kwargs)

if not self._max_message_size_on_link:
_wrapped_open(self, timeout=self.client.config.send_timeout)
_retry_decorator(self._open)(self, timeout=self.client.config.send_timeout)

if max_size and max_size > self._max_message_size_on_link:
raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
Expand Down Expand Up @@ -227,7 +219,7 @@ def send(self, event_data, partition_key=None, timeout=None):
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access
wrapper_event_data.message.on_send_complete = self._on_outcome
self.unsent_events = [wrapper_event_data.message]
self._send_event_data(timeout=timeout)
_retry_decorator(self._send_event_data)(self, timeout=timeout)

def close(self, exception=None):
# type:(Exception) -> None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async def test_send_with_long_interval_async(connstr_receivers, sleep):
for r in receivers:
if not sleep: # if sender sleeps, the receivers will be disconnected. destroy connection to simulate
r._handler._connection._conn.destroy()
received.extend(r.receive(timeout=3))
received.extend(r.receive(timeout=5))
assert len(received) == 2
assert list(received[0].body)[0] == b"A single event"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ async def test_send_with_create_event_batch_async(connstr_receivers):
client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, network_tracing=False)
sender = client.create_producer()

event_data_batch = await sender.create_batch(max_size=100 * 1024)
event_data_batch = await sender.create_batch(max_size=100000)
while True:
try:
event_data_batch.try_add(EventData('A single event data'))
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhubs/tests/test_reconnect.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_send_with_long_interval_sync(connstr_receivers, sleep):
for r in receivers:
if not sleep:
r._handler._connection._conn.destroy()
received.extend(r.receive(timeout=3))
received.extend(r.receive(timeout=5))

assert len(received) == 2
assert list(received[0].body)[0] == b"A single event"
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhubs/tests/test_send.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ def test_send_with_create_event_batch_sync(connstr_receivers):
client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket, network_tracing=False)
sender = client.create_producer()

event_data_batch = sender.create_batch(max_size=100 * 1024)
event_data_batch = sender.create_batch(max_size=100000)
while True:
try:
event_data_batch.try_add(EventData('A single event data'))
Expand Down