diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/HISTORY.md b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/HISTORY.md new file mode 100644 index 000000000000..b40cd0557126 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/HISTORY.md @@ -0,0 +1,8 @@ +# Release History +## 1.0.0b1 (2019-09-10) + +**New features** + +- `BlobPartitionManager` that uses Azure Blob Storage Block Blob to store EventProcessor checkpoint data + +![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/HISTORY.png) \ No newline at end of file diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/LICENSE b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/LICENSE new file mode 100644 index 000000000000..21071075c245 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/LICENSE @@ -0,0 +1,21 @@ + MIT License + + Copyright (c) Microsoft Corporation. All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/MANIFEST.in b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/MANIFEST.in new file mode 100644 index 000000000000..7012aaaa132a --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/MANIFEST.in @@ -0,0 +1,4 @@ +include *.md +include azure/__init__.py +include azure/eventhub/__init__.py +include azure/eventhub/extensions/__init__.py diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md new file mode 100644 index 000000000000..9436f340fffa --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md @@ -0,0 +1,141 @@ +# Azure EventHubs Checkpoint Store client library for Python using Storage Blobs + +Azure EventHubs Checkpoint Store is used for storing checkpoints while processing events from Azure Event Hubs. +This Checkpoint Store package works as a plug-in package to `EventProcessor`. It uses Azure Storage Blob as the persistent store for maintaining checkpoints and partition ownership information. + +[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblobaio/) | [API reference documentation](https://azure.github.io/azure-sdk-for-python/) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/) + +## Getting started + +### Install the package + +``` +$ pip install --pre azure-eventhub-checkpointstoreblobaio +``` + +**Prerequisites** + +- Python 3.5.3 or later. +- **Microsoft Azure Subscription:** To use Azure services, including Azure Event Hubs, you'll need a subscription. If you do not have an existing Azure account, you may sign up for a free trial or use your MSDN subscriber benefits when you [create an account](https://account.windowsazure.com/Home/Index). + +- **Event Hubs namespace with an Event Hub:** To interact with Azure Event Hubs, you'll also need to have a namespace and Event Hub available. If you are not familiar with creating Azure resources, you may wish to follow the step-by-step guide for [creating an Event Hub using the Azure portal](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create). There, you can also find detailed instructions for using the Azure CLI, Azure PowerShell, or Azure Resource Manager (ARM) templates to create an Event Hub. + +- **Azure Storage Account:** You'll need to have an Azure Storage Account and create a Azure Blob Storage Block Container to store the checkpoint data with blobs. You may follow the guide [creating an Azure Block Blob Storage Account](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blob-create-account-block-blob). + +## Key concepts + +### Checkpointing + +Checkpointing is a process by which readers mark or commit their position within a partition event sequence. +Checkpointing is the responsibility of the consumer and occurs on a per-partition basis within a consumer group. +This responsibility means that for each consumer group, each partition reader must keep track of its current position +in the event stream, and can inform the service when it considers the data stream complete. If a reader disconnects from +a partition, when it reconnects it begins reading at the checkpoint that was previously submitted by the last reader of +that partition in that consumer group. When the reader connects, it passes the offset to the event hub to specify the +location at which to start reading. In this way, you can use checkpointing to both mark events as "complete" by +downstream applications, and to provide resiliency if a failover between readers running on different machines occurs. +It is possible to return to older data by specifying a lower offset from this checkpointing process. Through this +mechanism, checkpointing enables both failover resiliency and event stream replay. + +### Offsets & sequence numbers +Both offset & sequence number refer to the position of an event within a partition. You can think of them as a +client-side cursor. The offset is a byte numbering of the event. The offset/sequence number enables an event consumer +(reader) to specify a point in the event stream from which they want to begin reading events. You can specify a +timestamp such that you receive events enqueued only after the given timestamp. Consumers are responsible for +storing their own offset values outside of the Event Hubs service. Within a partition, each event includes an offset, +sequence number and the timestamp of when it was enqueued. + +## Examples +- [Create an Azure Storage Blobs `ContainerClient`](#create-an-azure-storage-blobs-containerclient) +- [Create an Azure EventHubs `EventHubClient`](#create-an-eventhubclient) +- [Consume events using an `EventProessor` that uses a `BlobPartitionManager`](#consume-events-using-an-eventprocessor-that-uses-a-blobpartitionmanager-to-do-checkpointing) + +### Create an Azure Storage Blobs `ContainerClient` +The easiest way to create a `ContainerClient` is to use a connection string. +```python +from azure.storage.blob.aio import ContainerClient +container_client = ContainerClient.from_connection_string("my_storageacount_connection_string", container="mycontainer") +``` +For other ways of creating a `ContainerClient`, go to [Blob Storage library](https://github.com/Azure/azure-sdk-for-python/tree/eventhubs_preview3/sdk/storage/azure-storage-blob) for more details. + +### Create an `EventHubClient` +The easiest way to create a `EventHubClient` is to use a connection string. +```python +from azure.eventhub.aio import EventHubClient +eventhub_client = EventHubClient.from_connection_string("my_eventhub_namespace_connection_string", event_hub_path="myeventhub") +``` +For other ways of creating a `EventHubClient`, refer to [EventHubs library](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) for more details. + +### Consume events using an `EventProcessor` that uses a `BlobPartitionManager` to do checkpointing +```python +import asyncio + +from azure.eventhub.aio import EventHubClient +from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor +from azure.storage.blob.aio import ContainerClient +from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager + +eventhub_connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>' +storage_container_connection_str = '<< CONNECTION STRING OF THE STORAGE >>' +storage_container_name = '<< STORAGE CONTAINER NAME>>' + +class MyPartitionProcessor(PartitionProcessor): + async def process_events(self, events, partition_context): + if events: + # write your code here to process events + # save checkpoint to the data store + await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number) + +async def main(): + eventhub_client = EventHubClient.from_connection_string(eventhub_connection_str, receive_timeout=5, retry_total=3) + storage_container_client = ContainerClient.from_connection_string(storage_container_connection_str, container=storage_container_name) + partition_manager = BlobPartitionManager(storage_container_client) # use the BlobPartitonManager to save + event_processor = EventProcessor(eventhub_client, "$default", MyPartitionProcessor, partition_manager) + async with storage_container_client: + asyncio.ensure_future(event_processor.start()) + await asyncio.sleep(60) # run for a while + await event_processor.stop() + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) +``` + +## Troubleshooting + +### General +Enabling logging will be helpful to do trouble shooting. +Refer to [Logging](#logging) to enable loggers for related libraries. + +## Next steps + +### Examples +- [./examples/eventprocessor/event_processor_blob_storage_example.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/examples/event_processor_blob_storage_example.py) - event processor with blob partition manager example + +### Documentation + +Reference documentation is available at https://azure.github.io/azure-sdk-for-python/ref/azure.eventhub. + +### Logging + +- Enable `azure.eventhub.extensions.checkpointstoreblobaio` logger to collect traces from the library. +- Enable `azure.eventhub.aio.eventprocessor` logger to collect traces from package eventprocessor of the azure-eventhub library. +- Enable `azure.eventhub` logger to collect traces from the main azure-eventhub library. +- Enable `azure.storage.blob` logger to collect traces from azure storage blob library. +- Enable `uamqp` logger to collect traces from the underlying uAMQP library. +- Enable AMQP frame level trace by setting `network_tracing=True` when creating the client. + +### Provide Feedback + +If you encounter any bugs or have suggestions, please file an issue in the [Issues](https://github.com/Azure/azure-sdk-for-python/issues) section of the project. + +## Contributing + +This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com. + +When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA. + +This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). +For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. + +![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.png) diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/__init__.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/__init__.py new file mode 100644 index 000000000000..62351a0ab30b --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/__init__.py @@ -0,0 +1,5 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- +__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/__init__.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/__init__.py new file mode 100644 index 000000000000..62351a0ab30b --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/__init__.py @@ -0,0 +1,5 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- +__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/__init__.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/__init__.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/__init__.py new file mode 100644 index 000000000000..9e0e473c9a8b --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/__init__.py @@ -0,0 +1,12 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +__version__ = "1.0.0b1" + +from .blobstoragepm import BlobPartitionManager + +__all__ = [ + "BlobPartitionManager", +] diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/blobstoragepm.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/blobstoragepm.py new file mode 100644 index 000000000000..244c1a8ffcd6 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/blobstoragepm.py @@ -0,0 +1,132 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- +from typing import Iterable, Dict, Any +import logging +from collections import defaultdict +import asyncio +from azure.eventhub.aio.eventprocessor import PartitionManager, OwnershipLostError # type: ignore +from azure.core.exceptions import ResourceModifiedError, ResourceExistsError # type: ignore +from azure.storage.blob.aio import ContainerClient, BlobClient # type: ignore + +logger = logging.getLogger(__name__) +UPLOAD_DATA = "" + + +class BlobPartitionManager(PartitionManager): + """An PartitionManager that uses Azure Blob Storage to store the partition ownership and checkpoint data. + + This class implements methods list_ownership, claim_ownership, and update_checkpoint that are defined in class + azure.eventhub.eventprocessor.PartitionManager of package azure-eventhub. + + """ + def __init__(self, container_client: ContainerClient): + """Create a BlobPartitionManager + + :param container_client: The Azure Blob Storage Container client that is used to save checkpoint data to Azure + Blob Storage Container. + """ + self._container_client = container_client + self._cached_blob_clients = defaultdict() # type:Dict[str, BlobClient] + self._cached_ownership_dict = defaultdict(dict) # type: Dict[str, Dict[str, Any]] + # lock each partition for list_ownership, claim_ownership and update_checkpoint etag doesn't get out of sync + # when the three methods are running concurrently + self._cached_ownership_locks = defaultdict(asyncio.Lock) # type:Dict[str, asyncio.Lock] + + def _get_blob_client(self, blob_name): + result = self._cached_blob_clients.get(blob_name) + if not result: + result = self._container_client.get_blob_client(blob_name) + self._cached_blob_clients[blob_name] = result + return result + + async def _upload_blob(self, ownership, metadata): + etag = ownership.get("etag") + if etag: + etag_match = {"if_match": etag} + else: + etag_match = {"if_none_match": '*'} + partition_id = ownership["partition_id"] + uploaded_blob_properties = await self._get_blob_client(partition_id).upload_blob( + data=UPLOAD_DATA, overwrite=True, metadata=metadata, **etag_match + ) + ownership["etag"] = uploaded_blob_properties["etag"] + ownership["last_modified_time"] = uploaded_blob_properties["last_modified"].timestamp() + ownership.update(metadata) + + async def list_ownership(self, eventhub_name: str, consumer_group_name: str) -> Iterable[Dict[str, Any]]: + try: + blobs = self._container_client.list_blobs(include=['metadata']) + except Exception as err: # pylint:disable=broad-except + logger.warning("An exception occurred during list_ownership for eventhub %r consumer group %r. " + "Exception is %r", eventhub_name, consumer_group_name, err) + raise + async for b in blobs: + async with self._cached_ownership_locks[b.name]: + if b.name not in self._cached_ownership_dict \ + or b.last_modified.timestamp() > self._cached_ownership_dict[b.name].get("last_modified_time"): + metadata = b.metadata + ownership = { + "eventhub_name": eventhub_name, + "consumer_group_name": consumer_group_name, + "partition_id": b.name, + "owner_id": metadata["owner_id"], + "etag": b.etag, + "last_modified_time": b.last_modified.timestamp() if b.last_modified else None + } + ownership.update(metadata) + self._cached_ownership_dict[b.name] = ownership + return self._cached_ownership_dict.values() + + async def claim_ownership(self, ownership_list: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]: + result = [] + for ownership in ownership_list: + partition_id = ownership["partition_id"] + eventhub_name = ownership["eventhub_name"] + consumer_group_name = ownership["consumer_group_name"] + owner_id = ownership["owner_id"] + + async with self._cached_ownership_locks[partition_id]: + metadata = {"owner_id": ownership["owner_id"]} + if "offset" in ownership: + metadata["offset"] = ownership["offset"] + if "sequence_number" in ownership: + metadata["sequence_number"] = ownership["sequence_number"] + try: + await self._upload_blob(ownership, metadata) + self._cached_ownership_dict[partition_id] = ownership + result.append(ownership) + except (ResourceModifiedError, ResourceExistsError): + logger.info( + "EventProcessor instance %r of eventhub %r consumer group %r lost ownership to partition %r", + owner_id, eventhub_name, consumer_group_name, partition_id) + except Exception as err: # pylint:disable=broad-except + logger.warning("An exception occurred when EventProcessor instance %r claim_ownership for " + "eventhub %r consumer group %r partition %r. The ownership is now lost. Exception " + "is %r", owner_id, eventhub_name, consumer_group_name, partition_id, err) + return result + + async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id, + offset, sequence_number) -> None: + metadata = { + "owner_id": owner_id, + "offset": offset, + "sequence_number": str(sequence_number) + } + cached_ownership = self._cached_ownership_dict[partition_id] + async with self._cached_ownership_locks[partition_id]: + try: + await self._upload_blob(cached_ownership, metadata) + except (ResourceModifiedError, ResourceExistsError): + logger.info( + "EventProcessor instance %r of eventhub %r consumer group %r couldn't update_checkpoint to " + "partition %r because the ownership has been stolen", + owner_id, eventhub_name, consumer_group_name, partition_id) + raise OwnershipLostError() + except Exception as err: + logger.warning( + "EventProcessor instance %r of eventhub %r consumer group %r couldn't update_checkpoint to " + "partition %r because of unexpected error. Exception is %r", + owner_id, eventhub_name, consumer_group_name, partition_id, err) + raise # EventProcessor will catch the exception and handle it diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt new file mode 100644 index 000000000000..ad81a99facbe --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt @@ -0,0 +1 @@ +../azure-eventhubs \ No newline at end of file diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/examples/event_processor_blob_storage_example.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/examples/event_processor_blob_storage_example.py new file mode 100644 index 000000000000..e7edc047831a --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/examples/event_processor_blob_storage_example.py @@ -0,0 +1,42 @@ +import asyncio +import logging +import os +from azure.eventhub.aio import EventHubClient +from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor +from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager +from azure.storage.blob.aio import ContainerClient + +RECEIVE_TIMEOUT = 5 # timeout in seconds for a receiving operation. 0 or None means no timeout +RETRY_TOTAL = 3 # max number of retries for receive operations within the receive timeout. Actual number of retries clould be less if RECEIVE_TIMEOUT is too small +CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] +STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"] + +logging.basicConfig(level=logging.INFO) + + +async def do_operation(event): + # do some sync or async operations. If the operation is i/o intensive, async will have better performance + print(event) + + +class MyPartitionProcessor(PartitionProcessor): + async def process_events(self, events, partition_context): + if events: + await asyncio.gather(*[do_operation(event) for event in events]) + await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number) + else: + print("empty events received", "partition:", partition_context.partition_id) + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + client = EventHubClient.from_connection_string(CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT, retry_total=RETRY_TOTAL) + container_client = ContainerClient.from_connection_string(STORAGE_CONNECTION_STR, container="eventprocessor") + partition_manager = BlobPartitionManager(container_client=container_client) + event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager, polling_interval=10) + try: + loop.run_until_complete(event_processor.start()) + except KeyboardInterrupt: + loop.run_until_complete(event_processor.stop()) + finally: + loop.stop() diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/sdk_packaging.toml b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/sdk_packaging.toml new file mode 100644 index 000000000000..e7687fdae93b --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/sdk_packaging.toml @@ -0,0 +1,2 @@ +[packaging] +auto_update = false \ No newline at end of file diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.cfg b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.cfg new file mode 100644 index 000000000000..68c61a226596 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.cfg @@ -0,0 +1,2 @@ +[bdist_wheel] +universal=0 \ No newline at end of file diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py new file mode 100644 index 000000000000..1df5ac1fd5ac --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python + +#------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +#-------------------------------------------------------------------------- + +import re +import os.path +import sys +from io import open +from setuptools import find_packages, setup + + +# Change the PACKAGE_NAME only to change folder and different name +PACKAGE_NAME = "azure-eventhub-checkpointstoreblob-aio" +PACKAGE_PPRINT_NAME = "Event Hubs checkpointer implementation with Blob Storage" + +package_folder_path = "azure/eventhub/extensions/checkpointstoreblobaio" +namespace_name = "azure.eventhub.extensions.checkpointstoreblobaio" + +# Version extraction inspired from 'requests' +with open(os.path.join(package_folder_path, '__init__.py'), 'r') as fd: + version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]', + fd.read(), re.MULTILINE).group(1) + +if not version: + raise RuntimeError('Cannot find version information') + +with open('README.md') as f: + readme = f.read() +with open('HISTORY.md') as f: + history = f.read() + +exclude_packages = [ + 'tests', + 'examples', + # Exclude packages that will be covered by PEP420 or nspkg + 'azure', + 'azure.eventhub', + 'azure.eventhub.extensions', + ] + +setup( + name=PACKAGE_NAME, + version=version, + description='Microsoft Azure {} Client Library for Python'.format(PACKAGE_PPRINT_NAME), + long_description=readme + '\n\n' + history, + long_description_content_type='text/markdown', + license='MIT License', + author='Microsoft Corporation', + author_email='azpysdkhelp@microsoft.com', + url='https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointerblob-aio', + classifiers=[ + 'Development Status :: 3 - Alpha', + 'Programming Language :: Python', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + 'License :: OSI Approved :: MIT License', + ], + zip_safe=False, + packages=find_packages(exclude=exclude_packages), + python_requires=">=3.5.3", + install_requires=[ + 'azure-storage-blob<12.0.0b4,>=12.0.0b2', + 'azure-eventhub<6.0.0,>=5.0.0b3', + ], + extras_require={ + + } +) diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/tests/test_storage_blob_partition_manager.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/tests/test_storage_blob_partition_manager.py new file mode 100644 index 000000000000..2f5f42294231 --- /dev/null +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/tests/test_storage_blob_partition_manager.py @@ -0,0 +1,130 @@ +#------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +#-------------------------------------------------------------------------- + +import pytest +import time +import os +import uuid +import warnings + +from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager + + +def get_live_storage_blob_client(): + try: + storage_connection_str = os.environ['AZURE_STORAGE_CONN_STR'] + except KeyError: + return None + try: + from azure.storage.blob import BlobServiceClient + from azure.storage.blob.aio import ContainerClient + except ImportError or ModuleNotFoundError: + return None + + container_str = str(uuid.uuid4()) + blob_service_client = BlobServiceClient.from_connection_string(storage_connection_str) + blob_service_client.create_container(container_str) + container_client = ContainerClient.from_connection_string(storage_connection_str, container=container_str) + return container_str, container_client + + +def remove_live_storage_blob_client(container_str): + try: + storage_connection_str = os.environ['AZURE_STORAGE_CONN_STR'] + from azure.storage.blob import BlobServiceClient + blob_service_client = BlobServiceClient.from_connection_string(storage_connection_str) + blob_service_client.delete_container(container_str) + except: + warnings.warn(UserWarning("storage container teardown failed")) + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_claim_and_list_ownership(): + container_str, live_storage_blob_client = get_live_storage_blob_client() + if not live_storage_blob_client: + pytest.skip("Storage blob client can't be created") + + eventhub_name = 'eventhub' + consumer_group_name = '$default' + ownership_cnt = 8 + + try: + async with live_storage_blob_client: + + partition_manager = BlobPartitionManager(container_client=live_storage_blob_client) + + ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, consumer_group_name=consumer_group_name) + assert len(ownership_list) == 0 + + ownership_list = [] + + for i in range(ownership_cnt): + ownership = {} + ownership['eventhub_name'] = eventhub_name + ownership['consumer_group_name'] = consumer_group_name + ownership['owner_id'] = 'ownerid' + ownership['partition_id'] = str(i) + ownership['last_modified_time'] = time.time() + ownership["offset"] = "1" + ownership["sequence_number"] = "1" + ownership_list.append(ownership) + + await partition_manager.claim_ownership(ownership_list) + + ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, consumer_group_name=consumer_group_name) + assert len(ownership_list) == ownership_cnt + finally: + remove_live_storage_blob_client(container_str) + + +@pytest.mark.liveTest +@pytest.mark.asyncio +async def test_update_checkpoint(): + container_str, live_storage_blob_client = get_live_storage_blob_client() + if not live_storage_blob_client: + pytest.skip("Storage blob client can't be created") + + eventhub_name = 'eventhub' + consumer_group_name = '$default' + owner_id = 'owner' + partition_cnt = 8 + + try: + async with live_storage_blob_client: + partition_manager = BlobPartitionManager(container_client=live_storage_blob_client) + + ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, consumer_group_name=consumer_group_name) + assert len(ownership_list) == 0 + + ownership_list = [] + + for i in range(partition_cnt): + ownership = {} + ownership['eventhub_name'] = eventhub_name + ownership['consumer_group_name'] = consumer_group_name + ownership['owner_id'] = owner_id + ownership['partition_id'] = str(i) + ownership['last_modified_time'] = time.time() + ownership['offset'] = '1' + ownership['sequence_number'] = '10' + ownership_list.append(ownership) + + await partition_manager.claim_ownership(ownership_list) + + ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, consumer_group_name=consumer_group_name) + assert len(ownership_list) == partition_cnt + + for i in range(partition_cnt): + await partition_manager.update_checkpoint(eventhub_name, consumer_group_name, str(i), + owner_id, '2', '20') + + ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, consumer_group_name=consumer_group_name) + for ownership in ownership_list: + assert ownership['offset'] == '2' + assert ownership['sequence_number'] == '20' + finally: + remove_live_storage_blob_client(container_str) diff --git a/shared_requirements.txt b/shared_requirements.txt index 04738b39cfd9..c05003f516aa 100644 --- a/shared_requirements.txt +++ b/shared_requirements.txt @@ -9,6 +9,7 @@ azure-common~=1.1 azure-core<2.0.0,>=1.0.0b2 azure-cosmosdb-table~=1.0 azure-datalake-store~=0.0.18 +azure-eventhub<6.0.0,>=5.0.0b3 azure-eventgrid~=1.1 azure-graphrbac~=0.40.0 azure-keyvault~=1.0 @@ -106,3 +107,4 @@ six>=1.6 #override azure-storage-queue azure-core<2.0.0,>=1.0.0b3 #override azure-storage-file azure-core<2.0.0,>=1.0.0b3 #override azure-cosmos azure-core<2.0.0,>=1.0.0b3 +#override azure-eventhubs-checkpointstoreblob-aio azure-storage-blob<12.0.0b4,>=12.0.0b2