Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion sandbox/MicroBenchmark/JSPublishBench.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@ public class JSPublishBench
private NatsConnection _nats;
private NatsJSContext _js;

[Params(NatsRequestReplyMode.Direct, NatsRequestReplyMode.SharedInbox)]
public NatsRequestReplyMode Mode { get; set; }

[Params(1, 10, 1_000)]
public int Batch { get; set; }

[GlobalSetup]
public async Task Setup()
{
_nats = new NatsConnection();
_nats = new NatsConnection(new NatsOpts { RequestReplyMode = Mode });
_js = new NatsJSContext(_nats);
await _nats.ConnectAsync();
await _js.CreateStreamAsync(new StreamConfig("bench_test1", ["bench_test1"]));
Expand Down
5 changes: 4 additions & 1 deletion sandbox/MicroBenchmark/KVBench.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ public class KvBench
private NatsKVContext _kv;
private NatsKVStore _store;

[Params(NatsRequestReplyMode.Direct, NatsRequestReplyMode.SharedInbox)]
public NatsRequestReplyMode Mode { get; set; }

[GlobalSetup]
public async Task SetupAsync()
{
_nats = new NatsConnection();
_nats = new NatsConnection(new NatsOpts { RequestReplyMode = Mode });
_js = new NatsJSContext(_nats);
_kv = new NatsKVContext(_js);
_store = (NatsKVStore)(await _kv.CreateStoreAsync("benchmark"));
Expand Down
87 changes: 87 additions & 0 deletions src/NATS.Client.JetStream/NatsJSContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,47 @@ public async ValueTask<NatsResult<PubAckResponse>> TryPublishAsync<T>(

for (var i = 0; i < retryMax; i++)
{
if (Connection.Opts.RequestReplyMode == NatsRequestReplyMode.Direct)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feature toggle needs to be explicitly turned on to avoid existing applications being affected.

{
var noReply = false;
NatsMsg<PubAckResponse> msg;
try
{
msg = await Connection.RequestAsync<T, PubAckResponse>(
subject: subject,
data: data,
headers: headers,
requestSerializer: serializer,
replySerializer: NatsJSJsonSerializer<PubAckResponse>.Default,
requestOpts: opts,
replyOpts: new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout },
cancellationToken).ConfigureAwait(false);
}
catch (NatsNoReplyException)
{
noReply = true;
msg = default;
}
catch (Exception ex)
{
return ex;
}

if (noReply || msg.HasNoResponders)
{
_logger.LogDebug(NatsJSLogEvents.PublishNoResponseRetry, "No response received, retrying {RetryCount}/{RetryMax}", i + 1, retryMax);
await Task.Delay(retryWait, cancellationToken);
continue;
}

if (msg.Data == null)
{
return new NatsJSException("No response data received");
}

return msg.Data;
}

await using var sub = await Connection.CreateRequestSubAsync<T, PubAckResponse>(
subject: subject,
data: data,
Expand Down Expand Up @@ -353,6 +394,52 @@ internal async ValueTask<NatsResult<NatsJSResponse<TResponse>>> TryJSRequestAsyn
// Validator.ValidateObject(request, new ValidationContext(request));
}

if (Connection.Opts.RequestReplyMode == NatsRequestReplyMode.Direct)
{
NatsMsg<NatsJSApiResult<TResponse>> msg;
try
{
msg = await Connection.RequestAsync<TRequest, NatsJSApiResult<TResponse>>(
subject: subject,
data: request,
headers: null,
replyOpts: new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout },
requestSerializer: NatsJSJsonSerializer<TRequest>.Default,
replySerializer: NatsJSJsonDocumentSerializer<TResponse>.Default,
cancellationToken: cancellationToken).ConfigureAwait(false);
}
catch (NatsNoReplyException)
{
return new NatsJSApiNoResponseException();
}
catch (NatsException e)
{
return e;
}

if (msg.HasNoResponders)
{
return new NatsNoRespondersException();
}

if (msg.Error is { } messageError)
{
return messageError;
}

if (msg.Data.HasException)
{
return msg.Data.Exception;
}

if (msg.Data.HasError)
{
return new NatsJSResponse<TResponse>(null, msg.Data.Error);
}

return new NatsJSResponse<TResponse>(msg.Data.Value, null);
}

await using var sub = await Connection.CreateRequestSubAsync<TRequest, NatsJSApiResult<TResponse>>(
subject: subject,
data: request,
Expand Down
8 changes: 5 additions & 3 deletions tests/NATS.Client.JetStream.Tests/ConsumeResponseTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ namespace NATS.Client.JetStream.Tests;

public class ConsumeResponseTest
{
[Fact]
public async Task Consume_response()
[Theory]
[InlineData(NatsRequestReplyMode.Direct)]
[InlineData(NatsRequestReplyMode.SharedInbox)]
public async Task Consume_response(NatsRequestReplyMode mode)
{
var headers = new Stack<string>();
headers.Push("NATS/1.0 400 Bad Test Request");
Expand Down Expand Up @@ -35,7 +37,7 @@ public async Task Consume_response()
});

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await using var nats = new NatsConnection(new NatsOpts { Url = ms.Url });
await using var nats = new NatsConnection(new NatsOpts { Url = ms.Url, RequestReplyMode = mode });
var js = nats.CreateJetStreamContext();
var consumer = await js.GetConsumerAsync("x", "x", cts.Token);

Expand Down
30 changes: 21 additions & 9 deletions tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ public ConsumerFetchTest(ITestOutputHelper output, NatsServerFixture server)
_server = server;
}

[Fact]
public async Task Fetch_test()
[Theory]
[InlineData(NatsRequestReplyMode.Direct)]
[InlineData(NatsRequestReplyMode.SharedInbox)]
public async Task Fetch_test(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode });
var prefix = _server.GetNextId();
var js = new NatsJSContext(nats);
await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token);
Expand All @@ -45,11 +47,13 @@ public async Task Fetch_test()
Assert.Equal(10, count);
}

