Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
make sync longrunning multi-threaded
  • Loading branch information
yijxie committed Jul 31, 2019
commit fbb66bd95e3fdda1574f7b294e351a97ab3d1772
62 changes: 30 additions & 32 deletions sdk/eventhub/azure-eventhubs/tests/test_longrunning_receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import time
import os
import sys
import threading
import pytest

from logging.handlers import RotatingFileHandler
Expand All @@ -22,6 +23,7 @@
from azure.eventhub import EventHubClient
from azure.eventhub import EventHubSharedKeyCredential


def get_logger(filename, level=logging.INFO):
azure_logger = logging.getLogger("azure.eventhub")
azure_logger.setLevel(level)
Expand All @@ -47,38 +49,33 @@ def get_logger(filename, level=logging.INFO):
logger = get_logger("recv_test.log", logging.INFO)


def get_partitions(args):
eh_data = args.get_properties()
return eh_data["partition_ids"]


def pump(receivers, duration):
def pump(receiver, duration):
total = 0
iteration = 0
deadline = time.time() + duration
try:
while time.time() < deadline:
for pid, receiver in receivers.items():
with receiver:
try:
while time.time() < deadline:
batch = receiver.receive(timeout=5)
size = len(batch)
total += size
iteration += 1
if size == 0:
print("{}: No events received, queue size {}, delivered {}".format(
pid,
receiver.partition,
receiver.queue_size,
total))
elif iteration >= 50:
elif iteration >= 5:
iteration = 0
print("{}: total received {}, last sn={}, last offset={}".format(
pid,
receiver.partition,
total,
batch[-1].sequence_number,
batch[-1].offset))
print("Total received {}".format(total))
except Exception as e:
print("EventHubConsumer failed: {}".format(e))
raise
print("{}: Total received {}".format(receiver.partition, total))
except Exception as e:
print("EventHubConsumer failed: {}".format(e))
raise


@pytest.mark.liveTest
Expand Down Expand Up @@ -112,22 +109,23 @@ def test_long_running_receive(connection_str):
except ImportError:
raise ValueError("Must specify either '--conn-str' or '--address'")

try:
if not args.partitions:
partitions = get_partitions(client)
else:
partitions = args.partitions.split(",")
pumps = {}
for pid in partitions:
pumps[pid] = client.create_consumer(consumer_group="$default",
partition_id=pid,
event_position=EventPosition(args.offset),
prefetch=50)
pump(pumps, args.duration)
finally:
for pid in partitions:
pumps[pid].close()
if args.partitions:
partitions = args.partitions.split(",")
else:
partitions = client.get_partition_ids()

threads = []
for pid in partitions:
consumer = client.create_consumer(consumer_group="$default",
partition_id=pid,
event_position=EventPosition(args.offset),
prefetch=300)
thread = threading.Thread(target=pump, args=(consumer, args.duration))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()


if __name__ == '__main__':
test_long_running_receive(os.environ.get('EVENT_HUB_CONNECTION_STR'))
test_long_running_receive(os.environ.get('EVENT_HUB_PERF_CONN_STR'))
44 changes: 24 additions & 20 deletions sdk/eventhub/azure-eventhubs/tests/test_longrunning_send.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
import time
import os
import sys
import threading
import logging
import pytest
from logging.handlers import RotatingFileHandler

from azure.eventhub import EventHubClient, EventHubProducer, EventData, EventHubSharedKeyCredential
from azure.eventhub import EventHubClient, EventDataBatch, EventData, EventHubSharedKeyCredential


def get_logger(filename, level=logging.INFO):
Expand All @@ -42,34 +43,30 @@ def get_logger(filename, level=logging.INFO):

return azure_logger

logger = get_logger("send_test.log", logging.INFO)


def check_send_successful(outcome, condition):
if outcome.value != 0:
print("Send failed {}".format(condition))
logger = get_logger("send_test.log", logging.INFO)


def main(client, args):
sender = client.create_producer()
def send(sender, args):
# sender = client.create_producer()
deadline = time.time() + args.duration
total = 0

try:
with sender:
event_list = []
batch = sender.create_batch()
while time.time() < deadline:
data = EventData(body=b"D" * args.payload)
event_list.append(data)
total += 1
if total % 100 == 0:
sender.send(event_list)
event_list = []
print("Send total {}".format(total))
try:
batch.try_add(data)
total += 1
except ValueError:
sender.send(batch, timeout=0)
print("Sent total {} of partition {}".format(total, sender.partition))
batch = sender.create_batch()
except Exception as err:
print("Send failed {}".format(err))
print("Partition {} send failed {}".format(sender.partition, err))
raise
print("Sent total {}".format(total))
print("Sent total {} of partition {}".format(total, sender.partition))


@pytest.mark.liveTest
Expand Down Expand Up @@ -105,10 +102,17 @@ def test_long_running_send(connection_str):
raise ValueError("Must specify either '--conn-str' or '--address'")

try:
main(client, args)
partition_ids = client.get_partition_ids()
threads = []
for pid in partition_ids:
sender = client.create_producer(partition_id=pid)
thread = threading.Thread(target=send, args=(sender, args))
thread.start()
threads.append(thread)
thread.join()
except KeyboardInterrupt:
pass


if __name__ == '__main__':
test_long_running_send(os.environ.get('EVENT_HUB_CONNECTION_STR'))
test_long_running_send(os.environ.get('EVENT_HUB_PERF_CONN_STR'))