Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
3a32907
Draft EventProcessor Loadbalancing
Aug 22, 2019
39b1b86
EventProcessor Load balancing
Aug 22, 2019
17f5153
small changes from bryan's review
Aug 23, 2019
04ef548
remove checkpoint manager from initialize
Aug 23, 2019
9be1741
small changes
Aug 23, 2019
875841e
initial blob storage
Aug 28, 2019
6df7253
fix leaking connection str
Aug 28, 2019
1b5753c
Draft EventProcessor Loadbalancing
Aug 22, 2019
b4b77f9
EventProcessor Load balancing
Aug 22, 2019
1787fdd
small changes from bryan's review
Aug 23, 2019
c2d0155
remove checkpoint manager from initialize
Aug 23, 2019
1074385
small changes
Aug 23, 2019
386baf0
Fix code review feedback
Aug 29, 2019
1afbf0c
Merge branch 'eventhubs_yx' of github.com:Azure/azure-sdk-for-python …
Aug 30, 2019
c126bea
Packaging update of azure-mgmt-datalake-analytics
AutorestCI Aug 30, 2019
40c7f03
Packaging update of azure-loganalytics
AutorestCI Aug 30, 2019
cf22c7c
Packaging update of azure-mgmt-storage
AutorestCI Aug 30, 2019
c7440b2
Merge branch 'eventhubs_preview3' into eventhubs_yx
Aug 30, 2019
fa804f4
code review fixes and pylint error
Aug 30, 2019
470cf7e
Merge branch 'eventhubs_yx' of github.com:Azure/azure-sdk-for-python …
Aug 30, 2019
e5f3b50
reduce dictionary access
Aug 30, 2019
27cb0bf
initial blob storage
Aug 28, 2019
f6d77e7
fix leaking connection str
Aug 28, 2019
32833b3
Merge branch 'eventhubs_blobstorage' of github.com:Azure/azure-sdk-fo…
Aug 30, 2019
da6199f
Change test polling to 5 sec
Sep 2, 2019
e6a7c5e
Add async lock to ensure etag consistency
Sep 2, 2019
ebc4362
Add dependency to azure-storage
Sep 2, 2019
1503604
Remove dependency on PartitionManager of azure-eventhub
Sep 2, 2019
c9707c4
Fix azure-storage-blob requirement error
Sep 2, 2019
8343876
Revert "Packaging update of azure-mgmt-storage"
Sep 2, 2019
66c5b31
Revert "Packaging update of azure-loganalytics"
Sep 2, 2019
bcd851a
Revert "Packaging update of azure-mgmt-datalake-analytics"
Sep 2, 2019
d7b2606
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 2, 2019
d740bb0
Trivial code change
Sep 2, 2019
778ab66
Add docstring to BlobPartitionManager
Sep 2, 2019
017d9f0
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 2, 2019
aad6978
Refine exception handling for eventprocessor
Sep 3, 2019
a55dc13
Enable pylint for eventprocessor
Sep 3, 2019
a339985
Expose OwnershipLostError
Sep 3, 2019
9bed566
Refine exception handling
Sep 3, 2019
b878002
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 3, 2019
cbc6792
handle exception for claim_ownership and update_checkpoint
Sep 4, 2019
8748e1f
Add license info
Sep 4, 2019
97e0558
Restructure packages
Sep 4, 2019
e61d6a1
Lock each ownership with a separate lock
Sep 4, 2019
9102713
Move eventprocessor to aio
Sep 4, 2019
278592c
change checkpoint_manager to partition context
Sep 4, 2019
665f28c
fix pylint error
Sep 4, 2019
b03cc64
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 4, 2019
db93fd4
Re-org namespace package structure
Sep 4, 2019
2050615
raise error while list_ownership got an exception
Sep 5, 2019
2781062
Restructure package structure
Sep 5, 2019
8a32e44
replace checkpointer with checkpointstore as a part of package name
Sep 6, 2019
88ca853
Merge remote-tracking branch 'central/eventhubs_preview3' into eventh…
Sep 7, 2019
b32417b
Merge remote-tracking branch 'central/eventhubs_preview3' into eventh…
Sep 7, 2019
667f0b0
remove duplicated partition manager
Sep 7, 2019
f28365c
Fix a bug in list_ownership
Sep 7, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
code review fixes and pylint error
  • Loading branch information
