Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
88 commits
Select commit Hold shift + click to select a range
34ace26
refactor: update controller and finalizer interfaces to return `Resul…
kimpenhaus Jun 4, 2025
fddb472
Merge branch 'main' into result-pattern
kimpenhaus Jun 4, 2025
ac36c04
refactor: update controller and finalizer interfaces to return `Resul…
kimpenhaus Jun 4, 2025
e67f7b7
Merge remote-tracking branch 'origin/result-pattern' into result-pattern
kimpenhaus Jun 17, 2025
d31a132
refactor: mark `OperatorBuilderGenerator` as `sealed` and use constan…
kimpenhaus Jun 27, 2025
5e00fa3
feat: add FusionCache for resource watcher to enable L1 and L2 cachin…
kimpenhaus Jun 27, 2025
6d8def6
refactor: optimize resource watcher cache handling and remove redunda…
kimpenhaus Jun 30, 2025
3d2e607
refactor: enhance resource watcher cache configuration and logging scope
kimpenhaus Jul 1, 2025
f184048
refactor: rename cache extension methods and adjust visibility
kimpenhaus Jul 1, 2025
9bcf9c0
Merge branch 'buehler:main' into entity_cache
kimpenhaus Jul 7, 2025
5e4de87
refactor: update cache key prefix in `CacheExtensions` to use `CacheC…
kimpenhaus Jul 7, 2025
a8f2ba3
Merge remote-tracking branch 'origin/entity_cache' into entity_cache
kimpenhaus Jul 7, 2025
8ce68a8
docs: add caching documentation and adjust sidebar positions
kimpenhaus Jul 7, 2025
2a6f4fc
docs: enhance caching documentation with configuration examples and F…
kimpenhaus Jul 7, 2025
275878b
Merge branch 'buehler:main' into result-pattern
kimpenhaus Jul 9, 2025
66eef75
Merge branch 'entity_cache' into result-pattern
kimpenhaus Jul 9, 2025
d83914a
Merge branch 'main' into result-pattern
kimpenhaus Aug 19, 2025
1ad141c
refactor(watcher): streamline deletion logic and update FusionCache d…
kimpenhaus Sep 5, 2025
6ded18f
refactor(operator): remove unused FusionCache dependency from project…
kimpenhaus Sep 5, 2025
9dce9df
feat(queue): add `RequeueType` and enhance requeue handling
kimpenhaus Sep 8, 2025
12aeda4
refactor(reconciliation): introduce `Reconciler` to centralize entity…
kimpenhaus Sep 10, 2025
a9c122c
Merge branch 'dotnet:main' into result-pattern
kimpenhaus Sep 10, 2025
f1d20f6
feat(operator): add `IReconciler` registration in `OperatorBuilder`
kimpenhaus Sep 10, 2025
4fc59eb
feat(reconciler): enhance finalizer management with configurable auto…
kimpenhaus Sep 11, 2025
fda45f2
refactor(generator): enhance syntax model resolution to support const…
kimpenhaus Sep 12, 2025
e875b83
feat(finalizer): fix identifier generation and add unit tests
kimpenhaus Sep 12, 2025
84d97ef
test(finalizer): update and expand unit tests for entity finalizers
kimpenhaus Sep 12, 2025
d6d4b34
refactor(result): make `ErrorMessage` readonly and allow setting `Req…
kimpenhaus Sep 15, 2025
5c935aa
refactor(queue): replace `TimedEntityQueue` with `ITimedEntityQueue` …
kimpenhaus Sep 15, 2025
b222cac
feat(operator): introduce `LeaderElectionType` for configurable leade…
kimpenhaus Sep 15, 2025
88180ed
refactor(operator): expose `Settings` in `OperatorBuilder` and update…
kimpenhaus Sep 16, 2025
9bee002
feat(queue): add logging to `TimedEntityQueue` and update test implem…
kimpenhaus Sep 16, 2025
54c59b3
refactor(admission): seal `AdmissionStatus` and `MutationResult`; upd…
kimpenhaus Sep 23, 2025
152a5aa
Merge branch 'dotnet:main' into result-pattern
kimpenhaus Sep 23, 2025
d48c2a5
refactor(reconciliation): migrate `Result` to `ReconciliationResult` …
kimpenhaus Sep 24, 2025
f9aa5cf
Merge remote-tracking branch 'origin/result-pattern' into result-pattern
kimpenhaus Sep 24, 2025
415473c
refactor(syntax-receiver): update metadata names to reflect new recon…
kimpenhaus Sep 24, 2025
9df73fc
refactor(reconciliation): update imports to reflect new `Reconciliati…
kimpenhaus Sep 24, 2025
4f000e9
fix(reconciliation): ensure finalizers are executed for entities mark…
kimpenhaus Sep 24, 2025
d21719e
feat(watcher): integrate `FusionCache` for bookmark version caching i…
kimpenhaus Sep 25, 2025
c3b3af0
refactor(watcher): remove `FusionCache` usage for bookmark version ha…
kimpenhaus Sep 25, 2025
2e21cb8
refactor(watcher): remove `FusionCache` dependency in `LeaderAwareRes…
kimpenhaus Sep 25, 2025
5f5f3e8
feat(reconciliation): introduce `ReconciliationContext` and trigger s…
kimpenhaus Oct 2, 2025
efa9a91
refactor(queue): make `Enqueue` and `Remove` methods asynchronous
kimpenhaus Oct 10, 2025
b215401
refactor(queue): add `CancellationToken` support for `Enqueue` and `R…
kimpenhaus Oct 10, 2025
5bd1a77
Merge branch 'dotnet:main' into result-pattern
kimpenhaus Oct 20, 2025
daeba95
refactor(queue): remove unused `Reconciliation` import in `EntityRequ…
kimpenhaus Oct 21, 2025
24f2ca7
refactor(queue): add `JsonConstructor` to `RequeueEntry`
kimpenhaus Oct 21, 2025
299f24e
Merge branch 'dotnet:main' into result-pattern
kimpenhaus Oct 29, 2025
5b0fc66
refactor(reconciliation): consolidate event-specific reconciliation m…
kimpenhaus Oct 30, 2025
254b646
refactor(tests): remove unused `Reconciliation` folder entry from `.c…
kimpenhaus Oct 30, 2025
a6e01ba
refactor(docs): update reconciliation methods and examples to use `Re…
kimpenhaus Oct 30, 2025
53b2513
docs(operator): add advanced configuration guide and update related s…
kimpenhaus Oct 30, 2025
515d4cd
test: add comprehensive unit tests for reconciliation and queue logic
kimpenhaus Oct 30, 2025
8bd859b
test: extend reconciliation tests with finalizer handling and caching…
kimpenhaus Oct 30, 2025
1ceb116
test: exclude test projects from code coverage analysis
kimpenhaus Oct 30, 2025
7a65fed
chore(deps): upgrade `KubernetesClient` to version `18.0.5`
kimpenhaus Oct 29, 2025
014a335
refactor: update object initialization to use object initializer shor…
kimpenhaus Oct 31, 2025
464730a
refactor: apply object initializer shorthand and seal generator classes
kimpenhaus Oct 31, 2025
a8a1366
refactor: mark entities and tests as `sealed`, add null checks in fin…
kimpenhaus Oct 31, 2025
812252c
refactor: adjust formatting for consistency and readability across te…
kimpenhaus Oct 31, 2025
ed5c140
refactor: fixed whitespace formatting
kimpenhaus Oct 31, 2025
91f1caa
Merge branch 'k8s-client-v18' into result-pattern
kimpenhaus Oct 31, 2025
58ea528
chore: add Apache 2.0 license headers and improve readability
kimpenhaus Oct 31, 2025
1e63804
docs: remove details in finalizer configuration guide
kimpenhaus Oct 31, 2025
18450f6
docs: update advanced configuration guide with time synchronization tip
kimpenhaus Oct 31, 2025
42ea861
refactor: improve naming consistency
kimpenhaus Nov 3, 2025
b744026
Merge branch 'dotnet:main' into result-pattern
kimpenhaus Nov 4, 2025
067a773
try to fix CodeQL recommendations: https://codeql.github.com/codeql-q…
kimpenhaus Nov 4, 2025
17944e9
restore finalizer integration tests with new configuration options
kimpenhaus Nov 4, 2025
8bc4345
fixed `No service for type` test issues
kimpenhaus Nov 4, 2025
1289fa7
reconcile: move `Remove` calls to specific reconciliation methods
kimpenhaus Nov 4, 2025
cb78196
Merge branch 'main' into result-pattern
kimpenhaus Nov 9, 2025
9824d93
refactor: enforce `sealed` keyword for entity-related classes and upd…
kimpenhaus Nov 12, 2025
fe6ba3f
refactor: remove `IsFailure` property in favor of `!IsSuccess` usage
kimpenhaus Nov 12, 2025
7dadcbd
chore: updated readme.md to latest changes
kimpenhaus Nov 12, 2025
0e01663
refactor: remove `InternalsVisibleTo` attribute from AssemblyInfo.cs
kimpenhaus Nov 12, 2025
c4358a9
log: upgrade log level from Debug to Warning for missing finalizer sc…
kimpenhaus Nov 13, 2025
f19ca77
refactor: update `RequeueEntry` and `TimedQueueEntry` to improve init…
kimpenhaus Nov 13, 2025
6466988
refactor: replace factory method in `RequeueEntry` with object initia…
kimpenhaus Nov 13, 2025
dbc8643
refactor: add requeue strategy support and improve contextual logging…
kimpenhaus Nov 14, 2025
6c5ef30
refactor: simplify `TryAdd` call in `TimedQueueEntry` with compact ob…
kimpenhaus Nov 14, 2025
6f3f48d
refactor: convert `RequeueEntry` to readonly record struct for improv…
kimpenhaus Nov 14, 2025
84fe20b
Merge branch 'main' into result-pattern
kimpenhaus Nov 14, 2025
d1b285f
log: downgrade log level from Warning to Information for missing fina…
kimpenhaus Nov 14, 2025
e5a5fb8
Merge branch 'main' into result-pattern
kimpenhaus Nov 20, 2025
d4aa7b8
Merge branch 'main' into result-pattern
kimpenhaus Nov 20, 2025
310307e
Merge branch 'main' into result-pattern
kimpenhaus Nov 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: add FusionCache for resource watcher to enable L1 and L2 cachin…
…g (hybrid cache)

