Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Early draft of Open(Uni|Bidi)rectionalStreamAsync
  • Loading branch information
rzikm committed Apr 11, 2022
commit 8f7c2109f6c0e365e13ec0e4075e1d7a220b1c83
8 changes: 5 additions & 3 deletions src/libraries/System.Net.Quic/ref/System.Net.Quic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ public void Dispose() { }
public int GetRemoteAvailableUnidirectionalStreamCount() { throw null; }
public System.Net.Quic.QuicStream OpenBidirectionalStream() { throw null; }
public System.Net.Quic.QuicStream OpenUnidirectionalStream() { throw null; }
public System.Threading.Tasks.ValueTask<System.Net.Quic.QuicStream> OpenBidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask<System.Net.Quic.QuicStream> OpenUnidirectionalStreamAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get { throw null; } }
public System.Threading.Tasks.ValueTask WaitForAvailableBidirectionalStreamsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask WaitForAvailableUnidirectionalStreamsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
public partial class QuicConnectionAbortedException : System.Net.Quic.QuicException
{
public QuicConnectionAbortedException(string message, long errorCode) : base (default(string)) { }
public QuicConnectionAbortedException(string message, long errorCode) : base(default(string)) { }
public long ErrorCode { get { throw null; } }
}
public partial class QuicException : System.Exception
Expand Down Expand Up @@ -71,7 +73,7 @@ public QuicListenerOptions() { }
}
public partial class QuicOperationAbortedException : System.Net.Quic.QuicException
{
public QuicOperationAbortedException(string message) : base (default(string)) { }
public QuicOperationAbortedException(string message) : base(default(string)) { }
}
public partial class QuicOptions
{
Expand Down Expand Up @@ -125,7 +127,7 @@ public override void WriteByte(byte value) { }
}
public partial class QuicStreamAbortedException : System.Net.Quic.QuicException
{
public QuicStreamAbortedException(string message, long errorCode) : base (default(string)) { }
public QuicStreamAbortedException(string message, long errorCode) : base(default(string)) { }
public long ErrorCode { get { throw null; } }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,52 @@ internal override QuicStreamProvider OpenBidirectionalStream()
return OpenStream(streamId, true);
}

internal async override ValueTask<QuicStreamProvider> OpenUnidirectionalStreamAsync(CancellationToken cancellationToken)
{
PeerStreamLimit? streamLimit = RemoteStreamLimit;
if (streamLimit is null)
{
throw new InvalidOperationException("Not connected");
}

while (!streamLimit.Unidirectional.TryIncrement())
{
await WaitForAvailableUnidirectionalStreamsAsync(cancellationToken).ConfigureAwait(false);
}

long streamId;
lock (_syncObject)
{
streamId = _nextOutboundUnidirectionalStream;
_nextOutboundUnidirectionalStream += 4;
}

return OpenStream(streamId, false);
}

internal async override ValueTask<QuicStreamProvider> OpenBidirectionalStreamAsync(CancellationToken cancellationToken)
{
PeerStreamLimit? streamLimit = RemoteStreamLimit;
if (streamLimit is null)
{
throw new InvalidOperationException("Not connected");
}

while (!streamLimit.Bidirectional.TryIncrement())
{
await WaitForAvailableBidirectionalStreamsAsync(cancellationToken).ConfigureAwait(false);
}

long streamId;
lock (_syncObject)
{
streamId = _nextOutboundBidirectionalStream;
_nextOutboundBidirectionalStream += 4;
}

return OpenStream(streamId, true);
}

internal MockStream OpenStream(long streamId, bool bidirectional)
{
CheckDisposed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ internal enum QUIC_STREAM_EVENT_TYPE : uint
SEND_SHUTDOWN_COMPLETE = 6,
SHUTDOWN_COMPLETE = 7,
IDEAL_SEND_BUFFER_SIZE = 8,
PEER_ACCEPTED = 9
}

