Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/NATS.Client.JetStream/Models/StreamConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -284,4 +284,15 @@ public StreamConfig()
[System.Text.Json.Serialization.JsonPropertyName("allow_atomic")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
public bool AllowAtomicPublish { get; set; }

/// <summary>
/// PersistMode allows to opt-in to different persistence mode settings.
/// </summary>
/// <remarks>Supported by server v2.12</remarks>
[System.Text.Json.Serialization.JsonPropertyName("persist_mode")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
#if NET6_0
[System.Text.Json.Serialization.JsonConverter(typeof(NatsJSJsonStringEnumConverter<StreamConfigPersistMode>))]
#endif
public StreamConfigPersistMode? PersistMode { get; set; }
}
7 changes: 7 additions & 0 deletions src/NATS.Client.JetStream/Models/StreamConfigPersistMode.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace NATS.Client.JetStream.Models;

public enum StreamConfigPersistMode
{
Default = 0,
Async = 1,
}
24 changes: 24 additions & 0 deletions src/NATS.Client.JetStream/NatsJSJsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ internal partial class NatsJSJsonSerializerContext : JsonSerializerContext
new JsonStringEnumConverter<StreamConfigDiscard>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<StreamConfigRetention>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<StreamConfigStorage>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<StreamConfigPersistMode>(JsonNamingPolicy.SnakeCaseLower),
new JsonStringEnumConverter<ConsumerCreateAction>(JsonNamingPolicy.SnakeCaseLower),
},
});
Expand Down Expand Up @@ -223,6 +224,17 @@ public override TEnum Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSe
}
}

if (typeToConvert == typeof(StreamConfigPersistMode))
{
switch (stringValue)
{
case "default":
return (TEnum)(object)StreamConfigPersistMode.Default;
case "async":
return (TEnum)(object)StreamConfigPersistMode.Async;
}
}

throw new InvalidOperationException($"Reading unknown enum type {typeToConvert.Name} or value {stringValue}");
}

Expand Down Expand Up @@ -345,6 +357,18 @@ public override void Write(Utf8JsonWriter writer, TEnum value, JsonSerializerOpt
return;
}
}
else if (value is StreamConfigPersistMode streamConfigPersistMode)
{
switch (streamConfigPersistMode)
{
case StreamConfigPersistMode.Default:
writer.WriteStringValue("default");
return;
case StreamConfigPersistMode.Async:
writer.WriteStringValue("async");
return;
}
}

throw new InvalidOperationException($"Writing unknown enum value {value.GetType().Name}.{value}");
}
Expand Down
85 changes: 85 additions & 0 deletions tests/NATS.Client.JetStream.Tests/EnumJsonTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,89 @@ public void ConsumerCreateRequestAction_Test(ConsumerCreateAction value, string
Assert.NotNull(result);
Assert.Equal(value, result.Action);
}

[Fact]
public void StreamConfigPersistMode_null_not_serialized()
{
var serializer = NatsJSJsonSerializer<StreamConfig>.Default;

// When PersistMode is null (not explicitly set), it should not be included in JSON
var bw = new NatsBufferWriter<byte>();
var config = new StreamConfig { PersistMode = null };
serializer.Serialize(bw, config);

var json = Encoding.UTF8.GetString(bw.WrittenSpan.ToArray());
Assert.DoesNotContain("persist_mode", json);

// Deserialize and verify it remains null
var result = serializer.Deserialize(new ReadOnlySequence<byte>(bw.WrittenMemory));
Assert.NotNull(result);
Assert.Null(result.PersistMode);
}

[Theory]
[InlineData(StreamConfigPersistMode.Default, "\"persist_mode\":\"default\"")]
[InlineData(StreamConfigPersistMode.Async, "\"persist_mode\":\"async\"")]
public void StreamConfigPersistMode_explicit_value_serialized(StreamConfigPersistMode value, string expected)
{
var serializer = NatsJSJsonSerializer<StreamConfig>.Default;

// When PersistMode is explicitly set (even to Default), it should be included in JSON
var bw = new NatsBufferWriter<byte>();
var config = new StreamConfig { PersistMode = value };
serializer.Serialize(bw, config);

var json = Encoding.UTF8.GetString(bw.WrittenSpan.ToArray());
Assert.Contains(expected, json);

// Deserialize and verify the value is preserved
var result = serializer.Deserialize(new ReadOnlySequence<byte>(bw.WrittenMemory));
Assert.NotNull(result);
Assert.Equal(value, result.PersistMode);
}

