Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
3a32907
Draft EventProcessor Loadbalancing
Aug 22, 2019
39b1b86
EventProcessor Load balancing
Aug 22, 2019
17f5153
small changes from bryan's review
Aug 23, 2019
04ef548
remove checkpoint manager from initialize
Aug 23, 2019
9be1741
small changes
Aug 23, 2019
875841e
initial blob storage
Aug 28, 2019
6df7253
fix leaking connection str
Aug 28, 2019
1b5753c
Draft EventProcessor Loadbalancing
Aug 22, 2019
b4b77f9
EventProcessor Load balancing
Aug 22, 2019
1787fdd
small changes from bryan's review
Aug 23, 2019
c2d0155
remove checkpoint manager from initialize
Aug 23, 2019
1074385
small changes
Aug 23, 2019
386baf0
Fix code review feedback
Aug 29, 2019
1afbf0c
Merge branch 'eventhubs_yx' of github.com:Azure/azure-sdk-for-python …
Aug 30, 2019
c126bea
Packaging update of azure-mgmt-datalake-analytics
AutorestCI Aug 30, 2019
40c7f03
Packaging update of azure-loganalytics
AutorestCI Aug 30, 2019
cf22c7c
Packaging update of azure-mgmt-storage
AutorestCI Aug 30, 2019
c7440b2
Merge branch 'eventhubs_preview3' into eventhubs_yx
Aug 30, 2019
fa804f4
code review fixes and pylint error
Aug 30, 2019
470cf7e
Merge branch 'eventhubs_yx' of github.com:Azure/azure-sdk-for-python …
Aug 30, 2019
e5f3b50
reduce dictionary access
Aug 30, 2019
27cb0bf
initial blob storage
Aug 28, 2019
f6d77e7
fix leaking connection str
Aug 28, 2019
32833b3
Merge branch 'eventhubs_blobstorage' of github.com:Azure/azure-sdk-fo…
Aug 30, 2019
da6199f
Change test polling to 5 sec
Sep 2, 2019
e6a7c5e
Add async lock to ensure etag consistency
Sep 2, 2019
ebc4362
Add dependency to azure-storage
Sep 2, 2019
1503604
Remove dependency on PartitionManager of azure-eventhub
Sep 2, 2019
c9707c4
Fix azure-storage-blob requirement error
Sep 2, 2019
8343876
Revert "Packaging update of azure-mgmt-storage"
Sep 2, 2019
66c5b31
Revert "Packaging update of azure-loganalytics"
Sep 2, 2019
bcd851a
Revert "Packaging update of azure-mgmt-datalake-analytics"
Sep 2, 2019
d7b2606
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 2, 2019
d740bb0
Trivial code change
Sep 2, 2019
778ab66
Add docstring to BlobPartitionManager
Sep 2, 2019
017d9f0
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 2, 2019
aad6978
Refine exception handling for eventprocessor
Sep 3, 2019
a55dc13
Enable pylint for eventprocessor
Sep 3, 2019
a339985
Expose OwnershipLostError
Sep 3, 2019
9bed566
Refine exception handling
Sep 3, 2019
b878002
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 3, 2019
cbc6792
handle exception for claim_ownership and update_checkpoint
Sep 4, 2019
8748e1f
Add license info
Sep 4, 2019
97e0558
Restructure packages
Sep 4, 2019
e61d6a1
Lock each ownership with a separate lock
Sep 4, 2019
9102713
Move eventprocessor to aio
Sep 4, 2019
278592c
change checkpoint_manager to partition context
Sep 4, 2019
665f28c
fix pylint error
Sep 4, 2019
b03cc64
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 4, 2019
db93fd4
Re-org namespace package structure
Sep 4, 2019
2050615
raise error while list_ownership got an exception
Sep 5, 2019
2781062
Restructure package structure
Sep 5, 2019
8a32e44
replace checkpointer with checkpointstore as a part of package name
Sep 6, 2019
88ca853
Merge remote-tracking branch 'central/eventhubs_preview3' into eventh…
Sep 7, 2019
b32417b
Merge remote-tracking branch 'central/eventhubs_preview3' into eventh…
Sep 7, 2019
667f0b0
remove duplicated partition manager
Sep 7, 2019
f28365c
Fix a bug in list_ownership
Sep 7, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refine exception handling for eventprocessor
  • Loading branch information
