Skip to content
Closed
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
33b5c4a
[wasm] Bump emscripten to 3.1.34
radekdoulik Mar 27, 2023
dff44d4
Update emsdk deps
radekdoulik Mar 27, 2023
ce1a265
Update icu deps
radekdoulik Mar 27, 2023
ed96a04
Use new images
radekdoulik Mar 27, 2023
8e6a28c
Use emscripten_main_runtime_thread_id
radekdoulik Mar 27, 2023
81672f4
Ignore ExitStatus exceptions
radekdoulik Mar 28, 2023
c2cbb94
Handle UnhandledPromiseRejection for ExitStatus
radekdoulik Mar 28, 2023
f3edd6e
Merge branch 'main' into pr-wasm-emscripten-3-1-34
radekdoulik Mar 28, 2023
4c0cb9e
[wasm][threads] flip YieldFromDispatchLoop; specialize PortableThread…
lambdageek Mar 15, 2023
3d88608
[mono] Implement a LifoJSSemaphore
lambdageek Mar 20, 2023
55b4649
Make managed LowLevelJSSemaphore
lambdageek Mar 20, 2023
8c74dea
copy-paste PortableThreadPool.WorkerThread for threaded WASM
lambdageek Mar 20, 2023
b323f0e
fixup native code for lifo semaphore
lambdageek Mar 21, 2023
51b1fbf
fixup managed code for LowLevelJSSemaphore
lambdageek Mar 21, 2023
3727ff1
Implement PortableThreadPool loop using semaphore callbacks
lambdageek Mar 21, 2023
763edab
manage emscripten event loop from PortableThreadPool.WorkerThread
lambdageek Mar 22, 2023
b45d001
FIXME: thread equality assertion in timeout callback
lambdageek Mar 22, 2023
1f7620c
XXX REVERT ME - minimal async timeout test
lambdageek Mar 22, 2023
06a3cc1
BUGFIX: &wait_entry ===> wait_entry
lambdageek Mar 23, 2023
8574368
nit: log thread id as hex in .ts
lambdageek Mar 23, 2023
c86cb26
XXX minimal sample - fetch on a background thread works
lambdageek Mar 23, 2023
c3cad74
fix non-wasm non-threads builds
lambdageek Mar 24, 2023
e57b262
Add WebWorkerEventLoop internal class to managed event loop keepalive
lambdageek Mar 24, 2023
07eabcd
Start threadpool threads with keepalive checks
lambdageek Mar 24, 2023
474607e
HACK: kind of work around the emscripten_runtime_keepalive_push/pop n…
lambdageek Mar 24, 2023
9f45167
support JS Semaphore with --print-icall-table cross compiler
lambdageek Mar 27, 2023
2e1f31f
make minimal FetchBackground sample more like a unit test
lambdageek Mar 27, 2023
dacc0cb
Share PortableThreadPool.WorkerThread common code
lambdageek Mar 29, 2023
10ca330
make both kinds of lifo semaphore share a base struct
lambdageek Mar 30, 2023
f4a2c02
Unify LowLevelLifoSemaphore for normal and async waiting
lambdageek Mar 30, 2023
42388d8
WebWorkerEventLoop: remove dead code, update comments
lambdageek Mar 31, 2023
853938e
remove unused arg from async wait semaphore
lambdageek Mar 31, 2023
cb8b168
rename native semaphore to LifoSemaphoreAsyncWait
lambdageek Mar 31, 2023
3e8fee4
Rename managed file to LowLevelLifoSemaphore.AsyncWait.Browser.Thread…
lambdageek Mar 31, 2023
552f9a5
Remove unnecessary indirections and allocations from managed AsyncWai…
lambdageek Mar 31, 2023
812524f
fix non-browser+threads builds
lambdageek Mar 31, 2023
50c0f1a
Keep track of unsettled JS interop promises in threadpool workers
lambdageek Apr 3, 2023
c8afaba
change minimal sample's fetch helper to artificially delay
lambdageek Apr 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
copy-paste PortableThreadPool.WorkerThread for threaded WASM
no changes yet. just copying verbatim to a separate file
  • Loading branch information
