Merged
Changes from 1 commit (of 43 commits)
3a32907  Draft EventProcessor Loadbalancing (Aug 22, 2019)
39b1b86  EventProcessor Load balancing (Aug 22, 2019)
17f5153  small changes from bryan's review (Aug 23, 2019)
04ef548  remove checkpoint manager from initialize (Aug 23, 2019)
9be1741  small changes (Aug 23, 2019)
1b5753c  Draft EventProcessor Loadbalancing (Aug 22, 2019)
b4b77f9  EventProcessor Load balancing (Aug 22, 2019)
1787fdd  small changes from bryan's review (Aug 23, 2019)
c2d0155  remove checkpoint manager from initialize (Aug 23, 2019)
1074385  small changes (Aug 23, 2019)
386baf0  Fix code review feedback (Aug 29, 2019)
1afbf0c  Merge branch 'eventhubs_yx' of github.com:Azure/azure-sdk-for-python … (Aug 30, 2019)
c126bea  Packaging update of azure-mgmt-datalake-analytics (AutorestCI, Aug 30, 2019)
40c7f03  Packaging update of azure-loganalytics (AutorestCI, Aug 30, 2019)
cf22c7c  Packaging update of azure-mgmt-storage (AutorestCI, Aug 30, 2019)
c7440b2  Merge branch 'eventhubs_preview3' into eventhubs_yx (Aug 30, 2019)
fa804f4  code review fixes and pylint error (Aug 30, 2019)
470cf7e  Merge branch 'eventhubs_yx' of github.com:Azure/azure-sdk-for-python … (Aug 30, 2019)
e5f3b50  reduce dictionary access (Aug 30, 2019)
8343876  Revert "Packaging update of azure-mgmt-storage" (Sep 2, 2019)
66c5b31  Revert "Packaging update of azure-loganalytics" (Sep 2, 2019)
bcd851a  Revert "Packaging update of azure-mgmt-datalake-analytics" (Sep 2, 2019)
d740bb0  Trivial code change (Sep 2, 2019)
aad6978  Refine exception handling for eventprocessor (Sep 3, 2019)
a55dc13  Enable pylint for eventprocessor (Sep 3, 2019)
a339985  Expose OwnershipLostError (Sep 3, 2019)
9102713  Move eventprocessor to aio (Sep 4, 2019)
278592c  change checkpoint_manager to partition context (Sep 4, 2019)
665f28c  fix pylint error (Sep 4, 2019)
0060f9d  fix a small issue (Sep 4, 2019)
7b4273a  Catch list_ownership/claim_ownership exceptions and retry (Sep 5, 2019)
bdf97c8  Fix code review issues (Sep 6, 2019)
02a4daf  fix event processor long running test (Sep 6, 2019)
a9446de  Remove utils.py (Sep 6, 2019)
8dfdec9  Remove close() method (Sep 6, 2019)
2aace82  Updated docstrings (Sep 6, 2019)
36ba0a3  add pytest (Sep 7, 2019)
7f95d9e  small fixes (Sep 7, 2019)
f5870af  Merge branch 'eventhubs_preview3' into eventhubs_yx (Sep 7, 2019)
f30d143  Revert "Remove utils.py" (Sep 7, 2019)
893bee0  change asyncio.create_task to 3.5 friendly code (Sep 7, 2019)
4b41fa5  Remove Callable (Sep 7, 2019)
fef0551  raise CancelledError instead of break (Sep 7, 2019)
Fix code review issues
yijxie committed Sep 6, 2019
commit bdf97c85ef720a8e6dec8e6b1dc2bd552c9e535a
@@ -7,6 +7,7 @@
from .partition_processor import PartitionProcessor, CloseReason
from .partition_manager import PartitionManager, OwnershipLostError
from .partition_context import PartitionContext
from .sample_partition_manager import SamplePartitionManager

__all__ = [
'CloseReason',
@@ -15,4 +16,5 @@
'PartitionManager',
'OwnershipLostError',
'PartitionContext',
'SamplePartitionManager',
]
@@ -24,7 +24,7 @@ class OwnershipManager(object):
"""
def __init__(
self, eventhub_client: EventHubClient, consumer_group_name: str, owner_id: str,
partition_manager: PartitionManager, ownership_timeout: int
partition_manager: PartitionManager, ownership_timeout: float
):
self.cached_parition_ids = [] # type: List[str]
self.eventhub_client = eventhub_client
@@ -86,7 +86,7 @@ async def _balance_ownership(self, all_partition_ids):
self.eventhub_name, self.consumer_group_name
)
now = time.time()
ownership_dict = dict((x["partition_id"], x) for x in ownership_list) # put the list to dict for fast lookup
ownership_dict = {x["partition_id"]: x for x in ownership_list} # put the list to dict for fast lookup
not_owned_partition_ids = [pid for pid in all_partition_ids if pid not in ownership_dict]
timed_out_partition_ids = [ownership["partition_id"] for ownership in ownership_list
if ownership["last_modified_time"] + self.ownership_timeout < now]
@@ -100,6 +100,8 @@ async def _balance_ownership(self, all_partition_ids):

# calculate expected count per owner
all_partition_count = len(all_partition_ids)
# owners_count is the number of active owners. If self.owner_id is not yet among the active owners,
# then plus 1 to include self. This will make owners_count >= 1.
owners_count = len(active_ownership_by_owner) + \
Review comment (Member): This needs a comment to describe what it is doing.

Reply (Contributor Author): Added comment
(0 if self.owner_id in active_ownership_by_owner else 1)
expected_count_per_owner = all_partition_count // owners_count
@@ -109,7 +111,6 @@ async def _balance_ownership(self, all_partition_ids):
to_claim = active_ownership_self
if len(active_ownership_self) > most_count_allowed_per_owner: # needs to abandon a partition
to_claim.pop() # abandon one partition if owned too many
# TODO: Release an ownership immediately so other EventProcessors won't need to wait it to timeout
elif len(active_ownership_self) < expected_count_per_owner:
# Either claims an inactive partition, or steals from other owners
if claimable_partition_ids: # claim an inactive partition if there is
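Note on the quota arithmetic above: owners_count always counts this EventProcessor instance, so the integer division never divides by zero. A standalone sketch of the calculation (illustrative names; the exact expression for most_count_allowed_per_owner is elided from the diff, so the remainder rule below is an assumption):

def partition_quota(all_partition_ids, active_ownership_by_owner, owner_id):
    # active_ownership_by_owner maps owner_id -> the ownerships that owner currently holds
    all_partition_count = len(all_partition_ids)
    # Count this instance even if it owns nothing yet, so owners_count >= 1.
    owners_count = len(active_ownership_by_owner) + \
        (0 if owner_id in active_ownership_by_owner else 1)
    expected_count_per_owner = all_partition_count // owners_count
    # Assumed: an owner may hold one extra partition when the split is uneven.
    most_count_allowed_per_owner = expected_count_per_owner + \
        (1 if all_partition_count % owners_count else 0)
    return expected_count_per_owner, most_count_allowed_per_owner

# 8 partitions across 3 owners (2 active plus this instance): expect 2 each, at most 3.
print(partition_quota([str(i) for i in range(8)], {"owner-a": [], "owner-b": []}, "owner-c"))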
@@ -56,7 +56,9 @@ async def process_events(self, events, partition_context):
def __init__(
self, eventhub_client: EventHubClient, consumer_group_name: str,
partition_processor_factory: Callable[..., PartitionProcessor],
partition_manager: PartitionManager, **kwargs
partition_manager: PartitionManager, *,
initial_event_position: EventPosition = EventPosition("-1"), polling_interval: float = 10.0

):
"""
Instantiate an EventProcessor.
@@ -84,14 +86,9 @@ def __init__(
self._eventhub_name = eventhub_client.eh_name
self._partition_processor_factory = partition_processor_factory
self._partition_manager = partition_manager
self._initial_event_position = kwargs.get("initial_event_position", "-1")
# TODO: initial position provider will be a callable
# so users can create initial event position for every partition
self._max_batch_size = eventhub_client.config.max_batch_size
self._receive_timeout = eventhub_client.config.receive_timeout
self._polling_interval = kwargs.get("polling_interval", 10)
self._initial_event_position = initial_event_position # will be replaced by reset event position in preview 4
self._polling_interval = polling_interval
self._ownership_timeout = self._polling_interval * 2
# TODO: Team haven't decided if this is a separate argument
self._tasks = {} # type: Dict[str, asyncio.Task]
self._id = str(uuid.uuid4())
self._running = False
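The signature change above turns initial_event_position and polling_interval into keyword-only parameters with defaults, replacing the kwargs.get(...) lookups. A minimal, SDK-independent sketch of the pattern (hypothetical class, not the real EventProcessor):

class Processor:
    def __init__(self, client, consumer_group_name, processor_factory, partition_manager, *,
                 initial_event_position="-1", polling_interval=10.0):
        self.initial_event_position = initial_event_position
        self.polling_interval = polling_interval
        # the ownership timeout is derived from the polling interval, as in the diff
        self.ownership_timeout = polling_interval * 2

# the starred parameters must now be passed by name:
p = Processor("client", "$Default", "factory", "manager", polling_interval=5.0)
assert p.ownership_timeout == 10.0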
@@ -122,18 +119,19 @@ async def start(self):
try:
claimed_ownership_list = await ownership_manager.claim_ownership()
except Exception as err:
log.exception("An exception occurred during balancing and claiming ownership for eventhub %r "
"consumer group %r. Retrying after %r seconds",
self._eventhub_name, self._consumer_group_name, self._polling_interval, exc_info=err)
log.warning("An exception (%r) occurred during balancing and claiming ownership for eventhub %r "
"consumer group %r. Retrying after %r seconds",
err, self._eventhub_name, self._consumer_group_name, self._polling_interval)
await asyncio.sleep(self._polling_interval)
continue

to_cancel_list = self._tasks.keys()
if claimed_ownership_list:
claimed_partition_ids = [x["partition_id"] for x in claimed_ownership_list]
to_cancel_list = self._tasks.keys() - claimed_partition_ids
self._create_tasks_for_claimed_ownership(claimed_ownership_list)
else:
log.warning("EventProcessor %r hasn't claimed an ownership. It keeps claiming.", self._id)
to_cancel_list = self._tasks.keys()
log.info("EventProcessor %r hasn't claimed an ownership. It keeps claiming.", self._id)
if to_cancel_list:
self._cancel_tasks_for_partitions(to_cancel_list)
log.info("EventProcesor %r has cancelled partitions %r", self._id, to_cancel_list)
@@ -153,7 +151,7 @@ async def stop(self):
_, task = self._tasks.popitem()
task.cancel()
log.info("EventProcessor %r has been cancelled", self._id)
await asyncio.sleep(2) # give some time to finish after cancelled
await asyncio.sleep(2) # give some time to finish after cancelled.

def _cancel_tasks_for_partitions(self, to_cancel_partitions):
Review comment (Member): I would invert this and pass in the currently claimed partitions. This way the use of self._tasks is more centralized/less spread out.

Reply (Contributor Author): I'm going to keep the code for now to save time for other more important things, and come back to this during preview 4 coding.
for partition_id in to_cancel_partitions:
@@ -181,23 +179,22 @@ async def _receive(self, ownership):
owner_id,
self._partition_manager
)
partition_processor.eventhub_name = ownership
partition_consumer = self._eventhub_client.create_consumer(
consumer_group_name,
partition_id,
EventPosition(ownership.get("offset", self._initial_event_position))
EventPosition(ownership.get("offset", self._initial_event_position.value))
)

async def process_error(err):
log.error(
log.warning(
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
" has met an error. The exception is %r.",
owner_id, eventhub_name, partition_id, consumer_group_name, err
)
try:
await partition_processor.process_error(err, partition_context)
except Exception as err_again: # pylint:disable=broad-except
log.error(
log.warning(
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
" has another error during running process_error(). The exception is %r.",
owner_id, eventhub_name, partition_id, consumer_group_name, err_again
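The process_error wrapper above deliberately swallows a second failure: if the user's PartitionProcessor.process_error itself raises, the new exception is only logged so it cannot kill the receive loop. A reduced sketch of that guard (hypothetical helper name):

import logging

log = logging.getLogger(__name__)

async def guarded_process_error(partition_processor, err, partition_context):
    # Forward the error to user code, but never let the callback raise back
    # into the partition receive loop.
    try:
        await partition_processor.process_error(err, partition_context)
    except Exception as err_again:  # pylint:disable=broad-except
        log.warning("process_error() raised %r itself; continuing", err_again)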
@@ -212,7 +209,7 @@ async def close(reason):
try:
await partition_processor.close(reason, partition_context)
except Exception as err: # pylint:disable=broad-except
log.error(
log.warning(
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
" has an error during running close(). The exception is %r.",
owner_id, eventhub_name, partition_id, consumer_group_name, err
Expand All @@ -222,7 +219,7 @@ async def close(reason):
while True:
try:
await partition_processor.initialize(partition_context)
events = await partition_consumer.receive(timeout=self._receive_timeout)
events = await partition_consumer.receive()
await partition_processor.process_events(events, partition_context)
except asyncio.CancelledError:
Review comment (Member): I am very concerned about the fact that we are not re-raising the asyncio.CancelledError here...

Reply (Contributor Author): I changed "break" to "raise" for this "except asyncio.CancelledError:" block.
log.info(
@@ -237,21 +234,18 @@ async def close(reason):
await close(CloseReason.SHUTDOWN)
else:
await close(CloseReason.OWNERSHIP_LOST)
# TODO: release the ownership immediately via partition manager in preview 4
break
except EventHubError as eh_err:
await process_error(eh_err)
await close(CloseReason.EVENTHUB_EXCEPTION)
# An EventProcessor will pick up this partition again after the ownership is released
# TODO: release the ownership immediately via partition manager in preview 4
break
except OwnershipLostError:
await close(CloseReason.OWNERSHIP_LOST)
break
except Exception as other_error: # pylint:disable=broad-except
await process_error(other_error)
await close(CloseReason.PROCESS_EVENTS_ERROR)
# TODO: release the ownership immediately via partition manager in preview 4
break
finally:
await partition_consumer.close()
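Following the review thread above (and the later commit "raise CancelledError instead of break"), the cancellation handler runs close() and then re-raises, so cancellation from stop() still propagates to whoever awaits the task. A standalone sketch of why re-raising matters (written 3.5-compatible, matching the create_task change in this PR; not SDK code):

import asyncio

async def worker():
    try:
        while True:
            await asyncio.sleep(1)  # stands in for partition_consumer.receive()
    except asyncio.CancelledError:
        print("running close(CloseReason.SHUTDOWN)-style cleanup")
        raise  # re-raise so the task ends up in the 'cancelled' state

async def main():
    task = asyncio.ensure_future(worker())
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("caller sees the task as cancelled")

asyncio.get_event_loop().run_until_complete(main())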
@@ -35,14 +35,13 @@ async def list_ownership(self, eventhub_name: str, consumer_group_name: str) ->
etag
"""


@abstractmethod
async def claim_ownership(self, partitions: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
async def claim_ownership(self, ownership_list: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
"""
Tries to claim a list of specified ownership.

:param partitions: Iterable of dictionaries containing all the ownership to claim.
:type partitions: Iterable of dict
:param ownership_list: Iterable of dictionaries containing all the ownership to claim.
:type ownership_list: Iterable of dict
:return: Iterable of dictionaries containing the following partition ownership information:
eventhub_name
consumer_group_name
@@ -55,7 +54,6 @@ async def claim_ownership(self, partitions: Iterable[Dict[str, Any]]) -> Iterabl
etag
"""


@abstractmethod
async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id,
offset, sequence_number) -> None:
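For context on the renamed claim_ownership parameter: the argument is the list of ownership dictionaries to claim, and an implementation returns only the ones it actually won, typically decided by an etag check. A toy in-memory illustration of that contract (not the SDK's SamplePartitionManager, and update_checkpoint is omitted):

import time
import uuid

class InMemoryPartitionManager:
    """Toy ownership store keyed by (eventhub, consumer group, partition)."""

    def __init__(self):
        self._store = {}

    async def list_ownership(self, eventhub_name, consumer_group_name):
        return [o for o in self._store.values()
                if o["eventhub_name"] == eventhub_name
                and o["consumer_group_name"] == consumer_group_name]

    async def claim_ownership(self, ownership_list):
        claimed = []
        for ownership in ownership_list:
            key = (ownership["eventhub_name"], ownership["consumer_group_name"],
                   ownership["partition_id"])
            current = self._store.get(key)
            # Claim only if the partition is unowned or the caller holds the current etag.
            if current is None or current.get("etag") == ownership.get("etag"):
                ownership["etag"] = str(uuid.uuid4())
                ownership["last_modified_time"] = time.time()
                self._store[key] = ownership
                claimed.append(ownership)
        return claimed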

This file was deleted.

@@ -3,7 +3,7 @@
import os
from azure.eventhub.aio import EventHubClient
from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor
from azure.eventhub.aio.eventprocessor.partitionmanagers import SamplePartitionManager
from azure.eventhub.aio.eventprocessor import SamplePartitionManager

RECEIVE_TIMEOUT = 5 # timeout in seconds for a receiving operation. 0 or None means no timeout
RETRY_TOTAL = 3 # max number of retries for receive operations within the receive timeout. Actual number of retries clould be less if RECEIVE_TIMEOUT is too small