yijxie committed Aug 30, 2019
commit fa804f4d087823e82e118358578d4050fa9c8b40
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# -----------------------------------------------------------------------------------

from .event_processor import EventProcessor
from .checkpoint_manager import CheckpointManager
from .partition_processor import PartitionProcessor, CloseReason
from .partition_manager import PartitionManager
from .sqlite3_partition_manager import Sqlite3PartitionManager
Expand All @@ -13,5 +14,6 @@
'EventProcessor',
'PartitionProcessor',
'PartitionManager',
'CheckpointManager',
'Sqlite3PartitionManager',
]
]
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import time
import random
import math
import collections
from collections import Counter
from typing import List
from collections import Counter, defaultdict
from azure.eventhub.aio import EventHubClient
from .partition_manager import PartitionManager

Expand All @@ -26,7 +26,7 @@ def __init__(
self, eventhub_client: EventHubClient, consumer_group_name: str, owner_id: str,
partition_manager: PartitionManager, ownership_timeout: int
):
self.cached_parition_ids = []
self.cached_parition_ids = [] # type: List[str]
self.eventhub_client = eventhub_client
self.eventhub_name = eventhub_client.eh_name
self.consumer_group_name = consumer_group_name
Expand All @@ -45,7 +45,7 @@ async def claim_ownership(self):
"""
if not self.cached_parition_ids:
await self._retrieve_partition_ids()
to_claim = await self._balance_ownership()
to_claim = await self._balance_ownership(self.cached_parition_ids)
claimed_list = await self.partition_manager.claim_ownership(to_claim) if to_claim else None
return claimed_list

Expand All @@ -56,7 +56,7 @@ async def _retrieve_partition_ids(self):
"""
self.cached_parition_ids = await self.eventhub_client.get_partition_ids()

