Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Code review feedback
  • Loading branch information
rzikm committed Apr 25, 2022
commit 346d025a1865d7a4febf3453013a0cf08a6c720e
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,6 @@ private async ValueTask<QuicStreamProvider> OpenStreamAsync(QUIC_STREAM_OPEN_FLA
throw new InvalidOperationException(SR.net_quic_not_connected);
}

cancellationToken.ThrowIfCancellationRequested();
var stream = new MsQuicStream(_state, flags);

try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa
// but after TryAddStream to prevent unnecessary RemoveStream in finalizer
_state.ConnectionState = connectionState;

// Inbound streams are already started
_state.StartCompletionSource.SetResult();

_state.Handle = streamHandle;
_canRead = true;
_canWrite = !flags.HasFlag(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL);
Expand Down Expand Up @@ -1123,22 +1126,23 @@ private static uint HandleEventStartComplete(State state, ref StreamEvent evt)
// - InvalidState - stream already started before, or connection aborted locally
// - StreamLimitReached - only if QUIC_STREAM_START_FLAG_FAIL_BLOCKED was specified (not in our case).
//
// We disregard duplicate StreamStart calls
if (status == MsQuicStatusCodes.Aborted)
{
state.StartCompletionSource.SetException(
state.StartCompletionSource.TrySetException(
ExceptionDispatchInfo.SetCurrentStackTrace(GetConnectionAbortedException(state)));
}
else
{
state.StartCompletionSource.SetException(
// TODO: Should we throw QuicOperationAbortedException when status is InvalidState?
// [ActiveIssue("https://github.com/dotnet/runtime/issues/55619")]
state.StartCompletionSource.TrySetException(
ExceptionDispatchInfo.SetCurrentStackTrace(new QuicException($"StreamStart finished with status {MsQuicStatusCodes.GetError(status)}")));
}
}
else if ((evt.Data.StartComplete.PeerAccepted & 1) != 0)
{
// Start succeeded and we were within stream limits, stream already usable.
state.StartCompletionSource.SetResult();
state.StartCompletionSource.TrySetResult();
}
// if PeerAccepted == 0, we will later receive PEER_ACCEPTED event, which will
// complete the StartCompletionSource
Expand Down Expand Up @@ -1245,6 +1249,10 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt
state.ShutdownCompletionSource.SetResult();
}

// If we are receiving stream shutdown notification, the start comletion source must have been already completed
// eihter by StreamOpen or PeerAccepted event, Connection closing, or it was cancelled by user.
Debug.Assert(state.StartCompletionSource.Task.IsCompleted);

// Dispose was called before complete event.
bool releaseHandles = Interlocked.Exchange(ref state.ShutdownDone, State.ShutdownDone_NotificationReceived) == State.ShutdownDone_Disposed;
if (releaseHandles)
Expand Down Expand Up @@ -1664,14 +1672,18 @@ private static bool CleanupReadStateAndCheckPending(State state, ReadState final
return shouldComplete;
}

internal ValueTask StartAsync(CancellationToken cancellationToken)
internal async ValueTask StartAsync(CancellationToken cancellationToken)
{
Debug.Assert(!Monitor.IsEntered(_state));

using var registration = cancellationToken.UnsafeRegister((state, token) => {
((State)state!).StartCompletionSource.SetCanceled(token);
}, _state);

uint status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT);
QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream.");

return new ValueTask(_state.StartCompletionSource.Task.WaitAsync(cancellationToken));
await _state.StartCompletionSource.Task.ConfigureAwait(false);
}

// Read state transitions:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,11 @@ ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection, CancellationTok
if (localAbort)
{
await clientConnection.CloseAsync(0);
await Assert.ThrowsAsync<QuicOperationAbortedException>(() => waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(3)));
// TODO: This may not always throw QuicOperationAbortedException due to a data race with MsQuic worker threads
// (CloseAsync may be processed before OpenStreamAsync as it is scheduled to the front of the operation queue)
// To be revisited once we standartize on exceptions.
// [ActiveIssue("https://github.com/dotnet/runtime/issues/55619")]
await Assert.ThrowsAnyAsync<QuicException>(() => waitTask.AsTask().WaitAsync(TimeSpan.FromSeconds(3)));
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ await RunClientServer(

// Pending ops should fail
await Assert.ThrowsAsync<QuicOperationAbortedException>(() => acceptTask);

// TODO: This may not always throw QuicOperationAbortedException due to a data race with MsQuic worker threads
// (CloseAsync may be processed before OpenStreamAsync as it is scheduled to the front of the operation queue)
// To be revisited once we standartize on exceptions.
// [ActiveIssue("https://github.com/dotnet/runtime/issues/55619")]
await Assert.ThrowsAsync<QuicOperationAbortedException>(() => connectTask);

// Subsequent attempts should fail
Expand Down