Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions src/NATS.Client.JetStream/Internal/NatsJSConsume.cs
Original file line number Diff line number Diff line change
Expand Up @@ -352,21 +352,13 @@ protected override async ValueTask ReceiveInternalAsync(
{
_logger.LogDebug(NatsJSLogEvents.LeadershipChange, "Leadership Change");
_notificationChannel?.Notify(NatsJSLeadershipChangeNotification.Default);
lock (_pendingGate)
{
_pendingBytes = 0;
_pendingMsgs = 0;
}
ResetPending();
}
else if (headers.Code == 503)
{
_logger.LogDebug(NatsJSLogEvents.NoResponders, "503 no responders");
_notificationChannel?.Notify(NatsJSNoRespondersNotification.Default);
lock (_pendingGate)
{
_pendingBytes = 0;
_pendingMsgs = 0;
}
ResetPending();
}
else if (headers.HasTerminalJSError())
{
Expand Down
48 changes: 48 additions & 0 deletions tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
using NATS.Client.Core.Tests;
using NATS.Client.Core2.Tests;
using NATS.Client.JetStream.Models;
using NATS.Client.Platform.Windows.Tests;
using NATS.Client.TestUtilities;
using NATS.Client.TestUtilities2;
using NATS.Net;

namespace NATS.Client.JetStream.Tests;

Expand Down Expand Up @@ -559,4 +561,50 @@ public async Task Consume_right_amount_of_messages_when_ack_wait_exceeded()
// Should not have redeliveries
Assert.Equal(2, count);
}

[Fact]
public async Task Consume_pending_reset_on_reconnect_when_using_ephemeral_consumer_503()
{
var logger = new InMemoryTestLoggerFactory(LogLevel.Debug);
await using var server = await NatsServerProcess.StartAsync();
await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, LoggerFactory = logger });
await nats.ConnectRetryAsync();

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));

var js = nats.CreateJetStreamContext();
await js.CreateStreamAsync(new StreamConfig("s1", ["s1.*"]), cts.Token);
var consumer = await js.CreateOrUpdateConsumerAsync("s1", new ConsumerConfig("c1") { MemStorage = true }, cts.Token);

_ = Task.Run(
async () =>
{
await foreach (var msg in consumer.ConsumeAsync<string>(cancellationToken: cts.Token))
{
}
},
cts.Token);

await Retry.Until(
"first pull request received",
() => logger.Logs.Any(m => m.EventId == NatsJSLogEvents.PullRequest));

await server.RestartAsync();

await Retry.Until(
reason: "more pull request received",
condition: () => logger.Logs.Count(m => m.EventId == NatsJSLogEvents.PullRequest) > 1,
retryDelay: TimeSpan.FromSeconds(1),
timeout: TimeSpan.FromSeconds(60));

// After reconnect, we should not flood with pull requests
// since the consumer should reset (zero-out) its pending messages.
// This is to prevent flooding the server with pull requests
// when the consumer is not receiving messages.
// Wait for a few seconds to ensure more pull requests are sent.
await Task.Delay(3000, cts.Token);

var pullRequestCount = logger.Logs.Count(m => m.EventId == NatsJSLogEvents.PullRequest);
pullRequestCount.Should().BeLessThanOrEqualTo(4, "should not flood with pull requests after reconnect");
}
}
Loading