Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,12 @@ public ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, i
}
else
{
_logger.LogWarning(NatsLogEvents.Subscription, "Can\'t find subscription for {Subject}/{Sid}", subject, sid);
// This can happen when a subscription is removed before the message is received. (there may be multiple messages in flight)
// Main scenarios are:
// (1) Request/Reply pattern where the request is sent and the reply is received after the subscription was removed due to a timeout.
// (2) JetStream consumer subscription was removed due to a timeout or cancellation before all the messages were received.
// Note this log level was set to warning before, however, it is not an error condition and can happen in normal operation, so debug is more appropriate.
_logger.LogDebug(NatsLogEvents.Subscription, "Can\'t find subscription for {Subject}/{Sid}", subject, sid);
}
}

Expand Down
9 changes: 9 additions & 0 deletions src/NATS.Client.Core/NatsSubBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,15 @@ public virtual async ValueTask ReceiveAsync(string subject, string? replyTo, Rea
// Need to await to handle any exceptions
await ReceiveInternalAsync(subject, replyTo, headersBuffer, payloadBuffer).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
// If there are cancellations, we don't want to throw an exception.
// These can happen if the subscription is disposed or unsubscribed while
// processing the message as part of a normal flow.
}
catch (OperationCanceledException)
{
}
catch (ChannelClosedException)
{
// When user disposes or unsubscribes there maybe be messages still coming in
Expand Down
18 changes: 18 additions & 0 deletions src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,13 +323,31 @@ private async Task CommandLoop()
_logger.LogError(NatsKVLogEvents.Internal, "Internal error: unexpected command {Command}", subCommand);
}
}
catch (ChannelClosedException)
{
throw;
}
catch (TaskCanceledException)
{
throw;
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception e)
{
_logger.LogWarning(NatsKVLogEvents.Internal, e, "Command error");
}
}
}
}
catch (ChannelClosedException)
{
}
catch (TaskCanceledException)
{
}
catch (OperationCanceledException)
{
}
Expand Down
6 changes: 6 additions & 0 deletions src/NATS.Client.KeyValueStore/NatsKVContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -292,4 +292,10 @@ public class NatsKVOpts
public static readonly NatsKVOpts Default = new();

public bool UseDirectGetApiWithKeysInSubject { get; init; }

/// <summary>
/// Indicates whether the Watch operation should throw an exception if the cancellation token is triggered.
/// Default is true.
/// </summary>
public bool WatcherThrowOnCancellation { get; init; } = true;
}
161 changes: 145 additions & 16 deletions src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Buffers;
using System.Runtime.CompilerServices;
using System.Text.RegularExpressions;
using System.Threading.Channels;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
Expand Down Expand Up @@ -462,9 +463,41 @@ public async IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(IEnumerable<string>
}
}

await foreach (var entry in watcher.Entries.ReadAllAsync(cancellationToken).ConfigureAwait(false))
while (true)
{
yield return entry;
bool waitToReadAsync;

if (_opts.WatcherThrowOnCancellation)
{
waitToReadAsync = await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
}
else
{
try
{
waitToReadAsync = await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
}
catch (ChannelClosedException)
{
break;
}
catch (TaskCanceledException)
{
break;
}
catch (OperationCanceledException)
{
break;
}
}

if (!waitToReadAsync)
break;

while (watcher.Entries.TryRead(out var msg))
{
yield return msg;
}
}
}

Expand All @@ -488,11 +521,43 @@ public async IAsyncEnumerable<NatsKVEntry<T>> HistoryAsync<T>(string key, INatsD

await using var watcher = await WatchInternalAsync<T>([key], serializer, opts, cancellationToken);

await foreach (var entry in watcher.Entries.ReadAllAsync(cancellationToken).ConfigureAwait(false))
while (true)
{
yield return entry;
if (entry.Delta == 0)
yield break;
bool waitToReadAsync;

if (_opts.WatcherThrowOnCancellation)
{
waitToReadAsync = await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
}
else
{
try
{
waitToReadAsync = await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
}
catch (ChannelClosedException)
{
break;
}
catch (TaskCanceledException)
{
break;
}
catch (OperationCanceledException)
{
break;
}
}

if (!waitToReadAsync)
break;

while (watcher.Entries.TryRead(out var entry))
{
yield return entry;
if (entry.Delta == 0)
yield break;
}
}
}