#if SOCKADDR_HAS_LENGTH
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,28 +582,45 @@ internal override ValueTask WaitForAvailableBidirectionalStreamsAsync(Cancellati
return new ValueTask(tcs.Task.WaitAsync(cancellationToken));
}

internal override QuicStreamProvider OpenUnidirectionalStream()
private QuicStreamProvider OpenStream(QUIC_STREAM_OPEN_FLAGS flags)
{
ThrowIfDisposed();
if (!Connected)
{
throw new InvalidOperationException(SR.net_quic_not_connected);
}

return new MsQuicStream(_state, QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL);
var stream = new MsQuicStream(_state, flags);

// should complete synchronously
ValueTask startTask = stream.StartAsync(QUIC_STREAM_START_FLAGS.FAIL_BLOCKED | QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT, default);
startTask.AsTask().GetAwaiter().GetResult();
Debug.Assert(startTask.IsCompleted);

return stream;
}

internal override QuicStreamProvider OpenBidirectionalStream()
internal override QuicStreamProvider OpenUnidirectionalStream() => OpenStream(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL);
internal override QuicStreamProvider OpenBidirectionalStream() => OpenStream(QUIC_STREAM_OPEN_FLAGS.NONE);

private async ValueTask<QuicStreamProvider> OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS flags, CancellationToken cancellationToken)
{
ThrowIfDisposed();
if (!Connected)
{
throw new InvalidOperationException(SR.net_quic_not_connected);
}

return new MsQuicStream(_state, QUIC_STREAM_OPEN_FLAGS.NONE);
var stream = new MsQuicStream(_state, flags);

await stream.StartAsync(QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL | QUIC_STREAM_START_FLAGS.INDICATE_PEER_ACCEPT, cancellationToken).ConfigureAwait(false);

return stream;
}

internal override ValueTask<QuicStreamProvider> OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default) => OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL, cancellationToken);
internal override ValueTask<QuicStreamProvider> OpenBidirectionalStreamAsync(CancellationToken cancellationToken = default) => OpenStreamAsync(QUIC_STREAM_OPEN_FLAGS.NONE, cancellationToken);

internal override int GetRemoteAvailableUnidirectionalStreamCount()
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ private sealed class State
// Set once writes have been shutdown.
public readonly TaskCompletionSource ShutdownWriteCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

// Set once stream has been started and within peer's advertised stream limits
public readonly TaskCompletionSource StartCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

public ShutdownState ShutdownState;
// The value makes sure that we release the handles only once.
public int ShutdownDone;
Expand Down Expand Up @@ -182,10 +185,6 @@ internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_F
}

QuicExceptionHelpers.ThrowIfFailed(status, "Failed to open stream to peer.");

Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, QUIC_STREAM_START_FLAGS.FAIL_BLOCKED | QUIC_STREAM_START_FLAGS.SHUTDOWN_ON_FAIL);
QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream.");
}
catch
{
Expand Down Expand Up @@ -888,6 +887,31 @@ private void EnableReceive()
QuicExceptionHelpers.ThrowIfFailed(status, "StreamReceiveSetEnabled failed.");
}

internal async ValueTask StartAsync(QUIC_STREAM_START_FLAGS flags, CancellationToken cancellationToken)
{
Debug.Assert(!Monitor.IsEntered(_state));

try
{
cancellationToken.ThrowIfCancellationRequested();
using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(static (s, token) =>
{
((State)s!).StartCompletionSource.TrySetCanceled(token);
}, _state);

uint status = MsQuicApi.Api.StreamStartDelegate(_state.Handle, flags);
QuicExceptionHelpers.ThrowIfFailed(status, "Could not start stream.");

await _state.StartCompletionSource.Task.ConfigureAwait(false);
}
catch
{
_state.Handle?.Dispose();
_state.StateGCHandle.Free();
throw;
}
}

