Skip to content

Commit ab2d63b

Browse files
authored
Expose ChannelReader.ReadAllAsync from non-core builds as well (#94417)
* Expose ChannelReader.ReadAllAsync from non-core builds as well I'm not sure why we didn't fix this before when we shipped Microsoft.Bcl.AsyncInterfaces, but with IAsyncEnumerable available downlevel, there's no need to hide this method away; it can be in all builds. Doing so makes it easier for others to create their own channel implementations, as they don't _need_ to multitarget in order to override everything they might want to. I've not changed any C# code, just moved it between files. * Address PR feedback
1 parent 81393b7 commit ab2d63b

File tree

9 files changed

+294
-325
lines changed

9 files changed

+294
-325
lines changed

src/libraries/System.Threading.Channels/ref/System.Threading.Channels.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ protected ChannelReader() { }
4949
public virtual System.Threading.Tasks.Task Completion { get { throw null; } }
5050
public virtual int Count { get { throw null; } }
5151
public virtual System.Threading.Tasks.ValueTask<T> ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
52+
public virtual System.Collections.Generic.IAsyncEnumerable<T> ReadAllAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
5253
public virtual bool TryPeek([System.Diagnostics.CodeAnalysis.MaybeNullWhenAttribute(false)] out T item) { throw null; }
5354
public abstract bool TryRead([System.Diagnostics.CodeAnalysis.MaybeNullWhenAttribute(false)] out T item);
5455
public abstract System.Threading.Tasks.ValueTask<bool> WaitToReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));

src/libraries/System.Threading.Channels/ref/System.Threading.Channels.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@
1414

1515
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
1616
<PackageReference Include="System.Threading.Tasks.Extensions" Version="$(SystemThreadingTasksExtensionsVersion)" />
17+
<ProjectReference Include="$(LibrariesProjectRoot)Microsoft.Bcl.AsyncInterfaces\ref\Microsoft.Bcl.AsyncInterfaces.csproj" />
1718
</ItemGroup>
1819
</Project>

src/libraries/System.Threading.Channels/ref/System.Threading.Channels.netcoreapp.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,4 @@ public partial class ChannelClosedException : System.InvalidOperationException
1414
#endif
1515
protected ChannelClosedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
1616
}
17-
public abstract partial class ChannelReader<T>
18-
{
19-
public virtual System.Collections.Generic.IAsyncEnumerable<T> ReadAllAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
20-
}
2117
}

src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ System.Threading.Channel&lt;T&gt;</PackageDescription>
2626
Condition="$([MSBuild]::IsTargetFrameworkCompatible('$(TargetFramework)', 'netstandard2.1'))" />
2727
<Compile Include="System\Threading\Channels\ChannelOptions.cs" />
2828
<Compile Include="System\Threading\Channels\ChannelReader.cs" />
29-
<Compile Include="System\Threading\Channels\ChannelReader.netcoreapp.cs"
30-
Condition="$([MSBuild]::IsTargetFrameworkCompatible('$(TargetFramework)', 'netstandard2.1'))" />
3129
<Compile Include="System\Threading\Channels\ChannelUtilities.cs" />
3230
<Compile Include="System\Threading\Channels\ChannelWriter.cs" />
3331
<Compile Include="System\Threading\Channels\Channel_1.cs" />
@@ -40,7 +38,7 @@ System.Threading.Channel&lt;T&gt;</PackageDescription>
4038
<Compile Include="$(CommonPath)Internal\Padding.cs"
4139
Link="Common\Internal\Padding.cs" />
4240
<Compile Include="$(CommonPath)System\Collections\Concurrent\IProducerConsumerQueue.cs"
43-
Link="Common\System\Collections\Concurrent\IProducerConsumerQueue.cs" />
41+
Link="Common\System\Collections\Concurrent\IProducerConsumerQueue.cs" />
4442
<Compile Include="$(CommonPath)System\Collections\Concurrent\MultiProducerMultiConsumerQueue.cs"
4543
Link="Common\System\Collections\Concurrent\MultiProducerMultiConsumerQueue.cs" />
4644
<Compile Include="$(CommonPath)System\Collections\Concurrent\SingleProducerSingleConsumerQueue.cs"
@@ -61,6 +59,7 @@ System.Threading.Channel&lt;T&gt;</PackageDescription>
6159

