Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
e62825f
Shared connection (sync) draft
Jul 15, 2019
84ae397
Shared connection (sync) draft 2
Jul 16, 2019
6e6c827
Shared connection (sync) test update
Jul 16, 2019
a86146e
Shared connection
Jul 17, 2019
c5a23c8
Fix an issue
Jul 17, 2019
b83264d
add retry exponential delay and timeout to exception handling
Jul 22, 2019
4f17781
put module method before class def
Jul 22, 2019
f0d98d1
fixed Client.get_properties error
Jul 22, 2019
ed7d414
new eph (draft)
Jul 27, 2019
ee228b4
new eph (draft2)
Jul 29, 2019
1d65719
remove in memory partition manager
Jul 29, 2019
4895385
EventProcessor draft 3
Jul 30, 2019
6415ac2
small format change
Jul 30, 2019
a685f87
Fix logging
Jul 30, 2019
6388f4a
Add EventProcessor example
Jul 30, 2019
1cd6275
Merge branch 'eventhubs_preview2' into eventhubs_eph
Jul 30, 2019
6394f19
use decorator to implement retry logic and update some tests (#6544)
yunhaoling Jul 30, 2019
56fdd1e
Update livetest (#6547)
yunhaoling Jul 30, 2019
10f0be6
Remove legacy code and update livetest (#6549)
yunhaoling Jul 30, 2019
fbb66bd
make sync longrunning multi-threaded
Jul 31, 2019
00ff723
small changes on async long running test
Jul 31, 2019
0f5180c
reset retry_count for iterator
Jul 31, 2019
6e0c238
Don't return early when open a ReceiveClient or SendClient
Jul 31, 2019
0efc95f
type annotation change
Jul 31, 2019
90fbafb
Update kwargs and remove unused import
yunhaoling Jul 31, 2019
e06bad8
Misc changes from EventProcessor PR review
Jul 31, 2019
dd1d7ae
raise asyncio.CancelledError out instead of supressing it.
Jul 31, 2019
9d18dd9
Merge branch 'eventhubs_dev' into eventhubs_eph
Jul 31, 2019
a31ee67
Update livetest and small fixed (#6594)
yunhaoling Jul 31, 2019
19a5539
Merge branch 'eventhubs_dev' into eventhubs_eph
Aug 1, 2019
997dacf
Fix feedback from PR (1)
Aug 2, 2019
d688090
Revert "Merge branch 'eventhubs_dev' into eventhubs_eph"
Aug 2, 2019
2399dcb
Fix feedback from PR (2)
Aug 2, 2019
5ad0255
Update code according to the review (#6623)
yunhaoling Aug 2, 2019
5679065
Fix feedback from PR (3)
Aug 2, 2019
35245a1
Merge branch 'eventhubs_dev' into eventhubs_eph
Aug 2, 2019
83d0ec2
small bug fixing
Aug 2, 2019
a2195ba
Remove old EPH
Aug 2, 2019
f8acc8b
Update decorator implementation (#6642)
yunhaoling Aug 2, 2019
6fe4533
Remove old EPH pytest
Aug 2, 2019
598245d
Revert "Revert "Merge branch 'eventhubs_dev' into eventhubs_eph""
Aug 2, 2019
97dfce5
Update sample codes and docstring (#6643)
yunhaoling Aug 2, 2019
64c5c7d
Check tablename to prevent sql injection
Aug 3, 2019
13014dd
PR review update
Aug 3, 2019
3e82e90
Removed old EPH stuffs.
Aug 3, 2019
0f670d8
Small fix (#6650)
yunhaoling Aug 3, 2019
2a8b34c
Merge branch 'eventhubs_dev' into eventhubs_eph
Aug 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge branch 'eventhubs_preview2' into eventhubs_eph
  • Loading branch information
yijxie committed Jul 30, 2019
commit 1cd627517ccd0aa81c5393ef74d7bc90851cf04b
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,4 @@ def reset_connection_if_broken(self):


def get_connection_manager(**kwargs):
return _SharedConnectionManager(**kwargs)
return _SeparateConnectionManager(**kwargs)
16 changes: 14 additions & 2 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ def __init__( # pylint: disable=super-init-not-called
:type owner_level: int
:param loop: An event loop.
"""
event_position = kwargs.get("event_position", None)
prefetch = kwargs.get("prefetch", 300)
owner_level = kwargs.get("owner_level", None)
keep_alive = kwargs.get("keep_alive", None)
auto_reconnect = kwargs.get("auto_reconnect", True)
loop = kwargs.get("loop", None)

super(EventHubConsumer, self).__init__()
self.loop = loop or asyncio.get_event_loop()
self.running = False
Expand All @@ -76,7 +83,9 @@ def __init__( # pylint: disable=super-init-not-called
partition = self.source.split('/')[-1]
self.name = "EHReceiver-{}-partition{}".format(uuid.uuid4(), partition)
if owner_level:
self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(owner_level))}
self._link_properties[types.AMQPSymbol(self._epoch)] = types.AMQPLong(int(owner_level))
link_property_timeout_ms = (self.client.config.receive_timeout or self.timeout) * 1000
self._link_properties[types.AMQPSymbol(self._timeout)] = types.AMQPLong(int(link_property_timeout_ms))
self._handler = None

def __aiter__(self):
Expand Down Expand Up @@ -110,7 +119,7 @@ def _create_handler(self):
auth=self.client.get_auth(**alt_creds),
debug=self.client.config.network_tracing,
prefetch=self.prefetch,
link_properties=self.properties,
link_properties=self._link_properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
Expand Down Expand Up @@ -177,6 +186,9 @@ async def receive(self, **kwargs):
:caption: Receives events asynchronously

"""
max_batch_size = kwargs.get("max_batch_size", None)
timeout = kwargs.get("timeout", None)

self._check_closed()
max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size
timeout = self.client.config.receive_timeout if timeout is None else timeout
Expand Down
20 changes: 16 additions & 4 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import Iterable, Union
import time

from uamqp import constants, errors
from uamqp import types, constants, errors
from uamqp import SendClientAsync

from azure.eventhub.common import EventData, _BatchSendEventData
Expand Down Expand Up @@ -54,6 +54,12 @@ def __init__( # pylint: disable=super-init-not-called
:type auto_reconnect: bool
:param loop: An event loop. If not specified the default event loop will be used.
"""
partition = kwargs.get("partition", None)
send_timeout = kwargs.get("send_timeout", 60)
keep_alive = kwargs.get("keep_alive", None)
auto_reconnect = kwargs.get("auto_reconnect", True)
loop = kwargs.get("loop", None)

super(EventHubProducer, self).__init__()
self.loop = loop or asyncio.get_event_loop()
self.running = False
Expand All @@ -75,6 +81,7 @@ def __init__( # pylint: disable=super-init-not-called
self._handler = None
self._outcome = None
self._condition = None
self._link_properties = {types.AMQPSymbol(self._timeout): types.AMQPLong(int(self.timeout * 1000))}

def _create_handler(self):
self._handler = SendClientAsync(
Expand All @@ -85,6 +92,7 @@ def _create_handler(self):
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
link_properties=self._link_properties,
properties=self.client._create_properties(
self.client.config.user_agent), # pylint: disable=protected-access
loop=self.loop)
Expand Down Expand Up @@ -122,11 +130,14 @@ async def _send_event_data(self, timeout=None):
error = OperationTimeoutError("send operation timed out")
log.info("%r send operation timed out. (%r)", self.name, error)
raise error
self._handler._msg_timeout = remaining_time # pylint: disable=protected-access
self._handler.queue_message(*self.unsent_events)
await self._handler.wait_async()
self.unsent_events = self._handler.pending_messages
if self._outcome != constants.MessageSendResult.Ok:
_error(self._outcome, self._condition)
if self._outcome != constants.MessageSendResult.Ok:
if self._outcome == constants.MessageSendResult.Timeout:
self._condition = OperationTimeoutError("send operation timed out")
_error(self._outcome, self._condition)
return
except Exception as exception:
last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time)
Expand All @@ -144,7 +155,7 @@ def _on_outcome(self, outcome, condition):
self._outcome = outcome
self._condition = condition

async def send(self, event_data, partition_key=None, timeout=None):
async def send(self, event_data, **kwargs):
# type:(Union[EventData, Iterable[EventData]], Union[str, bytes]) -> None
"""
Sends an event data and blocks until acknowledgement is
Expand Down Expand Up @@ -209,4 +220,5 @@ async def close(self, **kwargs):
:caption: Close down the handler.

"""
exception = kwargs.get("exception", None)
await super(EventHubProducer, self).close(exception)
15 changes: 12 additions & 3 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ def __init__(self, client, source, **kwargs):
consumer if owner_level is set.
:type owner_level: int
"""
event_position = kwargs.get("event_position", None)
prefetch = kwargs.get("prefetch", 300)
owner_level = kwargs.get("owner_level", None)
keep_alive = kwargs.get("keep_alive", None)
auto_reconnect = kwargs.get("auto_reconnect", True)

super(EventHubConsumer, self).__init__()
self.running = False
self.client = client
Expand All @@ -71,7 +77,9 @@ def __init__(self, client, source, **kwargs):
partition = self.source.split('/')[-1]
self.name = "EHConsumer-{}-partition{}".format(uuid.uuid4(), partition)
if owner_level:
self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(owner_level))}
self._link_properties[types.AMQPSymbol(self._epoch)] = types.AMQPLong(int(owner_level))
link_property_timeout_ms = (self.client.config.receive_timeout or self.timeout) * 1000
self._link_properties[types.AMQPSymbol(self._timeout)] = types.AMQPLong(int(link_property_timeout_ms))
self._handler = None

def __iter__(self):
Expand Down Expand Up @@ -105,7 +113,7 @@ def _create_handler(self):
auth=self.client.get_auth(**alt_creds),
debug=self.client.config.network_tracing,
prefetch=self.prefetch,
link_properties=self.properties,
link_properties=self._link_properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
Expand Down Expand Up @@ -170,7 +178,8 @@ def receive(self, **kwargs):
:caption: Receive events from the EventHub.

"""
self._check_closed()
max_batch_size = kwargs.get("max_batch_size", None)
timeout = kwargs.get("timeout", None)

self._check_closed()
max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size
Expand Down
1 change: 1 addition & 0 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

log = logging.getLogger(__name__)


def _error_handler(error):
"""
Called internally when an event has failed to send so we
Expand Down
12 changes: 11 additions & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ def __init__(self, client, target, **kwargs):
Default value is `True`.
:type auto_reconnect: bool
"""
partition = kwargs.get("partition", None)
send_timeout = kwargs.get("send_timeout", 60)
keep_alive = kwargs.get("keep_alive", None)
auto_reconnect = kwargs.get("auto_reconnect", True)

super(EventHubProducer, self).__init__()
self.running = False
self.client = client
Expand All @@ -84,6 +89,7 @@ def __init__(self, client, target, **kwargs):
self._handler = None
self._outcome = None
self._condition = None
self._link_properties = {types.AMQPSymbol(self._timeout): types.AMQPLong(int(self.timeout * 1000))}

def _create_handler(self):
self._handler = SendClient(
Expand Down Expand Up @@ -131,10 +137,13 @@ def _send_event_data(self, timeout=None):
error = OperationTimeoutError("send operation timed out")
log.info("%r send operation timed out. (%r)", self.name, error)
raise error
self._handler._msg_timeout = remaining_time # pylint: disable=protected-access
self._handler.queue_message(*self.unsent_events)
self._handler.wait()
self.unsent_events = self._handler.pending_messages
if self._outcome != constants.MessageSendResult.Ok:
if self._outcome == constants.MessageSendResult.Timeout:
self._condition = OperationTimeoutError("send operation timed out")
_error(self._outcome, self._condition)
return
except Exception as exception:
Expand All @@ -153,7 +162,7 @@ def _on_outcome(self, outcome, condition):
self._outcome = outcome
self._condition = condition

def send(self, event_data, partition_key=None, timeout=None):
def send(self, event_data, **kwargs):
# type:(Union[EventData, Iterable[EventData]], Union[str, bytes], float) -> None
"""
Sends an event data and blocks until acknowledgement is
Expand Down Expand Up @@ -219,4 +228,5 @@ def close(self, **kwargs):
:caption: Close down the handler.

"""
exception = kwargs.get("exception", None)
super(EventHubProducer, self).close(exception)
You are viewing a condensed version of this merge commit. You can view the full changes here.