lambdageek committed Mar 28, 2023
commit 8c74deaab2b74110875fe9f44040234c56eec4b3
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,297 @@ internal sealed partial class PortableThreadPool
/// </summary>
private static partial class WorkerThread
{
private const int SemaphoreSpinCountDefaultBaseline = 70;
private const int SemaphoreSpinCountDefault = SemaphoreSpinCountDefaultBaseline;

// This value represents an assumption of how much uncommitted stack space a worker thread may use in the future.
// Used in calculations to estimate when to throttle the rate of thread injection to reduce the possibility of
// preexisting threads from running out of memory when using new stack space in low-memory situations.
public const int EstimatedAdditionalStackUsagePerThreadBytes = 64 << 10; // 64 KB

/// <summary>
/// Semaphore for controlling how many threads are currently working.
/// </summary>
private static readonly LowLevelLifoSemaphore s_semaphore =
new LowLevelLifoSemaphore(
0,
MaxPossibleThreadCount,
AppContextConfigHelper.GetInt32Config(
"System.Threading.ThreadPool.UnfairSemaphoreSpinLimit",
SemaphoreSpinCountDefault,
false),
onWait: () =>
{
if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadWait(
(uint)ThreadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
}
});

private static readonly ThreadStart s_workerThreadStart = WorkerThreadStart;

private static void WorkerThreadStart()
{
Thread.CurrentThread.SetThreadPoolWorkerThreadName();

PortableThreadPool threadPoolInstance = ThreadPoolInstance;

if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStart(
(uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
}

LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock;
LowLevelLifoSemaphore semaphore = s_semaphore;

while (true)
{
bool spinWait = true;
while (semaphore.Wait(ThreadPoolThreadTimeoutMs, spinWait))
{
bool alreadyRemovedWorkingWorker = false;
while (TakeActiveRequest(threadPoolInstance))
{
threadPoolInstance._separated.lastDequeueTime = Environment.TickCount;
if (!ThreadPoolWorkQueue.Dispatch())
{
// ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have
// already removed this working worker in the counts. This typically happens when hill climbing
// decreases the worker thread count goal.
alreadyRemovedWorkingWorker = true;
break;
}

if (threadPoolInstance._separated.numRequestedWorkers <= 0)
{
break;
}

// In highly bursty cases with short bursts of work, especially in the portable thread pool
// implementation, worker threads are being released and entering Dispatch very quickly, not finding
// much work in Dispatch, and soon afterwards going back to Dispatch, causing extra thrashing on
// data and some interlocked operations, and similarly when the thread pool runs out of work. Since
// there is a pending request for work, introduce a slight delay before serving the next request.
// The spin-wait is mainly for when the sleep is not effective due to there being no other threads
// to schedule.
Thread.UninterruptibleSleep0();
if (!Environment.IsSingleProcessor)
{
Thread.SpinWait(1);
}
}

// Don't spin-wait on the semaphore next time if the thread was actively stopped from processing work,
// as it's unlikely that the worker thread count goal would be increased again so soon afterwards that
// the semaphore would be released within the spin-wait window
spinWait = !alreadyRemovedWorkingWorker;

if (!alreadyRemovedWorkingWorker)
{
// If we woke up but couldn't find a request, or ran out of work items to process, we need to update
// the number of working workers to reflect that we are done working for now
RemoveWorkingWorker(threadPoolInstance);
}
}

// The thread cannot exit if it has IO pending, otherwise the IO may be canceled
if (IsIOPending)
{
continue;
}

threadAdjustmentLock.Acquire();
try
{
// At this point, the thread's wait timed out. We are shutting down this thread.
// We are going to decrement the number of existing threads to no longer include this one
// and then change the max number of threads in the thread pool to reflect that we don't need as many
// as we had. Finally, we are going to tell hill climbing that we changed the max number of threads.
ThreadCounts counts = threadPoolInstance._separated.counts;
while (true)
{
// Since this thread is currently registered as an existing thread, if more work comes in meanwhile,
// this thread would be expected to satisfy the new work. Ensure that NumExistingThreads is not
// decreased below NumProcessingWork, as that would be indicative of such a case.
if (counts.NumExistingThreads <= counts.NumProcessingWork)
{
// In this case, enough work came in that this thread should not time out and should go back to work.
break;
}

ThreadCounts newCounts = counts;
short newNumExistingThreads = --newCounts.NumExistingThreads;
short newNumThreadsGoal =
Math.Max(
threadPoolInstance.MinThreadsGoal,
Math.Min(newNumExistingThreads, counts.NumThreadsGoal));
newCounts.NumThreadsGoal = newNumThreadsGoal;

ThreadCounts oldCounts =
threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (oldCounts == counts)
{
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.ThreadTimedOut);
if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads);
}
return;
}

counts = oldCounts;
}
}
finally
{
threadAdjustmentLock.Release();
}
}
}

