Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add abort on content stream disposal, fix NRE, fix abort direction, a…
…dd test
  • Loading branch information
CarnaViire committed Jun 16, 2021
commit bef50e223005e70d349a0b2d50fbf28160b816a0
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ public void Dispose()
if (!_disposed)
{
_disposed = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need _disposed, or does _stream being null now indicate disposal?

_stream.Dispose();
DisposeSyncHelper();
var stream = Interlocked.Exchange(ref _stream, null!);
stream.Dispose();
DisposeSyncHelper(stream);
Comment on lines +84 to +86
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var stream = Interlocked.Exchange(ref _stream, null!);
stream.Dispose();
DisposeSyncHelper(stream);
QuicStream stream = Interlocked.Exchange(ref _stream, null!);
if (stream is not null)
{
stream.Dispose();
DisposeSyncHelper(stream);
}

}
}

Expand All @@ -92,16 +93,15 @@ public async ValueTask DisposeAsync()
if (!_disposed)
{
_disposed = true;
await _stream.DisposeAsync().ConfigureAwait(false);
DisposeSyncHelper();
var stream = Interlocked.Exchange(ref _stream, null!);
await stream.DisposeAsync().ConfigureAwait(false);
DisposeSyncHelper(stream);
Comment on lines +95 to +97
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var stream = Interlocked.Exchange(ref _stream, null!);
await stream.DisposeAsync().ConfigureAwait(false);
DisposeSyncHelper(stream);
QuicStream stream = Interlocked.Exchange(ref _stream, null!);
if (stream is not null)
{
await stream.DisposeAsync().ConfigureAwait(false);
DisposeSyncHelper(stream);
}

}
}

