Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions sdk/eventhub/azure-eventhubs/HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,26 @@

**New features**

- Added new class `EventDataBatch` for publication of a batch of events with known size constraint.
- Added new method `create_batch` to producer for creating EventDataBatch objects.
- Added method `create_batch` on the `EventHubProducer` to create an `EventDataBatch` that can then be used to add events until the maximum size is reached.
- This batch object can then be used in the `send()` method to send all the added events to Event Hubs.
- This allows publishers to build batches without the possibility of encountering the error around the message size exceeding the supported limit when sending events.
- It also allows publishers with bandwidth concerns to control the size of each batch published.
- Added new configuration parameters for exponential delay among each retry operation.
- `retry_total`: The total number of attempts to redo the failed operation.
- `backoff_factor`: The delay time factor.
- `backoff_max`: The maximum delay time in total.
- Added support for context manager on `EventHubClient`.

**Breaking changes**

- Replaced `max_retries` configuration parameter of the EventHubClient with `retry_total`.
- Introduced the initial concept of a new version of the `EventProcessor`, intended as a neutral framework for processing events across all partitions for a given Event Hub and in the context of a specific Consumer Group. This early preview is intended to allow consumers to test the new design using a single instance that does not persist checkpoints to any durable store.
- Introduced a new class `EventProcessor` which replaces the older concept of [Event Processor Host](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-event-processor-host). This early preview is intended to allow users to test the new design using a single instance of `EventProcessor`. The ability to checkpoints to a durable store will be added in future updates.
- `EventProcessor`: EventProcessor creates and runs consumers for all partitions of the eventhub.
- `PartitionManager`: PartitionManager defines the interface for getting/claiming ownerships of partitions and updating checkpoints.
- `PartitionProcessor`: PartitionProcessor defines the interface for processing events.
- `CheckpointManager`: CheckpointManager takes responsibility for updating checkpoints during events processing.

**Breaking changes**

- Replaced `max_retries` configuration parameter of the EventHubClient with `retry_total`.


## 5.0.0b1 (2019-06-25)

Version 5.0.0b1 is a preview of our efforts to create a client library that is user friendly and idiomatic to the Python ecosystem. The reasons for most of the changes in this update can be found in the [Azure SDK Design Guidelines for Python](https://azuresdkspecs.z5.web.core.windows.net/PythonSpec.html). For more information, please visit https://aka.ms/azure-sdk-preview1-python.
Expand Down
12 changes: 7 additions & 5 deletions sdk/eventhub/azure-eventhubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ The following sections provide several code snippets covering some of the most c
- [Consume events from an Event Hub](#consume-events-from-an-event-hub)
- [Async publish events to an Event Hub](#async-publish-events-to-an-event-hub)
- [Async consume events from an Event Hub](#async-consume-events-from-an-event-hub)
- [Consume events from all partitions of an Event Hub](#consume-events-from-all-partitions-of-an-event-hub)
- [Consume events using an Event Processor](#consume-events-using-an-event-processor)

### Inspect an Event Hub

Expand Down Expand Up @@ -207,11 +207,13 @@ finally:
pass
```

### Consume events from all partitions of an Event Hub
### Consume events using an Event Processor

To consume events from all partitions of an Event Hub, you'll create an `EventProcessor` for a specific consumer group. When an Event Hub is created, it provides a default consumer group that can be used to get started.
Using an `EventHubConsumer` to consume events like in the previous examples puts the responsibility of storing the checkpoints (the last processed event) on the user. Checkpoints are important for restarting the task of processing events from the right position in a partition. Ideally, you would also want to run multiple programs targeting different partitions with some load balancing. This is where an `EventProcessor` can help.

The `EventProcessor` will delegate processing of events to a `PartitionProcessor` implementation that you provide, allowing your logic to focus on the logic needed to provide value while the processor holds responsibility for managing the underlying consumer operations. In our example, we will focus on building the `EventProcessor` and use a very minimal partition processor that does no actual processing.
The `EventProcessor` will delegate the processing of events to a `PartitionProcessor` that you provide, allowing you to focus on business logic while the processor holds responsibility for managing the underlying consumer operations including checkpointing and load balancing.

While load balancing is a feature we will be adding in the next update, you can see how to use the `EventProcessor` in the below example, where we use an in memory `PartitionManager` that does checkpointing in memory.

```python
import asyncio
Expand Down Expand Up @@ -239,7 +241,7 @@ def partition_processor_factory(checkpoint_manager):

async def main():
client = EventHubClient.from_connection_string(connection_str, receive_timeout=5, retry_total=3)
partition_manager = Sqlite3PartitionManager()
partition_manager = Sqlite3PartitionManager() # in-memory PartitionManager
try:
event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager)
# You can also define a callable object for creating PartitionProcessor like below:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ def __init__(self, eventhub_client: EventHubClient, consumer_group_name: str,
be read only in the context of this group.
:type consumer_group_name: str
:param partition_processor_factory: A callable(type or function) object that creates an instance of a class
implementing the ~azure.eventhub.eventprocessor.PartitionProcessor interface.
implementing the ~azure.eventhub.eventprocessor.PartitionProcessor.
:type partition_processor_factory: callable object
:param partition_manager: Interacts with the storage system, dealing with ownership and checkpoints.
For preview 2, sample Sqlite3PartitionManager is provided.
:type partition_manager: Class implementing the ~azure.eventhub.eventprocessor.PartitionManager interface.
:type partition_manager: Class implementing the ~azure.eventhub.eventprocessor.PartitionManager.
:param initial_event_position: The offset to start a partition consumer if the partition has no checkpoint yet.
:type initial_event_position: int or str

Expand Down