Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
review update (#97)
  • Loading branch information
yunhaoling authored Sep 30, 2019
commit 671f84527cd96be50a64b86d1a9a8682fb35d72d
80 changes: 41 additions & 39 deletions uamqp/async_ops/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,47 +217,49 @@ async def open_async(self, connection=None):
# pylint: disable=protected-access
if self._session:
return # already open
if connection:
_logger.info("Using existing connection.")
self._auth = connection.auth
self._ext_connection = True
await connection.lock_async()
self._connection = connection or self.connection_type(
self._hostname,
self._auth,
container_id=self._name,
max_frame_size=self._max_frame_size,
channel_max=self._channel_max,
idle_timeout=self._idle_timeout,
properties=self._properties,
remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio,
error_policy=self._error_policy,
debug=self._debug_trace,
loop=self.loop)
if not self._connection.cbs and isinstance(self._auth, authentication.CBSAsyncAuthMixin):
self._connection.cbs = await asyncio.shield(self._auth.create_authenticator_async(
self._connection,
try:
if connection:
_logger.info("Using existing connection.")
self._auth = connection.auth
self._ext_connection = True
await connection.lock_async()
self._connection = connection or self.connection_type(
self._hostname,
self._auth,
container_id=self._name,
max_frame_size=self._max_frame_size,
channel_max=self._channel_max,
idle_timeout=self._idle_timeout,
properties=self._properties,
remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio,
error_policy=self._error_policy,
debug=self._debug_trace,
incoming_window=self._incoming_window,
outgoing_window=self._outgoing_window,
handle_max=self._handle_max,
on_attach=self._on_attach,
loop=self.loop))
self._session = self._auth._session
elif self._connection.cbs:
self._session = self._auth._session
else:
self._session = self.session_type(
self._connection,
incoming_window=self._incoming_window,
outgoing_window=self._outgoing_window,
handle_max=self._handle_max,
on_attach=self._on_attach,
loop=self.loop)
if self._keep_alive_interval:
self._keep_alive_thread = asyncio.ensure_future(self._keep_alive_async(), loop=self.loop)
if self._ext_connection:
connection.release_async()
if not self._connection.cbs and isinstance(self._auth, authentication.CBSAsyncAuthMixin):
self._connection.cbs = await asyncio.shield(self._auth.create_authenticator_async(
self._connection,
debug=self._debug_trace,
incoming_window=self._incoming_window,
outgoing_window=self._outgoing_window,
handle_max=self._handle_max,
on_attach=self._on_attach,
loop=self.loop))
self._session = self._auth._session
elif self._connection.cbs:
self._session = self._auth._session
else:
self._session = self.session_type(
self._connection,
incoming_window=self._incoming_window,
outgoing_window=self._outgoing_window,
handle_max=self._handle_max,
on_attach=self._on_attach,
loop=self.loop)
if self._keep_alive_interval:
self._keep_alive_thread = asyncio.ensure_future(self._keep_alive_async(), loop=self.loop)
finally:
if self._ext_connection:
connection.release_async()

async def close_async(self):
"""Close the client asynchronously. This includes closing the Session
Expand Down
80 changes: 41 additions & 39 deletions uamqp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,46 +233,48 @@ def open(self, connection=None):
if self._session:
return # already open.
_logger.debug("Opening client connection.")
if connection:
_logger.debug("Using existing connection.")
self._auth = connection.auth
self._ext_connection = True
connection.lock()
self._connection = connection or self.connection_type(
self._hostname,
self._auth,
container_id=self._name,
max_frame_size=self._max_frame_size,
channel_max=self._channel_max,
idle_timeout=self._idle_timeout,
properties=self._properties,
remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio,
error_policy=self._error_policy,
debug=self._debug_trace,
encoding=self._encoding)
if not self._connection.cbs and isinstance(self._auth, authentication.CBSAuthMixin):
self._connection.cbs = self._auth.create_authenticator(
self._connection,
try:
if connection:
_logger.debug("Using existing connection.")
self._auth = connection.auth
self._ext_connection = True
connection.lock()
self._connection = connection or self.connection_type(
self._hostname,
self._auth,
container_id=self._name,
max_frame_size=self._max_frame_size,
channel_max=self._channel_max,
idle_timeout=self._idle_timeout,
properties=self._properties,
remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio,
error_policy=self._error_policy,
debug=self._debug_trace,
incoming_window=self._incoming_window,
outgoing_window=self._outgoing_window,
handle_max=self._handle_max,
on_attach=self._on_attach)
self._session = self._auth._session
elif self._connection.cbs:
self._session = self._auth._session
else:
self._session = self.session_type(
self._connection,
incoming_window=self._incoming_window,
outgoing_window=self._outgoing_window,
handle_max=self._handle_max,
on_attach=self._on_attach)
if self._keep_alive_interval:
self._keep_alive_thread = threading.Thread(target=self._keep_alive)
self._keep_alive_thread.start()
if self._ext_connection:
connection.release()
encoding=self._encoding)
if not self._connection.cbs and isinstance(self._auth, authentication.CBSAuthMixin):
self._connection.cbs = self._auth.create_authenticator(
self._connection,
debug=self._debug_trace,
incoming_window=self._incoming_window,
outgoing_window=self._outgoing_window,
handle_max=self._handle_max,
on_attach=self._on_attach)
self._session = self._auth._session
elif self._connection.cbs:
self._session = self._auth._session
else:
self._session = self.session_type(
self._connection,
incoming_window=self._incoming_window,
outgoing_window=self._outgoing_window,
handle_max=self._handle_max,
on_attach=self._on_attach)
if self._keep_alive_interval:
self._keep_alive_thread = threading.Thread(target=self._keep_alive)
self._keep_alive_thread.start()
finally:
if self._ext_connection:
connection.release()

def close(self):
"""Close the client. This includes closing the Session
Expand Down