From 5fc24d3337e65bde8ff5eda2e48ee1752cd8873f Mon Sep 17 00:00:00 2001 From: Natalia Kondratyeva Date: Tue, 13 Jul 2021 20:19:32 +0200 Subject: [PATCH] Cosmetic changes to Read PR --- .../Implementations/MsQuic/MsQuicStream.cs | 74 ++++++++++++------- 1 file changed, 46 insertions(+), 28 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index 585aca851b3f0b..0c29a9285d6752 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -32,6 +32,8 @@ private sealed class State { public SafeMsQuicStreamHandle Handle = null!; // set in ctor. public GCHandle StateGCHandle; + + public MsQuicStream? Stream; // roots the stream in the pinned state to prevent GC during an async read I/O. public MsQuicConnection.State ConnectionState = null!; // set in ctor. public string TraceId = null!; // set in ctor. @@ -48,7 +50,7 @@ private sealed class State // set when ReadState.PendingRead: public Memory ReceiveUserBuffer; public CancellationTokenRegistration ReceiveCancellationRegistration; - public MsQuicStream? RootedReceiveStream; // roots the stream in the pinned state to prevent GC during an async read I/O. + // Resettable completions to be used for multiple calls to receive. public readonly ResettableCompletionSource ReceiveResettableCompletionSource = new ResettableCompletionSource(); public SendState SendState; @@ -363,27 +365,38 @@ internal override ValueTask ReadAsync(Memory destination, Cancellatio NetEventSource.Info(_state, $"{TraceId()} Stream reading into Memory of '{destination.Length}' bytes."); } - ReadState readState; - long abortError = -1; - bool canceledSynchronously = false; + ReadState initialReadState; // value before transitions + long abortError; + bool preCanceled = false; lock (_state) { - readState = _state.ReadState; + initialReadState = _state.ReadState; abortError = _state.ReadErrorCode; - if (readState != ReadState.PendingRead && cancellationToken.IsCancellationRequested) + // Failure scenario: pre-canceled token. Transition: any -> Aborted + // PendingRead state indicates there is another concurrent read operation in flight + // which is forbidden, so it is handled separately + if (initialReadState != ReadState.PendingRead && cancellationToken.IsCancellationRequested) { - readState = ReadState.Aborted; + initialReadState = ReadState.Aborted; _state.ReadState = ReadState.Aborted; - canceledSynchronously = true; + preCanceled = true; + } + + // Success scenario: EOS already reached, completing synchronously. No transition (final state) + if (initialReadState == ReadState.ReadsCompleted) + { + return new ValueTask(0); } - else if (readState == ReadState.None) + + // Success scenario: no data available yet, will return a task to wait on. Transition None->PendingRead + if (initialReadState == ReadState.None) { - Debug.Assert(_state.RootedReceiveStream is null); + Debug.Assert(_state.Stream is null); _state.ReceiveUserBuffer = destination; - _state.RootedReceiveStream = this; + _state.Stream = this; _state.ReadState = ReadState.PendingRead; if (cancellationToken.CanBeCanceled) @@ -396,7 +409,7 @@ internal override ValueTask ReadAsync(Memory destination, Cancellatio lock (state) { completePendingRead = state.ReadState == ReadState.PendingRead; - state.RootedReceiveStream = null; + state.Stream = null; state.ReceiveUserBuffer = null; state.ReadState = ReadState.Aborted; } @@ -414,7 +427,9 @@ internal override ValueTask ReadAsync(Memory destination, Cancellatio return _state.ReceiveResettableCompletionSource.GetValueTask(); } - else if (readState == ReadState.IndividualReadComplete) + + // Success scenario: data already available, completing synchronously. Transition IndividualReadComplete->None + if (initialReadState == ReadState.IndividualReadComplete) { _state.ReadState = ReadState.None; @@ -431,25 +446,22 @@ internal override ValueTask ReadAsync(Memory destination, Cancellatio } } + // All success scenarios returned at this point. Failure scenarios below: + Exception? ex = null; - switch (readState) + switch (initialReadState) { - case ReadState.ReadsCompleted: - return new ValueTask(0); case ReadState.PendingRead: ex = new InvalidOperationException("Only one read is supported at a time."); break; case ReadState.Aborted: - ex = - canceledSynchronously ? new OperationCanceledException(cancellationToken) : // aborted by token being canceled before the async op started. - abortError == -1 ? new QuicOperationAbortedException() : // aborted by user via some other operation. - new QuicStreamAbortedException(abortError); // aborted by peer. - + ex = preCanceled ? new OperationCanceledException(cancellationToken) : + ThrowHelper.GetStreamAbortedException(abortError); break; case ReadState.ConnectionClosed: default: - Debug.Assert(readState == ReadState.ConnectionClosed, $"{nameof(ReadState)} of '{readState}' is unaccounted for in {nameof(ReadAsync)}."); + Debug.Assert(initialReadState == ReadState.ConnectionClosed, $"{nameof(ReadState)} of '{initialReadState}' is unaccounted for in {nameof(ReadAsync)}."); ex = GetConnectionAbortedException(_state); break; } @@ -490,7 +502,7 @@ internal override void AbortRead(long errorCode) if (_state.ReadState == ReadState.PendingRead) { shouldComplete = true; - _state.RootedReceiveStream = null; + _state.Stream = null; _state.ReceiveUserBuffer = null; } if (_state.ReadState < ReadState.ReadsCompleted) @@ -833,7 +845,7 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt) if (receiveEvent.BufferCount == 0) { // This is a 0-length receive that happens once reads are finished (via abort or otherwise). - // State changes for this are handled elsewhere. + // State changes for this are handled in PEER_SEND_SHUTDOWN / PEER_SEND_ABORT / SHUTDOWN_COMPLETE event handlers. return MsQuicStatusCodes.Success; } @@ -847,6 +859,12 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt) case ReadState.None: // ReadAsync() hasn't been called yet. Stash the buffer so the next ReadAsync call completes synchronously. + // We are overwriting state.ReceiveQuicBuffers here even if we only partially consumed them + // and it is intended, because unconsumed data will arrive again from the point we've stopped. + // New RECEIVE event wouldn't come until we call EnableReceive(), and we call it only after we've consumed + // as much as we could and said so to msquic in ReceiveComplete(taken), so new event will have all the + // remaining data. + if ((uint)state.ReceiveQuicBuffers.Length < receiveEvent.BufferCount) { QuicBuffer[] oldReceiveBuffers = state.ReceiveQuicBuffers; @@ -872,7 +890,7 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt) state.ReceiveCancellationRegistration.Unregister(); shouldComplete = true; - state.RootedReceiveStream = null; + state.Stream = null; state.ReadState = ReadState.None; readLength = CopyMsQuicBuffersToUserBuffer(new ReadOnlySpan(receiveEvent.Buffers, (int)receiveEvent.BufferCount), state.ReceiveUserBuffer.Span); @@ -975,7 +993,7 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt if (state.ReadState == ReadState.PendingRead) { shouldReadComplete = true; - state.RootedReceiveStream = null; + state.Stream = null; state.ReceiveUserBuffer = null; } if (state.ReadState < ReadState.ReadsCompleted) @@ -1029,7 +1047,7 @@ private static uint HandleEventPeerSendAborted(State state, ref StreamEvent evt) if (state.ReadState == ReadState.PendingRead) { shouldComplete = true; - state.RootedReceiveStream = null; + state.Stream = null; state.ReceiveUserBuffer = null; } state.ReadState = ReadState.Aborted; @@ -1057,7 +1075,7 @@ private static uint HandleEventPeerSendShutdown(State state) if (state.ReadState == ReadState.PendingRead) { shouldComplete = true; - state.RootedReceiveStream = null; + state.Stream = null; state.ReceiveUserBuffer = null; } if (state.ReadState < ReadState.ReadsCompleted)