-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathTimeoutResilienceStrategy.cs
More file actions
90 lines (70 loc) · 3.6 KB
/
TimeoutResilienceStrategy.cs
File metadata and controls
90 lines (70 loc) · 3.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
using Polly.Telemetry;
namespace Polly.Timeout;
internal sealed class TimeoutResilienceStrategy : ResilienceStrategy
{
private readonly ResilienceStrategyTelemetry _telemetry;
private readonly CancellationTokenSourcePool _cancellationTokenSourcePool;
public TimeoutResilienceStrategy(TimeoutStrategyOptions options, TimeProvider timeProvider, ResilienceStrategyTelemetry telemetry)
{
DefaultTimeout = options.Timeout;
TimeoutGenerator = options.TimeoutGenerator;
OnTimeout = options.OnTimeout;
_telemetry = telemetry;
_cancellationTokenSourcePool = CancellationTokenSourcePool.Create(timeProvider);
}
public TimeSpan DefaultTimeout { get; }
public Func<TimeoutGeneratorArguments, ValueTask<TimeSpan>>? TimeoutGenerator { get; }
public Func<OnTimeoutArguments, ValueTask>? OnTimeout { get; }
protected internal override async ValueTask<Outcome<TResult>> ExecuteCore<TResult, TState>(
Func<ResilienceContext, TState, ValueTask<Outcome<TResult>>> callback,
ResilienceContext context,
TState state)
{
var timeout = DefaultTimeout;
if (TimeoutGenerator is not null)
{
timeout = await TimeoutGenerator!(new TimeoutGeneratorArguments(context)).ConfigureAwait(context.ContinueOnCapturedContext);
}
if (!TimeoutUtil.ShouldApplyTimeout(timeout))
{
// do nothing
return await callback(context, state).ConfigureAwait(context.ContinueOnCapturedContext);
}
var previousToken = context.CancellationToken;
var cancellationSource = _cancellationTokenSourcePool.Get(timeout);
context.CancellationToken = cancellationSource.Token;
var registration = CreateRegistration(cancellationSource, previousToken);
var outcome = await StrategyHelper.ExecuteCallbackSafeAsync(callback, context, state).ConfigureAwait(context.ContinueOnCapturedContext);
var isCancellationRequested = cancellationSource.IsCancellationRequested;
// execution is finished, clean up
context.CancellationToken = previousToken;
#pragma warning disable CA1849 // Call async methods when in an async method, OK here as the callback is synchronous
registration.Dispose();
#pragma warning restore CA1849 // Call async methods when in an async method
_cancellationTokenSourcePool.Return(cancellationSource);
// check the outcome
if (isCancellationRequested && outcome.Exception is OperationCanceledException e && !previousToken.IsCancellationRequested)
{
var args = new OnTimeoutArguments(context, e, timeout);
_telemetry.Report(new(ResilienceEventSeverity.Error, TimeoutConstants.OnTimeoutEvent), context, args);
if (OnTimeout is not null)
{
await OnTimeout(args).ConfigureAwait(context.ContinueOnCapturedContext);
}
var timeoutException = new TimeoutRejectedException(
$"The operation didn't complete within the allowed timeout of '{timeout}'.",
timeout,
e);
return Outcome.FromException<TResult>(timeoutException.TrySetStackTrace());
}
return outcome;
}
private static CancellationTokenRegistration CreateRegistration(CancellationTokenSource cancellationSource, CancellationToken previousToken)
{
if (previousToken.CanBeCanceled)
{
return previousToken.Register(static state => ((CancellationTokenSource)state!).Cancel(), cancellationSource, useSynchronizationContext: false);
}
return default;
}
}