diff --git a/Directory.Packages.props b/Directory.Packages.props
index 8138199d5..7348e14c5 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -4,18 +4,18 @@
false
-
+
-
+
-
+
-
+
diff --git a/src/Renci.SshNet/Abstractions/SocketAbstraction.cs b/src/Renci.SshNet/Abstractions/SocketAbstraction.cs
index 69ec38b26..63dc2bf54 100644
--- a/src/Renci.SshNet/Abstractions/SocketAbstraction.cs
+++ b/src/Renci.SshNet/Abstractions/SocketAbstraction.cs
@@ -12,34 +12,6 @@ namespace Renci.SshNet.Abstractions
{
internal static partial class SocketAbstraction
{
- public static bool CanRead(Socket socket)
- {
- if (socket.Connected)
- {
- return socket.Poll(-1, SelectMode.SelectRead) && socket.Available > 0;
- }
-
- return false;
- }
-
- ///
- /// Returns a value indicating whether the specified can be used
- /// to send data.
- ///
- /// The to check.
- ///
- /// if can be written to; otherwise, .
- ///
- public static bool CanWrite(Socket socket)
- {
- if (socket != null && socket.Connected)
- {
- return socket.Poll(-1, SelectMode.SelectWrite);
- }
-
- return false;
- }
-
public static Socket Connect(IPEndPoint remoteEndpoint, TimeSpan connectTimeout)
{
var socket = new Socket(remoteEndpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp) { NoDelay = true };
diff --git a/src/Renci.SshNet/Common/Extensions.cs b/src/Renci.SshNet/Common/Extensions.cs
index b7a97d067..1d3749c7b 100644
--- a/src/Renci.SshNet/Common/Extensions.cs
+++ b/src/Renci.SshNet/Common/Extensions.cs
@@ -3,6 +3,7 @@
using System.Globalization;
#if !NET
using System.IO;
+using System.Threading.Tasks;
#endif
using System.Net;
using System.Net.Sockets;
@@ -10,7 +11,6 @@
using System.Runtime.CompilerServices;
using System.Threading;
-using Renci.SshNet.Abstractions;
using Renci.SshNet.Messages;
namespace Renci.SshNet.Common
@@ -319,16 +319,6 @@ public static byte[] Concat(this byte[] first, byte[] second)
return concat;
}
- internal static bool CanRead(this Socket socket)
- {
- return SocketAbstraction.CanRead(socket);
- }
-
- internal static bool CanWrite(this Socket socket)
- {
- return SocketAbstraction.CanWrite(socket);
- }
-
internal static bool IsConnected(this Socket socket)
{
if (socket is null)
@@ -409,6 +399,29 @@ internal static void ReadExactly(this Stream stream, byte[] buffer, int offset,
totalRead += read;
}
}
+
+ internal static Task WaitAsync(this Task task, CancellationToken cancellationToken)
+ {
+ if (task.IsCompleted || !cancellationToken.CanBeCanceled)
+ {
+ return task;
+ }
+
+ return WaitCore();
+
+ async Task WaitCore()
+ {
+ TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ using var reg = cancellationToken.Register(
+ () => tcs.TrySetCanceled(cancellationToken),
+ useSynchronizationContext: false);
+
+ var completedTask = await Task.WhenAny(task, tcs.Task).ConfigureAwait(false);
+
+ return await completedTask.ConfigureAwait(false);
+ }
+ }
#endif
}
}
diff --git a/src/Renci.SshNet/IServiceFactory.cs b/src/Renci.SshNet/IServiceFactory.cs
index 7051942a1..681d69da9 100644
--- a/src/Renci.SshNet/IServiceFactory.cs
+++ b/src/Renci.SshNet/IServiceFactory.cs
@@ -83,18 +83,6 @@ internal partial interface IServiceFactory
/// No key exchange algorithm is supported by both client and server.
IKeyExchange CreateKeyExchange(IDictionary> clientAlgorithms, string[] serverAlgorithms);
- ///
- /// Creates an for the specified file and with the specified
- /// buffer size.
- ///
- /// The file to read.
- /// The SFTP session to use.
- /// The size of buffer.
- ///
- /// An .
- ///
- ISftpFileReader CreateSftpFileReader(string fileName, ISftpSession sftpSession, uint bufferSize);
-
///
/// Creates a new instance.
///
diff --git a/src/Renci.SshNet/ServiceFactory.cs b/src/Renci.SshNet/ServiceFactory.cs
index ab3ae16cb..e8e5a2c35 100644
--- a/src/Renci.SshNet/ServiceFactory.cs
+++ b/src/Renci.SshNet/ServiceFactory.cs
@@ -4,8 +4,6 @@
using System.Net.Sockets;
using System.Text;
-using Microsoft.Extensions.Logging;
-
using Renci.SshNet.Common;
using Renci.SshNet.Connection;
using Renci.SshNet.Messages.Transport;
@@ -118,51 +116,6 @@ public INetConfSession CreateNetConfSession(ISession session, int operationTimeo
return new NetConfSession(session, operationTimeout);
}
- ///
- /// Creates an for the specified file and with the specified
- /// buffer size.
- ///
- /// The file to read.
- /// The SFTP session to use.
- /// The size of buffer.
- ///
- /// An .
- ///
- public ISftpFileReader CreateSftpFileReader(string fileName, ISftpSession sftpSession, uint bufferSize)
- {
- const int defaultMaxPendingReads = 10;
-
- // Issue #292: Avoid overlapping SSH_FXP_OPEN and SSH_FXP_LSTAT requests for the same file as this
- // causes a performance degradation on Sun SSH
- var openAsyncResult = sftpSession.BeginOpen(fileName, Flags.Read, callback: null, state: null);
- var handle = sftpSession.EndOpen(openAsyncResult);
-
- var statAsyncResult = sftpSession.BeginLStat(fileName, callback: null, state: null);
-
- long? fileSize;
- int maxPendingReads;
-
- var chunkSize = sftpSession.CalculateOptimalReadLength(bufferSize);
-
- // fallback to a default maximum of pending reads when remote server does not allow us to obtain
- // the attributes of the file
- try
- {
- var fileAttributes = sftpSession.EndLStat(statAsyncResult);
- fileSize = fileAttributes.Size;
- maxPendingReads = Math.Min(100, (int)Math.Ceiling((double)fileAttributes.Size / chunkSize) + 1);
- }
- catch (SshException ex)
- {
- fileSize = null;
- maxPendingReads = defaultMaxPendingReads;
-
- sftpSession.SessionLoggerFactory.CreateLogger().LogInformation(ex, "Failed to obtain size of file. Allowing maximum {MaxPendingReads} pending reads", maxPendingReads);
- }
-
- return sftpSession.CreateFileReader(handle, sftpSession, chunkSize, maxPendingReads, fileSize);
- }
-
///
/// Creates a new instance.
///
diff --git a/src/Renci.SshNet/Session.cs b/src/Renci.SshNet/Session.cs
index ec3eac878..77fe9d4c2 100644
--- a/src/Renci.SshNet/Session.cs
+++ b/src/Renci.SshNet/Session.cs
@@ -81,12 +81,6 @@ public sealed class Session : ISession
private readonly ISocketFactory _socketFactory;
private readonly ILogger _logger;
- ///
- /// Holds an object that is used to ensure only a single thread can read from
- /// at any given time.
- ///
- private readonly Lock _socketReadLock = new Lock();
-
///
/// Holds an object that is used to ensure only a single thread can write to
/// at any given time.
@@ -105,7 +99,7 @@ public sealed class Session : ISession
/// This is also used to ensure that will not be disposed
/// while performing a given operation or set of operations on .
///
- private readonly SemaphoreSlim _socketDisposeLock = new SemaphoreSlim(1, 1);
+ private readonly Lock _socketDisposeLock = new Lock();
///
/// Holds an object that is used to ensure only a single thread can connect
@@ -279,17 +273,11 @@ public bool IsConnected
{
get
{
- if (_disposed || _isDisconnectMessageSent || !_isAuthenticated)
- {
- return false;
- }
-
- if (_messageListenerCompleted is null || _messageListenerCompleted.WaitOne(0))
- {
- return false;
- }
-
- return IsSocketConnected();
+ return !_disposed &&
+ !_isDisconnectMessageSent &&
+ _isAuthenticated &&
+ _messageListenerCompleted?.WaitOne(0) == false &&
+ _socket.IsConnected();
}
}
@@ -1046,7 +1034,7 @@ internal void WaitOnHandle(WaitHandle waitHandle, TimeSpan timeout)
/// The size of the packet exceeds the maximum size defined by the protocol.
internal void SendMessage(Message message)
{
- if (!_socket.CanWrite())
+ if (!_socket.IsConnected())
{
throw new SshConnectionException("Client not connected.");
}
@@ -1161,9 +1149,7 @@ internal void SendMessage(Message message)
///
private void SendPacket(byte[] packet, int offset, int length)
{
- _socketDisposeLock.Wait();
-
- try
+ lock (_socketDisposeLock)
{
if (!_socket.IsConnected())
{
@@ -1172,10 +1158,6 @@ private void SendPacket(byte[] packet, int offset, int length)
SocketAbstraction.Send(_socket, packet, offset, length);
}
- finally
- {
- _ = _socketDisposeLock.Release();
- }
}
///
@@ -1259,76 +1241,70 @@ private Message ReceiveMessage(Socket socket)
byte[] data;
uint packetLength;
- // avoid reading from socket while IsSocketConnected is attempting to determine whether the
- // socket is still connected by invoking Socket.Poll(...) and subsequently verifying value of
- // Socket.Available
- lock (_socketReadLock)
+ // Read first block - which starts with the packet length
+ var firstBlock = new byte[blockSize];
+ if (TrySocketRead(socket, firstBlock, 0, blockSize) == 0)
{
- // Read first block - which starts with the packet length
- var firstBlock = new byte[blockSize];
- if (TrySocketRead(socket, firstBlock, 0, blockSize) == 0)
- {
- // connection with SSH server was closed
- return null;
- }
+ // connection with SSH server was closed
+ return null;
+ }
- var plainFirstBlock = firstBlock;
+ var plainFirstBlock = firstBlock;
- // First block is not encrypted in AES GCM mode.
- if (_serverCipher is not null and not Security.Cryptography.Ciphers.AesGcmCipher)
- {
- _serverCipher.SetSequenceNumber(_inboundPacketSequence);
+ // First block is not encrypted in AES GCM mode.
+ if (_serverCipher is not null and not Security.Cryptography.Ciphers.AesGcmCipher)
+ {
+ _serverCipher.SetSequenceNumber(_inboundPacketSequence);
- // First block is not encrypted in ETM mode.
- if (_serverMac == null || !_serverEtm)
- {
- plainFirstBlock = _serverCipher.Decrypt(firstBlock);
- }
+ // First block is not encrypted in ETM mode.
+ if (_serverMac == null || !_serverEtm)
+ {
+ plainFirstBlock = _serverCipher.Decrypt(firstBlock);
}
+ }
- packetLength = BinaryPrimitives.ReadUInt32BigEndian(plainFirstBlock);
+ packetLength = BinaryPrimitives.ReadUInt32BigEndian(plainFirstBlock);
- // Test packet minimum and maximum boundaries
- if (packetLength < Math.Max((byte)8, blockSize) - 4 || packetLength > MaximumSshPacketSize - 4)
- {
- throw new SshConnectionException(string.Format(CultureInfo.CurrentCulture, "Bad packet length: {0}.", packetLength),
- DisconnectReason.ProtocolError);
- }
+ // Test packet minimum and maximum boundaries
+ if (packetLength < Math.Max((byte)8, blockSize) - 4 || packetLength > MaximumSshPacketSize - 4)
+ {
+ throw new SshConnectionException(string.Format(CultureInfo.CurrentCulture, "Bad packet length: {0}.", packetLength),
+ DisconnectReason.ProtocolError);
+ }
- // Determine the number of bytes left to read; We've already read "blockSize" bytes, but the
- // "packet length" field itself - which is 4 bytes - is not included in the length of the packet
- var bytesToRead = (int)(packetLength - (blockSize - packetLengthFieldLength)) + serverMacLength;
-
- // Construct buffer for holding the payload and the inbound packet sequence as we need both in order
- // to generate the hash.
- //
- // The total length of the "data" buffer is an addition of:
- // - inboundPacketSequenceLength (4 bytes)
- // - packetLength
- // - serverMacLength
- //
- // We include the inbound packet sequence to allow us to have the the full SSH packet in a single
- // byte[] for the purpose of calculating the client hash. Room for the server MAC is foreseen
- // to read the packet including server MAC in a single pass (except for the initial block).
- data = new byte[bytesToRead + blockSize + inboundPacketSequenceLength];
- BinaryPrimitives.WriteUInt32BigEndian(data, _inboundPacketSequence);
-
- // Use raw packet length field to calculate the mac in AEAD mode.
- if (_serverAead)
- {
- Buffer.BlockCopy(firstBlock, 0, data, inboundPacketSequenceLength, blockSize);
- }
- else
- {
- Buffer.BlockCopy(plainFirstBlock, 0, data, inboundPacketSequenceLength, blockSize);
- }
+ // Determine the number of bytes left to read; We've already read "blockSize" bytes, but the
+ // "packet length" field itself - which is 4 bytes - is not included in the length of the packet
+ var bytesToRead = (int)(packetLength - (blockSize - packetLengthFieldLength)) + serverMacLength;
+
+ // Construct buffer for holding the payload and the inbound packet sequence as we need both in order
+ // to generate the hash.
+ //
+ // The total length of the "data" buffer is an addition of:
+ // - inboundPacketSequenceLength (4 bytes)
+ // - packetLength
+ // - serverMacLength
+ //
+ // We include the inbound packet sequence to allow us to have the the full SSH packet in a single
+ // byte[] for the purpose of calculating the client hash. Room for the server MAC is foreseen
+ // to read the packet including server MAC in a single pass (except for the initial block).
+ data = new byte[bytesToRead + blockSize + inboundPacketSequenceLength];
+ BinaryPrimitives.WriteUInt32BigEndian(data, _inboundPacketSequence);
+
+ // Use raw packet length field to calculate the mac in AEAD mode.
+ if (_serverAead)
+ {
+ Buffer.BlockCopy(firstBlock, 0, data, inboundPacketSequenceLength, blockSize);
+ }
+ else
+ {
+ Buffer.BlockCopy(plainFirstBlock, 0, data, inboundPacketSequenceLength, blockSize);
+ }
- if (bytesToRead > 0)
+ if (bytesToRead > 0)
+ {
+ if (TrySocketRead(socket, data, blockSize + inboundPacketSequenceLength, bytesToRead) == 0)
{
- if (TrySocketRead(socket, data, blockSize + inboundPacketSequenceLength, bytesToRead) == 0)
- {
- return null;
- }
+ return null;
}
}
@@ -1888,84 +1864,6 @@ private static string ToHex(byte[] bytes)
#endif
}
- ///
- /// Gets a value indicating whether the socket is connected.
- ///
- ///
- /// if the socket is connected; otherwise, .
- ///
- ///
- ///
- /// As a first check we verify whether is
- /// . However, this only returns the state of the socket as of
- /// the last I/O operation.
- ///
- ///
- /// Therefore we use the combination of with mode
- /// and to verify if the socket is still connected.
- ///
- ///
- /// The MSDN doc mention the following on the return value of
- /// with mode :
- ///
- ///
- /// if data is available for reading;
- ///
- ///
- /// if the connection has been closed, reset, or terminated; otherwise, returns .
- ///
- ///
- ///
- ///
- /// Conclusion: when the return value is - but no data is available for reading - then
- /// the socket is no longer connected.
- ///
- ///
- /// When a is used from multiple threads, there's a race condition
- /// between the invocation of and the moment
- /// when the value of is obtained. To workaround this issue
- /// we synchronize reads from the .
- ///
- ///
- /// We assume the socket is still connected if the read lock cannot be acquired immediately.
- /// In this case, we just return without actually waiting to acquire
- /// the lock. We don't want to wait for the read lock if another thread already has it because
- /// there are cases where the other thread holding the lock can be waiting indefinitely for
- /// a socket read operation to complete.
- ///
- ///
- private bool IsSocketConnected()
- {
- _socketDisposeLock.Wait();
-
- try
- {
- if (!_socket.IsConnected())
- {
- return false;
- }
-
- if (!_socketReadLock.TryEnter())
- {
- return true;
- }
-
- try
- {
- var connectionClosedOrDataAvailable = _socket.Poll(0, SelectMode.SelectRead);
- return !(connectionClosedOrDataAvailable && _socket.Available == 0);
- }
- finally
- {
- _socketReadLock.Exit();
- }
- }
- finally
- {
- _ = _socketDisposeLock.Release();
- }
- }
-
///
/// Performs a blocking read on the socket until bytes are received.
///
@@ -1988,46 +1886,37 @@ private static int TrySocketRead(Socket socket, byte[] buffer, int offset, int l
///
private void SocketDisconnectAndDispose()
{
- if (_socket != null)
+ lock (_socketDisposeLock)
{
- _socketDisposeLock.Wait();
+ if (_socket is null)
+ {
+ return;
+ }
- try
+ if (_socket.Connected)
{
-#pragma warning disable CA1508 // Avoid dead conditional code; Value could have been changed by another thread.
- if (_socket != null)
-#pragma warning restore CA1508 // Avoid dead conditional code
+ try
{
- if (_socket.Connected)
- {
- try
- {
- _logger.LogDebug("[{SessionId}] Shutting down socket.", SessionIdHex);
-
- // Interrupt any pending reads; should be done outside of socket read lock as we
- // actually want shutdown the socket to make sure blocking reads are interrupted.
- //
- // This may result in a SocketException (eg. An existing connection was forcibly
- // closed by the remote host) which we'll log and ignore as it means the socket
- // was already shut down.
- _socket.Shutdown(SocketShutdown.Send);
- }
- catch (SocketException ex)
- {
- _logger.LogInformation(ex, "Failure shutting down socket");
- }
- }
-
- _logger.LogDebug("[{SessionId}] Disposing socket.", SessionIdHex);
- _socket.Dispose();
- _logger.LogDebug("[{SessionId}] Disposed socket.", SessionIdHex);
- _socket = null;
+ _logger.LogDebug("[{SessionId}] Shutting down socket.", SessionIdHex);
+
+ // Interrupt any pending reads; should be done outside of socket read lock as we
+ // actually want shutdown the socket to make sure blocking reads are interrupted.
+ //
+ // This may result in a SocketException (eg. An existing connection was forcibly
+ // closed by the remote host) which we'll log and ignore as it means the socket
+ // was already shut down.
+ _socket.Shutdown(SocketShutdown.Both);
+ }
+ catch (SocketException ex)
+ {
+ _logger.LogInformation(ex, "Failure shutting down socket");
}
}
- finally
- {
- _ = _socketDisposeLock.Release();
- }
+
+ _logger.LogDebug("[{SessionId}] Disposing socket.", SessionIdHex);
+ _socket.Dispose();
+ _logger.LogDebug("[{SessionId}] Disposed socket.", SessionIdHex);
+ _socket = null;
}
}
@@ -2048,25 +1937,6 @@ private void MessageListener()
break;
}
- try
- {
- // Block until either data is available or the socket is closed
- var connectionClosedOrDataAvailable = socket.Poll(-1, SelectMode.SelectRead);
- if (connectionClosedOrDataAvailable && socket.Available == 0)
- {
- // connection with SSH server was closed or connection was reset
- break;
- }
- }
- catch (ObjectDisposedException)
- {
- // The socket was disposed by either:
- // * a call to Disconnect()
- // * a call to Dispose()
- // * a SSH_MSG_DISCONNECT received from server
- break;
- }
-
var message = ReceiveMessage(socket);
if (message is null)
{
@@ -2102,25 +1972,12 @@ private void MessageListener()
/// The .
private void RaiseError(Exception exp)
{
- var connectionException = exp as SshConnectionException;
-
_logger.LogInformation(exp, "[{SessionId}] Raised exception", SessionIdHex);
- if (_isDisconnecting)
+ if (_isDisconnecting && exp is SshConnectionException or ObjectDisposedException)
{
- // a connection exception which is raised while isDisconnecting is normal and
- // should be ignored
- if (connectionException != null)
- {
- return;
- }
-
- // any timeout while disconnecting can be caused by loss of connectivity
- // altogether and should be ignored
- if (exp is SocketException socketException && socketException.SocketErrorCode == SocketError.TimedOut)
- {
- return;
- }
+ // Such an exception raised while isDisconnecting is expected and can be ignored.
+ return;
}
// "save" exception and set exception wait handle to ensure any waits are interrupted
@@ -2129,10 +1986,10 @@ private void RaiseError(Exception exp)
ErrorOccured?.Invoke(this, new ExceptionEventArgs(exp));
- if (connectionException != null)
+ if (exp is SshConnectionException connectionException)
{
_logger.LogInformation(exp, "[{SessionId}] Disconnecting after exception", SessionIdHex);
- Disconnect(connectionException.DisconnectReason, exp.ToString());
+ Disconnect(connectionException.DisconnectReason, exp.Message);
}
}
diff --git a/src/Renci.SshNet/Sftp/ISftpFileReader.cs b/src/Renci.SshNet/Sftp/ISftpFileReader.cs
deleted file mode 100644
index 823b2e23a..000000000
--- a/src/Renci.SshNet/Sftp/ISftpFileReader.cs
+++ /dev/null
@@ -1,23 +0,0 @@
-using System;
-
-using Renci.SshNet.Common;
-
-namespace Renci.SshNet.Sftp
-{
- ///
- /// Reads a given file.
- ///
- internal interface ISftpFileReader : IDisposable
- {
- ///
- /// Reads a sequence of bytes from the current file and advances the position within the file by the number of bytes read.
- ///
- ///
- /// The sequence of bytes read from the file, or a zero-length array if the end of the file
- /// has been reached.
- ///
- /// The current is disposed.
- /// Attempting to read beyond the end of the file.
- byte[] Read();
- }
-}
diff --git a/src/Renci.SshNet/Sftp/ISftpSession.cs b/src/Renci.SshNet/Sftp/ISftpSession.cs
index b9baf43a5..cd4d8b36b 100644
--- a/src/Renci.SshNet/Sftp/ISftpSession.cs
+++ b/src/Renci.SshNet/Sftp/ISftpSession.cs
@@ -71,7 +71,7 @@ internal interface ISftpSession : ISubsystemSession
///
/// The file attributes.
///
- SftpFileAttributes RequestFStat(byte[] handle, bool nullOnError);
+ SftpFileAttributes RequestFStat(byte[] handle, bool nullOnError = false);
///
/// Asynchronously performs a SSH_FXP_FSTAT request.
@@ -522,19 +522,5 @@ void RequestWrite(byte[] handle,
/// Currently, we do not take the remote window size into account.
///
uint CalculateOptimalWriteLength(uint bufferSize, byte[] handle);
-
- ///
- /// Creates an for reading the content of the file represented by a given .
- ///
- /// The handle of the file to read.
- /// The SFTP session.
- /// The maximum number of bytes to read with each chunk.
- /// The maximum number of pending reads.
- /// The size of the file or when the size could not be determined.
- ///
- /// An for reading the content of the file represented by the
- /// specified .
- ///
- ISftpFileReader CreateFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, int maxPendingReads, long? fileSize);
}
}
diff --git a/src/Renci.SshNet/Sftp/SftpFileReader.cs b/src/Renci.SshNet/Sftp/SftpFileReader.cs
index 1f3fe396e..9794fd541 100644
--- a/src/Renci.SshNet/Sftp/SftpFileReader.cs
+++ b/src/Renci.SshNet/Sftp/SftpFileReader.cs
@@ -1,469 +1,175 @@
-using System;
+#nullable enable
+using System;
using System.Collections.Generic;
-using System.Globalization;
+using System.Diagnostics;
using System.Runtime.ExceptionServices;
using System.Threading;
+using System.Threading.Tasks;
-using Microsoft.Extensions.Logging;
-
-using Renci.SshNet.Abstractions;
+#if !NET
using Renci.SshNet.Common;
+#endif
namespace Renci.SshNet.Sftp
{
- internal sealed class SftpFileReader : ISftpFileReader
+ public sealed partial class SftpFileStream
{
- private const int ReadAheadWaitTimeoutInMilliseconds = 1000;
-
- private readonly byte[] _handle;
- private readonly ISftpSession _sftpSession;
- private readonly uint _chunkSize;
- private readonly SemaphoreSlim _semaphore;
- private readonly object _readLock;
- private readonly ManualResetEvent _disposingWaitHandle;
- private readonly ManualResetEvent _readAheadCompleted;
- private readonly Dictionary _queue;
- private readonly WaitHandle[] _waitHandles;
- private readonly ILogger _logger;
-
- ///
- /// Holds the size of the file, when available.
- ///
- private readonly long? _fileSize;
-
- private ulong _offset;
- private int _readAheadChunkIndex;
- private ulong _readAheadOffset;
- private int _nextChunkIndex;
-
- ///
- /// Holds a value indicating whether EOF has already been signaled by the SSH server.
- ///
- private bool _endOfFileReceived;
-
- ///
- /// Holds a value indicating whether the client has read up to the end of the file.
- ///
- private bool _isEndOfFileRead;
-
- private bool _disposingOrDisposed;
-
- private Exception _exception;
-
- ///
- /// Initializes a new instance of the class with the specified handle,
- /// and the maximum number of pending reads.
- ///
- /// The file handle.
- /// The SFT session.
- /// The size of a individual read-ahead chunk.
- /// The maximum number of pending reads.
- /// The size of the file, if known; otherwise, .
- public SftpFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, int maxPendingReads, long? fileSize)
+ private sealed class SftpFileReader : IDisposable
{
- _handle = handle;
- _sftpSession = sftpSession;
- _chunkSize = chunkSize;
- _fileSize = fileSize;
- _semaphore = new SemaphoreSlim(maxPendingReads);
- _queue = new Dictionary(maxPendingReads);
- _readLock = new object();
- _readAheadCompleted = new ManualResetEvent(initialState: false);
- _disposingWaitHandle = new ManualResetEvent(initialState: false);
- _waitHandles = _sftpSession.CreateWaitHandleArray(_disposingWaitHandle, _semaphore.AvailableWaitHandle);
- _logger = sftpSession.SessionLoggerFactory.CreateLogger();
-
- StartReadAhead();
- }
-
- public byte[] Read()
- {
- ThrowHelper.ThrowObjectDisposedIf(_disposingOrDisposed, this);
-
- if (_exception is not null)
- {
- ExceptionDispatchInfo.Capture(_exception).Throw();
- }
-
- if (_isEndOfFileRead)
- {
- throw new SshException("Attempting to read beyond the end of the file.");
- }
-
- BufferedRead nextChunk;
-
- lock (_readLock)
- {
- // wait until either the next chunk is available, an exception has occurred or the current
- // instance is already disposed
- while (!_queue.TryGetValue(_nextChunkIndex, out nextChunk) && _exception is null)
+ private readonly byte[] _handle;
+ private readonly ISftpSession _sftpSession;
+ private readonly int _maxPendingReads;
+ private readonly ulong? _fileSize;
+ private readonly Dictionary _requests = [];
+ private readonly CancellationTokenSource _cts;
+
+ private uint _chunkSize;
+
+ private ulong _offset;
+ private ulong _readAheadOffset;
+ private int _currentMaxRequests = 1;
+ private ExceptionDispatchInfo? _exception;
+
+ ///
+ /// Initializes a new instance of the class with the specified handle,
+ /// and the maximum number of pending reads.
+ ///
+ /// The file handle.
+ /// The SFTP session.
+ /// The size of a individual read-ahead chunk.
+ /// The starting offset in the file.
+ /// The maximum number of pending reads.
+ /// The size of the file, if known; otherwise, .
+ public SftpFileReader(byte[] handle, ISftpSession sftpSession, int chunkSize, long position, int maxPendingReads, ulong? fileSize)
+ {
+ Debug.Assert(chunkSize > 0);
+ Debug.Assert(position >= 0);
+
+ _handle = handle;
+ _sftpSession = sftpSession;
+ _chunkSize = (uint)chunkSize;
+ _offset = _readAheadOffset = (ulong)position;
+ _maxPendingReads = maxPendingReads;
+ _fileSize = fileSize;
+
+ _cts = new CancellationTokenSource();
+ }
+
+ public async Task ReadAsync(CancellationToken cancellationToken)
+ {
+ _exception?.Throw();
+
+ try
{
- _ = Monitor.Wait(_readLock);
- }
-
- // throw when exception occured in read-ahead, or the current instance is already disposed
- if (_exception != null)
- {
- ExceptionDispatchInfo.Capture(_exception).Throw();
- }
-
- var data = nextChunk.Data;
-
- if (nextChunk.Offset == _offset)
- {
- // have we reached EOF?
- if (data.Length == 0)
- {
- // PERF: we do not bother updating all of the internal state when we've reached EOF
- _isEndOfFileRead = true;
- }
- else
+ // Fill up the requests buffer with as many requests as we currently allow.
+ // On the first call to Read, that number is 1. On the second it is 2 etc.
+ while (_requests.Count < _currentMaxRequests)
{
- // remove processed chunk
- _ = _queue.Remove(_nextChunkIndex);
-
- // update offset
- _offset += (ulong)data.Length;
+ AddRequest(_readAheadOffset, _chunkSize);
- // move to next chunk
- _nextChunkIndex++;
+ _readAheadOffset += _chunkSize;
}
- // unblock wait in read-ahead
- _ = _semaphore.Release();
+ var request = _requests[_offset];
- return data;
- }
+ var data = await request.Task.WaitAsync(cancellationToken).ConfigureAwait(false);
- // When we received an EOF for the next chunk and the size of the file is known, then
- // we only complete the current chunk if we haven't already read up to the file size.
- // This way we save an extra round-trip to the server.
- if (data.Length == 0 && _fileSize.HasValue && _offset == (ulong)_fileSize.Value)
- {
- // avoid future reads
- _isEndOfFileRead = true;
-
- // unblock wait in read-ahead
- _ = _semaphore.Release();
-
- // signal EOF to caller
- return nextChunk.Data;
- }
- }
-
- /*
- * When the server returned less bytes than requested (for the previous chunk)
- * we'll synchronously request the remaining data.
- *
- * Due to the optimization above, we'll only get here in one of the following cases:
- * - an EOF situation for files for which we were unable to obtain the file size
- * - fewer bytes that requested were returned
- *
- * According to the SSH specification, this last case should never happen for normal
- * disk files (but can happen for device files). In practice, OpenSSH - for example -
- * returns less bytes than requested when requesting more than 64 KB.
- *
- * Important:
- * To avoid a deadlock, this read must be done outside of the read lock.
- */
-
- var bytesToCatchUp = nextChunk.Offset - _offset;
-
- /*
- * TODO: break loop and interrupt blocking wait in case of exception
- */
-
- var read = _sftpSession.RequestRead(_handle, _offset, (uint)bytesToCatchUp);
- if (read.Length == 0)
- {
- // process data in read lock to avoid ObjectDisposedException while releasing semaphore
- lock (_readLock)
- {
- // a zero-length (EOF) response is only valid for the read-back when EOF has
- // been signaled for the next read-ahead chunk
- if (nextChunk.Data.Length == 0)
- {
- _isEndOfFileRead = true;
-
- // ensure we've not yet disposed the current instance
- if (!_disposingOrDisposed)
- {
- // unblock wait in read-ahead
- _ = _semaphore.Release();
- }
-
- // signal EOF to caller
- return read;
- }
-
- // move reader to error state
- _exception = new SshException("Unexpectedly reached end of file.");
-
- // ensure we've not yet disposed the current instance
- if (!_disposingOrDisposed)
+ if (data.Length == 0)
{
- // unblock wait in read-ahead
- _ = _semaphore.Release();
+ // EOF. We effectively disable this instance - further reads will
+ // continue to return EOF.
+ _currentMaxRequests = 0;
+ return data;
}
- // notify caller of error
- throw _exception;
- }
- }
-
- _offset += (uint)read.Length;
-
- return read;
- }
-
- public void Dispose()
- {
- Dispose(disposing: true);
- GC.SuppressFinalize(this);
- }
-
- ///
- /// Releases unmanaged and - optionally - managed resources.
- ///
- /// to release both managed and unmanaged resources; to release only unmanaged resources.
- private void Dispose(bool disposing)
- {
- if (_disposingOrDisposed)
- {
- return;
- }
-
- // transition to disposing state
- _disposingOrDisposed = true;
-
- if (disposing)
- {
- // record exception to break prevent future Read()
- _exception = new ObjectDisposedException(GetType().FullName);
-
- // signal that we're disposing to interrupt wait in read-ahead
- _ = _disposingWaitHandle.Set();
+ _ = _requests.Remove(_offset);
- // wait until the read-ahead thread has completed
- _ = _readAheadCompleted.WaitOne();
+ _offset += (ulong)data.Length;
- // unblock the Read()
- lock (_readLock)
- {
- // dispose semaphore in read lock to ensure we don't run into an ObjectDisposedException
- // in Read()
- _semaphore.Dispose();
-
- // awake Read
- Monitor.PulseAll(_readLock);
- }
-
- _readAheadCompleted.Dispose();
- _disposingWaitHandle.Dispose();
-
- if (_sftpSession.IsOpen)
- {
- try
- {
- var closeAsyncResult = _sftpSession.BeginClose(_handle, callback: null, state: null);
- _sftpSession.EndClose(closeAsyncResult);
- }
- catch (Exception ex)
+ if (data.Length < request.Count)
{
- _logger.LogInformation(ex, "Failure closing handle");
- }
- }
- }
- }
+ // We didn't receive all the data we requested.
+ // Add another request to fill in the gap.
+ AddRequest(_offset, request.Count - (uint)data.Length);
- private void StartReadAhead()
- {
- ThreadAbstraction.ExecuteThread(() =>
- {
- while (!_endOfFileReceived && _exception is null)
- {
- // check if we should continue with the read-ahead loop
- // note that the EOF and exception check are not included
- // in this check as they do not require Read() to be
- // unblocked (or have already done this)
- if (!ContinueReadAhead())
- {
- // unblock the Read()
- lock (_readLock)
+ if (data.Length < _chunkSize)
{
- Monitor.PulseAll(_readLock);
+ // Right-size the buffer to match the amount that the server
+ // is willing to return.
+ // Note that this also happens near EOF.
+ _chunkSize = Math.Max(512, (uint)data.Length);
}
-
- // break the read-ahead loop
- break;
}
- // attempt to obtain the semaphore; this may time out when all semaphores are
- // in use due to pending read-aheads (which in turn can happen when the server
- // is slow to respond or when the session is broken)
- if (!_semaphore.Wait(ReadAheadWaitTimeoutInMilliseconds))
+ if (_currentMaxRequests > 0)
{
- // re-evaluate whether an exception occurred, and - if not - wait again
- continue;
- }
-
- // don't bother reading any more chunks if we received EOF, an exception has occurred
- // or the current instance is disposed
- if (_endOfFileReceived || _exception != null)
- {
- break;
- }
-
- // start reading next chunk
- var bufferedRead = new BufferedRead(_readAheadChunkIndex, _readAheadOffset);
-
- try
- {
- // even if we know the size of the file and have read up to EOF, we still want
- // to keep reading (ahead) until we receive zero bytes from the remote host as
- // we do not want to rely purely on the reported file size
- //
- // if the offset of the read-ahead chunk is greater than that file size, then
- // we can expect to be reading the last (zero-byte) chunk and switch to synchronous
- // mode to avoid having multiple read-aheads that read beyond EOF
- if (_fileSize != null && (long)_readAheadOffset > _fileSize.Value)
+ if (_readAheadOffset > _fileSize)
{
- var asyncResult = _sftpSession.BeginRead(_handle, _readAheadOffset, _chunkSize, callback: null, bufferedRead);
- var data = _sftpSession.EndRead(asyncResult);
- ReadCompletedCore(bufferedRead, data);
+ _currentMaxRequests = 1;
}
- else
+ else if (_currentMaxRequests < _maxPendingReads)
{
- _ = _sftpSession.BeginRead(_handle, _readAheadOffset, _chunkSize, ReadCompleted, bufferedRead);
+ _currentMaxRequests++;
}
}
- catch (Exception ex)
- {
- HandleFailure(ex);
- break;
- }
-
- // advance read-ahead offset
- _readAheadOffset += _chunkSize;
- // increment index of read-ahead chunk
- _readAheadChunkIndex++;
+ return data;
}
-
- _ = _readAheadCompleted.Set();
- });
- }
-
- ///
- /// Returns a value indicating whether the read-ahead loop should be continued.
- ///
- ///
- /// if the read-ahead loop should be continued; otherwise, .
- ///
- private bool ContinueReadAhead()
- {
- try
- {
- var waitResult = _sftpSession.WaitAny(_waitHandles, _sftpSession.OperationTimeout);
- switch (waitResult)
+ catch (Exception ex) when (!(ex is OperationCanceledException oce && oce.CancellationToken == cancellationToken))
{
- case 0: // disposing
- return false;
- case 1: // semaphore available
- return true;
- default:
- throw new NotImplementedException(string.Format(CultureInfo.InvariantCulture, "WaitAny return value '{0}' is not implemented.", waitResult));
+ // If the wait was cancelled then we will allow subsequent reads as normal.
+ // For any other errors, we prevent further read requests, effectively disabling
+ // this instance.
+ _currentMaxRequests = 0;
+ _exception = ExceptionDispatchInfo.Capture(ex);
+ throw;
}
}
- catch (Exception ex)
- {
- _ = Interlocked.CompareExchange(ref _exception, ex, comparand: null);
- return false;
- }
- }
- private void ReadCompleted(IAsyncResult result)
- {
- if (_disposingOrDisposed)
+ private void AddRequest(ulong offset, uint count)
{
- // skip further processing if we're disposing the current instance
- // to avoid accessing disposed members
- return;
+ _requests.Add(
+ offset,
+ new Request(
+ offset,
+ count,
+ _sftpSession.RequestReadAsync(_handle, offset, count, _cts.Token)));
}
- var readAsyncResult = (SftpReadAsyncResult)result;
-
- byte[] data;
-
- try
- {
- data = readAsyncResult.EndInvoke();
- }
- catch (Exception ex)
+ public void Dispose()
{
- HandleFailure(ex);
- return;
- }
+ _exception ??= ExceptionDispatchInfo.Capture(new ObjectDisposedException(GetType().FullName));
- // a read that completes with a zero-byte result signals EOF
- // but there may be pending reads before that read
- var bufferedRead = (BufferedRead)readAsyncResult.AsyncState;
- ReadCompletedCore(bufferedRead, data);
- }
-
- private void ReadCompletedCore(BufferedRead bufferedRead, byte[] data)
- {
- bufferedRead.Complete(data);
-
- lock (_readLock)
- {
- // add item to queue
- _queue.Add(bufferedRead.ChunkIndex, bufferedRead);
-
- // Signal that a chunk has been read or EOF has been reached.
- // In both cases, Read() will eventually also unblock the "read-ahead" thread.
- Monitor.PulseAll(_readLock);
- }
+ if (_requests.Count > 0)
+ {
+ // Cancel outstanding requests and observe the exception on them
+ // as an effort to prevent unhandled exceptions.
- // check if server signaled EOF
- if (data.Length == 0)
- {
- // set a flag to stop read-aheads
- _endOfFileReceived = true;
- }
- }
+ _cts.Cancel();
- private void HandleFailure(Exception cause)
- {
- _ = Interlocked.CompareExchange(ref _exception, cause, comparand: null);
+ foreach (var request in _requests.Values)
+ {
+ _ = request.Task.Exception;
+ }
- // unblock read-ahead
- _ = _semaphore.Release();
+ _requests.Clear();
+ }
- // unblock Read()
- lock (_readLock)
- {
- Monitor.PulseAll(_readLock);
+ _cts.Dispose();
}
- }
- internal sealed class BufferedRead
- {
- public int ChunkIndex { get; }
-
- public byte[] Data { get; private set; }
-
- public ulong Offset { get; }
-
- public BufferedRead(int chunkIndex, ulong offset)
+ private sealed class Request
{
- ChunkIndex = chunkIndex;
- Offset = offset;
- }
+ public Request(ulong offset, uint count, Task task)
+ {
+ Offset = offset;
+ Count = count;
+ Task = task;
+ }
- public void Complete(byte[] data)
- {
- Data = data;
+ public ulong Offset { get; }
+ public uint Count { get; }
+ public Task Task { get; }
}
}
}
diff --git a/src/Renci.SshNet/Sftp/SftpFileStream.cs b/src/Renci.SshNet/Sftp/SftpFileStream.cs
index c5e486a9a..ecf80f49f 100644
--- a/src/Renci.SshNet/Sftp/SftpFileStream.cs
+++ b/src/Renci.SshNet/Sftp/SftpFileStream.cs
@@ -1,6 +1,6 @@
-using System;
+#nullable enable
+using System;
using System.Diagnostics;
-using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.IO;
using System.Threading;
@@ -11,65 +11,42 @@
namespace Renci.SshNet.Sftp
{
///
- /// Exposes a around a remote SFTP file, supporting both synchronous and asynchronous read and write operations.
+ /// Exposes a around a remote SFTP file, supporting
+ /// both synchronous and asynchronous read and write operations.
///
- ///
-#pragma warning disable IDE0079 // We intentionally want to suppress the below warning.
- [SuppressMessage("Performance", "CA1844: Provide memory-based overrides of async methods when subclassing 'Stream'", Justification = "TODO: This should be addressed in the future.")]
-#pragma warning restore IDE0079
- public sealed class SftpFileStream : Stream
+ public sealed partial class SftpFileStream : Stream
{
- private readonly Lock _lock = new Lock();
+ private const int MaxPendingReads = 100;
+
+ private readonly ISftpSession _session;
+ private readonly FileAccess _access;
private readonly int _readBufferSize;
- private readonly int _writeBufferSize;
- // Internal state.
- private byte[] _handle;
- private ISftpSession _session;
+ private SftpFileReader? _sftpFileReader;
+ private ReadOnlyMemory _readBuffer;
+ private System.Net.ArrayBuffer _writeBuffer;
- // Buffer information.
- private byte[] _readBuffer;
- private byte[] _writeBuffer;
- private int _bufferPosition;
- private int _bufferLen;
private long _position;
- private bool _bufferOwnedByWrite;
- private bool _canRead;
- private bool _canSeek;
- private bool _canWrite;
private TimeSpan _timeout;
+ private bool _disposed;
- ///
- /// Gets a value indicating whether the current stream supports reading.
- ///
- ///
- /// if the stream supports reading; otherwise, .
- ///
+ ///
public override bool CanRead
{
- get { return _canRead; }
+ get { return !_disposed && (_access & FileAccess.Read) == FileAccess.Read; }
}
- ///
- /// Gets a value indicating whether the current stream supports seeking.
- ///
- ///
- /// if the stream supports seeking; otherwise, .
- ///
+ ///
public override bool CanSeek
{
- get { return _canSeek; }
+ // TODO condition on fstat success https://github.com/sshnet/SSH.NET/issues/1633
+ get { return !_disposed; }
}
- ///
- /// Gets a value indicating whether the current stream supports writing.
- ///
- ///
- /// if the stream supports writing; otherwise, .
- ///
+ ///
public override bool CanWrite
{
- get { return _canWrite; }
+ get { return !_disposed && (_access & FileAccess.Write) == FileAccess.Write; }
}
///
@@ -83,63 +60,25 @@ public override bool CanTimeout
get { return true; }
}
- ///
- /// Gets the length in bytes of the stream.
- ///
- /// A long value representing the length of the stream in bytes.
- /// A class derived from Stream does not support seeking.
- /// Methods were called after the stream was closed.
- /// IO operation failed.
+ ///
public override long Length
{
get
{
- // Lock down the file stream while we do this.
- lock (_lock)
- {
- CheckSessionIsOpen();
-
- if (!CanSeek)
- {
- throw new NotSupportedException("Seek operation is not supported.");
- }
-
- // Flush the write buffer, because it may
- // affect the length of the stream.
- if (_bufferOwnedByWrite)
- {
- FlushWriteBuffer();
- }
+ ThrowIfNotSeekable();
- // obtain file attributes
- var attributes = _session.RequestFStat(_handle, nullOnError: true);
- if (attributes != null)
- {
- return attributes.Size;
- }
+ Flush();
- throw new IOException("Seek operation failed.");
- }
+ return _session.RequestFStat(Handle).Size;
}
}
- ///
- /// Gets or sets the position within the current stream.
- ///
- /// The current position within the stream.
- /// An I/O error occurs.
- /// The stream does not support seeking.
- /// Methods were called after the stream was closed.
+ ///
public override long Position
{
get
{
- CheckSessionIsOpen();
-
- if (!CanSeek)
- {
- throw new NotSupportedException("Seek operation not supported.");
- }
+ ThrowHelper.ThrowObjectDisposedIf(_disposed, this);
return _position;
}
@@ -155,7 +94,7 @@ public override long Position
///
/// The name of the path that was used to construct the current .
///
- public string Name { get; private set; }
+ public string Name { get; }
///
/// Gets the operating system file handle for the file that the current encapsulates.
@@ -163,14 +102,7 @@ public override long Position
///
/// The operating system file handle for the file that the current encapsulates.
///
- public byte[] Handle
- {
- get
- {
- Flush();
- return _handle;
- }
- }
+ public byte[] Handle { get; }
///
/// Gets or sets the operation timeout.
@@ -198,27 +130,25 @@ private SftpFileStream(ISftpSession session, string path, FileAccess access, int
Name = path;
_session = session;
- _canRead = (access & FileAccess.Read) == FileAccess.Read;
- _canSeek = true;
- _canWrite = (access & FileAccess.Write) == FileAccess.Write;
+ _access = access;
- _handle = handle;
+ Handle = handle;
_readBufferSize = readBufferSize;
- _writeBufferSize = writeBufferSize;
_position = position;
+ _writeBuffer = new System.Net.ArrayBuffer(writeBufferSize);
}
- internal static SftpFileStream Open(ISftpSession session, string path, FileMode mode, FileAccess access, int bufferSize)
+ internal static SftpFileStream Open(ISftpSession? session, string path, FileMode mode, FileAccess access, int bufferSize)
{
return Open(session, path, mode, access, bufferSize, isAsync: false, CancellationToken.None).GetAwaiter().GetResult();
}
- internal static Task OpenAsync(ISftpSession session, string path, FileMode mode, FileAccess access, int bufferSize, CancellationToken cancellationToken)
+ internal static Task OpenAsync(ISftpSession? session, string path, FileMode mode, FileAccess access, int bufferSize, CancellationToken cancellationToken)
{
return Open(session, path, mode, access, bufferSize, isAsync: true, cancellationToken);
}
- private static async Task Open(ISftpSession session, string path, FileMode mode, FileAccess access, int bufferSize, bool isAsync, CancellationToken cancellationToken)
+ private static async Task Open(ISftpSession? session, string path, FileMode mode, FileAccess access, int bufferSize, bool isAsync, CancellationToken cancellationToken)
{
Debug.Assert(isAsync || cancellationToken == default);
@@ -328,7 +258,7 @@ private static async Task Open(ISftpSession session, string path
}
else
{
- attributes = session.RequestFStat(handle, nullOnError: false);
+ attributes = session.RequestFStat(handle);
}
position = attributes.Size;
@@ -337,82 +267,74 @@ private static async Task Open(ISftpSession session, string path
return new SftpFileStream(session, path, access, readBufferSize, writeBufferSize, handle, position);
}
- ///
- /// Clears all buffers for this stream and causes any buffered data to be written to the file.
- ///
- /// An I/O error occurs.
- /// Stream is closed.
+ ///
public override void Flush()
{
- lock (_lock)
+ ThrowHelper.ThrowObjectDisposedIf(_disposed, this);
+
+ var writeLength = _writeBuffer.ActiveLength;
+
+ if (writeLength == 0)
{
- CheckSessionIsOpen();
+ Debug.Assert(_writeBuffer.AvailableLength > 0);
+ return;
+ }
- if (_bufferOwnedByWrite)
- {
- FlushWriteBuffer();
- }
- else
- {
- FlushReadBuffer();
- }
+ // Under normal usage the offset will be nonnegative, but we nevertheless
+ // perform a checked conversion to prevent writing to a very large offset
+ // in case of corruption due to e.g. invalid multithreaded usage.
+ var serverOffset = checked((ulong)(_position - writeLength));
+
+ using (var wait = new AutoResetEvent(initialState: false))
+ {
+ _session.RequestWrite(
+ Handle,
+ serverOffset,
+ _writeBuffer.DangerousGetUnderlyingBuffer(),
+ _writeBuffer.ActiveStartOffset,
+ writeLength,
+ wait);
+
+ _writeBuffer.Discard(writeLength);
}
}
- ///
- /// Asynchronously clears all buffers for this stream and causes any buffered data to be written to the file.
- ///
- /// The to observe.
- /// A that represents the asynchronous flush operation.
- /// An I/O error occurs.
- /// Stream is closed.
- public override Task FlushAsync(CancellationToken cancellationToken)
+ ///
+ public override async Task FlushAsync(CancellationToken cancellationToken)
{
- CheckSessionIsOpen();
+ ThrowHelper.ThrowObjectDisposedIf(_disposed, this);
+
+ var writeLength = _writeBuffer.ActiveLength;
- if (_bufferOwnedByWrite)
+ if (writeLength == 0)
{
- return FlushWriteBufferAsync(cancellationToken);
+ return;
}
- FlushReadBuffer();
+ // Under normal usage the offset will be nonnegative, but we nevertheless
+ // perform a checked conversion to prevent writing to a very large offset
+ // in case of corruption due to e.g. invalid multithreaded usage.
+ var serverOffset = checked((ulong)(_position - writeLength));
- return Task.CompletedTask;
+ await _session.RequestWriteAsync(
+ Handle,
+ serverOffset,
+ _writeBuffer.DangerousGetUnderlyingBuffer(),
+ _writeBuffer.ActiveStartOffset,
+ writeLength,
+ cancellationToken).ConfigureAwait(false);
+
+ _writeBuffer.Discard(writeLength);
}
- ///
- /// Reads a sequence of bytes from the current stream and advances the position within the stream by the
- /// number of bytes read.
- ///
- /// An array of bytes. When this method returns, the buffer contains the specified byte array with the values between and ( + - 1) replaced by the bytes read from the current source.
- /// The zero-based byte offset in at which to begin storing the data read from the current stream.
- /// The maximum number of bytes to be read from the current stream.
- ///
- /// The total number of bytes read into the buffer. This can be less than the number of bytes requested
- /// if that many bytes are not currently available, or zero (0) if the end of the stream has been reached.
- ///
- /// The sum of and is larger than the buffer length.
- /// is .
- /// or is negative.
- /// An I/O error occurs.
- /// The stream does not support reading.
- /// Methods were called after the stream was closed.
- ///
- ///
- /// This method attempts to read up to bytes. This either from the buffer, from the
- /// server (using one or more SSH_FXP_READ requests) or using a combination of both.
- ///
- ///
- /// The read loop is interrupted when either bytes are read, the server returns zero
- /// bytes (EOF) or less bytes than the read buffer size.
- ///
- ///
- /// When a server returns less number of bytes than the read buffer size, this may indicate that EOF has
- /// been reached. A subsequent (SSH_FXP_READ) server request is necessary to make sure EOF has effectively
- /// been reached. Breaking out of the read loop avoids reading from the server twice to determine EOF: once in
- /// the read loop, and once upon the next or invocation.
- ///
- ///
+ private void InvalidateReads()
+ {
+ _readBuffer = ReadOnlyMemory.Empty;
+ _sftpFileReader?.Dispose();
+ _sftpFileReader = null;
+ }
+
+ ///
public override int Read(byte[] buffer, int offset, int count)
{
#if !NET
@@ -420,766 +342,393 @@ public override int Read(byte[] buffer, int offset, int count)
#endif
ValidateBufferArguments(buffer, offset, count);
- var readLen = 0;
-
- // Lock down the file stream while we do this.
- lock (_lock)
- {
- CheckSessionIsOpen();
+ return Read(buffer.AsSpan(offset, count));
+ }
- // Set up for the read operation.
- SetupRead();
+#if NET
+ ///
+ public override int Read(Span buffer)
+#else
+ private int Read(Span buffer)
+#endif
+ {
+ ThrowIfNotReadable();
- // Read data into the caller's buffer.
- while (count > 0)
+ if (_readBuffer.IsEmpty)
+ {
+ if (_sftpFileReader is null)
{
- // How much data do we have available in the buffer?
- var bytesAvailableInBuffer = _bufferLen - _bufferPosition;
- if (bytesAvailableInBuffer <= 0)
- {
- var data = _session.RequestRead(_handle, (ulong)_position, (uint)_readBufferSize);
-
- if (data.Length == 0)
- {
- _bufferPosition = 0;
- _bufferLen = 0;
-
- break;
- }
-
- var bytesToWriteToCallerBuffer = count;
- if (bytesToWriteToCallerBuffer >= data.Length)
- {
- // write all data read to caller-provided buffer
- bytesToWriteToCallerBuffer = data.Length;
-
- // reset buffer since we will skip buffering
- _bufferPosition = 0;
- _bufferLen = 0;
- }
- else
- {
- // determine number of bytes that we should write into read buffer
- var bytesToWriteToReadBuffer = data.Length - bytesToWriteToCallerBuffer;
-
- // write remaining bytes to read buffer
- Buffer.BlockCopy(data, count, GetOrCreateReadBuffer(), 0, bytesToWriteToReadBuffer);
-
- // update position in read buffer
- _bufferPosition = 0;
-
- // update number of bytes in read buffer
- _bufferLen = bytesToWriteToReadBuffer;
- }
-
- // write bytes to caller-provided buffer
- Buffer.BlockCopy(data, 0, buffer, offset, bytesToWriteToCallerBuffer);
-
- // update stream position
- _position += bytesToWriteToCallerBuffer;
-
- // record total number of bytes read into caller-provided buffer
- readLen += bytesToWriteToCallerBuffer;
-
- // break out of the read loop when the server returned less than the request number of bytes
- // as that *may* indicate that we've reached EOF
- //
- // doing this avoids reading from server twice to determine EOF: once in the read loop, and
- // once upon the next Read or ReadByte invocation by the caller
- if (data.Length < _readBufferSize)
- {
- break;
- }
-
- // advance offset to start writing bytes into caller-provided buffer
- offset += bytesToWriteToCallerBuffer;
-
- // update number of bytes left to read into caller-provided buffer
- count -= bytesToWriteToCallerBuffer;
- }
- else
- {
- // limit the number of bytes to use from read buffer to the caller-request number of bytes
- if (bytesAvailableInBuffer > count)
- {
- bytesAvailableInBuffer = count;
- }
+ Flush();
+ _sftpFileReader = new(Handle, _session, _readBufferSize, _position, MaxPendingReads, fileSize: null);
+ }
- // copy data from read buffer to the caller-provided buffer
- Buffer.BlockCopy(GetOrCreateReadBuffer(), _bufferPosition, buffer, offset, bytesAvailableInBuffer);
+ _readBuffer = _sftpFileReader.ReadAsync(CancellationToken.None).GetAwaiter().GetResult();
- // update position in read buffer
- _bufferPosition += bytesAvailableInBuffer;
+ if (_readBuffer.IsEmpty)
+ {
+ // If we've hit EOF then throw away this reader instance.
+ // If Read is called again we will create a new reader.
+ // This takes care of the case when a file is expanding
+ // during reading.
+ _sftpFileReader.Dispose();
+ _sftpFileReader = null;
+ }
+ }
- // update stream position
- _position += bytesAvailableInBuffer;
+ Debug.Assert(_writeBuffer.ActiveLength == 0, "Write buffer should be empty when reading.");
- // record total number of bytes read into caller-provided buffer
- readLen += bytesAvailableInBuffer;
+ var bytesRead = Math.Min(buffer.Length, _readBuffer.Length);
- // advance offset to start writing bytes into caller-provided buffer
- offset += bytesAvailableInBuffer;
+ _readBuffer.Span.Slice(0, bytesRead).CopyTo(buffer);
+ _readBuffer = _readBuffer.Slice(bytesRead);
- // update number of bytes left to read
- count -= bytesAvailableInBuffer;
- }
- }
- }
+ _position += bytesRead;
- // return the number of bytes that were read to the caller.
- return readLen;
+ return bytesRead;
}
- ///
- /// Asynchronously reads a sequence of bytes from the current stream and advances the position within the stream by the
- /// number of bytes read.
- ///
- /// An array of bytes. When this method returns, the buffer contains the specified byte array with the values between and ( + - 1) replaced by the bytes read from the current source.
- /// The zero-based byte offset in at which to begin storing the data read from the current stream.
- /// The maximum number of bytes to be read from the current stream.
- /// The to observe.
- /// A that represents the asynchronous read operation.
- public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ ///
+ public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
#if !NET
ThrowHelper.
#endif
ValidateBufferArguments(buffer, offset, count);
- cancellationToken.ThrowIfCancellationRequested();
-
- var readLen = 0;
-
- CheckSessionIsOpen();
+ return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
+ }
- // Set up for the read operation.
- SetupRead();
+#if NET
+ ///
+ public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default)
+#else
+ private async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken)
+#endif
+ {
+ ThrowIfNotReadable();
- // Read data into the caller's buffer.
- while (count > 0)
+ if (_readBuffer.IsEmpty)
{
- // How much data do we have available in the buffer?
- var bytesAvailableInBuffer = _bufferLen - _bufferPosition;
- if (bytesAvailableInBuffer <= 0)
+ if (_sftpFileReader is null)
{
- var data = await _session.RequestReadAsync(_handle, (ulong)_position, (uint)_readBufferSize, cancellationToken).ConfigureAwait(false);
-
- if (data.Length == 0)
- {
- _bufferPosition = 0;
- _bufferLen = 0;
-
- break;
- }
-
- var bytesToWriteToCallerBuffer = count;
- if (bytesToWriteToCallerBuffer >= data.Length)
- {
- // write all data read to caller-provided buffer
- bytesToWriteToCallerBuffer = data.Length;
-
- // reset buffer since we will skip buffering
- _bufferPosition = 0;
- _bufferLen = 0;
- }
- else
- {
- // determine number of bytes that we should write into read buffer
- var bytesToWriteToReadBuffer = data.Length - bytesToWriteToCallerBuffer;
-
- // write remaining bytes to read buffer
- Buffer.BlockCopy(data, count, GetOrCreateReadBuffer(), 0, bytesToWriteToReadBuffer);
-
- // update position in read buffer
- _bufferPosition = 0;
-
- // update number of bytes in read buffer
- _bufferLen = bytesToWriteToReadBuffer;
- }
-
- // write bytes to caller-provided buffer
- Buffer.BlockCopy(data, 0, buffer, offset, bytesToWriteToCallerBuffer);
+ await FlushAsync(cancellationToken).ConfigureAwait(false);
- // update stream position
- _position += bytesToWriteToCallerBuffer;
-
- // record total number of bytes read into caller-provided buffer
- readLen += bytesToWriteToCallerBuffer;
-
- // break out of the read loop when the server returned less than the request number of bytes
- // as that *may* indicate that we've reached EOF
- //
- // doing this avoids reading from server twice to determine EOF: once in the read loop, and
- // once upon the next Read or ReadByte invocation by the caller
- if (data.Length < _readBufferSize)
- {
- break;
- }
-
- // advance offset to start writing bytes into caller-provided buffer
- offset += bytesToWriteToCallerBuffer;
-
- // update number of bytes left to read into caller-provided buffer
- count -= bytesToWriteToCallerBuffer;
+ _sftpFileReader = new(Handle, _session, _readBufferSize, _position, MaxPendingReads, fileSize: null);
}
- else
- {
- // limit the number of bytes to use from read buffer to the caller-request number of bytes
- if (bytesAvailableInBuffer > count)
- {
- bytesAvailableInBuffer = count;
- }
- // copy data from read buffer to the caller-provided buffer
- Buffer.BlockCopy(GetOrCreateReadBuffer(), _bufferPosition, buffer, offset, bytesAvailableInBuffer);
+ _readBuffer = await _sftpFileReader.ReadAsync(cancellationToken).ConfigureAwait(false);
- // update position in read buffer
- _bufferPosition += bytesAvailableInBuffer;
+ if (_readBuffer.IsEmpty)
+ {
+ // If we've hit EOF then throw away this reader instance.
+ // If Read is called again we will create a new reader.
+ // This takes care of the case when a file is expanding
+ // during reading.
+ _sftpFileReader.Dispose();
+ _sftpFileReader = null;
+ }
+ }
- // update stream position
- _position += bytesAvailableInBuffer;
+ Debug.Assert(_writeBuffer.ActiveLength == 0, "Write buffer should be empty when reading.");
- // record total number of bytes read into caller-provided buffer
- readLen += bytesAvailableInBuffer;
+ var bytesRead = Math.Min(buffer.Length, _readBuffer.Length);
- // advance offset to start writing bytes into caller-provided buffer
- offset += bytesAvailableInBuffer;
+ _readBuffer.Slice(0, bytesRead).CopyTo(buffer);
+ _readBuffer = _readBuffer.Slice(bytesRead);
- // update number of bytes left to read
- count -= bytesAvailableInBuffer;
- }
- }
+ _position += bytesRead;
- // return the number of bytes that were read to the caller.
- return readLen;
+ return bytesRead;
}
- ///
- /// Reads a byte from the stream and advances the position within the stream by one byte, or returns -1 if at the end of the stream.
- ///
- ///
- /// The unsigned byte cast to an , or -1 if at the end of the stream.
- ///
- /// The stream does not support reading.
- /// Methods were called after the stream was closed.
- /// Read operation failed.
+#if NET
+ ///
public override int ReadByte()
{
- // Lock down the file stream while we do this.
- lock (_lock)
- {
- CheckSessionIsOpen();
-
- // Setup the object for reading.
- SetupRead();
-
- byte[] readBuffer;
-
- // Read more data into the internal buffer if necessary.
- if (_bufferPosition >= _bufferLen)
- {
- var data = _session.RequestRead(_handle, (ulong)_position, (uint)_readBufferSize);
- if (data.Length == 0)
- {
- // We've reached EOF.
- return -1;
- }
+ byte b = default;
+ var read = Read(new Span(ref b));
+ return read == 0 ? -1 : b;
+ }
+#endif
- readBuffer = GetOrCreateReadBuffer();
- Buffer.BlockCopy(data, 0, readBuffer, 0, data.Length);
+ ///
+ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
+ {
+ return TaskToAsyncResult.Begin(ReadAsync(buffer, offset, count), callback, state);
+ }
- _bufferPosition = 0;
- _bufferLen = data.Length;
- }
- else
- {
- readBuffer = GetOrCreateReadBuffer();
- }
+ ///
+ public override int EndRead(IAsyncResult asyncResult)
+ {
+ return TaskToAsyncResult.End(asyncResult);
+ }
- // Extract the next byte from the buffer.
- ++_position;
+ ///
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+#if !NET
+ ThrowHelper.
+#endif
+ ValidateBufferArguments(buffer, offset, count);
- return readBuffer[_bufferPosition++];
- }
+ Write(buffer.AsSpan(offset, count));
}
- ///
- /// Sets the position within the current stream.
- ///
- /// A byte offset relative to the parameter.
- /// A value of type indicating the reference point used to obtain the new position.
- ///
- /// The new position within the current stream.
- ///
- /// An I/O error occurs.
- /// The stream does not support seeking, such as if the stream is constructed from a pipe or console output.
- /// Methods were called after the stream was closed.
- public override long Seek(long offset, SeekOrigin origin)
+#if NET
+ ///
+ public override void Write(ReadOnlySpan buffer)
+#else
+ private void Write(ReadOnlySpan buffer)
+#endif
{
- long newPosn;
-
- // Lock down the file stream while we do this.
- lock (_lock)
- {
- CheckSessionIsOpen();
+ ThrowIfNotWriteable();
- if (!CanSeek)
- {
- throw new NotSupportedException("Seek is not supported.");
- }
+ InvalidateReads();
- // Don't do anything if the position won't be moving.
- if (origin == SeekOrigin.Begin && offset == _position)
- {
- return offset;
- }
+ while (!buffer.IsEmpty)
+ {
+ var byteCount = Math.Min(buffer.Length, _writeBuffer.AvailableLength);
- if (origin == SeekOrigin.Current && offset == 0)
- {
- return _position;
- }
+ buffer.Slice(0, byteCount).CopyTo(_writeBuffer.AvailableSpan);
- // The behaviour depends upon the read/write mode.
- if (_bufferOwnedByWrite)
- {
- // Flush the write buffer and then seek.
- FlushWriteBuffer();
- }
- else
- {
- // Determine if the seek is to somewhere inside
- // the current read buffer bounds.
- if (origin == SeekOrigin.Begin)
- {
- newPosn = _position - _bufferPosition;
- if (offset >= newPosn && offset < (newPosn + _bufferLen))
- {
- _bufferPosition = (int)(offset - newPosn);
- _position = offset;
- return _position;
- }
- }
- else if (origin == SeekOrigin.Current)
- {
- newPosn = _position + offset;
- if (newPosn >= (_position - _bufferPosition) &&
- newPosn < (_position - _bufferPosition + _bufferLen))
- {
- _bufferPosition = (int)(newPosn - (_position - _bufferPosition));
- _position = newPosn;
- return _position;
- }
- }
+ buffer = buffer.Slice(byteCount);
- // Abandon the read buffer.
- _bufferPosition = 0;
- _bufferLen = 0;
- }
+ _writeBuffer.Commit(byteCount);
- // Seek to the new position.
- switch (origin)
- {
- case SeekOrigin.Begin:
- newPosn = offset;
- break;
- case SeekOrigin.Current:
- newPosn = _position + offset;
- break;
- case SeekOrigin.End:
- var attributes = _session.RequestFStat(_handle, nullOnError: false);
- newPosn = attributes.Size + offset;
- break;
- default:
- throw new ArgumentException("Invalid seek origin.", nameof(origin));
- }
+ _position += byteCount;
- if (newPosn < 0)
+ if (_writeBuffer.AvailableLength == 0)
{
- throw new EndOfStreamException();
+ Flush();
}
-
- _position = newPosn;
- return _position;
}
}
- ///
- /// Sets the length of the current stream.
- ///
- /// The desired length of the current stream in bytes.
- /// An I/O error occurs.
- /// The stream does not support both writing and seeking.
- /// Methods were called after the stream was closed.
- /// must be greater than zero.
- ///
- ///
- /// Buffers are first flushed.
- ///
- ///
- /// If the specified value is less than the current length of the stream, the stream is truncated and - if the
- /// current position is greater than the new length - the current position is moved to the last byte of the stream.
- ///
- ///
- /// If the given value is greater than the current length of the stream, the stream is expanded and the current
- /// position remains the same.
- ///
- ///
- public override void SetLength(long value)
+ ///
+ public override void WriteByte(byte value)
{
- ThrowHelper.ThrowIfNegative(value);
-
- // Lock down the file stream while we do this.
- lock (_lock)
- {
- CheckSessionIsOpen();
-
- if (!CanSeek)
- {
- throw new NotSupportedException("Seek is not supported.");
- }
-
- if (_bufferOwnedByWrite)
- {
- FlushWriteBuffer();
- }
- else
- {
- SetupWrite();
- }
-
- var attributes = _session.RequestFStat(_handle, nullOnError: false);
- attributes.Size = value;
- _session.RequestFSetStat(_handle, attributes);
-
- if (_position > value)
- {
- _position = value;
- }
- }
+ Write([value]);
}
- ///
- /// Writes a sequence of bytes to the current stream and advances the current position within this stream by the number of bytes written.
- ///
- /// An array of bytes. This method copies bytes from to the current stream.
- /// The zero-based byte offset in at which to begin copying bytes to the current stream.
- /// The number of bytes to be written to the current stream.
- /// The sum of and is greater than the buffer length.
- /// is .
- /// or is negative.
- /// An I/O error occurs.
- /// The stream does not support writing.
- /// Methods were called after the stream was closed.
- public override void Write(byte[] buffer, int offset, int count)
+ ///
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
#if !NET
ThrowHelper.
#endif
ValidateBufferArguments(buffer, offset, count);
- // Lock down the file stream while we do this.
- lock (_lock)
- {
- CheckSessionIsOpen();
+ return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
+ }
- // Setup this object for writing.
- SetupWrite();
+#if NET
+ ///
+ public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default)
+#else
+ private async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken)
+#endif
+ {
+ ThrowIfNotWriteable();
- // Write data to the file stream.
- while (count > 0)
- {
- // Determine how many bytes we can write to the buffer.
- var tempLen = _writeBufferSize - _bufferPosition;
- if (tempLen <= 0)
- {
- // flush write buffer, and mark it empty
- FlushWriteBuffer();
+ InvalidateReads();
- // we can now write or buffer the full buffer size
- tempLen = _writeBufferSize;
- }
+ while (!buffer.IsEmpty)
+ {
+ var byteCount = Math.Min(buffer.Length, _writeBuffer.AvailableLength);
- // limit the number of bytes to write to the actual number of bytes requested
- if (tempLen > count)
- {
- tempLen = count;
- }
+ buffer.Slice(0, byteCount).CopyTo(_writeBuffer.AvailableMemory);
- // Can we short-cut the internal buffer?
- if (_bufferPosition == 0 && tempLen == _writeBufferSize)
- {
- using (var wait = new AutoResetEvent(initialState: false))
- {
- _session.RequestWrite(_handle, (ulong)_position, buffer, offset, tempLen, wait);
- }
- }
- else
- {
- // No: copy the data to the write buffer first.
- Buffer.BlockCopy(buffer, offset, GetOrCreateWriteBuffer(), _bufferPosition, tempLen);
- _bufferPosition += tempLen;
- }
+ buffer = buffer.Slice(byteCount);
- // Advance the buffer and stream positions.
- _position += tempLen;
- offset += tempLen;
- count -= tempLen;
- }
+ _writeBuffer.Commit(byteCount);
- // If the buffer is full, then do a speculative flush now,
- // rather than waiting for the next call to this method.
- if (_bufferPosition >= _writeBufferSize)
- {
- using (var wait = new AutoResetEvent(initialState: false))
- {
- _session.RequestWrite(_handle, (ulong)(_position - _bufferPosition), GetOrCreateWriteBuffer(), 0, _bufferPosition, wait);
- }
+ _position += byteCount;
- _bufferPosition = 0;
+ if (_writeBuffer.AvailableLength == 0)
+ {
+ await FlushAsync(cancellationToken).ConfigureAwait(false);
}
}
}
- ///
- /// Asynchronously writes a sequence of bytes to the current stream and advances the current position within this stream by the number of bytes written.
- ///
- /// An array of bytes. This method copies bytes from to the current stream.
- /// The zero-based byte offset in at which to begin copying bytes to the current stream.
- /// The number of bytes to be written to the current stream.
- /// The to observe.
- /// A that represents the asynchronous write operation.
- /// The sum of and is greater than the buffer length.
- /// is .
- /// or is negative.
- /// An I/O error occurs.
- /// The stream does not support writing.
- /// Methods were called after the stream was closed.
- public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ ///
+ public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
{
-#if !NET
- ThrowHelper.
-#endif
- ValidateBufferArguments(buffer, offset, count);
+ return TaskToAsyncResult.Begin(WriteAsync(buffer, offset, count), callback, state);
+ }
- cancellationToken.ThrowIfCancellationRequested();
+ ///
+ public override void EndWrite(IAsyncResult asyncResult)
+ {
+ TaskToAsyncResult.End(asyncResult);
+ }
- CheckSessionIsOpen();
+ ///
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ ThrowIfNotSeekable();
- // Setup this object for writing.
- SetupWrite();
+ Flush();
- // Write data to the file stream.
- while (count > 0)
+ var newPosition = origin switch
{
- // Determine how many bytes we can write to the buffer.
- var tempLen = _writeBufferSize - _bufferPosition;
- if (tempLen <= 0)
- {
- // flush write buffer, and mark it empty
- await FlushWriteBufferAsync(cancellationToken).ConfigureAwait(false);
+ SeekOrigin.Begin => offset,
+ SeekOrigin.Current => _position + offset,
+ SeekOrigin.End => _session.RequestFStat(Handle).Size + offset,
+ _ => throw new ArgumentOutOfRangeException(nameof(origin))
+ };
- // we can now write or buffer the full buffer size
- tempLen = _writeBufferSize;
- }
-
- // limit the number of bytes to write to the actual number of bytes requested
- if (tempLen > count)
- {
- tempLen = count;
- }
+ if (newPosition < 0)
+ {
+ throw new IOException("An attempt was made to move the position before the beginning of the stream.");
+ }
- // Can we short-cut the internal buffer?
- if (_bufferPosition == 0 && tempLen == _writeBufferSize)
- {
- await _session.RequestWriteAsync(_handle, (ulong)_position, buffer, offset, tempLen, cancellationToken).ConfigureAwait(false);
- }
- else
- {
- // No: copy the data to the write buffer first.
- Buffer.BlockCopy(buffer, offset, GetOrCreateWriteBuffer(), _bufferPosition, tempLen);
- _bufferPosition += tempLen;
- }
+ var readBufferStart = _position; // inclusive
+ var readBufferEnd = _position + _readBuffer.Length; // exclusive
- // Advance the buffer and stream positions.
- _position += tempLen;
- offset += tempLen;
- count -= tempLen;
+ if (readBufferStart <= newPosition && newPosition <= readBufferEnd)
+ {
+ _readBuffer = _readBuffer.Slice((int)(newPosition - readBufferStart));
}
-
- // If the buffer is full, then do a speculative flush now,
- // rather than waiting for the next call to this method.
- if (_bufferPosition >= _writeBufferSize)
+ else
{
- await _session.RequestWriteAsync(_handle, (ulong)(_position - _bufferPosition), GetOrCreateWriteBuffer(), 0, _bufferPosition, cancellationToken).ConfigureAwait(false);
- _bufferPosition = 0;
+ InvalidateReads();
}
+
+ return _position = newPosition;
}
- ///
- /// Writes a byte to the current position in the stream and advances the position within the stream by one byte.
- ///
- /// The byte to write to the stream.
- /// An I/O error occurs.
- /// The stream does not support writing, or the stream is already closed.
- /// Methods were called after the stream was closed.
- public override void WriteByte(byte value)
+ ///
+ public override void SetLength(long value)
{
- // Lock down the file stream while we do this.
- lock (_lock)
- {
- CheckSessionIsOpen();
-
- // Setup the object for writing.
- SetupWrite();
-
- var writeBuffer = GetOrCreateWriteBuffer();
+ ThrowHelper.ThrowIfNegative(value);
+ ThrowIfNotWriteable();
+ ThrowIfNotSeekable();
- // Flush the current buffer if it is full.
- if (_bufferPosition >= _writeBufferSize)
- {
- using (var wait = new AutoResetEvent(initialState: false))
- {
- _session.RequestWrite(_handle, (ulong)(_position - _bufferPosition), writeBuffer, 0, _bufferPosition, wait);
- }
+ Flush();
+ InvalidateReads();
- _bufferPosition = 0;
- }
+ var attributes = _session.RequestFStat(Handle);
+ attributes.Size = value;
+ _session.RequestFSetStat(Handle, attributes);
- // Write the byte into the buffer and advance the posn.
- writeBuffer[_bufferPosition++] = value;
- ++_position;
+ if (_position > value)
+ {
+ _position = value;
}
}
- ///
- /// Releases the unmanaged resources used by the and optionally releases the managed resources.
- ///
- /// to release both managed and unmanaged resources; to release only unmanaged resources.
+ ///
protected override void Dispose(bool disposing)
{
- base.Dispose(disposing);
+ if (_disposed)
+ {
+ return;
+ }
- if (_session != null)
+ try
{
- if (disposing)
+ if (disposing && _session.IsOpen)
{
- lock (_lock)
+ try
{
- if (_session != null)
+ Flush();
+ }
+ finally
+ {
+ if (_session.IsOpen)
{
- _canRead = false;
- _canSeek = false;
- _canWrite = false;
-
- if (_handle != null)
- {
- if (_session.IsOpen)
- {
- if (_bufferOwnedByWrite)
- {
- FlushWriteBuffer();
- }
-
- _session.RequestClose(_handle);
- }
-
- _handle = null;
- }
-
- _session = null;
+ _session.RequestClose(Handle);
}
}
}
}
+ finally
+ {
+ _disposed = true;
+ InvalidateReads();
+ base.Dispose(disposing);
+ }
}
- private byte[] GetOrCreateReadBuffer()
- {
- _readBuffer ??= new byte[_readBufferSize];
- return _readBuffer;
- }
-
- private byte[] GetOrCreateWriteBuffer()
- {
- _writeBuffer ??= new byte[_writeBufferSize];
- return _writeBuffer;
- }
-
- ///
- /// Flushes the read data from the buffer.
- ///
- private void FlushReadBuffer()
+#if NET
+ ///
+#pragma warning disable CA2215 // Dispose methods should call base class dispose
+ public override async ValueTask DisposeAsync()
+#pragma warning restore CA2215 // Dispose methods should call base class dispose
+#else
+ internal async ValueTask DisposeAsync()
+#endif
{
- _bufferPosition = 0;
- _bufferLen = 0;
- }
+ if (_disposed)
+ {
+ return;
+ }
- ///
- /// Flush any buffered write data to the file.
- ///
- private void FlushWriteBuffer()
- {
- if (_bufferPosition > 0)
+ try
{
- using (var wait = new AutoResetEvent(initialState: false))
+ if (_session.IsOpen)
{
- _session.RequestWrite(_handle, (ulong)(_position - _bufferPosition), _writeBuffer, 0, _bufferPosition, wait);
+ try
+ {
+ await FlushAsync().ConfigureAwait(false);
+ }
+ finally
+ {
+ if (_session.IsOpen)
+ {
+ await _session.RequestCloseAsync(Handle, CancellationToken.None).ConfigureAwait(false);
+ }
+ }
}
-
- _bufferPosition = 0;
}
- }
-
- private async Task FlushWriteBufferAsync(CancellationToken cancellationToken)
- {
- if (_bufferPosition > 0)
+ finally
{
- await _session.RequestWriteAsync(_handle, (ulong)(_position - _bufferPosition), _writeBuffer, 0, _bufferPosition, cancellationToken).ConfigureAwait(false);
- _bufferPosition = 0;
+ _disposed = true;
+ InvalidateReads();
+ base.Dispose(disposing: false);
}
}
- ///
- /// Setups the read.
- ///
- private void SetupRead()
+ private void ThrowIfNotSeekable()
{
- if (!CanRead)
+ if (!CanSeek)
{
- throw new NotSupportedException("Read not supported.");
+ ThrowHelper.ThrowObjectDisposedIf(_disposed, this);
+ Throw();
}
- if (_bufferOwnedByWrite)
+ static void Throw()
{
- FlushWriteBuffer();
- _bufferOwnedByWrite = false;
+ throw new NotSupportedException("Stream does not support seeking.");
}
}
- ///
- /// Setups the write.
- ///
- private void SetupWrite()
+ private void ThrowIfNotWriteable()
{
if (!CanWrite)
{
- throw new NotSupportedException("Write not supported.");
+ ThrowHelper.ThrowObjectDisposedIf(_disposed, this);
+ Throw();
}
- if (!_bufferOwnedByWrite)
+ static void Throw()
{
- FlushReadBuffer();
- _bufferOwnedByWrite = true;
+ throw new NotSupportedException("Stream does not support writing.");
}
}
- private void CheckSessionIsOpen()
+ private void ThrowIfNotReadable()
{
- ThrowHelper.ThrowObjectDisposedIf(_session is null, this);
+ if (!CanRead)
+ {
+ ThrowHelper.ThrowObjectDisposedIf(_disposed, this);
+ Throw();
+ }
- if (!_session.IsOpen)
+ static void Throw()
{
- throw new ObjectDisposedException(GetType().FullName, "Cannot access a closed SFTP session.");
+ throw new NotSupportedException("Stream does not support reading.");
}
}
}
diff --git a/src/Renci.SshNet/Sftp/SftpSession.cs b/src/Renci.SshNet/Sftp/SftpSession.cs
index 1de63eaf2..9208af4a9 100644
--- a/src/Renci.SshNet/Sftp/SftpSession.cs
+++ b/src/Renci.SshNet/Sftp/SftpSession.cs
@@ -251,23 +251,6 @@ public async Task GetCanonicalPathAsync(string path, CancellationToken c
return canonizedPath + slash + pathParts[pathParts.Length - 1];
}
- ///
- /// Creates an for reading the content of the file represented by a given .
- ///
- /// The handle of the file to read.
- /// The SFTP session.
- /// The maximum number of bytes to read with each chunk.
- /// The maximum number of pending reads.
- /// The size of the file or when the size could not be determined.
- ///
- /// An for reading the content of the file represented by the
- /// specified .
- ///
- public ISftpFileReader CreateFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, int maxPendingReads, long? fileSize)
- {
- return new SftpFileReader(handle, sftpSession, chunkSize, maxPendingReads, fileSize);
- }
-
internal string GetFullRemotePath(string path)
{
var fullPath = path;
@@ -820,6 +803,8 @@ public byte[] RequestRead(byte[] handle, ulong offset, uint length)
///
public Task RequestReadAsync(byte[] handle, ulong offset, uint length, CancellationToken cancellationToken)
{
+ Debug.Assert(length > 0, "This implementation cannot distinguish between EOF and zero-length reads");
+
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(cancellationToken);
@@ -1083,7 +1068,7 @@ public SftpFileAttributes EndLStat(SFtpStatAsyncResult asyncResult)
///
/// File attributes.
///
- public SftpFileAttributes RequestFStat(byte[] handle, bool nullOnError)
+ public SftpFileAttributes RequestFStat(byte[] handle, bool nullOnError = false)
{
SshException exception = null;
SftpFileAttributes attributes = null;
diff --git a/src/Renci.SshNet/SftpClient.cs b/src/Renci.SshNet/SftpClient.cs
index 949c64b3e..f2c3a2b89 100644
--- a/src/Renci.SshNet/SftpClient.cs
+++ b/src/Renci.SshNet/SftpClient.cs
@@ -1,5 +1,6 @@
#nullable enable
using System;
+using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
@@ -899,17 +900,33 @@ public async Task ExistsAsync(string path, CancellationToken cancellationT
///
public void DownloadFile(string path, Stream output, Action? downloadCallback = null)
{
+ ThrowHelper.ThrowIfNullOrWhiteSpace(path);
+ ThrowHelper.ThrowIfNull(output);
CheckDisposed();
- InternalDownloadFile(path, output, asyncResult: null, downloadCallback);
+ InternalDownloadFile(
+ path,
+ output,
+ asyncResult: null,
+ downloadCallback,
+ isAsync: false,
+ CancellationToken.None).GetAwaiter().GetResult();
}
///
public Task DownloadFileAsync(string path, Stream output, CancellationToken cancellationToken = default)
{
+ ThrowHelper.ThrowIfNullOrWhiteSpace(path);
+ ThrowHelper.ThrowIfNull(output);
CheckDisposed();
- return InternalDownloadFileAsync(path, output, cancellationToken);
+ return InternalDownloadFile(
+ path,
+ output,
+ asyncResult: null,
+ downloadCallback: null,
+ isAsync: true,
+ cancellationToken);
}
///
@@ -976,17 +993,25 @@ public IAsyncResult BeginDownloadFile(string path, Stream output, AsyncCallback?
///
public IAsyncResult BeginDownloadFile(string path, Stream output, AsyncCallback? asyncCallback, object? state, Action? downloadCallback = null)
{
- CheckDisposed();
ThrowHelper.ThrowIfNullOrWhiteSpace(path);
ThrowHelper.ThrowIfNull(output);
+ CheckDisposed();
var asyncResult = new SftpDownloadAsyncResult(asyncCallback, state);
- ThreadAbstraction.ExecuteThread(() =>
+ _ = DoDownloadAndSetResult();
+
+ async Task DoDownloadAndSetResult()
{
try
{
- InternalDownloadFile(path, output, asyncResult, downloadCallback);
+ await InternalDownloadFile(
+ path,
+ output,
+ asyncResult,
+ downloadCallback,
+ isAsync: true,
+ CancellationToken.None).ConfigureAwait(false);
asyncResult.SetAsCompleted(exception: null, completedSynchronously: false);
}
@@ -994,7 +1019,7 @@ public IAsyncResult BeginDownloadFile(string path, Stream output, AsyncCallback?
{
asyncResult.SetAsCompleted(exp, completedSynchronously: false);
}
- });
+ }
return asyncResult;
}
@@ -1050,7 +1075,7 @@ public void UploadFile(Stream input, string path, bool canOverride, Action
@@ -2233,32 +2258,44 @@ private List InternalListDirectory(string path, SftpListDirectoryAsyn
return result;
}
- ///
- /// Internals the download file.
- ///
- /// The path.
- /// The output.
- /// An that references the asynchronous request.
- /// The download callback.
- /// is .
- /// is or contains whitespace.
- /// Client not connected.
- private void InternalDownloadFile(string path, Stream output, SftpDownloadAsyncResult? asyncResult, Action? downloadCallback)
+#pragma warning disable S6966 // Awaitable method should be used
+ private async Task InternalDownloadFile(
+ string path,
+ Stream output,
+ SftpDownloadAsyncResult? asyncResult,
+ Action? downloadCallback,
+ bool isAsync,
+ CancellationToken cancellationToken)
{
- ThrowHelper.ThrowIfNull(output);
- ThrowHelper.ThrowIfNullOrWhiteSpace(path);
+ Debug.Assert(!string.IsNullOrWhiteSpace(path));
+ Debug.Assert(output is not null);
+ Debug.Assert(isAsync || cancellationToken == default);
if (_sftpSession is null)
{
throw new SshConnectionException("Client not connected.");
}
- var fullPath = _sftpSession.GetCanonicalPath(path);
+ SftpFileStream sftpStream;
- using (var fileReader = ServiceFactory.CreateSftpFileReader(fullPath, _sftpSession, _bufferSize))
+ if (isAsync)
+ {
+ var fullPath = await _sftpSession.GetCanonicalPathAsync(path, cancellationToken).ConfigureAwait(false);
+ sftpStream = await OpenAsync(fullPath, FileMode.Open, FileAccess.Read, cancellationToken).ConfigureAwait(false);
+ }
+ else
{
- var totalBytesRead = 0UL;
+ var fullPath = _sftpSession.GetCanonicalPath(path);
+ sftpStream = Open(fullPath, FileMode.Open, FileAccess.Read);
+ }
+ // The below is effectively sftpStream.CopyTo{Async}(output) with consideration
+ // for downloadCallback/asyncResult.
+
+ var buffer = ArrayPool.Shared.Rent(81920);
+ try
+ {
+ ulong totalBytesRead = 0;
while (true)
{
// Cancel download
@@ -2267,15 +2304,33 @@ private void InternalDownloadFile(string path, Stream output, SftpDownloadAsyncR
break;
}
- var data = fileReader.Read();
- if (data.Length == 0)
+ var bytesRead = isAsync
+#if NET
+ ? await sftpStream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false)
+#else
+ ? await sftpStream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)
+#endif
+ : sftpStream.Read(buffer, 0, buffer.Length);
+
+ if (bytesRead == 0)
{
break;
}
- output.Write(data, 0, data.Length);
+ if (isAsync)
+ {
+#if NET
+ await output.WriteAsync(buffer.AsMemory(0, bytesRead), cancellationToken).ConfigureAwait(false);
+#else
+ await output.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
+#endif
+ }
+ else
+ {
+ output.Write(buffer, 0, bytesRead);
+ }
- totalBytesRead += (ulong)data.Length;
+ totalBytesRead += (ulong)bytesRead;
asyncResult?.Update(totalBytesRead);
@@ -2289,28 +2344,21 @@ private void InternalDownloadFile(string path, Stream output, SftpDownloadAsyncR
}
}
}
- }
-
- private async Task InternalDownloadFileAsync(string path, Stream output, CancellationToken cancellationToken)
- {
- ThrowHelper.ThrowIfNull(output);
- ThrowHelper.ThrowIfNullOrWhiteSpace(path);
-
- if (_sftpSession is null)
+ finally
{
- throw new SshConnectionException("Client not connected.");
- }
+ ArrayPool.Shared.Return(buffer);
- cancellationToken.ThrowIfCancellationRequested();
-
- var fullPath = await _sftpSession.GetCanonicalPathAsync(path, cancellationToken).ConfigureAwait(false);
- var openStreamTask = SftpFileStream.OpenAsync(_sftpSession, fullPath, FileMode.Open, FileAccess.Read, (int)_bufferSize, cancellationToken);
-
- using (var input = await openStreamTask.ConfigureAwait(false))
- {
- await input.CopyToAsync(output, 81920, cancellationToken).ConfigureAwait(false);
+ if (isAsync)
+ {
+ await sftpStream.DisposeAsync().ConfigureAwait(false);
+ }
+ else
+ {
+ sftpStream.Dispose();
+ }
}
}
+#pragma warning restore S6966 // Awaitable method should be used
#pragma warning disable S6966 // Awaitable method should be used
private async Task InternalUploadFile(
diff --git a/test/Renci.SshNet.IntegrationTests/OldIntegrationTests/SftpClientTest.Download.cs b/test/Renci.SshNet.IntegrationTests/OldIntegrationTests/SftpClientTest.Download.cs
index 8a2be6bae..e6e62cd5d 100644
--- a/test/Renci.SshNet.IntegrationTests/OldIntegrationTests/SftpClientTest.Download.cs
+++ b/test/Renci.SshNet.IntegrationTests/OldIntegrationTests/SftpClientTest.Download.cs
@@ -65,7 +65,7 @@ public async Task Test_Sftp_DownloadAsync_Cancellation_Requested()
var cancelledToken = new CancellationToken(true);
- await Assert.ThrowsExactlyAsync(() => sftp.DownloadFileAsync("/xxx/eee/yyy", Stream.Null, cancelledToken));
+ await Assert.ThrowsAsync(() => sftp.DownloadFileAsync("/xxx/eee/yyy", Stream.Null, cancelledToken));
}
}
diff --git a/test/Renci.SshNet.IntegrationTests/SftpTests.cs b/test/Renci.SshNet.IntegrationTests/SftpTests.cs
index 3c65bb0c3..b673c6808 100644
--- a/test/Renci.SshNet.IntegrationTests/SftpTests.cs
+++ b/test/Renci.SshNet.IntegrationTests/SftpTests.cs
@@ -4353,11 +4353,11 @@ public void Sftp_SftpFileStream_Seek_BeyondEndOfFile_SeekOriginBegin()
Assert.AreEqual(0x04, fs.ReadByte());
var soughtOverReadBuffer = new byte[seekOffset - 1];
- Assert.AreEqual(soughtOverReadBuffer.Length, fs.Read(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length));
- Assert.IsTrue(new byte[soughtOverReadBuffer.Length].IsEqualTo(soughtOverReadBuffer));
+ fs.ReadExactly(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length);
+ CollectionAssert.AreEqual(new byte[soughtOverReadBuffer.Length], soughtOverReadBuffer);
var readBuffer = new byte[writeBuffer.Length];
- Assert.AreEqual(readBuffer.Length, fs.Read(readBuffer, offset: 0, readBuffer.Length));
+ fs.ReadExactly(readBuffer, offset: 0, readBuffer.Length);
CollectionAssert.AreEqual(writeBuffer, readBuffer);
// Ensure we've reached end of the stream
@@ -4397,11 +4397,11 @@ public void Sftp_SftpFileStream_Seek_BeyondEndOfFile_SeekOriginBegin()
Assert.AreEqual(0x04, fs.ReadByte());
var soughtOverReadBuffer = new byte[seekOffset - 1];
- Assert.AreEqual(soughtOverReadBuffer.Length, fs.Read(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length));
- Assert.IsTrue(new byte[soughtOverReadBuffer.Length].IsEqualTo(soughtOverReadBuffer));
+ fs.ReadExactly(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length);
+ CollectionAssert.AreEqual(new byte[soughtOverReadBuffer.Length], soughtOverReadBuffer);
var readBuffer = new byte[writeBuffer.Length];
- Assert.AreEqual(readBuffer.Length, fs.Read(readBuffer, offset: 0, readBuffer.Length));
+ fs.ReadExactly(readBuffer, offset: 0, readBuffer.Length);
CollectionAssert.AreEqual(writeBuffer, readBuffer);
// Ensure we've reached end of the stream
@@ -4438,7 +4438,7 @@ public void Sftp_SftpFileStream_Seek_BeyondEndOfFile_SeekOriginBegin()
Assert.AreEqual(0x00, fs.ReadByte());
var readBuffer = new byte[writeBuffer.Length];
- Assert.AreEqual(writeBuffer.Length, fs.Read(readBuffer, offset: 0, readBuffer.Length));
+ fs.ReadExactly(readBuffer, offset: 0, readBuffer.Length);
CollectionAssert.AreEqual(writeBuffer, readBuffer);
// Ensure we've reached end of the stream
@@ -4474,11 +4474,11 @@ public void Sftp_SftpFileStream_Seek_BeyondEndOfFile_SeekOriginBegin()
Assert.AreEqual(0x04, fs.ReadByte());
var soughtOverReadBuffer = new byte[550 - 1];
- Assert.AreEqual(550 - 1, fs.Read(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length));
- Assert.IsTrue(new byte[550 - 1].IsEqualTo(soughtOverReadBuffer));
+ fs.ReadExactly(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length);
+ CollectionAssert.AreEqual(new byte[550 - 1], soughtOverReadBuffer);
var readBuffer = new byte[writeBuffer.Length];
- Assert.AreEqual(writeBuffer.Length, fs.Read(readBuffer, offset: 0, readBuffer.Length));
+ fs.ReadExactly(readBuffer, offset: 0, readBuffer.Length);
CollectionAssert.AreEqual(writeBuffer, readBuffer);
// Ensure we've reached end of the stream
@@ -4599,11 +4599,11 @@ public void Sftp_SftpFileStream_Seek_BeyondEndOfFile_SeekOriginEnd()
Assert.AreEqual(0x04, fs.ReadByte());
var soughtOverReadBuffer = new byte[seekOffset];
- Assert.AreEqual(soughtOverReadBuffer.Length, fs.Read(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length));
- Assert.IsTrue(new byte[soughtOverReadBuffer.Length].IsEqualTo(soughtOverReadBuffer));
+ fs.ReadExactly(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length);
+ CollectionAssert.AreEqual(new byte[soughtOverReadBuffer.Length], soughtOverReadBuffer);
var readBuffer = new byte[writeBuffer.Length];
- Assert.AreEqual(readBuffer.Length, fs.Read(readBuffer, offset: 0, readBuffer.Length));
+ fs.ReadExactly(readBuffer, offset: 0, readBuffer.Length);
CollectionAssert.AreEqual(writeBuffer, readBuffer);
// Ensure we've reached end of the stream
@@ -4641,11 +4641,11 @@ public void Sftp_SftpFileStream_Seek_BeyondEndOfFile_SeekOriginEnd()
Assert.AreEqual(0x04, fs.ReadByte());
var soughtOverReadBuffer = new byte[seekOffset];
- Assert.AreEqual(soughtOverReadBuffer.Length, fs.Read(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length));
- Assert.IsTrue(new byte[soughtOverReadBuffer.Length].IsEqualTo(soughtOverReadBuffer));
+ fs.ReadExactly(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length);
+ CollectionAssert.AreEqual(new byte[soughtOverReadBuffer.Length], soughtOverReadBuffer);
var readBuffer = new byte[writeBuffer.Length];
- Assert.AreEqual(readBuffer.Length, fs.Read(readBuffer, offset: 0, readBuffer.Length));
+ fs.ReadExactly(readBuffer, offset: 0, readBuffer.Length);
CollectionAssert.AreEqual(writeBuffer, readBuffer);
// Ensure we've reached end of the stream
@@ -4681,11 +4681,11 @@ public void Sftp_SftpFileStream_Seek_BeyondEndOfFile_SeekOriginEnd()
Assert.AreEqual(0x04, fs.ReadByte());
var soughtOverReadBuffer = new byte[seekOffset];
- Assert.AreEqual(soughtOverReadBuffer.Length, fs.Read(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length));
- Assert.IsTrue(new byte[soughtOverReadBuffer.Length].IsEqualTo(soughtOverReadBuffer));
+ fs.ReadExactly(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length);
+ CollectionAssert.AreEqual(new byte[soughtOverReadBuffer.Length], soughtOverReadBuffer);
var readBuffer = new byte[writeBuffer.Length];
- Assert.AreEqual(writeBuffer.Length, fs.Read(readBuffer, offset: 0, readBuffer.Length));
+ fs.ReadExactly(readBuffer, offset: 0, readBuffer.Length);
CollectionAssert.AreEqual(writeBuffer, readBuffer);
// Ensure we've reached end of the stream
@@ -4722,11 +4722,11 @@ public void Sftp_SftpFileStream_Seek_BeyondEndOfFile_SeekOriginEnd()
Assert.AreEqual(0x04, fs.ReadByte());
var soughtOverReadBuffer = new byte[seekOffset];
- Assert.AreEqual(soughtOverReadBuffer.Length, fs.Read(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length));
- Assert.IsTrue(new byte[soughtOverReadBuffer.Length].IsEqualTo(soughtOverReadBuffer));
+ fs.ReadExactly(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length);
+ CollectionAssert.AreEqual(new byte[soughtOverReadBuffer.Length], soughtOverReadBuffer);
var readBuffer = new byte[writeBuffer.Length];
- Assert.AreEqual(writeBuffer.Length, fs.Read(readBuffer, offset: 0, readBuffer.Length));
+ fs.ReadExactly(readBuffer, offset: 0, readBuffer.Length);
CollectionAssert.AreEqual(writeBuffer, readBuffer);
// Ensure we've reached end of the stream
@@ -4813,7 +4813,7 @@ public void Sftp_SftpFileStream_Seek_NegativeOffSet_SeekOriginEnd()
Assert.AreEqual(writeBuffer.Length, fs.Length);
var readBuffer = new byte[writeBuffer.Length];
- Assert.AreEqual(writeBuffer.Length, fs.Read(readBuffer, offset: 0, readBuffer.Length));
+ fs.ReadExactly(readBuffer, offset: 0, readBuffer.Length);
CollectionAssert.AreEqual(writeBuffer, readBuffer);
// Ensure we've reached end of the stream
@@ -4844,8 +4844,8 @@ public void Sftp_SftpFileStream_Seek_NegativeOffSet_SeekOriginEnd()
Assert.AreEqual(writeBuffer.Length + 1, fs.Length);
var readBuffer = new byte[writeBuffer.Length - 3];
- Assert.AreEqual(readBuffer.Length, fs.Read(readBuffer, offset: 0, readBuffer.Length));
- Assert.IsTrue(readBuffer.SequenceEqual(writeBuffer.Take(readBuffer.Length)));
+ fs.ReadExactly(readBuffer, offset: 0, readBuffer.Length);
+ CollectionAssert.AreEqual(writeBuffer.Take(readBuffer.Length), readBuffer);
Assert.AreEqual(0x01, fs.ReadByte());
Assert.AreEqual(0x05, fs.ReadByte());
@@ -4884,8 +4884,8 @@ public void Sftp_SftpFileStream_Seek_NegativeOffSet_SeekOriginEnd()
// First part of file should not have been touched
var readBuffer = new byte[(int)client.BufferSize * 2];
- Assert.AreEqual(readBuffer.Length, fs.Read(readBuffer, offset: 0, readBuffer.Length));
- Assert.IsTrue(readBuffer.SequenceEqual(writeBuffer.Take(readBuffer.Length)));
+ fs.ReadExactly(readBuffer, offset: 0, readBuffer.Length);
+ CollectionAssert.AreEqual(writeBuffer.Take(readBuffer.Length), readBuffer);
// Check part that should have been updated
Assert.AreEqual(0x01, fs.ReadByte());
@@ -4895,8 +4895,10 @@ public void Sftp_SftpFileStream_Seek_NegativeOffSet_SeekOriginEnd()
// Remaining bytes should not have been touched
readBuffer = new byte[((int)client.BufferSize * 2) - 4];
- Assert.AreEqual(readBuffer.Length, fs.Read(readBuffer, offset: 0, readBuffer.Length));
- Assert.IsTrue(readBuffer.SequenceEqual(writeBuffer.Skip(((int)client.BufferSize * 2) + 4).Take(readBuffer.Length)));
+ fs.ReadExactly(readBuffer, offset: 0, readBuffer.Length);
+ CollectionAssert.AreEqual(
+ writeBuffer.Skip(((int)client.BufferSize * 2) + 4).Take(readBuffer.Length).ToArray(),
+ readBuffer);
// Ensure we've reached end of the stream
Assert.AreEqual(-1, fs.ReadByte());
@@ -4987,7 +4989,7 @@ public void Sftp_SftpFileStream_Seek_WithinReadBuffer()
{
var readBuffer = new byte[200];
- Assert.AreEqual(readBuffer.Length, fs.Read(readBuffer, offset: 0, readBuffer.Length));
+ fs.ReadExactly(readBuffer, offset: 0, readBuffer.Length);
var newPosition = fs.Seek(offset: 3L, SeekOrigin.Begin);
@@ -5062,11 +5064,11 @@ public void Sftp_SftpFileStream_Seek_WithinReadBuffer()
Assert.AreEqual(0x04, fs.ReadByte());
var soughtOverReadBuffer = new byte[seekOffset - 1];
- Assert.AreEqual(soughtOverReadBuffer.Length, fs.Read(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length));
- Assert.IsTrue(new byte[soughtOverReadBuffer.Length].IsEqualTo(soughtOverReadBuffer));
+ fs.ReadExactly(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length);
+ CollectionAssert.AreEqual(new byte[soughtOverReadBuffer.Length], soughtOverReadBuffer);
var readBuffer = new byte[writeBuffer.Length];
- Assert.AreEqual(readBuffer.Length, fs.Read(readBuffer, offset: 0, readBuffer.Length));
+ fs.ReadExactly(readBuffer, offset: 0, readBuffer.Length);
CollectionAssert.AreEqual(writeBuffer, readBuffer);
// Ensure we've reached end of the stream
@@ -5104,11 +5106,11 @@ public void Sftp_SftpFileStream_Seek_WithinReadBuffer()
Assert.AreEqual(0x04, fs.ReadByte());
var soughtOverReadBuffer = new byte[seekOffset - 1];
- Assert.AreEqual(soughtOverReadBuffer.Length, fs.Read(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length));
- Assert.IsTrue(new byte[soughtOverReadBuffer.Length].IsEqualTo(soughtOverReadBuffer));
+ fs.ReadExactly(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length);
+ CollectionAssert.AreEqual(new byte[soughtOverReadBuffer.Length], soughtOverReadBuffer);
var readBuffer = new byte[writeBuffer.Length];
- Assert.AreEqual(readBuffer.Length, fs.Read(readBuffer, offset: 0, readBuffer.Length));
+ fs.ReadExactly(readBuffer, offset: 0, readBuffer.Length);
CollectionAssert.AreEqual(writeBuffer, readBuffer);
// Ensure we've reached end of the stream
@@ -5148,7 +5150,7 @@ public void Sftp_SftpFileStream_Seek_WithinReadBuffer()
Assert.AreEqual(0x00, fs.ReadByte());
var readBuffer = new byte[writeBuffer.Length];
- Assert.AreEqual(writeBuffer.Length, fs.Read(readBuffer, offset: 0, readBuffer.Length));
+ fs.ReadExactly(readBuffer, offset: 0, readBuffer.Length);
CollectionAssert.AreEqual(writeBuffer, readBuffer);
// Ensure we've reached end of the stream
@@ -5187,11 +5189,11 @@ public void Sftp_SftpFileStream_Seek_WithinReadBuffer()
Assert.AreEqual(0x04, fs.ReadByte());
var soughtOverReadBuffer = new byte[seekOffset - 1];
- Assert.AreEqual(seekOffset - 1, fs.Read(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length));
- Assert.IsTrue(new byte[seekOffset - 1].IsEqualTo(soughtOverReadBuffer));
+ fs.ReadExactly(soughtOverReadBuffer, offset: 0, soughtOverReadBuffer.Length);
+ CollectionAssert.AreEqual(new byte[seekOffset - 1], soughtOverReadBuffer);
var readBuffer = new byte[writeBuffer.Length];
- Assert.AreEqual(writeBuffer.Length, fs.Read(readBuffer, offset: 0, readBuffer.Length));
+ fs.ReadExactly(readBuffer, offset: 0, readBuffer.Length);
CollectionAssert.AreEqual(writeBuffer, readBuffer);
// Ensure we've reached end of the stream
@@ -6203,6 +6205,136 @@ public void Sftp_SetLastWriteTimeUtc()
}
}
+ [TestMethod]
+ public void Sftp_SftpFileStream_Fuzz()
+ {
+ const int OperationCount = 100;
+ const int MaxBufferSize = 1000;
+ const int MaxFileSize = 15_000;
+
+ int seed = Environment.TickCount;
+
+ Console.WriteLine("Using seed " + seed);
+
+ var random = new Random(seed);
+
+ using var client = new SftpClient(_connectionInfoFactory.Create())
+ {
+ BufferSize = 100
+ };
+ client.Connect();
+
+ // We will perform operations on an SftpFileStream and a local
+ // System.IO.FileStream, and check that the results are the same.
+ // This could use a MemoryStream for the local side, except for the
+ // fact that performing a 0-byte write at a position beyond the length
+ // of the MemoryStream causes its length to increase, which is not the
+ // case for FileStream. Since we've got 'FileStream' in the name, we
+ // check that we align with FileStream's behaviour.
+
+ string remoteFilePath = GenerateUniqueRemoteFileName();
+ string localFilePath = Path.GetTempFileName();
+
+ byte[] fileBytes = new byte[1024];
+ random.NextBytes(fileBytes);
+
+ File.WriteAllBytes(localFilePath, fileBytes);
+ client.WriteAllBytes(remoteFilePath, fileBytes);
+
+ try
+ {
+ using (var local = File.Open(localFilePath, FileMode.Open, FileAccess.ReadWrite))
+ using (var remote = client.Open(remoteFilePath, FileMode.Open, FileAccess.ReadWrite))
+ {
+ for (int i = 0; i < OperationCount; i++)
+ {
+#pragma warning disable IDE0010 // Add missing cases
+ int op = random.Next(5);
+ switch (op)
+ {
+ case 0 when local.Length < MaxFileSize: // Write
+ {
+ var buffer = new byte[random.Next(0, MaxBufferSize)];
+ random.NextBytes(buffer);
+ int offset = random.Next(0, buffer.Length + 1);
+ int count = random.Next(0, buffer.Length - offset + 1);
+
+ remote.Write(buffer, offset, count);
+ local.Write(buffer, offset, count);
+ break;
+ }
+ case 1: // Read
+ {
+ var remoteBuffer = new byte[random.Next(0, MaxBufferSize)];
+ var localBuffer = new byte[remoteBuffer.Length];
+ int offset = random.Next(0, remoteBuffer.Length + 1);
+ int count = random.Next(0, remoteBuffer.Length - offset + 1);
+
+ int remoteRead = ReadExactly(remote, remoteBuffer, offset, count);
+ int localRead = ReadExactly(local, localBuffer, offset, count);
+
+ Assert.AreEqual(localRead, remoteRead);
+ CollectionAssert.AreEqual(localBuffer, remoteBuffer);
+ break;
+ }
+ case 2 when local.Length < MaxFileSize: // Seek
+ {
+ int position = (int)local.Position;
+ int length = (int)local.Length;
+
+ SeekOrigin origin = (SeekOrigin)random.Next(0, 3);
+ long offset = 0;
+ switch (origin)
+ {
+ case SeekOrigin.Begin:
+ offset = random.Next(0, length * 2);
+ break;
+ case SeekOrigin.Current:
+ offset = random.Next(-position, position);
+ break;
+ case SeekOrigin.End:
+ offset = random.Next(-length, length);
+ break;
+ }
+ long newPosRemote = remote.Seek(offset, origin);
+ long newPosLocal = local.Seek(offset, origin);
+ Assert.AreEqual(newPosLocal, newPosRemote);
+ Assert.AreEqual(local.Length, remote.Length);
+ break;
+ }
+ case 3: // SetLength
+ {
+ long newLength = random.Next(0, MaxFileSize);
+ remote.SetLength(newLength);
+ local.SetLength(newLength);
+ Assert.AreEqual(local.Length, remote.Length);
+ Assert.AreEqual(local.Position, remote.Position);
+ break;
+ }
+ case 4: // Flush
+ {
+ remote.Flush();
+ local.Flush();
+ break;
+ }
+ }
+#pragma warning restore IDE0010 // Add missing cases
+ }
+ }
+
+ CollectionAssert.AreEqual(File.ReadAllBytes(localFilePath), client.ReadAllBytes(remoteFilePath));
+ }
+ finally
+ {
+ File.Delete(localFilePath);
+
+ if (client.Exists(remoteFilePath))
+ {
+ client.DeleteFile(remoteFilePath);
+ }
+ }
+ }
+
private static IEnumerable