From 933023b6f818ec54dfe84e7d31ec50dfe5044c13 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 12 Mar 2024 15:50:49 +0800 Subject: [PATCH 01/16] Prevent block inside ResolveAsync from blocking PollingResolver.Refresh (#2385) --- .../Balancer/PollingResolver.cs | 4 +- .../Balancer/ConnectionManagerTests.cs | 17 ++++- .../Balancer/PickFirstBalancerTests.cs | 2 + .../Balancer/ResolverTests.cs | 68 +++++++++++++++++++ .../Balancer/RoundRobinBalancerTests.cs | 3 + .../Balancer/WaitForReadyTests.cs | 13 ++-- test/Shared/TestResolver.cs | 6 +- 7 files changed, 106 insertions(+), 7 deletions(-) diff --git a/src/Grpc.Net.Client/Balancer/PollingResolver.cs b/src/Grpc.Net.Client/Balancer/PollingResolver.cs index f5c9aea61..0d4ced710 100644 --- a/src/Grpc.Net.Client/Balancer/PollingResolver.cs +++ b/src/Grpc.Net.Client/Balancer/PollingResolver.cs @@ -135,7 +135,9 @@ public sealed override void Refresh() if (_resolveTask.IsCompleted) { - _resolveTask = ResolveNowAsync(_cts.Token); + // Run ResolveAsync in a background task. + // This is done to prevent synchronous block inside ResolveAsync from blocking future Refresh calls. + _resolveTask = Task.Run(() => ResolveNowAsync(_cts.Token), _cts.Token); _resolveTask.ContinueWith(static (t, state) => { var pollingResolver = (PollingResolver)state!; diff --git a/test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs b/test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs index 227335e34..1aa0ab7a2 100644 --- a/test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs +++ b/test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs @@ -409,6 +409,7 @@ public async Task PickAsync_DoesNotDeadlockAfterReconnect_WithResolverError() services.AddNUnitLogger(); await using var serviceProvider = services.BuildServiceProvider(); var loggerFactory = serviceProvider.GetRequiredService(); + var logger = loggerFactory.CreateLogger(GetType()); var resolver = new TestResolver(loggerFactory); @@ -427,22 +428,29 @@ public async Task PickAsync_DoesNotDeadlockAfterReconnect_WithResolverError() clientChannel)); // Act + logger.LogInformation("Client connecting."); var connectTask = clientChannel.ConnectAsync(waitForReady: true, cancellationToken: CancellationToken.None); + + logger.LogInformation("Starting pick on connecting channel."); var pickTask = clientChannel.PickAsync( new PickContext { Request = new HttpRequestMessage() }, waitForReady: true, CancellationToken.None).AsTask(); + logger.LogInformation("Waiting for resolve to complete."); + await resolver.HasResolvedTask.DefaultTimeout(); + resolver.UpdateAddresses(new List { new BalancerAddress("localhost", 80) }); await Task.WhenAll(connectTask, pickTask).DefaultTimeout(); - // Simulate transport/network issue + logger.LogInformation("Simulate transport/network issue."); transportFactory.Transports.ForEach(t => t.Disconnect()); resolver.UpdateError(new Status(StatusCode.Unavailable, "Test error")); + logger.LogInformation("Starting pick on disconnected channel."); pickTask = clientChannel.PickAsync( new PickContext { Request = new HttpRequestMessage() }, waitForReady: true, @@ -454,7 +462,10 @@ public async Task PickAsync_DoesNotDeadlockAfterReconnect_WithResolverError() // Assert // Should not timeout (deadlock) + logger.LogInformation("Wait for pick task to complete."); await pickTask.DefaultTimeout(); + + logger.LogInformation("Done."); } [Test] @@ -489,6 +500,8 @@ public async Task PickAsync_DoesNotDeadlockAfterReconnect_WithZeroAddressResolve waitForReady: true, CancellationToken.None).AsTask(); + await resolver.HasResolvedTask.DefaultTimeout(); + resolver.UpdateAddresses(new List { new BalancerAddress("localhost", 80) @@ -560,6 +573,8 @@ public async Task PickAsync_ExecutionContext_DoesNotCaptureAsyncLocalsInConnect( waitForReady: true, CancellationToken.None).AsTask(); + await resolver.HasResolvedTask.DefaultTimeout(); + resolver.UpdateAddresses(new List { new BalancerAddress("localhost", 80) diff --git a/test/Grpc.Net.Client.Tests/Balancer/PickFirstBalancerTests.cs b/test/Grpc.Net.Client.Tests/Balancer/PickFirstBalancerTests.cs index 598bfda7c..58af4b636 100644 --- a/test/Grpc.Net.Client.Tests/Balancer/PickFirstBalancerTests.cs +++ b/test/Grpc.Net.Client.Tests/Balancer/PickFirstBalancerTests.cs @@ -196,6 +196,8 @@ public async Task ResolverError_HasFailedSubchannel_SubchannelShutdown() _ = channel.ConnectAsync(); // Assert + await resolver.HasResolvedTask.DefaultTimeout(); + var subchannels = channel.ConnectionManager.GetSubchannels(); Assert.AreEqual(1, subchannels.Count); diff --git a/test/Grpc.Net.Client.Tests/Balancer/ResolverTests.cs b/test/Grpc.Net.Client.Tests/Balancer/ResolverTests.cs index da8a46b8c..de549fcf5 100644 --- a/test/Grpc.Net.Client.Tests/Balancer/ResolverTests.cs +++ b/test/Grpc.Net.Client.Tests/Balancer/ResolverTests.cs @@ -42,6 +42,74 @@ namespace Grpc.Net.Client.Tests.Balancer; [TestFixture] public class ResolverTests { + [Test] + public async Task Refresh_BlockInsideResolveAsync_ResolverNotBlocked() + { + // Arrange + var waitHandle = new ManualResetEvent(false); + + var services = new ServiceCollection(); + var testSink = new TestSink(); + services.AddLogging(b => + { + b.AddProvider(new TestLoggerProvider(testSink)); + }); + services.AddNUnitLogger(); + var loggerFactory = services.BuildServiceProvider().GetRequiredService(); + + var logger = loggerFactory.CreateLogger(); + logger.LogInformation("Starting."); + + var lockingResolver = new LockingPollingResolver(loggerFactory, waitHandle); + lockingResolver.Start(result => { }); + + // Act + logger.LogInformation("Refresh call 1. This should block."); + var refreshTask1 = Task.Run(lockingResolver.Refresh); + + logger.LogInformation("Refresh call 2. This should complete."); + var refreshTask2 = Task.Run(lockingResolver.Refresh); + + // Assert + await Task.WhenAny(refreshTask1, refreshTask2).DefaultTimeout(); + + logger.LogInformation("Setting wait handle."); + waitHandle.Set(); + + logger.LogInformation("Finishing."); + } + + private class LockingPollingResolver : PollingResolver + { + private ManualResetEvent? _waitHandle; + private readonly object _lock = new(); + + public LockingPollingResolver(ILoggerFactory loggerFactory, ManualResetEvent waitHandle) : base(loggerFactory) + { + _waitHandle = waitHandle; + } + + protected override Task ResolveAsync(CancellationToken cancellationToken) + { + lock (_lock) + { + // Block the first caller. + if (_waitHandle != null) + { + _waitHandle.WaitOne(); + _waitHandle = null; + } + } + + Listener(ResolverResult.ForResult(new List + { + new BalancerAddress("localhost", 80) + })); + + return Task.CompletedTask; + } + } + [Test] public async Task Resolver_ResolveNameFromServices_Success() { diff --git a/test/Grpc.Net.Client.Tests/Balancer/RoundRobinBalancerTests.cs b/test/Grpc.Net.Client.Tests/Balancer/RoundRobinBalancerTests.cs index 7bcef90b2..c2c488b30 100644 --- a/test/Grpc.Net.Client.Tests/Balancer/RoundRobinBalancerTests.cs +++ b/test/Grpc.Net.Client.Tests/Balancer/RoundRobinBalancerTests.cs @@ -187,7 +187,10 @@ public async Task ResolverError_HasFailedSubchannel_SubchannelShutdown() _ = channel.ConnectAsync(); // Assert + await resolver.HasResolvedTask.DefaultTimeout(); + var subchannels = channel.ConnectionManager.GetSubchannels(); + Assert.AreEqual(1, subchannels.Count); Assert.AreEqual(1, subchannels[0]._addresses.Count); diff --git a/test/Grpc.Net.Client.Tests/Balancer/WaitForReadyTests.cs b/test/Grpc.Net.Client.Tests/Balancer/WaitForReadyTests.cs index f6206c6dd..ccba35f57 100644 --- a/test/Grpc.Net.Client.Tests/Balancer/WaitForReadyTests.cs +++ b/test/Grpc.Net.Client.Tests/Balancer/WaitForReadyTests.cs @@ -58,21 +58,26 @@ public async Task ResolverReturnsNoAddresses_CallWithWaitForReady_Wait() }); var services = new ServiceCollection(); + services.AddNUnitLogger(); - var resolver = new TestResolver(); - - services.AddSingleton(new TestResolverFactory(resolver)); + services.AddSingleton(); + services.AddSingleton(s => new TestResolverFactory(s.GetRequiredService())); services.AddSingleton(new TestSubchannelTransportFactory()); + var serviceProvider = services.BuildServiceProvider(); + + var resolver = serviceProvider.GetRequiredService(); var invoker = HttpClientCallInvokerFactory.Create(testMessageHandler, "test:///localhost", configure: o => { o.Credentials = ChannelCredentials.Insecure; - o.ServiceProvider = services.BuildServiceProvider(); + o.ServiceProvider = serviceProvider; }); // Act var call = invoker.AsyncUnaryCall(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions().WithWaitForReady(), new HelloRequest()); + await resolver.HasResolvedTask.DefaultTimeout(); + var responseTask = call.ResponseAsync; Assert.IsFalse(responseTask.IsCompleted); diff --git a/test/Shared/TestResolver.cs b/test/Shared/TestResolver.cs index 7a025aaed..8136f981b 100644 --- a/test/Shared/TestResolver.cs +++ b/test/Shared/TestResolver.cs @@ -1,4 +1,4 @@ -#region Copyright notice and license +#region Copyright notice and license // Copyright 2019 The gRPC Authors // @@ -34,6 +34,7 @@ internal class TestResolver : PollingResolver { private readonly Func? _onRefreshAsync; private readonly TaskCompletionSource _hasResolvedTcs; + private readonly ILogger _logger; private ResolverResult? _result; public Task HasResolvedTask => _hasResolvedTcs.Task; @@ -46,15 +47,18 @@ public TestResolver(ILoggerFactory? loggerFactory = null, Func? onRefreshA { _onRefreshAsync = onRefreshAsync; _hasResolvedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _logger = (ILogger?)loggerFactory?.CreateLogger() ?? NullLogger.Instance; } public void UpdateAddresses(List addresses, ServiceConfig? serviceConfig = null, Status? serviceConfigStatus = null) { + _logger.LogInformation("Updating result addresses: {Addresses}", string.Join(", ", addresses)); UpdateResult(ResolverResult.ForResult(addresses, serviceConfig, serviceConfigStatus)); } public void UpdateError(Status status) { + _logger.LogInformation("Updating result error: {Status}", status); UpdateResult(ResolverResult.ForFailure(status)); } From 874cd11637ac71e36f1565401c923964ff117d8c Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 19 Mar 2024 08:27:13 +0800 Subject: [PATCH 02/16] Bump follow-redirects from 1.15.4 to 1.15.6 in /testassets/InteropTestsGrpcWebWebsite/Tests (#2392) Bumps [follow-redirects](https://github.com/follow-redirects/follow-redirects) from 1.15.4 to 1.15.6. - [Release notes](https://github.com/follow-redirects/follow-redirects/releases) - [Commits](https://github.com/follow-redirects/follow-redirects/compare/v1.15.4...v1.15.6) --- updated-dependencies: - dependency-name: follow-redirects dependency-type: indirect ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .../Tests/package-lock.json | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/testassets/InteropTestsGrpcWebWebsite/Tests/package-lock.json b/testassets/InteropTestsGrpcWebWebsite/Tests/package-lock.json index 7e9b5af4f..4e02d8464 100644 --- a/testassets/InteropTestsGrpcWebWebsite/Tests/package-lock.json +++ b/testassets/InteropTestsGrpcWebWebsite/Tests/package-lock.json @@ -1948,9 +1948,9 @@ } }, "node_modules/follow-redirects": { - "version": "1.15.4", - "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.4.tgz", - "integrity": "sha512-Cr4D/5wlrb0z9dgERpUL3LrmPKVDsETIJhaCMeDfuFYcqa5bldGV6wBsAN6X/vxlXQtFBMrXdXxdL8CbDTGniw==", + "version": "1.15.6", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.6.tgz", + "integrity": "sha512-wWN62YITEaOpSK584EZXJafH1AGpO8RVgElfkuXbTOrPX4fIfOyEpW/CsiNd8JdYrAoOvafRTOEnvsO++qCqFA==", "funding": [ { "type": "individual", @@ -5623,9 +5623,9 @@ } }, "follow-redirects": { - "version": "1.15.4", - "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.4.tgz", - "integrity": "sha512-Cr4D/5wlrb0z9dgERpUL3LrmPKVDsETIJhaCMeDfuFYcqa5bldGV6wBsAN6X/vxlXQtFBMrXdXxdL8CbDTGniw==" + "version": "1.15.6", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.6.tgz", + "integrity": "sha512-wWN62YITEaOpSK584EZXJafH1AGpO8RVgElfkuXbTOrPX4fIfOyEpW/CsiNd8JdYrAoOvafRTOEnvsO++qCqFA==" }, "form-data": { "version": "4.0.0", From a51fec8ff1b98ffb333c357780efd06c0e65dbb6 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Fri, 5 Apr 2024 06:09:28 +0800 Subject: [PATCH 03/16] Update microsoft-support.md (#2403) --- microsoft-support.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/microsoft-support.md b/microsoft-support.md index 4d4790a18..926641777 100644 --- a/microsoft-support.md +++ b/microsoft-support.md @@ -22,7 +22,7 @@ Support is provided for the following grpc-dotnet packages: Notes: * Applications must be using a [currently supported .NET release](https://dotnet.microsoft.com/platform/support/policy). -* Minimum supported grpc-dotnet version is currently v2.37.0. The minimum grpc-dotnet version supported is increased when major new .NET versions are released. +* Minimum supported grpc-dotnet version is currently v2.59.0. The minimum grpc-dotnet version supported is increased when major new .NET versions are released. * Minimum supported version is the earliest major and minor release required to obtain assisted support. Please utilize public community channels for assistance or log issues directly on GitHub for releases before the minimum supported version. * Assisted support is only available for the official builds released from https://github.com/grpc/grpc-dotnet, and no assisted support option is available for individual forks. * Please note that new features and security\bug fixes are provided in the latest released version and are not backported to previous versions. To obtain the latest updates and features, please upgrade to the latest version. From a4d83f6d75cf93004a46bc618d48332e66f4d704 Mon Sep 17 00:00:00 2001 From: Jakub Januszkiewicz Date: Fri, 12 Apr 2024 14:49:48 +0200 Subject: [PATCH 04/16] fix a couple typos in README.md (#2397) --- src/Grpc.StatusProto/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Grpc.StatusProto/README.md b/src/Grpc.StatusProto/README.md index baef8e8f7..2f6268fbb 100644 --- a/src/Grpc.StatusProto/README.md +++ b/src/Grpc.StatusProto/README.md @@ -32,7 +32,7 @@ and the [Google APIs overview of the error model](https://cloud.google.com/apis/ ## .NET implementation of the richer error model -The error model is define by the protocol buffers files [status.proto](https://github.com/googleapis/googleapis/blob/master/google/rpc/status.proto) +The error model is defined by the protocol buffers files [status.proto](https://github.com/googleapis/googleapis/blob/master/google/rpc/status.proto) and [error_details.proto](https://github.com/googleapis/googleapis/blob/master/google/rpc/error_details.proto), and the `Google.Api.CommonProtos` NuGet package that provides the generated .NET classes from these proto files. @@ -97,7 +97,7 @@ in [grpc/status.h](https://github.com/grpc/grpc/blob/master/include/grpc/status. The recommendation is to use the values in `Google.Rpc.Code` as a convention. This is a must for Google APIs and strongly recommended for third party services. -But users can use a different domain of values if they want and and as long as their +But users can use a different domain of values if they want to and as long as their services are mutually compatible, things will work fine. In the richer error model the `RpcException` will contain both a `Grpc.Core.Status` (for the From f2709f1bf570d9cc814282217d29aef8d584bbbc Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Sat, 13 Apr 2024 13:39:16 +0800 Subject: [PATCH 05/16] Interrupt existing subchannel connect attempt when reconnect is requested (#2410) --- .../Balancer/Internal/ISubchannelTransport.cs | 2 +- .../Internal/PassiveSubchannelTransport.cs | 2 +- .../SocketConnectivitySubchannelTransport.cs | 2 +- src/Grpc.Net.Client/Balancer/Subchannel.cs | 36 ++++- .../Balancer/ConnectionManagerTests.cs | 6 +- .../Balancer/PickFirstBalancerTests.cs | 20 +-- .../Balancer/ResolverTests.cs | 132 +++++++++++++++++- .../Balancer/RoundRobinBalancerTests.cs | 6 +- .../Grpc.Net.Client.Tests/GrpcChannelTests.cs | 4 +- .../Balancer/TestSubChannelTransport.cs | 8 +- .../TestSubChannelTransportFactory.cs | 24 +++- 11 files changed, 207 insertions(+), 35 deletions(-) diff --git a/src/Grpc.Net.Client/Balancer/Internal/ISubchannelTransport.cs b/src/Grpc.Net.Client/Balancer/Internal/ISubchannelTransport.cs index 5ef60ec9b..fceffb3e6 100644 --- a/src/Grpc.Net.Client/Balancer/Internal/ISubchannelTransport.cs +++ b/src/Grpc.Net.Client/Balancer/Internal/ISubchannelTransport.cs @@ -34,7 +34,7 @@ internal interface ISubchannelTransport : IDisposable TransportStatus TransportStatus { get; } ValueTask GetStreamAsync(DnsEndPoint endPoint, CancellationToken cancellationToken); - ValueTask TryConnectAsync(ConnectContext context); + ValueTask TryConnectAsync(ConnectContext context, int attempt); void Disconnect(); } diff --git a/src/Grpc.Net.Client/Balancer/Internal/PassiveSubchannelTransport.cs b/src/Grpc.Net.Client/Balancer/Internal/PassiveSubchannelTransport.cs index 979c8dc79..a4ea4093c 100644 --- a/src/Grpc.Net.Client/Balancer/Internal/PassiveSubchannelTransport.cs +++ b/src/Grpc.Net.Client/Balancer/Internal/PassiveSubchannelTransport.cs @@ -52,7 +52,7 @@ public void Disconnect() _subchannel.UpdateConnectivityState(ConnectivityState.Idle, "Disconnected."); } - public ValueTask TryConnectAsync(ConnectContext context) + public ValueTask TryConnectAsync(ConnectContext context, int attempt) { Debug.Assert(_subchannel._addresses.Count == 1); Debug.Assert(CurrentEndPoint == null); diff --git a/src/Grpc.Net.Client/Balancer/Internal/SocketConnectivitySubchannelTransport.cs b/src/Grpc.Net.Client/Balancer/Internal/SocketConnectivitySubchannelTransport.cs index 85669a691..b54666d88 100644 --- a/src/Grpc.Net.Client/Balancer/Internal/SocketConnectivitySubchannelTransport.cs +++ b/src/Grpc.Net.Client/Balancer/Internal/SocketConnectivitySubchannelTransport.cs @@ -144,7 +144,7 @@ private void DisconnectUnsynchronized() _currentEndPoint = null; } - public async ValueTask TryConnectAsync(ConnectContext context) + public async ValueTask TryConnectAsync(ConnectContext context, int attempt) { Debug.Assert(CurrentEndPoint == null); diff --git a/src/Grpc.Net.Client/Balancer/Subchannel.cs b/src/Grpc.Net.Client/Balancer/Subchannel.cs index a1c2f9323..4209af875 100644 --- a/src/Grpc.Net.Client/Balancer/Subchannel.cs +++ b/src/Grpc.Net.Client/Balancer/Subchannel.cs @@ -174,6 +174,8 @@ public void UpdateAddresses(IReadOnlyList addresses) return; } + SubchannelLog.AddressesUpdated(_logger, Id, addresses); + // Get a copy of the current address before updating addresses. // Updating addresses to not contain this value changes the property to return null. var currentAddress = CurrentAddress; @@ -278,6 +280,8 @@ private void CancelInProgressConnect() _connectContext.CancelConnect(); _connectContext.Dispose(); } + + _delayInterruptTcs?.TrySetResult(null); } } @@ -313,7 +317,7 @@ private async Task ConnectTransportAsync() } } - switch (await _transport.TryConnectAsync(connectContext).ConfigureAwait(false)) + switch (await _transport.TryConnectAsync(connectContext, attempt).ConfigureAwait(false)) { case ConnectResult.Success: return; @@ -345,17 +349,21 @@ private async Task ConnectTransportAsync() { // Task.Delay won. Check CTS to see if it won because of cancellation. delayCts.Token.ThrowIfCancellationRequested(); + SubchannelLog.ConnectBackoffComplete(_logger, Id); } else { SubchannelLog.ConnectBackoffInterrupted(_logger, Id); - // Delay interrupt was triggered. Reset back-off. - backoffPolicy = _manager.BackoffPolicyFactory.Create(); - // Cancel the Task.Delay that's no longer needed. // https://github.com/davidfowl/AspNetCoreDiagnosticScenarios/blob/519ef7d231c01116f02bc04354816a735f2a36b6/AsyncGuidance.md#using-a-timeout delayCts.Cancel(); + + // Check to connect context token to see if the delay was interrupted because of a connect cancellation. + connectContext.CancellationToken.ThrowIfCancellationRequested(); + + // Delay interrupt was triggered. Reset back-off. + backoffPolicy = _manager.BackoffPolicyFactory.Create(); } } } @@ -532,6 +540,12 @@ internal static class SubchannelLog private static readonly Action _cancelingConnect = LoggerMessage.Define(LogLevel.Debug, new EventId(17, "CancelingConnect"), "Subchannel id '{SubchannelId}' canceling connect."); + private static readonly Action _connectBackoffComplete = + LoggerMessage.Define(LogLevel.Trace, new EventId(18, "ConnectBackoffComplete"), "Subchannel id '{SubchannelId}' connect backoff complete."); + + private static readonly Action _addressesUpdated = + LoggerMessage.Define(LogLevel.Trace, new EventId(19, "AddressesUpdated"), "Subchannel id '{SubchannelId}' updated with addresses: {Addresses}"); + public static void SubchannelCreated(ILogger logger, string subchannelId, IReadOnlyList addresses) { if (logger.IsEnabled(LogLevel.Debug)) @@ -620,5 +634,19 @@ public static void CancelingConnect(ILogger logger, string subchannelId) { _cancelingConnect(logger, subchannelId, null); } + + public static void ConnectBackoffComplete(ILogger logger, string subchannelId) + { + _connectBackoffComplete(logger, subchannelId, null); + } + + public static void AddressesUpdated(ILogger logger, string subchannelId, IReadOnlyList addresses) + { + if (logger.IsEnabled(LogLevel.Trace)) + { + var addressesText = string.Join(", ", addresses.Select(a => a.EndPoint.Host + ":" + a.EndPoint.Port)); + _addressesUpdated(logger, subchannelId, addressesText, null); + } + } } #endif diff --git a/test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs b/test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs index 1aa0ab7a2..98f7c7c4c 100644 --- a/test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs +++ b/test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs @@ -188,7 +188,7 @@ public async Task PickAsync_ErrorConnectingToSubchannel_ThrowsError() new BalancerAddress("localhost", 80) }); - var transportFactory = new TestSubchannelTransportFactory((s, c) => + var transportFactory = TestSubchannelTransportFactory.Create((s, c) => { return Task.FromException(new Exception("Test error!")); }); @@ -357,7 +357,7 @@ public async Task UpdateAddresses_ConnectIsInProgress_InProgressConnectIsCancele var syncPoint = new SyncPoint(runContinuationsAsynchronously: true); - var transportFactory = new TestSubchannelTransportFactory(async (s, c) => + var transportFactory = TestSubchannelTransportFactory.Create(async (s, c) => { c.Register(state => ((SyncPoint)state!).Continue(), syncPoint); @@ -548,7 +548,7 @@ public async Task PickAsync_ExecutionContext_DoesNotCaptureAsyncLocalsInConnect( var callbackAsyncLocalValues = new List(); - var transportFactory = new TestSubchannelTransportFactory((subchannel, cancellationToken) => + var transportFactory = TestSubchannelTransportFactory.Create((subchannel, cancellationToken) => { callbackAsyncLocalValues.Add(asyncLocal.Value); if (callbackAsyncLocalValues.Count >= 2) diff --git a/test/Grpc.Net.Client.Tests/Balancer/PickFirstBalancerTests.cs b/test/Grpc.Net.Client.Tests/Balancer/PickFirstBalancerTests.cs index 58af4b636..84f209dde 100644 --- a/test/Grpc.Net.Client.Tests/Balancer/PickFirstBalancerTests.cs +++ b/test/Grpc.Net.Client.Tests/Balancer/PickFirstBalancerTests.cs @@ -58,7 +58,7 @@ public async Task ChangeAddresses_HasReadySubchannel_OldSubchannelShutdown() services.AddSingleton(new TestResolverFactory(resolver)); var subChannelConnections = new List(); - var transportFactory = new TestSubchannelTransportFactory((s, c) => + var transportFactory = TestSubchannelTransportFactory.Create((s, c) => { lock (subChannelConnections) { @@ -176,7 +176,7 @@ public async Task ResolverError_HasFailedSubchannel_SubchannelShutdown() new BalancerAddress("localhost", 80) }); - var transportFactory = new TestSubchannelTransportFactory((s, c) => Task.FromResult(new TryConnectResult(ConnectivityState.TransientFailure))); + var transportFactory = TestSubchannelTransportFactory.Create((s, c) => Task.FromResult(new TryConnectResult(ConnectivityState.TransientFailure))); services.AddSingleton(new TestResolverFactory(resolver)); services.AddSingleton(transportFactory); var serviceProvider = services.BuildServiceProvider(); @@ -234,7 +234,7 @@ public async Task RequestConnection_InitialConnectionFails_ExponentialBackoff() var connectivityState = ConnectivityState.TransientFailure; services.AddSingleton(new TestResolverFactory(resolver)); - services.AddSingleton(new TestSubchannelTransportFactory(async (s, c) => + services.AddSingleton(TestSubchannelTransportFactory.Create(async (s, c) => { await syncPoint.WaitToContinue(); return new TryConnectResult(connectivityState); @@ -290,7 +290,7 @@ public async Task RequestConnection_InitialConnectionEnds_EntersIdleState() }); var transportConnectCount = 0; - var transportFactory = new TestSubchannelTransportFactory((s, c) => + var transportFactory = TestSubchannelTransportFactory.Create((s, c) => { transportConnectCount++; return Task.FromResult(new TryConnectResult(ConnectivityState.Ready)); @@ -340,7 +340,7 @@ public async Task RequestConnection_IdleConnectionConnectAsync_StateToReady() resolver.UpdateAddresses(new List { new BalancerAddress("localhost", 80) }); var transportConnectCount = 0; - var transportFactory = new TestSubchannelTransportFactory((s, c) => + var transportFactory = TestSubchannelTransportFactory.Create((s, c) => { transportConnectCount++; return Task.FromResult(new TryConnectResult(ConnectivityState.Ready)); @@ -385,7 +385,7 @@ public async Task RequestConnection_IdleConnectionPick_StateToReady() resolver.UpdateAddresses(new List { new BalancerAddress("localhost", 80) }); var transportConnectCount = 0; - var transportFactory = new TestSubchannelTransportFactory((s, c) => + var transportFactory = TestSubchannelTransportFactory.Create((s, c) => { transportConnectCount++; return Task.FromResult(new TryConnectResult(ConnectivityState.Ready)); @@ -448,7 +448,7 @@ public async Task UnaryCall_TransportConnecting_OnePickStarted() var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var transportConnectCount = 0; - var transportFactory = new TestSubchannelTransportFactory((s, c) => + var transportFactory = TestSubchannelTransportFactory.Create((s, c) => { transportConnectCount++; @@ -510,7 +510,7 @@ public async Task UnaryCall_TransportConnecting_ErrorAfterTransientFailure() var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var transportConnectCount = 0; - var transportFactory = new TestSubchannelTransportFactory((s, c) => + var transportFactory = TestSubchannelTransportFactory.Create((s, c) => { transportConnectCount++; @@ -576,7 +576,7 @@ public async Task DeadlineExceeded_MultipleCalls_CallsWaitForDeadline() var resolver = new TestResolver(); resolver.UpdateAddresses(new List { new BalancerAddress("localhost", 80) }); - var transportFactory = new TestSubchannelTransportFactory((s, c) => + var transportFactory = TestSubchannelTransportFactory.Create((s, c) => { return Task.FromResult(new TryConnectResult(ConnectivityState.Connecting)); }); @@ -640,7 +640,7 @@ public async Task ConnectTimeout_MultipleCalls_AttemptReconnect() resolver.UpdateAddresses(new List { new BalancerAddress("localhost", 80) }); var tryConnectTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var transportFactory = new TestSubchannelTransportFactory((s, ct) => + var transportFactory = TestSubchannelTransportFactory.Create((s, ct) => { return tryConnectTcs.Task; }); diff --git a/test/Grpc.Net.Client.Tests/Balancer/ResolverTests.cs b/test/Grpc.Net.Client.Tests/Balancer/ResolverTests.cs index de549fcf5..bb6e3d1eb 100644 --- a/test/Grpc.Net.Client.Tests/Balancer/ResolverTests.cs +++ b/test/Grpc.Net.Client.Tests/Balancer/ResolverTests.cs @@ -19,6 +19,7 @@ #if SUPPORT_LOAD_BALANCING using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Net; using System.Threading; @@ -298,7 +299,7 @@ public async Task Resolver_ServiceConfigInResult() var currentConnectivityState = ConnectivityState.Ready; services.AddSingleton(new TestResolverFactory(resolver)); - services.AddSingleton(new TestSubchannelTransportFactory(async (s, c) => + services.AddSingleton(TestSubchannelTransportFactory.Create(async (s, i, c) => { await syncPoint.WaitToContinue(); return new TryConnectResult(currentConnectivityState); @@ -551,5 +552,134 @@ public async Task ResolveServiceConfig_ErrorOnSecondResolve_PickSuccess() var balancer = (ChildHandlerLoadBalancer)channel.ConnectionManager._balancer!; return (T?)balancer._current?.LoadBalancer; } + + internal class TestBackoffPolicyFactory : IBackoffPolicyFactory + { + private readonly TimeSpan _backoff; + + public TestBackoffPolicyFactory() : this(TimeSpan.FromSeconds(20)) + { + } + + public TestBackoffPolicyFactory(TimeSpan backoff) + { + _backoff = backoff; + } + + public IBackoffPolicy Create() + { + return new TestBackoffPolicy(_backoff); + } + + private class TestBackoffPolicy : IBackoffPolicy + { + private readonly TimeSpan _backoff; + + public TestBackoffPolicy(TimeSpan backoff) + { + _backoff = backoff; + } + + public TimeSpan NextBackoff() + { + return _backoff; + } + } + } + + [Test] + public async Task Resolver_UpdateResultsAfterPreviousConnect_InterruptConnect() + { + // Arrange + var services = new ServiceCollection(); + + // add logger + services.AddNUnitLogger(); + var loggerFactory = services.BuildServiceProvider().GetRequiredService(); + var logger = loggerFactory.CreateLogger(); + + // add resolver and balancer + var resolver = new TestResolver(loggerFactory); + var result = ResolverResult.ForResult(new List { new BalancerAddress("localhost", 80) }, serviceConfig: null, serviceConfigStatus: null); + resolver.UpdateResult(result); + + services.AddSingleton(new TestResolverFactory(resolver)); + services.AddSingleton(new TestBackoffPolicyFactory(TimeSpan.FromSeconds(0.2))); + + var tryConnectData = new List<(IReadOnlyList BalancerAddresses, int Attempt, bool IsCancellationRequested)>(); + + var tryConnectCount = 0; + services.AddSingleton( + TestSubchannelTransportFactory.Create((subchannel, attempt, cancellationToken) => + { + var addresses = subchannel.GetAddresses(); + var isCancellationRequested = cancellationToken.IsCancellationRequested; + ConnectivityState state; + + var i = Interlocked.Increment(ref tryConnectCount); + if (i == 1) + { + state = ConnectivityState.Ready; + } + else + { + state = attempt >= 2 ? ConnectivityState.Ready : ConnectivityState.TransientFailure; + } + + logger.LogInformation("TryConnect attempt {Attempt} to addresses {Addresses}. State: {ConnectivityState}, IsCancellationRequested: {IsCancellationRequested}", attempt, string.Join(", ", addresses), state, isCancellationRequested); + + lock (tryConnectData) + { + tryConnectData.Add((addresses, attempt, isCancellationRequested)); + } + + return Task.FromResult(new TryConnectResult(state)); + })); + + var channelOptions = new GrpcChannelOptions + { + Credentials = ChannelCredentials.Insecure, + ServiceProvider = services.BuildServiceProvider(), + }; + + // Act + var channel = GrpcChannel.ForAddress("test:///test_addr", channelOptions); + + logger.LogInformation("Client connecting."); + await channel.ConnectionManager.ConnectAsync(waitForReady: true, CancellationToken.None); + + logger.LogInformation("Client updating resolver."); + result = ResolverResult.ForResult(new List { new BalancerAddress("localhost", 81) }, serviceConfig: null, serviceConfigStatus: null); + resolver.UpdateResult(result); + + logger.LogInformation("Client picking."); + await ExceptionAssert.ThrowsAsync(async () => await channel.ConnectionManager.PickAsync( + new PickContext(), + waitForReady: false, + CancellationToken.None)); + + logger.LogInformation("Client updating Resolver."); + result = ResolverResult.ForResult(new List { new BalancerAddress("localhost", 82) }, serviceConfig: null, serviceConfigStatus: null); + resolver.UpdateResult(result); + + logger.LogInformation("Client picking and waiting for ready."); + await channel.ConnectionManager.PickAsync( + new PickContext(), + waitForReady: true, + CancellationToken.None); + + // Assert + logger.LogInformation("TryConnectData count: {Count}", tryConnectData.Count); + foreach (var data in tryConnectData) + { + logger.LogInformation("Attempt: {Attempt}, BalancerAddresses: {BalancerAddresses}, IsCancellationRequested: {IsCancellationRequested}", data.Attempt, string.Join(", ", data.BalancerAddresses), data.IsCancellationRequested); + } + + var duplicate = tryConnectData.GroupBy(d => new { Address = d.BalancerAddresses.Single(), d.Attempt }).FirstOrDefault(g => g.Count() >= 2); + if (duplicate != null) + { + Assert.Fail($"Duplicate attempts to address. Count: {duplicate.Count()}, Address: {duplicate.Key.Address}"); + } + } } #endif diff --git a/test/Grpc.Net.Client.Tests/Balancer/RoundRobinBalancerTests.cs b/test/Grpc.Net.Client.Tests/Balancer/RoundRobinBalancerTests.cs index c2c488b30..f7bd97e66 100644 --- a/test/Grpc.Net.Client.Tests/Balancer/RoundRobinBalancerTests.cs +++ b/test/Grpc.Net.Client.Tests/Balancer/RoundRobinBalancerTests.cs @@ -163,7 +163,7 @@ public async Task ResolverError_HasFailedSubchannel_SubchannelShutdown() services.AddNUnitLogger(); services.AddSingleton(); services.AddSingleton(); - var transportFactory = new TestSubchannelTransportFactory((s, c) => Task.FromResult(new TryConnectResult(ConnectivityState.TransientFailure))); + var transportFactory = TestSubchannelTransportFactory.Create((s, c) => Task.FromResult(new TryConnectResult(ConnectivityState.TransientFailure))); services.AddSingleton(transportFactory); var serviceProvider = services.BuildServiceProvider(); @@ -221,7 +221,7 @@ public async Task HasSubchannels_SubchannelStatusChanges_RefreshResolver() var connectState = ConnectivityState.Ready; - var transportFactory = new TestSubchannelTransportFactory((s, c) => + var transportFactory = TestSubchannelTransportFactory.Create((s, c) => { logger.LogInformation($"Transport factory returning state: {connectState}"); return Task.FromResult(new TryConnectResult(connectState)); @@ -304,7 +304,7 @@ public async Task HasSubchannels_ResolverRefresh_MatchingSubchannelUnchanged() var connectState = ConnectivityState.Ready; var subChannelConnections = new List(); - var transportFactory = new TestSubchannelTransportFactory((s, c) => + var transportFactory = TestSubchannelTransportFactory.Create((s, c) => { lock (subChannelConnections) { diff --git a/test/Grpc.Net.Client.Tests/GrpcChannelTests.cs b/test/Grpc.Net.Client.Tests/GrpcChannelTests.cs index 487c5672c..01fdd8e3c 100644 --- a/test/Grpc.Net.Client.Tests/GrpcChannelTests.cs +++ b/test/Grpc.Net.Client.Tests/GrpcChannelTests.cs @@ -934,7 +934,7 @@ public async Task ConnectAsync_ShiftThroughStates_CompleteOnReady() var services = new ServiceCollection(); services.AddNUnitLogger(); services.AddSingleton(); - services.AddSingleton(new TestSubchannelTransportFactory(async (s, c) => + services.AddSingleton(TestSubchannelTransportFactory.Create(async (s, c) => { await syncPoint.WaitToContinue(); return new TryConnectResult(currentConnectivityState); @@ -998,7 +998,7 @@ public async Task ConnectAsync_DisposeDuringConnect_ConnectTaskCanceled() var services = new ServiceCollection(); services.AddNUnitLogger(); services.AddSingleton(); - services.AddSingleton(new TestSubchannelTransportFactory(async (s, c) => + services.AddSingleton(TestSubchannelTransportFactory.Create(async (s, c) => { await syncPoint.WaitToContinue(); return new TryConnectResult(currentConnectivityState); diff --git a/test/Grpc.Net.Client.Tests/Infrastructure/Balancer/TestSubChannelTransport.cs b/test/Grpc.Net.Client.Tests/Infrastructure/Balancer/TestSubChannelTransport.cs index 171b9bfc2..25cd8b554 100644 --- a/test/Grpc.Net.Client.Tests/Infrastructure/Balancer/TestSubChannelTransport.cs +++ b/test/Grpc.Net.Client.Tests/Infrastructure/Balancer/TestSubChannelTransport.cs @@ -34,7 +34,7 @@ internal class TestSubchannelTransport : ISubchannelTransport private readonly TaskCompletionSource _connectTcs; private readonly TestSubchannelTransportFactory _factory; - private readonly Func>? _onTryConnect; + private readonly Func>? _onTryConnect; public Subchannel Subchannel { get; } @@ -44,7 +44,7 @@ internal class TestSubchannelTransport : ISubchannelTransport public Task TryConnectTask => _connectTcs.Task; - public TestSubchannelTransport(TestSubchannelTransportFactory factory, Subchannel subchannel, Func>? onTryConnect) + public TestSubchannelTransport(TestSubchannelTransportFactory factory, Subchannel subchannel, Func>? onTryConnect) { _factory = factory; Subchannel = subchannel; @@ -79,9 +79,9 @@ public async #else Task #endif - TryConnectAsync(ConnectContext context) + TryConnectAsync(ConnectContext context, int attempt) { - var (newState, connectResult) = await (_onTryConnect?.Invoke(context.CancellationToken) ?? Task.FromResult(new TryConnectResult(ConnectivityState.Ready))); + var (newState, connectResult) = await (_onTryConnect?.Invoke(attempt, context.CancellationToken) ?? Task.FromResult(new TryConnectResult(ConnectivityState.Ready))); CurrentEndPoint = Subchannel._addresses[0].EndPoint; var newStatus = newState == ConnectivityState.TransientFailure ? new Status(StatusCode.Internal, "") : Status.DefaultSuccess; diff --git a/test/Grpc.Net.Client.Tests/Infrastructure/Balancer/TestSubChannelTransportFactory.cs b/test/Grpc.Net.Client.Tests/Infrastructure/Balancer/TestSubChannelTransportFactory.cs index d9a4ae2d8..47ac87a22 100644 --- a/test/Grpc.Net.Client.Tests/Infrastructure/Balancer/TestSubChannelTransportFactory.cs +++ b/test/Grpc.Net.Client.Tests/Infrastructure/Balancer/TestSubChannelTransportFactory.cs @@ -1,4 +1,4 @@ -#region Copyright notice and license +#region Copyright notice and license // Copyright 2019 The gRPC Authors // @@ -34,22 +34,36 @@ internal record TryConnectResult(ConnectivityState ConnectivityState, ConnectRes internal class TestSubchannelTransportFactory : ISubchannelTransportFactory { - private readonly Func>? _onSubchannelTryConnect; + private readonly Func>? _onSubchannelTryConnect; public List Transports { get; } = new List(); public TimeSpan? ConnectTimeout { get; set; } - public TestSubchannelTransportFactory(Func>? onSubchannelTryConnect = null) + public TestSubchannelTransportFactory() + { + } + + private TestSubchannelTransportFactory(Func>? onSubchannelTryConnect = null) { _onSubchannelTryConnect = onSubchannelTryConnect; } + public static TestSubchannelTransportFactory Create(Func> onSubchannelTryConnect) + { + return new TestSubchannelTransportFactory(onSubchannelTryConnect); + } + + public static TestSubchannelTransportFactory Create(Func> onSubchannelTryConnect) + { + return Create((subchannel, attempt, cancellationToken) => onSubchannelTryConnect(subchannel, cancellationToken)); + } + public ISubchannelTransport Create(Subchannel subchannel) { - Func>? onTryConnect = null; + Func>? onTryConnect = null; if (_onSubchannelTryConnect != null) { - onTryConnect = (c) => _onSubchannelTryConnect(subchannel, c); + onTryConnect = (attempt, c) => _onSubchannelTryConnect(subchannel, attempt, c); } var transport = new TestSubchannelTransport(this, subchannel, onTryConnect); From add886e45b2091343a826a91ba03ccd74cf916f8 Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Wed, 17 Apr 2024 14:24:11 +0800 Subject: [PATCH 06/16] Update Directory.Packages.props (#2413) Co-authored-by: James Newton-King --- Directory.Packages.props | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index 7546b8718..9e663f064 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -6,7 +6,8 @@ 6.0.11 2.58.0 1.6.0 - 1.6.0-beta.2 + 1.8.1 + 1.8.0-beta.1 8.0.0 6.0.0 @@ -50,7 +51,7 @@ - + From c04d01a0acf29c790886da66032d1d8bfa1700e3 Mon Sep 17 00:00:00 2001 From: James Thompson Date: Wed, 17 Apr 2024 17:26:43 +1000 Subject: [PATCH 07/16] #2401 Add new TFM's so package dependency can be removed (#2402) Co-authored-by: James Newton-King --- src/Grpc.HealthCheck/Grpc.HealthCheck.csproj | 10 +- src/Grpc.HealthCheck/Health.cs | 450 ------------------- src/Grpc.HealthCheck/HealthGrpc.cs | 301 ------------- src/Grpc.HealthCheck/HealthServiceImpl.cs | 8 +- src/Grpc.HealthCheck/health.proto | 73 +++ 5 files changed, 84 insertions(+), 758 deletions(-) delete mode 100644 src/Grpc.HealthCheck/Health.cs delete mode 100644 src/Grpc.HealthCheck/HealthGrpc.cs create mode 100644 src/Grpc.HealthCheck/health.proto diff --git a/src/Grpc.HealthCheck/Grpc.HealthCheck.csproj b/src/Grpc.HealthCheck/Grpc.HealthCheck.csproj index f7b92dc1a..f14250984 100755 --- a/src/Grpc.HealthCheck/Grpc.HealthCheck.csproj +++ b/src/Grpc.HealthCheck/Grpc.HealthCheck.csproj @@ -5,7 +5,7 @@ true true - net462;netstandard2.0 + net462;netstandard2.0;net6.0;net7.0;net8.0 README.md @@ -19,9 +19,13 @@ - + + + + - + + diff --git a/src/Grpc.HealthCheck/Health.cs b/src/Grpc.HealthCheck/Health.cs deleted file mode 100644 index 2bc976f1b..000000000 --- a/src/Grpc.HealthCheck/Health.cs +++ /dev/null @@ -1,450 +0,0 @@ -// -// Generated by the protocol buffer compiler. DO NOT EDIT! -// source: grpc/health/v1/health.proto -// -#pragma warning disable 1591, 0612, 3021 -#region Designer generated code - -using pb = global::Google.Protobuf; -using pbc = global::Google.Protobuf.Collections; -using pbr = global::Google.Protobuf.Reflection; -using scg = global::System.Collections.Generic; -namespace Grpc.Health.V1 { - - /// Holder for reflection information generated from grpc/health/v1/health.proto - public static partial class HealthReflection { - - #region Descriptor - /// File descriptor for grpc/health/v1/health.proto - public static pbr::FileDescriptor Descriptor { - get { return descriptor; } - } - private static pbr::FileDescriptor descriptor; - - static HealthReflection() { - byte[] descriptorData = global::System.Convert.FromBase64String( - string.Concat( - "ChtncnBjL2hlYWx0aC92MS9oZWFsdGgucHJvdG8SDmdycGMuaGVhbHRoLnYx", - "IiUKEkhlYWx0aENoZWNrUmVxdWVzdBIPCgdzZXJ2aWNlGAEgASgJIqkBChNI", - "ZWFsdGhDaGVja1Jlc3BvbnNlEkEKBnN0YXR1cxgBIAEoDjIxLmdycGMuaGVh", - "bHRoLnYxLkhlYWx0aENoZWNrUmVzcG9uc2UuU2VydmluZ1N0YXR1cyJPCg1T", - "ZXJ2aW5nU3RhdHVzEgsKB1VOS05PV04QABILCgdTRVJWSU5HEAESDwoLTk9U", - "X1NFUlZJTkcQAhITCg9TRVJWSUNFX1VOS05PV04QAzKuAQoGSGVhbHRoElAK", - "BUNoZWNrEiIuZ3JwYy5oZWFsdGgudjEuSGVhbHRoQ2hlY2tSZXF1ZXN0GiMu", - "Z3JwYy5oZWFsdGgudjEuSGVhbHRoQ2hlY2tSZXNwb25zZRJSCgVXYXRjaBIi", - "LmdycGMuaGVhbHRoLnYxLkhlYWx0aENoZWNrUmVxdWVzdBojLmdycGMuaGVh", - "bHRoLnYxLkhlYWx0aENoZWNrUmVzcG9uc2UwAUJhChFpby5ncnBjLmhlYWx0", - "aC52MUILSGVhbHRoUHJvdG9QAVosZ29vZ2xlLmdvbGFuZy5vcmcvZ3JwYy9o", - "ZWFsdGgvZ3JwY19oZWFsdGhfdjGqAg5HcnBjLkhlYWx0aC5WMWIGcHJvdG8z")); - descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, - new pbr::FileDescriptor[] { }, - new pbr::GeneratedClrTypeInfo(null, null, new pbr::GeneratedClrTypeInfo[] { - new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Health.V1.HealthCheckRequest), global::Grpc.Health.V1.HealthCheckRequest.Parser, new[]{ "Service" }, null, null, null, null), - new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Health.V1.HealthCheckResponse), global::Grpc.Health.V1.HealthCheckResponse.Parser, new[]{ "Status" }, null, new[]{ typeof(global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus) }, null, null) - })); - } - #endregion - - } - #region Messages - public sealed partial class HealthCheckRequest : pb::IMessage - #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE - , pb::IBufferMessage - #endif - { - private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new HealthCheckRequest()); - private pb::UnknownFieldSet _unknownFields; - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public static pb::MessageParser Parser { get { return _parser; } } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public static pbr::MessageDescriptor Descriptor { - get { return global::Grpc.Health.V1.HealthReflection.Descriptor.MessageTypes[0]; } - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - pbr::MessageDescriptor pb::IMessage.Descriptor { - get { return Descriptor; } - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public HealthCheckRequest() { - OnConstruction(); - } - - partial void OnConstruction(); - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public HealthCheckRequest(HealthCheckRequest other) : this() { - service_ = other.service_; - _unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields); - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public HealthCheckRequest Clone() { - return new HealthCheckRequest(this); - } - - /// Field number for the "service" field. - public const int ServiceFieldNumber = 1; - private string service_ = ""; - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public string Service { - get { return service_; } - set { - service_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); - } - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public override bool Equals(object other) { - return Equals(other as HealthCheckRequest); - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public bool Equals(HealthCheckRequest other) { - if (ReferenceEquals(other, null)) { - return false; - } - if (ReferenceEquals(other, this)) { - return true; - } - if (Service != other.Service) return false; - return Equals(_unknownFields, other._unknownFields); - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public override int GetHashCode() { - int hash = 1; - if (Service.Length != 0) hash ^= Service.GetHashCode(); - if (_unknownFields != null) { - hash ^= _unknownFields.GetHashCode(); - } - return hash; - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public override string ToString() { - return pb::JsonFormatter.ToDiagnosticString(this); - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public void WriteTo(pb::CodedOutputStream output) { - #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE - output.WriteRawMessage(this); - #else - if (Service.Length != 0) { - output.WriteRawTag(10); - output.WriteString(Service); - } - if (_unknownFields != null) { - _unknownFields.WriteTo(output); - } - #endif - } - - #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - void pb::IBufferMessage.InternalWriteTo(ref pb::WriteContext output) { - if (Service.Length != 0) { - output.WriteRawTag(10); - output.WriteString(Service); - } - if (_unknownFields != null) { - _unknownFields.WriteTo(ref output); - } - } - #endif - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public int CalculateSize() { - int size = 0; - if (Service.Length != 0) { - size += 1 + pb::CodedOutputStream.ComputeStringSize(Service); - } - if (_unknownFields != null) { - size += _unknownFields.CalculateSize(); - } - return size; - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public void MergeFrom(HealthCheckRequest other) { - if (other == null) { - return; - } - if (other.Service.Length != 0) { - Service = other.Service; - } - _unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields); - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public void MergeFrom(pb::CodedInputStream input) { - #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE - input.ReadRawMessage(this); - #else - uint tag; - while ((tag = input.ReadTag()) != 0) { - switch(tag) { - default: - _unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input); - break; - case 10: { - Service = input.ReadString(); - break; - } - } - } - #endif - } - - #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) { - uint tag; - while ((tag = input.ReadTag()) != 0) { - switch(tag) { - default: - _unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input); - break; - case 10: { - Service = input.ReadString(); - break; - } - } - } - } - #endif - - } - - public sealed partial class HealthCheckResponse : pb::IMessage - #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE - , pb::IBufferMessage - #endif - { - private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new HealthCheckResponse()); - private pb::UnknownFieldSet _unknownFields; - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public static pb::MessageParser Parser { get { return _parser; } } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public static pbr::MessageDescriptor Descriptor { - get { return global::Grpc.Health.V1.HealthReflection.Descriptor.MessageTypes[1]; } - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - pbr::MessageDescriptor pb::IMessage.Descriptor { - get { return Descriptor; } - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public HealthCheckResponse() { - OnConstruction(); - } - - partial void OnConstruction(); - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public HealthCheckResponse(HealthCheckResponse other) : this() { - status_ = other.status_; - _unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields); - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public HealthCheckResponse Clone() { - return new HealthCheckResponse(this); - } - - /// Field number for the "status" field. - public const int StatusFieldNumber = 1; - private global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus status_ = global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus.Unknown; - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus Status { - get { return status_; } - set { - status_ = value; - } - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public override bool Equals(object other) { - return Equals(other as HealthCheckResponse); - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public bool Equals(HealthCheckResponse other) { - if (ReferenceEquals(other, null)) { - return false; - } - if (ReferenceEquals(other, this)) { - return true; - } - if (Status != other.Status) return false; - return Equals(_unknownFields, other._unknownFields); - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public override int GetHashCode() { - int hash = 1; - if (Status != global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus.Unknown) hash ^= Status.GetHashCode(); - if (_unknownFields != null) { - hash ^= _unknownFields.GetHashCode(); - } - return hash; - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public override string ToString() { - return pb::JsonFormatter.ToDiagnosticString(this); - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public void WriteTo(pb::CodedOutputStream output) { - #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE - output.WriteRawMessage(this); - #else - if (Status != global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus.Unknown) { - output.WriteRawTag(8); - output.WriteEnum((int) Status); - } - if (_unknownFields != null) { - _unknownFields.WriteTo(output); - } - #endif - } - - #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - void pb::IBufferMessage.InternalWriteTo(ref pb::WriteContext output) { - if (Status != global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus.Unknown) { - output.WriteRawTag(8); - output.WriteEnum((int) Status); - } - if (_unknownFields != null) { - _unknownFields.WriteTo(ref output); - } - } - #endif - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public int CalculateSize() { - int size = 0; - if (Status != global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus.Unknown) { - size += 1 + pb::CodedOutputStream.ComputeEnumSize((int) Status); - } - if (_unknownFields != null) { - size += _unknownFields.CalculateSize(); - } - return size; - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public void MergeFrom(HealthCheckResponse other) { - if (other == null) { - return; - } - if (other.Status != global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus.Unknown) { - Status = other.Status; - } - _unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields); - } - - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public void MergeFrom(pb::CodedInputStream input) { - #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE - input.ReadRawMessage(this); - #else - uint tag; - while ((tag = input.ReadTag()) != 0) { - switch(tag) { - default: - _unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input); - break; - case 8: { - Status = (global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus) input.ReadEnum(); - break; - } - } - } - #endif - } - - #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) { - uint tag; - while ((tag = input.ReadTag()) != 0) { - switch(tag) { - default: - _unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input); - break; - case 8: { - Status = (global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus) input.ReadEnum(); - break; - } - } - } - } - #endif - - #region Nested types - /// Container for nested types declared in the HealthCheckResponse message type. - [global::System.Diagnostics.DebuggerNonUserCodeAttribute] - [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] - public static partial class Types { - public enum ServingStatus { - [pbr::OriginalName("UNKNOWN")] Unknown = 0, - [pbr::OriginalName("SERVING")] Serving = 1, - [pbr::OriginalName("NOT_SERVING")] NotServing = 2, - /// - /// Used only by the Watch method. - /// - [pbr::OriginalName("SERVICE_UNKNOWN")] ServiceUnknown = 3, - } - - } - #endregion - - } - - #endregion - -} - -#endregion Designer generated code diff --git a/src/Grpc.HealthCheck/HealthGrpc.cs b/src/Grpc.HealthCheck/HealthGrpc.cs deleted file mode 100644 index 1984f6e66..000000000 --- a/src/Grpc.HealthCheck/HealthGrpc.cs +++ /dev/null @@ -1,301 +0,0 @@ -// -// Generated by the protocol buffer compiler. DO NOT EDIT! -// source: grpc/health/v1/health.proto -// -// Original file comments: -// Copyright 2015 The gRPC Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// The canonical version of this proto can be found at -// https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto -// -#pragma warning disable 0414, 1591 -#region Designer generated code - -using grpc = global::Grpc.Core; - -namespace Grpc.Health.V1 { - public static partial class Health - { - static readonly string __ServiceName = "grpc.health.v1.Health"; - - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - static void __Helper_SerializeMessage(global::Google.Protobuf.IMessage message, grpc::SerializationContext context) - { - #if !GRPC_DISABLE_PROTOBUF_BUFFER_SERIALIZATION - if (message is global::Google.Protobuf.IBufferMessage) - { - context.SetPayloadLength(message.CalculateSize()); - global::Google.Protobuf.MessageExtensions.WriteTo(message, context.GetBufferWriter()); - context.Complete(); - return; - } - #endif - context.Complete(global::Google.Protobuf.MessageExtensions.ToByteArray(message)); - } - - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - static class __Helper_MessageCache - { - public static readonly bool IsBufferMessage = global::System.Reflection.IntrospectionExtensions.GetTypeInfo(typeof(global::Google.Protobuf.IBufferMessage)).IsAssignableFrom(typeof(T)); - } - - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - static T __Helper_DeserializeMessage(grpc::DeserializationContext context, global::Google.Protobuf.MessageParser parser) where T : global::Google.Protobuf.IMessage - { - #if !GRPC_DISABLE_PROTOBUF_BUFFER_SERIALIZATION - if (__Helper_MessageCache.IsBufferMessage) - { - return parser.ParseFrom(context.PayloadAsReadOnlySequence()); - } - #endif - return parser.ParseFrom(context.PayloadAsNewBuffer()); - } - - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - static readonly grpc::Marshaller __Marshaller_grpc_health_v1_HealthCheckRequest = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::Grpc.Health.V1.HealthCheckRequest.Parser)); - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - static readonly grpc::Marshaller __Marshaller_grpc_health_v1_HealthCheckResponse = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::Grpc.Health.V1.HealthCheckResponse.Parser)); - - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - static readonly grpc::Method __Method_Check = new grpc::Method( - grpc::MethodType.Unary, - __ServiceName, - "Check", - __Marshaller_grpc_health_v1_HealthCheckRequest, - __Marshaller_grpc_health_v1_HealthCheckResponse); - - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - static readonly grpc::Method __Method_Watch = new grpc::Method( - grpc::MethodType.ServerStreaming, - __ServiceName, - "Watch", - __Marshaller_grpc_health_v1_HealthCheckRequest, - __Marshaller_grpc_health_v1_HealthCheckResponse); - - /// Service descriptor - public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor - { - get { return global::Grpc.Health.V1.HealthReflection.Descriptor.Services[0]; } - } - - /// Base class for server-side implementations of Health - [grpc::BindServiceMethod(typeof(Health), "BindService")] - public abstract partial class HealthBase - { - /// - /// If the requested service is unknown, the call will fail with status - /// NOT_FOUND. - /// - /// The request received from the client. - /// The context of the server-side call handler being invoked. - /// The response to send back to the client (wrapped by a task). - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - public virtual global::System.Threading.Tasks.Task Check(global::Grpc.Health.V1.HealthCheckRequest request, grpc::ServerCallContext context) - { - throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, "")); - } - - /// - /// Performs a watch for the serving status of the requested service. - /// The server will immediately send back a message indicating the current - /// serving status. It will then subsequently send a new message whenever - /// the service's serving status changes. - /// - /// If the requested service is unknown when the call is received, the - /// server will send a message setting the serving status to - /// SERVICE_UNKNOWN but will *not* terminate the call. If at some - /// future point, the serving status of the service becomes known, the - /// server will send a new message with the service's serving status. - /// - /// If the call terminates with status UNIMPLEMENTED, then clients - /// should assume this method is not supported and should not retry the - /// call. If the call terminates with any other status (including OK), - /// clients should retry the call with appropriate exponential backoff. - /// - /// The request received from the client. - /// Used for sending responses back to the client. - /// The context of the server-side call handler being invoked. - /// A task indicating completion of the handler. - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - public virtual global::System.Threading.Tasks.Task Watch(global::Grpc.Health.V1.HealthCheckRequest request, grpc::IServerStreamWriter responseStream, grpc::ServerCallContext context) - { - throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, "")); - } - - } - - /// Client for Health - public partial class HealthClient : grpc::ClientBase - { - /// Creates a new client for Health - /// The channel to use to make remote calls. - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - public HealthClient(grpc::ChannelBase channel) : base(channel) - { - } - /// Creates a new client for Health that uses a custom CallInvoker. - /// The callInvoker to use to make remote calls. - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - public HealthClient(grpc::CallInvoker callInvoker) : base(callInvoker) - { - } - /// Protected parameterless constructor to allow creation of test doubles. - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - protected HealthClient() : base() - { - } - /// Protected constructor to allow creation of configured clients. - /// The client configuration. - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - protected HealthClient(ClientBaseConfiguration configuration) : base(configuration) - { - } - - /// - /// If the requested service is unknown, the call will fail with status - /// NOT_FOUND. - /// - /// The request to send to the server. - /// The initial metadata to send with the call. This parameter is optional. - /// An optional deadline for the call. The call will be cancelled if deadline is hit. - /// An optional token for canceling the call. - /// The response received from the server. - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - public virtual global::Grpc.Health.V1.HealthCheckResponse Check(global::Grpc.Health.V1.HealthCheckRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken)) - { - return Check(request, new grpc::CallOptions(headers, deadline, cancellationToken)); - } - /// - /// If the requested service is unknown, the call will fail with status - /// NOT_FOUND. - /// - /// The request to send to the server. - /// The options for the call. - /// The response received from the server. - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - public virtual global::Grpc.Health.V1.HealthCheckResponse Check(global::Grpc.Health.V1.HealthCheckRequest request, grpc::CallOptions options) - { - return CallInvoker.BlockingUnaryCall(__Method_Check, null, options, request); - } - /// - /// If the requested service is unknown, the call will fail with status - /// NOT_FOUND. - /// - /// The request to send to the server. - /// The initial metadata to send with the call. This parameter is optional. - /// An optional deadline for the call. The call will be cancelled if deadline is hit. - /// An optional token for canceling the call. - /// The call object. - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - public virtual grpc::AsyncUnaryCall CheckAsync(global::Grpc.Health.V1.HealthCheckRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken)) - { - return CheckAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken)); - } - /// - /// If the requested service is unknown, the call will fail with status - /// NOT_FOUND. - /// - /// The request to send to the server. - /// The options for the call. - /// The call object. - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - public virtual grpc::AsyncUnaryCall CheckAsync(global::Grpc.Health.V1.HealthCheckRequest request, grpc::CallOptions options) - { - return CallInvoker.AsyncUnaryCall(__Method_Check, null, options, request); - } - /// - /// Performs a watch for the serving status of the requested service. - /// The server will immediately send back a message indicating the current - /// serving status. It will then subsequently send a new message whenever - /// the service's serving status changes. - /// - /// If the requested service is unknown when the call is received, the - /// server will send a message setting the serving status to - /// SERVICE_UNKNOWN but will *not* terminate the call. If at some - /// future point, the serving status of the service becomes known, the - /// server will send a new message with the service's serving status. - /// - /// If the call terminates with status UNIMPLEMENTED, then clients - /// should assume this method is not supported and should not retry the - /// call. If the call terminates with any other status (including OK), - /// clients should retry the call with appropriate exponential backoff. - /// - /// The request to send to the server. - /// The initial metadata to send with the call. This parameter is optional. - /// An optional deadline for the call. The call will be cancelled if deadline is hit. - /// An optional token for canceling the call. - /// The call object. - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - public virtual grpc::AsyncServerStreamingCall Watch(global::Grpc.Health.V1.HealthCheckRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken)) - { - return Watch(request, new grpc::CallOptions(headers, deadline, cancellationToken)); - } - /// - /// Performs a watch for the serving status of the requested service. - /// The server will immediately send back a message indicating the current - /// serving status. It will then subsequently send a new message whenever - /// the service's serving status changes. - /// - /// If the requested service is unknown when the call is received, the - /// server will send a message setting the serving status to - /// SERVICE_UNKNOWN but will *not* terminate the call. If at some - /// future point, the serving status of the service becomes known, the - /// server will send a new message with the service's serving status. - /// - /// If the call terminates with status UNIMPLEMENTED, then clients - /// should assume this method is not supported and should not retry the - /// call. If the call terminates with any other status (including OK), - /// clients should retry the call with appropriate exponential backoff. - /// - /// The request to send to the server. - /// The options for the call. - /// The call object. - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - public virtual grpc::AsyncServerStreamingCall Watch(global::Grpc.Health.V1.HealthCheckRequest request, grpc::CallOptions options) - { - return CallInvoker.AsyncServerStreamingCall(__Method_Watch, null, options, request); - } - /// Creates a new instance of client from given ClientBaseConfiguration. - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - protected override HealthClient NewInstance(ClientBaseConfiguration configuration) - { - return new HealthClient(configuration); - } - } - - /// Creates service definition that can be registered with a server - /// An object implementing the server-side handling logic. - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - public static grpc::ServerServiceDefinition BindService(HealthBase serviceImpl) - { - return grpc::ServerServiceDefinition.CreateBuilder() - .AddMethod(__Method_Check, serviceImpl.Check) - .AddMethod(__Method_Watch, serviceImpl.Watch).Build(); - } - - /// Register service method with a service binder with or without implementation. Useful when customizing the service binding logic. - /// Note: this method is part of an experimental API that can change or be removed without any prior notice. - /// Service methods will be bound by calling AddMethod on this object. - /// An object implementing the server-side handling logic. - [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] - public static void BindService(grpc::ServiceBinderBase serviceBinder, HealthBase serviceImpl) - { - serviceBinder.AddMethod(__Method_Check, serviceImpl == null ? null : new grpc::UnaryServerMethod(serviceImpl.Check)); - serviceBinder.AddMethod(__Method_Watch, serviceImpl == null ? null : new grpc::ServerStreamingServerMethod(serviceImpl.Watch)); - } - - } -} -#endregion diff --git a/src/Grpc.HealthCheck/HealthServiceImpl.cs b/src/Grpc.HealthCheck/HealthServiceImpl.cs index c465cdaf7..80003ee68 100644 --- a/src/Grpc.HealthCheck/HealthServiceImpl.cs +++ b/src/Grpc.HealthCheck/HealthServiceImpl.cs @@ -157,7 +157,7 @@ public override async Task Watch(HealthCheckRequest request, IServerStreamWriter lock (watchersLock) { - if (!watchers.TryGetValue(service, out List> channelWriters)) + if (!watchers.TryGetValue(service, out List>? channelWriters)) { channelWriters = new List>(); watchers.Add(service, channelWriters); @@ -170,7 +170,7 @@ public override async Task Watch(HealthCheckRequest request, IServerStreamWriter context.CancellationToken.Register(() => { lock (watchersLock) { - if (watchers.TryGetValue(service, out List> channelWriters)) + if (watchers.TryGetValue(service, out List>? channelWriters)) { // Remove the writer from the watchers if (channelWriters.Remove(channel.Writer)) @@ -196,7 +196,7 @@ public override async Task Watch(HealthCheckRequest request, IServerStreamWriter // Loop will exit when the call is canceled and the writer is marked as complete. while (await channel.Reader.WaitToReadAsync().ConfigureAwait(false)) { - if (channel.Reader.TryRead(out HealthCheckResponse item)) + if (channel.Reader.TryRead(out HealthCheckResponse? item)) { await responseStream.WriteAsync(item).ConfigureAwait(false); } @@ -207,7 +207,7 @@ private void NotifyStatus(string service, HealthCheckResponse.Types.ServingStatu { lock (watchersLock) { - if (watchers.TryGetValue(service, out List> channelWriters)) + if (watchers.TryGetValue(service, out List>? channelWriters)) { HealthCheckResponse response = new HealthCheckResponse { Status = status }; diff --git a/src/Grpc.HealthCheck/health.proto b/src/Grpc.HealthCheck/health.proto new file mode 100644 index 000000000..13b03f567 --- /dev/null +++ b/src/Grpc.HealthCheck/health.proto @@ -0,0 +1,73 @@ +// Copyright 2015 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The canonical version of this proto can be found at +// https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto + +syntax = "proto3"; + +package grpc.health.v1; + +option csharp_namespace = "Grpc.Health.V1"; +option go_package = "google.golang.org/grpc/health/grpc_health_v1"; +option java_multiple_files = true; +option java_outer_classname = "HealthProto"; +option java_package = "io.grpc.health.v1"; + +message HealthCheckRequest { + string service = 1; +} + +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + SERVICE_UNKNOWN = 3; // Used only by the Watch method. + } + ServingStatus status = 1; +} + +// Health is gRPC's mechanism for checking whether a server is able to handle +// RPCs. Its semantics are documented in +// https://github.com/grpc/grpc/blob/master/doc/health-checking.md. +service Health { + // Check gets the health of the specified service. If the requested service + // is unknown, the call will fail with status NOT_FOUND. If the caller does + // not specify a service name, the server should respond with its overall + // health status. + // + // Clients should set a deadline when calling Check, and can declare the + // server unhealthy if they do not receive a timely response. + // + // Check implementations should be idempotent and side effect free. + rpc Check(HealthCheckRequest) returns (HealthCheckResponse); + + // Performs a watch for the serving status of the requested service. + // The server will immediately send back a message indicating the current + // serving status. It will then subsequently send a new message whenever + // the service's serving status changes. + // + // If the requested service is unknown when the call is received, the + // server will send a message setting the serving status to + // SERVICE_UNKNOWN but will *not* terminate the call. If at some + // future point, the serving status of the service becomes known, the + // server will send a new message with the service's serving status. + // + // If the call terminates with status UNIMPLEMENTED, then clients + // should assume this method is not supported and should not retry the + // call. If the call terminates with any other status (including OK), + // clients should retry the call with appropriate exponential backoff. + rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse); +} From 9cd97cebcadb8f1a355d494e1f0fb122000f1ebf Mon Sep 17 00:00:00 2001 From: Weihan Li Date: Wed, 17 Apr 2024 15:27:33 +0800 Subject: [PATCH 08/16] support `ReadAllAsync` for netstandard2.0 (#2411) Co-authored-by: James Newton-King --- Directory.Packages.props | 1 + src/Grpc.Net.Common/AsyncStreamReaderExtensions.cs | 4 ---- src/Grpc.Net.Common/Grpc.Net.Common.csproj | 4 ++++ test/Grpc.Net.Client.Tests/Grpc.Net.Client.Tests.csproj | 2 ++ test/Grpc.Net.Client.Tests/ReadAllAsyncTests.cs | 4 ---- .../Grpc.Net.Client.Web.Tests.csproj | 2 ++ test/Shared/TaskExtensions.cs | 4 +--- 7 files changed, 10 insertions(+), 11 deletions(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index 9e663f064..7ae59277d 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -60,6 +60,7 @@ + diff --git a/src/Grpc.Net.Common/AsyncStreamReaderExtensions.cs b/src/Grpc.Net.Common/AsyncStreamReaderExtensions.cs index ae21e07fa..0fec0c5a5 100644 --- a/src/Grpc.Net.Common/AsyncStreamReaderExtensions.cs +++ b/src/Grpc.Net.Common/AsyncStreamReaderExtensions.cs @@ -16,8 +16,6 @@ #endregion -#if !NETSTANDARD2_0 && !NET462 - using System.Runtime.CompilerServices; using Grpc.Shared; @@ -50,5 +48,3 @@ private static async IAsyncEnumerable ReadAllAsyncCore(IAsyncStreamReader< } } } - -#endif diff --git a/src/Grpc.Net.Common/Grpc.Net.Common.csproj b/src/Grpc.Net.Common/Grpc.Net.Common.csproj index c1301091d..55581ce2a 100644 --- a/src/Grpc.Net.Common/Grpc.Net.Common.csproj +++ b/src/Grpc.Net.Common/Grpc.Net.Common.csproj @@ -16,4 +16,8 @@ + + + + diff --git a/test/Grpc.Net.Client.Tests/Grpc.Net.Client.Tests.csproj b/test/Grpc.Net.Client.Tests/Grpc.Net.Client.Tests.csproj index adc6008ca..f612232dc 100644 --- a/test/Grpc.Net.Client.Tests/Grpc.Net.Client.Tests.csproj +++ b/test/Grpc.Net.Client.Tests/Grpc.Net.Client.Tests.csproj @@ -46,6 +46,8 @@ + + diff --git a/test/Grpc.Net.Client.Tests/ReadAllAsyncTests.cs b/test/Grpc.Net.Client.Tests/ReadAllAsyncTests.cs index 1e50b55a0..fff586587 100644 --- a/test/Grpc.Net.Client.Tests/ReadAllAsyncTests.cs +++ b/test/Grpc.Net.Client.Tests/ReadAllAsyncTests.cs @@ -23,8 +23,6 @@ using Grpc.Tests.Shared; using NUnit.Framework; -#if !NET462 - namespace Grpc.Net.Client.Tests; [TestFixture] @@ -275,5 +273,3 @@ public async Task MoveNextAsync_CancelCall_ThrowOperationCanceledOnCancellation_ Assert.AreEqual(StatusCode.Cancelled, call.GetStatus().StatusCode); } } - -#endif diff --git a/test/Grpc.Net.Client.Web.Tests/Grpc.Net.Client.Web.Tests.csproj b/test/Grpc.Net.Client.Web.Tests/Grpc.Net.Client.Web.Tests.csproj index 4acd98fbe..4a28bef2c 100644 --- a/test/Grpc.Net.Client.Web.Tests/Grpc.Net.Client.Web.Tests.csproj +++ b/test/Grpc.Net.Client.Web.Tests/Grpc.Net.Client.Web.Tests.csproj @@ -21,6 +21,8 @@ + + diff --git a/test/Shared/TaskExtensions.cs b/test/Shared/TaskExtensions.cs index 948323a80..f14729d46 100644 --- a/test/Shared/TaskExtensions.cs +++ b/test/Shared/TaskExtensions.cs @@ -1,4 +1,4 @@ -#region Copyright notice and license +#region Copyright notice and license // Copyright 2019 The gRPC Authors // @@ -102,7 +102,6 @@ private static string CreateMessage(TimeSpan timeout, string? filePath, int line ? $"The operation timed out after reaching the limit of {timeout.TotalMilliseconds}ms." : $"The operation at {filePath}:{lineNumber} timed out after reaching the limit of {timeout.TotalMilliseconds}ms."; -#if !NET462 public static IAsyncEnumerable DefaultTimeout(this IAsyncEnumerable enumerable, [CallerFilePath] string? filePath = null, [CallerLineNumber] int lineNumber = default) @@ -169,5 +168,4 @@ public ValueTask MoveNextAsync() return new ValueTask(_enumerator.MoveNextAsync().AsTask().TimeoutAfter(_timeout, _filePath, _lineNumber)); } } -#endif } From 2a36215697ced47e5b2beb73d2f5887dd4f51384 Mon Sep 17 00:00:00 2001 From: Drew Noakes Date: Wed, 17 Apr 2024 17:44:04 +1000 Subject: [PATCH 09/16] Fix ObjectDisposedException message (#2415) --- src/Grpc.Net.Client/GrpcChannel.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Grpc.Net.Client/GrpcChannel.cs b/src/Grpc.Net.Client/GrpcChannel.cs index a65ce166b..0c4cea722 100644 --- a/src/Grpc.Net.Client/GrpcChannel.cs +++ b/src/Grpc.Net.Client/GrpcChannel.cs @@ -545,7 +545,7 @@ internal void RegisterActiveCall(IDisposable grpcCall) { // Test the disposed flag inside the lock to ensure there is no chance of a race and adding a call after dispose. // Note that a GrpcCall has been created but hasn't been started. The error will prevent it from starting. - ObjectDisposedThrowHelper.ThrowIf(Disposed, nameof(GrpcChannel)); + ObjectDisposedThrowHelper.ThrowIf(Disposed, typeof(GrpcChannel)); _activeCalls.Add(grpcCall); } From c9c902c8daa54d2bd47a638df6a9d47fc5cae5d7 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Fri, 19 Apr 2024 06:53:25 +0800 Subject: [PATCH 10/16] Enable multiple connections with WinHttpHandler by default (#2416) --- src/Shared/HttpHandlerFactory.cs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/Shared/HttpHandlerFactory.cs b/src/Shared/HttpHandlerFactory.cs index bafc8b36f..6d0661c7c 100644 --- a/src/Shared/HttpHandlerFactory.cs +++ b/src/Shared/HttpHandlerFactory.cs @@ -38,7 +38,12 @@ public static HttpMessageHandler CreatePrimaryHandler() #endif #if NET462 - return new WinHttpHandler(); + // Create WinHttpHandler with EnableMultipleHttp2Connections set to true. That will + // allow a gRPC channel to create new connections if the maximum allow concurrency is exceeded. + return new WinHttpHandler + { + EnableMultipleHttp2Connections = true + }; #elif !NETSTANDARD2_0 return new HttpClientHandler(); #else From 2d9df5892c61da09d20a0c1bc11d56c170ff0fa2 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Mon, 29 Apr 2024 20:37:31 +0800 Subject: [PATCH 11/16] Fix memory leak when using call context propagation with cancellation token (#2421) --- .../ContextPropagationInterceptor.cs | 63 ++++++-- .../DefaultGrpcClientFactoryTests.cs | 152 +++++++++++++++++- 2 files changed, 204 insertions(+), 11 deletions(-) diff --git a/src/Grpc.AspNetCore.Server.ClientFactory/ContextPropagationInterceptor.cs b/src/Grpc.AspNetCore.Server.ClientFactory/ContextPropagationInterceptor.cs index fa6a2d3ef..d2baa257b 100644 --- a/src/Grpc.AspNetCore.Server.ClientFactory/ContextPropagationInterceptor.cs +++ b/src/Grpc.AspNetCore.Server.ClientFactory/ContextPropagationInterceptor.cs @@ -22,7 +22,6 @@ using Grpc.Core.Interceptors; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; namespace Grpc.AspNetCore.ClientFactory; @@ -53,14 +52,15 @@ public override AsyncClientStreamingCall AsyncClientStreami } else { + var state = CreateContextState(call, cts); return new AsyncClientStreamingCall( requestStream: call.RequestStream, - responseAsync: call.ResponseAsync, + responseAsync: OnResponseAsync(call.ResponseAsync, state), responseHeadersAsync: ClientStreamingCallbacks.GetResponseHeadersAsync, getStatusFunc: ClientStreamingCallbacks.GetStatus, getTrailersFunc: ClientStreamingCallbacks.GetTrailers, disposeAction: ClientStreamingCallbacks.Dispose, - CreateContextState(call, cts)); + state); } } @@ -73,14 +73,15 @@ public override AsyncDuplexStreamingCall AsyncDuplexStreami } else { + var state = CreateContextState(call, cts); return new AsyncDuplexStreamingCall( requestStream: call.RequestStream, - responseStream: call.ResponseStream, + responseStream: new ResponseStreamWrapper(call.ResponseStream, state), responseHeadersAsync: DuplexStreamingCallbacks.GetResponseHeadersAsync, getStatusFunc: DuplexStreamingCallbacks.GetStatus, getTrailersFunc: DuplexStreamingCallbacks.GetTrailers, disposeAction: DuplexStreamingCallbacks.Dispose, - CreateContextState(call, cts)); + state); } } @@ -93,13 +94,14 @@ public override AsyncServerStreamingCall AsyncServerStreamingCall( - responseStream: call.ResponseStream, + responseStream: new ResponseStreamWrapper(call.ResponseStream, state), responseHeadersAsync: ServerStreamingCallbacks.GetResponseHeadersAsync, getStatusFunc: ServerStreamingCallbacks.GetStatus, getTrailersFunc: ServerStreamingCallbacks.GetTrailers, disposeAction: ServerStreamingCallbacks.Dispose, - CreateContextState(call, cts)); + state); } } @@ -112,13 +114,14 @@ public override AsyncUnaryCall AsyncUnaryCall(TR } else { + var state = CreateContextState(call, cts); return new AsyncUnaryCall( - responseAsync: call.ResponseAsync, + responseAsync: OnResponseAsync(call.ResponseAsync, state), responseHeadersAsync: UnaryCallbacks.GetResponseHeadersAsync, getStatusFunc: UnaryCallbacks.GetStatus, getTrailersFunc: UnaryCallbacks.GetTrailers, disposeAction: UnaryCallbacks.Dispose, - CreateContextState(call, cts)); + state); } } @@ -129,6 +132,19 @@ public override TResponse BlockingUnaryCall(TRequest reques return response; } + // Automatically dispose state after awaiting the response. + private static async Task OnResponseAsync(Task task, IDisposable state) + { + try + { + return await task.ConfigureAwait(false); + } + finally + { + state.Dispose(); + } + } + private ClientInterceptorContext ConfigureContext(ClientInterceptorContext context, out CancellationTokenSource? linkedCts) where TRequest : class where TResponse : class @@ -197,7 +213,7 @@ private bool TryGetServerCallContext([NotNullWhen(true)] out ServerCallContext? private ContextState CreateContextState(TCall call, CancellationTokenSource cancellationTokenSource) where TCall : IDisposable => new ContextState(call, cancellationTokenSource); - private class ContextState : IDisposable where TCall : IDisposable + private sealed class ContextState : IDisposable where TCall : IDisposable { public ContextState(TCall call, CancellationTokenSource cancellationTokenSource) { @@ -215,6 +231,33 @@ public void Dispose() } } + // Automatically dispose state after reading to the end of the stream. + private sealed class ResponseStreamWrapper : IAsyncStreamReader + { + private readonly IAsyncStreamReader _inner; + private readonly IDisposable _state; + private bool _disposed; + + public ResponseStreamWrapper(IAsyncStreamReader inner, IDisposable state) + { + _inner = inner; + _state = state; + } + + public TResponse Current => _inner.Current; + + public async Task MoveNext(CancellationToken cancellationToken) + { + var result = await _inner.MoveNext(cancellationToken); + if (!result && !_disposed) + { + _state.Dispose(); + _disposed = true; + } + return result; + } + } + private static class Log { private static readonly Action _propagateServerCallContextFailure = diff --git a/test/Grpc.AspNetCore.Server.ClientFactory.Tests/DefaultGrpcClientFactoryTests.cs b/test/Grpc.AspNetCore.Server.ClientFactory.Tests/DefaultGrpcClientFactoryTests.cs index 8ed16eb7a..e6c22b041 100644 --- a/test/Grpc.AspNetCore.Server.ClientFactory.Tests/DefaultGrpcClientFactoryTests.cs +++ b/test/Grpc.AspNetCore.Server.ClientFactory.Tests/DefaultGrpcClientFactoryTests.cs @@ -1,4 +1,4 @@ -#region Copyright notice and license +#region Copyright notice and license // Copyright 2019 The gRPC Authors // @@ -20,6 +20,7 @@ using Greet; using Grpc.AspNetCore.Server.ClientFactory.Tests.TestObjects; using Grpc.Core; +using Grpc.Core.Interceptors; using Grpc.Net.ClientFactory; using Grpc.Net.ClientFactory.Internal; using Grpc.Tests.Shared; @@ -91,6 +92,155 @@ public async Task CreateClient_ServerCallContextHasValues_PropogatedDeadlineAndC Assert.AreEqual(cancellationToken, options.CancellationToken); } + [Test] + public async Task CreateClient_Unary_ServerCallContextHasValues_StateDisposed() + { + // Arrange + var baseAddress = new Uri("http://localhost"); + var deadline = DateTime.UtcNow.AddDays(1); + var cancellationToken = new CancellationTokenSource().Token; + + var interceptor = new OnDisposedInterceptor(); + + var services = new ServiceCollection(); + services.AddOptions(); + services.AddSingleton(CreateHttpContextAccessorWithServerCallContext(deadline: deadline, cancellationToken: cancellationToken)); + services + .AddGrpcClient(o => + { + o.Address = baseAddress; + }) + .EnableCallContextPropagation() + .AddInterceptor(() => interceptor) + .ConfigurePrimaryHttpMessageHandler(() => ClientTestHelpers.CreateTestMessageHandler(new HelloReply())); + + var serviceProvider = services.BuildServiceProvider(validateScopes: true); + + var clientFactory = CreateGrpcClientFactory(serviceProvider); + var client = clientFactory.CreateClient(nameof(Greeter.GreeterClient)); + + // Checking that token register calls don't build up on CTS and create a memory leak. + var cts = new CancellationTokenSource(); + + // Act + // Send calls in a different method so there is no chance that a stack reference + // to a gRPC call is still alive after calls are complete. + var response = await client.SayHelloAsync(new HelloRequest(), cancellationToken: cts.Token); + + // Assert + Assert.IsTrue(interceptor.ContextDisposed); + } + + [Test] + public async Task CreateClient_ServerStreaming_ServerCallContextHasValues_StateDisposed() + { + // Arrange + var baseAddress = new Uri("http://localhost"); + var deadline = DateTime.UtcNow.AddDays(1); + var cancellationToken = new CancellationTokenSource().Token; + + var interceptor = new OnDisposedInterceptor(); + + var services = new ServiceCollection(); + services.AddOptions(); + services.AddSingleton(CreateHttpContextAccessorWithServerCallContext(deadline: deadline, cancellationToken: cancellationToken)); + services + .AddGrpcClient(o => + { + o.Address = baseAddress; + }) + .EnableCallContextPropagation() + .AddInterceptor(() => interceptor) + .ConfigurePrimaryHttpMessageHandler(() => ClientTestHelpers.CreateTestMessageHandler(new HelloReply())); + + var serviceProvider = services.BuildServiceProvider(validateScopes: true); + + var clientFactory = CreateGrpcClientFactory(serviceProvider); + var client = clientFactory.CreateClient(nameof(Greeter.GreeterClient)); + + // Checking that token register calls don't build up on CTS and create a memory leak. + var cts = new CancellationTokenSource(); + + // Act + // Send calls in a different method so there is no chance that a stack reference + // to a gRPC call is still alive after calls are complete. + var call = client.SayHellos(new HelloRequest(), cancellationToken: cts.Token); + + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.IsFalse(await call.ResponseStream.MoveNext()); + + // Assert + Assert.IsTrue(interceptor.ContextDisposed); + } + + private sealed class OnDisposedInterceptor : Interceptor + { + public bool ContextDisposed { get; private set; } + + public override TResponse BlockingUnaryCall(TRequest request, ClientInterceptorContext context, BlockingUnaryCallContinuation continuation) + { + return continuation(request, context); + } + + public override AsyncUnaryCall AsyncUnaryCall(TRequest request, ClientInterceptorContext context, AsyncUnaryCallContinuation continuation) + { + var call = continuation(request, context); + return new AsyncUnaryCall(call.ResponseAsync, + call.ResponseHeadersAsync, + call.GetStatus, + call.GetTrailers, + () => + { + call.Dispose(); + ContextDisposed = true; + }); + } + + public override AsyncServerStreamingCall AsyncServerStreamingCall(TRequest request, ClientInterceptorContext context, AsyncServerStreamingCallContinuation continuation) + { + var call = continuation(request, context); + return new AsyncServerStreamingCall(call.ResponseStream, + call.ResponseHeadersAsync, + call.GetStatus, + call.GetTrailers, + () => + { + call.Dispose(); + ContextDisposed = true; + }); + } + + public override AsyncClientStreamingCall AsyncClientStreamingCall(ClientInterceptorContext context, AsyncClientStreamingCallContinuation continuation) + { + var call = continuation(context); + return new AsyncClientStreamingCall(call.RequestStream, + call.ResponseAsync, + call.ResponseHeadersAsync, + call.GetStatus, + call.GetTrailers, + () => + { + call.Dispose(); + ContextDisposed = true; + }); + } + + public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(ClientInterceptorContext context, AsyncDuplexStreamingCallContinuation continuation) + { + var call = continuation(context); + return new AsyncDuplexStreamingCall(call.RequestStream, + call.ResponseStream, + call.ResponseHeadersAsync, + call.GetStatus, + call.GetTrailers, + () => + { + call.Dispose(); + ContextDisposed = true; + }); + } + } + [TestCase(Canceller.Context)] [TestCase(Canceller.User)] public async Task CreateClient_ServerCallContextAndUserCancellationToken_PropogatedDeadlineAndCancellation(Canceller canceller) From 8199f6642240f6925a731efb060c8009637d3e17 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Mon, 29 Apr 2024 20:37:40 +0800 Subject: [PATCH 12/16] Fix HTTP/3 test errors on .NET 6 (#2423) --- test/FunctionalTests/Client/ClientFactoryTests.cs | 4 ++-- test/FunctionalTests/Client/ConnectionTests.cs | 2 +- test/FunctionalTests/Web/Client/AuthTests.cs | 6 +++--- test/FunctionalTests/Web/Client/ClientFactoryTests.cs | 6 +++--- test/FunctionalTests/Web/Client/ConnectionTests.cs | 2 +- test/FunctionalTests/Web/Client/IssueTests.cs | 6 +++--- .../Web/Client/ServerStreamingMethodTests.cs | 6 +++--- test/FunctionalTests/Web/Client/TrailerMetadataTests.cs | 8 ++++---- test/FunctionalTests/Web/Client/UnaryMethodTests.cs | 6 +++--- test/FunctionalTests/Web/Server/DeadlineTests.cs | 6 +++--- test/FunctionalTests/Web/Server/UnaryMethodTests.cs | 6 +++--- 11 files changed, 29 insertions(+), 29 deletions(-) diff --git a/test/FunctionalTests/Client/ClientFactoryTests.cs b/test/FunctionalTests/Client/ClientFactoryTests.cs index c07fb18e4..59f0991c8 100644 --- a/test/FunctionalTests/Client/ClientFactoryTests.cs +++ b/test/FunctionalTests/Client/ClientFactoryTests.cs @@ -1,4 +1,4 @@ -#region Copyright notice and license +#region Copyright notice and license // Copyright 2019 The gRPC Authors // @@ -81,7 +81,7 @@ Task UnaryCall(HelloRequest request, ServerCallContext context) Assert.AreEqual("Hello world", response2.Message); } -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [Test] [RequireHttp3] public async Task ClientFactory_Http3_Success() diff --git a/test/FunctionalTests/Client/ConnectionTests.cs b/test/FunctionalTests/Client/ConnectionTests.cs index cfb1f31c9..a5c118577 100644 --- a/test/FunctionalTests/Client/ConnectionTests.cs +++ b/test/FunctionalTests/Client/ConnectionTests.cs @@ -98,7 +98,7 @@ Task UnaryUds(HelloRequest request, ServerCallContext context) } #endif -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [Test] [RequireHttp3] public async Task Http3() diff --git a/test/FunctionalTests/Web/Client/AuthTests.cs b/test/FunctionalTests/Web/Client/AuthTests.cs index 8f9b9c38d..2a7836b71 100644 --- a/test/FunctionalTests/Web/Client/AuthTests.cs +++ b/test/FunctionalTests/Web/Client/AuthTests.cs @@ -28,16 +28,16 @@ namespace Grpc.AspNetCore.FunctionalTests.Web.Client; [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http1)] [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http3WithTls)] #endif [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http1)] [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http3WithTls)] #endif [TestFixture(GrpcTestMode.Grpc, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.Grpc, TestServerEndpointName.Http3WithTls)] #endif public class AuthTests : GrpcWebFunctionalTestBase diff --git a/test/FunctionalTests/Web/Client/ClientFactoryTests.cs b/test/FunctionalTests/Web/Client/ClientFactoryTests.cs index b3f18201e..241c25cd4 100644 --- a/test/FunctionalTests/Web/Client/ClientFactoryTests.cs +++ b/test/FunctionalTests/Web/Client/ClientFactoryTests.cs @@ -28,16 +28,16 @@ namespace Grpc.AspNetCore.FunctionalTests.Web.Client; [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http1)] [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http3WithTls)] #endif [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http1)] [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http3WithTls)] #endif [TestFixture(GrpcTestMode.Grpc, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.Grpc, TestServerEndpointName.Http3WithTls)] #endif public class ClientFactoryTests : GrpcWebFunctionalTestBase diff --git a/test/FunctionalTests/Web/Client/ConnectionTests.cs b/test/FunctionalTests/Web/Client/ConnectionTests.cs index 477a3b353..84135c57b 100644 --- a/test/FunctionalTests/Web/Client/ConnectionTests.cs +++ b/test/FunctionalTests/Web/Client/ConnectionTests.cs @@ -72,7 +72,7 @@ private GrpcChannel CreateGrpcWebChannel(TestServerEndpointName endpointName, Ve [TestCase(TestServerEndpointName.Http2WithTls, "1.1", false)] #endif [TestCase(TestServerEndpointName.Http2WithTls, null, true)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestCase(TestServerEndpointName.Http3WithTls, null, true)] #endif public async Task SendValidRequest_WithConnectionOptions(TestServerEndpointName endpointName, string? version, bool success) diff --git a/test/FunctionalTests/Web/Client/IssueTests.cs b/test/FunctionalTests/Web/Client/IssueTests.cs index 4fbd524d6..da5a586d5 100644 --- a/test/FunctionalTests/Web/Client/IssueTests.cs +++ b/test/FunctionalTests/Web/Client/IssueTests.cs @@ -27,16 +27,16 @@ namespace Grpc.AspNetCore.FunctionalTests.Web.Client; [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http1)] [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http3WithTls)] #endif [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http1)] [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http3WithTls)] #endif [TestFixture(GrpcTestMode.Grpc, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.Grpc, TestServerEndpointName.Http3WithTls)] #endif public class IssueTests : GrpcWebFunctionalTestBase diff --git a/test/FunctionalTests/Web/Client/ServerStreamingMethodTests.cs b/test/FunctionalTests/Web/Client/ServerStreamingMethodTests.cs index 3e7994c45..97575d2a4 100644 --- a/test/FunctionalTests/Web/Client/ServerStreamingMethodTests.cs +++ b/test/FunctionalTests/Web/Client/ServerStreamingMethodTests.cs @@ -29,16 +29,16 @@ namespace Grpc.AspNetCore.FunctionalTests.Web.Client; [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http1)] [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http3WithTls)] #endif [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http1)] [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http3WithTls)] #endif [TestFixture(GrpcTestMode.Grpc, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.Grpc, TestServerEndpointName.Http3WithTls)] #endif public class ServerStreamingMethodTests : GrpcWebFunctionalTestBase diff --git a/test/FunctionalTests/Web/Client/TrailerMetadataTests.cs b/test/FunctionalTests/Web/Client/TrailerMetadataTests.cs index f7382031e..94593937d 100644 --- a/test/FunctionalTests/Web/Client/TrailerMetadataTests.cs +++ b/test/FunctionalTests/Web/Client/TrailerMetadataTests.cs @@ -1,4 +1,4 @@ -#region Copyright notice and license +#region Copyright notice and license // Copyright 2019 The gRPC Authors // @@ -26,16 +26,16 @@ namespace Grpc.AspNetCore.FunctionalTests.Web.Client; [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http1)] [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http3WithTls)] #endif [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http1)] [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http3WithTls)] #endif [TestFixture(GrpcTestMode.Grpc, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.Grpc, TestServerEndpointName.Http3WithTls)] #endif public class TrailerMetadataTests : GrpcWebFunctionalTestBase diff --git a/test/FunctionalTests/Web/Client/UnaryMethodTests.cs b/test/FunctionalTests/Web/Client/UnaryMethodTests.cs index e6dfbcbd0..2d5852af5 100644 --- a/test/FunctionalTests/Web/Client/UnaryMethodTests.cs +++ b/test/FunctionalTests/Web/Client/UnaryMethodTests.cs @@ -26,16 +26,16 @@ namespace Grpc.AspNetCore.FunctionalTests.Web.Client; [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http1)] [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http3WithTls)] #endif [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http1)] [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http3WithTls)] #endif [TestFixture(GrpcTestMode.Grpc, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.Grpc, TestServerEndpointName.Http3WithTls)] #endif public class UnaryMethodTests : GrpcWebFunctionalTestBase diff --git a/test/FunctionalTests/Web/Server/DeadlineTests.cs b/test/FunctionalTests/Web/Server/DeadlineTests.cs index 7fa2f7f5c..da0f44c4e 100644 --- a/test/FunctionalTests/Web/Server/DeadlineTests.cs +++ b/test/FunctionalTests/Web/Server/DeadlineTests.cs @@ -28,16 +28,16 @@ namespace Grpc.AspNetCore.FunctionalTests.Web.Server; [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http1)] [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http3WithTls)] #endif [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http1)] [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http3WithTls)] #endif [TestFixture(GrpcTestMode.Grpc, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.Grpc, TestServerEndpointName.Http3WithTls)] #endif public class DeadlineTests : GrpcWebFunctionalTestBase diff --git a/test/FunctionalTests/Web/Server/UnaryMethodTests.cs b/test/FunctionalTests/Web/Server/UnaryMethodTests.cs index b6c9ef7c2..71180bd51 100644 --- a/test/FunctionalTests/Web/Server/UnaryMethodTests.cs +++ b/test/FunctionalTests/Web/Server/UnaryMethodTests.cs @@ -27,16 +27,16 @@ namespace Grpc.AspNetCore.FunctionalTests.Web.Server; [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http1)] [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.GrpcWeb, TestServerEndpointName.Http3WithTls)] #endif [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http1)] [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.GrpcWebText, TestServerEndpointName.Http3WithTls)] #endif [TestFixture(GrpcTestMode.Grpc, TestServerEndpointName.Http2)] -#if NET6_0_OR_GREATER +#if NET7_0_OR_GREATER [TestFixture(GrpcTestMode.Grpc, TestServerEndpointName.Http3WithTls)] #endif public class UnaryMethodTests : GrpcWebFunctionalTestBase From 63914f2653eeb3961b74cc5613df11520e10cf75 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 30 Apr 2024 18:38:27 +0800 Subject: [PATCH 13/16] Add semaphore to limit subchannel connect to prevent race conditions (#2422) --- src/Grpc.Net.Client/Balancer/Subchannel.cs | 109 +++++++++++++----- .../Balancer/ConnectionTests.cs | 5 +- .../Balancer/PickFirstBalancerTests.cs | 39 +++++++ 3 files changed, 126 insertions(+), 27 deletions(-) diff --git a/src/Grpc.Net.Client/Balancer/Subchannel.cs b/src/Grpc.Net.Client/Balancer/Subchannel.cs index 4209af875..1bbc22ad3 100644 --- a/src/Grpc.Net.Client/Balancer/Subchannel.cs +++ b/src/Grpc.Net.Client/Balancer/Subchannel.cs @@ -54,6 +54,7 @@ public sealed class Subchannel : IDisposable internal readonly ConnectionManager _manager; private readonly ILogger _logger; + private readonly SemaphoreSlim _connectSemaphore; private ISubchannelTransport _transport = default!; private ConnectContext? _connectContext; @@ -89,6 +90,7 @@ internal Subchannel(ConnectionManager manager, IReadOnlyList ad { Lock = new object(); _logger = manager.LoggerFactory.CreateLogger(GetType()); + _connectSemaphore = new SemaphoreSlim(1); Id = manager.GetNextId(); _addresses = addresses.ToList(); @@ -213,7 +215,10 @@ public void UpdateAddresses(IReadOnlyList addresses) if (requireReconnect) { - CancelInProgressConnect(); + lock (Lock) + { + CancelInProgressConnectUnsynchronized(); + } _transport.Disconnect(); RequestConnection(); } @@ -268,43 +273,76 @@ public void RequestConnection() } } - private void CancelInProgressConnect() + private void CancelInProgressConnectUnsynchronized() { - lock (Lock) - { - if (_connectContext != null && !_connectContext.Disposed) - { - SubchannelLog.CancelingConnect(_logger, Id); + Debug.Assert(Monitor.IsEntered(Lock)); - // Cancel connect cancellation token. - _connectContext.CancelConnect(); - _connectContext.Dispose(); - } + if (_connectContext != null && !_connectContext.Disposed) + { + SubchannelLog.CancelingConnect(_logger, Id); - _delayInterruptTcs?.TrySetResult(null); + // Cancel connect cancellation token. + _connectContext.CancelConnect(); + _connectContext.Dispose(); } + + _delayInterruptTcs?.TrySetResult(null); } - private ConnectContext GetConnectContext() + private ConnectContext GetConnectContextUnsynchronized() { - lock (Lock) - { - // There shouldn't be a previous connect in progress, but cancel the CTS to ensure they're no longer running. - CancelInProgressConnect(); + Debug.Assert(Monitor.IsEntered(Lock)); - var connectContext = _connectContext = new ConnectContext(_transport.ConnectTimeout ?? Timeout.InfiniteTimeSpan); - return connectContext; - } + // There shouldn't be a previous connect in progress, but cancel the CTS to ensure they're no longer running. + CancelInProgressConnectUnsynchronized(); + + var connectContext = _connectContext = new ConnectContext(_transport.ConnectTimeout ?? Timeout.InfiniteTimeSpan); + return connectContext; } private async Task ConnectTransportAsync() { - var connectContext = GetConnectContext(); + ConnectContext connectContext; + Task? waitSemaporeTask = null; + lock (Lock) + { + // Don't start connecting if the subchannel has been shutdown. Transport/semaphore will be disposed if shutdown. + if (_state == ConnectivityState.Shutdown) + { + return; + } + + connectContext = GetConnectContextUnsynchronized(); + + // Use a semaphore to limit one connection attempt at a time. This is done to prevent a race conditional where a canceled connect + // overwrites the status of a successful connect. + // + // Try to get semaphore without waiting. If semaphore is already taken then start a task to wait for it to be released. + // Start this inside a lock to make sure subchannel isn't shutdown before waiting for semaphore. + if (!_connectSemaphore.Wait(0)) + { + SubchannelLog.QueuingConnect(_logger, Id); + waitSemaporeTask = _connectSemaphore.WaitAsync(connectContext.CancellationToken); + } + } - var backoffPolicy = _manager.BackoffPolicyFactory.Create(); + if (waitSemaporeTask != null) + { + try + { + await waitSemaporeTask.ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // Canceled while waiting for semaphore. + return; + } + } try { + var backoffPolicy = _manager.BackoffPolicyFactory.Create(); + SubchannelLog.ConnectingTransport(_logger, Id); for (var attempt = 0; ; attempt++) @@ -384,6 +422,13 @@ private async Task ConnectTransportAsync() // Dispose context because it might have been created with a connect timeout. // Want to clean up the connect timeout timer. connectContext.Dispose(); + + // Subchannel could have been disposed while connect is running. + // If subchannel is shutting down then don't release semaphore to avoid ObjectDisposedException. + if (_state != ConnectivityState.Shutdown) + { + _connectSemaphore.Release(); + } } } } @@ -482,8 +527,12 @@ public void Dispose() } _stateChangedRegistrations.Clear(); - CancelInProgressConnect(); - _transport.Dispose(); + lock (Lock) + { + CancelInProgressConnectUnsynchronized(); + _transport.Dispose(); + _connectSemaphore.Dispose(); + } } } @@ -505,7 +554,7 @@ internal static class SubchannelLog LoggerMessage.Define(LogLevel.Debug, new EventId(5, "ConnectionRequestedInNonIdleState"), "Subchannel id '{SubchannelId}' connection requested in non-idle state of {State}."); private static readonly Action _connectingTransport = - LoggerMessage.Define(LogLevel.Trace, new EventId(6, "ConnectingTransport"), "Subchannel id '{SubchannelId}' connecting to transport."); + LoggerMessage.Define(LogLevel.Debug, new EventId(6, "ConnectingTransport"), "Subchannel id '{SubchannelId}' connecting to transport."); private static readonly Action _startingConnectBackoff = LoggerMessage.Define(LogLevel.Trace, new EventId(7, "StartingConnectBackoff"), "Subchannel id '{SubchannelId}' starting connect backoff of {BackoffDuration}."); @@ -514,7 +563,7 @@ internal static class SubchannelLog LoggerMessage.Define(LogLevel.Trace, new EventId(8, "ConnectBackoffInterrupted"), "Subchannel id '{SubchannelId}' connect backoff interrupted."); private static readonly Action _connectCanceled = - LoggerMessage.Define(LogLevel.Trace, new EventId(9, "ConnectCanceled"), "Subchannel id '{SubchannelId}' connect canceled."); + LoggerMessage.Define(LogLevel.Debug, new EventId(9, "ConnectCanceled"), "Subchannel id '{SubchannelId}' connect canceled."); private static readonly Action _connectError = LoggerMessage.Define(LogLevel.Error, new EventId(10, "ConnectError"), "Subchannel id '{SubchannelId}' unexpected error while connecting to transport."); @@ -546,6 +595,9 @@ internal static class SubchannelLog private static readonly Action _addressesUpdated = LoggerMessage.Define(LogLevel.Trace, new EventId(19, "AddressesUpdated"), "Subchannel id '{SubchannelId}' updated with addresses: {Addresses}"); + private static readonly Action _queuingConnect = + LoggerMessage.Define(LogLevel.Debug, new EventId(20, "QueuingConnect"), "Subchannel id '{SubchannelId}' queuing connect because a connect is already in progress."); + public static void SubchannelCreated(ILogger logger, string subchannelId, IReadOnlyList addresses) { if (logger.IsEnabled(LogLevel.Debug)) @@ -648,5 +700,10 @@ public static void AddressesUpdated(ILogger logger, string subchannelId, IReadOn _addressesUpdated(logger, subchannelId, addressesText, null); } } + + public static void QueuingConnect(ILogger logger, string subchannelId) + { + _queuingConnect(logger, subchannelId, null); + } } #endif diff --git a/test/FunctionalTests/Balancer/ConnectionTests.cs b/test/FunctionalTests/Balancer/ConnectionTests.cs index b89a1772c..1d82786a2 100644 --- a/test/FunctionalTests/Balancer/ConnectionTests.cs +++ b/test/FunctionalTests/Balancer/ConnectionTests.cs @@ -93,8 +93,11 @@ async Task UnaryMethod(HelloRequest request, ServerCallContext conte var client = TestClientFactory.Create(channel, endpoint.Method); + // Act var ex = await ExceptionAssert.ThrowsAsync(() => client.UnaryCall(new HelloRequest()).ResponseAsync).DefaultTimeout(); Assert.AreEqual("A connection could not be established within the configured ConnectTimeout.", ex.Status.DebugException!.Message); + + await ExceptionAssert.ThrowsAsync(() => connectTcs.Task).DefaultTimeout(); } [Test] @@ -167,7 +170,7 @@ Task UnaryMethod(HelloRequest request, ServerCallContext context) connectionIdleTimeout: connectionIdleTimeout).DefaultTimeout(); Logger.LogInformation("Connecting channel."); - await channel.ConnectAsync(); + await channel.ConnectAsync().DefaultTimeout(); // Wait for timeout plus a little extra to avoid issues from imprecise timers. await Task.Delay(connectionIdleTimeout + TimeSpan.FromMilliseconds(50)); diff --git a/test/FunctionalTests/Balancer/PickFirstBalancerTests.cs b/test/FunctionalTests/Balancer/PickFirstBalancerTests.cs index d257214c0..438133060 100644 --- a/test/FunctionalTests/Balancer/PickFirstBalancerTests.cs +++ b/test/FunctionalTests/Balancer/PickFirstBalancerTests.cs @@ -61,6 +61,45 @@ private GrpcChannel CreateGrpcWebChannel(TestServerEndpointName endpointName, Ve return channel; } + [Test] + public async Task UnaryCall_CallAfterConnectionTimeout_Success() + { + // Ignore errors + SetExpectedErrorsFilter(writeContext => + { + return true; + }); + + string? host = null; + Task UnaryMethod(HelloRequest request, ServerCallContext context) + { + host = context.Host; + return Task.FromResult(new HelloReply { Message = request.Name }); + } + + // Arrange + using var endpoint = BalancerHelpers.CreateGrpcEndpoint(50051, UnaryMethod, nameof(UnaryMethod)); + + var connectCount = 0; + var channel = await BalancerHelpers.CreateChannel(LoggerFactory, new PickFirstConfig(), new[] { endpoint.Address }, connectTimeout: TimeSpan.FromMilliseconds(200), socketConnect: + async (socket, endpoint, cancellationToken) => + { + if (Interlocked.Increment(ref connectCount) == 1) + { + await Task.Delay(1000, cancellationToken); + } + await socket.ConnectAsync(endpoint, cancellationToken); + }).DefaultTimeout(); + var client = TestClientFactory.Create(channel, endpoint.Method); + + // Assert + var ex = await ExceptionAssert.ThrowsAsync(() => client.UnaryCall(new HelloRequest { Name = "Balancer" }).ResponseAsync).DefaultTimeout(); + Assert.AreEqual(StatusCode.Unavailable, ex.StatusCode); + Assert.IsInstanceOf(typeof(TimeoutException), ex.InnerException); + + await client.UnaryCall(new HelloRequest { Name = "Balancer" }).ResponseAsync.DefaultTimeout(); + } + [Test] public async Task UnaryCall_CallAfterCancellation_Success() { From c80f4594d0bb69194bc765b2c20e7f2e6d177530 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Fri, 3 May 2024 07:06:05 +0800 Subject: [PATCH 14/16] Don't capture async locals in resolver (#2426) --- .../Balancer/PollingResolver.cs | 33 +++++++++--- src/Grpc.Net.Client/Balancer/Subchannel.cs | 2 +- src/Shared/NonCapturingTimer.cs | 2 +- .../Balancer/ResolverTests.cs | 53 +++++++++++++++++++ 4 files changed, 81 insertions(+), 9 deletions(-) diff --git a/src/Grpc.Net.Client/Balancer/PollingResolver.cs b/src/Grpc.Net.Client/Balancer/PollingResolver.cs index 0d4ced710..bbec7b675 100644 --- a/src/Grpc.Net.Client/Balancer/PollingResolver.cs +++ b/src/Grpc.Net.Client/Balancer/PollingResolver.cs @@ -135,14 +135,33 @@ public sealed override void Refresh() if (_resolveTask.IsCompleted) { - // Run ResolveAsync in a background task. - // This is done to prevent synchronous block inside ResolveAsync from blocking future Refresh calls. - _resolveTask = Task.Run(() => ResolveNowAsync(_cts.Token), _cts.Token); - _resolveTask.ContinueWith(static (t, state) => + // Don't capture the current ExecutionContext and its AsyncLocals onto the connect + var restoreFlow = false; + try + { + if (!ExecutionContext.IsFlowSuppressed()) + { + ExecutionContext.SuppressFlow(); + restoreFlow = true; + } + + // Run ResolveAsync in a background task. + // This is done to prevent synchronous block inside ResolveAsync from blocking future Refresh calls. + _resolveTask = Task.Run(() => ResolveNowAsync(_cts.Token), _cts.Token); + _resolveTask.ContinueWith(static (t, state) => + { + var pollingResolver = (PollingResolver)state!; + Log.ResolveTaskCompleted(pollingResolver._logger, pollingResolver.GetType()); + }, this); + } + finally { - var pollingResolver = (PollingResolver)state!; - Log.ResolveTaskCompleted(pollingResolver._logger, pollingResolver.GetType()); - }, this); + // Restore the current ExecutionContext + if (restoreFlow) + { + ExecutionContext.RestoreFlow(); + } + } } else { diff --git a/src/Grpc.Net.Client/Balancer/Subchannel.cs b/src/Grpc.Net.Client/Balancer/Subchannel.cs index 1bbc22ad3..0f163de9d 100644 --- a/src/Grpc.Net.Client/Balancer/Subchannel.cs +++ b/src/Grpc.Net.Client/Balancer/Subchannel.cs @@ -257,7 +257,7 @@ public void RequestConnection() } // Don't capture the current ExecutionContext and its AsyncLocals onto the connect - bool restoreFlow = false; + var restoreFlow = false; if (!ExecutionContext.IsFlowSuppressed()) { ExecutionContext.SuppressFlow(); diff --git a/src/Shared/NonCapturingTimer.cs b/src/Shared/NonCapturingTimer.cs index 674333969..e957c20ba 100644 --- a/src/Shared/NonCapturingTimer.cs +++ b/src/Shared/NonCapturingTimer.cs @@ -13,7 +13,7 @@ public static Timer Create(TimerCallback callback, object? state, TimeSpan dueTi ArgumentNullThrowHelper.ThrowIfNull(callback); // Don't capture the current ExecutionContext and its AsyncLocals onto the timer - bool restoreFlow = false; + var restoreFlow = false; try { if (!ExecutionContext.IsFlowSuppressed()) diff --git a/test/Grpc.Net.Client.Tests/Balancer/ResolverTests.cs b/test/Grpc.Net.Client.Tests/Balancer/ResolverTests.cs index bb6e3d1eb..03af800f3 100644 --- a/test/Grpc.Net.Client.Tests/Balancer/ResolverTests.cs +++ b/test/Grpc.Net.Client.Tests/Balancer/ResolverTests.cs @@ -111,6 +111,59 @@ protected override Task ResolveAsync(CancellationToken cancellationToken) } } + [Test] + public async Task Refresh_AsyncLocal_NotCaptured() + { + // Arrange + var services = new ServiceCollection(); + services.AddNUnitLogger(); + var loggerFactory = services.BuildServiceProvider().GetRequiredService(); + + var asyncLocal = new AsyncLocal(); + asyncLocal.Value = new object(); + + var callbackAsyncLocalValues = new List(); + + var resolver = new CallbackPollingResolver(loggerFactory, new TestBackoffPolicyFactory(TimeSpan.FromMilliseconds(100)), (listener) => + { + callbackAsyncLocalValues.Add(asyncLocal.Value); + if (callbackAsyncLocalValues.Count >= 2) + { + listener(ResolverResult.ForResult(new List())); + } + + return Task.CompletedTask; + }); + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + resolver.Start(result => tcs.TrySetResult(result)); + + // Act + resolver.Refresh(); + + // Assert + await tcs.Task.DefaultTimeout(); + + Assert.AreEqual(2, callbackAsyncLocalValues.Count); + Assert.IsNull(callbackAsyncLocalValues[0]); + Assert.IsNull(callbackAsyncLocalValues[1]); + } + + private class CallbackPollingResolver : PollingResolver + { + private readonly Func, Task> _callback; + + public CallbackPollingResolver(ILoggerFactory loggerFactory, IBackoffPolicyFactory backoffPolicyFactory, Func, Task> callback) : base(loggerFactory, backoffPolicyFactory) + { + _callback = callback; + } + + protected override Task ResolveAsync(CancellationToken cancellationToken) + { + return _callback(Listener); + } + } + [Test] public async Task Resolver_ResolveNameFromServices_Success() { From 989420e7090d0b7162bf571d1ca0b5f3a8916897 Mon Sep 17 00:00:00 2001 From: apolcyn Date: Tue, 7 May 2024 16:18:49 -0700 Subject: [PATCH 15/16] Update Grpc.Tools to 2.63 (#2429) --- Directory.Packages.props | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index 7ae59277d..c0b910e66 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -31,7 +31,7 @@ - + From 48aeb802b09c24f8ee98c976be8292a1d115dee6 Mon Sep 17 00:00:00 2001 From: apolcyn Date: Wed, 8 May 2024 12:41:35 -0700 Subject: [PATCH 16/16] Bump 2.63.x branch to 2.63.0-pre1 (#2430) * bump to 2.63.pre1 * finish update --- build/version.props | 4 ++-- src/Grpc.Core.Api/VersionInfo.cs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/build/version.props b/build/version.props index efa01a749..67d14f290 100644 --- a/build/version.props +++ b/build/version.props @@ -2,13 +2,13 @@ - 2.61.0-dev + 2.63.0-pre1 2.0.0.0 - 2.61.0.0 + 2.63.0.0 diff --git a/src/Grpc.Core.Api/VersionInfo.cs b/src/Grpc.Core.Api/VersionInfo.cs index c82a96783..428ac67c6 100644 --- a/src/Grpc.Core.Api/VersionInfo.cs +++ b/src/Grpc.Core.Api/VersionInfo.cs @@ -36,10 +36,10 @@ public static class VersionInfo /// /// Current AssemblyFileVersion of gRPC C# assemblies /// - public const string CurrentAssemblyFileVersion = "2.61.0.0"; + public const string CurrentAssemblyFileVersion = "2.63.0.0"; /// /// Current version of gRPC C# /// - public const string CurrentVersion = "2.61.0-dev"; + public const string CurrentVersion = "2.63.0-pre1"; }