diff --git a/eng/Version.Details.xml b/eng/Version.Details.xml index 1a77ac400efe2a..cb9728e5d3e8a4 100644 --- a/eng/Version.Details.xml +++ b/eng/Version.Details.xml @@ -1,8 +1,8 @@ - + https://github.com/dotnet/icu - b3cb54fec4eb845b37038c934d6c9dc17cdfb181 + 389d19d09d3cf16ec0143dba065fcd704ab8e48c https://github.com/dotnet/msquic @@ -85,9 +85,9 @@ f32e148d67dbf348685c3076a37e8bc68ab3a30f - + https://github.com/dotnet/emsdk - a464820353b7956538b07c9b53103d793b5e15b6 + e4089ed2abe29bdc25bab2c261940175d0846824 diff --git a/eng/Versions.props b/eng/Versions.props index c8ed55231d9f9d..abe7d74f347a60 100644 --- a/eng/Versions.props +++ b/eng/Versions.props @@ -216,7 +216,7 @@ 0.11.4-alpha.23163.1 - 8.0.0-preview.3.23163.3 + 8.0.0-preview.4.23177.3 2.1.7 8.0.0-alpha.1.23166.1 @@ -235,7 +235,7 @@ Note: when the name is updated, make sure to update dependency name in eng/pipelines/common/xplat-setup.yml like - DarcDependenciesChanged.Microsoft_NET_Workload_Emscripten_Current_Manifest-8_0_100_Transport --> - 8.0.0-preview.4.23170.1 + 8.0.0-preview.4.23177.1 $(MicrosoftNETWorkloadEmscriptenCurrentManifest80100TransportVersion) 1.1.87-gba258badda diff --git a/eng/pipelines/common/templates/pipeline-with-resources.yml b/eng/pipelines/common/templates/pipeline-with-resources.yml index de16160c88be75..e5c710bd7b8216 100644 --- a/eng/pipelines/common/templates/pipeline-with-resources.yml +++ b/eng/pipelines/common/templates/pipeline-with-resources.yml @@ -60,10 +60,10 @@ resources: ROOTFS_DIR: /crossrootfs/ppc64le - container: browser_wasm - image: mcr.microsoft.com/dotnet-buildtools/prereqs:ubuntu-18.04-webassembly-net8-20230322221728-80fdceb + image: mcr.microsoft.com/dotnet-buildtools/prereqs:ubuntu-18.04-webassembly-net8-20230327150025-4404b5c - container: wasi_wasm - image: mcr.microsoft.com/dotnet-buildtools/prereqs:ubuntu-20.04-webassembly-net8-20230322221804-80fdceb + image: mcr.microsoft.com/dotnet-buildtools/prereqs:ubuntu-20.04-webassembly-net8-20230327150037-4404b5c - container: freebsd_x64 image: mcr.microsoft.com/dotnet-buildtools/prereqs:ubuntu-18.04-cross-freebsd-12 diff --git a/eng/pipelines/libraries/helix-queues-setup.yml b/eng/pipelines/libraries/helix-queues-setup.yml index 32681c5466b018..6c6837a74938ac 100644 --- a/eng/pipelines/libraries/helix-queues-setup.yml +++ b/eng/pipelines/libraries/helix-queues-setup.yml @@ -201,6 +201,6 @@ jobs: # Browser WebAssembly windows - ${{ if in(parameters.platform, 'browser_wasm_win', 'wasi_wasm_win') }}: - - (Windows.Amd64.Server2022.Open)windows.amd64.server2022.open@mcr.microsoft.com/dotnet-buildtools/prereqs:windowsservercore-ltsc2022-helix-webassembly-net8-20230319084205-80fdceb + - (Windows.Amd64.Server2022.Open)windows.amd64.server2022.open@mcr.microsoft.com/dotnet-buildtools/prereqs:windowsservercore-ltsc2022-helix-webassembly-net8-20230327150108-4404b5c ${{ insert }}: ${{ parameters.jobParameters }} diff --git a/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems b/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems index f969f5c39921a4..9f1b2cbb2a1c14 100644 --- a/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems +++ b/src/libraries/System.Private.CoreLib/src/System.Private.CoreLib.Shared.projitems @@ -2523,8 +2523,9 @@ + - + diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.cs index 25a229c87ef66a..b720761be85ee2 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.cs @@ -39,6 +39,9 @@ public LowLevelLifoSemaphore(int initialSignalCount, int maximumSignalCount, int public bool Wait(int timeoutMs, bool spinWait) { +#if TARGET_BROWSER && FEATURE_WASM_THREADS + ThrowIfInvalidSemaphoreKind(LifoSemaphoreKind.Normal); +#endif Debug.Assert(timeoutMs >= -1); int spinCount = spinWait ? _spinCount : 0; diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.NonBrowser.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.NonBrowser.cs new file mode 100644 index 00000000000000..c60b5177d87845 --- /dev/null +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.NonBrowser.cs @@ -0,0 +1,81 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics.Tracing; + +namespace System.Threading +{ + internal sealed partial class PortableThreadPool + { + /// + /// The worker thread infastructure for the CLR thread pool. + /// + private static partial class WorkerThread + { + + /// + /// Semaphore for controlling how many threads are currently working. + /// + private static readonly LowLevelLifoSemaphore s_semaphore = + new LowLevelLifoSemaphore( + 0, + MaxPossibleThreadCount, + AppContextConfigHelper.GetInt32Config( + "System.Threading.ThreadPool.UnfairSemaphoreSpinLimit", + SemaphoreSpinCountDefault, + false), + onWait: () => + { + if (NativeRuntimeEventSource.Log.IsEnabled()) + { + NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadWait( + (uint)ThreadPoolInstance._separated.counts.VolatileRead().NumExistingThreads); + } + }); + + private static readonly ThreadStart s_workerThreadStart = WorkerThreadStart; + + private static void WorkerThreadStart() + { + Thread.CurrentThread.SetThreadPoolWorkerThreadName(); + + PortableThreadPool threadPoolInstance = ThreadPoolInstance; + + if (NativeRuntimeEventSource.Log.IsEnabled()) + { + NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStart( + (uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads); + } + + LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock; + LowLevelLifoSemaphore semaphore = s_semaphore; + + while (true) + { + bool spinWait = true; + while (semaphore.Wait(ThreadPoolThreadTimeoutMs, spinWait)) + { + WorkerDoWork(threadPoolInstance, ref spinWait); + } + + if (WorkerTimedOutMaybeStop(threadPoolInstance, threadAdjustmentLock)) + { + break; + } + } + } + + + private static void CreateWorkerThread() + { + // Thread pool threads must start in the default execution context without transferring the context, so + // using UnsafeStart() instead of Start() + Thread workerThread = new Thread(s_workerThreadStart); + workerThread.IsThreadPoolThread = true; + workerThread.IsBackground = true; + // thread name will be set in thread proc + workerThread.UnsafeStart(); + } + } + } +} diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs index 96578b9de6b8dd..4836a03385d4e8 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Diagnostics.Tracing; +using System.Runtime.CompilerServices; namespace System.Threading { @@ -28,148 +29,115 @@ private static partial class WorkerThread // preexisting threads from running out of memory when using new stack space in low-memory situations. public const int EstimatedAdditionalStackUsagePerThreadBytes = 64 << 10; // 64 KB - /// - /// Semaphore for controlling how many threads are currently working. - /// - private static readonly LowLevelLifoSemaphore s_semaphore = - new LowLevelLifoSemaphore( - 0, - MaxPossibleThreadCount, - AppContextConfigHelper.GetInt32Config( - "System.Threading.ThreadPool.UnfairSemaphoreSpinLimit", - SemaphoreSpinCountDefault, - false), - onWait: () => + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static void WorkerDoWork(PortableThreadPool threadPoolInstance, ref bool spinWait) + { + bool alreadyRemovedWorkingWorker = false; + while (TakeActiveRequest(threadPoolInstance)) + { + threadPoolInstance._separated.lastDequeueTime = Environment.TickCount; + if (!ThreadPoolWorkQueue.Dispatch()) { - if (NativeRuntimeEventSource.Log.IsEnabled()) - { - NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadWait( - (uint)ThreadPoolInstance._separated.counts.VolatileRead().NumExistingThreads); - } - }); + // ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have + // already removed this working worker in the counts. This typically happens when hill climbing + // decreases the worker thread count goal. + alreadyRemovedWorkingWorker = true; + break; + } - private static readonly ThreadStart s_workerThreadStart = WorkerThreadStart; + if (threadPoolInstance._separated.numRequestedWorkers <= 0) + { + break; + } - private static void WorkerThreadStart() - { - Thread.CurrentThread.SetThreadPoolWorkerThreadName(); + // In highly bursty cases with short bursts of work, especially in the portable thread pool + // implementation, worker threads are being released and entering Dispatch very quickly, not finding + // much work in Dispatch, and soon afterwards going back to Dispatch, causing extra thrashing on + // data and some interlocked operations, and similarly when the thread pool runs out of work. Since + // there is a pending request for work, introduce a slight delay before serving the next request. + // The spin-wait is mainly for when the sleep is not effective due to there being no other threads + // to schedule. + Thread.UninterruptibleSleep0(); + if (!Environment.IsSingleProcessor) + { + Thread.SpinWait(1); + } + } - PortableThreadPool threadPoolInstance = ThreadPoolInstance; + // Don't spin-wait on the semaphore next time if the thread was actively stopped from processing work, + // as it's unlikely that the worker thread count goal would be increased again so soon afterwards that + // the semaphore would be released within the spin-wait window + spinWait = !alreadyRemovedWorkingWorker; - if (NativeRuntimeEventSource.Log.IsEnabled()) + if (!alreadyRemovedWorkingWorker) { - NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStart( - (uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads); + // If we woke up but couldn't find a request, or ran out of work items to process, we need to update + // the number of working workers to reflect that we are done working for now + RemoveWorkingWorker(threadPoolInstance); } + } - LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock; - LowLevelLifoSemaphore semaphore = s_semaphore; + // returns true if the worker is shutting down + // returns false if we should do another iteration + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static bool WorkerTimedOutMaybeStop (PortableThreadPool threadPoolInstance, LowLevelLock threadAdjustmentLock) + { + // The thread cannot exit if it has IO pending, otherwise the IO may be canceled + if (IsIOPending) + { + return false; + } - while (true) + threadAdjustmentLock.Acquire(); + try { - bool spinWait = true; - while (semaphore.Wait(ThreadPoolThreadTimeoutMs, spinWait)) + // At this point, the thread's wait timed out. We are shutting down this thread. + // We are going to decrement the number of existing threads to no longer include this one + // and then change the max number of threads in the thread pool to reflect that we don't need as many + // as we had. Finally, we are going to tell hill climbing that we changed the max number of threads. + ThreadCounts counts = threadPoolInstance._separated.counts; + while (true) { - bool alreadyRemovedWorkingWorker = false; - while (TakeActiveRequest(threadPoolInstance)) + // Since this thread is currently registered as an existing thread, if more work comes in meanwhile, + // this thread would be expected to satisfy the new work. Ensure that NumExistingThreads is not + // decreased below NumProcessingWork, as that would be indicative of such a case. + if (counts.NumExistingThreads <= counts.NumProcessingWork) { - threadPoolInstance._separated.lastDequeueTime = Environment.TickCount; - if (!ThreadPoolWorkQueue.Dispatch()) - { - // ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have - // already removed this working worker in the counts. This typically happens when hill climbing - // decreases the worker thread count goal. - alreadyRemovedWorkingWorker = true; - break; - } - - if (threadPoolInstance._separated.numRequestedWorkers <= 0) - { - break; - } - - // In highly bursty cases with short bursts of work, especially in the portable thread pool - // implementation, worker threads are being released and entering Dispatch very quickly, not finding - // much work in Dispatch, and soon afterwards going back to Dispatch, causing extra thrashing on - // data and some interlocked operations, and similarly when the thread pool runs out of work. Since - // there is a pending request for work, introduce a slight delay before serving the next request. - // The spin-wait is mainly for when the sleep is not effective due to there being no other threads - // to schedule. - Thread.UninterruptibleSleep0(); - if (!Environment.IsSingleProcessor) - { - Thread.SpinWait(1); - } + // In this case, enough work came in that this thread should not time out and should go back to work. + break; } - // Don't spin-wait on the semaphore next time if the thread was actively stopped from processing work, - // as it's unlikely that the worker thread count goal would be increased again so soon afterwards that - // the semaphore would be released within the spin-wait window - spinWait = !alreadyRemovedWorkingWorker; - - if (!alreadyRemovedWorkingWorker) + ThreadCounts newCounts = counts; + short newNumExistingThreads = --newCounts.NumExistingThreads; + short newNumThreadsGoal = + Math.Max( + threadPoolInstance.MinThreadsGoal, + Math.Min(newNumExistingThreads, counts.NumThreadsGoal)); + newCounts.NumThreadsGoal = newNumThreadsGoal; + + ThreadCounts oldCounts = + threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts); + if (oldCounts == counts) { - // If we woke up but couldn't find a request, or ran out of work items to process, we need to update - // the number of working workers to reflect that we are done working for now - RemoveWorkingWorker(threadPoolInstance); - } - } - - // The thread cannot exit if it has IO pending, otherwise the IO may be canceled - if (IsIOPending) - { - continue; - } - - threadAdjustmentLock.Acquire(); - try - { - // At this point, the thread's wait timed out. We are shutting down this thread. - // We are going to decrement the number of existing threads to no longer include this one - // and then change the max number of threads in the thread pool to reflect that we don't need as many - // as we had. Finally, we are going to tell hill climbing that we changed the max number of threads. - ThreadCounts counts = threadPoolInstance._separated.counts; - while (true) - { - // Since this thread is currently registered as an existing thread, if more work comes in meanwhile, - // this thread would be expected to satisfy the new work. Ensure that NumExistingThreads is not - // decreased below NumProcessingWork, as that would be indicative of such a case. - if (counts.NumExistingThreads <= counts.NumProcessingWork) + HillClimbing.ThreadPoolHillClimber.ForceChange( + newNumThreadsGoal, + HillClimbing.StateOrTransition.ThreadTimedOut); + if (NativeRuntimeEventSource.Log.IsEnabled()) { - // In this case, enough work came in that this thread should not time out and should go back to work. - break; + NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads); } - - ThreadCounts newCounts = counts; - short newNumExistingThreads = --newCounts.NumExistingThreads; - short newNumThreadsGoal = - Math.Max( - threadPoolInstance.MinThreadsGoal, - Math.Min(newNumExistingThreads, counts.NumThreadsGoal)); - newCounts.NumThreadsGoal = newNumThreadsGoal; - - ThreadCounts oldCounts = - threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts); - if (oldCounts == counts) - { - HillClimbing.ThreadPoolHillClimber.ForceChange( - newNumThreadsGoal, - HillClimbing.StateOrTransition.ThreadTimedOut); - if (NativeRuntimeEventSource.Log.IsEnabled()) - { - NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads); - } - return; - } - - counts = oldCounts; + return true; } - } - finally - { - threadAdjustmentLock.Release(); + + counts = oldCounts; } } + finally + { + threadAdjustmentLock.Release(); + } + // if we get here new work came in and we're going to keep running + return false; } /// @@ -300,17 +268,6 @@ private static bool TakeActiveRequest(PortableThreadPool threadPoolInstance) } return false; } - - private static void CreateWorkerThread() - { - // Thread pool threads must start in the default execution context without transferring the context, so - // using UnsafeStart() instead of Start() - Thread workerThread = new Thread(s_workerThreadStart); - workerThread.IsThreadPoolThread = true; - workerThread.IsBackground = true; - // thread name will be set in thread proc - workerThread.UnsafeStart(); - } } } } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Portable.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Portable.cs index 39e1d6453263e7..a9c4e038129a48 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Portable.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Portable.cs @@ -19,7 +19,9 @@ public static partial class ThreadPool { // Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that // the runtime may use the thread for processing other work +#if !(TARGET_BROWSER && FEATURE_WASM_THREADS) internal static bool YieldFromDispatchLoop => false; +#endif #if NATIVEAOT private const bool IsWorkerTrackingEnabledInConfig = false; diff --git a/src/mono/System.Private.CoreLib/System.Private.CoreLib.csproj b/src/mono/System.Private.CoreLib/System.Private.CoreLib.csproj index 0e3a627d8f9613..d05b85bbaad79d 100644 --- a/src/mono/System.Private.CoreLib/System.Private.CoreLib.csproj +++ b/src/mono/System.Private.CoreLib/System.Private.CoreLib.csproj @@ -280,6 +280,11 @@ + + + + + diff --git a/src/mono/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.AsyncWait.Browser.Threads.Mono.cs b/src/mono/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.AsyncWait.Browser.Threads.Mono.cs new file mode 100644 index 00000000000000..d4a32a7604b64f --- /dev/null +++ b/src/mono/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.AsyncWait.Browser.Threads.Mono.cs @@ -0,0 +1,222 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +namespace System.Threading; + +// +// This class provides a way for browser threads to asynchronously wait for a semaphore +// from JS, without using the threadpool. It is used to implement threadpool workers. +// +internal sealed partial class LowLevelLifoSemaphore : IDisposable +{ + public static LowLevelLifoSemaphore CreateAsyncWaitSemaphore (int initialSignalCount, int maximumSignalCount, int spinCount, Action onWait) + { + return new LowLevelLifoSemaphore(initialSignalCount, maximumSignalCount, spinCount, onWait, asyncWait: true); + } + + private LowLevelLifoSemaphore(int initialSignalCount, int maximumSignalCount, int spinCount, Action onWait, bool asyncWait) + { + Debug.Assert(initialSignalCount >= 0); + Debug.Assert(initialSignalCount <= maximumSignalCount); + Debug.Assert(maximumSignalCount > 0); + Debug.Assert(spinCount >= 0); + Debug.Assert(asyncWait); + + _separated = default; + _separated._counts.SignalCount = (uint)initialSignalCount; + _maximumSignalCount = maximumSignalCount; + _spinCount = spinCount; + _onWait = onWait; + + CreateAsyncWait(maximumSignalCount); + } + +#pragma warning disable IDE0060 + private void CreateAsyncWait(int maximumSignalCount) + { + Kind = LifoSemaphoreKind.AsyncWait; + lifo_semaphore = InitInternal((int)Kind); + } +#pragma warning restore IDE0060 + + private sealed record WaitEntry (LowLevelLifoSemaphore Semaphore, int TimeoutMs, Action OnSuccess, Action OnTimeout, object? State); + + public void PrepareAsyncWait(int timeoutMs, Action onSuccess, Action onTimeout, object? state) + { + //FIXME(ak): the async wait never spins. Shoudl we spin a little? + Debug.Assert(timeoutMs >= -1); + ThrowIfInvalidSemaphoreKind(LifoSemaphoreKind.AsyncWait); + + // Try to acquire the semaphore or + // [[a) register as a spinner if false and timeoutMs > 0]] + // b) register as a waiter if [[there's already too many spinners or]] true and timeoutMs > 0 + // c) bail out if timeoutMs == 0 and return false + Counts counts = _separated._counts; + while (true) + { + Debug.Assert(counts.SignalCount <= _maximumSignalCount); + Counts newCounts = counts; + if (counts.SignalCount != 0) + { + newCounts.DecrementSignalCount(); + } + else if (timeoutMs != 0) + { + // Maximum number of spinners reached, register as a waiter instead + newCounts.IncrementWaiterCount(); + } + + Counts countsBeforeUpdate = _separated._counts.InterlockedCompareExchange(newCounts, counts); + if (countsBeforeUpdate == counts) + { + if (counts.SignalCount != 0) + { + onSuccess (this, state); + return; + } + if (newCounts.WaiterCount != counts.WaiterCount) + { + PrepareAsyncWaitForSignal(timeoutMs, onSuccess, onTimeout, state); + return; + } + if (timeoutMs == 0) + { + onTimeout (this, state); + return; + } + break; + } + + counts = countsBeforeUpdate; + } + + Debug.Fail("unreachable"); + +#if false + // Unregister as spinner, and acquire the semaphore or register as a waiter + counts = _separated._counts; + while (true) + { + Counts newCounts = counts; + if (counts.SignalCount != 0) + { + newCounts.DecrementSignalCount(); + } + else + { + newCounts.IncrementWaiterCount(); + } + + Counts countsBeforeUpdate = _separated._counts.InterlockedCompareExchange(newCounts, counts); + if (countsBeforeUpdate == counts) + { + return counts.SignalCount != 0 || WaitForSignal(timeoutMs); + } + + counts = countsBeforeUpdate; + } +#endif + } + + private void PrepareAsyncWaitForSignal(int timeoutMs, Action onSuccess, Action onTimeout, object? state) + { + Debug.Assert(timeoutMs > 0 || timeoutMs == -1); + + _onWait(); + + PrepareAsyncWaitCore(timeoutMs, new WaitEntry(this, timeoutMs, onSuccess, onTimeout, state)); + // on success calls InternalAsyncWaitSuccess, on timeout calls InternalAsyncWaitTimeout + } + + private static void InternalAsyncWaitTimeout(LowLevelLifoSemaphore self, WaitEntry internalWaitEntry) + { + WaitEntry we = internalWaitEntry!; + // Unregister the waiter. The wait subsystem used above guarantees that a thread that wakes due to a timeout does + // not observe a signal to the object being waited upon. + self._separated._counts.InterlockedDecrementWaiterCount(); + we.OnTimeout(self, we.State); + } + + private static void InternalAsyncWaitSuccess(LowLevelLifoSemaphore self, WaitEntry internalWaitEntry) + { + WaitEntry we = internalWaitEntry!; + // Unregister the waiter if this thread will not be waiting anymore, and try to acquire the semaphore + Counts counts = self._separated._counts; + while (true) + { + Debug.Assert(counts.WaiterCount != 0); + Counts newCounts = counts; + if (counts.SignalCount != 0) + { + newCounts.DecrementSignalCount(); + newCounts.DecrementWaiterCount(); + } + + // This waiter has woken up and this needs to be reflected in the count of waiters signaled to wake + if (counts.CountOfWaitersSignaledToWake != 0) + { + newCounts.DecrementCountOfWaitersSignaledToWake(); + } + + Counts countsBeforeUpdate = self._separated._counts.InterlockedCompareExchange(newCounts, counts); + if (countsBeforeUpdate == counts) + { + if (counts.SignalCount != 0) + { + we.OnSuccess(self, we.State); + return; + } + break; + } + + counts = countsBeforeUpdate; + } + // if we get here, we need to keep waiting because the SignalCount above was 0 after we did + // the CompareExchange - someone took the signal before us. + // FIXME(ak): why is the timeoutMs the same as before? wouldn't we starve? why does LowLevelLifoSemaphore.WaitForSignal not decrement timeoutMs? + self.PrepareAsyncWaitCore (we.TimeoutMs, we); + // on success calls InternalAsyncWaitSuccess, on timeout calls InternalAsyncWaitTimeout + } + + private void PrepareAsyncWaitCore(int timeout_ms, WaitEntry internalWaitEntry) + { + GCHandle gchandle = GCHandle.Alloc (internalWaitEntry); + unsafe { + delegate* unmanaged successCallback = &SuccessCallback; + delegate* unmanaged timeoutCallback = &TimeoutCallback; + PrepareAsyncWaitInternal (lifo_semaphore, timeout_ms, successCallback, timeoutCallback, GCHandle.ToIntPtr(gchandle)); + } + } + + [MethodImpl(MethodImplOptions.InternalCall)] + private static extern unsafe void PrepareAsyncWaitInternal(IntPtr semaphore, + int timeoutMs, + /*delegate* unmanaged successCallback*/ void* successCallback, + /*delegate* unmanaged timeoutCallback*/ void* timeoutCallback, + IntPtr userData); + + [UnmanagedCallersOnly] + private static void SuccessCallback(IntPtr lifoSemaphore, IntPtr userData) + { + GCHandle gchandle = GCHandle.FromIntPtr(userData); + WaitEntry internalWaitEntry = (WaitEntry)gchandle.Target!; + gchandle.Free(); + InternalAsyncWaitSuccess(internalWaitEntry.Semaphore, internalWaitEntry); + } + + [UnmanagedCallersOnly] + private static void TimeoutCallback(IntPtr lifoSemaphore, IntPtr userData) + { + GCHandle gchandle = GCHandle.FromIntPtr(userData); + WaitEntry internalWaitEntry = (WaitEntry)gchandle.Target!; + gchandle.Free(); + InternalAsyncWaitTimeout(internalWaitEntry.Semaphore, internalWaitEntry); + } + +} diff --git a/src/mono/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.Unix.Mono.cs b/src/mono/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.Unix.Mono.cs index 180f802ed84ca0..7064e3091913d2 100644 --- a/src/mono/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.Unix.Mono.cs +++ b/src/mono/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.Unix.Mono.cs @@ -8,14 +8,46 @@ namespace System.Threading internal sealed unsafe partial class LowLevelLifoSemaphore : IDisposable { private IntPtr lifo_semaphore; +#if TARGET_BROWSER && FEATURE_WASM_THREADS + private LifoSemaphoreKind _kind; +#endif + +#pragma warning disable CA1822 + private LifoSemaphoreKind Kind +#pragma warning restore CA1822 + { + get + { +#if TARGET_BROWSER && FEATURE_WASM_THREADS + return _kind; +#else + return LifoSemaphoreKind.Normal; +#endif + } + set + { +#if TARGET_BROWSER && FEATURE_WASM_THREADS + _kind = value; +#endif + } + } + + // Keep in sync with lifo-semaphore.h + private enum LifoSemaphoreKind : int { + Normal = 1, +#if TARGET_BROWSER && FEATURE_WASM_THREADS + AsyncWait = 2, +#endif + } [MethodImplAttribute(MethodImplOptions.InternalCall)] - private static extern IntPtr InitInternal(); + private static extern IntPtr InitInternal(int kind); #pragma warning disable IDE0060 private void Create(int maximumSignalCount) { - lifo_semaphore = InitInternal(); + Kind = LifoSemaphoreKind.Normal; + lifo_semaphore = InitInternal((int)Kind); } #pragma warning restore IDE0060 @@ -26,6 +58,7 @@ public void Dispose() { DeleteInternal(lifo_semaphore); lifo_semaphore = IntPtr.Zero; + Kind = (LifoSemaphoreKind)0; } [MethodImplAttribute(MethodImplOptions.InternalCall)] @@ -33,9 +66,20 @@ public void Dispose() private bool WaitCore(int timeoutMs) { + ThrowIfInvalidSemaphoreKind(LifoSemaphoreKind.Normal); return TimedWaitInternal(lifo_semaphore, timeoutMs) != 0; } +#pragma warning disable CA1822 + private void ThrowIfInvalidSemaphoreKind(LifoSemaphoreKind expected) +#pragma warning restore CA1822 + { +#if TARGET_BROWSER && FEATURE_WASM_THREADS + if (_kind != expected) + throw new InvalidOperationException ($"Unexpected LowLevelLifoSemaphore kind {_kind} expected {expected}"); +#endif + } + [MethodImplAttribute(MethodImplOptions.InternalCall)] private static extern void ReleaseInternal(IntPtr semaphore, int count); diff --git a/src/mono/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Browser.Threads.Mono.cs b/src/mono/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Browser.Threads.Mono.cs new file mode 100644 index 00000000000000..d459c992f810f3 --- /dev/null +++ b/src/mono/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Browser.Threads.Mono.cs @@ -0,0 +1,19 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Threading; + +internal sealed partial class PortableThreadPool +{ + private static partial class WorkerThread + { + private static bool IsIOPending => WebWorkerEventLoop.HasUnsettledInteropPromises; + } + + private struct CpuUtilizationReader + { +#pragma warning disable CA1822 + public double CurrentUtilization => 0.0; // FIXME: can we do better +#pragma warning restore CA1822 + } +} diff --git a/src/mono/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.Browser.Threads.Mono.cs b/src/mono/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.Browser.Threads.Mono.cs new file mode 100644 index 00000000000000..52f7b03e699c66 --- /dev/null +++ b/src/mono/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.Browser.Threads.Mono.cs @@ -0,0 +1,122 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Diagnostics.Tracing; +using System.Runtime.CompilerServices; + +namespace System.Threading +{ + internal sealed partial class PortableThreadPool + { + /// + /// The worker thread infastructure for the CLR thread pool. + /// + private static partial class WorkerThread + { + /// + /// Semaphore for controlling how many threads are currently working. + /// + private static readonly LowLevelLifoSemaphore s_semaphore = + LowLevelLifoSemaphore.CreateAsyncWaitSemaphore( + 0, + MaxPossibleThreadCount, + AppContextConfigHelper.GetInt32Config( + "System.Threading.ThreadPool.UnfairSemaphoreSpinLimit", + SemaphoreSpinCountDefault, + false), + onWait: () => + { + if (NativeRuntimeEventSource.Log.IsEnabled()) + { + NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadWait( + (uint)ThreadPoolInstance._separated.counts.VolatileRead().NumExistingThreads); + } + }); + + private static readonly ThreadStart s_workerThreadStart = WorkerThreadStart; + + private sealed record SemaphoreWaitState(PortableThreadPool ThreadPoolInstance, LowLevelLock ThreadAdjustmentLock, WebWorkerEventLoop.KeepaliveToken KeepaliveToken) + { + public bool SpinWait = true; + + public void ResetIteration() { + SpinWait = true; + } + } + + private static void WorkerThreadStart() + { + Thread.CurrentThread.SetThreadPoolWorkerThreadName(); + + PortableThreadPool threadPoolInstance = ThreadPoolInstance; + + if (NativeRuntimeEventSource.Log.IsEnabled()) + { + NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStart( + (uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads); + } + + LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock; + var keepaliveToken = WebWorkerEventLoop.KeepalivePush(); + SemaphoreWaitState state = new(threadPoolInstance, threadAdjustmentLock, keepaliveToken) { SpinWait = true }; + // set up the callbacks for semaphore waits, tell + // emscripten to keep the thread alive, and return to + // the JS event loop. + WaitForWorkLoop(s_semaphore, state); + // return from thread start with keepalive - the thread will stay alive in the JS event loop + } + + private static readonly Action s_WorkLoopSemaphoreSuccess = new(WorkLoopSemaphoreSuccess); + private static readonly Action s_WorkLoopSemaphoreTimedOut = new(WorkLoopSemaphoreTimedOut); + + private static void WaitForWorkLoop(LowLevelLifoSemaphore semaphore, SemaphoreWaitState state) + { + semaphore.PrepareAsyncWait(ThreadPoolThreadTimeoutMs, s_WorkLoopSemaphoreSuccess, s_WorkLoopSemaphoreTimedOut, state); + // thread should still be kept alive + Debug.Assert(state.KeepaliveToken.Valid); + } + + private static void WorkLoopSemaphoreSuccess(LowLevelLifoSemaphore semaphore, object? stateObject) + { + SemaphoreWaitState state = (SemaphoreWaitState)stateObject!; + WorkerDoWork(state.ThreadPoolInstance, ref state.SpinWait); + // Go around the loop one more time, keeping existing mutated state + WaitForWorkLoop(semaphore, state); + } + + private static void WorkLoopSemaphoreTimedOut(LowLevelLifoSemaphore semaphore, object? stateObject) + { + SemaphoreWaitState state = (SemaphoreWaitState)stateObject!; + if (WorkerTimedOutMaybeStop(state.ThreadPoolInstance, state.ThreadAdjustmentLock)) { + // we're done, kill the thread. + + // we're wrapped in an emscripten eventloop handler which will consult the + // keepalive count, destroy the thread and run the TLS dtor which will + // unregister the thread from Mono + state.KeepaliveToken.Pop(); + return; + } else { + // more work showed up while we were shutting down, go around one more time + state.ResetIteration(); + WaitForWorkLoop(semaphore, state); + } + } + + private static void CreateWorkerThread() + { + // Thread pool threads must start in the default execution context without transferring the context, so + // using captureContext: false. + Thread workerThread = new Thread(s_workerThreadStart); + workerThread.IsThreadPoolThread = true; + workerThread.IsBackground = true; + // thread name will be set in thread proc + + // This thread will return to the JS event loop - tell the runtime not to cleanup + // after the start function returns, if the Emscripten keepalive is non-zero. + WebWorkerEventLoop.StartExitable(workerThread, captureContext: false); + } + } + } +} diff --git a/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Threads.Mono.cs b/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Threads.Mono.cs new file mode 100644 index 00000000000000..7933e49db422b9 --- /dev/null +++ b/src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Threads.Mono.cs @@ -0,0 +1,13 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Threading +{ + public static partial class ThreadPool + { + // Indicates that the threadpool should yield the thread from the dispatch loop to the + // runtime periodically. We use this to return back to the JS event loop so that the JS + // event queue can be drained + internal static bool YieldFromDispatchLoop => true; + } +} diff --git a/src/mono/System.Private.CoreLib/src/System/Threading/WebWorkerEventLoop.Browser.Threads.Mono.cs b/src/mono/System.Private.CoreLib/src/System/Threading/WebWorkerEventLoop.Browser.Threads.Mono.cs new file mode 100644 index 00000000000000..0467107bbbb1c5 --- /dev/null +++ b/src/mono/System.Private.CoreLib/src/System/Threading/WebWorkerEventLoop.Browser.Threads.Mono.cs @@ -0,0 +1,83 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics.CodeAnalysis; +using System.Diagnostics.Tracing; +using System.Runtime.CompilerServices; + +namespace System.Threading; + +/// +/// Keep a pthread alive in its WebWorker after its pthread start function returns. +/// +internal static class WebWorkerEventLoop +{ + // FIXME: these keepalive calls could be qcalls with a SuppressGCTransitionAttribute + [MethodImpl(MethodImplOptions.InternalCall)] + private static extern void KeepalivePushInternal(); + [MethodImpl(MethodImplOptions.InternalCall)] + private static extern void KeepalivePopInternal(); + + /// + /// A keepalive token prevents a thread from shutting down even if it returns to the JS event + /// loop. A thread may want a keepalive token if it needs to allow JS code to run to settle JS + /// promises or execute JS timeout callbacks. + /// + internal sealed class KeepaliveToken + { + public bool Valid {get; private set; } + + private KeepaliveToken() { Valid = true; } + + /// + /// Decrement the Emscripten keepalive count. A thread with a zero keepalive count will + /// terminate when it returns from its start function or from an async invocation from the + /// JS event loop. + /// + internal void Pop() { + if (!Valid) + throw new InvalidOperationException(); + Valid = false; + KeepalivePopInternal(); + } + + internal static KeepaliveToken Create() + { + KeepalivePushInternal(); + return new KeepaliveToken(); + } + } + + /// + /// Increment the Emscripten keepalive count. A thread with a positive keepalive can return from its + /// thread start function or a JS event loop invocation and continue running in the JS event + /// loop. + /// + internal static KeepaliveToken KeepalivePush() => KeepaliveToken.Create(); + + /// + /// Start a thread that may be kept alive on its webworker after the start function returns, + /// if the emscripten keepalive count is positive. Once the thread returns to the JS event + /// loop it will be able to settle JS promises as well as run any queued managed async + /// callbacks. + /// + internal static void StartExitable(Thread thread, bool captureContext) + { + // don't support captureContext == true, for now, since it's + // not needed by PortableThreadPool.WorkerThread + if (captureContext) + throw new InvalidOperationException(); + // hack: threadpool threads are exitable, and nothing else is. + // see create_thread() in mono/metadata/threads.c + if (!thread.IsThreadPoolThread) + throw new InvalidOperationException(); + thread.UnsafeStart(); + } + + /// returns true if the current thread has unsettled JS Interop promises + internal static bool HasUnsettledInteropPromises => HasUnsettledInteropPromisesNative(); + + // FIXME: this could be a qcall with a SuppressGCTransitionAttribute + [MethodImpl(MethodImplOptions.InternalCall)] + private static extern bool HasUnsettledInteropPromisesNative(); +} diff --git a/src/mono/mono/metadata/icall-decl.h b/src/mono/mono/metadata/icall-decl.h index 5947224866181c..32ebf5cac03957 100644 --- a/src/mono/mono/metadata/icall-decl.h +++ b/src/mono/mono/metadata/icall-decl.h @@ -179,11 +179,20 @@ ICALL_EXPORT void ves_icall_Mono_SafeStringMarshal_GFree (void *c_str); ICALL_EXPORT char* ves_icall_Mono_SafeStringMarshal_StringToUtf8 (MonoString *volatile* s); ICALL_EXPORT MonoType* ves_icall_Mono_RuntimeClassHandle_GetTypeFromClass (MonoClass *klass); -ICALL_EXPORT gpointer ves_icall_System_Threading_LowLevelLifoSemaphore_InitInternal (void); +ICALL_EXPORT gpointer ves_icall_System_Threading_LowLevelLifoSemaphore_InitInternal (int32_t kind); ICALL_EXPORT void ves_icall_System_Threading_LowLevelLifoSemaphore_DeleteInternal (gpointer sem_ptr); ICALL_EXPORT gint32 ves_icall_System_Threading_LowLevelLifoSemaphore_TimedWaitInternal (gpointer sem_ptr, gint32 timeout_ms); ICALL_EXPORT void ves_icall_System_Threading_LowLevelLifoSemaphore_ReleaseInternal (gpointer sem_ptr, gint32 count); +/* include these declarations if we're in the threaded wasm runtime, or if we're building a wasm-targeting cross compiler and we need to support --print-icall-table */ +#if (defined(HOST_BROWSER) && !defined(DISABLE_THREADS)) || (defined(TARGET_WASM) && defined(ENABLE_ICALL_SYMBOL_MAP)) +ICALL_EXPORT void ves_icall_System_Threading_LowLevelLifoSemaphore_PrepareAsyncWaitInternal (gpointer sem_ptr, gint32 timeout_ms, gpointer success_cb, gpointer timeout_cb, intptr_t user_data); + +ICALL_EXPORT MonoBoolean ves_icall_System_Threading_WebWorkerEventLoop_HasUnsettledInteropPromisesNative(void); +ICALL_EXPORT void ves_icall_System_Threading_WebWorkerEventLoop_KeepalivePushInternal (void); +ICALL_EXPORT void ves_icall_System_Threading_WebWorkerEventLoop_KeepalivePopInternal (void); +#endif + #ifdef TARGET_AMD64 ICALL_EXPORT void ves_icall_System_Runtime_Intrinsics_X86_X86Base___cpuidex (int abcd[4], int function_id, int subfunction_id); #endif diff --git a/src/mono/mono/metadata/icall-def.h b/src/mono/mono/metadata/icall-def.h index 33e6de4f92780c..3edbaf63058545 100644 --- a/src/mono/mono/metadata/icall-def.h +++ b/src/mono/mono/metadata/icall-def.h @@ -570,9 +570,14 @@ NOHANDLES(ICALL(ILOCK_23, "Read(long&)", ves_icall_System_Threading_Interlocked_ ICALL_TYPE(LIFOSEM, "System.Threading.LowLevelLifoSemaphore", LIFOSEM_1) NOHANDLES(ICALL(LIFOSEM_1, "DeleteInternal", ves_icall_System_Threading_LowLevelLifoSemaphore_DeleteInternal)) NOHANDLES(ICALL(LIFOSEM_2, "InitInternal", ves_icall_System_Threading_LowLevelLifoSemaphore_InitInternal)) +/* include these icalls if we're in the threaded wasm runtime, or if we're building a wasm-targeting cross compiler and we need to support --print-icall-table */ +#if (defined(HOST_BROWSER) && !defined(DISABLE_THREADS)) || (defined(TARGET_WASM) && defined(ENABLE_ICALL_SYMBOL_MAP)) +NOHANDLES(ICALL(LIFOSEM_5, "PrepareAsyncWaitInternal", ves_icall_System_Threading_LowLevelLifoSemaphore_PrepareAsyncWaitInternal)) +#endif NOHANDLES(ICALL(LIFOSEM_3, "ReleaseInternal", ves_icall_System_Threading_LowLevelLifoSemaphore_ReleaseInternal)) NOHANDLES(ICALL(LIFOSEM_4, "TimedWaitInternal", ves_icall_System_Threading_LowLevelLifoSemaphore_TimedWaitInternal)) + ICALL_TYPE(MONIT, "System.Threading.Monitor", MONIT_0) HANDLES(MONIT_0, "Enter", ves_icall_System_Threading_Monitor_Monitor_Enter, void, 1, (MonoObject)) HANDLES(MONIT_1, "InternalExit", mono_monitor_exit_icall, void, 1, (MonoObject)) @@ -597,6 +602,14 @@ HANDLES(THREAD_10, "SetState", ves_icall_System_Threading_Thread_SetState, void, HANDLES(THREAD_13, "StartInternal", ves_icall_System_Threading_Thread_StartInternal, void, 2, (MonoThreadObject, gint32)) NOHANDLES(ICALL(THREAD_14, "YieldInternal", ves_icall_System_Threading_Thread_YieldInternal)) +/* include these icalls if we're in the threaded wasm runtime, or if we're building a wasm-targeting cross compiler and we need to support --print-icall-table */ +#if (defined(HOST_BROWSER) && !defined(DISABLE_THREADS)) || (defined(TARGET_WASM) && defined(ENABLE_ICALL_SYMBOL_MAP)) +ICALL_TYPE(WEBWORKERLOOP, "System.Threading.WebWorkerEventLoop", WEBWORKERLOOP_1) +NOHANDLES(ICALL(WEBWORKERLOOP_1, "HasUnsettledInteropPromisesNative", ves_icall_System_Threading_WebWorkerEventLoop_HasUnsettledInteropPromisesNative)) +NOHANDLES(ICALL(WEBWORKERLOOP_2, "KeepalivePopInternal", ves_icall_System_Threading_WebWorkerEventLoop_KeepalivePopInternal)) +NOHANDLES(ICALL(WEBWORKERLOOP_3, "KeepalivePushInternal", ves_icall_System_Threading_WebWorkerEventLoop_KeepalivePushInternal)) +#endif + ICALL_TYPE(TYPE, "System.Type", TYPE_1) HANDLES(TYPE_1, "internal_from_handle", ves_icall_System_Type_internal_from_handle, MonoReflectionType, 1, (MonoType_ref)) diff --git a/src/mono/mono/metadata/threads-types.h b/src/mono/mono/metadata/threads-types.h index fe57d74a02e393..b9652aa33eb321 100644 --- a/src/mono/mono/metadata/threads-types.h +++ b/src/mono/mono/metadata/threads-types.h @@ -78,6 +78,9 @@ typedef enum { MONO_THREAD_CREATE_FLAGS_DEBUGGER = 0x02, MONO_THREAD_CREATE_FLAGS_FORCE_CREATE = 0x04, MONO_THREAD_CREATE_FLAGS_SMALL_STACK = 0x08, +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) + MONO_THREAD_CREATE_FLAGS_RETURNS_TO_JS_EVENT_LOOP = 0x10, +#endif } MonoThreadCreateFlags; MONO_COMPONENT_API MonoInternalThread* diff --git a/src/mono/mono/metadata/threads.c b/src/mono/mono/metadata/threads.c index a6bb38f55fea8c..869b8484c473c5 100644 --- a/src/mono/mono/metadata/threads.c +++ b/src/mono/mono/metadata/threads.c @@ -91,6 +91,11 @@ mono_native_thread_join_handle (HANDLE thread_handle, gboolean close_handle); #include #endif +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) +#include +#include +#endif + #include "icall-decl.h" /*#define THREAD_DEBUG(a) do { a; } while (0)*/ @@ -1083,6 +1088,9 @@ typedef struct { MonoThreadStart start_func; gpointer start_func_arg; gboolean force_attach; +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) + gboolean returns_to_js_event_loop; +#endif gboolean failed; MonoCoopSem registered; } StartInfo; @@ -1110,6 +1118,12 @@ fire_attach_profiler_events (MonoNativeThreadId tid) "Handle Stack")); } + +#ifdef MONO_EMSCRIPTEN_KEEPALIVE_WORKAROUND_HACK +/* See ves_icall_System_Threading_WebWorkerEventLoop_KeepalivePopInternal */ +__thread uint mono_emscripten_keepalive_hack_count; +#endif + static guint32 WINAPI start_wrapper_internal (StartInfo *start_info, gsize *stack_ptr) { @@ -1167,6 +1181,10 @@ start_wrapper_internal (StartInfo *start_info, gsize *stack_ptr) /* Let the thread that called Start() know we're ready */ mono_coop_sem_post (&start_info->registered); +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) + gboolean returns_to_js_event_loop = start_info->returns_to_js_event_loop; +#endif + if (mono_atomic_dec_i32 (&start_info->ref) == 0) { mono_coop_sem_destroy (&start_info->registered); g_free (start_info); @@ -1234,6 +1252,20 @@ start_wrapper_internal (StartInfo *start_info, gsize *stack_ptr) THREAD_DEBUG (g_message ("%s: (%" G_GSIZE_FORMAT ") Start wrapper terminating", __func__, mono_native_thread_id_get ())); +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) + if (returns_to_js_event_loop) { + /* if the thread wants to stay alive, don't clean up after it */ +#ifdef MONO_EMSCRIPTEN_KEEPALIVE_WORKAROUND_HACK + /* we "know" that threadpool threads set their keepalive count correctly and will return here */ + g_assert (mono_emscripten_keepalive_hack_count > 0); + return 0; +#else + if (emscripten_runtime_keepalive_check()) + return 0; +#endif + } +#endif + /* Do any cleanup needed for apartment state. This * cannot be done in mono_thread_detach_internal since * mono_thread_detach_internal could be called for a @@ -1260,9 +1292,27 @@ start_wrapper (gpointer data) info = mono_thread_info_attach (); info->runtime_thread = TRUE; +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) + gboolean returns_to_js_event_loop = start_info->returns_to_js_event_loop; +#endif /* Run the actual main function of the thread */ res = start_wrapper_internal (start_info, (gsize*)info->stack_end); +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) + if (returns_to_js_event_loop) { + /* if the thread wants to stay alive, don't clean up after it */ +#ifdef MONO_EMSCRIPTEN_KEEPALIVE_WORKAROUND_HACK + /* we "know" the keepalive count is positive at this point for threadpool threads. Keep it alive */ + g_assert (mono_emscripten_keepalive_hack_count > 0); + emscripten_unwind_to_js_event_loop (); + g_assert_not_reached(); +#else + if (emscripten_runtime_keepalive_check()) + return 0; +#endif + } +#endif + mono_thread_info_exit (res); g_assert_not_reached (); @@ -1349,6 +1399,9 @@ create_thread (MonoThread *thread, MonoInternalThread *internal, MonoThreadStart start_info->start_func_arg = start_func_arg; start_info->force_attach = flags & MONO_THREAD_CREATE_FLAGS_FORCE_CREATE; start_info->failed = FALSE; +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) + start_info->returns_to_js_event_loop = (flags & MONO_THREAD_CREATE_FLAGS_RETURNS_TO_JS_EVENT_LOOP) != 0; +#endif mono_coop_sem_init (&start_info->registered, 0); if (flags != MONO_THREAD_CREATE_FLAGS_SMALL_STACK) @@ -4907,7 +4960,15 @@ ves_icall_System_Threading_Thread_StartInternal (MonoThreadObjectHandle thread_h return; } - res = create_thread (internal, internal, NULL, NULL, stack_size, MONO_THREAD_CREATE_FLAGS_NONE, error); + MonoThreadCreateFlags create_flags = MONO_THREAD_CREATE_FLAGS_NONE; +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) + // HACK: threadpool threads can return to the JS event loop + // WISH: support this for other threads, too + if (internal->threadpool_thread) + create_flags |= MONO_THREAD_CREATE_FLAGS_RETURNS_TO_JS_EVENT_LOOP; +#endif + + res = create_thread (internal, internal, NULL, NULL, stack_size, create_flags, error); if (!res) { UNLOCK_THREAD (internal); return; @@ -4938,28 +4999,142 @@ ves_icall_System_Threading_Thread_GetCurrentOSThreadId (MonoError *error) } gpointer -ves_icall_System_Threading_LowLevelLifoSemaphore_InitInternal (void) -{ - return (gpointer)mono_lifo_semaphore_init (); +ves_icall_System_Threading_LowLevelLifoSemaphore_InitInternal (int32_t kind) +{ + switch (kind) { + case LIFO_SEMAPHORE_NORMAL: + return (gpointer)mono_lifo_semaphore_init (); +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) + case LIFO_SEMAPHORE_ASYNCWAIT: + return (gpointer)mono_lifo_semaphore_asyncwait_init (); +#endif + default: + g_error ("Invalid LowLevelLifoSemaphore kind %d\n", kind); + g_assert_not_reached(); + } } void ves_icall_System_Threading_LowLevelLifoSemaphore_DeleteInternal (gpointer sem_ptr) { - LifoSemaphore *sem = (LifoSemaphore *)sem_ptr; - mono_lifo_semaphore_delete (sem); + LifoSemaphoreBase *sem = (LifoSemaphoreBase *)sem_ptr; + switch (sem->kind) { + case LIFO_SEMAPHORE_NORMAL: + mono_lifo_semaphore_delete ((LifoSemaphore*)sem); + break; +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) + case LIFO_SEMAPHORE_ASYNCWAIT: + mono_lifo_semaphore_asyncwait_delete ((LifoSemaphoreAsyncWait*)sem); + break; +#endif + default: + g_assert_not_reached(); + } } gint32 ves_icall_System_Threading_LowLevelLifoSemaphore_TimedWaitInternal (gpointer sem_ptr, gint32 timeout_ms) { LifoSemaphore *sem = (LifoSemaphore *)sem_ptr; + g_assert (sem->base.kind == LIFO_SEMAPHORE_NORMAL); return mono_lifo_semaphore_timed_wait (sem, timeout_ms); } void ves_icall_System_Threading_LowLevelLifoSemaphore_ReleaseInternal (gpointer sem_ptr, gint32 count) { - LifoSemaphore *sem = (LifoSemaphore *)sem_ptr; - mono_lifo_semaphore_release (sem, count); + LifoSemaphoreBase *sem = (LifoSemaphoreBase *)sem_ptr; + switch (sem->kind) { + case LIFO_SEMAPHORE_NORMAL: + mono_lifo_semaphore_release ((LifoSemaphore*)sem, count); + break; +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) + case LIFO_SEMAPHORE_ASYNCWAIT: + mono_lifo_semaphore_asyncwait_release ((LifoSemaphoreAsyncWait*)sem, count); + break; +#endif + default: + g_assert_not_reached(); + } +} + +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) +void +ves_icall_System_Threading_LowLevelLifoSemaphore_PrepareAsyncWaitInternal (gpointer sem_ptr, gint32 timeout_ms, gpointer success_cb, gpointer timedout_cb, intptr_t user_data) +{ + LifoSemaphoreAsyncWait *sem = (LifoSemaphoreAsyncWait *)sem_ptr; + g_assert (sem->base.kind == LIFO_SEMAPHORE_ASYNCWAIT); + mono_lifo_semaphore_asyncwait_prepare_wait (sem, timeout_ms, (LifoSemaphoreAsyncWaitCallbackFn)success_cb, (LifoSemaphoreAsyncWaitCallbackFn)timedout_cb, user_data); +} + +void +ves_icall_System_Threading_WebWorkerEventLoop_KeepalivePushInternal (void) +{ +#ifdef MONO_EMSCRIPTEN_KEEPALIVE_WORKAROUND_HACK + mono_emscripten_keepalive_hack_count++; +#endif + emscripten_runtime_keepalive_push(); +} + +void +ves_icall_System_Threading_WebWorkerEventLoop_KeepalivePopInternal (void) +{ + emscripten_runtime_keepalive_pop(); +#ifdef MONO_EMSCRIPTEN_KEEPALIVE_WORKAROUND_HACK + /* This is a BAD IDEA: + * + * 1. We don't know if there were non-mono callers of emscripten_runtime_keepalive_push. We + * could be dropping a thread that was meant to stay alive. + * + * 2. mono_thread_exit while we have managed frames on the stack means we might leak + * resource since finally clauses didn't run. Also the mono interpreter doesn't really get + * a chance to clean up. + * + * + */ + mono_emscripten_keepalive_hack_count--; + if (!mono_emscripten_keepalive_hack_count) { + g_warning ("thread %p mono keepalive count is zero, detaching\n", (void*)(intptr_t)pthread_self()); + mono_thread_exit(); + } +#endif +} + +extern int mono_wasm_eventloop_has_unsettled_interop_promises(void); + +MonoBoolean +ves_icall_System_Threading_WebWorkerEventLoop_HasUnsettledInteropPromisesNative(void) +{ + return !!mono_wasm_eventloop_has_unsettled_interop_promises(); } + +#endif /* HOST_BROWSER && !DISABLE_THREADS */ + +/* for the AOT cross compiler with --print-icall-table these don't need to be callable, they just + * need to be defined */ +#if defined(TARGET_WASM) && defined(ENABLE_ICALL_SYMBOL_MAP) +void +ves_icall_System_Threading_LowLevelLifoSemaphore_PrepareAsyncWaitInternal (gpointer sem_ptr, gint32 timeout_ms, gpointer success_cb, gpointer timedout_cb, intptr_t user_data) +{ + g_assert_not_reached(); +} + +void +ves_icall_System_Threading_WebWorkerEventLoop_KeepalivePushInternal (void) +{ + g_assert_not_reached(); +} + +void +ves_icall_System_Threading_WebWorkerEventLoop_KeepalivePopInternal (void) +{ + g_assert_not_reached(); +} + +MonoBoolean +ves_icall_System_Threading_WebWorkerEventLoop_HasUnsettledInteropPromisesNative(void) +{ + g_assert_not_reached(); +} + +#endif /* defined(TARGET_WASM) && defined(ENABLE_ICALL_SYMBOL_MAP) */ diff --git a/src/mono/mono/utils/lifo-semaphore.c b/src/mono/mono/utils/lifo-semaphore.c index 624e2bb3b74e68..4333782c3d90be 100644 --- a/src/mono/mono/utils/lifo-semaphore.c +++ b/src/mono/mono/utils/lifo-semaphore.c @@ -1,13 +1,21 @@ +#include +#include #include +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) +#include +#include +#endif + LifoSemaphore * mono_lifo_semaphore_init (void) { LifoSemaphore *semaphore = g_new0 (LifoSemaphore, 1); + semaphore->base.kind = LIFO_SEMAPHORE_NORMAL; if (semaphore == NULL) return NULL; - mono_coop_mutex_init (&semaphore->mutex); + mono_coop_mutex_init (&semaphore->base.mutex); return semaphore; } @@ -16,7 +24,7 @@ void mono_lifo_semaphore_delete (LifoSemaphore *semaphore) { g_assert (semaphore->head == NULL); - mono_coop_mutex_destroy (&semaphore->mutex); + mono_coop_mutex_destroy (&semaphore->base.mutex); g_free (semaphore); } @@ -26,12 +34,12 @@ mono_lifo_semaphore_timed_wait (LifoSemaphore *semaphore, int32_t timeout_ms) LifoSemaphoreWaitEntry wait_entry = {0}; mono_coop_cond_init (&wait_entry.condition); - mono_coop_mutex_lock (&semaphore->mutex); + mono_coop_mutex_lock (&semaphore->base.mutex); - if (semaphore->pending_signals > 0) { - --semaphore->pending_signals; + if (semaphore->base.pending_signals > 0) { + --semaphore->base.pending_signals; mono_coop_cond_destroy (&wait_entry.condition); - mono_coop_mutex_unlock (&semaphore->mutex); + mono_coop_mutex_unlock (&semaphore->base.mutex); return 1; } @@ -45,7 +53,7 @@ mono_lifo_semaphore_timed_wait (LifoSemaphore *semaphore, int32_t timeout_ms) // Wait for a signal or timeout int wait_error = 0; do { - wait_error = mono_coop_cond_timedwait (&wait_entry.condition, &semaphore->mutex, timeout_ms); + wait_error = mono_coop_cond_timedwait (&wait_entry.condition, &semaphore->base.mutex, timeout_ms); } while (wait_error == 0 && !wait_entry.signaled); if (wait_error == -1) { @@ -58,7 +66,7 @@ mono_lifo_semaphore_timed_wait (LifoSemaphore *semaphore, int32_t timeout_ms) } mono_coop_cond_destroy (&wait_entry.condition); - mono_coop_mutex_unlock (&semaphore->mutex); + mono_coop_mutex_unlock (&semaphore->base.mutex); return wait_entry.signaled; } @@ -66,7 +74,7 @@ mono_lifo_semaphore_timed_wait (LifoSemaphore *semaphore, int32_t timeout_ms) void mono_lifo_semaphore_release (LifoSemaphore *semaphore, uint32_t count) { - mono_coop_mutex_lock (&semaphore->mutex); + mono_coop_mutex_lock (&semaphore->base.mutex); while (count > 0) { LifoSemaphoreWaitEntry *wait_entry = semaphore->head; @@ -80,10 +88,238 @@ mono_lifo_semaphore_release (LifoSemaphore *semaphore, uint32_t count) mono_coop_cond_signal (&wait_entry->condition); --count; } else { - semaphore->pending_signals += count; + semaphore->base.pending_signals += count; + count = 0; + } + } + + mono_coop_mutex_unlock (&semaphore->base.mutex); +} + +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) + +LifoSemaphoreAsyncWait * +mono_lifo_semaphore_asyncwait_init (void) +{ + LifoSemaphoreAsyncWait *sem = g_new0 (LifoSemaphoreAsyncWait, 1); + if (sem == NULL) + return NULL; + sem->base.kind = LIFO_SEMAPHORE_ASYNCWAIT; + + mono_coop_mutex_init (&sem->base.mutex); + + return sem; +} + +void +mono_lifo_semaphore_asyncwait_delete (LifoSemaphoreAsyncWait *sem) +{ + /* FIXME: this is probably hard to guarantee - in-flight signaled semaphores still have wait entries */ + g_assert (sem->head == NULL); + mono_coop_mutex_destroy (&sem->base.mutex); + g_free (sem); +} + +enum { + LIFO_JS_WAITING = 0, + LIFO_JS_SIGNALED = 1, + LIFO_JS_SIGNALED_TIMEOUT_IGNORED = 2, + +}; + +static void +lifo_js_wait_entry_on_timeout (void *wait_entry_as_user_data); +static void +lifo_js_wait_entry_on_success (void *wait_entry_as_user_data); + + +static void +lifo_js_wait_entry_push (LifoSemaphoreAsyncWaitWaitEntry **head, + LifoSemaphoreAsyncWaitWaitEntry *entry) +{ + LifoSemaphoreAsyncWaitWaitEntry *next = *head; + *head = entry; + entry->next = next; + next->previous = entry; +} + +static void +lifo_js_wait_entry_unlink (LifoSemaphoreAsyncWaitWaitEntry **head, + LifoSemaphoreAsyncWaitWaitEntry *entry) +{ + if (*head == entry) { + *head = entry->next; + } + if (entry->previous) { + entry->previous->next = entry->next; + } + if (entry->next) { + entry->next->previous = entry->previous; + } +} + +/* LOCKING: assumes semaphore is locked */ +static LifoSemaphoreAsyncWaitWaitEntry * +lifo_js_find_waiter (LifoSemaphoreAsyncWaitWaitEntry *entry) +{ + while (entry) { + if (entry->state == LIFO_JS_WAITING) + return entry; + entry = entry->next; + } + return NULL; +} + +static gboolean +lifo_js_wait_entry_no_thread (LifoSemaphoreAsyncWaitWaitEntry *entry, + pthread_t cur) +{ + while (entry) { + if (pthread_equal (entry->thread, cur)) + return FALSE; + entry = entry->next; + } + return TRUE; +} + +void +mono_lifo_semaphore_asyncwait_prepare_wait (LifoSemaphoreAsyncWait *sem, + int32_t timeout_ms, + LifoSemaphoreAsyncWaitCallbackFn success_cb, + LifoSemaphoreAsyncWaitCallbackFn timeout_cb, + intptr_t user_data) +{ + mono_coop_mutex_lock (&sem->base.mutex); + if (sem->base.pending_signals > 0) { + sem->base.pending_signals--; + mono_coop_mutex_unlock (&sem->base.mutex); + success_cb (sem, user_data); // FIXME: queue microtask + return; + } + + pthread_t cur = pthread_self (); + + /* Don't allow the current thread to wait multiple times. + * No particular reason for it, except that it makes reasoning a bit easier. + * This can probably be relaxed if there's a need. + */ + g_assert (lifo_js_wait_entry_no_thread(sem->head, cur)); + + LifoSemaphoreAsyncWaitWaitEntry *wait_entry = g_new0 (LifoSemaphoreAsyncWaitWaitEntry, 1); + wait_entry->success_cb = success_cb; + wait_entry->timeout_cb = timeout_cb; + wait_entry->sem = sem; + wait_entry->user_data = user_data; + wait_entry->thread = pthread_self(); + wait_entry->state = LIFO_JS_WAITING; + wait_entry->refcount = 1; // timeout owns the wait entry + wait_entry->js_timeout_id = emscripten_set_timeout (lifo_js_wait_entry_on_timeout, (double)timeout_ms, wait_entry); + lifo_js_wait_entry_push (&sem->head, wait_entry); + mono_coop_mutex_unlock (&sem->base.mutex); + return; +} + +void +mono_lifo_semaphore_asyncwait_release (LifoSemaphoreAsyncWait *sem, + uint32_t count) +{ + mono_coop_mutex_lock (&sem->base.mutex); + + while (count > 0) { + LifoSemaphoreAsyncWaitWaitEntry *wait_entry = lifo_js_find_waiter (sem->head); + if (wait_entry != NULL) { + /* found one. set its status and queue some work to run on the signaled thread */ + pthread_t target = wait_entry->thread; + wait_entry->state = LIFO_JS_SIGNALED; + wait_entry->refcount++; + // we're under the mutex - if we got here the timeout hasn't fired yet + g_assert (wait_entry->refcount == 2); + --count; + /* if we're on the same thread, don't run the callback while holding the lock */ + emscripten_dispatch_to_thread_async (target, EM_FUNC_SIG_VI, lifo_js_wait_entry_on_success, NULL, wait_entry); + } else { + sem->base.pending_signals += count; count = 0; } } - mono_coop_mutex_unlock (&semaphore->mutex); + mono_coop_mutex_unlock (&sem->base.mutex); +} + +static void +lifo_js_wait_entry_on_timeout (void *wait_entry_as_user_data) +{ + LifoSemaphoreAsyncWaitWaitEntry *wait_entry = (LifoSemaphoreAsyncWaitWaitEntry *)wait_entry_as_user_data; + g_assert (pthread_equal (wait_entry->thread, pthread_self())); + g_assert (wait_entry->sem != NULL); + LifoSemaphoreAsyncWait *sem = wait_entry->sem; + gboolean call_timeout_cb = FALSE; + LifoSemaphoreAsyncWaitCallbackFn timeout_cb = NULL; + intptr_t user_data = 0; + mono_coop_mutex_lock (&sem->base.mutex); + switch (wait_entry->state) { + case LIFO_JS_WAITING: + /* semaphore timed out before a Release. */ + g_assert (wait_entry->refcount == 1); + /* unlink and free the wait entry, run the user timeout_cb. */ + lifo_js_wait_entry_unlink (&sem->head, wait_entry); + timeout_cb = wait_entry->timeout_cb; + user_data = wait_entry->user_data; + g_free (wait_entry); + call_timeout_cb = TRUE; + break; + case LIFO_JS_SIGNALED: + /* seamphore was signaled, but the timeout callback ran before the success callback arrived */ + g_assert (wait_entry->refcount == 2); + /* set state to LIFO_JS_SIGNALED_TIMEOUT_IGNORED, decrement refcount, return */ + wait_entry->state = LIFO_JS_SIGNALED_TIMEOUT_IGNORED; + wait_entry->refcount--; + break; + case LIFO_JS_SIGNALED_TIMEOUT_IGNORED: + default: + g_assert_not_reached(); + } + mono_coop_mutex_unlock (&sem->base.mutex); + if (call_timeout_cb) { + timeout_cb (sem, user_data); + } +} + +static void +lifo_js_wait_entry_on_success (void *wait_entry_as_user_data) +{ + LifoSemaphoreAsyncWaitWaitEntry *wait_entry = (LifoSemaphoreAsyncWaitWaitEntry *)wait_entry_as_user_data; + g_assert (pthread_equal (wait_entry->thread, pthread_self())); + g_assert (wait_entry->sem != NULL); + LifoSemaphoreAsyncWait *sem = wait_entry->sem; + gboolean call_success_cb = FALSE; + LifoSemaphoreAsyncWaitCallbackFn success_cb = NULL; + intptr_t user_data = 0; + mono_coop_mutex_lock (&sem->base.mutex); + switch (wait_entry->state) { + case LIFO_JS_SIGNALED: + g_assert (wait_entry->refcount == 2); + emscripten_clear_timeout (wait_entry->js_timeout_id); + /* emscripten safeSetTimeout calls keepalive push which is popped by the timeout + * callback. If we cancel the timeout, we have to pop the keepalive ourselves. */ + emscripten_runtime_keepalive_pop(); + wait_entry->refcount--; + /* fallthru */ + case LIFO_JS_SIGNALED_TIMEOUT_IGNORED: + g_assert (wait_entry->refcount == 1); + lifo_js_wait_entry_unlink (&sem->head, wait_entry); + success_cb = wait_entry->success_cb; + user_data = wait_entry->user_data; + g_free (wait_entry); + call_success_cb = TRUE; + break; + case LIFO_JS_WAITING: + default: + g_assert_not_reached(); + } + mono_coop_mutex_unlock (&sem->base.mutex); + g_assert (call_success_cb); + success_cb (sem, user_data); } + +#endif /* HOST_BROWSER && !DISABLE_THREADS */ diff --git a/src/mono/mono/utils/lifo-semaphore.h b/src/mono/mono/utils/lifo-semaphore.h index 766f41aaaab6d4..a97a560e281161 100644 --- a/src/mono/mono/utils/lifo-semaphore.h +++ b/src/mono/mono/utils/lifo-semaphore.h @@ -3,6 +3,22 @@ #include +typedef struct _LifoSemaphoreBase LifoSemaphoreBase; + +struct _LifoSemaphoreBase +{ + MonoCoopMutex mutex; + uint32_t pending_signals; + uint8_t kind; +}; + +enum { + LIFO_SEMAPHORE_NORMAL = 1, +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) + LIFO_SEMAPHORE_ASYNCWAIT, +#endif +}; + typedef struct _LifoSemaphore LifoSemaphore; typedef struct _LifoSemaphoreWaitEntry LifoSemaphoreWaitEntry; @@ -14,9 +30,8 @@ struct _LifoSemaphoreWaitEntry { }; struct _LifoSemaphore { - MonoCoopMutex mutex; + LifoSemaphoreBase base; LifoSemaphoreWaitEntry *head; - uint32_t pending_signals; }; LifoSemaphore * @@ -31,4 +46,94 @@ mono_lifo_semaphore_timed_wait (LifoSemaphore *semaphore, int32_t timeout_ms); void mono_lifo_semaphore_release (LifoSemaphore *semaphore, uint32_t count); +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) +/* A type of lifo semaphore that can be waited from the JS event loop. + * + * Instead of a blocking timed_wait function, it uses a pair of callbacks: a success callback and a + * timeout callback. The wait function returns immediately and the callbacks will fire on the JS + * event loop when the semaphore is released or the timeout expires. + */ +typedef struct _LifoSemaphoreAsyncWait LifoSemaphoreAsyncWait; +/* + * Because the callbacks are asynchronous, it's possible for the same thread to attempt to wait + * multiple times for the same semaphore. For simplicity of reasoning, we dissallow that and + * assert. In principle we could support it, but we haven't implemented that. + */ +typedef struct _LifoSemaphoreAsyncWaitWaitEntry LifoSemaphoreAsyncWaitWaitEntry; + +typedef void (*LifoSemaphoreAsyncWaitCallbackFn)(LifoSemaphoreAsyncWait *semaphore, intptr_t user_data); + +struct _LifoSemaphoreAsyncWaitWaitEntry { + LifoSemaphoreAsyncWaitWaitEntry *previous; + LifoSemaphoreAsyncWaitWaitEntry *next; + LifoSemaphoreAsyncWaitCallbackFn success_cb; + LifoSemaphoreAsyncWaitCallbackFn timeout_cb; + LifoSemaphoreAsyncWait *sem; + intptr_t user_data; + pthread_t thread; + int32_t js_timeout_id; // only valid to access from the waiting thread + /* state and refcount are protected by the semaphore mutex */ + uint16_t state; /* 0 waiting, 1 signaled, 2 signaled - timeout ignored */ + uint16_t refcount; /* 1 if waiting, 2 if signaled, 1 if timeout fired while signaled and we're ignoring the timeout */ +}; + +struct _LifoSemaphoreAsyncWait { + LifoSemaphoreBase base; + LifoSemaphoreAsyncWaitWaitEntry *head; +}; + +LifoSemaphoreAsyncWait * +mono_lifo_semaphore_asyncwait_init (void); + +/* what to do with waiters? + * might be kind of academic - we don't expect to destroy these + */ +void +mono_lifo_semaphore_asyncwait_delete (LifoSemaphoreAsyncWait *semaphore); + +/* + * the timeout_cb is triggered by a JS setTimeout callback + * + * the success_cb is triggered using Emscripten's capability to push async work from one thread to + * another. That means the main thread will need to be able to process JS events (in order to + * assist threads in pushing work from one thread to another) in order for success callbacks to + * function. Emscripten also pumps the async work queues in other circumstances (during sleeps) but + * the main thread still needs to participate. + * + * There's a potential race the implementation needs to be careful about: + * when one thread releases a semaphore and queues the success callback to run, + * while the success callback is in flight, the timeout callback can fire. + * It is important that the called back functions don't destroy the wait entry until either both + * callbacks have fired, or the success callback has a chance to cancel the timeout callback. + * + * We use a refcount to delimit the lifetime of the wait entry. When the wait is created, the + * refcount is 1 and it is notionally owned by the timeout callback. When a sempahore is released, + * the refcount goes to 2. When a continuation fires, it decreases the refcount. If the timeout + * callback fires first if it sees a refcount of 2 it can decrement and return - we know a success + * continuation is in flight and we can allow it to complete. If the refcount is 1 we need to take the semaphore's mutex and remove the wait entry. (With double check locking - the refcount could go up). + * + * When the success continuation fires,it will examine the refcount. If the refcount is 1 at the + * outset, then the cancelation already tried to fire while we were in flight. If the refcount is 2 + * at the outset, then the success contination fired before the timeout, so we can cancel the + * timeout. In either case we can remove the wait entry. + * + * Both the success and timeout code only calls the user provided callbacks after the wait entry is + * destroyed. + * + * FIXME: should we just always use the mutex to protect the wait entry status+refcount? + * + * TODO: when we call emscripten_set_timeout it implicitly calls emscripten_runtime_keepalive_push which is + * popped when the timeout runs. But emscripten_clear_timeout doesn't pop - we need to pop ourselves + */ +void +mono_lifo_semaphore_asyncwait_prepare_wait (LifoSemaphoreAsyncWait *semaphore, int32_t timeout_ms, + LifoSemaphoreAsyncWaitCallbackFn success_cb, + LifoSemaphoreAsyncWaitCallbackFn timeout_cb, + intptr_t user_data); + +void +mono_lifo_semaphore_asyncwait_release (LifoSemaphoreAsyncWait *semaphore, uint32_t count); + +#endif /* HOST_BROWSER && !DISABLE_THREADS */ + #endif // __MONO_LIFO_SEMAPHORE_H__ diff --git a/src/mono/mono/utils/mono-threads-wasm.c b/src/mono/mono/utils/mono-threads-wasm.c index 74cc69825f29fa..96e5446388a43a 100644 --- a/src/mono/mono/utils/mono-threads-wasm.c +++ b/src/mono/mono/utils/mono-threads-wasm.c @@ -403,7 +403,7 @@ mono_threads_wasm_browser_thread_tid (void) #ifdef DISABLE_THREADS return (MonoNativeThreadId)1; #else - return (MonoNativeThreadId)emscripten_main_browser_thread_id (); + return (MonoNativeThreadId)emscripten_main_runtime_thread_id (); #endif } diff --git a/src/mono/mono/utils/mono-threads-wasm.h b/src/mono/mono/utils/mono-threads-wasm.h index c06d8501e1ec3e..b76606045ecb8f 100644 --- a/src/mono/mono/utils/mono-threads-wasm.h +++ b/src/mono/mono/utils/mono-threads-wasm.h @@ -11,6 +11,20 @@ #include #include +#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS) +#include +/* for Emscripten < 3.1.33, + * emscripten_runtime_keepalive_push()/emscripten_runtime_keepalive_pop()/emscripten_keepalive_check() + * are no-ops when -sNO_EXIT_RUNTIME=1 (the default). Do our own bookkeeping when we can. Note + * that this is a HACK that is very sensitive to code that actually cares about this bookkeeping. + * + * Specifically we need https://github.com/emscripten-core/emscripten/commit/0c2f5896b839e25fee9763a9ac9c619f359988f4 + */ +#if (__EMSCRIPTEN_major__ < 3) || (__EMSCRIPTEN_major__ == 3 && __EMSCRIPTEN_minor__ < 1) || (__EMSCRIPTEN_major__ == 3 && __EMSCRIPTEN_minor__ == 1 && __EMSCRIPTEN_tiny__ < 33) +#define MONO_EMSCRIPTEN_KEEPALIVE_WORKAROUND_HACK 1 +#endif +#endif /*HOST_BROWSER && !DISABLE_THREADS*/ + #ifdef HOST_WASM /* diff --git a/src/mono/sample/wasm/browser-threads-minimal/Program.cs b/src/mono/sample/wasm/browser-threads-minimal/Program.cs index 0b9784836bbd78..b9ef3854ef1631 100644 --- a/src/mono/sample/wasm/browser-threads-minimal/Program.cs +++ b/src/mono/sample/wasm/browser-threads-minimal/Program.cs @@ -18,6 +18,61 @@ public static int Main(string[] args) return 0; } + [JSImport("globalThis.setTimeout")] + static partial void GlobalThisSetTimeout([JSMarshalAs] Action cb, int timeoutMs); + + [JSImport("globalThis.fetch")] + private static partial Task GlobalThisFetch(string url); + + [JSExport] + public static async Task Hello() + { + var t = Task.Run(TimeOutThenComplete); + await t; + Console.WriteLine ($"XYZ: Main Thread caught task tid:{Thread.CurrentThread.ManagedThreadId}"); + } + + const string fetchhelper = "./fetchelper.js"; + + [JSImport("responseText", fetchhelper)] + private static partial Task FetchHelperResponseText(JSObject response); + + [JSExport] + public static async Task FetchBackground(string url) + { + var t = Task.Run(async () => + { + await JSHost.ImportAsync(fetchhelper, "./fetchhelper.js"); + var r = await GlobalThisFetch(url); + var ok = (bool)r.GetPropertyAsBoolean("ok"); + + Console.WriteLine($"XYZ: FetchBackground fetch returned to thread:{Thread.CurrentThread.ManagedThreadId}, ok: {ok}"); + if (ok) + { + var text = await FetchHelperResponseText(r); + Console.WriteLine($"XYZ: FetchBackground fetch returned to thread:{Thread.CurrentThread.ManagedThreadId}, text: {text}"); + return text; + } + return "not-ok"; + }); + var r = await t; + Console.WriteLine($"XYZ: FetchBackground thread:{Thread.CurrentThread.ManagedThreadId} background thread returned"); + return r; + } + + private static async Task TimeOutThenComplete() + { + var tcs = new TaskCompletionSource(); + Console.WriteLine ($"XYZ: Task running tid:{Thread.CurrentThread.ManagedThreadId}"); + GlobalThisSetTimeout(() => { + tcs.SetResult(); + Console.WriteLine ($"XYZ: Timeout fired tid:{Thread.CurrentThread.ManagedThreadId}"); + }, 250); + Console.WriteLine ($"XYZ: Task sleeping tid:{Thread.CurrentThread.ManagedThreadId}"); + await tcs.Task; + Console.WriteLine ($"XYZ: Task resumed tid:{Thread.CurrentThread.ManagedThreadId}"); + } + [JSExport] public static async Task RunBackgroundThreadCompute() { @@ -41,10 +96,27 @@ public static async Task RunBackgroundLongRunningTaskCompute() return await t; } + [JSExport] + public static async Task RunBackgroundTaskRunCompute() + { + var t1 = Task.Run (() => { + var n = CountingCollatzTest(); + return n; + }); + var t2 = Task.Run (() => { + var n = CountingCollatzTest(); + return n; + }); + var rs = await Task.WhenAll (new [] { t1, t2 }); + if (rs[0] != rs[1]) + throw new Exception ($"Results from two tasks {rs[0]}, {rs[1]}, differ"); + return rs[0]; + } + public static int CountingCollatzTest() { const int limit = 5000; - const int maxInput = 500_000; + const int maxInput = 200_000; int bigly = 0; int hugely = 0; int maxSteps = 0; @@ -60,7 +132,7 @@ public static int CountingCollatzTest() Console.WriteLine ($"Bigly: {bigly}, Hugely: {hugely}, maxSteps: {maxSteps}"); - if (bigly == 241677 && hugely == 0 && maxSteps == 448) + if (bigly == 86187 && hugely == 0 && maxSteps == 382) return 524; else return 0; diff --git a/src/mono/sample/wasm/browser-threads-minimal/Wasm.Browser.Threads.Minimal.Sample.csproj b/src/mono/sample/wasm/browser-threads-minimal/Wasm.Browser.Threads.Minimal.Sample.csproj index f9c81f4b40e714..defce7521ac7f5 100644 --- a/src/mono/sample/wasm/browser-threads-minimal/Wasm.Browser.Threads.Minimal.Sample.csproj +++ b/src/mono/sample/wasm/browser-threads-minimal/Wasm.Browser.Threads.Minimal.Sample.csproj @@ -6,6 +6,8 @@ + + diff --git a/src/mono/sample/wasm/browser-threads-minimal/blurst.txt b/src/mono/sample/wasm/browser-threads-minimal/blurst.txt new file mode 100644 index 00000000000000..6679d914da1c75 --- /dev/null +++ b/src/mono/sample/wasm/browser-threads-minimal/blurst.txt @@ -0,0 +1 @@ +It was the best of times, it was the blurst of times. diff --git a/src/mono/sample/wasm/browser-threads-minimal/fetchhelper.js b/src/mono/sample/wasm/browser-threads-minimal/fetchhelper.js new file mode 100644 index 00000000000000..928492378fc6c4 --- /dev/null +++ b/src/mono/sample/wasm/browser-threads-minimal/fetchhelper.js @@ -0,0 +1,11 @@ + +function delay(timeoutMs) { + return new Promise(resolve => setTimeout(resolve, timeoutMs)); +} + +export async function responseText(response) /* Promise */ { + console.log("artificially waiting for response for 25 seconds"); + await delay(25000); + console.log("artificial waiting done"); + return await response.text(); +} diff --git a/src/mono/sample/wasm/browser-threads-minimal/main.js b/src/mono/sample/wasm/browser-threads-minimal/main.js index f607d96c2846ab..4c6a28e9160523 100644 --- a/src/mono/sample/wasm/browser-threads-minimal/main.js +++ b/src/mono/sample/wasm/browser-threads-minimal/main.js @@ -15,18 +15,37 @@ try { const exports = await getAssemblyExports(assemblyName); - const r1 = await exports.Sample.Test.RunBackgroundThreadCompute(); - if (r1 !== 524) { - const msg = `Unexpected result ${r1} from RunBackgroundThreadCompute()`; + //console.log ("XYZ: running hello"); + //await exports.Sample.Test.Hello(); + //console.log ("XYZ: hello done"); + + console.log ("XYZ: running FetchBackground"); + let s = await exports.Sample.Test.FetchBackground("./blurst.txt"); + console.log ("XYZ: FetchBackground done"); + if (s !== "It was the best of times, it was the blurst of times.\n") { + const msg = `Unexpected FetchBackground result ${s}`; document.getElementById("out").innerHTML = msg; - throw new Error(msg); + throw new Error (msg); } - const r2 = await exports.Sample.Test.RunBackgroundLongRunningTaskCompute(); - if (r2 !== 524) { - const msg = `Unexpected result ${r2} from RunBackgorundLongRunningTaskCompute()`; + + console.log ("XYZ: running FetchBackground(missing)"); + s = await exports.Sample.Test.FetchBackground("./missing.txt"); + console.log ("XYZ: FetchBackground(missing) done"); + if (s !== "not-ok") { + const msg = `Unexpected FetchBackground(missing) result ${s}`; document.getElementById("out").innerHTML = msg; - throw new Error(msg); + throw new Error (msg); } + + //console.log ("HHH: running TaskRunCompute"); + //const r1 = await exports.Sample.Test.RunBackgroundTaskRunCompute(); + //if (r1 !== 524) { + // const msg = `Unexpected result ${r1} from RunBackgorundTaskRunCompute()`; + // document.getElementById("out").innerHTML = msg; + // throw new Error(msg); + //} + //console.log ("HHH: TaskRunCompute done"); + let exit_code = await runMain(assemblyName, []); exit(exit_code); diff --git a/src/mono/wasm/Wasm.Build.Tests/data/test-main-7.0.js b/src/mono/wasm/Wasm.Build.Tests/data/test-main-7.0.js index bac3f856160c57..3b32add2f39e57 100644 --- a/src/mono/wasm/Wasm.Build.Tests/data/test-main-7.0.js +++ b/src/mono/wasm/Wasm.Build.Tests/data/test-main-7.0.js @@ -23,6 +23,16 @@ if (is_node && process.versions.node.split(".")[0] < 14) { throw new Error(`NodeJS at '${process.execPath}' has too low version '${process.versions.node}'`); } +if (is_node) { + // the emscripten 3.1.34 stopped handling these when MODULARIZE is enabled + process.on('uncaughtException', function(ex) { + // ignore UnhandledPromiseRejection exceptions with exit status + if (ex !== 'unwind' && (ex.name !== "UnhandledPromiseRejection" || !ex.message.includes('"#"'))) { + throw ex; + } + }); +} + if (typeof globalThis.crypto === 'undefined') { // **NOTE** this is a simple insecure polyfill for testing purposes only // /dev/random doesn't work on js shells, so define our own diff --git a/src/mono/wasm/emscripten-version.txt b/src/mono/wasm/emscripten-version.txt index f4e47c2e5e20a5..45dd3925308664 100644 --- a/src/mono/wasm/emscripten-version.txt +++ b/src/mono/wasm/emscripten-version.txt @@ -1 +1 @@ -3.1.30 \ No newline at end of file +3.1.34 \ No newline at end of file diff --git a/src/mono/wasm/runtime/es6/dotnet.es6.lib.js b/src/mono/wasm/runtime/es6/dotnet.es6.lib.js index 4e446e8dce62b4..d2fe6874dc69e3 100644 --- a/src/mono/wasm/runtime/es6/dotnet.es6.lib.js +++ b/src/mono/wasm/runtime/es6/dotnet.es6.lib.js @@ -105,6 +105,8 @@ if (monoWasmThreads) { linked_functions = [...linked_functions, /// mono-threads-wasm.c "mono_wasm_pthread_on_pthread_attached", + // threads.c + "mono_wasm_eventloop_has_unsettled_interop_promises", // diagnostics_server.c "mono_wasm_diagnostic_server_on_server_thread_created", "mono_wasm_diagnostic_server_on_runtime_server_init", diff --git a/src/mono/wasm/runtime/exports-linker.ts b/src/mono/wasm/runtime/exports-linker.ts index f5f55de63f128e..0cd5e1def740b8 100644 --- a/src/mono/wasm/runtime/exports-linker.ts +++ b/src/mono/wasm/runtime/exports-linker.ts @@ -4,7 +4,7 @@ import MonoWasmThreads from "consts:monoWasmThreads"; import WasmEnableLegacyJsInterop from "consts:WasmEnableLegacyJsInterop"; import { mono_wasm_debugger_log, mono_wasm_add_dbg_command_received, mono_wasm_set_entrypoint_breakpoint, mono_wasm_fire_debugger_agent_message_with_data, mono_wasm_fire_debugger_agent_message_with_data_to_pause } from "./debug"; -import { mono_wasm_release_cs_owned_object } from "./gc-handles"; +import { mono_wasm_release_cs_owned_object, mono_wasm_eventloop_has_unsettled_interop_promises } from "./gc-handles"; import { mono_wasm_bind_cs_function } from "./invoke-cs"; import { mono_wasm_bind_js_function, mono_wasm_invoke_bound_function, mono_wasm_invoke_import } from "./invoke-js"; import { mono_interp_tier_prepare_jiterpreter } from "./jiterpreter"; @@ -32,6 +32,8 @@ import { const mono_wasm_threads_exports = !MonoWasmThreads ? undefined : { // mono-threads-wasm.c mono_wasm_pthread_on_pthread_attached, + // threads.c + mono_wasm_eventloop_has_unsettled_interop_promises, // diagnostics_server.c mono_wasm_diagnostic_server_on_server_thread_created, mono_wasm_diagnostic_server_on_runtime_server_init, diff --git a/src/mono/wasm/runtime/gc-handles.ts b/src/mono/wasm/runtime/gc-handles.ts index 465bfc9264c078..8cdd824e046686 100644 --- a/src/mono/wasm/runtime/gc-handles.ts +++ b/src/mono/wasm/runtime/gc-handles.ts @@ -49,8 +49,8 @@ export function mono_wasm_get_js_handle(js_obj: any): JSHandle { js_obj[cs_owned_js_handle_symbol] = js_handle; } // else - // The consequence of not adding the cs_owned_js_handle_symbol is, that we could have multiple JSHandles and multiple proxy instances. - // Throwing exception would prevent us from creating any proxy of non-extensible things. + // The consequence of not adding the cs_owned_js_handle_symbol is, that we could have multiple JSHandles and multiple proxy instances. + // Throwing exception would prevent us from creating any proxy of non-extensible things. // If we have weakmap instead, we would pay the price of the lookup for all proxies, not just non-extensible objects. return js_handle as JSHandle; @@ -131,3 +131,9 @@ export function _lookup_js_owned_object(gc_handle: GCHandle): any { } return null; } + +/// Called from the C# threadpool worker loop to find out if there are any +/// unsettled JS promises that need to keep the worker alive +export function mono_wasm_eventloop_has_unsettled_interop_promises(): boolean { + return _js_owned_object_table.size > 0; +} diff --git a/src/mono/wasm/runtime/pthreads/shared/index.ts b/src/mono/wasm/runtime/pthreads/shared/index.ts index c71a27f5f3967d..774dbc76ec8af4 100644 --- a/src/mono/wasm/runtime/pthreads/shared/index.ts +++ b/src/mono/wasm/runtime/pthreads/shared/index.ts @@ -20,7 +20,7 @@ export const MainThread: PThreadInfo = { let browser_thread_id_lazy: pthread_ptr | undefined; export function getBrowserThreadID(): pthread_ptr { if (browser_thread_id_lazy === undefined) { - browser_thread_id_lazy = (Module)["_emscripten_main_browser_thread_id"]() as pthread_ptr; + browser_thread_id_lazy = (Module)["_emscripten_main_runtime_thread_id"]() as pthread_ptr; } return browser_thread_id_lazy; } diff --git a/src/mono/wasm/runtime/pthreads/worker/index.ts b/src/mono/wasm/runtime/pthreads/worker/index.ts index da4c780804c55b..71a6f3bcfbe7ea 100644 --- a/src/mono/wasm/runtime/pthreads/worker/index.ts +++ b/src/mono/wasm/runtime/pthreads/worker/index.ts @@ -114,7 +114,7 @@ function onMonoConfigReceived(config: MonoConfigInternal): void { export function mono_wasm_pthread_on_pthread_attached(pthread_id: pthread_ptr): void { const self = pthread_self; mono_assert(self !== null && self.pthread_id == pthread_id, "expected pthread_self to be set already when attaching"); - console.debug("MONO_WASM: attaching pthread to runtime", pthread_id); + console.debug("MONO_WASM: attaching pthread to runtime 0x" + pthread_id.toString(16)); preRunWorker(); currentWorkerThreadEvents.dispatchEvent(makeWorkerThreadEvent(dotnetPthreadAttached, self)); } @@ -127,7 +127,7 @@ export function afterThreadInitTLS(): void { if (ENVIRONMENT_IS_PTHREAD) { const pthread_ptr = (Module)["_pthread_self"](); mono_assert(!is_nullish(pthread_ptr), "pthread_self() returned null"); - console.debug("MONO_WASM: after thread init, pthread ptr", pthread_ptr); + console.debug("MONO_WASM: after thread init, pthread ptr 0x" + pthread_ptr.toString(16)); const self = setupChannelToMainThread(pthread_ptr); currentWorkerThreadEvents.dispatchEvent(makeWorkerThreadEvent(dotnetPthreadCreated, self)); } diff --git a/src/mono/wasm/runtime/startup.ts b/src/mono/wasm/runtime/startup.ts index d1a76cf7f5a78a..8d807991363b82 100644 --- a/src/mono/wasm/runtime/startup.ts +++ b/src/mono/wasm/runtime/startup.ts @@ -748,7 +748,7 @@ export async function mono_wasm_pthread_worker_init(module: DotnetModule, export pthreads_worker.setupPreloadChannelToMainThread(); // This is a good place for subsystems to attach listeners for pthreads_worker.currentWorkerThreadEvents pthreads_worker.currentWorkerThreadEvents.addEventListener(pthreads_worker.dotnetPthreadCreated, (ev) => { - console.debug("MONO_WASM: pthread created", ev.pthread_self.pthread_id); + console.debug("MONO_WASM: pthread created 0x" + ev.pthread_self.pthread_id.toString(16)); }); // this is the only event which is called on worker diff --git a/src/mono/wasm/test-main.js b/src/mono/wasm/test-main.js index 60c54734929629..3d835c0bb61407 100644 --- a/src/mono/wasm/test-main.js +++ b/src/mono/wasm/test-main.js @@ -23,6 +23,16 @@ if (is_node && process.versions.node.split(".")[0] < 14) { throw new Error(`NodeJS at '${process.execPath}' has too low version '${process.versions.node}'`); } +if (is_node) { + // the emscripten 3.1.34 stopped handling these when MODULARIZE is enabled + process.on('uncaughtException', function(ex) { + // ignore UnhandledPromiseRejection exceptions with exit status + if (ex !== 'unwind' && (ex.name !== "UnhandledPromiseRejection" || !ex.message.includes('"#"'))) { + throw ex; + } + }); +} + if (!is_node && !is_browser && typeof globalThis.crypto === 'undefined') { // **NOTE** this is a simple insecure polyfill for testing purposes only // /dev/random doesn't work on js shells, so define our own diff --git a/src/mono/wasm/wasm.proj b/src/mono/wasm/wasm.proj index 9755e405afed42..8342ee12786ebb 100644 --- a/src/mono/wasm/wasm.proj +++ b/src/mono/wasm/wasm.proj @@ -293,7 +293,6 @@ <_EmccLinkFlags Include="-s EXPORTED_RUNTIME_METHODS=$(_EmccExportedRuntimeMethods)" /> <_EmccLinkFlags Include="-s EXPORTED_FUNCTIONS=$(_EmccExportedFunctions)" /> <_EmccLinkFlags Include="--source-map-base http://example.com" /> - <_EmccLinkFlags Include="-s STRICT_JS=1" /> <_EmccLinkFlags Include="-s WASM_BIGINT=1" /> <_EmccLinkFlags Include="-s EXPORT_NAME="'createDotnetRuntime'"" /> <_EmccLinkFlags Include="-s MODULARIZE=1"/>