diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/sender.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/sender.py index 9cba87f2bc71..ea6f202bfd0f 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/sender.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/sender.py @@ -7,6 +7,7 @@ import uuid import logging import time +from threading import Lock from ._encode import encode_payload from .link import Link @@ -45,6 +46,7 @@ def __init__(self, session, handle, target_address, **kwargs): kwargs["source_address"] = "sender-link-{}".format(name) super(SenderLink, self).__init__(session, handle, name, role, target_address=target_address, **kwargs) self._pending_deliveries = [] + self.lock = Lock() @classmethod def from_incoming_frame(cls, session, handle, frame): @@ -139,21 +141,24 @@ def _on_session_state_change(self): super()._on_session_state_change() def update_pending_deliveries(self): - if self.current_link_credit <= 0: - self.current_link_credit = self.link_credit - self._outgoing_flow() - now = time.time() - pending = [] - for delivery in self._pending_deliveries: - if delivery.timeout and (now - delivery.start) >= delivery.timeout: - delivery.on_settled(LinkDeliverySettleReason.TIMEOUT, None) - continue - if not delivery.sent: - sent_and_settled = self._outgoing_transfer(delivery) - if sent_and_settled: + # TODO: Temporary fix until connection.listen removed from keep alive thread. + with self.lock: + if self.current_link_credit <= 0: + self.current_link_credit = self.link_credit + self._outgoing_flow() + now = time.time() + pending = [] + + for delivery in self._pending_deliveries: + if delivery.timeout and (now - delivery.start) >= delivery.timeout: + delivery.on_settled(LinkDeliverySettleReason.TIMEOUT, None) continue - pending.append(delivery) - self._pending_deliveries = pending + if not delivery.sent: + sent_and_settled = self._outgoing_transfer(delivery) + if sent_and_settled: + continue + pending.append(delivery) + self._pending_deliveries = pending def send_transfer(self, message, *, send_async=False, **kwargs): self._check_if_closed() diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 20e06ddce6d5..e956b0c578b9 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -9,6 +9,8 @@ ### Bugs Fixed +- Fixed a bug where sending large messages with synchronous client caused a frame buffer offset error ([#37916](https://github.com/Azure/azure-sdk-for-python/issues/37916)) + ### Other Changes ## 7.13.0 (2024-11-12) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/sender.py index 9cba87f2bc71..ea6f202bfd0f 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/sender.py @@ -7,6 +7,7 @@ import uuid import logging import time +from threading import Lock from ._encode import encode_payload from .link import Link @@ -45,6 +46,7 @@ def __init__(self, session, handle, target_address, **kwargs): kwargs["source_address"] = "sender-link-{}".format(name) super(SenderLink, self).__init__(session, handle, name, role, target_address=target_address, **kwargs) self._pending_deliveries = [] + self.lock = Lock() @classmethod def from_incoming_frame(cls, session, handle, frame): @@ -139,21 +141,24 @@ def _on_session_state_change(self): super()._on_session_state_change() def update_pending_deliveries(self): - if self.current_link_credit <= 0: - self.current_link_credit = self.link_credit - self._outgoing_flow() - now = time.time() - pending = [] - for delivery in self._pending_deliveries: - if delivery.timeout and (now - delivery.start) >= delivery.timeout: - delivery.on_settled(LinkDeliverySettleReason.TIMEOUT, None) - continue - if not delivery.sent: - sent_and_settled = self._outgoing_transfer(delivery) - if sent_and_settled: + # TODO: Temporary fix until connection.listen removed from keep alive thread. + with self.lock: + if self.current_link_credit <= 0: + self.current_link_credit = self.link_credit + self._outgoing_flow() + now = time.time() + pending = [] + + for delivery in self._pending_deliveries: + if delivery.timeout and (now - delivery.start) >= delivery.timeout: + delivery.on_settled(LinkDeliverySettleReason.TIMEOUT, None) continue - pending.append(delivery) - self._pending_deliveries = pending + if not delivery.sent: + sent_and_settled = self._outgoing_transfer(delivery) + if sent_and_settled: + continue + pending.append(delivery) + self._pending_deliveries = pending def send_transfer(self, message, *, send_async=False, **kwargs): self._check_if_closed() diff --git a/sdk/servicebus/azure-servicebus/tests/test_topic.py b/sdk/servicebus/azure-servicebus/tests/test_topic.py index e0a799403efa..d07632116921 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_topic.py +++ b/sdk/servicebus/azure-servicebus/tests/test_topic.py @@ -6,6 +6,9 @@ import logging import pytest +import time +import json +import sys from devtools_testutils import AzureMgmtRecordedTestCase, RandomNameResourceGroupPreparer, get_credential @@ -36,7 +39,7 @@ class TestServiceBusTopics(AzureMgmtRecordedTestCase): @CachedServiceBusTopicPreparer(name_prefix="servicebustest") @pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids) @ArgPasser() - def test_topic_by_servicebus_client_conn_str_send_basic( + def test_topic_by_servicebus_client_send_basic( self, uamqp_transport, *, servicebus_namespace=None, servicebus_topic=None, **kwargs ): fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}" @@ -58,15 +61,15 @@ def test_topic_by_servicebus_client_conn_str_send_basic( @CachedServiceBusTopicPreparer(name_prefix="servicebustest") @pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids) @ArgPasser() - def test_topic_by_sas_token_credential_conn_str_send_basic( - self, - uamqp_transport, - *, - servicebus_namespace=None, - servicebus_namespace_key_name=None, - servicebus_namespace_primary_key=None, - servicebus_topic=None, - **kwargs, + def test_topic_by_sas_token_credential_send_basic( + self, + uamqp_transport, + *, + servicebus_namespace=None, + servicebus_namespace_key_name=None, + servicebus_namespace_primary_key=None, + servicebus_topic=None, + **kwargs ): fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}" with ServiceBusClient( @@ -111,3 +114,49 @@ def test_topic_by_servicebus_client_list_topics( topics = client.list_topics() assert len(topics) >= 1 # assert all(isinstance(t, TopicClient) for t in topics) + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @CachedServiceBusTopicPreparer(name_prefix='servicebustest') + @pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids) + @ArgPasser() + def test_topic_by_servicebus_client_send_large_messages_w_sleep(self, uamqp_transport, *, servicebus_namespace=None, servicebus_topic=None, **kwargs): + fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}" + credential = get_credential() + + # message of 100 kb - requires multiple transfer frames + size = 100 + large_dict = { + "key": "A" * 1024 + } + for i in range(size): + large_dict[f"key_{i}"] = "A" * 1024 + body = json.dumps(large_dict) + + sb_client = ServiceBusClient( + fully_qualified_namespace=fully_qualified_namespace, + credential=credential, + logging_enable=True, + uamqp_transport=uamqp_transport + ) + + # This issue doesn't repro unless logging is added here w/ this socket timeout, + # seemingly due to slowing down and some threading behavior. + # Adding in the logging here to make sure this bug is being hit and tested. + sender = sb_client.get_topic_sender(servicebus_topic.name, socket_timeout=60) + for i in range(10): + try: + time.sleep(10) + logging.info("sender created for %d", i) + size_in_bytes = sys.getsizeof(body) + + # Convert bytes to kilobytes (KB) + size_in_kb = size_in_bytes / 1024 + logging.info(f"size of body: {size_in_kb:.2f} KB") + sender.send_messages(ServiceBusMessage(body)) + logging.info(f"Message sent %d successfully", i) + except Exception as e: + logging.error(f"Error sending message %d: %s", i, str(e)) + raise