Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
e62825f
Shared connection (sync) draft
Jul 15, 2019
84ae397
Shared connection (sync) draft 2
Jul 16, 2019
6e6c827
Shared connection (sync) test update
Jul 16, 2019
a86146e
Shared connection
Jul 17, 2019
c5a23c8
Fix an issue
Jul 17, 2019
b83264d
add retry exponential delay and timeout to exception handling
Jul 22, 2019
4f17781
put module method before class def
Jul 22, 2019
f0d98d1
fixed Client.get_properties error
Jul 22, 2019
ed7d414
new eph (draft)
Jul 27, 2019
ee228b4
new eph (draft2)
Jul 29, 2019
1d65719
remove in memory partition manager
Jul 29, 2019
4895385
EventProcessor draft 3
Jul 30, 2019
6415ac2
small format change
Jul 30, 2019
a685f87
Fix logging
Jul 30, 2019
6388f4a
Add EventProcessor example
Jul 30, 2019
1cd6275
Merge branch 'eventhubs_preview2' into eventhubs_eph
Jul 30, 2019
6394f19
use decorator to implement retry logic and update some tests (#6544)
yunhaoling Jul 30, 2019
56fdd1e
Update livetest (#6547)
yunhaoling Jul 30, 2019
10f0be6
Remove legacy code and update livetest (#6549)
yunhaoling Jul 30, 2019
fbb66bd
make sync longrunning multi-threaded
Jul 31, 2019
00ff723
small changes on async long running test
Jul 31, 2019
0f5180c
reset retry_count for iterator
Jul 31, 2019
6e0c238
Don't return early when open a ReceiveClient or SendClient
Jul 31, 2019
0efc95f
type annotation change
Jul 31, 2019
90fbafb
Update kwargs and remove unused import
yunhaoling Jul 31, 2019
e06bad8
Misc changes from EventProcessor PR review
Jul 31, 2019
dd1d7ae
raise asyncio.CancelledError out instead of supressing it.
Jul 31, 2019
9d18dd9
Merge branch 'eventhubs_dev' into eventhubs_eph
Jul 31, 2019
a31ee67
Update livetest and small fixed (#6594)
yunhaoling Jul 31, 2019
19a5539
Merge branch 'eventhubs_dev' into eventhubs_eph
Aug 1, 2019
997dacf
Fix feedback from PR (1)
Aug 2, 2019
d688090
Revert "Merge branch 'eventhubs_dev' into eventhubs_eph"
Aug 2, 2019
2399dcb
Fix feedback from PR (2)
Aug 2, 2019
5ad0255
Update code according to the review (#6623)
yunhaoling Aug 2, 2019
5679065
Fix feedback from PR (3)
Aug 2, 2019
35245a1
Merge branch 'eventhubs_dev' into eventhubs_eph
Aug 2, 2019
83d0ec2
small bug fixing
Aug 2, 2019
a2195ba
Remove old EPH
Aug 2, 2019
f8acc8b
Update decorator implementation (#6642)
yunhaoling Aug 2, 2019
6fe4533
Remove old EPH pytest
Aug 2, 2019
598245d
Revert "Revert "Merge branch 'eventhubs_dev' into eventhubs_eph""
Aug 2, 2019
97dfce5
Update sample codes and docstring (#6643)
yunhaoling Aug 2, 2019
64c5c7d
Check tablename to prevent sql injection
Aug 3, 2019
13014dd
PR review update
Aug 3, 2019
3e82e90
Removed old EPH stuffs.
Aug 3, 2019
0f670d8
Small fix (#6650)
yunhaoling Aug 3, 2019
2a8b34c
Merge branch 'eventhubs_dev' into eventhubs_eph
Aug 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------

from .event_processor import EventProcessor
from .partition_processor import PartitionProcessor
from .partition_manager import PartitionManager
from .sqlite3_partition_manager import Sqlite3PartitionManager
from .close_reason import CloseReason

# Names exported by a star-import of this package. Every entry is one of the
# classes imported from the package's submodules above.
__all__ = [
'CloseReason',
'EventProcessor',
'PartitionProcessor',
'PartitionManager',
'Sqlite3PartitionManager',
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------

"""
Based on https://stackoverflow.com/questions/43229939/how-to-pass-a-boolean-by-reference-across-threads-and-modules
"""


class CancellationToken:
    """Mutable flag shared between the party requesting cancellation and its observers.

    A new token starts with ``is_cancelled`` set to ``False``. Calling
    :meth:`cancel` switches it to ``True``; the class offers no way to reset it.
    """

    def __init__(self):
        # Observers read this attribute directly to find out whether to stop.
        self.is_cancelled = False

    def cancel(self):
        """Mark the token as cancelled."""
        self.is_cancelled = True
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------


from .partition_manager import PartitionManager


class CheckpointManager(object):
    """Saves checkpoints for one partition on behalf of a PartitionProcessor.

    The manager remembers which event hub, consumer group, partition and
    EventProcessor instance it belongs to, and forwards every checkpoint to
    the PartitionManager it was given.
    """

    def __init__(self, partition_id, eventhub_name, consumer_group_name, instance_id, partition_manager: PartitionManager):
        self._partition_manager = partition_manager
        self._instance_id = instance_id
        self._consumer_group_name = consumer_group_name
        self._eventhub_name = eventhub_name
        self._partition_id = partition_id

    async def update_checkpoint(self, offset, sequence_number):
        """Persist a checkpoint; meant to be called from PartitionProcessor.process_events().

        :param offset: offset of the processed EventData
        :param sequence_number: sequence_number of the processed EventData
        :return: None
        """
        store = self._partition_manager
        await store.update_checkpoint(
            self._eventhub_name,
            self._consumer_group_name,
            self._partition_id,
            self._instance_id,
            offset,
            sequence_number,
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------

from enum import Enum


class CloseReason(Enum):
    """Reason passed to PartitionProcessor.close() when a partition stops being processed."""

    # The user called EventProcessor.stop().
    SHUTDOWN = 0
    # Ownership of the partition was lost.
    LEASE_LOST = 1
    # An exception happened while receiving events.
    EVENTHUB_EXCEPTION = 2
    # The user's code in PartitionProcessor.process_events() raised an exception.
    USER_EXCEPTION = 3
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------

from typing import Callable
import uuid
import asyncio
import logging

from azure.eventhub import EventPosition, EventHubError
from azure.eventhub.aio import EventHubClient
from ._cancellation_token import CancellationToken
from .checkpoint_manager import CheckpointManager
from .partition_manager import PartitionManager
from .partition_processor import PartitionProcessor
from .close_reason import CloseReason

log = logging.getLogger(__name__)


class EventProcessor(object):
    def __init__(self, consumer_group_name: str, eventhub_client: EventHubClient,
                 partition_processor_callable: Callable[..., PartitionProcessor],
                 partition_manager: PartitionManager, **kwargs):
        """An EventProcessor automatically creates and runs consumers for all partitions of the eventhub.

        It provides the user a convenient way to receive events from multiple partitions and save checkpoints.
        If multiple EventProcessors are running for an event hub, they will automatically balance load.
        This feature won't be available until preview 3.

        :param consumer_group_name: the consumer group that is used to receive events
            from the event hub that the eventhub_client is going to receive events from
        :param eventhub_client: an instance of azure.eventhub.aio.EventHubClient
        :param partition_processor_callable: a callable that is called to return a PartitionProcessor.
            It is invoked with the keyword arguments eventhub_name, consumer_group_name,
            partition_id and checkpoint_manager.
        :param partition_manager: an instance of a PartitionManager implementation
        :param initial_event_position: (keyword only) the offset to start a partition consumer from
            if the partition has no checkpoint yet. Defaults to "-1".
        """
        self._consumer_group_name = consumer_group_name
        self._eventhub_client = eventhub_client
        self._eventhub_name = eventhub_client.eh_name
        self._partition_processor_callable = partition_processor_callable
        self._partition_manager = partition_manager
        self._initial_event_position = kwargs.get("initial_event_position", "-1")
        self._max_batch_size = eventhub_client.config.max_batch_size
        self._receive_timeout = eventhub_client.config.receive_timeout
        self._tasks = []
        self._cancellation_token = CancellationToken()
        # Random id that identifies this processor in ownership records and log messages.
        self._instance_id = str(uuid.uuid4())
        # Filled in by start() once the partition ids have been fetched.
        self._partition_ids = None

    async def start(self):
        """Start the EventProcessor.

        1. retrieve the partition ids from eventhubs
        2. claim partition ownership of these partitions.
        3. repeatedly call EventHubConsumer.receive() to retrieve events and
           call user defined PartitionProcessor.process_events()

        This coroutine only returns once every partition task has finished.
        """
        log.info("EventProcessor %r is being started", self._instance_id)
        # Store on the private attribute declared in __init__ so the value
        # read by _claim_partitions() is the one initialised there.
        self._partition_ids = await self._eventhub_client.get_partition_ids()
        claimed_list = await self._claim_partitions()
        await self._start_claimed_partitions(claimed_list)

    async def stop(self):
        """Stop all the partition consumers.

        Cancels the shared cancellation token. Each partition's receive loop
        checks the token before its next receive and then shuts down, so
        stopping is not immediate.
        """
        self._cancellation_token.cancel()
        # It's not agreed whether a partition manager has method close().
        log.info("EventProcessor %r cancellation token has been sent", self._instance_id)

    async def _claim_partitions(self):
        """Claim ownership of every known partition through the partition manager.

        Existing ownership records are re-submitted unchanged; partitions
        without a record get a fresh one owned by this instance.

        :return: whatever the partition manager's claim_ownership() returns
        """
        partitions_ownership = await self._partition_manager.list_ownership(
            self._eventhub_name, self._consumer_group_name)
        ownership_by_partition = {
            ownership["partition_id"]: ownership for ownership in partitions_ownership
        }

        to_claim_list = []
        for pid in self._partition_ids:
            p_ownership = ownership_by_partition.get(pid)
            if not p_ownership:
                p_ownership = {
                    "eventhub_name": self._eventhub_name,
                    "consumer_group_name": self._consumer_group_name,
                    "instance_id": self._instance_id,
                    "partition_id": pid,
                    "owner_level": 1,  # will increment in preview 3
                }
            to_claim_list.append(p_ownership)
        return await self._partition_manager.claim_ownership(to_claim_list)

    async def _start_claimed_partitions(self, claimed_partitions):
        """Create one consumer task per claimed partition and wait for all of them.

        After every task has finished, the partition manager is closed.

        :param claimed_partitions: iterable of ownership dictionaries; each must
            contain "partition_id" and may contain "offset"
        """
        # Looked up once; every task is scheduled on the same running loop.
        loop = asyncio.get_running_loop()
        for partition in claimed_partitions:
            partition_id = partition["partition_id"]
            # Fall back to the configured initial position when no checkpoint offset is stored.
            offset = partition.get("offset") or self._initial_event_position
            consumer = self._eventhub_client.create_consumer(
                self._consumer_group_name, partition_id, EventPosition(str(offset)))

            partition_processor = self._partition_processor_callable(
                eventhub_name=self._eventhub_name,
                consumer_group_name=self._consumer_group_name,
                partition_id=partition_id,
                checkpoint_manager=CheckpointManager(partition_id, self._eventhub_name, self._consumer_group_name,
                                                     self._instance_id, self._partition_manager)
            )
            task = loop.create_task(
                _receive(consumer, partition_processor, self._receive_timeout, self._cancellation_token))
            self._tasks.append(task)

        await asyncio.gather(*self._tasks)
        await self._partition_manager.close()
        log.info("EventProcessor %r partition manager is closed", self._instance_id)
        log.info("EventProcessor %r partition has stopped", self._instance_id)


def _partition_log_args(partition_processor):
    """Return the (instance id, eventhub, partition, consumer group) values used in log records."""
    return (
        partition_processor._checkpoint_manager._instance_id,
        partition_processor._eventhub_name,
        partition_processor._partition_id,
        partition_processor._consumer_group_name,
    )


async def _receive(partition_consumer, partition_processor, receive_timeout, cancellation_token):
    """Receive events from one partition and hand them to its PartitionProcessor until told to stop.

    The loop ends in one of three ways, and in each case
    ``partition_processor.close()`` is awaited with the matching CloseReason:

    * the cancellation token is cancelled -> ``SHUTDOWN``
    * receiving raises EventHubError -> ``LEASE_LOST`` when the error is
      ``"link:stolen"``, otherwise ``EVENTHUB_EXCEPTION``
    * ``process_events`` raises -> ``USER_EXCEPTION``

    :param partition_consumer: async context manager exposing ``receive(timeout=...)``
    :param partition_processor: the user's PartitionProcessor for this partition
    :param receive_timeout: timeout passed to every ``receive`` call
    :param cancellation_token: object whose ``is_cancelled`` attribute is polled before each receive
    :raises asyncio.CancelledError: when the task is cancelled while the user's
        ``process_events`` is running
    """
    async with partition_consumer:
        while not cancellation_token.is_cancelled:
            try:
                events = await partition_consumer.receive(timeout=receive_timeout)
            except EventHubError as eh_err:
                if eh_err.error == "link:stolen":
                    reason = CloseReason.LEASE_LOST
                else:
                    reason = CloseReason.EVENTHUB_EXCEPTION
                # WARNING rather than INFO: the partition stops being processed,
                # which must be visible under the default logging configuration.
                log.warning(
                    "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r "
                    "has met an exception receiving events. It's being closed. The exception is %r.",
                    *_partition_log_args(partition_processor),
                    eh_err
                )
                await partition_processor.close(reason=reason)
                break
            try:
                await partition_processor.process_events(events)
            except asyncio.CancelledError:
                # Before Python 3.8 CancelledError derives from Exception; let it
                # propagate instead of reporting a cancellation as a user error.
                raise
            except Exception as exp:  # user code has caused an error
                log.warning(
                    "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r "
                    "has met an exception from user code process_events. It's being closed. The exception is %r.",
                    *_partition_log_args(partition_processor),
                    exp
                )
                await partition_processor.close(reason=CloseReason.USER_EXCEPTION)
                break
        else:
            # Reached only when the loop condition became false, i.e. the token was cancelled.
            await partition_processor.close(reason=CloseReason.SHUTDOWN)
            log.info(
                "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r "
                "has been shutdown",
                *_partition_log_args(partition_processor)
            )
            # TODO: try to inform other EventProcessors to take the partition when this partition is closed in preview 3?
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------

from typing import Iterable, Dict, Any
from abc import ABC, abstractmethod


class PartitionManager(ABC):
    """Abstract storage interface for partition ownership and checkpoints.

    Subclass PartitionManager to implement the read/write access to a storage
    service: listing ownership, claiming ownership and saving checkpoints.
    """

    @abstractmethod
    async def list_ownership(self, eventhub_name: str, consumer_group_name: str) -> Iterable[Dict[str, Any]]:
        """Return the stored ownership records for an event hub and consumer group.

        :param eventhub_name: name of the event hub whose ownership is listed
        :param consumer_group_name: name of the consumer group whose ownership is listed
        :return: Iterable of dictionaries containing the following partition ownership information:
                eventhub_name
                consumer_group_name
                instance_id
                partition_id
                owner_level
                offset
                sequence_number
                last_modified_time
                etag
        """

    @abstractmethod
    async def claim_ownership(self, partitions: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
        """Try to claim the given partitions.

        :param partitions: ownership dictionaries of the partitions to claim
        :return: Iterable of ownership dictionaries for the claimed partitions
        """

    @abstractmethod
    async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, instance_id,
                                offset, sequence_number) -> None:
        """Save a checkpoint for one partition.

        :param eventhub_name: name of the event hub the partition belongs to
        :param consumer_group_name: name of the consumer group the checkpoint is for
        :param partition_id: id of the partition being checkpointed
        :param instance_id: id of the EventProcessor instance saving the checkpoint
        :param offset: offset of the processed EventData
        :param sequence_number: sequence_number of the processed EventData
        """

    async def close(self):
        """Release resources held by the manager; the default implementation does nothing."""
        return None
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------

from typing import List
from abc import ABC, abstractmethod
from .checkpoint_manager import CheckpointManager

from azure.eventhub import EventData


class PartitionProcessor(ABC):
    """Base class for user code that processes the events of a single partition.

    Subclasses must implement :meth:`process_events` and may override
    :meth:`close`.
    """

    def __init__(self, eventhub_name, consumer_group_name, partition_id, checkpoint_manager: CheckpointManager):
        self._eventhub_name = eventhub_name
        self._consumer_group_name = consumer_group_name
        self._partition_id = partition_id
        # Used by subclasses to save checkpoints for this partition.
        self._checkpoint_manager = checkpoint_manager

    async def close(self, reason):
        """Called when EventProcessor stops processing this PartitionProcessor.

        There are four different reasons to trigger the PartitionProcessor to close.
        Refer to enum class CloseReason of close_reason.py

        The default implementation does nothing.
        """
        return None

    @abstractmethod
    async def process_events(self, events: List[EventData]):
        """Called when a batch of events have been received.

        :param events: the batch of received events
        """
Loading