Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Sends abort read/write if H/3 stream is disposed before respective co…
…ntents are finsihed
  • Loading branch information
ManickaP authored and github-actions committed Aug 24, 2021
commit e17498ec79c777d3301b2b35705a7b54ec8e9ae7
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ internal sealed class Http3LoopbackStream : IDisposable
private const int MaximumVarIntBytes = 8;
private const long VarIntMax = (1L << 62) - 1;

private const long DataFrame = 0x0;
private const long HeadersFrame = 0x1;
private const long SettingsFrame = 0x4;
private const long GoAwayFrame = 0x7;
public const long DataFrame = 0x0;
public const long HeadersFrame = 0x1;
public const long SettingsFrame = 0x4;
public const long GoAwayFrame = 0x7;

public const long ControlStream = 0x0;
public const long PushStream = 0x1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void Dispose()
if (!_disposed)
{
_disposed = true;
AbortStream();
_stream.Dispose();
DisposeSyncHelper();
}
Expand All @@ -94,6 +95,7 @@ public async ValueTask DisposeAsync()
if (!_disposed)
{
_disposed = true;
AbortStream();
await _stream.DisposeAsync().ConfigureAwait(false);
DisposeSyncHelper();
}
Expand Down Expand Up @@ -365,6 +367,9 @@ private async Task SendContentAsync(HttpContent content, CancellationToken cance
await content.CopyToAsync(writeStream, null, cancellationToken).ConfigureAwait(false);
}

// Set to 0 to recognize that the whole request body has been sent and therefore there's no need to abort write side in case of a premature disposal.
_requestContentLengthRemaining = 0;

