15 changes: 15 additions & 0 deletions sdk/eventhub/azure-eventhubs/HISTORY.md
@@ -1,5 +1,20 @@
# Release History

## 5.0.0b2 (2019-08-06)

**New features**

- Added the ability to create and send `EventDataBatch` objects, which cap the amount of data in a single batch.
- Added new configuration parameters for exponential backoff between retry operations (see the backoff sketch below):
  - `retry_total`: The total number of attempts to retry a failed operation.
  - `backoff_factor`: The factor applied to the delay between retries.
  - `backoff_max`: The maximum delay time.

**Breaking changes**

- A new `EventProcessor` design.
- `EventProcessorHost` has been removed.

## 5.0.0b1 (2019-06-25)

Version 5.0.0b1 is a preview of our efforts to create a client library that is user-friendly and idiomatic to the Python ecosystem. The reasons for most of the changes in this update can be found in the [Azure SDK Design Guidelines for Python](https://azuresdkspecs.z5.web.core.windows.net/PythonSpec.html). For more information, please visit https://aka.ms/azure-sdk-preview1-python.
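The retry parameters introduced in the 5.0.0b2 entry above drive exponential backoff between attempts. The exact formula is not shown in this diff; the sketch below assumes the conventional form, where `backoff_factor` scales a power-of-two delay that is capped at `backoff_max` (the default values here are illustrative only):

```python
def backoff_delay(retry_count, backoff_factor=0.8, backoff_max=120):
    """Seconds to wait before the next retry, assuming the conventional
    exponential form; the SDK's exact formula may differ."""
    return min(backoff_factor * (2 ** retry_count), backoff_max)


# First four retries with the illustrative defaults: 0.8, 1.6, 3.2, 6.4 seconds.
print([backoff_delay(n) for n in range(4)])
```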
(file header not captured)
@@ -7,12 +7,30 @@
import logging
import time

from uamqp import errors
from uamqp import errors, constants, compat
from azure.eventhub.error import EventHubError, _handle_exception

log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
def wrapped_func(self, *args, **kwargs):
timeout = kwargs.pop("timeout", None)
if not timeout:
timeout = 100000  # a timeout of None or 0 means no timeout; 100000 seconds is effectively no timeout
timeout_time = time.time() + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
while True:
try:
return to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception:
last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time)
retry_count += 1
return wrapped_func


class ConsumerProducerMixin(object):
def __init__(self):
self.client = None
@@ -27,7 +45,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):

def _check_closed(self):
if self.error:
raise EventHubError("{} has been closed. Please create a new consumer to receive event data.".format(self.name))
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name))

def _create_handler(self):
pass
@@ -46,6 +64,8 @@ def _open(self, timeout_time=None):
"""
# pylint: disable=protected-access
if not self.running:
if self._handler:
self._handler.close()
if self.redirected:
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
@@ -58,9 +78,9 @@ def _open(self, timeout_time=None):
self.client.get_auth(**alt_creds)
))
while not self._handler.client_ready():
if timeout_time and time.time() >= timeout_time:
return
time.sleep(0.05)
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access
self.running = True

def _close_handler(self):
@@ -72,6 +92,10 @@ def _close_connection(self):
self.client._conn_manager.reset_connection_if_broken()

def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
if not self.running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return _handle_exception(exception, retry_count, max_retries, self, timeout_time)

return _handle_exception(exception, retry_count, max_retries, self, timeout_time)

def close(self, exception=None):
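The `_retry_decorator` added in this file converts a single-attempt operation into one that is retried until it succeeds, the absolute deadline passes, or the exception handler re-raises. A minimal standalone sketch of the same pattern follows; the class and method names are illustrative and not part of the SDK's API:

```python
import time


def retry_with_deadline(to_be_wrapped_func):
    """Keep retrying until the call succeeds or handle_exception re-raises."""
    def wrapped_func(self, *args, **kwargs):
        timeout = kwargs.pop("timeout", None) or 100  # None/0 -> a generous default window
        timeout_time = time.time() + timeout          # absolute deadline for all attempts
        retry_count = 0
        last_exception = None
        while True:
            try:
                return to_be_wrapped_func(
                    self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
            except Exception as exception:  # pylint: disable=broad-except
                # handle_exception either backs off and returns the error (the loop retries)
                # or raises, which aborts the whole operation.
                last_exception = self.handle_exception(exception, retry_count, timeout_time)
                retry_count += 1
    return wrapped_func


class Sender(object):
    max_retries = 3

    def handle_exception(self, exception, retry_count, timeout_time):
        if retry_count >= self.max_retries or time.time() >= timeout_time:
            raise exception
        time.sleep(0.1 * (2 ** retry_count))  # simple exponential backoff
        return exception

    @retry_with_deadline
    def send(self, timeout_time=None, last_exception=None, payload=None):
        print("sending", payload, "with", round(timeout_time - time.time(), 2), "s left")


Sender().send(payload=b"event", timeout=5)
```

As in the actual change, the deadline and the retried call share one `timeout_time`, so every retry competes against the same overall budget rather than restarting the clock.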
(file header not captured)
@@ -6,13 +6,31 @@
import logging
import time

from uamqp import errors
from uamqp import errors, constants, compat
from azure.eventhub.error import EventHubError, ConnectError
from ..aio.error_async import _handle_exception

log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
async def wrapped_func(self, *args, **kwargs):
timeout = kwargs.pop("timeout", None)
if not timeout:
timeout = 100000  # a timeout of None or 0 means no timeout; 100000 seconds is effectively no timeout
timeout_time = time.time() + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
while True:
try:
return await to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception:
last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time)
retry_count += 1
return wrapped_func


class ConsumerProducerMixin(object):

def __init__(self):
@@ -28,7 +46,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):

def _check_closed(self):
if self.error:
raise EventHubError("{} has been closed. Please create a new consumer to receive event data.".format(self.name))
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name))

def _create_handler(self):
pass
@@ -47,6 +65,8 @@ async def _open(self, timeout_time=None):
"""
# pylint: disable=protected-access
if not self.running:
if self._handler:
await self._handler.close_async()
if self.redirected:
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
@@ -59,9 +79,9 @@ def _open(self, timeout_time=None):
self.client.get_auth(**alt_creds)
))
while not await self._handler.client_ready_async():
if timeout_time and time.time() >= timeout_time:
return
await asyncio.sleep(0.05)
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access
self.running = True

async def _close_handler(self):
@@ -73,6 +93,10 @@ async def _close_connection(self):
await self.client._conn_manager.reset_connection_if_broken()

async def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
if not self.running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return await _handle_exception(exception, retry_count, max_retries, self, timeout_time)

return await _handle_exception(exception, retry_count, max_retries, self, timeout_time)

async def close(self, exception=None):
17 changes: 4 additions & 13 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py
@@ -12,11 +12,8 @@
from uamqp import (
Message,
AMQPClientAsync,
errors,
)
from uamqp import compat

from azure.eventhub.error import ConnectError
from azure.eventhub.common import parse_sas_token, EventPosition, EventHubSharedKeyCredential, EventHubSASTokenCredential
from ..client_abstract import EventHubClientAbstract

@@ -193,9 +190,8 @@ async def get_partition_properties(self, partition):
output['is_empty'] = partition_info[b'is_partition_empty']
return output

def create_consumer(
self, consumer_group, partition_id, event_position, **kwargs):
# type: (str, str, EventPosition, int, str, int, asyncio.AbstractEventLoop) -> EventHubConsumer
def create_consumer(self, consumer_group, partition_id, event_position, **kwargs):
# type: (str, str, EventPosition) -> EventHubConsumer
"""
Create an async consumer to the client for a particular consumer group and partition.

@@ -240,8 +236,7 @@ def create_consumer(
prefetch=prefetch, loop=loop)
return handler

def create_producer(
self, **kwargs):
def create_producer(self, *, partition_id=None, operation=None, send_timeout=None, loop=None):
# type: (str, str, float, asyncio.AbstractEventLoop) -> EventHubProducer
"""
Create an async producer to send EventData object to an EventHub.
@@ -268,10 +263,6 @@ def create_producer(
:caption: Add an async producer to the client to send EventData.

"""
partition_id = kwargs.get("partition_id", None)
operation = kwargs.get("operation", None)
send_timeout = kwargs.get("send_timeout", None)
loop = kwargs.get("loop", None)

target = "amqps://{}{}".format(self.address.hostname, self.address.path)
if operation:
@@ -283,4 +274,4 @@ def create_producer(
return handler

async def close(self):
await self._conn_manager.close_connection()
await self._conn_manager.close_connection()
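`create_producer` now declares its options as keyword-only parameters instead of fishing them out of `**kwargs`, so unsupported or misspelled keywords fail immediately rather than being silently ignored. A small illustration of the difference, independent of the SDK:

```python
def create_producer(*, partition_id=None, operation=None, send_timeout=None):
    # The bare * makes every parameter keyword-only.
    return {"partition_id": partition_id, "operation": operation, "send_timeout": send_timeout}


print(create_producer(partition_id="0", send_timeout=60))

try:
    create_producer("0")  # positional arguments are rejected
except TypeError as err:
    print(err)

try:
    create_producer(partion_id="0")  # a typo raises instead of being dropped
except TypeError as err:
    print(err)
```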
83 changes: 39 additions & 44 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py
@@ -8,13 +8,12 @@
from typing import List
import time

from uamqp import errors, types, compat
from uamqp import errors, types
from uamqp import ReceiveClientAsync, Source

from azure.eventhub import EventData, EventPosition
from azure.eventhub.error import EventHubError, AuthenticationError, ConnectError, ConnectionLostError, _error_handler
from ..aio.error_async import _handle_exception
from ._consumer_producer_mixin_async import ConsumerProducerMixin
from azure.eventhub.error import EventHubError, ConnectError, _error_handler
from ._consumer_producer_mixin_async import ConsumerProducerMixin, _retry_decorator

log = logging.getLogger(__name__)

@@ -81,6 +80,7 @@ def __init__( # pylint: disable=super-init-not-called
self.error = None
self._link_properties = {}
partition = self.source.split('/')[-1]
self.partition = partition
self.name = "EHReceiver-{}-partition{}".format(uuid.uuid4(), partition)
if owner_level:
self._link_properties[types.AMQPSymbol(self._epoch)] = types.AMQPLong(int(owner_level))
@@ -100,8 +100,9 @@ async def __anext__(self):
if not self.messages_iter:
self.messages_iter = self._handler.receive_messages_iter_async()
message = await self.messages_iter.__anext__()
event_data = EventData(message=message)
event_data = EventData._from_message(message)
self.offset = EventPosition(event_data.offset, inclusive=False)
retry_count = 0
return event_data
except Exception as exception:
await self._handle_exception(exception, retry_count, max_retries)
@@ -146,6 +147,32 @@ async def _open(self, timeout_time=None):
self.source = self.redirected.address
await super(EventHubConsumer, self)._open(timeout_time)

async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
last_exception = kwargs.get("last_exception")
data_batch = kwargs.get("data_batch")

await self._open(timeout_time)
remaining_time = timeout_time - time.time()
if remaining_time <= 0.0:
if last_exception:
log.info("%r receive operation timed out. (%r)", self.name, last_exception)
raise last_exception
return data_batch

remaining_time_ms = 1000 * remaining_time
message_batch = await self._handler.receive_message_batch_async(
max_batch_size=max_batch_size,
timeout=remaining_time_ms)
for message in message_batch:
event_data = EventData._from_message(message)
self.offset = EventPosition(event_data.offset)
data_batch.append(event_data)
return data_batch

@_retry_decorator
async def _receive_with_try(self, timeout_time=None, max_batch_size=None, **kwargs):
return await self._receive(timeout_time=timeout_time, max_batch_size=max_batch_size, **kwargs)

@property
def queue_size(self):
# type: () -> int
@@ -159,7 +186,7 @@ def queue_size(self):
return self._handler._received_messages.qsize()
return 0

async def receive(self, **kwargs):
async def receive(self, *, max_batch_size=None, timeout=None):
# type: (int, float) -> List[EventData]
"""
Receive events asynchronously from the EventHub.
@@ -186,45 +213,13 @@
:caption: Receives events asynchronously

"""
max_batch_size = kwargs.get("max_batch_size", None)
timeout = kwargs.get("timeout", None)

self._check_closed()
max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size
timeout = self.client.config.receive_timeout if timeout is None else timeout
if not timeout:
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout

data_batch = []
start_time = time.time()
timeout_time = start_time + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
while True:
try:
await self._open(timeout_time)
remaining_time = timeout_time - time.time()
if remaining_time <= 0.0:
if last_exception:
log.info("%r receive operation timed out. (%r)", self.name, last_exception)
raise last_exception
return data_batch

remaining_time_ms = 1000 * remaining_time
message_batch = await self._handler.receive_message_batch_async(
max_batch_size=max_batch_size,
timeout=remaining_time_ms)
for message in message_batch:
event_data = EventData(message=message)
self.offset = EventPosition(event_data.offset)
data_batch.append(event_data)
return data_batch
except EventHubError:
raise
except Exception as exception:
last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time)
retry_count += 1

timeout = timeout or self.client.config.receive_timeout
max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch)
data_batch = [] # type: List[EventData]

return await self._receive_with_try(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)

async def close(self, exception=None):
# type: (Exception) -> None
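The new `_receive` helper above works against an absolute deadline: the caller's timeout is converted into `timeout_time` once by the decorator, and each AMQP call is given only the time that remains, in milliseconds. A minimal sketch of that bookkeeping (names are illustrative, not SDK code):

```python
import time


def remaining_timeout_ms(timeout_time):
    """Milliseconds left before the absolute deadline, or 0 if it has already passed."""
    return max(0.0, (timeout_time - time.time()) * 1000)


deadline = time.time() + 30   # e.g. a 30-second receive window
time.sleep(0.2)               # stands in for time spent opening the link
print(round(remaining_timeout_ms(deadline)))  # roughly 29800
```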
(file header not captured)
@@ -33,6 +33,8 @@ def _create_eventhub_exception(exception):


async def _handle_exception(exception, retry_count, max_retries, closable, timeout_time=None):
if isinstance(exception, asyncio.CancelledError):
raise
try:
name = closable.name
except AttributeError:
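The added check lets `asyncio.CancelledError` propagate out of the retry machinery instead of being translated into a retryable Event Hubs error; without it, cancelling a task that is stuck retrying would simply restart the operation. A small standalone illustration of the same rule (not SDK code):

```python
import asyncio


async def flaky_operation():
    await asyncio.sleep(10)  # stands in for a slow network call


async def retry_forever():
    while True:
        try:
            return await flaky_operation()
        except asyncio.CancelledError:
            # Propagate cancellation instead of treating it as retryable,
            # otherwise task.cancel() would just restart the operation.
            raise
        except Exception:  # pylint: disable=broad-except
            await asyncio.sleep(0.1)


async def main():
    task = asyncio.ensure_future(retry_forever())
    await asyncio.sleep(0.2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("task cancelled cleanly")


asyncio.run(main())
```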