Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Updating docstrings, docs and sample
  • Loading branch information
yunhaoling committed Aug 6, 2019
commit bf984a9bfad6d32d767da399561b3765290f339e
35 changes: 24 additions & 11 deletions sdk/eventhub/azure-eventhubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,25 +217,38 @@ The `EventProcessor` will delegate processing of events to a `PartitionProcessor
import asyncio

from azure.eventhub.aio import EventHubClient
from azure.eventhub.eventprocessor import PartitionProcessor, EventProcessor, Sqlite3PartitionManager
from azure.eventhub.eventprocessor import EventProcessor, PartitionProcessor, Sqlite3PartitionManager

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'

async def do_operation(event):
# do some sync or async operations. If the operation is i/o intensive, async will have better performance
print(event)

class MyPartitionProcessor(PartitionProcessor):
def __init__(self, checkpoint_manager):
super(MyPartitionProcessor, self).__init__(checkpoint_manager)

async def process_events(self, events):
print("PartitionProcessor for eventhub:{}, consumer group:{}, partition id:{}, number of events processed:{}".
format(self._eventhub_name, self._consumer_group_name, self._partition_id, len(events)))
if events:
await asyncio.gather(*[do_operation(event) for event in events])
await self._checkpoint_manager.update_checkpoint(events[-1].offset, events[-1].sequence_number)

async def stop_after_awhile(event_processor, duration):
await asyncio.sleep(duration)
await event_processor.stop()
def partition_processor_factory(checkpoint_manager):
return MyPartitionProcessor(checkpoint_manager)

async def main():
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
client = EventHubClient.from_connection_string(connection_str)
partition_manager = Sqlite3PartitionManager(db_filename=":memory:") # in-memory partition manager
event_processor = EventProcessor("$default", client, MyPartitionProcessor, partition_manager)
await asyncio.gather(event_processor.start(), stop_after_awhile(event_processor, 100))
client = EventHubClient.from_connection_string(connection_str, receive_timeout=5, retry_total=3)
partition_manager = Sqlite3PartitionManager()
try:
event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager)
# You can also define a callable object for creating PartitionProcessor like below:
# event_processor = EventProcessor(client, "$default", partition_processor_factory, partition_manager)
asyncio.ensure_future(event_processor.start())
await asyncio.sleep(60)
await event_processor.stop()
finally:
await partition_manager.close()

if __name__ == '__main__':
loop = asyncio.get_event_loop()
Expand Down
2 changes: 0 additions & 2 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ def __init__(self, body=None, to_device=None):

:param body: The data to send in a single message.
:type body: str, bytes or list
:param batch: A data generator to send batched messages.
:type batch: Generator
:param to_device: An IoT device to route to.
:type to_device: str
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@


class CheckpointManager(object):
"""Every PartitionProcessor has a CheckpointManager to save the partition's checkpoint.
"""
CheckpointManager is responsible for the creation of checkpoints.
The interaction with the chosen storage service is done via ~azure.eventhub.eventprocessor.PartitionManager.

"""
def __init__(self, partition_id: str, eventhub_name: str, consumer_group_name: str, owner_id: str, partition_manager: PartitionManager):
Expand All @@ -19,10 +21,13 @@ def __init__(self, partition_id: str, eventhub_name: str, consumer_group_name: s
self.partition_manager = partition_manager

async def update_checkpoint(self, offset, sequence_number=None):
"""Users call this method in PartitionProcessor.process_events() to save checkpoints
"""
Updates the checkpoint using the given information for the associated partition and consumer group in the chosen storage service.

:param offset: offset of the processed EventData
:param sequence_number: sequence_number of the processed EventData
:param offset: The offset of the ~azure.eventhub.EventData the new checkpoint will be associated with.
:type offset: str
:param sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint will be associated with.
:type sequence_number: int
:return: None
"""
await self.partition_manager.update_checkpoint(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,26 @@ class EventProcessor(object):
def __init__(self, eventhub_client: EventHubClient, consumer_group_name: str,
partition_processor_factory: Callable[[CheckpointManager], PartitionProcessor],
partition_manager: PartitionManager, **kwargs):
"""An EventProcessor automatically creates and runs consumers for all partitions of the eventhub.
"""
An EventProcessor constantly receives events from all partitions of the Event Hub in the context of a given
consumer group. The received data will be sent to PartitionProcessor to be processed.

It provides the user a convenient way to receive events from multiple partitions and save checkpoints.
If multiple EventProcessors are running for an event hub, they will automatically balance loading.
This load balancer won't be available until preview 3.

:param eventhub_client: an instance of azure.eventhub.aio.EventClient object
:param consumer_group_name: the consumer group that is used to receive events from
:param partition_processor_factory: a callable (type or function) that is called to return a PartitionProcessor.
Users define their own PartitionProcessor by subclassing it implement abstract method process_events() to
implement their own operation logic.
:param partition_manager: an instance of a PartitionManager implementation. A partition manager claims ownership
of partitions and saves partition checkpoints to a data storage. For preview 2, sample Sqlite3PartitionManager is
already provided. For the future releases there will be more PartitionManager implementation provided to save
checkpoint to different data storage. Users can also implement their PartitionManager class to user their own
preferred data storage.
:param initial_event_position: the offset to start a partition consumer if the partition has no checkpoint yet
If multiple EventProcessors are running for an event hub, they will automatically balance load.
This load balancing won't be available until preview 3.

:param eventhub_client: An instance of ~azure.eventhub.aio.EventHubClient.
:type eventhub_client: ~azure.eventhub.aio.EventHubClient
:param consumer_group_name: The name of the consumer group this event processor is associated with. Events will
be read only in the context of this group.
:type consumer_group_name: str
:param partition_processor_factory: A callable (type or function) object that creates an instance of a class
implementing the ~azure.eventhub.eventprocessor.PartitionProcessor interface.
:type partition_processor_factory: callable object
:param partition_manager: Interacts with the storage system, dealing with ownership and checkpoints.
For preview 2, sample Sqlite3PartitionManager is provided.
:type partition_manager: Class implementing the ~azure.eventhub.eventprocessor.PartitionManager interface.
:param initial_event_position: The offset to start a partition consumer if the partition has no checkpoint yet.
:type initial_event_position: int or str

Example:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,21 @@


class PartitionManager(ABC):
"""Subclass PartitionManager to implement the read/write access to storage service to list/claim ownership and save checkpoint.

"""
PartitionManager deals with the interaction with the chosen storage service.
It is able to list/claim ownership and create checkpoints.
"""

@abstractmethod
async def list_ownership(self, eventhub_name: str, consumer_group_name: str) -> Iterable[Dict[str, Any]]:
"""
Retrieves a complete ownership list from the chosen storage service.

:param eventhub_name:
:param consumer_group_name:
:param eventhub_name: The name of the specific Event Hub the ownership is associated with, relative to
the Event Hubs namespace that contains it.
:type eventhub_name: str
:param consumer_group_name: The name of the consumer group the ownership is associated with.
:type consumer_group_name: str
:return: Iterable of dictionaries containing the following partition ownership information:
eventhub_name
consumer_group_name
Expand All @@ -33,11 +38,45 @@ async def list_ownership(self, eventhub_name: str, consumer_group_name: str) ->

@abstractmethod
async def claim_ownership(self, partitions: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
"""
Tries to claim a list of specified ownership.

:param partitions: Iterable of dictionaries containing all the ownership to claim.
:type partitions: Iterable of dict
:return: Iterable of dictionaries containing the following partition ownership information:
eventhub_name
consumer_group_name
owner_id
partition_id
owner_level
offset
sequence_number
last_modified_time
etag
"""
pass

@abstractmethod
async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id,
offset, sequence_number) -> None:
"""
Updates the checkpoint using the given information for the associated partition and consumer group in the chosen storage service.

:param eventhub_name: The name of the specific Event Hub the ownership is associated with, relative to
the Event Hubs namespace that contains it.
:type eventhub_name: str
:param consumer_group_name: The name of the consumer group the ownership is associated with.
:type consumer_group_name: str
:param partition_id: The partition id which the checkpoint is created for.
:type partition_id: str
:param owner_id: The identifier of the ~azure.eventhub.eventprocessor.EventProcessor.
:type owner_id: str
:param offset: The offset of the ~azure.eventhub.EventData the new checkpoint will be associated with.
:type offset: str
:param sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint will be associated with.
:type sequence_number: int
:return: None
"""
pass

async def close(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ class CloseReason(Enum):


class PartitionProcessor(ABC):
"""
PartitionProcessor processes events received from the Azure Event Hubs service. A single instance of a class
implementing this interface will be created for every partition the associated ~azure.eventhub.eventprocessor.EventProcessor owns.

"""
def __init__(self, checkpoint_manager: CheckpointManager):
self._checkpoint_manager = checkpoint_manager

Expand All @@ -27,18 +32,27 @@ async def close(self, reason):
There are different reasons to trigger the PartitionProcessor to close.
Refer to enum class CloseReason

:param reason: Reason for closing the PartitionProcessor.
:type reason: CloseReason

"""
pass

@abstractmethod
async def process_events(self, events: List[EventData]):
"""Called when a batch of events has been received.

:param events: Received events.
:type events: list[~azure.eventhub.common.EventData]

"""
pass

async def process_error(self, error):
"""Called when an error happens

:param error: The error that happens.
:type error: Exception

"""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,17 @@ async def process_events(self, events):
await self._checkpoint_manager.update_checkpoint(events[-1].offset, events[-1].sequence_number)


def partition_processor_factory(checkpoint_manager):
return MyPartitionProcessor(checkpoint_manager)


async def main():
client = EventHubClient.from_connection_string(CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT, retry_total=RETRY_TOTAL)
partition_manager = Sqlite3PartitionManager()
try:
event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager)
# You can also define a callable object for creating PartitionProcessor like below:
# event_processor = EventProcessor(client, "$default", partition_processor_factory, partition_manager)
asyncio.ensure_future(event_processor.start())
await asyncio.sleep(TEST_DURATION)
await event_processor.stop()
Expand Down