Merged
Changes from 1 commit
43 commits
3a32907
Draft EventProcessor Loadbalancing
Aug 22, 2019
39b1b86
EventProcessor Load balancing
Aug 22, 2019
17f5153
small changes from bryan's review
Aug 23, 2019
04ef548
remove checkpoint manager from initialize
Aug 23, 2019
9be1741
small changes
Aug 23, 2019
1b5753c
Draft EventProcessor Loadbalancing
Aug 22, 2019
b4b77f9
EventProcessor Load balancing
Aug 22, 2019
1787fdd
small changes from bryan's review
Aug 23, 2019
c2d0155
remove checkpoint manager from initialize
Aug 23, 2019
1074385
small changes
Aug 23, 2019
386baf0
Fix code review feedback
Aug 29, 2019
1afbf0c
Merge branch 'eventhubs_yx' of github.com:Azure/azure-sdk-for-python …
Aug 30, 2019
c126bea
Packaging update of azure-mgmt-datalake-analytics
AutorestCI Aug 30, 2019
40c7f03
Packaging update of azure-loganalytics
AutorestCI Aug 30, 2019
cf22c7c
Packaging update of azure-mgmt-storage
AutorestCI Aug 30, 2019
c7440b2
Merge branch 'eventhubs_preview3' into eventhubs_yx
Aug 30, 2019
fa804f4
code review fixes and pylint error
Aug 30, 2019
470cf7e
Merge branch 'eventhubs_yx' of github.com:Azure/azure-sdk-for-python …
Aug 30, 2019
e5f3b50
reduce dictionary access
Aug 30, 2019
8343876
Revert "Packaging update of azure-mgmt-storage"
Sep 2, 2019
66c5b31
Revert "Packaging update of azure-loganalytics"
Sep 2, 2019
bcd851a
Revert "Packaging update of azure-mgmt-datalake-analytics"
Sep 2, 2019
d740bb0
Trivial code change
Sep 2, 2019
aad6978
Refine exception handling for eventprocessor
Sep 3, 2019
a55dc13
Enable pylint for eventprocessor
Sep 3, 2019
a339985
Expose OwnershipLostError
Sep 3, 2019
9102713
Move eventprocessor to aio
Sep 4, 2019
278592c
change checkpoint_manager to partition context
Sep 4, 2019
665f28c
fix pylint error
Sep 4, 2019
0060f9d
fix a small issue
Sep 4, 2019
7b4273a
Catch list_ownership/claim_ownership exceptions and retry
Sep 5, 2019
bdf97c8
Fix code review issues
Sep 6, 2019
02a4daf
fix event processor long running test
Sep 6, 2019
a9446de
Remove utils.py
Sep 6, 2019
8dfdec9
Remove close() method
Sep 6, 2019
2aace82
Updated docstrings
Sep 6, 2019
36ba0a3
add pytest
Sep 7, 2019
7f95d9e
small fixes
Sep 7, 2019
f5870af
Merge branch 'eventhubs_preview3' into eventhubs_yx
Sep 7, 2019
f30d143
Revert "Remove utils.py"
Sep 7, 2019
893bee0
change asyncio.create_task to 3.5 friendly code
Sep 7, 2019
4b41fa5
Remove Callable
Sep 7, 2019
fef0551
raise CancelledError instead of break
Sep 7, 2019
code review fixes and pylint error
yijxie committed Aug 30, 2019
commit fa804f4d087823e82e118358578d4050fa9c8b40
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
# -----------------------------------------------------------------------------------

from .event_processor import EventProcessor
from .checkpoint_manager import CheckpointManager
from .partition_processor import PartitionProcessor, CloseReason
from .partition_manager import PartitionManager
from .sqlite3_partition_manager import Sqlite3PartitionManager
@@ -13,5 +14,6 @@
'EventProcessor',
'PartitionProcessor',
'PartitionManager',
'CheckpointManager',
Member

Is the user supposed to use the CheckpointManager directly? Creating new instances of it?

Contributor Author

No. Users will call checkpoint_manager.update_checkpoint(), but they don't create a CheckpointManager themselves.

'Sqlite3PartitionManager',
]
]
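
The review thread in this file says users call `checkpoint_manager.update_checkpoint()` but never construct a `CheckpointManager` themselves. A minimal sketch of that division of labor follows. It assumes the constructor argument order introduced in this commit and a hypothetical `update_checkpoint(offset, sequence_number)` method that forwards to the partition manager; `RecordingPartitionManager`, `MyPartitionProcessor`, and the dict-shaped events are stand-ins invented for illustration, not SDK classes.

```python
import asyncio


class RecordingPartitionManager:
    """Hypothetical stand-in for a PartitionManager; it only records checkpoints."""

    def __init__(self):
        self.checkpoints = {}

    async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id,
                                owner_id, offset, sequence_number):
        key = (eventhub_name, consumer_group_name, partition_id)
        self.checkpoints[key] = (owner_id, offset, sequence_number)


class CheckpointManager:
    """Sketch only: the argument order follows the constructor shown in this commit."""

    def __init__(self, eventhub_name, consumer_group_name, partition_id, owner_id,
                 partition_manager):
        self.eventhub_name = eventhub_name
        self.consumer_group_name = consumer_group_name
        self.partition_id = partition_id
        self.owner_id = owner_id
        self.partition_manager = partition_manager

    async def update_checkpoint(self, offset, sequence_number):
        # Assumed behavior: forward the stored identity plus the new position.
        await self.partition_manager.update_checkpoint(
            self.eventhub_name, self.consumer_group_name, self.partition_id,
            self.owner_id, offset, sequence_number)


class MyPartitionProcessor:
    """User code: it receives a ready-made checkpoint manager and never builds one."""

    async def process_events(self, events, checkpoint_manager):
        if events:
            last = events[-1]
            await checkpoint_manager.update_checkpoint(last["offset"], last["sequence_number"])


async def demo():
    store = RecordingPartitionManager()
    # In the commit, the event processor builds this object, not the user.
    manager = CheckpointManager("hub", "$default", "0", "owner-1", store)
    events = [{"offset": "100", "sequence_number": 1},
              {"offset": "200", "sequence_number": 2}]
    await MyPartitionProcessor().process_events(events, manager)
    return store.checkpoints


checkpoints = asyncio.run(demo())
```

If this reading is right, exporting `CheckpointManager` mainly serves type annotations in user code rather than direct construction.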
Original file line number Diff line number Diff line change
@@ -6,8 +6,8 @@
import time
import random
import math
import collections
from collections import Counter
from typing import List
from collections import Counter, defaultdict
from azure.eventhub.aio import EventHubClient
from .partition_manager import PartitionManager

@@ -26,7 +26,7 @@ def __init__(
self, eventhub_client: EventHubClient, consumer_group_name: str, owner_id: str,
partition_manager: PartitionManager, ownership_timeout: int
):
self.cached_parition_ids = []
self.cached_parition_ids = [] # type: List[str]
self.eventhub_client = eventhub_client
self.eventhub_name = eventhub_client.eh_name
Member

Is there a specific reason we store eventhub_name as an attribute on the instance but still read eh_name from the client instance later? Presumably the two values stay the same.

Contributor Author

eventhub_name and consumer_group_name are two important attributes of an ownership. They usually appear together, and they don't change anywhere in an ownership manager. Keeping them together is more readable, I think.

Member

Let me rephrase: why are you both assigning the name to an attribute and accessing it through self.eventhub_client.eh_name in other methods?

Contributor Author

That's a mistake. Changed all of them to self.eventhub_name.

self.consumer_group_name = consumer_group_name
@@ -45,7 +45,7 @@ async def claim_ownership(self):
"""
if not self.cached_parition_ids:
await self._retrieve_partition_ids()
to_claim = await self._balance_ownership()
to_claim = await self._balance_ownership(self.cached_parition_ids)
claimed_list = await self.partition_manager.claim_ownership(to_claim) if to_claim else None
return claimed_list

@@ -56,7 +56,7 @@ async def _retrieve_partition_ids(self):
"""
self.cached_parition_ids = await self.eventhub_client.get_partition_ids()

async def _balance_ownership(self):
async def _balance_ownership(self, all_partition_ids):
"""Balances and claims ownership of partitions for this EventProcessor.
The balancing algorithm is:
1. Find partitions with inactive ownership and partitions that haven never been claimed before
@@ -87,19 +87,19 @@ async def _balance_ownership(self):
)
now = time.time()
ownership_dict = dict((x["partition_id"], x) for x in ownership_list) # put the list to dict for fast lookup
not_owned_partition_ids = [pid for pid in self.cached_parition_ids if pid not in ownership_dict]
not_owned_partition_ids = [pid for pid in all_partition_ids if pid not in ownership_dict]
timed_out_partition_ids = [ownership["partition_id"] for ownership in ownership_list
if ownership["last_modified_time"] + self.ownership_timeout < now]
claimable_partition_ids = not_owned_partition_ids + timed_out_partition_ids
active_ownership = [ownership for ownership in ownership_list
if ownership["last_modified_time"] + self.ownership_timeout >= now]
active_ownership_by_owner = collections.defaultdict(list)
active_ownership_by_owner = defaultdict(list)
for ownership in active_ownership:
active_ownership_by_owner[ownership["owner_id"]].append(ownership)
active_ownership_self = active_ownership_by_owner[self.owner_id]

# calculate expected count per owner
all_partition_count = len(self.cached_parition_ids)
all_partition_count = len(all_partition_ids)
owners_count = len(active_ownership_by_owner) + \
(0 if self.owner_id in active_ownership_by_owner else 1)
expected_count_per_owner = all_partition_count // owners_count
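
The hunk above sorts ownership records into unowned, timed-out, and active groups, then derives a per-owner target. A standalone sketch of that bookkeeping may help when reading it. `summarize_ownership` is a hypothetical name, and the ceiling used for the upper bound is an assumption, because `most_count_allowed_per_owner` is defined in lines this diff does not show.

```python
import math
from collections import defaultdict


def summarize_ownership(ownership_list, all_partition_ids, owner_id, ownership_timeout, now):
    """Hypothetical helper mirroring the grouping and counting in the hunk above."""
    ownership_dict = {o["partition_id"]: o for o in ownership_list}
    not_owned = [pid for pid in all_partition_ids if pid not in ownership_dict]
    timed_out = [o["partition_id"] for o in ownership_list
                 if o["last_modified_time"] + ownership_timeout < now]

    active_by_owner = defaultdict(list)
    for o in ownership_list:
        if o["last_modified_time"] + ownership_timeout >= now:
            active_by_owner[o["owner_id"]].append(o)

    # Count this processor as an owner even if it holds nothing yet.
    owners_count = len(active_by_owner) + (0 if owner_id in active_by_owner else 1)
    expected = len(all_partition_ids) // owners_count
    # Assumption: the upper bound is the ceiling of the same ratio.
    most_allowed = math.ceil(len(all_partition_ids) / owners_count)
    return {
        "claimable": not_owned + timed_out,
        "owners_count": owners_count,
        "expected": expected,
        "most_allowed": most_allowed,
    }


summary = summarize_ownership(
    ownership_list=[
        {"partition_id": "0", "owner_id": "a", "last_modified_time": 95.0},
        {"partition_id": "1", "owner_id": "a", "last_modified_time": 96.0},
        {"partition_id": "2", "owner_id": "b", "last_modified_time": 10.0},  # stale record
    ],
    all_partition_ids=["0", "1", "2", "3", "4"],
    owner_id="c",
    ownership_timeout=30,
    now=100.0,
)
```

With five partitions and two live owners (the existing owner `a` plus the newcomer `c`), the floor gives the target of two partitions each, and the stale record for partition `2` becomes claimable alongside the never-claimed ones.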
@@ -109,20 +109,21 @@ async def _balance_ownership(self):
to_claim = active_ownership_self
if len(active_ownership_self) > most_count_allowed_per_owner: # needs to abandon a partition
to_claim.pop() # abandon one partition if owned too many
# TODO: Release a ownership immediately so other EventProcessors won't need to wait it to timeout
elif len(active_ownership_self) < expected_count_per_owner: # Either claims an inactive partition, or steals from other owners
# TODO: Release an ownership immediately so other EventProcessors won't need to wait it to timeout
elif len(active_ownership_self) < expected_count_per_owner:
# Either claims an inactive partition, or steals from other owners
if claimable_partition_ids: # claim an inactive partition if there is
random_partition_id = random.choice(claimable_partition_ids)
random_chosen_to_claim = ownership_dict.get(random_partition_id,
{"partition_id": random_partition_id,
"eventhub_name": self.eventhub_client.eh_name,
"consumer_group_name": self.consumer_group_name,
"owner_level": 0}) # TODO: consider removing owner_level
"consumer_group_name": self.consumer_group_name
})
random_chosen_to_claim["owner_id"] = self.owner_id
to_claim.append(random_chosen_to_claim)
else: # steal from another owner that has the most count
active_ownership_count_group_by_owner = Counter(
(x, len(y)) for x, y in active_ownership_by_owner.items())
dict((x, len(y)) for x, y in active_ownership_by_owner.items()))
most_frequent_owner_id = active_ownership_count_group_by_owner.most_common(1)[0][0]
# randomly choose a partition to steal from the most_frequent_owner
to_steal_partition = random.choice(active_ownership_by_owner[most_frequent_owner_id])
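
The last change in this hunk wraps the generator in `dict(...)` before passing it to `Counter`. The difference matters: a `Counter` built from an iterable of `(owner, count)` tuples counts each tuple once, while a `Counter` built from a mapping uses the mapping's values as the counts. A short demonstration with made-up owner ids:

```python
from collections import Counter

# Made-up data: owner id -> list of partitions that owner currently holds.
active_ownership_by_owner = {
    "owner-a": ["p0"],
    "owner-b": ["p1", "p2", "p3"],
    "owner-c": ["p4", "p5"],
}

# Before the fix: each (owner, count) tuple is an element seen exactly once,
# so every tally is 1 and most_common() cannot rank owners by partition count.
counted_tuples = Counter(
    (owner, len(parts)) for owner, parts in active_ownership_by_owner.items())
first_before = counted_tuples.most_common(1)[0][0]

# After the fix: the mapping's values are taken as the counts.
counted_sizes = Counter(
    dict((owner, len(parts)) for owner, parts in active_ownership_by_owner.items()))
busiest_owner = counted_sizes.most_common(1)[0][0]
```

Here `first_before` is a tuple rather than an owner id, which would make the following lookup in `active_ownership_by_owner` fail, while `busiest_owner` is the owner holding the most partitions.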
Original file line number Diff line number Diff line change
@@ -13,8 +13,8 @@ class CheckpointManager(object):
The interaction with the chosen storage service is done via ~azure.eventhub.eventprocessor.PartitionManager.

"""
def __init__(self, partition_id: str, eventhub_name: str, consumer_group_name: str,
owner_id: str, partition_manager: PartitionManager):
def __init__(self, eventhub_name: str, consumer_group_name: str,
partition_id: str, owner_id: str, partition_manager: PartitionManager):
self.partition_id = partition_id
self.eventhub_name = eventhub_name
self.consumer_group_name = consumer_group_name
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@
OWNER_LEVEL = 0


class EventProcessor(object):
class EventProcessor(object): # pylint:disable=too-many-instance-attributes
"""
An EventProcessor constantly receives events from all partitions of the Event Hub in the context of a given
consumer group. The received data will be sent to PartitionProcessor to be processed.
@@ -163,72 +163,62 @@ def _create_tasks_for_claimed_ownership(self, to_claim_ownership_list):
async def _receive(self, ownership):
log.info("start ownership, %r", ownership)
partition_processor = self._partition_processor_factory()
if not hasattr(partition_processor, "process_events"):
log.error(
"Fatal error: a partition processor should at least have method process_events(events, checkpoint_manager). EventProcessor will stop.")
await self.stop()
raise TypeError("Partition processor must has method process_events(events, checkpoint_manager")

partition_consumer = self._eventhub_client.create_consumer(
ownership["consumer_group_name"],
ownership["partition_id"],
EventPosition(ownership.get("offset", self._initial_event_position))
)
partition_id = ownership["partition_id"]
eventhub_name = ownership["eventhub_name"]
consumer_group_name = ownership["consumer_group_name"]
owner_id = ownership["owner_id"]
checkpoint_manager = CheckpointManager(
ownership["partition_id"],
ownership["eventhub_name"],
ownership["consumer_group_name"],
ownership["owner_id"],
eventhub_name,
consumer_group_name,
partition_id,
owner_id,
self._partition_manager
)

async def initialize():
if hasattr(partition_processor, "initialize"):
await partition_processor.initialize()

async def process_error(err):
if hasattr(partition_processor, "process_error"):
await partition_processor.process_error(err, checkpoint_manager)

async def close(close_reason):
if hasattr(partition_processor, "close"):
await partition_processor.close(close_reason, checkpoint_manager)
partition_processor.eventhub_name = ownership
partition_processor.checkpoint_manager = checkpoint_manager
partition_consumer = self._eventhub_client.create_consumer(
consumer_group_name,
partition_id,
EventPosition(ownership.get("offset", self._initial_event_position))
)

try:
while True:
try:
await initialize()
await partition_processor.initialize()
events = await partition_consumer.receive(timeout=self._receive_timeout)
await partition_processor.process_events(events, checkpoint_manager)
except asyncio.CancelledError as cancelled_error:
log.info(
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r "
"is cancelled",
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
" is cancelled",
ownership["owner_id"],
ownership["eventhub_name"],
ownership["partition_id"],
ownership["consumer_group_name"]
)
await process_error(cancelled_error)
await close(CloseReason.SHUTDOWN)
await partition_processor.process_error(cancelled_error, checkpoint_manager)
await partition_processor.close(CloseReason.SHUTDOWN, checkpoint_manager)
# TODO: release the ownership immediately via partition manager
break
except EventHubError as eh_err:
reason = CloseReason.OWNERSHIP_LOST if eh_err.error == "link:stolen" else CloseReason.EVENTHUB_EXCEPTION
reason = CloseReason.OWNERSHIP_LOST if eh_err.error == "link:stolen" \
else CloseReason.EVENTHUB_EXCEPTION
log.warning(
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r "
"has met an exception receiving events. It's being closed. The exception is %r.",
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
" has met an exception receiving events. It's being closed. The exception is %r.",
ownership["owner_id"],
ownership["eventhub_name"],
ownership["partition_id"],
ownership["consumer_group_name"],
eh_err
)
await process_error(eh_err)
await close(reason) # An EventProcessor will pick up this partition again after the ownership is released
await partition_processor.process_error(eh_err, checkpoint_manager)
await partition_processor.close(reason, checkpoint_manager)
# An EventProcessor will pick up this partition again after the ownership is released
# TODO: release the ownership immediately via partition manager
break
except Exception as exp:
except Exception as exp: # pylint:disable=broad-except
log.warning(exp)
finally:
await partition_consumer.close()
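
The receive loop above picks a `CloseReason` from the error condition, then calls `process_error` and `close` on the partition processor directly. The control flow can be sketched with stand-ins as follows. `FakeEventHubError`, `FakeConsumer`, and `RecordingProcessor` are hypothetical, the checkpoint manager argument is omitted for brevity, and only `EVENTHUB_EXCEPTION = 2` is visible in this commit, so the other enum values are assumed.

```python
import asyncio
from enum import Enum


class CloseReason(Enum):
    SHUTDOWN = 0            # assumed value
    OWNERSHIP_LOST = 1      # assumed value
    EVENTHUB_EXCEPTION = 2  # value shown in this commit


class FakeEventHubError(Exception):
    """Hypothetical stand-in for EventHubError; only the .error attribute is modeled."""

    def __init__(self, error):
        super().__init__(error)
        self.error = error


class FakeConsumer:
    """Returns one batch, then raises the configured error."""

    def __init__(self, error):
        self._error = error
        self._calls = 0
        self.closed = False

    async def receive(self):
        self._calls += 1
        if self._calls == 1:
            return ["event-1"]
        raise self._error

    async def close(self):
        self.closed = True


class RecordingProcessor:
    """Records every callback so the call order can be inspected."""

    def __init__(self):
        self.log = []

    async def process_events(self, events):
        self.log.append(("events", list(events)))

    async def process_error(self, error):
        self.log.append(("error", error.error))

    async def close(self, reason):
        self.log.append(("close", reason))


async def receive_loop(consumer, processor):
    try:
        while True:
            try:
                events = await consumer.receive()
                await processor.process_events(events)
            except FakeEventHubError as err:
                reason = CloseReason.OWNERSHIP_LOST if err.error == "link:stolen" \
                    else CloseReason.EVENTHUB_EXCEPTION
                await processor.process_error(err)
                await processor.close(reason)
                break
    finally:
        # The consumer is closed no matter how the loop ends.
        await consumer.close()


async def run_once(error_condition):
    consumer = FakeConsumer(FakeEventHubError(error_condition))
    processor = RecordingProcessor()
    await receive_loop(consumer, processor)
    return processor.log, consumer.closed


stolen_log, stolen_closed = asyncio.run(run_once("link:stolen"))
other_log, other_closed = asyncio.run(run_once("timeout"))
```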
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ async def list_ownership(self, eventhub_name: str, consumer_group_name: str) ->
last_modified_time
etag
"""
pass


@abstractmethod
async def claim_ownership(self, partitions: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
@@ -54,13 +54,14 @@ async def claim_ownership(self, partitions: Iterable[Dict[str, Any]]) -> Iterabl
last_modified_time
etag
"""
pass


@abstractmethod
async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id,
offset, sequence_number) -> None:
"""
Updates the checkpoint using the given information for the associated partition and consumer group in the chosen storage service.
Updates the checkpoint using the given information for the associated partition and
consumer group in the chosen storage service.

:param eventhub_name: The name of the specific Event Hub the ownership are associated with, relative to
the Event Hubs namespace that contains it.
@@ -73,11 +74,11 @@ async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_
:type owner_id: str
:param offset: The offset of the ~azure.eventhub.EventData the new checkpoint will be associated with.
:type offset: str
:param sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint will be associated with.
:param sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint
will be associated with.
:type sequence_number: int
:return:
"""
pass

async def close(self):
pass
Original file line number Diff line number Diff line change
@@ -4,11 +4,10 @@
# -----------------------------------------------------------------------------------

from typing import List
from typing_extensions import Protocol
from abc import ABC
from enum import Enum
from .checkpoint_manager import CheckpointManager

from azure.eventhub import EventData
from .checkpoint_manager import CheckpointManager


class CloseReason(Enum):
@@ -17,12 +16,14 @@ class CloseReason(Enum):
EVENTHUB_EXCEPTION = 2 # Exception happens during receiving events


class PartitionProcessor(Protocol):
class PartitionProcessor(ABC):
"""
PartitionProcessor processes events received from the Azure Event Hubs service. A single instance of a class
implementing this abstract class will be created for every partition the associated ~azure.eventhub.eventprocessor.EventProcessor owns.
implementing this abstract class will be created for every partition the associated
~azure.eventhub.eventprocessor.EventProcessor owns.

"""

async def initialize(self):
pass

@@ -34,15 +35,18 @@ async def close(self, reason, checkpoint_manager: CheckpointManager):

:param reason: Reason for closing the PartitionProcessor.
:type reason: ~azure.eventhub.eventprocessor.CloseReason
:param checkpoint_manager: Use its method update_checkpoint to update checkpoint to the data store
:type checkpoint_manager: ~azure.eventhub.CheckpointManager

"""
pass

async def process_events(self, events: List[EventData], checkpoint_manager: CheckpointManager):
"""Called when a batch of events have been received.

:param events: Received events.
:type events: list[~azure.eventhub.common.EventData]
:param checkpoint_manager: Use its method update_checkpoint to update checkpoint to the data store
:type checkpoint_manager: ~azure.eventhub.CheckpointManager

"""
raise NotImplementedError
@@ -52,6 +56,7 @@ async def process_error(self, error, checkpoint_manager: CheckpointManager):

:param error: The error that happens.
:type error: Exception
:param checkpoint_manager: Use its method update_checkpoint to update checkpoint to the data store
:type checkpoint_manager: ~azure.eventhub.CheckpointManager

"""
pass
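
Switching `PartitionProcessor` from `Protocol` to `ABC` means every subclass inherits no-op `initialize`, `close`, and `process_error` methods, which appears to be what allows the event processor in this commit to drop its `hasattr` checks. A minimal sketch, using a simplified stand-in for the base class (type hints and docstrings omitted) and a hypothetical `CollectingProcessor` subclass:

```python
import asyncio
from abc import ABC


class PartitionProcessor(ABC):
    """Simplified stand-in for the base class in this diff."""

    async def initialize(self):
        pass

    async def close(self, reason, checkpoint_manager):
        pass

    async def process_events(self, events, checkpoint_manager):
        raise NotImplementedError

    async def process_error(self, error, checkpoint_manager):
        pass


class CollectingProcessor(PartitionProcessor):
    """Hypothetical user class that overrides only process_events."""

    def __init__(self):
        self.seen = []

    async def process_events(self, events, checkpoint_manager):
        self.seen.extend(events)


async def drive(processor):
    # No hasattr checks: the inherited defaults make every call safe.
    await processor.initialize()
    await processor.process_events(["a", "b"], None)
    await processor.process_error(RuntimeError("boom"), None)
    await processor.close("shutdown", None)


async def base_raises():
    # The base class still refuses to process events on its own.
    try:
        await PartitionProcessor().process_events([], None)
    except NotImplementedError:
        return True
    return False


collector = CollectingProcessor()
asyncio.run(drive(collector))
base_rejects_events = asyncio.run(base_raises())
```

Because none of the methods is marked with `@abstractmethod`, the base class can still be instantiated; a missing `process_events` override only surfaces as `NotImplementedError` when the first batch arrives.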
Original file line number Diff line number Diff line change
@@ -53,9 +53,6 @@ def __init__(self, db_filename: str = ":memory:", ownership_table: str = "owners
+ ",".join(self.primary_keys)\
+ "))"
c.execute(sql)
except sqlite3.OperationalError:
raise
pass
finally:
c.close()
self.conn = conn
@@ -100,13 +97,16 @@ async def claim_ownership(self, partitions):
if p.get("etag") == cursor_fetch[0][0]:
p["last_modified_time"] = time.time()
p["etag"] = str(uuid.uuid4())
other_fields_without_checkpoint = list(filter(lambda x: x not in self.checkpoint_fields, self.other_fields))
other_fields_without_checkpoint = list(
filter(lambda x: x not in self.checkpoint_fields, self.other_fields)
)
sql = "update " + _check_table_name(self.ownership_table) + " set "\
+ ','.join([field+"=?" for field in other_fields_without_checkpoint])\
+ " where "\
+ " and ".join([field+"=?" for field in self.primary_keys])

cursor.execute(sql, tuple(p.get(field) for field in other_fields_without_checkpoint) + tuple(p.get(field) for field in self.primary_keys))
cursor.execute(sql, tuple(p.get(field) for field in other_fields_without_checkpoint)
+ tuple(p.get(field) for field in self.primary_keys))
result.append(p)
else:
logger.info("EventProcessor %r failed to claim partition %r "
@@ -121,15 +121,19 @@ async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_
offset, sequence_number):
cursor = self.conn.cursor()
try:
cursor.execute("select owner_id from " + _check_table_name(self.ownership_table) + " where eventhub_name=? and consumer_group_name=? and partition_id=?",
cursor.execute("select owner_id from " + _check_table_name(self.ownership_table)
+ " where eventhub_name=? and consumer_group_name=? and partition_id=?",
(eventhub_name, consumer_group_name, partition_id))
cursor_fetch = cursor.fetchall()
if cursor_fetch and owner_id == cursor_fetch[0][0]:
cursor.execute("update " + _check_table_name(self.ownership_table) + " set offset=?, sequence_number=? where eventhub_name=? and consumer_group_name=? and partition_id=?",
cursor.execute("update " + _check_table_name(self.ownership_table)
+ " set offset=?, sequence_number=? "
"where eventhub_name=? and consumer_group_name=? and partition_id=?",
(offset, sequence_number, eventhub_name, consumer_group_name, partition_id))
self.conn.commit()
else:
logger.info("EventProcessor couldn't checkpoint to partition %r because it no longer has the ownership", partition_id)
logger.info("EventProcessor couldn't checkpoint to partition %r because it no longer has the ownership",
partition_id)

finally:
cursor.close()
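
The checkpoint update above reads the current owner first and writes the new offset only if the caller still owns the partition. The same guard can be sketched against an in-memory SQLite database. The fixed table name `ownership`, the reduced column set, and the synchronous `update_checkpoint` function are simplifications made for illustration, not the code in this commit.

```python
import sqlite3


def create_store():
    """Build a hypothetical, reduced ownership table with one claimed partition."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        'create table ownership ('
        'eventhub_name text, consumer_group_name text, partition_id text, '
        'owner_id text, "offset" text, sequence_number integer, '
        'primary key (eventhub_name, consumer_group_name, partition_id))'
    )
    conn.execute(
        "insert into ownership values (?, ?, ?, ?, ?, ?)",
        ("hub", "$default", "0", "owner-1", None, None),
    )
    conn.commit()
    return conn


def update_checkpoint(conn, eventhub_name, consumer_group_name, partition_id,
                      owner_id, offset, sequence_number):
    """Write the checkpoint only if owner_id still owns the partition."""
    cursor = conn.cursor()
    try:
        cursor.execute(
            "select owner_id from ownership "
            "where eventhub_name=? and consumer_group_name=? and partition_id=?",
            (eventhub_name, consumer_group_name, partition_id),
        )
        rows = cursor.fetchall()
        if rows and rows[0][0] == owner_id:
            cursor.execute(
                'update ownership set "offset"=?, sequence_number=? '
                "where eventhub_name=? and consumer_group_name=? and partition_id=?",
                (offset, sequence_number, eventhub_name, consumer_group_name, partition_id),
            )
            conn.commit()
            return True
        return False  # ownership was lost; leave the stored checkpoint untouched
    finally:
        cursor.close()


conn = create_store()
accepted = update_checkpoint(conn, "hub", "$default", "0", "owner-1", "100", 5)
rejected = update_checkpoint(conn, "hub", "$default", "0", "owner-2", "999", 9)
stored = conn.execute('select "offset", sequence_number from ownership').fetchone()
```

All values go through `?` placeholders, as in the diff; only the table name is concatenated there, which is why the original guards it with `_check_table_name`.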