yijxie committed Sep 3, 2019
commit aad6978f934a3f843adb1de3879a0535c0a3ca8a
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from azure.eventhub import EventPosition, EventHubError
from azure.eventhub.aio import EventHubClient
from .checkpoint_manager import CheckpointManager
from .partition_manager import PartitionManager
from .partition_manager import PartitionManager, OwnershipLostError
from ._ownership_manager import OwnershipManager
from .partition_processor import CloseReason, PartitionProcessor

Expand Down Expand Up @@ -182,13 +182,43 @@ async def _receive(self, ownership):
EventPosition(ownership.get("offset", self._initial_event_position))
)

async def process_error(err):
log.error(
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
" has met an error. The exception is %r.",
owner_id, eventhub_name, partition_id, consumer_group_name, err
)
try:
await partition_processor.process_error(err, checkpoint_manager)
except Exception as err_again: # pylint:disable=broad-except
log.error(
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
" has another error during running process_error(). The exception is %r.",
owner_id, eventhub_name, partition_id, consumer_group_name, err_again
)

async def close(reason):
log.info(
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
" is being closed. Reason is: %r",
owner_id, eventhub_name, partition_id, consumer_group_name, reason
)
try:
await partition_processor.close(reason, checkpoint_manager)
except Exception as err: # pylint:disable=broad-except
log.error(
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
" has an error during running close(). The exception is %r.",
owner_id, eventhub_name, partition_id, consumer_group_name, err
)

try:
while True:
try:
await partition_processor.initialize()
events = await partition_consumer.receive(timeout=self._receive_timeout)
await partition_processor.process_events(events, checkpoint_manager)
except asyncio.CancelledError as cancelled_error:
except asyncio.CancelledError:
log.info(
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
" is cancelled",
Expand All @@ -197,28 +227,25 @@ async def _receive(self, ownership):
partition_id,
consumer_group_name
)
await partition_processor.process_error(cancelled_error, checkpoint_manager)
await partition_processor.close(CloseReason.SHUTDOWN, checkpoint_manager)
# TODO: release the ownership immediately via partition manager
if self._running is False:
await close(CloseReason.SHUTDOWN)
else:
await close(CloseReason.OWNERSHIP_LOST)
# TODO: release the ownership immediately via partition manager in preview 4
break
except EventHubError as eh_err:
reason = CloseReason.OWNERSHIP_LOST if eh_err.error == "link:stolen" \
else CloseReason.EVENTHUB_EXCEPTION
log.warning(
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
" has met an exception receiving events. It's being closed. The exception is %r.",
owner_id,
eventhub_name,
partition_id,
consumer_group_name,
eh_err
)
await partition_processor.process_error(eh_err, checkpoint_manager)
await partition_processor.close(reason, checkpoint_manager)
await process_error(eh_err)
await close(CloseReason.EVENTHUB_EXCEPTION)
# An EventProcessor will pick up this partition again after the ownership is released
# TODO: release the ownership immediately via partition manager
# TODO: release the ownership immediately via partition manager in preview 4
break
except OwnershipLostError:
await close(CloseReason.OWNERSHIP_LOST)
break
except Exception as other_error: # pylint:disable=broad-except
await process_error(other_error)
await close(CloseReason.PROCESS_EVENTS_ERROR)
# TODO: release the ownership immediately via partition manager in preview 4
break
except Exception as exp: # pylint:disable=broad-except
log.warning(exp)
finally:
await partition_consumer.close()
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,15 @@ async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_
:param sequence_number: The sequence_number of the ~azure.eventhub.EventData the new checkpoint
will be associated with.
:type sequence_number: int
:return:
:return: None
:raise: `OwnershipLostError`, `CheckpointError`
"""

async def close(self):
pass


class OwnershipLostError(Exception):
"""Raises when update_checkpoint detects the ownership has been lost

"""
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class CloseReason(Enum):
SHUTDOWN = 0 # user call EventProcessor.stop()
OWNERSHIP_LOST = 1 # lose the ownership of a partition.
EVENTHUB_EXCEPTION = 2 # Exception happens during receiving events
PROCESS_EVENTS_ERROR = 3 # Exception happens during process_events


class PartitionProcessor(ABC):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import uuid
import sqlite3
import logging
from .partition_manager import PartitionManager
from .partition_manager import PartitionManager, OwnershipLostError

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -134,6 +134,7 @@ async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_
else:
logger.info("EventProcessor couldn't checkpoint to partition %r because it no longer has the ownership",
partition_id)
raise OwnershipLostError()

finally:
cursor.close()
Expand Down