Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@
Release History
===============

1.2.3 (2019-10-07)
++++++++++++++++++

- Fixed bug in dropping received messages at the moment when the connection just started working.
- Fixed bug where underlying io type wasn't set to WebSocket when http_proxy was applied (PR#92, Thanks to skoop22).
- Fixed bug in noneffective timeout when sending messages.
- Added desired-capabilities for `ReceiveClient(Async)` and `MessageReceiver(Async)` as part of the AMQP protocol.
- Added delivery-tag to `Message` (azure-sdk-for-python issue #7336).
- Added method `work` to `MessageReceiver` and `work_async` to `MessageReceiverAsync` responsible for updating link status.

1.2.2 (2019-07-02)
++++++++++++++++++

Expand Down
69 changes: 57 additions & 12 deletions samples/asynctests/test_azure_event_hubs_receive_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
import sys

import uamqp
from uamqp import address
from uamqp import authentication
from uamqp import address, types, utils, authentication


def get_logger(level):
Expand Down Expand Up @@ -45,7 +44,7 @@ async def test_event_hubs_callback_async_receive(live_eventhub_config):
live_eventhub_config['consumer_group'],
live_eventhub_config['partition'])

receive_client = uamqp.ReceiveClientAsync(source, auth=sas_auth, timeout=10, prefetch=10)
receive_client = uamqp.ReceiveClientAsync(source, auth=sas_auth, timeout=1000, prefetch=10)
log.info("Created client, receiving...")
await receive_client.receive_messages_async(on_message_received)
log.info("Finished receiving")
Expand All @@ -65,7 +64,7 @@ async def test_event_hubs_filter_receive_async(live_eventhub_config):
source = address.Source(source_url)
source.set_filter(b"amqp.annotation.x-opt-enqueuedtimeutc > 1518731960545")

receive_client = uamqp.ReceiveClientAsync(source, auth=plain_auth, timeout=50)
receive_client = uamqp.ReceiveClientAsync(source, auth=plain_auth, timeout=5000)
await receive_client.receive_messages_async(on_message_received)


Expand All @@ -80,7 +79,7 @@ async def test_event_hubs_iter_receive_async(live_eventhub_config):
live_eventhub_config['consumer_group'],
live_eventhub_config['partition'])

receive_client = uamqp.ReceiveClientAsync(source, debug=False, auth=sas_auth, timeout=1000, prefetch=10)
receive_client = uamqp.ReceiveClientAsync(source, debug=False, auth=sas_auth, timeout=3000, prefetch=10)
count = 0
message_generator = receive_client.receive_messages_iter_async()
async for message in message_generator:
Expand Down Expand Up @@ -111,7 +110,7 @@ async def test_event_hubs_batch_receive_async(live_eventhub_config):
live_eventhub_config['consumer_group'],
live_eventhub_config['partition'])

async with uamqp.ReceiveClientAsync(source, debug=False, auth=sas_auth, timeout=1000, prefetch=10) as receive_client:
async with uamqp.ReceiveClientAsync(source, debug=False, auth=sas_auth, timeout=3000, prefetch=10) as receive_client:
message_batch = await receive_client.receive_message_batch_async(10)
log.info("got batch: {}".format(len(message_batch)))
for message in message_batch:
Expand All @@ -129,9 +128,54 @@ async def test_event_hubs_batch_receive_async(live_eventhub_config):
log.info("Sequence Number: {}".format(annotations.get(b'x-opt-sequence-number')))


@pytest.mark.asyncio
async def test_event_hubs_client_web_socket_async(live_eventhub_config):
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
sas_auth = authentication.SASTokenAsync.from_shared_access_key(
uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'],
transport_type=uamqp.TransportType.AmqpOverWebsocket)

source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
live_eventhub_config['hostname'],
live_eventhub_config['event_hub'],
live_eventhub_config['consumer_group'],
live_eventhub_config['partition'])