private void DisposeSyncHelper()
private void DisposeSyncHelper(QuicStream stream)
{
_connection.RemoveStream(_stream);
_connection = null!;
_stream = null!;
Interlocked.Exchange(ref _connection, null!).RemoveStream(stream);
Copy link
Member

@stephentoub stephentoub Jun 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we've got sole ownership of the stream now due to the Interlocked in the dispose methods above, do we still need this interlocked? Or is there another caller that could get here outside of Dispose{Async}?

Also, if it is still needed, then presumably there's a race condition around nulling this out, in which case the .RemoveStream should be ?.RemoveStream.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've mostly done that to be sure the change is visible to null checks in Http3RequestStream.HandleReadResponseContentException https://github.com/dotnet/runtime/pull/54334/files#diff-b79affd636899ac98b816c1398f9dca19bc4850a939a3815c09c5a9931094a4cR1113-R1120 In the issue I'm trying to solve, disposal and this exception handling are happening concurrently. Here #52800 (comment) @scalablecory says the change might not be visible otherwise. On the other hand, @wfurt offline told me that we usually don't worry about this for example for setting and checking _disposed flag. I am not sure what should be the best practice here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest simply not nulling these fields out (i.e. _stream and _connection).

I'm not quite sure why you null them out today. Is it because you want to avoid doing stuff like Abort on the connection or AbortRead on the QuicStream if we are disposed? Because as the code stands, this is all racy and you aren't avoiding these calls consistently.

You need to either:
(a) Implement full locking here to prevent the races
(b) Avoid the races entirely by not modifying these. (There's still a race here, but it's in msquic code and they are presumably handling this today.)


_sendBuffer.Dispose();
_recvBuffer.Dispose();
Expand Down Expand Up @@ -1111,18 +1111,26 @@ private void HandleReadResponseContentException(Exception ex, CancellationToken
throw new IOException(SR.net_http_client_execution_error, new HttpRequestException(SR.net_http_client_execution_error, abortException));
case Http3ConnectionException _:
// A connection-level protocol error has occurred on our stream.
_connection.Abort(ex);
_connection?.Abort(ex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When can this happen? It seems weird that errors on a single HTTP stream would cause the whole connection to be killed.

throw new IOException(SR.net_http_client_execution_error, new HttpRequestException(SR.net_http_client_execution_error, ex));
case OperationCanceledException oce when oce.CancellationToken == cancellationToken:
_stream.AbortWrite((long)Http3ErrorCode.RequestCancelled);
_stream?.AbortRead((long)Http3ErrorCode.RequestCancelled);
ExceptionDispatchInfo.Throw(ex); // Rethrow.
return; // Never reached.
default:
_stream.AbortWrite((long)Http3ErrorCode.InternalError);
_stream?.AbortRead((long)Http3ErrorCode.InternalError);
throw new IOException(SR.net_http_client_execution_error, new HttpRequestException(SR.net_http_client_execution_error, ex));
}
}

private void CancelResponseContentRead()
{
if (_responseDataPayloadRemaining != -1) // -1 indicates EOS
{
_stream.AbortRead((long)Http3ErrorCode.RequestCancelled);
}
}

private async ValueTask<bool> ReadNextDataFrameAsync(HttpResponseMessage response, CancellationToken cancellationToken)
{
if (_responseDataPayloadRemaining == -1)
Expand Down Expand Up @@ -1194,6 +1202,7 @@ protected override void Dispose(bool disposing)
{
if (disposing)
{
_stream.CancelResponseContentRead();
// This will remove the stream from the connection properly.
_stream.Dispose();
}
Expand All @@ -1216,6 +1225,7 @@ public override async ValueTask DisposeAsync()
{
if (_stream != null)
{
_stream.CancelResponseContentRead();
await _stream.DisposeAsync().ConfigureAwait(false);
_stream = null!;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,72 @@ public async Task Public_Interop_Upgrade_Success(string uri)
}
}

[ConditionalFact(nameof(IsMsQuicSupported))]
public async Task ResponseCancellationViaBothDisposeAndCancellationToken_Success()
{
if (UseQuicImplementationProvider != QuicImplementationProviders.MsQuic)
{
return;
}

using Http3LoopbackServer server = CreateHttp3LoopbackServer();

var pauseServerTcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
var pauseClientTcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);

Task serverTask = Task.Run(async () =>
{
using Http3LoopbackConnection connection = (Http3LoopbackConnection)await server.EstablishGenericConnectionAsync();
using Http3LoopbackStream stream = await connection.AcceptRequestStreamAsync();
HttpRequestData request = await stream.ReadRequestDataAsync().ConfigureAwait(false);

int contentLength = 2*1024;
var headers = new List<HttpHeaderData>();
headers.Append(new HttpHeaderData("Content-Length", contentLength.ToString(CultureInfo.InvariantCulture)));

await stream.SendResponseHeadersAsync(HttpStatusCode.OK, headers).ConfigureAwait(false);
await stream.SendDataFrameAsync(new byte[1024]).ConfigureAwait(false);

await pauseServerTcs.Task.WaitAsync(TimeSpan.FromSeconds(10));

var ex = await Assert.ThrowsAsync<QuicStreamAbortedException>(() => stream.SendDataFrameAsync(new byte[1024]));

await stream.ShutdownSendAsync().ConfigureAwait(false);
pauseClientTcs.SetResult(true);
});

Task clientTask = Task.Run(async () =>
{
using HttpClient client = CreateHttpClient();

using HttpRequestMessage request = new()
{
Method = HttpMethod.Get,
RequestUri = server.Address,
Version = HttpVersion30,
VersionPolicy = HttpVersionPolicy.RequestVersionExact
};
HttpResponseMessage response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).WaitAsync(TimeSpan.FromSeconds(10));

Stream stream = await response.Content.ReadAsStreamAsync();

int bytesRead = await stream.ReadAsync(new byte[1024]);
Assert.Equal(1024, bytesRead);

var cts = new CancellationTokenSource();

cts.Token.Register(() => response.Dispose());
cts.CancelAfter(200);

await Assert.ThrowsAsync<OperationCanceledException>(() => stream.ReadAsync(new byte[1024], cancellationToken: cts.Token).AsTask());

pauseServerTcs.SetResult(true);
await pauseClientTcs.Task.WaitAsync(TimeSpan.FromSeconds(3));
});

await new[] { clientTask, serverTask }.WhenAllOrAnyFailed(20_000);
}

/// <summary>
/// These are public interop test servers for various QUIC and HTTP/3 implementations,
/// taken from https://github.com/quicwg/base-drafts/wiki/Implementations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ private async ValueTask<CancellationTokenRegistration> HandleWriteStartState(Can
if (_state.SendState == SendState.Aborted)
{
cancellationToken.ThrowIfCancellationRequested();
if (_state.SendErrorCode != -1)
{
throw new QuicStreamAbortedException(_state.SendErrorCode);
}
throw new OperationCanceledException(SR.net_quic_sending_aborted);
}
else if (_state.SendState == SendState.ConnectionClosed)
Expand Down