Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixup! Remove non-Async versions
  • Loading branch information
rzikm committed Apr 11, 2022
commit 43acea1c2d83a462b4fd7aaeb85e64c1c0420b32
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ public override void Dispose()
stream.Dispose();
}

// We don't dispose the connection currently, because this causes races when the server connection is closed before
// the client has received and handled all response data.
// See discussion in https://github.com/dotnet/runtime/pull/57223#discussion_r687447832
// We don't dispose the connection currently, because this causes races when the server connection is closed before
// the client has received and handled all response data.
// See discussion in https://github.com/dotnet/runtime/pull/57223#discussion_r687447832
#if false
// Dispose the connection
// If we already waited for graceful shutdown from the client, then the connection is already closed and this will simply release the handle.
Expand All @@ -79,14 +79,14 @@ public async Task CloseAsync(long errorCode)
await _connection.CloseAsync(errorCode).ConfigureAwait(false);
}

public Http3LoopbackStream OpenUnidirectionalStream()
public async ValueTask<Http3LoopbackStream> OpenUnidirectionalStreamAsync()
{
return new Http3LoopbackStream(_connection.OpenUnidirectionalStream());
return new Http3LoopbackStream(await _connection.OpenUnidirectionalStreamAsync());
}

public Http3LoopbackStream OpenBidirectionalStream()
public async ValueTask<Http3LoopbackStream> OpenBidirectionalStreamAsync()
{
return new Http3LoopbackStream(_connection.OpenBidirectionalStream());
return new Http3LoopbackStream(await _connection.OpenBidirectionalStreamAsync());
}

public static int GetRequestId(QuicStream stream)
Expand Down Expand Up @@ -185,10 +185,10 @@ public async Task<Http3LoopbackStream> AcceptRequestStreamAsync()

public async Task EstablishControlStreamAsync()
{
_outboundControlStream = OpenUnidirectionalStream();
_outboundControlStream = await OpenUnidirectionalStreamAsync();
await _outboundControlStream.SendUnidirectionalStreamTypeAsync(Http3LoopbackStream.ControlStream);
await _outboundControlStream.SendSettingsFrameAsync();
}
}

public override async Task<byte[]> ReadRequestBodyAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,40 +174,49 @@ public async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, lon
// Allocate an active request
QuicStream? quicStream = null;
Http3RequestStream? requestStream = null;
ValueTask waitTask = default;

try
{
try
{
while (true)
if (_connection != null)
{
ValueTask<QuicStream> openTask;
bool synchronous = false;

// unfortunately, the compiler cannot infer that the task is consumed only once
#pragma warning disable CA2012 // ValueTasks instances should only be consumed once
lock (SyncObj)
{
if (_connection == null)
{
break;
}
openTask = _connection.OpenBidirectionalStreamAsync(cancellationToken);

if (_connection.GetRemoteAvailableBidirectionalStreamCount() > 0)
if (openTask.IsCompleted)
{
quicStream = _connection.OpenBidirectionalStream();
// hot path for synchronous completion: finish while still holding the lock
synchronous = true;
quicStream = openTask.Result;
requestStream = new Http3RequestStream(request, this, quicStream);
_activeRequests.Add(quicStream, requestStream);
break;
}

waitTask = _connection.WaitForAvailableBidirectionalStreamsAsync(cancellationToken);
}

if (HttpTelemetry.Log.IsEnabled() && !waitTask.IsCompleted && queueStartingTimestamp == 0)
if (!synchronous)
{
// We avoid logging RequestLeftQueue if a stream was available immediately (synchronously)
queueStartingTimestamp = Stopwatch.GetTimestamp();
}
// cold path: waiting until a stream is available
if (HttpTelemetry.Log.IsEnabled() && queueStartingTimestamp == 0)
{
queueStartingTimestamp = Stopwatch.GetTimestamp();
}

// Wait for an available stream (based on QUIC MAX_STREAMS) if there isn't one available yet.
await waitTask.ConfigureAwait(false);
quicStream = await openTask.ConfigureAwait(false);
requestStream = new Http3RequestStream(request, this, quicStream);

lock (SyncObj)
{
_activeRequests.Add(quicStream, requestStream);
}
}
#pragma warning restore CA2021
}
}
finally
Expand Down Expand Up @@ -377,7 +386,7 @@ private async Task SendSettingsAsync()
{
try
{
_clientControl = _connection!.OpenUnidirectionalStream();
_clientControl = await _connection!.OpenUnidirectionalStreamAsync().ConfigureAwait(false);
await _clientControl.WriteAsync(_pool.Settings.Http3SettingsFrame, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception ex)
Expand Down