Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/NatsDistributedCache/NatsCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public async Task SetAsync(
{
// todo: remove cast after https://github.com/nats-io/nats.net/pull/852 is released
await ((NatsKVStore)kvStore)
.PutAsync(GetEncodedKey(key), entry, ttl ?? TimeSpan.Zero, CacheEntrySerializer, token)
.PutWithTtlAsync(GetEncodedKey(key), entry, ttl ?? TimeSpan.Zero, CacheEntrySerializer, token)
.ConfigureAwait(false);
}
catch (Exception ex)
Expand Down
131 changes: 131 additions & 0 deletions src/NatsDistributedCache/NatsExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
using System.Runtime.CompilerServices;
using System.Text.RegularExpressions;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.KeyValueStore;

namespace CodeCargo.Nats.DistributedCache;

public static class NatsExtensions
{
private const string NatsTtl = "Nats-TTL";
private static readonly Regex ValidKeyRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled);
private static readonly NatsKVException KeyCannotBeEmptyException = new("Key cannot be empty");
private static readonly NatsKVException KeyCannotStartOrEndWithPeriodException = new("Key cannot start or end with a period");
private static readonly NatsKVException KeyContainsInvalidCharactersException = new("Key contains invalid characters");

/// <summary>
/// Put a value into the bucket using the key
/// </summary>
/// <param name="store">NATS key-value store instance</param>
/// <param name="key">Key of the entry</param>
/// <param name="value">Value of the entry</param>
/// <param name="ttl">Time-to-live value</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>Revision number</returns>
/// <remarks>
/// TTLs should only be used when the store is configured with a storage type that supports expiration,
/// and with history set to 1. Otherwise, the TTL behavior is undefined.
/// History is set to 1 by default, so you should be fine unless you changed it explicitly.
/// </remarks>
public static async ValueTask<ulong> PutWithTtlAsync<T>(this INatsKVStore store, string key, T value, TimeSpan ttl = default, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
var result = await TryPutWithTtlAsync(store, key, value, ttl, serializer, cancellationToken);
if (!result.Success)
{
ThrowException(result.Error);
}

return result.Value;
}

/// <summary>
/// Put a value into the bucket using the key
/// </summary>
/// <param name="store">NATS key-value store instance</param>
/// <param name="key">Key of the entry</param>
/// <param name="value">Value of the entry</param>
/// <param name="ttl">Time-to-live value</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>Revision number</returns>
/// <remarks>
/// TTLs should only be used when the store is configured with a storage type that supports expiration,
/// and with history set to 1. Otherwise, the TTL behavior is undefined.
/// History is set to 1 by default, so you should be fine unless you changed it explicitly.
/// </remarks>
public static async ValueTask<NatsResult<ulong>> TryPutWithTtlAsync<T>(this INatsKVStore store, string key, T value, TimeSpan ttl = default, INatsSerialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
var keyValidResult = TryValidateKey(key);
if (!keyValidResult.Success)
{
return keyValidResult.Error;
}

NatsHeaders? headers = default;
if (ttl != default)
{
headers = new NatsHeaders
{
{ NatsTtl, ToTtlString(ttl) },
};
}

var publishResult = await store.JetStreamContext.TryPublishAsync($"$KV.{store.Bucket}.{key}", value, serializer: serializer, headers: headers, cancellationToken: cancellationToken);
if (publishResult.Success)
{
var ack = publishResult.Value;
if (ack.Error != null)
{
return new NatsJSApiException(ack.Error);
}
else if (ack.Duplicate)
{
return new NatsJSDuplicateMessageException(ack.Seq);
}

return ack.Seq;
}
else
{
return publishResult.Error;
}
}

/// <summary>
/// Valid keys are \A[-/_=\.a-zA-Z0-9]+\z, additionally they may not start or end in .
/// </summary>
private static NatsResult TryValidateKey(string key)
{
if (string.IsNullOrWhiteSpace(key))
{
return KeyCannotBeEmptyException;
}

if (key[0] == '.' || key[^1] == '.')
{
return KeyCannotStartOrEndWithPeriodException;
}

if (!ValidKeyRegex.IsMatch(key))
{
return KeyContainsInvalidCharactersException;
}

return NatsResult.Default;
}

/// <summary>
/// For the TTL header, we need to convert the TimeSpan to a Go time.ParseDuration string.
/// </summary>
/// <param name="ttl">TTL</param>
/// <returns>String representing the number of seconds Go time.ParseDuration() can understand.</returns>
private static string ToTtlString(TimeSpan ttl)
=> ttl == TimeSpan.MaxValue ? "never" : $"{(int)ttl.TotalSeconds:D}s";

[MethodImpl(MethodImplOptions.NoInlining)]
private static void ThrowException(Exception exception) => throw exception;
}
197 changes: 197 additions & 0 deletions test/IntegrationTests/Extensions/NatsExtensionsPutWithTtlTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
using NATS.Client.Core;
using NATS.Client.KeyValueStore;
using NATS.Net;

namespace CodeCargo.Nats.DistributedCache.IntegrationTests.Extensions;

