diff --git a/src/NATS.Client.Core/Internal/SubscriptionManager.cs b/src/NATS.Client.Core/Internal/SubscriptionManager.cs
index 1acf6d674..6b5ed4fd7 100644
--- a/src/NATS.Client.Core/Internal/SubscriptionManager.cs
+++ b/src/NATS.Client.Core/Internal/SubscriptionManager.cs
@@ -117,7 +117,12 @@ public ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, i
}
else
{
- _logger.LogWarning(NatsLogEvents.Subscription, "Can\'t find subscription for {Subject}/{Sid}", subject, sid);
+ // This can happen when a subscription is removed before the message is received. (there may be multiple messages in flight)
+ // Main scenarios are:
+ // (1) Request/Reply pattern where the request is sent and the reply is received after the subscription was removed due to a timeout.
+ // (2) JetStream consumer subscription was removed due to a timeout or cancellation before all the messages were received.
+ // Note this log level was set to warning before, however, it is not an error condition and can happen in normal operation, so debug is more appropriate.
+ _logger.LogDebug(NatsLogEvents.Subscription, "Can\'t find subscription for {Subject}/{Sid}", subject, sid);
}
}
diff --git a/src/NATS.Client.Core/NatsSubBase.cs b/src/NATS.Client.Core/NatsSubBase.cs
index 521266ff0..829c0feb3 100644
--- a/src/NATS.Client.Core/NatsSubBase.cs
+++ b/src/NATS.Client.Core/NatsSubBase.cs
@@ -285,6 +285,15 @@ public virtual async ValueTask ReceiveAsync(string subject, string? replyTo, Rea
// Need to await to handle any exceptions
await ReceiveInternalAsync(subject, replyTo, headersBuffer, payloadBuffer).ConfigureAwait(false);
}
+ catch (TaskCanceledException)
+ {
+ // If there are cancellations, we don't want to throw an exception.
+ // These can happen if the subscription is disposed or unsubscribed while
+ // processing the message as part of a normal flow.
+ }
+ catch (OperationCanceledException)
+ {
+ }
catch (ChannelClosedException)
{
// When user disposes or unsubscribes there maybe be messages still coming in
diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs
index c075e63a7..2d28a81f0 100644
--- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs
+++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs
@@ -323,6 +323,18 @@ private async Task CommandLoop()
_logger.LogError(NatsKVLogEvents.Internal, "Internal error: unexpected command {Command}", subCommand);
}
}
+ catch (ChannelClosedException)
+ {
+ throw;
+ }
+ catch (TaskCanceledException)
+ {
+ throw;
+ }
+ catch (OperationCanceledException)
+ {
+ throw;
+ }
catch (Exception e)
{
_logger.LogWarning(NatsKVLogEvents.Internal, e, "Command error");
@@ -330,6 +342,12 @@ private async Task CommandLoop()
}
}
}
+ catch (ChannelClosedException)
+ {
+ }
+ catch (TaskCanceledException)
+ {
+ }
catch (OperationCanceledException)
{
}
diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs
index 83712287d..2552f12ea 100644
--- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs
+++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs
@@ -292,4 +292,10 @@ public class NatsKVOpts
public static readonly NatsKVOpts Default = new();
public bool UseDirectGetApiWithKeysInSubject { get; init; }
+
+ ///
+ /// Indicates whether the Watch operation should throw an exception if the cancellation token is triggered.
+ /// Default is true.
+ ///
+ public bool WatcherThrowOnCancellation { get; init; } = true;
}
diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs
index c3b0d09e1..d9eb4d65a 100644
--- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs
+++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs
@@ -1,6 +1,7 @@
using System.Buffers;
using System.Runtime.CompilerServices;
using System.Text.RegularExpressions;
+using System.Threading.Channels;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
@@ -462,9 +463,41 @@ public async IAsyncEnumerable> WatchAsync(IEnumerable
}
}
- await foreach (var entry in watcher.Entries.ReadAllAsync(cancellationToken).ConfigureAwait(false))
+ while (true)
{
- yield return entry;
+ bool waitToReadAsync;
+
+ if (_opts.WatcherThrowOnCancellation)
+ {
+ waitToReadAsync = await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
+ }
+ else
+ {
+ try
+ {
+ waitToReadAsync = await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
+ }
+ catch (ChannelClosedException)
+ {
+ break;
+ }
+ catch (TaskCanceledException)
+ {
+ break;
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ }
+
+ if (!waitToReadAsync)
+ break;
+
+ while (watcher.Entries.TryRead(out var msg))
+ {
+ yield return msg;
+ }
}
}
@@ -488,11 +521,43 @@ public async IAsyncEnumerable> HistoryAsync(string key, INatsD
await using var watcher = await WatchInternalAsync([key], serializer, opts, cancellationToken);
- await foreach (var entry in watcher.Entries.ReadAllAsync(cancellationToken).ConfigureAwait(false))
+ while (true)
{
- yield return entry;
- if (entry.Delta == 0)
- yield break;
+ bool waitToReadAsync;
+
+ if (_opts.WatcherThrowOnCancellation)
+ {
+ waitToReadAsync = await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
+ }
+ else
+ {
+ try
+ {
+ waitToReadAsync = await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
+ }
+ catch (ChannelClosedException)
+ {
+ break;
+ }
+ catch (TaskCanceledException)
+ {
+ break;
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ }
+
+ if (!waitToReadAsync)
+ break;
+
+ while (watcher.Entries.TryRead(out var entry))
+ {
+ yield return entry;
+ if (entry.Delta == 0)
+ yield break;
+ }
}
}
@@ -527,12 +592,44 @@ public async ValueTask PurgeDeletesAsync(NatsKVPurgeOpts? opts = default, Cancel
if (watcher.InitialConsumer.Info.NumPending == 0)
return;
- await foreach (var entry in watcher.Entries.ReadAllAsync(cancellationToken).ConfigureAwait(false))
+ while (true)
{
- if (entry.Operation is NatsKVOperation.Purge or NatsKVOperation.Del)
- deleted.Add(entry);
- if (entry.Delta == 0)
- goto PURGE_LOOP_DONE;
+ bool waitToReadAsync;
+
+ if (_opts.WatcherThrowOnCancellation)
+ {
+ waitToReadAsync = await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
+ }
+ else
+ {
+ try
+ {
+ waitToReadAsync = await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
+ }
+ catch (ChannelClosedException)
+ {
+ break;
+ }
+ catch (TaskCanceledException)
+ {
+ break;
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ }
+
+ if (!waitToReadAsync)
+ break;
+
+ while (watcher.Entries.TryRead(out var entry))
+ {
+ if (entry.Operation is NatsKVOperation.Purge or NatsKVOperation.Del)
+ deleted.Add(entry);
+ if (entry.Delta == 0)
+ goto PURGE_LOOP_DONE;
+ }
}
}
@@ -572,12 +669,44 @@ public async IAsyncEnumerable GetKeysAsync(IEnumerable filters,
if (watcher.InitialConsumer.Info.NumPending == 0)
yield break;
- await foreach (var entry in watcher.Entries.ReadAllAsync(cancellationToken).ConfigureAwait(false))
+ while (true)
{
- if (entry.Operation is NatsKVOperation.Put)
- yield return entry.Key;
- if (entry.Delta == 0)
- yield break;
+ bool waitToReadAsync;
+
+ if (_opts.WatcherThrowOnCancellation)
+ {
+ waitToReadAsync = await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
+ }
+ else
+ {
+ try
+ {
+ waitToReadAsync = await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
+ }
+ catch (ChannelClosedException)
+ {
+ break;
+ }
+ catch (TaskCanceledException)
+ {
+ break;
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ }
+
+ if (!waitToReadAsync)
+ break;
+
+ while (watcher.Entries.TryRead(out var entry))
+ {
+ if (entry.Operation is NatsKVOperation.Put)
+ yield return entry.Key;
+ if (entry.Delta == 0)
+ yield break;
+ }
}
}
diff --git a/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs b/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs
index cac612846..35b02ac5a 100644
--- a/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs
+++ b/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs
@@ -727,4 +727,48 @@ public async Task ReadAfterDelete()
// Should be no results here.
Assert.False(results.Any());
}
+
+ [Fact]
+ public async Task Watcher_cancellation_no_warn_logs()
+ {
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+ var cancellationToken = cts.Token;
+
+ var loggerFactory = new InMemoryTestLoggerFactory(LogLevel.Information, log =>
+ {
+ _output.WriteLine($"LOG:{log.LogLevel}: {log.Message}");
+ });
+
+ await using var nats1 = new NatsConnection(new NatsOpts
+ {
+ Url = _server.Url,
+ LoggerFactory = loggerFactory,
+ });
+ var prefix = _server.GetNextId();
+ await nats1.ConnectRetryAsync();
+ var js1 = new NatsJSContext(nats1);
+ var kv1 = new NatsKVContext(js1, new NatsKVOpts
+ {
+ WatcherThrowOnCancellation = false,
+ });
+ var s = await kv1.CreateStoreAsync($"{prefix}b1", cancellationToken);
+
+ var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(1));
+ await foreach (var e in s.WatchAsync(cancellationToken: cts2.Token).ConfigureAwait(false))
+ {
+ }
+
+ await Assert.ThrowsAsync(async () =>
+ {
+ var kv2 = new NatsKVContext(js1, new NatsKVOpts
+ {
+ // WatcherThrowOnCancellation = true, // Default
+ });
+ var s2 = await kv2.CreateStoreAsync($"{prefix}b1", cancellationToken);
+ var cts3 = new CancellationTokenSource(TimeSpan.FromSeconds(1));
+ await foreach (var e in s2.WatchAsync(cancellationToken: cts3.Token).ConfigureAwait(false))
+ {
+ }
+ });
+ }
}