From 03328d01f462c3154f59acfa0d11aa8059ab931f Mon Sep 17 00:00:00 2001 From: Kieran Brantner-Magee Date: Wed, 1 Jul 2020 10:39:58 -0700 Subject: [PATCH 01/10] autolockrenewer can now take a callback that fires when for any non-user-defined reason (e.g. not due to settlement or shutdown) a lock is lost on an auto-lock-renewed session or message. Adds tests as well and changelog notes. --- sdk/servicebus/azure-servicebus/CHANGELOG.md | 8 +- .../azure/servicebus/__init__.py | 2 +- .../servicebus/_common/auto_lock_renewer.py | 132 ++++++++++++++++++ .../azure/servicebus/_common/utils.py | 100 ------------- .../azure/servicebus/aio/__init__.py | 2 +- .../aio/_async_auto_lock_renewer.py | 127 +++++++++++++++++ .../azure/servicebus/aio/_async_utils.py | 95 ------------- .../tests/async_tests/mocks_async.py | 27 ++++ .../tests/async_tests/test_queues_async.py | 63 +++++++-- .../tests/async_tests/test_sessions_async.py | 16 +++ .../azure-servicebus/tests/mocks.py | 27 ++++ .../azure-servicebus/tests/test_queues.py | 61 ++++++-- .../azure-servicebus/tests/test_sessions.py | 18 ++- 13 files changed, 460 insertions(+), 218 deletions(-) create mode 100644 sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py create mode 100644 sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py create mode 100644 sdk/servicebus/azure-servicebus/tests/async_tests/mocks_async.py create mode 100644 sdk/servicebus/azure-servicebus/tests/mocks.py diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 99dbb05d2860..a466e31ae059 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -2,10 +2,14 @@ ## 7.0.0b4 (Unreleased) +**New Features** + +* Add an on_lock_renew_failure as a parameter to `AutoLockRenew`, taking a callback for when the lock is lost non-intentially (e.g. not via settling, shutdown, or autolockrenew duration completion) + **BugFixes** -* Fixed bug where sync AutoLockRenew does not shutdown itself timely. -* Fixed bug where async AutoLockRenew does not support context manager. +* Fixed bug where sync `AutoLockRenew` does not shutdown itself timely. +* Fixed bug where async `AutoLockRenew` does not support context manager. ## 7.0.0b3 (2020-06-08) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py b/sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py index 10e5205fd439..826e54f3e087 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py @@ -16,7 +16,7 @@ from ._base_handler import ServiceBusSharedKeyCredential from ._common.message import Message, BatchMessage, PeekMessage, ReceivedMessage from ._common.constants import ReceiveSettleMode, NEXT_AVAILABLE -from ._common.utils import AutoLockRenew +from ._common.auto_lock_renewer import AutoLockRenew TransportType = constants.TransportType diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py new file mode 100644 index 000000000000..3cf8a3a829fa --- /dev/null +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py @@ -0,0 +1,132 @@ +# ------------------------------------------------------------------------ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# ------------------------------------------------------------------------- + +import sys +import datetime +import logging +import threading +import time +import functools +from concurrent.futures import ThreadPoolExecutor + +from .._servicebus_session import ServiceBusSession +from ..exceptions import AutoLockRenewFailed, AutoLockRenewTimeout, ServiceBusError +from .utils import renewable_start_time, utc_now + +_log = logging.getLogger(__name__) + +class AutoLockRenew(object): + """Auto renew locks for messages and sessions using a background thread pool. + + :param executor: A user-specified thread pool. This cannot be combined with + setting `max_workers`. + :type executor: ~concurrent.futures.ThreadPoolExecutor + :param max_workers: Specify the maximum workers in the thread pool. If not + specified the number used will be derived from the core count of the environment. + This cannot be combined with `executor`. + :type max_workers: int + + .. admonition:: Example: + + .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py + :start-after: [START auto_lock_renew_message_sync] + :end-before: [END auto_lock_renew_message_sync] + :language: python + :dedent: 4 + :caption: Automatically renew a message lock + + .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py + :start-after: [START auto_lock_renew_session_sync] + :end-before: [END auto_lock_renew_session_sync] + :language: python + :dedent: 4 + :caption: Automatically renew a session lock + + """ + + def __init__(self, executor=None, max_workers=None): + self.executor = executor or ThreadPoolExecutor(max_workers=max_workers) + self._shutdown = threading.Event() + self.sleep_time = 1 + self.renew_period = 10 + + def __enter__(self): + if self._shutdown.is_set(): + raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" + " auto lock renewing.") + return self + + def __exit__(self, *args): + self.shutdown() + + def _renewable(self, renewable): + if self._shutdown.is_set(): + return False + if hasattr(renewable, 'settled') and renewable.settled: + return False + if isinstance(renewable, ServiceBusSession) and not renewable._receiver._running: + return False + if renewable.expired: + return False + return True + + def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure=None): + _log.debug("Running lock auto-renew thread for %r seconds", timeout) + error = None + try: + while self._renewable(renewable): + if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout): + _log.debug("Reached auto lock renew timeout - letting lock expire.") + raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(timeout)) + if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self.renew_period): + _log.debug("%r seconds or less until lock expires - auto renewing.", self.renew_period) + renewable.renew_lock() + time.sleep(self.sleep_time) + except AutoLockRenewTimeout as e: + renewable.auto_renew_error = e + except Exception as e: # pylint: disable=broad-except + _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) + error = AutoLockRenewFailed( + "Failed to auto-renew lock", + inner_exception=e) + renewable.auto_renew_error = error + finally: + if on_lock_renew_failure: + if self._shutdown.is_set() \ + or (hasattr(renewable, 'settled') and renewable.settled) \ + or (isinstance(renewable, ServiceBusSession) and not renewable._receiver._running) \ + or (not error and not renewable.expired): + # Basically we want to make sure that any of the "intentional" exit cases are not fired on. + # e.g. shutdown, settlement, session closure, renewer timeout (as opposed to renew failure/expiry) + return + on_lock_renew_failure(renewable) + + def register(self, renewable, timeout=300, on_lock_renew_failure=None): + """Register a renewable entity for automatic lock renewal. + + :param renewable: A locked entity that needs to be renewed. + :type renewable: ~azure.servicebus.ReceivedMessage or + ~azure.servicebus.Session + :param float timeout: A time in seconds that the lock should be maintained for. + Default value is 300 (5 minutes). + :param Callable[[Union[Session,ReceivedMessage]], None] on_lock_renew_failure: A callback may be specified + to be called when the lock is lost on the renewable that is being registered. + Default value is None (no callback). + """ + if self._shutdown.is_set(): + raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" + " auto lock renewing.") + starttime = renewable_start_time(renewable) + self.executor.submit(self._auto_lock_renew, renewable, starttime, timeout, on_lock_renew_failure) + + def shutdown(self, wait=True): + """Shutdown the thread pool to clean up any remaining lock renewal threads. + + :param wait: Whether to block until thread pool has shutdown. Default is `True`. + :type wait: bool + """ + self._shutdown.set() + self.executor.shutdown(wait=wait) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py index 11ee8e0292b9..9dfb0711f849 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py @@ -14,11 +14,9 @@ from urlparse import urlparse except ImportError: from urllib.parse import urlparse -from concurrent.futures import ThreadPoolExecutor from uamqp import authentication -from ..exceptions import AutoLockRenewFailed, AutoLockRenewTimeout, ServiceBusError from .._version import VERSION as sdk_version from .constants import ( JWT_TOKEN_SCOPE, @@ -156,101 +154,3 @@ def generate_dead_letter_entity_name( ) return entity_name - - -class AutoLockRenew(object): - """Auto renew locks for messages and sessions using a background thread pool. - - :param executor: A user-specified thread pool. This cannot be combined with - setting `max_workers`. - :type executor: ~concurrent.futures.ThreadPoolExecutor - :param max_workers: Specify the maximum workers in the thread pool. If not - specified the number used will be derived from the core count of the environment. - This cannot be combined with `executor`. - :type max_workers: int - - .. admonition:: Example: - - .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py - :start-after: [START auto_lock_renew_message_sync] - :end-before: [END auto_lock_renew_message_sync] - :language: python - :dedent: 4 - :caption: Automatically renew a message lock - - .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py - :start-after: [START auto_lock_renew_session_sync] - :end-before: [END auto_lock_renew_session_sync] - :language: python - :dedent: 4 - :caption: Automatically renew a session lock - - """ - - def __init__(self, executor=None, max_workers=None): - self.executor = executor or ThreadPoolExecutor(max_workers=max_workers) - self._shutdown = threading.Event() - self.sleep_time = 1 - self.renew_period = 10 - - def __enter__(self): - if self._shutdown.is_set(): - raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" - " auto lock renewing.") - return self - - def __exit__(self, *args): - self.shutdown() - - def _renewable(self, renewable): - if self._shutdown.is_set(): - return False - if hasattr(renewable, 'settled') and renewable.settled: - return False - if renewable.expired: - return False - return True - - def _auto_lock_renew(self, renewable, starttime, timeout): - _log.debug("Running lock auto-renew thread for %r seconds", timeout) - try: - while self._renewable(renewable): - if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout): - _log.debug("Reached auto lock renew timeout - letting lock expire.") - raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(timeout)) - if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self.renew_period): - _log.debug("%r seconds or less until lock expires - auto renewing.", self.renew_period) - renewable.renew_lock() - time.sleep(self.sleep_time) - except AutoLockRenewTimeout as e: - renewable.auto_renew_error = e - except Exception as e: # pylint: disable=broad-except - _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) - error = AutoLockRenewFailed( - "Failed to auto-renew lock", - inner_exception=e) - renewable.auto_renew_error = error - - def register(self, renewable, timeout=300): - """Register a renewable entity for automatic lock renewal. - - :param renewable: A locked entity that needs to be renewed. - :type renewable: ~azure.servicebus.ReceivedMessage or - ~azure.servicebus.Session - :param float timeout: A time in seconds that the lock should be maintained for. - Default value is 300 (5 minutes). - """ - if self._shutdown.is_set(): - raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" - " auto lock renewing.") - starttime = renewable_start_time(renewable) - self.executor.submit(self._auto_lock_renew, renewable, starttime, timeout) - - def shutdown(self, wait=True): - """Shutdown the thread pool to clean up any remaining lock renewal threads. - - :param wait: Whether to block until thread pool has shutdown. Default is `True`. - :type wait: bool - """ - self._shutdown.set() - self.executor.shutdown(wait=wait) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/__init__.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/__init__.py index 7b24904344b2..8db9f20d7b1c 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/__init__.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/__init__.py @@ -10,7 +10,7 @@ from ._servicebus_session_receiver_async import ServiceBusSessionReceiver from ._servicebus_session_async import ServiceBusSession from ._servicebus_client_async import ServiceBusClient -from ._async_utils import AutoLockRenew +from ._async_auto_lock_renewer import AutoLockRenew __all__ = [ 'ReceivedMessage', diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py new file mode 100644 index 000000000000..285d54ee0a73 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py @@ -0,0 +1,127 @@ +# ------------------------------------------------------------------------ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# ------------------------------------------------------------------------- + +import asyncio +import logging +import datetime +import functools + +from ._servicebus_session_async import ServiceBusSession +from .._common.utils import renewable_start_time, utc_now +from ._async_utils import get_running_loop +from ..exceptions import AutoLockRenewTimeout, AutoLockRenewFailed, ServiceBusError + +_log = logging.getLogger(__name__) + + +class AutoLockRenew: + """Auto lock renew. + + An asynchronous AutoLockRenew handler for renewing the lock + tokens of messages and/or sessions in the background. + + :param loop: An async event loop. + :type loop: ~asyncio.EventLoop + + .. admonition:: Example: + + .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py + :start-after: [START auto_lock_renew_message_async] + :end-before: [END auto_lock_renew_message_async] + :language: python + :dedent: 4 + :caption: Automatically renew a message lock + + .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py + :start-after: [START auto_lock_renew_session_async] + :end-before: [END auto_lock_renew_session_async] + :language: python + :dedent: 4 + :caption: Automatically renew a session lock + + """ + + def __init__(self, loop=None): + self._shutdown = asyncio.Event() + self._futures = [] + self.loop = loop or get_running_loop() + self.sleep_time = 1 + self.renew_period = 10 + + async def __aenter__(self): + if self._shutdown.is_set(): + raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" + " auto lock renewing.") + return self + + async def __aexit__(self, *args): + await self.shutdown() + + def _renewable(self, renewable): + if self._shutdown.is_set(): + return False + if hasattr(renewable, 'settled') and renewable.settled: + return False + if renewable.expired: + return False + if isinstance(renewable, ServiceBusSession) and not renewable._receiver._running: + return False + return True + + async def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure=None): + _log.debug("Running async lock auto-renew for %r seconds", timeout) + error = None + try: + while self._renewable(renewable): + if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout): + _log.debug("Reached auto lock renew timeout - letting lock expire.") + raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(timeout)) + if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self.renew_period): + _log.debug("%r seconds or less until lock expires - auto renewing.", self.renew_period) + await renewable.renew_lock() + await asyncio.sleep(self.sleep_time) + except AutoLockRenewTimeout as e: + renewable.auto_renew_error = e + except Exception as e: # pylint: disable=broad-except + _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) + error = AutoLockRenewFailed( + "Failed to auto-renew lock", + inner_exception=e) + renewable.auto_renew_error = error + finally: + if on_lock_renew_failure: + if self._shutdown.is_set() \ + or (hasattr(renewable, 'settled') and renewable.settled) \ + or (isinstance(renewable, ServiceBusSession) and not renewable._receiver._running) \ + or (not error and not renewable.expired): + # Basically we want to make sure that any of the "intentional" exit cases are not fired on. + # e.g. shutdown, settlement, session closure, renewer timeout (as opposed to renew failure/expiry) + return + await on_lock_renew_failure(renewable) + + def register(self, renewable, timeout=300, on_lock_renew_failure=None): + """Register a renewable entity for automatic lock renewal. + + :param renewable: A locked entity that needs to be renewed. + :type renewable: ~azure.servicebus.aio.ReceivedMessage or + ~azure.servicebus.aio.Session + :param float timeout: A time in seconds that the lock should be maintained for. + Default value is 300 (5 minutes). + :param Callable[[Union[Session,ReceivedMessage]], Awaitable[None]] on_lock_renew_failure: A callback may be specified + to be called when the lock is lost on the renewable that is being registered. + Default value is None (no callback). + """ + if self._shutdown.is_set(): + raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" + " auto lock renewing.") + starttime = renewable_start_time(renewable) + renew_future = asyncio.ensure_future(self._auto_lock_renew(renewable, starttime, timeout, on_lock_renew_failure), loop=self.loop) + self._futures.append(renew_future) + + async def shutdown(self): + """Cancel remaining open lock renewal futures.""" + self._shutdown.set() + await asyncio.wait(self._futures) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py index b1f1050d1754..6a126fa91b53 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py @@ -12,7 +12,6 @@ from uamqp import authentication from .._common.utils import renewable_start_time, utc_now -from ..exceptions import AutoLockRenewTimeout, AutoLockRenewFailed, ServiceBusError from .._common.constants import ( JWT_TOKEN_SCOPE, TOKEN_TYPE_JWT, @@ -70,97 +69,3 @@ async def create_authentication(client): http_proxy=client._config.http_proxy, transport_type=client._config.transport_type, ) - - -class AutoLockRenew: - """Auto lock renew. - - An asynchronous AutoLockRenew handler for renewing the lock - tokens of messages and/or sessions in the background. - - :param loop: An async event loop. - :type loop: ~asyncio.EventLoop - - .. admonition:: Example: - - .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py - :start-after: [START auto_lock_renew_message_async] - :end-before: [END auto_lock_renew_message_async] - :language: python - :dedent: 4 - :caption: Automatically renew a message lock - - .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py - :start-after: [START auto_lock_renew_session_async] - :end-before: [END auto_lock_renew_session_async] - :language: python - :dedent: 4 - :caption: Automatically renew a session lock - - """ - - def __init__(self, loop=None): - self._shutdown = asyncio.Event() - self._futures = [] - self.loop = loop or get_running_loop() - self.sleep_time = 1 - self.renew_period = 10 - - async def __aenter__(self): - if self._shutdown.is_set(): - raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" - " auto lock renewing.") - return self - - async def __aexit__(self, *args): - await self.shutdown() - - def _renewable(self, renewable): - if self._shutdown.is_set(): - return False - if hasattr(renewable, 'settled') and renewable.settled: - return False - if renewable.expired: - return False - return True - - async def _auto_lock_renew(self, renewable, starttime, timeout): - _log.debug("Running async lock auto-renew for %r seconds", timeout) - try: - while self._renewable(renewable): - if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout): - _log.debug("Reached auto lock renew timeout - letting lock expire.") - raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(timeout)) - if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self.renew_period): - _log.debug("%r seconds or less until lock expires - auto renewing.", self.renew_period) - await renewable.renew_lock() - await asyncio.sleep(self.sleep_time) - except AutoLockRenewTimeout as e: - renewable.auto_renew_error = e - except Exception as e: # pylint: disable=broad-except - _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) - error = AutoLockRenewFailed( - "Failed to auto-renew lock", - inner_exception=e) - renewable.auto_renew_error = error - - def register(self, renewable, timeout=300): - """Register a renewable entity for automatic lock renewal. - - :param renewable: A locked entity that needs to be renewed. - :type renewable: ~azure.servicebus.aio.ReceivedMessage or - ~azure.servicebus.aio.Session - :param float timeout: A time in seconds that the lock should be maintained for. - Default value is 300 (5 minutes). - """ - if self._shutdown.is_set(): - raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" - " auto lock renewing.") - starttime = renewable_start_time(renewable) - renew_future = asyncio.ensure_future(self._auto_lock_renew(renewable, starttime, timeout), loop=self.loop) - self._futures.append(renew_future) - - async def shutdown(self): - """Cancel remaining open lock renewal futures.""" - self._shutdown.set() - await asyncio.wait(self._futures) diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/mocks_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/mocks_async.py new file mode 100644 index 000000000000..e618c74211cd --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/mocks_async.py @@ -0,0 +1,27 @@ +from datetime import timedelta + +from azure.servicebus._common.utils import utc_now + +class MockReceivedMessage: + def __init__(self, prevent_renew_lock=False, exception_on_renew_lock=False): + self._lock_duration = 2 + + self.received_timestamp_utc = utc_now() + self.locked_until_utc = self.received_timestamp_utc + timedelta(seconds=self._lock_duration) + self.settled = False + + self._prevent_renew_lock = prevent_renew_lock + self._exception_on_renew_lock = exception_on_renew_lock + + + async def renew_lock(self): + if self._exception_on_renew_lock: + raise Exception("Generated exception via MockReceivedMessage exception_on_renew_lock") + if not self._prevent_renew_lock: + self.locked_until_utc = self.locked_until_utc + timedelta(seconds=self._lock_duration) + + @property + def expired(self): + if self.locked_until_utc and self.locked_until_utc <= utc_now(): + return True + return False \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index 90e27a482d7b..7e1042bd3f9a 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -33,6 +33,7 @@ from devtools_testutils import AzureMgmtTestCase, CachedResourceGroupPreparer from servicebus_preparer import CachedServiceBusNamespacePreparer, CachedServiceBusQueuePreparer, ServiceBusQueuePreparer from utilities import get_logger, print_message, sleep_until_expired +from mocks_async import MockReceivedMessage _logger = get_logger(logging.DEBUG) @@ -1103,20 +1104,63 @@ async def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_lin assert len(messages) == 1 await messages[0].complete() + @pytest.mark.asyncio - async def test_async_queue_mock_no_reusing_auto_lock_renew(self): - class MockReceivedMessage: - def __init__(self): - self.received_timestamp_utc = utc_now() - self.locked_until_utc = self.received_timestamp_utc + timedelta(seconds=10) + async def test_async_queue_mock_auto_lock_renew_callback(self): + results = [] + async def callback_mock(renewable): + results.append(renewable) + + auto_lock_renew = AutoLockRenew() + auto_lock_renew.renew_period = 1 # So we can run the test fast. + async with auto_lock_renew: # Check that it is called when the object expires for any reason (silent renew failure) + message = MockReceivedMessage(prevent_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + await asyncio.sleep(3) + assert len(results) and results[-1].expired == True - async def renew_lock(self): - self.locked_until_utc = self.locked_until_utc + timedelta(seconds=10) + auto_lock_renew = AutoLockRenew() + auto_lock_renew.renew_period = 1 + async with auto_lock_renew: # Check that in normal operation it does not get called + auto_lock_renew.register(renewable=MockReceivedMessage(), on_lock_renew_failure=callback_mock) + await asyncio.sleep(3) + assert len(results) == 1 + + auto_lock_renew = AutoLockRenew() + auto_lock_renew.renew_period = 1 + async with auto_lock_renew: # Check that when a message is settled, it will not get called even after expiry + message = MockReceivedMessage(prevent_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + message.settled = True + await asyncio.sleep(3) + assert len(results) == 1 auto_lock_renew = AutoLockRenew() + auto_lock_renew.renew_period = 1 + async with auto_lock_renew: # Check that it is called when there is an overt renew failure + message = MockReceivedMessage(exception_on_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + await asyncio.sleep(3) + assert len(results) == 2 and results[-1].expired == True + + auto_lock_renew = AutoLockRenew() + auto_lock_renew.renew_period = 1 + async with auto_lock_renew: # Check that it is not called when the renewer is shutdown + message = MockReceivedMessage(prevent_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + await auto_lock_renew.shutdown() + await asyncio.sleep(3) + assert len(results) == 2 + + + @pytest.mark.asyncio + async def test_async_queue_mock_no_reusing_auto_lock_renew(self): + auto_lock_renew = AutoLockRenew() + auto_lock_renew.renew_period = 1 + async with auto_lock_renew: auto_lock_renew.register(renewable=MockReceivedMessage()) - await asyncio.sleep(12) + await asyncio.sleep(3) with pytest.raises(ServiceBusError): async with auto_lock_renew: @@ -1126,9 +1170,10 @@ async def renew_lock(self): auto_lock_renew.register(renewable=MockReceivedMessage()) auto_lock_renew = AutoLockRenew() + auto_lock_renew.renew_period = 1 auto_lock_renew.register(renewable=MockReceivedMessage()) - time.sleep(12) + time.sleep(3) await auto_lock_renew.shutdown() diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py index aa9dc7311efd..07b0febcc9bc 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py @@ -477,6 +477,10 @@ async def test_async_session_by_conn_str_receive_handler_with_autolockrenew(self message = Message("{}".format(i), session_id=session_id) await sender.send(message) + results = [] + async def lock_lost_callback(renewable): + results.append(renewable) + renewer = AutoLockRenew() messages = [] async with sb_client.get_queue_session_receiver(servicebus_queue.name, session_id=session_id, idle_timeout=5, mode=ReceiveSettleMode.PeekLock, prefetch=20) as session: @@ -498,8 +502,10 @@ async def test_async_session_by_conn_str_receive_handler_with_autolockrenew(self messages.append(message) elif len(messages) == 1: + assert not results await asyncio.sleep(45) print("Second sleep {}".format(session.session.locked_until_utc - utc_now())) + assert not results assert session.session.expired assert isinstance(session.session.auto_renew_error, AutoLockRenewTimeout) try: @@ -509,6 +515,16 @@ async def test_async_session_by_conn_str_receive_handler_with_autolockrenew(self assert isinstance(e.inner_exception, AutoLockRenewTimeout) messages.append(message) + # While we're testing autolockrenew and sessions, let's make sure we don't call the lock-lost callback when a session exits. + renewer.renew_period = 1 + session = None + + async with sb_client.get_queue_session_receiver(servicebus_queue.name, session_id=session_id, idle_timeout=5, mode=ReceiveSettleMode.PeekLock, prefetch=10) as receiver: + session = receiver.session + renewer.register(session, timeout=5, on_lock_renew_failure=lock_lost_callback) + await asyncio.sleep(max(0,(session.locked_until_utc - utc_now()).total_seconds()+1)) # If this pattern repeats make sleep_until_expired_async + assert not results + await renewer.shutdown() assert len(messages) == 2 diff --git a/sdk/servicebus/azure-servicebus/tests/mocks.py b/sdk/servicebus/azure-servicebus/tests/mocks.py new file mode 100644 index 000000000000..a9162ef3729c --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/mocks.py @@ -0,0 +1,27 @@ +from datetime import timedelta + +from azure.servicebus._common.utils import utc_now + +class MockReceivedMessage: + def __init__(self, prevent_renew_lock=False, exception_on_renew_lock=False): + self._lock_duration = 2 + + self.received_timestamp_utc = utc_now() + self.locked_until_utc = self.received_timestamp_utc + timedelta(seconds=self._lock_duration) + self.settled = False + + self._prevent_renew_lock = prevent_renew_lock + self._exception_on_renew_lock = exception_on_renew_lock + + + def renew_lock(self): + if self._exception_on_renew_lock: + raise Exception("Generated exception via MockReceivedMessage exception_on_renew_lock") + if not self._prevent_renew_lock: + self.locked_until_utc = self.locked_until_utc + timedelta(seconds=self._lock_duration) + + @property + def expired(self): + if self.locked_until_utc and self.locked_until_utc <= utc_now(): + return True + return False \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 2edc6f61af53..480a41847314 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -29,6 +29,7 @@ from devtools_testutils import AzureMgmtTestCase, CachedResourceGroupPreparer from servicebus_preparer import CachedServiceBusNamespacePreparer, ServiceBusQueuePreparer, CachedServiceBusQueuePreparer from utilities import get_logger, print_message, sleep_until_expired +from mocks import MockReceivedMessage _logger = get_logger(logging.DEBUG) @@ -1215,19 +1216,60 @@ def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_link(self assert len(messages) == 1 messages[0].complete() - def test_queue_mock_no_reusing_auto_lock_renew(self): - class MockReceivedMessage: - def __init__(self): - self.received_timestamp_utc = utc_now() - self.locked_until_utc = self.received_timestamp_utc + timedelta(seconds=10) - def renew_lock(self): - self.locked_until_utc = self.locked_until_utc + timedelta(seconds=10) + def test_queue_mock_auto_lock_renew_callback(self): + results = [] + def callback_mock(renewable): + results.append(renewable) + + auto_lock_renew = AutoLockRenew() + auto_lock_renew.renew_period = 1 # So we can run the test fast. + with auto_lock_renew: # Check that it is called when the object expires for any reason (silent renew failure) + message = MockReceivedMessage(prevent_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + time.sleep(3) + assert len(results) and results[-1].expired == True + + auto_lock_renew = AutoLockRenew() + auto_lock_renew.renew_period = 1 + with auto_lock_renew: # Check that in normal operation it does not get called + auto_lock_renew.register(renewable=MockReceivedMessage(), on_lock_renew_failure=callback_mock) + time.sleep(3) + assert len(results) == 1 + + auto_lock_renew = AutoLockRenew() + auto_lock_renew.renew_period = 1 + with auto_lock_renew: # Check that when a message is settled, it will not get called even after expiry + message = MockReceivedMessage(prevent_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + message.settled = True + time.sleep(3) + assert len(results) == 1 auto_lock_renew = AutoLockRenew() + auto_lock_renew.renew_period = 1 + with auto_lock_renew: # Check that it is called when there is an overt renew failure + message = MockReceivedMessage(exception_on_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + time.sleep(3) + assert len(results) == 2 and results[-1].expired == True + + auto_lock_renew = AutoLockRenew() + auto_lock_renew.renew_period = 1 + with auto_lock_renew: # Check that it is not called when the renewer is shutdown + message = MockReceivedMessage(prevent_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + auto_lock_renew.shutdown() + time.sleep(3) + assert len(results) == 2 + + + def test_queue_mock_no_reusing_auto_lock_renew(self): + auto_lock_renew = AutoLockRenew() + auto_lock_renew.renew_period = 1 # So we can run the test fast. with auto_lock_renew: auto_lock_renew.register(renewable=MockReceivedMessage()) - time.sleep(12) + time.sleep(3) with pytest.raises(ServiceBusError): with auto_lock_renew: @@ -1237,10 +1279,11 @@ def renew_lock(self): auto_lock_renew.register(renewable=MockReceivedMessage()) auto_lock_renew = AutoLockRenew() + auto_lock_renew.renew_period = 1 with auto_lock_renew: auto_lock_renew.register(renewable=MockReceivedMessage()) - time.sleep(12) + time.sleep(3) auto_lock_renew.shutdown() diff --git a/sdk/servicebus/azure-servicebus/tests/test_sessions.py b/sdk/servicebus/azure-servicebus/tests/test_sessions.py index 3aab098b8275..0c7639bb87af 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sessions.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sessions.py @@ -546,10 +546,14 @@ def test_session_by_conn_str_receive_handler_with_autolockrenew(self, servicebus message = Message("{}".format(i), session_id=session_id) sender.send(message) + results = [] + def lock_lost_callback(renewable): + results.append(renewable) + renewer = AutoLockRenew() messages = [] with sb_client.get_queue_session_receiver(servicebus_queue.name, session_id=session_id, idle_timeout=5, mode=ReceiveSettleMode.PeekLock, prefetch=10) as receiver: - renewer.register(receiver.session, timeout=60) + renewer.register(receiver.session, timeout=60, on_lock_renew_failure = lock_lost_callback) print("Registered lock renew thread", receiver.session._locked_until_utc, utc_now()) with pytest.raises(SessionLockExpired): for message in receiver: @@ -571,8 +575,10 @@ def test_session_by_conn_str_receive_handler_with_autolockrenew(self, servicebus print("Starting second sleep") time.sleep(40) # ensure renewer expires print("Second sleep {}".format(receiver.session._locked_until_utc - utc_now())) + assert not results sleep_until_expired(receiver.session) # and then ensure it didn't slip a renew under the wire. assert receiver.session.expired + assert not results # Should not callback since it timed out as specified. assert isinstance(receiver.session.auto_renew_error, AutoLockRenewTimeout) try: message.complete() @@ -581,6 +587,16 @@ def test_session_by_conn_str_receive_handler_with_autolockrenew(self, servicebus assert isinstance(e.inner_exception, AutoLockRenewTimeout) messages.append(message) + # While we're testing autolockrenew and sessions, let's make sure we don't call the lock-lost callback when a session exits. + renewer.renew_period = 1 + session = None + + with sb_client.get_queue_session_receiver(servicebus_queue.name, session_id=session_id, idle_timeout=5, mode=ReceiveSettleMode.PeekLock, prefetch=10) as receiver: + session = receiver.session + renewer.register(session, timeout=5, on_lock_renew_failure=lock_lost_callback) + sleep_until_expired(receiver.session) + assert not results + renewer.shutdown() assert len(messages) == 2 From c2c855042f83a7bb55121e8963e749ee7d5f1432 Mon Sep 17 00:00:00 2001 From: Kieran Brantner-Magee Date: Thu, 9 Jul 2020 13:50:25 -0700 Subject: [PATCH 02/10] - PR comments; simplifying the callback conditional into a state variable - add a test for receiver shutdown halting autorenewal (and corrosponding mocks) - Add proper typing and documentation to aio code. --- .../servicebus/_common/auto_lock_renewer.py | 20 ++++----- .../aio/_async_auto_lock_renewer.py | 45 ++++++++++--------- .../tests/async_tests/mocks_async.py | 5 +++ .../tests/async_tests/test_queues_async.py | 9 ++++ .../azure-servicebus/tests/mocks.py | 5 +++ .../azure-servicebus/tests/test_queues.py | 9 ++++ 6 files changed, 60 insertions(+), 33 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py index 3cf8a3a829fa..0320e7fb3bad 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py @@ -67,7 +67,7 @@ def _renewable(self, renewable): return False if hasattr(renewable, 'settled') and renewable.settled: return False - if isinstance(renewable, ServiceBusSession) and not renewable._receiver._running: + if not renewable._receiver._running: return False if renewable.expired: return False @@ -76,6 +76,7 @@ def _renewable(self, renewable): def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure=None): _log.debug("Running lock auto-renew thread for %r seconds", timeout) error = None + clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc) try: while self._renewable(renewable): if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout): @@ -85,8 +86,10 @@ def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure= _log.debug("%r seconds or less until lock expires - auto renewing.", self.renew_period) renewable.renew_lock() time.sleep(self.sleep_time) + clean_shutdown = not renewable.expired except AutoLockRenewTimeout as e: renewable.auto_renew_error = e + clean_shutdown = not renewable.expired except Exception as e: # pylint: disable=broad-except _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) error = AutoLockRenewFailed( @@ -94,14 +97,7 @@ def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure= inner_exception=e) renewable.auto_renew_error = error finally: - if on_lock_renew_failure: - if self._shutdown.is_set() \ - or (hasattr(renewable, 'settled') and renewable.settled) \ - or (isinstance(renewable, ServiceBusSession) and not renewable._receiver._running) \ - or (not error and not renewable.expired): - # Basically we want to make sure that any of the "intentional" exit cases are not fired on. - # e.g. shutdown, settlement, session closure, renewer timeout (as opposed to renew failure/expiry) - return + if on_lock_renew_failure and not clean_shutdown: on_lock_renew_failure(renewable) def register(self, renewable, timeout=300, on_lock_renew_failure=None): @@ -109,11 +105,11 @@ def register(self, renewable, timeout=300, on_lock_renew_failure=None): :param renewable: A locked entity that needs to be renewed. :type renewable: ~azure.servicebus.ReceivedMessage or - ~azure.servicebus.Session + ~azure.servicebus.ServiceBusSession :param float timeout: A time in seconds that the lock should be maintained for. Default value is 300 (5 minutes). - :param Callable[[Union[Session,ReceivedMessage]], None] on_lock_renew_failure: A callback may be specified - to be called when the lock is lost on the renewable that is being registered. + :param Optional[Callable[[Union[~azure.servicebus.ServiceBusSession, ReceivedMessage]], Awaitable[None]]] on_lock_renew_failure: + A callback may be specified to be called when the lock is lost on the renewable that is being registered. Default value is None (no callback). """ if self._shutdown.is_set(): diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py index 285d54ee0a73..210348f18ddd 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py @@ -8,6 +8,7 @@ import logging import datetime import functools +from typing import Optional, Iterable, Any from ._servicebus_session_async import ServiceBusSession from .._common.utils import renewable_start_time, utc_now @@ -24,7 +25,7 @@ class AutoLockRenew: tokens of messages and/or sessions in the background. :param loop: An async event loop. - :type loop: ~asyncio.EventLoop + :type loop: ~asyncio.BaseEventLoop .. admonition:: Example: @@ -44,36 +45,41 @@ class AutoLockRenew: """ - def __init__(self, loop=None): + def __init__(self, loop: Optional[asyncio.BaseEventLoop] = None) -> None: self._shutdown = asyncio.Event() self._futures = [] self.loop = loop or get_running_loop() self.sleep_time = 1 self.renew_period = 10 - async def __aenter__(self): + async def __aenter__(self) -> "AutoLockRenew": if self._shutdown.is_set(): raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" " auto lock renewing.") return self - async def __aexit__(self, *args): + async def __aexit__(self, *args: Iterable[Any]) -> None: await self.shutdown() - def _renewable(self, renewable): + def _renewable(self, renewable: "Union[ReceivedMessage, ServiceBusSession]") -> bool: if self._shutdown.is_set(): return False if hasattr(renewable, 'settled') and renewable.settled: return False if renewable.expired: return False - if isinstance(renewable, ServiceBusSession) and not renewable._receiver._running: + if not renewable._receiver._running: return False return True - async def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure=None): + async def _auto_lock_renew(self, + renewable: "Union[ReceivedMessage, ServiceBusSession]", + starttime: datetime.datetime, + timeout: int, + on_lock_renew_failure: "Optional[Callable[[Union[ServiceBusSession, ReceivedMessage]], Awaitable[None]]]"=None) -> None: _log.debug("Running async lock auto-renew for %r seconds", timeout) error = None + clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc) try: while self._renewable(renewable): if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout): @@ -83,8 +89,10 @@ async def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_fa _log.debug("%r seconds or less until lock expires - auto renewing.", self.renew_period) await renewable.renew_lock() await asyncio.sleep(self.sleep_time) + clean_shutdown = not renewable.expired except AutoLockRenewTimeout as e: renewable.auto_renew_error = e + clean_shutdown = not renewable.expired except Exception as e: # pylint: disable=broad-except _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) error = AutoLockRenewFailed( @@ -92,26 +100,21 @@ async def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_fa inner_exception=e) renewable.auto_renew_error = error finally: - if on_lock_renew_failure: - if self._shutdown.is_set() \ - or (hasattr(renewable, 'settled') and renewable.settled) \ - or (isinstance(renewable, ServiceBusSession) and not renewable._receiver._running) \ - or (not error and not renewable.expired): - # Basically we want to make sure that any of the "intentional" exit cases are not fired on. - # e.g. shutdown, settlement, session closure, renewer timeout (as opposed to renew failure/expiry) - return + if on_lock_renew_failure and not clean_shutdown: await on_lock_renew_failure(renewable) - def register(self, renewable, timeout=300, on_lock_renew_failure=None): + def register(self, + renewable: "Union[ReceivedMessage, ServiceBusSession]", + timeout: float = 300, + on_lock_renew_failure: "Optional[Callable[[Union[ServiceBusSession, ReceivedMessage]], Awaitable[None]]]" = None) -> None: """Register a renewable entity for automatic lock renewal. :param renewable: A locked entity that needs to be renewed. - :type renewable: ~azure.servicebus.aio.ReceivedMessage or - ~azure.servicebus.aio.Session + :type renewable: Union[~azure.servicebus.aio.ReceivedMessage,~azure.servicebus.aio.ServiceBusSession] :param float timeout: A time in seconds that the lock should be maintained for. Default value is 300 (5 minutes). - :param Callable[[Union[Session,ReceivedMessage]], Awaitable[None]] on_lock_renew_failure: A callback may be specified - to be called when the lock is lost on the renewable that is being registered. + :param Optional[Callable[[Union[~azure.servicebus.aio.ServiceBusSession, ReceivedMessage]], Awaitable[None]]] on_lock_renew_failure: + A callback may be specified to be called when the lock is lost on the renewable that is being registered. Default value is None (no callback). """ if self._shutdown.is_set(): @@ -121,7 +124,7 @@ def register(self, renewable, timeout=300, on_lock_renew_failure=None): renew_future = asyncio.ensure_future(self._auto_lock_renew(renewable, starttime, timeout, on_lock_renew_failure), loop=self.loop) self._futures.append(renew_future) - async def shutdown(self): + async def shutdown(self) -> None: """Cancel remaining open lock renewal futures.""" self._shutdown.set() await asyncio.wait(self._futures) diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/mocks_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/mocks_async.py index e618c74211cd..6d659794dee8 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/mocks_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/mocks_async.py @@ -2,6 +2,10 @@ from azure.servicebus._common.utils import utc_now +class MockReceiver: + def __init__(self): + self._running = True + class MockReceivedMessage: def __init__(self, prevent_renew_lock=False, exception_on_renew_lock=False): self._lock_duration = 2 @@ -9,6 +13,7 @@ def __init__(self, prevent_renew_lock=False, exception_on_renew_lock=False): self.received_timestamp_utc = utc_now() self.locked_until_utc = self.received_timestamp_utc + timedelta(seconds=self._lock_duration) self.settled = False + self._receiver = MockReceiver() self._prevent_renew_lock = prevent_renew_lock self._exception_on_renew_lock = exception_on_renew_lock diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index 7e1042bd3f9a..102f04436762 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -1152,6 +1152,15 @@ async def callback_mock(renewable): await asyncio.sleep(3) assert len(results) == 2 + auto_lock_renew = AutoLockRenew() + auto_lock_renew.renew_period = 1 + async with auto_lock_renew: # Check that it is not called when the receiver is shutdown + message = MockReceivedMessage(prevent_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + message._receiver._running = False + await asyncio.sleep(3) + assert len(results) == 2 + @pytest.mark.asyncio async def test_async_queue_mock_no_reusing_auto_lock_renew(self): diff --git a/sdk/servicebus/azure-servicebus/tests/mocks.py b/sdk/servicebus/azure-servicebus/tests/mocks.py index a9162ef3729c..be6cd2fb4c08 100644 --- a/sdk/servicebus/azure-servicebus/tests/mocks.py +++ b/sdk/servicebus/azure-servicebus/tests/mocks.py @@ -2,6 +2,10 @@ from azure.servicebus._common.utils import utc_now +class MockReceiver: + def __init__(self): + self._running = True + class MockReceivedMessage: def __init__(self, prevent_renew_lock=False, exception_on_renew_lock=False): self._lock_duration = 2 @@ -9,6 +13,7 @@ def __init__(self, prevent_renew_lock=False, exception_on_renew_lock=False): self.received_timestamp_utc = utc_now() self.locked_until_utc = self.received_timestamp_utc + timedelta(seconds=self._lock_duration) self.settled = False + self._receiver = MockReceiver() self._prevent_renew_lock = prevent_renew_lock self._exception_on_renew_lock = exception_on_renew_lock diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 480a41847314..c54e6dde59ab 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -1263,6 +1263,15 @@ def callback_mock(renewable): time.sleep(3) assert len(results) == 2 + auto_lock_renew = AutoLockRenew() + auto_lock_renew.renew_period = 1 + with auto_lock_renew: # Check that it is not called when the receiver is shutdown + message = MockReceivedMessage(prevent_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + message._receiver._running = False + time.sleep(3) + assert len(results) == 2 + def test_queue_mock_no_reusing_auto_lock_renew(self): auto_lock_renew = AutoLockRenew() From 21fd0ab2ef62e81e598b41f8ddf938a49afbdf43 Mon Sep 17 00:00:00 2001 From: Kieran Brantner-Magee Date: Mon, 13 Jul 2020 09:23:01 -0700 Subject: [PATCH 03/10] Rename autolockrenew shutdown to close to normalize method name with other comparable instances. Adjust tests/docs/guides/etc. Add changelog entry for the on lock renew callback. --- sdk/servicebus/azure-servicebus/CHANGELOG.md | 10 ++++++++-- sdk/servicebus/azure-servicebus/README.md | 4 ++-- .../azure/servicebus/_common/auto_lock_renewer.py | 6 +++--- .../azure/servicebus/aio/_async_auto_lock_renewer.py | 6 +++--- sdk/servicebus/azure-servicebus/migration_guide.md | 5 +++++ .../samples/async_samples/auto_lock_renew_async.py | 4 ++-- .../samples/sync_samples/auto_lock_renew.py | 4 ++-- .../tests/async_tests/test_queues_async.py | 6 +++--- .../tests/async_tests/test_sessions_async.py | 8 ++++---- sdk/servicebus/azure-servicebus/tests/test_queues.py | 6 +++--- sdk/servicebus/azure-servicebus/tests/test_sessions.py | 2 +- 11 files changed, 36 insertions(+), 25 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index a466e31ae059..07942b7f1d39 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -1,10 +1,16 @@ # Release History -## 7.0.0b4 (Unreleased) +## 7.0.0b5 (Unreleased) **New Features** -* Add an on_lock_renew_failure as a parameter to `AutoLockRenew`, taking a callback for when the lock is lost non-intentially (e.g. not via settling, shutdown, or autolockrenew duration completion) +* Add `on_lock_renew_failure` as a parameter to `AutoLockRenew.register`, taking a callback for when the lock is lost non-intentially (e.g. not via settling, shutdown, or autolockrenew duration completion) + +**Breaking Changes** + +* `AutoLockRenew.shutdown` is now `AutoLockRenew.close` to normalize with other equivelent behaviors. + +## 7.0.0b4 (Unreleased) **BugFixes** diff --git a/sdk/servicebus/azure-servicebus/README.md b/sdk/servicebus/azure-servicebus/README.md index 9ab1645f83c7..ca32296558de 100644 --- a/sdk/servicebus/azure-servicebus/README.md +++ b/sdk/servicebus/azure-servicebus/README.md @@ -381,7 +381,7 @@ connstr = os.environ['SERVICE_BUS_CONN_STR'] queue_name = os.environ['SERVICE_BUS_QUEUE_NAME'] session_id = os.environ['SERVICE_BUS_SESSION_ID'] -# Can also be called via "with AutoLockRenew() as renewer" to automate shutdown. +# Can also be called via "with AutoLockRenew() as renewer" to automate closing. renewer = AutoLockRenew() with ServiceBusClient.from_connection_string(connstr) as client: with client.get_queue_session_receiver(queue_name, session_id=session_id) as receiver: @@ -390,7 +390,7 @@ with ServiceBusClient.from_connection_string(connstr) as client: renewer.register(msg, timeout=60) # Do your application logic here msg.complete() -renewer.shutdown() +renewer.close() ``` If for any reason auto-renewal has been interrupted or failed, this can be observed via the `auto_renew_error` property on the object being renewed. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py index 0320e7fb3bad..b5df886108c5 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py @@ -60,7 +60,7 @@ def __enter__(self): return self def __exit__(self, *args): - self.shutdown() + self.close() def _renewable(self, renewable): if self._shutdown.is_set(): @@ -118,8 +118,8 @@ def register(self, renewable, timeout=300, on_lock_renew_failure=None): starttime = renewable_start_time(renewable) self.executor.submit(self._auto_lock_renew, renewable, starttime, timeout, on_lock_renew_failure) - def shutdown(self, wait=True): - """Shutdown the thread pool to clean up any remaining lock renewal threads. + def close(self, wait=True): + """Cease autorenewal by shutting down the thread pool to clean up any remaining lock renewal threads. :param wait: Whether to block until thread pool has shutdown. Default is `True`. :type wait: bool diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py index 210348f18ddd..dcbde144e10a 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py @@ -59,7 +59,7 @@ async def __aenter__(self) -> "AutoLockRenew": return self async def __aexit__(self, *args: Iterable[Any]) -> None: - await self.shutdown() + await self.close() def _renewable(self, renewable: "Union[ReceivedMessage, ServiceBusSession]") -> bool: if self._shutdown.is_set(): @@ -124,7 +124,7 @@ def register(self, renew_future = asyncio.ensure_future(self._auto_lock_renew(renewable, starttime, timeout, on_lock_renew_failure), loop=self.loop) self._futures.append(renew_future) - async def shutdown(self) -> None: - """Cancel remaining open lock renewal futures.""" + async def close(self) -> None: + """Cease autorenewal by cancelling any remaining open lock renewal futures.""" self._shutdown.set() await asyncio.wait(self._futures) diff --git a/sdk/servicebus/azure-servicebus/migration_guide.md b/sdk/servicebus/azure-servicebus/migration_guide.md index 1715af743099..6c0adf1bdd21 100644 --- a/sdk/servicebus/azure-servicebus/migration_guide.md +++ b/sdk/servicebus/azure-servicebus/migration_guide.md @@ -76,6 +76,11 @@ semantics with the sender or receiver lifetime. | `azure.servicebus.control_client.ServiceBusService().create_queue(queue_name)` | `azure.servicebus.management.ServiceBusManagementClient().create_queue(queue_name)` | [Create a queue](./samples/sync_samples/mgmt_queue.py) | | `azure.servicebus.ServiceBusClient().list_queues()` | `azure.servicebus.management.ServiceBusManagementClient().list_queues()` | [List queues](./samples/sync_samples/mgmt_queue.py ) | +### Working with AutoLockRenew +| In v0.50 | Equivalent in v7 | Sample | +|---|---|---| +| `azure.servicebus.AutoLockRenew().shutdown()` | `azure.servicebus.AutoLockRenew().close()` | [Close an auto-lock-renewer](./samples/sync_samples/auto_lock_renew.py) | + ## Migration samples diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py index 3e570e9e69e7..f718ada08581 100644 --- a/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py @@ -50,7 +50,7 @@ async def renew_lock_on_message_received_from_non_sessionful_entity(): await msg.complete() print('Complete messages.') - await renewer.shutdown() + await renewer.close() async def renew_lock_on_session_of_the_sessionful_entity(): @@ -81,7 +81,7 @@ async def renew_lock_on_session_of_the_sessionful_entity(): await msg.complete() print('Complete messages.') - await renewer.shutdown() + await renewer.close() loop = asyncio.get_event_loop() diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py index d97182411d4d..c174affac74d 100644 --- a/sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py @@ -49,7 +49,7 @@ def renew_lock_on_message_received_from_non_sessionful_entity(): msg.complete() # Settling the message deregisters it from the AutoLockRenewer print('Complete messages.') - renewer.shutdown() + renewer.close() def renew_lock_on_session_of_the_sessionful_entity(): @@ -82,7 +82,7 @@ def renew_lock_on_session_of_the_sessionful_entity(): print('Complete messages.') - renewer.shutdown() + renewer.close() renew_lock_on_message_received_from_non_sessionful_entity() diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index 102f04436762..bb0c98f3da61 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -725,7 +725,7 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_with_autoloc print("Remaining messages", message.locked_until_utc, utc_now()) messages.append(message) await message.complete() - await renewer.shutdown() + await renewer.close() assert len(messages) == 11 @pytest.mark.liveTest @@ -1148,7 +1148,7 @@ async def callback_mock(renewable): async with auto_lock_renew: # Check that it is not called when the renewer is shutdown message = MockReceivedMessage(prevent_renew_lock=True) auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) - await auto_lock_renew.shutdown() + await auto_lock_renew.close() await asyncio.sleep(3) assert len(results) == 2 @@ -1184,7 +1184,7 @@ async def test_async_queue_mock_no_reusing_auto_lock_renew(self): auto_lock_renew.register(renewable=MockReceivedMessage()) time.sleep(3) - await auto_lock_renew.shutdown() + await auto_lock_renew.close() with pytest.raises(ServiceBusError): async with auto_lock_renew: diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py index 07b0febcc9bc..21261e9eb0e8 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py @@ -525,7 +525,7 @@ async def lock_lost_callback(renewable): await asyncio.sleep(max(0,(session.locked_until_utc - utc_now()).total_seconds()+1)) # If this pattern repeats make sleep_until_expired_async assert not results - await renewer.shutdown() + await renewer.close() assert len(messages) == 2 @@ -626,7 +626,7 @@ async def test_async_session_schedule_message(self, servicebus_namespace_connect assert len(messages) == 1 else: raise Exception("Failed to receive schdeduled message.") - await renewer.shutdown() + await renewer.close() @pytest.mark.liveTest @@ -666,7 +666,7 @@ async def test_async_session_schedule_multiple_messages(self, servicebus_namespa assert len(messages) == 2 else: raise Exception("Failed to receive schdeduled message.") - await renewer.shutdown() + await renewer.close() @pytest.mark.liveTest @@ -700,7 +700,7 @@ async def test_async_session_cancel_scheduled_messages(self, servicebus_namespac print(str(m)) await m.complete() raise - await renewer.shutdown() + await renewer.close() @pytest.mark.liveTest diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index c54e6dde59ab..953252b0b0bb 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -821,7 +821,7 @@ def test_queue_by_queue_client_conn_str_receive_handler_with_autolockrenew(self, print("Remaining messages", message.locked_until_utc, utc_now()) messages.append(message) message.complete() - renewer.shutdown() + renewer.close() assert len(messages) == 11 @pytest.mark.liveTest @@ -1259,7 +1259,7 @@ def callback_mock(renewable): with auto_lock_renew: # Check that it is not called when the renewer is shutdown message = MockReceivedMessage(prevent_renew_lock=True) auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) - auto_lock_renew.shutdown() + auto_lock_renew.close() time.sleep(3) assert len(results) == 2 @@ -1294,7 +1294,7 @@ def test_queue_mock_no_reusing_auto_lock_renew(self): auto_lock_renew.register(renewable=MockReceivedMessage()) time.sleep(3) - auto_lock_renew.shutdown() + auto_lock_renew.close() with pytest.raises(ServiceBusError): with auto_lock_renew: diff --git a/sdk/servicebus/azure-servicebus/tests/test_sessions.py b/sdk/servicebus/azure-servicebus/tests/test_sessions.py index 0c7639bb87af..8e6e9a384658 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sessions.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sessions.py @@ -597,7 +597,7 @@ def lock_lost_callback(renewable): sleep_until_expired(receiver.session) assert not results - renewer.shutdown() + renewer.close() assert len(messages) == 2 From c96f1b178c51c4c639779209633a718b37c439c1 Mon Sep 17 00:00:00 2001 From: Kieran Brantner-Magee Date: Mon, 20 Jul 2020 09:09:20 -0700 Subject: [PATCH 04/10] autolockrenewer on_lock_renew_failure should take two parameters, the renwable and the optional error. --- .../azure/servicebus/_common/auto_lock_renewer.py | 4 ++-- .../azure/servicebus/aio/_async_auto_lock_renewer.py | 4 ++-- .../tests/async_tests/test_queues_async.py | 11 ++++++++++- .../tests/async_tests/test_sessions_async.py | 2 +- sdk/servicebus/azure-servicebus/tests/test_queues.py | 11 ++++++++++- .../azure-servicebus/tests/test_sessions.py | 2 +- 6 files changed, 26 insertions(+), 8 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py index b5df886108c5..1c6f7a85290d 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py @@ -98,7 +98,7 @@ def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure= renewable.auto_renew_error = error finally: if on_lock_renew_failure and not clean_shutdown: - on_lock_renew_failure(renewable) + on_lock_renew_failure(renewable, error) def register(self, renewable, timeout=300, on_lock_renew_failure=None): """Register a renewable entity for automatic lock renewal. @@ -108,7 +108,7 @@ def register(self, renewable, timeout=300, on_lock_renew_failure=None): ~azure.servicebus.ServiceBusSession :param float timeout: A time in seconds that the lock should be maintained for. Default value is 300 (5 minutes). - :param Optional[Callable[[Union[~azure.servicebus.ServiceBusSession, ReceivedMessage]], Awaitable[None]]] on_lock_renew_failure: + :param Optional[Callable[[Union[~azure.servicebus.ServiceBusSession, ReceivedMessage], Optional[Exception]], Awaitable[None]]] on_lock_renew_failure: A callback may be specified to be called when the lock is lost on the renewable that is being registered. Default value is None (no callback). """ diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py index dcbde144e10a..89cc133c32ae 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py @@ -76,7 +76,7 @@ async def _auto_lock_renew(self, renewable: "Union[ReceivedMessage, ServiceBusSession]", starttime: datetime.datetime, timeout: int, - on_lock_renew_failure: "Optional[Callable[[Union[ServiceBusSession, ReceivedMessage]], Awaitable[None]]]"=None) -> None: + on_lock_renew_failure: "Optional[Callable[[Union[ServiceBusSession, ReceivedMessage], Optional[Exception]], Awaitable[None]]]"=None) -> None: _log.debug("Running async lock auto-renew for %r seconds", timeout) error = None clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc) @@ -101,7 +101,7 @@ async def _auto_lock_renew(self, renewable.auto_renew_error = error finally: if on_lock_renew_failure and not clean_shutdown: - await on_lock_renew_failure(renewable) + await on_lock_renew_failure(renewable, error) def register(self, renewable: "Union[ReceivedMessage, ServiceBusSession]", diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index 94268c423a4b..7b0bd7f453b7 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -1108,8 +1108,11 @@ async def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_lin @pytest.mark.asyncio async def test_async_queue_mock_auto_lock_renew_callback(self): results = [] - async def callback_mock(renewable): + errors = [] + async def callback_mock(renewable, error): results.append(renewable) + if error: + errors.append(error) auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 # So we can run the test fast. @@ -1118,6 +1121,7 @@ async def callback_mock(renewable): auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) await asyncio.sleep(3) assert len(results) and results[-1].expired == True + assert not errors auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 @@ -1125,6 +1129,7 @@ async def callback_mock(renewable): auto_lock_renew.register(renewable=MockReceivedMessage(), on_lock_renew_failure=callback_mock) await asyncio.sleep(3) assert len(results) == 1 + assert not errors auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 @@ -1134,6 +1139,7 @@ async def callback_mock(renewable): message.settled = True await asyncio.sleep(3) assert len(results) == 1 + assert not errors auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 @@ -1142,6 +1148,7 @@ async def callback_mock(renewable): auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) await asyncio.sleep(3) assert len(results) == 2 and results[-1].expired == True + assert errors[-1] auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 @@ -1151,6 +1158,7 @@ async def callback_mock(renewable): await auto_lock_renew.close() await asyncio.sleep(3) assert len(results) == 2 + assert len(errors) == 1 auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 @@ -1160,6 +1168,7 @@ async def callback_mock(renewable): message._receiver._running = False await asyncio.sleep(3) assert len(results) == 2 + assert len(errors) == 1 @pytest.mark.asyncio diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py index 48b6612d93ec..22505a7e92be 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py @@ -478,7 +478,7 @@ async def test_async_session_by_conn_str_receive_handler_with_autolockrenew(self await sender.send_messages(message) results = [] - async def lock_lost_callback(renewable): + async def lock_lost_callback(renewable, error): results.append(renewable) renewer = AutoLockRenew() diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 34cd6518321e..0e20793c4781 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -1219,8 +1219,11 @@ def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_link(self def test_queue_mock_auto_lock_renew_callback(self): results = [] - def callback_mock(renewable): + errors = [] + def callback_mock(renewable, error): results.append(renewable) + if error: + errors.append(error) auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 # So we can run the test fast. @@ -1229,6 +1232,7 @@ def callback_mock(renewable): auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) time.sleep(3) assert len(results) and results[-1].expired == True + assert not errors auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 @@ -1236,6 +1240,7 @@ def callback_mock(renewable): auto_lock_renew.register(renewable=MockReceivedMessage(), on_lock_renew_failure=callback_mock) time.sleep(3) assert len(results) == 1 + assert not errors auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 @@ -1245,6 +1250,7 @@ def callback_mock(renewable): message.settled = True time.sleep(3) assert len(results) == 1 + assert not errors auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 @@ -1253,6 +1259,7 @@ def callback_mock(renewable): auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) time.sleep(3) assert len(results) == 2 and results[-1].expired == True + assert errors[-1] auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 @@ -1262,6 +1269,7 @@ def callback_mock(renewable): auto_lock_renew.close() time.sleep(3) assert len(results) == 2 + assert len(errors) == 1 auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 @@ -1271,6 +1279,7 @@ def callback_mock(renewable): message._receiver._running = False time.sleep(3) assert len(results) == 2 + assert len(errors) == 1 def test_queue_mock_no_reusing_auto_lock_renew(self): diff --git a/sdk/servicebus/azure-servicebus/tests/test_sessions.py b/sdk/servicebus/azure-servicebus/tests/test_sessions.py index a441535b2901..abc503c1c145 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sessions.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sessions.py @@ -547,7 +547,7 @@ def test_session_by_conn_str_receive_handler_with_autolockrenew(self, servicebus sender.send_messages(message) results = [] - def lock_lost_callback(renewable): + def lock_lost_callback(renewable, error): results.append(renewable) renewer = AutoLockRenew() From 570ec22e283aa55fc4c72a708d46a5e26e28e874 Mon Sep 17 00:00:00 2001 From: Kieran Brantner-Magee Date: Mon, 20 Jul 2020 16:42:56 -0700 Subject: [PATCH 05/10] make unused ivars truly internal (loop, executor) within autolockrenew; make tests be more precise by explicitly clearing results list between trials. --- .../servicebus/_common/auto_lock_renewer.py | 6 ++--- .../aio/_async_auto_lock_renewer.py | 4 +-- .../tests/async_tests/test_queues_async.py | 26 +++++++++++++------ .../azure-servicebus/tests/test_queues.py | 26 +++++++++++++------ 4 files changed, 41 insertions(+), 21 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py index 14a1282d2bed..5885fda7d930 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py @@ -48,7 +48,7 @@ class AutoLockRenew(object): """ def __init__(self, executor=None, max_workers=None): - self.executor = executor or ThreadPoolExecutor(max_workers=max_workers) + self._executor = executor or ThreadPoolExecutor(max_workers=max_workers) self._shutdown = threading.Event() self.sleep_time = 1 self.renew_period = 10 @@ -116,7 +116,7 @@ def register(self, renewable, timeout=300, on_lock_renew_failure=None): raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" " auto lock renewing.") starttime = renewable_start_time(renewable) - self.executor.submit(self._auto_lock_renew, renewable, starttime, timeout, on_lock_renew_failure) + self._executor.submit(self._auto_lock_renew, renewable, starttime, timeout, on_lock_renew_failure) def close(self, wait=True): """Cease autorenewal by shutting down the thread pool to clean up any remaining lock renewal threads. @@ -125,4 +125,4 @@ def close(self, wait=True): :type wait: bool """ self._shutdown.set() - self.executor.shutdown(wait=wait) + self._executor.shutdown(wait=wait) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py index 3808a4fffca6..d173858e9acf 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py @@ -48,7 +48,7 @@ class AutoLockRenew: def __init__(self, loop: Optional[asyncio.BaseEventLoop] = None) -> None: self._shutdown = asyncio.Event() self._futures = [] - self.loop = loop or get_running_loop() + self._loop = loop or get_running_loop() self.sleep_time = 1 self.renew_period = 10 @@ -121,7 +121,7 @@ def register(self, raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" " auto lock renewing.") starttime = renewable_start_time(renewable) - renew_future = asyncio.ensure_future(self._auto_lock_renew(renewable, starttime, timeout, on_lock_renew_failure), loop=self.loop) + renew_future = asyncio.ensure_future(self._auto_lock_renew(renewable, starttime, timeout, on_lock_renew_failure), loop=self._loop) self._futures.append(renew_future) async def close(self) -> None: diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index a98563d372fc..dd9691735731 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -1124,17 +1124,21 @@ async def callback_mock(renewable, error): message = MockReceivedMessage(prevent_renew_lock=True) auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) await asyncio.sleep(3) - assert len(results) and results[-1]._lock_expired == True + assert len(results) == 1 and results[-1]._lock_expired == True assert not errors + del results[:] + del errors[:] auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 async with auto_lock_renew: # Check that in normal operation it does not get called auto_lock_renew.register(renewable=MockReceivedMessage(), on_lock_renew_failure=callback_mock) await asyncio.sleep(3) - assert len(results) == 1 + assert not results assert not errors + del results[:] + del errors[:] auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 async with auto_lock_renew: # Check that when a message is settled, it will not get called even after expiry @@ -1142,18 +1146,22 @@ async def callback_mock(renewable, error): auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) message._settled = True await asyncio.sleep(3) - assert len(results) == 1 + assert not results assert not errors + del results[:] + del errors[:] auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 async with auto_lock_renew: # Check that it is called when there is an overt renew failure message = MockReceivedMessage(exception_on_renew_lock=True) auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) await asyncio.sleep(3) - assert len(results) == 2 and results[-1]._lock_expired == True + assert len(results) == 1 and results[-1]._lock_expired == True assert errors[-1] + del results[:] + del errors[:] auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 async with auto_lock_renew: # Check that it is not called when the renewer is shutdown @@ -1161,9 +1169,11 @@ async def callback_mock(renewable, error): auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) await auto_lock_renew.close() await asyncio.sleep(3) - assert len(results) == 2 - assert len(errors) == 1 + assert not results + assert not errors + del results[:] + del errors[:] auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 async with auto_lock_renew: # Check that it is not called when the receiver is shutdown @@ -1171,8 +1181,8 @@ async def callback_mock(renewable, error): auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) message._receiver._running = False await asyncio.sleep(3) - assert len(results) == 2 - assert len(errors) == 1 + assert not results + assert not errors @pytest.mark.asyncio diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 8ec2bee2ba30..4ab6b3a0376d 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -1353,17 +1353,21 @@ def callback_mock(renewable, error): message = MockReceivedMessage(prevent_renew_lock=True) auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) time.sleep(3) - assert len(results) and results[-1]._lock_expired == True + assert len(results) == 1 and results[-1]._lock_expired == True assert not errors + del results[:] + del errors[:] auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 with auto_lock_renew: # Check that in normal operation it does not get called auto_lock_renew.register(renewable=MockReceivedMessage(), on_lock_renew_failure=callback_mock) time.sleep(3) - assert len(results) == 1 + assert not results assert not errors + del results[:] + del errors[:] auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 with auto_lock_renew: # Check that when a message is settled, it will not get called even after expiry @@ -1371,18 +1375,22 @@ def callback_mock(renewable, error): auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) message._settled = True time.sleep(3) - assert len(results) == 1 + assert not results assert not errors + del results[:] + del errors[:] auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 with auto_lock_renew: # Check that it is called when there is an overt renew failure message = MockReceivedMessage(exception_on_renew_lock=True) auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) time.sleep(3) - assert len(results) == 2 and results[-1]._lock_expired == True + assert len(results) == 1 and results[-1]._lock_expired == True assert errors[-1] + del results[:] + del errors[:] auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 with auto_lock_renew: # Check that it is not called when the renewer is shutdown @@ -1390,9 +1398,11 @@ def callback_mock(renewable, error): auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) auto_lock_renew.close() time.sleep(3) - assert len(results) == 2 - assert len(errors) == 1 + assert not results + assert not errors + del results[:] + del errors[:] auto_lock_renew = AutoLockRenew() auto_lock_renew.renew_period = 1 with auto_lock_renew: # Check that it is not called when the receiver is shutdown @@ -1400,8 +1410,8 @@ def callback_mock(renewable, error): auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) message._receiver._running = False time.sleep(3) - assert len(results) == 2 - assert len(errors) == 1 + assert not results + assert not errors def test_queue_mock_no_reusing_auto_lock_renew(self): From 96b3872e155e17243f389e98be2c911ae60c8112 Mon Sep 17 00:00:00 2001 From: Kieran Brantner-Magee Date: Tue, 21 Jul 2020 09:40:30 -0700 Subject: [PATCH 06/10] pylint/mypy fixes. Adjust line endings and length, use a type alias for the callbacks, differentiate the async and sync autolockrenewer callback docstring. --- .../servicebus/_common/auto_lock_renewer.py | 17 +++++--- .../azure/servicebus/_common/utils.py | 2 - .../aio/_async_auto_lock_renewer.py | 35 +++++++++------ .../azure/servicebus/aio/_async_utils.py | 1 - .../azure-servicebus/samples/README.md | 1 + .../async_samples/auto_lock_renew_async.py | 43 ++++++++++++++++++- .../samples/sync_samples/auto_lock_renew.py | 43 +++++++++++++++++++ 7 files changed, 119 insertions(+), 23 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py index 5885fda7d930..2b7a82e2cef0 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py @@ -4,18 +4,23 @@ # license information. # ------------------------------------------------------------------------- -import sys import datetime import logging import threading import time -import functools from concurrent.futures import ThreadPoolExecutor +from typing import TYPE_CHECKING from .._servicebus_session import ServiceBusSession from ..exceptions import AutoLockRenewFailed, AutoLockRenewTimeout, ServiceBusError from .utils import renewable_start_time, utc_now +if TYPE_CHECKING: + from typing import Callable, Union, Optional, Awaitable + from .message import ReceivedMessage + LockRenewFailureCallback = Callable[[Union[ServiceBusSession, ReceivedMessage], + Optional[Exception]], None] + _log = logging.getLogger(__name__) class AutoLockRenew(object): @@ -63,6 +68,7 @@ def __exit__(self, *args): self.close() def _renewable(self, renewable): + # pylint: disable=protected-access if self._shutdown.is_set(): return False if hasattr(renewable, '_settled') and renewable._settled: @@ -74,6 +80,7 @@ def _renewable(self, renewable): return True def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure=None): + # pylint: disable=protected-access _log.debug("Running lock auto-renew thread for %r seconds", timeout) error = None clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc) @@ -87,8 +94,8 @@ def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure= renewable.renew_lock() time.sleep(self.sleep_time) clean_shutdown = not renewable._lock_expired - except AutoLockRenewTimeout as e: - renewable.auto_renew_error = e + except AutoLockRenewTimeout as error: + renewable.auto_renew_error = error clean_shutdown = not renewable._lock_expired except Exception as e: # pylint: disable=broad-except _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) @@ -108,7 +115,7 @@ def register(self, renewable, timeout=300, on_lock_renew_failure=None): ~azure.servicebus.ServiceBusSession :param float timeout: A time in seconds that the lock should be maintained for. Default value is 300 (5 minutes). - :param Optional[Callable[[Union[~azure.servicebus.ServiceBusSession, ReceivedMessage], Optional[Exception]], Awaitable[None]]] on_lock_renew_failure: + :param Optional[LockRenewFailureCallback] on_lock_renew_failure: A callback may be specified to be called when the lock is lost on the renewable that is being registered. Default value is None (no callback). """ diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py index 019bf8fc37bf..82fbfd5e3707 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py @@ -7,8 +7,6 @@ import sys import datetime import logging -import threading -import time import functools try: from urlparse import urlparse diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py index d173858e9acf..2c48a76fb969 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py @@ -7,14 +7,17 @@ import asyncio import logging import datetime -import functools -from typing import Optional, Iterable, Any +from typing import Optional, Iterable, Any, Union, Callable, Awaitable, List +from ._async_message import ReceivedMessage from ._servicebus_session_async import ServiceBusSession from .._common.utils import renewable_start_time, utc_now from ._async_utils import get_running_loop from ..exceptions import AutoLockRenewTimeout, AutoLockRenewFailed, ServiceBusError +AsyncLockRenewFailureCallback = Callable[[Union[ServiceBusSession, ReceivedMessage], + Optional[Exception]], Awaitable[None]] + _log = logging.getLogger(__name__) @@ -47,7 +50,7 @@ class AutoLockRenew: def __init__(self, loop: Optional[asyncio.BaseEventLoop] = None) -> None: self._shutdown = asyncio.Event() - self._futures = [] + self._futures: List[asyncio.Task] = [] self._loop = loop or get_running_loop() self.sleep_time = 1 self.renew_period = 10 @@ -61,7 +64,8 @@ async def __aenter__(self) -> "AutoLockRenew": async def __aexit__(self, *args: Iterable[Any]) -> None: await self.close() - def _renewable(self, renewable: "Union[ReceivedMessage, ServiceBusSession]") -> bool: + def _renewable(self, renewable: Union[ReceivedMessage, ServiceBusSession]) -> bool: + # pylint: disable=protected-access if self._shutdown.is_set(): return False if hasattr(renewable, '_settled') and renewable._settled: @@ -73,10 +77,11 @@ def _renewable(self, renewable: "Union[ReceivedMessage, ServiceBusSession]") -> return True async def _auto_lock_renew(self, - renewable: "Union[ReceivedMessage, ServiceBusSession]", + renewable: Union[ReceivedMessage, ServiceBusSession], starttime: datetime.datetime, - timeout: int, - on_lock_renew_failure: "Optional[Callable[[Union[ServiceBusSession, ReceivedMessage], Optional[Exception]], Awaitable[None]]]"=None) -> None: + timeout: float, + on_lock_renew_failure: Optional[AsyncLockRenewFailureCallback] = None) -> None: + # pylint: disable=protected-access _log.debug("Running async lock auto-renew for %r seconds", timeout) error = None clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc) @@ -90,8 +95,8 @@ async def _auto_lock_renew(self, await renewable.renew_lock() await asyncio.sleep(self.sleep_time) clean_shutdown = not renewable._lock_expired - except AutoLockRenewTimeout as e: - renewable.auto_renew_error = e + except AutoLockRenewTimeout as error: + renewable.auto_renew_error = error clean_shutdown = not renewable._lock_expired except Exception as e: # pylint: disable=broad-except _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) @@ -104,24 +109,26 @@ async def _auto_lock_renew(self, await on_lock_renew_failure(renewable, error) def register(self, - renewable: "Union[ReceivedMessage, ServiceBusSession]", + renewable: Union[ReceivedMessage, ServiceBusSession], timeout: float = 300, - on_lock_renew_failure: "Optional[Callable[[Union[ServiceBusSession, ReceivedMessage]], Awaitable[None]]]" = None) -> None: + on_lock_renew_failure: Optional[AsyncLockRenewFailureCallback] = None) -> None: """Register a renewable entity for automatic lock renewal. :param renewable: A locked entity that needs to be renewed. :type renewable: Union[~azure.servicebus.aio.ReceivedMessage,~azure.servicebus.aio.ServiceBusSession] :param float timeout: A time in seconds that the lock should be maintained for. Default value is 300 (5 minutes). - :param Optional[Callable[[Union[~azure.servicebus.aio.ServiceBusSession, ReceivedMessage]], Awaitable[None]]] on_lock_renew_failure: - A callback may be specified to be called when the lock is lost on the renewable that is being registered. + :param Optional[AsyncLockRenewFailureCallback] on_lock_renew_failure: + An async callback may be specified to be called when the lock is lost on the renewable being registered. Default value is None (no callback). """ if self._shutdown.is_set(): raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" " auto lock renewing.") starttime = renewable_start_time(renewable) - renew_future = asyncio.ensure_future(self._auto_lock_renew(renewable, starttime, timeout, on_lock_renew_failure), loop=self._loop) + renew_future = asyncio.ensure_future( + self._auto_lock_renew(renewable, starttime, timeout, on_lock_renew_failure), + loop=self._loop) self._futures.append(renew_future) async def close(self) -> None: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py index 6a126fa91b53..387c7e3afcad 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py @@ -11,7 +11,6 @@ from uamqp import authentication -from .._common.utils import renewable_start_time, utc_now from .._common.constants import ( JWT_TOKEN_SCOPE, TOKEN_TYPE_JWT, diff --git a/sdk/servicebus/azure-servicebus/samples/README.md b/sdk/servicebus/azure-servicebus/samples/README.md index 12dc083032cb..9f9b47b4b25d 100644 --- a/sdk/servicebus/azure-servicebus/samples/README.md +++ b/sdk/servicebus/azure-servicebus/samples/README.md @@ -51,6 +51,7 @@ Both [sync version](./sync_samples) and [async version](./async_samples) of samp - [auto_lock_renew.py](./sync_samples/auto_lock_renew.py) ([async_version](./async_samples/auto_lock_renew_async.py)) - Examples to show usage of AutoLockRenew: - Automatically renew lock on message received from non-sessionful entity - Automatically renew lock on the session of sessionful entity + - Configure a callback to be triggered on auto lock renew failures. - [mgmt_queue](./sync_samples/mgmt_queue.py) ([async_version](./async_samples/mgmt_queue_async.py)) - Examples to manage queue entities under a given servicebus namespace - Create a queue - Delete a queue diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py index bb05ee358f50..a5914116895e 100644 --- a/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py @@ -81,9 +81,50 @@ async def renew_lock_on_session_of_the_sessionful_entity(): await msg.complete() print('Complete messages.') - await renewer.close() + +async def renew_lock_with_lock_renewal_failure_callback(): + servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR) + + async with servicebus_client: + async with servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) as sender: + await sender.send_messages(Message("message")) + + async with AutoLockRenew() as renewer: + # For this sample we're going to set the renewal recurrence of the autolockrenewer to greater than the + # service side message lock duration, to demonstrate failure. Normally, this should not be adjusted. + renewer._sleep_time = 40 + async with servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch=10) as receiver: + + def on_lock_renew_failure_callback(renewable, error): + # If auto-lock-renewal fails, this function will be called. + # If failure is due to an error, the second argument will be populated, otherwise + # it will default to `None`. + # This callback can be an ideal location to log the failure, or take action to safely + # handle any processing on the message or session that was in progress. + print("Intentionally failed to renew lock on {} due to {}".format(renewable, error)) + + received_msgs = await receiver.receive_messages(max_batch_size=1, max_wait_time=5) + + for msg in received_msgs: + # automatically renew the lock on each message for 120 seconds + renewer.register(msg, timeout=90, on_lock_renew_failure=on_lock_renew_failure_callback) + print('Register messages into AutoLockRenew done.') + + # Cause the messages and autorenewal to time out. + # Other reasons for renew failure could include a network or service outage. + await asyncio.sleep(80) + + try: + for msg in received_msgs: + await msg.complete() + except MessageLockExpired as e: + print('Messages cannot be settled if they have timed out. (This is expected)') + + print('Lock renew failure demonstration complete.') + loop = asyncio.get_event_loop() loop.run_until_complete(renew_lock_on_message_received_from_non_sessionful_entity()) loop.run_until_complete(renew_lock_on_session_of_the_sessionful_entity()) +loop.run_until_complete(renew_lock_with_lock_renewal_failure_callback()) \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py index 41a2f9a86ab0..92268bbcf3f3 100644 --- a/sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py @@ -17,6 +17,7 @@ import time from azure.servicebus import ServiceBusClient, AutoLockRenew, Message +from azure.servicebus.exceptions import MessageLockExpired CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] @@ -85,5 +86,47 @@ def renew_lock_on_session_of_the_sessionful_entity(): renewer.close() +def renew_lock_with_lock_renewal_failure_callback(): + servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR) + + with servicebus_client: + with servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) as sender: + sender.send_messages(Message("message")) + + with AutoLockRenew() as renewer: + # For this sample we're going to set the renewal recurrence of the autolockrenewer to greater than the + # service side message lock duration, to demonstrate failure. Normally, this should not be adjusted. + renewer._sleep_time = 40 + with servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch=10) as receiver: + + def on_lock_renew_failure_callback(renewable, error): + # If auto-lock-renewal fails, this function will be called. + # If failure is due to an error, the second argument will be populated, otherwise + # it will default to `None`. + # This callback can be an ideal location to log the failure, or take action to safely + # handle any processing on the message or session that was in progress. + print("Intentionally failed to renew lock on {} due to {}".format(renewable, error)) + + received_msgs = receiver.receive_messages(max_batch_size=1, max_wait_time=5) + + for msg in received_msgs: + # automatically renew the lock on each message for 120 seconds + renewer.register(msg, timeout=90, on_lock_renew_failure=on_lock_renew_failure_callback) + print('Register messages into AutoLockRenew done.') + + # Cause the messages and autorenewal to time out. + # Other reasons for renew failure could include a network or service outage. + time.sleep(80) + + try: + for msg in received_msgs: + msg.complete() + except MessageLockExpired as e: + print('Messages cannot be settled if they have timed out. (This is expected)') + + print('Lock renew failure demonstration complete.') + + renew_lock_on_message_received_from_non_sessionful_entity() renew_lock_on_session_of_the_sessionful_entity() +renew_lock_with_lock_renewal_failure_callback() \ No newline at end of file From d11215d960f567eb931d513c4595aa2ce52a199c Mon Sep 17 00:00:00 2001 From: Kieran Brantner-Magee Date: Tue, 21 Jul 2020 14:31:44 -0700 Subject: [PATCH 07/10] mypy fixes, add changelog entry, fix async sample, make renew_period and sleep_duration internal. --- sdk/servicebus/azure-servicebus/CHANGELOG.md | 1 + .../servicebus/_common/auto_lock_renewer.py | 15 ++++++------- .../aio/_async_auto_lock_renewer.py | 21 ++++++++++--------- .../async_samples/auto_lock_renew_async.py | 3 ++- .../tests/async_tests/test_queues_async.py | 16 +++++++------- .../tests/async_tests/test_sessions_async.py | 2 +- .../azure-servicebus/tests/test_queues.py | 16 +++++++------- .../azure-servicebus/tests/test_sessions.py | 2 +- 8 files changed, 40 insertions(+), 36 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 7ca8bfe42537..c91e84f27f48 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -29,6 +29,7 @@ **Breaking Changes** +* `AutoLockRenew.sleep_time` and `AutoLockRenew.renew_period` have been made internal as `_sleep_time` and `_renew_period` respectively, as it is not expected a user will have to interact with them. * `AutoLockRenew.shutdown` is now `AutoLockRenew.close` to normalize with other equivelent behaviors. ## 7.0.0b4 (2020-07-06) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py index 2b7a82e2cef0..cb9a939f6c01 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py @@ -55,8 +55,8 @@ class AutoLockRenew(object): def __init__(self, executor=None, max_workers=None): self._executor = executor or ThreadPoolExecutor(max_workers=max_workers) self._shutdown = threading.Event() - self.sleep_time = 1 - self.renew_period = 10 + self._sleep_time = 1 + self._renew_period = 10 def __enter__(self): if self._shutdown.is_set(): @@ -89,13 +89,14 @@ def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure= if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout): _log.debug("Reached auto lock renew timeout - letting lock expire.") raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(timeout)) - if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self.renew_period): - _log.debug("%r seconds or less until lock expires - auto renewing.", self.renew_period) + if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self._renew_period): + _log.debug("%r seconds or less until lock expires - auto renewing.", self._renew_period) renewable.renew_lock() - time.sleep(self.sleep_time) + time.sleep(self._sleep_time) clean_shutdown = not renewable._lock_expired - except AutoLockRenewTimeout as error: - renewable.auto_renew_error = error + except AutoLockRenewTimeout as e: + error = e + renewable.auto_renew_error = e clean_shutdown = not renewable._lock_expired except Exception as e: # pylint: disable=broad-except _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py index 2c48a76fb969..35c6e87b2e07 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py @@ -50,10 +50,10 @@ class AutoLockRenew: def __init__(self, loop: Optional[asyncio.BaseEventLoop] = None) -> None: self._shutdown = asyncio.Event() - self._futures: List[asyncio.Task] = [] + self._futures = [] # type: List[asyncio.Future] self._loop = loop or get_running_loop() - self.sleep_time = 1 - self.renew_period = 10 + self._sleep_time = 1 + self._renew_period = 10 async def __aenter__(self) -> "AutoLockRenew": if self._shutdown.is_set(): @@ -68,7 +68,7 @@ def _renewable(self, renewable: Union[ReceivedMessage, ServiceBusSession]) -> bo # pylint: disable=protected-access if self._shutdown.is_set(): return False - if hasattr(renewable, '_settled') and renewable._settled: + if hasattr(renewable, '_settled') and renewable._settled: # type: ignore return False if renewable._lock_expired: return False @@ -83,20 +83,21 @@ async def _auto_lock_renew(self, on_lock_renew_failure: Optional[AsyncLockRenewFailureCallback] = None) -> None: # pylint: disable=protected-access _log.debug("Running async lock auto-renew for %r seconds", timeout) - error = None + error = None # type: Optional[Exception] clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc) try: while self._renewable(renewable): if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout): _log.debug("Reached auto lock renew timeout - letting lock expire.") raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(timeout)) - if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self.renew_period): - _log.debug("%r seconds or less until lock expires - auto renewing.", self.renew_period) + if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self._renew_period): + _log.debug("%r seconds or less until lock expires - auto renewing.", self._renew_period) await renewable.renew_lock() - await asyncio.sleep(self.sleep_time) + await asyncio.sleep(self._sleep_time) clean_shutdown = not renewable._lock_expired - except AutoLockRenewTimeout as error: - renewable.auto_renew_error = error + except AutoLockRenewTimeout as e: + error = e + renewable.auto_renew_error = e clean_shutdown = not renewable._lock_expired except Exception as e: # pylint: disable=broad-except _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py index a5914116895e..917eec08f0b0 100644 --- a/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py @@ -18,6 +18,7 @@ from azure.servicebus import Message from azure.servicebus.aio import ServiceBusClient, AutoLockRenew +from azure.servicebus.exceptions import MessageLockExpired CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] @@ -95,7 +96,7 @@ async def renew_lock_with_lock_renewal_failure_callback(): renewer._sleep_time = 40 async with servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch=10) as receiver: - def on_lock_renew_failure_callback(renewable, error): + async def on_lock_renew_failure_callback(renewable, error): # If auto-lock-renewal fails, this function will be called. # If failure is due to an error, the second argument will be populated, otherwise # it will default to `None`. diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index dd9691735731..ac9d4cadd3db 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -1119,7 +1119,7 @@ async def callback_mock(renewable, error): errors.append(error) auto_lock_renew = AutoLockRenew() - auto_lock_renew.renew_period = 1 # So we can run the test fast. + auto_lock_renew._renew_period = 1 # So we can run the test fast. async with auto_lock_renew: # Check that it is called when the object expires for any reason (silent renew failure) message = MockReceivedMessage(prevent_renew_lock=True) auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) @@ -1130,7 +1130,7 @@ async def callback_mock(renewable, error): del results[:] del errors[:] auto_lock_renew = AutoLockRenew() - auto_lock_renew.renew_period = 1 + auto_lock_renew._renew_period = 1 async with auto_lock_renew: # Check that in normal operation it does not get called auto_lock_renew.register(renewable=MockReceivedMessage(), on_lock_renew_failure=callback_mock) await asyncio.sleep(3) @@ -1140,7 +1140,7 @@ async def callback_mock(renewable, error): del results[:] del errors[:] auto_lock_renew = AutoLockRenew() - auto_lock_renew.renew_period = 1 + auto_lock_renew._renew_period = 1 async with auto_lock_renew: # Check that when a message is settled, it will not get called even after expiry message = MockReceivedMessage(prevent_renew_lock=True) auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) @@ -1152,7 +1152,7 @@ async def callback_mock(renewable, error): del results[:] del errors[:] auto_lock_renew = AutoLockRenew() - auto_lock_renew.renew_period = 1 + auto_lock_renew._renew_period = 1 async with auto_lock_renew: # Check that it is called when there is an overt renew failure message = MockReceivedMessage(exception_on_renew_lock=True) auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) @@ -1163,7 +1163,7 @@ async def callback_mock(renewable, error): del results[:] del errors[:] auto_lock_renew = AutoLockRenew() - auto_lock_renew.renew_period = 1 + auto_lock_renew._renew_period = 1 async with auto_lock_renew: # Check that it is not called when the renewer is shutdown message = MockReceivedMessage(prevent_renew_lock=True) auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) @@ -1175,7 +1175,7 @@ async def callback_mock(renewable, error): del results[:] del errors[:] auto_lock_renew = AutoLockRenew() - auto_lock_renew.renew_period = 1 + auto_lock_renew._renew_period = 1 async with auto_lock_renew: # Check that it is not called when the receiver is shutdown message = MockReceivedMessage(prevent_renew_lock=True) auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) @@ -1188,7 +1188,7 @@ async def callback_mock(renewable, error): @pytest.mark.asyncio async def test_async_queue_mock_no_reusing_auto_lock_renew(self): auto_lock_renew = AutoLockRenew() - auto_lock_renew.renew_period = 1 + auto_lock_renew._renew_period = 1 async with auto_lock_renew: auto_lock_renew.register(renewable=MockReceivedMessage()) @@ -1202,7 +1202,7 @@ async def test_async_queue_mock_no_reusing_auto_lock_renew(self): auto_lock_renew.register(renewable=MockReceivedMessage()) auto_lock_renew = AutoLockRenew() - auto_lock_renew.renew_period = 1 + auto_lock_renew._renew_period = 1 auto_lock_renew.register(renewable=MockReceivedMessage()) time.sleep(3) diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py index 2ed984634d2d..99b05b1d2e2a 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py @@ -519,7 +519,7 @@ async def lock_lost_callback(renewable, error): messages.append(message) # While we're testing autolockrenew and sessions, let's make sure we don't call the lock-lost callback when a session exits. - renewer.renew_period = 1 + renewer._renew_period = 1 session = None async with sb_client.get_queue_session_receiver(servicebus_queue.name, session_id=session_id, idle_timeout=5, mode=ReceiveSettleMode.PeekLock, prefetch=10) as receiver: diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 4ab6b3a0376d..05f4326b3ad9 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -1348,7 +1348,7 @@ def callback_mock(renewable, error): errors.append(error) auto_lock_renew = AutoLockRenew() - auto_lock_renew.renew_period = 1 # So we can run the test fast. + auto_lock_renew._renew_period = 1 # So we can run the test fast. with auto_lock_renew: # Check that it is called when the object expires for any reason (silent renew failure) message = MockReceivedMessage(prevent_renew_lock=True) auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) @@ -1359,7 +1359,7 @@ def callback_mock(renewable, error): del results[:] del errors[:] auto_lock_renew = AutoLockRenew() - auto_lock_renew.renew_period = 1 + auto_lock_renew._renew_period = 1 with auto_lock_renew: # Check that in normal operation it does not get called auto_lock_renew.register(renewable=MockReceivedMessage(), on_lock_renew_failure=callback_mock) time.sleep(3) @@ -1369,7 +1369,7 @@ def callback_mock(renewable, error): del results[:] del errors[:] auto_lock_renew = AutoLockRenew() - auto_lock_renew.renew_period = 1 + auto_lock_renew._renew_period = 1 with auto_lock_renew: # Check that when a message is settled, it will not get called even after expiry message = MockReceivedMessage(prevent_renew_lock=True) auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) @@ -1381,7 +1381,7 @@ def callback_mock(renewable, error): del results[:] del errors[:] auto_lock_renew = AutoLockRenew() - auto_lock_renew.renew_period = 1 + auto_lock_renew._renew_period = 1 with auto_lock_renew: # Check that it is called when there is an overt renew failure message = MockReceivedMessage(exception_on_renew_lock=True) auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) @@ -1392,7 +1392,7 @@ def callback_mock(renewable, error): del results[:] del errors[:] auto_lock_renew = AutoLockRenew() - auto_lock_renew.renew_period = 1 + auto_lock_renew._renew_period = 1 with auto_lock_renew: # Check that it is not called when the renewer is shutdown message = MockReceivedMessage(prevent_renew_lock=True) auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) @@ -1404,7 +1404,7 @@ def callback_mock(renewable, error): del results[:] del errors[:] auto_lock_renew = AutoLockRenew() - auto_lock_renew.renew_period = 1 + auto_lock_renew._renew_period = 1 with auto_lock_renew: # Check that it is not called when the receiver is shutdown message = MockReceivedMessage(prevent_renew_lock=True) auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) @@ -1416,7 +1416,7 @@ def callback_mock(renewable, error): def test_queue_mock_no_reusing_auto_lock_renew(self): auto_lock_renew = AutoLockRenew() - auto_lock_renew.renew_period = 1 # So we can run the test fast. + auto_lock_renew._renew_period = 1 # So we can run the test fast. with auto_lock_renew: auto_lock_renew.register(renewable=MockReceivedMessage()) time.sleep(3) @@ -1429,7 +1429,7 @@ def test_queue_mock_no_reusing_auto_lock_renew(self): auto_lock_renew.register(renewable=MockReceivedMessage()) auto_lock_renew = AutoLockRenew() - auto_lock_renew.renew_period = 1 + auto_lock_renew._renew_period = 1 with auto_lock_renew: auto_lock_renew.register(renewable=MockReceivedMessage()) diff --git a/sdk/servicebus/azure-servicebus/tests/test_sessions.py b/sdk/servicebus/azure-servicebus/tests/test_sessions.py index bf020bc83f8a..56037252ccb9 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sessions.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sessions.py @@ -616,7 +616,7 @@ def lock_lost_callback(renewable, error): messages.append(message) # While we're testing autolockrenew and sessions, let's make sure we don't call the lock-lost callback when a session exits. - renewer.renew_period = 1 + renewer._renew_period = 1 session = None with sb_client.get_queue_session_receiver(servicebus_queue.name, session_id=session_id, idle_timeout=5, mode=ReceiveSettleMode.PeekLock, prefetch=10) as receiver: From 75d82272e9c59924f4e63e2a77dc69b5d5e1947c Mon Sep 17 00:00:00 2001 From: Kieran Brantner-Magee Date: Tue, 21 Jul 2020 14:49:58 -0700 Subject: [PATCH 08/10] remove trailing whitespace. --- .../azure/servicebus/aio/_async_auto_lock_renewer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py index 35c6e87b2e07..be006a36f1c8 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py @@ -50,7 +50,7 @@ class AutoLockRenew: def __init__(self, loop: Optional[asyncio.BaseEventLoop] = None) -> None: self._shutdown = asyncio.Event() - self._futures = [] # type: List[asyncio.Future] + self._futures = [] # type: List[asyncio.Future] self._loop = loop or get_running_loop() self._sleep_time = 1 self._renew_period = 10 From f478966b6b72b19234c95164e74d99eca52e50da Mon Sep 17 00:00:00 2001 From: Kieran Brantner-Magee Date: Wed, 22 Jul 2020 11:33:15 -0700 Subject: [PATCH 09/10] increase idle_timeout for receiveanddelete test to avoid flakiness. --- .../azure-servicebus/tests/async_tests/test_queues_async.py | 2 +- sdk/servicebus/azure-servicebus/tests/test_queues.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index ac9d4cadd3db..9ea1051a1003 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -149,7 +149,7 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_receiveandde await sender.send_messages(message) messages = [] - async with sb_client.get_queue_receiver(servicebus_queue.name, mode=ReceiveSettleMode.ReceiveAndDelete, idle_timeout=5) as receiver: + async with sb_client.get_queue_receiver(servicebus_queue.name, mode=ReceiveSettleMode.ReceiveAndDelete, idle_timeout=8) as receiver: async for message in receiver: messages.append(message) with pytest.raises(MessageAlreadySettled): diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 05f4326b3ad9..e90e00f0e313 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -213,7 +213,7 @@ def test_queue_by_queue_client_conn_str_receive_handler_receiveanddelete(self, s messages = [] with sb_client.get_queue_receiver(servicebus_queue.name, mode=ReceiveSettleMode.ReceiveAndDelete, - idle_timeout=5) as receiver: + idle_timeout=8) as receiver: for message in receiver: assert not message.properties assert not message.label From 3ef1a86fe07bc20dd95566e77b2083b91c9d5ce2 Mon Sep 17 00:00:00 2001 From: Kieran Brantner-Magee Date: Wed, 22 Jul 2020 11:52:27 -0700 Subject: [PATCH 10/10] Remove unused imports from utils.py --- .../azure-servicebus/azure/servicebus/_common/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py index cf8fea5fbd28..b5ba7f35ef16 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py @@ -17,7 +17,7 @@ from uamqp import authentication, types -from ..exceptions import AutoLockRenewFailed, AutoLockRenewTimeout, ServiceBusError +from ..exceptions import ServiceBusError from .._version import VERSION from .constants import ( JWT_TOKEN_SCOPE,