Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Improvement of iothub mgmt
  • Loading branch information
yunhaoling committed Aug 28, 2019
commit afd8df6c8def2d77aee664d8a502975480018623
34 changes: 14 additions & 20 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ async def _close_connection(self):
await self._conn_manager.reset_connection_if_broken()

async def _management_request(self, mgmt_msg, op_type):
if self._is_iothub and not self._iothub_redirect_info:
await self._iothub_redirect()

alt_creds = {
"username": self._auth_config.get("iot_username"),
"password": self._auth_config.get("iot_password")
Expand All @@ -133,23 +136,16 @@ async def _management_request(self, mgmt_msg, op_type):
await mgmt_client.close_async()

async def _iothub_redirect(self):
async with self.create_consumer(consumer_group='$default',
partition_id='0',
event_position=EventPosition('-1'),
operation='/messages/events') as redirect_consumer:
await redirect_consumer._open_with_retry(timeout=self.config.receive_timeout) # pylint: disable=protected-access

async def _process_redirect_uri(self, redirect):
async with self._lock:
redirect_uri = redirect.address.decode('utf-8')
auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups")
self.address = urlparse(auth_uri)
self.host = self.address.hostname
self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path)
self.eh_name = self.address.path.lstrip('/')
self.mgmt_target = redirect_uri
if self._is_iothub:
self._iothub_redirected = redirect
if self._is_iothub and not self._iothub_redirect_info:
if not self._redirect_consumer:
self._redirect_consumer = self.create_consumer(consumer_group='$default',
partition_id='0',
event_position=EventPosition('-1'),
operation='/messages/events')
async with self._redirect_consumer:
await self._redirect_consumer._open_with_retry(timeout=self.config.receive_timeout) # pylint: disable=protected-access
self._redirect_consumer = None

async def get_properties(self):
# type:() -> Dict[str, Any]
Expand All @@ -164,9 +160,8 @@ async def get_properties(self):
:rtype: dict
:raises: ~azure.eventhub.ConnectError
"""
if self._is_iothub and not self._iothub_redirected:
if self._is_iothub and not self._iothub_redirect_info:
await self._iothub_redirect()

mgmt_msg = Message(application_properties={'name': self.eh_name})
response = await self._management_request(mgmt_msg, op_type=b'com.microsoft:eventhub')
output = {}
Expand Down Expand Up @@ -206,9 +201,8 @@ async def get_partition_properties(self, partition):
:rtype: dict
:raises: ~azure.eventhub.ConnectError
"""
if self._is_iothub and not self._iothub_redirected:
if self._is_iothub and not self._iothub_redirect_info:
await self._iothub_redirect()

mgmt_msg = Message(application_properties={'name': self.eh_name,
'partition': partition})
response = await self._management_request(mgmt_msg, op_type=b'com.microsoft:partition')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,10 @@ async def _open(self, timeout_time=None, **kwargs):

"""
# pylint: disable=protected-access
if self.client._iothub_redirected:
self.redirected = self.client._iothub_redirected
self.redirected = self.redirected or self.client._iothub_redirect_info

if not self.running and self.redirected:
await self.client._process_redirect_uri(self.redirected)
self.client._process_redirect_uri(self.redirected)
self.source = self.redirected.address
await super(EventHubConsumer, self)._open(timeout_time)

Expand Down
33 changes: 12 additions & 21 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class EventHubClient(EventHubClientAbstract):

def __init__(self, host, event_hub_path, credential, **kwargs):
super(EventHubClient, self).__init__(host, event_hub_path, credential, **kwargs)
self._lock = threading.Lock()
self._lock = threading.RLock()
self._conn_manager = get_connection_manager(**kwargs)

def __enter__(self):
Expand Down Expand Up @@ -137,23 +137,16 @@ def _management_request(self, mgmt_msg, op_type):
mgmt_client.close()

def _iothub_redirect(self):
with self.create_consumer(consumer_group='$default',
partition_id='0',
event_position=EventPosition('-1'),
operation='/messages/events') as redirect_consumer:
redirect_consumer._open_with_retry(timeout=self.config.receive_timeout) # pylint: disable=protected-access

def _process_redirect_uri(self, redirect):
with self._lock:
redirect_uri = redirect.address.decode('utf-8')
auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups")
self.address = urlparse(auth_uri)
self.host = self.address.hostname
self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path)
self.eh_name = self.address.path.lstrip('/')
self.mgmt_target = redirect_uri
if self._is_iothub:
self._iothub_redirected = redirect
if self._is_iothub and not self._iothub_redirect_info:
if not self._redirect_consumer:
self._redirect_consumer = self.create_consumer(consumer_group='$default',
partition_id='0',
event_position=EventPosition('-1'),
operation='/messages/events')
with self._redirect_consumer:
self._redirect_consumer._open_with_retry(timeout=self.config.receive_timeout) # pylint: disable=protected-access
self._redirect_consumer = None

def get_properties(self):
# type:() -> Dict[str, Any]
Expand All @@ -168,9 +161,8 @@ def get_properties(self):
:rtype: dict
:raises: ~azure.eventhub.ConnectError
"""
if self._is_iothub and not self._iothub_redirected:
if self._is_iothub and not self._iothub_redirect_info:
self._iothub_redirect()

