Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "5.0.0b2"
__version__ = "5.0.0b3"
from uamqp import constants # type: ignore
from azure.eventhub.common import EventData, EventDataBatch, EventPosition
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

class ConsumerProducerMixin(object):
def __init__(self):
self.client = None
self._client = None
self._handler = None
self.name = None
self._name = None

def __enter__(self):
return self
Expand All @@ -26,15 +26,15 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.close(exc_val)

def _check_closed(self):
if self.error:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name))
if self._error:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name))

def _create_handler(self):
pass

def _redirect(self, redirect):
self.redirected = redirect
self.running = False
self._redirected = redirect
self._running = False
self._close_connection()

def _open(self):
Expand All @@ -45,36 +45,36 @@ def _open(self):

"""
# pylint: disable=protected-access
if not self.running:
if not self._running:
if self._handler:
self._handler.close()
if self.redirected:
if self._redirected:
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password": self.client._auth_config.get("iot_password")}
"username": self._client._auth_config.get("iot_username"),
"password": self._client._auth_config.get("iot_password")}
else:
alt_creds = {}
self._create_handler()
self._handler.open(connection=self.client._conn_manager.get_connection( # pylint: disable=protected-access
self.client.address.hostname,
self.client.get_auth(**alt_creds)
self._handler.open(connection=self._client._conn_manager.get_connection( # pylint: disable=protected-access
self._client._address.hostname,
self._client._get_auth(**alt_creds)
))
while not self._handler.client_ready():
time.sleep(0.05)
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access
self.running = True
self._running = True

def _close_handler(self):
self._handler.close() # close the link (sharing connection) or connection (not sharing)
self.running = False
self._running = False

def _close_connection(self):
self._close_handler()
self.client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access
self._client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access

def _handle_exception(self, exception):
if not self.running and isinstance(exception, compat.TimeoutException):
if not self._running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return _handle_exception(exception, self)

Expand All @@ -89,19 +89,18 @@ def _do_retryable_operation(self, operation, timeout=None, **kwargs):
last_exception = kwargs.pop('last_exception', None)
operation_need_param = kwargs.pop('operation_need_param', True)

while retried_times <= self.client.config.max_retries:
while retried_times <= self._client._config.max_retries: # pylint: disable=protected-access
try:
if operation_need_param:
return operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
else:
return operation()
return operation()
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception)
self.client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self.name)
self._client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self._name)
retried_times += 1

log.info("%r has exhausted retry. Exception still occurs (%r)", self.name, last_exception)
log.info("%r has exhausted retry. Exception still occurs (%r)", self._name, last_exception)
raise last_exception

def close(self, exception=None):
Expand All @@ -124,16 +123,16 @@ def close(self, exception=None):
:caption: Close down the handler.

"""
self.running = False
if self.error: # type: ignore
self._running = False
if self._error: # type: ignore
return
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
self._redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
self._error = exception
elif exception:
self.error = EventHubError(str(exception))
self._error = EventHubError(str(exception))
else:
self.error = EventHubError("{} handler is closed.".format(self.name))
self._error = EventHubError("{} handler is closed.".format(self._name))
if self._handler:
self._handler.close() # this will close link if sharing connection. Otherwise close connection
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
class ConsumerProducerMixin(object):

def __init__(self):
self.client = None
self._client = None
self._handler = None
self.name = None
self._name = None

async def __aenter__(self):
return self
Expand All @@ -27,15 +27,15 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close(exc_val)

def _check_closed(self):
if self.error:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name))
if self._error:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self._name))

def _create_handler(self):
pass

async def _redirect(self, redirect):
self.redirected = redirect
self.running = False
self._redirected = redirect
self._running = False
await self._close_connection()

async def _open(self):
Expand All @@ -46,36 +46,36 @@ async def _open(self):

