Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
e62825f
Shared connection (sync) draft
Jul 15, 2019
84ae397
Shared connection (sync) draft 2
Jul 16, 2019
6e6c827
Shared connection (sync) test update
Jul 16, 2019
a86146e
Shared connection
Jul 17, 2019
c5a23c8
Fix an issue
Jul 17, 2019
b83264d
add retry exponential delay and timeout to exception handling
Jul 22, 2019
4f17781
put module method before class def
Jul 22, 2019
f0d98d1
fixed Client.get_properties error
Jul 22, 2019
ed7d414
new eph (draft)
Jul 27, 2019
ee228b4
new eph (draft2)
Jul 29, 2019
1d65719
remove in memory partition manager
Jul 29, 2019
4895385
EventProcessor draft 3
Jul 30, 2019
6415ac2
small format change
Jul 30, 2019
a685f87
Fix logging
Jul 30, 2019
6388f4a
Add EventProcessor example
Jul 30, 2019
1cd6275
Merge branch 'eventhubs_preview2' into eventhubs_eph
Jul 30, 2019
6394f19
use decorator to implement retry logic and update some tests (#6544)
yunhaoling Jul 30, 2019
56fdd1e
Update livetest (#6547)
yunhaoling Jul 30, 2019
10f0be6
Remove legacy code and update livetest (#6549)
yunhaoling Jul 30, 2019
fbb66bd
make sync longrunning multi-threaded
Jul 31, 2019
00ff723
small changes on async long running test
Jul 31, 2019
0f5180c
reset retry_count for iterator
Jul 31, 2019
6e0c238
Don't return early when open a ReceiveClient or SendClient
Jul 31, 2019
0efc95f
type annotation change
Jul 31, 2019
90fbafb
Update kwargs and remove unused import
yunhaoling Jul 31, 2019
e06bad8
Misc changes from EventProcessor PR review
Jul 31, 2019
dd1d7ae
raise asyncio.CancelledError out instead of supressing it.
Jul 31, 2019
9d18dd9
Merge branch 'eventhubs_dev' into eventhubs_eph
Jul 31, 2019
a31ee67
Update livetest and small fixed (#6594)
yunhaoling Jul 31, 2019
19a5539
Merge branch 'eventhubs_dev' into eventhubs_eph
Aug 1, 2019
997dacf
Fix feedback from PR (1)
Aug 2, 2019
d688090
Revert "Merge branch 'eventhubs_dev' into eventhubs_eph"
Aug 2, 2019
2399dcb
Fix feedback from PR (2)
Aug 2, 2019
5ad0255
Update code according to the review (#6623)
yunhaoling Aug 2, 2019
5679065
Fix feedback from PR (3)
Aug 2, 2019
35245a1
Merge branch 'eventhubs_dev' into eventhubs_eph
Aug 2, 2019
83d0ec2
small bug fixing
Aug 2, 2019
a2195ba
Remove old EPH
Aug 2, 2019
f8acc8b
Update decorator implementation (#6642)
yunhaoling Aug 2, 2019
6fe4533
Remove old EPH pytest
Aug 2, 2019
598245d
Revert "Revert "Merge branch 'eventhubs_dev' into eventhubs_eph""
Aug 2, 2019
97dfce5
Update sample codes and docstring (#6643)
yunhaoling Aug 2, 2019
64c5c7d
Check tablename to prevent sql injection
Aug 3, 2019
13014dd
PR review update
Aug 3, 2019
3e82e90
Removed old EPH stuffs.
Aug 3, 2019
0f670d8
Small fix (#6650)
yunhaoling Aug 3, 2019
2a8b34c
Merge branch 'eventhubs_dev' into eventhubs_eph
Aug 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions sdk/eventhub/azure-eventhubs/HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
# Release History

## 5.0.0b2 (2019-08-06)

**New features**

- Added ability to create and send EventDataBatch object with limited data size.
- Added new configuration parameters for exponential delay among each retry operation.
- `retry_total`: The total number of attempts to redo the failed operation.
- `backoff_factor`: The delay time factor.
- `backoff_max`: The maximum delay time in total.

**Breaking changes**

- New `EventProcessor` design
- The `EventProcessorHost` has been removed.

## 5.0.0b1 (2019-06-25)

Version 5.0.0b1 is a preview of our efforts to create a client library that is user friendly and idiomatic to the Python ecosystem. The reasons for most of the changes in this update can be found in the [Azure SDK Design Guidelines for Python](https://azuresdkspecs.z5.web.core.windows.net/PythonSpec.html). For more information, please visit https://aka.ms/azure-sdk-preview1-python.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,16 @@

def _retry_decorator(to_be_wrapped_func):
def wrapped_func(self, *args, **kwargs):
timeout = kwargs.get("timeout", None)
timeout = kwargs.pop("timeout", None)
if not timeout:
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
timeout_time = time.time() + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
kwargs.pop("timeout", None)
while True:
try:
return to_be_wrapped_func(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
return to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception:
last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time)
retry_count += 1
Expand All @@ -46,7 +45,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):

def _check_closed(self):
if self.error:
raise EventHubError("{} has been closed. Please create a new consumer to receive event data.".format(self.name))
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name))

def _create_handler(self):
pass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,16 @@

def _retry_decorator(to_be_wrapped_func):
async def wrapped_func(self, *args, **kwargs):
timeout = kwargs.get("timeout", None)
timeout = kwargs.pop("timeout", None)
if not timeout:
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
timeout_time = time.time() + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
kwargs.pop("timeout", None)
while True:
try:
return await to_be_wrapped_func(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
return await to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception:
last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time)
retry_count += 1
Expand All @@ -47,7 +46,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):

def _check_closed(self):
if self.error:
raise EventHubError("{} has been closed. Please create a new consumer to receive event data.".format(self.name))
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name))

def _create_handler(self):
pass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,8 @@ async def _open(self, timeout_time=None):
self.source = self.redirected.address
await super(EventHubConsumer, self)._open(timeout_time)

async def _receive(self, **kwargs):
timeout_time = kwargs.get("timeout_time")
async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
last_exception = kwargs.get("last_exception")
max_batch_size = kwargs.get("max_batch_size")
data_batch = kwargs.get("data_batch")

await self._open(timeout_time)
Expand All @@ -171,6 +169,10 @@ async def _receive(self, **kwargs):
data_batch.append(event_data)
return data_batch

@_retry_decorator
async def _receive_with_try(self, timeout_time=None, max_batch_size=None, **kwargs):
return await self._receive(timeout_time=timeout_time, max_batch_size=max_batch_size, **kwargs)

@property
def queue_size(self):
# type: () -> int
Expand Down Expand Up @@ -217,8 +219,7 @@ async def receive(self, *, max_batch_size=None, timeout=None):
max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch)
data_batch = [] # type: List[EventData]

return await _retry_decorator(self._receive)(self, timeout=timeout,
max_batch_size=max_batch_size, data_batch=data_batch)
return await self._receive_with_try(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)

async def close(self, exception=None):
# type: (Exception) -> None
Expand Down
23 changes: 20 additions & 3 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ async def _open(self, timeout_time=None, **kwargs):
self.target = self.redirected.address
await super(EventHubProducer, self)._open(timeout_time)

@_retry_decorator
async def _open_with_retry(self, timeout_time=None, **kwargs):
return await self._open(timeout_time=timeout_time, **kwargs)

async def _send_event_data(self, timeout_time=None, last_exception=None):
if self.unsent_events:
await self._open(timeout_time)
Expand All @@ -131,6 +135,10 @@ async def _send_event_data(self, timeout_time=None, last_exception=None):
_error(self._outcome, self._condition)
return

@_retry_decorator
async def _send_event_data_with_retry(self, timeout_time=None, last_exception=None):
return await self._send_event_data(timeout_time=timeout_time, last_exception=last_exception)

def _on_outcome(self, outcome, condition):
"""
Called when the outcome is received for a delivery.
Expand All @@ -155,10 +163,19 @@ async def create_batch(self, max_size=None, partition_key=None):
:type partition_key: str
:return: an EventDataBatch instance
:rtype: ~azure.eventhub.EventDataBatch

Example:
.. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
:start-after: [START eventhub_client_async_create_batch]
:end-before: [END eventhub_client_async_create_batch]
:language: python
:dedent: 4
:caption: Create EventDataBatch object within limited size

"""

if not self._max_message_size_on_link:
await _retry_decorator(self._open)(self, timeout=self.client.config.send_timeout)
await self._open_with_retry(timeout=self.client.config.send_timeout)

if max_size and max_size > self._max_message_size_on_link:
raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
Expand Down Expand Up @@ -187,7 +204,7 @@ async def send(self, event_data, *, partition_key=None, timeout=None):
:rtype: None

Example:
.. literalinclude:: ../examples/test_examples_eventhub.py
.. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
:start-after: [START eventhub_client_async_send]
:end-before: [END eventhub_client_async_send]
:language: python
Expand All @@ -212,7 +229,7 @@ async def send(self, event_data, *, partition_key=None, timeout=None):
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access
wrapper_event_data.message.on_send_complete = self._on_outcome
self.unsent_events = [wrapper_event_data.message]
await _retry_decorator(self._send_event_data)(self, timeout=timeout)
await self._send_event_data_with_retry(timeout=timeout)

async def close(self, exception=None):
# type: (Exception) -> None
Expand Down
10 changes: 6 additions & 4 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,8 @@ def _open(self, timeout_time=None):
self.source = self.redirected.address
super(EventHubConsumer, self)._open(timeout_time)

def _receive(self, **kwargs):
timeout_time = kwargs.get("timeout_time")
def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
last_exception = kwargs.get("last_exception")
max_batch_size = kwargs.get("max_batch_size")
data_batch = kwargs.get("data_batch")

self._open(timeout_time)
Expand All @@ -165,6 +163,10 @@ def _receive(self, **kwargs):
data_batch.append(event_data)
return data_batch

@_retry_decorator
def _receive_with_try(self, timeout_time=None, max_batch_size=None, **kwargs):
return self._receive(timeout_time=timeout_time, max_batch_size=max_batch_size, **kwargs)

@property
def queue_size(self):
# type:() -> int
Expand Down Expand Up @@ -210,7 +212,7 @@ def receive(self, max_batch_size=None, timeout=None):
max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch)
data_batch = [] # type: List[EventData]

return _retry_decorator(self._receive)(self, timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)
return self._receive_with_try(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)

def close(self, exception=None):
# type:(Exception) -> None
Expand Down
21 changes: 19 additions & 2 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ def _open(self, timeout_time=None, **kwargs):
self.target = self.redirected.address
super(EventHubProducer, self)._open(timeout_time)

@_retry_decorator
def _open_with_retry(self, timeout_time=None, **kwargs):
return self._open(timeout_time=timeout_time, **kwargs)

def _send_event_data(self, timeout_time=None, last_exception=None):
if self.unsent_events:
self._open(timeout_time)
Expand All @@ -138,6 +142,10 @@ def _send_event_data(self, timeout_time=None, last_exception=None):
_error(self._outcome, self._condition)
return

@_retry_decorator
def _send_event_data_with_retry(self, timeout_time=None, last_exception=None):
return self._send_event_data(timeout_time=timeout_time, last_exception=last_exception)

def _on_outcome(self, outcome, condition):
"""
Called when the outcome is received for a delivery.
Expand All @@ -162,10 +170,19 @@ def create_batch(self, max_size=None, partition_key=None):
:type partition_key: str
:return: an EventDataBatch instance
:rtype: ~azure.eventhub.EventDataBatch

Example:
.. literalinclude:: ../examples/test_examples_eventhub.py
:start-after: [START eventhub_client_sync_create_batch]
:end-before: [END eventhub_client_sync_create_batch]
:language: python
:dedent: 4
:caption: Create EventDataBatch object within limited size

"""

if not self._max_message_size_on_link:
_retry_decorator(self._open)(self, timeout=self.client.config.send_timeout)
self._open_with_retry(timeout=self.client.config.send_timeout)

if max_size and max_size > self._max_message_size_on_link:
raise ValueError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
Expand Down Expand Up @@ -219,7 +236,7 @@ def send(self, event_data, partition_key=None, timeout=None):
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access
wrapper_event_data.message.on_send_complete = self._on_outcome
self.unsent_events = [wrapper_event_data.message]
_retry_decorator(self._send_event_data)(self, timeout=timeout)
self._send_event_data_with_retry(timeout=timeout)

def close(self, exception=None):
# type:(Exception) -> None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def test_example_eventhub_async_send_and_receive(live_eventhub_config):
os.environ['EVENT_HUB_HOSTNAME'],
os.environ['EVENT_HUB_SAS_POLICY'],
os.environ['EVENT_HUB_SAS_KEY'],
os.environ['EVENT_HUB_NAME'])
os.environ['EVENT_HUB_NAME'])
client = EventHubClient.from_connection_string(connection_str)
# [END create_eventhub_client_async]