async with uamqp.ReceiveClientAsync(source, auth=sas_auth, debug=False, timeout=5000, prefetch=50) as receive_client:
receive_client.receive_message_batch(max_batch_size=10)


@pytest.mark.asyncio
async def test_event_hubs_receive_with_runtime_metric_async(live_eventhub_config):
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
sas_auth = authentication.SASTokenAsync.from_shared_access_key(
uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
live_eventhub_config['hostname'],
live_eventhub_config['event_hub'],
live_eventhub_config['consumer_group'],
live_eventhub_config['partition'])

receiver_runtime_metric_symbol = b'com.microsoft:enable-receiver-runtime-metric'
symbol_array = [types.AMQPSymbol(receiver_runtime_metric_symbol)]
desired_capabilities = utils.data_factory(types.AMQPArray(symbol_array))

async with uamqp.ReceiveClientAsync(source, debug=False, auth=sas_auth, timeout=1000, prefetch=10,
desired_capabilities=desired_capabilities) as receive_client:
message_batch = await receive_client.receive_message_batch_async(10)
log.info("got batch: {}".format(len(message_batch)))
for message in message_batch:
annotations = message.annotations
delivery_annotations = message.delivery_annotations
log.info("Sequence Number: {}".format(annotations.get(b'x-opt-sequence-number')))
assert b'last_enqueued_sequence_number' in delivery_annotations
assert b'last_enqueued_offset' in delivery_annotations
assert b'last_enqueued_time_utc' in delivery_annotations
assert b'runtime_info_retrieval_time_utc' in delivery_annotations


@pytest.mark.asyncio
async def test_event_hubs_shared_connection_async(live_eventhub_config):
pytest.skip("Unstable on OSX and Linux - need to fix") # TODO
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
sas_auth = authentication.SASTokenAsync.from_shared_access_key(
uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
Expand All @@ -141,8 +185,8 @@ async def test_event_hubs_shared_connection_async(live_eventhub_config):
live_eventhub_config['consumer_group'])

async with uamqp.ConnectionAsync(live_eventhub_config['hostname'], sas_auth, debug=False) as conn:
partition_0 = uamqp.ReceiveClientAsync(source + "0", debug=False, auth=sas_auth, timeout=1000, prefetch=10)
partition_1 = uamqp.ReceiveClientAsync(source + "1", debug=False, auth=sas_auth, timeout=1000, prefetch=10)
partition_0 = uamqp.ReceiveClientAsync(source + "0", debug=False, auth=sas_auth, timeout=3000, prefetch=10)
partition_1 = uamqp.ReceiveClientAsync(source + "1", debug=False, auth=sas_auth, timeout=3000, prefetch=10)
await partition_0.open_async(connection=conn)
await partition_1.open_async(connection=conn)
tasks = [
Expand All @@ -158,6 +202,7 @@ async def test_event_hubs_shared_connection_async(live_eventhub_config):
await partition_0.close_async()
await partition_1.close_async()


async def receive_ten(partition, receiver):
messages = []
count = 0
Expand All @@ -170,9 +215,9 @@ async def receive_ten(partition, receiver):
print("Finished receiving on partition {}".format(partition))
return messages


@pytest.mark.asyncio
async def test_event_hubs_multiple_receiver_async(live_eventhub_config):
pytest.skip("Unstable on OSX and Linux - need to fix") # TODO
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
sas_auth_a = authentication.SASTokenAsync.from_shared_access_key(
uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
Expand All @@ -183,8 +228,8 @@ async def test_event_hubs_multiple_receiver_async(live_eventhub_config):
live_eventhub_config['event_hub'],
live_eventhub_config['consumer_group'])

partition_0 = uamqp.ReceiveClientAsync(source + "0", debug=False, auth=sas_auth_a, timeout=1000, prefetch=10)
partition_1 = uamqp.ReceiveClientAsync(source + "1", debug=False, auth=sas_auth_b, timeout=1000, prefetch=10)
partition_0 = uamqp.ReceiveClientAsync(source + "0", debug=False, auth=sas_auth_a, timeout=3000, prefetch=10)
partition_1 = uamqp.ReceiveClientAsync(source + "1", debug=False, auth=sas_auth_b, timeout=3000, prefetch=10)
try:
await partition_0.open_async()
await partition_1.open_async()
Expand Down
77 changes: 64 additions & 13 deletions samples/test_azure_event_hubs_receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,14 @@
import logging
import os
import pytest
import time
import sys
try:
from urllib import quote_plus #Py2
except Exception:
from urllib.parse import quote_plus

import uamqp
from uamqp import address, errors
from uamqp import authentication
from uamqp import address, types, utils, authentication


def get_logger(level):
Expand All @@ -31,12 +29,14 @@ def get_logger(level):

log = get_logger(logging.INFO)


def get_plain_auth(config):
return authentication.SASLPlain(
config['hostname'],
config['key_name'],
config['access_key'])


def test_event_hubs_simple_receive(live_eventhub_config):
source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
live_eventhub_config['hostname'],
Expand Down Expand Up @@ -85,9 +85,25 @@ def test_event_hubs_single_batch_receive(live_eventhub_config):
assert len(message) <= 300


def test_event_hubs_client_web_socket(live_eventhub_config):
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
sas_auth = authentication.SASTokenAuth.from_shared_access_key(
uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'],
transport_type=uamqp.TransportType.AmqpOverWebsocket)

source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
live_eventhub_config['hostname'],
live_eventhub_config['event_hub'],
live_eventhub_config['consumer_group'],
live_eventhub_config['partition'])

