Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added NATS option to disable dynamic subject creation for consumers.
  • Loading branch information
davidterins committed Jun 29, 2024
commit 041450759fb461f7e18e7833990fec2a96dd6372
5 changes: 5 additions & 0 deletions src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ public class NATSOptions
/// </summary>
public int ConnectionPoolSize { get; set; } = 10;

/// <summary>
/// Allows a nats consumer client to dynamically create a stream and configure the expected subjects on the stream. Defaults to true.
/// </summary>
public bool EnableSubscriberClientStreamAndSubjectCreation { get; set; } = true;

/// <summary>
/// Used to setup all NATs client options
/// </summary>
Expand Down
47 changes: 25 additions & 22 deletions src/DotNetCore.CAP.NATS/NATSConsumerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,37 +43,40 @@ public NATSConsumerClient(string groupName, byte groupConcurrent, IOptions<NATSO

public ICollection<string> FetchTopics(IEnumerable<string> topicNames)
{
Connect();

var jsm = _consumerClient!.CreateJetStreamManagementContext();

var streamGroup = topicNames.GroupBy(x => _natsOptions.NormalizeStreamName(x));

foreach (var subjectStream in streamGroup)
if (_natsOptions.EnableSubscriberClientStreamAndSubjectCreation)
{
var builder = StreamConfiguration.Builder()
.WithName(subjectStream.Key)
.WithNoAck(false)
.WithStorageType(StorageType.Memory)
.WithSubjects(subjectStream.ToList());
Connect();

_natsOptions.StreamOptions?.Invoke(builder);
var jsm = _consumerClient!.CreateJetStreamManagementContext();

try
{
jsm.GetStreamInfo(subjectStream.Key); // this throws if the stream does not exist
var streamSubjectsGroups = topicNames.GroupBy(x => _natsOptions.NormalizeStreamName(x));

jsm.UpdateStream(builder.Build());
}
catch (NATSJetStreamException)
foreach (var streamSubjectsGroup in streamSubjectsGroups)
{
var builder = StreamConfiguration.Builder()
.WithName(streamSubjectsGroup.Key)
.WithNoAck(false)
.WithStorageType(StorageType.Memory)
.WithSubjects(streamSubjectsGroup.ToList());

_natsOptions.StreamOptions?.Invoke(builder);

try
{
jsm.AddStream(builder.Build());
jsm.GetStreamInfo(streamSubjectsGroup.Key); // this throws if the stream does not exist

jsm.UpdateStream(builder.Build());
}
catch
catch (NATSJetStreamException)
{
// ignored
try
{
jsm.AddStream(builder.Build());
}
catch
{
// ignored
}
}
}
}
Expand Down