Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
388 changes: 384 additions & 4 deletions src/Polly.Core/ToBeRemoved/TimeProvider.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@ private sealed class DisposableCancellationTokenSourcePool : CancellationTokenSo

protected override CancellationTokenSource GetCore(TimeSpan delay)
{
var source = new CancellationTokenSource();

if (IsCancellable(delay))
if (!IsCancellable(delay))
{
_timeProvider.CancelAfter(source, delay);
return new CancellationTokenSource();
}

return source;
return _timeProvider.CreateCancellationTokenSource(delay);
}

public override void Return(CancellationTokenSource source) => source.Dispose();
Expand Down
11 changes: 4 additions & 7 deletions src/Polly.Core/Utils/CancellationTokenSourcePool.Pooled.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,19 @@ internal abstract partial class CancellationTokenSourcePool
#if NET6_0_OR_GREATER
private sealed class PooledCancellationTokenSourcePool : CancellationTokenSourcePool
{
public static readonly PooledCancellationTokenSourcePool SystemInstance = new(TimeProvider.System);
public static readonly PooledCancellationTokenSourcePool SystemInstance = new();

public PooledCancellationTokenSourcePool(TimeProvider timeProvider) => _timeProvider = timeProvider;
private readonly ObjectPool<CancellationTokenSource> _pool;

private readonly ObjectPool<CancellationTokenSource> _pool = new(
static () => new CancellationTokenSource(),
static cts => true);
private readonly TimeProvider _timeProvider;
public PooledCancellationTokenSourcePool() => _pool = new(static () => new CancellationTokenSource(), static cts => true);

protected override CancellationTokenSource GetCore(TimeSpan delay)
{
var source = _pool.Get();

if (IsCancellable(delay))
{
_timeProvider.CancelAfter(source, delay);
source.CancelAfter(delay);
}

return source;
Expand Down
5 changes: 4 additions & 1 deletion src/Polly.Core/Utils/CancellationTokenSourcePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ public static CancellationTokenSourcePool Create(TimeProvider timeProvider)
{
return PooledCancellationTokenSourcePool.SystemInstance;
}
#endif

return new DisposableCancellationTokenSourcePool(timeProvider);
#else
return new DisposableCancellationTokenSourcePool(timeProvider);
#endif
}

public CancellationTokenSource Get(TimeSpan delay)
Expand Down
2 changes: 0 additions & 2 deletions test/Polly.Core.Tests/Hedging/HedgingTimeProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ public void Advance(TimeSpan diff)

public override long GetTimestamp() => TimeStampProvider();

public override void CancelAfter(CancellationTokenSource source, TimeSpan delay) => throw new NotSupportedException();

public override Task Delay(TimeSpan delayValue, CancellationToken cancellationToken = default)
{
var entry = new DelayEntry(delayValue, new TaskCompletionSource<bool>(), _utcNow.Add(delayValue));
Expand Down
96 changes: 96 additions & 0 deletions test/Polly.Core.Tests/Helpers/FakeTimeProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,14 @@ public override long GetTimestamp()
/// <returns>A string representing the provider's current time.</returns>
public override string ToString() => GetUtcNow().ToString("yyyy-MM-ddTHH:mm:ss.fff", CultureInfo.InvariantCulture);

/// <inheritdoc />
public override ITimer CreateTimer(TimerCallback callback, object? state, TimeSpan dueTime, TimeSpan period)
{
var timer = new Timer(this, callback, state);
_ = timer.Change(dueTime, period);
return timer;
}

internal void RemoveWaiter(Waiter waiter)
{
lock (Waiters)
Expand Down Expand Up @@ -277,3 +285,91 @@ public void InvokeCallback()
_callback(_state);
}
}

// This implements the timer abstractions and is a thin wrapper around a waiter object.
// The main role of this type is to create the waiter, add it to the waiter list, and ensure it gets
// removed from the waiter list when the dispose is disposed or collected.
internal sealed class Timer : ITimer
{
private const uint MaxSupportedTimeout = 0xfffffffe;

private Waiter? _waiter;
private FakeTimeProvider? _timeProvider;
private TimerCallback? _callback;
private object? _state;

public Timer(FakeTimeProvider timeProvider, TimerCallback callback, object? state)
{
_timeProvider = timeProvider;
_callback = callback;
_state = state;
}

public bool Change(TimeSpan dueTime, TimeSpan period)
{
var dueTimeMs = (long)dueTime.TotalMilliseconds;
var periodMs = (long)period.TotalMilliseconds;

if (_timeProvider == null)
{
// timer has been disposed
return false;
}

if (_waiter != null)
{
// remove any previous waiter
_timeProvider.RemoveWaiter(_waiter);
_waiter = null;
}

if (dueTimeMs < 0)
{
// this waiter will never wake up, so just bail
return true;
}

if (periodMs < 0 || periodMs == Timeout.Infinite)
{
// normalize
period = TimeSpan.Zero;
}

_waiter = new Waiter(_callback!, _state, period.Ticks);
_timeProvider.AddWaiter(_waiter, dueTime.Ticks);
return true;
}

// In case the timer is not disposed, this will remove the Waiter instance from the provider.
~Timer() => Dispose(false);

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

public ValueTask DisposeAsync()
{
Dispose(true);
GC.SuppressFinalize(this);
#if NET5_0_OR_GREATER
return ValueTask.CompletedTask;
#else
return default;
#endif
}

private void Dispose(bool _)
{
if (_waiter != null)
{
_timeProvider!.RemoveWaiter(_waiter);
_waiter = null;
}

_timeProvider = null;
_callback = null;
_state = null;
}
}
6 changes: 0 additions & 6 deletions test/Polly.Core.Tests/Helpers/MockTimeProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,4 @@ public MockTimeProvider SetupDelayCancelled(TimeSpan delay, CancellationToken ca
Setup(x => x.Delay(delay, cancellationToken)).ThrowsAsync(new OperationCanceledException());
return this;
}

public MockTimeProvider SetupCancelAfterNow(TimeSpan delay)
{
Setup(v => v.CancelAfter(It.IsAny<CancellationTokenSource>(), delay)).Callback<CancellationTokenSource, TimeSpan>((cts, _) => cts.Cancel());
return this;
}
}
84 changes: 48 additions & 36 deletions test/Polly.Core.Tests/Timeout/TimeoutResilienceStrategyTests.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Microsoft.Extensions.Time.Testing;
using Moq;
using Polly.Telemetry;
using Polly.Timeout;
Expand All @@ -7,16 +8,16 @@ namespace Polly.Core.Tests.Timeout;
public class TimeoutResilienceStrategyTests : IDisposable
{
private readonly ResilienceStrategyTelemetry _telemetry;
private readonly MockTimeProvider _timeProvider;
private readonly FakeTimeProvider _timeProvider = new();
private readonly TimeoutStrategyOptions _options;
private readonly CancellationTokenSource _cancellationSource;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(12);

private readonly Mock<DiagnosticSource> _diagnosticSource = new();

public TimeoutResilienceStrategyTests()
{
_telemetry = TestUtilities.CreateResilienceTelemetry(_diagnosticSource.Object);
_timeProvider = new MockTimeProvider();
_options = new TimeoutStrategyOptions();
_cancellationSource = new CancellationTokenSource();
}
Expand Down Expand Up @@ -55,6 +56,8 @@ public async Task Execute_EnsureOnTimeoutCalled()

var called = false;
SetTimeout(_delay);

var executionTime = _delay + TimeSpan.FromSeconds(1);
_options.OnTimeout = args =>
{
args.Exception.Should().BeAssignableTo<OperationCanceledException>();
Expand All @@ -65,10 +68,14 @@ public async Task Execute_EnsureOnTimeoutCalled()
return default;
};

_timeProvider.SetupCancelAfterNow(_delay);

var sut = CreateSut();
await sut.Invoking(s => sut.ExecuteAsync(async token => await Task.Delay(_delay, token)).AsTask()).Should().ThrowAsync<TimeoutRejectedException>();
await sut.Invoking(s => sut.ExecuteAsync(async token =>
{
var delay = _timeProvider.Delay(executionTime, token);
_timeProvider.Advance(_delay);
await delay;
})
.AsTask()).Should().ThrowAsync<TimeoutRejectedException>();

called.Should().BeTrue();
_diagnosticSource.VerifyAll();
Expand All @@ -94,26 +101,37 @@ public async Task Execute_Timeout(bool defaultCancellationToken)
using var cts = new CancellationTokenSource();
CancellationToken token = defaultCancellationToken ? default : cts.Token;
SetTimeout(TimeSpan.FromSeconds(2));
_timeProvider.SetupCancelAfterNow(TimeSpan.FromSeconds(2));
var sut = CreateSut();

await sut
.Invoking(s => s.ExecuteAsync(async token => await Delay(token), token).AsTask())
.Invoking(s => s.ExecuteAsync(async token =>
{
var delay = _timeProvider.Delay(TimeSpan.FromSeconds(4), token);
_timeProvider.Advance(TimeSpan.FromSeconds(2));
await delay;
},
token)
.AsTask())
.Should().ThrowAsync<TimeoutRejectedException>()
.WithMessage("The operation didn't complete within the allowed timeout of '00:00:02'.");

_timeProvider.VerifyAll();
}

