Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
875841e
initial blob storage
Aug 28, 2019
6df7253
fix leaking connection str
Aug 28, 2019
585fbcb
[AutoPR] datafactory/resource-manager (#6814)
AutorestCI Aug 30, 2019
ffd8cb0
small pylint adjustment
Aug 30, 2019
27cb0bf
initial blob storage
Aug 28, 2019
f6d77e7
fix leaking connection str
Aug 28, 2019
894d8b1
Eventhubs pylint errors (#7007)
YijunXieMS Aug 30, 2019
2f69d65
Merge branch 'master' into eventhubs_preview3
Aug 30, 2019
e5c8d1c
Add typing for Python2.7
Aug 30, 2019
32833b3
Merge branch 'eventhubs_blobstorage' of github.com:Azure/azure-sdk-fo…
Aug 30, 2019
e85ac17
[EventHub] IoTHub management operations improvement and bug fixing (#…
yunhaoling Sep 2, 2019
da6199f
Change test polling to 5 sec
Sep 2, 2019
e6a7c5e
Add async lock to ensure etag consistency
Sep 2, 2019
ebc4362
Add dependency to azure-storage
Sep 2, 2019
1503604
Remove dependency on PartitionManager of azure-eventhub
Sep 2, 2019
c9707c4
Fix azure-storage-blob requirement error
Sep 2, 2019
d7b2606
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 2, 2019
778ab66
Add docstring to BlobPartitionManager
Sep 2, 2019
017d9f0
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 2, 2019
1fb341b
[EventHub] Retry refactor (#7026)
yunhaoling Sep 3, 2019
9bed566
Refine exception handling
Sep 3, 2019
b878002
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 3, 2019
7762130
add system_properties to EventData
Sep 3, 2019
1b10d00
Fix a small bug
Sep 4, 2019
13237b5
Refine example code
Sep 4, 2019
cbc6792
handle exception for claim_ownership and update_checkpoint
Sep 4, 2019
8748e1f
Add license info
Sep 4, 2019
97e0558
Restructure packages
Sep 4, 2019
e61d6a1
Lock each ownership with a separate lock
Sep 4, 2019
998eeed
Update receive method (#7064)
yunhaoling Sep 4, 2019
b03cc64
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 4, 2019
db93fd4
Re-org namespace package structure
Sep 4, 2019
2050615
raise error while list_ownership got an exception
Sep 5, 2019
2781062
Restructure package structure
Sep 5, 2019
8a32e44
replace checkpointer with checkpointstore as a part of package name
Sep 6, 2019
e13ddee
Update accessibility of class (#7091)
yunhaoling Sep 6, 2019
f616f37
Update samples and codes according to the review (#7098)
yunhaoling Sep 6, 2019
dad5baa
Python EventHubs load balancing (#6901)
YijunXieMS Sep 7, 2019
8e7e1c1
Fix a pylint error
Sep 7, 2019
88ca853
Merge remote-tracking branch 'central/eventhubs_preview3' into eventh…
Sep 7, 2019
b32417b
Merge remote-tracking branch 'central/eventhubs_preview3' into eventh…
Sep 7, 2019
667f0b0
remove duplicated partition manager
Sep 7, 2019
f28365c
Fix a bug in list_ownership
Sep 7, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
initial blob storage
  • Loading branch information
yijxie committed Aug 28, 2019
commit 875841e63c760b9e84fc903ca53bfea7f58ae503
Empty file.
21 changes: 21 additions & 0 deletions sdk/eventhub/azure-ehplugin-partitionmanager-blobstorage/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) Microsoft Corporation. All rights reserved.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
include *.md
include azure/__init__.py
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "1.0.0b1"
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Iterable, Dict, Any
import logging
from collections import defaultdict
import asyncio

from azure.eventhub.eventprocessor import PartitionManager
from azure.storage.blob.aio import ContainerClient

logger = logging.getLogger(__name__)
UPLOAD_DATA = ""


class BlobPartitionManager(PartitionManager):
def __init__(self, container_client: ContainerClient):
self._container_client = container_client
# self._ownership_cache = {}
# self._ownership_locks = defaultdict(asyncio.Lock)

async def list_ownership(self, eventhub_name: str, consumer_group_name: str) -> Iterable[Dict[str, Any]]:
blobs = self._container_client.list_blobs(include=['metadata'])
result = []
async for b in blobs:
metadata = b.metadata
ownership = {
"eventhub_name": eventhub_name,
"consumer_group_name": consumer_group_name,
"partition_id": b.name,
"etag": b.etag,
"last_modified_time": b.last_modified.timestamp() if b.last_modified else None
}
ownership.update(metadata)
result.append(ownership)
return result

async def claim_ownership(self, ownership_list: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
result = []
for ownership in ownership_list:
metadata = {"owner_id": ownership["owner_id"]}
if "offset" in ownership:
metadata["offset"] = ownership["offset"]
if "sequence_number" in ownership:
metadata["sequence_number"] = ownership["sequence_number"]
name = ownership["partition_id"]
try:
etag = ownership.get("etag")
if etag:
etag_match = {"if_match": '"'+etag+'"'}
else:
etag_match = {"if_none_match": '"*"'}
blob_client = await self._container_client.upload_blob(
name=name, data=UPLOAD_DATA, overwrite=True, metadata=metadata, **etag_match
)

uploaded_blob_properties = await blob_client.get_blob_properties()
ownership["etag"] = uploaded_blob_properties.etag
ownership["last_modified_time"] = uploaded_blob_properties.last_modified
except Exception as err:
logger.info("Claim error occurred: %r", err)
raise
result.append(ownership)
return result

async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_id, owner_id,
offset, sequence_number) -> None:

metadata = {
"owner_id": owner_id,
"offset": offset,
"sequence_number": sequence_number
}
try:
blob_client = await self._container_client.upload_blob(name=partition_id, data=UPLOAD_DATA, overwrite=True)

except Exception as err:
logger.info("Checkpoint error occurred: %r", err)
raise
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-e ../../eventhub/azure-eventhubs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import asyncio
import logging
import os
from azure.eventhub.aio import EventHubClient
from azure.eventhub.eventprocessor import EventProcessor, PartitionProcessor
from azure.ehplugin.partitionmanager.blobstorage.blobstoragepm import BlobPartitionManager
from azure.storage.blob.aio import ContainerClient

RECEIVE_TIMEOUT = 5 # timeout in seconds for a receiving operation. 0 or None means no timeout
RETRY_TOTAL = 3 # max number of retries for receive operations within the receive timeout. Actual number of retries clould be less if RECEIVE_TIMEOUT is too small
CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
STORAGE_CONNECTION_STR = "DefaultEndpointsProtocol=https;AccountName=ephstg;AccountKey=JX+/0R9IFtbo406MlvgLb4/ihT66kZh7vyeqUoDZkS5RKeUi7CBO+3mpl1gnNTf2Do6uUFjDQtdr1i2idcQ7Uw==;BlobEndpoint=https://ephstg.blob.core.windows.net/;QueueEndpoint=https://ephstg.queue.core.windows.net/;TableEndpoint=https://ephstg.table.core.windows.net/;FileEndpoint=https://ephstg.file.core.windows.net/;"

logging.basicConfig(level=logging.INFO)


async def do_operation(event):
# do some sync or async operations. If the operation is i/o intensive, async will have better performance
print(event)


class MyPartitionProcessor(PartitionProcessor):
async def process_events(self, events, checkpoint_manager):
if events:
await asyncio.gather(*[do_operation(event) for event in events])
await checkpoint_manager.update_checkpoint(events[-1].offset, events[-1].sequence_number)
else:
print("empty events received", "partition:", checkpoint_manager.partition_id)


if __name__ == '__main__':
loop = asyncio.get_event_loop()
client = EventHubClient.from_connection_string(CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT, retry_total=RETRY_TOTAL)
container_client = ContainerClient.from_connection_string(STORAGE_CONNECTION_STR, container="eventprocessor")
partition_manager = BlobPartitionManager(container_client=container_client)
event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager, polling_interval=1)
try:
loop.run_until_complete(event_processor.start())
except KeyboardInterrupt:
loop.run_until_complete(event_processor.stop())
finally:
loop.stop()
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[packaging]
auto_update = false
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[bdist_wheel]
universal=1
80 changes: 80 additions & 0 deletions sdk/eventhub/azure-ehplugin-partitionmanager-blobstorage/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#!/usr/bin/env python

#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import re
import os.path
import sys
from io import open
from setuptools import find_packages, setup


# Change the PACKAGE_NAME only to change folder and different name
PACKAGE_NAME = "azure-ehplugin-partitionmanager-blobstorage"
PACKAGE_PPRINT_NAME = "Event Hubs Event Procesor Partition Manager implementation with Blob Storage"

# a-b-c => a/b/c
package_folder_path = PACKAGE_NAME.replace('-', '/')
# a-b-c => a.b.c
namespace_name = PACKAGE_NAME.replace('-', '.')

# Version extraction inspired from 'requests'
with open(os.path.join(package_folder_path, '__init__.py'), 'r') as fd:
version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
fd.read(), re.MULTILINE).group(1)

if not version:
raise RuntimeError('Cannot find version information')

with open('README.md') as f:
readme = f.read()
with open('HISTORY.md') as f:
history = f.read()

exclude_packages = [
'tests',
'examples',
# Exclude packages that will be covered by PEP420 or nspkg
'azure',
]

if sys.version_info < (3, 5, 3):
exclude_packages.extend([
'*.aio',
'*.aio.*',
])

setup(
name=PACKAGE_NAME,
version=version,
description='Microsoft Azure {} Client Library for Python'.format(PACKAGE_PPRINT_NAME),
long_description=readme + '\n\n' + history,
long_description_content_type='text/markdown',
license='MIT License',
author='Microsoft Corporation',
author_email='[email protected]',
url='https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs',
classifiers=[
'Development Status :: 3 - Alpha',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'License :: OSI Approved :: MIT License',
],
zip_safe=False,
packages=find_packages(exclude=exclude_packages),
install_requires=[

],
extras_require={
":python_version<'3.0'": ['azure-nspkg'],
}
)