Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e3a1c90
Remove iothub/link-redirect related code
Sep 24, 2019
e6b1246
Remove self._running from consumer and producer
Sep 24, 2019
7208b58
Remove IoT related params "operation" and "device"
Sep 24, 2019
1785ec1
Remove exception from close()
Sep 24, 2019
f28cb29
add iterator long running test
Sep 25, 2019
566c415
small bug fix
Sep 25, 2019
047b1b6
small bug fix
Sep 25, 2019
27160e6
Fix connection properties bug and format
Sep 27, 2019
d3780b3
Changed product to azure-eventhub in user agent
Sep 27, 2019
6de2f72
Fix a type hint
Sep 27, 2019
1accdc5
Merge branch 'eventhubs_preview4' of github.com:Azure/azure-sdk-for-p…
Sep 28, 2019
fb756f4
Improve stress script
Oct 1, 2019
af3d319
Print to console configurable
Oct 1, 2019
a8c915b
small changes
Oct 1, 2019
41b88a9
Disable tracking last enqueued event properties for uamqp 1.2.2
Oct 1, 2019
72be395
use different consumer group
Oct 1, 2019
a6eb524
fix an issue about consumer group
Oct 1, 2019
3249a4b
fix an issue about consumer group
Oct 1, 2019
0284f3f
Fix a get_properties bug
Oct 2, 2019
947d862
Merge branch 'eventhubs_preview4' of github.com:Azure/azure-sdk-for-p…
Oct 2, 2019
963f049
Merge branch 'eventhubs_preview4' of github.com:Azure/azure-sdk-for-p…
Oct 3, 2019
d2232fa
Re-org test folders
Oct 2, 2019
140a587
Merge branch 'eventhubs_preview4' of github.com:Azure/azure-sdk-for-p…
Oct 3, 2019
9be4a55
Re-org test folders
Oct 2, 2019
061e485
Merge branch 'eventhubs_unittest' of github.com:YijunXieMS/azure-sdk-…
Oct 3, 2019
a503485
Add unittest for EventData
Oct 3, 2019
22ffdfc
Add unittest for EventDataBatch
Oct 3, 2019
51cc231
Rename eventprocessor test folder
Oct 3, 2019
385f767
Exclude folder stress from setup.py
Oct 3, 2019
5ea1715
Change uamqp version test to use StrictVersion
Oct 3, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
from typing import List, Any
import time
from distutils.version import StrictVersion

import uamqp # type: ignore
from uamqp import errors, types, utils # type: ignore
Expand Down Expand Up @@ -132,7 +133,7 @@ def _create_handler(self):
if self._offset is not None:
source.set_filter(self._offset._selector()) # pylint:disable=protected-access

if uamqp.__version__ <= "1.2.2": # backward compatible until uamqp 1.2.3 is released
if StrictVersion(uamqp.__version__) < StrictVersion("1.2.3"): # backward compatible until uamqp 1.2.3 is released
desired_capabilities = {}
elif self._track_last_enqueued_event_properties:
symbol_array = [types.AMQPSymbol(self._receiver_runtime_metric_symbol)]
Expand Down
3 changes: 2 additions & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from uamqp import types, errors, utils # type: ignore
from uamqp import ReceiveClient, Source # type: ignore
import uamqp
from distutils.version import StrictVersion

from azure.eventhub.common import EventData, EventPosition
from azure.eventhub.error import _error_handler
Expand Down Expand Up @@ -129,7 +130,7 @@ def _create_handler(self):
if self._offset is not None:
source.set_filter(self._offset._selector()) # pylint:disable=protected-access

if uamqp.__version__ <= "1.2.2": # backward compatible until uamqp 1.2.3 is released
if StrictVersion(uamqp.__version__) < StrictVersion("1.2.3"): # backward compatible until uamqp 1.2.3 is released
desired_capabilities = {}
elif self._track_last_enqueued_event_properties:
symbol_array = [types.AMQPSymbol(self._receiver_runtime_metric_symbol)]
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/azure-eventhubs/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

def pytest_addoption(parser):
parser.addoption(
"--sleep", action="store", default="True", help="sleep on reconnect test: True or False"
"--sleep", action="store", default="False", help="sleep on reconnect test: True or False"
)


