Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 16 additions & 19 deletions sandbox/MicroBenchmark/RequestReplyBench.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,36 @@ namespace MicroBenchmark;
[PlainExporter]
public class RequestReplyBench
{
private NatsConnection _nats;
private CancellationTokenSource _cts;
private Task _subscription;
private NatsConnection _nats1;
private NatsConnection _nats2;

[GlobalSetup]
public async Task SetupAsync()
{
_nats = new NatsConnection();
await _nats.ConnectAsync();
_cts = new CancellationTokenSource();
_subscription = Task.Run(async () =>
{
await foreach (var msg in _nats.SubscribeAsync<int>("req_rep_bench", cancellationToken: _cts.Token))
{
await msg.ReplyAsync(0xBEEF);
}
});
_nats1 = new NatsConnection();
_nats2 = new NatsConnection(new NatsOpts { RequestReplyMode = NatsRequestReplyMode.Direct });
await _nats1.ConnectAsync();
await _nats2.ConnectAsync();
}

[GlobalCleanup]
public async Task CleanupAsync()
{
await _cts.CancelAsync();
await _subscription;
await _nats.DisposeAsync();
await _nats1.DisposeAsync();
await _nats2.DisposeAsync();
}

[Benchmark(Baseline = true)]
public async Task<string> RequestReplyAsync() => await GetResultAsync(_nats1);

[Benchmark]
public async Task<int> RequestReplyAsync()
public async Task<string> RequestReplyDirectAsync() => await GetResultAsync(_nats2);

private static async Task<string> GetResultAsync(NatsConnection nats)
{
var reply = await _nats.RequestAsync<int, int>("req_rep_bench", 0xDEAD);
var reply = await nats.RequestAsync<string>("$JS.API.INFO");
var result = reply.Data;
ArgumentOutOfRangeException.ThrowIfNotEqual(0xBEEF, result);
ArgumentException.ThrowIfNullOrEmpty(result);
return result;
}
}
137 changes: 137 additions & 0 deletions src/NATS.Client.Core/Internal/ReplyTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
using System.Buffers;
using System.Buffers.Text;
using System.Collections.Concurrent;
using System.Text;

namespace NATS.Client.Core.Internal;

internal sealed class ReplyTask<T> : ReplyTaskBase, IDisposable
{
private readonly object _gate;
private readonly ReplyTaskFactory _factory;
private readonly long _id;
private readonly NatsConnection _connection;
private readonly INatsDeserialize<T> _deserializer;
private readonly TimeSpan _requestTimeout;
private readonly TaskCompletionSource _tcs;
private NatsMsg<T> _msg;

public ReplyTask(ReplyTaskFactory factory, long id, string subject, NatsConnection connection, INatsDeserialize<T> deserializer, TimeSpan requestTimeout)
{
_factory = factory;
_id = id;
Subject = subject;
_connection = connection;
_deserializer = deserializer;
_requestTimeout = requestTimeout;
_tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
_gate = new object();
}

public string Subject { get; }

public async ValueTask<NatsMsg<T>> GetResultAsync(CancellationToken cancellationToken)
{
try
{
await _tcs.Task
.WaitAsync(_requestTimeout, cancellationToken)
.ConfigureAwait(false);
}
catch (TimeoutException)
{
NatsNoReplyException.Throw();
}

lock (_gate)
{
return _msg;
}
}

public override void SetResult(string? replyTo, ReadOnlySequence<byte> payload, ReadOnlySequence<byte>? headersBuffer)
{
lock (_gate)
{
_msg = NatsMsg<T>.Build(Subject, replyTo, headersBuffer, payload, _connection, _connection.HeaderParser, _deserializer);
}

_tcs.TrySetResult();
}

public void Dispose() => _factory.Return(_id);
}

internal abstract class ReplyTaskBase
{
public abstract void SetResult(string? replyTo, ReadOnlySequence<byte> payload, ReadOnlySequence<byte>? headersBuffer);
}