with uamqp.ReceiveClient(source, auth=sas_auth, debug=False, timeout=5000, prefetch=50) as receive_client:
receive_client.receive_message_batch(max_batch_size=10)


def test_event_hubs_client_proxy_settings(live_eventhub_config):
#pytest.skip("")
proxy_settings={'proxy_hostname':'127.0.0.1', 'proxy_port': 12345}
pytest.skip("skipping the test in CI due to no proxy server")
proxy_settings={'proxy_hostname': '127.0.0.1', 'proxy_port': 12345}
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
sas_auth = authentication.SASTokenAuth.from_shared_access_key(
uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'], http_proxy=proxy_settings)
Expand All @@ -98,11 +114,10 @@ def test_event_hubs_client_proxy_settings(live_eventhub_config):
live_eventhub_config['consumer_group'],
live_eventhub_config['partition'])

#if not sys.platform.startswith('darwin'): # Not sure why this passes for OSX:
# with pytest.raises(errors.AMQPConnectionError):
with uamqp.ReceiveClient(source, auth=sas_auth, debug=False, timeout=50, prefetch=50) as receive_client:
with uamqp.ReceiveClient(source, auth=sas_auth, debug=False, timeout=5000, prefetch=50) as receive_client:
receive_client.receive_message_batch(max_batch_size=10)


def test_event_hubs_client_receive_sync(live_eventhub_config):
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
sas_auth = authentication.SASTokenAuth.from_shared_access_key(
Expand All @@ -113,7 +128,7 @@ def test_event_hubs_client_receive_sync(live_eventhub_config):
live_eventhub_config['event_hub'],
live_eventhub_config['consumer_group'],
live_eventhub_config['partition'])
with uamqp.ReceiveClient(source, auth=sas_auth, debug=False, timeout=50, prefetch=50) as receive_client:
with uamqp.ReceiveClient(source, auth=sas_auth, debug=False, timeout=5000, prefetch=50) as receive_client:
log.info("Created client, receiving...")
with pytest.raises(ValueError):
batch = receive_client.receive_message_batch(max_batch_size=100)
Expand All @@ -123,11 +138,47 @@ def test_event_hubs_client_receive_sync(live_eventhub_config):
assert len(batch) <= 10
for message in batch:
annotations = message.annotations
log.info("Sequence Number: {}".format(annotations.get(b'x-opt-sequence-number')))
log.info("Sequence Number: {}, Delivery tag: {}".format(
annotations.get(b'x-opt-sequence-number'),
message.delivery_tag))
batch = receive_client.receive_message_batch(max_batch_size=10)
log.info("Finished receiving")


