Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions PolyShim.Tests/Net60/ParallelTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;

namespace PolyShim.Tests.Net60;

public class ParallelTests
{
[Fact]
public async Task ForEachAsync_Test()
{
// Arrange
var items = new[] { 'a', 'b', 'c', 'd', 'e' };
var results = new ConcurrentBag<char>();

// Act
await Parallel.ForEachAsync(
items,
async (item, cancellationToken) =>
{
await Task.Delay(10, cancellationToken);
results.Add(item);
}
);

// Assert
results.Should().BeEquivalentTo(items);
}

[Fact]
public async Task ForEachAsync_Cancellation_Test()
{
// Arrange
var items = new[] { 'a', 'b', 'c', 'd', 'e' };
var cancellationToken = new CancellationToken(true);

// Act & assert
var ex = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () =>
await Parallel.ForEachAsync(
items,
cancellationToken,
async (_, innerCancellationToken) => await Task.Delay(10, innerCancellationToken)
)
);

ex.CancellationToken.Should().Be(cancellationToken);
}

[Fact]
public async Task ForEachAsync_MaxDegreeOfParallelism_Test()
{
// Arrange
var items = new[] { 1, 2, 3, 4, 5 };
var currentParallelism = 0;
var maxObservedParallelism = 0;

// Act
await Parallel.ForEachAsync(
items,
new ParallelOptions { MaxDegreeOfParallelism = 2 },
async (_, cancellationToken) =>
{
Interlocked.Increment(ref currentParallelism);

int initialValue,
newValue;
do
{
initialValue = maxObservedParallelism;
newValue = Math.Max(initialValue, currentParallelism);
} while (
Interlocked.CompareExchange(ref maxObservedParallelism, newValue, initialValue)
!= initialValue
);

await Task.Delay(50, cancellationToken);
Interlocked.Decrement(ref currentParallelism);
}
);

// Assert
maxObservedParallelism.Should().BeLessThanOrEqualTo(2);
}

[Fact]
public async Task ForEachAsync_AsyncEnumerable_Test()
{
// Arrange
async IAsyncEnumerable<int> GetItemsAsync()
{
for (var i = 1; i <= 5; i++)
{
await Task.Delay(10);
yield return i;
}
}

var results = new ConcurrentBag<int>();

// Act
await Parallel.ForEachAsync(
GetItemsAsync(),
async (item, cancellationToken) =>
{
await Task.Delay(10, cancellationToken);
results.Add(item);
}
);

// Assert
results.Should().BeEquivalentTo([1, 2, 3, 4, 5]);
}
}
82 changes: 82 additions & 0 deletions PolyShim.Tests/Net80/ParallelTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;

namespace PolyShim.Tests.Net80;

public class ParallelTests
{
[Fact]
public async Task ForAsync_Test()
{
// Act
var sum = 0;
await Parallel.ForAsync(
1,
6,
async (i, cancellationToken) =>
{
await Task.Delay(10, cancellationToken);
Interlocked.Add(ref sum, i);
}
);

// Assert
sum.Should().Be(15);
}

[Fact]
public async Task ForAsync_Cancellation_Test()
{
// Arrange
var cancellationToken = new CancellationToken(true);

// Act & assert
var ex = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () =>
await Parallel.ForAsync(
1,
6,
cancellationToken,
async (_, innerCancellationToken) =>
{
await Task.Delay(10, innerCancellationToken);
}
)
);

ex.CancellationToken.Should().Be(cancellationToken);
}

[Fact]
public async Task ForAsync_MaxDegreeOfParallelism_Test()
{
// Arrange
var currentParallelism = 0;
var maxObservedParallelism = 0;

// Act
await Parallel.ForAsync(
1,
21,
new ParallelOptions { MaxDegreeOfParallelism = 4 },
async (_, cancellationToken) =>
{
var parallelism = Interlocked.Increment(ref currentParallelism);
try
{
maxObservedParallelism = Math.Max(maxObservedParallelism, parallelism);
await Task.Delay(50, cancellationToken);
}
finally
{
Interlocked.Decrement(ref currentParallelism);
}
}
);

// Assert
maxObservedParallelism.Should().BeLessThanOrEqualTo(4);
}
}
161 changes: 161 additions & 0 deletions PolyShim/Net60/Parallel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#if FEATURE_TASK
#if (NETCOREAPP && !NET6_0_OR_GREATER) || (NETFRAMEWORK) || (NETSTANDARD)
#nullable enable
// ReSharper disable RedundantUsingDirective
// ReSharper disable CheckNamespace
// ReSharper disable InconsistentNaming
// ReSharper disable PartialTypeWithSinglePart

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