- Integrated FusionCache for robust caching in resource watchers.
- Enhanced default configuration with extensible settings in `OperatorSettings`.
- Improved concurrency handling using `SemaphoreSlim` for entity events.
- Updated tests and dependencies to reflect caching changes.
  • Loading branch information
kimpenhaus committed Jul 1, 2025
commit 5e00fa3fd04d6b26526dba6b95ecab6c09f18a79
16 changes: 14 additions & 2 deletions src/KubeOps.Abstractions/Builder/OperatorSettings.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
using System.Text.RegularExpressions;

using ZiggyCreatures.Caching.Fusion;

namespace KubeOps.Abstractions.Builder;

/// <summary>
/// Operator settings.
/// </summary>
public sealed class OperatorSettings
public sealed partial class OperatorSettings
{
private const string DefaultOperatorName = "KubernetesOperator";
private const string NonCharReplacement = "-";
Expand All @@ -15,7 +17,7 @@ public sealed class OperatorSettings
/// Defaults to "kubernetesoperator" when not set.
/// </summary>
public string Name { get; set; } =
new Regex(@"(\W|_)", RegexOptions.CultureInvariant).Replace(
OperatorNameRegex().Replace(
DefaultOperatorName,
NonCharReplacement)
.ToLowerInvariant();
Expand Down Expand Up @@ -59,4 +61,14 @@ public sealed class OperatorSettings
/// The wait timeout if the lease cannot be acquired.
/// </summary>
public TimeSpan LeaderElectionRetryPeriod { get; set; } = TimeSpan.FromSeconds(2);

/// <summary>
/// Allows configuration of the FusionCache settings for resource watcher entity caching.
/// This property is optional and can be used to customize caching behavior for resource watcher entities.
/// If not set, a default cache configuration is applied.
/// </summary>
public Action<FusionCacheOptions>? ConfigureResourceWatcherEntityCache { get; set; }

[GeneratedRegex(@"(\W|_)", RegexOptions.CultureInvariant)]
private static partial Regex OperatorNameRegex();
}
1 change: 1 addition & 0 deletions src/KubeOps.Abstractions/KubeOps.Abstractions.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<ItemGroup>
<PackageReference Include="KubernetesClient" Version="16.0.7" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.6"/>
<PackageReference Include="ZiggyCreatures.FusionCache" Version="2.3.0" />
</ItemGroup>

</Project>
15 changes: 15 additions & 0 deletions src/KubeOps.Operator/Builder/OperatorBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

using ZiggyCreatures.Caching.Fusion;

namespace KubeOps.Operator.Builder;

internal sealed class OperatorBuilder : IOperatorBuilder
Expand All @@ -36,6 +38,13 @@ public OperatorBuilder(IServiceCollection services, OperatorSettings settings)

public IServiceCollection Services { get; }

private static Action<FusionCacheOptions> DefaultCacheConfiguration
=> options =>
{
options.DefaultEntryOptions
.SetDuration(Timeout.InfiniteTimeSpan);
};

public IOperatorBuilder AddController<TImplementation, TEntity>()
where TImplementation : class, IEntityController<TEntity>
where TEntity : IKubernetesObject<V1ObjectMeta>
Expand Down Expand Up @@ -111,6 +120,12 @@ private void AddOperatorBase()
Services.AddSingleton(_settings);
Services.AddSingleton(new ActivitySource(_settings.Name));

// add and configure resource watcher entity cache
Services
.AddFusionCache(CacheConstants.CacheNames.ResourceWatcher)
.WithOptions(
options => (_settings.ConfigureResourceWatcherEntityCache ?? DefaultCacheConfiguration).Invoke(options));

// Add the default configuration and the client separately. This allows external users to override either
// just the config (e.g. for integration tests) or to replace the whole client, e.g. with a mock.
// We also add the k8s.IKubernetes as a singleton service, in order to allow to access internal services
Expand Down
19 changes: 19 additions & 0 deletions src/KubeOps.Operator/Constants/CacheConstants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace KubeOps.Operator.Constants;

/// <summary>
/// Provides constant values used for caching purposes within the operator.
/// </summary>
public static class CacheConstants
{
/// <summary>
/// Contains constant values representing names used within the operator's caching mechanisms.
/// </summary>
public static class CacheNames
{
/// <summary>
/// Represents a constant string used as a name for the resource watcher
/// in the operator's caching mechanisms.
/// </summary>
public const string ResourceWatcher = "ResourceWatcher";
}
}
1 change: 1 addition & 0 deletions src/KubeOps.Operator/KubeOps.Operator.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.6"/>
<PackageReference Include="ZiggyCreatures.FusionCache" Version="2.3.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

using ZiggyCreatures.Caching.Fusion;

namespace KubeOps.Operator.Watcher;

internal sealed class LeaderAwareResourceWatcher<TEntity>(
Expand All @@ -21,6 +23,7 @@ internal sealed class LeaderAwareResourceWatcher<TEntity>(
TimedEntityQueue<TEntity> queue,
OperatorSettings settings,
IEntityLabelSelector<TEntity> labelSelector,
IFusionCacheProvider cacheProvider,
IKubernetesClient client,
IHostApplicationLifetime hostApplicationLifetime,
LeaderElector elector)
Expand All @@ -31,6 +34,7 @@ internal sealed class LeaderAwareResourceWatcher<TEntity>(
queue,
settings,
labelSelector,
cacheProvider,
client)
where TEntity : IKubernetesObject<V1ObjectMeta>
{
Expand Down
73 changes: 52 additions & 21 deletions src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
using KubeOps.Abstractions.Entities;
using KubeOps.Abstractions.Finalizer;
using KubeOps.KubernetesClient;
using KubeOps.Operator.Constants;
using KubeOps.Operator.Logging;
using KubeOps.Operator.Queue;

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

using ZiggyCreatures.Caching.Fusion;

namespace KubeOps.Operator.Watcher;

public class ResourceWatcher<TEntity>(
Expand All @@ -28,12 +31,14 @@ public class ResourceWatcher<TEntity>(
TimedEntityQueue<TEntity> requeue,
OperatorSettings settings,
IEntityLabelSelector<TEntity> labelSelector,
IFusionCacheProvider cacheProvider,
IKubernetesClient client)
: IHostedService, IAsyncDisposable, IDisposable
where TEntity : IKubernetesObject<V1ObjectMeta>
{
private readonly ConcurrentDictionary<string, long> _entityCache = new();
private readonly ConcurrentDictionary<string, SemaphoreSlim> _entityLocks = new();

private readonly IFusionCache _entityCache = cacheProvider.GetCache(CacheConstants.CacheNames.ResourceWatcher);
private CancellationTokenSource _cancellationTokenSource = new();
private uint _watcherReconnectRetries;
private Task? _eventWatcher;
Expand Down Expand Up @@ -132,42 +137,68 @@ static async ValueTask CastAndDispose(IDisposable resource)

protected virtual async Task OnEventAsync(WatchEventType type, TEntity entity, CancellationToken cancellationToken)
{
SemaphoreSlim? semaphore;

switch (type)
{
case WatchEventType.Added:
if (_entityCache.TryAdd(entity.Uid(), entity.Generation() ?? 0))
semaphore = _entityLocks.GetOrAdd(entity.Uid(), _ => new(1, 1));
await semaphore.WaitAsync(cancellationToken);

try
{
// Only perform reconciliation if the entity was not already in the cache.
await ReconcileModificationAsync(entity, cancellationToken);
var cachedGeneration = await _entityCache.TryGetAsync<long?>(entity.Uid(), token: cancellationToken);

if (!cachedGeneration.HasValue)
{
// Only perform reconciliation if the entity was not already in the cache.
await _entityCache.SetAsync(entity.Uid(), entity.Generation() ?? 0, token: cancellationToken);
await ReconcileModificationAsync(entity, cancellationToken);
}
else
{
logger.LogDebug(
"""Received ADDED event for entity "{Kind}/{Name}" which was already in the cache. Skip event.""",
entity.Kind,
entity.Name());
}
}
else
finally
{
logger.LogDebug(
"""Received ADDED event for entity "{Kind}/{Name}" which was already in the cache. Skip event.""",
entity.Kind,
entity.Name());
semaphore.Release();
}

break;
case WatchEventType.Modified:
switch (entity)
{
case { Metadata.DeletionTimestamp: null }:
_entityCache.TryGetValue(entity.Uid(), out var cachedGeneration);
semaphore = _entityLocks.GetOrAdd(entity.Uid(), _ => new(1, 1));
await semaphore.WaitAsync(cancellationToken);

// Check if entity spec has changed through "Generation" value increment. Skip reconcile if not changed.
if (entity.Generation() <= cachedGeneration)
try
{
logger.LogDebug(
"""Entity "{Kind}/{Name}" modification did not modify generation. Skip event.""",
entity.Kind,
entity.Name());
return;
var cachedGeneration = await _entityCache.TryGetAsync<long?>(entity.Uid(), token: cancellationToken);

// Check if entity spec has changed through "Generation" value increment. Skip reconcile if not changed.
if (cachedGeneration.HasValue && cachedGeneration >= entity.Generation())
{
logger.LogDebug(
"""Entity "{Kind}/{Name}" modification did not modify generation. Skip event.""",
entity.Kind,
entity.Name());
return;
}

// update cached generation since generation now changed
await _entityCache.SetAsync(entity.Uid(), entity.Generation() ?? 1, token: cancellationToken);
await ReconcileModificationAsync(entity, cancellationToken);
}
finally
{
semaphore.Release();
}

// update cached generation since generation now changed
_entityCache.TryUpdate(entity.Uid(), entity.Generation() ?? 1, cachedGeneration);
await ReconcileModificationAsync(entity, cancellationToken);
break;
case { Metadata: { DeletionTimestamp: not null, Finalizers.Count: > 0 } }:
await ReconcileFinalizersSequentialAsync(entity, cancellationToken);
Expand Down Expand Up @@ -311,7 +342,7 @@ e.InnerException is EndOfStreamException &&
private async Task ReconcileDeletionAsync(TEntity entity, CancellationToken cancellationToken)
{
requeue.Remove(entity);
_entityCache.TryRemove(entity.Uid(), out _);
await _entityCache.RemoveAsync(entity.Uid(), token: cancellationToken);

await using var scope = provider.CreateAsyncScope();
var controller = scope.ServiceProvider.GetRequiredService<IEntityController<TEntity>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
using KubeOps.Abstractions.Builder;
using KubeOps.Abstractions.Entities;
using KubeOps.KubernetesClient;
using KubeOps.Operator.Constants;
using KubeOps.Operator.Queue;
using KubeOps.Operator.Watcher;

using Microsoft.Extensions.Logging;

using Moq;

using ZiggyCreatures.Caching.Fusion;

namespace KubeOps.Operator.Test.Watcher;

public sealed class ResourceWatcherTest
Expand All @@ -28,13 +31,27 @@ public async Task Restarting_Watcher_Should_Trigger_New_Watch()
var timedEntityQueue = new TimedEntityQueue<V1Pod>();
var operatorSettings = new OperatorSettings { Namespace = "unit-test" };
var kubernetesClient = Mock.Of<IKubernetesClient>();
var cache = Mock.Of<IFusionCache>();
var cacheProvider = Mock.Of<IFusionCacheProvider>();
var labelSelector = new DefaultEntityLabelSelector<V1Pod>();

Mock.Get(kubernetesClient)
.Setup(client => client.WatchAsync<V1Pod>("unit-test", null, null, true, It.IsAny<CancellationToken>()))
.Returns<string?, string?, string?, bool?, CancellationToken>((_, _, _, _, cancellationToken) => WaitForCancellationAsync<(WatchEventType, V1Pod)>(cancellationToken));

var resourceWatcher = new ResourceWatcher<V1Pod>(activitySource, logger, serviceProvider, timedEntityQueue, operatorSettings, labelSelector, kubernetesClient);
Mock.Get(cacheProvider)
.Setup(cp => cp.GetCache(It.Is<string>(s => s == CacheConstants.CacheNames.ResourceWatcher)))
.Returns(() => cache);

var resourceWatcher = new ResourceWatcher<V1Pod>(
activitySource,
logger,
serviceProvider,
timedEntityQueue,
operatorSettings,
labelSelector,
cacheProvider,
kubernetesClient);

// Act.
// Start and stop the watcher.
Expand Down