def test_event_hubs_client_receive_with_runtime_metric_sync(live_eventhub_config):
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
sas_auth = authentication.SASTokenAuth.from_shared_access_key(
uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])

source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
live_eventhub_config['hostname'],
live_eventhub_config['event_hub'],
live_eventhub_config['consumer_group'],
live_eventhub_config['partition'])

receiver_runtime_metric_symbol = b'com.microsoft:enable-receiver-runtime-metric'
symbol_array = [types.AMQPSymbol(receiver_runtime_metric_symbol)]
desired_capabilities = utils.data_factory(types.AMQPArray(symbol_array))

with uamqp.ReceiveClient(source, auth=sas_auth, debug=False, timeout=50, prefetch=50,
desired_capabilities=desired_capabilities) as receive_client:
log.info("Created client, receiving...")
with pytest.raises(ValueError):
batch = receive_client.receive_message_batch(max_batch_size=100)
batch = receive_client.receive_message_batch(max_batch_size=10)
log.info("Got batch: {}".format(len(batch)))
assert len(batch) <= 10
for message in batch:
annotations = message.annotations
delivery_annotations = message.delivery_annotations
log.info("Sequence Number: {}".format(annotations.get(b'x-opt-sequence-number')))
assert b'last_enqueued_sequence_number' in delivery_annotations
assert b'last_enqueued_offset' in delivery_annotations
assert b'last_enqueued_time_utc' in delivery_annotations
assert b'runtime_info_retrieval_time_utc' in delivery_annotations
log.info("Finished receiving")


def test_event_hubs_callback_receive_sync(live_eventhub_config):

def on_message_received(message):
Expand All @@ -146,7 +197,7 @@ def on_message_received(message):
live_eventhub_config['consumer_group'],
live_eventhub_config['partition'])

receive_client = uamqp.ReceiveClient(source, auth=sas_auth, timeout=10, debug=False)
receive_client = uamqp.ReceiveClient(source, auth=sas_auth, timeout=1000, debug=False)
log.info("Created client, receiving...")

receive_client.receive_messages(on_message_received)
Expand All @@ -163,7 +214,7 @@ def test_event_hubs_iter_receive_sync(live_eventhub_config):
live_eventhub_config['consumer_group'],
live_eventhub_config['partition'])

receive_client = uamqp.ReceiveClient(source, auth=sas_auth, timeout=10, debug=False, prefetch=10)
receive_client = uamqp.ReceiveClient(source, auth=sas_auth, timeout=1000, debug=False, prefetch=10)
count = 0
gen = receive_client.receive_messages_iter()
for message in gen:
Expand Down Expand Up @@ -198,7 +249,7 @@ def test_event_hubs_filter_receive(live_eventhub_config):
source = address.Source(source_url)
source.set_filter(b"amqp.annotation.x-opt-sequence-number > 1500")

with uamqp.ReceiveClient(source, auth=plain_auth, timeout=50, prefetch=50) as receive_client:
with uamqp.ReceiveClient(source, auth=plain_auth, timeout=5000, prefetch=50) as receive_client:
log.info("Created client, receiving...")
batch = receive_client.receive_message_batch(max_batch_size=10)
while batch:
Expand Down
2 changes: 1 addition & 1 deletion samples/test_azure_iothub_receive2.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def _build_iothub_amqp_endpoint_from_target(target):