/// <summary>
/// Integration tests for NatsExtensions.PutWithTtlAsync
/// </summary>
public class NatsExtensionsPutWithTtlTests(NatsIntegrationFixture fixture) : TestBase(fixture)
{
[Fact]
public async Task PutWithTtlAsync_StoresValue()
{
var key = MethodKey();
var value = "test-value";

var kvContext = NatsConnection.CreateKeyValueStoreContext();
var store = await kvContext.GetStoreAsync("cache");

var revision = await store.PutWithTtlAsync(key, value);

Assert.True(revision > 0);

var entry = await store.GetEntryAsync<string>(key);
Assert.Equal(value, entry.Value);
}

[Fact]
public async Task PutWithTtlAsync_WithTtl_ExpiresAfterTtl()
{
var key = MethodKey();
var value = "test-value-ttl";
var ttl = TimeSpan.FromSeconds(2);

var kvContext = NatsConnection.CreateKeyValueStoreContext();
var store = await kvContext.GetStoreAsync("cache");

await store.PutWithTtlAsync(key, value, ttl);

// Value should exist immediately
var entry = await store.TryGetEntryAsync<string>(key);
Assert.True(entry.Success);
Assert.Equal(value, entry.Value.Value);

// Wait for TTL to expire
await Task.Delay(TimeSpan.FromSeconds(3));

// Value should be expired
var expiredEntry = await store.TryGetEntryAsync<string>(key);
Assert.False(expiredEntry.Success);
}

[Fact]
public async Task PutWithTtlAsync_WithoutTtl_DoesNotExpire()
{
var key = MethodKey();
var value = "test-value-no-ttl";

var kvContext = NatsConnection.CreateKeyValueStoreContext();
var store = await kvContext.GetStoreAsync("cache");

await store.PutWithTtlAsync(key, value);

// Value should exist immediately
var entry = await store.TryGetEntryAsync<string>(key);
Assert.True(entry.Success);
Assert.Equal(value, entry.Value.Value);

// Wait a bit
await Task.Delay(TimeSpan.FromSeconds(2));

// Value should still exist
var stillExists = await store.TryGetEntryAsync<string>(key);
Assert.True(stillExists.Success);
Assert.Equal(value, stillExists.Value.Value);
}

[Fact]
public async Task TryPutWithTtlAsync_WithEmptyKey_ReturnsError()
{
var kvContext = NatsConnection.CreateKeyValueStoreContext();
var store = await kvContext.GetStoreAsync("cache");

var result = await store.TryPutWithTtlAsync(string.Empty, "value");

Assert.False(result.Success);
Assert.IsType<NatsKVException>(result.Error);
Assert.Contains("empty", result.Error.Message, StringComparison.OrdinalIgnoreCase);
}

[Fact]
public async Task TryPutWithTtlAsync_WithKeyStartingWithPeriod_ReturnsError()
{
var kvContext = NatsConnection.CreateKeyValueStoreContext();
var store = await kvContext.GetStoreAsync("cache");

var result = await store.TryPutWithTtlAsync(".invalid-key", "value");

Assert.False(result.Success);
Assert.IsType<NatsKVException>(result.Error);
Assert.Contains("period", result.Error.Message, StringComparison.OrdinalIgnoreCase);
}

[Fact]
public async Task TryPutWithTtlAsync_WithKeyEndingWithPeriod_ReturnsError()
{
var kvContext = NatsConnection.CreateKeyValueStoreContext();
var store = await kvContext.GetStoreAsync("cache");

var result = await store.TryPutWithTtlAsync("invalid-key.", "value");

Assert.False(result.Success);
Assert.IsType<NatsKVException>(result.Error);
Assert.Contains("period", result.Error.Message, StringComparison.OrdinalIgnoreCase);
}

[Fact]
public async Task TryPutWithTtlAsync_WithInvalidCharacters_ReturnsError()
{
var kvContext = NatsConnection.CreateKeyValueStoreContext();
var store = await kvContext.GetStoreAsync("cache");

var result = await store.TryPutWithTtlAsync("invalid key with spaces", "value");

Assert.False(result.Success);
Assert.IsType<NatsKVException>(result.Error);
Assert.Contains("invalid", result.Error.Message, StringComparison.OrdinalIgnoreCase);
}

[Fact]
public async Task TryPutWithTtlAsync_WithValidKey_ReturnsSuccess()
{
var key = MethodKey();
var value = "test-value";

var kvContext = NatsConnection.CreateKeyValueStoreContext();
var store = await kvContext.GetStoreAsync("cache");

var result = await store.TryPutWithTtlAsync(key, value);

Assert.True(result.Success);
Assert.True(result.Value > 0);
}

[Theory]
[InlineData("simple-key")]
[InlineData("key_with_underscore")]
[InlineData("key/with/slashes")]
[InlineData("key.with.periods")]
[InlineData("key=with=equals")]
[InlineData("MixedCase123")]
public async Task TryPutWithTtlAsync_WithValidKeyFormats_Succeeds(string key)
{
var value = "test-value";

var kvContext = NatsConnection.CreateKeyValueStoreContext();
var store = await kvContext.GetStoreAsync("cache");

var result = await store.TryPutWithTtlAsync(key, value);

Assert.True(result.Success);

var entry = await store.TryGetEntryAsync<string>(key);
Assert.True(entry.Success);
Assert.Equal(value, entry.Value.Value);
}

[Fact]
public async Task PutWithTtlAsync_WithInvalidKey_ThrowsException()
{
var kvContext = NatsConnection.CreateKeyValueStoreContext();
var store = await kvContext.GetStoreAsync("cache");

await Assert.ThrowsAsync<NatsKVException>(() =>
store.PutWithTtlAsync(string.Empty, "value").AsTask());
}

[Fact]
public async Task PutWithTtlAsync_OverwritesExistingValue()
{
var key = MethodKey();
var value1 = "first-value";
var value2 = "second-value";

var kvContext = NatsConnection.CreateKeyValueStoreContext();
var store = await kvContext.GetStoreAsync("cache");

var revision1 = await store.PutWithTtlAsync(key, value1);
var revision2 = await store.PutWithTtlAsync(key, value2);

Assert.True(revision2 > revision1);

var entry = await store.GetEntryAsync<string>(key);
Assert.Equal(value2, entry.Value);
}
}