Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
small fix on example
  • Loading branch information
yijxie committed Aug 6, 2019
commit b8a86d11507044a3a9aa6c6ce1a9399e7d9a1044
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from azure.eventhub.eventprocessor import Sqlite3PartitionManager

RECEIVE_TIMEOUT = 5 # timeout in seconds for a receiving operation. 0 or None means no timeout
RETRY_TOTAL = 3 # number of retries for receive operations
RETRY_TOTAL = 3 # max number of retries for receive operations within the receive timeout. Actual number of retries clould be less if RECEIVE_TIMEOUT is too small
CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]

logging.basicConfig(level=logging.INFO)
Expand All @@ -32,20 +32,6 @@ def partition_processor_factory(checkpoint_manager):
return MyPartitionProcessor(checkpoint_manager)


async def run_until_interrupt():
client = EventHubClient.from_connection_string(CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT, retry_total=RETRY_TOTAL)
partition_manager = Sqlite3PartitionManager()
event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager)
# You can also define a callable object for creating PartitionProcessor like below:
# event_processor = EventProcessor(client, "$default", partition_processor_factory, partition_manager)
try:
await event_processor.start()
except KeyboardInterrupt:
await event_processor.stop()
finally:
await partition_manager.close()


async def run_awhile(duration):
client = EventHubClient.from_connection_string(CONNECTION_STR, receive_timeout=RECEIVE_TIMEOUT,
retry_total=RETRY_TOTAL)
Expand All @@ -61,7 +47,4 @@ async def run_awhile(duration):

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run_until_interrupt())

# use the following code to run for 60 seconds
# loop.run_until_complete(run_awhile(60))
loop.run_until_complete(run_awhile(60))