Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 58 additions & 26 deletions TUnit.Core/Executors/DedicatedThreadExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,17 @@ private void ExecuteAsyncActionWithMessagePump(Func<ValueTask> action, TaskCompl
try
{
var previousContext = SynchronizationContext.Current;
var taskScheduler = new DedicatedThreadTaskScheduler(Thread.CurrentThread);
var dedicatedContext = new DedicatedThreadSynchronizationContext(taskScheduler);
ManualResetEventSlim? workAvailableEvent = null;
#if NET5_0_OR_GREATER
if (!OperatingSystem.IsBrowser())
{
workAvailableEvent = new ManualResetEventSlim(false);
}
#else
workAvailableEvent = new ManualResetEventSlim(false);
#endif
var taskScheduler = new DedicatedThreadTaskScheduler(Thread.CurrentThread, workAvailableEvent);
var dedicatedContext = new DedicatedThreadSynchronizationContext(taskScheduler, workAvailableEvent);

SynchronizationContext.SetSynchronizationContext(dedicatedContext);

Expand All @@ -82,11 +91,11 @@ private void ExecuteAsyncActionWithMessagePump(Func<ValueTask> action, TaskCompl
return;
}

// Pump messages until the task completes with optimized waiting
// Pump messages until the task completes with event-driven signaling
var deadline = DateTime.UtcNow.AddMinutes(5);
var spinWait = new SpinWait();
var lastTimeCheck = DateTime.UtcNow;
const int TimeCheckIntervalMs = 100;
const int MaxSpinCount = 50;
const int WaitTimeoutMs = 100;

while (!task.IsCompleted)
{
Expand All @@ -95,46 +104,61 @@ private void ExecuteAsyncActionWithMessagePump(Func<ValueTask> action, TaskCompl

if (!hadWork)
{
// No work available, use efficient waiting
if (spinWait.Count < 10)
// Fast path: spin briefly for immediate continuations
if (spinWait.Count < MaxSpinCount)
{
spinWait.SpinOnce();
}
else
{
// After initial spins, yield to other threads
Thread.Yield();
if (spinWait.Count > 100)
#if NET5_0_OR_GREATER
if (workAvailableEvent != null && !OperatingSystem.IsBrowser())
{
// No work after spinning - use event-driven wait (eliminates Thread.Sleep)
// Thread blocks efficiently in kernel, wakes instantly when work queued
workAvailableEvent.Wait(WaitTimeoutMs);
workAvailableEvent.Reset();
spinWait.Reset();
}
else
{
// Fallback for browser or null event
Thread.Yield();
spinWait.Reset();
}
#else
if (workAvailableEvent != null)
{
// After many iterations, do a brief sleep
Thread.Sleep(0);
workAvailableEvent.Wait(WaitTimeoutMs);
workAvailableEvent.Reset();
spinWait.Reset();
}
else
{
Thread.Yield();
spinWait.Reset();
}
#endif
// Check timeout after waiting
if (DateTime.UtcNow >= deadline)
{
tcs.SetException(new TimeoutException("Async operation timed out after 5 minutes"));
return;
}
}
}
else
{
// Had work, reset spin counter
spinWait.Reset();
}

// Check timeout periodically instead of every iteration
var now = DateTime.UtcNow;
if ((now - lastTimeCheck).TotalMilliseconds >= TimeCheckIntervalMs)
{
if (now >= deadline)
{
tcs.SetException(new TimeoutException("Async operation timed out after 5 minutes"));
return;
}
lastTimeCheck = now;
}
}

HandleTaskCompletion(task, tcs);
}
finally
{
workAvailableEvent?.Dispose();
SynchronizationContext.SetSynchronizationContext(previousContext);
}
}
Expand Down Expand Up @@ -177,14 +201,16 @@ protected virtual void CleanUp()
internal sealed class DedicatedThreadTaskScheduler : TaskScheduler
{
private readonly Thread _dedicatedThread;
private readonly ManualResetEventSlim? _workAvailableEvent;
private readonly List<Task> _taskQueue =
[
];
private readonly Lock _queueLock = new();

public DedicatedThreadTaskScheduler(Thread dedicatedThread)
public DedicatedThreadTaskScheduler(Thread dedicatedThread, ManualResetEventSlim? workAvailableEvent)
{
_dedicatedThread = dedicatedThread;
_workAvailableEvent = workAvailableEvent;
}

protected override void QueueTask(Task task)
Expand All @@ -193,6 +219,8 @@ protected override void QueueTask(Task task)
{
_taskQueue.Add(task);
}
// Signal that work is available (wake message pump immediately)
_workAvailableEvent?.Set();
}

protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
Expand Down Expand Up @@ -267,13 +295,15 @@ internal sealed class DedicatedThreadSynchronizationContext : SynchronizationCon
{
private readonly Thread _dedicatedThread;
private readonly DedicatedThreadTaskScheduler _taskScheduler;
private readonly ManualResetEventSlim? _workAvailableEvent;
private readonly Queue<(SendOrPostCallback callback, object? state)> _workQueue = new();
private readonly Lock _queueLock = new();

public DedicatedThreadSynchronizationContext(DedicatedThreadTaskScheduler taskScheduler)
public DedicatedThreadSynchronizationContext(DedicatedThreadTaskScheduler taskScheduler, ManualResetEventSlim? workAvailableEvent)
{
_dedicatedThread = Thread.CurrentThread;
_taskScheduler = taskScheduler;
_workAvailableEvent = workAvailableEvent;
}

public override void Post(SendOrPostCallback d, object? state)
Expand All @@ -283,6 +313,8 @@ public override void Post(SendOrPostCallback d, object? state)
{
_workQueue.Enqueue((d, state));
}
// Signal that work is available (wake message pump immediately)
_workAvailableEvent?.Set();
}

public override void Send(SendOrPostCallback d, object? state)
Expand Down
7 changes: 7 additions & 0 deletions TUnit.Engine/Interfaces/IDynamicTestQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,11 @@ internal interface IDynamicTestQueue
/// Marks the queue as complete, indicating no more tests will be added.
/// </summary>
void Complete();

/// <summary>
/// Asynchronously waits for tests to become available in the queue.
/// </summary>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>True if tests are available or queue is completed, false on cancellation</returns>
ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default);
}
138 changes: 97 additions & 41 deletions TUnit.Engine/Scheduling/TestScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,10 @@ private async Task ProcessDynamicTestQueueAsync(CancellationToken cancellationTo
{
var dynamicTests = new List<AbstractExecutableTest>();

while (!_dynamicTestQueue.IsCompleted || _dynamicTestQueue.PendingCount > 0)
// Use async signaling instead of polling to eliminate IOCP overhead
while (await _dynamicTestQueue.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
// Dequeue all currently pending tests
// Dequeue all currently available tests (batch processing)
while (_dynamicTestQueue.TryDequeue(out var test))
{
if (test != null)
Expand Down Expand Up @@ -258,11 +259,32 @@ private async Task ProcessDynamicTestQueueAsync(CancellationToken cancellationTo

dynamicTests.Clear();
}
}

// Process any remaining tests after queue completion
while (_dynamicTestQueue.TryDequeue(out var test))
{
if (test != null)
{
dynamicTests.Add(test);
}
}

if (dynamicTests.Count > 0)
{
await _logger.LogDebugAsync($"Executing {dynamicTests.Count} remaining dynamic test(s)").ConfigureAwait(false);

var dynamicTestsArray = dynamicTests.ToArray();
var groupedDynamicTests = await _groupingService.GroupTestsByConstraintsAsync(dynamicTestsArray).ConfigureAwait(false);

if (groupedDynamicTests.Parallel.Length > 0)
{
await ExecuteTestsAsync(groupedDynamicTests.Parallel, cancellationToken).ConfigureAwait(false);
}

// If queue is not complete, wait a short time before checking again
if (!_dynamicTestQueue.IsCompleted)
if (groupedDynamicTests.NotInParallel.Length > 0)
{
await Task.Delay(50, cancellationToken).ConfigureAwait(false);
await ExecuteSequentiallyAsync(groupedDynamicTests.NotInParallel, cancellationToken).ConfigureAwait(false);
}
}
}
Expand All @@ -280,21 +302,28 @@ private async Task ExecuteTestsAsync(
}
else
{
var tasks = ArrayPool<Task>.Shared.Rent(tests.Length);
try
{
for (var i = 0; i < tests.Length; i++)
#if NET6_0_OR_GREATER
// Use Parallel.ForEachAsync for bounded concurrency (eliminates unbounded Task.Run queue depth)
// This dramatically reduces ThreadPool contention and GetQueuedCompletionStatus waits
await Parallel.ForEachAsync(
tests,
new ParallelOptions { CancellationToken = cancellationToken },
async (test, ct) =>
{
var test = tests[i];
tasks[i] = test.ExecutionTask ??= Task.Run(() => ExecuteSingleTestAsync(test, cancellationToken), CancellationToken.None);
test.ExecutionTask ??= ExecuteSingleTestAsync(test, ct);
await test.ExecutionTask.ConfigureAwait(false);
}

await WaitForTasksWithFailFastHandling(new ArraySegment<Task>(tasks, 0, tests.Length), cancellationToken).ConfigureAwait(false);
}
finally
).ConfigureAwait(false);
#else
// Fallback for netstandard2.0: use Task.WhenAll (still better than unbounded Task.Run)
var tasks = new Task[tests.Length];
for (var i = 0; i < tests.Length; i++)
{
ArrayPool<Task>.Shared.Return(tasks);
var test = tests[i];
tasks[i] = test.ExecutionTask ??= ExecuteSingleTestAsync(test, cancellationToken);
}
await Task.WhenAll(tasks).ConfigureAwait(false);
#endif
}
}

Expand Down Expand Up @@ -345,16 +374,51 @@ private async Task ExecuteWithGlobalLimitAsync(
AbstractExecutableTest[] tests,
CancellationToken cancellationToken)
{
var tasks = ArrayPool<Task>.Shared.Rent(tests.Length);
try
{
for (var i = 0; i < tests.Length; i++)
#if NET6_0_OR_GREATER
// Use Parallel.ForEachAsync with explicit MaxDegreeOfParallelism
// This eliminates unbounded Task.Run calls and leverages work-stealing for efficiency
await Parallel.ForEachAsync(
tests,
new ParallelOptions
{
var test = tests[i];
tasks[i] = Task.Run(async () =>
MaxDegreeOfParallelism = _maxParallelism,
CancellationToken = cancellationToken
},
async (test, ct) =>
{
SemaphoreSlim? parallelLimiterSemaphore = null;

// Acquire parallel limiter semaphore if needed
if (test.Context.ParallelLimiter != null)
{
SemaphoreSlim? parallelLimiterSemaphore = null;
parallelLimiterSemaphore = _parallelLimitLockProvider.GetLock(test.Context.ParallelLimiter);
await parallelLimiterSemaphore.WaitAsync(ct).ConfigureAwait(false);
}

try
{
test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, ct);
await test.ExecutionTask.ConfigureAwait(false);
}
finally
{
parallelLimiterSemaphore?.Release();
}
}
).ConfigureAwait(false);
#else
// Fallback for netstandard2.0: Manual bounded concurrency using existing semaphore
var tasks = new Task[tests.Length];
for (var i = 0; i < tests.Length; i++)
{
var test = tests[i];
tasks[i] = Task.Run(async () =>
{
SemaphoreSlim? parallelLimiterSemaphore = null;

await _maxParallelismSemaphore!.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (test.Context.ParallelLimiter != null)
{
parallelLimiterSemaphore = _parallelLimitLockProvider.GetLock(test.Context.ParallelLimiter);
Expand All @@ -363,30 +427,22 @@ private async Task ExecuteWithGlobalLimitAsync(

try
{
await _maxParallelismSemaphore!.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, cancellationToken);
await test.ExecutionTask.ConfigureAwait(false);
}
finally
{
_maxParallelismSemaphore.Release();
}
test.ExecutionTask ??= _testRunner.ExecuteTestAsync(test, cancellationToken);
await test.ExecutionTask.ConfigureAwait(false);
}
finally
{
parallelLimiterSemaphore?.Release();
}
}, CancellationToken.None);
}

await WaitForTasksWithFailFastHandling(new ArraySegment<Task>(tasks, 0, tests.Length), cancellationToken).ConfigureAwait(false);
}
finally
{
ArrayPool<Task>.Shared.Return(tasks);
}
finally
{
_maxParallelismSemaphore.Release();
}
}, CancellationToken.None);
}
await Task.WhenAll(tasks).ConfigureAwait(false);
#endif
}

private async Task WaitForTasksWithFailFastHandling(IEnumerable<Task> tasks, CancellationToken cancellationToken)
Expand Down
6 changes: 6 additions & 0 deletions TUnit.Engine/Services/DynamicTestQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,11 @@ public bool TryDequeue(out AbstractExecutableTest? test)
public void Complete()
{
_isCompleted = true;
_channel.Writer.Complete();
}

public ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
{
return _channel.Reader.WaitToReadAsync(cancellationToken);
}
}
Loading
Loading