[Fact]
public async Task Execute_Timeout_EnsureStackTrace()
{
using var cts = new CancellationTokenSource();
SetTimeout(TimeSpan.FromSeconds(2));
_timeProvider.SetupCancelAfterNow(TimeSpan.FromSeconds(2));
var sut = CreateSut();

var outcome = await sut.ExecuteOutcomeAsync(async (c, _) => { await Delay(c.CancellationToken); return Outcome.FromResult("dummy"); }, ResilienceContext.Get(), "state");
var outcome = await sut.ExecuteOutcomeAsync(async (c, _) =>
{
var delay = _timeProvider.Delay(TimeSpan.FromSeconds(4), c.CancellationToken);
_timeProvider.Advance(TimeSpan.FromSeconds(2));
await delay;

return Outcome.FromResult("dummy");
},
ResilienceContext.Get(),
"state");
outcome.Exception.Should().BeOfType<TimeoutRejectedException>();
outcome.Exception!.StackTrace.Should().Contain("Execute_Timeout_EnsureStackTrace");
}
Expand All @@ -132,15 +150,18 @@ public async Task Execute_Cancelled_EnsureNoTimeout()
return default;
};

_timeProvider.Setup(v => v.CancelAfter(It.IsAny<CancellationTokenSource>(), delay));