Expand All @@ -49,6 +49,17 @@ async def test_example_eventhub_async_send_and_receive(live_eventhub_config):

await consumer.receive(timeout=1)

# [START eventhub_client_async_create_batch]
event_data_batch = await producer.create_batch(max_size=10000)
while True:
try:
event_data_batch.try_add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
# [END eventhub_client_async_create_batch]

# [START eventhub_client_async_send]
async with producer:
event_data = EventData(b"A single event")
Expand Down
64 changes: 64 additions & 0 deletions sdk/eventhub/azure-eventhubs/examples/event_data_batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
An example to show creating and sending EventBatchData within limited size.
"""

# pylint: disable=C0111

import logging
import time
import os

from azure.eventhub import EventHubClient, EventData, EventHubSharedKeyCredential

import examples
logger = examples.get_logger(logging.INFO)


HOSTNAME = os.environ.get('EVENT_HUB_HOSTNAME') # <mynamespace>.servicebus.windows.net
EVENT_HUB = os.environ.get('EVENT_HUB_NAME')

USER = os.environ.get('EVENT_HUB_SAS_POLICY')
KEY = os.environ.get('EVENT_HUB_SAS_KEY')


def create_batch_data(producer):
    """Fill a new EventDataBatch to capacity and return it.

    Repeatedly adds events to the batch until it reports it is full
    (``try_add`` raises ``ValueError``), then returns the filled batch.

    :param producer: an EventHubProducer used to create the batch.
    :return: an EventDataBatch filled up to its max_size limit.
    """
    batch = producer.create_batch(max_size=10000)
    filling = True
    while filling:
        try:
            batch.try_add(EventData('Message inside EventBatchData'))
        except ValueError:
            # The batch reached its max_size limit; stop adding.
            # A new EventDataBatch could be created here to send more data.
            filling = False
    return batch


try:
    if not HOSTNAME:
        raise ValueError("No EventHubs URL supplied.")

    # Build a client from the SAS credentials pulled from the environment above.
    client = EventHubClient(host=HOSTNAME, event_hub_path=EVENT_HUB, credential=EventHubSharedKeyCredential(USER, KEY),
                            network_tracing=False)
    producer = client.create_producer()

    # Capture the start time before entering the try so the finally clause
    # can always compute a runtime, even if sending fails immediately.
    start_time = time.time()
    try:
        # The producer context manager closes the underlying link on exit.
        with producer:
            event_data_batch = create_batch_data(producer)
            producer.send(event_data_batch)
    finally:
        # Log elapsed wall-clock time whether or not the send succeeded;
        # any exception from the try body still propagates after this runs.
        end_time = time.time()
        run_time = end_time - start_time
        logger.info("Runtime: {} seconds".format(run_time))

except KeyboardInterrupt:
    pass
11 changes: 11 additions & 0 deletions sdk/eventhub/azure-eventhubs/examples/test_examples_eventhub.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ def test_example_eventhub_sync_send_and_receive(live_eventhub_config):
event_data = EventData(body=list_data)
# [END create_event_data]

# [START eventhub_client_sync_create_batch]
event_data_batch = producer.create_batch(max_size=10000)
while True:
try:
event_data_batch.try_add(EventData('Message inside EventBatchData'))
except ValueError:
# The EventDataBatch object reaches its max_size.
# You can send the full EventDataBatch object and create a new one here.
break
# [END eventhub_client_sync_create_batch]

# [START eventhub_client_sync_send]
with producer:
event_data = EventData(b"A single event")
Expand Down