Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Small fixes (#6520)
* Change back to normal number writings as not supported by python under 3.6

* small fix
  • Loading branch information
yunhaoling authored Jul 29, 2019
commit e4be05ec4fb46a1f63592c9231670825cf5f1fb3
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async def get_connection(self, host, auth):
async def close_connection(self):
pass

def reset_connection_if_broken(self):
async def reset_connection_if_broken(self):
pass


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ async def receive(self, **kwargs):
max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size
timeout = self.client.config.receive_timeout if timeout is None else timeout
if not timeout:
timeout = 100_000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout

data_batch = []
start_time = time.time()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ async def _open(self, timeout_time=None):
async def _send_event_data(self, timeout=None):
timeout = timeout or self.client.config.send_timeout
if not timeout:
timeout = 100_000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
start_time = time.time()
timeout_time = start_time + timeout
max_retries = self.client.config.max_retries
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def receive(self, **kwargs):
max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size
timeout = self.client.config.receive_timeout if timeout is None else timeout
if not timeout:
timeout = 100_000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout

data_batch = [] # type: List[EventData]
start_time = time.time()
Expand Down
3 changes: 1 addition & 2 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ class EventHubError(Exception):
:vartype details: dict[str, str]
"""

def __init__(self, message, **kwargs):
details = kwargs.get("details", None)
def __init__(self, message, details=None):
self.error = None
self.message = message
self.details = details
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def _open(self, timeout_time=None):
def _send_event_data(self, timeout=None):
timeout = timeout or self.client.config.send_timeout
if not timeout:
timeout = 100_000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
timeout = 100000 # timeout None or 0 mean no timeout. 100000 seconds is equivalent to no timeout
start_time = time.time()
timeout_time = start_time + timeout
max_retries = self.client.config.max_retries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async def test_client_secret_credential_async(aad_credential, live_eventhub):

async with receiver:

received = await receiver.receive(timeout=1)
received = await receiver.receive(timeout=3)
assert len(received) == 0

async with sender:
Expand All @@ -40,7 +40,7 @@ async def test_client_secret_credential_async(aad_credential, live_eventhub):

await asyncio.sleep(1)

received = await receiver.receive(timeout=1)
received = await receiver.receive(timeout=3)

assert len(received) == 1
assert list(received[0].body)[0] == 'A single message'.encode('utf-8')
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def test_send_with_invalid_hostname_async(invalid_hostname, connstr_receiv
client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False)
sender = client.create_producer()
with pytest.raises(AuthenticationError):
await sender._open()
await sender.send(EventData("test data"))


@pytest.mark.liveTest
Expand All @@ -38,7 +38,7 @@ async def test_receive_with_invalid_hostname_async(invalid_hostname):
client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False)
sender = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"))
with pytest.raises(AuthenticationError):
await sender._open()
await sender.send(EventData("test data"))


@pytest.mark.liveTest
Expand All @@ -48,7 +48,7 @@ async def test_send_with_invalid_key_async(invalid_key, connstr_receivers):
client = EventHubClient.from_connection_string(invalid_key, network_tracing=False)
sender = client.create_producer()
with pytest.raises(AuthenticationError):
await sender._open()
await sender.send(EventData("test data"))


@pytest.mark.liveTest
Expand All @@ -57,7 +57,7 @@ async def test_receive_with_invalid_key_async(invalid_key):
client = EventHubClient.from_connection_string(invalid_key, network_tracing=False)
sender = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"))
with pytest.raises(AuthenticationError):
await sender._open()
await sender.send(EventData("test data"))


@pytest.mark.liveTest
Expand All @@ -67,7 +67,7 @@ async def test_send_with_invalid_policy_async(invalid_policy, connstr_receivers)
client = EventHubClient.from_connection_string(invalid_policy, network_tracing=False)
sender = client.create_producer()
with pytest.raises(AuthenticationError):
await sender._open()
await sender.send(EventData("test data"))


@pytest.mark.liveTest
Expand All @@ -76,7 +76,7 @@ async def test_receive_with_invalid_policy_async(invalid_policy):
client = EventHubClient.from_connection_string(invalid_policy, network_tracing=False)
sender = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"))
with pytest.raises(AuthenticationError):
await sender._open()
await sender.send(EventData("test data"))


@pytest.mark.liveTest
Expand All @@ -88,7 +88,7 @@ async def test_send_partition_key_with_partition_async(connection_str):
try:
data = EventData(b"Data")
with pytest.raises(ValueError):
await sender.send(data)
await sender.send(EventData("test data"))
finally:
await sender.close()

Expand All @@ -99,7 +99,7 @@ async def test_non_existing_entity_sender_async(connection_str):
client = EventHubClient.from_connection_string(connection_str, event_hub_path="nemo", network_tracing=False)
sender = client.create_producer(partition_id="1")
with pytest.raises(AuthenticationError):
await sender._open()
await sender.send(EventData("test data"))


@pytest.mark.liveTest
Expand All @@ -108,35 +108,31 @@ async def test_non_existing_entity_receiver_async(connection_str):
client = EventHubClient.from_connection_string(connection_str, event_hub_path="nemo", network_tracing=False)
receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"))
with pytest.raises(AuthenticationError):
await receiver._open()
await receiver.receive(timeout=5)


@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_receive_from_invalid_partitions_async(connection_str):
partitions = ["XYZ", "-1", "1000", "-" ]
partitions = ["XYZ", "-1", "1000", "-"]
for p in partitions:
client = EventHubClient.from_connection_string(connection_str, network_tracing=False)
receiver = client.create_consumer(consumer_group="$default", partition_id=p, event_position=EventPosition("-1"))
try:
with pytest.raises(ConnectError):
await receiver.receive(timeout=10)
finally:
await receiver.close()
with pytest.raises(ConnectError):
await receiver.receive(timeout=10)
await receiver.close()


@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_send_to_invalid_partitions_async(connection_str):
partitions = ["XYZ", "-1", "1000", "-" ]
partitions = ["XYZ", "-1", "1000", "-"]
for p in partitions:
client = EventHubClient.from_connection_string(connection_str, network_tracing=False)
sender = client.create_producer(partition_id=p)
try:
with pytest.raises(ConnectError):
await sender._open()
finally:
await sender.close()
with pytest.raises(ConnectError):
await sender.send(EventData("test data"))
await sender.close()


@pytest.mark.liveTest
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/azure-eventhubs/tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ def test_client_secret_credential(aad_credential, live_eventhub):
receiver = client.create_consumer(consumer_group="$default", partition_id='0', event_position=EventPosition("@latest"))

with receiver:
received = receiver.receive(timeout=1)
received = receiver.receive(timeout=3)
assert len(received) == 0

with sender:
event = EventData(body='A single message')
sender.send(event)
time.sleep(1)

received = receiver.receive(timeout=1)
received = receiver.receive(timeout=3)

assert len(received) == 1
assert list(received[0].body)[0] == 'A single message'.encode('utf-8')
22 changes: 15 additions & 7 deletions sdk/eventhub/azure-eventhubs/tests/test_negative.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ def test_receive_with_invalid_hostname_sync(invalid_hostname):
client = EventHubClient.from_connection_string(invalid_hostname, network_tracing=False)
receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"))
with pytest.raises(AuthenticationError):
receiver.receive(timeout=3)
receiver.receive(timeout=5)
receiver.close()


@pytest.mark.liveTest
Expand All @@ -44,14 +45,16 @@ def test_send_with_invalid_key(invalid_key, connstr_receivers):
sender = client.create_producer()
with pytest.raises(AuthenticationError):
sender.send(EventData("test data"))

sender.close()

@pytest.mark.liveTest
def test_receive_with_invalid_key_sync(invalid_key):
client = EventHubClient.from_connection_string(invalid_key, network_tracing=False)
receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"))

with pytest.raises(AuthenticationError):
receiver.receive(timeout=3)
receiver.receive(timeout=10)
receiver.close()


@pytest.mark.liveTest
Expand All @@ -61,14 +64,16 @@ def test_send_with_invalid_policy(invalid_policy, connstr_receivers):
sender = client.create_producer()
with pytest.raises(AuthenticationError):
sender.send(EventData("test data"))
sender.close()


@pytest.mark.liveTest
def test_receive_with_invalid_policy_sync(invalid_policy):
client = EventHubClient.from_connection_string(invalid_policy, network_tracing=False)
receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"))
with pytest.raises(AuthenticationError):
receiver.receive(timeout=3)
receiver.receive(timeout=5)
receiver.close()


@pytest.mark.liveTest
Expand Down Expand Up @@ -97,13 +102,16 @@ def test_non_existing_entity_sender(connection_str):
def test_non_existing_entity_receiver(connection_str):
client = EventHubClient.from_connection_string(connection_str, event_hub_path="nemo", network_tracing=False)
receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"))

with pytest.raises(AuthenticationError):
receiver.receive(timeout=3)
receiver.receive(timeout=5)
receiver.close()



@pytest.mark.liveTest
def test_receive_from_invalid_partitions_sync(connection_str):
partitions = ["XYZ", "-1", "1000", "-" ]
partitions = ["XYZ", "-1", "1000", "-"]
for p in partitions:
client = EventHubClient.from_connection_string(connection_str, network_tracing=False)
receiver = client.create_consumer(consumer_group="$default", partition_id=p, event_position=EventPosition("-1"))
Expand All @@ -116,7 +124,7 @@ def test_receive_from_invalid_partitions_sync(connection_str):

@pytest.mark.liveTest
def test_send_to_invalid_partitions(connection_str):
partitions = ["XYZ", "-1", "1000", "-" ]
partitions = ["XYZ", "-1", "1000", "-"]
for p in partitions:
client = EventHubClient.from_connection_string(connection_str, network_tracing=False)
sender = client.create_producer(partition_id=p)
Expand Down