Expand Down Expand Up @@ -82,7 +82,7 @@ def cleanup_eventhub(eventhub_config, hub_name, client=None):
client.delete_event_hub(hub_name)


@pytest.fixture()
@pytest.fixture(scope="session")
def live_eventhub_config():
try:
config = {}
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhubs/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

exclude_packages = [
'tests',
"tests.asynctests",
'stress',
'examples',
# Exclude packages that will be covered by PEP420 or nspkg
'azure',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,19 +218,6 @@ async def test_create_batch_with_invalid_hostname_async(invalid_hostname):
await sender.close()


@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_create_batch_with_none_async(connection_str):
client = EventHubClient.from_connection_string(connection_str, network_tracing=False)
sender = client.create_producer()
batch_event_data = await sender.create_batch(max_size=300, partition_key="key")
try:
with pytest.raises(ValueError):
batch_event_data.try_add(EventData(None))
finally:
await sender.close()


@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_create_batch_with_too_large_size_async(connection_str):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,10 @@ async def test_receive_over_websocket_async(connstr_senders):
@pytest.mark.asyncio
@pytest.mark.liveTest
async def test_receive_run_time_metric_async(connstr_senders):
pytest.skip("Disabled for uamqp 1.2.2. Will enable after uamqp 1.2.3 is released.")
from uamqp import __version__ as uamqp_version
from distutils.version import StrictVersion
if StrictVersion(uamqp_version) < StrictVersion('1.2.3'):
pytest.skip("Disabled for uamqp 1.2.2. Will enable after uamqp 1.2.3 is released.")
connection_str, senders = connstr_senders
client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket,
network_tracing=False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,45 +42,22 @@ async def test_send_with_long_interval_async(connstr_receivers, sleep):
assert list(received[0].body)[0] == b"A single event"


def pump(receiver):
messages = []
with receiver:
batch = receiver.receive(timeout=1)
messages.extend(batch)
while batch:
batch = receiver.receive(timeout=1)
messages.extend(batch)
return messages


@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_send_with_forced_conn_close_async(connstr_receivers, sleep):
pytest.skip("This test is similar to the above one")
connection_str, receivers = connstr_receivers
client = EventHubClient.from_connection_string(connection_str, network_tracing=False)
sender = client.create_producer()
try:
await sender.send(EventData(b"A single event"))
if sleep:
await asyncio.sleep(300)
else:
sender._handler._connection._conn.destroy()
await sender.send(EventData(b"A single event"))
await sender.send(EventData(b"A single event"))
if sleep:
await asyncio.sleep(300)
else:
sender._handler._connection._conn.destroy()
await sender.send(EventData(b"A single event"))
sender._handler._connection._conn.destroy()
await sender.send(EventData(b"A single event"))
finally:
await sender.close()

received = []
for r in receivers:
if not sleep:
r._handler._connection._conn.destroy()
received.extend(pump(r))
assert len(received) == 5
r._handler._connection._conn.destroy()
received.extend(r.receive(timeout=5))
assert len(received) == 2
assert list(received[0].body)[0] == b"A single event"
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@


@pytest.mark.liveTest
def test_send_with_invalid_hostname(invalid_hostname, connstr_receivers):
_, receivers = connstr_receivers
def test_send_with_invalid_hostname(invalid_hostname):
client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False)
sender = client.create_producer()
with pytest.raises(AuthenticationError):
Expand All @@ -40,8 +39,7 @@ def test_receive_with_invalid_hostname_sync(invalid_hostname):


@pytest.mark.liveTest
def test_send_with_invalid_key(invalid_key, connstr_receivers):
_, receivers = connstr_receivers
def test_send_with_invalid_key(invalid_key):
client = EventHubClient.from_connection_string(invalid_key, network_tracing=False)
sender = client.create_producer()
with pytest.raises(AuthenticationError):
Expand All @@ -60,8 +58,7 @@ def test_receive_with_invalid_key_sync(invalid_key):


@pytest.mark.liveTest
def test_send_with_invalid_policy(invalid_policy, connstr_receivers):
_, receivers = connstr_receivers
def test_send_with_invalid_policy(invalid_policy):
client = EventHubClient.from_connection_string(invalid_policy, network_tracing=False)
sender = client.create_producer()
with pytest.raises(AuthenticationError):
Expand Down Expand Up @@ -226,16 +223,6 @@ def test_create_batch_with_invalid_hostname_sync(invalid_hostname):
sender.close()