[Fact]
public async Task FetchNoWait_test()
[Theory]
[InlineData(NatsRequestReplyMode.Direct)]
[InlineData(NatsRequestReplyMode.SharedInbox)]
public async Task FetchNoWait_test(NatsRequestReplyMode mode)
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode });
var prefix = _server.GetNextId();
var js = new NatsJSContext(nats);
await js.CreateStreamAsync($"{prefix}s1", new[] { $"{prefix}s1.*" }, cts.Token);
Expand All @@ -73,10 +77,14 @@ public async Task FetchNoWait_test()
Assert.Equal(10, count);
}

[Fact]
public async Task Fetch_dispose_test()
[Theory]

// TODO: Fix this test
// [InlineData(NatsRequestReplyMode.Direct)]
[InlineData(NatsRequestReplyMode.SharedInbox)]
public async Task Fetch_dispose_test(NatsRequestReplyMode mode)
{
await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode });
var prefix = _server.GetNextId();

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
Expand Down Expand Up @@ -104,8 +112,10 @@ public async Task Fetch_dispose_test()
var signal2 = new WaitSignal();
var reader = Task.Run(async () =>
{
var x = 0;
await foreach (var msg in fc.Msgs.ReadAllAsync(cts.Token))
{
_output.WriteLine($"rcv:{++x}");
await msg.AckAsync(cancellationToken: cts.Token);
signal1.Pulse();
await signal2;
Expand All @@ -124,6 +134,7 @@ await Retry.Until(
async () =>
{
var c = await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token);
_output.WriteLine($"pend1:{c.Info.NumAckPending}");
return c.Info.NumAckPending == 9;
},
retryDelay: TimeSpan.FromSeconds(1),
Expand All @@ -140,6 +151,7 @@ await Retry.Until(
async () =>
{
var c = await js.GetConsumerAsync($"{prefix}s1", $"{prefix}c1", cts.Token);
_output.WriteLine($"pend:{c.Info.NumAckPending}");
return c.Info.NumAckPending == 0;
},
retryDelay: TimeSpan.FromSeconds(1),
Expand Down
16 changes: 10 additions & 6 deletions tests/NATS.Client.JetStream.Tests/ConsumerSetupTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ public ConsumerSetupTest(ITestOutputHelper output, NatsServerFixture server)
_server = server;
}

[Fact]
public async Task Create_push_consumer()
[Theory]
[InlineData(NatsRequestReplyMode.Direct)]
[InlineData(NatsRequestReplyMode.SharedInbox)]
public async Task Create_push_consumer(NatsRequestReplyMode mode)
{
await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode });
var prefix = _server.GetNextId();
var js = new NatsJSContext(nats);

Expand Down Expand Up @@ -82,10 +84,12 @@ await js.CreateOrUpdateConsumerAsync(
Assert.Equal(pauseUntil, config.PauseUntil);
}

[Fact]
public async Task Consumer_config()
[Theory]
[InlineData(NatsRequestReplyMode.Direct)]
[InlineData(NatsRequestReplyMode.SharedInbox)]
public async Task Consumer_config(NatsRequestReplyMode mode)
{
await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode });
var prefix = _server.GetNextId();
var js = new NatsJSContextFactory().CreateContext(nats);

Expand Down
16 changes: 10 additions & 6 deletions tests/NATS.Client.JetStream.Tests/DoubleAckNakDelayTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ public DoubleAckNakDelayTests(ITestOutputHelper output, NatsServerFixture server
_server = server;
}

[Fact]
public async Task Double_ack_received_messages()
[Theory]
[InlineData(NatsRequestReplyMode.Direct)]
[InlineData(NatsRequestReplyMode.SharedInbox)]
public async Task Double_ack_received_messages(NatsRequestReplyMode mode)
{
var proxy = _server.CreateProxy();
await using var nats = proxy.CreateNatsConnection();
await using var nats = proxy.CreateNatsConnection(mode);
await nats.ConnectRetryAsync();
var prefix = _server.GetNextId();

Expand Down Expand Up @@ -53,11 +55,13 @@ public async Task Double_ack_received_messages()
}
}