internal sealed class ReplyTaskFactory
{
private readonly byte[] _inboxPrefix;
private readonly string _inboxPrefixString;
private readonly NatsConnection _connection;
private readonly ConcurrentDictionary<long, ReplyTaskBase> _replies;
private readonly INatsSerializerRegistry _serializerRegistry;
private readonly TimeSpan _requestTimeout;
private readonly int _subjectMaxLength;
private readonly bool _allocSubject;
private long _nextId;

public ReplyTaskFactory(NatsConnection connection)
{
_connection = connection;
_inboxPrefixString = _connection.InboxPrefix + ".";
_inboxPrefix = Encoding.UTF8.GetBytes(_inboxPrefixString);
_subjectMaxLength = _inboxPrefix.Length + 20; // 20 digits for long
_allocSubject = _subjectMaxLength < 128;
_serializerRegistry = _connection.Opts.SerializerRegistry;
_requestTimeout = _connection.Opts.RequestTimeout;
_replies = new ConcurrentDictionary<long, ReplyTaskBase>();
}

public ReplyTask<TReply> CreateReplyTask<TReply>(INatsDeserialize<TReply>? deserializer, TimeSpan? requestTimeout)
{
deserializer ??= _serializerRegistry.GetDeserializer<TReply>();
var id = Interlocked.Increment(ref _nextId);

string subject;
if (_allocSubject)
{
Span<byte> buffer = stackalloc byte[_subjectMaxLength];
_inboxPrefix.CopyTo(buffer);
var idSpan = buffer.Slice(_inboxPrefix.Length);
if (Utf8Formatter.TryFormat(id, idSpan, out var written))
{
var subjectSpan = buffer.Slice(0, written + _inboxPrefix.Length);
subject = Encoding.UTF8.GetString(subjectSpan);
}
else
{
subject = _inboxPrefixString + id;
}
}
else
{
subject = _inboxPrefixString + id;
}

var rt = new ReplyTask<TReply>(this, id, subject, _connection, deserializer, requestTimeout ?? _requestTimeout);
_replies.TryAdd(id, rt);
return rt;
}

public void Return(long id) => _replies.TryRemove(id, out _);

public bool TrySetResult(long id, string? replyTo, in ReadOnlySequence<byte> payloadBuffer, in ReadOnlySequence<byte>? headersBuffer)
{
if (_replies.TryGetValue(id, out var rt))
{
rt.SetResult(replyTo, payloadBuffer, headersBuffer);
return true;
}

return false;
}
}
14 changes: 13 additions & 1 deletion src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ internal sealed class SubscriptionManager : INatsSubscriptionManager, IAsyncDisp

private int _sid; // unique alphanumeric subscription ID, generated by the client(per connection).
private InboxSub _inboxSub;
private int _inboxSid;

public SubscriptionManager(NatsConnection connection, string inboxPrefix)
{
Expand All @@ -46,6 +47,8 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix)

internal InboxSubBuilder InboxSubBuilder { get; }

internal int InboxSid => Interlocked.CompareExchange(ref _inboxSid, 0, 0);

public ValueTask SubscribeAsync(NatsSubBase sub, CancellationToken cancellationToken)
{
if (Telemetry.HasListeners())
Expand Down Expand Up @@ -228,7 +231,7 @@ internal INatsSubscriptionManager GetManagerFor(string subject)
return this;
}

private async ValueTask SubscribeInboxAsync(NatsSubBase sub, CancellationToken cancellationToken)
internal async Task InitializeInboxSubscriptionAsync(CancellationToken cancellationToken)
{
if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel)
{
Expand Down Expand Up @@ -257,14 +260,23 @@ await SubscribeInternalAsync(
_inboxSubLock.Release();
}
}
}

private async ValueTask SubscribeInboxAsync(NatsSubBase sub, CancellationToken cancellationToken)
{
await InitializeInboxSubscriptionAsync(cancellationToken).ConfigureAwait(false);
await InboxSubBuilder.RegisterAsync(sub).ConfigureAwait(false);
}