def _receive_message(conn, source, auth):
batch = []
receive_client = uamqp.ReceiveClient(source, auth=auth, debug=False, timeout=5, prefetch=50)
receive_client = uamqp.ReceiveClient(source, auth=auth, debug=False, timeout=5000, prefetch=50)
try:
receive_client.open(connection=conn)
batch = receive_client.receive_message_batch(max_batch_size=10)
Expand Down
14 changes: 14 additions & 0 deletions src/link.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,16 @@ cdef class cLink(StructBase):
self._value_error()
return value

@property
def desired_capabilities(self):
cdef c_amqpvalue.AMQP_VALUE value
if c_link.link_get_desired_capabilities(self._c_value, &value) != 0:
self._value_error()
return value_factory(value)

cpdef do_work(self):
c_link.link_dowork(self._c_value)

cpdef set_prefetch_count(self, stdint.uint32_t prefetch):
if c_link.link_set_max_link_credit(self._c_value, prefetch) != 0:
self._value_error("Unable to set link credit.")
Expand All @@ -144,6 +154,10 @@ cdef class cLink(StructBase):
if c_link.link_set_attach_properties(self._c_value, <c_amqp_definitions.fields>properties._c_value) != 0:
self._value_error("Unable to set link attach properties.")

cpdef set_desired_capabilities(self, AMQPValue desired_capabilities):
if c_link.link_set_desired_capabilities(self._c_value, <c_amqpvalue.AMQP_VALUE>desired_capabilities._c_value) != 0:
self._value_error("Unable to set link desired capabilities.")


#### Callback

Expand Down
10 changes: 10 additions & 0 deletions src/message.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,16 @@ cdef class cMessage(StructBase):
if c_message.message_set_message_format(self._c_value, value) != 0:
self._value_error()

@property
def delivery_tag(self):
cdef c_amqpvalue.AMQP_VALUE value
if c_message.message_get_delivery_tag(self._c_value, &value) == 0:
if <void*>value == NULL:
return None
return value_factory(value)
else:
self._value_error()

cpdef add_body_data(self, bytes value):
cdef c_message.BINARY_DATA _binary
length = len(value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ typedef struct SOCKETIO_CONFIG_TAG
void* accepted_socket;
} SOCKETIO_CONFIG;

#define RECEIVE_BYTES_VALUE 64
#define RECEIVE_BYTES_VALUE 1024

MOCKABLE_FUNCTION(, CONCRETE_IO_HANDLE, socketio_create, void*, io_create_parameters);
MOCKABLE_FUNCTION(, void, socketio_destroy, CONCRETE_IO_HANDLE, socket_io);
Expand Down
2 changes: 2 additions & 0 deletions src/vendor/azure-uamqp-c/inc/azure_uamqp_c/link.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ MOCKABLE_FUNCTION(, int, link_set_max_message_size, LINK_HANDLE, link, uint64_t,
MOCKABLE_FUNCTION(, int, link_get_max_message_size, LINK_HANDLE, link, uint64_t*, max_message_size);
MOCKABLE_FUNCTION(, int, link_get_peer_max_message_size, LINK_HANDLE, link, uint64_t*, peer_max_message_size);
MOCKABLE_FUNCTION(, int, link_set_attach_properties, LINK_HANDLE, link, fields, attach_properties);
MOCKABLE_FUNCTION(, int, link_set_desired_capabilities, LINK_HANDLE, link, AMQP_VALUE, desired_capabilities);
MOCKABLE_FUNCTION(, int, link_get_desired_capabilities, LINK_HANDLE, link, AMQP_VALUE*, desired_capabilities);
MOCKABLE_FUNCTION(, int, link_set_max_link_credit, LINK_HANDLE, link, uint32_t, max_link_credit);
MOCKABLE_FUNCTION(, int, link_get_name, LINK_HANDLE, link, const char**, link_name);
MOCKABLE_FUNCTION(, int, link_get_received_message_id, LINK_HANDLE, link, delivery_number*, message_id);
Expand Down
Loading