async def _balance_ownership(self):
async def _balance_ownership(self, all_partition_ids):
"""Balances and claims ownership of partitions for this EventProcessor.
The balancing algorithm is:
1. Find partitions with inactive ownership and partitions that haven never been claimed before
Expand Down Expand Up @@ -87,19 +87,19 @@ async def _balance_ownership(self):
)
now = time.time()
ownership_dict = dict((x["partition_id"], x) for x in ownership_list) # put the list to dict for fast lookup
not_owned_partition_ids = [pid for pid in self.cached_parition_ids if pid not in ownership_dict]
not_owned_partition_ids = [pid for pid in all_partition_ids if pid not in ownership_dict]
timed_out_partition_ids = [ownership["partition_id"] for ownership in ownership_list
if ownership["last_modified_time"] + self.ownership_timeout < now]
claimable_partition_ids = not_owned_partition_ids + timed_out_partition_ids
active_ownership = [ownership for ownership in ownership_list
if ownership["last_modified_time"] + self.ownership_timeout >= now]
active_ownership_by_owner = collections.defaultdict(list)
active_ownership_by_owner = defaultdict(list)
for ownership in active_ownership:
active_ownership_by_owner[ownership["owner_id"]].append(ownership)
active_ownership_self = active_ownership_by_owner[self.owner_id]

# calculate expected count per owner
all_partition_count = len(self.cached_parition_ids)
all_partition_count = len(all_partition_ids)
owners_count = len(active_ownership_by_owner) + \
(0 if self.owner_id in active_ownership_by_owner else 1)
expected_count_per_owner = all_partition_count // owners_count
Expand All @@ -109,20 +109,21 @@ async def _balance_ownership(self):
to_claim = active_ownership_self
if len(active_ownership_self) > most_count_allowed_per_owner: # needs to abandon a partition
to_claim.pop() # abandon one partition if owned too many
# TODO: Release a ownership immediately so other EventProcessors won't need to wait it to timeout
elif len(active_ownership_self) < expected_count_per_owner: # Either claims an inactive partition, or steals from other owners
# TODO: Release an ownership immediately so other EventProcessors won't need to wait it to timeout
elif len(active_ownership_self) < expected_count_per_owner:
# Either claims an inactive partition, or steals from other owners
if claimable_partition_ids: # claim an inactive partition if there is
random_partition_id = random.choice(claimable_partition_ids)
random_chosen_to_claim = ownership_dict.get(random_partition_id,
{"partition_id": random_partition_id,
"eventhub_name": self.eventhub_client.eh_name,
"consumer_group_name": self.consumer_group_name,
"owner_level": 0}) # TODO: consider removing owner_level
"consumer_group_name": self.consumer_group_name
})
random_chosen_to_claim["owner_id"] = self.owner_id
to_claim.append(random_chosen_to_claim)
else: # steal from another owner that has the most count
active_ownership_count_group_by_owner = Counter(
(x, len(y)) for x, y in active_ownership_by_owner.items())
dict((x, len(y)) for x, y in active_ownership_by_owner.items()))
most_frequent_owner_id = active_ownership_count_group_by_owner.most_common(1)[0][0]
# randomly choose a partition to steal from the most_frequent_owner
to_steal_partition = random.choice(active_ownership_by_owner[most_frequent_owner_id])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ class CheckpointManager(object):
The interaction with the chosen storage service is done via ~azure.eventhub.eventprocessor.PartitionManager.

"""
def __init__(self, partition_id: str, eventhub_name: str, consumer_group_name: str,
owner_id: str, partition_manager: PartitionManager):
def __init__(self, eventhub_name: str, consumer_group_name: str,
partition_id: str, owner_id: str, partition_manager: PartitionManager):
self.partition_id = partition_id
self.eventhub_name = eventhub_name
self.consumer_group_name = consumer_group_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
OWNER_LEVEL = 0