[Fact]
public void StreamConfigPersistMode_roundtrip_preserves_server_values()
{
var serializer = NatsJSJsonSerializer<StreamConfig>.Default;

// Test case 1: Server returns "default" - should preserve it
var jsonWithDefault = "{\"persist_mode\":\"default\",\"retention\":\"limits\",\"storage\":\"file\"}";
var bytes = Encoding.UTF8.GetBytes(jsonWithDefault);
var configFromServer = serializer.Deserialize(new ReadOnlySequence<byte>(bytes));
Assert.NotNull(configFromServer);
Assert.Equal(StreamConfigPersistMode.Default, configFromServer.PersistMode);

// When we serialize it again, it should include persist_mode
var bw1 = new NatsBufferWriter<byte>();
serializer.Serialize(bw1, configFromServer);
var jsonOut1 = Encoding.UTF8.GetString(bw1.WrittenSpan.ToArray());
Assert.Contains("\"persist_mode\":\"default\"", jsonOut1);

// Test case 2: Server returns "async" - should preserve it
var jsonWithAsync = "{\"persist_mode\":\"async\",\"retention\":\"limits\",\"storage\":\"file\"}";
bytes = Encoding.UTF8.GetBytes(jsonWithAsync);
configFromServer = serializer.Deserialize(new ReadOnlySequence<byte>(bytes));
Assert.NotNull(configFromServer);
Assert.Equal(StreamConfigPersistMode.Async, configFromServer.PersistMode);

// When we serialize it again, it should include persist_mode
var bw2 = new NatsBufferWriter<byte>();
serializer.Serialize(bw2, configFromServer);
var jsonOut2 = Encoding.UTF8.GetString(bw2.WrittenSpan.ToArray());
Assert.Contains("\"persist_mode\":\"async\"", jsonOut2);

// Test case 3: Server doesn't return persist_mode - should remain null
var jsonWithoutPersistMode = "{\"retention\":\"limits\",\"storage\":\"file\"}";
bytes = Encoding.UTF8.GetBytes(jsonWithoutPersistMode);
configFromServer = serializer.Deserialize(new ReadOnlySequence<byte>(bytes));
Assert.NotNull(configFromServer);
Assert.Null(configFromServer.PersistMode);

// When we serialize it again, it should NOT include persist_mode
var bw3 = new NatsBufferWriter<byte>();
serializer.Serialize(bw3, configFromServer);
var jsonOut3 = Encoding.UTF8.GetString(bw3.WrittenSpan.ToArray());
Assert.DoesNotContain("persist_mode", jsonOut3);
}
}
81 changes: 81 additions & 0 deletions tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -296,4 +296,85 @@ public async Task AllowAtomicPublish_property_should_be_set_on_stream()
var reRetrievedStream = await js.GetStreamAsync($"{prefix}atomic", cancellationToken: cts.Token);
Assert.False(reRetrievedStream.Info.Config.AllowAtomicPublish);
}

[SkipIfNatsServer(versionEarlierThan: "2.12")]
public async Task PersistMode_property_should_be_set_on_stream()
{
await using var nats = new NatsConnection(new NatsOpts { Url = _server.Url });
var prefix = _server.GetNextId();
await nats.ConnectRetryAsync();

var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

// Test 1: Create a stream with PersistMode set to Async
var streamConfigAsync = new StreamConfig($"{prefix}persist-async", [$"{prefix}persist-async.*"])
{
PersistMode = StreamConfigPersistMode.Async,
};

var streamAsync = await js.CreateStreamAsync(streamConfigAsync, cts.Token);

// Verify the property is set on the created stream
Assert.Equal(StreamConfigPersistMode.Async, streamAsync.Info.Config.PersistMode);

// Get the stream and verify the property is persisted
var retrievedStreamAsync = await js.GetStreamAsync($"{prefix}persist-async", cancellationToken: cts.Token);
Assert.Equal(StreamConfigPersistMode.Async, retrievedStreamAsync.Info.Config.PersistMode);

// Test 2: Create a stream with PersistMode set to Default
var streamConfigDefault = new StreamConfig($"{prefix}persist-default", [$"{prefix}persist-default.*"])
{
PersistMode = StreamConfigPersistMode.Default,
};

var streamDefault = await js.CreateStreamAsync(streamConfigDefault, cts.Token);

// Verify the property is set on the created stream
// Server v2.12 may return null for default value, which is acceptable
// The key is that we sent it in the request
Assert.True(
streamDefault.Info.Config.PersistMode == StreamConfigPersistMode.Default ||
streamDefault.Info.Config.PersistMode == null,
$"Expected PersistMode to be Default or null, but was {streamDefault.Info.Config.PersistMode}");

// Get the stream and verify the property
var retrievedStreamDefault = await js.GetStreamAsync($"{prefix}persist-default", cancellationToken: cts.Token);
Assert.True(
retrievedStreamDefault.Info.Config.PersistMode == StreamConfigPersistMode.Default ||
retrievedStreamDefault.Info.Config.PersistMode == null,
$"Expected PersistMode to be Default or null, but was {retrievedStreamDefault.Info.Config.PersistMode}");

// Test 3: Create a stream without PersistMode set (should be null)
var streamConfigNull = new StreamConfig($"{prefix}persist-null", [$"{prefix}persist-null.*"])
{
// PersistMode not set, should remain null
};

var streamNull = await js.CreateStreamAsync(streamConfigNull, cts.Token);
Assert.Null(streamNull.Info.Config.PersistMode);

// Verify the property might be null or server might return a default
// The key is that we didn't send it in the request
var retrievedStreamNull = await js.GetStreamAsync($"{prefix}persist-null", cancellationToken: cts.Token);
Assert.Null(retrievedStreamNull.Info.Config.PersistMode);

// Server behavior may vary - it might return null or a default value
// The important thing is our client didn't send persist_mode in the JSON

// Test 4: Verify that updating PersistMode throws an exception
var updatedConfig = streamConfigAsync with { PersistMode = StreamConfigPersistMode.Default };
var exception = await Assert.ThrowsAsync<NatsJSApiException>(
async () => await js.UpdateStreamAsync(updatedConfig, cts.Token));

// Verify the error message indicates persist mode cannot be changed
Assert.Contains("persist mode", exception.Message, StringComparison.OrdinalIgnoreCase);
Assert.Equal(500, exception.Error.Code);
Assert.Equal(10052, exception.Error.ErrCode);
Assert.Equal("stream configuration update can not change persist mode", exception.Error.Description);

var updatedAsync = await js.GetStreamAsync($"{prefix}persist-async", cancellationToken: cts.Token);
Assert.Equal(StreamConfigPersistMode.Async, updatedAsync.Info.Config.PersistMode);
}
}
Loading