Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
844c88e
Added asserts to send buffer helper
ManickaP Jul 15, 2023
ddb8ee7
Postpone confirming the last RECEIVE event until the data are read
ManickaP Jul 15, 2023
bd5a627
Removed lock
ManickaP Jul 24, 2023
08f1bae
Debug tests
ManickaP Jul 24, 2023
a3ae2b7
ReadsClosed test fixed, and some better logging
ManickaP Jul 28, 2023
aeb7495
Final task keep alive, abort order, timeout for graceful write-side s…
ManickaP Aug 8, 2023
9d9334f
Tests
ManickaP Aug 9, 2023
80185f0
Always wait for SEND_COMPLETE
ManickaP Aug 9, 2023
33e18ed
Exclude BigPayload on platforms where it can OOM
ManickaP Aug 10, 2023
f6da31e
Removed unintended code changes
ManickaP Aug 11, 2023
79023fe
Reverted postponing reading FIN, if data have chance to get buffered …
ManickaP Aug 11, 2023
3b24271
Clean ups
ManickaP Aug 11, 2023
e9655b7
Fixed waiting for SEND_COMPLETE
ManickaP Aug 11, 2023
2800015
Hold back setting FinalTaskSource and overwrite result if no waiter i…
ManickaP Aug 12, 2023
0c35a2b
Cancellation and completion
ManickaP Aug 12, 2023
d3ba302
Comments, fixed FinalTaskSource
ManickaP Aug 12, 2023
5c33e2e
Fix assert
ManickaP Aug 14, 2023
77b7636
Test reseting control stream made more resilient
ManickaP Aug 14, 2023
e4c2f17
Merge branch 'main' into mapichov/quic-stream
ManickaP Sep 4, 2023
d6f5061
Attempt to fix still running write while disposing the stream in case…
ManickaP Sep 5, 2023
dbd5aec
Merge branch 'main' into mapichov/quic-stream
ManickaP Sep 6, 2023
8d22715
Attempt to fix stress build
ManickaP Sep 6, 2023
f6261e9
Sync Dispose in H3Stream waits for read and write as well
ManickaP Sep 12, 2023
9f626a8
Merge branch 'main' into mapichov/quic-stream
ManickaP Sep 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Reverted postponing reading FIN, if data have chance to get buffered …
…with FIN, we will do that.
  • Loading branch information
ManickaP committed Aug 11, 2023
commit 79023fe633837d2a213274f91f93c7f18cc2201f
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ private static bool GetLinqExpressionsBuiltWithIsInterpretingOnly()

public static bool IsRareEnumsSupported => !IsNativeAot;

public static bool IsIntMaxValueArrayIndexSupported => !s_largeArrayIsNotSupported.Value;
public static bool IsNotIntMaxValueArrayIndexSupported => s_largeArrayIsNotSupported.Value;

public static bool IsAssemblyLoadingSupported => !IsNativeAot;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ internal struct ReceiveBuffers
private readonly object _syncRoot;
private MultiArrayBuffer _buffer;
private bool _final;
private int _lastReceiveSize;

public ReceiveBuffers()
{
Expand Down Expand Up @@ -48,7 +47,6 @@ public int CopyFrom(ReadOnlySpan<QUIC_BUFFER> quicBuffers, int totalLength, bool
}

_final = final;
_lastReceiveSize = totalLength;
_buffer.EnsureAvailableSpace(totalLength);

int totalCopied = 0;
Expand All @@ -68,7 +66,7 @@ public int CopyFrom(ReadOnlySpan<QUIC_BUFFER> quicBuffers, int totalLength, bool
}
}

public int CopyTo(Memory<byte> buffer, out bool completed, out bool empty, out int lastReceiveSize)
public int CopyTo(Memory<byte> buffer, out bool completed, out bool empty)
{
lock (_syncRoot)
{
Expand All @@ -83,7 +81,6 @@ public int CopyTo(Memory<byte> buffer, out bool completed, out bool empty, out i

completed = _buffer.IsEmpty && _final;
empty = _buffer.IsEmpty;
lastReceiveSize = _lastReceiveSize;

return copied;
}
Expand Down
14 changes: 2 additions & 12 deletions src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -284,21 +284,14 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
}

// Copy data from the buffer, reduce target and increment total.
int copied = _receiveBuffers.CopyTo(buffer, out bool complete, out bool empty, out int lastReceiveSize);
int copied = _receiveBuffers.CopyTo(buffer, out bool complete, out bool empty);
buffer = buffer.Slice(copied);
totalCopied += copied;

// Make sure the task transitions into final state before the method finishes.
if (complete)
{
_receiveTcs.TrySetResult(final: true);
unsafe
{
// Confirm the last data which came with the FIN flag.
MsQuicApi.Api.StreamReceiveComplete(
_handle,
(ulong)lastReceiveSize);
}
}

// Unblock the next await to end immediately, i.e. there were/are any data in the buffer.
Expand Down Expand Up @@ -551,9 +544,6 @@ private unsafe int HandleEventReceive(ref RECEIVE_DATA data)
(int)data.TotalBufferLength,
data.Flags.HasFlag(QUIC_RECEIVE_FLAGS.FIN));

// If we copied all the data and also received FIN flag, postpone the confirmation of the data until they are consumed.
bool lastReceive = (totalCopied == data.TotalBufferLength) && data.Flags.HasFlag(QUIC_RECEIVE_FLAGS.FIN);

