From 4156ac238ecb0be68ae910b455b67463002f4082 Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" <47871814+yunhaoling@users.noreply.github.com> Date: Mon, 30 Sep 2019 14:02:29 -0700 Subject: [PATCH 1/9] Runtime metric (#95) * Init commit for desired compatibility * runtime metric init commit * Small fix of runtime receiver metric * Fix pylint error * Update according to review * update link destroy * Remove offered capabilities for now * add sample/test code * Update test --- .../test_azure_event_hubs_receive_async.py | 34 +++++++++- samples/test_azure_event_hubs_receive.py | 44 +++++++++++-- src/link.pyx | 11 ++++ .../azure-uamqp-c/inc/azure_uamqp_c/link.h | 2 + src/vendor/azure-uamqp-c/src/link.c | 65 +++++++++++++++++++ src/vendor/inc/c_link.pxd | 2 + uamqp/__init__.py | 10 +-- uamqp/async_ops/client_async.py | 1 + uamqp/async_ops/receiver_async.py | 4 +- uamqp/client.py | 4 +- uamqp/receiver.py | 5 +- 11 files changed, 166 insertions(+), 16 deletions(-) diff --git a/samples/asynctests/test_azure_event_hubs_receive_async.py b/samples/asynctests/test_azure_event_hubs_receive_async.py index 8b34cf79f..3bea85c6e 100644 --- a/samples/asynctests/test_azure_event_hubs_receive_async.py +++ b/samples/asynctests/test_azure_event_hubs_receive_async.py @@ -11,8 +11,7 @@ import sys import uamqp -from uamqp import address -from uamqp import authentication +from uamqp import address, types, utils, authentication def get_logger(level): @@ -129,6 +128,35 @@ async def test_event_hubs_batch_receive_async(live_eventhub_config): log.info("Sequence Number: {}".format(annotations.get(b'x-opt-sequence-number'))) +@pytest.mark.asyncio +async def test_event_hubs_receive_with_runtime_metric_async(live_eventhub_config): + uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub']) + sas_auth = authentication.SASTokenAsync.from_shared_access_key( + uri, live_eventhub_config['key_name'], live_eventhub_config['access_key']) + source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format( + live_eventhub_config['hostname'], + live_eventhub_config['event_hub'], + live_eventhub_config['consumer_group'], + live_eventhub_config['partition']) + + receiver_runtime_metric_symbol = b'com.microsoft:enable-receiver-runtime-metric' + symbol_array = [types.AMQPSymbol(receiver_runtime_metric_symbol)] + desired_capabilities = utils.data_factory(types.AMQPArray(symbol_array)) + + async with uamqp.ReceiveClientAsync(source, debug=False, auth=sas_auth, timeout=1000, prefetch=10, + desired_capabilities=desired_capabilities) as receive_client: + message_batch = await receive_client.receive_message_batch_async(10) + log.info("got batch: {}".format(len(message_batch))) + for message in message_batch: + annotations = message.annotations + delivery_annotations = message.delivery_annotations + log.info("Sequence Number: {}".format(annotations.get(b'x-opt-sequence-number'))) + assert b'last_enqueued_sequence_number' in delivery_annotations + assert b'last_enqueued_offset' in delivery_annotations + assert b'last_enqueued_time_utc' in delivery_annotations + assert b'runtime_info_retrieval_time_utc' in delivery_annotations + + @pytest.mark.asyncio async def test_event_hubs_shared_connection_async(live_eventhub_config): pytest.skip("Unstable on OSX and Linux - need to fix") # TODO @@ -158,6 +186,7 @@ async def test_event_hubs_shared_connection_async(live_eventhub_config): await partition_0.close_async() await partition_1.close_async() + async def receive_ten(partition, receiver): messages = [] count = 0 @@ -170,6 +199,7 @@ async def receive_ten(partition, receiver): print("Finished receiving on partition {}".format(partition)) return messages + @pytest.mark.asyncio async def test_event_hubs_multiple_receiver_async(live_eventhub_config): pytest.skip("Unstable on OSX and Linux - need to fix") # TODO diff --git a/samples/test_azure_event_hubs_receive.py b/samples/test_azure_event_hubs_receive.py index a960f7304..b1a9ff5d2 100644 --- a/samples/test_azure_event_hubs_receive.py +++ b/samples/test_azure_event_hubs_receive.py @@ -7,7 +7,6 @@ import logging import os import pytest -import time import sys try: from urllib import quote_plus #Py2 @@ -15,8 +14,7 @@ from urllib.parse import quote_plus import uamqp -from uamqp import address, errors -from uamqp import authentication +from uamqp import address, types, utils, authentication def get_logger(level): @@ -31,12 +29,14 @@ def get_logger(level): log = get_logger(logging.INFO) + def get_plain_auth(config): return authentication.SASLPlain( config['hostname'], config['key_name'], config['access_key']) + def test_event_hubs_simple_receive(live_eventhub_config): source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format( live_eventhub_config['hostname'], @@ -86,7 +86,6 @@ def test_event_hubs_single_batch_receive(live_eventhub_config): def test_event_hubs_client_proxy_settings(live_eventhub_config): - #pytest.skip("") proxy_settings={'proxy_hostname':'127.0.0.1', 'proxy_port': 12345} uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub']) sas_auth = authentication.SASTokenAuth.from_shared_access_key( @@ -98,11 +97,10 @@ def test_event_hubs_client_proxy_settings(live_eventhub_config): live_eventhub_config['consumer_group'], live_eventhub_config['partition']) - #if not sys.platform.startswith('darwin'): # Not sure why this passes for OSX: - # with pytest.raises(errors.AMQPConnectionError): with uamqp.ReceiveClient(source, auth=sas_auth, debug=False, timeout=50, prefetch=50) as receive_client: receive_client.receive_message_batch(max_batch_size=10) + def test_event_hubs_client_receive_sync(live_eventhub_config): uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub']) sas_auth = authentication.SASTokenAuth.from_shared_access_key( @@ -128,6 +126,40 @@ def test_event_hubs_client_receive_sync(live_eventhub_config): log.info("Finished receiving") +def test_event_hubs_client_receive_with_runtime_metric_sync(live_eventhub_config): + uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub']) + sas_auth = authentication.SASTokenAuth.from_shared_access_key( + uri, live_eventhub_config['key_name'], live_eventhub_config['access_key']) + + source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format( + live_eventhub_config['hostname'], + live_eventhub_config['event_hub'], + live_eventhub_config['consumer_group'], + live_eventhub_config['partition']) + + receiver_runtime_metric_symbol = b'com.microsoft:enable-receiver-runtime-metric' + symbol_array = [types.AMQPSymbol(receiver_runtime_metric_symbol)] + desired_capabilities = utils.data_factory(types.AMQPArray(symbol_array)) + + with uamqp.ReceiveClient(source, auth=sas_auth, debug=False, timeout=50, prefetch=50, + desired_capabilities=desired_capabilities) as receive_client: + log.info("Created client, receiving...") + with pytest.raises(ValueError): + batch = receive_client.receive_message_batch(max_batch_size=100) + batch = receive_client.receive_message_batch(max_batch_size=10) + log.info("Got batch: {}".format(len(batch))) + assert len(batch) <= 10 + for message in batch: + annotations = message.annotations + delivery_annotations = message.delivery_annotations + log.info("Sequence Number: {}".format(annotations.get(b'x-opt-sequence-number'))) + assert b'last_enqueued_sequence_number' in delivery_annotations + assert b'last_enqueued_offset' in delivery_annotations + assert b'last_enqueued_time_utc' in delivery_annotations + assert b'runtime_info_retrieval_time_utc' in delivery_annotations + log.info("Finished receiving") + + def test_event_hubs_callback_receive_sync(live_eventhub_config): def on_message_received(message): diff --git a/src/link.pyx b/src/link.pyx index 3ec0cb4be..fa8967f1c 100644 --- a/src/link.pyx +++ b/src/link.pyx @@ -136,6 +136,13 @@ cdef class cLink(StructBase): self._value_error() return value + @property + def desired_capabilities(self): + cdef c_amqpvalue.AMQP_VALUE value + if c_link.link_get_desired_capabilities(self._c_value, &value) != 0: + self._value_error() + return value_factory(value) + cpdef set_prefetch_count(self, stdint.uint32_t prefetch): if c_link.link_set_max_link_credit(self._c_value, prefetch) != 0: self._value_error("Unable to set link credit.") @@ -144,6 +151,10 @@ cdef class cLink(StructBase): if c_link.link_set_attach_properties(self._c_value, properties._c_value) != 0: self._value_error("Unable to set link attach properties.") + cpdef set_desired_capabilities(self, AMQPValue desired_capabilities): + if c_link.link_set_desired_capabilities(self._c_value, desired_capabilities._c_value) != 0: + self._value_error("Unable to set link desired capabilities.") + #### Callback diff --git a/src/vendor/azure-uamqp-c/inc/azure_uamqp_c/link.h b/src/vendor/azure-uamqp-c/inc/azure_uamqp_c/link.h index 542983740..7b154b766 100644 --- a/src/vendor/azure-uamqp-c/inc/azure_uamqp_c/link.h +++ b/src/vendor/azure-uamqp-c/inc/azure_uamqp_c/link.h @@ -67,6 +67,8 @@ MOCKABLE_FUNCTION(, int, link_set_max_message_size, LINK_HANDLE, link, uint64_t, MOCKABLE_FUNCTION(, int, link_get_max_message_size, LINK_HANDLE, link, uint64_t*, max_message_size); MOCKABLE_FUNCTION(, int, link_get_peer_max_message_size, LINK_HANDLE, link, uint64_t*, peer_max_message_size); MOCKABLE_FUNCTION(, int, link_set_attach_properties, LINK_HANDLE, link, fields, attach_properties); +MOCKABLE_FUNCTION(, int, link_set_desired_capabilities, LINK_HANDLE, link, AMQP_VALUE, desired_capabilities); +MOCKABLE_FUNCTION(, int, link_get_desired_capabilities, LINK_HANDLE, link, AMQP_VALUE*, desired_capabilities); MOCKABLE_FUNCTION(, int, link_set_max_link_credit, LINK_HANDLE, link, uint32_t, max_link_credit); MOCKABLE_FUNCTION(, int, link_get_name, LINK_HANDLE, link, const char**, link_name); MOCKABLE_FUNCTION(, int, link_get_received_message_id, LINK_HANDLE, link, delivery_number*, message_id); diff --git a/src/vendor/azure-uamqp-c/src/link.c b/src/vendor/azure-uamqp-c/src/link.c index 6cc067d83..4b4fb8d55 100644 --- a/src/vendor/azure-uamqp-c/src/link.c +++ b/src/vendor/azure-uamqp-c/src/link.c @@ -61,6 +61,7 @@ typedef struct LINK_INSTANCE_TAG uint32_t max_link_credit; uint32_t available; fields attach_properties; + AMQP_VALUE desired_capabilities; bool is_underlying_session_begun; bool is_closed; unsigned char* received_payload; @@ -278,6 +279,15 @@ static int send_attach(LINK_INSTANCE* link, const char* name, handle handle, rol { (void)attach_set_properties(attach, link->attach_properties); } + + if (link->desired_capabilities != NULL) + { + if(attach_set_desired_capabilities(attach, link->desired_capabilities) != 0) + { + LogError("Cannot set attach desired capabilities"); + result = __FAILURE__; + } + } if (role == role_sender) { @@ -725,6 +735,7 @@ LINK_HANDLE link_create(SESSION_HANDLE session, const char* name, role role, AMQ result->is_underlying_session_begun = false; result->is_closed = false; result->attach_properties = NULL; + result->desired_capabilities = NULL; result->received_payload = NULL; result->received_payload_size = 0; result->received_delivery_id = 0; @@ -808,6 +819,7 @@ LINK_HANDLE link_create_from_endpoint(SESSION_HANDLE session, LINK_ENDPOINT_HAND result->is_underlying_session_begun = false; result->is_closed = false; result->attach_properties = NULL; + result->desired_capabilities = NULL; result->received_payload = NULL; result->received_payload_size = 0; result->received_delivery_id = 0; @@ -1083,6 +1095,59 @@ int link_get_peer_max_message_size(LINK_HANDLE link, uint64_t* peer_max_message_ return result; } +int link_get_desired_capabilities(LINK_HANDLE link, AMQP_VALUE* desired_capabilities) +{ + int result; + if((link == NULL) || + (desired_capabilities == NULL)) + { + LogError("Bad arguments: link = %p, desired_capabilities = %p", + link, desired_capabilities); + result = __FAILURE__; + } + else + { + AMQP_VALUE link_desired_capabilties = amqpvalue_clone(link->desired_capabilities); + if(link_desired_capabilties == NULL) + { + LogError("Failed to clone link desired capabilities"); + result = __FAILURE__; + } + else + { + *desired_capabilities = link_desired_capabilties; + result = 0; + } + } + return result; +} + +int link_set_desired_capabilities(LINK_HANDLE link, AMQP_VALUE desired_capabilities) +{ + int result; + + if (link == NULL) + { + LogError("NULL link"); + result = __FAILURE__; + } + else + { + link->desired_capabilities = amqpvalue_clone(desired_capabilities); + if (link->desired_capabilities == NULL) + { + LogError("Failed cloning desired capabilities"); + result = __FAILURE__; + } + else + { + result = 0; + } + } + + return result; +} + int link_set_attach_properties(LINK_HANDLE link, fields attach_properties) { int result; diff --git a/src/vendor/inc/c_link.pxd b/src/vendor/inc/c_link.pxd index 93b3b6ee9..abb5dd871 100644 --- a/src/vendor/inc/c_link.pxd +++ b/src/vendor/inc/c_link.pxd @@ -52,6 +52,8 @@ cdef extern from "azure_uamqp_c/link.h": int link_get_max_message_size(LINK_HANDLE link, stdint.uint64_t* max_message_size) int link_get_peer_max_message_size(LINK_HANDLE link, stdint.uint64_t* peer_max_message_size) int link_set_attach_properties(LINK_HANDLE link, c_amqp_definitions.fields attach_properties) + int link_set_desired_capabilities(LINK_HANDLE link, c_amqpvalue.AMQP_VALUE desired_capabilities) + int link_get_desired_capabilities(LINK_HANDLE link, c_amqpvalue.AMQP_VALUE* desired_capabilities) int link_get_name(LINK_HANDLE link, const char** link_name) int link_get_received_message_id(LINK_HANDLE link, c_amqp_definitions.delivery_number* message_id) diff --git a/uamqp/__init__.py b/uamqp/__init__.py index b344d747e..2c7aaacad 100644 --- a/uamqp/__init__.py +++ b/uamqp/__init__.py @@ -33,7 +33,7 @@ pass # Async not supported. -__version__ = "1.2.2" +__version__ = "1.2.3" _logger = logging.getLogger(__name__) @@ -64,8 +64,8 @@ def send_message(target, data, auth=None, debug=False): """ message = data if isinstance(data, Message) else Message(body=data) with SendClient(target, auth=auth, debug=debug) as send_client: - send_client.queue_message(message) - return send_client.send_all_messages() + send_client.queue_message(message) # pylint: disable=no-member + return send_client.send_all_messages() # pylint: disable=no-member def receive_message(source, auth=None, timeout=0, debug=False): @@ -123,8 +123,8 @@ def receive_messages(source, auth=None, max_batch_size=None, timeout=0, debug=Fa if max_batch_size: kwargs['prefetch'] = max_batch_size with ReceiveClient(source, auth=auth, debug=debug, **kwargs) as receive_client: - return receive_client.receive_message_batch( - max_batch_size=max_batch_size or receive_client._prefetch, timeout=timeout) # pylint: disable=protected-access + return receive_client.receive_message_batch( # pylint: disable=no-member + max_batch_size=max_batch_size or receive_client._prefetch, timeout=timeout) # pylint: disable=protected-access, no-member class _Platform(object): diff --git a/uamqp/async_ops/client_async.py b/uamqp/async_ops/client_async.py index a0cde6cfc..af7daee95 100644 --- a/uamqp/async_ops/client_async.py +++ b/uamqp/async_ops/client_async.py @@ -807,6 +807,7 @@ async def _client_ready_async(self): properties=self._link_properties, error_policy=self._error_policy, encoding=self._encoding, + desired_capabilities=self._desired_capabilities, loop=self.loop) await asyncio.shield(self.message_handler.open_async()) return False diff --git a/uamqp/async_ops/receiver_async.py b/uamqp/async_ops/receiver_async.py index 73d3aa335..85130761c 100644 --- a/uamqp/async_ops/receiver_async.py +++ b/uamqp/async_ops/receiver_async.py @@ -80,6 +80,7 @@ def __init__(self, session, source, target, error_policy=None, debug=False, encoding='UTF-8', + desired_capabilities=None, loop=None): self.loop = loop or get_running_loop() super(MessageReceiverAsync, self).__init__( @@ -93,7 +94,8 @@ def __init__(self, session, source, target, properties=properties, error_policy=error_policy, debug=debug, - encoding=encoding) + encoding=encoding, + desired_capabilities=desired_capabilities) async def __aenter__(self): """Open the MessageReceiver in an async context manager.""" diff --git a/uamqp/client.py b/uamqp/client.py index 769003c4a..90569c691 100644 --- a/uamqp/client.py +++ b/uamqp/client.py @@ -136,6 +136,7 @@ def __init__( # Link settings self._send_settle_mode = kwargs.pop('send_settle_mode', None) or constants.SenderSettleMode.Unsettled self._receive_settle_mode = kwargs.pop('receive_settle_mode', None) or constants.ReceiverSettleMode.PeekLock + self._desired_capabilities = kwargs.pop('desired_capabilities', None) # AMQP object settings self.message_handler = None @@ -915,7 +916,8 @@ def _client_ready(self): max_message_size=self._max_message_size, properties=self._link_properties, error_policy=self._error_policy, - encoding=self._encoding) + encoding=self._encoding, + desired_capabilities=self._desired_capabilities) self.message_handler.open() return False if self.message_handler.get_state() == constants.MessageReceiverState.Error: diff --git a/uamqp/receiver.py b/uamqp/receiver.py index d44f9fd97..91d35732f 100644 --- a/uamqp/receiver.py +++ b/uamqp/receiver.py @@ -80,7 +80,8 @@ def __init__(self, session, source, target, properties=None, error_policy=None, debug=False, - encoding='UTF-8'): + encoding='UTF-8', + desired_capabilities=None): # pylint: disable=protected-access if name: self.name = name.encode(encoding) if isinstance(name, six.text_type) else name @@ -109,6 +110,8 @@ def __init__(self, session, source, target, self.send_settle_mode = send_settle_mode if max_message_size: self.max_message_size = max_message_size + if desired_capabilities: + self._link.set_desired_capabilities(desired_capabilities) self._receiver = c_uamqp.create_message_receiver(self._link, self) self._receiver.set_trace(debug) From d2c88f4aa577bd00b60a571f3dfcc576fe66c22f Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" <47871814+yunhaoling@users.noreply.github.com> Date: Mon, 30 Sep 2019 15:58:47 -0700 Subject: [PATCH 2/9] Share CBS Session (#89) * Share CBS Session (#88) * initial _received_messages object * Use flag to control the streaming behavior * Update comments and code * release_async isn't a sync function * Add missing module * Wait too short cause the receive client to close so that the test would fail * Remove unnecessary queue assignment * remove unused import module * Update timeout as the unit is milliseconds * Update for 1.2.3 (#91) * review update (#97) --- HISTORY.rst | 5 ++ .../test_azure_event_hubs_receive_async.py | 18 ++-- samples/test_azure_event_hubs_receive.py | 10 +-- samples/test_azure_iothub_receive2.py | 2 +- uamqp/async_ops/client_async.py | 85 +++++++++--------- uamqp/client.py | 89 ++++++++++--------- 6 files changed, 111 insertions(+), 98 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 79e93c8f4..5c6b5e5e1 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,11 @@ Release History =============== +1.2.3 (2019-09-09) +++++++++++++++++++ + +- Fixed bug in dropping recevied messages when the connection just started working. + 1.2.2 (2019-07-02) ++++++++++++++++++ diff --git a/samples/asynctests/test_azure_event_hubs_receive_async.py b/samples/asynctests/test_azure_event_hubs_receive_async.py index 3bea85c6e..55f6de6a6 100644 --- a/samples/asynctests/test_azure_event_hubs_receive_async.py +++ b/samples/asynctests/test_azure_event_hubs_receive_async.py @@ -44,7 +44,7 @@ async def test_event_hubs_callback_async_receive(live_eventhub_config): live_eventhub_config['consumer_group'], live_eventhub_config['partition']) - receive_client = uamqp.ReceiveClientAsync(source, auth=sas_auth, timeout=10, prefetch=10) + receive_client = uamqp.ReceiveClientAsync(source, auth=sas_auth, timeout=1000, prefetch=10) log.info("Created client, receiving...") await receive_client.receive_messages_async(on_message_received) log.info("Finished receiving") @@ -64,7 +64,7 @@ async def test_event_hubs_filter_receive_async(live_eventhub_config): source = address.Source(source_url) source.set_filter(b"amqp.annotation.x-opt-enqueuedtimeutc > 1518731960545") - receive_client = uamqp.ReceiveClientAsync(source, auth=plain_auth, timeout=50) + receive_client = uamqp.ReceiveClientAsync(source, auth=plain_auth, timeout=5000) await receive_client.receive_messages_async(on_message_received) @@ -79,7 +79,7 @@ async def test_event_hubs_iter_receive_async(live_eventhub_config): live_eventhub_config['consumer_group'], live_eventhub_config['partition']) - receive_client = uamqp.ReceiveClientAsync(source, debug=False, auth=sas_auth, timeout=1000, prefetch=10) + receive_client = uamqp.ReceiveClientAsync(source, debug=False, auth=sas_auth, timeout=3000, prefetch=10) count = 0 message_generator = receive_client.receive_messages_iter_async() async for message in message_generator: @@ -110,7 +110,7 @@ async def test_event_hubs_batch_receive_async(live_eventhub_config): live_eventhub_config['consumer_group'], live_eventhub_config['partition']) - async with uamqp.ReceiveClientAsync(source, debug=False, auth=sas_auth, timeout=1000, prefetch=10) as receive_client: + async with uamqp.ReceiveClientAsync(source, debug=False, auth=sas_auth, timeout=3000, prefetch=10) as receive_client: message_batch = await receive_client.receive_message_batch_async(10) log.info("got batch: {}".format(len(message_batch))) for message in message_batch: @@ -159,7 +159,6 @@ async def test_event_hubs_receive_with_runtime_metric_async(live_eventhub_config @pytest.mark.asyncio async def test_event_hubs_shared_connection_async(live_eventhub_config): - pytest.skip("Unstable on OSX and Linux - need to fix") # TODO uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub']) sas_auth = authentication.SASTokenAsync.from_shared_access_key( uri, live_eventhub_config['key_name'], live_eventhub_config['access_key']) @@ -169,8 +168,8 @@ async def test_event_hubs_shared_connection_async(live_eventhub_config): live_eventhub_config['consumer_group']) async with uamqp.ConnectionAsync(live_eventhub_config['hostname'], sas_auth, debug=False) as conn: - partition_0 = uamqp.ReceiveClientAsync(source + "0", debug=False, auth=sas_auth, timeout=1000, prefetch=10) - partition_1 = uamqp.ReceiveClientAsync(source + "1", debug=False, auth=sas_auth, timeout=1000, prefetch=10) + partition_0 = uamqp.ReceiveClientAsync(source + "0", debug=False, auth=sas_auth, timeout=3000, prefetch=10) + partition_1 = uamqp.ReceiveClientAsync(source + "1", debug=False, auth=sas_auth, timeout=3000, prefetch=10) await partition_0.open_async(connection=conn) await partition_1.open_async(connection=conn) tasks = [ @@ -202,7 +201,6 @@ async def receive_ten(partition, receiver): @pytest.mark.asyncio async def test_event_hubs_multiple_receiver_async(live_eventhub_config): - pytest.skip("Unstable on OSX and Linux - need to fix") # TODO uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub']) sas_auth_a = authentication.SASTokenAsync.from_shared_access_key( uri, live_eventhub_config['key_name'], live_eventhub_config['access_key']) @@ -213,8 +211,8 @@ async def test_event_hubs_multiple_receiver_async(live_eventhub_config): live_eventhub_config['event_hub'], live_eventhub_config['consumer_group']) - partition_0 = uamqp.ReceiveClientAsync(source + "0", debug=False, auth=sas_auth_a, timeout=1000, prefetch=10) - partition_1 = uamqp.ReceiveClientAsync(source + "1", debug=False, auth=sas_auth_b, timeout=1000, prefetch=10) + partition_0 = uamqp.ReceiveClientAsync(source + "0", debug=False, auth=sas_auth_a, timeout=3000, prefetch=10) + partition_1 = uamqp.ReceiveClientAsync(source + "1", debug=False, auth=sas_auth_b, timeout=3000, prefetch=10) try: await partition_0.open_async() await partition_1.open_async() diff --git a/samples/test_azure_event_hubs_receive.py b/samples/test_azure_event_hubs_receive.py index b1a9ff5d2..704bed6d6 100644 --- a/samples/test_azure_event_hubs_receive.py +++ b/samples/test_azure_event_hubs_receive.py @@ -97,7 +97,7 @@ def test_event_hubs_client_proxy_settings(live_eventhub_config): live_eventhub_config['consumer_group'], live_eventhub_config['partition']) - with uamqp.ReceiveClient(source, auth=sas_auth, debug=False, timeout=50, prefetch=50) as receive_client: + with uamqp.ReceiveClient(source, auth=sas_auth, debug=False, timeout=5000, prefetch=50) as receive_client: receive_client.receive_message_batch(max_batch_size=10) @@ -111,7 +111,7 @@ def test_event_hubs_client_receive_sync(live_eventhub_config): live_eventhub_config['event_hub'], live_eventhub_config['consumer_group'], live_eventhub_config['partition']) - with uamqp.ReceiveClient(source, auth=sas_auth, debug=False, timeout=50, prefetch=50) as receive_client: + with uamqp.ReceiveClient(source, auth=sas_auth, debug=False, timeout=5000, prefetch=50) as receive_client: log.info("Created client, receiving...") with pytest.raises(ValueError): batch = receive_client.receive_message_batch(max_batch_size=100) @@ -178,7 +178,7 @@ def on_message_received(message): live_eventhub_config['consumer_group'], live_eventhub_config['partition']) - receive_client = uamqp.ReceiveClient(source, auth=sas_auth, timeout=10, debug=False) + receive_client = uamqp.ReceiveClient(source, auth=sas_auth, timeout=1000, debug=False) log.info("Created client, receiving...") receive_client.receive_messages(on_message_received) @@ -195,7 +195,7 @@ def test_event_hubs_iter_receive_sync(live_eventhub_config): live_eventhub_config['consumer_group'], live_eventhub_config['partition']) - receive_client = uamqp.ReceiveClient(source, auth=sas_auth, timeout=10, debug=False, prefetch=10) + receive_client = uamqp.ReceiveClient(source, auth=sas_auth, timeout=1000, debug=False, prefetch=10) count = 0 gen = receive_client.receive_messages_iter() for message in gen: @@ -230,7 +230,7 @@ def test_event_hubs_filter_receive(live_eventhub_config): source = address.Source(source_url) source.set_filter(b"amqp.annotation.x-opt-sequence-number > 1500") - with uamqp.ReceiveClient(source, auth=plain_auth, timeout=50, prefetch=50) as receive_client: + with uamqp.ReceiveClient(source, auth=plain_auth, timeout=5000, prefetch=50) as receive_client: log.info("Created client, receiving...") batch = receive_client.receive_message_batch(max_batch_size=10) while batch: diff --git a/samples/test_azure_iothub_receive2.py b/samples/test_azure_iothub_receive2.py index c0f14fdba..4478bcc18 100644 --- a/samples/test_azure_iothub_receive2.py +++ b/samples/test_azure_iothub_receive2.py @@ -60,7 +60,7 @@ def _build_iothub_amqp_endpoint_from_target(target): def _receive_message(conn, source, auth): batch = [] - receive_client = uamqp.ReceiveClient(source, auth=auth, debug=False, timeout=5, prefetch=50) + receive_client = uamqp.ReceiveClient(source, auth=auth, debug=False, timeout=5000, prefetch=50) try: receive_client.open(connection=conn) batch = receive_client.receive_message_batch(max_batch_size=10) diff --git a/uamqp/async_ops/client_async.py b/uamqp/async_ops/client_async.py index af7daee95..922eb9d8f 100644 --- a/uamqp/async_ops/client_async.py +++ b/uamqp/async_ops/client_async.py @@ -10,10 +10,9 @@ import asyncio import collections.abc import logging -import queue import uuid -from uamqp import address, authentication, client, constants, errors +from uamqp import address, authentication, client, constants, errors, compat from uamqp.utils import get_running_loop from uamqp.async_ops.connection_async import ConnectionAsync from uamqp.async_ops.receiver_async import MessageReceiverAsync @@ -218,44 +217,49 @@ async def open_async(self, connection=None): # pylint: disable=protected-access if self._session: return # already open - if connection: - _logger.info("Using existing connection.") - self._auth = connection.auth - self._ext_connection = True - self._connection = connection or self.connection_type( - self._hostname, - self._auth, - container_id=self._name, - max_frame_size=self._max_frame_size, - channel_max=self._channel_max, - idle_timeout=self._idle_timeout, - properties=self._properties, - remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio, - error_policy=self._error_policy, - debug=self._debug_trace, - loop=self.loop) - if not self._connection.cbs and isinstance(self._auth, authentication.CBSAsyncAuthMixin): - self._connection.cbs = await asyncio.shield(self._auth.create_authenticator_async( - self._connection, + try: + if connection: + _logger.info("Using existing connection.") + self._auth = connection.auth + self._ext_connection = True + await connection.lock_async() + self._connection = connection or self.connection_type( + self._hostname, + self._auth, + container_id=self._name, + max_frame_size=self._max_frame_size, + channel_max=self._channel_max, + idle_timeout=self._idle_timeout, + properties=self._properties, + remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio, + error_policy=self._error_policy, debug=self._debug_trace, - incoming_window=self._incoming_window, - outgoing_window=self._outgoing_window, - handle_max=self._handle_max, - on_attach=self._on_attach, - loop=self.loop)) - self._session = self._auth._session - elif self._connection.cbs: - self._session = self._auth._session - else: - self._session = self.session_type( - self._connection, - incoming_window=self._incoming_window, - outgoing_window=self._outgoing_window, - handle_max=self._handle_max, - on_attach=self._on_attach, loop=self.loop) - if self._keep_alive_interval: - self._keep_alive_thread = asyncio.ensure_future(self._keep_alive_async(), loop=self.loop) + if not self._connection.cbs and isinstance(self._auth, authentication.CBSAsyncAuthMixin): + self._connection.cbs = await asyncio.shield(self._auth.create_authenticator_async( + self._connection, + debug=self._debug_trace, + incoming_window=self._incoming_window, + outgoing_window=self._outgoing_window, + handle_max=self._handle_max, + on_attach=self._on_attach, + loop=self.loop)) + self._session = self._auth._session + elif self._connection.cbs: + self._session = self._auth._session + else: + self._session = self.session_type( + self._connection, + incoming_window=self._incoming_window, + outgoing_window=self._outgoing_window, + handle_max=self._handle_max, + on_attach=self._on_attach, + loop=self.loop) + if self._keep_alive_interval: + self._keep_alive_thread = asyncio.ensure_future(self._keep_alive_async(), loop=self.loop) + finally: + if self._ext_connection: + connection.release_async() async def close_async(self): """Close the client asynchronously. This includes closing the Session @@ -858,6 +862,7 @@ async def receive_messages_async(self, on_message_received): service. It takes a single argument, a ~uamqp.message.Message object. :type on_message_received: callable[~uamqp.message.Message] """ + self._streaming_receive = True await self.open_async() self._message_received_callback = on_message_received receiving = True @@ -868,6 +873,7 @@ async def receive_messages_async(self, on_message_received): receiving = False raise finally: + self._streaming_receive = False if not receiving: await self.close_async() @@ -902,7 +908,6 @@ async def receive_message_batch_async(self, max_batch_size=None, on_message_rece 'connection link credit: {}'.format(max_batch_size, self._prefetch)) timeout = self._counter.get_current_ms() + int(timeout) if timeout else 0 expired = False - self._received_messages = self._received_messages or queue.Queue() await self.open_async() receiving = True batch = [] @@ -946,7 +951,6 @@ def receive_messages_iter_async(self, on_message_received=None): :rtype: Generator[~uamqp.message.Message] """ self._message_received_callback = on_message_received - self._received_messages = queue.Queue() return AsyncMessageIter(self, auto_complete=self.auto_complete) async def redirect_async(self, redirect, auth): @@ -968,6 +972,7 @@ async def redirect_async(self, redirect, auth): self._shutdown = False self._last_activity_timestamp = None self._was_message_received = False + self._received_messages = compat.queue.Queue() self._remote_address = address.Source(redirect.address) await self._redirect_async(redirect, auth) diff --git a/uamqp/client.py b/uamqp/client.py index 90569c691..dda324d78 100644 --- a/uamqp/client.py +++ b/uamqp/client.py @@ -233,43 +233,48 @@ def open(self, connection=None): if self._session: return # already open. _logger.debug("Opening client connection.") - if connection: - _logger.debug("Using existing connection.") - self._auth = connection.auth - self._ext_connection = True - self._connection = connection or self.connection_type( - self._hostname, - self._auth, - container_id=self._name, - max_frame_size=self._max_frame_size, - channel_max=self._channel_max, - idle_timeout=self._idle_timeout, - properties=self._properties, - remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio, - error_policy=self._error_policy, - debug=self._debug_trace, - encoding=self._encoding) - if not self._connection.cbs and isinstance(self._auth, authentication.CBSAuthMixin): - self._connection.cbs = self._auth.create_authenticator( - self._connection, + try: + if connection: + _logger.debug("Using existing connection.") + self._auth = connection.auth + self._ext_connection = True + connection.lock() + self._connection = connection or self.connection_type( + self._hostname, + self._auth, + container_id=self._name, + max_frame_size=self._max_frame_size, + channel_max=self._channel_max, + idle_timeout=self._idle_timeout, + properties=self._properties, + remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio, + error_policy=self._error_policy, debug=self._debug_trace, - incoming_window=self._incoming_window, - outgoing_window=self._outgoing_window, - handle_max=self._handle_max, - on_attach=self._on_attach) - self._session = self._auth._session - elif self._connection.cbs: - self._session = self._auth._session - else: - self._session = self.session_type( - self._connection, - incoming_window=self._incoming_window, - outgoing_window=self._outgoing_window, - handle_max=self._handle_max, - on_attach=self._on_attach) - if self._keep_alive_interval: - self._keep_alive_thread = threading.Thread(target=self._keep_alive) - self._keep_alive_thread.start() + encoding=self._encoding) + if not self._connection.cbs and isinstance(self._auth, authentication.CBSAuthMixin): + self._connection.cbs = self._auth.create_authenticator( + self._connection, + debug=self._debug_trace, + incoming_window=self._incoming_window, + outgoing_window=self._outgoing_window, + handle_max=self._handle_max, + on_attach=self._on_attach) + self._session = self._auth._session + elif self._connection.cbs: + self._session = self._auth._session + else: + self._session = self.session_type( + self._connection, + incoming_window=self._incoming_window, + outgoing_window=self._outgoing_window, + handle_max=self._handle_max, + on_attach=self._on_attach) + if self._keep_alive_interval: + self._keep_alive_thread = threading.Thread(target=self._keep_alive) + self._keep_alive_thread.start() + finally: + if self._ext_connection: + connection.release() def close(self): """Close the client. This includes closing the Session @@ -872,7 +877,8 @@ def __init__( self._last_activity_timestamp = None self._was_message_received = False self._message_received_callback = None - self._received_messages = None + self._streaming_receive = False + self._received_messages = compat.queue.Queue() # Receiver and Link settings self._max_message_size = kwargs.pop('max_message_size', None) or constants.MAX_MESSAGE_LENGTH_BYTES @@ -995,7 +1001,7 @@ def _message_received(self, message): self._message_received_callback(message) self._complete_message(message, self.auto_complete) - if self._received_messages: + if not self._streaming_receive: self._received_messages.put(message) elif not message.settled: # Message was received with callback processing and wasn't settled. @@ -1034,7 +1040,6 @@ def receive_message_batch(self, max_batch_size=None, on_message_received=None, t 'connection link credit: {}'.format(self._prefetch)) timeout = self._counter.get_current_ms() + timeout if timeout else 0 expired = False - self._received_messages = self._received_messages or compat.queue.Queue() self.open() receiving = True batch = [] @@ -1075,8 +1080,8 @@ def receive_messages(self, on_message_received): service. It takes a single argument, a ~uamqp.message.Message object. :type on_message_received: callable[~uamqp.message.Message] """ + self._streaming_receive = True self.open() - self._received_messages = None self._message_received_callback = on_message_received receiving = True try: @@ -1086,6 +1091,7 @@ def receive_messages(self, on_message_received): receiving = False raise finally: + self._streaming_receive = False if not receiving: self.close() @@ -1099,7 +1105,6 @@ def receive_messages_iter(self, on_message_received=None): :type on_message_received: callable[~uamqp.message.Message] """ self._message_received_callback = on_message_received - self._received_messages = compat.queue.Queue() return self._message_generator() def redirect(self, redirect, auth): @@ -1121,7 +1126,7 @@ def redirect(self, redirect, auth): self._shutdown = False self._last_activity_timestamp = None self._was_message_received = False - self._received_messages = None + self._received_messages = compat.queue.Queue() self._remote_address = address.Source(redirect.address) self._redirect(redirect, auth) From 8c2f7908a7a0a778d92bd92e6c7c0329c1c51c08 Mon Sep 17 00:00:00 2001 From: annatisch Date: Wed, 2 Oct 2019 10:24:08 -0700 Subject: [PATCH 3/9] Service Bus message transfer fixes (#96) * Support message delivery tag * Added headers * Removed null init * Added memory cleanup * Fix build error * Moved delivery tag to message * Cython fixes * Binary type * Attempt to set message tag * Converted to AMQP_VALUE * Syntax fixes * Build error * Renamed value * Get tag type * Fixed name * Extract tag bytes * Some C cleanup * More logging * Updated test * pylint fix * More logging * More logging * More logging * Fixed print formatting * More logging * Syntax error * TLSIO logging * Log socket error * Added sleep * Fixed sleep * Reduced sleep * Another attempt * Ping CI * Attempt to move outgoing flow * Moved send flow frame * Removed debug logging * Update link status * Fix diff * pylint fixes * Py2.7 * Updated status description * Fixed executor * Some review feedback --- samples/test_azure_event_hubs_receive.py | 4 +- src/link.pyx | 3 + src/message.pyx | 10 ++ .../azure-uamqp-c/inc/azure_uamqp_c/message.h | 2 + .../azure-uamqp-c/src/amqp_management.c | 13 ++- src/vendor/azure-uamqp-c/src/link.c | 11 +- src/vendor/azure-uamqp-c/src/message.c | 104 ++++++++++++++++++ .../azure-uamqp-c/src/message_receiver.c | 50 ++++++--- src/vendor/inc/c_link.pxd | 1 + src/vendor/inc/c_message.pxd | 2 + tests/test_c_message.py | 5 + uamqp/__init__.py | 2 + uamqp/async_ops/client_async.py | 4 + uamqp/async_ops/receiver_async.py | 10 ++ uamqp/async_ops/sender_async.py | 10 ++ uamqp/client.py | 2 + uamqp/message.py | 4 + uamqp/receiver.py | 4 + uamqp/sender.py | 4 + 19 files changed, 224 insertions(+), 21 deletions(-) diff --git a/samples/test_azure_event_hubs_receive.py b/samples/test_azure_event_hubs_receive.py index 704bed6d6..02e41e0f9 100644 --- a/samples/test_azure_event_hubs_receive.py +++ b/samples/test_azure_event_hubs_receive.py @@ -121,7 +121,9 @@ def test_event_hubs_client_receive_sync(live_eventhub_config): assert len(batch) <= 10 for message in batch: annotations = message.annotations - log.info("Sequence Number: {}".format(annotations.get(b'x-opt-sequence-number'))) + log.info("Sequence Number: {}, Delivery tag: {}".format( + annotations.get(b'x-opt-sequence-number'), + message.delivery_tag)) batch = receive_client.receive_message_batch(max_batch_size=10) log.info("Finished receiving") diff --git a/src/link.pyx b/src/link.pyx index fa8967f1c..80c63bfc3 100644 --- a/src/link.pyx +++ b/src/link.pyx @@ -143,6 +143,9 @@ cdef class cLink(StructBase): self._value_error() return value_factory(value) + cpdef do_work(self): + c_link.link_dowork(self._c_value) + cpdef set_prefetch_count(self, stdint.uint32_t prefetch): if c_link.link_set_max_link_credit(self._c_value, prefetch) != 0: self._value_error("Unable to set link credit.") diff --git a/src/message.pyx b/src/message.pyx index 1c36c2b83..1aafa94f3 100644 --- a/src/message.pyx +++ b/src/message.pyx @@ -233,6 +233,16 @@ cdef class cMessage(StructBase): if c_message.message_set_message_format(self._c_value, value) != 0: self._value_error() + @property + def delivery_tag(self): + cdef c_amqpvalue.AMQP_VALUE value + if c_message.message_get_delivery_tag(self._c_value, &value) == 0: + if value == NULL: + return None + return value_factory(value) + else: + self._value_error() + cpdef add_body_data(self, bytes value): cdef c_message.BINARY_DATA _binary length = len(value) diff --git a/src/vendor/azure-uamqp-c/inc/azure_uamqp_c/message.h b/src/vendor/azure-uamqp-c/inc/azure_uamqp_c/message.h index 9f850aabd..ffd0e91f5 100644 --- a/src/vendor/azure-uamqp-c/inc/azure_uamqp_c/message.h +++ b/src/vendor/azure-uamqp-c/inc/azure_uamqp_c/message.h @@ -66,6 +66,8 @@ extern "C" { MOCKABLE_FUNCTION(, int, message_get_body_type, MESSAGE_HANDLE, message, MESSAGE_BODY_TYPE*, body_type); MOCKABLE_FUNCTION(, int, message_set_message_format, MESSAGE_HANDLE, message, uint32_t, message_format); MOCKABLE_FUNCTION(, int, message_get_message_format, MESSAGE_HANDLE, message, uint32_t*, message_format); + MOCKABLE_FUNCTION(, int, message_set_delivery_tag, MESSAGE_HANDLE, message, AMQP_VALUE, delivery_tag_value); + MOCKABLE_FUNCTION(, int, message_get_delivery_tag, MESSAGE_HANDLE, message, AMQP_VALUE*, delivery_tag_value); #ifdef __cplusplus } diff --git a/src/vendor/azure-uamqp-c/src/amqp_management.c b/src/vendor/azure-uamqp-c/src/amqp_management.c index 1b8b37fac..926f15654 100644 --- a/src/vendor/azure-uamqp-c/src/amqp_management.c +++ b/src/vendor/azure-uamqp-c/src/amqp_management.c @@ -198,8 +198,17 @@ static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE messag desc_value = amqpvalue_get_map_value(map, desc_key); if (desc_value != NULL) { - /* Codes_SRS_AMQP_MANAGEMENT_01_134: [ The status description value shall be extracted from the value found in the map by using `amqpvalue_get_string`. ]*/ - if (amqpvalue_get_string(desc_value, &status_description) != 0) + AMQP_TYPE amqp_type = amqpvalue_get_type(desc_value); + if (amqp_type == AMQP_TYPE_STRING) + { + /* Codes_SRS_AMQP_MANAGEMENT_01_134: [ The status description value shall be extracted from the value found in the map by using `amqpvalue_get_string`. ]*/ + if (amqpvalue_get_string(desc_value, &status_description) != 0) + { + /* Codes_SRS_AMQP_MANAGEMENT_01_125: [ If status description is not found, NULL shall be passed to the user callback as `status_description` argument. ]*/ + status_description = NULL; + } + } + else { /* Codes_SRS_AMQP_MANAGEMENT_01_125: [ If status description is not found, NULL shall be passed to the user callback as `status_description` argument. ]*/ status_description = NULL; diff --git a/src/vendor/azure-uamqp-c/src/link.c b/src/vendor/azure-uamqp-c/src/link.c index 4b4fb8d55..f26d3a09f 100644 --- a/src/vendor/azure-uamqp-c/src/link.c +++ b/src/vendor/azure-uamqp-c/src/link.c @@ -434,11 +434,6 @@ static void link_frame_received(void* context, AMQP_VALUE performative, uint32_t link_instance->current_link_credit--; link_instance->delivery_count++; - if (link_instance->current_link_credit == 0) - { - link_instance->current_link_credit = link_instance->max_link_credit; - send_flow(link_instance); - } more = false; /* Attempt to get more flag, default to false */ @@ -1640,6 +1635,12 @@ void link_dowork(LINK_HANDLE link) { tickcounter_ms_t current_tick; + if (link->current_link_credit <= 0) + { + link->current_link_credit = link->max_link_credit; + send_flow(link); + } + if (tickcounter_get_current_ms(link->tick_counter, ¤t_tick) != 0) { LogError("Cannot get tick counter value"); diff --git a/src/vendor/azure-uamqp-c/src/message.c b/src/vendor/azure-uamqp-c/src/message.c index f861d67ea..417c6c6f1 100644 --- a/src/vendor/azure-uamqp-c/src/message.c +++ b/src/vendor/azure-uamqp-c/src/message.c @@ -31,6 +31,7 @@ typedef struct MESSAGE_INSTANCE_TAG application_properties application_properties; annotations footer; uint32_t message_format; + AMQP_VALUE delivery_tag; } MESSAGE_INSTANCE; MESSAGE_BODY_TYPE internal_get_body_type(MESSAGE_HANDLE message) @@ -119,6 +120,7 @@ MESSAGE_HANDLE message_create(void) result->body_amqp_value = NULL; result->body_amqp_sequence_items = NULL; result->body_amqp_sequence_count = 0; + result->delivery_tag = NULL; /* Codes_SRS_MESSAGE_01_135: [ By default a message on which `message_set_message_format` was not called shall have message format set to 0. ]*/ result->message_format = 0; @@ -229,6 +231,17 @@ MESSAGE_HANDLE message_clone(MESSAGE_HANDLE source_message) } } + if ((result != NULL) && (source_message->delivery_tag != NULL)) + { + result->delivery_tag = amqpvalue_clone(source_message->delivery_tag); + if (result->delivery_tag == NULL) + { + LogError("Cannot clone message delivery tag"); + message_destroy(result); + result = NULL; + } + } + if ((result != NULL) && (source_message->body_amqp_data_count > 0)) { size_t i; @@ -375,6 +388,11 @@ void message_destroy(MESSAGE_HANDLE message) amqpvalue_destroy(message->body_amqp_value); } + if (message->delivery_tag != NULL) + { + amqpvalue_destroy(message->delivery_tag); + } + /* Codes_SRS_MESSAGE_01_136: [ If the message body is made of several AMQP data items, they shall all be freed. ]*/ free_all_body_data_items(message); @@ -1447,3 +1465,89 @@ int message_get_message_format(MESSAGE_HANDLE message, uint32_t *message_format) return result; } + +int message_set_delivery_tag(MESSAGE_HANDLE message, AMQP_VALUE delivery_tag_value) +{ + int result; + + if (message == NULL) + { + LogError("NULL message"); + result = __FAILURE__; + } + else + { + if (delivery_tag_value == NULL) + { + if (message->delivery_tag != NULL) + { + amqpvalue_destroy(message->delivery_tag); + message->delivery_tag = NULL; + } + + /* Codes_SRS_MESSAGE_01_053: [ On success it shall return 0. ]*/ + result = 0; + } + else + { + + AMQP_VALUE new_delivery_tag = amqpvalue_clone(delivery_tag_value); + if (new_delivery_tag == NULL) + { + LogError("Cannot clone delivery tag"); + result = __FAILURE__; + } + else + { + if (message->delivery_tag != NULL) + { + amqpvalue_destroy(message->delivery_tag); + } + + message->delivery_tag = new_delivery_tag; + + /* Codes_SRS_MESSAGE_01_102: [ On success it shall return 0. ]*/ + result = 0; + } + } + } + + return result; +} + +int message_get_delivery_tag(MESSAGE_HANDLE message, AMQP_VALUE *delivery_tag_value) +{ + int result; + + if ((message == NULL) || + (delivery_tag_value == NULL)) + { + LogError("Bad arguments: message = %p, delivery_tag = %p", + message, delivery_tag_value); + result = __FAILURE__; + } + else + { + if (message->delivery_tag == NULL) + { + *delivery_tag_value = NULL; + result = 0; + } + else + { + AMQP_VALUE new_delivery_tag = amqpvalue_clone(message->delivery_tag); + if (new_delivery_tag == NULL) + { + LogError("Cannot clone delivery tag"); + result = __FAILURE__; + } + else + { + *delivery_tag_value = new_delivery_tag; + result = 0; + } + } + } + + return result; +} \ No newline at end of file diff --git a/src/vendor/azure-uamqp-c/src/message_receiver.c b/src/vendor/azure-uamqp-c/src/message_receiver.c index da8654794..e2024ba97 100644 --- a/src/vendor/azure-uamqp-c/src/message_receiver.c +++ b/src/vendor/azure-uamqp-c/src/message_receiver.c @@ -225,7 +225,6 @@ static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer, AMQP_VALUE result = NULL; MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context; - (void)transfer; if (message_receiver->on_message_received != NULL) { MESSAGE_HANDLE message = message_create(); @@ -236,35 +235,60 @@ static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer, } else { - AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver); - if (amqpvalue_decoder == NULL) + delivery_tag received_message_tag; + if (transfer_get_delivery_tag(transfer, &received_message_tag) != 0) { - LogError("Cannot create AMQP value decoder"); + LogError("Could not get the delivery tag from the transfer performative"); set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); } else { - message_receiver->decoded_message = message; - message_receiver->decode_error = false; - if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0) + AMQP_VALUE delivery_tag_value = amqpvalue_create_delivery_tag(received_message_tag); + if (delivery_tag_value == NULL) { - LogError("Cannot decode bytes"); + LogError("Could not create delivery tag value"); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); + } + else if (message_set_delivery_tag(message, delivery_tag_value) != 0) + { + LogError("Could not set message delivery tag"); set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); } else { - if (message_receiver->decode_error) + AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver); + if (amqpvalue_decoder == NULL) { - LogError("Error decoding message"); + LogError("Cannot create AMQP value decoder"); set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); } else { - result = message_receiver->on_message_received(message_receiver->callback_context, message); + message_receiver->decoded_message = message; + message_receiver->decode_error = false; + if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0) + { + LogError("Cannot decode bytes"); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); + } + else + { + if (message_receiver->decode_error) + { + LogError("Error decoding message"); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); + } + else + { + result = message_receiver->on_message_received(message_receiver->callback_context, message); + } + } + + amqpvalue_decoder_destroy(amqpvalue_decoder); } - } - amqpvalue_decoder_destroy(amqpvalue_decoder); + amqpvalue_destroy(delivery_tag_value); + } } message_destroy(message); diff --git a/src/vendor/inc/c_link.pxd b/src/vendor/inc/c_link.pxd index abb5dd871..d853435d9 100644 --- a/src/vendor/inc/c_link.pxd +++ b/src/vendor/inc/c_link.pxd @@ -41,6 +41,7 @@ cdef extern from "azure_uamqp_c/link.h": LINK_HANDLE link_create(c_session.SESSION_HANDLE session, const char* name, c_amqp_definitions.role role, c_amqpvalue.AMQP_VALUE source, c_amqpvalue.AMQP_VALUE target) void link_destroy(LINK_HANDLE handle) + void link_dowork(LINK_HANDLE link) int link_set_snd_settle_mode(LINK_HANDLE link, c_amqp_definitions.sender_settle_mode snd_settle_mode) int link_get_snd_settle_mode(LINK_HANDLE link, c_amqp_definitions.sender_settle_mode* snd_settle_mode) int link_set_rcv_settle_mode(LINK_HANDLE link, c_amqp_definitions.receiver_settle_mode rcv_settle_mode) diff --git a/src/vendor/inc/c_message.pxd b/src/vendor/inc/c_message.pxd index 4145a0ff6..90f387dd9 100644 --- a/src/vendor/inc/c_message.pxd +++ b/src/vendor/inc/c_message.pxd @@ -69,5 +69,7 @@ cdef extern from "azure_uamqp_c/message.h": int message_get_body_type(MESSAGE_HANDLE message, MESSAGE_BODY_TYPE_TAG* body_type) int message_set_message_format(MESSAGE_HANDLE message, stdint.uint32_t message_format) int message_get_message_format(MESSAGE_HANDLE message, stdint.uint32_t* message_format) + int message_get_delivery_tag(MESSAGE_HANDLE message, c_amqpvalue.AMQP_VALUE* delivery_tag) + diff --git a/tests/test_c_message.py b/tests/test_c_message.py index 1018abb58..d5bb105f9 100644 --- a/tests/test_c_message.py +++ b/tests/test_c_message.py @@ -36,3 +36,8 @@ def test_body_value(): body = message.get_body_value() assert body.type == c_uamqp.AMQPType.StringValue + + +def test_delivery_tag(): + message = c_uamqp.create_message() + assert not message.delivery_tag diff --git a/uamqp/__init__.py b/uamqp/__init__.py index 2c7aaacad..e37c58e6d 100644 --- a/uamqp/__init__.py +++ b/uamqp/__init__.py @@ -4,6 +4,8 @@ # license information. #-------------------------------------------------------------------------- +# pylint: disable=no-member + import logging import sys diff --git a/uamqp/async_ops/client_async.py b/uamqp/async_ops/client_async.py index 922eb9d8f..589e3acd4 100644 --- a/uamqp/async_ops/client_async.py +++ b/uamqp/async_ops/client_async.py @@ -516,6 +516,7 @@ async def _client_ready_async(self): properties=self._link_properties, error_policy=self._error_policy, encoding=self._encoding, + executor=self._connection._executor, loop=self.loop) await asyncio.shield(self.message_handler.open_async()) return False @@ -567,6 +568,7 @@ async def _client_run_async(self): :rtype: bool """ # pylint: disable=protected-access + await self.message_handler.work_async() self._waiting_messages = 0 async with self._pending_messages_lock: self._pending_messages = await self._filter_pending_async() @@ -812,6 +814,7 @@ async def _client_ready_async(self): error_policy=self._error_policy, encoding=self._encoding, desired_capabilities=self._desired_capabilities, + executor=self._connection._executor, loop=self.loop) await asyncio.shield(self.message_handler.open_async()) return False @@ -832,6 +835,7 @@ async def _client_run_async(self): :rtype: bool """ + await self.message_handler.work_async() await self._connection.work_async() now = self._counter.get_current_ms() if self._last_activity_timestamp and not self._was_message_received: diff --git a/uamqp/async_ops/receiver_async.py b/uamqp/async_ops/receiver_async.py index 85130761c..7d13babca 100644 --- a/uamqp/async_ops/receiver_async.py +++ b/uamqp/async_ops/receiver_async.py @@ -5,6 +5,7 @@ #-------------------------------------------------------------------------- import logging +import functools from uamqp import constants, errors, receiver from uamqp.utils import get_running_loop @@ -81,8 +82,10 @@ def __init__(self, session, source, target, debug=False, encoding='UTF-8', desired_capabilities=None, + executor=None, loop=None): self.loop = loop or get_running_loop() + self.executor = executor super(MessageReceiverAsync, self).__init__( session, source, target, on_message_received, @@ -125,6 +128,13 @@ async def open_async(self): "Failed to open Message Receiver. " "Please confirm credentials and target URI.") + async def work_async(self): + """Update the link status.""" + # pylint: disable=protected-access + await self.loop.run_in_executor( + self.executor, + functools.partial(self._link.do_work)) + async def close_async(self): """Close the Receiver asynchronously, leaving the link intact.""" self.close() diff --git a/uamqp/async_ops/sender_async.py b/uamqp/async_ops/sender_async.py index 0e971b584..d96ed8d29 100644 --- a/uamqp/async_ops/sender_async.py +++ b/uamqp/async_ops/sender_async.py @@ -5,6 +5,7 @@ #-------------------------------------------------------------------------- import logging +import functools from uamqp import constants, errors, sender from uamqp.utils import get_running_loop @@ -84,8 +85,10 @@ def __init__(self, session, source, target, error_policy=None, debug=False, encoding='UTF-8', + executor=None, loop=None): self.loop = loop or get_running_loop() + self.executor = executor super(MessageSenderAsync, self).__init__( session, source, target, name=name, @@ -157,6 +160,13 @@ async def send_async(self, message, callback, timeout=0): finally: self._session._connection.release_async() + async def work_async(self): + """Update the link status.""" + # pylint: disable=protected-access + await self.loop.run_in_executor( + self.executor, + functools.partial(self._link.do_work)) + async def close_async(self): """Close the sender asynchronously, leaving the link intact.""" self._sender.close() diff --git a/uamqp/client.py b/uamqp/client.py index dda324d78..4a08b3350 100644 --- a/uamqp/client.py +++ b/uamqp/client.py @@ -648,6 +648,7 @@ def _client_run(self): :rtype: bool """ # pylint: disable=protected-access + self.message_handler.work() self._waiting_messages = 0 self._pending_messages = self._filter_pending() if self._backoff and not self._waiting_messages: @@ -943,6 +944,7 @@ def _client_run(self): :rtype: bool """ + self.message_handler.work() self._connection.work() now = self._counter.get_current_ms() if self._last_activity_timestamp and not self._was_message_received: diff --git a/uamqp/message.py b/uamqp/message.py index 928976b21..5b1fca71d 100644 --- a/uamqp/message.py +++ b/uamqp/message.py @@ -78,6 +78,7 @@ def __init__(self, self._settler = None self._encoding = encoding self.delivery_no = delivery_no + self.delivery_tag = None self.on_send_complete = None self.properties = None self.application_properties = None @@ -139,6 +140,9 @@ def _parse_message(self, message): """ _logger.debug("Parsing received message %r.", self.delivery_no) self._message = message + delivery_tag = self._message.delivery_tag + if delivery_tag: + self.delivery_tag = delivery_tag.value body_type = message.body_type if body_type == c_uamqp.MessageBodyType.NoneType: self._body = None diff --git a/uamqp/receiver.py b/uamqp/receiver.py index 91d35732f..590bccdfb 100644 --- a/uamqp/receiver.py +++ b/uamqp/receiver.py @@ -262,6 +262,10 @@ def get_state(self): raise return self._state + def work(self): + """Update the link status.""" + self._link.do_work() + def destroy(self): """Close both the Receiver and the Link. Clean up any C objects.""" self._receiver.destroy() diff --git a/uamqp/sender.py b/uamqp/sender.py index eda618072..068c18c0f 100644 --- a/uamqp/sender.py +++ b/uamqp/sender.py @@ -189,6 +189,10 @@ def get_state(self): raise return self._state + def work(self): + """Update the link status.""" + self._link.do_work() + def destroy(self): """Close both the Sender and the Link. Clean up any C objects.""" self._sender.destroy() From c01e66a4fc78fd99e4bc2bcb8d94545cc73a19ce Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" <47871814+yunhaoling@users.noreply.github.com> Date: Wed, 2 Oct 2019 11:28:04 -0700 Subject: [PATCH 4/9] Performance improvement (#98) * Remove deepcopy and increase buffer size * Move parse into cython * Small fix * lazy parse * small fix * Update name * Update message property * remove unused import * put deepcopy back as it influences the performance little * Add footer setter * Update setters of message --- .../inc/azure_c_shared_utility/socketio.h | 2 +- uamqp/message.py | 160 +++++++++++++----- 2 files changed, 121 insertions(+), 41 deletions(-) diff --git a/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/inc/azure_c_shared_utility/socketio.h b/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/inc/azure_c_shared_utility/socketio.h index de1ee46e9..c94fb52d8 100644 --- a/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/inc/azure_c_shared_utility/socketio.h +++ b/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/inc/azure_c_shared_utility/socketio.h @@ -22,7 +22,7 @@ typedef struct SOCKETIO_CONFIG_TAG void* accepted_socket; } SOCKETIO_CONFIG; -#define RECEIVE_BYTES_VALUE 64 +#define RECEIVE_BYTES_VALUE 1024 MOCKABLE_FUNCTION(, CONCRETE_IO_HANDLE, socketio_create, void*, io_create_parameters); MOCKABLE_FUNCTION(, void, socketio_destroy, CONCRETE_IO_HANDLE, socket_io); diff --git a/uamqp/message.py b/uamqp/message.py index 5b1fca71d..b4b747fcc 100644 --- a/uamqp/message.py +++ b/uamqp/message.py @@ -80,12 +80,13 @@ def __init__(self, self.delivery_no = delivery_no self.delivery_tag = None self.on_send_complete = None - self.properties = None - self.application_properties = None - self.annotations = None - self.header = None - self.footer = None - self.delivery_annotations = None + self._properties = None + self._application_properties = None + self._annotations = None + self._header = None + self._footer = None + self._delivery_annotations = None + self._need_further_parse = False if message: if settler: @@ -95,7 +96,7 @@ def __init__(self, self.state = constants.MessageState.ReceivedSettled self._response = errors.MessageAlreadySettled() self._settler = settler - self._parse_message(message) + self._parse_message_body(message) else: self._message = c_uamqp.create_message() if isinstance(body, (six.text_type, six.binary_type)): @@ -110,10 +111,83 @@ def __init__(self, self._body.set(body) if msg_format: self._message.message_format = msg_format - self.properties = properties - self.application_properties = application_properties - self.annotations = annotations - self.header = header + self._properties = properties + self._application_properties = application_properties + self._annotations = annotations + self._header = header + + @property + def properties(self): + if self._need_further_parse: + self._parse_message_properties() + return self._properties + + @properties.setter + def properties(self, value): + if not isinstance(value, MessageProperties): + raise TypeError("Properties must be a MessageProperties.") + self._properties = value + + @property + def header(self): + if self._need_further_parse: + self._parse_message_properties() + return self._header + + @header.setter + def header(self, value): + if not isinstance(value, MessageHeader): + raise TypeError("Header must be a MessageHeader.") + self._header = value + + @property + def footer(self): + if self._need_further_parse: + self._parse_message_properties() + return self._footer + + @footer.setter + def footer(self, value): + if not isinstance(value, dict): + raise TypeError("Footer must be a dictionary") + footer_props = c_uamqp.create_footer( + utils.data_factory(value, encoding=self._encoding)) + self._message.footer = footer_props + self._footer = value + + @property + def application_properties(self): + if self._need_further_parse: + self._parse_message_properties() + return self._application_properties + + @application_properties.setter + def application_properties(self, value): + if not isinstance(value, dict): + raise TypeError("Application properties must be a dictionary.") + self._application_properties = value + + @property + def annotations(self): + if self._need_further_parse: + self._parse_message_properties() + return self._annotations + + @annotations.setter + def annotations(self, value): + if not isinstance(value, dict): + raise TypeError("Message annotations must be a dictionary.") + self._annotations = value + + @property + def delivery_annotations(self): + if self._need_further_parse: + self._parse_message_properties() + return self._delivery_annotations + + @delivery_annotations.setter + def delivery_annotations(self, value): + self._delivery_annotations = value @classmethod def decode_from_bytes(cls, data): @@ -132,12 +206,41 @@ def __str__(self): return "" return str(self._body) - def _parse_message(self, message): + def _parse_message_properties(self): + if self._need_further_parse: + _props = self._message.properties + if _props: + _logger.debug("Parsing received message properties %r.", self.delivery_no) + self._properties = MessageProperties(properties=_props, encoding=self._encoding) + _header = self._message.header + if _header: + _logger.debug("Parsing received message header %r.", self.delivery_no) + self._header = MessageHeader(header=_header) + _footer = self._message.footer + if _footer: + _logger.debug("Parsing received message footer %r.", self.delivery_no) + self._footer = _footer.map + _app_props = self._message.application_properties + if _app_props: + _logger.debug("Parsing received message application properties %r.", self.delivery_no) + self._application_properties = _app_props.map + _ann = self._message.message_annotations + if _ann: + _logger.debug("Parsing received message annotations %r.", self.delivery_no) + self._annotations = _ann.map + _delivery_ann = self._message.delivery_annotations + if _delivery_ann: + _logger.debug("Parsing received message delivery annotations %r.", self.delivery_no) + self._delivery_annotations = _delivery_ann.map + self._need_further_parse = False + + def _parse_message_body(self, message): """Parse a message received from an AMQP service. :param message: The received C message. :type message: uamqp.c_uamqp.cMessage """ + _logger.debug("Parsing received message %r.", self.delivery_no) self._message = message delivery_tag = self._message.delivery_tag @@ -152,30 +255,7 @@ def _parse_message(self, message): raise TypeError("Message body type Sequence not supported.") else: self._body = ValueBody(self._message) - _props = self._message.properties - if _props: - _logger.debug("Parsing received message properties %r.", self.delivery_no) - self.properties = MessageProperties(properties=_props, encoding=self._encoding) - _header = self._message.header - if _header: - _logger.debug("Parsing received message header %r.", self.delivery_no) - self.header = MessageHeader(header=_header) - _footer = self._message.footer - if _footer: - _logger.debug("Parsing received message footer %r.", self.delivery_no) - self.footer = _footer.map - _app_props = self._message.application_properties - if _app_props: - _logger.debug("Parsing received message application properties %r.", self.delivery_no) - self.application_properties = _app_props.map - _ann = self._message.message_annotations - if _ann: - _logger.debug("Parsing received message annotations %r.", self.delivery_no) - self.annotations = _ann.map - _delivery_ann = self._message.delivery_annotations - if _delivery_ann: - _logger.debug("Parsing received message delivery annotations %r.", self.delivery_no) - self.delivery_annotations = _delivery_ann.map + self._need_further_parse = True def _can_settle_message(self): if self.state not in constants.RECEIVE_STATES: @@ -435,10 +515,10 @@ def __init__(self, self._body_gen = data self._encoding = encoding self.on_send_complete = None - self.properties = properties - self.application_properties = application_properties - self.annotations = annotations - self.header = header + self._properties = properties + self._application_properties = application_properties + self._annotations = annotations + self._header = header def _create_batch_message(self): """Create a ~uamqp.message.Message for a value supplied by the data From 68746619991d41ff636a2fd54f823a6b23b6e5c7 Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" <47871814+yunhaoling@users.noreply.github.com> Date: Wed, 2 Oct 2019 13:50:35 -0700 Subject: [PATCH 5/9] fix bug in batch message (#99) --- uamqp/message.py | 1 + 1 file changed, 1 insertion(+) diff --git a/uamqp/message.py b/uamqp/message.py index b4b747fcc..2e123b258 100644 --- a/uamqp/message.py +++ b/uamqp/message.py @@ -519,6 +519,7 @@ def __init__(self, self._application_properties = application_properties self._annotations = annotations self._header = header + self._need_further_parse = False def _create_batch_message(self): """Create a ~uamqp.message.Message for a value supplied by the data From 148f75b2c5494bcd16e16c546a710fe3b569ccf4 Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" <47871814+yunhaoling@users.noreply.github.com> Date: Wed, 2 Oct 2019 15:26:17 -0700 Subject: [PATCH 6/9] Bug fix, properties of message can be None type (#100) --- uamqp/message.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/uamqp/message.py b/uamqp/message.py index 2e123b258..37a7dc481 100644 --- a/uamqp/message.py +++ b/uamqp/message.py @@ -124,7 +124,7 @@ def properties(self): @properties.setter def properties(self, value): - if not isinstance(value, MessageProperties): + if value and not isinstance(value, MessageProperties): raise TypeError("Properties must be a MessageProperties.") self._properties = value @@ -136,7 +136,7 @@ def header(self): @header.setter def header(self, value): - if not isinstance(value, MessageHeader): + if value and not isinstance(value, MessageHeader): raise TypeError("Header must be a MessageHeader.") self._header = value @@ -148,7 +148,7 @@ def footer(self): @footer.setter def footer(self, value): - if not isinstance(value, dict): + if value and not isinstance(value, dict): raise TypeError("Footer must be a dictionary") footer_props = c_uamqp.create_footer( utils.data_factory(value, encoding=self._encoding)) @@ -163,7 +163,7 @@ def application_properties(self): @application_properties.setter def application_properties(self, value): - if not isinstance(value, dict): + if value and not isinstance(value, dict): raise TypeError("Application properties must be a dictionary.") self._application_properties = value @@ -175,7 +175,7 @@ def annotations(self): @annotations.setter def annotations(self, value): - if not isinstance(value, dict): + if value and not isinstance(value, dict): raise TypeError("Message annotations must be a dictionary.") self._annotations = value From 22a04f911b39042676da81a892ec2220d077c1bf Mon Sep 17 00:00:00 2001 From: KieranBrantnerMagee Date: Wed, 2 Oct 2019 21:48:17 -0700 Subject: [PATCH 7/9] on_transfer_received should not fail due to lack of a delivery tag (#101) * on_transfer_received should not fail due to lack of a delivery tag * Test reversing if-statements * Try initialize value * Trying to make Windows happy --- .../azure-uamqp-c/src/message_receiver.c | 69 +++++++++---------- 1 file changed, 32 insertions(+), 37 deletions(-) diff --git a/src/vendor/azure-uamqp-c/src/message_receiver.c b/src/vendor/azure-uamqp-c/src/message_receiver.c index e2024ba97..b33f336e8 100644 --- a/src/vendor/azure-uamqp-c/src/message_receiver.c +++ b/src/vendor/azure-uamqp-c/src/message_receiver.c @@ -236,61 +236,56 @@ static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer, else { delivery_tag received_message_tag; - if (transfer_get_delivery_tag(transfer, &received_message_tag) != 0) + AMQP_VALUE delivery_tag_value; + AMQPVALUE_DECODER_HANDLE amqpvalue_decoder; + + if (transfer_get_delivery_tag(transfer, &received_message_tag) == 0) + { + delivery_tag_value = amqpvalue_create_delivery_tag(received_message_tag); + if ((delivery_tag_value != NULL) && (message_set_delivery_tag(message, delivery_tag_value) != 0)) + { + LogError("Could not set message delivery tag"); + set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); + } + } + else + { + delivery_tag_value = NULL; + } + + amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver); + if (amqpvalue_decoder == NULL) { - LogError("Could not get the delivery tag from the transfer performative"); + LogError("Cannot create AMQP value decoder"); set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); } else { - AMQP_VALUE delivery_tag_value = amqpvalue_create_delivery_tag(received_message_tag); - if (delivery_tag_value == NULL) + message_receiver->decoded_message = message; + message_receiver->decode_error = false; + if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0) { - LogError("Could not create delivery tag value"); - set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); - } - else if (message_set_delivery_tag(message, delivery_tag_value) != 0) - { - LogError("Could not set message delivery tag"); + LogError("Cannot decode bytes"); set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); } else { - AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver); - if (amqpvalue_decoder == NULL) + if (message_receiver->decode_error) { - LogError("Cannot create AMQP value decoder"); + LogError("Error decoding message"); set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); } else { - message_receiver->decoded_message = message; - message_receiver->decode_error = false; - if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0) - { - LogError("Cannot decode bytes"); - set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); - } - else - { - if (message_receiver->decode_error) - { - LogError("Error decoding message"); - set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR); - } - else - { - result = message_receiver->on_message_received(message_receiver->callback_context, message); - } - } - - amqpvalue_decoder_destroy(amqpvalue_decoder); + result = message_receiver->on_message_received(message_receiver->callback_context, message); } - - amqpvalue_destroy(delivery_tag_value); } - } + amqpvalue_decoder_destroy(amqpvalue_decoder); + } + if ( delivery_tag_value != NULL ) { + amqpvalue_destroy(delivery_tag_value); + } message_destroy(message); } } From f3f04c08b5f8fc88ca56a06b35c76e9908f5510c Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" <47871814+yunhaoling@users.noreply.github.com> Date: Fri, 4 Oct 2019 13:37:15 -0700 Subject: [PATCH 8/9] Fix proxy test (#104) --- .../test_azure_event_hubs_receive_async.py | 17 +++++++++++++++++ samples/test_azure_event_hubs_receive.py | 19 ++++++++++++++++++- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/samples/asynctests/test_azure_event_hubs_receive_async.py b/samples/asynctests/test_azure_event_hubs_receive_async.py index 55f6de6a6..63b1bde22 100644 --- a/samples/asynctests/test_azure_event_hubs_receive_async.py +++ b/samples/asynctests/test_azure_event_hubs_receive_async.py @@ -128,6 +128,23 @@ async def test_event_hubs_batch_receive_async(live_eventhub_config): log.info("Sequence Number: {}".format(annotations.get(b'x-opt-sequence-number'))) +@pytest.mark.asyncio +async def test_event_hubs_client_web_socket_async(live_eventhub_config): + uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub']) + sas_auth = authentication.SASTokenAsync.from_shared_access_key( + uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'], + transport_type=uamqp.TransportType.AmqpOverWebsocket) + + source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format( + live_eventhub_config['hostname'], + live_eventhub_config['event_hub'], + live_eventhub_config['consumer_group'], + live_eventhub_config['partition']) + + async with uamqp.ReceiveClientAsync(source, auth=sas_auth, debug=False, timeout=5000, prefetch=50) as receive_client: + receive_client.receive_message_batch(max_batch_size=10) + + @pytest.mark.asyncio async def test_event_hubs_receive_with_runtime_metric_async(live_eventhub_config): uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub']) diff --git a/samples/test_azure_event_hubs_receive.py b/samples/test_azure_event_hubs_receive.py index 02e41e0f9..d932e964a 100644 --- a/samples/test_azure_event_hubs_receive.py +++ b/samples/test_azure_event_hubs_receive.py @@ -85,8 +85,25 @@ def test_event_hubs_single_batch_receive(live_eventhub_config): assert len(message) <= 300 +def test_event_hubs_client_web_socket(live_eventhub_config): + uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub']) + sas_auth = authentication.SASTokenAuth.from_shared_access_key( + uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'], + transport_type=uamqp.TransportType.AmqpOverWebsocket) + + source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format( + live_eventhub_config['hostname'], + live_eventhub_config['event_hub'], + live_eventhub_config['consumer_group'], + live_eventhub_config['partition']) + + with uamqp.ReceiveClient(source, auth=sas_auth, debug=False, timeout=5000, prefetch=50) as receive_client: + receive_client.receive_message_batch(max_batch_size=10) + + def test_event_hubs_client_proxy_settings(live_eventhub_config): - proxy_settings={'proxy_hostname':'127.0.0.1', 'proxy_port': 12345} + pytest.skip("skipping the test in CI due to no proxy server") + proxy_settings={'proxy_hostname': '127.0.0.1', 'proxy_port': 12345} uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub']) sas_auth = authentication.SASTokenAuth.from_shared_access_key( uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'], http_proxy=proxy_settings) From ed6b90d10081a3680828ad68576ba4aaca08ef9f Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" <47871814+yunhaoling@users.noreply.github.com> Date: Fri, 4 Oct 2019 13:52:56 -0700 Subject: [PATCH 9/9] Update docs (#103) * Update docs * fix typo * More typo --- HISTORY.rst | 9 +++++++-- uamqp/async_ops/client_async.py | 5 +++++ uamqp/async_ops/receiver_async.py | 5 +++++ uamqp/client.py | 5 +++++ uamqp/receiver.py | 5 +++++ 5 files changed, 27 insertions(+), 2 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 5c6b5e5e1..05d0fe6b2 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,10 +3,15 @@ Release History =============== -1.2.3 (2019-09-09) +1.2.3 (2019-10-07) ++++++++++++++++++ -- Fixed bug in dropping recevied messages when the connection just started working. +- Fixed bug in dropping received messages at the moment when the connection just started working. +- Fixed bug where underlying io type wasn't set to WebSocket when http_proxy was applied (PR#92, Thanks to skoop22). +- Fixed bug in noneffective timeout when sending messages. +- Added desired-capabilities for `ReceiveClient(Async)` and `MessageReceiver(Async)` as part of the AMQP protocol. +- Added delivery-tag to `Message` (azure-sdk-for-python issue #7336). +- Added method `work` to `MessageReceiver` and `work_async` to `MessageReceiverAsync` responsible for updating link status. 1.2.2 (2019-07-02) ++++++++++++++++++ diff --git a/uamqp/async_ops/client_async.py b/uamqp/async_ops/client_async.py index 589e3acd4..ed00683a8 100644 --- a/uamqp/async_ops/client_async.py +++ b/uamqp/async_ops/client_async.py @@ -726,6 +726,11 @@ class ReceiveClientAsync(client.ReceiveClient, AMQPClientAsync): will assume successful receipt of the message and clear it from the queue. The default is `PeekLock`. :type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode + :param desired_capabilities: The extension capabilities desired from the peer endpoint. + To create an desired_capabilities object, please do as follows: + - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]` + - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))` + :type desired_capabilities: ~uamqp.c_uamqp.AMQPValue :param max_message_size: The maximum allowed message size negotiated for the Link. :type max_message_size: int :param link_properties: Metadata to be sent in the Link ATTACH frame. diff --git a/uamqp/async_ops/receiver_async.py b/uamqp/async_ops/receiver_async.py index 7d13babca..57290a65c 100644 --- a/uamqp/async_ops/receiver_async.py +++ b/uamqp/async_ops/receiver_async.py @@ -50,6 +50,11 @@ class MessageReceiverAsync(receiver.MessageReceiver): from the service that the message was successfully sent. If set to 'Settled', the client will not wait for confirmation and assume success. :type send_settle_mode: ~uamqp.constants.SenderSettleMode + :param desired_capabilities: The extension capabilities desired from the peer endpoint. + To create an desired_capabilities object, please do as follows: + - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]` + - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))` + :type desired_capabilities: ~uamqp.c_uamqp.AMQPValue :param max_message_size: The maximum allowed message size negotiated for the Link. :type max_message_size: int :param prefetch: The receiver Link credit that determines how many diff --git a/uamqp/client.py b/uamqp/client.py index 4a08b3350..9be97c652 100644 --- a/uamqp/client.py +++ b/uamqp/client.py @@ -835,6 +835,11 @@ class ReceiveClient(AMQPClient): will assume successful receipt of the message and clear it from the queue. The default is `PeekLock`. :type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode + :param desired_capabilities: The extension capabilities desired from the peer endpoint. + To create an desired_capabilities object, please do as follows: + - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]` + - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))` + :type desired_capabilities: ~uamqp.c_uamqp.AMQPValue :param max_message_size: The maximum allowed message size negotiated for the Link. :type max_message_size: int :param link_properties: Metadata to be sent in the Link ATTACH frame. diff --git a/uamqp/receiver.py b/uamqp/receiver.py index 590bccdfb..ba7a7219c 100644 --- a/uamqp/receiver.py +++ b/uamqp/receiver.py @@ -52,6 +52,11 @@ class MessageReceiver(object): from the service that the message was successfully sent. If set to 'Settled', the client will not wait for confirmation and assume success. :type send_settle_mode: ~uamqp.constants.SenderSettleMode + :param desired_capabilities: The extension capabilities desired from the peer endpoint. + To create an desired_capabilities object, please do as follows: + - 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]` + - 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))` + :type desired_capabilities: ~uamqp.c_uamqp.AMQPValue :param max_message_size: The maximum allowed message size negotiated for the Link. :type max_message_size: int :param prefetch: The receiver Link credit that determines how many