Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Address feedback
  • Loading branch information
Koundinya Veluri committed Mar 3, 2022
commit 165842adbcd9128f8054e499a602fa122ec363a1
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,9 @@ private sealed unsafe class IOCompletionPoller
1024;
#endif

private static readonly Action<Event> ProcessEventDelegate = ProcessEvent;

private readonly nint _port;
private readonly Interop.Kernel32.OVERLAPPED_ENTRY* _nativeEvents;
private readonly ThreadPoolTypedWorkItemQueue<Event> _events;
private readonly ThreadPoolTypedWorkItemQueue<Event, Callback> _events;
private readonly Thread _thread;

public IOCompletionPoller(nint port)
Expand All @@ -118,18 +116,30 @@ public IOCompletionPoller(nint port)

_nativeEvents =
(Interop.Kernel32.OVERLAPPED_ENTRY*)
NativeMemory.Alloc((nuint)NativeEventCapacity * (nuint)Unsafe.SizeOf<Interop.Kernel32.OVERLAPPED_ENTRY>());
_events = new ThreadPoolTypedWorkItemQueue<Event>(ProcessEventDelegate);
NativeMemory.Alloc(NativeEventCapacity, (nuint)sizeof(Interop.Kernel32.OVERLAPPED_ENTRY));
_events = new(default);

// Thread pool threads must start in the default execution context without transferring the context, so
// using UnsafeStart() instead of Start()
_thread = new Thread(Poll, SmallStackSizeBytes)
{
IsThreadPoolThread = true,
IsBackground = true,
Priority = ThreadPriority.Highest,
Name = ".NET ThreadPool IO"
};

// Poller threads are typically expected to be few in number and have to compete for time slices with all other
threads that are scheduled to run. They do only a small amount of work and don't run any user code. In
situations where a large number of threads are frequently scheduled to run, a scheduled poller thread may be
// delayed artificially quite a bit. The poller threads are given higher priority than normal to mitigate that
// issue. It's unlikely that these threads would starve a system because in such a situation IO completions
// would stop occurring. Since the number of IO pollers is configurable, avoid having too many poller threads at
// higher priority.
if (ProcessorsPerPoller >= 4)
{
_thread.Priority = ThreadPriority.AboveNormal;
}

_thread.UnsafeStart();
}

Expand Down Expand Up @@ -159,22 +169,25 @@ private void Poll()
ThrowHelper.ThrowApplicationException(Marshal.GetHRForLastWin32Error());
}

private static void ProcessEvent(Event e)
private struct Callback : IThreadPoolTypedWorkItemQueueCallback<Event>
{
if (NativeRuntimeEventSource.Log.IsEnabled())
public void Invoke(Event e)
{
NativeRuntimeEventSource.Log.ThreadPoolIODequeue(e.nativeOverlapped);
}
if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolIODequeue(e.nativeOverlapped);
}

// The NtStatus code for the operation is in the InternalLow field
uint ntStatus = (uint)(nint)e.nativeOverlapped->InternalLow;
uint errorCode = Interop.Errors.ERROR_SUCCESS;
if (ntStatus != Interop.StatusOptions.STATUS_SUCCESS)
{
errorCode = Interop.NtDll.RtlNtStatusToDosError((int)ntStatus);
}
// The NtStatus code for the operation is in the InternalLow field
uint ntStatus = (uint)(nint)e.nativeOverlapped->InternalLow;
uint errorCode = Interop.Errors.ERROR_SUCCESS;
if (ntStatus != Interop.StatusOptions.STATUS_SUCCESS)
{
errorCode = Interop.NtDll.RtlNtStatusToDosError((int)ntStatus);
}

_IOCompletionCallback.PerformSingleIOCompletionCallback(errorCode, e.bytesTransferred, e.nativeOverlapped);
_IOCompletionCallback.PerformSingleIOCompletionCallback(errorCode, e.bytesTransferred, e.nativeOverlapped);
}
}

private readonly struct Event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -869,13 +869,22 @@ public enum WorkState
}
}

internal sealed class ThreadPoolTypedWorkItemQueue<T> : IThreadPoolWorkItem
// A strongly typed callback contract for ThreadPoolTypedWorkItemQueue<T, TCallback>.
// Implemented by a struct and supplied as the TCallback type argument so the queue's
// dispatch is a direct (devirtualized) call on a value type rather than an indirect
// delegate invocation.
internal interface IThreadPoolTypedWorkItemQueueCallback<T>
{
// TODO: Make it static abstract when we can.
void Invoke(T item);
}

internal sealed class ThreadPoolTypedWorkItemQueue<T, TCallback>
: IThreadPoolWorkItem where TCallback : struct, IThreadPoolTypedWorkItemQueueCallback<T>
{
private int _isScheduledForProcessing;
private readonly ConcurrentQueue<T> _workItems = new ConcurrentQueue<T>();
private readonly Action<T> _callback;
private readonly TCallback _callback;

public ThreadPoolTypedWorkItemQueue(Action<T> callback) => _callback = callback;
public ThreadPoolTypedWorkItemQueue(TCallback callback) => _callback = callback;

public int Count => _workItems.Count;

Expand Down Expand Up @@ -914,6 +923,11 @@ void IThreadPoolWorkItem.Execute()
return;
}

// A work item was successfully dequeued, and there may be more work items to process. Schedule a work item to
// parallelize processing of work items, before processing more work items. Following this, it is the responsibility
// of the new work item and the poller thread to schedule more work items as necessary. The parallelization may be
necessary here if the user callback, as part of handling the work item, blocks for some reason that may have a
// dependency on other queued work items.
ScheduleForProcessing();

ThreadPoolWorkQueueThreadLocals tl = ThreadPoolWorkQueueThreadLocals.threadLocals!;
Expand All @@ -924,8 +938,15 @@ void IThreadPoolWorkItem.Execute()
int startTimeMs = Environment.TickCount;
while (true)
{
_callback(workItem);

_callback.Invoke(workItem);

// This work item processes queued work items until certain conditions are met, and tracks some things:
// - Keep track of the number of work items processed, it will be added to the counter later
// - Local work items take precedence over all other types of work items, process them first
// - This work item should not run for too long. It is processing a specific type of work in batch, but should
// not starve other thread pool work items. Check how long it has been since this work item has started, and
// yield to the thread pool after some time. The threshold used is half of the thread pool's dispatch quantum,
// which the thread pool uses for doing periodic work.
if (++completedCount == uint.MaxValue ||
(tl.workState & ThreadPoolWorkQueueThreadLocals.WorkState.MayHaveLocalWorkItems) != 0 ||
(uint)(Environment.TickCount - startTimeMs) >= ThreadPoolWorkQueue.DispatchQuantumMs / 2 ||
Expand Down