Skip to content

Commit c9672a6

Browse files
authored
Updating docstings, docs, samples (#6673)
* Draft for updating documentations * Small improvement * Updating docstrings, docs and sample
1 parent 83c9a6d commit c9672a6

File tree

12 files changed

+157
-166
lines changed

12 files changed

+157
-166
lines changed

sdk/eventhub/azure-eventhubs/HISTORY.md

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,22 @@
44

55
**New features**
66

7-
- Added ability to create and send EventDataBatch object with limited data size.
7+
- Added new class `EventDataBatch` for publication of a batch of events with known size constraint.
8+
- Added new method `create_batch` to producer for creating EventDataBatch objects.
89
- Added new configuration parameters for exponential delay among each retry operation.
910
- `retry_total`: The total number of attempts to redo the failed operation.
1011
- `backoff_factor`: The delay time factor.
1112
- `backoff_max`: The maximum delay time in total.
13+
- Added support for context manager on `EventHubClient`.
1214

1315
**Breaking changes**
1416

15-
- New `EventProcessor` design
16-
- The `EventProcessorHost` was waived.
17+
- Replaced `max_retries` configuration parameter of the EventHubClient with `retry_total`.
18+
- Introduced the initial concept of a new version of the `EventProcessor`, intended as a neutral framework for processing events across all partitions for a given Event Hub and in the context of a specific Consumer Group. This early preview is intended to allow consumers to test the new design using a single instance that does not persist checkpoints to any durable store.
19+
- `EventProcessor`: EventProcessor creates and runs consumers for all partitions of the eventhub.
20+
- `PartitionManager`: PartitionManager defines the interface for getting/claiming ownerships of partitions and updating checkpoints.
21+
- `PartitionProcessor`: PartitionProcessor defines the interface for processing events.
22+
- `CheckpointManager`: CheckpointManager takes responsibility for updating checkpoints during events processing.
1723

1824
## 5.0.0b1 (2019-06-25)
1925

sdk/eventhub/azure-eventhubs/README.md

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ The following sections provide several code snippets covering some of the most c
9090
- [Consume events from an Event Hub](#consume-events-from-an-event-hub)
9191
- [Async publish events to an Event Hub](#async-publish-events-to-an-event-hub)
9292
- [Async consume events from an Event Hub](#async-consume-events-from-an-event-hub)
93+
- [Consume events from all partitions of an Event Hub](#consume-events-from-all-partitions-of-an-event-hub)
9394

9495
### Inspect an Event Hub
9596

@@ -206,6 +207,54 @@ finally:
206207
pass
207208
```
208209

210+
### Consume events from all partitions of an Event Hub
211+
212+
To consume events from all partitions of an Event Hub, you'll create an `EventProcessor` for a specific consumer group. When an Event Hub is created, it provides a default consumer group that can be used to get started.
213+
214+
The `EventProcessor` will delegate processing of events to a `PartitionProcessor` implementation that you provide, allowing your logic to focus on the logic needed to provide value while the processor holds responsibility for managing the underlying consumer operations. In our example, we will focus on building the `EventProcessor` and use a very minimal partition processor that does no actual processing.
215+
216+
```python
217+
import asyncio
218+
219+
from azure.eventhub.aio import EventHubClient
220+
from azure.eventhub.eventprocessor import EventProcessor, PartitionProcessor, Sqlite3PartitionManager
221+
222+
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
223+
224+
async def do_operation(event):
225+
# do some sync or async operations. If the operation is i/o intensive, async will have better performance
226+
print(event)
227+
228+
class MyPartitionProcessor(PartitionProcessor):
229+
def __init__(self, checkpoint_manager):
230+
super(MyPartitionProcessor, self).__init__(checkpoint_manager)
231+
232+
async def process_events(self, events):
233+
if events:
234+
await asyncio.gather(*[do_operation(event) for event in events])
235+
await self._checkpoint_manager.update_checkpoint(events[-1].offset, events[-1].sequence_number)
236+
237+
def partition_processor_factory(checkpoint_manager):
238+
return MyPartitionProcessor(checkpoint_manager)
239+
240+
async def main():
241+
client = EventHubClient.from_connection_string(connection_str, receive_timeout=5, retry_total=3)
242+
partition_manager = Sqlite3PartitionManager()
243+
try:
244+
event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager)
245+
# You can also define a callable object for creating PartitionProcessor like below:
246+
# event_processor = EventProcessor(client, "$default", partition_processor_factory, partition_manager)
247+
asyncio.ensure_future(event_processor.start())
248+
await asyncio.sleep(60)
249+
await event_processor.stop()
250+
finally:
251+
await partition_manager.close()
252+
253+
if __name__ == '__main__':
254+
loop = asyncio.get_event_loop()
255+
loop.run_until_complete(main())
256+
```
257+
209258
## Troubleshooting
210259

211260
### General
@@ -230,7 +279,7 @@ These are the samples in our repo demonstraing the usage of the library.
230279
- [./examples/recv.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/recv.py) - use consumer to consume events
231280
- [./examples/async_examples/send_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/async_examples/send_async.py) - async/await support of a producer
232281
- [./examples/async_examples/recv_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_async.py) - async/await support of a consumer
233-
- [./examples/eph.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/eph.py) - event processor host
282+
- [./examples/eventprocessor/event_processor_example.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/eventprocessor/event_processor_example.py) - event processor
234283

235284
### Documentation
236285

sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515

1616
def _retry_decorator(to_be_wrapped_func):
1717
def wrapped_func(self, *args, **kwargs):
18-
timeout = kwargs.pop("timeout", None)
18+
timeout = kwargs.pop("timeout", 100000)
1919
if not timeout:
20-
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
20+
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
2121
timeout_time = time.time() + timeout
2222
max_retries = self.client.config.max_retries
2323
retry_count = 0

sdk/eventhub/azure-eventhubs/azure/eventhub/aio/_consumer_producer_mixin_async.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515

1616
def _retry_decorator(to_be_wrapped_func):
1717
async def wrapped_func(self, *args, **kwargs):
18-
timeout = kwargs.pop("timeout", None)
18+
timeout = kwargs.pop("timeout", 100000)
1919
if not timeout:
20-
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
20+
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
2121
timeout_time = time.time() + timeout
2222
max_retries = self.client.config.max_retries
2323
retry_count = 0

sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,9 @@ def __init__(self, host, event_hub_path, credential, **kwargs):
118118
:type auth_timeout: float
119119
:param user_agent: The user agent that needs to be appended to the built in user agent string.
120120
:type user_agent: str
121-
:param max_retries: The max number of attempts to redo the failed operation when an error happened. Default
121+
:param retry_total: The total number of attempts to redo the failed operation when an error happened. Default
122122
value is 3.
123-
:type max_retries: int
123+
:type retry_total: int
124124
:param transport_type: The type of transport protocol that will be used for communicating with
125125
the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.
126126
:type transport_type: ~azure.eventhub.TransportType
@@ -239,9 +239,9 @@ def from_connection_string(cls, conn_str, **kwargs):
239239
:type auth_timeout: float
240240
:param user_agent: The user agent that needs to be appended to the built in user agent string.
241241
:type user_agent: str
242-
:param max_retries: The max number of attempts to redo the failed operation when an error happened. Default
242+
:param retry_total: The total number of attempts to redo the failed operation when an error happened. Default
243243
value is 3.
244-
:type max_retries: int
244+
:type retry_total: int
245245
:param transport_type: The type of transport protocol that will be used for communicating with
246246
the Event Hubs service. Default is ~azure.eventhub.TransportType.Amqp.
247247
:type transport_type: ~azure.eventhub.TransportType

sdk/eventhub/azure-eventhubs/azure/eventhub/common.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,6 @@ def __init__(self, body=None, to_device=None):
6363
6464
:param body: The data to send in a single message.
6565
:type body: str, bytes or list
66-
:param batch: A data generator to send batched messages.
67-
:type batch: Generator
6866
:param to_device: An IoT device to route to.
6967
:type to_device: str
7068
"""

sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/checkpoint_manager.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88

99

1010
class CheckpointManager(object):
11-
"""Every PartitionProcessor has a CheckpointManager to save the partition's checkpoint.
11+
"""
12+
CheckpointManager is responsible for the creation of checkpoints.
13+
The interaction with the chosen storage service is done via ~azure.eventhub.eventprocessor.PartitionManager.
1214
1315
"""
1416
def __init__(self, partition_id: str, eventhub_name: str, consumer_group_name: str, owner_id: str, partition_manager: PartitionManager):
@@ -19,10 +21,13 @@ def __init__(self, partition_id: str, eventhub_name: str, consumer_group_name: s
1921
self.partition_manager = partition_manager
2022

2123
async def update_checkpoint(self, offset, sequence_number=None):
22-
"""Users call this method in PartitionProcessor.process_events() to save checkpoints
24+
"""
25+
Updates the checkpoint using the given information for the associated partition and consumer group in the chosen storage service.
2326
24-
:param offset: offset of the processed EventData
25-
:param sequence_number: sequence_number of the processed EventData
27+
:param offset: The offset of the ~azure.eventhub.EventData the new checkpoint will be associated with.
28+
:type offset: str
29+
:param sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint will be associated with.
30+
:type sequence_number: int
2631
:return: None
2732
"""
2833
await self.partition_manager.update_checkpoint(

sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/event_processor.py

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,23 +24,26 @@ class EventProcessor(object):
2424
def __init__(self, eventhub_client: EventHubClient, consumer_group_name: str,
2525
partition_processor_factory: Callable[[CheckpointManager], PartitionProcessor],
2626
partition_manager: PartitionManager, **kwargs):
27-
"""An EventProcessor automatically creates and runs consumers for all partitions of the eventhub.
27+
"""
28+
An EventProcessor constantly receives events from all partitions of the Event Hub in the context of a given
29+
consumer group. The received data will be sent to PartitionProcessor to be processed.
2830
2931
It provides the user a convenient way to receive events from multiple partitions and save checkpoints.
30-
If multiple EventProcessors are running for an event hub, they will automatically balance loading.
31-
This load balancer won't be available until preview 3.
32-
33-
:param eventhub_client: an instance of azure.eventhub.aio.EventClient object
34-
:param consumer_group_name: the consumer group that is used to receive events from
35-
:param partition_processor_factory: a callable (type or function) that is called to return a PartitionProcessor.
36-
Users define their own PartitionProcessor by subclassing it implement abstract method process_events() to
37-
implement their own operation logic.
38-
:param partition_manager: an instance of a PartitionManager implementation. A partition manager claims ownership
39-
of partitions and saves partition checkpoints to a data storage. For preview 2, sample Sqlite3PartitionManager is
40-
already provided. For the future releases there will be more PartitionManager implementation provided to save
41-
checkpoint to different data storage. Users can also implement their PartitionManager class to user their own
42-
preferred data storage.
43-
:param initial_event_position: the offset to start a partition consumer if the partition has no checkpoint yet
32+
If multiple EventProcessors are running for an event hub, they will automatically balance load.
33+
This load balancing won't be available until preview 3.
34+
35+
:param eventhub_client: An instance of ~azure.eventhub.aio.EventClient object
36+
:type eventhub_client: ~azure.eventhub.aio.EventClient
37+
:param consumer_group_name: The name of the consumer group this event processor is associated with. Events will
38+
be read only in the context of this group.
39+
:type consumer_group_name: str
40+
:param partition_processor_factory: A callable(type or function) object that creates an instance of a class
41+
implementing the ~azure.eventhub.eventprocessor.PartitionProcessor interface.
42+
:type partition_processor_factory: callable object
43+
:param partition_manager: Interacts with the storage system, dealing with ownership and checkpoints.
44+
For preview 2, sample Sqlite3PartitionManager is provided.
45+
:type partition_manager: Class implementing the ~azure.eventhub.eventprocessor.PartitionManager interface.
46+
:param initial_event_position: The offset to start a partition consumer if the partition has no checkpoint yet.
4447
:type initial_event_position: int or str
4548
4649
Example:

sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/partition_manager.py

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,21 @@
88

99

1010
class PartitionManager(ABC):
11-
"""Subclass PartitionManager to implement the read/write access to storage service to list/claim ownership and save checkpoint.
12-
11+
"""
12+
PartitionManager deals with the interaction with the chosen storage service.
13+
It's able to list/claim ownership and create checkpoint.
1314
"""
1415

1516
@abstractmethod
1617
async def list_ownership(self, eventhub_name: str, consumer_group_name: str) -> Iterable[Dict[str, Any]]:
1718
"""
19+
Retrieves a complete ownership list from the chosen storage service.
1820
19-
:param eventhub_name:
20-
:param consumer_group_name:
21+
:param eventhub_name: The name of the specific Event Hub the ownership are associated with, relative to
22+
the Event Hubs namespace that contains it.
23+
:type eventhub_name: str
24+
:param consumer_group_name: The name of the consumer group the ownership are associated with.
25+
:type consumer_group_name: str
2126
:return: Iterable of dictionaries containing the following partition ownership information:
2227
eventhub_name
2328
consumer_group_name
@@ -33,11 +38,45 @@ async def list_ownership(self, eventhub_name: str, consumer_group_name: str) ->
3338

3439
@abstractmethod
3540
async def claim_ownership(self, partitions: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
41+
"""
42+
Tries to claim a list of specified ownership.
43+
44+
:param partitions: Iterable of dictionaries containing all the ownership to claim.
45+
:type partitions: Iterable of dict
46+
:return: Iterable of dictionaries containing the following partition ownership information:
47+
eventhub_name
48+
consumer_group_name
49+
owner_id
50+
partition_id
51+
owner_level
52+
offset
53+
sequence_number
54+
last_modified_time
55+
etag
56+
"""
3657
pass
3758

3859
@abstractmethod
3960
async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id,
4061
offset, sequence_number) -> None:
62+
"""
63+
Updates the checkpoint using the given information for the associated partition and consumer group in the chosen storage service.
64+
65+
:param eventhub_name: The name of the specific Event Hub the ownership are associated with, relative to
66+
the Event Hubs namespace that contains it.
67+
:type eventhub_name: str
68+
:param consumer_group_name: The name of the consumer group the ownership are associated with.
69+
:type consumer_group_name: str
70+
:param partition_id: The partition id which the checkpoint is created for.
71+
:type partition_id: str
72+
:param owner_id: The identifier of the ~azure.eventhub.eventprocessor.EventProcessor.
73+
:type owner_id: str
74+
:param offset: The offset of the ~azure.eventhub.EventData the new checkpoint will be associated with.
75+
:type offset: str
76+
:param sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint will be associated with.
77+
:type sequence_number: int
78+
:return:
79+
"""
4180
pass
4281

4382
async def close(self):

sdk/eventhub/azure-eventhubs/azure/eventhub/eventprocessor/partition_processor.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ class CloseReason(Enum):
1818

1919

2020
class PartitionProcessor(ABC):
21+
"""
22+
PartitionProcessor processes events received from the Azure Event Hubs service. A single instance of a class
23+
implementing this interface will be created for every partition the associated ~azure.eventhub.eventprocessor.EventProcessor owns.
24+
25+
"""
2126
def __init__(self, checkpoint_manager: CheckpointManager):
2227
self._checkpoint_manager = checkpoint_manager
2328

@@ -27,18 +32,27 @@ async def close(self, reason):
2732
There are different reasons to trigger the PartitionProcessor to close.
2833
Refer to enum class CloseReason
2934
35+
:param reason: Reason for closing the PartitionProcessor.
36+
:type reason: CloseReason
37+
3038
"""
3139
pass
3240

3341
@abstractmethod
3442
async def process_events(self, events: List[EventData]):
3543
"""Called when a batch of events have been received.
3644
45+
:param events: Received events.
46+
:type events: list[~azure.eventhub.common.EventData]
47+
3748
"""
3849
pass
3950

4051
async def process_error(self, error):
4152
"""Called when an error happens
4253
54+
:param error: The error that happens.
55+
:type error: Exception
56+
4357
"""
4458
pass

0 commit comments

Comments
 (0)