Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
120 commits
Select commit Hold shift + click to select a range
3a32907
Draft EventProcessor Loadbalancing
Aug 22, 2019
39b1b86
EventProcessor Load balancing
Aug 22, 2019
64da8ed
Small changes from code review
Aug 23, 2019
17f5153
small changes from bryan's review
Aug 23, 2019
04ef548
remove checkpoint manager from initialize
Aug 23, 2019
9be1741
small changes
Aug 23, 2019
d951dcf
change EventData.msg_properties to private attribute
Aug 26, 2019
8bbac25
remove abstract method
Aug 27, 2019
875841e
initial blob storage
Aug 28, 2019
70a33d0
code clean 1
Aug 28, 2019
6df7253
fix leaking connection str
Aug 28, 2019
abbdd25
code clean 2
Aug 28, 2019
b45d6b3
Fix pylint
Aug 29, 2019
247004a
Fix pylint
Aug 29, 2019
6ace6ce
Use properties EventData.partition_key
Aug 29, 2019
008421d
Small changes from code review
Aug 23, 2019
b8c027d
change EventData.msg_properties to private attribute
Aug 26, 2019
2489dd3
remove abstract method
Aug 27, 2019
3a2d72f
code clean 1
Aug 28, 2019
9735756
code clean 2
Aug 28, 2019
288617e
Fix pylint
Aug 29, 2019
2bdbffe
Fix pylint
Aug 29, 2019
e8ea699
Use properties EventData.partition_key
Aug 29, 2019
1b5753c
Draft EventProcessor Loadbalancing
Aug 22, 2019
b4b77f9
EventProcessor Load balancing
Aug 22, 2019
1787fdd
small changes from bryan's review
Aug 23, 2019
c2d0155
remove checkpoint manager from initialize
Aug 23, 2019
1074385
small changes
Aug 23, 2019
386baf0
Fix code review feedback
Aug 29, 2019
889597c
Merge branch 'eventhubs_preview3' of github.com:Azure/azure-sdk-for-p…
Aug 29, 2019
cb08478
Use properties EventData.partition_key
Aug 29, 2019
b3dcd07
Temporarily disable pylint errors that need refactoring
Aug 29, 2019
b85e6cc
fix pylint errors
Aug 29, 2019
92feb09
Merge branch 'master' into eventhubs_preview3
Aug 29, 2019
1afbf0c
Merge branch 'eventhubs_yx' of github.com:Azure/azure-sdk-for-python …
Aug 30, 2019
c126bea
Packaging update of azure-mgmt-datalake-analytics
AutorestCI Aug 30, 2019
40c7f03
Packaging update of azure-loganalytics
AutorestCI Aug 30, 2019
cf22c7c
Packaging update of azure-mgmt-storage
AutorestCI Aug 30, 2019
5e51ce2
fix pylint errors
Aug 30, 2019
726bf6f
ignore eventprocessor pylint temporarily
Aug 30, 2019
c7440b2
Merge branch 'eventhubs_preview3' into eventhubs_yx
Aug 30, 2019
fa804f4
code review fixes and pylint error
Aug 30, 2019
470cf7e
Merge branch 'eventhubs_yx' of github.com:Azure/azure-sdk-for-python …
Aug 30, 2019
ffd8cb0
small pylint adjustment
Aug 30, 2019
e5f3b50
reduce dictionary access
Aug 30, 2019
27cb0bf
initial blob storage
Aug 28, 2019
f6d77e7
fix leaking connection str
Aug 28, 2019
2f69d65
Merge branch 'master' into eventhubs_preview3
Aug 30, 2019
e5c8d1c
Add typing for Python2.7
Aug 30, 2019
32833b3
Merge branch 'eventhubs_blobstorage' of github.com:Azure/azure-sdk-fo…
Aug 30, 2019
e85ac17
[EventHub] IoTHub management operations improvement and bug fixing (#…
yunhaoling Sep 2, 2019
da6199f
Change test polling to 5 sec
Sep 2, 2019
e6a7c5e
Add async lock to ensure etag consistency
Sep 2, 2019
ebc4362
Add dependency to azure-storage
Sep 2, 2019
1503604
Remove dependency on PartitionManager of azure-eventhub
Sep 2, 2019
c9707c4
Fix azure-storage-blob requirement error
Sep 2, 2019
8343876
Revert "Packaging update of azure-mgmt-storage"
Sep 2, 2019
66c5b31
Revert "Packaging update of azure-loganalytics"
Sep 2, 2019
bcd851a
Revert "Packaging update of azure-mgmt-datalake-analytics"
Sep 2, 2019
d7b2606
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 2, 2019
d740bb0
Trivial code change
Sep 2, 2019
778ab66
Add docstring to BlobPartitionManager
Sep 2, 2019
017d9f0
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 2, 2019
1fb341b
[EventHub] Retry refactor (#7026)
yunhaoling Sep 3, 2019
aad6978
Refine exception handling for eventprocessor
Sep 3, 2019
a55dc13
Enable pylint for eventprocessor
Sep 3, 2019
a339985
Expose OwnershipLostError
Sep 3, 2019
9bed566
Refine exception handling
Sep 3, 2019
b878002
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 3, 2019
7762130
add system_properties to EventData
Sep 3, 2019
1b10d00
Fix a small bug
Sep 4, 2019
13237b5
Refine example code
Sep 4, 2019
cbc6792
handle exception for claim_ownership and update_checkpoint
Sep 4, 2019
8748e1f
Add license info
Sep 4, 2019
97e0558
Restructure packages
Sep 4, 2019
e61d6a1
Lock each ownership with a separate lock
Sep 4, 2019
9102713
Move eventprocessor to aio
Sep 4, 2019
278592c
change checkpoint_manager to partition context
Sep 4, 2019
665f28c
fix pylint error
Sep 4, 2019
998eeed
Update receive method (#7064)
yunhaoling Sep 4, 2019
b03cc64
Merge branch 'eventhubs_yx' into eventhubs_blobstorage
Sep 4, 2019
db93fd4
Re-org namespace package structure
Sep 4, 2019
2050615
raise error while list_ownership got an exception
Sep 5, 2019
2781062
Restructure package structure
Sep 5, 2019
8a32e44
replace checkpointer with checkpointstore as a part of package name
Sep 6, 2019
e13ddee
Update accessibility of class (#7091)
yunhaoling Sep 6, 2019
f616f37
Update samples and codes according to the review (#7098)
yunhaoling Sep 6, 2019
dad5baa
Python EventHubs load balancing (#6901)
YijunXieMS Sep 7, 2019
8e7e1c1
Fix a pylint error
Sep 7, 2019
88ca853
Merge remote-tracking branch 'central/eventhubs_preview3' into eventh…
Sep 7, 2019
b32417b
Merge remote-tracking branch 'central/eventhubs_preview3' into eventh…
Sep 7, 2019
667f0b0
remove duplicated partition manager
Sep 7, 2019
f28365c
Fix a bug in list_ownership
Sep 7, 2019
74f39ce
Add pytest for blob partition manager
Sep 8, 2019
9959dc0
remove conftest.py from blob partition manager
Sep 8, 2019
1fd2243
Cache BlobClient instead of using ContainerClient to improve performance
Sep 9, 2019
b0e27a3
fix a list_ownership bug
Sep 9, 2019
c062fe0
add python requires
Sep 9, 2019
2b78446
Small fix
Sep 9, 2019
5bd2420
Change azure storage blob dependency version
Sep 9, 2019
05bdf04
Merge branch 'master' into eventhubs_blobstorage
Sep 9, 2019
b5af820
universal=0 by definition
Sep 9, 2019
74391a4
remove azure-eventhubs from dev requirement
Sep 9, 2019
f4f38bd
Update HISTORY
Sep 10, 2019
feffcfd
Update README
Sep 10, 2019
bb71c4a
Update README
Sep 10, 2019
2a41d06
empty init file under folder extensions to align with azure-eventhub
Sep 10, 2019
4553bba
Update readme.md
yunhaoling Sep 10, 2019
6938fc2
Fix a link issue
Sep 10, 2019
ca49bd8
Merge branch 'eventhubs_blobstorage' of github.com:YijunXieMS/azure-s…
Sep 10, 2019
2939ba2
fix a class name issue
Sep 10, 2019
89c99d1
add azure-eventhubs in dev_requirement
Sep 11, 2019
fd8ecdf
Revert "add azure-eventhubs in dev_requirement"
Sep 11, 2019
f7b85b4
Merge branch 'master' into eventhubs_blobstorage
Sep 11, 2019
74bd105
add azure-eventhubs in dev_requirement
Sep 11, 2019
c45bba9
Merge branch 'master' into eventhubs_blobstorage
Sep 11, 2019
f1053d2
Update azure-eventhub dependency to 5.0.0b3
Sep 11, 2019
da9f415
override azure-storage-blob version for azure-eventhubs-checkpointsto…
Sep 11, 2019
5eaa826
Add azure-eventhub in shared_requirements.txt
Sep 11, 2019
06896c0
Add extensions in manifest.in
Sep 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[EventHub] Retry refactor (#7026)
* Retry refactor

* Refactor retry, delay and handle exception

* Remove unused module

* Small fix

* Small fix
  • Loading branch information
yunhaoling authored Sep 3, 2019
commit 1fb341b6c203b6f73fa0a78cd85c87d0f66c4a43
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,6 @@
log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
def wrapped_func(self, *args, **kwargs): # pylint:disable=unused-argument # TODO: to refactor
timeout = kwargs.pop("timeout", 100000)
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
while True:
try:
return to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time) # pylint:disable=protected-access
retry_count += 1
return wrapped_func


class ConsumerProducerMixin(object):
def __init__(self):
self.client = None
Expand All @@ -55,9 +37,9 @@ def _redirect(self, redirect):
self.running = False
self._close_connection()

def _open(self, timeout_time=None): # pylint:disable=unused-argument # TODO: to refactor
def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
Open the EventHubConsumer/EventHubProducer using the supplied connection.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.

Expand Down Expand Up @@ -91,12 +73,36 @@ def _close_connection(self):
self._close_handler()
self.client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access

def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
def _handle_exception(self, exception):
if not self.running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return _handle_exception(exception, retry_count, max_retries, self, timeout_time)
return _handle_exception(exception, self)

return _handle_exception(exception, self)

def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# pylint:disable=protected-access
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + timeout
retried_times = 0
last_exception = kwargs.pop('last_exception', None)
operation_need_param = kwargs.pop('operation_need_param', True)

while retried_times <= self.client.config.max_retries:
try:
if operation_need_param:
return operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
else:
return operation()
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception)
self.client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self.name)
retried_times += 1

return _handle_exception(exception, retry_count, max_retries, self, timeout_time)
log.info("%r has exhausted retry. Exception still occurs (%r)", self.name, last_exception)
raise last_exception

def close(self, exception=None):
# type:(Exception) -> None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,6 @@
log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
async def wrapped_func(self, *args, **kwargs): # pylint:disable=unused-argument # TODO: to refactor
timeout = kwargs.pop("timeout", 100000)
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
while True:
try:
return await to_be_wrapped_func(
self, timeout_time=timeout_time, last_exception=last_exception, **kwargs
)
except Exception as exception: # pylint:disable=broad-except
last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time) # pylint:disable=protected-access
retry_count += 1
return wrapped_func


class ConsumerProducerMixin(object):

def __init__(self):
Expand All @@ -58,7 +38,7 @@ async def _redirect(self, redirect):
self.running = False
await self._close_connection()

async def _open(self, timeout_time=None): # pylint:disable=unused-argument # TODO: to refactor
async def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
If the handler has previously been redirected, the redirect
Expand Down Expand Up @@ -94,12 +74,36 @@ async def _close_connection(self):
await self._close_handler()
await self.client._conn_manager.reset_connection_if_broken() # pylint:disable=protected-access

async def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
async def _handle_exception(self, exception):
if not self.running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return await _handle_exception(exception, retry_count, max_retries, self, timeout_time)
return await _handle_exception(exception, self)

return await _handle_exception(exception, self)

async def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# pylint:disable=protected-access
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + timeout
retried_times = 0
last_exception = kwargs.pop('last_exception', None)
operation_need_param = kwargs.pop('operation_need_param', True)

while retried_times <= self.client.config.max_retries:
try:
if operation_need_param:
return await operation(timeout_time=timeout_time, last_exception=last_exception, **kwargs)
else:
return await operation()
except Exception as exception: # pylint:disable=broad-except
last_exception = await self._handle_exception(exception)
await self.client._try_delay(retried_times=retried_times, last_exception=last_exception,
timeout_time=timeout_time, entity_name=self.name)
retried_times += 1

return await _handle_exception(exception, retry_count, max_retries, self, timeout_time)
log.info("%r has exhausted retry. Exception still occurs (%r)", self.name, last_exception)
raise last_exception

async def close(self, exception=None):
# type: (Exception) -> None
Expand Down
32 changes: 20 additions & 12 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# --------------------------------------------------------------------------------------------
import logging
import datetime
import time
import functools
import asyncio

Expand Down Expand Up @@ -100,23 +101,29 @@ def _create_auth(self, username=None, password=None):
get_jwt_token, http_proxy=http_proxy,
transport_type=transport_type)

async def _handle_exception(self, exception, retry_count, max_retries):
await _handle_exception(exception, retry_count, max_retries, self)

async def _close_connection(self):
await self._conn_manager.reset_connection_if_broken()

async def _management_request(self, mgmt_msg, op_type):
if self._is_iothub and not self._iothub_redirect_info:
await self._iothub_redirect()
async def _try_delay(self, retried_times, last_exception, timeout_time=None, entity_name=None):
entity_name = entity_name or self.container_id
backoff = self.config.backoff_factor * 2 ** retried_times
if backoff <= self.config.backoff_max and (
timeout_time is None or time.time() + backoff <= timeout_time): # pylint:disable=no-else-return
asyncio.sleep(backoff)
log.info("%r has an exception (%r). Retrying...", format(entity_name), last_exception)
else:
log.info("%r operation has timed out. Last exception before timeout is (%r)",
entity_name, last_exception)
raise last_exception

async def _management_request(self, mgmt_msg, op_type):
alt_creds = {
"username": self._auth_config.get("iot_username"),
"password": self._auth_config.get("iot_password")
}
max_retries = self.config.max_retries
retry_count = 0
while True:

retried_times = 0
while retried_times <= self.config.max_retries:
mgmt_auth = self._create_auth(**alt_creds)
mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.config.network_tracing)
try:
Expand All @@ -130,8 +137,9 @@ async def _management_request(self, mgmt_msg, op_type):
description_fields=b'status-description')
return response
except Exception as exception: # pylint:disable=broad-except
await self._handle_exception(exception, retry_count, max_retries)
retry_count += 1
last_exception = await _handle_exception(exception, self)
await self._try_delay(retried_times=retried_times, last_exception=last_exception)
retried_times += 1
finally:
await mgmt_client.close_async()

