-
Notifications
You must be signed in to change notification settings - Fork 5.3k
Split PortableThreadPool.WorkerThread start and loop body #84490
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
lambdageek
merged 4 commits into
dotnet:main
from
lambdageek:pieces-wasm-threadpool-worker-1-worker-split
Apr 18, 2023
Merged
Changes from 1 commit
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next
Next commit
Split PortableThreadPool.WorkerThread start and loop body
For browser-wasm we will need to start the worker thread in a special way, and use callbacks to run the loop body. Current PR is just refactoring existing code. No functional change.
- Loading branch information
commit 1be7d00ef7b3866c2c88a85827fdff46f5304609
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
81 changes: 81 additions & 0 deletions
81
...System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.NonBrowser.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,81 @@ | ||
| // Licensed to the .NET Foundation under one or more agreements. | ||
| // The .NET Foundation licenses this file to you under the MIT license. | ||
|
|
||
| using System.Diagnostics.Tracing; | ||
|
|
||
| namespace System.Threading | ||
| { | ||
| internal sealed partial class PortableThreadPool | ||
| { | ||
| /// <summary> | ||
| /// The worker thread infastructure for the CLR thread pool. | ||
| /// </summary> | ||
| private static partial class WorkerThread | ||
| { | ||
|
|
||
| /// <summary> | ||
| /// Semaphore for controlling how many threads are currently working. | ||
| /// </summary> | ||
| private static readonly LowLevelLifoSemaphore s_semaphore = | ||
| new LowLevelLifoSemaphore( | ||
| 0, | ||
| MaxPossibleThreadCount, | ||
| AppContextConfigHelper.GetInt32Config( | ||
| "System.Threading.ThreadPool.UnfairSemaphoreSpinLimit", | ||
| SemaphoreSpinCountDefault, | ||
| false), | ||
| onWait: () => | ||
| { | ||
| if (NativeRuntimeEventSource.Log.IsEnabled()) | ||
| { | ||
| NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadWait( | ||
| (uint)ThreadPoolInstance._separated.counts.VolatileRead().NumExistingThreads); | ||
| } | ||
| }); | ||
|
|
||
| private static readonly ThreadStart s_workerThreadStart = WorkerThreadStart; | ||
|
|
||
| private static void WorkerThreadStart() | ||
| { | ||
| Thread.CurrentThread.SetThreadPoolWorkerThreadName(); | ||
|
|
||
| PortableThreadPool threadPoolInstance = ThreadPoolInstance; | ||
|
|
||
| if (NativeRuntimeEventSource.Log.IsEnabled()) | ||
| { | ||
| NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStart( | ||
| (uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads); | ||
| } | ||
|
|
||
| LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock; | ||
| LowLevelLifoSemaphore semaphore = s_semaphore; | ||
|
|
||
| while (true) | ||
| { | ||
| bool spinWait = true; | ||
| while (semaphore.Wait(ThreadPoolThreadTimeoutMs, spinWait)) | ||
| { | ||
| WorkerDoWork(threadPoolInstance, ref spinWait); | ||
| } | ||
|
|
||
| if (WorkerTimedOutMaybeStop(threadPoolInstance, threadAdjustmentLock)) | ||
| { | ||
| break; | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
| private static void CreateWorkerThread() | ||
| { | ||
| // Thread pool threads must start in the default execution context without transferring the context, so | ||
| // using UnsafeStart() instead of Start() | ||
| Thread workerThread = new Thread(s_workerThreadStart); | ||
| workerThread.IsThreadPoolThread = true; | ||
| workerThread.IsBackground = true; | ||
| // thread name will be set in thread proc | ||
| workerThread.UnsafeStart(); | ||
| } | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
| // The .NET Foundation licenses this file to you under the MIT license. | ||
|
|
||
| using System.Diagnostics.Tracing; | ||
| using System.Runtime.CompilerServices; | ||
|
|
||
| namespace System.Threading | ||
| { | ||
|
|
@@ -28,148 +29,115 @@ private static partial class WorkerThread | |
| // preexisting threads from running out of memory when using new stack space in low-memory situations. | ||
| public const int EstimatedAdditionalStackUsagePerThreadBytes = 64 << 10; // 64 KB | ||
|
|
||
| /// <summary> | ||
| /// Semaphore for controlling how many threads are currently working. | ||
| /// </summary> | ||
| private static readonly LowLevelLifoSemaphore s_semaphore = | ||
| new LowLevelLifoSemaphore( | ||
| 0, | ||
| MaxPossibleThreadCount, | ||
| AppContextConfigHelper.GetInt32Config( | ||
| "System.Threading.ThreadPool.UnfairSemaphoreSpinLimit", | ||
| SemaphoreSpinCountDefault, | ||
| false), | ||
| onWait: () => | ||
| [MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
| private static void WorkerDoWork(PortableThreadPool threadPoolInstance, ref bool spinWait) | ||
| { | ||
| bool alreadyRemovedWorkingWorker = false; | ||
| while (TakeActiveRequest(threadPoolInstance)) | ||
| { | ||
| threadPoolInstance._separated.lastDequeueTime = Environment.TickCount; | ||
| if (!ThreadPoolWorkQueue.Dispatch()) | ||
| { | ||
| if (NativeRuntimeEventSource.Log.IsEnabled()) | ||
| { | ||
| NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadWait( | ||
| (uint)ThreadPoolInstance._separated.counts.VolatileRead().NumExistingThreads); | ||
| } | ||
| }); | ||
| // ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have | ||
| // already removed this working worker in the counts. This typically happens when hill climbing | ||
| // decreases the worker thread count goal. | ||
| alreadyRemovedWorkingWorker = true; | ||
| break; | ||
| } | ||
|
|
||
| private static readonly ThreadStart s_workerThreadStart = WorkerThreadStart; | ||
| if (threadPoolInstance._separated.numRequestedWorkers <= 0) | ||
| { | ||
| break; | ||
| } | ||
|
|
||
| private static void WorkerThreadStart() | ||
| { | ||
| Thread.CurrentThread.SetThreadPoolWorkerThreadName(); | ||
| // In highly bursty cases with short bursts of work, especially in the portable thread pool | ||
| // implementation, worker threads are being released and entering Dispatch very quickly, not finding | ||
| // much work in Dispatch, and soon afterwards going back to Dispatch, causing extra thrashing on | ||
| // data and some interlocked operations, and similarly when the thread pool runs out of work. Since | ||
| // there is a pending request for work, introduce a slight delay before serving the next request. | ||
| // The spin-wait is mainly for when the sleep is not effective due to there being no other threads | ||
| // to schedule. | ||
| Thread.UninterruptibleSleep0(); | ||
| if (!Environment.IsSingleProcessor) | ||
| { | ||
| Thread.SpinWait(1); | ||
| } | ||
| } | ||
|
|
||
| PortableThreadPool threadPoolInstance = ThreadPoolInstance; | ||
| // Don't spin-wait on the semaphore next time if the thread was actively stopped from processing work, | ||
| // as it's unlikely that the worker thread count goal would be increased again so soon afterwards that | ||
| // the semaphore would be released within the spin-wait window | ||
| spinWait = !alreadyRemovedWorkingWorker; | ||
|
|
||
| if (NativeRuntimeEventSource.Log.IsEnabled()) | ||
| if (!alreadyRemovedWorkingWorker) | ||
| { | ||
| NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStart( | ||
| (uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads); | ||
| // If we woke up but couldn't find a request, or ran out of work items to process, we need to update | ||
| // the number of working workers to reflect that we are done working for now | ||
| RemoveWorkingWorker(threadPoolInstance); | ||
| } | ||
| } | ||
|
|
||
| LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock; | ||
| LowLevelLifoSemaphore semaphore = s_semaphore; | ||
| // returns true if the worker is shutting down | ||
| // returns false if we should do another iteration | ||
| [MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
| private static bool WorkerTimedOutMaybeStop (PortableThreadPool threadPoolInstance, LowLevelLock threadAdjustmentLock) | ||
lambdageek marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| { | ||
| // The thread cannot exit if it has IO pending, otherwise the IO may be canceled | ||
| if (IsIOPending) | ||
| { | ||
| return false; | ||
| } | ||
|
|
||
| while (true) | ||
| threadAdjustmentLock.Acquire(); | ||
| try | ||
| { | ||
| bool spinWait = true; | ||
| while (semaphore.Wait(ThreadPoolThreadTimeoutMs, spinWait)) | ||
| // At this point, the thread's wait timed out. We are shutting down this thread. | ||
| // We are going to decrement the number of existing threads to no longer include this one | ||
| // and then change the max number of threads in the thread pool to reflect that we don't need as many | ||
| // as we had. Finally, we are going to tell hill climbing that we changed the max number of threads. | ||
| ThreadCounts counts = threadPoolInstance._separated.counts; | ||
| while (true) | ||
| { | ||
| bool alreadyRemovedWorkingWorker = false; | ||
| while (TakeActiveRequest(threadPoolInstance)) | ||
| // Since this thread is currently registered as an existing thread, if more work comes in meanwhile, | ||
| // this thread would be expected to satisfy the new work. Ensure that NumExistingThreads is not | ||
| // decreased below NumProcessingWork, as that would be indicative of such a case. | ||
| if (counts.NumExistingThreads <= counts.NumProcessingWork) | ||
| { | ||
| threadPoolInstance._separated.lastDequeueTime = Environment.TickCount; | ||
| if (!ThreadPoolWorkQueue.Dispatch()) | ||
| { | ||
| // ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have | ||
| // already removed this working worker in the counts. This typically happens when hill climbing | ||
| // decreases the worker thread count goal. | ||
| alreadyRemovedWorkingWorker = true; | ||
| break; | ||
| } | ||
|
|
||
| if (threadPoolInstance._separated.numRequestedWorkers <= 0) | ||
| { | ||
| break; | ||
| } | ||
|
|
||
| // In highly bursty cases with short bursts of work, especially in the portable thread pool | ||
| // implementation, worker threads are being released and entering Dispatch very quickly, not finding | ||
| // much work in Dispatch, and soon afterwards going back to Dispatch, causing extra thrashing on | ||
| // data and some interlocked operations, and similarly when the thread pool runs out of work. Since | ||
| // there is a pending request for work, introduce a slight delay before serving the next request. | ||
| // The spin-wait is mainly for when the sleep is not effective due to there being no other threads | ||
| // to schedule. | ||
| Thread.UninterruptibleSleep0(); | ||
| if (!Environment.IsSingleProcessor) | ||
| { | ||
| Thread.SpinWait(1); | ||
| } | ||
| // In this case, enough work came in that this thread should not time out and should go back to work. | ||
| break; | ||
| } | ||
|
|
||
| // Don't spin-wait on the semaphore next time if the thread was actively stopped from processing work, | ||
| // as it's unlikely that the worker thread count goal would be increased again so soon afterwards that | ||
| // the semaphore would be released within the spin-wait window | ||
| spinWait = !alreadyRemovedWorkingWorker; | ||
|
|
||
| if (!alreadyRemovedWorkingWorker) | ||
| ThreadCounts newCounts = counts; | ||
| short newNumExistingThreads = --newCounts.NumExistingThreads; | ||
| short newNumThreadsGoal = | ||
| Math.Max( | ||
| threadPoolInstance.MinThreadsGoal, | ||
| Math.Min(newNumExistingThreads, counts.NumThreadsGoal)); | ||
| newCounts.NumThreadsGoal = newNumThreadsGoal; | ||
|
|
||
| ThreadCounts oldCounts = | ||
| threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts); | ||
| if (oldCounts == counts) | ||
| { | ||
| // If we woke up but couldn't find a request, or ran out of work items to process, we need to update | ||
| // the number of working workers to reflect that we are done working for now | ||
| RemoveWorkingWorker(threadPoolInstance); | ||
| } | ||
| } | ||
|
|
||
| // The thread cannot exit if it has IO pending, otherwise the IO may be canceled | ||
| if (IsIOPending) | ||
| { | ||
| continue; | ||
| } | ||
|
|
||
| threadAdjustmentLock.Acquire(); | ||
| try | ||
| { | ||
| // At this point, the thread's wait timed out. We are shutting down this thread. | ||
| // We are going to decrement the number of existing threads to no longer include this one | ||
| // and then change the max number of threads in the thread pool to reflect that we don't need as many | ||
| // as we had. Finally, we are going to tell hill climbing that we changed the max number of threads. | ||
| ThreadCounts counts = threadPoolInstance._separated.counts; | ||
| while (true) | ||
| { | ||
| // Since this thread is currently registered as an existing thread, if more work comes in meanwhile, | ||
| // this thread would be expected to satisfy the new work. Ensure that NumExistingThreads is not | ||
| // decreased below NumProcessingWork, as that would be indicative of such a case. | ||
| if (counts.NumExistingThreads <= counts.NumProcessingWork) | ||
| HillClimbing.ThreadPoolHillClimber.ForceChange( | ||
| newNumThreadsGoal, | ||
| HillClimbing.StateOrTransition.ThreadTimedOut); | ||
| if (NativeRuntimeEventSource.Log.IsEnabled()) | ||
| { | ||
| // In this case, enough work came in that this thread should not time out and should go back to work. | ||
| break; | ||
| NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads); | ||
| } | ||
|
|
||
| ThreadCounts newCounts = counts; | ||
| short newNumExistingThreads = --newCounts.NumExistingThreads; | ||
| short newNumThreadsGoal = | ||
| Math.Max( | ||
| threadPoolInstance.MinThreadsGoal, | ||
| Math.Min(newNumExistingThreads, counts.NumThreadsGoal)); | ||
| newCounts.NumThreadsGoal = newNumThreadsGoal; | ||
|
|
||
| ThreadCounts oldCounts = | ||
| threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts); | ||
| if (oldCounts == counts) | ||
| { | ||
| HillClimbing.ThreadPoolHillClimber.ForceChange( | ||
| newNumThreadsGoal, | ||
| HillClimbing.StateOrTransition.ThreadTimedOut); | ||
| if (NativeRuntimeEventSource.Log.IsEnabled()) | ||
| { | ||
| NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads); | ||
| } | ||
| return; | ||
| } | ||
|
|
||
| counts = oldCounts; | ||
| return true; | ||
| } | ||
| } | ||
| finally | ||
| { | ||
| threadAdjustmentLock.Release(); | ||
|
|
||
| counts = oldCounts; | ||
| } | ||
| } | ||
| finally | ||
| { | ||
| threadAdjustmentLock.Release(); | ||
| } | ||
| // if we get here new work came in and we're going to keep running | ||
| return false; | ||
|
||
| } | ||
|
|
||
| /// <summary> | ||
|
|
@@ -300,17 +268,6 @@ private static bool TakeActiveRequest(PortableThreadPool threadPoolInstance) | |
| } | ||
| return false; | ||
| } | ||
|
|
||
| private static void CreateWorkerThread() | ||
| { | ||
| // Thread pool threads must start in the default execution context without transferring the context, so | ||
| // using UnsafeStart() instead of Start() | ||
| Thread workerThread = new Thread(s_workerThreadStart); | ||
| workerThread.IsThreadPoolThread = true; | ||
| workerThread.IsBackground = true; | ||
| // thread name will be set in thread proc | ||
| workerThread.UnsafeStart(); | ||
| } | ||
| } | ||
| } | ||
| } | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.