Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP
  • Loading branch information
scalablecory committed Jun 22, 2021
commit 97ea324de05d22e13660ef1fb77c29ec35900b82
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_F

QuicExceptionHelpers.ThrowIfFailed(status, "Failed to open stream to peer.");

// TODO: StreamStart is blocking on another thread here.
// We should refactor this to use the ASYNC flag.

status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, QUIC_STREAM_START_FLAGS.FAIL_BLOCKED);
QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream.");
}
Expand All @@ -143,12 +146,15 @@ internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_F
throw;
}

// TODO: our callback starts getting called as soon as we call StreamStart.
// Should this stuff be moved before that call?

if (!connectionState.TryAddStream(this))
{
_state.Handle?.Dispose();
_stateHandle.Free();
throw new ObjectDisposedException(nameof(QuicConnection));
}
}

_state.ConnectionState = connectionState;

Expand Down Expand Up @@ -325,8 +331,8 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio

if (readState != ReadState.PendingRead && cancellationToken.IsCancellationRequested)
{
readState = ReadState.Aborted;
_state.ReadState = ReadState.Aborted;
readState = ReadState.StreamAborted;
_state.ReadState = ReadState.StreamAborted;
canceledSynchronously = true;
}
else if (readState == ReadState.None)
Expand All @@ -348,7 +354,7 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
{
completePendingRead = state.ReadState == ReadState.PendingRead;
state.RootedReceiveStream = null;
state.ReadState = ReadState.Aborted;
state.ReadState = ReadState.StreamAborted;
}

if (completePendingRead)
Expand Down Expand Up @@ -390,14 +396,17 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
case ReadState.PendingRead:
ex = new InvalidOperationException("Only one read is supported at a time.");
break;
case ReadState.Aborted:
default:
Debug.Assert(readState == ReadState.Aborted, $"{nameof(ReadState)} of '{readState}' is unaccounted for in {nameof(ReadAsync)}.");

case ReadState.StreamAborted:
ex =
canceledSynchronously ? new OperationCanceledException(cancellationToken) : // aborted by token being canceled before the async op started.
abortError == -1 ? new QuicOperationAbortedException() : // aborted by user via some other operation.
new QuicStreamAbortedException(abortError); // aborted by peer.

break;
case ReadState.ConnectionAborted:
default:
Debug.Assert(readState == ReadState.ConnectionAborted, $"{nameof(ReadState)} of '{readState}' is unaccounted for in {nameof(ReadAsync)}.");
ex = GetConnectionAbortedException(_state);
break;
}

Expand Down Expand Up @@ -460,7 +469,7 @@ internal override void Abort(long errorCode, QuicAbortDirection abortDirection =
{
completeReads = _state.ReadState == ReadState.PendingRead;
_state.RootedReceiveStream = null;
_state.ReadState = ReadState.Aborted;
_state.ReadState = ReadState.StreamAborted;
flags |= QUIC_STREAM_SHUTDOWN_FLAGS.ABORT_RECEIVE;
}
}
Expand Down Expand Up @@ -542,13 +551,13 @@ public override void Dispose()
t.GetAwaiter().GetResult();
}

// TODO: there's a bug here where the safe handle is no longer valid.
// This shouldn't happen because the safe handle is effectively pinned
// until after disposal completes.
~MsQuicStream()
{
DisposeAsyncThrowaway(this);

// This is weird due to needing to keep _state alive for MsQuic's callback.
// See DisposeAsync implementation for details.

static async void DisposeAsyncThrowaway(MsQuicStream stream)
{
await stream.DisposeAsync(cancellationToken: default, async: true).ConfigureAwait(false);
Expand Down Expand Up @@ -600,7 +609,6 @@ private async ValueTask DisposeAsync(CancellationToken cancellationToken, bool a
{
NetEventSource.Info(_state, $"[Stream#{_state.GetHashCode()}] disposed");
}

}

private void EnableReceive()
Expand Down Expand Up @@ -692,7 +700,6 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt)
ArrayPool<QuicBuffer>.Shared.Return(oldReceiveBuffers);
}
}
}

