Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
599896f
remove async_ops
May 18, 2019
384e802
EventHubs track2 starter (#5330)
YijunXieMS May 21, 2019
24219aa
Eventhubs track2 python main issues (#5575)
YijunXieMS Jun 3, 2019
b77d019
Error hierarchy, sample code and docstring (#5743)
YijunXieMS Jun 14, 2019
87c11dd
Fix missing consumer group directory in EPH
Jun 17, 2019
89bb40e
Fix livetest code problem
Jun 17, 2019
06a6ebf
history and readme
Jun 17, 2019
edeaffd
Update history and setup (#5902)
yunhaoling Jun 17, 2019
106976f
EventPosition.first_available_event -> earliest
Jun 18, 2019
72e420e
Change EventSender's event_position to be mandatory
Jun 18, 2019
f353da8
Update uamqp shared_req to 1.2.0
Jun 18, 2019
4df0aaf
Disable network_tracing
Jun 18, 2019
a8c7a50
update uamqp dependency ~=1.2.0
Jun 18, 2019
daf3d71
Remove EventPosition helper functions
Jun 19, 2019
fc0ce06
Names changed to EventHubConsumer/Producer
Jun 19, 2019
a73bf7b
Avoid nested with statement
Jun 19, 2019
e47ce29
Skip forced reconnect test
Jun 19, 2019
ef3ac56
Update naming in eventhub (consumer and producer). (#5984)
yunhaoling Jun 20, 2019
86af983
Remove azure-core requirement tentatively
Jun 20, 2019
981fc99
Warn when eventhubs or storage teardown fails
Jun 20, 2019
3074c79
add type hints comments
Jun 20, 2019
dbf7001
add azure identity in dev_requirements
Jun 21, 2019
e7ff951
put TransportType in __init__ directly
Jun 21, 2019
9ce3f8e
change EventData's offset to be str, not EventPosition
Jun 21, 2019
95bba82
remove from_iot_connection_string
Jun 21, 2019
b3a6bb4
small fix
Jun 21, 2019
437a6fe
docstring timeout from int to float
Jun 21, 2019
168e522
fix TransportType import
Jun 21, 2019
17c748f
add pytest option sleep for reconnect test
Jun 21, 2019
cb1017f
Fix eventposition issue in receive test
Jun 21, 2019
1288a8b
fix constants import issue
Jun 21, 2019
f03fee1
remove azure identity dev_req
Jun 21, 2019
980b4f0
fix some example issues
Jun 21, 2019
4e2a6e1
iot string fix
Jun 21, 2019
7a5aa4d
Change filename for consumer and producer
Jun 21, 2019
613a6c4
Vendor azure-storage-blob in eventprocesshost (#6018)
yunhaoling Jun 21, 2019
002a50f
Add vendored blob-storage required pkg
Jun 22, 2019
cb5f979
Vendor storage 2.0.1 within EPH (#6031)
yunhaoling Jun 22, 2019
f6dcf2d
catch exception for mgmt_request
Jun 23, 2019
c6ae04a
Update comment and code structure (#6042)
yunhaoling Jun 23, 2019
ce8b0c9
code review changes
Jun 24, 2019
3f808f9
Merge branch 'master' into eventhubs_dev
Jun 24, 2019
8786b9a
Add python-dateutil in shared_requirements
Jun 24, 2019
041e36b
Add aad credential env var to tests.yml
Jun 24, 2019
9a4c14c
Change example code assertion for parallel running
Jun 24, 2019
5abdcd7
Enable iothub receive test case
Jun 24, 2019
c0990d2
Revert "Enable iothub receive test case"
Jun 24, 2019
65bfec5
fix auth test error
Jun 24, 2019
6d017e1
change offset.value to offset
Jun 24, 2019
94c2eec
Fix an eventposition problem
Jun 24, 2019
59e57b7
Remove path append
Jun 24, 2019
fe6459b
trying removing the module init py within tests
scbedd Jun 24, 2019
cbbbfc4
Separate MockEventProcessor to a different file
Jun 24, 2019
4b44f0c
remove tests to path
Jun 24, 2019
41ae796
trying a run based on a nested conftest to establish the async fixtur…
scbedd Jun 24, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Error hierarchy, sample code and docstring (#5743)
* Recover from fork repo

* Packaging update of azure-eventhubs

* Fix error message

* update iterator example

* Revert "Packaging update of azure-eventhubs"

This reverts commit 56fc4f0.

* disable autorest auto update

* Sender/Receiver -> EventSender/Receiver

* Change _batching_label back to partition_key

* Remove transfer examples

* move async to async folder

* Update docstring string, sample codes and test codes (#5793)

* catch and process LinkRedirect

* Add receiver iterator pytest

* small fix of iterator example

* add retrieval_time to partition prop

* fix open and re-send bugs

* small fixes

* fix reconnect test case

* close iterator when closing receiver

* Misc changes for code review fix

* client.py type hints

* catch KeyboardInterrupt

* add next() for 2.7 iterator

* raise KeyboardInterrupt instead of exit()
  • Loading branch information
YijunXieMS authored Jun 14, 2019
commit b77d019f884ee8ff3cef524f968ddbdaedbd628b
22 changes: 12 additions & 10 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,33 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "1.3.1"
__version__ = "2.0.0-preview.1"

from azure.eventhub.common import EventData, EventPosition
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, AuthenticationError
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \
AuthenticationError, EventDataSendError, ConnectionLostError
from azure.eventhub.client import EventHubClient
from azure.eventhub.sender import Sender
from azure.eventhub.receiver import Receiver
from azure.eventhub.sender import EventSender
from azure.eventhub.receiver import EventReceiver
from .constants import MessageSendResult
from .constants import TransportType
from .common import FIRST_AVAILABLE, NEW_EVENTS_ONLY, SharedKeyCredentials, SASTokenCredentials
from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential

__all__ = [
"__version__",
"EventData",
"EventHubError",
"ConnectError",
"ConnectionLostError",
"EventDataError",
"EventDataSendError",
"AuthenticationError",
"EventPosition",
"EventHubClient",
"Sender",
"Receiver",
"EventSender",
"EventReceiver",
"MessageSendResult",
"TransportType",
"FIRST_AVAILABLE", "NEW_EVENTS_ONLY",
"SharedKeyCredentials",
"SASTokenCredentials",
"EventHubSharedKeyCredential",
"EventHubSASTokenCredential",
]
8 changes: 4 additions & 4 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from .event_hubs_client_async import EventHubClient
from .receiver_async import Receiver
from .sender_async import Sender
from .receiver_async import EventReceiver
from .sender_async import EventSender

__all__ = [
"EventHubClient",
"Receiver",
"Sender"
"EventReceiver",
"EventSender"
]
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
AMQPClientAsync,
)

from azure.eventhub.common import parse_sas_token, SharedKeyCredentials, SASTokenCredentials
from azure.eventhub.common import parse_sas_token, EventPosition, EventHubSharedKeyCredential, EventHubSASTokenCredential
from azure.eventhub import (
EventHubError)
from ..client_abstract import EventHubClientAbstract

from .sender_async import Sender
from .receiver_async import Receiver
from .sender_async import EventSender
from .receiver_async import EventReceiver


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -56,7 +56,7 @@ def _create_auth(self, username=None, password=None):
transport_type = self.config.transport_type
auth_timeout = self.config.auth_timeout

if isinstance(self.credential, SharedKeyCredentials):
if isinstance(self.credential, EventHubSharedKeyCredential):
username = username or self._auth_config['username']
password = password or self._auth_config['password']
if "@sas.root" in username:
Expand All @@ -66,7 +66,7 @@ def _create_auth(self, username=None, password=None):
self.auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy,
transport_type=transport_type)

elif isinstance(self.credential, SASTokenCredentials):
elif isinstance(self.credential, EventHubSASTokenCredential):
token = self.credential.get_sas_token()
try:
expiry = int(parse_sas_token(token)['se'])
Expand All @@ -85,10 +85,14 @@ def _create_auth(self, username=None, password=None):
get_jwt_token, http_proxy=http_proxy,
transport_type=transport_type)


async def get_properties(self):
"""
Get details on the specified EventHub async.
Get properties of the specified EventHub async.
Keys in the details dictionary include:

-'path'
-'created_at'
-'partition_ids'

:rtype: dict
"""
Expand Down Expand Up @@ -117,21 +121,25 @@ async def get_properties(self):
await mgmt_client.close_async()

async def get_partition_ids(self):
"""
Get partition ids of the specified EventHub async.

:rtype: list[str]
"""
return (await self.get_properties())['partition_ids']

async def get_partition_properties(self, partition):
"""
Get information on the specified partition async.
Get properties of the specified partition async.
Keys in the details dictionary include:

-'name'
-'type'
-'partition'
-'begin_sequence_number'
-'event_hub_path'
-'id'
-'beginning_sequence_number'
-'last_enqueued_sequence_number'
-'last_enqueued_offset'
-'last_enqueued_time_utc'
-'is_partition_empty'
-'is_empty'

:param partition: The target partition id.
:type partition: str
Expand Down Expand Up @@ -168,23 +176,27 @@ async def get_partition_properties(self, partition):
await mgmt_client.close_async()

def create_receiver(
self, partition_id, consumer_group="$Default", event_position=None, exclusive_receiver_priority=None, operation=None,
prefetch=None, loop=None):
self, partition_id, consumer_group="$Default", event_position=EventPosition.first_available_event(), exclusive_receiver_priority=None,
operation=None, prefetch=None, loop=None):
"""
Add an async receiver to the client for a particular consumer group and partition.
Create an async receiver to the client for a particular consumer group and partition.

:param consumer_group: The name of the consumer group.
:param partition_id: The ID of the partition.
:type partition_id: str
:param consumer_group: The name of the consumer group. Default value is `$Default`.
:type consumer_group: str
:param partition: The ID of the partition.
:type partition: str
:param event_position: The position from which to start receiving.
:type event_position: ~azure.eventhub.common.EventPosition
:param prefetch: The message prefetch count of the receiver. Default is 300.
:type prefetch: int
:operation: An optional operation to be appended to the hostname in the source URL.
:param exclusive_receiver_priority: The priority of the exclusive receiver. The client will create an exclusive
receiver if exclusive_receiver_priority is set.
:type exclusive_receiver_priority: int
:param operation: An optional operation to be appended to the hostname in the source URL.
The value must start with `/` character.
:type operation: str
:rtype: ~azure.eventhub.aio.receiver_async.ReceiverAsync
:param prefetch: The message prefetch count of the receiver. Default is 300.
:type prefetch: int
:param loop: An event loop. If not specified the default event loop will be used.
:rtype: ~azure.eventhub.aio.receiver_async.EventReceiver

Example:
.. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
Expand All @@ -200,35 +212,30 @@ def create_receiver(
path = self.address.path + operation if operation else self.address.path
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
self.address.hostname, path, consumer_group, partition_id)
handler = Receiver(
self, source_url, offset=event_position, exclusive_receiver_priority=exclusive_receiver_priority,
handler = EventReceiver(
self, source_url, event_position=event_position, exclusive_receiver_priority=exclusive_receiver_priority,
prefetch=prefetch, loop=loop)
return handler

def create_sender(
self, partition_id=None, operation=None, send_timeout=None, loop=None):
"""
Add an async sender to the client to send ~azure.eventhub.common.EventData object
Create an async sender to the client to send ~azure.eventhub.common.EventData object
to an EventHub.

:param partition: Optionally specify a particular partition to send to.
:param partition_id: Optionally specify a particular partition to send to.
If omitted, the events will be distributed to available partitions via
round-robin.
:type partition: str
:operation: An optional operation to be appended to the hostname in the target URL.
:type partition_id: str
:param operation: An optional operation to be appended to the hostname in the target URL.
The value must start with `/` character.
:type operation: str
:param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is
queued. Default value is 60 seconds. If set to 0, there will be no timeout.
:type send_timeout: int
:param keep_alive: The time interval in seconds between pinging the connection to keep it alive during
periods of inactivity. The default value is 30 seconds. If set to `None`, the connection will not
be pinged.
:type keep_alive: int
:param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs.
Default value is `True`.
:type auto_reconnect: bool
:rtype: ~azure.eventhub.aio.sender_async.SenderAsync
:type send_timeout: float
:param loop: An event loop. If not specified the default event loop will be used.
:rtype ~azure.eventhub.aio.sender_async.EventSender


Example:
.. literalinclude:: ../examples/async_examples/test_examples_eventhub_async.py
Expand All @@ -245,6 +252,6 @@ def create_sender(
target = target + operation
send_timeout = self.config.send_timeout if send_timeout is None else send_timeout

handler = Sender(
handler = EventSender(
self, target, partition=partition_id, send_timeout=send_timeout, loop=loop)
return handler
Loading