var sut = CreateSut();

await sut.Invoking(s => s.ExecuteAsync(async token => await Delay(token, () => cts.Cancel()), cts.Token).AsTask())
.Should()
.ThrowAsync<OperationCanceledException>();
await sut.Invoking(s => s.ExecuteAsync(async token =>
{
var task = _timeProvider.Delay(delay, token);
cts.Cancel();
await task;
},
cts.Token).AsTask())
.Should()
.ThrowAsync<OperationCanceledException>();

_timeProvider.VerifyAll();
onTimeoutCalled.Should().BeFalse();

_diagnosticSource.Verify(v => v.IsEnabled("OnTimeout"), Times.Never());
Expand All @@ -153,7 +174,7 @@ public async Task Execute_NoTimeoutOrCancellation_EnsureCancellationTokenRestore

using var cts = new CancellationTokenSource();
SetTimeout(TimeSpan.FromSeconds(10));
_timeProvider.Setup(v => v.CancelAfter(It.IsAny<CancellationTokenSource>(), delay));
_timeProvider.Advance(delay);

var sut = CreateSut();

Expand All @@ -178,7 +199,6 @@ public async Task Execute_EnsureCancellationTokenRegistrationNotExecutedOnSynchr
// Arrange
using var cts = new CancellationTokenSource();
SetTimeout(TimeSpan.FromSeconds(10));
_timeProvider.Setup(v => v.CancelAfter(It.IsAny<CancellationTokenSource>(), TimeSpan.FromSeconds(10)));

var sut = CreateSut();

Expand All @@ -196,7 +216,13 @@ public async Task Execute_EnsureCancellationTokenRegistrationNotExecutedOnSynchr
// Act
try
{
await sut.ExecuteAsync(async token => await Delay(token, () => cts.Cancel()), cts.Token);
await sut.ExecuteAsync(async token =>
{
Task delayTask = Task.Delay(TimeSpan.FromSeconds(10), token);
cts.Cancel();
await delayTask;
},
cts.Token);
}
catch (OperationCanceledException)
{
Expand All @@ -209,19 +235,5 @@ public async Task Execute_EnsureCancellationTokenRegistrationNotExecutedOnSynchr

private void SetTimeout(TimeSpan timeout) => _options.TimeoutGenerator = args => new ValueTask<TimeSpan>(timeout);

private TimeoutResilienceStrategy CreateSut() => new(_options, _timeProvider.Object, _telemetry);

private static Task Delay(CancellationToken token, Action? onWaiting = null)
{
Task delayTask = Task.CompletedTask;

try
{
return Task.Delay(TimeSpan.FromSeconds(2), token);
}
finally
{
onWaiting?.Invoke();
}
}
private TimeoutResilienceStrategy CreateSut() => new(_options, _timeProvider, _telemetry);
}
Loading