private async ValueTask SubscribeInternalAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
{
var sid = GetNextSid();

if (sub is InboxSub)
{
Interlocked.Exchange(ref _inboxSid, sid);
}

if (_debug)
{
_logger.LogDebug(NatsLogEvents.Subscription, "New subscription {Subject}/{Sid}", sub.Subject, sid);
Expand Down
19 changes: 19 additions & 0 deletions src/NATS.Client.Core/NatsConnection.RequestReply.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Buffers;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using NATS.Client.Core.Internal;
Expand Down Expand Up @@ -38,6 +39,15 @@ public async ValueTask<NatsMsg<TReply>> RequestAsync<TRequest, TReply>(
try
{
replyOpts = SetReplyOptsDefaults(replyOpts);

if (Opts.RequestReplyMode == NatsRequestReplyMode.Direct)
{
using var rt = _replyTaskFactory.CreateReplyTask(replySerializer, replyOpts.Timeout);
requestSerializer ??= Opts.SerializerRegistry.GetSerializer<TRequest>();
await PublishAsync(subject, data, headers, rt.Subject, requestSerializer, requestOpts, cancellationToken).ConfigureAwait(false);
return await rt.GetResultAsync(cancellationToken).ConfigureAwait(false);
}

await using var sub1 = await CreateRequestSubAsync<TRequest, TReply>(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
.ConfigureAwait(false);

Expand All @@ -56,6 +66,15 @@ public async ValueTask<NatsMsg<TReply>> RequestAsync<TRequest, TReply>(
}

replyOpts = SetReplyOptsDefaults(replyOpts);

if (Opts.RequestReplyMode == NatsRequestReplyMode.Direct)
{
using var rt = _replyTaskFactory.CreateReplyTask(replySerializer, replyOpts.Timeout);
requestSerializer ??= Opts.SerializerRegistry.GetSerializer<TRequest>();
await PublishAsync(subject, data, headers, rt.Subject, requestSerializer, requestOpts, cancellationToken).ConfigureAwait(false);
return await rt.GetResultAsync(cancellationToken).ConfigureAwait(false);
}

await using var sub = await CreateRequestSubAsync<TRequest, TReply>(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken)
.ConfigureAwait(false);

Expand Down
38 changes: 38 additions & 0 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public partial class NatsConnection : INatsConnection
private readonly Channel<(NatsEvent, NatsEventArgs)> _eventChannel;
private readonly ClientOpts _clientOpts;
private readonly SubscriptionManager _subscriptionManager;
private readonly ReplyTaskFactory _replyTaskFactory;

private ServerInfo? _writableServerInfo;
private int _pongCount;
Expand Down Expand Up @@ -83,6 +84,7 @@ public NatsConnection(NatsOpts opts)
CommandWriter = new CommandWriter("main", this, _pool, Opts, Counter, EnqueuePing);
InboxPrefix = NewInbox(opts.InboxPrefix);
_subscriptionManager = new SubscriptionManager(this, InboxPrefix);
_replyTaskFactory = new ReplyTaskFactory(this);
_clientOpts = ClientOpts.Create(Opts);
HeaderParser = new NatsHeaderParser(opts.HeaderEncoding);
_defaultSubscriptionChannelOpts = new BoundedChannelOptions(opts.SubPendingChannelCapacity)
Expand Down Expand Up @@ -194,6 +196,11 @@ public async ValueTask ConnectAsync()

// Only Closed(initial) state, can run initial connect.
await InitialConnectAsync().ConfigureAwait(false);

if (Opts.RequestReplyMode == NatsRequestReplyMode.Direct)
{
await _subscriptionManager.InitializeInboxSubscriptionAsync(_disposedCts.Token).ConfigureAwait(false);
}
}

/// <inheritdoc />
Expand Down Expand Up @@ -268,6 +275,37 @@ internal string SpanDestinationName(string subject)

internal ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, int sid, in ReadOnlySequence<byte>? headersBuffer, in ReadOnlySequence<byte> payloadBuffer)
{
if (Opts.RequestReplyMode == NatsRequestReplyMode.Direct)
{
// Direct mode, check if the subject is an inbox
// and if so, check if the subject is a reply to a request
// by checking if the subject length is less than two NUIDs + dots
// e.g. _INBOX.Hu5HPpWesrJhvQq2NG3YJ6.Hu5HPpWesrJhvQq2NG3YLw
// vs. _INBOX.Hu5HPpWesrJhvQq2NG3YJ6.1234
// otherwise, it's not a reply in direct mode.
if (_subscriptionManager.InboxSid == sid && subject.Length < InboxPrefix.Length + 1 + 22 + 1 + 22)
{
var idString = subject.AsSpan().Slice(InboxPrefix.Length + 1)
#if NETSTANDARD2_0
.ToString()
#endif
;

if (long.TryParse(idString, out var id))
{
if (_replyTaskFactory.TrySetResult(id, replyTo, payloadBuffer, headersBuffer))
{
return default;
}

// if we can't set the result, either the task is already timed out or
// it's not a reply to a request.
}

// if we can't parse the id, it's not a reply.
}
}

return _subscriptionManager.PublishToClientHandlersAsync(subject, replyTo, sid, headersBuffer, payloadBuffer);
}

Expand Down
5 changes: 5 additions & 0 deletions src/NATS.Client.Core/NatsException.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;

namespace NATS.Client.Core;
Expand All @@ -24,6 +25,10 @@ public NatsNoReplyException()
: base("No reply received")
{
}

[DoesNotReturn]
[MethodImpl(MethodImplOptions.NoInlining)]
public static void Throw() => throw new NatsNoReplyException();
}

public sealed class NatsNoRespondersException : NatsException
Expand Down
Loading