Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
Change Log
============

2.11.1
+++++++

Changes
--------

* Updated default log formatter set by `pssh.utils` enable logger functions.

Fixes
------

* Using native clients under `pssh.clients.native` with very short lived commands would sometimes cause unexpected
stalls/delays in reading output from completed commands when a client ``timeout`` setting was used - #344.

2.11.0
+++++++

Expand Down
16 changes: 6 additions & 10 deletions pssh/clients/base/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ def _eagain_write_errcode(self, write_func, data, eagain, timeout=None):
rc, bytes_written = write_func(data[total_written:])
total_written += bytes_written
if rc == eagain:
self.poll(timeout=timeout)
self.poll()
sleep()

def _eagain_errcode(self, func, eagain, *args, **kwargs):
Expand Down Expand Up @@ -685,21 +685,17 @@ def _remote_paths_split(self, file_path):
if _sep > 0:
return file_path[:_sep]

def poll(self, timeout=None):
def poll(self):
raise NotImplementedError

def _poll_socket(self, events, timeout=None):
def _poll_socket(self, events):
if self.sock is None:
return
# gevent.select.poll converts seconds to miliseconds to match python socket
# implementation
timeout = timeout * 1000 if timeout is not None else 100
poller = poll()
poller.register(self.sock, eventmask=events)
poller.poll(timeout=timeout)
poller.poll(timeout=1)

def _poll_errcodes(self, directions_func, inbound, outbound, timeout=None):
timeout = self.timeout if timeout is None else timeout
def _poll_errcodes(self, directions_func, inbound, outbound):
directions = directions_func()
if directions == 0:
return
Expand All @@ -708,4 +704,4 @@ def _poll_errcodes(self, directions_func, inbound, outbound, timeout=None):
events = POLLIN
if directions & outbound:
events |= POLLOUT
self._poll_socket(events, timeout=timeout)
self._poll_socket(events)
2 changes: 1 addition & 1 deletion pssh/clients/native/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,12 +726,12 @@ def poll(self, timeout=None):

Blocks current greenlet only if socket has pending read or write operations
in the appropriate direction.
:param timeout: Deprecated and unused - to be removed.
"""
self._poll_errcodes(
self.session.block_directions,
LIBSSH2_SESSION_BLOCK_INBOUND,
LIBSSH2_SESSION_BLOCK_OUTBOUND,
timeout=timeout,
)

def _eagain_write(self, write_func, data, timeout=None):
Expand Down
5 changes: 1 addition & 4 deletions pssh/clients/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

try:
from io import BytesIO
except ImportError:
from cStringIO import StringIO as BytesIO
from io import BytesIO

from gevent import sleep
from gevent.event import Event
Expand Down
7 changes: 4 additions & 3 deletions pssh/clients/ssh/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def execute(self, cmd, use_pty=False, channel=None):

def _read_output_to_buffer(self, channel, _buffer, is_stderr=False):
while True:
self.poll(timeout=self.timeout)
self.poll()
try:
size, data = channel.read_nonblocking(is_stderr=is_stderr)
except EOF:
Expand Down Expand Up @@ -316,12 +316,13 @@ def close_channel(self, channel):
self._eagain(channel.close, timeout=self.timeout)

def poll(self, timeout=None):
"""ssh-python based co-operative gevent poll on session socket."""
"""ssh-python based co-operative gevent poll on session socket.
:param timeout: Deprecated and unused - to be removed.
"""
self._poll_errcodes(
self.session.get_poll_flags,
SSH_READ_PENDING,
SSH_WRITE_PENDING,
timeout=timeout,
)

def _eagain(self, func, *args, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion pssh/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def enable_logger(_logger, level=logging.INFO):
logger.warning("Logger already has a StreamHandler attached")
return
handler = logging.StreamHandler()
host_log_format = logging.Formatter('%(message)s')
host_log_format = logging.Formatter('%(asctime)s %(levelname)-8s %(name)-15s %(message)s')
handler.setFormatter(host_log_format)
_logger.addHandler(handler)

Expand Down
14 changes: 14 additions & 0 deletions tests/native/test_single_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,20 @@ def test_copy_remote_dir_encoding(self):
]
self.assertListEqual(remote_file_mock.call_args_list, call_args)

def test_many_short_lived_commands(self):
for _ in range(20):
timeout = 2
start = datetime.now()
client = SSHClient(self.host, port=self.port,
pkey=self.user_key,
num_retries=1,
allow_agent=False,
timeout=timeout)
host_out = client.run_command(self.cmd)
_ = list(host_out.stdout)
end = datetime.now() - start
duration = end.total_seconds()
self.assertTrue(duration < timeout * 0.9, msg=f"Duration of instant cmd is {duration}")

# TODO
# * read output callback