Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/NATS.Client.Services/NatsSvcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,18 @@ public async ValueTask StopAsync(CancellationToken cancellationToken = default)
}

// Drain buffers
await _nats.PingAsync(cancellationToken);
// Ping only when connection is open, otherwise ping hangs waiting for reconnection.
// This is problematic when stopping a service during connection issues
// or if the server is stopped especially in a test environment.
if (_nats.ConnectionState == NatsConnectionState.Open)
{
// We're stopping, so timeout quickly in case connection state changes
// which means we don't care about flushing any buffers anymore
// it's just a best effort attempt
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, cancellationToken);
linkedCts.CancelAfter(_nats.Opts.ConnectTimeout);
await _nats.PingAsync(linkedCts.Token).ConfigureAwait(false);
}

foreach (var ep in _endPoints.Values)
{
Expand Down
41 changes: 41 additions & 0 deletions tests/NATS.Client.Services.Tests/ServicesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using NATS.Client.Platform.Windows.Tests;
using NATS.Client.Services.Internal;
using NATS.Client.Services.Models;
using NATS.Client.TestUtilities2;

namespace NATS.Client.Services.Tests;

Expand Down Expand Up @@ -481,6 +482,46 @@ await s2.AddEndpointAsync<string>(
}
}

[Fact]
public async Task Service_dispose_should_not_hang_when_connection_is_not_open()
{
await using var server = await NatsServerProcess.StartAsync();
await using var nats = new NatsConnection(new NatsOpts { Url = server.Url });
await nats.ConnectRetryAsync();
var svc = new NatsSvcContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(120));
var cancellationToken = cts.Token;

// Service stoppes and disposed normally when connection is open
var s1 = await svc.AddServiceAsync($"s1", "1.0.0", cancellationToken: cancellationToken);
await s1.StopAsync(cancellationToken);
await s1.DisposeAsync();

// Service stop/dispose should not hang when connection is closed
var s2 = await svc.AddServiceAsync($"s2", "1.0.0", cancellationToken: cancellationToken);

// Stop the server to close the connection
await server.StopAsync();

// Check that StopAsync does not hang
var task = s2.StopAsync(cancellationToken).AsTask();
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(60), cancellationToken);
await Task.WhenAny(task, timeoutTask).ContinueWith(
completedTask =>
{
if (completedTask.Result == timeoutTask)
{
throw new Exception("Service hanged!");
}

return task;
},
cancellationToken).Unwrap();

await s2.DisposeAsync();
}

[Theory]
[InlineData("foo bar")]
[InlineData("foo\tbar")]
Expand Down
Loading