/// <summary>
/// Reduce the number of working workers by one, but maybe add back a worker (possibily this thread) if a thread request comes in while we are marking this thread as not working.
/// </summary>
private static void RemoveWorkingWorker(PortableThreadPool threadPoolInstance)
{
// A compare-exchange loop is used instead of Interlocked.Decrement or Interlocked.Add to defensively prevent
// NumProcessingWork from underflowing. See the setter for NumProcessingWork.
ThreadCounts counts = threadPoolInstance._separated.counts;
while (true)
{
ThreadCounts newCounts = counts;
newCounts.NumProcessingWork--;

ThreadCounts countsBeforeUpdate =
threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (countsBeforeUpdate == counts)
{
break;
}

counts = countsBeforeUpdate;
}

// It's possible that we decided we had thread requests just before a request came in,
// but reduced the worker count *after* the request came in. In this case, we might
// miss the notification of a thread request. So we wake up a thread (maybe this one!)
// if there is work to do.
if (threadPoolInstance._separated.numRequestedWorkers > 0)
{
MaybeAddWorkingWorker(threadPoolInstance);
}
}

internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance)
{
ThreadCounts counts = threadPoolInstance._separated.counts;
short numExistingThreads, numProcessingWork, newNumExistingThreads, newNumProcessingWork;
while (true)
{
numProcessingWork = counts.NumProcessingWork;
if (numProcessingWork >= counts.NumThreadsGoal)
{
return;
}

newNumProcessingWork = (short)(numProcessingWork + 1);
numExistingThreads = counts.NumExistingThreads;
newNumExistingThreads = Math.Max(numExistingThreads, newNumProcessingWork);

ThreadCounts newCounts = counts;
newCounts.NumProcessingWork = newNumProcessingWork;
newCounts.NumExistingThreads = newNumExistingThreads;

ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);

if (oldCounts == counts)
{
break;
}

counts = oldCounts;
}

int toCreate = newNumExistingThreads - numExistingThreads;
int toRelease = newNumProcessingWork - numProcessingWork;

if (toRelease > 0)
{
s_semaphore.Release(toRelease);
}

while (toCreate > 0)
{
CreateWorkerThread();
toCreate--;
}
}

/// <summary>
/// Returns if the current thread should stop processing work on the thread pool.
/// A thread should stop processing work on the thread pool when work remains only when
/// there are more worker threads in the thread pool than we currently want.
/// </summary>
/// <returns>Whether or not this thread should stop processing work even if there is still work in the queue.</returns>
internal static bool ShouldStopProcessingWorkNow(PortableThreadPool threadPoolInstance)
{
ThreadCounts counts = threadPoolInstance._separated.counts;
while (true)
{
// When there are more threads processing work than the thread count goal, it may have been decided
// to decrease the number of threads. Stop processing if the counts can be updated. We may have more
// threads existing than the thread count goal and that is ok, the cold ones will eventually time out if
// the thread count goal is not increased again. This logic is a bit different from the original CoreCLR
// code from which this implementation was ported, which turns a processing thread into a retired thread
// and checks for pending requests like RemoveWorkingWorker. In this implementation there are
// no retired threads, so only the count of threads processing work is considered.
if (counts.NumProcessingWork <= counts.NumThreadsGoal)
{
return false;
}

ThreadCounts newCounts = counts;
newCounts.NumProcessingWork--;

ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);

if (oldCounts == counts)
{
return true;
}
counts = oldCounts;
}
}

private static bool TakeActiveRequest(PortableThreadPool threadPoolInstance)
{
int count = threadPoolInstance._separated.numRequestedWorkers;
while (count > 0)
{
int prevCount = Interlocked.CompareExchange(ref threadPoolInstance._separated.numRequestedWorkers, count - 1, count);
if (prevCount == count)
{
return true;
}
count = prevCount;
}
return false;
}

private static void CreateWorkerThread()
{
// Thread pool threads must start in the default execution context without transferring the context, so
// using UnsafeStart() instead of Start()
Thread workerThread = new Thread(s_workerThreadStart);
workerThread.IsThreadPoolThread = true;
workerThread.IsBackground = true;
// thread name will be set in thread proc
workerThread.UnsafeStart();
}
}
}
}