Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
33b5c4a
[wasm] Bump emscripten to 3.1.34
radekdoulik Mar 27, 2023
dff44d4
Update emsdk deps
radekdoulik Mar 27, 2023
ce1a265
Update icu deps
radekdoulik Mar 27, 2023
ed96a04
Use new images
radekdoulik Mar 27, 2023
8e6a28c
Use emscripten_main_runtime_thread_id
radekdoulik Mar 27, 2023
81672f4
Ignore ExitStatus exceptions
radekdoulik Mar 28, 2023
c2cbb94
Handle UnhandledPromiseRejection for ExitStatus
radekdoulik Mar 28, 2023
f3edd6e
Merge branch 'main' into pr-wasm-emscripten-3-1-34
radekdoulik Mar 28, 2023
4c0cb9e
[wasm][threads] flip YieldFromDispatchLoop; specialize PortableThread…
lambdageek Mar 15, 2023
3d88608
[mono] Implement a LifoJSSemaphore
lambdageek Mar 20, 2023
55b4649
Make managed LowLevelJSSemaphore
lambdageek Mar 20, 2023
8c74dea
copy-paste PortableThreadPool.WorkerThread for threaded WASM
lambdageek Mar 20, 2023
b323f0e
fixup native code for lifo semaphore
lambdageek Mar 21, 2023
51b1fbf
fixup managed code for LowLevelJSSemaphore
lambdageek Mar 21, 2023
3727ff1
Implement PortableThreadPool loop using semaphore callbacks
lambdageek Mar 21, 2023
763edab
manage emscripten event loop from PortableThreadPool.WorkerThread
lambdageek Mar 22, 2023
b45d001
FIXME: thread equality assertion in timeout callback
lambdageek Mar 22, 2023
1f7620c
XXX REVERT ME - minimal async timeout test
lambdageek Mar 22, 2023
06a3cc1
BUGFIX: &wait_entry ===> wait_entry
lambdageek Mar 23, 2023
8574368
nit: log thread id as hex in .ts
lambdageek Mar 23, 2023
c86cb26
XXX minimal sample - fetch on a background thread works
lambdageek Mar 23, 2023
c3cad74
fix non-wasm non-threads builds
lambdageek Mar 24, 2023
e57b262
Add WebWorkerEventLoop internal class to managed event loop keepalive
lambdageek Mar 24, 2023
07eabcd
Start threadpool threads with keepalive checks
lambdageek Mar 24, 2023
474607e
HACK: kind of work around the emscripten_runtime_keepalive_push/pop n…
lambdageek Mar 24, 2023
9f45167
support JS Semaphore with --print-icall-table cross compiler
lambdageek Mar 27, 2023
2e1f31f
make minimal FetchBackground sample more like a unit test
lambdageek Mar 27, 2023
dacc0cb
Share PortableThreadPool.WorkerThread common code
lambdageek Mar 29, 2023
10ca330
make both kinds of lifo semaphore share a base struct
lambdageek Mar 30, 2023
f4a2c02
Unify LowLevelLifoSemaphore for normal and async waiting
lambdageek Mar 30, 2023
42388d8
WebWorkerEventLoop: remove dead code, update comments
lambdageek Mar 31, 2023
853938e
remove unused arg from async wait semaphore
lambdageek Mar 31, 2023
cb8b168
rename native semaphore to LifoSemaphoreAsyncWait
lambdageek Mar 31, 2023
3e8fee4
Rename managed file to LowLevelLifoSemaphore.AsyncWait.Browser.Thread…
lambdageek Mar 31, 2023
552f9a5
Remove unnecessary indirections and allocations from managed AsyncWai…
lambdageek Mar 31, 2023
812524f
fix non-browser+threads builds
lambdageek Mar 31, 2023
50c0f1a
Keep track of unsettled JS interop promises in threadpool workers
lambdageek Apr 3, 2023
c8afaba
change minimal sample's fetch helper to artificially delay
lambdageek Apr 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Share PortableThreadPool.WorkerThread common code
Share the code between non-browser implementations and
browser+threads.

The differences are just in how the work loop is started and implemented
  • Loading branch information
lambdageek committed Mar 30, 2023
commit dacc0cb728b5df20bb977a4a8cb5e8c5cd1e9ba1
Original file line number Diff line number Diff line change
Expand Up @@ -2523,6 +2523,7 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.ThreadCounts.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WaitThread.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.NonBrowser.cs" Condition="'$(TargetsBrowser)' != 'true'"/>
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerTracking.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Unix.cs" Condition="'$(TargetsUnix)' == 'true' or '$(TargetsBrowser)' == 'true' or '$(TargetsWasi)' == 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Windows.cs" Condition="'$(TargetsWindows)' == 'true'" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics.Tracing;

