30 changes: 29 additions & 1 deletion sdk/eventhub/azure-eventhubs/HISTORY.md
@@ -1,5 +1,31 @@
# Release History

## 5.0.0b2 (2019-08-06)

**New features**

- Added method `create_batch` on `EventHubProducer` to create an `EventDataBatch` that can then be used to add events until the maximum size is reached (see the sketch after this feature list).
  - This batch object can then be passed to the `send()` method to send all the added events to Event Hubs.
  - This allows publishers to build batches without risking an error for exceeding the supported message size limit when sending events.
  - It also allows publishers with bandwidth concerns to control the size of each batch published.
- Added new configuration parameters for exponential delay between retry operations:
  - `retry_total`: the total number of attempts to redo a failed operation.
  - `backoff_factor`: the delay time factor.
  - `backoff_max`: the maximum delay time in total.
- Added context manager support to `EventHubClient`.
- Added new error type `OperationTimeoutError` for the send operation.
- Introduced a new class `EventProcessor`, which replaces the older concept of [Event Processor Host](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-event-processor-host). This early preview is intended to allow users to test the new design using a single instance of `EventProcessor`. The ability to checkpoint to a durable store will be added in future updates.
  - `EventProcessor`: creates and runs consumers for all partitions of the Event Hub.
  - `PartitionManager`: defines the interface for getting/claiming ownership of partitions and updating checkpoints.
  - `PartitionProcessor`: defines the interface for processing events.
  - `CheckpointManager`: responsible for updating checkpoints during event processing.
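
A minimal sketch of how the batching, retry, and context-manager additions fit together. `client.create_producer()` and the `try_add` method on the batch are assumptions based on earlier previews and are not spelled out in this changelog; check the README and API docs for the exact names. The connection string is a placeholder.

```python
from azure.eventhub import EventData, EventHubClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'

# The new retry settings are regular client configuration parameters.
client = EventHubClient.from_connection_string(
    connection_str, retry_total=3, backoff_factor=0.8, backoff_max=120)

with client:  # EventHubClient can now be used as a context manager.
    producer = client.create_producer()  # assumed producer-creation API from earlier previews
    batch = producer.create_batch()      # new in this release: returns an EventDataBatch
    for i in range(10):
        # Assumption: the batch exposes a try_add-style method that raises
        # once the maximum batch size would be exceeded.
        batch.try_add(EventData("message {}".format(i)))
    producer.send(batch)  # send() accepts the EventDataBatch built above
```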

**Breaking changes**

- `EventProcessorHost` has been replaced by `EventProcessor`; see the new features above for details.
- Replaced the `max_retries` configuration parameter of `EventHubClient` with `retry_total`, as sketched below.
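
A minimal before/after sketch of the rename, assuming the parameter is passed to `from_connection_string` the same way `retry_total` is used in the README example; the values are illustrative.

```python
# 5.0.0b1
client = EventHubClient.from_connection_string(connection_str, max_retries=3)

# 5.0.0b2
client = EventHubClient.from_connection_string(connection_str, retry_total=3)
```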


## 5.0.0b1 (2019-06-25)

Version 5.0.0b1 is a preview of our efforts to create a client library that is user friendly and idiomatic to the Python ecosystem. The reasons for most of the changes in this update can be found in the [Azure SDK Design Guidelines for Python](https://azuresdkspecs.z5.web.core.windows.net/PythonSpec.html). For more information, please visit https://aka.ms/azure-sdk-preview1-python.
@@ -154,4 +180,6 @@ Version 5.0.0b1 is a preview of our efforts to create a client library that is u

## 0.2.0a1 (unreleased)

- Swapped out Proton dependency for uAMQP.
- Swapped out Proton dependency for uAMQP.

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhubs/HISTORY.png)
57 changes: 55 additions & 2 deletions sdk/eventhub/azure-eventhubs/README.md
@@ -90,6 +90,7 @@ The following sections provide several code snippets covering some of the most c
- [Consume events from an Event Hub](#consume-events-from-an-event-hub)
- [Async publish events to an Event Hub](#async-publish-events-to-an-event-hub)
- [Async consume events from an Event Hub](#async-consume-events-from-an-event-hub)
- [Consume events using an Event Processor](#consume-events-using-an-event-processor)

### Inspect an Event Hub

@@ -206,6 +207,56 @@ finally:
pass
```

### Consume events using an Event Processor

Consuming events with an `EventHubConsumer`, as in the previous examples, puts the responsibility of storing checkpoints (the position of the last processed event) on the user. Checkpoints are important for resuming event processing from the right position in a partition. Ideally, you would also want to run multiple programs targeting different partitions, with some load balancing between them. This is where an `EventProcessor` can help.

The `EventProcessor` delegates the processing of events to a `PartitionProcessor` that you provide, allowing you to focus on business logic while the processor manages the underlying consumer operations, including checkpointing and load balancing.

Load balancing is a feature we will be adding in the next update. The example below shows how to use the `EventProcessor` with a `PartitionManager` that keeps its checkpoints in memory.

```python
import asyncio

from azure.eventhub.aio import EventHubClient
from azure.eventhub.eventprocessor import EventProcessor, PartitionProcessor, Sqlite3PartitionManager

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'

async def do_operation(event):
    # do some sync or async operations. If the operation is i/o intensive, async will have better performance
    print(event)

class MyPartitionProcessor(PartitionProcessor):
    def __init__(self, checkpoint_manager):
        super(MyPartitionProcessor, self).__init__(checkpoint_manager)

    async def process_events(self, events):
        if events:
            await asyncio.gather(*[do_operation(event) for event in events])
            await self._checkpoint_manager.update_checkpoint(events[-1].offset, events[-1].sequence_number)

def partition_processor_factory(checkpoint_manager):
    return MyPartitionProcessor(checkpoint_manager)

async def main():
    client = EventHubClient.from_connection_string(connection_str, receive_timeout=5, retry_total=3)
    partition_manager = Sqlite3PartitionManager()  # in-memory PartitionManager
    try:
        event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager)
        # You can also define a callable object for creating PartitionProcessor like below:
        # event_processor = EventProcessor(client, "$default", partition_processor_factory, partition_manager)
        asyncio.ensure_future(event_processor.start())
        await asyncio.sleep(60)
        await event_processor.stop()
    finally:
        await partition_manager.close()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
```

## Troubleshooting

### General
@@ -230,7 +281,7 @@ These are the samples in our repo demonstrating the usage of the library.
- [./examples/recv.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/recv.py) - use consumer to consume events
- [./examples/async_examples/send_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/async_examples/send_async.py) - async/await support of a producer
- [./examples/async_examples/recv_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_async.py) - async/await support of a consumer
- [./examples/eph.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/eph.py) - event processor host
- [./examples/eventprocessor/event_processor_example.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/eventprocessor/event_processor_example.py) - event processor

### Documentation

@@ -253,4 +304,6 @@ This project welcomes contributions and suggestions. Most contributions require
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [[email protected]](mailto:[email protected]) with any additional questions or comments.
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [[email protected]](mailto:[email protected]) with any additional questions or comments.

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhubs/README.png)
5 changes: 3 additions & 2 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
@@ -3,9 +3,9 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "5.0.0b1"
__version__ = "5.0.0b2"

from azure.eventhub.common import EventData, EventPosition
from azure.eventhub.common import EventData, EventDataBatch, EventPosition
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \
AuthenticationError, EventDataSendError, ConnectionLostError
from azure.eventhub.client import EventHubClient
@@ -18,6 +18,7 @@

__all__ = [
"EventData",
"EventDataBatch",
"EventHubError",
"ConnectError",
"ConnectionLostError",
77 changes: 77 additions & 0 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/_connection_manager.py
@@ -0,0 +1,77 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from threading import RLock
from uamqp import Connection, TransportType, c_uamqp


class _SharedConnectionManager(object):
    def __init__(self, **kwargs):
        self._lock = RLock()
        self._conn = None  # type: Connection

        self._container_id = kwargs.get("container_id")
        self._debug = kwargs.get("debug")
        self._error_policy = kwargs.get("error_policy")
        self._properties = kwargs.get("properties")
        self._encoding = kwargs.get("encoding") or "UTF-8"
        self._transport_type = kwargs.get('transport_type') or TransportType.Amqp
        self._http_proxy = kwargs.get('http_proxy')
        self._max_frame_size = kwargs.get("max_frame_size")
        self._channel_max = kwargs.get("channel_max")
        self._idle_timeout = kwargs.get("idle_timeout")
        self._remote_idle_timeout_empty_frame_send_ratio = kwargs.get("remote_idle_timeout_empty_frame_send_ratio")

    def get_connection(self, host, auth):
        # type: (...) -> Connection
        with self._lock:
            if self._conn is None:
                self._conn = Connection(
                    host,
                    auth,
                    container_id=self._container_id,
                    max_frame_size=self._max_frame_size,
                    channel_max=self._channel_max,
                    idle_timeout=self._idle_timeout,
                    properties=self._properties,
                    remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio,
                    error_policy=self._error_policy,
                    debug=self._debug,
                    encoding=self._encoding)
            return self._conn

    def close_connection(self):
        with self._lock:
            if self._conn:
                self._conn.destroy()
            self._conn = None

    def reset_connection_if_broken(self):
        with self._lock:
            if self._conn and self._conn._state in (
                c_uamqp.ConnectionState.CLOSE_RCVD,
                c_uamqp.ConnectionState.CLOSE_SENT,
                c_uamqp.ConnectionState.DISCARDING,
                c_uamqp.ConnectionState.END,
            ):
                self._conn = None


class _SeparateConnectionManager(object):
    def __init__(self, **kwargs):
        pass

    def get_connection(self, host, auth):
        return None

    def close_connection(self):
        pass

    def reset_connection_if_broken(self):
        pass


def get_connection_manager(**kwargs):
    return _SeparateConnectionManager(**kwargs)
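
A brief, hypothetical sketch (not part of this diff) of the intended call pattern: `get_connection_manager` currently always returns the no-op `_SeparateConnectionManager`, while `_SharedConnectionManager` lazily creates a single `uamqp.Connection` under a lock and hands the same instance to every caller. The `host` and `auth` arguments are placeholders for values the client would normally supply.

```python
# Hypothetical illustration only; "host" and "auth" would come from the
# EventHubClient (e.g. client.address.hostname and client.get_auth()).
manager = get_connection_manager()  # today: a no-op _SeparateConnectionManager
shared = _SharedConnectionManager(container_id="example-container", debug=False)

def do_work(host, auth):
    connection = shared.get_connection(host, auth)  # same Connection for every caller
    try:
        pass  # open senders/receivers on the shared connection here
    finally:
        shared.reset_connection_if_broken()  # drop the cached Connection if it is in a closing state

# On client shutdown:
shared.close_connection()
```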
@@ -0,0 +1,133 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals

import logging
import time

from uamqp import errors, constants, compat
from azure.eventhub.error import EventHubError, _handle_exception

log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
    def wrapped_func(self, *args, **kwargs):
        timeout = kwargs.pop("timeout", 100000)
        if not timeout:
            timeout = 100000  # timeout equals to 0 means no timeout, set the value to be a large number.
        timeout_time = time.time() + timeout
        max_retries = self.client.config.max_retries
        retry_count = 0
        last_exception = None
        while True:
            try:
                return to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
            except Exception as exception:
                last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time)
                retry_count += 1
    return wrapped_func


class ConsumerProducerMixin(object):
    def __init__(self):
        self.client = None
        self._handler = None
        self.name = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close(exc_val)

    def _check_closed(self):
        if self.error:
            raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name))

    def _create_handler(self):
        pass

    def _redirect(self, redirect):
        self.redirected = redirect
        self.running = False
        self._close_connection()

    def _open(self, timeout_time=None):
        """
        Open the EventHubConsumer using the supplied connection.
        If the handler has previously been redirected, the redirect
        context will be used to create a new handler before opening it.

        """
        # pylint: disable=protected-access
        if not self.running:
            if self._handler:
                self._handler.close()
            if self.redirected:
                alt_creds = {
                    "username": self.client._auth_config.get("iot_username"),
                    "password": self.client._auth_config.get("iot_password")}
            else:
                alt_creds = {}
            self._create_handler()
            self._handler.open(connection=self.client._conn_manager.get_connection(
                self.client.address.hostname,
                self.client.get_auth(**alt_creds)
            ))
            while not self._handler.client_ready():
                time.sleep(0.05)
            self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
                or constants.MAX_MESSAGE_LENGTH_BYTES  # pylint: disable=protected-access
            self.running = True

    def _close_handler(self):
        self._handler.close()  # close the link (sharing connection) or connection (not sharing)
        self.running = False

    def _close_connection(self):
        self._close_handler()
        self.client._conn_manager.reset_connection_if_broken()

    def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
        if not self.running and isinstance(exception, compat.TimeoutException):
            exception = errors.AuthenticationException("Authorization timeout.")
            return _handle_exception(exception, retry_count, max_retries, self, timeout_time)

        return _handle_exception(exception, retry_count, max_retries, self, timeout_time)

    def close(self, exception=None):
        # type:(Exception) -> None
        """
        Close down the handler. If the handler has already closed,
        this will be a no op. An optional exception can be passed in to
        indicate that the handler was shutdown due to error.

        :param exception: An optional exception if the handler is closing
         due to an error.
        :type exception: Exception

        Example:
            .. literalinclude:: ../examples/test_examples_eventhub.py
                :start-after: [START eventhub_client_receiver_close]
                :end-before: [END eventhub_client_receiver_close]
                :language: python
                :dedent: 4
                :caption: Close down the handler.

        """
        self.running = False
        if self.error:
            return
        if isinstance(exception, errors.LinkRedirect):
            self.redirected = exception
        elif isinstance(exception, EventHubError):
            self.error = exception
        elif exception:
            self.error = EventHubError(str(exception))
        else:
            self.error = EventHubError("{} handler is closed.".format(self.name))
        if self._handler:
            self._handler.close()  # this will close link if sharing connection. Otherwise close connection
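
A self-contained, hypothetical sketch of the retry contract that `_retry_decorator` imposes on `ConsumerProducerMixin` subclasses: the wrapped method is re-invoked with `timeout_time` and `last_exception` keyword arguments until it succeeds or `_handle_exception` raises. The stub classes are invented for illustration, assume this snippet lives alongside the module above, and are not part of the SDK.

```python
import time

class _StubConfig(object):
    max_retries = 3  # read by _retry_decorator via self.client.config.max_retries

class _StubClient(object):
    config = _StubConfig()

class _FlakyOperation(ConsumerProducerMixin):
    """Illustration only: fails twice, then succeeds."""

    def __init__(self):
        super(_FlakyOperation, self).__init__()
        self.client = _StubClient()
        self.name = "flaky-operation"
        self.error = None
        self._attempts = 0

    def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
        # Simplified policy for the sketch: re-raise once retries are exhausted,
        # otherwise report the failure back to the retry loop.
        if retry_count >= max_retries:
            raise exception
        return exception

    @_retry_decorator
    def run(self, timeout_time=None, last_exception=None):
        self._attempts += 1
        if self._attempts < 3:
            raise RuntimeError("transient failure")
        return "succeeded on attempt {}".format(self._attempts)

print(_FlakyOperation().run(timeout=10))  # -> "succeeded on attempt 3"
```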