diff --git a/src/System.Threading.Channels/src/Configurations.props b/src/System.Threading.Channels/src/Configurations.props index d930631d9996..62adcfe1a545 100644 --- a/src/System.Threading.Channels/src/Configurations.props +++ b/src/System.Threading.Channels/src/Configurations.props @@ -2,9 +2,9 @@ + netcoreapp; netstandard1.3; netstandard; - netcoreapp2.1; $(PackageConfigurations); diff --git a/src/System.Threading.Channels/src/System.Threading.Channels.csproj b/src/System.Threading.Channels/src/System.Threading.Channels.csproj index 9e0f7aefc1f8..44e8992b6fd2 100644 --- a/src/System.Threading.Channels/src/System.Threading.Channels.csproj +++ b/src/System.Threading.Channels/src/System.Threading.Channels.csproj @@ -3,12 +3,14 @@ {AAADA5D3-CF64-4E9D-943C-EFDC006D6366} System.Threading.Channels $(OutputPath)$(MSBuildProjectName).xml - netcoreapp-Debug;netcoreapp-Release;netcoreapp2.1-Debug;netcoreapp2.1-Release;netstandard-Debug;netstandard-Release;netstandard1.3-Debug;netstandard1.3-Release + netcoreapp-Debug;netcoreapp-Release;netstandard-Debug;netstandard-Release;netstandard1.3-Debug;netstandard1.3-Release + + @@ -37,6 +39,7 @@ + diff --git a/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs b/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs index a035e7fecd7e..285830a4e633 100644 --- a/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs +++ b/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.cs @@ -12,9 +12,12 @@ namespace System.Threading.Channels internal abstract class AsyncOperation { /// Sentinel object used in a field to indicate the operation is available for use. - protected static readonly Action s_availableSentinel = new Action(s => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(s_availableSentinel)} invoked with {s}.")); + protected static readonly Action s_availableSentinel = AvailableSentinel; // named method to help with debugging + private static void AvailableSentinel(object s) => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(AvailableSentinel)} invoked with {s}"); + /// Sentinel object used in a field to indicate the operation has completed. - protected static readonly Action s_completedSentinel = new Action(s => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(s_completedSentinel)} invoked with {s}")); + protected static readonly Action s_completedSentinel = CompletedSentinel; // named method to help with debugging + private static void CompletedSentinel(object s) => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(CompletedSentinel)} invoked with {s}"); /// Throws an exception indicating that the operation's result was accessed before the operation completed. protected static void ThrowIncompleteOperationException() => @@ -31,7 +34,7 @@ protected static void ThrowIncorrectCurrentIdException() => /// The representation of an asynchronous operation that has a result value. /// Specifies the type of the result. May be . - internal class AsyncOperation : AsyncOperation, IValueTaskSource, IValueTaskSource + internal partial class AsyncOperation : AsyncOperation, IValueTaskSource, IValueTaskSource { /// Registration with a provided cancellation token. private readonly CancellationTokenRegistration _registration; @@ -85,7 +88,7 @@ public AsyncOperation(bool runContinuationsAsynchronously, CancellationToken can { Debug.Assert(!_pooled, "Cancelable operations can't be pooled"); CancellationToken = cancellationToken; - _registration = cancellationToken.Register(s => + _registration = UnsafeRegister(cancellationToken, s => { var thisRef = (AsyncOperation)s; thisRef.TrySetCanceled(thisRef.CancellationToken); @@ -106,17 +109,16 @@ public AsyncOperation(bool runContinuationsAsynchronously, CancellationToken can /// The token that must match . public ValueTaskSourceStatus GetStatus(short token) { - if (_currentId == token) + if (_currentId != token) { - return - !IsCompleted ? ValueTaskSourceStatus.Pending : - _error == null ? ValueTaskSourceStatus.Succeeded : - _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled : - ValueTaskSourceStatus.Faulted; + ThrowIncorrectCurrentIdException(); } - ThrowIncorrectCurrentIdException(); - return default; // just to satisfy compiler + return + !IsCompleted ? ValueTaskSourceStatus.Pending : + _error == null ? ValueTaskSourceStatus.Succeeded : + _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled : + ValueTaskSourceStatus.Faulted; } /// Gets whether the operation has completed. @@ -274,8 +276,14 @@ public void OnCompleted(Action continuation, object state, short token, ThrowMultipleContinuations(); } - // Queue the continuation. - if (sc != null) + // Queue the continuation. We always queue here, even if !RunContinuationsAsynchronously, in order + // to avoid stack diving; this path happens in the rare race when we're setting up to await and the + // object is completed after the awaiter.IsCompleted but before the awaiter.OnCompleted. + if (_schedulingContext == null) + { + QueueUserWorkItem(continuation, state); + } + else if (sc != null) { sc.Post(s => { @@ -285,7 +293,8 @@ public void OnCompleted(Action continuation, object state, short token, } else { - Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts ?? TaskScheduler.Default); + Debug.Assert(ts != null); + Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts); } } } @@ -364,70 +373,68 @@ private void SignalCompletion() { if (_continuation != null || Interlocked.CompareExchange(ref _continuation, s_completedSentinel, null) != null) { - ExecutionContext ec = _executionContext; - if (ec != null) + Debug.Assert(_continuation != s_completedSentinel, $"The continuation was the completion sentinel."); + Debug.Assert(_continuation != s_availableSentinel, $"The continuation was the available sentinel."); + + if (_schedulingContext == null) { - ExecutionContext.Run(ec, s => ((AsyncOperation)s).SignalCompletionCore(), this); + // There's no captured scheduling context. If we're forced to run continuations asynchronously, queue it. + // Otherwise fall through to invoke it synchronously. + if (_runContinuationsAsynchronously) + { + UnsafeQueueSetCompletionAndInvokeContinuation(); + return; + } + } + else if (_schedulingContext is SynchronizationContext sc) + { + // There's a captured synchronization context. If we're forced to run continuations asynchronously, + // or if there's a current synchronization context that's not the one we're targeting, queue it. + // Otherwise fall through to invoke it synchronously. + if (_runContinuationsAsynchronously || sc != SynchronizationContext.Current) + { + sc.Post(s => ((AsyncOperation)s).SetCompletionAndInvokeContinuation(), this); + return; + } } else { - SignalCompletionCore(); + // There's a captured TaskScheduler. If we're forced to run continuations asynchronously, + // or if there's a current scheduler that's not the one we're targeting, queue it. + // Otherwise fall through to invoke it synchronously. + TaskScheduler ts = (TaskScheduler)_schedulingContext; + Debug.Assert(ts != null, "Expected a TaskScheduler"); + if (_runContinuationsAsynchronously || ts != TaskScheduler.Current) + { + Task.Factory.StartNew(s => ((AsyncOperation)s).SetCompletionAndInvokeContinuation(), this, + CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts); + return; + } } + + // Invoke the continuation synchronously. + SetCompletionAndInvokeContinuation(); } } - /// Invokes the registered continuation; separated out of SignalCompletion for convenience so that it may be invoked on multiple code paths. - private void SignalCompletionCore() + private void SetCompletionAndInvokeContinuation() { - Debug.Assert(_continuation != s_completedSentinel, $"The continuation was the completion sentinel."); - Debug.Assert(_continuation != s_availableSentinel, $"The continuation was the available sentinel."); - - if (_schedulingContext == null) - { - // There's no captured scheduling context. If we're forced to run continuations asynchronously, queue it. - // Otherwise fall through to invoke it synchronously. - if (_runContinuationsAsynchronously) - { - Task.Factory.StartNew(s => ((AsyncOperation)s).SetCompletionAndInvokeContinuation(), this, - CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); - return; - } - } - else if (_schedulingContext is SynchronizationContext sc) + if (_executionContext == null) { - // There's a captured synchronization context. If we're forced to run continuations asynchronously, - // or if there's a current synchronization context that's not the one we're targeting, queue it. - // Otherwise fall through to invoke it synchronously. - if (_runContinuationsAsynchronously || sc != SynchronizationContext.Current) - { - sc.Post(s => ((AsyncOperation)s).SetCompletionAndInvokeContinuation(), this); - return; - } + Action c = _continuation; + _continuation = s_completedSentinel; + c(_continuationState); } else { - // There's a captured TaskScheduler. If we're forced to run continuations asynchronously, - // or if there's a current scheduler that's not the one we're targeting, queue it. - // Otherwise fall through to invoke it synchronously. - TaskScheduler ts = (TaskScheduler)_schedulingContext; - Debug.Assert(ts != null, "Expected a TaskScheduler"); - if (_runContinuationsAsynchronously || ts != TaskScheduler.Current) + ExecutionContext.Run(_executionContext, s => { - Task.Factory.StartNew(s => ((AsyncOperation)s).SetCompletionAndInvokeContinuation(), this, - CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts); - return; - } + var thisRef = (AsyncOperation)s; + Action c = thisRef._continuation; + thisRef._continuation = s_completedSentinel; + c(thisRef._continuationState); + }, this); } - - // Invoke the continuation synchronously. - SetCompletionAndInvokeContinuation(); - } - - private void SetCompletionAndInvokeContinuation() - { - Action c = _continuation; - _continuation = s_completedSentinel; - c(_continuationState); } } diff --git a/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.netcoreapp.cs b/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.netcoreapp.cs new file mode 100644 index 000000000000..14869e5c737c --- /dev/null +++ b/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.netcoreapp.cs @@ -0,0 +1,20 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace System.Threading.Channels +{ + internal partial class AsyncOperation : IThreadPoolWorkItem + { + void IThreadPoolWorkItem.Execute() => SetCompletionAndInvokeContinuation(); + + private void UnsafeQueueSetCompletionAndInvokeContinuation() => + ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + + private static void QueueUserWorkItem(Action action, object state) => + ThreadPool.QueueUserWorkItem(action, state, preferLocal: false); + + private static CancellationTokenRegistration UnsafeRegister(CancellationToken cancellationToken, Action action, object state) => + cancellationToken.UnsafeRegister(action, state); + } +} diff --git a/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.netstandard.cs b/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.netstandard.cs new file mode 100644 index 000000000000..10cffadfc876 --- /dev/null +++ b/src/System.Threading.Channels/src/System/Threading/Channels/AsyncOperation.netstandard.cs @@ -0,0 +1,22 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Threading.Tasks; + +namespace System.Threading.Channels +{ + internal partial class AsyncOperation + { + private void UnsafeQueueSetCompletionAndInvokeContinuation() => + Task.Factory.StartNew(s => ((AsyncOperation)s).SetCompletionAndInvokeContinuation(), this, + CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); + + private static void QueueUserWorkItem(Action action, object state) => + Task.Factory.StartNew(action, state, + CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); + + private static CancellationTokenRegistration UnsafeRegister(CancellationToken cancellationToken, Action action, object state) => + cancellationToken.Register(action, state); + } +} diff --git a/src/System.Threading.Channels/tests/Configurations.props b/src/System.Threading.Channels/tests/Configurations.props index 78953dfc8851..271a4be4c93c 100644 --- a/src/System.Threading.Channels/tests/Configurations.props +++ b/src/System.Threading.Channels/tests/Configurations.props @@ -3,6 +3,7 @@ netstandard; + netcoreapp; diff --git a/src/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj b/src/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj index e6c9ad904da8..4bf2c624a2a2 100644 --- a/src/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj +++ b/src/System.Threading.Channels/tests/System.Threading.Channels.Tests.csproj @@ -1,7 +1,7 @@ {1AF01469-DBFC-4BA1-9331-8E39AA639FEE} - netstandard-Debug;netstandard-Release + netcoreapp-Debug;netcoreapp-Release;netstandard-Debug;netstandard-Release