diff --git a/docs/area-owners.md b/docs/area-owners.md index baeefb0e3f4f36..e8271c8e127185 100644 --- a/docs/area-owners.md +++ b/docs/area-owners.md @@ -129,6 +129,7 @@ Note: Editing this file doesn't update the mapping used by the `@msftbot` issue | area-System.Text.RegularExpressions | @ericstj | @buyaa-n @joperezr @steveharter | Consultants: @stephentoub | | area-System.Threading | @mangod9 | @kouvel | | | area-System.Threading.Channels | @ericstj | @buyaa-n @joperezr @steveharter | Consultants: @stephentoub | +| area-System.Threading.RateLimiting | @rafikiassumani-msft | @BrennanConroy @halter73 | Consultants: @eerhardt | | area-System.Threading.Tasks | @ericstj | @buyaa-n @joperezr @steveharter | Consultants: @stephentoub | | area-System.Transactions | @HongGit | @HongGit | | | area-System.Xml | @jeffhandley | @eiriktsarpalis @krwq @layomia | | diff --git a/src/libraries/System.Threading.Channels/src/System/Collections/Generic/Deque.cs b/src/libraries/Common/src/System/Collections/Generic/Deque.cs similarity index 91% rename from src/libraries/System.Threading.Channels/src/System/Collections/Generic/Deque.cs rename to src/libraries/Common/src/System/Collections/Generic/Deque.cs index 90fb36692d1e3e..4a770e1f7954fb 100644 --- a/src/libraries/System.Threading.Channels/src/System/Collections/Generic/Deque.cs +++ b/src/libraries/Common/src/System/Collections/Generic/Deque.cs @@ -69,6 +69,17 @@ public T PeekHead() return _array[_head]; } + public T PeekTail() + { + Debug.Assert(!IsEmpty); // caller's responsibility to make sure there are elements remaining + var index = _tail - 1; + if (index == -1) + { + index = _array.Length - 1; + } + return _array[index]; + } + public T DequeueTail() { Debug.Assert(!IsEmpty); // caller's responsibility to make sure there are elements remaining diff --git a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj index 37c4effae5ae43..16b643f1b52ff9 100644 --- a/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj +++ b/src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj @@ -11,7 +11,6 @@ System.Threading.Channel<T> - @@ -40,6 +39,8 @@ System.Threading.Channel<T> Link="Common\Internal\Padding.cs" /> + diff --git a/src/libraries/System.Threading.RateLimiting/System.Threading.RateLimiting.sln b/src/libraries/System.Threading.RateLimiting/System.Threading.RateLimiting.sln new file mode 100644 index 00000000000000..61a334428d6de7 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/System.Threading.RateLimiting.sln @@ -0,0 +1,93 @@ +Microsoft Visual Studio Solution File, Format Version 12.00 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestUtilities", "..\Common\tests\TestUtilities\TestUtilities.csproj", "{CAEE0409-CCC3-4EA6-AB54-177FD305D42D}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Bcl.AsyncInterfaces", "..\Microsoft.Bcl.AsyncInterfaces\ref\Microsoft.Bcl.AsyncInterfaces.csproj", "{39DA5B84-ECA2-42A2-BEBD-C056BDB8AD53}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Bcl.AsyncInterfaces", "..\Microsoft.Bcl.AsyncInterfaces\src\Microsoft.Bcl.AsyncInterfaces.csproj", "{F59F4FD7-EA00-47EA-A09A-6F76CB079F9B}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Runtime.CompilerServices.Unsafe", "..\System.Runtime.CompilerServices.Unsafe\ref\System.Runtime.CompilerServices.Unsafe.csproj", "{0D1C7DCB-970D-4099-AC9F-A01E75923EC6}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Runtime.CompilerServices.Unsafe", "..\System.Runtime.CompilerServices.Unsafe\src\System.Runtime.CompilerServices.Unsafe.ilproj", "{AF838F1D-5C1C-472B-B31C-9A3B7507BB4B}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Interop.DllImportGenerator", "..\System.Runtime.InteropServices\gen\DllImportGenerator\DllImportGenerator.csproj", "{1E52F495-578C-4FDB-86DD-87EAAE0A0BE7}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Interop.SourceGeneration", "..\System.Runtime.InteropServices\gen\Microsoft.Interop.SourceGeneration\Microsoft.Interop.SourceGeneration.csproj", "{25495BDC-0614-4FAC-B6EA-DF3F0E35A871}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Threading.RateLimiting", "ref\System.Threading.RateLimiting.csproj", "{FD274A80-0D68-48A0-9AC7-279C9E69BC63}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Threading.RateLimiting", "src\System.Threading.RateLimiting.csproj", "{CD96AFE9-0F7F-42FA-BBDA-F57EDCBB4609}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Threading.RateLimiting.Tests", "tests\System.Threading.RateLimiting.Tests.csproj", "{AE81EE9F-1240-4AF1-BF21-7F451B7859E5}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{6614EF7F-23FC-4809-AFF5-1ADBF1B6422C}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ref", "ref", "{111B1B5B-A004-4C05-9A8C-E0931DADA5FB}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{85204CF5-0C88-4BBB-9E70-D8CCED82ED3D}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {CAEE0409-CCC3-4EA6-AB54-177FD305D42D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {CAEE0409-CCC3-4EA6-AB54-177FD305D42D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {CAEE0409-CCC3-4EA6-AB54-177FD305D42D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {CAEE0409-CCC3-4EA6-AB54-177FD305D42D}.Release|Any CPU.Build.0 = Release|Any CPU + {39DA5B84-ECA2-42A2-BEBD-C056BDB8AD53}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {39DA5B84-ECA2-42A2-BEBD-C056BDB8AD53}.Debug|Any CPU.Build.0 = Debug|Any CPU + {39DA5B84-ECA2-42A2-BEBD-C056BDB8AD53}.Release|Any CPU.ActiveCfg = Release|Any CPU + {39DA5B84-ECA2-42A2-BEBD-C056BDB8AD53}.Release|Any CPU.Build.0 = Release|Any CPU + {F59F4FD7-EA00-47EA-A09A-6F76CB079F9B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F59F4FD7-EA00-47EA-A09A-6F76CB079F9B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F59F4FD7-EA00-47EA-A09A-6F76CB079F9B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F59F4FD7-EA00-47EA-A09A-6F76CB079F9B}.Release|Any CPU.Build.0 = Release|Any CPU + {0D1C7DCB-970D-4099-AC9F-A01E75923EC6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0D1C7DCB-970D-4099-AC9F-A01E75923EC6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0D1C7DCB-970D-4099-AC9F-A01E75923EC6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0D1C7DCB-970D-4099-AC9F-A01E75923EC6}.Release|Any CPU.Build.0 = Release|Any CPU + {AF838F1D-5C1C-472B-B31C-9A3B7507BB4B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AF838F1D-5C1C-472B-B31C-9A3B7507BB4B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AF838F1D-5C1C-472B-B31C-9A3B7507BB4B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AF838F1D-5C1C-472B-B31C-9A3B7507BB4B}.Release|Any CPU.Build.0 = Release|Any CPU + {1E52F495-578C-4FDB-86DD-87EAAE0A0BE7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1E52F495-578C-4FDB-86DD-87EAAE0A0BE7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1E52F495-578C-4FDB-86DD-87EAAE0A0BE7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1E52F495-578C-4FDB-86DD-87EAAE0A0BE7}.Release|Any CPU.Build.0 = Release|Any CPU + {25495BDC-0614-4FAC-B6EA-DF3F0E35A871}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {25495BDC-0614-4FAC-B6EA-DF3F0E35A871}.Debug|Any CPU.Build.0 = Debug|Any CPU + {25495BDC-0614-4FAC-B6EA-DF3F0E35A871}.Release|Any CPU.ActiveCfg = Release|Any CPU + {25495BDC-0614-4FAC-B6EA-DF3F0E35A871}.Release|Any CPU.Build.0 = Release|Any CPU + {FD274A80-0D68-48A0-9AC7-279C9E69BC63}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {FD274A80-0D68-48A0-9AC7-279C9E69BC63}.Debug|Any CPU.Build.0 = Debug|Any CPU + {FD274A80-0D68-48A0-9AC7-279C9E69BC63}.Release|Any CPU.ActiveCfg = Release|Any CPU + {FD274A80-0D68-48A0-9AC7-279C9E69BC63}.Release|Any CPU.Build.0 = Release|Any CPU + {CD96AFE9-0F7F-42FA-BBDA-F57EDCBB4609}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {CD96AFE9-0F7F-42FA-BBDA-F57EDCBB4609}.Debug|Any CPU.Build.0 = Debug|Any CPU + {CD96AFE9-0F7F-42FA-BBDA-F57EDCBB4609}.Release|Any CPU.ActiveCfg = Release|Any CPU + {CD96AFE9-0F7F-42FA-BBDA-F57EDCBB4609}.Release|Any CPU.Build.0 = Release|Any CPU + {AE81EE9F-1240-4AF1-BF21-7F451B7859E5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AE81EE9F-1240-4AF1-BF21-7F451B7859E5}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AE81EE9F-1240-4AF1-BF21-7F451B7859E5}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AE81EE9F-1240-4AF1-BF21-7F451B7859E5}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {CAEE0409-CCC3-4EA6-AB54-177FD305D42D} = {6614EF7F-23FC-4809-AFF5-1ADBF1B6422C} + {AE81EE9F-1240-4AF1-BF21-7F451B7859E5} = {6614EF7F-23FC-4809-AFF5-1ADBF1B6422C} + {39DA5B84-ECA2-42A2-BEBD-C056BDB8AD53} = {111B1B5B-A004-4C05-9A8C-E0931DADA5FB} + {0D1C7DCB-970D-4099-AC9F-A01E75923EC6} = {111B1B5B-A004-4C05-9A8C-E0931DADA5FB} + {FD274A80-0D68-48A0-9AC7-279C9E69BC63} = {111B1B5B-A004-4C05-9A8C-E0931DADA5FB} + {F59F4FD7-EA00-47EA-A09A-6F76CB079F9B} = {85204CF5-0C88-4BBB-9E70-D8CCED82ED3D} + {AF838F1D-5C1C-472B-B31C-9A3B7507BB4B} = {85204CF5-0C88-4BBB-9E70-D8CCED82ED3D} + {1E52F495-578C-4FDB-86DD-87EAAE0A0BE7} = {85204CF5-0C88-4BBB-9E70-D8CCED82ED3D} + {25495BDC-0614-4FAC-B6EA-DF3F0E35A871} = {85204CF5-0C88-4BBB-9E70-D8CCED82ED3D} + {CD96AFE9-0F7F-42FA-BBDA-F57EDCBB4609} = {85204CF5-0C88-4BBB-9E70-D8CCED82ED3D} + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {25036AEF-71B3-4C8A-891F-0350414F9A23} + EndGlobalSection +EndGlobal diff --git a/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs b/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs new file mode 100644 index 00000000000000..da3ac5b6f1e31e --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs @@ -0,0 +1,91 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// ------------------------------------------------------------------------------ +// Changes to this file must follow the https://aka.ms/api-review process. +// ------------------------------------------------------------------------------ + +namespace System.Threading.RateLimiting +{ + public sealed partial class ConcurrencyLimiter : System.Threading.RateLimiting.RateLimiter + { + public ConcurrencyLimiter(System.Threading.RateLimiting.ConcurrencyLimiterOptions options) { } + protected override System.Threading.RateLimiting.RateLimitLease AcquireCore(int permitCount) { throw null; } + protected override void Dispose(bool disposing) { } + protected override System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; } + public override int GetAvailablePermits() { throw null; } + protected override System.Threading.Tasks.ValueTask WaitAsyncCore(int permitCount, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + } + public sealed partial class ConcurrencyLimiterOptions + { + public ConcurrencyLimiterOptions(int permitLimit, System.Threading.RateLimiting.QueueProcessingOrder queueProcessingOrder, int queueLimit) { } + public int PermitLimit { get { throw null; } } + public int QueueLimit { get { throw null; } } + public System.Threading.RateLimiting.QueueProcessingOrder QueueProcessingOrder { get { throw null; } } + } + public static partial class MetadataName + { + public static System.Threading.RateLimiting.MetadataName ReasonPhrase { get { throw null; } } + public static System.Threading.RateLimiting.MetadataName RetryAfter { get { throw null; } } + public static System.Threading.RateLimiting.MetadataName Create(string name) { throw null; } + } + public sealed partial class MetadataName : System.IEquatable> + { + public MetadataName(string name) { } + public string Name { get { throw null; } } + public override bool Equals([System.Diagnostics.CodeAnalysis.NotNullWhenAttribute(true)] object? obj) { throw null; } + public bool Equals(System.Threading.RateLimiting.MetadataName? other) { throw null; } + public override int GetHashCode() { throw null; } + public static bool operator ==(System.Threading.RateLimiting.MetadataName left, System.Threading.RateLimiting.MetadataName right) { throw null; } + public static bool operator !=(System.Threading.RateLimiting.MetadataName left, System.Threading.RateLimiting.MetadataName right) { throw null; } + public override string ToString() { throw null; } + } + public enum QueueProcessingOrder + { + OldestFirst = 0, + NewestFirst = 1, + } + public abstract partial class RateLimiter : System.IAsyncDisposable, System.IDisposable + { + protected RateLimiter() { } + public System.Threading.RateLimiting.RateLimitLease Acquire(int permitCount = 1) { throw null; } + protected abstract System.Threading.RateLimiting.RateLimitLease AcquireCore(int permitCount); + public void Dispose() { } + protected virtual void Dispose(bool disposing) { } + public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; } + protected virtual System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; } + public abstract int GetAvailablePermits(); + public System.Threading.Tasks.ValueTask WaitAsync(int permitCount = 1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + protected abstract System.Threading.Tasks.ValueTask WaitAsyncCore(int permitCount, System.Threading.CancellationToken cancellationToken); + } + public abstract partial class RateLimitLease : System.IDisposable + { + protected RateLimitLease() { } + public abstract bool IsAcquired { get; } + public abstract System.Collections.Generic.IEnumerable MetadataNames { get; } + public void Dispose() { } + protected virtual void Dispose(bool disposing) { } + public virtual System.Collections.Generic.IEnumerable> GetAllMetadata() { throw null; } + public abstract bool TryGetMetadata(string metadataName, out object? metadata); + public bool TryGetMetadata(System.Threading.RateLimiting.MetadataName metadataName, [System.Diagnostics.CodeAnalysis.MaybeNullAttribute] out T metadata) { throw null; } + } + public sealed partial class TokenBucketRateLimiter : System.Threading.RateLimiting.RateLimiter + { + public TokenBucketRateLimiter(System.Threading.RateLimiting.TokenBucketRateLimiterOptions options) { } + protected override System.Threading.RateLimiting.RateLimitLease AcquireCore(int tokenCount) { throw null; } + protected override void Dispose(bool disposing) { } + protected override System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; } + public override int GetAvailablePermits() { throw null; } + public bool TryReplenish() { throw null; } + protected override System.Threading.Tasks.ValueTask WaitAsyncCore(int tokenCount, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + } + public sealed partial class TokenBucketRateLimiterOptions + { + public TokenBucketRateLimiterOptions(int tokenLimit, System.Threading.RateLimiting.QueueProcessingOrder queueProcessingOrder, int queueLimit, System.TimeSpan replenishmentPeriod, int tokensPerPeriod, bool autoReplenishment = true) { } + public bool AutoReplenishment { get { throw null; } } + public int QueueLimit { get { throw null; } } + public System.Threading.RateLimiting.QueueProcessingOrder QueueProcessingOrder { get { throw null; } } + public System.TimeSpan ReplenishmentPeriod { get { throw null; } } + public int TokenLimit { get { throw null; } } + public int TokensPerPeriod { get { throw null; } } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.csproj b/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.csproj new file mode 100644 index 00000000000000..18ba469734f885 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.csproj @@ -0,0 +1,21 @@ + + + $(NetCoreAppCurrent);$(NetCoreAppMinimum);netstandard2.0;$(NetFrameworkMinimum) + enable + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/libraries/System.Threading.RateLimiting/src/Resources/Strings.resx b/src/libraries/System.Threading.RateLimiting/src/Resources/Strings.resx new file mode 100644 index 00000000000000..0bbb8516816a24 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/Resources/Strings.resx @@ -0,0 +1,129 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + text/microsoft-resx + + + 2.0 + + + System.Resources.ResXResourceReader, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 + + + System.Resources.ResXResourceWriter, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 + + + {0} token(s) exceeds the token limit of {1}. + + + {0} permit(s) exceeds the permit limit of {1}. + + + Over 49 days is not supported. + + \ No newline at end of file diff --git a/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj b/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj new file mode 100644 index 00000000000000..9e0d1804cedf13 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj @@ -0,0 +1,36 @@ + + + $(NetCoreAppCurrent);$(NetCoreAppMinimum);netstandard2.0;$(NetFrameworkMinimum) + enable + APIs to help manage rate limiting. + +Commonly Used Types: +System.Threading.RateLimiting.RateLimiter +System.Threading.RateLimiting.ConcurrencyLimiter +System.Threading.RateLimiting.TokenBucketRateLimiter +System.Threading.RateLimiting.RateLimitLease + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs new file mode 100644 index 00000000000000..4ef7a3b721e4de --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs @@ -0,0 +1,306 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Threading.Tasks; + +namespace System.Threading.RateLimiting +{ + /// + /// implementation that helps manage concurrent access to a resource. + /// + public sealed class ConcurrencyLimiter : RateLimiter + { + private int _permitCount; + private int _queueCount; + private bool _disposed; + + private readonly ConcurrencyLimiterOptions _options; + private readonly Deque _queue = new Deque(); + + private static readonly ConcurrencyLease SuccessfulLease = new ConcurrencyLease(true, null, 0); + private static readonly ConcurrencyLease FailedLease = new ConcurrencyLease(false, null, 0); + private static readonly ConcurrencyLease QueueLimitLease = new ConcurrencyLease(false, null, 0, "Queue limit reached"); + + // Use the queue as the lock field so we don't need to allocate another object for a lock and have another field in the object + private object Lock => _queue; + + /// + /// Initializes the . + /// + /// Options to specify the behavior of the . + public ConcurrencyLimiter(ConcurrencyLimiterOptions options) + { + _options = options ?? throw new ArgumentNullException(nameof(options)); + _permitCount = _options.PermitLimit; + } + + /// + public override int GetAvailablePermits() => _permitCount; + + /// + protected override RateLimitLease AcquireCore(int permitCount) + { + // These amounts of resources can never be acquired + if (permitCount > _options.PermitLimit) + { + throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit)); + } + + ThrowIfDisposed(); + + // Return SuccessfulLease or FailedLease to indicate limiter state + if (permitCount == 0) + { + return _permitCount > 0 ? SuccessfulLease : FailedLease; + } + + // Perf: Check SemaphoreSlim implementation instead of locking + if (_permitCount >= permitCount) + { + lock (Lock) + { + if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease)) + { + return lease; + } + } + } + + return FailedLease; + } + + /// + protected override ValueTask WaitAsyncCore(int permitCount, CancellationToken cancellationToken = default) + { + // These amounts of resources can never be acquired + if (permitCount > _options.PermitLimit) + { + throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit)); + } + + // Return SuccessfulLease if requestedCount is 0 and resources are available + if (permitCount == 0 && _permitCount > 0 && !_disposed) + { + return new ValueTask(SuccessfulLease); + } + + // Perf: Check SemaphoreSlim implementation instead of locking + lock (Lock) + { + if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease)) + { + return new ValueTask(lease); + } + + // Don't queue if queue limit reached + if (_queueCount + permitCount > _options.QueueLimit) + { + return new ValueTask(QueueLimitLease); + } + + TaskCompletionSource tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + CancellationTokenRegistration ctr = default; + if (cancellationToken.CanBeCanceled) + { + ctr = cancellationToken.Register(static obj => + { + ((TaskCompletionSource)obj!).TrySetException(new OperationCanceledException()); + }, tcs); + } + + RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr); + _queue.EnqueueTail(request); + _queueCount += permitCount; + Debug.Assert(_queueCount <= _options.QueueLimit); + + return new ValueTask(request.Tcs.Task); + } + } + + private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out RateLimitLease? lease) + { + ThrowIfDisposed(); + + // if permitCount is 0 we want to queue it if there are no available permits + if (_permitCount >= permitCount && _permitCount != 0) + { + if (permitCount == 0) + { + // Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available + lease = SuccessfulLease; + return true; + } + + // a. if there are no items queued we can lease + // b. if there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest + if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst)) + { + _permitCount -= permitCount; + Debug.Assert(_permitCount >= 0); + lease = new ConcurrencyLease(true, this, permitCount); + return true; + } + } + + lease = null; + return false; + } + + private void Release(int releaseCount) + { + lock (Lock) + { + if (_disposed) + { + return; + } + + _permitCount += releaseCount; + Debug.Assert(_permitCount <= _options.PermitLimit); + + while (_queue.Count > 0) + { + RequestRegistration nextPendingRequest = + _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst + ? _queue.PeekHead() + : _queue.PeekTail(); + + if (_permitCount >= nextPendingRequest.Count) + { + nextPendingRequest = + _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst + ? _queue.DequeueHead() + : _queue.DequeueTail(); + + _permitCount -= nextPendingRequest.Count; + _queueCount -= nextPendingRequest.Count; + Debug.Assert(_queueCount >= 0); + Debug.Assert(_permitCount >= 0); + + ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count); + // Check if request was canceled + if (!nextPendingRequest.Tcs.TrySetResult(lease)) + { + // Queued item was canceled so add count back + _permitCount += nextPendingRequest.Count; + } + nextPendingRequest.CancellationTokenRegistration.Dispose(); + } + else + { + break; + } + } + } + } + + protected override void Dispose(bool disposing) + { + if (!disposing) + { + return; + } + + lock (Lock) + { + if (_disposed) + { + return; + } + _disposed = true; + while (_queue.Count > 0) + { + RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst + ? _queue.DequeueHead() + : _queue.DequeueTail(); + next.CancellationTokenRegistration.Dispose(); + next.Tcs.SetResult(FailedLease); + } + } + } + + protected override ValueTask DisposeAsyncCore() + { + Dispose(true); + + return default; + } + + private void ThrowIfDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(ConcurrencyLimiter)); + } + } + + private sealed class ConcurrencyLease : RateLimitLease + { + private static readonly string[] s_allMetadataNames = new[] { MetadataName.ReasonPhrase.Name }; + + private bool _disposed; + private readonly ConcurrencyLimiter? _limiter; + private readonly int _count; + private readonly string? _reason; + + public ConcurrencyLease(bool isAcquired, ConcurrencyLimiter? limiter, int count, string? reason = null) + { + IsAcquired = isAcquired; + _limiter = limiter; + _count = count; + _reason = reason; + + // No need to set the limiter if count is 0, Dispose will noop + Debug.Assert(count == 0 ? limiter is null : true); + } + + public override bool IsAcquired { get; } + + public override IEnumerable MetadataNames => s_allMetadataNames; + + public override bool TryGetMetadata(string metadataName, out object? metadata) + { + if (_reason is not null && metadataName == MetadataName.ReasonPhrase.Name) + { + metadata = _reason; + return true; + } + metadata = default; + return false; + } + + protected override void Dispose(bool disposing) + { + if (_disposed) + { + return; + } + + _disposed = true; + + _limiter?.Release(_count); + } + } + + private readonly struct RequestRegistration + { + public RequestRegistration(int requestedCount, TaskCompletionSource tcs, + CancellationTokenRegistration cancellationTokenRegistration) + { + Count = requestedCount; + // Perf: Use AsyncOperation instead + Tcs = tcs; + CancellationTokenRegistration = cancellationTokenRegistration; + } + + public int Count { get; } + + public TaskCompletionSource Tcs { get; } + + public CancellationTokenRegistration CancellationTokenRegistration { get; } + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiterOptions.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiterOptions.cs new file mode 100644 index 00000000000000..6fc635c9af3603 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiterOptions.cs @@ -0,0 +1,51 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Threading.RateLimiting +{ + /// + /// Options to specify the behavior of a . + /// + public sealed class ConcurrencyLimiterOptions + { + /// + /// Initializes the . + /// + /// Maximum number of permits that can be leased concurrently. + /// Determines the behaviour of when not enough resources can be leased. + /// Maximum number of permits that can be queued concurrently. + /// When or are less than 0. + public ConcurrencyLimiterOptions(int permitLimit, QueueProcessingOrder queueProcessingOrder, int queueLimit) + { + if (permitLimit < 0) + { + throw new ArgumentOutOfRangeException(nameof(permitLimit)); + } + if (queueLimit < 0) + { + throw new ArgumentOutOfRangeException(nameof(queueLimit)); + } + PermitLimit = permitLimit; + QueueProcessingOrder = queueProcessingOrder; + QueueLimit = queueLimit; + } + + /// + /// Maximum number of permits that can be leased concurrently. + /// + public int PermitLimit { get; } + + /// + /// Determines the behaviour of when not enough resources can be leased. + /// + /// + /// by default. + /// + public QueueProcessingOrder QueueProcessingOrder { get; } = QueueProcessingOrder.OldestFirst; + + /// + /// Maximum number of permits that can be queued concurrently. + /// + public int QueueLimit { get; } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/MetadataName.T.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/MetadataName.T.cs new file mode 100644 index 00000000000000..acc8f1d58ad742 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/MetadataName.T.cs @@ -0,0 +1,81 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics.CodeAnalysis; + +namespace System.Threading.RateLimiting +{ + /// + /// A strongly-typed name of metadata that can be stored in a . + /// + /// The type the metadata will be. + public sealed class MetadataName : IEquatable> + { + private readonly string _name; + + /// + /// Constructs a object with the given name. + /// + /// The name of the object. + public MetadataName(string name) + { + _name = name ?? throw new ArgumentNullException(nameof(name)); + } + + /// + /// Gets the name of the metadata. + /// + public string Name => _name; + + /// + public override string ToString() + { + return _name; + } + + /// + public override int GetHashCode() + { + return _name.GetHashCode(); + } + + /// + public override bool Equals([NotNullWhen(true)] object? obj) + { + return obj is MetadataName m && Equals(m); + } + + /// + public bool Equals(MetadataName? other) + { + if (other is null) + { + return false; + } + + return string.Equals(_name, other._name, StringComparison.Ordinal); + } + + /// + /// Determines whether two are equal to each other. + /// + /// + /// + /// + public static bool operator ==(MetadataName left, MetadataName right) + { + return left.Equals(right); + } + + /// + /// Determines whether two are not equal to each other. + /// + /// + /// + /// + public static bool operator !=(MetadataName left, MetadataName right) + { + return !(left == right); + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/MetadataName.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/MetadataName.cs new file mode 100644 index 00000000000000..554b5b3365e50e --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/MetadataName.cs @@ -0,0 +1,30 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Threading.RateLimiting +{ + /// + /// Contains some common metadata name-type pairs and helper method to create a metadata name. + /// + public static class MetadataName + { + /// + /// Metadata put on a failed lease acquisition to specify when to retry acquiring a lease. + /// For example, used in which periodically replenishes leases. + /// + public static MetadataName RetryAfter { get; } = Create("RETRY_AFTER"); + + /// + /// Metadata put on a failed lease acquisition to specify the reason the lease failed. + /// + public static MetadataName ReasonPhrase { get; } = Create("REASON_PHRASE"); + + /// + /// Create a strongly-typed metadata name. + /// + /// Type that the metadata will contain. + /// Name of the metadata. + /// + public static MetadataName Create(string name) => new MetadataName(name); + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/QueueProcessingOrder.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/QueueProcessingOrder.cs new file mode 100644 index 00000000000000..a89a299a542027 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/QueueProcessingOrder.cs @@ -0,0 +1,20 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Threading.RateLimiting +{ + /// + /// Controls the behavior of when not enough resources can be leased. + /// + public enum QueueProcessingOrder + { + /// + /// Lease the oldest queued . + /// + OldestFirst, + /// + /// Lease the newest queued . + /// + NewestFirst + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitLease.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitLease.cs new file mode 100644 index 00000000000000..7d5c43569c0e4b --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimitLease.cs @@ -0,0 +1,89 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; + +namespace System.Threading.RateLimiting +{ + /// + /// Abstraction for leases returned by implementations. + /// A lease represents the success or failure to acquire a resource and contains potential metadata that is relevant to the acquisition operation. + /// + public abstract class RateLimitLease : IDisposable + { + /// + /// Represents whether lease acquisition was successful. + /// + public abstract bool IsAcquired { get; } + + /// + /// Attempt to extract metadata for the lease. + /// + /// The name of the metadata. Some common ones can be found in . + /// The metadata object if it exists. + /// True if the metadata exists, otherwise false. + public abstract bool TryGetMetadata(string metadataName, out object? metadata); + + /// + /// Attempt to extract a strongly-typed metadata for the lease. + /// + /// Type of the expected metadata. + /// The name of the strongly-typed metadata. Some common ones can be found in . + /// The strongly-typed metadata object if it exists. + /// True if the metadata exists, otherwise false. + public bool TryGetMetadata(MetadataName metadataName, [MaybeNull] out T metadata) + { + if (metadataName.Name == null) + { + metadata = default; + return false; + } + + bool successful = TryGetMetadata(metadataName.Name, out object? rawMetadata); + if (successful) + { + metadata = rawMetadata is null ? default : (T)rawMetadata; + return true; + } + + metadata = default; + return false; + } + + /// + /// Gets a list of the metadata names that are available on the lease. + /// + public abstract IEnumerable MetadataNames { get; } + + /// + /// Gets a list of all the metadata that is available on the lease. + /// + /// List of key-value pairs of metadata name and metadata object. + public virtual IEnumerable> GetAllMetadata() + { + foreach (string name in MetadataNames) + { + if (TryGetMetadata(name, out object? metadata)) + { + yield return new KeyValuePair(name, metadata); + } + } + } + + /// + /// Dispose the lease. This may free up space on the limiter implementation the lease came from. + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Dispose method for implementations to write. + /// + /// + protected virtual void Dispose(bool disposing) { } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimiter.cs new file mode 100644 index 00000000000000..377ce911e9f2d0 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/RateLimiter.cs @@ -0,0 +1,118 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Threading.Tasks; + +namespace System.Threading.RateLimiting +{ + /// + /// Represents a limiter type that users interact with to determine if an operation can proceed. + /// + public abstract class RateLimiter : IAsyncDisposable, IDisposable + { + /// + /// An estimated count of available permits. + /// + /// + public abstract int GetAvailablePermits(); + + /// + /// Fast synchronous attempt to acquire permits. + /// + /// + /// Set to 0 to get whether permits are exhausted. + /// + /// Number of permits to try and acquire. + /// A successful or failed lease. + /// + public RateLimitLease Acquire(int permitCount = 1) + { + if (permitCount < 0) + { + throw new ArgumentOutOfRangeException(nameof(permitCount)); + } + + return AcquireCore(permitCount); + } + + /// + /// Method that implementations implement for . + /// + /// Number of permits to try and acquire. + /// + protected abstract RateLimitLease AcquireCore(int permitCount); + + /// + /// Wait until the requested permits are available or permits can no longer be acquired. + /// + /// + /// Set to 0 to wait until permits are replenished. + /// + /// Number of permits to try and acquire. + /// Optional token to allow canceling a queued request for permits. + /// A task that completes when the requested permits are acquired or when the requested permits are denied. + /// + public ValueTask WaitAsync(int permitCount = 1, CancellationToken cancellationToken = default) + { + if (permitCount < 0) + { + throw new ArgumentOutOfRangeException(nameof(permitCount)); + } + + if (cancellationToken.IsCancellationRequested) + { + return new ValueTask(Task.FromCanceled(cancellationToken)); + } + + return WaitAsyncCore(permitCount, cancellationToken); + } + + /// + /// Method that implementations implement for . + /// + /// Number of permits to try and acquire. + /// Optional token to allow canceling a queued request for permits. + /// A task that completes when the requested permits are acquired or when the requested permits are denied. + protected abstract ValueTask WaitAsyncCore(int permitCount, CancellationToken cancellationToken); + + /// + /// Dispose method for implementations to write. + /// + /// + protected virtual void Dispose(bool disposing) { } + + /// + /// Disposes the RateLimiter. This completes any queued acquires with a failed lease. + /// + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + /// + /// DisposeAsync method for implementations to write. + /// + protected virtual ValueTask DisposeAsyncCore() + { + return default; + } + + /// + /// Disposes the RateLimiter asynchronously. + /// + /// ValueTask representin the completion of the disposal. + public async ValueTask DisposeAsync() + { + // Perform async cleanup. + await DisposeAsyncCore().ConfigureAwait(false); + + // Dispose of unmanaged resources. + Dispose(false); + + // Suppress finalization. + GC.SuppressFinalize(this); + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs new file mode 100644 index 00000000000000..bb1ec82f3fff01 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs @@ -0,0 +1,369 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Threading.Tasks; + +namespace System.Threading.RateLimiting +{ + /// + /// implementation that replenishes tokens periodically instead of via a release mechanism. + /// + public sealed class TokenBucketRateLimiter : RateLimiter + { + private int _tokenCount; + private int _queueCount; + private uint _lastReplenishmentTick = (uint)Environment.TickCount; + private bool _disposed; + + private readonly Timer? _renewTimer; + private readonly TokenBucketRateLimiterOptions _options; + private readonly Deque _queue = new Deque(); + + // Use the queue as the lock field so we don't need to allocate another object for a lock and have another field in the object + private object Lock => _queue; + + private static readonly RateLimitLease SuccessfulLease = new TokenBucketLease(true, null); + private static readonly RateLimitLease FailedLease = new TokenBucketLease(false, null); + + /// + /// Initializes the . + /// + /// Options to specify the behavior of the . + public TokenBucketRateLimiter(TokenBucketRateLimiterOptions options) + { + _options = options ?? throw new ArgumentNullException(nameof(options)); + _tokenCount = options.TokenLimit; + + if (_options.AutoReplenishment) + { + _renewTimer = new Timer(Replenish, this, _options.ReplenishmentPeriod, _options.ReplenishmentPeriod); + } + } + + /// + public override int GetAvailablePermits() => _tokenCount; + + /// + protected override RateLimitLease AcquireCore(int tokenCount) + { + // These amounts of resources can never be acquired + if (tokenCount > _options.TokenLimit) + { + throw new ArgumentOutOfRangeException(nameof(tokenCount), tokenCount, SR.Format(SR.TokenLimitExceeded, tokenCount, _options.TokenLimit)); + } + + // Return SuccessfulLease or FailedLease depending to indicate limiter state + if (tokenCount == 0 && !_disposed) + { + if (_tokenCount > 0) + { + return SuccessfulLease; + } + + return CreateFailedTokenLease(tokenCount); + } + + lock (Lock) + { + if (TryLeaseUnsynchronized(tokenCount, out RateLimitLease? lease)) + { + return lease; + } + + return CreateFailedTokenLease(tokenCount); + } + } + + /// + protected override ValueTask WaitAsyncCore(int tokenCount, CancellationToken cancellationToken = default) + { + // These amounts of resources can never be acquired + if (tokenCount > _options.TokenLimit) + { + throw new ArgumentOutOfRangeException(nameof(tokenCount), tokenCount, SR.Format(SR.TokenLimitExceeded, tokenCount, _options.TokenLimit)); + } + + ThrowIfDisposed(); + + // Return SuccessfulAcquisition if requestedCount is 0 and resources are available + if (tokenCount == 0 && _tokenCount > 0) + { + return new ValueTask(SuccessfulLease); + } + + lock (Lock) + { + if (TryLeaseUnsynchronized(tokenCount, out RateLimitLease? lease)) + { + return new ValueTask(lease); + } + + // Don't queue if queue limit reached + if (_queueCount + tokenCount > _options.QueueLimit) + { + return new ValueTask(CreateFailedTokenLease(tokenCount)); + } + + TaskCompletionSource tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + CancellationTokenRegistration ctr = default; + if (cancellationToken.CanBeCanceled) + { + ctr = cancellationToken.Register(static obj => + { + ((TaskCompletionSource)obj!).TrySetException(new OperationCanceledException()); + }, tcs); + } + + RequestRegistration registration = new RequestRegistration(tokenCount, tcs, ctr); + _queue.EnqueueTail(registration); + _queueCount += tokenCount; + Debug.Assert(_queueCount <= _options.QueueLimit); + + // handle cancellation + return new ValueTask(registration.Tcs.Task); + } + } + + private RateLimitLease CreateFailedTokenLease(int tokenCount) + { + int replenishAmount = tokenCount - _tokenCount + _queueCount; + // can't have 0 replenish periods, that would mean it should be a successful lease + // if TokensPerPeriod is larger than the replenishAmount needed then it would be 0 + Debug.Assert(_options.TokensPerPeriod > 0); + int replenishPeriods = Math.Max(replenishAmount / _options.TokensPerPeriod, 1); + + return new TokenBucketLease(false, TimeSpan.FromTicks(_options.ReplenishmentPeriod.Ticks * replenishPeriods)); + } + + private bool TryLeaseUnsynchronized(int tokenCount, [NotNullWhen(true)] out RateLimitLease? lease) + { + ThrowIfDisposed(); + + // if permitCount is 0 we want to queue it if there are no available permits + if (_tokenCount >= tokenCount && _tokenCount != 0) + { + if (tokenCount == 0) + { + // Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available + lease = SuccessfulLease; + return true; + } + + // a. if there are no items queued we can lease + // b. if there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest + if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst)) + { + _tokenCount -= tokenCount; + Debug.Assert(_tokenCount >= 0); + lease = SuccessfulLease; + return true; + } + } + + lease = null; + return false; + } + + /// + /// Attempts to replenish the bucket. + /// + /// + /// False if is enabled, otherwise true. + /// Does not reflect if tokens were replenished. + /// + public bool TryReplenish() + { + if (_options.AutoReplenishment) + { + return false; + } + Replenish(this); + return true; + } + + private static void Replenish(object? state) + { + TokenBucketRateLimiter limiter = (state as TokenBucketRateLimiter)!; + Debug.Assert(limiter is not null); + + // Use Environment.TickCount instead of DateTime.UtcNow to avoid issues on systems where the clock can change + uint nowTicks = (uint)Environment.TickCount; + limiter!.ReplenishInternal(nowTicks); + } + + // Used in tests that test behavior with specific time intervals + private void ReplenishInternal(uint nowTicks) + { + bool wrapped = false; + // (uint)TickCount will wrap every ~50 days, we can detect that by checking if the new ticks is less than the last replenishment + if (nowTicks < _lastReplenishmentTick) + { + wrapped = true; + } + + // method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes + lock (Lock) + { + if (_disposed) + { + return; + } + + // Fix the wrapping by using a long and adding uint.MaxValue in the wrapped case + long nonWrappedTicks = wrapped ? (long)nowTicks + uint.MaxValue : nowTicks; + if (nonWrappedTicks - _lastReplenishmentTick < _options.ReplenishmentPeriod.TotalMilliseconds) + { + return; + } + + _lastReplenishmentTick = nowTicks; + + int availablePermits = _tokenCount; + TokenBucketRateLimiterOptions options = _options; + int maxPermits = options.TokenLimit; + int resourcesToAdd; + + if (availablePermits < maxPermits) + { + resourcesToAdd = Math.Min(options.TokensPerPeriod, maxPermits - availablePermits); + } + else + { + // All tokens available, nothing to do + return; + } + + // Process queued requests + Deque queue = _queue; + + _tokenCount += resourcesToAdd; + Debug.Assert(_tokenCount <= _options.TokenLimit); + while (queue.Count > 0) + { + RequestRegistration nextPendingRequest = + options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst + ? queue.PeekHead() + : queue.PeekTail(); + + if (_tokenCount >= nextPendingRequest.Count) + { + // Request can be fulfilled + nextPendingRequest = + options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst + ? queue.DequeueHead() + : queue.DequeueTail(); + + _queueCount -= nextPendingRequest.Count; + _tokenCount -= nextPendingRequest.Count; + Debug.Assert(_queueCount >= 0); + Debug.Assert(_tokenCount >= 0); + + if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease)) + { + // Queued item was canceled so add count back + _tokenCount += nextPendingRequest.Count; + } + nextPendingRequest.CancellationTokenRegistration.Dispose(); + } + else + { + // Request cannot be fulfilled + break; + } + } + } + } + + protected override void Dispose(bool disposing) + { + if (!disposing) + { + return; + } + + lock (Lock) + { + if (_disposed) + { + return; + } + _disposed = true; + _renewTimer?.Dispose(); + while (_queue.Count > 0) + { + RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst + ? _queue.DequeueHead() + : _queue.DequeueTail(); + next.CancellationTokenRegistration.Dispose(); + next.Tcs.SetResult(FailedLease); + } + } + } + + protected override ValueTask DisposeAsyncCore() + { + Dispose(true); + + return default; + } + + private void ThrowIfDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(TokenBucketRateLimiter)); + } + } + + private sealed class TokenBucketLease : RateLimitLease + { + private static readonly string[] s_allMetadataNames = new[] { MetadataName.RetryAfter.Name }; + + private readonly TimeSpan? _retryAfter; + + public TokenBucketLease(bool isAcquired, TimeSpan? retryAfter) + { + IsAcquired = isAcquired; + _retryAfter = retryAfter; + } + + public override bool IsAcquired { get; } + + public override IEnumerable MetadataNames => s_allMetadataNames; + + public override bool TryGetMetadata(string metadataName, out object? metadata) + { + if (metadataName == MetadataName.RetryAfter.Name && _retryAfter.HasValue) + { + metadata = _retryAfter.Value; + return true; + } + + metadata = default; + return false; + } + } + + private readonly struct RequestRegistration + { + public RequestRegistration(int tokenCount, TaskCompletionSource tcs, CancellationTokenRegistration cancellationTokenRegistration) + { + Count = tokenCount; + // Use VoidAsyncOperationWithData instead + Tcs = tcs; + CancellationTokenRegistration = cancellationTokenRegistration; + } + + public int Count { get; } + + public TaskCompletionSource Tcs { get; } + + public CancellationTokenRegistration CancellationTokenRegistration { get; } + + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiterOptions.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiterOptions.cs new file mode 100644 index 00000000000000..85fcf75cc15070 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiterOptions.cs @@ -0,0 +1,95 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Threading.RateLimiting +{ + /// + /// Options to control the behavior of a . + /// + public sealed class TokenBucketRateLimiterOptions + { + /// + /// Initializes the . + /// + /// Maximum number of tokens that can be in the token bucket. + /// + /// Maximum number of unprocessed tokens waiting via . + /// + /// Specifies how often tokens can be replenished. Replenishing is triggered either by an internal timer if is true, or by calling . + /// + /// Specified how many tokens can be added to the token bucket on a successful replenish. Available token count will not exceed . + /// + /// Specifies whether token replenishment will be handled by the or by another party via . + /// + /// When , , or are less than 0 + /// or when is more than 49 days. + public TokenBucketRateLimiterOptions( + int tokenLimit, + QueueProcessingOrder queueProcessingOrder, + int queueLimit, + TimeSpan replenishmentPeriod, + int tokensPerPeriod, + bool autoReplenishment = true) + { + if (tokenLimit < 0) + { + throw new ArgumentOutOfRangeException(nameof(tokenLimit)); + } + if (queueLimit < 0) + { + throw new ArgumentOutOfRangeException(nameof(queueLimit)); + } + if (tokensPerPeriod <= 0) + { + throw new ArgumentOutOfRangeException(nameof(tokensPerPeriod)); + } + if (replenishmentPeriod.TotalDays > 49) + { + // Environment.TickCount is an int and represents milliseconds since system started + // it has a range of -2B - +2B, we cast it to a uint to get a range of 0 - 4B which is 49.7 days before the value will repeat + throw new ArgumentOutOfRangeException(nameof(replenishmentPeriod), replenishmentPeriod, SR.ReplenishmentLimitTooHigh); + } + + TokenLimit = tokenLimit; + QueueProcessingOrder = queueProcessingOrder; + QueueLimit = queueLimit; + ReplenishmentPeriod = replenishmentPeriod; + TokensPerPeriod = tokensPerPeriod; + AutoReplenishment = autoReplenishment; + } + + /// + /// Specifies the minimum period between replenishments. + /// + public TimeSpan ReplenishmentPeriod { get; } + + /// + /// Specifies the maximum number of tokens to restore each replenishment. + /// + public int TokensPerPeriod { get; } + + /// + /// Specified whether the is automatically replenishing tokens or if someone else + /// will be calling to replenish tokens. + /// + public bool AutoReplenishment { get; } + + /// + /// Maximum number of tokens that can be in the bucket at any time. + /// + public int TokenLimit { get; } + + /// + /// Determines the behaviour of when not enough resources can be leased. + /// + /// + /// by default. + /// + public QueueProcessingOrder QueueProcessingOrder { get; } + + /// + /// Maximum cumulative token count of queued acquisition requests. + /// + public int QueueLimit { get; } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/tests/BaseRateLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/BaseRateLimiterTests.cs new file mode 100644 index 00000000000000..9d98a5101a33bc --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/tests/BaseRateLimiterTests.cs @@ -0,0 +1,89 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Threading.Tasks; +using Xunit; + +namespace System.Threading.RateLimiting.Test +{ + public abstract class BaseRateLimiterTests + { + [Fact] + public abstract void CanAcquireResource(); + + [Fact] + public abstract void InvalidOptionsThrows(); + + [Fact] + public abstract Task CanAcquireResourceAsync(); + + [Fact] + public abstract Task CanAcquireResourceAsync_QueuesAndGrabsOldest(); + + [Fact] + public abstract Task CanAcquireResourceAsync_QueuesAndGrabsNewest(); + + [Fact] + public abstract Task FailsWhenQueuingMoreThanLimit(); + + [Fact] + public abstract Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable(); + + [Fact] + public abstract void ThrowsWhenAcquiringMoreThanLimit(); + + [Fact] + public abstract Task ThrowsWhenWaitingForMoreThanLimit(); + + [Fact] + public abstract void ThrowsWhenAcquiringLessThanZero(); + + [Fact] + public abstract Task ThrowsWhenWaitingForLessThanZero(); + + [Fact] + public abstract void AcquireZero_WithAvailability(); + + [Fact] + public abstract void AcquireZero_WithoutAvailability(); + + [Fact] + public abstract Task WaitAsyncZero_WithAvailability(); + + [Fact] + public abstract Task WaitAsyncZero_WithoutAvailabilityWaitsForAvailability(); + + [Fact] + public abstract Task CanDequeueMultipleResourcesAtOnce(); + + [Fact] + public abstract Task CanAcquireResourcesWithWaitAsyncWithQueuedItemsIfNewestFirst(); + + [Fact] + public abstract Task CannotAcquireResourcesWithWaitAsyncWithQueuedItemsIfOldestFirst(); + + [Fact] + public abstract Task CanCancelWaitAsyncAfterQueuing(); + + [Fact] + public abstract Task CanCancelWaitAsyncBeforeQueuing(); + + [Fact] + public abstract Task CanAcquireResourcesWithAcquireWithQueuedItemsIfNewestFirst(); + + [Fact] + public abstract Task CannotAcquireResourcesWithAcquireWithQueuedItemsIfOldestFirst(); + + [Fact] + public abstract void NoMetadataOnAcquiredLease(); + + [Fact] + public abstract void MetadataNamesContainsAllMetadata(); + + [Fact] + public abstract Task DisposeReleasesQueuedAcquires(); + + [Fact] + public abstract Task DisposeAsyncReleasesQueuedAcquires(); + } +} diff --git a/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs new file mode 100644 index 00000000000000..22658e07a5c9ca --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs @@ -0,0 +1,433 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Threading.Tasks; +using Xunit; + +namespace System.Threading.RateLimiting.Test +{ + public class ConcurrencyLimiterTests : BaseRateLimiterTests + { + [Fact] + public override void InvalidOptionsThrows() + { + Assert.Throws(() => new ConcurrencyLimiterOptions(-1, QueueProcessingOrder.NewestFirst, 1)); + Assert.Throws(() => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, -1)); + } + + [Fact] + public override void CanAcquireResource() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); + var lease = limiter.Acquire(); + + Assert.True(lease.IsAcquired); + Assert.False(limiter.Acquire().IsAcquired); + + lease.Dispose(); + + Assert.True(limiter.Acquire().IsAcquired); + } + + [Fact] + public override async Task CanAcquireResourceAsync() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); + var lease = await limiter.WaitAsync(); + + Assert.True(lease.IsAcquired); + var wait = limiter.WaitAsync(); + Assert.False(wait.IsCompleted); + + lease.Dispose(); + + Assert.True((await wait).IsAcquired); + } + + [Fact] + public override async Task CanAcquireResourceAsync_QueuesAndGrabsOldest() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2)); + var lease = await limiter.WaitAsync(); + + Assert.True(lease.IsAcquired); + var wait1 = limiter.WaitAsync(); + var wait2 = limiter.WaitAsync(); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + + lease = await wait1; + Assert.True(lease.IsAcquired); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + + lease = await wait2; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CanAcquireResourceAsync_QueuesAndGrabsNewest() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 3)); + var lease = await limiter.WaitAsync(2); + + Assert.True(lease.IsAcquired); + var wait1 = limiter.WaitAsync(2); + var wait2 = limiter.WaitAsync(); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + + // second queued item completes first with NewestFirst + lease = await wait2; + Assert.True(lease.IsAcquired); + Assert.False(wait1.IsCompleted); + + lease.Dispose(); + + lease = await wait1; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task FailsWhenQueuingMoreThanLimit() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); + using var lease = limiter.Acquire(1); + var wait = limiter.WaitAsync(1); + + var failedLease = await limiter.WaitAsync(1); + Assert.False(failedLease.IsAcquired); + } + + [Fact] + public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); + var lease = limiter.Acquire(1); + var wait = limiter.WaitAsync(1); + + var failedLease = await limiter.WaitAsync(1); + Assert.False(failedLease.IsAcquired); + + lease.Dispose(); + lease = await wait; + Assert.True(lease.IsAcquired); + + wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + lease.Dispose(); + lease = await wait; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override void ThrowsWhenAcquiringMoreThanLimit() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); + var ex = Assert.Throws(() => limiter.Acquire(2)); + Assert.Equal("permitCount", ex.ParamName); + } + + [Fact] + public override async Task ThrowsWhenWaitingForMoreThanLimit() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); + var ex = await Assert.ThrowsAsync(async () => await limiter.WaitAsync(2)); + Assert.Equal("permitCount", ex.ParamName); + } + + [Fact] + public override void ThrowsWhenAcquiringLessThanZero() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); + Assert.Throws(() => limiter.Acquire(-1)); + } + + [Fact] + public override async Task ThrowsWhenWaitingForLessThanZero() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); + await Assert.ThrowsAsync(async () => await limiter.WaitAsync(-1)); + } + + [Fact] + public override void AcquireZero_WithAvailability() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); + + using var lease = limiter.Acquire(0); + Assert.True(lease.IsAcquired); + } + + [Fact] + public override void AcquireZero_WithoutAvailability() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); + using var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var lease2 = limiter.Acquire(0); + Assert.False(lease2.IsAcquired); + lease2.Dispose(); + } + + [Fact] + public override async Task WaitAsyncZero_WithAvailability() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); + + using var lease = await limiter.WaitAsync(0); + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task WaitAsyncZero_WithoutAvailabilityWaitsForAvailability() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); + var lease = await limiter.WaitAsync(1); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(0); + Assert.False(wait.IsCompleted); + + lease.Dispose(); + using var lease2 = await wait; + Assert.True(lease2.IsAcquired); + } + + [Fact] + public override async Task CanDequeueMultipleResourcesAtOnce() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(2, QueueProcessingOrder.OldestFirst, 2)); + using var lease = await limiter.WaitAsync(2); + Assert.True(lease.IsAcquired); + + var wait1 = limiter.WaitAsync(1); + var wait2 = limiter.WaitAsync(1); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + + var lease1 = await wait1; + var lease2 = await wait2; + Assert.True(lease1.IsAcquired); + Assert.True(lease2.IsAcquired); + } + + [Fact] + public override async Task CanAcquireResourcesWithWaitAsyncWithQueuedItemsIfNewestFirst() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 3)); + using var lease = await limiter.WaitAsync(1); + Assert.True(lease.IsAcquired); + + var wait1 = limiter.WaitAsync(2); + Assert.False(wait1.IsCompleted); + var wait2 = limiter.WaitAsync(1); + var lease2 = await wait2; + Assert.True(lease2.IsAcquired); + + lease.Dispose(); + + Assert.False(wait1.IsCompleted); + lease2.Dispose(); + + var lease1 = await wait1; + Assert.True(lease1.IsAcquired); + } + + [Fact] + public override async Task CannotAcquireResourcesWithWaitAsyncWithQueuedItemsIfOldestFirst() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(2, QueueProcessingOrder.OldestFirst, 3)); + using var lease = await limiter.WaitAsync(1); + Assert.True(lease.IsAcquired); + + var wait1 = limiter.WaitAsync(2); + var wait2 = limiter.WaitAsync(1); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + + var lease1 = await wait1; + Assert.True(lease1.IsAcquired); + Assert.False(wait2.IsCompleted); + + lease1.Dispose(); + var lease2 = await wait2; + Assert.True(lease2.IsAcquired); + } + + [Fact] + public override async Task CanAcquireResourcesWithAcquireWithQueuedItemsIfNewestFirst() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 3)); + using var lease = await limiter.WaitAsync(1); + Assert.True(lease.IsAcquired); + + var wait1 = limiter.WaitAsync(2); + Assert.False(wait1.IsCompleted); + var lease2 = limiter.Acquire(1); + Assert.True(lease2.IsAcquired); + + lease.Dispose(); + + Assert.False(wait1.IsCompleted); + lease2.Dispose(); + + var lease1 = await wait1; + Assert.True(lease1.IsAcquired); + } + + [Fact] + public override async Task CannotAcquireResourcesWithAcquireWithQueuedItemsIfOldestFirst() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(2, QueueProcessingOrder.OldestFirst, 3)); + using var lease = await limiter.WaitAsync(1); + Assert.True(lease.IsAcquired); + + var wait1 = limiter.WaitAsync(2); + Assert.False(wait1.IsCompleted); + var lease2 = limiter.Acquire(1); + Assert.False(lease2.IsAcquired); + + lease.Dispose(); + + var lease1 = await wait1; + Assert.True(lease1.IsAcquired); + } + + [Fact] + public override async Task CanCancelWaitAsyncAfterQueuing() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1)); + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + var wait = limiter.WaitAsync(1, cts.Token); + + cts.Cancel(); + await Assert.ThrowsAsync(() => wait.AsTask()); + + lease.Dispose(); + + Assert.Equal(1, limiter.GetAvailablePermits()); + } + + [Fact] + public override async Task CanCancelWaitAsyncBeforeQueuing() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1)); + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + cts.Cancel(); + + await Assert.ThrowsAsync(() => limiter.WaitAsync(1, cts.Token).AsTask()); + + lease.Dispose(); + + Assert.Equal(1, limiter.GetAvailablePermits()); + } + + [Fact] + public override void NoMetadataOnAcquiredLease() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1)); + using var lease = limiter.Acquire(1); + Assert.False(lease.TryGetMetadata(MetadataName.ReasonPhrase.Name, out _)); + } + + [Fact] + public override void MetadataNamesContainsAllMetadata() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1)); + using var lease = limiter.Acquire(1); + Assert.Collection(lease.MetadataNames, metadataName => Assert.Equal(metadataName, MetadataName.ReasonPhrase.Name)); + } + + [Fact] + public override async Task DisposeReleasesQueuedAcquires() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 3)); + using var lease = limiter.Acquire(1); + + var wait1 = limiter.WaitAsync(1); + var wait2 = limiter.WaitAsync(1); + var wait3 = limiter.WaitAsync(1); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + Assert.False(wait3.IsCompleted); + + limiter.Dispose(); + + var failedLease = await wait1; + Assert.False(failedLease.IsAcquired); + failedLease = await wait2; + Assert.False(failedLease.IsAcquired); + failedLease = await wait3; + Assert.False(failedLease.IsAcquired); + + lease.Dispose(); + + // Throws after disposal + Assert.Throws(() => limiter.Acquire(1)); + await Assert.ThrowsAsync(() => limiter.WaitAsync(1).AsTask()); + } + + [Fact] + public override async Task DisposeAsyncReleasesQueuedAcquires() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 3)); + using var lease = limiter.Acquire(1); + + var wait1 = limiter.WaitAsync(1); + var wait2 = limiter.WaitAsync(1); + var wait3 = limiter.WaitAsync(1); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + Assert.False(wait3.IsCompleted); + + await limiter.DisposeAsync(); + + var failedLease = await wait1; + Assert.False(failedLease.IsAcquired); + failedLease = await wait2; + Assert.False(failedLease.IsAcquired); + failedLease = await wait3; + Assert.False(failedLease.IsAcquired); + + lease.Dispose(); + + // Throws after disposal + Assert.Throws(() => limiter.Acquire(1)); + await Assert.ThrowsAsync(() => limiter.WaitAsync(1).AsTask()); + } + + [Fact] + public async Task ReasonMetadataOnFailedWaitAsync() + { + var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1)); + using var lease = limiter.Acquire(2); + + var failedLease = await limiter.WaitAsync(2); + Assert.False(failedLease.IsAcquired); + Assert.True(failedLease.TryGetMetadata(MetadataName.ReasonPhrase.Name, out var metadata)); + Assert.Equal("Queue limit reached", metadata); + + Assert.True(failedLease.TryGetMetadata(MetadataName.ReasonPhrase, out var typedMetadata)); + Assert.Equal("Queue limit reached", typedMetadata); + Assert.Collection(failedLease.MetadataNames, item => item.Equals(MetadataName.ReasonPhrase.Name)); + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/tests/System.Threading.RateLimiting.Tests.csproj b/src/libraries/System.Threading.RateLimiting/tests/System.Threading.RateLimiting.Tests.csproj new file mode 100644 index 00000000000000..1eac02dd7c7dcd --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/tests/System.Threading.RateLimiting.Tests.csproj @@ -0,0 +1,13 @@ + + + $(NetCoreAppCurrent);$(NetFrameworkMinimum) + + + + + + + + + + diff --git a/src/libraries/System.Threading.RateLimiting/tests/TokenBucketRateLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/TokenBucketRateLimiterTests.cs new file mode 100644 index 00000000000000..edf05bfe15cce9 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/tests/TokenBucketRateLimiterTests.cs @@ -0,0 +1,632 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Threading.Tasks; +using Xunit; + +namespace System.Threading.RateLimiting.Test +{ + public class TokenBucketRateLimiterTests : BaseRateLimiterTests + { + [Fact] + public override void CanAcquireResource() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + var lease = limiter.Acquire(); + + Assert.True(lease.IsAcquired); + Assert.False(limiter.Acquire().IsAcquired); + + lease.Dispose(); + Assert.False(limiter.Acquire().IsAcquired); + Assert.True(limiter.TryReplenish()); + + Assert.True(limiter.Acquire().IsAcquired); + } + + [Fact] + public override void InvalidOptionsThrows() + { + Assert.Throws(() => new TokenBucketRateLimiterOptions(-1, QueueProcessingOrder.NewestFirst, 1, TimeSpan.FromMinutes(2), 1, autoReplenishment: false)); + Assert.Throws(() => new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, -1, TimeSpan.FromMinutes(2), 1, autoReplenishment: false)); + Assert.Throws(() => new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, TimeSpan.FromMinutes(2), -1, autoReplenishment: false)); + Assert.Throws(() => new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, TimeSpan.FromDays(49).Add(TimeSpan.FromMilliseconds(1)), 1, autoReplenishment: false)); + } + + [Fact] + public override async Task CanAcquireResourceAsync() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + + using var lease = await limiter.WaitAsync(); + + Assert.True(lease.IsAcquired); + var wait = limiter.WaitAsync(); + Assert.False(wait.IsCompleted); + + Assert.True(limiter.TryReplenish()); + + Assert.True((await wait).IsAcquired); + } + + [Fact] + public override async Task CanAcquireResourceAsync_QueuesAndGrabsOldest() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 2, + TimeSpan.Zero, 1, autoReplenishment: false)); + var lease = await limiter.WaitAsync(); + + Assert.True(lease.IsAcquired); + var wait1 = limiter.WaitAsync(); + var wait2 = limiter.WaitAsync(); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + + lease = await wait1; + Assert.True(lease.IsAcquired); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + Assert.Equal(0, limiter.GetAvailablePermits()); + Assert.True(limiter.TryReplenish()); + + lease = await wait2; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CanAcquireResourceAsync_QueuesAndGrabsNewest() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 3, + TimeSpan.FromMinutes(0), 1, autoReplenishment: false)); + + var lease = await limiter.WaitAsync(2); + Assert.True(lease.IsAcquired); + + var wait1 = limiter.WaitAsync(2); + var wait2 = limiter.WaitAsync(); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + + // second queued item completes first with NewestFirst + lease = await wait2; + Assert.True(lease.IsAcquired); + Assert.False(wait1.IsCompleted); + + lease.Dispose(); + Assert.Equal(0, limiter.GetAvailablePermits()); + Assert.True(limiter.TryReplenish()); + Assert.True(limiter.TryReplenish()); + + lease = await wait1; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task FailsWhenQueuingMoreThanLimit() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + using var lease = limiter.Acquire(1); + var wait = limiter.WaitAsync(1); + + var failedLease = await limiter.WaitAsync(1); + Assert.False(failedLease.IsAcquired); + Assert.True(failedLease.TryGetMetadata(MetadataName.RetryAfter, out var timeSpan)); + Assert.Equal(TimeSpan.Zero, timeSpan); + } + + [Fact] + public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + var lease = limiter.Acquire(1); + var wait = limiter.WaitAsync(1); + + var failedLease = await limiter.WaitAsync(1); + Assert.False(failedLease.IsAcquired); + + limiter.TryReplenish(); + lease = await wait; + Assert.True(lease.IsAcquired); + + wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + limiter.TryReplenish(); + lease = await wait; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override void ThrowsWhenAcquiringMoreThanLimit() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + Assert.Throws(() => limiter.Acquire(2)); + } + + [Fact] + public override async Task ThrowsWhenWaitingForMoreThanLimit() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + await Assert.ThrowsAsync(async () => await limiter.WaitAsync(2)); + } + + [Fact] + public override void ThrowsWhenAcquiringLessThanZero() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + Assert.Throws(() => limiter.Acquire(-1)); + } + + [Fact] + public override async Task ThrowsWhenWaitingForLessThanZero() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + await Assert.ThrowsAsync(async () => await limiter.WaitAsync(-1)); + } + + [Fact] + public override void AcquireZero_WithAvailability() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + + using var lease = limiter.Acquire(0); + Assert.True(lease.IsAcquired); + } + + [Fact] + public override void AcquireZero_WithoutAvailability() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + using var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var lease2 = limiter.Acquire(0); + Assert.False(lease2.IsAcquired); + lease2.Dispose(); + } + + [Fact] + public override async Task WaitAsyncZero_WithAvailability() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + + using var lease = await limiter.WaitAsync(0); + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task WaitAsyncZero_WithoutAvailabilityWaitsForAvailability() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + var lease = await limiter.WaitAsync(1); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(0); + Assert.False(wait.IsCompleted); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + using var lease2 = await wait; + Assert.True(lease2.IsAcquired); + } + + [Fact] + public override async Task CanDequeueMultipleResourcesAtOnce() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 2, + TimeSpan.Zero, 2, autoReplenishment: false)); + using var lease = await limiter.WaitAsync(2); + Assert.True(lease.IsAcquired); + + var wait1 = limiter.WaitAsync(1); + var wait2 = limiter.WaitAsync(1); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + + var lease1 = await wait1; + var lease2 = await wait2; + Assert.True(lease1.IsAcquired); + Assert.True(lease2.IsAcquired); + } + + [Fact] + public override async Task CanCancelWaitAsyncAfterQueuing() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + var wait = limiter.WaitAsync(1, cts.Token); + + cts.Cancel(); + await Assert.ThrowsAsync(() => wait.AsTask()); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + + Assert.Equal(1, limiter.GetAvailablePermits()); + } + + [Fact] + public override async Task CanCancelWaitAsyncBeforeQueuing() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + cts.Cancel(); + + await Assert.ThrowsAsync(() => limiter.WaitAsync(1, cts.Token).AsTask()); + + lease.Dispose(); + Assert.True(limiter.TryReplenish()); + + Assert.Equal(1, limiter.GetAvailablePermits()); + } + + [Fact] + public override void NoMetadataOnAcquiredLease() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + using var lease = limiter.Acquire(1); + Assert.False(lease.TryGetMetadata(MetadataName.RetryAfter, out _)); + } + + [Fact] + public override void MetadataNamesContainsAllMetadata() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + using var lease = limiter.Acquire(1); + Assert.Collection(lease.MetadataNames, metadataName => Assert.Equal(metadataName, MetadataName.RetryAfter.Name)); + } + + [Fact] + public override async Task DisposeReleasesQueuedAcquires() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 3, + TimeSpan.Zero, 1, autoReplenishment: false)); + var lease = limiter.Acquire(1); + var wait1 = limiter.WaitAsync(1); + var wait2 = limiter.WaitAsync(1); + var wait3 = limiter.WaitAsync(1); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + Assert.False(wait3.IsCompleted); + + limiter.Dispose(); + + lease = await wait1; + Assert.False(lease.IsAcquired); + lease = await wait2; + Assert.False(lease.IsAcquired); + lease = await wait3; + Assert.False(lease.IsAcquired); + + // Throws after disposal + Assert.Throws(() => limiter.Acquire(1)); + await Assert.ThrowsAsync(() => limiter.WaitAsync(1).AsTask()); + } + + [Fact] + public override async Task DisposeAsyncReleasesQueuedAcquires() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 3, + TimeSpan.Zero, 1, autoReplenishment: false)); + var lease = limiter.Acquire(1); + var wait1 = limiter.WaitAsync(1); + var wait2 = limiter.WaitAsync(1); + var wait3 = limiter.WaitAsync(1); + Assert.False(wait1.IsCompleted); + Assert.False(wait2.IsCompleted); + Assert.False(wait3.IsCompleted); + + await limiter.DisposeAsync(); + + lease = await wait1; + Assert.False(lease.IsAcquired); + lease = await wait2; + Assert.False(lease.IsAcquired); + lease = await wait3; + Assert.False(lease.IsAcquired); + + // Throws after disposal + Assert.Throws(() => limiter.Acquire(1)); + await Assert.ThrowsAsync(() => limiter.WaitAsync(1).AsTask()); + } + + [Fact] + public async Task RetryMetadataOnFailedWaitAsync() + { + var options = new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.FromSeconds(20), 1, autoReplenishment: false); + var limiter = new TokenBucketRateLimiter(options); + + using var lease = limiter.Acquire(2); + + var failedLease = await limiter.WaitAsync(2); + Assert.False(failedLease.IsAcquired); + Assert.True(failedLease.TryGetMetadata(MetadataName.RetryAfter.Name, out var metadata)); + var metaDataTime = Assert.IsType(metadata); + Assert.Equal(options.ReplenishmentPeriod.Ticks * 2, metaDataTime.Ticks); + + Assert.True(failedLease.TryGetMetadata(MetadataName.RetryAfter, out var typedMetadata)); + Assert.Equal(options.ReplenishmentPeriod.Ticks * 2, typedMetadata.Ticks); + Assert.Collection(failedLease.MetadataNames, item => item.Equals(MetadataName.RetryAfter.Name)); + } + + [Fact] + public async Task CorrectRetryMetadataWithQueuedItem() + { + var options = new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.FromSeconds(20), 1, autoReplenishment: false); + var limiter = new TokenBucketRateLimiter(options); + + using var lease = limiter.Acquire(2); + // Queue item which changes the retry after time for failed items + var wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + var failedLease = await limiter.WaitAsync(2); + Assert.False(failedLease.IsAcquired); + Assert.True(failedLease.TryGetMetadata(MetadataName.RetryAfter, out var typedMetadata)); + Assert.Equal(options.ReplenishmentPeriod.Ticks * 3, typedMetadata.Ticks); + } + + [Fact] + public async Task CorrectRetryMetadataWithMultipleTokensPerPeriod() + { + var options = new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.FromSeconds(20), 2, autoReplenishment: false); + var limiter = new TokenBucketRateLimiter(options); + + using var lease = limiter.Acquire(2); + // Queue item which changes the retry after time for failed waits + var wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + var failedLease = await limiter.WaitAsync(2); + Assert.False(failedLease.IsAcquired); + Assert.True(failedLease.TryGetMetadata(MetadataName.RetryAfter, out var typedMetadata)); + Assert.Equal(options.ReplenishmentPeriod, typedMetadata); + } + + [Fact] + public async Task CorrectRetryMetadataWithLargeTokensPerPeriod() + { + var options = new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.FromSeconds(20), 100, autoReplenishment: false); + var limiter = new TokenBucketRateLimiter(options); + + using var lease = limiter.Acquire(2); + // Queue item which changes the retry after time for failed items + var wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + var failedLease = await limiter.WaitAsync(2); + Assert.False(failedLease.IsAcquired); + Assert.True(failedLease.TryGetMetadata(MetadataName.RetryAfter, out var typedMetadata)); + Assert.Equal(options.ReplenishmentPeriod, typedMetadata); + } + + [Fact] + public async Task CorrectRetryMetadataWithNonZeroAvailableItems() + { + var options = new TokenBucketRateLimiterOptions(3, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.FromSeconds(20), 1, autoReplenishment: false); + var limiter = new TokenBucketRateLimiter(options); + + using var lease = limiter.Acquire(2); + + var failedLease = await limiter.WaitAsync(3); + Assert.False(failedLease.IsAcquired); + Assert.True(failedLease.TryGetMetadata(MetadataName.RetryAfter, out var typedMetadata)); + Assert.Equal(options.ReplenishmentPeriod.Ticks * 2, typedMetadata.Ticks); + } + + [Fact] + public void TryReplenishHonorsTokensPerPeriod() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(7, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, 3, autoReplenishment: false)); + Assert.True(limiter.Acquire(5).IsAcquired); + Assert.False(limiter.Acquire(3).IsAcquired); + + Assert.Equal(2, limiter.GetAvailablePermits()); + Assert.True(limiter.TryReplenish()); + Assert.Equal(5, limiter.GetAvailablePermits()); + + Assert.True(limiter.TryReplenish()); + Assert.Equal(7, limiter.GetAvailablePermits()); + } + + [Fact] + public void TryReplenishWithAllTokensAvailable_Noops() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.Zero, 1, autoReplenishment: false)); + Assert.Equal(2, limiter.GetAvailablePermits()); + Assert.True(limiter.TryReplenish()); + Assert.Equal(2, limiter.GetAvailablePermits()); + } + + [Fact] + public void TryReplenishWithAutoReplenish_ReturnsFalse() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.FromSeconds(1), 1, autoReplenishment: true)); + Assert.Equal(2, limiter.GetAvailablePermits()); + Assert.False(limiter.TryReplenish()); + Assert.Equal(2, limiter.GetAvailablePermits()); + } + + [Fact] + public async Task AutoReplenish_ReplenishesTokens() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 1, + TimeSpan.FromMilliseconds(1000), 1, autoReplenishment: true)); + Assert.Equal(2, limiter.GetAvailablePermits()); + limiter.Acquire(2); + + var lease = await limiter.WaitAsync(1); + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CanAcquireResourcesWithWaitAsyncWithQueuedItemsIfNewestFirst() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2, + TimeSpan.Zero, 2, autoReplenishment: false)); + + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(2); + Assert.False(wait.IsCompleted); + + Assert.Equal(1, limiter.GetAvailablePermits()); + lease = await limiter.WaitAsync(1); + Assert.True(lease.IsAcquired); + Assert.False(wait.IsCompleted); + + limiter.TryReplenish(); + + lease = await wait; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CannotAcquireResourcesWithWaitAsyncWithQueuedItemsIfOldestFirst() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 3, + TimeSpan.Zero, 2, autoReplenishment: false)); + + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(2); + var wait2 = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + Assert.False(wait2.IsCompleted); + + limiter.TryReplenish(); + + lease = await wait; + Assert.True(lease.IsAcquired); + Assert.False(wait2.IsCompleted); + + limiter.TryReplenish(); + + lease = await wait2; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CanAcquireResourcesWithAcquireWithQueuedItemsIfNewestFirst() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 3, + TimeSpan.Zero, 2, autoReplenishment: false)); + + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(2); + Assert.False(wait.IsCompleted); + + lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + Assert.False(wait.IsCompleted); + + limiter.TryReplenish(); + + lease = await wait; + Assert.True(lease.IsAcquired); + } + + [Fact] + public override async Task CannotAcquireResourcesWithAcquireWithQueuedItemsIfOldestFirst() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.OldestFirst, 3, + TimeSpan.Zero, 2, autoReplenishment: false)); + + var lease = limiter.Acquire(1); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(2); + Assert.False(wait.IsCompleted); + + lease = limiter.Acquire(1); + Assert.False(lease.IsAcquired); + + limiter.TryReplenish(); + + lease = await wait; + Assert.True(lease.IsAcquired); + } + + [Fact] + public async Task ReplenishWorksWhenTicksWrap() + { + var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(10, QueueProcessingOrder.OldestFirst, 2, + TimeSpan.FromMilliseconds(2), 1, autoReplenishment: false)); + + var lease = limiter.Acquire(10); + Assert.True(lease.IsAcquired); + + var wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + var replenishInternalMethod = typeof(TokenBucketRateLimiter).GetMethod("ReplenishInternal", Reflection.BindingFlags.NonPublic | Reflection.BindingFlags.Instance)!; + // This will set the last tick to the max value + replenishInternalMethod.Invoke(limiter, new object[] { uint.MaxValue }); + + lease = await wait; + Assert.True(lease.IsAcquired); + + wait = limiter.WaitAsync(1); + Assert.False(wait.IsCompleted); + + // ticks wrapped, should replenish + replenishInternalMethod.Invoke(limiter, new object[] { 2U }); + lease = await wait; + Assert.True(lease.IsAcquired); + + replenishInternalMethod.Invoke(limiter, new object[] { uint.MaxValue }); + + wait = limiter.WaitAsync(2); + Assert.False(wait.IsCompleted); + + // ticks wrapped, but only 1 millisecond passed, make sure the wrapping behaves correctly and replenish doesn't happen + replenishInternalMethod.Invoke(limiter, new object[] { 1U }); + Assert.False(wait.IsCompleted); + Assert.Equal(1, limiter.GetAvailablePermits()); + } + } +}