Expand All @@ -144,7 +152,7 @@ async def _iothub_redirect(self):
event_position=EventPosition('-1'),
operation='/messages/events')
async with self._redirect_consumer:
await self._redirect_consumer._open_with_retry(timeout=self.config.receive_timeout) # pylint: disable=protected-access
await self._redirect_consumer._open_with_retry() # pylint: disable=protected-access
self._redirect_consumer = None

async def get_properties(self):
Expand Down
34 changes: 17 additions & 17 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from azure.eventhub import EventData, EventPosition
from azure.eventhub.error import EventHubError, ConnectError, _error_handler
from ._consumer_producer_mixin_async import ConsumerProducerMixin, _retry_decorator
from ._consumer_producer_mixin_async import ConsumerProducerMixin

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -92,21 +92,22 @@ def __aiter__(self):
return self

async def __anext__(self):
max_retries = self.client.config.max_retries
retry_count = 0
while True:
retried_times = 0
while retried_times < self.client.config.max_retries:
try:
await self._open()
if not self.messages_iter:
self.messages_iter = self._handler.receive_messages_iter_async()
message = await self.messages_iter.__anext__()
event_data = EventData._from_message(message) # pylint:disable=protected-access
self.offset = EventPosition(event_data.offset, inclusive=False)
retry_count = 0
retried_times = 0
return event_data
except Exception as exception: # pylint:disable=broad-except
await self._handle_exception(exception, retry_count, max_retries, timeout_time=None)
retry_count += 1
last_exception = await self._handle_exception(exception)
await self.client._try_delay(retried_times=retried_times, last_exception=last_exception,
entity_name=self.name)
retried_times += 1