6260
<ItemGroup Condition="!$([MSBuild]::IsTargetFrameworkCompatible('$(TargetFramework)', 'netstandard2.1'))">
6361
<PackageReference Include="System.Threading.Tasks.Extensions" Version="$(SystemThreadingTasksExtensionsVersion)" />
62+
<ProjectReference Include="$(LibrariesProjectRoot)Microsoft.Bcl.AsyncInterfaces\src\Microsoft.Bcl.AsyncInterfaces.csproj" />
6463
</ItemGroup>
6564

6665
</Project>

src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelReader.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4+
using System.Collections.Generic;
45
using System.Diagnostics.CodeAnalysis;
6+
using System.Runtime.CompilerServices;
57
using System.Threading.Tasks;
68

79
namespace System.Threading.Channels
@@ -90,5 +92,23 @@ async ValueTask<T> ReadAsyncCore(CancellationToken ct)
9092
}
9193
}
9294
}
95+
96+
/// <summary>Creates an <see cref="IAsyncEnumerable{T}"/> that enables reading all of the data from the channel.</summary>
97+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to use to cancel the enumeration.</param>
98+
/// <remarks>
99+
/// Each <see cref="IAsyncEnumerator{T}.MoveNextAsync"/> call that returns <c>true</c> will read the next item out of the channel.
100+
/// <see cref="IAsyncEnumerator{T}.MoveNextAsync"/> will return false once no more data is or will ever be available to read.
101+
/// </remarks>
102+
/// <returns>The created async enumerable.</returns>
103+
public virtual async IAsyncEnumerable<T> ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
104+
{
105+
while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false))
106+
{
107+
while (TryRead(out T? item))
108+
{
109+
yield return item;
110+
}
111+
}
112+
}
93113
}
94114
}

src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelReader.netcoreapp.cs

Lines changed: 0 additions & 29 deletions
This file was deleted.

src/libraries/System.Threading.Channels/tests/ChannelTestBase.cs

Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,272 @@ public async Task ReadAsync_ConsecutiveReadsSucceed()
784784
}
785785
}
786786

