Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Minor change
  • Loading branch information
rzikm committed Apr 12, 2022
commit 7744b97e4161445be2e26f159fd7024744cb1579
Original file line number Diff line number Diff line change
Expand Up @@ -590,20 +590,7 @@ private async ValueTask<QuicStreamProvider> OpenStreamAsync(QUIC_STREAM_OPEN_FLA
throw new InvalidOperationException(SR.net_quic_not_connected);
}

cancellationToken.ThrowIfCancellationRequested();
var stream = new MsQuicStream(_state, flags);

try
{
await stream.StartAsync(QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT, cancellationToken).ConfigureAwait(false);
}
catch
{
stream.Dispose();
throw;
}

return stream;
return await MsQuicStream.CreateOutbound(_state, cancellationToken).ConfigureAwait(false);
}

internal override ValueTask<QuicStreamProvider> OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default) => OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa
catch
{
_state.StateGCHandle.Free();
// don't free the streamHandle, it will be freed by the caller
throw;
}

Expand All @@ -149,6 +150,38 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa
}
}

internal static async ValueTask<MsQuicStream> CreateOutbound(MsQuicConnection.State connectionState, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
var stream = new MsQuicStream(connectionState, flags);
State state = stream._state;

try
{
Debug.Assert(!Monitor.IsEntered(state));

cancellationToken.ThrowIfCancellationRequested();
using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(static (s, token) =>
{
((State)s!).StartCompletionSource.TrySetException(new OperationCanceledException(token));
}, state);

// Fire of start of the stream
uint status = MsQuicApi.Api.StreamStartDelegate(state.Handle, QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT);
QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream.");

// wait unit start completes.
await state.StartCompletionSource.Task.ConfigureAwait(false);
}
catch
{
stream.Dispose();
throw;
}

return stream;
}

// outbound.
internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_FLAGS flags)
{
Expand Down Expand Up @@ -891,22 +924,6 @@ private void EnableReceive()
QuicExceptionHelpers.ThrowIfFailed(status, "StreamReceiveSetEnabled failed.");
}

internal async ValueTask StartAsync(QUIC_STREAM_START_FLAGS flags, CancellationToken cancellationToken)
{
Debug.Assert(!Monitor.IsEntered(_state));

cancellationToken.ThrowIfCancellationRequested();
using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(static (s, token) =>
{
((State)s!).StartCompletionSource.TrySetException(new OperationCanceledException(token));
}, _state);

uint status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, flags);
QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream.");

await _state.StartCompletionSource.Task.ConfigureAwait(false);
}

/// <summary>
/// Callback calls for a single instance of a stream are serialized by msquic.
/// They happen on a msquic thread and shouldn't take too long to not to block msquic.
Expand Down
54 changes: 18 additions & 36 deletions src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -393,19 +393,13 @@ public async Task ConnectWithClientCertificate(bool sendCerttificate)
[InlineData(true)]
public async Task WaitForAvailableStreamsAsyncWorks(bool unidirectional)
{
ValueTask WaitForAvailableStreamsAsync(QuicConnection connection)
{
return unidirectional
? connection.WaitForAvailableUnidirectionalStreamsAsync()
: connection.WaitForAvailableBidirectionalStreamsAsync();
}
ValueTask WaitForAvailableStreamsAsync(QuicConnection connection) => unidirectional
? connection.WaitForAvailableUnidirectionalStreamsAsync()
: connection.WaitForAvailableBidirectionalStreamsAsync();

ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection)
{
return unidirectional
? connection.OpenUnidirectionalStreamAsync()
: connection.OpenBidirectionalStreamAsync();
}
ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection) => unidirectional
? connection.OpenUnidirectionalStreamAsync()
: connection.OpenBidirectionalStreamAsync();

QuicListenerOptions listenerOptions = CreateQuicListenerOptions();
listenerOptions.MaxUnidirectionalStreams = 1;
Expand Down Expand Up @@ -435,12 +429,9 @@ ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection)
[InlineData(true)]
public async Task OpenStreamAsync_BlocksUntilAvailable(bool unidirectional)
{
ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection)
{
return unidirectional
? connection.OpenUnidirectionalStreamAsync()
: connection.OpenBidirectionalStreamAsync();
}
ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection) => unidirectional
? connection.OpenUnidirectionalStreamAsync()
: connection.OpenBidirectionalStreamAsync();

QuicListenerOptions listenerOptions = CreateQuicListenerOptions();
listenerOptions.MaxUnidirectionalStreams = 1;
Expand Down Expand Up @@ -469,12 +460,9 @@ ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection)
[InlineData(true)]
public async Task OpenStreamAsync_Canceled_Throws_OperationCanceledException(bool unidirectional)
{
ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection, CancellationToken token = default)
{
return unidirectional
? connection.OpenUnidirectionalStreamAsync(token)
: connection.OpenBidirectionalStreamAsync(token);
}
ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection, CancellationToken token = default) => unidirectional
? connection.OpenUnidirectionalStreamAsync(token)
: connection.OpenBidirectionalStreamAsync(token);

QuicListenerOptions listenerOptions = CreateQuicListenerOptions();
listenerOptions.MaxUnidirectionalStreams = 1;
Expand Down Expand Up @@ -512,12 +500,9 @@ ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection, CancellationTok
[InlineData(true)]
public async Task OpenStreamAsync_PreCanceled_Throws_OperationCanceledException(bool unidirectional)
{
ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection, CancellationToken token = default)
{
return unidirectional
? connection.OpenUnidirectionalStreamAsync(token)
: connection.OpenBidirectionalStreamAsync(token);
}
ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection, CancellationToken token = default) => unidirectional
? connection.OpenUnidirectionalStreamAsync(token)
: connection.OpenBidirectionalStreamAsync(token);

(QuicConnection clientConnection, QuicConnection serverConnection) = await CreateConnectedQuicConnection(null, CreateQuicListenerOptions());

Expand All @@ -536,12 +521,9 @@ ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection, CancellationTok
[InlineData(true, true)]
public async Task OpenStreamAsync_ConnectionAbort_Throws(bool unidirectional, bool localAbort)
{
ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection, CancellationToken token = default)
{
return unidirectional
? connection.OpenUnidirectionalStreamAsync(token)
: connection.OpenBidirectionalStreamAsync(token);
}
ValueTask<QuicStream> OpenStreamAsync(QuicConnection connection, CancellationToken token = default) => unidirectional
? connection.OpenUnidirectionalStreamAsync(token)
: connection.OpenBidirectionalStreamAsync(token);

QuicListenerOptions listenerOptions = CreateQuicListenerOptions();
listenerOptions.MaxUnidirectionalStreams = 1;
Expand Down