def _create_handler(self):
alt_creds = {
Expand Down Expand Up @@ -136,7 +137,7 @@ async def _redirect(self, redirect):
self.messages_iter = None
await super(EventHubConsumer, self)._redirect(redirect)

async def _open(self, timeout_time=None, **kwargs):
async def _open(self):
"""
Open the EventHubConsumer using the supplied connection.
If the handler has previously been redirected, the redirect
Expand All @@ -149,17 +150,16 @@ async def _open(self, timeout_time=None, **kwargs):
if not self.running and self.redirected:
self.client._process_redirect_uri(self.redirected)
self.source = self.redirected.address
await super(EventHubConsumer, self)._open(timeout_time)
await super(EventHubConsumer, self)._open()

@_retry_decorator
async def _open_with_retry(self, timeout_time=None, **kwargs):
return await self._open(timeout_time=timeout_time, **kwargs)
async def _open_with_retry(self):
return await self._do_retryable_operation(self._open, operation_need_param=False)

async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
last_exception = kwargs.get("last_exception")
data_batch = kwargs.get("data_batch")

await self._open(timeout_time)
await self._open()
remaining_time = timeout_time - time.time()
if remaining_time <= 0.0:
if last_exception:
Expand All @@ -177,9 +177,9 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
data_batch.append(event_data)
return data_batch

@_retry_decorator
async def _receive_with_try(self, timeout_time=None, max_batch_size=None, **kwargs):
return await self._receive(timeout_time=timeout_time, max_batch_size=max_batch_size, **kwargs)
async def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs):
return await self._do_retryable_operation(self._receive, timeout=timeout,
max_batch_size=max_batch_size, **kwargs)

@property
def queue_size(self):
Expand Down Expand Up @@ -227,7 +227,7 @@ async def receive(self, *, max_batch_size=None, timeout=None):
max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch)
data_batch = [] # type: List[EventData]

return await self._receive_with_try(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)
return await self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)

async def close(self, exception=None):
# type: (Exception) -> None
Expand Down
28 changes: 4 additions & 24 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def _create_eventhub_exception(exception):
return error


async def _handle_exception(exception, retry_count, max_retries, closable, timeout_time=None): # pylint:disable=too-many-branches, too-many-statements
async def _handle_exception(exception, closable): # pylint:disable=too-many-branches, too-many-statements
if isinstance(exception, asyncio.CancelledError):
raise exception
try:
Expand All @@ -45,10 +45,10 @@ async def _handle_exception(exception, retry_count, max_retries, closable, timeo
name = closable.container_id
if isinstance(exception, KeyboardInterrupt): # pylint:disable=no-else-raise
log.info("%r stops due to keyboard interrupt", name)
closable.close()
await closable.close()
raise exception
elif isinstance(exception, EventHubError):
closable.close()
await closable.close()
raise exception
elif isinstance(exception, (
errors.MessageAccepted,
Expand All @@ -65,10 +65,6 @@ async def _handle_exception(exception, retry_count, max_retries, closable, timeo
log.info("%r Event data send error (%r)", name, exception)
error = EventDataSendError(str(exception), exception)
raise error
elif retry_count >= max_retries:
error = _create_eventhub_exception(exception)
log.info("%r has exhausted retry. Exception still occurs (%r)", name, exception)
raise error
else:
if isinstance(exception, errors.AuthenticationException):
if hasattr(closable, "_close_connection"):
Expand All @@ -95,20 +91,4 @@ async def _handle_exception(exception, retry_count, max_retries, closable, timeo
else:
if hasattr(closable, "_close_connection"):
await closable._close_connection() # pylint:disable=protected-access
# start processing retry delay
try:
backoff_factor = closable.client.config.backoff_factor
backoff_max = closable.client.config.backoff_max
except AttributeError:
backoff_factor = closable.config.backoff_factor
backoff_max = closable.config.backoff_max
backoff = backoff_factor * 2 ** retry_count
if backoff <= backoff_max and (timeout_time is None or time.time() + backoff <= timeout_time): # pylint:disable=no-else-return
await asyncio.sleep(backoff)
log.info("%r has an exception (%r). Retrying...", format(name), exception)
return _create_eventhub_exception(exception)
else:
error = _create_eventhub_exception(exception)
log.info("%r operation has timed out. Last exception before timeout is (%r)", name, error)
raise error
# end of processing retry delay
return _create_eventhub_exception(exception)
Loading