Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add a run_awhile example function
  • Loading branch information
yijxie committed Aug 6, 2019
commit b0c69c4d9ae537bba38ab56a637eeb528d7306fa
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from azure.eventhub.eventprocessor import PartitionProcessor
from azure.eventhub.eventprocessor import Sqlite3PartitionManager

TEST_DURATION = 60 # seconds
RECEIVE_TIMEOUT = 5 # timeout in seconds for a receiving operation. 0 or None means no timeout
RETRY_TOTAL = 3 # number of retries for receive operations
CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
Expand All @@ -33,19 +32,36 @@ def partition_processor_factory(checkpoint_manager):
return MyPartitionProcessor(checkpoint_manager)


async def main():
async def run_until_interrupt():
client = EventHubClient.from_connection_string(CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT, retry_total=RETRY_TOTAL)
partition_manager = Sqlite3PartitionManager()
event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager)
try:
event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager)
# You can also define a callable object for creating PartitionProcessor like below:
# event_processor = EventProcessor(client, "$default", partition_processor_factory, partition_manager)
await event_processor.start()
except KeyboardInterrupt:
await event_processor.stop()
finally:
await partition_manager.close()


async def run_awhile(duration):
client = EventHubClient.from_connection_string(CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT,
retry_total=RETRY_TOTAL)
partition_manager = Sqlite3PartitionManager()
event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager)
try:
asyncio.ensure_future(event_processor.start())
await asyncio.sleep(TEST_DURATION)
await asyncio.sleep(duration)
await event_processor.stop()
finally:
await partition_manager.close()


if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_until_complete(run_until_interrupt())

# use the following code to run for 60 seconds
# loop.run_until_complete(run_awhile(60))