Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Eventhubs conn (#6394)
* Shared connection (sync) draft

* Shared connection (sync) draft 2

* Shared connection (sync) test update

* Shared connection

* Fix an issue

* add retry exponential delay and timeout to exception handling

* put module method before class def

* fixed Client.get_properties error

* Improve send timeout (#6481)

* Add timeout information to the link prop during link creation process (#6485)

* Update optional parameters in public api into kwargs and update comments (#6510)

* Update optional parameters in public api into kwargs and update some comments

* Update more optional parameters into kwargs paramter
  • Loading branch information
YijunXieMS authored and yunhaoling committed Jul 26, 2019
commit 29e54b9c9d710dbe39fdb2fd5e9a736635ec0cb7
77 changes: 77 additions & 0 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/_connection_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from threading import RLock
from uamqp import Connection, TransportType, c_uamqp


class _SharedConnectionManager(object):
def __init__(self, **kwargs):
self._lock = RLock()
self._conn = None # type: Connection

self._container_id = kwargs.get("container_id")
self._debug = kwargs.get("debug")
self._error_policy = kwargs.get("error_policy")
self._properties = kwargs.get("properties")
self._encoding = kwargs.get("encoding") or "UTF-8"
self._transport_type = kwargs.get('transport_type') or TransportType.Amqp
self._http_proxy = kwargs.get('http_proxy')
self._max_frame_size = kwargs.get("max_frame_size")
self._channel_max = kwargs.get("channel_max")
self._idle_timeout = kwargs.get("idle_timeout")
self._remote_idle_timeout_empty_frame_send_ratio = kwargs.get("remote_idle_timeout_empty_frame_send_ratio")

def get_connection(self, host, auth):
# type: (...) -> Connection
with self._lock:
if self._conn is None:
self._conn = Connection(
host,
auth,
container_id=self._container_id,
max_frame_size=self._max_frame_size,
channel_max=self._channel_max,
idle_timeout=self._idle_timeout,
properties=self._properties,
remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio,
error_policy=self._error_policy,
debug=self._debug,
encoding=self._encoding)
return self._conn

def close_connection(self):
with self._lock:
if self._conn:
self._conn.destroy()
self._conn = None

def reset_connection_if_broken(self):
with self._lock:
if self._conn and self._conn._state in (
c_uamqp.ConnectionState.CLOSE_RCVD,
c_uamqp.ConnectionState.CLOSE_SENT,
c_uamqp.ConnectionState.DISCARDING,
c_uamqp.ConnectionState.END,
):
self._conn = None


class _SeparateConnectionManager(object):
def __init__(self, **kwargs):
pass

def get_connection(self, host, auth):
return None

def close_connection(self):
pass

def reset_connection_if_broken(self):
pass


def get_connection_manager(**kwargs):
return _SeparateConnectionManager(**kwargs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals

import logging
import time

from uamqp import errors
from azure.eventhub.error import EventHubError, _handle_exception

log = logging.getLogger(__name__)


class ConsumerProducerMixin(object):
def __init__(self):
self.client = None
self._handler = None
self.name = None

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close(exc_val)

def _check_closed(self):
if self.error:
raise EventHubError("{} has been closed. Please create a new consumer to receive event data.".format(self.name))

def _create_handler(self):
pass

def _redirect(self, redirect):
self.redirected = redirect
self.running = False
self._close_connection()

def _open(self, timeout_time=None):
"""
Open the EventHubConsumer using the supplied connection.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.

"""
# pylint: disable=protected-access
if not self.running:
if self.redirected:
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password": self.client._auth_config.get("iot_password")}
else:
alt_creds = {}
self._create_handler()
self._handler.open(connection=self.client._conn_manager.get_connection(
self.client.address.hostname,
self.client.get_auth(**alt_creds)
))
while not self._handler.client_ready():
if timeout_time and time.time() >= timeout_time:
return
time.sleep(0.05)
self.running = True

def _close_handler(self):
self._handler.close() # close the link (sharing connection) or connection (not sharing)
self.running = False

def _close_connection(self):
self._close_handler()
self.client._conn_manager.reset_connection_if_broken()

def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
_handle_exception(exception, retry_count, max_retries, self, timeout_time)

def close(self, exception=None):
# type:(Exception) -> None
"""
Close down the handler. If the handler has already closed,
this will be a no op. An optional exception can be passed in to
indicate that the handler was shutdown due to error.

:param exception: An optional exception if the handler is closing
due to an error.
:type exception: Exception

Example:
.. literalinclude:: ../examples/test_examples_eventhub.py
:start-after: [START eventhub_client_receiver_close]
:end-before: [END eventhub_client_receiver_close]
:language: python
:dedent: 4
:caption: Close down the handler.

"""
self.running = False
if self.error:
return
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
elif exception:
self.error = EventHubError(str(exception))
else:
self.error = EventHubError("{} handler is closed.".format(self.name))
if self._handler:
self._handler.close() # this will close link if sharing connection. Otherwise close connection
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from asyncio import Lock
from uamqp import TransportType, c_uamqp
from uamqp.async_ops import ConnectionAsync


class _SharedConnectionManager(object):
def __init__(self, **kwargs):
self._lock = Lock()
self._conn = None

self._container_id = kwargs.get("container_id")
self._debug = kwargs.get("debug")
self._error_policy = kwargs.get("error_policy")
self._properties = kwargs.get("properties")
self._encoding = kwargs.get("encoding") or "UTF-8"
self._transport_type = kwargs.get('transport_type') or TransportType.Amqp
self._http_proxy = kwargs.get('http_proxy')
self._max_frame_size = kwargs.get("max_frame_size")
self._channel_max = kwargs.get("channel_max")
self._idle_timeout = kwargs.get("idle_timeout")
self._remote_idle_timeout_empty_frame_send_ratio = kwargs.get("remote_idle_timeout_empty_frame_send_ratio")

async def get_connection(self, host, auth):
# type: (...) -> ConnectionAsync
async with self._lock:
if self._conn is None:
self._conn = ConnectionAsync(
host,
auth,
container_id=self._container_id,
max_frame_size=self._max_frame_size,
channel_max=self._channel_max,
idle_timeout=self._idle_timeout,
properties=self._properties,
remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio,
error_policy=self._error_policy,
debug=self._debug,
encoding=self._encoding)
return self._conn

async def close_connection(self):
async with self._lock:
if self._conn:
await self._conn.destroy_async()
self._conn = None

async def reset_connection_if_broken(self):
async with self._lock:
if self._conn and self._conn._state in (
c_uamqp.ConnectionState.CLOSE_RCVD,
c_uamqp.ConnectionState.CLOSE_SENT,
c_uamqp.ConnectionState.DISCARDING,
c_uamqp.ConnectionState.END,
):
self._conn = None


class _SeparateConnectionManager(object):
def __init__(self, **kwargs):
pass

async def get_connection(self, host, auth):
pass # return None

async def close_connection(self):
pass

def reset_connection_if_broken(self):
pass


def get_connection_manager(**kwargs):
return _SharedConnectionManager(**kwargs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import asyncio
import logging
import time

from uamqp import errors
from azure.eventhub.error import EventHubError, ConnectError
from ..aio.error_async import _handle_exception

log = logging.getLogger(__name__)


class ConsumerProducerMixin(object):

def __init__(self):
self.client = None
self._handler = None
self.name = None

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close(exc_val)

def _check_closed(self):
if self.error:
raise EventHubError("{} has been closed. Please create a new consumer to receive event data.".format(self.name))

def _create_handler(self):
pass

async def _redirect(self, redirect):
self.redirected = redirect
self.running = False
await self._close_connection()

async def _open(self, timeout_time=None):
"""
Open the EventHubConsumer using the supplied connection.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.

"""
# pylint: disable=protected-access
if not self.running:
if self.redirected:
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password": self.client._auth_config.get("iot_password")}
else:
alt_creds = {}
self._create_handler()
await self._handler.open_async(connection=await self.client._conn_manager.get_connection(
self.client.address.hostname,
self.client.get_auth(**alt_creds)
))
while not await self._handler.client_ready_async():
if timeout_time and time.time() >= timeout_time:
return
await asyncio.sleep(0.05)
self.running = True

async def _close_handler(self):
await self._handler.close_async() # close the link (sharing connection) or connection (not sharing)
self.running = False

async def _close_connection(self):
await self._close_handler()
await self.client._conn_manager.reset_connection_if_broken()

async def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
await _handle_exception(exception, retry_count, max_retries, self, timeout_time)

async def close(self, exception=None):
# type: (Exception) -> None
"""
Close down the handler. If the handler has already closed,
this will be a no op. An optional exception can be passed in to
indicate that the handler was shutdown due to error.

:param exception: An optional exception if the handler is closing
due to an error.
:type exception: Exception

Example:
.. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
:start-after: [START eventhub_client_async_receiver_close]
:end-before: [END eventhub_client_async_receiver_close]
:language: python
:dedent: 4
:caption: Close down the handler.

"""
self.running = False
if self.error:
return
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)):
self.error = ConnectError(str(exception), exception)
elif exception:
self.error = EventHubError(str(exception))
else:
self.error = EventHubError("This receive handler is now closed.")
if self._handler:
await self._handler.close_async()
Loading