Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/System.Threading.Channels/src/Configurations.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<PackageConfigurations>
netcoreapp;
netstandard1.3;
netstandard;
netcoreapp2.1;
</PackageConfigurations>
<BuildConfigurations>
$(PackageConfigurations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
<ProjectGuid>{AAADA5D3-CF64-4E9D-943C-EFDC006D6366}</ProjectGuid>
<RootNamespace>System.Threading.Channels</RootNamespace>
<DocumentationFile>$(OutputPath)$(MSBuildProjectName).xml</DocumentationFile>
<Configurations>netcoreapp-Debug;netcoreapp-Release;netcoreapp2.1-Debug;netcoreapp2.1-Release;netstandard-Debug;netstandard-Release;netstandard1.3-Debug;netstandard1.3-Release</Configurations>
<Configurations>netcoreapp-Debug;netcoreapp-Release;netstandard-Debug;netstandard-Release;netstandard1.3-Debug;netstandard1.3-Release</Configurations>
</PropertyGroup>
<ItemGroup>
<Compile Include="System\VoidResult.cs" />
<Compile Include="System\Collections\Generic\Deque.cs" />
<Compile Include="System\Threading\Channels\AsyncOperation.cs" />
<Compile Include="System\Threading\Channels\AsyncOperation.netcoreapp.cs" Condition="'$(TargetGroup)' == 'netcoreapp'" />
<Compile Include="System\Threading\Channels\AsyncOperation.netstandard.cs" Condition="'$(TargetGroup)' != 'netcoreapp'" />
<Compile Include="System\Threading\Channels\BoundedChannel.cs" />
<Compile Include="System\Threading\Channels\BoundedChannelFullMode.cs" />
<Compile Include="System\Threading\Channels\Channel.cs" />
Expand Down Expand Up @@ -37,6 +39,7 @@
<Reference Include="System.Runtime" />
<Reference Include="System.Runtime.Extensions" />
<Reference Include="System.Threading" />
<Reference Include="System.Threading.ThreadPool" />
<Reference Include="System.Threading.Tasks" />
<Reference Include="System.Threading.Tasks.Extensions" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ namespace System.Threading.Channels
internal abstract class AsyncOperation
{
/// <summary>Sentinel object used in a field to indicate the operation is available for use.</summary>
protected static readonly Action<object> s_availableSentinel = new Action<object>(s => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(s_availableSentinel)} invoked with {s}."));
protected static readonly Action<object> s_availableSentinel = AvailableSentinel; // named method to help with debugging
private static void AvailableSentinel(object s) => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(AvailableSentinel)} invoked with {s}");

/// <summary>Sentinel object used in a field to indicate the operation has completed.</summary>
protected static readonly Action<object> s_completedSentinel = new Action<object>(s => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(s_completedSentinel)} invoked with {s}"));
protected static readonly Action<object> s_completedSentinel = CompletedSentinel; // named method to help with debugging
private static void CompletedSentinel(object s) => Debug.Fail($"{nameof(AsyncOperation)}.{nameof(CompletedSentinel)} invoked with {s}");

/// <summary>Throws an exception indicating that the operation's result was accessed before the operation completed.</summary>
protected static void ThrowIncompleteOperationException() =>
Expand All @@ -31,7 +34,7 @@ protected static void ThrowIncorrectCurrentIdException() =>

/// <summary>The representation of an asynchronous operation that has a result value.</summary>
/// <typeparam name="TResult">Specifies the type of the result. May be <see cref="VoidResult"/>.</typeparam>
internal class AsyncOperation<TResult> : AsyncOperation, IValueTaskSource, IValueTaskSource<TResult>
internal partial class AsyncOperation<TResult> : AsyncOperation, IValueTaskSource, IValueTaskSource<TResult>
{
/// <summary>Registration with a provided cancellation token.</summary>
private readonly CancellationTokenRegistration _registration;
Expand Down Expand Up @@ -85,7 +88,7 @@ public AsyncOperation(bool runContinuationsAsynchronously, CancellationToken can
{
Debug.Assert(!_pooled, "Cancelable operations can't be pooled");
CancellationToken = cancellationToken;
_registration = cancellationToken.Register(s =>
_registration = UnsafeRegister(cancellationToken, s =>
{
var thisRef = (AsyncOperation<TResult>)s;
thisRef.TrySetCanceled(thisRef.CancellationToken);
Expand All @@ -106,17 +109,16 @@ public AsyncOperation(bool runContinuationsAsynchronously, CancellationToken can
/// <param name="token">The token that must match <see cref="_currentId"/>.</param>
public ValueTaskSourceStatus GetStatus(short token)
{
if (_currentId == token)
if (_currentId != token)
{
return
!IsCompleted ? ValueTaskSourceStatus.Pending :
_error == null ? ValueTaskSourceStatus.Succeeded :
_error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled :
ValueTaskSourceStatus.Faulted;
ThrowIncorrectCurrentIdException();
}

ThrowIncorrectCurrentIdException();
return default; // just to satisfy compiler
return
!IsCompleted ? ValueTaskSourceStatus.Pending :
_error == null ? ValueTaskSourceStatus.Succeeded :
_error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled :
ValueTaskSourceStatus.Faulted;
}

/// <summary>Gets whether the operation has completed.</summary>
Expand Down Expand Up @@ -274,8 +276,14 @@ public void OnCompleted(Action<object> continuation, object state, short token,
ThrowMultipleContinuations();
}

// Queue the continuation.
if (sc != null)
// Queue the continuation. We always queue here, even if !RunContinuationsAsynchronously, in order
// to avoid stack diving; this path happens in the rare race when we're setting up to await and the
// object is completed after the awaiter.IsCompleted but before the awaiter.OnCompleted.
if (_schedulingContext == null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (_schedulingContext == null) [](start = 16, length = 31)

Is it possible at all to have _schedulingContext == null and sc != null? and if yes, shouldn't we still need to do sc.Post at that time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the code above that assigns _schedulingContext: it's only set to sc if the sc is one we want to respect.

That said, looking at it again there's an existing very-corner-case bug here, pre-existing this change... it's possible sc could be non-null, ts could be non-null, and we want to use the ts, but we'll end up using the sc instead. I'll fix that separately. We should be setting sc to null when we go to check for a task scheduler.

Copy link
Member

@tarekgh tarekgh Oct 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also it is worth adding some assert here. I mean ensuring the the states of sc with _schedulingContext. just in case in the future anyone update the sc initialization code.


In reply to: 226717506 [](ancestors = 226717506)

{
QueueUserWorkItem(continuation, state);
}
else if (sc != null)
{
sc.Post(s =>
{
Expand All @@ -285,7 +293,8 @@ public void OnCompleted(Action<object> continuation, object state, short token,
}
else
{
Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts ?? TaskScheduler.Default);
Debug.Assert(ts != null);
Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
}
}
}
Expand Down Expand Up @@ -364,70 +373,68 @@ private void SignalCompletion()
{
if (_continuation != null || Interlocked.CompareExchange(ref _continuation, s_completedSentinel, null) != null)
{
ExecutionContext ec = _executionContext;
if (ec != null)
Debug.Assert(_continuation != s_completedSentinel, $"The continuation was the completion sentinel.");
Debug.Assert(_continuation != s_availableSentinel, $"The continuation was the available sentinel.");

if (_schedulingContext == null)
{
ExecutionContext.Run(ec, s => ((AsyncOperation<TResult>)s).SignalCompletionCore(), this);
// There's no captured scheduling context. If we're forced to run continuations asynchronously, queue it.
// Otherwise fall through to invoke it synchronously.
if (_runContinuationsAsynchronously)
{
UnsafeQueueSetCompletionAndInvokeContinuation();
return;
}
}
else if (_schedulingContext is SynchronizationContext sc)
{
// There's a captured synchronization context. If we're forced to run continuations asynchronously,
// or if there's a current synchronization context that's not the one we're targeting, queue it.
// Otherwise fall through to invoke it synchronously.
if (_runContinuationsAsynchronously || sc != SynchronizationContext.Current)
{
sc.Post(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this);
return;
}
}
else
{
SignalCompletionCore();
// There's a captured TaskScheduler. If we're forced to run continuations asynchronously,
// or if there's a current scheduler that's not the one we're targeting, queue it.
// Otherwise fall through to invoke it synchronously.
TaskScheduler ts = (TaskScheduler)_schedulingContext;
Debug.Assert(ts != null, "Expected a TaskScheduler");
if (_runContinuationsAsynchronously || ts != TaskScheduler.Current)
{
Task.Factory.StartNew(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
return;
}
}

// Invoke the continuation synchronously.
SetCompletionAndInvokeContinuation();
}
}

/// <summary>Invokes the registered continuation; separated out of SignalCompletion for convenience so that it may be invoked on multiple code paths.</summary>
private void SignalCompletionCore()
private void SetCompletionAndInvokeContinuation()
{
Debug.Assert(_continuation != s_completedSentinel, $"The continuation was the completion sentinel.");
Debug.Assert(_continuation != s_availableSentinel, $"The continuation was the available sentinel.");

if (_schedulingContext == null)
{
// There's no captured scheduling context. If we're forced to run continuations asynchronously, queue it.
// Otherwise fall through to invoke it synchronously.
if (_runContinuationsAsynchronously)
{
Task.Factory.StartNew(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
return;
}
}
else if (_schedulingContext is SynchronizationContext sc)
if (_executionContext == null)
{
// There's a captured synchronization context. If we're forced to run continuations asynchronously,
// or if there's a current synchronization context that's not the one we're targeting, queue it.
// Otherwise fall through to invoke it synchronously.
if (_runContinuationsAsynchronously || sc != SynchronizationContext.Current)
{
sc.Post(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this);
return;
}
Action<object> c = _continuation;
_continuation = s_completedSentinel;
c(_continuationState);
}
else
{
// There's a captured TaskScheduler. If we're forced to run continuations asynchronously,
// or if there's a current scheduler that's not the one we're targeting, queue it.
// Otherwise fall through to invoke it synchronously.
TaskScheduler ts = (TaskScheduler)_schedulingContext;
Debug.Assert(ts != null, "Expected a TaskScheduler");
if (_runContinuationsAsynchronously || ts != TaskScheduler.Current)
ExecutionContext.Run(_executionContext, s =>
{
Task.Factory.StartNew(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
return;
}
var thisRef = (AsyncOperation<TResult>)s;
Action<object> c = thisRef._continuation;
thisRef._continuation = s_completedSentinel;
c(thisRef._continuationState);
}, this);
}

// Invoke the continuation synchronously.
SetCompletionAndInvokeContinuation();
}

private void SetCompletionAndInvokeContinuation()
{
Action<object> c = _continuation;
_continuation = s_completedSentinel;
c(_continuationState);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

namespace System.Threading.Channels
{
internal partial class AsyncOperation<TResult> : IThreadPoolWorkItem
{
void IThreadPoolWorkItem.Execute() => SetCompletionAndInvokeContinuation();

private void UnsafeQueueSetCompletionAndInvokeContinuation() =>
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);

private static void QueueUserWorkItem(Action<object> action, object state) =>
ThreadPool.QueueUserWorkItem(action, state, preferLocal: false);

private static CancellationTokenRegistration UnsafeRegister(CancellationToken cancellationToken, Action<object> action, object state) =>
cancellationToken.UnsafeRegister(action, state);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Threading.Tasks;

namespace System.Threading.Channels
{
internal partial class AsyncOperation<TResult>
{
private void UnsafeQueueSetCompletionAndInvokeContinuation() =>
Task.Factory.StartNew(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

private static void QueueUserWorkItem(Action<object> action, object state) =>
Task.Factory.StartNew(action, state,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

private static CancellationTokenRegistration UnsafeRegister(CancellationToken cancellationToken, Action<object> action, object state) =>
cancellationToken.Register(action, state);
}
}
1 change: 1 addition & 0 deletions src/System.Threading.Channels/tests/Configurations.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<PropertyGroup>
<BuildConfigurations>
netstandard;
netcoreapp;
</BuildConfigurations>
</PropertyGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<ProjectGuid>{1AF01469-DBFC-4BA1-9331-8E39AA639FEE}</ProjectGuid>
<Configurations>netstandard-Debug;netstandard-Release</Configurations>
<Configurations>netcoreapp-Debug;netcoreapp-Release;netstandard-Debug;netstandard-Release</Configurations>
</PropertyGroup>
<ItemGroup>
<Compile Include="BoundedChannelTests.cs" />
Expand Down