Eventhubs conn #6394
Can one of the admins verify this patch?
annatisch
left a comment
Loving all the deleted error handling code! 👍
    c_uamqp.ConnectionState.CLOSE_RCVD,
    c_uamqp.ConnectionState.CLOSE_SENT,
    c_uamqp.ConnectionState.DISCARDING,
    c_uamqp.ConnectionState.END,
I'm not sure how - but we may need to confirm whether all of these states indicate that it is safe to GC the connection. If the connection is GC'ed before it's finished cleaning up the C objects it can lead to a crash. This is something that may be revealed when we do the stress and perf tests - so worth keeping in mind for now.
        self._conn = None

    def reset_connection_if_broken(self):
        with self._lock:
This should be async with
yes, fixed
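A minimal sketch of the point above, assuming the async connection manager guards its state with an asyncio.Lock. The class name, the _broken flag, and the placeholder connection are hypothetical and are not taken from the PR:

```python
import asyncio


class SketchConnectionManager:
    """Hypothetical stand-in for the async connection manager."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._conn = object()   # placeholder for a real connection object
        self._broken = False    # assumption: set elsewhere when the connection fails

    async def reset_connection_if_broken(self):
        # asyncio.Lock must be entered with "async with"; a plain "with" does not work.
        async with self._lock:
            if self._conn is not None and self._broken:
                self._conn = None
                self._broken = False


async def demo():
    manager = SketchConnectionManager()
    manager._broken = True
    # The method is a coroutine, so callers have to await it.
    await manager.reset_connection_if_broken()
    return manager._conn


result = asyncio.run(demo())
```

Because the method is a coroutine, calling it without await would only create a coroutine object and never run the reset.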
            transport_type=transport_type)

    async def _handle_exception(self, exception, retry_count, max_retries):
        await _handle_exception(exception, retry_count, max_retries, self, log)
I don't think we should be passing log here - there should be a logger defined in the error handler module
fixed
        await _handle_exception(exception, retry_count, max_retries, self, log)

    async def _close_connection(self):
        self._conn_manager.reset_connection_if_broken()
reset_connection_if_broken should be an async function
yes, fixed
            await self._handle_exception(exception, retry_count, max_retries)
            retry_count += 1
        finally:
            await mgmt_client.close_async()
This is an existing bug in the track1 SDK - it should be:
if mgmt_client:
    await mgmt_client.close_async()
I moved the mgmt_client definition out of the try block. Creating it will only throw an exception if there is a bug.
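For illustration, a small sketch of the guarded cleanup the reviewer describes. FakeMgmtClient, get_properties, and the factories are hypothetical stand-ins; only the close_async name comes from the diff above:

```python
import asyncio


class FakeMgmtClient:
    """Hypothetical stand-in for the management client."""

    def __init__(self):
        self.closed = False

    async def open_async(self):
        raise ConnectionError("simulated open failure")

    async def close_async(self):
        self.closed = True


async def get_properties(make_client):
    mgmt_client = None
    try:
        mgmt_client = make_client()
        await mgmt_client.open_async()
    finally:
        # Only close what was actually created, so the original error is not masked.
        if mgmt_client:
            await mgmt_client.close_async()


def failing_factory():
    raise ValueError("simulated construction failure")


async def demo():
    outcomes = []
    for factory in (failing_factory, FakeMgmtClient):
        try:
            await get_properties(factory)
        except Exception as exc:
            outcomes.append(type(exc).__name__)
    return outcomes


outcomes = asyncio.run(demo())
```

Without the guard, the first case would fail in the finally block on a None client and hide the construction error.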
    elif isinstance(exception, errors.MessageException):
        log.error("Event data send error (%r)", exception)
        error = EventDataSendError(str(exception), exception)
        await closable.close(exception)
Does a send error always result in the sender being closed?
EventHubConsumer.close() no longer closes the connection when the connection is shared; it only closes the link. In any case, I've removed this statement because the link itself has no problem.
        raise error
    async def _close_connection(self):
        await self._close_handler()
        await self.client._conn_manager.close_connection()  # close the shared connection.
Do we want a single producer to close the shared connection? What will this do to other producers/consumers?
I changed this to "self.client._conn_manager.reset_connection_if_broken()". With a shared connection, a producer/consumer won't close the connection unless it is broken.
        self._conn_manager = get_connection_manager(**kwargs)

    def __del__(self):
        self._conn_manager.close_connection()
Is there a reason we're defining __del__? I don't think we do for any of our other clients...
This is to guard against a user forgetting to close a client (connection).
There is no async __del__, so the async client doesn't have this.
I have no problem if we remove __del__ for sync.
def _handle_exception(exception, retry_count, max_retries, closable, log):
    type_name = type(closable).__name__
    if isinstance(exception, KeyboardInterrupt):
        log.info("{} stops due to keyboard interrupt".format(type_name))
Logging statements shouldn't use .format() - instead they should use the string formatting like on line 147:
log.error("Event data error (%r)", exception)
fixed
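A short sketch of why the %-style form is preferred: the logging module only formats the message if the record is actually emitted, while .format() always builds the string up front. The Expensive class and the "sketch" logger name are hypothetical and exist only to count how often the argument is rendered:

```python
import logging

log = logging.getLogger("sketch")
log.setLevel(logging.WARNING)  # INFO messages are disabled for this logger


class Expensive:
    """Counts how many times it is converted to a string."""

    calls = 0

    def __str__(self):
        Expensive.calls += 1
        return "expensive"


value = Expensive()

# Eager: .format() renders the argument even though INFO is disabled.
log.info("{} stops due to keyboard interrupt".format(value))
eager_calls = Expensive.calls

# Lazy: the argument is never rendered because the record is not emitted.
log.info("%s stops due to keyboard interrupt", value)
lazy_calls = Expensive.calls - eager_calls
```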
    ed_iter = iter(event_datas)
    for ed in ed_iter:
        ed._set_partition_key(partition_key)
        yield ed
Just a stylistic thing - but I like to put internal functions like these at the top of the file to make them easy to find :)
Good suggestion. Fixed.
* Eventhubs conn (#6394)
* Shared connection (sync) draft
* Shared connection (sync) draft 2
* Shared connection (sync) test update
* Shared connection
* Fix an issue
* add retry exponential delay and timeout to exception handling
* put module method before class def
* fixed Client.get_properties error
* Improve send timeout (#6481)
* Add timeout information to the link prop during link creation process (#6485)
* Update optional parameters in public api into kwargs and update comments (#6510)
* Update optional parameters in public api into kwargs and update some comments
* Update more optional parameters into kwargs parameter
* create_batch feature implementation (#6256) (#6324)
* create_batch feature implementation (#6256)
* Init create batch event
* create_batch implementation
* Revert _set_partition_key method and update comment
* Refactor EventDataBatch class
* Revert logic when setting partition_key of event_data
* Misc fixes from code review
* Rename max_message_size to max_size Other small fixes
* Warn if event_data is None when call try_add
* Create batch event update (#6509)
* Update according to the review
* Update comment
* Revert some kwargs back to optional parameters as it may cause breaking changes
* Small fixes (#6520)
* Change back to normal number writings as not supported by python under 3.6
* small fix
* Add missing return (#6522)
* Fix livetest (#6523)
* Eventhubs new EventProcessor (#6550)
* Shared connection (sync) draft
* Shared connection (sync) draft 2
* Shared connection (sync) test update
* Shared connection
* Fix an issue
* add retry exponential delay and timeout to exception handling
* put module method before class def
* fixed Client.get_properties error
* new eph (draft)
* new eph (draft2)
* remove in memory partition manager
* EventProcessor draft 3
* small format change
* Fix logging
* Add EventProcessor example
* use decorator to implement retry logic and update some tests (#6544)
* Update livetest (#6547)
* Remove legacy code and update livetest (#6549)
* Update livetest
* Remove legacy code and update livetest
* make sync longrunning multi-threaded
* small changes on async long running test
* reset retry_count for iterator
* Don't return early when open a ReceiveClient or SendClient
* type annotation change
* Update kwargs and remove unused import
* Misc changes from EventProcessor PR review
* raise asyncio.CancelledError out instead of suppressing it.
* Update livetest and small fixed (#6594)
* Add missing close in livetest
* Update livetest to wait longer
* Close handler each time before retry
* Fix feedback from PR (1)
* Revert "Merge branch 'eventhubs_dev' into eventhubs_eph" This reverts commit 19a5539, reversing changes made to 9d18dd9.
* Fix feedback from PR (2)
* Update code according to the review (#6623)
* Wait longer for reconnect op
* Raise authentication error when open timeout
* Optimize retry decorator
* Update code according to review
* Small fix
* Fix feedback from PR (3)
* small bug fixing
* Remove old EPH
* Update decorator implementation (#6642)
* Update decorator implementation
* Remove old EPH pytest
* Revert "Revert "Merge branch 'eventhubs_dev' into eventhubs_eph"" This reverts commit d688090.
* Update sample codes and docstring (#6643)
* Check tablename to prevent sql injection
* PR review update
* Removed old EPH stuffs. Added new EPH stuffs.
* Small fix (#6650)
* Draft for changelog
* Improve syntax for kwargs
* keep partition manager open for call restart() again
* Example to process async operations
* Update version to 5.0.0b2
* fix mypy problem
* fix small issue on max_retries
* compatible with python < 3.7
* Update docstring of event processor
* small changes on max_retries
* small changes on max_retries
* small changes
* new EventProcessor long-running live test
* change offset to text
* Updating docstrings, docs, samples (#6673)
* Draft for updating documentations
* Small improvement
* Updating docstrings, docs and sample
* support 3.5 type hint
* fix 3.5 compatibility
* Update docs (#6678)
* Add a run_awhile example function
* small fix on example
* small fix on example
* Update documentations (#6694)
* Small update (#6696)
An EventHubClient has a connection manager, which creates a connection. All EventHubConsumer and EventHubProducer instances that are created by an EventHubClient share that connection.
EventHubClient.get_properties and .get_partition_properties use that shared connection too.
Exception handling code is also refactored as it's an important part of the connection / link management.
Both sync and async are updated.
There is a switch between shared and non-shared connections: change get_connection_manager in _connection_manager.py to create either a shared connection manager or a separate connection manager. This switch is not exposed to users as a parameter, and it will no longer be needed once the shared connection is stable.
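The switch described here could be sketched roughly as follows. This is an illustration under assumptions, not the contents of _connection_manager.py: the class names, the _USE_SHARED flag, and the connect callable are hypothetical, and the real get_connection_manager takes **kwargs rather than a factory:

```python
import threading


class _SharedConnectionManager:
    """All callers get the same connection, created lazily under a lock."""

    def __init__(self, connect):
        self._lock = threading.Lock()
        self._connect = connect
        self._conn = None

    def get_connection(self):
        with self._lock:
            if self._conn is None:
                self._conn = self._connect()
            return self._conn

    def close_connection(self):
        with self._lock:
            self._conn = None


class _SeparateConnectionManager:
    """Every caller gets its own connection; nothing is cached."""

    def __init__(self, connect):
        self._connect = connect

    def get_connection(self):
        return self._connect()

    def close_connection(self):
        pass


_USE_SHARED = True  # hypothetical internal switch, not a user-facing parameter


def get_connection_manager(connect):
    cls = _SharedConnectionManager if _USE_SHARED else _SeparateConnectionManager
    return cls(connect)


shared = get_connection_manager(object)
separate = _SeparateConnectionManager(object)
```

With the shared manager, repeated get_connection() calls return the same object until close_connection() is called; the separate manager returns a fresh one each time.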
There is a known bug possibly related to the underlying python-to-c binding.