Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
</PropertyGroup>
<ItemGroup>
<Compile Include="System\VoidResult.cs" />
<Compile Include="System\Collections\Generic\Dequeue.cs" />
<Compile Include="System\Collections\Generic\Deque.cs" />
<Compile Include="System\Threading\Channels\AsyncOperation.cs" />
<Compile Include="System\Threading\Channels\BoundedChannel.cs" />
<Compile Include="System\Threading\Channels\BoundedChannelFullMode.cs" />
<Compile Include="System\Threading\Channels\Channel.cs" />
Expand All @@ -19,7 +20,6 @@
<Compile Include="System\Threading\Channels\Channel_1.cs" />
<Compile Include="System\Threading\Channels\Channel_2.cs" />
<Compile Include="System\Threading\Channels\IDebugEnumerator.cs" />
<Compile Include="System\Threading\Channels\AsyncOperation.cs" />
<Compile Include="System\Threading\Channels\SingleConsumerUnboundedChannel.cs" />
<Compile Include="System\Threading\Channels\UnboundedChannel.cs" />
<Compile Include="$(CommonPath)\Internal\Padding.cs">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace System.Collections.Generic
/// <summary>Provides a double-ended queue data structure.</summary>
/// <typeparam name="T">Type of the data stored in the dequeue.</typeparam>
[DebuggerDisplay("Count = {_size}")]
internal sealed class Dequeue<T>
internal sealed class Deque<T>
{
private T[] _array = Array.Empty<T>();
private int _head; // First valid element in the queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ public void OnCompleted(Action<object> continuation, object state, short token,
}
else
{
sc = null;
ts = TaskScheduler.Current;
if (ts != TaskScheduler.Default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ internal sealed class BoundedChannel<T> : Channel<T>, IDebugEnumerable<T>
/// <summary>The maximum capacity of the channel.</summary>
private readonly int _bufferedCapacity;
/// <summary>Items currently stored in the channel waiting to be read.</summary>
private readonly Dequeue<T> _items = new Dequeue<T>();
private readonly Deque<T> _items = new Deque<T>();
/// <summary>Readers waiting to read from the channel.</summary>
private readonly Dequeue<AsyncOperation<T>> _blockedReaders = new Dequeue<AsyncOperation<T>>();
private readonly Deque<AsyncOperation<T>> _blockedReaders = new Deque<AsyncOperation<T>>();
/// <summary>Writers waiting to write to the channel.</summary>
private readonly Dequeue<VoidAsyncOperationWithData<T>> _blockedWriters = new Dequeue<VoidAsyncOperationWithData<T>>();
private readonly Deque<VoidAsyncOperationWithData<T>> _blockedWriters = new Deque<VoidAsyncOperationWithData<T>>();
/// <summary>Linked list of WaitToReadAsync waiters.</summary>
private AsyncOperation<bool> _waitingReadersTail;
/// <summary>Linked list of WaitToWriteAsync waiters.</summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ public abstract class ChannelReader<T>
/// <returns>true if an item was read; otherwise, false if no item was read.</returns>
public abstract bool TryRead(out T item);

/// <summary>Returns a <see cref="Task{Boolean}"/> that will complete when data is available to read.</summary>
/// <summary>Returns a <see cref="ValueTask{Boolean}"/> that will complete when data is available to read.</summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the wait operation.</param>
/// <returns>
/// A <see cref="Task{Boolean}"/> that will complete with a <c>true</c> result when data is available to read
/// A <see cref="ValueTask{Boolean}"/> that will complete with a <c>true</c> result when data is available to read
/// or with a <c>false</c> result when no further data will ever be available to be read.
/// </returns>
public abstract ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ internal static void WakeUpWaiters(ref AsyncOperation<bool> listTail, bool resul
/// <summary>Removes all operations from the queue, failing each.</summary>
/// <param name="operations">The queue of operations to complete.</param>
/// <param name="error">The error with which to complete each operations.</param>
internal static void FailOperations<T, TInner>(Dequeue<T> operations, Exception error) where T : AsyncOperation<TInner>
internal static void FailOperations<T, TInner>(Deque<T> operations, Exception error) where T : AsyncOperation<TInner>
{
Debug.Assert(error != null);
while (!operations.IsEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ public abstract class ChannelWriter<T>
/// <returns>true if the item was written; otherwise, false if it wasn't written.</returns>
public abstract bool TryWrite(T item);

/// <summary>Returns a <see cref="Task{Boolean}"/> that will complete when space is available to write an item.</summary>
/// <summary>Returns a <see cref="ValueTask{Boolean}"/> that will complete when space is available to write an item.</summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the wait operation.</param>
/// <returns>
/// A <see cref="Task{Boolean}"/> that will complete with a <c>true</c> result when space is available to write an item
/// A <see cref="ValueTask{Boolean}"/> that will complete with a <c>true</c> result when space is available to write an item
/// or with a <c>false</c> result when no further writing will be permitted.
/// </returns>
public abstract ValueTask<bool> WaitToWriteAsync(CancellationToken cancellationToken = default);

/// <summary>Asynchronously writes an item to the channel.</summary>
/// <param name="item">The value to write to the channel.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the write operation.</param>
/// <returns>A <see cref="Task"/> that represents the asynchronous write operation.</returns>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous write operation.</returns>
public virtual ValueTask WriteAsync(T item, CancellationToken cancellationToken = default)
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal sealed class UnboundedChannel<T> : Channel<T>, IDebugEnumerable<T>
/// <summary>The items in the channel.</summary>
private readonly ConcurrentQueue<T> _items = new ConcurrentQueue<T>();
/// <summary>Readers blocked reading from the channel.</summary>
private readonly Dequeue<AsyncOperation<T>> _blockedReaders = new Dequeue<AsyncOperation<T>>();
private readonly Deque<AsyncOperation<T>> _blockedReaders = new Deque<AsyncOperation<T>>();
/// <summary>Whether to force continuations to be executed asynchronously from producer writes.</summary>
private readonly bool _runContinuationsAsynchronously;

Expand Down
26 changes: 20 additions & 6 deletions src/System.Threading.Channels/tests/ChannelTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -976,19 +976,20 @@ public static IEnumerable<object[]> Reader_ContinuesOnCurrentContextIfDesired_Me
from completeBeforeOnCompleted in new[] { true, false }
from flowExecutionContext in new[] { true, false }
from continueOnCapturedContext in new bool?[] { null, false, true }
select new object[] { readOrWait, completeBeforeOnCompleted, flowExecutionContext, continueOnCapturedContext };
from setNonDefaultTaskScheduler in new[] { true, false }
select new object[] { readOrWait, completeBeforeOnCompleted, flowExecutionContext, continueOnCapturedContext, setNonDefaultTaskScheduler };

[Theory]
[MemberData(nameof(Reader_ContinuesOnCurrentContextIfDesired_MemberData))]
public async Task Reader_ContinuesOnCurrentSynchronizationContextIfDesired(
bool readOrWait, bool completeBeforeOnCompleted, bool flowExecutionContext, bool? continueOnCapturedContext)
bool readOrWait, bool completeBeforeOnCompleted, bool flowExecutionContext, bool? continueOnCapturedContext, bool setNonDefaultTaskScheduler)
{
if (AllowSynchronousContinuations)
{
return;
}

await Task.Run(async () =>
await Task.Factory.StartNew(async () =>
{
Assert.Null(SynchronizationContext.Current);

Expand Down Expand Up @@ -1063,13 +1064,21 @@ await Task.Run(async () =>
{
Assert.Equal(flowExecutionContext, executionContextWasFlowed);
}
});
}, CancellationToken.None, TaskCreationOptions.None, setNonDefaultTaskScheduler ? new CustomTaskScheduler() : TaskScheduler.Default);
}

public static IEnumerable<object[]> Reader_ContinuesOnCurrentSchedulerIfDesired_MemberData() =>
from readOrWait in new[] { true, false }
from completeBeforeOnCompleted in new[] { true, false }
from flowExecutionContext in new[] { true, false }
from continueOnCapturedContext in new bool?[] { null, false, true }
from setDefaultSyncContext in new[] { true, false }
select new object[] { readOrWait, completeBeforeOnCompleted, flowExecutionContext, continueOnCapturedContext, setDefaultSyncContext };

[Theory]
[MemberData(nameof(Reader_ContinuesOnCurrentContextIfDesired_MemberData))]
[MemberData(nameof(Reader_ContinuesOnCurrentSchedulerIfDesired_MemberData))]
public async Task Reader_ContinuesOnCurrentTaskSchedulerIfDesired(
bool readOrWait, bool completeBeforeOnCompleted, bool flowExecutionContext, bool? continueOnCapturedContext)
bool readOrWait, bool completeBeforeOnCompleted, bool flowExecutionContext, bool? continueOnCapturedContext, bool setDefaultSyncContext)
{
if (AllowSynchronousContinuations)
{
Expand Down Expand Up @@ -1105,6 +1114,11 @@ await Task.Run(async () =>

await Task.Factory.StartNew(() =>
{
if (setDefaultSyncContext)
{
SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
}

Assert.IsType<CustomTaskScheduler>(TaskScheduler.Current);
asyncLocal.Value = 42;
switch (continueOnCapturedContext)
Expand Down