Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fix thread pool hang
- In #53471 the thread count goal was moved out of `ThreadCounts`, it turns out that are a few subtle races that it was avoiding. There are other ways to fix it, but I've added the goal back into `ThreadCounts` for now.
- Reverted PR #55985, which worked around the issue in the CI

Fixes #55642
  • Loading branch information
Koundinya Veluri committed Jul 27, 2021
commit 2d0aa6341044bb5595818918663c63793d129b06
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public short MinThreadsGoal
get
{
_threadAdjustmentLock.VerifyIsLocked();
return Math.Min(_separated.numThreadsGoal, TargetThreadsGoalForBlockingAdjustment);
return Math.Min(_separated.counts.NumThreadsGoal, TargetThreadsGoalForBlockingAdjustment);
}
}

Expand Down Expand Up @@ -44,7 +44,7 @@ public bool NotifyThreadBlocked()
Debug.Assert(_numBlockedThreads > 0);

if (_pendingBlockingAdjustment != PendingBlockingAdjustment.WithDelayIfNecessary &&
_separated.numThreadsGoal < TargetThreadsGoalForBlockingAdjustment)
_separated.counts.NumThreadsGoal < TargetThreadsGoalForBlockingAdjustment)
{
if (_pendingBlockingAdjustment == PendingBlockingAdjustment.None)
{
Expand Down Expand Up @@ -79,7 +79,7 @@ public void NotifyThreadUnblocked()

if (_pendingBlockingAdjustment != PendingBlockingAdjustment.Immediately &&
_numThreadsAddedDueToBlocking > 0 &&
_separated.numThreadsGoal > TargetThreadsGoalForBlockingAdjustment)
_separated.counts.NumThreadsGoal > TargetThreadsGoalForBlockingAdjustment)
{
wakeGateThread = true;
_pendingBlockingAdjustment = PendingBlockingAdjustment.Immediately;
Expand Down Expand Up @@ -126,7 +126,8 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo
addWorker = false;

short targetThreadsGoal = TargetThreadsGoalForBlockingAdjustment;
short numThreadsGoal = _separated.numThreadsGoal;
ThreadCounts counts = _separated.counts;
short numThreadsGoal = counts.NumThreadsGoal;
if (numThreadsGoal == targetThreadsGoal)
{
return 0;
Expand All @@ -144,7 +145,8 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo

short toSubtract = Math.Min((short)(numThreadsGoal - targetThreadsGoal), _numThreadsAddedDueToBlocking);
_numThreadsAddedDueToBlocking -= toSubtract;
_separated.numThreadsGoal = numThreadsGoal -= toSubtract;
numThreadsGoal -= toSubtract;
_separated.counts.InterlockedSetNumThreadsGoal(numThreadsGoal);
HillClimbing.ThreadPoolHillClimber.ForceChange(
numThreadsGoal,
HillClimbing.StateOrTransition.CooperativeBlocking);
Expand All @@ -158,7 +160,6 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo
{
// Calculate how many threads can be added without a delay. Threads that were already created but may be just
// waiting for work can be released for work without a delay, but creating a new thread may need a delay.
ThreadCounts counts = _separated.counts;
short maxThreadsGoalWithoutDelay =
Math.Max(configuredMaxThreadsWithoutDelay, Math.Min(counts.NumExistingThreads, _maxThreads));
short targetThreadsGoalWithoutDelay = Math.Min(targetThreadsGoal, maxThreadsGoalWithoutDelay);
Expand Down Expand Up @@ -225,7 +226,7 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo
} while (false);

_numThreadsAddedDueToBlocking += (short)(newNumThreadsGoal - numThreadsGoal);
_separated.numThreadsGoal = newNumThreadsGoal;
counts = _separated.counts.InterlockedSetNumThreadsGoal(newNumThreadsGoal);
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.CooperativeBlocking);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,20 +126,31 @@ private static void GateThreadStart()
// of the number of existing threads, is compared with the goal. There may be alternative
// solutions, for now this is only to maintain consistency in behavior.
ThreadCounts counts = threadPoolInstance._separated.counts;
if (counts.NumProcessingWork < threadPoolInstance._maxThreads &&
counts.NumProcessingWork >= threadPoolInstance._separated.numThreadsGoal)
while (
counts.NumProcessingWork < threadPoolInstance._maxThreads &&
counts.NumProcessingWork >= counts.NumThreadsGoal)
{
if (debuggerBreakOnWorkStarvation)
{
Debugger.Break();
}

ThreadCounts newCounts = counts;
short newNumThreadsGoal = (short)(counts.NumProcessingWork + 1);
threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal;
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.Starvation);
addWorker = true;
newCounts.NumThreadsGoal = newNumThreadsGoal;

ThreadCounts countsBeforeUpdate =
threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (countsBeforeUpdate == counts)
{
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.Starvation);
addWorker = true;
break;
}

counts = countsBeforeUpdate;
}
}
finally
Expand Down Expand Up @@ -183,7 +194,7 @@ private static bool SufficientDelaySinceLastDequeue(PortableThreadPool threadPoo
}
else
{
minimumDelay = (uint)threadPoolInstance._separated.numThreadsGoal * DequeueDelayThresholdMs;
minimumDelay = (uint)threadPoolInstance._separated.counts.NumThreadsGoal * DequeueDelayThresholdMs;
}