namespace System.Threading
{
internal sealed partial class PortableThreadPool
{
/// <summary>
/// The worker thread infastructure for the CLR thread pool.
/// </summary>
private static partial class WorkerThread
{

/// <summary>
/// Semaphore for controlling how many threads are currently working.
/// </summary>
private static readonly LowLevelLifoSemaphore s_semaphore =
new LowLevelLifoSemaphore(
0,
MaxPossibleThreadCount,
AppContextConfigHelper.GetInt32Config(
"System.Threading.ThreadPool.UnfairSemaphoreSpinLimit",
SemaphoreSpinCountDefault,
false),
onWait: () =>
{
if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadWait(
(uint)ThreadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
}
});

private static readonly ThreadStart s_workerThreadStart = WorkerThreadStart;

private static void WorkerThreadStart()
{
Thread.CurrentThread.SetThreadPoolWorkerThreadName();

PortableThreadPool threadPoolInstance = ThreadPoolInstance;

if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStart(
(uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
}

LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock;
LowLevelLifoSemaphore semaphore = s_semaphore;

while (true)
{
bool spinWait = true;
while (semaphore.Wait(ThreadPoolThreadTimeoutMs, spinWait))
{
WorkerDoWork(threadPoolInstance, ref spinWait);
}

if (WorkerTimedOutMaybeStop(threadPoolInstance, threadAdjustmentLock))
{
break;
}
}
}


private static void CreateWorkerThread()
{
// Thread pool threads must start in the default execution context without transferring the context, so
// using UnsafeStart() instead of Start()
Thread workerThread = new Thread(s_workerThreadStart);
workerThread.IsThreadPoolThread = true;
workerThread.IsBackground = true;
// thread name will be set in thread proc
workerThread.UnsafeStart();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics.Tracing;
using System.Runtime.CompilerServices;

namespace System.Threading
{
internal sealed partial class PortableThreadPool
{
#if !(TARGET_BROWSER && FEATURE_WASM_THREADS)
/// <summary>
/// The worker thread infastructure for the CLR thread pool.
/// </summary>
Expand All @@ -29,148 +29,115 @@ private static partial class WorkerThread
// preexisting threads from running out of memory when using new stack space in low-memory situations.
public const int EstimatedAdditionalStackUsagePerThreadBytes = 64 << 10; // 64 KB

/// <summary>
/// Semaphore for controlling how many threads are currently working.
/// </summary>
private static readonly LowLevelLifoSemaphore s_semaphore =
new LowLevelLifoSemaphore(
0,
MaxPossibleThreadCount,
AppContextConfigHelper.GetInt32Config(
"System.Threading.ThreadPool.UnfairSemaphoreSpinLimit",
SemaphoreSpinCountDefault,
false),
onWait: () =>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void WorkerDoWork(PortableThreadPool threadPoolInstance, ref bool spinWait)
{
bool alreadyRemovedWorkingWorker = false;
while (TakeActiveRequest(threadPoolInstance))
{
threadPoolInstance._separated.lastDequeueTime = Environment.TickCount;
if (!ThreadPoolWorkQueue.Dispatch())
{
if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadWait(
(uint)ThreadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
}
});
// ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have
// already removed this working worker in the counts. This typically happens when hill climbing
// decreases the worker thread count goal.
alreadyRemovedWorkingWorker = true;
break;
}

private static readonly ThreadStart s_workerThreadStart = WorkerThreadStart;
if (threadPoolInstance._separated.numRequestedWorkers <= 0)
{
break;
}

private static void WorkerThreadStart()
{
Thread.CurrentThread.SetThreadPoolWorkerThreadName();
// In highly bursty cases with short bursts of work, especially in the portable thread pool
// implementation, worker threads are being released and entering Dispatch very quickly, not finding
// much work in Dispatch, and soon afterwards going back to Dispatch, causing extra thrashing on
// data and some interlocked operations, and similarly when the thread pool runs out of work. Since
// there is a pending request for work, introduce a slight delay before serving the next request.
// The spin-wait is mainly for when the sleep is not effective due to there being no other threads
// to schedule.
Thread.UninterruptibleSleep0();
if (!Environment.IsSingleProcessor)
{
Thread.SpinWait(1);
}
}

PortableThreadPool threadPoolInstance = ThreadPoolInstance;
// Don't spin-wait on the semaphore next time if the thread was actively stopped from processing work,
// as it's unlikely that the worker thread count goal would be increased again so soon afterwards that
// the semaphore would be released within the spin-wait window
spinWait = !alreadyRemovedWorkingWorker;

if (NativeRuntimeEventSource.Log.IsEnabled())
if (!alreadyRemovedWorkingWorker)
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStart(
(uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
// If we woke up but couldn't find a request, or ran out of work items to process, we need to update
// the number of working workers to reflect that we are done working for now
RemoveWorkingWorker(threadPoolInstance);
}
}

LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock;
LowLevelLifoSemaphore semaphore = s_semaphore;
// returns true if the worker is shutting down
// returns false if we should do another iteration
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static bool WorkerTimedOutMaybeStop (PortableThreadPool threadPoolInstance, LowLevelLock threadAdjustmentLock)
{
// The thread cannot exit if it has IO pending, otherwise the IO may be canceled
if (IsIOPending)
{
return false;
}

while (true)
threadAdjustmentLock.Acquire();
try
{
bool spinWait = true;
while (semaphore.Wait(ThreadPoolThreadTimeoutMs, spinWait))
// At this point, the thread's wait timed out. We are shutting down this thread.
// We are going to decrement the number of existing threads to no longer include this one
// and then change the max number of threads in the thread pool to reflect that we don't need as many
// as we had. Finally, we are going to tell hill climbing that we changed the max number of threads.
ThreadCounts counts = threadPoolInstance._separated.counts;
while (true)
{
bool alreadyRemovedWorkingWorker = false;
while (TakeActiveRequest(threadPoolInstance))
// Since this thread is currently registered as an existing thread, if more work comes in meanwhile,
// this thread would be expected to satisfy the new work. Ensure that NumExistingThreads is not
// decreased below NumProcessingWork, as that would be indicative of such a case.
if (counts.NumExistingThreads <= counts.NumProcessingWork)
{
threadPoolInstance._separated.lastDequeueTime = Environment.TickCount;
if (!ThreadPoolWorkQueue.Dispatch())
{
// ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have
// already removed this working worker in the counts. This typically happens when hill climbing
// decreases the worker thread count goal.
alreadyRemovedWorkingWorker = true;
break;
}

if (threadPoolInstance._separated.numRequestedWorkers <= 0)
{
break;
}

// In highly bursty cases with short bursts of work, especially in the portable thread pool
// implementation, worker threads are being released and entering Dispatch very quickly, not finding
// much work in Dispatch, and soon afterwards going back to Dispatch, causing extra thrashing on
// data and some interlocked operations, and similarly when the thread pool runs out of work. Since
// there is a pending request for work, introduce a slight delay before serving the next request.
// The spin-wait is mainly for when the sleep is not effective due to there being no other threads
// to schedule.
Thread.UninterruptibleSleep0();
if (!Environment.IsSingleProcessor)
{
Thread.SpinWait(1);
}
// In this case, enough work came in that this thread should not time out and should go back to work.
break;
}

// Don't spin-wait on the semaphore next time if the thread was actively stopped from processing work,
// as it's unlikely that the worker thread count goal would be increased again so soon afterwards that
// the semaphore would be released within the spin-wait window
spinWait = !alreadyRemovedWorkingWorker;

if (!alreadyRemovedWorkingWorker)
ThreadCounts newCounts = counts;
short newNumExistingThreads = --newCounts.NumExistingThreads;
short newNumThreadsGoal =
Math.Max(
threadPoolInstance.MinThreadsGoal,
Math.Min(newNumExistingThreads, counts.NumThreadsGoal));
newCounts.NumThreadsGoal = newNumThreadsGoal;

ThreadCounts oldCounts =
threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (oldCounts == counts)
{
// If we woke up but couldn't find a request, or ran out of work items to process, we need to update
// the number of working workers to reflect that we are done working for now
RemoveWorkingWorker(threadPoolInstance);
}
}

// The thread cannot exit if it has IO pending, otherwise the IO may be canceled
if (IsIOPending)
{
continue;
}

threadAdjustmentLock.Acquire();
try
{
// At this point, the thread's wait timed out. We are shutting down this thread.
// We are going to decrement the number of existing threads to no longer include this one
// and then change the max number of threads in the thread pool to reflect that we don't need as many
// as we had. Finally, we are going to tell hill climbing that we changed the max number of threads.
ThreadCounts counts = threadPoolInstance._separated.counts;
while (true)
{
// Since this thread is currently registered as an existing thread, if more work comes in meanwhile,
// this thread would be expected to satisfy the new work. Ensure that NumExistingThreads is not
// decreased below NumProcessingWork, as that would be indicative of such a case.
if (counts.NumExistingThreads <= counts.NumProcessingWork)
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.ThreadTimedOut);
if (NativeRuntimeEventSource.Log.IsEnabled())
{
// In this case, enough work came in that this thread should not time out and should go back to work.
break;
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads);
}

ThreadCounts newCounts = counts;
short newNumExistingThreads = --newCounts.NumExistingThreads;
short newNumThreadsGoal =
Math.Max(
threadPoolInstance.MinThreadsGoal,
Math.Min(newNumExistingThreads, counts.NumThreadsGoal));
newCounts.NumThreadsGoal = newNumThreadsGoal;

ThreadCounts oldCounts =
threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (oldCounts == counts)
{
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.ThreadTimedOut);
if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads);
}
return;
}

counts = oldCounts;
return true;
}
}
finally
{
threadAdjustmentLock.Release();

counts = oldCounts;
}
}
finally
{
threadAdjustmentLock.Release();
}
// if we get here new work came in and we're going to keep running
return false;
}

/// <summary>
Expand Down Expand Up @@ -301,18 +268,6 @@ private static bool TakeActiveRequest(PortableThreadPool threadPoolInstance)
}
return false;
}

private static void CreateWorkerThread()
{
// Thread pool threads must start in the default execution context without transferring the context, so
// using UnsafeStart() instead of Start()
Thread workerThread = new Thread(s_workerThreadStart);
workerThread.IsThreadPoolThread = true;
workerThread.IsBackground = true;
// thread name will be set in thread proc
workerThread.UnsafeStart();
}
}
#endif // !(TARGET_BROWSER && FEATURE_WASM_THREADS)
}
}
Loading