@@ -67,7 +67,7 @@
     python_requires=">=3.5.3",
     install_requires=[
         'azure-storage-blob<=12.1,>=12.0.0b2',
-        'azure-eventhub<6.0.0,>=5.0.0b3',
+        'azure-eventhub<6.0.0,>=5.0.0b5',
         'aiohttp<4.0,>=3.0',
     ],
     extras_require={
9 changes: 9 additions & 0 deletions sdk/eventhub/azure-eventhubs-checkpointstoreblob/HISTORY.md
@@ -0,0 +1,9 @@
# Release History

## 2019-11-04 1.0.0b5

**New features**

- `BlobPartitionManager` that uses Azure Blob Storage Block Blob to store EventProcessor checkpoint data

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhubs-checkpointstoreblob/HISTORY.png)
21 changes: 21 additions & 0 deletions sdk/eventhub/azure-eventhubs-checkpointstoreblob/LICENSE
@@ -0,0 +1,21 @@
MIT License

Copyright (c) Microsoft Corporation. All rights reserved.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,4 @@
include *.md
include azure/__init__.py
include azure/eventhub/__init__.py
include azure/eventhub/extensions/__init__.py
138 changes: 138 additions & 0 deletions sdk/eventhub/azure-eventhubs-checkpointstoreblob/README.md
@@ -0,0 +1,138 @@
# Azure EventHubs Checkpoint Store client library for Python using Storage Blobs

Azure EventHubs Checkpoint Store is used for storing checkpoints while processing events from Azure Event Hubs.
This Checkpoint Store package works as a plug-in package to `EventHubConsumerClient`. It uses Azure Storage Blob as the persistent store for maintaining checkpoints and partition ownership information.

[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob) | [Package (PyPI)](https://pypi.org/project/azure-eventhub-checkpointstoreblob/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.extensions.html) | [Azure Event Hubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/)

## Getting started

### Install the package

```
$ pip install --pre azure-eventhub-checkpointstoreblob
```

**Prerequisites**

- Python 2.7, or Python 3.5.3 or later.
- **Microsoft Azure Subscription:** To use Azure services, including Azure Event Hubs, you'll need a subscription. If you do not have an existing Azure account, you may sign up for a free trial or use your MSDN subscriber benefits when you [create an account](https://azure.microsoft.com/en-us/).

- **Event Hubs namespace with an Event Hub:** To interact with Azure Event Hubs, you'll also need to have a namespace and Event Hub available. If you are not familiar with creating Azure resources, you may wish to follow the step-by-step guide for [creating an Event Hub using the Azure portal](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create). There, you can also find detailed instructions for using the Azure CLI, Azure PowerShell, or Azure Resource Manager (ARM) templates to create an Event Hub.

- **Azure Storage Account:** You'll need an Azure Storage account and an Azure Blob Storage block blob container in it to store the checkpoint data. You may follow the guide [creating an Azure Block Blob Storage Account](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blob-create-account-block-blob).

## Key concepts

### Checkpointing

Checkpointing is a process by which readers mark or commit their position within a partition event sequence.
Checkpointing is the responsibility of the consumer and occurs on a per-partition basis within a consumer group.
This responsibility means that for each consumer group, each partition reader must keep track of its current position
in the event stream, and can inform the service when it considers the data stream complete. If a reader disconnects from
a partition, when it reconnects it begins reading at the checkpoint that was previously submitted by the last reader of
that partition in that consumer group. When the reader connects, it passes the offset to the event hub to specify the
location at which to start reading. In this way, you can use checkpointing to both mark events as "complete" by
downstream applications, and to provide resiliency if a failover between readers running on different machines occurs.
It is possible to return to older data by specifying a lower offset from this checkpointing process. Through this
mechanism, checkpointing enables both failover resiliency and event stream replay.
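
As a minimal sketch of what this looks like in code (the same pattern appears in the fuller example under [Examples](#examples) below), committing a checkpoint is a single call on the partition context that the consumer client passes to your event callback:

```python
def process_events(partition_context, events):
    # ... process the batch of events ...
    # Committing the position of the last event marks this batch as handled;
    # a reader that reconnects to the partition resumes from this checkpoint.
    partition_context.update_checkpoint(events[-1])
```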

### Offsets & sequence numbers
Both offset & sequence number refer to the position of an event within a partition. You can think of them as a
client-side cursor. The offset is a byte numbering of the event. The offset/sequence number enables an event consumer
(reader) to specify a point in the event stream from which they want to begin reading events. You can specify a
timestamp such that you receive events enqueued only after the given timestamp. Consumers are responsible for
storing their own offset values outside of the Event Hubs service. Within a partition, each event includes an offset,
sequence number and the timestamp of when it was enqueued.
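
For illustration, each received event exposes this positional information as attributes, so a consumer can record how far it has read; the attribute names below (`offset`, `sequence_number`, `enqueued_time`) are assumed from the preview `azure-eventhub` package:

```python
def process_events(partition_context, events):
    for event in events:
        # Position of the event within its partition, plus its enqueue time.
        print(event.offset, event.sequence_number, event.enqueued_time)
```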

## Examples
- [Create an Azure Storage Blobs `ContainerClient`](#create-an-azure-storage-blobs-containerclient)
- [Create an Azure EventHubs `EventHubConsumerClient`](#create-an-eventhubconsumerclient)
- [Consume events using a `BlobPartitionManager` to do checkpointing](#consume-events-using-a-blobpartitionmanager-to-do-checkpointing)

### Create an Azure Storage Blobs `ContainerClient`
The easiest way to create a `ContainerClient` is to use a connection string.
```python
from azure.storage.blob import ContainerClient
container_client = ContainerClient.from_connection_string("my_storage_account_connection_string", "mycontainer")
```
For other ways of creating a `ContainerClient`, refer to the [Blob Storage library](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/storage/azure-storage-blob) for more details.
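
As a sketch of one such alternative (assuming the `azure-storage-blob` 12.x constructor; the account URL and credential below are placeholders):

```python
from azure.storage.blob import ContainerClient

# Build the client from an account URL plus a credential such as an account key or SAS token.
container_client = ContainerClient(
    account_url="https://<my-storage-account>.blob.core.windows.net",
    container_name="mycontainer",
    credential="<account_key_or_sas_token>",
)
```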

### Create an `EventHubConsumerClient`
The easiest way to create an `EventHubConsumerClient` is to use a connection string.
```python
from azure.eventhub import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", event_hub_path="myeventhub")
```
For other ways of creating an `EventHubConsumerClient`, refer to the [EventHubs library](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) for more details.

### Consume events using a `BlobPartitionManager` to do checkpointing
```python

from azure.eventhub import EventHubConsumerClient
from azure.storage.blob import ContainerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobPartitionManager

eventhub_connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
storage_container_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
storage_container_name = '<< STORAGE CONTAINER NAME >>'

def do_operation(events):
    # do some operations to the events.
    pass

def process_events(partition_context, events):
    do_operation(events)
    partition_context.update_checkpoint(events[-1])

def main():
    storage_container_client = ContainerClient.from_connection_string(storage_container_connection_str, storage_container_name)
    partition_manager = BlobPartitionManager(storage_container_client)  # use the BlobPartitionManager to save checkpoint data
    client = EventHubConsumerClient.from_connection_string(eventhub_connection_str, partition_manager=partition_manager, receive_timeout=5, retry_total=3)

    try:
        client.receive(process_events, "$default")
    except KeyboardInterrupt:
        client.close()

if __name__ == '__main__':
    main()
```

## Troubleshooting

### General
Enabling logging is helpful for troubleshooting.
Refer to [Logging](#logging) to enable loggers for the related libraries.

## Next steps

### Examples
- [./samples/event_processor_blob_storage_example.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob/samples/event_processor_blob_storage_example.py) - EventHubConsumerClient with blob partition manager example

### Documentation

Reference documentation is available at https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.extensions.html

### Logging

- Enable `azure.eventhub.extensions.checkpointstoreblob` logger to collect traces from the library.
- Enable `azure.eventhub` logger to collect traces from the main azure-eventhub library.
- Enable `azure.storage.blob` logger to collect traces from the Azure Storage Blob library.
- Enable `uamqp` logger to collect traces from the underlying uAMQP library.
- Enable AMQP frame level trace by setting `logging_enable=True` when creating the client (a minimal setup is sketched below).
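
A minimal logging setup might look like the following sketch (the stream and level choices are only one option):

```python
import logging
import sys

# Send library logs to stdout; raise or lower levels per logger as needed.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

logging.getLogger("azure.eventhub.extensions.checkpointstoreblob").setLevel(logging.DEBUG)
logging.getLogger("azure.eventhub").setLevel(logging.INFO)
logging.getLogger("azure.storage.blob").setLevel(logging.INFO)
logging.getLogger("uamqp").setLevel(logging.WARNING)

# For AMQP frame-level traces, pass logging_enable=True when creating the client, e.g.:
# client = EventHubConsumerClient.from_connection_string(conn_str, logging_enable=True)
```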

### Provide Feedback

If you encounter any bugs or have suggestions, please file an issue in the [Issues](https://github.com/Azure/azure-sdk-for-python/issues) section of the project.

## Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [[email protected]](mailto:[email protected]) with any additional questions or comments.

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhubs-checkpointstoreblob/README.png)
@@ -0,0 +1,5 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore
@@ -0,0 +1,5 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore
@@ -0,0 +1,12 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "1.0.0b5"

from .blobstoragepm import BlobPartitionManager

__all__ = [
"BlobPartitionManager",
]
@@ -0,0 +1,162 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Dict
import logging
import time
import calendar
from datetime import datetime
from collections import defaultdict

from azure.eventhub import PartitionManager, OwnershipLostError # type: ignore # pylint:disable=no-name-in-module
from azure.core.exceptions import ResourceModifiedError, ResourceExistsError # type: ignore
from azure.storage.blob import BlobClient # type: ignore

logger = logging.getLogger(__name__)
UPLOAD_DATA = ""


class BlobPartitionManager(PartitionManager):
    """A PartitionManager that uses Azure Blob Storage to store the partition ownership and checkpoint data.

    This class implements methods list_ownership, claim_ownership, update_checkpoint and list_checkpoints that are
    defined in class azure.eventhub.PartitionManager of package azure-eventhub.

    """
    def __init__(self, container_client):
        # type: (ContainerClient) -> None
        """Create a BlobPartitionManager.

        :param container_client: The Azure Blob Storage Container client that is used to save checkpoint data to Azure
         Blob Storage Container.
        """
        self._container_client = container_client
        self._cached_blob_clients = defaultdict()  # type: Dict[str, BlobClient]

    @staticmethod
    def _utc_to_local(utc_dt):
        timestamp = calendar.timegm(utc_dt.timetuple())
        local_dt = datetime.fromtimestamp(timestamp)
        return local_dt.replace(microsecond=utc_dt.microsecond)

    @staticmethod
    def _to_timestamp(date):
        timestamp = None
        try:
            timestamp = date.timestamp()
        except AttributeError:  # python2.7 compatible
            timestamp = time.mktime(BlobPartitionManager._utc_to_local(date).timetuple()) \
                + date.microsecond / 1e6
        return timestamp

    def _get_blob_client(self, blob_name):
        result = self._cached_blob_clients.get(blob_name)
        if not result:
            result = self._container_client.get_blob_client(blob_name)
            self._cached_blob_clients[blob_name] = result
        return result

    def _upload_ownership(self, ownership, metadata):
        etag = ownership.get("etag")
        if etag:
            # Update the blob only if it still carries the etag we last saw (optimistic concurrency).
            etag_match = {"if_match": etag}
        else:
            # No etag known yet: create the blob only if it does not already exist.
            etag_match = {"if_none_match": '*'}
        blob_name = "{}/{}/{}/ownership/{}".format(ownership["fully_qualified_namespace"], ownership["eventhub_name"],
                                                   ownership["consumer_group_name"], ownership["partition_id"])
        uploaded_blob_properties = self._get_blob_client(blob_name).upload_blob(
            data=UPLOAD_DATA, overwrite=True, metadata=metadata, **etag_match
        )
        ownership["etag"] = uploaded_blob_properties["etag"]
        ownership["last_modified_time"] = self._to_timestamp(uploaded_blob_properties["last_modified"])
        ownership.update(metadata)

    def list_ownership(self, fully_qualified_namespace, eventhub_name, consumer_group_name):
        try:
            blobs = self._container_client.list_blobs(
                name_starts_with="{}/{}/{}/ownership".format(
                    fully_qualified_namespace, eventhub_name, consumer_group_name),
                include=['metadata'])
            result = []
            for b in blobs:
                ownership = {
                    "fully_qualified_namespace": fully_qualified_namespace,
                    "eventhub_name": eventhub_name,
                    "consumer_group_name": consumer_group_name,
                    "partition_id": b.name.split("/")[-1],
                    "owner_id": b.metadata["ownerId"],
                    "etag": b.etag,
                    "last_modified_time": self._to_timestamp(b.last_modified) if b.last_modified else None
                }
                result.append(ownership)
            return result
        except Exception as err:  # pylint:disable=broad-except
            logger.warning("An exception occurred during list_ownership for "
                           "namespace %r eventhub %r consumer group %r. "
                           "Exception is %r", fully_qualified_namespace, eventhub_name, consumer_group_name, err)
            raise

    def _claim_one_partition(self, ownership):
        partition_id = ownership["partition_id"]
        fully_qualified_namespace = ownership["fully_qualified_namespace"]
        eventhub_name = ownership["eventhub_name"]
        consumer_group_name = ownership["consumer_group_name"]
        owner_id = ownership["owner_id"]
        metadata = {"ownerId": owner_id}
        try:
            self._upload_ownership(ownership, metadata)
            return ownership
        except (ResourceModifiedError, ResourceExistsError):
            logger.info(
                "EventProcessor instance %r of namespace %r eventhub %r consumer group %r "
                "lost ownership to partition %r",
                owner_id, fully_qualified_namespace, eventhub_name, consumer_group_name, partition_id)
            raise OwnershipLostError()
        except Exception as err:  # pylint:disable=broad-except
            logger.warning("An exception occurred when EventProcessor instance %r claim_ownership for "
                           "namespace %r eventhub %r consumer group %r partition %r. "
                           "The ownership is now lost. Exception "
                           "is %r",
                           owner_id, fully_qualified_namespace, eventhub_name, consumer_group_name, partition_id, err)
            return ownership  # Keep the ownership if an unexpected error happens

    def claim_ownership(self, ownership_list):
        gathered_results = []
        for x in ownership_list:
            try:
                gathered_results.append(self._claim_one_partition(x))
            except OwnershipLostError:
                # Skip partitions whose ownership could not be claimed.
                pass
        return gathered_results

    def update_checkpoint(self, fully_qualified_namespace, eventhub_name, consumer_group_name, partition_id,
                          offset, sequence_number):
        metadata = {
            "Offset": offset,
            "SequenceNumber": str(sequence_number),
        }
        blob_name = "{}/{}/{}/checkpoint/{}".format(fully_qualified_namespace, eventhub_name,
                                                    consumer_group_name, partition_id)
        self._get_blob_client(blob_name).upload_blob(
            data=UPLOAD_DATA, overwrite=True, metadata=metadata
        )

    def list_checkpoints(self, fully_qualified_namespace, eventhub_name, consumer_group_name):
        blobs = self._container_client.list_blobs(
            name_starts_with="{}/{}/{}/checkpoint".format(
                fully_qualified_namespace, eventhub_name, consumer_group_name),
            include=['metadata'])
        result = []
        for b in blobs:
            metadata = b.metadata
            checkpoint = {
                "fully_qualified_namespace": fully_qualified_namespace,
                "eventhub_name": eventhub_name,
                "consumer_group_name": consumer_group_name,
                "partition_id": b.name.split("/")[-1],
                "offset": metadata["Offset"],
                "sequence_number": metadata["SequenceNumber"]
            }
            result.append(checkpoint)
        return result
@@ -0,0 +1,4 @@
-e ../../../tools/azure-sdk-tools
../../core/azure-core
-e ../../storage/azure-storage-blob
../azure-eventhubs