for (uint i = 0; i < receiveEvent.BufferCount; ++i)
{
Expand All @@ -712,9 +719,8 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt)

readLength = CopyMsQuicBuffersToUserBuffer(new ReadOnlySpan<QuicBuffer>(receiveEvent.Buffers, (int)receiveEvent.BufferCount), state.ReceiveUserBuffer.Span);
break;
case ReadState.Aborted:
default:
Debug.Assert(state.ReadState == ReadState.Aborted, $"Unexpected {nameof(ReadState)} '{state.ReadState}' in {nameof(HandleEventRecv)}.");
Debug.Assert(state.ReadState is ReadState.StreamAborted or ReadState.ConnectionAborted, $"Unexpected {nameof(ReadState)} '{state.ReadState}' in {nameof(HandleEventRecv)}.");

// There was a race between a user aborting the read stream and the callback being ran.
// This will eat any received data.
Expand Down Expand Up @@ -791,7 +797,7 @@ private static uint HandleEventPeerSendAborted(State state, ref StreamEvent evt)
{
shouldComplete = true;
}
state.ReadState = ReadState.Aborted;
state.ReadState = ReadState.StreamAborted;
state.ReadErrorCode = (long)evt.Data.PeerSendAborted.ErrorCode;
}

Expand Down Expand Up @@ -1101,6 +1107,7 @@ private void ThrowIfDisposed()
private static uint HandleEventConnectionClose(State state)
{
long errorCode = state.ConnectionState.AbortErrorCode;

if (NetEventSource.Log.IsEnabled())
{
NetEventSource.Info(state, $"[Stream#{state.GetHashCode()}] handling Connection#{state.ConnectionState.GetHashCode()} close" +
Expand All @@ -1109,34 +1116,19 @@ private static uint HandleEventConnectionClose(State state)

bool shouldCompleteRead = false;
bool shouldCompleteSend = false;
bool shouldCompleteShutdownWrite = false;
bool shouldCompleteShutdown = false;

lock (state)
{
if (state.ReadState == ReadState.None)
{
shouldCompleteRead = true;
}
state.ReadState = ReadState.ConnectionClosed;
shouldCompleteRead = state.ReadState == ReadState.PendingRead;
shouldCompleteSend = state.SendState is SendState.None or SendState.Pending;

if (state.SendState == SendState.None || state.SendState == SendState.Pending)
if (state.ReadState is not ReadState.EndOfReadStream or ReadState.StreamAborted)
{
shouldCompleteSend = true;
state.ReadState = ReadState.ConnectionAborted;
}
state.SendState = SendState.ConnectionClosed;

if (state.ShutdownWriteState == ShutdownWriteState.None)
{
shouldCompleteShutdownWrite = true;
}
state.ShutdownWriteState = ShutdownWriteState.ConnectionClosed;

if (state.ShutdownState == ShutdownState.None)
{
shouldCompleteShutdown = true;
}
state.ShutdownState = ShutdownState.ConnectionClosed;
state.SendState = SendState.ConnectionClosed;
}

if (shouldCompleteRead)
Expand All @@ -1151,12 +1143,6 @@ private static uint HandleEventConnectionClose(State state)
ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state)));
}

if (shouldCompleteShutdownWrite)
{
state.ShutdownWriteCompletionSource.SetException(
ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state)));
}

if (shouldCompleteShutdown)
{
state.ShutdownCompletionSource.SetException(
Expand Down Expand Up @@ -1192,14 +1178,14 @@ private enum ReadState
EndOfReadStream,

/// <summary>
/// User has aborted the stream, either via a cancellation token on ReadAsync(), or via Abort(read).
/// The stream has been aborted, either by user or by peer.
/// </summary>
Aborted,
StreamAborted,

/// <summary>
/// Connection was closed, either by user or by the peer.
/// The connection has been aborted, either by user or by peer.
/// </summary>
ConnectionClosed
ConnectionAborted
}

private enum SendState
Expand Down
Loading