class EventProcessor(object):
class EventProcessor(object): # pylint:disable=too-many-instance-attributes
"""
An EventProcessor constantly receives events from all partitions of the Event Hub in the context of a given
consumer group. The received data will be sent to PartitionProcessor to be processed.
Expand Down Expand Up @@ -163,72 +163,62 @@ def _create_tasks_for_claimed_ownership(self, to_claim_ownership_list):
async def _receive(self, ownership):
log.info("start ownership, %r", ownership)
partition_processor = self._partition_processor_factory()
if not hasattr(partition_processor, "process_events"):
log.error(
"Fatal error: a partition processor should at least have method process_events(events, checkpoint_manager). EventProcessor will stop.")
await self.stop()
raise TypeError("Partition processor must has method process_events(events, checkpoint_manager")

partition_consumer = self._eventhub_client.create_consumer(
ownership["consumer_group_name"],
ownership["partition_id"],
EventPosition(ownership.get("offset", self._initial_event_position))
)
partition_id = ownership["partition_id"]
eventhub_name = ownership["eventhub_name"]
consumer_group_name = ownership["consumer_group_name"]
owner_id = ownership["owner_id"]
checkpoint_manager = CheckpointManager(
ownership["partition_id"],
ownership["eventhub_name"],
ownership["consumer_group_name"],
ownership["owner_id"],
eventhub_name,
consumer_group_name,
partition_id,
owner_id,
self._partition_manager
)

async def initialize():
if hasattr(partition_processor, "initialize"):
await partition_processor.initialize()

async def process_error(err):
if hasattr(partition_processor, "process_error"):
await partition_processor.process_error(err, checkpoint_manager)

async def close(close_reason):
if hasattr(partition_processor, "close"):
await partition_processor.close(close_reason, checkpoint_manager)
partition_processor.eventhub_name = ownership
partition_processor.checkpoint_manager = checkpoint_manager
partition_consumer = self._eventhub_client.create_consumer(
consumer_group_name,
partition_id,
EventPosition(ownership.get("offset", self._initial_event_position))
)

try:
while True:
try:
await initialize()
await partition_processor.initialize()
events = await partition_consumer.receive(timeout=self._receive_timeout)
await partition_processor.process_events(events, checkpoint_manager)
except asyncio.CancelledError as cancelled_error:
log.info(
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r "
"is cancelled",
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
" is cancelled",
ownership["owner_id"],
ownership["eventhub_name"],
ownership["partition_id"],
ownership["consumer_group_name"]
)
await process_error(cancelled_error)
await close(CloseReason.SHUTDOWN)
await partition_processor.process_error(cancelled_error, checkpoint_manager)
await partition_processor.close(CloseReason.SHUTDOWN, checkpoint_manager)
# TODO: release the ownership immediately via partition manager
break
except EventHubError as eh_err:
reason = CloseReason.OWNERSHIP_LOST if eh_err.error == "link:stolen" else CloseReason.EVENTHUB_EXCEPTION
reason = CloseReason.OWNERSHIP_LOST if eh_err.error == "link:stolen" \
else CloseReason.EVENTHUB_EXCEPTION
log.warning(
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r "
"has met an exception receiving events. It's being closed. The exception is %r.",
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
" has met an exception receiving events. It's being closed. The exception is %r.",
ownership["owner_id"],
ownership["eventhub_name"],
ownership["partition_id"],
ownership["consumer_group_name"],
eh_err
)
await process_error(eh_err)
await close(reason) # An EventProcessor will pick up this partition again after the ownership is released
await partition_processor.process_error(eh_err, checkpoint_manager)
await partition_processor.close(reason, checkpoint_manager)
# An EventProcessor will pick up this partition again after the ownership is released
# TODO: release the ownership immediately via partition manager
break
except Exception as exp:
except Exception as exp: # pylint:disable=broad-except
log.warning(exp)
finally:
await partition_consumer.close()
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async def list_ownership(self, eventhub_name: str, consumer_group_name: str) ->
last_modified_time
etag
"""
pass