@pytest.mark.liveTest
def test_create_batch_with_none_sync(connection_str):
client = EventHubClient.from_connection_string(connection_str, network_tracing=False)
sender = client.create_producer()
batch_event_data = sender.create_batch(max_size=300, partition_key="key")
with pytest.raises(ValueError):
batch_event_data.try_add(EventData(None))
sender.close()


@pytest.mark.liveTest
def test_create_batch_with_too_large_size_sync(connection_str):
client = EventHubClient.from_connection_string(connection_str, network_tracing=False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,10 @@ def test_receive_over_websocket_sync(connstr_senders):

@pytest.mark.liveTest
def test_receive_run_time_metric(connstr_senders):
pytest.skip("Disabled for uamqp 1.2.2. Will enable after uamqp 1.2.3 is released.")
from uamqp import __version__ as uamqp_version
from distutils.version import StrictVersion
if StrictVersion(uamqp_version) < StrictVersion('1.2.3'):
pytest.skip("Disabled for uamqp 1.2.2. Will enable after uamqp 1.2.3 is released.")
connection_str, senders = connstr_senders
client = EventHubClient.from_connection_string(connection_str, transport_type=TransportType.AmqpOverWebsocket,
network_tracing=False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,30 +40,18 @@ def test_send_with_long_interval_sync(connstr_receivers, sleep):

@pytest.mark.liveTest
def test_send_with_forced_conn_close_sync(connstr_receivers, sleep):
pytest.skip("This test is similar to the above one")
connection_str, receivers = connstr_receivers
client = EventHubClient.from_connection_string(connection_str, network_tracing=False)
sender = client.create_producer()
with sender:
sender.send(EventData(b"A single event"))
sender._handler._connection._conn.destroy()
if sleep:
time.sleep(300)
else:
sender._handler._connection._conn.destroy()
sender.send(EventData(b"A single event"))
sender.send(EventData(b"A single event"))
if sleep:
time.sleep(300)
else:
sender._handler._connection._conn.destroy()
sender.send(EventData(b"A single event"))
sender.send(EventData(b"A single event"))

received = []
for r in receivers:
if not sleep:
r._handler._connection._conn.destroy()
received.extend(r.receive(timeout=1))
assert len(received) == 5
received.extend(r.receive(timeout=5))
assert len(received) == 2
assert list(received[0].body)[0] == b"A single event"
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def test_send_and_receive_large_body_size(connstr_receivers):

received = []
for r in receivers:
received.extend(r.receive(timeout=4))
received.extend(r.receive(timeout=10))

assert len(received) == 1
assert len(list(received[0].body)[0]) == payload
Expand Down
43 changes: 43 additions & 0 deletions sdk/eventhub/azure-eventhubs/tests/unittest/test_event_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import pytest
from azure.eventhub import EventData, EventDataBatch
from uamqp import Message


@pytest.mark.parametrize("test_input, expected_result",
[("", ""), ("AAA", "AAA"), (None, ValueError), (["a", "b", "c"], "abc"), (b"abc", "abc")])
def test_constructor(test_input, expected_result):
if isinstance(expected_result, type):
with pytest.raises(expected_result):
EventData(test_input)
else:
event_data = EventData(test_input)
assert event_data.body_as_str() == expected_result
assert event_data.partition_key is None
assert event_data.application_properties is None
assert event_data.enqueued_time is None
assert event_data.offset is None
assert event_data.sequence_number is None
assert event_data.system_properties == {}
with pytest.raises(TypeError):
event_data.body_as_json()


def test_body_json():
event_data = EventData('{"a":"b"}')
jo = event_data.body_as_json()
assert jo["a"] == "b"


def test_app_properties():
app_props = {"a": "b"}
event_data = EventData("")
event_data.application_properties = app_props
assert event_data.application_properties["a"] == "b"


def test_evetn_data_batch():
batch = EventDataBatch(max_size=100, partition_key="par")
batch.try_add(EventData("A"))
assert batch.size == 89 and len(batch) == 1
with pytest.raises(ValueError):
batch.try_add(EventData("A"))