return delay > minimumDelay;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ private struct ThreadCounts
// SOS's ThreadPool command depends on this layout
private const byte NumProcessingWorkShift = 0;
private const byte NumExistingThreadsShift = 16;
private const byte NumThreadsGoalShift = 32;

private uint _data; // SOS's ThreadPool command depends on this name
private ulong _data; // SOS's ThreadPool command depends on this name

private ThreadCounts(uint data) => _data = data;
private ThreadCounts(ulong data) => _data = data;

private short GetInt16Value(byte shift) => (short)(_data >> shift);
private void SetInt16Value(short value, byte shift) =>
_data = (_data & ~((uint)ushort.MaxValue << shift)) | ((uint)(ushort)value << shift);
_data = (_data & ~((ulong)ushort.MaxValue << shift)) | ((ulong)(ushort)value << shift);

/// <summary>
/// Number of threads processing work items.
Expand All @@ -43,7 +44,7 @@ public void SubtractNumProcessingWork(short value)
Debug.Assert(value >= 0);
Debug.Assert(value <= NumProcessingWork);

_data -= (uint)(ushort)value << NumProcessingWorkShift;
_data -= (ulong)(ushort)value << NumProcessingWorkShift;
}

public void InterlockedDecrementNumProcessingWork()
Expand Down Expand Up @@ -72,19 +73,61 @@ public void SubtractNumExistingThreads(short value)
Debug.Assert(value >= 0);
Debug.Assert(value <= NumExistingThreads);

_data -= (uint)(ushort)value << NumExistingThreadsShift;
_data -= (ulong)(ushort)value << NumExistingThreadsShift;
}

/// <summary>
/// Max possible thread pool threads we want to have.
/// </summary>
public short NumThreadsGoal
{
get => GetInt16Value(NumThreadsGoalShift);
set
{
Debug.Assert(value > 0);
SetInt16Value(value, NumThreadsGoalShift);
}
}

public ThreadCounts InterlockedSetNumThreadsGoal(short value)
{
ThreadPoolInstance._threadAdjustmentLock.VerifyIsLocked();

ThreadCounts counts = this;
while (true)
{
ThreadCounts newCounts = counts;
newCounts.NumThreadsGoal = value;

ThreadCounts countsBeforeUpdate = InterlockedCompareExchange(newCounts, counts);
if (countsBeforeUpdate == counts)
{
return newCounts;
}

counts = countsBeforeUpdate;
}
}

public ThreadCounts VolatileRead() => new ThreadCounts(Volatile.Read(ref _data));

public ThreadCounts InterlockedCompareExchange(ThreadCounts newCounts, ThreadCounts oldCounts) =>
new ThreadCounts(Interlocked.CompareExchange(ref _data, newCounts._data, oldCounts._data));
public ThreadCounts InterlockedCompareExchange(ThreadCounts newCounts, ThreadCounts oldCounts)
{
#if DEBUG
if (newCounts.NumThreadsGoal != oldCounts.NumThreadsGoal)
{
ThreadPoolInstance._threadAdjustmentLock.VerifyIsLocked();
}
#endif

return new ThreadCounts(Interlocked.CompareExchange(ref _data, newCounts._data, oldCounts._data));
}

public static bool operator ==(ThreadCounts lhs, ThreadCounts rhs) => lhs._data == rhs._data;
public static bool operator !=(ThreadCounts lhs, ThreadCounts rhs) => lhs._data != rhs._data;

public override bool Equals([NotNullWhen(true)] object? obj) => obj is ThreadCounts other && _data == other._data;
public override int GetHashCode() => (int)_data;
public override int GetHashCode() => (int)_data + (int)(_data >> 32);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,22 +124,19 @@ private static void WorkerThreadStart()
ThreadCounts newCounts = counts;
newCounts.SubtractNumExistingThreads(1);
short newNumExistingThreads = (short)(numExistingThreads - 1);

ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
short newNumThreadsGoal =
Math.Max(
threadPoolInstance.MinThreadsGoal,
Math.Min(newNumExistingThreads, counts.NumThreadsGoal));
newCounts.NumThreadsGoal = newNumThreadsGoal;

ThreadCounts oldCounts =
threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (oldCounts == counts)
{
short newNumThreadsGoal =
Math.Max(
threadPoolInstance.MinThreadsGoal,
Math.Min(newNumExistingThreads, threadPoolInstance._separated.numThreadsGoal));
if (threadPoolInstance._separated.numThreadsGoal != newNumThreadsGoal)
{
threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal;
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.ThreadTimedOut);
}

HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.ThreadTimedOut);
if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads);
Expand Down Expand Up @@ -181,7 +178,7 @@ internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance
while (true)
{
numProcessingWork = counts.NumProcessingWork;
if (numProcessingWork >= threadPoolInstance._separated.numThreadsGoal)
if (numProcessingWork >= counts.NumThreadsGoal)
{
return;
}
Expand Down Expand Up @@ -256,7 +253,7 @@ internal static bool ShouldStopProcessingWorkNow(PortableThreadPool threadPoolIn
// code from which this implementation was ported, which turns a processing thread into a retired thread
// and checks for pending requests like RemoveWorkingWorker. In this implementation there are
// no retired threads, so only the count of threads processing work is considered.
if (counts.NumProcessingWork <= threadPoolInstance._separated.numThreadsGoal)
if (counts.NumProcessingWork <= counts.NumThreadsGoal)
{
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ private struct CacheLineSeparated
{
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1)]
public ThreadCounts counts; // SOS's ThreadPool command depends on this name
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1 + sizeof(uint))]
public short numThreadsGoal;

[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 2)]
public int lastDequeueTime;
Expand Down Expand Up @@ -103,7 +101,7 @@ private PortableThreadPool()
_maxThreads = _minThreads;
}

_separated.numThreadsGoal = _minThreads;
_separated.counts.NumThreadsGoal = _minThreads;
}

public bool SetMinThreads(int workerThreads, int ioCompletionThreads)
Expand Down Expand Up @@ -142,9 +140,9 @@ public bool SetMinThreads(int workerThreads, int ioCompletionThreads)
wakeGateThread = true;
}
}
else if (_separated.numThreadsGoal < newMinThreads)
else if (_separated.counts.NumThreadsGoal < newMinThreads)
{
_separated.numThreadsGoal = newMinThreads;
_separated.counts.InterlockedSetNumThreadsGoal(newMinThreads);
if (_separated.numRequestedWorkers > 0)
{
addWorker = true;
Expand Down Expand Up @@ -193,9 +191,9 @@ public bool SetMaxThreads(int workerThreads, int ioCompletionThreads)

short newMaxThreads = (short)Math.Min(workerThreads, MaxPossibleThreadCount);
_maxThreads = newMaxThreads;
if (_separated.numThreadsGoal > newMaxThreads)
if (_separated.counts.NumThreadsGoal > newMaxThreads)
{
_separated.numThreadsGoal = newMaxThreads;
_separated.counts.InterlockedSetNumThreadsGoal(newMaxThreads);
}
return true;
}
Expand Down Expand Up @@ -272,13 +270,15 @@ private void AdjustMaxWorkersActive()
bool addWorker = false;
try
{
// Skip hill climbing when there is a pending blocking adjustment. Hill climbing may otherwise bypass the
// blocking adjustment heuristics and increase the thread count too quickly.
if (_pendingBlockingAdjustment != PendingBlockingAdjustment.None)
// Repeated checks from ShouldAdjustMaxWorkersActive() inside the lock
ThreadCounts counts = _separated.counts;
if (counts.NumProcessingWork > counts.NumThreadsGoal ||
_pendingBlockingAdjustment != PendingBlockingAdjustment.None)
{
return;
}


long startTime = _currentSampleStartTime;
long endTime = Stopwatch.GetTimestamp();
long freq = Stopwatch.Frequency;
Expand All @@ -291,13 +291,13 @@ private void AdjustMaxWorkersActive()
int totalNumCompletions = (int)_completionCounter.Count;
int numCompletions = totalNumCompletions - _separated.priorCompletionCount;

short oldNumThreadsGoal = counts.NumThreadsGoal;
int newNumThreadsGoal;
(newNumThreadsGoal, _threadAdjustmentIntervalMs) =
HillClimbing.ThreadPoolHillClimber.Update(_separated.numThreadsGoal, elapsedSeconds, numCompletions);
short oldNumThreadsGoal = _separated.numThreadsGoal;
HillClimbing.ThreadPoolHillClimber.Update(oldNumThreadsGoal, elapsedSeconds, numCompletions);
if (oldNumThreadsGoal != (short)newNumThreadsGoal)
{
_separated.numThreadsGoal = (short)newNumThreadsGoal;
_separated.counts.InterlockedSetNumThreadsGoal((short)newNumThreadsGoal);

//
// If we're increasing the goal, inject a thread. If that thread finds work, it will inject
Expand Down Expand Up @@ -354,7 +354,8 @@ private bool ShouldAdjustMaxWorkersActive(int currentTimeMs)
// threads processing work to stop in response to a decreased thread count goal. The logic here is a bit
// different from the original CoreCLR code from which this implementation was ported because in this
// implementation there are no retired threads, so only the count of threads processing work is considered.
if (_separated.counts.NumProcessingWork > _separated.numThreadsGoal)
ThreadCounts counts = _separated.counts;
if (counts.NumProcessingWork > counts.NumThreadsGoal)
{
return false;
}
Expand Down