787+
[Fact]
788+
public void ReadAllAsync_NotIdempotent()
789+
{
790+
Channel<int> c = CreateChannel();
791+
IAsyncEnumerable<int> e = c.Reader.ReadAllAsync();
792+
Assert.NotNull(e);
793+
Assert.NotSame(e, c.Reader.ReadAllAsync());
794+
}
795+
796+
[Theory]
797+
[InlineData(false)]
798+
[InlineData(true)]
799+
public async Task ReadAllAsync_UseMoveNextAsyncAfterCompleted_ReturnsFalse(bool completeWhilePending)
800+
{
801+
Channel<int> c = CreateChannel();
802+
IAsyncEnumerator<int> e = c.Reader.ReadAllAsync().GetAsyncEnumerator();
803+
804+
ValueTask<bool> vt;
805+
if (completeWhilePending)
806+
{
807+
c.Writer.Complete();
808+
vt = e.MoveNextAsync();
809+
Assert.True(vt.IsCompletedSuccessfully);
810+
Assert.False(vt.Result);
811+
}
812+
else
813+
{
814+
vt = e.MoveNextAsync();
815+
Assert.False(vt.IsCompleted);
816+
c.Writer.Complete();
817+
Assert.False(await vt);
818+
}
819+
820+
vt = e.MoveNextAsync();
821+
Assert.True(vt.IsCompletedSuccessfully);
822+
Assert.False(vt.Result);
823+
}
824+
825+
[Fact]
826+
public void ReadAllAsync_AvailableDataCompletesSynchronously()
827+
{
828+
Channel<int> c = CreateChannel();
829+
830+
IAsyncEnumerator<int> e = c.Reader.ReadAllAsync().GetAsyncEnumerator();
831+
try
832+
{
833+
for (int i = 100; i < 110; i++)
834+
{
835+
Assert.True(c.Writer.TryWrite(i));
836+
ValueTask<bool> vt = e.MoveNextAsync();
837+
Assert.True(vt.IsCompletedSuccessfully);
838+
Assert.True(vt.Result);
839+
Assert.Equal(i, e.Current);
840+
}
841+
}
842+
finally
843+
{
844+
ValueTask vt = e.DisposeAsync();
845+
Assert.True(vt.IsCompletedSuccessfully);
846+
vt.GetAwaiter().GetResult();
847+
}
848+
}
849+
850+
[Fact]
851+
public async Task ReadAllAsync_UnavailableDataCompletesAsynchronously()
852+
{
853+
Channel<int> c = CreateChannel();
854+
855+
IAsyncEnumerator<int> e = c.Reader.ReadAllAsync().GetAsyncEnumerator();
856+
try
857+
{
858+
for (int i = 100; i < 110; i++)
859+
{
860+
ValueTask<bool> vt = e.MoveNextAsync();
861+
Assert.False(vt.IsCompleted);
862+
Task producer = Task.Run(() => c.Writer.TryWrite(i));
863+
Assert.True(await vt);
864+
await producer;
865+
Assert.Equal(i, e.Current);
866+
}
867+
}
868+
finally
869+
{
870+
ValueTask vt = e.DisposeAsync();
871+
Assert.True(vt.IsCompletedSuccessfully);
872+
vt.GetAwaiter().GetResult();
873+
}
874+
}
875+
876+
[Theory]
877+
[InlineData(0)]
878+
[InlineData(1)]
879+
[InlineData(128)]
880+
public async Task ReadAllAsync_ProducerConsumer_ConsumesAllData(int items)
881+
{
882+
Channel<int> c = CreateChannel();
883+
884+
int producedTotal = 0, consumedTotal = 0;
885+
await Task.WhenAll(
886+
Task.Run(async () =>
887+
{
888+
for (int i = 0; i < items; i++)
889+
{
890+
await c.Writer.WriteAsync(i);
891+
producedTotal += i;
892+
}
893+
c.Writer.Complete();
894+
}),
895+
Task.Run(async () =>
896+
{
897+
IAsyncEnumerator<int> e = c.Reader.ReadAllAsync().GetAsyncEnumerator();
898+
try
899+
{
900+
while (await e.MoveNextAsync())
901+
{
902+
consumedTotal += e.Current;
903+
}
904+
}
905+
finally
906+
{
907+
await e.DisposeAsync();
908+
}
909+
}));
910+
911+
Assert.Equal(producedTotal, consumedTotal);
912+
}
913+
914+
[Fact]
915+
public async Task ReadAllAsync_MultipleEnumerationsToEnd()
916+
{
917+
Channel<int> c = CreateChannel();
918+
919+
Assert.True(c.Writer.TryWrite(42));
920+
c.Writer.Complete();
921+
922+
IAsyncEnumerable<int> enumerable = c.Reader.ReadAllAsync();
923+
IAsyncEnumerator<int> e = enumerable.GetAsyncEnumerator();
924+
925+
Assert.True(await e.MoveNextAsync());
926+
Assert.Equal(42, e.Current);
927+
928+
Assert.False(await e.MoveNextAsync());
929+
Assert.False(await e.MoveNextAsync());
930+
931+
await e.DisposeAsync();
932+
933+
e = enumerable.GetAsyncEnumerator();
934+
Assert.Same(enumerable, e);
935+
936+
Assert.False(await e.MoveNextAsync());
937+
Assert.False(await e.MoveNextAsync());
938+
}
939+
940+
[Theory]
941+
[InlineData(false, false)]
942+
[InlineData(false, true)]
943+
[InlineData(true, false)]
944+
[InlineData(true, true)]
945+
public void ReadAllAsync_MultipleSingleElementEnumerations_AllItemsEnumerated(bool sameEnumerable, bool dispose)
946+
{
947+
Channel<int> c = CreateChannel();
948+
IAsyncEnumerable<int> enumerable = c.Reader.ReadAllAsync();
949+
950+
for (int i = 0; i < 10; i++)
951+
{
952+
Assert.True(c.Writer.TryWrite(i));
953+
IAsyncEnumerator<int> e = (sameEnumerable ? enumerable : c.Reader.ReadAllAsync()).GetAsyncEnumerator();
954+
ValueTask<bool> vt = e.MoveNextAsync();
955+
Assert.True(vt.IsCompletedSuccessfully);
956+
Assert.True(vt.Result);
957+
Assert.Equal(i, e.Current);
958+
if (dispose)
959+
{
960+
ValueTask dvt = e.DisposeAsync();
961+
Assert.True(dvt.IsCompletedSuccessfully);
962+
dvt.GetAwaiter().GetResult();
963+
}
964+
}
965+
}
966+
967+
[Theory]
968+
[InlineData(false)]
969+
[InlineData(true)]
970+
public async Task ReadAllAsync_DualConcurrentEnumeration_AllItemsEnumerated(bool sameEnumerable)
971+
{
972+
if (RequiresSingleReader)
973+
{
974+
return;
975+
}
976+
977+
Channel<int> c = CreateChannel();
978+
979+
IAsyncEnumerable<int> enumerable = c.Reader.ReadAllAsync();
980+
981+
IAsyncEnumerator<int> e1 = enumerable.GetAsyncEnumerator();
982+
IAsyncEnumerator<int> e2 = (sameEnumerable ? enumerable : c.Reader.ReadAllAsync()).GetAsyncEnumerator();
983+
Assert.NotSame(e1, e2);
984+
985+
ValueTask<bool> vt1, vt2;
986+
int producerTotal = 0, consumerTotal = 0;
987+
for (int i = 0; i < 10; i++)
988+
{
989+
vt1 = e1.MoveNextAsync();
990+
vt2 = e2.MoveNextAsync();
991+
992+
await c.Writer.WriteAsync(i);
993+
producerTotal += i;
994+
await c.Writer.WriteAsync(i * 2);
995+
producerTotal += i * 2;
996+
997+
Assert.True(await vt1);
998+
Assert.True(await vt2);
999+
consumerTotal += e1.Current;
1000+
consumerTotal += e2.Current;
1001+
}
1002+
1003+
vt1 = e1.MoveNextAsync();
1004+
vt2 = e2.MoveNextAsync();
1005+
c.Writer.Complete();
1006+
Assert.False(await vt1);
1007+
Assert.False(await vt2);
1008+
1009+
Assert.Equal(producerTotal, consumerTotal);
1010+
}
1011+
1012+
[Theory]
1013+
[InlineData(false)]
1014+
[InlineData(true)]
1015+
public async Task ReadAllAsync_CanceledBeforeMoveNextAsync_Throws(bool dataAvailable)
1016+
{
1017+
Channel<int> c = CreateChannel();
1018+
if (dataAvailable)
1019+
{
1020+
Assert.True(c.Writer.TryWrite(42));
1021+
}
1022+
1023+
var cts = new CancellationTokenSource();
1024+
cts.Cancel();
1025+
1026+
IAsyncEnumerator<int> e = c.Reader.ReadAllAsync(cts.Token).GetAsyncEnumerator();
1027+
ValueTask<bool> vt = e.MoveNextAsync();
1028+
Assert.True(vt.IsCompleted);
1029+
Assert.False(vt.IsCompletedSuccessfully);
1030+
OperationCanceledException oce = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await vt);
1031+
Assert.Equal(cts.Token, oce.CancellationToken);
1032+
}
1033+
1034+
[Fact]
1035+
public async Task ReadAllAsync_CanceledAfterMoveNextAsync_Throws()
1036+
{
1037+
Channel<int> c = CreateChannel();
1038+
var cts = new CancellationTokenSource();
1039+
1040+
IAsyncEnumerator<int> e = c.Reader.ReadAllAsync(cts.Token).GetAsyncEnumerator();
1041+
ValueTask<bool> vt = e.MoveNextAsync();
1042+
Assert.False(vt.IsCompleted);
1043+
1044+
cts.Cancel();
1045+
OperationCanceledException oce = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await vt);
1046+
Assert.Equal(cts.Token, oce.CancellationToken);
1047+
1048+
vt = e.MoveNextAsync();
1049+
Assert.True(vt.IsCompletedSuccessfully);
1050+
Assert.False(vt.Result);
1051+
}
1052+
7871053
[Fact]
7881054
public async Task WaitToReadAsync_ConsecutiveReadsSucceed()
7891055
{

0 commit comments

Comments
 (0)