Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
ResiliencePipelineRegistry is now disposable
  • Loading branch information
martintmk committed Aug 17, 2023
commit be2abdf07561638249da11464afd3c63a3d61a8e
71 changes: 46 additions & 25 deletions src/Polly.Core/CircuitBreaker/CircuitBreakerManualControl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ namespace Polly.CircuitBreaker;
/// <remarks>
/// The instance of this class can be reused across multiple circuit breakers.
/// </remarks>
public sealed class CircuitBreakerManualControl : IDisposable
public sealed class CircuitBreakerManualControl
{
private readonly HashSet<Action> _onDispose = new();
private readonly object _lock = new();
private readonly HashSet<Func<ResilienceContext, Task>> _onIsolate = new();
private readonly HashSet<Func<ResilienceContext, Task>> _onReset = new();
private bool _isolated;
Expand All @@ -26,18 +26,31 @@ public CircuitBreakerManualControl()
/// <param name="isIsolated">Determines whether the circit breaker is isolated immediately after construction.</param>
public CircuitBreakerManualControl(bool isIsolated) => _isolated = isIsolated;

internal void Initialize(Func<ResilienceContext, Task> onIsolate, Func<ResilienceContext, Task> onReset, Action onDispose)
{
_onDispose.Add(onDispose);
_onIsolate.Add(onIsolate);
_onReset.Add(onReset);
internal bool IsEmpty => _onIsolate.Count == 0 && _onReset.Count == 0;

if (_isolated)
internal IDisposable Initialize(Func<ResilienceContext, Task> onIsolate, Func<ResilienceContext, Task> onReset)
{
lock (_lock)
{
var context = ResilienceContextPool.Shared.Get().Initialize<VoidResult>(isSynchronous: true);

// if the control indicates that circuit breaker should be isolated, we isolate it right away
IsolateAsync(context).GetAwaiter().GetResult();
_onIsolate.Add(onIsolate);
_onReset.Add(onReset);

if (_isolated)
{
var context = ResilienceContextPool.Shared.Get().Initialize<VoidResult>(isSynchronous: true);

// if the control indicates that circuit breaker should be isolated, we isolate it right away
IsolateAsync(context).GetAwaiter().GetResult();
}

return new RegistrationDisposable(() =>
{
lock (_lock)
{
_onIsolate.Remove(onIsolate);
_onReset.Remove(onReset);
}
});
}
}

Expand All @@ -54,7 +67,14 @@ internal async Task IsolateAsync(ResilienceContext context)

_isolated = true;

foreach (var action in _onIsolate)
Func<ResilienceContext, Task>[] callbacks;

lock (_lock)
{
callbacks = _onIsolate.ToArray();
}

foreach (var action in callbacks)
{
await action(context).ConfigureAwait(context.ContinueOnCapturedContext);
}
Expand Down Expand Up @@ -95,7 +115,14 @@ internal async Task CloseAsync(ResilienceContext context)

context.Initialize<VoidResult>(isSynchronous: false);

foreach (var action in _onReset)
Func<ResilienceContext, Task>[] callbacks;

lock (_lock)
{
callbacks = _onReset.ToArray();
}

foreach (var action in callbacks)
{
await action(context).ConfigureAwait(context.ContinueOnCapturedContext);
}
Expand All @@ -121,18 +148,12 @@ public async Task CloseAsync(CancellationToken cancellationToken = default)
}
}

/// <summary>
/// Disposes the current class.
/// </summary>
public void Dispose()
private class RegistrationDisposable : IDisposable
{
foreach (var action in _onDispose)
{
action();
}
private readonly Action _disposeAction;

public RegistrationDisposable(Action disposeAction) => _disposeAction = disposeAction;

_onDispose.Clear();
_onIsolate.Clear();
_onReset.Clear();
public void Dispose() => _disposeAction();
}
}
14 changes: 10 additions & 4 deletions src/Polly.Core/CircuitBreaker/CircuitBreakerResilienceStrategy.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
namespace Polly.CircuitBreaker;

