diff --git a/sdk/eventhub/azure-eventhub/CHANGELOG.md b/sdk/eventhub/azure-eventhub/CHANGELOG.md index f942dbcaace1..d5736d615bad 100644 --- a/sdk/eventhub/azure-eventhub/CHANGELOG.md +++ b/sdk/eventhub/azure-eventhub/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Fixed a bug where the correct URI was not being used for consumer authentication, causing issues when assigning roles at the consumer group level. ([#35337](https://github.com/Azure/azure-sdk-for-python/issues/35337)) + ### Other Changes ## 5.11.7 (2024-04-10) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py index 3a64c9676c1c..e2fbb2753b05 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py @@ -329,7 +329,8 @@ def __init__( else: self._credential = credential # type: ignore self._auto_reconnect = kwargs.get("auto_reconnect", True) - self._auth_uri = f"sb://{self._address.hostname}{self._address.path}" + self._auth_uri: str + self._eventhub_auth_uri = f"sb://{self._address.hostname}{self._address.path}" self._config = Configuration( amqp_transport=self._amqp_transport, hostname=self._address.hostname, @@ -359,7 +360,7 @@ def _from_connection_string(conn_str: str, **kwargs: Any) -> Dict[str, Any]: kwargs["credential"] = EventHubSharedKeyCredential(policy, key) return kwargs - def _create_auth(self) -> Union["uamqp_JWTTokenAuth", JWTTokenAuth]: + def _create_auth(self, *, auth_uri: Optional[str] = None) -> Union["uamqp_JWTTokenAuth", JWTTokenAuth]: """ Create an ~uamqp.authentication.SASTokenAuth instance to authenticate the session. @@ -367,6 +368,9 @@ def _create_auth(self) -> Union["uamqp_JWTTokenAuth", JWTTokenAuth]: :return: The auth for the session. :rtype: JWTTokenAuth or uamqp_JWTTokenAuth """ + # if auth_uri is not provided, use the default hub one + entity_auth_uri = auth_uri if auth_uri else self._eventhub_auth_uri + try: # ignore mypy's warning because token_type is Optional token_type = self._credential.token_type # type: ignore @@ -374,14 +378,14 @@ def _create_auth(self) -> Union["uamqp_JWTTokenAuth", JWTTokenAuth]: token_type = b"jwt" if token_type == b"servicebus.windows.net:sastoken": return self._amqp_transport.create_token_auth( - self._auth_uri, - functools.partial(self._credential.get_token, self._auth_uri), + entity_auth_uri, + functools.partial(self._credential.get_token, entity_auth_uri), token_type=token_type, config=self._config, update_token=True, ) return self._amqp_transport.create_token_auth( - self._auth_uri, + entity_auth_uri, functools.partial(self._credential.get_token, JWT_TOKEN_SCOPE), token_type=token_type, config=self._config, @@ -574,7 +578,7 @@ def _open(self) -> bool: if not self.running: if self._handler: self._handler.close() - auth = self._client._create_auth() + auth = self._client._create_auth(auth_uri=self._client._auth_uri) self._create_handler(auth) conn = self._client._conn_manager.get_connection( # pylint: disable=protected-access endpoint=self._client._address.hostname, auth=auth diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py index 56cedbdd83f2..49a992caab56 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py @@ -200,7 +200,7 @@ def _open(self) -> bool: if not self.running: if self._handler: self._handler.close() - auth = self._client._create_auth() + auth = self._client._create_auth(auth_uri=self._client._auth_uri) self._create_handler(auth) conn = self._client._conn_manager.get_connection( # pylint: disable=protected-access endpoint=self._client._address.hostname, auth=auth diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py index b3b376bc6cff..266864daadd3 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py @@ -178,6 +178,8 @@ def __init__( network_tracing=network_tracing, **kwargs ) + # consumer auth URI additionally includes consumer group + self._auth_uri = f"sb://{self._address.hostname}{self._address.path}/consumergroups/{self._consumer_group}" self._lock = threading.Lock() self._event_processors: Dict[Tuple[str, str], EventProcessor] = {} diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py index 288cedc73f08..320bea5c601f 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py @@ -201,7 +201,7 @@ def __init__( network_tracing=kwargs.get("logging_enable"), **kwargs ) - + self._auth_uri = f"sb://{self._address.hostname}{self._address.path}" self._keep_alive = kwargs.get("keep_alive", None) self._producers: Dict[str, Optional[EventHubProducer]] = { diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_client_base_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_client_base_async.py index 4ee5d75db77b..2410ac688b44 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_client_base_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_client_base_async.py @@ -46,16 +46,14 @@ from .._pyamqp.aio._authentication_async import JWTTokenAuthAsync try: from uamqp import ( - authentication as uamqp_authentication, Message as uamqp_Message, AMQPClientAsync as uamqp_AMQPClientAsync, ) - from uamqp.authentication import JWTTokenAsync + from uamqp.authentication import JWTTokenAsync as uamqp_JWTTokenAsync except ImportError: - uamqp_authentication = None uamqp_Message = None uamqp_AMQPClientAsync = None - JWTTokenAsync = None + uamqp_JWTTokenAsync = None from azure.core.credentials_async import AsyncTokenCredential try: @@ -109,7 +107,7 @@ def running(self) -> bool: def running(self, value: bool) -> None: pass - def _create_handler(self, auth: Union["JWTTokenAsync", JWTTokenAuthAsync]) -> None: + def _create_handler(self, auth: Union["uamqp_JWTTokenAsync", JWTTokenAuthAsync]) -> None: pass _MIXIN_BASE = AbstractConsumerProducer @@ -268,15 +266,19 @@ def _from_connection_string(conn_str: str, **kwargs) -> Dict[str, Any]: kwargs["credential"] = EventHubSharedKeyCredential(policy, key) return kwargs - async def _create_auth_async(self) -> Union[uamqp_authentication.JWTTokenAsync, JWTTokenAuthAsync]: + async def _create_auth_async( + self, *, auth_uri: Optional[str] = None + ) -> Union["uamqp_JWTTokenAsync", JWTTokenAuthAsync]: """ Create an ~uamqp.authentication.SASTokenAuthAsync instance to authenticate the session. :return: A JWTTokenAuthAsync instance to authenticate the session. :rtype: ~uamqp.authentication.JWTTokenAsync or JWTTokenAuthAsync - """ + # if auth_uri is not provided, use the default hub one + entity_auth_uri = auth_uri if auth_uri else self._eventhub_auth_uri + try: # ignore mypy's warning because token_type is Optional token_type = self._credential.token_type # type: ignore @@ -284,14 +286,14 @@ async def _create_auth_async(self) -> Union[uamqp_authentication.JWTTokenAsync, token_type = b"jwt" if token_type == b"servicebus.windows.net:sastoken": return await self._amqp_transport.create_token_auth_async( - self._auth_uri, - functools.partial(self._credential.get_token, self._auth_uri), + entity_auth_uri, + functools.partial(self._credential.get_token, entity_auth_uri), token_type=token_type, config=self._config, update_token=True, ) return await self._amqp_transport.create_token_auth_async( - self._auth_uri, + entity_auth_uri, functools.partial(self._credential.get_token, JWT_TOKEN_SCOPE), token_type=token_type, config=self._config, @@ -475,7 +477,7 @@ async def _open(self) -> None: if not self.running: if self._handler: await self._handler.close_async() - auth = await self._client._create_auth_async() + auth = await self._client._create_auth_async(auth_uri=self._client._auth_uri) self._create_handler(auth) conn = await self._client._conn_manager_async.get_connection( endpoint=self._client._address.hostname, auth=auth diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py index 37cca02c01a1..7834539bbf51 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py @@ -191,6 +191,8 @@ def __init__( network_tracing=network_tracing, **kwargs, ) + # consumer auth URI additionally includes consumer group + self._auth_uri = f"sb://{self._address.hostname}{self._address.path}/consumergroups/{self._consumer_group}" self._lock = asyncio.Lock(**self._internal_kwargs) self._event_processors: Dict[Tuple[str, str], EventProcessor] = {} diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py index 1c95dfd97b95..bdac602669dd 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py @@ -188,6 +188,7 @@ def __init__( network_tracing=kwargs.pop("logging_enable", False), **kwargs ) + self._auth_uri = f"sb://{self._address.hostname}{self._address.path}" self._keep_alive = kwargs.get("keep_alive", None) self._producers: Dict[str, Optional[EventHubProducer]] = { ALL_PARTITIONS: self._create_producer()