Expand Down Expand Up @@ -527,12 +592,44 @@ public async ValueTask PurgeDeletesAsync(NatsKVPurgeOpts? opts = default, Cancel
if (watcher.InitialConsumer.Info.NumPending == 0)
return;

await foreach (var entry in watcher.Entries.ReadAllAsync(cancellationToken).ConfigureAwait(false))
while (true)
{
if (entry.Operation is NatsKVOperation.Purge or NatsKVOperation.Del)
deleted.Add(entry);
if (entry.Delta == 0)
goto PURGE_LOOP_DONE;
bool waitToReadAsync;

if (_opts.WatcherThrowOnCancellation)
{
waitToReadAsync = await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
}
else
{
try
{
waitToReadAsync = await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
}
catch (ChannelClosedException)
{
break;
}
catch (TaskCanceledException)
{
break;
}
catch (OperationCanceledException)
{
break;
}
}

if (!waitToReadAsync)
break;

while (watcher.Entries.TryRead(out var entry))
{
if (entry.Operation is NatsKVOperation.Purge or NatsKVOperation.Del)
deleted.Add(entry);
if (entry.Delta == 0)
goto PURGE_LOOP_DONE;
}
}
}

Expand Down Expand Up @@ -572,12 +669,44 @@ public async IAsyncEnumerable<string> GetKeysAsync(IEnumerable<string> filters,
if (watcher.InitialConsumer.Info.NumPending == 0)
yield break;

await foreach (var entry in watcher.Entries.ReadAllAsync(cancellationToken).ConfigureAwait(false))
while (true)
{
if (entry.Operation is NatsKVOperation.Put)
yield return entry.Key;
if (entry.Delta == 0)
yield break;
bool waitToReadAsync;

if (_opts.WatcherThrowOnCancellation)
{
waitToReadAsync = await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
}
else
{
try
{
waitToReadAsync = await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false);
}
catch (ChannelClosedException)
{
break;
}
catch (TaskCanceledException)
{
break;
}
catch (OperationCanceledException)
{
break;
}
}

if (!waitToReadAsync)
break;

while (watcher.Entries.TryRead(out var entry))
{
if (entry.Operation is NatsKVOperation.Put)
yield return entry.Key;
if (entry.Delta == 0)
yield break;
}
}
}

Expand Down
44 changes: 44 additions & 0 deletions tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -727,4 +727,48 @@ public async Task ReadAfterDelete()
// Should be no results here.
Assert.False(results.Any());
}

[Fact]
public async Task Watcher_cancellation_no_warn_logs()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

var loggerFactory = new InMemoryTestLoggerFactory(LogLevel.Information, log =>
{
_output.WriteLine($"LOG:{log.LogLevel}: {log.Message}");
});

await using var nats1 = new NatsConnection(new NatsOpts
{
Url = _server.Url,
LoggerFactory = loggerFactory,
});
var prefix = _server.GetNextId();
await nats1.ConnectRetryAsync();
var js1 = new NatsJSContext(nats1);
var kv1 = new NatsKVContext(js1, new NatsKVOpts
{
WatcherThrowOnCancellation = false,
});
var s = await kv1.CreateStoreAsync($"{prefix}b1", cancellationToken);

var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(1));
await foreach (var e in s.WatchAsync<string>(cancellationToken: cts2.Token).ConfigureAwait(false))
{
}

await Assert.ThrowsAsync<OperationCanceledException>(async () =>
{
var kv2 = new NatsKVContext(js1, new NatsKVOpts
{
// WatcherThrowOnCancellation = true, // Default
});
var s2 = await kv2.CreateStoreAsync($"{prefix}b1", cancellationToken);
var cts3 = new CancellationTokenSource(TimeSpan.FromSeconds(1));
await foreach (var e in s2.WatchAsync<string>(cancellationToken: cts3.Token).ConfigureAwait(false))
{
}
});
}
}
Loading