internal sealed class CircuitBreakerResilienceStrategy<T> : ResilienceStrategy<T>
internal sealed class CircuitBreakerResilienceStrategy<T> : ResilienceStrategy<T>, IDisposable
{
private readonly Func<OutcomeArguments<T, CircuitBreakerPredicateArguments>, ValueTask<bool>> _handler;
private readonly CircuitStateController<T> _controller;
private readonly IDisposable? _manualControlRegistration;

public CircuitBreakerResilienceStrategy(
Func<OutcomeArguments<T, CircuitBreakerPredicateArguments>, ValueTask<bool>> handler,
Expand All @@ -15,10 +16,15 @@ public CircuitBreakerResilienceStrategy(
_controller = controller;

stateProvider?.Initialize(() => _controller.CircuitState, () => _controller.LastHandledOutcome);
manualControl?.Initialize(
_manualControlRegistration = manualControl?.Initialize(
async c => await _controller.IsolateCircuitAsync(c).ConfigureAwait(c.ContinueOnCapturedContext),
async c => await _controller.CloseCircuitAsync(c).ConfigureAwait(c.ContinueOnCapturedContext),
_controller.Dispose);
async c => await _controller.CloseCircuitAsync(c).ConfigureAwait(c.ContinueOnCapturedContext));
}

public void Dispose()
{
_manualControlRegistration?.Dispose();
_controller.Dispose();
}

protected internal override async ValueTask<Outcome<T>> ExecuteCore<TState>(Func<ResilienceContext, TState, ValueTask<Outcome<T>>> callback, ResilienceContext context, TState state)
Expand Down
3 changes: 2 additions & 1 deletion src/Polly.Core/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ Polly.CircuitBreaker.CircuitBreakerManualControl
Polly.CircuitBreaker.CircuitBreakerManualControl.CircuitBreakerManualControl() -> void
Polly.CircuitBreaker.CircuitBreakerManualControl.CircuitBreakerManualControl(bool isIsolated) -> void
Polly.CircuitBreaker.CircuitBreakerManualControl.CloseAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
Polly.CircuitBreaker.CircuitBreakerManualControl.Dispose() -> void
Polly.CircuitBreaker.CircuitBreakerManualControl.IsolateAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
Polly.CircuitBreaker.CircuitBreakerPredicateArguments
Polly.CircuitBreaker.CircuitBreakerPredicateArguments.CircuitBreakerPredicateArguments() -> void
Expand Down Expand Up @@ -167,6 +166,8 @@ Polly.Registry.ConfigureBuilderContext<TKey>.PipelineKey.get -> TKey
Polly.Registry.ResiliencePipelineProvider<TKey>
Polly.Registry.ResiliencePipelineProvider<TKey>.ResiliencePipelineProvider() -> void
Polly.Registry.ResiliencePipelineRegistry<TKey>
Polly.Registry.ResiliencePipelineRegistry<TKey>.Dispose() -> void
Polly.Registry.ResiliencePipelineRegistry<TKey>.DisposeAsync() -> System.Threading.Tasks.ValueTask
Polly.Registry.ResiliencePipelineRegistry<TKey>.GetOrAddPipeline(TKey key, System.Action<Polly.ResiliencePipelineBuilder!, Polly.Registry.ConfigureBuilderContext<TKey>!>! configure) -> Polly.ResiliencePipeline!
Polly.Registry.ResiliencePipelineRegistry<TKey>.GetOrAddPipeline(TKey key, System.Action<Polly.ResiliencePipelineBuilder!>! configure) -> Polly.ResiliencePipeline!
Polly.Registry.ResiliencePipelineRegistry<TKey>.GetOrAddPipeline<TResult>(TKey key, System.Action<Polly.ResiliencePipelineBuilder<TResult>!, Polly.Registry.ConfigureBuilderContext<TKey>!>! configure) -> Polly.ResiliencePipeline<TResult>!
Expand Down
26 changes: 23 additions & 3 deletions src/Polly.Core/Registry/ResiliencePipelineRegistry.TResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Polly.Registry;
public sealed partial class ResiliencePipelineRegistry<TKey> : ResiliencePipelineProvider<TKey>
where TKey : notnull
{
private sealed class GenericRegistry<TResult>
private sealed class GenericRegistry<TResult> : IDisposable, IAsyncDisposable
{
private readonly Func<ResiliencePipelineBuilder<TResult>> _activator;
private readonly ConcurrentDictionary<TKey, Action<ResiliencePipelineBuilder<TResult>, ConfigureBuilderContext<TKey>>> _builders;
Expand Down Expand Up @@ -52,14 +52,34 @@ public ResiliencePipeline<TResult> GetOrAdd(TKey key, Action<ResiliencePipelineB
#if NETCOREAPP3_0_OR_GREATER
return _strategies.GetOrAdd(key, static (_, factory) =>
{
return new ResiliencePipeline<TResult>(CreatePipelineComponent(factory.instance._activator, factory.context, factory.configure));
return new ResiliencePipeline<TResult>(CreatePipelineComponent(factory.instance._activator, factory.context, factory.configure), DisposeBehavior.Reject);
},
(instance: this, context, configure));
#else
return _strategies.GetOrAdd(key, _ => new ResiliencePipeline<TResult>(CreatePipelineComponent(_activator, context, configure)));
return _strategies.GetOrAdd(key, _ => new ResiliencePipeline<TResult>(CreatePipelineComponent(_activator, context, configure), DisposeBehavior.Reject));
#endif
}

public bool TryAddBuilder(TKey key, Action<ResiliencePipelineBuilder<TResult>, ConfigureBuilderContext<TKey>> configure) => _builders.TryAdd(key, configure);

public void Dispose()
{
foreach (var strategy in _strategies.Values)
{
strategy.DisposeHelper.ForceDispose();
}

_strategies.Clear();
}

public async ValueTask DisposeAsync()
{
foreach (var strategy in _strategies.Values)
{
await strategy.DisposeHelper.ForceDisposeAsync().ConfigureAwait(false);
}

_strategies.Clear();
}
}
}
71 changes: 68 additions & 3 deletions src/Polly.Core/Registry/ResiliencePipelineRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Polly.Registry;
/// These callbacks are called when the resilience pipeline is not yet cached and it's retrieved for the first time.
/// </para>
/// </remarks>
public sealed partial class ResiliencePipelineRegistry<TKey> : ResiliencePipelineProvider<TKey>
public sealed partial class ResiliencePipelineRegistry<TKey> : ResiliencePipelineProvider<TKey>, IDisposable, IAsyncDisposable
where TKey : notnull
{
private readonly Func<ResiliencePipelineBuilder> _activator;
Expand All @@ -28,6 +28,7 @@ public sealed partial class ResiliencePipelineRegistry<TKey> : ResiliencePipelin
private readonly Func<TKey, string> _builderNameFormatter;
private readonly IEqualityComparer<TKey> _builderComparer;
private readonly IEqualityComparer<TKey> _pipelineComparer;
private bool _disposed;

/// <summary>
/// Initializes a new instance of the <see cref="ResiliencePipelineRegistry{TKey}"/> class with the default comparer.
Expand Down Expand Up @@ -63,12 +64,16 @@ public ResiliencePipelineRegistry(ResiliencePipelineRegistryOptions<TKey> option
/// <inheritdoc/>
public override bool TryGetPipeline<TResult>(TKey key, [NotNullWhen(true)] out ResiliencePipeline<TResult>? pipeline)
{
EnsureNotDisposed();

return GetGenericRegistry<TResult>().TryGet(key, out pipeline);
}

/// <inheritdoc/>
public override bool TryGetPipeline(TKey key, [NotNullWhen(true)] out ResiliencePipeline? pipeline)
{
EnsureNotDisposed();

if (_pipelines.TryGetValue(key, out pipeline))
{
return true;
Expand All @@ -94,6 +99,8 @@ public ResiliencePipeline GetOrAddPipeline(TKey key, Action<ResiliencePipelineBu
{
Guard.NotNull(configure);

EnsureNotDisposed();

return GetOrAddPipeline(key, (builder, _) => configure(builder));
}

Expand All @@ -107,6 +114,8 @@ public ResiliencePipeline GetOrAddPipeline(TKey key, Action<ResiliencePipelineBu
{
Guard.NotNull(configure);

EnsureNotDisposed();

if (_pipelines.TryGetValue(key, out var pipeline))
{
return pipeline;
Expand All @@ -117,11 +126,11 @@ public ResiliencePipeline GetOrAddPipeline(TKey key, Action<ResiliencePipelineBu
#if NETCOREAPP3_0_OR_GREATER
return _pipelines.GetOrAdd(key, static (_, factory) =>
{
return new ResiliencePipeline(CreatePipelineComponent(factory.instance._activator, factory.context, factory.configure));
return new ResiliencePipeline(CreatePipelineComponent(factory.instance._activator, factory.context, factory.configure), DisposeBehavior.Reject);
},
(instance: this, context, configure));
#else
return _pipelines.GetOrAdd(key, _ => new ResiliencePipeline(CreatePipelineComponent(_activator, context, configure)));
return _pipelines.GetOrAdd(key, _ => new ResiliencePipeline(CreatePipelineComponent(_activator, context, configure), DisposeBehavior.Reject));
#endif
}

Expand All @@ -136,6 +145,8 @@ public ResiliencePipeline<TResult> GetOrAddPipeline<TResult>(TKey key, Action<Re
{
Guard.NotNull(configure);

EnsureNotDisposed();

return GetOrAddPipeline<TResult>(key, (builder, _) => configure(builder));
}

Expand All @@ -150,6 +161,8 @@ public ResiliencePipeline<TResult> GetOrAddPipeline<TResult>(TKey key, Action<Re
{
Guard.NotNull(configure);

EnsureNotDisposed();

return GetGenericRegistry<TResult>().GetOrAdd(key, configure);
}

Expand All @@ -167,6 +180,8 @@ public bool TryAddBuilder(TKey key, Action<ResiliencePipelineBuilder, ConfigureB
{
Guard.NotNull(configure);

EnsureNotDisposed();

return _builders.TryAdd(key, configure);
}

Expand All @@ -185,9 +200,51 @@ public bool TryAddBuilder<TResult>(TKey key, Action<ResiliencePipelineBuilder<TR
{
Guard.NotNull(configure);

EnsureNotDisposed();

return GetGenericRegistry<TResult>().TryAddBuilder(key, configure);
}

/// <inheritdoc/>
public void Dispose()
{
_disposed = true;

foreach (var pipeline in _pipelines.Values)
{
pipeline.DisposeHelper.ForceDispose();
}

_pipelines.Clear();

foreach (var disposable in _genericRegistry.Values.Cast<IDisposable>())
{
disposable.Dispose();
}

_genericRegistry.Clear();
}

/// <inheritdoc/>
public async ValueTask DisposeAsync()
{
_disposed = true;

foreach (var pipeline in _pipelines.Values)
{
await pipeline.DisposeHelper.ForceDisposeAsync().ConfigureAwait(false);
}

_pipelines.Clear();

foreach (var disposable in _genericRegistry.Values.Cast<IAsyncDisposable>())
{
await disposable.DisposeAsync().ConfigureAwait(false);
}

_genericRegistry.Clear();
}

private static PipelineComponent CreatePipelineComponent<TBuilder>(
Func<TBuilder> activator,
ConfigureBuilderContext<TKey> context,
Expand Down Expand Up @@ -235,4 +292,12 @@ private GenericRegistry<TResult> GetGenericRegistry<TResult>()
_instanceNameFormatter);
});
}

private void EnsureNotDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException("ResiliencePipelineRegistry", "The resilience pipeline registry has been disposed and cannot be used anymore.");
}
}
}
4 changes: 2 additions & 2 deletions src/Polly.Core/ResiliencePipeline.Async.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ static async (context, state) =>
}
}

private static ResilienceContext GetAsyncContext(CancellationToken cancellationToken) => GetAsyncContext<VoidResult>(cancellationToken);
private ResilienceContext GetAsyncContext(CancellationToken cancellationToken) => GetAsyncContext<VoidResult>(cancellationToken);

private static void InitializeAsyncContext(ResilienceContext context) => InitializeAsyncContext<VoidResult>(context);
private void InitializeAsyncContext(ResilienceContext context) => InitializeAsyncContext<VoidResult>(context);
}
9 changes: 7 additions & 2 deletions src/Polly.Core/ResiliencePipeline.AsyncT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ static async (context, state) =>
}
}

private static ResilienceContext GetAsyncContext<TResult>(CancellationToken cancellationToken)
private ResilienceContext GetAsyncContext<TResult>(CancellationToken cancellationToken)
{
var context = Pool.Get(cancellationToken);

Expand All @@ -199,5 +199,10 @@ private static ResilienceContext GetAsyncContext<TResult>(CancellationToken canc
return context;
}

private static void InitializeAsyncContext<TResult>(ResilienceContext context) => context.Initialize<TResult>(isSynchronous: false);
private void InitializeAsyncContext<TResult>(ResilienceContext context)
{
DisposeHelper.EnsureNotDisposed();

context.Initialize<TResult>(isSynchronous: false);
}
}
Loading