Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Update kwargs and remove unused import
  • Loading branch information
yunhaoling committed Jul 31, 2019
commit 90fbafb4e464a520191603e045f32374dd47174a
15 changes: 3 additions & 12 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@
from uamqp import (
Message,
AMQPClientAsync,
errors,
)
from uamqp import compat

from azure.eventhub.error import ConnectError
from azure.eventhub.common import parse_sas_token, EventPosition, EventHubSharedKeyCredential, EventHubSASTokenCredential
from ..client_abstract import EventHubClientAbstract

Expand Down Expand Up @@ -193,8 +190,7 @@ async def get_partition_properties(self, partition):
output['is_empty'] = partition_info[b'is_partition_empty']
return output

def create_consumer(
self, consumer_group, partition_id, event_position, **kwargs):
def create_consumer(self, consumer_group, partition_id, event_position, **kwargs):
# type: (str, str, EventPosition, int, str, int, asyncio.AbstractEventLoop) -> EventHubConsumer
"""
Create an async consumer to the client for a particular consumer group and partition.
Expand Down Expand Up @@ -240,8 +236,7 @@ def create_consumer(
prefetch=prefetch, loop=loop)
return handler

def create_producer(
self, **kwargs):
def create_producer(self, partition_id=None, operation=None, send_timeout=None, loop=None):
# type: (str, str, float, asyncio.AbstractEventLoop) -> EventHubProducer
"""
Create an async producer to send EventData object to an EventHub.
Expand All @@ -268,10 +263,6 @@ def create_producer(
:caption: Add an async producer to the client to send EventData.

"""
partition_id = kwargs.get("partition_id", None)
operation = kwargs.get("operation", None)
send_timeout = kwargs.get("send_timeout", None)
loop = kwargs.get("loop", None)

target = "amqps://{}{}".format(self.address.hostname, self.address.path)
if operation:
Expand All @@ -283,4 +274,4 @@ def create_producer(
return handler

async def close(self):
await self._conn_manager.close_connection()
await self._conn_manager.close_connection()
40 changes: 19 additions & 21 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@
from typing import List
import time

from uamqp import errors, types, compat
from uamqp import errors, types
from uamqp import ReceiveClientAsync, Source

from azure.eventhub import EventData, EventPosition
from azure.eventhub.error import EventHubError, AuthenticationError, ConnectError, ConnectionLostError, _error_handler
from ..aio.error_async import _handle_exception
from azure.eventhub.error import EventHubError, ConnectError, _error_handler
from ._consumer_producer_mixin_async import ConsumerProducerMixin, _retry_decorator

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -148,19 +147,6 @@ async def _open(self, timeout_time=None):
self.source = self.redirected.address
await super(EventHubConsumer, self)._open(timeout_time)

@property
def queue_size(self):
# type: () -> int
"""
The current size of the unprocessed Event queue.

:rtype: int
"""
# pylint: disable=protected-access
if self._handler._received_messages:
return self._handler._received_messages.qsize()
return 0

@_retry_decorator
async def _receive(self, **kwargs):
timeout_time = kwargs.get("timeout_time")
Expand All @@ -186,8 +172,21 @@ async def _receive(self, **kwargs):
data_batch.append(event_data)
return data_batch

async def receive(self, **kwargs):
# type: (...) -> List[EventData]
@property
def queue_size(self):
# type: () -> int
"""
The current size of the unprocessed Event queue.

:rtype: int
"""
# pylint: disable=protected-access
if self._handler._received_messages:
return self._handler._received_messages.qsize()
return 0

async def receive(self, max_batch_size=None, timeout=None):
# type: (int, float) -> List[EventData]
"""
Receive events asynchronously from the EventHub.

Expand Down Expand Up @@ -215,9 +214,8 @@ async def receive(self, **kwargs):
"""
self._check_closed()

max_batch_size = kwargs.get("max_batch_size", None)
timeout = kwargs.get("timeout", None) or self.client.config.receive_timeout
max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size
timeout = timeout or self.client.config.receive_timeout
max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch)
data_batch = [] # type: List[EventData]

return await self._receive(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ def _on_outcome(self, outcome, condition):
self._outcome = outcome
self._condition = condition

async def create_batch(self, **kwargs):
async def create_batch(self, max_size=None, partition_key=None):
# type:(int, str) -> EventDataBatch
"""
Create an EventDataBatch object with max size being max_size.
The max_size should be no greater than the max allowed message size defined by the service side.
Expand All @@ -159,8 +160,6 @@ async def create_batch(self, **kwargs):
:return: an EventDataBatch instance
:rtype: ~azure.eventhub.EventDataBatch
"""
max_size = kwargs.get("max_size", None)
partition_key = kwargs.get("partition_key", None)

@_retry_decorator
async def _wrapped_open(*args, **kwargs):
Expand All @@ -175,8 +174,8 @@ async def _wrapped_open(*args, **kwargs):

return EventDataBatch(max_size=(max_size or self._max_message_size_on_link), partition_key=partition_key)

async def send(self, event_data, **kwargs):
# type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes]) -> None
async def send(self, event_data, partition_key=None, timeout=None):
# type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes], float) -> None
"""
Sends an event data and blocks until acknowledgement is
received or operation times out.
Expand Down Expand Up @@ -204,8 +203,6 @@ async def send(self, event_data, **kwargs):
:caption: Sends an event data and blocks until acknowledgement is received or operation times out.

"""
partition_key = kwargs.get("partition_key", None)
timeout = kwargs.get("timeout", None)

self._check_closed()
if isinstance(event_data, EventData):
Expand Down
15 changes: 4 additions & 11 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,16 @@
from uamqp import Message
from uamqp import authentication
from uamqp import constants
from uamqp import errors
from uamqp import compat

from azure.eventhub.producer import EventHubProducer
from azure.eventhub.consumer import EventHubConsumer
from azure.eventhub.common import parse_sas_token, EventPosition
from azure.eventhub.error import ConnectError, EventHubError
from .client_abstract import EventHubClientAbstract
from .common import EventHubSASTokenCredential, EventHubSharedKeyCredential
from ._connection_manager import get_connection_manager
from .error import _handle_exception


log = logging.getLogger(__name__)


Expand Down Expand Up @@ -199,9 +197,7 @@ def get_partition_properties(self, partition):
output['is_empty'] = partition_info[b'is_partition_empty']
return output

def create_consumer(
self, consumer_group, partition_id, event_position, **kwargs
):
def create_consumer(self, consumer_group, partition_id, event_position, **kwargs):
# type: (str, str, EventPosition, ...) -> EventHubConsumer
"""
Create a consumer to the client for a particular consumer group and partition.
Expand Down Expand Up @@ -245,8 +241,8 @@ def create_consumer(
prefetch=prefetch)
return handler

def create_producer(self, **kwargs):
# type: (...) -> EventHubProducer
def create_producer(self, partition_id=None, operation=None, send_timeout=None):
# type: (str, str, float) -> EventHubProducer
"""
Create an producer to send EventData object to an EventHub.

Expand All @@ -271,9 +267,6 @@ def create_producer(self, **kwargs):
:caption: Add a producer to the client to send EventData.

"""
partition_id = kwargs.get("partition_id", None)
operation = kwargs.get("operation", None)
send_timeout = kwargs.get("send_timeout", None)

target = "amqps://{}{}".format(self.address.hostname, self.address.path)
if operation:
Expand Down
16 changes: 5 additions & 11 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import logging

from azure.eventhub.error import EventDataError
from uamqp import BatchMessage, Message, types, constants, errors
from uamqp import BatchMessage, Message, types, constants
from uamqp.message import MessageHeader, MessageProperties

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -57,7 +57,7 @@ class EventData(object):
PROP_TIMESTAMP = b"x-opt-enqueued-time"
PROP_DEVICE_ID = b"iothub-connection-device-id"

def __init__(self, body=None, **kwargs):
def __init__(self, body=None, to_device=None, message=None):
"""
Initialize EventData.

Expand All @@ -70,8 +70,6 @@ def __init__(self, body=None, **kwargs):
:param message: The received message.
:type message: ~uamqp.message.Message
"""
to_device = kwargs.get("to_device", None)
message = kwargs.get("message", None)

self._partition_key = types.AMQPSymbol(EventData.PROP_PARTITION_KEY)
self._annotations = {}
Expand Down Expand Up @@ -215,7 +213,7 @@ def body(self):
except TypeError:
raise ValueError("Message data empty.")

def body_as_str(self, **kwargs):
def body_as_str(self, encoding='UTF-8'):
"""
The body of the event data as a string if the data is of a
compatible type.
Expand All @@ -224,7 +222,6 @@ def body_as_str(self, **kwargs):
Default is 'UTF-8'
:rtype: str or unicode
"""
encoding = kwargs.get("encoding", 'UTF-8')
data = self.body
try:
return "".join(b.decode(encoding) for b in data)
Expand All @@ -237,15 +234,14 @@ def body_as_str(self, **kwargs):
except Exception as e:
raise TypeError("Message data is not compatible with string type: {}".format(e))

def body_as_json(self, **kwargs):
def body_as_json(self, encoding='UTF-8'):
"""
The body of the event loaded as a JSON object is the data is compatible.

:param encoding: The encoding to use for decoding message data.
Default is 'UTF-8'
:rtype: dict
"""
encoding = kwargs.get("encoding", 'UTF-8')
data_str = self.body_as_str(encoding=encoding)
try:
return json.loads(data_str)
Expand All @@ -263,9 +259,7 @@ class EventDataBatch(object):
Do not instantiate an EventDataBatch object directly.
"""

def __init__(self, **kwargs):
max_size = kwargs.get("max_size", None)
partition_key = kwargs.get("partition_key", None)
def __init__(self, max_size=None, partition_key=None):
self.max_size = max_size or constants.MAX_MESSAGE_LENGTH_BYTES
self._partition_key = partition_key
self.message = BatchMessage(data=[], multi_messages=False, properties=None)
Expand Down
38 changes: 19 additions & 19 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
from uamqp import ReceiveClient, Source

from azure.eventhub.common import EventData, EventPosition
from azure.eventhub.error import _error_handler, EventHubError
from azure.eventhub.error import _error_handler
from ._consumer_producer_mixin import ConsumerProducerMixin, _retry_decorator


log = logging.getLogger(__name__)


Expand Down Expand Up @@ -141,19 +142,6 @@ def _open(self, timeout_time=None):
self.source = self.redirected.address
super(EventHubConsumer, self)._open(timeout_time)

@property
def queue_size(self):
# type:() -> int
"""
The current size of the unprocessed Event queue.

:rtype: int
"""
# pylint: disable=protected-access
if self._handler._received_messages:
return self._handler._received_messages.qsize()
return 0

@_retry_decorator
def _receive(self, **kwargs):
timeout_time = kwargs.get("timeout_time")
Expand All @@ -178,8 +166,21 @@ def _receive(self, **kwargs):
data_batch.append(event_data)
return data_batch

def receive(self, **kwargs):
# type: (...) -> List[EventData]
@property
def queue_size(self):
# type:() -> int
"""
The current size of the unprocessed Event queue.

:rtype: int
"""
# pylint: disable=protected-access
if self._handler._received_messages:
return self._handler._received_messages.qsize()
return 0

def receive(self, max_batch_size=None, timeout=None):
# type: (int, float) -> List[EventData]
"""
Receive events from the EventHub.

Expand All @@ -206,9 +207,8 @@ def receive(self, **kwargs):
"""
self._check_closed()

max_batch_size = kwargs.get("max_batch_size", None)
timeout = kwargs.get("timeout", None) or self.client.config.receive_timeout
max_batch_size = min(self.client.config.max_batch_size, self.prefetch) if max_batch_size is None else max_batch_size
timeout = timeout or self.client.config.receive_timeout
max_batch_size = max_batch_size or min(self.client.config.max_batch_size, self.prefetch)
data_batch = [] # type: List[EventData]

return self._receive(timeout=timeout, max_batch_size=max_batch_size, data_batch=data_batch)
Expand Down
10 changes: 3 additions & 7 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from typing import Iterable, Union

from uamqp import types, constants, errors
from uamqp import compat
from uamqp import SendClient

from azure.eventhub.common import EventData, EventDataBatch
Expand Down Expand Up @@ -155,7 +154,8 @@ def _on_outcome(self, outcome, condition):
self._outcome = outcome
self._condition = condition

def create_batch(self, **kwargs):
def create_batch(self, max_size=None, partition_key=None):
# type:(int, str) -> EventDataBatch
"""
Create an EventDataBatch object with max size being max_size.
The max_size should be no greater than the max allowed message size defined by the service side.
Expand All @@ -167,8 +167,6 @@ def create_batch(self, **kwargs):
:return: an EventDataBatch instance
:rtype: ~azure.eventhub.EventDataBatch
"""
max_size = kwargs.get("max_size", None)
partition_key = kwargs.get("partition_key", None)

@_retry_decorator
def _wrapped_open(*args, **kwargs):
Expand All @@ -183,7 +181,7 @@ def _wrapped_open(*args, **kwargs):

return EventDataBatch(max_size=(max_size or self._max_message_size_on_link), partition_key=partition_key)

def send(self, event_data, **kwargs):
def send(self, event_data, partition_key=None, timeout=None):
# type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes], float) -> None
"""
Sends an event data and blocks until acknowledgement is
Expand Down Expand Up @@ -212,8 +210,6 @@ def send(self, event_data, **kwargs):
:caption: Sends an event data and blocks until acknowledgement is received or operation times out.

"""
partition_key = kwargs.get("partition_key", None)
timeout = kwargs.get("timeout", None)

self._check_closed()
if isinstance(event_data, EventData):
Expand Down