Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
64da8ed
Small changes from code review
Aug 23, 2019
d951dcf
change EventData.msg_properties to private attribute
Aug 26, 2019
8bbac25
remove abstract method
Aug 27, 2019
70a33d0
code clean 1
Aug 28, 2019
abbdd25
code clean 2
Aug 28, 2019
b45d6b3
Fix pylint
Aug 29, 2019
247004a
Fix pylint
Aug 29, 2019
6ace6ce
Use properties EventData.partition_key
Aug 29, 2019
008421d
Small changes from code review
Aug 23, 2019
b8c027d
change EventData.msg_properties to private attribute
Aug 26, 2019
2489dd3
remove abstract method
Aug 27, 2019
3a2d72f
code clean 1
Aug 28, 2019
9735756
code clean 2
Aug 28, 2019
288617e
Fix pylint
Aug 29, 2019
2bdbffe
Fix pylint
Aug 29, 2019
e8ea699
Use properties EventData.partition_key
Aug 29, 2019
889597c
Merge branch 'eventhubs_preview3' of github.com:Azure/azure-sdk-for-p…
Aug 29, 2019
cb08478
Use properties EventData.partition_key
Aug 29, 2019
b3dcd07
Temporarily disable pylint errors that need refactoring
Aug 29, 2019
b85e6cc
fix pylint errors
Aug 29, 2019
92feb09
Merge branch 'master' into eventhubs_preview3
Aug 29, 2019
5e51ce2
fix pylint errors
Aug 30, 2019
726bf6f
ignore eventprocessor pylint temporarily
Aug 30, 2019
ffd8cb0
small pylint adjustment
Aug 30, 2019
2f69d65
Merge branch 'master' into eventhubs_preview3
Aug 30, 2019
e5c8d1c
Add typing for Python2.7
Aug 30, 2019
e85ac17
[EventHub] IoTHub management operations improvement and bug fixing (#…
yunhaoling Sep 2, 2019
1fb341b
[EventHub] Retry refactor (#7026)
yunhaoling Sep 3, 2019
7762130
add system_properties to EventData
Sep 3, 2019
1b10d00
Fix a small bug
Sep 4, 2019
13237b5
Refine example code
Sep 4, 2019
998eeed
Update receive method (#7064)
yunhaoling Sep 4, 2019
e13ddee
Update accessibility of class (#7091)
yunhaoling Sep 6, 2019
f616f37
Update samples and codes according to the review (#7098)
yunhaoling Sep 6, 2019
dad5baa
Python EventHubs load balancing (#6901)
YijunXieMS Sep 7, 2019
8e7e1c1
Fix a pylint error
Sep 7, 2019
13a8fe7
Eventhubs blobstorage checkpointstore merge to preview3 (#7109)
YijunXieMS Sep 7, 2019
b5c933f
exclude eventprocessor test for python27
Sep 7, 2019
7b0f5fe
exclude eventprocessor test
Sep 7, 2019
167361e
Revert "Eventhubs blobstorage checkpointstore merge to preview3 (#7109)"
Sep 7, 2019
1253983
Fix small problem in consumer iterator (#7110)
yunhaoling Sep 7, 2019
548a989
Fixed an issue that initializes partition processor multiple times
Sep 8, 2019
725b333
Update release history for 5.0.0b3
Sep 9, 2019
c359042
Update README for 5.0.0b3
Sep 9, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix pylint
  • Loading branch information
yijxie committed Aug 29, 2019
commit b45d6b3647403b41791d625b2b683d18cd5f7838
3 changes: 1 addition & 2 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
# --------------------------------------------------------------------------------------------

__version__ = "5.0.0b2"

from uamqp import constants # type: ignore
from azure.eventhub.common import EventData, EventDataBatch, EventPosition
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \
AuthenticationError, EventDataSendError, ConnectionLostError
from azure.eventhub.client import EventHubClient
from azure.eventhub.producer import EventHubProducer
from azure.eventhub.consumer import EventHubConsumer
from uamqp import constants # type: ignore
from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential

TransportType = constants.TransportType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from uamqp import Connection, TransportType, c_uamqp # type: ignore


class _SharedConnectionManager(object):
class _SharedConnectionManager(object): #pylint:disable=too-many-instance-attributes
def __init__(self, **kwargs):
self._lock = RLock()
self._conn = None # type: Connection
Expand Down Expand Up @@ -50,11 +50,11 @@ def close_connection(self):

def reset_connection_if_broken(self):
with self._lock:
if self._conn and self._conn._state in (
c_uamqp.ConnectionState.CLOSE_RCVD,
c_uamqp.ConnectionState.CLOSE_SENT,
c_uamqp.ConnectionState.DISCARDING,
c_uamqp.ConnectionState.END,
if self._conn and self._conn._state in ( # pylint:disable=protected-access
c_uamqp.ConnectionState.CLOSE_RCVD, # pylint:disable=c-extension-no-member
c_uamqp.ConnectionState.CLOSE_SENT, # pylint:disable=c-extension-no-member
c_uamqp.ConnectionState.DISCARDING, # pylint:disable=c-extension-no-member
c_uamqp.ConnectionState.END, # pylint:disable=c-extension-no-member
):
self._conn = None

Expand All @@ -63,7 +63,7 @@ class _SeparateConnectionManager(object):
def __init__(self, **kwargs):
pass

def get_connection(self, host, auth):
def get_connection(self, host, auth): # pylint:disable=unused-argument, no-self-use
return None

def close_connection(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ def wrapped_func(self, *args, **kwargs):
while True:
try:
return to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception:
last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time)
except Exception as exception: # pylint:disable=broad-except
last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time) # pylint:disable=protected-access
retry_count += 1
return wrapped_func

Expand Down Expand Up @@ -89,7 +89,7 @@ def _close_handler(self):

def _close_connection(self):
self._close_handler()
self.client._conn_manager.reset_connection_if_broken()
self.client._conn_manager.reset_connection_if_broken() # pylint: disable=protected-access

def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
if not self.running and isinstance(exception, compat.TimeoutException):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from uamqp.async_ops import ConnectionAsync # type: ignore


class _SharedConnectionManager(object):
class _SharedConnectionManager(object): # pylint:disable=too-many-instance-attributes
def __init__(self, **kwargs):
self._lock = Lock()
self._conn = None
Expand Down Expand Up @@ -51,11 +51,11 @@ async def close_connection(self):

async def reset_connection_if_broken(self):
async with self._lock:
if self._conn and self._conn._state in (
c_uamqp.ConnectionState.CLOSE_RCVD,
c_uamqp.ConnectionState.CLOSE_SENT,
c_uamqp.ConnectionState.DISCARDING,
c_uamqp.ConnectionState.END,
if self._conn and self._conn._state in ( # pylint:disable=protected-access
c_uamqp.ConnectionState.CLOSE_RCVD, # pylint:disable=c-extension-no-member
c_uamqp.ConnectionState.CLOSE_SENT, # pylint:disable=c-extension-no-member
c_uamqp.ConnectionState.DISCARDING, # pylint:disable=c-extension-no-member
c_uamqp.ConnectionState.END, # pylint:disable=c-extension-no-member
):
self._conn = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ async def wrapped_func(self, *args, **kwargs):
last_exception = None
while True:
try:
return await to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception:
return await to_be_wrapped_func(
self, timeout_time=timeout_time, last_exception=last_exception, **kwargs
)
except Exception as exception: # pylint:disable=broad-except
last_exception = await self._handle_exception(exception, retry_count, max_retries, timeout_time)
retry_count += 1
return wrapped_func
Expand Down Expand Up @@ -90,7 +92,7 @@ async def _close_handler(self):

async def _close_connection(self):
await self._close_handler()
await self.client._conn_manager.reset_connection_if_broken()
await self.client._conn_manager.reset_connection_if_broken() # pylint:disable=protected-access

async def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
if not self.running and isinstance(exception, compat.TimeoutException):
Expand Down Expand Up @@ -133,4 +135,4 @@ async def close(self, exception=None):
else:
self.error = EventHubError("This receive handler is now closed.")
if self._handler:
await self._handler.close_async()
await self._handler.close_async()
31 changes: 23 additions & 8 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,25 @@
import datetime
import functools
import asyncio
from typing import Any, List, Dict
from typing import Any, List, Dict, Union, TYPE_CHECKING

from uamqp import authentication, constants # type: ignore
from uamqp import (
Message,
AMQPClientAsync,
) # type: ignore

from azure.eventhub.common import parse_sas_token, EventPosition, EventHubSharedKeyCredential, EventHubSASTokenCredential
from azure.eventhub.common import parse_sas_token, EventPosition, \
EventHubSharedKeyCredential, EventHubSASTokenCredential
from ..client_abstract import EventHubClientAbstract

from .producer_async import EventHubProducer
from .consumer_async import EventHubConsumer
from ._connection_manager_async import get_connection_manager
from .error_async import _handle_exception

if TYPE_CHECKING:
from azure.core.credentials import TokenCredential # type: ignore

log = logging.getLogger(__name__)

Expand All @@ -42,7 +45,9 @@ class EventHubClient(EventHubClientAbstract):
"""

def __init__(self, host, event_hub_path, credential, **kwargs):
super(EventHubClient, self).__init__(host, event_hub_path, credential, **kwargs)
# type:(str, str, Union[EventHubSharedKeyCredential, EventHubSASTokenCredential, TokenCredential], ...) -> None

super(EventHubClient, self).__init__(host=host, event_hub_path=event_hub_path, credential=credential, **kwargs)
self._conn_manager = get_connection_manager(**kwargs)

async def __aenter__(self):
Expand All @@ -65,7 +70,7 @@ def _create_auth(self, username=None, password=None):
transport_type = self.config.transport_type
auth_timeout = self.config.auth_timeout

if isinstance(self.credential, EventHubSharedKeyCredential):
if isinstance(self.credential, EventHubSharedKeyCredential): # pylint:disable=no-else-return
username = username or self._auth_config['username']
password = password or self._auth_config['password']
if "@sas.root" in username:
Expand Down Expand Up @@ -116,7 +121,7 @@ async def _management_request(self, mgmt_msg, op_type):
status_code_field=b'status-code',
description_fields=b'status-description')
return response
except Exception as exception:
except Exception as exception: # pylint:disable=broad-except
await self._handle_exception(exception, retry_count, max_retries)
retry_count += 1
finally:
Expand Down Expand Up @@ -190,7 +195,12 @@ async def get_partition_properties(self, partition):
output['is_empty'] = partition_info[b'is_partition_empty']
return output

def create_consumer(self, consumer_group: str, partition_id: str, event_position: EventPosition, **kwargs):
def create_consumer(
self,
consumer_group: str,
partition_id: str,
event_position: EventPosition, **kwargs
) -> EventHubConsumer:
"""
Create an async consumer to the client for a particular consumer group and partition.

Expand Down Expand Up @@ -234,8 +244,13 @@ def create_consumer(self, consumer_group: str, partition_id: str, event_position
prefetch=prefetch, loop=loop)
return handler

def create_producer(self, *, partition_id=None, operation=None, send_timeout=None, loop=None):
# type: (str, str, float, asyncio.AbstractEventLoop) -> EventHubProducer
def create_producer(
self, *,
partition_id: str = None,
operation: str = None,
send_timeout: float = None,
loop: asyncio.AbstractEventLoop = None
) -> EventHubProducer:
"""
Create an async producer to send EventData object to an EventHub.

Expand Down
20 changes: 10 additions & 10 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
log = logging.getLogger(__name__)


class EventHubConsumer(ConsumerProducerMixin):
class EventHubConsumer(ConsumerProducerMixin): # pylint:disable=too-many-instance-attributes
"""
A consumer responsible for reading EventData from a specific Event Hub
partition and as a member of a specific consumer group.
Expand Down Expand Up @@ -100,21 +100,21 @@ async def __anext__(self):
if not self.messages_iter:
self.messages_iter = self._handler.receive_messages_iter_async()
message = await self.messages_iter.__anext__()
event_data = EventData._from_message(message)
event_data = EventData._from_message(message) # pylint:disable=protected-access
self.offset = EventPosition(event_data.offset, inclusive=False)
retry_count = 0
return event_data
except Exception as exception:
await self._handle_exception(exception, retry_count, max_retries)
except Exception as exception: # pylint:disable=broad-except
await self._handle_exception(exception, retry_count, max_retries, timeout_time=None)
retry_count += 1

def _create_handler(self):
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password": self.client._auth_config.get("iot_password")}
"username": self.client._auth_config.get("iot_username"), # pylint:disable=protected-access
"password": self.client._auth_config.get("iot_password")} # pylint:disable=protected-access
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset._selector())
source.set_filter(self.offset._selector()) # pylint:disable=protected-access
self._handler = ReceiveClientAsync(
source,
auth=self.client.get_auth(**alt_creds),
Expand All @@ -125,8 +125,8 @@ def _create_handler(self):
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client._create_properties(
self.client.config.user_agent), # pylint: disable=protected-access
properties=self.client._create_properties( # pylint:disable=protected-access
self.client.config.user_agent),
loop=self.loop)
self.messages_iter = None

Expand Down Expand Up @@ -164,7 +164,7 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
max_batch_size=max_batch_size,
timeout=remaining_time_ms)
for message in message_batch:
event_data = EventData._from_message(message)
event_data = EventData._from_message(message) # pylint:disable=protected-access
self.offset = EventPosition(event_data.offset)
data_batch.append(event_data)
return data_batch
Expand Down
32 changes: 18 additions & 14 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/error_async.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import asyncio
import time
import logging
Expand Down Expand Up @@ -32,20 +36,20 @@ def _create_eventhub_exception(exception):
return error


async def _handle_exception(exception, retry_count, max_retries, closable, timeout_time=None):
async def _handle_exception(exception, retry_count, max_retries, closable, timeout_time=None): # pylint:disable=too-many-branches, too-many-statements
if isinstance(exception, asyncio.CancelledError):
raise
raise exception
try:
name = closable.name
except AttributeError:
name = closable.container_id
if isinstance(exception, KeyboardInterrupt):
if isinstance(exception, KeyboardInterrupt): # pylint:disable=no-else-raise
log.info("%r stops due to keyboard interrupt", name)
closable.close()
raise
raise exception
elif isinstance(exception, EventHubError):
closable.close()
raise
raise exception
elif isinstance(exception, (
errors.MessageAccepted,
errors.MessageAlreadySettled,
Expand All @@ -68,29 +72,29 @@ async def _handle_exception(exception, retry_count, max_retries, closable, timeo
else:
if isinstance(exception, errors.AuthenticationException):
if hasattr(closable, "_close_connection"):
await closable._close_connection()
await closable._close_connection() # pylint:disable=protected-access
elif isinstance(exception, errors.LinkRedirect):
log.info("%r link redirect received. Redirecting...", name)
redirect = exception
if hasattr(closable, "_redirect"):
await closable._redirect(redirect)
await closable._redirect(redirect) # pylint:disable=protected-access
elif isinstance(exception, errors.LinkDetach):
if hasattr(closable, "_close_handler"):
await closable._close_handler()
await closable._close_handler() # pylint:disable=protected-access
elif isinstance(exception, errors.ConnectionClose):
if hasattr(closable, "_close_connection"):
await closable._close_connection()
await closable._close_connection() # pylint:disable=protected-access
elif isinstance(exception, errors.MessageHandlerError):
if hasattr(closable, "_close_handler"):
await closable._close_handler()
await closable._close_handler() # pylint:disable=protected-access
elif isinstance(exception, errors.AMQPConnectionError):
if hasattr(closable, "_close_connection"):
await closable._close_connection()
await closable._close_connection() # pylint:disable=protected-access
elif isinstance(exception, compat.TimeoutException):
pass # Timeout doesn't need to recreate link or connection to retry
else:
if hasattr(closable, "_close_connection"):
await closable._close_connection()
await closable._close_connection() # pylint:disable=protected-access
# start processing retry delay
try:
backoff_factor = closable.client.config.backoff_factor
Expand All @@ -99,12 +103,12 @@ async def _handle_exception(exception, retry_count, max_retries, closable, timeo
backoff_factor = closable.config.backoff_factor
backoff_max = closable.config.backoff_max
backoff = backoff_factor * 2 ** retry_count
if backoff <= backoff_max and (timeout_time is None or time.time() + backoff <= timeout_time):
if backoff <= backoff_max and (timeout_time is None or time.time() + backoff <= timeout_time): # pylint:disable=no-else-return
await asyncio.sleep(backoff)
log.info("%r has an exception (%r). Retrying...", format(name), exception)
return _create_eventhub_exception(exception)
else:
error = _create_eventhub_exception(exception)
log.info("%r operation has timed out. Last exception before timeout is (%r)", name, error)
raise error
# end of processing retry delay
# end of processing retry delay
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import uuid
import asyncio
import logging
from typing import Iterable, Union
from typing import Iterable, Union, Any
import time

from uamqp import types, constants, errors # type: ignore
Expand Down Expand Up @@ -185,7 +185,7 @@ async def create_batch(self, max_size=None, partition_key=None):
return EventDataBatch(max_size=(max_size or self._max_message_size_on_link), partition_key=partition_key)

async def send(self, event_data, *, partition_key=None, timeout=None):
# type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes], float) -> None
# type:(Union[EventData, EventDataBatch, Iterable[EventData]],Any, Union[str, bytes], float) -> None
"""
Sends an event data and blocks until acknowledgement is
received or operation times out.
Expand Down
Loading