internal static partial class PolyfillExtensions
{
extension(Parallel)
{
// Task instead of ValueTask for maximum compatibility
// https://learn.microsoft.com/dotnet/api/system.threading.tasks.parallel.foreachasync#system-threading-tasks-parallel-foreachasync-1(system-collections-generic-ienumerable((-0))-system-threading-tasks-paralleloptions-system-func((-0-system-threading-cancellationtoken-system-threading-tasks-valuetask)))
public static async Task ForEachAsync<T>(
IEnumerable<T> source,
ParallelOptions parallelOptions,
Func<T, CancellationToken, Task> body
)
{
using var semaphore = new SemaphoreSlim(
parallelOptions.MaxDegreeOfParallelism switch
{
> 0 => parallelOptions.MaxDegreeOfParallelism,
-1 => Environment.ProcessorCount,
_ => throw new ArgumentOutOfRangeException(
nameof(parallelOptions.MaxDegreeOfParallelism),
"Max degree of parallelism must be greater than 0 or -1 for unlimited."
),
}
);

var tasks = source.Select(async item =>
{
#if !NETFRAMEWORK || NET45_OR_GREATER
await semaphore.WaitAsync(parallelOptions.CancellationToken).ConfigureAwait(false);
#else
await Task.Run(
() => semaphore.Wait(parallelOptions.CancellationToken),
parallelOptions.CancellationToken
).ConfigureAwait(false);
#endif

try
{
await body(item, parallelOptions.CancellationToken).ConfigureAwait(false);
}
finally
{
semaphore.Release();
}
});

await Task
.Factory.ContinueWhenAll(
tasks.ToArray(),
_ => { },
parallelOptions.CancellationToken,
TaskContinuationOptions.None,
parallelOptions.TaskScheduler ?? TaskScheduler.Default
)
.ConfigureAwait(false);
}

// Task instead of ValueTask for maximum compatibility
// https://learn.microsoft.com/dotnet/api/system.threading.tasks.parallel.foreachasync#system-threading-tasks-parallel-foreachasync-1(system-collections-generic-ienumerable((-0))-system-threading-cancellationtoken-system-func((-0-system-threading-cancellationtoken-system-threading-tasks-valuetask)))
public static async Task ForEachAsync<T>(
IEnumerable<T> source,
CancellationToken cancellationToken,
Func<T, CancellationToken, Task> body
) =>
await ForEachAsync(
source,
new ParallelOptions { CancellationToken = cancellationToken },
body
);

// Task instead of ValueTask for maximum compatibility
// https://learn.microsoft.com/dotnet/api/system.threading.tasks.parallel.foreachasync#system-threading-tasks-parallel-foreachasync-1(system-collections-generic-ienumerable((-0))-system-func((-0-system-threading-cancellationtoken-system-threading-tasks-valuetask)))
public static async Task ForEachAsync<T>(
IEnumerable<T> source,
Func<T, CancellationToken, Task> body
) => await ForEachAsync(source, CancellationToken.None, body);

#if FEATURE_ASYNCINTERFACES
// Task instead of ValueTask for maximum compatibility
// https://learn.microsoft.com/dotnet/api/system.threading.tasks.parallel.foreachasync#system-threading-tasks-parallel-foreachasync-1(system-collections-generic-iasyncenumerable((-0))-system-threading-tasks-paralleloptions-system-func((-0-system-threading-cancellationtoken-system-threading-tasks-valuetask)))
public static async Task ForEachAsync<T>(
IAsyncEnumerable<T> source,
ParallelOptions parallelOptions,
Func<T, CancellationToken, Task> body
)
{
using var semaphore = new SemaphoreSlim(
parallelOptions.MaxDegreeOfParallelism switch
{
> 0 => parallelOptions.MaxDegreeOfParallelism,
-1 => Environment.ProcessorCount,
_ => throw new ArgumentOutOfRangeException(
nameof(parallelOptions.MaxDegreeOfParallelism),
"Max degree of parallelism must be greater than 0 or -1 for unlimited."
),
}
);

var tasks = new List<Task>();

await foreach (var item in source.WithCancellation(parallelOptions.CancellationToken))
{
var task = Task.Factory.StartNew(async () =>
{
await semaphore
.WaitAsync(parallelOptions.CancellationToken)
.ConfigureAwait(false);

try
{
await body(item, parallelOptions.CancellationToken).ConfigureAwait(false);
}
finally
{
semaphore.Release();
}
}, parallelOptions.CancellationToken, TaskCreationOptions.None, parallelOptions.TaskScheduler ?? TaskScheduler.Default).Unwrap();

tasks.Add(task);
}

await Task.WhenAll(tasks).ConfigureAwait(false);
}

// Task instead of ValueTask for maximum compatibility
// https://learn.microsoft.com/dotnet/api/system.threading.tasks.parallel.foreachasync#system-threading-tasks-parallel-foreachasync-1(system-collections-generic-iasyncenumerable((-0))-system-threading-cancellationtoken-system-func((-0-system-threading-cancellationtoken-system-threading-tasks-valuetask)))
public static async Task ForEachAsync<T>(
IAsyncEnumerable<T> source,
CancellationToken cancellationToken,
Func<T, CancellationToken, Task> body
) =>
await ForEachAsync(
source,
new ParallelOptions { CancellationToken = cancellationToken },
body
);

// Task instead of ValueTask for maximum compatibility
// https://learn.microsoft.com/dotnet/api/system.threading.tasks.parallel.foreachasync#system-threading-tasks-parallel-foreachasync-1(system-collections-generic-iasyncenumerable((-0))-system-func((-0-system-threading-cancellationtoken-system-threading-tasks-valuetask)))
public static async Task ForEachAsync<T>(
IAsyncEnumerable<T> source,
Func<T, CancellationToken, Task> body
) => await ForEachAsync(source, CancellationToken.None, body);
#endif
}
}
#endif
#endif
Loading
Loading