Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Added verification of the consumer sequence number for pull ordered c…
…onsumer
  • Loading branch information
sergeimam committed Oct 10, 2025
commit a34811b5c1eb7dbbdea20d53e5ed7b977ffaa514
124 changes: 98 additions & 26 deletions src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public async IAsyncEnumerable<NatsJSMsg<T>> ConsumeAsync<T>(
consumerName = consumer.Info.Name;
_logger.LogInformation(NatsJSLogEvents.NewConsumer, "Created {ConsumerName} with sequence {Seq}", consumerName, seq);

ulong cseq = 0;
NatsJSProtocolException? protocolException = default;

await using (var cc = await consumer.OrderedConsumeInternalAsync(serializer, opts, cancellationToken))
Expand Down Expand Up @@ -126,7 +127,15 @@ public async IAsyncEnumerable<NatsJSMsg<T>> ConsumeAsync<T>(
if (msg.Metadata is not { } metadata)
continue;

var expected = cseq + 1;
if (metadata.Sequence.Consumer != expected)
{
_logger.LogWarning(NatsJSLogEvents.Retry, $"Consumer sequence mismatch. Expected {expected}, was {metadata.Sequence.Consumer} Retrying...");
goto CONSUME_LOOP;
}

seq = metadata.Sequence.Stream;
cseq = metadata.Sequence.Consumer;

yield return msg;
}
Expand Down Expand Up @@ -180,23 +189,57 @@ public async IAsyncEnumerable<NatsJSMsg<T>> FetchAsync<T>(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken, cancellationToken).Token;
var processed = 0;

var consumer = await RecreateConsumer(_fetchConsumerName, _fetchSeq, cancellationToken);
_fetchConsumerName = consumer.Info.Name;

await foreach (var msg in consumer.FetchAsync(opts, serializer, cancellationToken))
while (!cancellationToken.IsCancellationRequested)
{
if (msg.Metadata is not { } metadata)
continue;
if (processed >= opts.MaxMsgs)
yield break;

_fetchSeq = metadata.Sequence.Stream;
yield return msg;
}
var mismatch = false;
ulong cseq = 0;

var consumer = await RecreateConsumer(_fetchConsumerName, _fetchSeq, cancellationToken);
_fetchConsumerName = consumer.Info.Name;

try
{
var fetchOpts = opts with { MaxMsgs = opts.MaxMsgs - processed };

await foreach (var msg in consumer.FetchAsync(fetchOpts, serializer, cancellationToken))
{
if (msg.Metadata is not { } metadata)
continue;

var expected = cseq + 1;
if (metadata.Sequence.Consumer != expected)
{
_logger.LogWarning(NatsJSLogEvents.Retry, $"Consumer sequence mismatch. Expected {expected}, was {metadata.Sequence.Consumer} Retrying...");
mismatch = true;
break;
}

_fetchSeq = metadata.Sequence.Stream;
cseq = metadata.Sequence.Consumer;

processed++;

yield return msg;
}
}
finally
{
var deleted = await TryDeleteConsumer(_fetchConsumerName, cancellationToken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you care if the delete succeeds? It should be ephemeral anyway. The key is to always generate an ordered consumer name. We agreed that the user can provide a prefix for ordered consumer names, which will allow them to be found more easily in logs

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_fetchConsumerName is kept around to delete the ephemeral when we finish with it just to be a good citizen. so if we couldn't delete it this time round and there is another fetch we try our best to delete again potentially. I don't think prefix feature is implemented in this client.


if (deleted)
_fetchConsumerName = string.Empty;
}

var deleted = await TryDeleteConsumer(_fetchConsumerName, cancellationToken);
if (!mismatch)
yield break;

if (deleted)
_fetchConsumerName = string.Empty;
await Task.Delay(100, cancellationToken);
}
}

/// <inheritdoc />
Expand All @@ -206,26 +249,55 @@ public async IAsyncEnumerable<NatsJSMsg<T>> FetchNoWaitAsync<T>(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken, cancellationToken).Token;
var processed = 0;

var consumer = await RecreateConsumer(_fetchConsumerName, _fetchSeq, cancellationToken);
_fetchConsumerName = consumer.Info.Name;
try
while (!cancellationToken.IsCancellationRequested)
{
await foreach (var msg in consumer.FetchNoWaitAsync(opts, serializer, cancellationToken))
if (processed >= opts.MaxMsgs)
yield break;

var mismatch = false;
ulong cseq = 0;
var consumer = await RecreateConsumer(_fetchConsumerName, _fetchSeq, cancellationToken);
_fetchConsumerName = consumer.Info.Name;

try
{
if (msg.Metadata is not { } metadata)
continue;
var fetchOpts = opts with { MaxMsgs = opts.MaxMsgs - processed };

await foreach (var msg in consumer.FetchNoWaitAsync(fetchOpts, serializer, cancellationToken))
{
if (msg.Metadata is not { } metadata)
continue;

var expected = cseq + 1;
if (metadata.Sequence.Consumer != expected)
{
_logger.LogWarning(NatsJSLogEvents.Retry, $"Consumer sequence mismatch. Expected {expected}, was {metadata.Sequence.Consumer} Retrying...");
mismatch = true;
break;
}

_fetchSeq = metadata.Sequence.Stream;
yield return msg;
_fetchSeq = metadata.Sequence.Stream;
cseq = metadata.Sequence.Consumer;

processed++;

yield return msg;
}
}
}
finally
{
var deleted = await TryDeleteConsumer(_fetchConsumerName, cancellationToken);
finally
{
var deleted = await TryDeleteConsumer(_fetchConsumerName, cancellationToken);

if (deleted)
_fetchConsumerName = string.Empty;
}

if (!mismatch)
yield break;

if (deleted)
_fetchConsumerName = string.Empty;
await Task.Delay(100, cancellationToken);
}
}

Expand Down
Loading