/// <summary>
/// Callback calls for a single instance of a stream are serialized by msquic.
/// They happen on a msquic thread and shouldn't take too long to not to block msquic.
Expand Down Expand Up @@ -944,6 +968,8 @@ private static uint HandleEvent(State state, ref StreamEvent evt)
// Shutdown for both sending and receiving is completed.
case QUIC_STREAM_EVENT_TYPE.SHUTDOWN_COMPLETE:
return HandleEventShutdownComplete(state, ref evt);
case QUIC_STREAM_EVENT_TYPE.PEER_ACCEPTED:
return HandleEventPeerAccepted(state);
default:
return MsQuicStatusCodes.Success;
}
Expand Down Expand Up @@ -1111,6 +1137,14 @@ private static uint HandleEventStartComplete(State state, ref StreamEvent evt)
{
// Store the start status code and check it when propagating shutdown event, which we'll get since we set SHUTDOWN_ON_FAIL in StreamStart.
state.StartStatus = evt.Data.StartComplete.Status;
if (state.StartStatus != MsQuicStatusCodes.Success)
{
state.StartCompletionSource.TrySetException(ExceptionDispatchInfo.SetCurrentStackTrace(new QuicException($"Stream start failed with {MsQuicStatusCodes.GetError(state.StartStatus)}")));
}
else if ((evt.Data.StartComplete.PeerAccepted & 1) != 0)
{
state.StartCompletionSource.TrySetResult();
}
return MsQuicStatusCodes.Success;
}

Expand Down Expand Up @@ -1223,6 +1257,12 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt
return MsQuicStatusCodes.Success;
}

private static uint HandleEventPeerAccepted(State state)
{
state.StartCompletionSource.TrySetResult();
return MsQuicStatusCodes.Success;
}

private static uint HandleEventPeerSendAborted(State state, ref StreamEvent evt)
{
bool shouldComplete = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ internal abstract class QuicConnectionProvider : IDisposable

internal abstract QuicStreamProvider OpenBidirectionalStream();

internal abstract ValueTask<QuicStreamProvider> OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default);
internal abstract ValueTask<QuicStreamProvider> OpenBidirectionalStreamAsync(CancellationToken cancellationToken = default);

internal abstract int GetRemoteAvailableUnidirectionalStreamCount();

internal abstract int GetRemoteAvailableBidirectionalStreamCount();
Expand All @@ -32,7 +35,7 @@ internal abstract class QuicConnectionProvider : IDisposable

internal abstract System.Net.Security.SslApplicationProtocol NegotiatedApplicationProtocol { get; }

internal abstract System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get ; }
internal abstract System.Security.Cryptography.X509Certificates.X509Certificate? RemoteCertificate { get; }

internal abstract ValueTask CloseAsync(long errorCode, CancellationToken cancellationToken = default);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ internal QuicConnection(QuicConnectionProvider provider)
/// <returns></returns>
public QuicStream OpenBidirectionalStream() => new QuicStream(_provider.OpenBidirectionalStream());

/// <summary>
/// Create an outbound unidirectional stream.
/// </summary>
/// <returns></returns>
public async ValueTask<QuicStream> OpenUnidirectionalStreamAsync(CancellationToken cancellationToken = default) => new QuicStream(await _provider.OpenUnidirectionalStreamAsync(cancellationToken).ConfigureAwait(false));

/// <summary>
/// Create an outbound bidirectional stream.
/// </summary>
/// <returns></returns>
public async ValueTask<QuicStream> OpenBidirectionalStreamAsync(CancellationToken cancellationToken = default) => new QuicStream(await _provider.OpenBidirectionalStreamAsync(cancellationToken).ConfigureAwait(false));


/// <summary>
/// Accept an incoming stream.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,6 @@ public async Task WaitForAvailableUnidirectionStreamsAsyncWorks()
}

[Fact]
[ActiveIssue("https://github.com/dotnet/runtime/issues/67302")]
public async Task WaitForAvailableBidirectionStreamsAsyncWorks()
{
QuicListenerOptions listenerOptions = CreateQuicListenerOptions();
Expand Down