mgmt_msg = Message(application_properties={'name': self.eh_name})
response = self._management_request(mgmt_msg, op_type=b'com.microsoft:eventhub')
output = {}
Expand Down Expand Up @@ -210,9 +202,8 @@ def get_partition_properties(self, partition):
:rtype: dict
:raises: ~azure.eventhub.ConnectError
"""
if self._is_iothub and not self._iothub_redirected:
if self._is_iothub and not self._iothub_redirect_info:
self._iothub_redirect()

mgmt_msg = Message(application_properties={'name': self.eh_name,
'partition': partition})
response = self._management_request(mgmt_msg, op_type=b'com.microsoft:partition')
Expand Down
23 changes: 19 additions & 4 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from azure.core.credentials import TokenCredential
from typing import Union, Any

from azure.eventhub import __version__
from azure.eventhub import __version__, EventPosition
from azure.eventhub.configuration import _Configuration
from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential, _Address

Expand Down Expand Up @@ -157,8 +157,8 @@ def __init__(self, host, event_hub_path, credential, **kwargs):
self.get_auth = functools.partial(self._create_auth)
self.config = _Configuration(**kwargs)
self.debug = self.config.network_tracing
self._is_iothub = kwargs.get("is_iothub", False)
self._iothub_redirected = None
self._is_iothub = False
self._iothub_redirect_info = None

log.info("%r: Created the Event Hub client", self.container_id)

Expand All @@ -179,6 +179,11 @@ def _from_iothub_connection_string(cls, conn_str, **kwargs):
'iot_password': key,
'username': username,
'password': password}
client._is_iothub = True
client._redirect_consumer = client.create_consumer(consumer_group='$default',
partition_id='0',
event_position=EventPosition('-1'),
operation='/messages/events')
return client

@abstractmethod
Expand Down Expand Up @@ -211,6 +216,17 @@ def _create_properties(self, user_agent=None): # pylint: disable=no-self-use
properties["user-agent"] = final_user_agent
return properties

def _process_redirect_uri(self, redirect):
redirect_uri = redirect.address.decode('utf-8')
auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups")
self.address = urlparse(auth_uri)
self.host = self.address.hostname
self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path)
self.eh_name = self.address.path.lstrip('/')
self.mgmt_target = redirect_uri
if self._is_iothub:
self._iothub_redirect_info = redirect

@classmethod
def from_connection_string(cls, conn_str, **kwargs):
"""Create an EventHubClient from an EventHub/IotHub connection string.
Expand Down Expand Up @@ -272,7 +288,6 @@ def from_connection_string(cls, conn_str, **kwargs):
kwargs.pop("event_hub_path", None)
return cls(host, entity, EventHubSharedKeyCredential(policy, key), **kwargs)
else:
kwargs['is_iothub'] = True
return cls._from_iothub_connection_string(conn_str, **kwargs)

@abstractmethod
Expand Down
3 changes: 1 addition & 2 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ def _open(self, timeout_time=None, **kwargs):

"""
# pylint: disable=protected-access
if self.client._iothub_redirected:
self.redirected = self.client._iothub_redirected
self.redirected = self.redirected or self.client._iothub_redirect_info

if not self.running and self.redirected:
self.client._process_redirect_uri(self.redirected)
Expand Down