"""
# pylint: disable=protected-access
if not self.running:
if not self._running:
if self._handler:
await self._handler.close_async()
if self.redirected:
if self._redirected:
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password": self.client._auth_config.get("iot_password")}
"username": self._client._auth_config.get("iot_username"),
"password": self._client._auth_config.get("iot_password")}
else:
alt_creds = {}
self._create_handler()
await self._handler.open_async(connection=await self.client._conn_manager.get_connection(
self.client.address.hostname,
self.client.get_auth(**alt_creds)
await self._handler.open_async(connection=await self._client._conn_manager.get_connection(
self._client._address.hostname,
self._client._get_auth(**alt_creds)
))
while not await self._handler.client_ready_async():
await asyncio.sleep(0.05)
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access
self.running = True
self._running = True

async def _close_handler(self):
await self._handler.close_async() # close the link (sharing connection) or connection (not sharing)
self.running = False
self._running = False

async def _close_connection(self):
await self._close_handler()
await self.client._conn_manager.reset_connection_if_broken() # pylint:disable=protected-access
await self._client._conn_manager.reset_connection_if_broken() # pylint:disable=protected-access

async def _handle_exception(self, exception):
if not self.running and isinstance(exception, compat.TimeoutException):
if not self._running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return await _handle_exception(exception, self)

Expand All @@ -90,19 +90,18 @@ async def _do_retryable_operation(self, operation, timeout=None, **kwargs):
last_exception = kwargs.pop('last_exception', None)
operation_need_param = kwargs.pop('operation_need_param', True)

while retried_times <= self.client.config.max_retries:
while retried_times <= self._client._config.max_retries:
try:
if operation_need_param:
return await operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
else:
return await operation()
return await operation()
except Exception as exception: # pylint:disable=broad-except
last_exception = await self._handle_exception(exception)
await self.client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self.name)
await self._client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self._name)
retried_times += 1

log.info("%r has exhausted retry. Exception still occurs (%r)", self.name, last_exception)
log.info("%r has exhausted retry. Exception still occurs (%r)", self._name, last_exception)
raise last_exception

async def close(self, exception=None):
Expand All @@ -125,18 +124,18 @@ async def close(self, exception=None):
:caption: Close down the handler.

"""
self.running = False
if self.error: #type: ignore
self._running = False
if self._error: #type: ignore
return
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
self._redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
self._error = exception
elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)):
self.error = ConnectError(str(exception), exception)
self._error = ConnectError(str(exception), exception)
elif exception:
self.error = EventHubError(str(exception))
self._error = EventHubError(str(exception))
else:
self.error = EventHubError("This receive handler is now closed.")
self._error = EventHubError("This receive handler is now closed.")
if self._handler:
await self._handler.close_async()
44 changes: 22 additions & 22 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,46 +68,46 @@ def _create_auth(self, username=None, password=None):
:param password: The shared access key.
:type password: str
"""
http_proxy = self.config.http_proxy
transport_type = self.config.transport_type
auth_timeout = self.config.auth_timeout
http_proxy = self._config.http_proxy
transport_type = self._config.transport_type
auth_timeout = self._config.auth_timeout

if isinstance(self.credential, EventHubSharedKeyCredential): # pylint:disable=no-else-return
if isinstance(self._credential, EventHubSharedKeyCredential): # pylint:disable=no-else-return
username = username or self._auth_config['username']
password = password or self._auth_config['password']
if "@sas.root" in username:
return authentication.SASLPlain(
self.host, username, password, http_proxy=http_proxy, transport_type=transport_type)
self._host, username, password, http_proxy=http_proxy, transport_type=transport_type)
return authentication.SASTokenAsync.from_shared_access_key(
self.auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy,
self._auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy,
transport_type=transport_type)

elif isinstance(self.credential, EventHubSASTokenCredential):
token = self.credential.get_sas_token()
elif isinstance(self._credential, EventHubSASTokenCredential):
token = self._credential.get_sas_token()
try:
expiry = int(parse_sas_token(token)['se'])
except (KeyError, TypeError, IndexError):
raise ValueError("Supplied SAS token has no valid expiry value.")
return authentication.SASTokenAsync(
self.auth_uri, self.auth_uri, token,
self._auth_uri, self._auth_uri, token,
expires_at=expiry,
timeout=auth_timeout,
http_proxy=http_proxy,
transport_type=transport_type)

else:
get_jwt_token = functools.partial(self.credential.get_token, 'https://eventhubs.azure.net//.default')
return authentication.JWTTokenAsync(self.auth_uri, self.auth_uri,
get_jwt_token = functools.partial(self._credential.get_token, 'https://eventhubs.azure.net//.default')
return authentication.JWTTokenAsync(self._auth_uri, self._auth_uri,
get_jwt_token, http_proxy=http_proxy,
transport_type=transport_type)

async def _close_connection(self):
await self._conn_manager.reset_connection_if_broken()

async def _try_delay(self, retried_times, last_exception, timeout_time=None, entity_name=None):
entity_name = entity_name or self.container_id
backoff = self.config.backoff_factor * 2 ** retried_times
if backoff <= self.config.backoff_max and (
entity_name = entity_name or self._container_id
backoff = self._config.backoff_factor * 2 ** retried_times
if backoff <= self._config.backoff_max and (
timeout_time is None or time.time() + backoff <= timeout_time): # pylint:disable=no-else-return
await asyncio.sleep(backoff)
log.info("%r has an exception (%r). Retrying...", format(entity_name), last_exception)
Expand All @@ -123,11 +123,11 @@ async def _management_request(self, mgmt_msg, op_type):
}

retried_times = 0
while retried_times <= self.config.max_retries:
while retried_times <= self._config.max_retries:
mgmt_auth = self._create_auth(**alt_creds)
mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.config.network_tracing)
mgmt_client = AMQPClientAsync(self._mgmt_target, auth=mgmt_auth, debug=self._config.network_tracing)
try:
conn = await self._conn_manager.get_connection(self.host, mgmt_auth)
conn = await self._conn_manager.get_connection(self._host, mgmt_auth)
await mgmt_client.open_async(connection=conn)
response = await mgmt_client.mgmt_request_async(
mgmt_msg,
Expand Down Expand Up @@ -265,12 +265,12 @@ def create_consumer(
"""
owner_level = kwargs.get("owner_level")
operation = kwargs.get("operation")
prefetch = kwargs.get("prefetch") or self.config.prefetch
prefetch = kwargs.get("prefetch") or self._config.prefetch
loop = kwargs.get("loop")

path = self.address.path + operation if operation else self.address.path
path = self._address.path + operation if operation else self._address.path
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
self.address.hostname, path, consumer_group, partition_id)
self._address.hostname, path, consumer_group, partition_id)
handler = EventHubConsumer(
self, source_url, event_position=event_position, owner_level=owner_level,
prefetch=prefetch, loop=loop)
Expand Down Expand Up @@ -309,10 +309,10 @@ def create_producer(

"""

target = "amqps://{}{}".format(self.address.hostname, self.address.path)
target = "amqps://{}{}".format(self._address.hostname, self._address.path)
if operation:
target = target + operation
send_timeout = self.config.send_timeout if send_timeout is None else send_timeout
send_timeout = self._config.send_timeout if send_timeout is None else send_timeout

handler = EventHubProducer(
self, target, partition=partition_id, send_timeout=send_timeout, loop=loop)
Expand Down
Loading