@abstractmethod
async def claim_ownership(self, partitions: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
Expand All @@ -54,13 +54,14 @@ async def claim_ownership(self, partitions: Iterable[Dict[str, Any]]) -> Iterabl
last_modified_time
etag
"""
pass


@abstractmethod
async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id,
offset, sequence_number) -> None:
"""
Updates the checkpoint using the given information for the associated partition and consumer group in the chosen storage service.
Updates the checkpoint using the given information for the associated partition and
consumer group in the chosen storage service.

:param eventhub_name: The name of the specific Event Hub the ownership are associated with, relative to
the Event Hubs namespace that contains it.
Expand All @@ -73,11 +74,11 @@ async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_
:type owner_id: str
:param offset: The offset of the ~azure.eventhub.EventData the new checkpoint will be associated with.
:type offset: str
:param sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint will be associated with.
:param sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint
will be associated with.
:type sequence_number: int
:return:
"""
pass

async def close(self):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
# -----------------------------------------------------------------------------------

from typing import List
from typing_extensions import Protocol
from abc import ABC
from enum import Enum
from .checkpoint_manager import CheckpointManager

from azure.eventhub import EventData
from .checkpoint_manager import CheckpointManager


class CloseReason(Enum):
Expand All @@ -17,12 +16,14 @@ class CloseReason(Enum):
EVENTHUB_EXCEPTION = 2 # Exception happens during receiving events


class PartitionProcessor(Protocol):
class PartitionProcessor(ABC):
"""
PartitionProcessor processes events received from the Azure Event Hubs service. A single instance of a class
implementing this abstract class will be created for every partition the associated ~azure.eventhub.eventprocessor.EventProcessor owns.
implementing this abstract class will be created for every partition the associated
~azure.eventhub.eventprocessor.EventProcessor owns.

"""

async def initialize(self):
pass

Expand All @@ -34,15 +35,18 @@ async def close(self, reason, checkpoint_manager: CheckpointManager):

:param reason: Reason for closing the PartitionProcessor.
:type reason: ~azure.eventhub.eventprocessor.CloseReason
:param checkpoint_manager: Use its method update_checkpoint to update checkpoint to the data store
:type checkpoint_manager: ~azure.eventhub.CheckpointManager

"""
pass

async def process_events(self, events: List[EventData], checkpoint_manager: CheckpointManager):
"""Called when a batch of events have been received.

:param events: Received events.
:type events: list[~azure.eventhub.common.EventData]
:param checkpoint_manager: Use its method update_checkpoint to update checkpoint to the data store
:type checkpoint_manager: ~azure.eventhub.CheckpointManager

"""
raise NotImplementedError
Expand All @@ -52,6 +56,7 @@ async def process_error(self, error, checkpoint_manager: CheckpointManager):

:param error: The error that happens.
:type error: Exception
:param checkpoint_manager: Use its method update_checkpoint to update checkpoint to the data store
:type checkpoint_manager: ~azure.eventhub.CheckpointManager

"""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ def __init__(self, db_filename: str = ":memory:", ownership_table: str = "owners
+ ",".join(self.primary_keys)\
+ "))"
c.execute(sql)
except sqlite3.OperationalError:
raise
pass
finally:
c.close()
self.conn = conn
Expand Down Expand Up @@ -100,13 +97,16 @@ async def claim_ownership(self, partitions):
if p.get("etag") == cursor_fetch[0][0]:
p["last_modified_time"] = time.time()
p["etag"] = str(uuid.uuid4())
other_fields_without_checkpoint = list(filter(lambda x: x not in self.checkpoint_fields, self.other_fields))
other_fields_without_checkpoint = list(
filter(lambda x: x not in self.checkpoint_fields, self.other_fields)
)
sql = "update " + _check_table_name(self.ownership_table) + " set "\
+ ','.join([field+"=?" for field in other_fields_without_checkpoint])\
+ " where "\
+ " and ".join([field+"=?" for field in self.primary_keys])

cursor.execute(sql, tuple(p.get(field) for field in other_fields_without_checkpoint) + tuple(p.get(field) for field in self.primary_keys))
cursor.execute(sql, tuple(p.get(field) for field in other_fields_without_checkpoint)
+ tuple(p.get(field) for field in self.primary_keys))
result.append(p)
else:
logger.info("EventProcessor %r failed to claim partition %r "
Expand All @@ -121,15 +121,19 @@ async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_
offset, sequence_number):
cursor = self.conn.cursor()
try:
cursor.execute("select owner_id from " + _check_table_name(self.ownership_table) + " where eventhub_name=? and consumer_group_name=? and partition_id=?",
cursor.execute("select owner_id from " + _check_table_name(self.ownership_table)
+ " where eventhub_name=? and consumer_group_name=? and partition_id=?",
(eventhub_name, consumer_group_name, partition_id))
cursor_fetch = cursor.fetchall()
if cursor_fetch and owner_id == cursor_fetch[0][0]:
cursor.execute("update " + _check_table_name(self.ownership_table) + " set offset=?, sequence_number=? where eventhub_name=? and consumer_group_name=? and partition_id=?",
cursor.execute("update " + _check_table_name(self.ownership_table)
+ " set offset=?, sequence_number=? "
"where eventhub_name=? and consumer_group_name=? and partition_id=?",
(offset, sequence_number, eventhub_name, consumer_group_name, partition_id))
self.conn.commit()
else:
logger.info("EventProcessor couldn't checkpoint to partition %r because it no longer has the ownership", partition_id)
logger.info("EventProcessor couldn't checkpoint to partition %r because it no longer has the ownership",
partition_id)

finally:
cursor.close()
Expand Down
Loading