if (_sendBuffer.ActiveLength != 0)
{
// Our initial send buffer, which has our headers, is normally sent out on the first write to the Http3WriteStream.
Expand Down Expand Up @@ -1217,6 +1222,25 @@ private async ValueTask<bool> ReadNextDataFrameAsync(HttpResponseMessage respons
public void Trace(string message, [CallerMemberName] string? memberName = null) =>
_connection.Trace(StreamId, message, memberName);

private void AbortStream()
{
// ToDo : locking??? we don't have any in H/3 stream, we don't have a sync root or anything.
// Check if the response body has been fully consumed.
bool requestBodyFinished = _requestContentLengthRemaining == 0; // -1 is used for unknown Content-Length, 0 for the end of content writing
bool responseBodyFinished = _responseDataPayloadRemaining == -1; // -1 is used for EOF, 0 for consumed DATA frame payload before the next read

// If the request body isn't completed, cancel it now.
if (!requestBodyFinished)
{
_stream.AbortWrite((long)Http3ErrorCode.RequestCancelled);
}
// If the response body isn't completed, cancel it now.
if (!responseBodyFinished)
{
_stream.AbortRead((long)Http3ErrorCode.RequestCancelled);
}
}

// TODO: it may be possible for Http3RequestStream to implement Stream directly and avoid this allocation.
private sealed class Http3ReadStream : HttpBaseStream
{
Expand All @@ -1240,36 +1264,42 @@ public Http3ReadStream(Http3RequestStream stream)

protected override void Dispose(bool disposing)
{
if (_stream != null)
Http3RequestStream? stream = Interlocked.Exchange(ref _stream, null);
if (stream is null)
{
if (disposing)
{
// This will remove the stream from the connection properly.
_stream.Dispose();
}
else
{
// We shouldn't be using a managed instance here, but don't have much choice -- we
// need to remove the stream from the connection's GOAWAY collection.
_stream._connection.RemoveStream(_stream._stream);
_stream._connection = null!;
}
return;
}

_stream = null;
_response = null;
if (disposing)
{
// This will remove the stream from the connection properly.
stream.Dispose();
}
else
{
// We shouldn't be using a managed instance here, but don't have much choice -- we
// need to remove the stream from the connection's GOAWAY collection.
stream._connection.RemoveStream(stream._stream);
stream._connection = null!;
}

_response = null;

base.Dispose(disposing);
}

public override async ValueTask DisposeAsync()
{
if (_stream != null)
Http3RequestStream? stream = Interlocked.Exchange(ref _stream, null);
if (stream is null)
{
await _stream.DisposeAsync().ConfigureAwait(false);
_stream = null!;
return;
}

await stream.DisposeAsync().ConfigureAwait(false);

_response = null;

await base.DisposeAsync().ConfigureAwait(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,117 @@ public async Task ReservedFrameType_Throws()
await new[] { clientTask, serverTask }.WhenAllOrAnyFailed(20_000);
}

[Fact]
public async Task RequestSentResponseDisposed_ThrowsOnServer()
{
byte[] data = Encoding.UTF8.GetBytes(new string('a', 1024));

using Http3LoopbackServer server = CreateHttp3LoopbackServer();

Task serverTask = Task.Run(async () =>
{
using Http3LoopbackConnection connection = (Http3LoopbackConnection)await server.EstablishGenericConnectionAsync();
using Http3LoopbackStream stream = await connection.AcceptRequestStreamAsync();
HttpRequestData request = await stream.ReadRequestDataAsync();
await stream.SendResponseHeadersAsync();

Stopwatch sw = Stopwatch.StartNew();
bool hasFailed = false;
while (sw.Elapsed < TestHelper.PassingTestTimeout)
{
try
{
await stream.SendResponseBodyAsync(data, isFinal: false);
}
catch (QuicStreamAbortedException)
{
hasFailed = true;
break;
}
}
Assert.True(hasFailed, $"Expected {nameof(QuicStreamAbortedException)}, instead ran successfully for {sw.Elapsed}");
});

Task clientTask = Task.Run(async () =>
{
using HttpClient client = CreateHttpClient();
using HttpRequestMessage request = new()
{
Method = HttpMethod.Get,
RequestUri = server.Address,
Version = HttpVersion30,
VersionPolicy = HttpVersionPolicy.RequestVersionExact
};

var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
var stream = await response.Content.ReadAsStreamAsync();
byte[] buffer = new byte[512];
for (int i = 0; i < 5; ++i)
{
var count = await stream.ReadAsync(buffer);
}

// We haven't finished reading the whole respose, but we're disposing it, which should turn into an exception on the server-side.
response.Dispose();
});

await new[] { clientTask, serverTask }.WhenAllOrAnyFailed(20_000);
}

[Fact]
public async Task RequestSendingResponseDisposed_ThrowsOnServer()
{
byte[] data = Encoding.UTF8.GetBytes(new string('a', 1024));

using Http3LoopbackServer server = CreateHttp3LoopbackServer();

Task serverTask = Task.Run(async () =>
{
using Http3LoopbackConnection connection = (Http3LoopbackConnection)await server.EstablishGenericConnectionAsync();
using Http3LoopbackStream stream = await connection.AcceptRequestStreamAsync();
HttpRequestData request = await stream.ReadRequestDataAsync(false);
await stream.SendResponseHeadersAsync();

Stopwatch sw = Stopwatch.StartNew();
bool hasFailed = false;
while (sw.Elapsed < TestHelper.PassingTestTimeout)
{
try
{
var (frameType, payload) = await stream.ReadFrameAsync();
Assert.Equal(Http3LoopbackStream.DataFrame, frameType);
}
catch (QuicStreamAbortedException)
{
hasFailed = true;
break;
}
}
Assert.True(hasFailed, $"Expected {nameof(QuicStreamAbortedException)}, instead ran successfully for {sw.Elapsed}");
});

Task clientTask = Task.Run(async () =>
{
using HttpClient client = CreateHttpClient();
using HttpRequestMessage request = new()
{
Method = HttpMethod.Get,
RequestUri = server.Address,
Version = HttpVersion30,
VersionPolicy = HttpVersionPolicy.RequestVersionExact,
Content = new ByteAtATimeContent(60*4, Task.CompletedTask, new TaskCompletionSource<bool>(), 250)
};

var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
var stream = await response.Content.ReadAsStreamAsync();

// We haven't finished sending the whole request, but we're disposing the response, which should turn into an exception on the server-side.
response.Dispose();
});

await new[] { clientTask, serverTask }.WhenAllOrAnyFailed(20_000);
}

[Fact]
public async Task ServerCertificateCustomValidationCallback_Succeeds()
{
Expand Down