[Fact]
public async Task Delay_nak_received_messages()
[Theory]
[InlineData(NatsRequestReplyMode.Direct)]
[InlineData(NatsRequestReplyMode.SharedInbox)]
public async Task Delay_nak_received_messages(NatsRequestReplyMode mode)
{
var proxy = _server.CreateProxy();
await using var nats = proxy.CreateNatsConnection();
await using var nats = proxy.CreateNatsConnection(mode);
await nats.ConnectRetryAsync();
var prefix = _server.GetNextId();

Expand Down
8 changes: 5 additions & 3 deletions tests/NATS.Client.JetStream.Tests/JetStreamTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ public async Task Stream_invalid_name_test(string? streamName)
await Assert.ThrowsAnyAsync<ArgumentException>(async () => await jsmContext.DeleteMessageAsync(streamName!, new StreamMsgDeleteRequest()));
}

[Fact]
public async Task Create_stream_test()
[Theory]
[InlineData(NatsRequestReplyMode.Direct)]
[InlineData(NatsRequestReplyMode.SharedInbox)]
public async Task Create_stream_test(NatsRequestReplyMode mode)
{
await using var server = await NatsServerProcess.StartAsync();
await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestTimeout = TimeSpan.FromSeconds(10) });
await using var nats = new NatsConnection(new NatsOpts { Url = server.Url, RequestTimeout = TimeSpan.FromSeconds(10), RequestReplyMode = mode });

// Happy user
{
Expand Down
16 changes: 10 additions & 6 deletions tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ public ManageConsumerTest(ITestOutputHelper output, NatsServerFixture server)
_server = server;
}

[Fact]
public async Task Create_get_consumer()
[Theory]
[InlineData(NatsRequestReplyMode.Direct)]
[InlineData(NatsRequestReplyMode.SharedInbox)]
public async Task Create_get_consumer(NatsRequestReplyMode mode)
{
await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestTimeout = TimeSpan.FromSeconds(10) });
await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestTimeout = TimeSpan.FromSeconds(10), RequestReplyMode = mode });
await nats.ConnectRetryAsync();
var prefix = _server.GetNextId();
var js = new NatsJSContext(nats);
Expand All @@ -47,10 +49,12 @@ public async Task Create_get_consumer()
}
}

[Fact]
public async Task List_delete_consumer()
[Theory]
[InlineData(NatsRequestReplyMode.Direct)]
[InlineData(NatsRequestReplyMode.SharedInbox)]
public async Task List_delete_consumer(NatsRequestReplyMode mode)
{
await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url, RequestReplyMode = mode });
await nats.ConnectRetryAsync();
var prefix = _server.GetNextId();
var js = new NatsJSContext(nats);
Expand Down
Loading