From 0d0758a38156bca7af8150fd7e36fa3308293b6b Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Fri, 26 Oct 2018 09:32:01 -0700 Subject: [PATCH] Use new threading-related APIs in Channels - Avoid ThreadPool-related allocations via IThreadPoolWorkItem. We already had a fairly low allocation profile on most channels, thanks to an IValueTaskSource implementation. This extends that implementation with an IThreadPoolWorkItem implementation so that when we do need to queue to the pool (e.g. to support RunContinuationsAsynchronously, on writes on the bounded queue, etc.), we can do so without incurring additional allocation. - Avoid ExecutionContext costs with CancellationToken.UnsafeRegister. Minor savings when a cancelable token is provided; we don't need to flow context as all we're doing is completing another object. --- .../src/Configurations.props | 2 +- .../src/System.Threading.Channels.csproj | 5 +- .../Threading/Channels/AsyncOperation.cs | 135 +++++++++--------- .../Channels/AsyncOperation.netcoreapp.cs | 20 +++ .../Channels/AsyncOperation.netstandard.cs | 22 +++ .../tests/Configurations.props | 1 + .../System.Threading.Channels.Tests.csproj | 2 +- 7 files changed, 120 insertions(+), 67 deletions(-) create mode 100644 src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.netcoreapp.cs create mode 100644 src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.netstandard.cs diff --git a/src/System.Threading.Channels/src/Configurations.props b/src/System.Threading.Channels/src/Configurations.props index d930631d9996..62adcfe1a545 100644 --- a/src/System.Threading.Channels/src/Configurations.props +++ b/src/System.Threading.Channels/src/Configurations.props @@ -2,9 +2,9 @@ + netcoreapp; netstandard1.3; netstandard; - netcoreapp2.1; $(PackageConfigurations); diff --git a/src/System.Threading.Channels/src/System.Threading.Channels.csproj b/src/System.Threading.Channels/src/System.Threading.Channels.csproj index 9e0f7aefc1f8..44e8992b6fd2 100644 --- a/src/System.Threading.Channels/src/System.Threading.Channels.csproj +++ b/src/System.Threading.Channels/src/System.Threading.Channels.csproj @@ -3,12 +3,14 @@ {AAADA5D3-CF64-4E9D-943C-EFDC006D6366} System.Threading.Channels $(OutputPath)$(MSBuildProjectName).xml - netcoreapp-Debug;netcoreapp-Release;netcoreapp2.1-Debug;netcoreapp2.1-Release;netstandard-Debug;netstandard-Release;netstandard1.3-Debug;netstandard1.3-Release + netcoreapp-Debug;netcoreapp-Release;netstandard-Debug;netstandard-Release;netstandard1.3-Debug;netstandard1.3-Release + + @@ -37,6 +39,7 @@ + diff --git a/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs b/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs index a035e7fecd7e..285830a4e633 100644 --- a/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs +++ b/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs @@ -12,9 +12,12 @@ namespace System.Threading.Channels internal abstract class AsyncOperation { /// Sentinel object used in a field to indicate the operation is available for use. - protected static readonly Action s_availableSentinel = new Action(s => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(s_availableSentinel)} invoked with {s}.")); + protected static readonly Action s_availableSentinel = AvailableSentinel; // named method to help with debugging + private static void AvailableSentinel(object s) => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(AvailableSentinel)} invoked with {s}"); + /// Sentinel object used in a field to indicate the operation has completed. - protected static readonly Action s_completedSentinel = new Action(s => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(s_completedSentinel)} invoked with {s}")); + protected static readonly Action s_completedSentinel = CompletedSentinel; // named method to help with debugging + private static void CompletedSentinel(object s) => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(CompletedSentinel)} invoked with {s}"); /// Throws an exception indicating that the operation's result was accessed before the operation completed. protected static void ThrowIncompleteOperationException() => @@ -31,7 +34,7 @@ protected static void ThrowIncorrectCurrentIdException() => /// The representation of an asynchronous operation that has a result value. /// Specifies the type of the result. May be . - internal class AsyncOperation : AsyncOperation, IValueTaskSource, IValueTaskSource + internal partial class AsyncOperation : AsyncOperation, IValueTaskSource, IValueTaskSource { /// Registration with a provided cancellation token. private readonly CancellationTokenRegistration _registration; @@ -85,7 +88,7 @@ public AsyncOperation(bool runContinuationsAsynchronously, CancellationToken can { Debug.Assert(!_pooled, "Cancelable operations can't be pooled"); CancellationToken = cancellationToken; - _registration = cancellationToken.Register(s => + _registration = UnsafeRegister(cancellationToken, s => { var thisRef = (AsyncOperation)s; thisRef.TrySetCanceled(thisRef.CancellationToken); @@ -106,17 +109,16 @@ public AsyncOperation(bool runContinuationsAsynchronously, CancellationToken can /// The token that must match . public ValueTaskSourceStatus GetStatus(short token) { - if (_currentId == token) + if (_currentId != token) { - return - !IsCompleted ? ValueTaskSourceStatus.Pending : - _error == null ? ValueTaskSourceStatus.Succeeded : - _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled : - ValueTaskSourceStatus.Faulted; + ThrowIncorrectCurrentIdException(); } - ThrowIncorrectCurrentIdException(); - return default; // just to satisfy compiler + return + !IsCompleted ? ValueTaskSourceStatus.Pending : + _error == null ? ValueTaskSourceStatus.Succeeded : + _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled : + ValueTaskSourceStatus.Faulted; } /// Gets whether the operation has completed. @@ -274,8 +276,14 @@ public void OnCompleted(Action continuation, object state, short token, ThrowMultipleContinuations(); } - // Queue the continuation. - if (sc != null) + // Queue the continuation. We always queue here, even if !RunContinuationsAsynchronously, in order + // to avoid stack diving; this path happens in the rare race when we're setting up to await and the + // object is completed after the awaiter.IsCompleted but before the awaiter.OnCompleted. + if (_schedulingContext == null) + { + QueueUserWorkItem(continuation, state); + } + else if (sc != null) { sc.Post(s => { @@ -285,7 +293,8 @@ public void OnCompleted(Action continuation, object state, short token, } else { - Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts ?? TaskScheduler.Default); + Debug.Assert(ts != null); + Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts); } } } @@ -364,70 +373,68 @@ private void SignalCompletion() { if (_continuation != null || Interlocked.CompareExchange(ref _continuation, s_completedSentinel, null) != null) { - ExecutionContext ec = _executionContext; - if (ec != null) + Debug.Assert(_continuation != s_completedSentinel, $"The continuation was the completion sentinel."); + Debug.Assert(_continuation != s_availableSentinel, $"The continuation was the available sentinel."); + + if (_schedulingContext == null) { - ExecutionContext.Run(ec, s => ((AsyncOperation)s).SignalCompletionCore(), this); + // There's no captured scheduling context. If we're forced to run continuations asynchronously, queue it. + // Otherwise fall through to invoke it synchronously. + if (_runContinuationsAsynchronously) + { + UnsafeQueueSetCompletionAndInvokeContinuation(); + return; + } + } + else if (_schedulingContext is SynchronizationContext sc) + { + // There's a captured synchronization context. If we're forced to run continuations asynchronously, + // or if there's a current synchronization context that's not the one we're targeting, queue it. + // Otherwise fall through to invoke it synchronously. + if (_runContinuationsAsynchronously || sc != SynchronizationContext.Current) + { + sc.Post(s => ((AsyncOperation)s).SetCompletionAndInvokeContinuation(), this); + return; + } } else { - SignalCompletionCore(); + // There's a captured TaskScheduler. If we're forced to run continuations asynchronously, + // or if there's a current scheduler that's not the one we're targeting, queue it. + // Otherwise fall through to invoke it synchronously. + TaskScheduler ts = (TaskScheduler)_schedulingContext; + Debug.Assert(ts != null, "Expected a TaskScheduler"); + if (_runContinuationsAsynchronously || ts != TaskScheduler.Current) + { + Task.Factory.StartNew(s => ((AsyncOperation)s).SetCompletionAndInvokeContinuation(), this, + CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts); + return; + } } + + // Invoke the continuation synchronously. + SetCompletionAndInvokeContinuation(); } } - /// Invokes the registered continuation; separated out of SignalCompletion for convenience so that it may be invoked on multiple code paths. - private void SignalCompletionCore() + private void SetCompletionAndInvokeContinuation() { - Debug.Assert(_continuation != s_completedSentinel, $"The continuation was the completion sentinel."); - Debug.Assert(_continuation != s_availableSentinel, $"The continuation was the available sentinel."); - - if (_schedulingContext == null) - { - // There's no captured scheduling context. If we're forced to run continuations asynchronously, queue it. - // Otherwise fall through to invoke it synchronously. - if (_runContinuationsAsynchronously) - { - Task.Factory.StartNew(s => ((AsyncOperation)s).SetCompletionAndInvokeContinuation(), this, - CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); - return; - } - } - else if (_schedulingContext is SynchronizationContext sc) + if (_executionContext == null) { - // There's a captured synchronization context. If we're forced to run continuations asynchronously, - // or if there's a current synchronization context that's not the one we're targeting, queue it. - // Otherwise fall through to invoke it synchronously. - if (_runContinuationsAsynchronously || sc != SynchronizationContext.Current) - { - sc.Post(s => ((AsyncOperation)s).SetCompletionAndInvokeContinuation(), this); - return; - } + Action c = _continuation; + _continuation = s_completedSentinel; + c(_continuationState); } else { - // There's a captured TaskScheduler. If we're forced to run continuations asynchronously, - // or if there's a current scheduler that's not the one we're targeting, queue it. - // Otherwise fall through to invoke it synchronously. - TaskScheduler ts = (TaskScheduler)_schedulingContext; - Debug.Assert(ts != null, "Expected a TaskScheduler"); - if (_runContinuationsAsynchronously || ts != TaskScheduler.Current) + ExecutionContext.Run(_executionContext, s => { - Task.Factory.StartNew(s => ((AsyncOperation)s).SetCompletionAndInvokeContinuation(), this, - CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts); - return; - } + var thisRef = (AsyncOperation)s; + Action c = thisRef._continuation; + thisRef._continuation = s_completedSentinel; + c(thisRef._continuationState); + }, this); } - - // Invoke the continuation synchronously. - SetCompletionAndInvokeContinuation(); - } - - private void SetCompletionAndInvokeContinuation() - { - Action c = _continuation; - _continuation = s_completedSentinel; - c(_continuationState); } } diff --git a/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.netcoreapp.cs b/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.netcoreapp.cs new file mode 100644 index 000000000000..14869e5c737c --- /dev/null +++ b/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.netcoreapp.cs @@ -0,0 +1,20 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace System.Threading.Channels +{ + internal partial class AsyncOperation : IThreadPoolWorkItem + { + void IThreadPoolWorkItem.Execute() => SetCompletionAndInvokeContinuation(); + + private void UnsafeQueueSetCompletionAndInvokeContinuation() => + ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + + private static void QueueUserWorkItem(Action action, object state) => + ThreadPool.QueueUserWorkItem(action, state, preferLocal: false); + + private static CancellationTokenRegistration UnsafeRegister(CancellationToken cancellationToken, Action action, object state) => + cancellationToken.UnsafeRegister(action, state); + } +} diff --git a/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.netstandard.cs b/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.netstandard.cs new file mode 100644 index 000000000000..10cffadfc876 --- /dev/null +++ b/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.netstandard.cs @@ -0,0 +1,22 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading.Tasks; + +namespace System.Threading.Channels +{ + internal partial class AsyncOperation + { + private void UnsafeQueueSetCompletionAndInvokeContinuation() => + Task.Factory.StartNew(s => ((AsyncOperation)s).SetCompletionAndInvokeContinuation(), this, + CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); + + private static void QueueUserWorkItem(Action action, object state) => + Task.Factory.StartNew(action, state, + CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); + + private static CancellationTokenRegistration UnsafeRegister(CancellationToken cancellationToken, Action action, object state) => + cancellationToken.Register(action, state); + } +} diff --git a/src/System.Threading.Channels/tests/Configurations.props b/src/System.Threading.Channels/tests/Configurations.props index 78953dfc8851..271a4be4c93c 100644 --- a/src/System.Threading.Channels/tests/Configurations.props +++ b/src/System.Threading.Channels/tests/Configurations.props @@ -3,6 +3,7 @@ netstandard; + netcoreapp; diff --git a/src/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj b/src/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj index e6c9ad904da8..4bf2c624a2a2 100644 --- a/src/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj +++ b/src/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj @@ -1,7 +1,7 @@ {1AF01469-DBFC-4BA1-9331-8E39AA639FEE} - netstandard-Debug;netstandard-Release + netcoreapp-Debug;netcoreapp-Release;netstandard-Debug;netstandard-Release