Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
2cba844
Certs Final PR (#7076)
iscai-msft Sep 6, 2019
7e0ddbe
Perform encrypt/verify/wrap locally when possible (#6939)
chlowell Sep 7, 2019
6aba868
[EventHubs] Track2 Preview3 (#7059)
yunhaoling Sep 9, 2019
cf18c2b
[AutoPR] reservations/resource-manager (#6861)
AutorestCI Sep 9, 2019
c9f3e9c
setting a maxparallel paramter for the storage livetests. we need to …
scbedd Sep 9, 2019
dcaa3c2
Enable SSO on Windows (#7006)
chlowell Sep 9, 2019
54a1af0
adjusting where inconsistency is set. if we are inconsistent, but the…
scbedd Sep 10, 2019
34b1cba
revert changes to doc build host machine. we need to push some docs o…
scbedd Sep 10, 2019
ee0f3b7
Fix tracing if method raise exception (#7133)
lmazuel Sep 10, 2019
ce6484e
fix link to azure-identity readme (#7169)
chlowell Sep 10, 2019
5a6764d
Eventhubs preview3 documentation update (#7139)
YijunXieMS Sep 10, 2019
ad4a001
msal_extensions -> msal-extensions (#7172)
chlowell Sep 10, 2019
3037d37
update doc (#7131)
xiangyan99 Sep 10, 2019
a267b95
[Cosmos] Preview2 (#7140)
annatisch Sep 10, 2019
2aa33cf
Key Vault: added example and fixed link in Certificates README (#7151)
iscai-msft Sep 10, 2019
475545f
Key Vault: fixed certificate docstring errors (#7132)
iscai-msft Sep 10, 2019
2b6068b
Key Vault: added certificate changelog (#7128)
iscai-msft Sep 10, 2019
fe4c62a
remove duplicate enum key (#7173)
kayousef Sep 10, 2019
90fe8d9
azure-keyvault-nspkg (#7125)
chlowell Sep 10, 2019
c433e47
Storage Preview 3 Release (#7154)
rakshith91 Sep 10, 2019
b3790cd
update readme and history (#7188)
Sep 11, 2019
7354bbd
Skip Building on incompatible Python Versions (#7160)
scbedd Sep 11, 2019
ab3f145
Disable dev-setup "Develop" mode in CI checks (#7162)
scbedd Sep 11, 2019
59e24d1
Eventhubs preview3 doc update
yunhaoling Sep 11, 2019
dc84cca
Final Key Vault 4.0.0b3 updates (#7189)
chlowell Sep 11, 2019
3c5304d
add cryptography dependency (#7207)
chlowell Sep 11, 2019
21d1741
Eventhubs blobstorage merge to master (#7150)
YijunXieMS Sep 11, 2019
1016d17
Increment Key Vault versions (#7211)
chlowell Sep 11, 2019
26319a8
Increment azure-identity version (#7176)
chlowell Sep 11, 2019
758f358
Doc build resolution for extension package (#7217)
scbedd Sep 11, 2019
21c62d7
removing stray == (#7218)
scbedd Sep 11, 2019
b30de5a
handle 0 packages being passed to dev_setup from setup_execute_tests.…
scbedd Sep 11, 2019
fc0d440
adding line item entry (#7221)
scbedd Sep 11, 2019
483a334
KeyVault: Removing JsonWeb prefix from enums (#7212)
iscai-msft Sep 11, 2019
c3fece1
Eventhubs blobstorage docs update (#7216)
YijunXieMS Sep 12, 2019
fdf78d5
fixed certificates 4.0.0b3 changelog (#7214)
iscai-msft Sep 12, 2019
58e3bf1
[cognitive-services] cog services devtools test framework (#7087)
kristapratico Sep 12, 2019
1caeded
Key Vault: reworked certificates samples, all passing (#7225)
iscai-msft Sep 12, 2019
3a97e09
adding azure-eventhubs to the exception list. should result in what w…
scbedd Sep 12, 2019
977cbda
add ability to specify job timeout (#7237)
danieljurek Sep 12, 2019
56c39bb
Cosmos Pipeline Refactor (#7227)
danieljurek Sep 12, 2019
54c452f
Fix spelling mistakes in cosmos_db.py (#6980)
James-DBA-Anderson Sep 12, 2019
71c31c4
Fix doc link (#6458)
jongio Sep 12, 2019
d1cf7e2
class links with extra text need :class: (#7215)
bryevdv Sep 12, 2019
16fca20
Key Vault: added certificate rst files to sphinx doc (#7248)
iscai-msft Sep 12, 2019
d73d926
Address nspkg Failures (#7243)
scbedd Sep 12, 2019
a1c2a91
Update Dependency Script to Key of Package Name for SDIST, update ove…
scbedd Sep 13, 2019
71e62fa
Un-use deprecated method in devtools (#7222)
Sep 13, 2019
2a0c6c3
[AutoPR compute/resource-manager] added missing packages to compute r…
AutorestCI Sep 16, 2019
fa91e4e
Fix error creating EnvironmentCredential with username/password (#7260)
chlowell Sep 16, 2019
5038f0b
update sphinx sources for certificates (#7290)
scbedd Sep 16, 2019
e461f03
Resolve faulty package filter when building for release (#7297)
scbedd Sep 16, 2019
ededddf
Read env variables and not settings if available (#7101)
lmazuel Sep 16, 2019
225f0e3
update requirements versions and fix breaks
danieljurek Sep 13, 2019
d2c001e
update to September 2019 release
danieljurek Sep 13, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Eventhubs preview3 documentation update (#7139)
  • Loading branch information
YijunXieMS authored Sep 10, 2019
commit 5a6764d9936e434d4c0b9e8b215237dd4533291a
22 changes: 14 additions & 8 deletions sdk/eventhub/azure-eventhubs/HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
# Release History

## 5.0.0b3 (2019-09-10)

**New features**
- `EventProcessor` has a load balancer that balances load among multiple EventProcessors automatically
- In addition to `SamplePartitionManager`, A new `PartitionManager` implementation that uses Azure Blob Storage is added
to centrally store the checkpoint data for event processors. It's not packaged separately as a plug-in to this package.
Refer to [Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) for details.

- Added support for automatic load balancing among multiple `EventProcessor`.
- Added `BlobPartitionManager` which implements `PartitionManager`.
- Azure Blob Storage is applied for storing data used by `EventProcessor`.
- Packaged separately as a plug-in to `EventProcessor`.
- For details, please refer to [Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio).
- Added property `system_properties` on `EventData`.

**Breaking changes**

- `PartitionProcessor` constructor removed argument "checkpoint_manager". Its methods (initialize, process_events,
process_error, close) added argument "partition_context", which has method update_checkpoint.
- `CheckpointManager` was replaced by `PartitionContext`
- Renamed `Sqlite3PartitionManager` to `SamplePartitionManager`
- Removed constructor method of `PartitionProcessor`. For initialization please implement the method `initialize`.
- Replaced `CheckpointManager` by `PartitionContext`.
- `PartitionContext` has partition context information and method `update_checkpoint`.
- Updated all methods of `PartitionProcessor` to include `PartitionContext` as part of the arguments.
- Updated accessibility of class members in `EventHub/EventHubConsumer/EventHubProducer`to be private.
- Moved `azure.eventhub.eventprocessor` under `aio` package, which now becomes `azure.eventhub.aio.eventprocessor`.

## 5.0.0b2 (2019-08-06)

Expand Down
75 changes: 67 additions & 8 deletions sdk/eventhub/azure-eventhubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ The Azure Event Hubs client library allows for publishing and consuming of Azure
- Observe interesting operations and interactions happening within your business or other ecosystem, allowing loosely coupled systems to interact without the need to bind them together.
- Receive events from one or more publishers, transform them to better meet the needs of your ecosystem, then publish the transformed events to a new stream for consumers to observe.

[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) | [Package (PyPi)](https://pypi.org/project/azure-eventhub/) | [API reference documentation](https://azure.github.io/azure-sdk-for-python/ref/azure.eventhub) | [Product documentation](https://docs.microsoft.com/en-ca/azure/event-hubs/)
[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) | [Package (PyPi)](https://pypi.org/project/azure-eventhub/) | [API reference documentation](https://azure.github.io/azure-sdk-for-python/ref/azure.eventhub) | [Product documentation](https://docs.microsoft.com/en-us/azure/event-hubs/)

## Getting started

Expand All @@ -20,10 +20,6 @@ Install the Azure Event Hubs client library for Python with pip:
```
$ pip install --pre azure-eventhub
```
For Python2.7, please install package "typing". This is a workaround for [issue 6767](https://github.com/Azure/azure-sdk-for-python/issues/6767).
```
$ pip install typing
```

**Prerequisites**

Expand Down Expand Up @@ -113,6 +109,8 @@ partition_ids = client.get_partition_ids()

Publish events to an Event Hub.

#### Send a single event or an array of events

```python
from azure.eventhub import EventHubClient, EventData

Expand All @@ -134,6 +132,34 @@ finally:
pass
```

#### Send a batch of events

Use the `create_batch` method on `EventHubProcuer` to create an `EventDataBatch` object which can then be sent using the `send` method. Events may be added to the `EventDataBatch` using the `try_add` method until the maximum batch size limit in bytes has been reached.
```python
from azure.eventhub import EventHubClient, EventData

try:
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubClient.from_connection_string(connection_str, event_hub_path)
producer = client.create_producer(partition_id="0")

event_data_batch = producer.create_batch(max_size=10000)
can_add = True
while can_add:
try:
event_data_batch.try_add(EventData('Message inside EventBatchData'))
except ValueError:
can_add = False # EventDataBatch object reaches max_size.

with producer:
producer.send(event_data_batch)
except:
raise
finally:
pass
```

### Consume events from an Event Hub

Consume events from an Event Hub.
Expand Down Expand Up @@ -163,6 +189,7 @@ finally:

Publish events to an Event Hub asynchronously.

#### Send a single event or an array of events
```python
from azure.eventhub.aio import EventHubClient
from azure.eventhub import EventData
Expand All @@ -178,7 +205,37 @@ try:
event_list.append(EventData(b"A single event"))

async with producer:
await producer.send(event_list)
await producer.send(event_list) # Send a list of events
await producer.send(EventData(b"A single event")) # Send a single event
except:
raise
finally:
pass
```

#### Send a batch of events

Use the `create_batch` method on `EventHubProcuer` to create an `EventDataBatch` object which can then be sent using the `send` method. Events may be added to the `EventDataBatch` using the `try_add` method until the maximum batch size limit in bytes has been reached.
```python
from azure.eventhub.aio import EventHubClient
from azure.eventhub import EventData

try:
connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
event_hub_path = '<< NAME OF THE EVENT HUB >>'
client = EventHubClient.from_connection_string(connection_str, event_hub_path)
producer = client.create_producer(partition_id="0")

event_data_batch = await producer.create_batch(max_size=10000)
can_add = True
while can_add:
try:
event_data_batch.try_add(EventData('Message inside EventBatchData'))
except ValueError:
can_add = False # EventDataBatch object reaches max_size.

async with producer:
await producer.send(event_data_batch)
except:
raise
finally:
Expand Down Expand Up @@ -217,9 +274,11 @@ Using an `EventHubConsumer` to consume events like in the previous examples puts

The `EventProcessor` will delegate the processing of events to a `PartitionProcessor` that you provide, allowing you to focus on business logic while the processor holds responsibility for managing the underlying consumer operations including checkpointing and load balancing.

Load balancing is typically useful when running multiple instances of `EventProcessor` across multiple processes or even machines. It is recommended to store checkpoints to a persistent store when running in production. Search pypi with the prefix `azure-eventhubs-checkpoint` to find packages that support persistent storage of checkpoints.

You can see how to use the `EventProcessor` in the below example, where we use an in memory `PartitionManager` that does checkpointing in memory.

[Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) is another `PartitionManager` implementation that allows multiple EventProcessors to share the load balancing and checkpoint data in a central storage.
[Azure Blob Storage Partition Manager](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) is one of the `PartitionManager` implementation we provide that applies Azure Blob Storage as the persistent store.


```python
Expand All @@ -242,7 +301,7 @@ class MyPartitionProcessor(PartitionProcessor):

async def main():
client = EventHubClient.from_connection_string(connection_str, receive_timeout=5, retry_total=3)
partition_manager = SamplePartitionManager() # in-memory PartitionManager.
partition_manager = SamplePartitionManager() # in-memory or file based PartitionManager
try:
event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager)
asyncio.ensure_future(event_processor.start())
Expand Down
1 change: 1 addition & 0 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore
__version__ = "5.0.0b3"
from uamqp import constants # type: ignore
from azure.eventhub.common import EventData, EventDataBatch, EventPosition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ async def get_properties(self):
-'partition_ids'

:rtype: dict
:raises: ~azure.eventhub.ConnectError
:raises: ~azure.eventhub.EventHubError
"""
if self._is_iothub and not self._iothub_redirect_info:
await self._iothub_redirect()
Expand Down Expand Up @@ -207,7 +207,7 @@ async def get_partition_properties(self, partition):
:param partition: The target partition id.
:type partition: str
:rtype: dict
:raises: ~azure.eventhub.ConnectError
:raises: ~azure.eventhub.EventHubError
"""
if self._is_iothub and not self._iothub_redirect_info:
await self._iothub_redirect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class EventHubConsumer(ConsumerProducerMixin): # pylint:disable=too-many-instan
group to be actively reading events from the partition. These non-exclusive consumers are
sometimes referred to as "Non-Epoch Consumers."

Please use the method `create_consumer` on `EventHubClient` for creating `EventHubConsumer`.
"""
_timeout = 0
_epoch_symbol = b'com.microsoft:epoch'
Expand All @@ -51,8 +52,8 @@ def __init__( # pylint: disable=super-init-not-called
:param prefetch: The number of events to prefetch from the service
for processing. Default is 300.
:type prefetch: int
:param owner_level: The priority of the exclusive consumer. It will an exclusive
consumer if owner_level is set.
:param owner_level: The priority of the exclusive consumer. An exclusive
consumer will be created if owner_level is set.
:type owner_level: int
:param loop: An event loop.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

class EventProcessor(object): # pylint:disable=too-many-instance-attributes
"""
An EventProcessor constantly receives events from all partitions of the Event Hub in the context of a given
An EventProcessor constantly receives events from multiple partitions of the Event Hub in the context of a given
consumer group. The received data will be sent to PartitionProcessor to be processed.

It provides the user a convenient way to receive events from multiple partitions and save checkpoints.
Expand Down Expand Up @@ -90,11 +90,13 @@ def __init__(
:type consumer_group_name: str
:param partition_processor_type: A subclass type of ~azure.eventhub.eventprocessor.PartitionProcessor.
:type partition_processor_type: type
:param partition_manager: Interacts with the storage system, dealing with ownership and checkpoints.
For an easy start, SamplePartitionManager comes with the package.
:type partition_manager: Class implementing the ~azure.eventhub.eventprocessor.PartitionManager.
:param partition_manager: Interacts with the data storage that stores ownership and checkpoints data.
~azure.eventhub.aio.eventprocessor.SamplePartitionManager demonstrates the basic usage of `PartitionManager`
which stores data in memory or a file.
Users can either use the provided `PartitionManager` plug-ins or develop their own `PartitionManager`.
:type partition_manager: Subclass of ~azure.eventhub.eventprocessor.PartitionManager.
:param initial_event_position: The event position to start a partition consumer.
if the partition has no checkpoint yet. This will be replaced by "reset" checkpoint in the near future.
if the partition has no checkpoint yet. This could be replaced by "reset" checkpoint in the near future.
:type initial_event_position: EventPosition
:param polling_interval: The interval between any two pollings of balancing and claiming
:type polling_interval: float
Expand All @@ -119,13 +121,8 @@ def __repr__(self):
async def start(self):
"""Start the EventProcessor.

1. Calls the OwnershipManager to keep claiming and balancing ownership of partitions in an
infinitely loop until self.stop() is called.
2. Cancels tasks for partitions that are no longer owned by this EventProcessor
3. Creates tasks for partitions that are newly claimed by this EventProcessor
4. Keeps tasks running for partitions that haven't changed ownership
5. Each task repeatedly calls EvenHubConsumer.receive() to retrieve events and
call user defined partition processor
The EventProcessor will try to claim and balance partition ownership with other `EventProcessor`
and asynchronously start receiving EventData from EventHub and processing events.

:return: None

Expand All @@ -145,23 +142,26 @@ async def start(self):
await asyncio.sleep(self._polling_interval)
continue

to_cancel_list = self._tasks.keys()
if claimed_ownership_list:
claimed_partition_ids = [x["partition_id"] for x in claimed_ownership_list]
to_cancel_list = self._tasks.keys() - claimed_partition_ids
self._create_tasks_for_claimed_ownership(claimed_ownership_list)
else:
to_cancel_list = set(self._tasks.keys())
log.info("EventProcessor %r hasn't claimed an ownership. It keeps claiming.", self._id)
if to_cancel_list:
self._cancel_tasks_for_partitions(to_cancel_list)
log.info("EventProcesor %r has cancelled partitions %r", self._id, to_cancel_list)
await asyncio.sleep(self._polling_interval)

async def stop(self):
"""Stop claiming ownership and all the partition consumers owned by this EventProcessor
"""Stop the EventProcessor.

This method stops claiming ownership of owned partitions and cancels tasks that are running
EventHubConsumer.receive() for the partitions owned by this EventProcessor.
The EventProcessor will stop receiving events from EventHubs and release the ownership of the partitions
it is working on.
Other running EventProcessor will take over these released partitions.

A stopped EventProcessor can be restarted by calling method `start` again.

:return: None

Expand All @@ -182,7 +182,7 @@ def _cancel_tasks_for_partitions(self, to_cancel_partitions):
def _create_tasks_for_claimed_ownership(self, to_claim_ownership_list):
for ownership in to_claim_ownership_list:
partition_id = ownership["partition_id"]
if partition_id not in self._tasks:
if partition_id not in self._tasks or self._tasks[partition_id].done():
self._tasks[partition_id] = get_running_loop().create_task(self._receive(ownership))

async def _receive(self, ownership):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class PartitionManager(ABC):
"""
PartitionManager deals with the interaction with the chosen storage service.
It's able to list/claim ownership and create checkpoint.
It's able to list/claim ownership and save checkpoint.
"""

@abstractmethod
Expand Down Expand Up @@ -76,11 +76,11 @@ async def update_checkpoint(self, eventhub_name, consumer_group_name, partition_
will be associated with.
:type sequence_number: int
:return: None
:raise: `OwnershipLostError`, `CheckpointError`
:raise: `OwnershipLostError`
"""


class OwnershipLostError(Exception):
"""Raises when update_checkpoint detects the ownership has been lost
"""Raises when update_checkpoint detects the ownership to a partition has been lost

"""
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,19 @@ class PartitionProcessor(ABC):
"""
PartitionProcessor processes events received from the Azure Event Hubs service. A single instance of a class
implementing this abstract class will be created for every partition the associated
~azure.eventhub.eventprocessor.EventProcessor owns.
~azure.eventhub.aio.eventprocessor.EventProcessor owns.

"""

async def initialize(self, partition_context: PartitionContext):
"""
"""This method will be called when `EventProcessor` creates a `PartitionProcessor`.

:param partition_context: The context information of this partition.
:type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext
"""

# Please put the code for initialization of PartitionProcessor here.

async def close(self, reason, partition_context: PartitionContext):
"""Called when EventProcessor stops processing this PartitionProcessor.

Expand All @@ -46,6 +48,8 @@ async def close(self, reason, partition_context: PartitionContext):

"""

# Please put the code for closing PartitionProcessor here.

@abstractmethod
async def process_events(self, events: List[EventData], partition_context: PartitionContext):
"""Called when a batch of events have been received.
Expand All @@ -58,8 +62,10 @@ async def process_events(self, events: List[EventData], partition_context: Parti

"""

# Please put the code for processing events here.

async def process_error(self, error, partition_context: PartitionContext):
"""Called when an error happens
"""Called when an error happens when receiving or processing events

:param error: The error that happens.
:type error: Exception
Expand All @@ -68,3 +74,5 @@ async def process_error(self, error, partition_context: PartitionContext):
:type partition_context: ~azure.eventhub.aio.eventprocessor.PartitionContext

"""

# Please put the code for processing error here.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class EventHubProducer(ConsumerProducerMixin): # pylint: disable=too-many-insta
be created to allow event data to be automatically routed to an available partition or specific
to a partition.

Please use the method `create_producer` on `EventHubClient` for creating `EventHubProducer`.
"""
_timeout_symbol = b'com.microsoft:timeout'

Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def get_properties(self):
-'partition_ids'

:rtype: dict
:raises: ~azure.eventhub.ConnectError
:raises: ~azure.eventhub.EventHubError
"""
if self._is_iothub and not self._iothub_redirect_info:
self._iothub_redirect()
Expand All @@ -188,7 +188,7 @@ def get_partition_ids(self):
Get partition ids of the specified EventHub.

:rtype: list[str]
:raises: ~azure.eventhub.ConnectError
:raises: ~azure.eventhub.EventHubError
"""
return self.get_properties()['partition_ids']

Expand Down
Loading