if (totalCopied < data.TotalBufferLength)
{
Volatile.Write(ref _receivedNeedsEnable, 1);
Expand All @@ -562,7 +552,7 @@ private unsafe int HandleEventReceive(ref RECEIVE_DATA data)
_receiveTcs.TrySetResult();

data.TotalBufferLength = totalCopied;
return (_receiveBuffers.HasCapacity() && Interlocked.CompareExchange(ref _receivedNeedsEnable, 0, 1) == 1) ? QUIC_STATUS_CONTINUE : lastReceive ? QUIC_STATUS_PENDING : QUIC_STATUS_SUCCESS;
return (_receiveBuffers.HasCapacity() && Interlocked.CompareExchange(ref _receivedNeedsEnable, 0, 1) == 1) ? QUIC_STATUS_CONTINUE : QUIC_STATUS_SUCCESS;
}
private unsafe int HandleEventSendComplete(ref SEND_COMPLETE_DATA data)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,7 @@ async ValueTask ReleaseOnReadsClosedAsync()
public static IEnumerable<object[]> PayloadSizeAndTwoBools()
{
var boolValues = new [] { true, false };
var payloadValues = PlatformDetection.IsIntMaxValueArrayIndexSupported ?
var payloadValues = !PlatformDetection.IsInHelix ?
new [] { SmallestPayload, SmallPayload, BufferPayload, BufferPlusPayload, BigPayload } :
new [] { SmallestPayload, SmallPayload, BufferPayload, BufferPlusPayload };
return
Expand All @@ -1232,6 +1232,7 @@ from bool2 in boolValues
[MemberData(nameof(PayloadSizeAndTwoBools))]
public async Task ReadsClosedFinishes_ConnectionClose(int payloadSize, bool closeServer, bool useDispose)
{
//using var logger = new TestUtilities.TestEventListener(Console.Out, "Private.InternalDiagnostics.System.Net.Quic");
using SemaphoreSlim serverSem = new SemaphoreSlim(0);
using SemaphoreSlim clientSem = new SemaphoreSlim(0);

Expand All @@ -1243,6 +1244,11 @@ await RunClientServer(

await using QuicStream stream = await connection.AcceptInboundStreamAsync();
await stream.WriteAsync(new byte[payloadSize], completeWrites: true);
// Make sure the data gets received by the peer if we expect the reading side to get buffered including FIN.
if (payloadSize <= BufferPayload)
{
await stream.WritesClosed;
}
serverSem.Release();
await clientSem.WaitAsync();

Expand All @@ -1260,15 +1266,7 @@ await RunClientServer(
}
}

var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.ReadsClosed);
if (expectedError == QuicError.OperationAborted)
{
Assert.Null(ex.ApplicationErrorCode);
}
else
{
Assert.Equal(expectedErrorCode, ex.ApplicationErrorCode);
}
await CheckReadsClosed(stream, expectedError, expectedErrorCode);
},
clientFunction: async connection =>
{
Expand All @@ -1277,6 +1275,10 @@ await RunClientServer(

await using QuicStream stream = await connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional);
await stream.WriteAsync(new byte[payloadSize], completeWrites: true);
if (payloadSize <= BufferPayload)
{
await stream.WritesClosed;
}
clientSem.Release();
await serverSem.WaitAsync();

Expand All @@ -1294,6 +1296,23 @@ await RunClientServer(
}
}

await CheckReadsClosed(stream, expectedError, expectedErrorCode);
}
);

async ValueTask CheckReadsClosed(QuicStream stream, QuicError expectedError, long expectedErrorCode)
{
// All data should be buffered if they fit in the internal buffer, reading should still pass.
if (payloadSize <= BufferPayload)
{
Assert.False(stream.ReadsClosed.IsCompleted);
var buffer = new byte[BufferPayload];
var length = await ReadAll(stream, buffer);
Assert.True(stream.ReadsClosed.IsCompletedSuccessfully);
Assert.Equal(payloadSize, length);
}
else
{
var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.ReadsClosed);
if (expectedError == QuicError.OperationAborted)
{
Expand All @@ -1304,7 +1323,7 @@ await RunClientServer(
Assert.Equal(expectedErrorCode, ex.ApplicationErrorCode);
}
}
);
}
}

[Theory]
Expand Down Expand Up @@ -1339,15 +1358,7 @@ await RunClientServer(
}
}

var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.WritesClosed);
if (expectedError == QuicError.OperationAborted)
{
Assert.Null(ex.ApplicationErrorCode);
}
else
{
Assert.Equal(expectedErrorCode, ex.ApplicationErrorCode);
}
await CheckWritesClosed(stream, expectedError, expectedErrorCode);
},
clientFunction: async connection =>
{
Expand All @@ -1373,17 +1384,22 @@ await RunClientServer(
}
}

var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.WritesClosed);
if (expectedError == QuicError.OperationAborted)
{
Assert.Null(ex.ApplicationErrorCode);
}
else
{
Assert.Equal(expectedErrorCode, ex.ApplicationErrorCode);
}
await CheckWritesClosed(stream, expectedError, expectedErrorCode);
}
);

async ValueTask CheckWritesClosed(QuicStream stream, QuicError expectedError, long expectedErrorCode)
{
var ex = await AssertThrowsQuicExceptionAsync(expectedError, () => stream.WritesClosed);
if (expectedError == QuicError.OperationAborted)
{
Assert.Null(ex.ApplicationErrorCode);
}
else
{
Assert.Equal(expectedErrorCode, ex.ApplicationErrorCode);
}
}
}

[Theory]
Expand Down