Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
120 commits
Select commit Hold shift + click to select a range
3a32907
Draft EventProcessor Loadbalancing
Aug 22, 2019
39b1b86
EventProcessor Load balancing
Aug 22, 2019
64da8ed
Small changes from code review
Aug 23, 2019
17f5153
small changes from bryan's review
Aug 23, 2019
04ef548
remove checkpoint manager from initialize
Aug 23, 2019
9be1741
small changes
Aug 23, 2019
d951dcf
change EventData.msg_properties to private attribute
Aug 26, 2019
8bbac25
remove abstract method
Aug 27, 2019
875841e
initial blob storage
Aug 28, 2019
70a33d0
code clean 1
Aug 28, 2019
6df7253
fix leaking connection str
Aug 28, 2019
abbdd25
code clean 2
Aug 28, 2019
b45d6b3
Fix pylint
Aug 29, 2019
247004a
Fix pylint
Aug 29, 2019
6ace6ce
Use properties EventData.partition_key
Aug 29, 2019
008421d
Small changes from code review
Aug 23, 2019
b8c027d
change EventData.msg_properties to private attribute
Aug 26, 2019
2489dd3
remove abstract method
Aug 27, 2019
3a2d72f
code clean 1
Aug 28, 2019
9735756
code clean 2
Aug 28, 2019
288617e
Fix pylint
Aug 29, 2019
2bdbffe
Fix pylint
Aug 29, 2019
e8ea699
Use properties EventData.partition_key
Aug 29, 2019
1b5753c
Draft EventProcessor Loadbalancing
Aug 22, 2019
b4b77f9
EventProcessor Load balancing
Aug 22, 2019
1787fdd
small changes from bryan's review
Aug 23, 2019
c2d0155
remove checkpoint manager from initialize
Aug 23, 2019
1074385
small changes
Aug 23, 2019
386baf0
Fix code review feedback
Aug 29, 2019
889597c
Merge branch 'eventhubs_preview3' of github.com:Azure/azure-sdk-for-p…
Aug 29, 2019
cb08478
Use properties EventData.partition_key
Aug 29, 2019
b3dcd07
Temporarily disable pylint errors that need refactoring
Aug 29, 2019
b85e6cc
fix pylint errors
Aug 29, 2019
92feb09
Merge branch 'master' into eventhubs_preview3
Aug 29, 2019
1afbf0c
Merge branch 'eventhubs_yx' of github.com:Azure/azure-sdk-for-python …
Aug 30, 2019
c126bea
Packaging update of azure-mgmt-datalake-analytics
AutorestCI Aug 30, 2019
40c7f03
Packaging update of azure-loganalytics
AutorestCI Aug 30, 2019
cf22c7c
Packaging update of azure-mgmt-storage
AutorestCI Aug 30, 2019
5e51ce2
fix pylint errors
Aug 30, 2019
726bf6f
ignore eventprocessor pylint temporarily
Aug 30, 2019
c7440b2
Merge branch 'eventhubs_preview3' into eventhubs_yx
Aug 30, 2019
fa804f4
code review fixes and pylint error
Aug 30, 2019
470cf7e
Merge branch 'eventhubs_yx' of github.com:Azure/azure-sdk-for-python …
Aug 30, 2019
ffd8cb0
small pylint adjustment
Aug 30, 2019
e5f3b50
reduce dictionary access
Aug 30, 2019
27cb0bf
initial blob storage
Aug 28, 2019
f6d77e7
fix leaking connection str
Aug 28, 2019
2f69d65
Merge branch 'master' into eventhubs_preview3
Aug 30, 2019
e5c8d1c
Add typing for Python2.7
Aug 30, 2019
32833b3
Merge branch 'eventhubs_blobstorage' of github.com:Azure/azure-sdk-fo…
Aug 30, 2019
e85ac17
[EventHub] IoTHub management operations improvement and bug fixing (#…
yunhaoling Sep 2, 2019
da6199f
Change test polling to 5 sec
Sep 2, 2019
e6a7c5e
Add async lock to ensure etag consistency
Sep 2, 2019
ebc4362
Add dependency to azure-storage
Sep 2, 2019
1503604
Remove dependency on PartitionManager of azure-eventhub
Sep 2, 2019
c9707c4
Fix azure-storage-blob requirement error
Sep 2, 2019
8343876
Revert "Packaging update of azure-mgmt-storage"
Sep 2, 2019
66c5b31
Revert "Packaging update of azure-loganalytics"
Sep 2, 2019
bcd851a
Revert "Packaging update of azure-mgmt-datalake-analytics"
Sep 2, 2019
d7b2606
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 2, 2019
d740bb0
Trivial code change
Sep 2, 2019
778ab66
Add docstring to BlobPartitionManager
Sep 2, 2019
017d9f0
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 2, 2019
1fb341b
[EventHub] Retry refactor (#7026)
yunhaoling Sep 3, 2019
aad6978
Refine exception handling for eventprocessor
Sep 3, 2019
a55dc13
Enable pylint for eventprocessor
Sep 3, 2019
a339985
Expose OwnershipLostError
Sep 3, 2019
9bed566
Refine exception handling
Sep 3, 2019
b878002
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 3, 2019
7762130
add system_properties to EventData
Sep 3, 2019
1b10d00
Fix a small bug
Sep 4, 2019
13237b5
Refine example code
Sep 4, 2019
cbc6792
handle exception for claim_ownership and update_checkpoint
Sep 4, 2019
8748e1f
Add license info
Sep 4, 2019
97e0558
Restructure packages
Sep 4, 2019
e61d6a1
Lock each ownership with a separate lock
Sep 4, 2019
9102713
Move eventprocessor to aio
Sep 4, 2019
278592c
change checkpoint_manager to partition context
Sep 4, 2019
665f28c
fix pylint error
Sep 4, 2019
998eeed
Update receive method (#7064)
yunhaoling Sep 4, 2019
b03cc64
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 4, 2019
db93fd4
Re-org namespace package structure
Sep 4, 2019
2050615
raise error while list_ownership got an exception
Sep 5, 2019
2781062
Restructure package structure
Sep 5, 2019
8a32e44
replace checkpointer with checkpointstore as a part of package name
Sep 6, 2019
e13ddee
Update accessibility of class (#7091)
yunhaoling Sep 6, 2019
f616f37
Update samples and codes according to the review (#7098)
yunhaoling Sep 6, 2019
dad5baa
Python EventHubs load balancing (#6901)
YijunXieMS Sep 7, 2019
8e7e1c1
Fix a pylint error
Sep 7, 2019
88ca853
Merge remote-tracking branch 'central/eventhubs_preview3' into eventh…
Sep 7, 2019
b32417b
Merge remote-tracking branch 'central/eventhubs_preview3' into eventh…
Sep 7, 2019
667f0b0
remove duplicated partition manager
Sep 7, 2019
f28365c
Fix a bug in list_ownership
Sep 7, 2019
74f39ce
Add pytest for blob partition manager
Sep 8, 2019
9959dc0
remove conftest.py from blob partition manager
Sep 8, 2019
1fd2243
Cache BlobClient instead of using ContainerClient to improve performance
Sep 9, 2019
b0e27a3
fix a list_ownership bug
Sep 9, 2019
c062fe0
add python requires
Sep 9, 2019
2b78446
Small fix
Sep 9, 2019
5bd2420
Change azure storage blob dependency version
Sep 9, 2019
05bdf04
Merge branch 'master' into eventhubs_blobstorage
Sep 9, 2019
b5af820
universal=0 by definition
Sep 9, 2019
74391a4
remove azure-eventhubs from dev requirement
Sep 9, 2019
f4f38bd
Update HISTORY
Sep 10, 2019
feffcfd
Update README
Sep 10, 2019
bb71c4a
Update README
Sep 10, 2019
2a41d06
empty init file under folder extensions to align with azure-eventhub
Sep 10, 2019
4553bba
Update readme.md
yunhaoling Sep 10, 2019
6938fc2
Fix a link issue
Sep 10, 2019
ca49bd8
Merge branch 'eventhubs_blobstorage' of github.com:YijunXieMS/azure-s…
Sep 10, 2019
2939ba2
fix a class name issue
Sep 10, 2019
89c99d1
add azure-eventhubs in dev_requirement
Sep 11, 2019
fd8ecdf
Revert "add azure-eventhubs in dev_requirement"
Sep 11, 2019
f7b85b4
Merge branch 'master' into eventhubs_blobstorage
Sep 11, 2019
74bd105
add azure-eventhubs in dev_requirement
Sep 11, 2019
c45bba9
Merge branch 'master' into eventhubs_blobstorage
Sep 11, 2019
f1053d2
Update azure-eventhub dependency to 5.0.0b3
Sep 11, 2019
da9f415
override azure-storage-blob version for azure-eventhubs-checkpointsto…
Sep 11, 2019
5eaa826
Add azure-eventhub in shared_requirements.txt
Sep 11, 2019
06896c0
Add extensions in manifest.in
Sep 11, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Release History
## 1.0.0b1 (2019-09-10)

**New features**

- `BlobPartitionManager` that uses Azure Blob Storage Block Blob to store EventProcessor checkpoint data

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/HISTORY.png)
21 changes: 21 additions & 0 deletions sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) Microsoft Corporation. All rights reserved.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
include *.md
include azure/__init__.py
include azure/eventhub/__init__.py
include azure/eventhub/extensions/__init__.py
141 changes: 141 additions & 0 deletions sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Azure EventHubs Checkpoint Store client library for Python using Storage Blobs

Azure EventHubs Checkpoint Store is used for storing checkpoints while processing events from Azure Event Hubs.
This Checkpoint Store package works as a plug-in package to `EventProcessor`. It uses Azure Storage Blob as the persistent store for maintaining checkpoints and partition ownership information.

[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblobaio/) | [API reference documentation](https://azure.github.io/azure-sdk-for-python/) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/)

## Getting started

### Install the package

```
$ pip install --pre azure-eventhub-checkpointstoreblobaio
```

**Prerequisites**

- Python 3.5.3 or later.
- **Microsoft Azure Subscription:** To use Azure services, including Azure Event Hubs, you'll need a subscription. If you do not have an existing Azure account, you may sign up for a free trial or use your MSDN subscriber benefits when you [create an account](https://account.windowsazure.com/Home/Index).

- **Event Hubs namespace with an Event Hub:** To interact with Azure Event Hubs, you'll also need to have a namespace and Event Hub available. If you are not familiar with creating Azure resources, you may wish to follow the step-by-step guide for [creating an Event Hub using the Azure portal](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create). There, you can also find detailed instructions for using the Azure CLI, Azure PowerShell, or Azure Resource Manager (ARM) templates to create an Event Hub.

- **Azure Storage Account:** You'll need to have an Azure Storage Account and create a Azure Blob Storage Block Container to store the checkpoint data with blobs. You may follow the guide [creating an Azure Block Blob Storage Account](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blob-create-account-block-blob).

## Key concepts

### Checkpointing

Checkpointing is a process by which readers mark or commit their position within a partition event sequence.
Checkpointing is the responsibility of the consumer and occurs on a per-partition basis within a consumer group.
This responsibility means that for each consumer group, each partition reader must keep track of its current position
in the event stream, and can inform the service when it considers the data stream complete. If a reader disconnects from
a partition, when it reconnects it begins reading at the checkpoint that was previously submitted by the last reader of
that partition in that consumer group. When the reader connects, it passes the offset to the event hub to specify the
location at which to start reading. In this way, you can use checkpointing to both mark events as "complete" by
downstream applications, and to provide resiliency if a failover between readers running on different machines occurs.
It is possible to return to older data by specifying a lower offset from this checkpointing process. Through this
mechanism, checkpointing enables both failover resiliency and event stream replay.

### Offsets & sequence numbers
Both offset & sequence number refer to the position of an event within a partition. You can think of them as a
client-side cursor. The offset is a byte numbering of the event. The offset/sequence number enables an event consumer
(reader) to specify a point in the event stream from which they want to begin reading events. You can specify a
timestamp such that you receive events enqueued only after the given timestamp. Consumers are responsible for
storing their own offset values outside of the Event Hubs service. Within a partition, each event includes an offset,
sequence number and the timestamp of when it was enqueued.

## Examples
- [Create an Azure Storage Blobs `ContainerClient`](#create-an-azure-storage-blobs-containerclient)
- [Create an Azure EventHubs `EventHubClient`](#create-an-eventhubclient)
- [Consume events using an `EventProessor` that uses a `BlobPartitionManager`](#consume-events-using-an-eventprocessor-that-uses-a-blobpartitionmanager-to-do-checkpointing)

### Create an Azure Storage Blobs `ContainerClient`
The easiest way to create a `ContainerClient` is to use a connection string.
```python
from azure.storage.blob.aio import ContainerClient
container_client = ContainerClient.from_connection_string("my_storageacount_connection_string", container="mycontainer")
```
For other ways of creating a `ContainerClient`, go to [Blob Storage library](https://github.com/Azure/azure-sdk-for-python/tree/eventhubs_preview3/sdk/storage/azure-storage-blob) for more details.

### Create an `EventHubClient`
The easiest way to create a `EventHubClient` is to use a connection string.
```python
from azure.eventhub.aio import EventHubClient
eventhub_client = EventHubClient.from_connection_string("my_eventhub_namespace_connection_string", event_hub_path="myeventhub")
```
For other ways of creating a `EventHubClient`, refer to [EventHubs library](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) for more details.

### Consume events using an `EventProcessor` that uses a `BlobPartitionManager` to do checkpointing
```python
import asyncio

from azure.eventhub.aio import EventHubClient
from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor
from azure.storage.blob.aio import ContainerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager

eventhub_connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
storage_container_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
storage_container_name = '<< STORAGE CONTAINER NAME>>'

class MyPartitionProcessor(PartitionProcessor):
async def process_events(self, events, partition_context):
if events:
# write your code here to process events
# save checkpoint to the data store
await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number)

async def main():
eventhub_client = EventHubClient.from_connection_string(eventhub_connection_str, receive_timeout=5, retry_total=3)
storage_container_client = ContainerClient.from_connection_string(storage_container_connection_str, container=storage_container_name)
partition_manager = BlobPartitionManager(storage_container_client) # use the BlobPartitonManager to save
event_processor = EventProcessor(eventhub_client, "$default", MyPartitionProcessor, partition_manager)
async with storage_container_client:
asyncio.ensure_future(event_processor.start())
await asyncio.sleep(60) # run for a while
await event_processor.stop()

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```

## Troubleshooting

### General
Enabling logging will be helpful to do trouble shooting.
Refer to [Logging](#logging) to enable loggers for related libraries.

## Next steps

### Examples
- [./examples/eventprocessor/event_processor_blob_storage_example.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/examples/event_processor_blob_storage_example.py) - event processor with blob partition manager example

### Documentation

Reference documentation is available at https://azure.github.io/azure-sdk-for-python/ref/azure.eventhub.

### Logging

- Enable `azure.eventhub.extensions.checkpointstoreblobaio` logger to collect traces from the library.
- Enable `azure.eventhub.aio.eventprocessor` logger to collect traces from package eventprocessor of the azure-eventhub library.
- Enable `azure.eventhub` logger to collect traces from the main azure-eventhub library.
- Enable `azure.storage.blob` logger to collect traces from azure storage blob library.
- Enable `uamqp` logger to collect traces from the underlying uAMQP library.
- Enable AMQP frame level trace by setting `network_tracing=True` when creating the client.

### Provide Feedback

If you encounter any bugs or have suggestions, please file an issue in the [Issues](https://github.com/Azure/azure-sdk-for-python/issues) section of the project.

## Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [[email protected]](mailto:[email protected]) with any additional questions or comments.

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.png)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "1.0.0b1"

from .blobstoragepm import BlobPartitionManager

__all__ = [
"BlobPartitionManager",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Iterable, Dict, Any
import logging
from collections import defaultdict
import asyncio
from azure.eventhub.aio.eventprocessor import PartitionManager, OwnershipLostError # type: ignore
from azure.core.exceptions import ResourceModifiedError, ResourceExistsError # type: ignore
from azure.storage.blob.aio import ContainerClient, BlobClient # type: ignore

logger = logging.getLogger(__name__)
UPLOAD_DATA = ""


class BlobPartitionManager(PartitionManager):
"""An PartitionManager that uses Azure Blob Storage to store the partition ownership and checkpoint data.

This class implements methods list_ownership, claim_ownership, and update_checkpoint that are defined in class
azure.eventhub.eventprocessor.PartitionManager of package azure-eventhub.

"""
def __init__(self, container_client: ContainerClient):
"""Create a BlobPartitionManager

:param container_client: The Azure Blob Storage Container client that is used to save checkpoint data to Azure
Blob Storage Container.
"""
self._container_client = container_client
self._cached_blob_clients = defaultdict() # type:Dict[str, BlobClient]
self._cached_ownership_dict = defaultdict(dict) # type: Dict[str, Dict[str, Any]]
# lock each partition for list_ownership, claim_ownership and update_checkpoint etag doesn't get out of sync
# when the three methods are running concurrently
self._cached_ownership_locks = defaultdict(asyncio.Lock) # type:Dict[str, asyncio.Lock]

def _get_blob_client(self, blob_name):
result = self._cached_blob_clients.get(blob_name)
if not result:
result = self._container_client.get_blob_client(blob_name)
self._cached_blob_clients[blob_name] = result
return result

async def _upload_blob(self, ownership, metadata):
etag = ownership.get("etag")
if etag:
etag_match = {"if_match": etag}
else:
etag_match = {"if_none_match": '*'}
partition_id = ownership["partition_id"]
uploaded_blob_properties = await self._get_blob_client(partition_id).upload_blob(
data=UPLOAD_DATA, overwrite=True, metadata=metadata, **etag_match
)
ownership["etag"] = uploaded_blob_properties["etag"]
ownership["last_modified_time"] = uploaded_blob_properties["last_modified"].timestamp()
ownership.update(metadata)

async def list_ownership(self, eventhub_name: str, consumer_group_name: str) -> Iterable[Dict[str, Any]]:
try:
blobs = self._container_client.list_blobs(include=['metadata'])
except Exception as err: # pylint:disable=broad-except
logger.warning("An exception occurred during list_ownership for eventhub %r consumer group %r. "
"Exception is %r", eventhub_name, consumer_group_name, err)
raise
async for b in blobs:
async with self._cached_ownership_locks[b.name]:
if b.name not in self._cached_ownership_dict \
or b.last_modified.timestamp() > self._cached_ownership_dict[b.name].get("last_modified_time"):
metadata = b.metadata
ownership = {
"eventhub_name": eventhub_name,
"consumer_group_name": consumer_group_name,
"partition_id": b.name,
"owner_id": metadata["owner_id"],
"etag": b.etag,
"last_modified_time": b.last_modified.timestamp() if b.last_modified else None
}
ownership.update(metadata)
self._cached_ownership_dict[b.name] = ownership
return self._cached_ownership_dict.values()

async def claim_ownership(self, ownership_list: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
result = []
for ownership in ownership_list:
partition_id = ownership["partition_id"]
eventhub_name = ownership["eventhub_name"]
consumer_group_name = ownership["consumer_group_name"]
owner_id = ownership["owner_id"]

async with self._cached_ownership_locks[partition_id]:
metadata = {"owner_id": ownership["owner_id"]}
if "offset" in ownership:
metadata["offset"] = ownership["offset"]
if "sequence_number" in ownership:
metadata["sequence_number"] = ownership["sequence_number"]
try:
await self._upload_blob(ownership, metadata)
self._cached_ownership_dict[partition_id] = ownership
result.append(ownership)
except (ResourceModifiedError, ResourceExistsError):
logger.info(
"EventProcessor instance %r of eventhub %r consumer group %r lost ownership to partition %r",
owner_id, eventhub_name, consumer_group_name, partition_id)
except Exception as err: # pylint:disable=broad-except
logger.warning("An exception occurred when EventProcessor instance %r claim_ownership for "
"eventhub %r consumer group %r partition %r. The ownership is now lost. Exception "
"is %r", owner_id, eventhub_name, consumer_group_name, partition_id, err)
return result

async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id,
offset, sequence_number) -> None:
metadata = {
"owner_id": owner_id,
"offset": offset,
"sequence_number": str(sequence_number)
}
cached_ownership = self._cached_ownership_dict[partition_id]
async with self._cached_ownership_locks[partition_id]:
try:
await self._upload_blob(cached_ownership, metadata)
except (ResourceModifiedError, ResourceExistsError):
logger.info(
"EventProcessor instance %r of eventhub %r consumer group %r couldn't update_checkpoint to "
"partition %r because the ownership has been stolen",
owner_id, eventhub_name, consumer_group_name, partition_id)
raise OwnershipLostError()
except Exception as err:
logger.warning(
"EventProcessor instance %r of eventhub %r consumer group %r couldn't update_checkpoint to "
"partition %r because of unexpected error. Exception is %r",
owner_id, eventhub_name, consumer_group_name, partition_id, err)
raise # EventProcessor will catch the exception and handle it
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
../azure-eventhubs
Loading