diff --git a/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs b/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs index 45b2380b7e0b..a2b7ecef9ffe 100644 --- a/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs +++ b/src/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncContext.Unix.cs @@ -108,7 +108,7 @@ private BufferListSendOperation RentBufferListSendOperation() => Interlocked.Exchange(ref _cachedBufferListSendOperation, null) ?? new BufferListSendOperation(this); - private abstract class AsyncOperation + private abstract class AsyncOperation : IThreadPoolWorkItem { private enum State { @@ -259,7 +259,7 @@ public bool TryCancel() return true; } - public void Dispatch(WaitCallback processingCallback) + public void Dispatch() { ManualResetEventSlim e = Event; if (e != null) @@ -270,10 +270,28 @@ public void Dispatch(WaitCallback processingCallback) else { // Async operation. Process the IO on the threadpool. - ThreadPool.UnsafeQueueUserWorkItem(processingCallback, this); + ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); } } + void IThreadPoolWorkItem.Execute() + { + // ReadOperation and WriteOperation, the only two types derived from + // AsyncOperation, implement IThreadPoolWorkItem.Execute to call + // ProcessAsyncOperation(this) on the appropriate receive or send queue. + // However, this base class needs to be able to queue them without + // additional allocation, so it also implements the interface in order + // to pass the compiler's static checking for the interface, but then + // when the runtime queries for the interface, it'll use the derived + // type's interface implementation. We could instead just make this + // an abstract and have the derived types override it, but that adds + // "Execute" as a public method, which could easily be misunderstood. + // We could also add an abstract method that the base interface implementation + // invokes, but that adds an extra virtual dispatch. + Debug.Fail("Expected derived type to implement IThreadPoolWorkItem"); + throw new InvalidOperationException(); + } + // Called when op is not in the queue yet, so can't be otherwise executing public void DoAbort() { @@ -302,14 +320,18 @@ public void TraceWithContext(SocketAsyncContext context, string message, [Caller // These two abstract classes differentiate the operations that go in the // read queue vs the ones that go in the write queue. - private abstract class ReadOperation : AsyncOperation + private abstract class ReadOperation : AsyncOperation, IThreadPoolWorkItem { public ReadOperation(SocketAsyncContext context) : base(context) { } + + void IThreadPoolWorkItem.Execute() => AssociatedContext.ProcessAsyncReadOperation(this); } - private abstract class WriteOperation : AsyncOperation + private abstract class WriteOperation : AsyncOperation, IThreadPoolWorkItem { public WriteOperation(SocketAsyncContext context) : base(context) { } + + void IThreadPoolWorkItem.Execute() => AssociatedContext.ProcessAsyncWriteOperation(this); } private abstract class SendOperation : WriteOperation @@ -693,11 +715,6 @@ private enum QueueState : byte private LockToken Lock() => new LockToken(_queueLock); - private static readonly WaitCallback s_processingCallback = - typeof(TOperation) == typeof(ReadOperation) ? ((op) => { var operation = ((ReadOperation)op); operation.AssociatedContext._receiveQueue.ProcessAsyncOperation(operation); }) : - typeof(TOperation) == typeof(WriteOperation) ? ((op) => { var operation = ((WriteOperation)op); operation.AssociatedContext._sendQueue.ProcessAsyncOperation(operation); }) : - (WaitCallback)null; - public void Init() { Debug.Assert(_queueLock == null); @@ -836,10 +853,10 @@ public void HandleEvent(SocketAsyncContext context) } // Dispatch the op so we can try to process it. - op.Dispatch(s_processingCallback); + op.Dispatch(); } - private void ProcessAsyncOperation(TOperation op) + internal void ProcessAsyncOperation(TOperation op) { OperationResult result = ProcessQueuedOperation(op); @@ -968,10 +985,7 @@ public OperationResult ProcessQueuedOperation(TOperation op) } } - if (nextOp != null) - { - nextOp.Dispatch(s_processingCallback); - } + nextOp?.Dispatch(); return (wasCompleted ? OperationResult.Completed : OperationResult.Cancelled); } @@ -1050,10 +1064,7 @@ public void CancelAndContinueProcessing(TOperation op) } } - if (nextOp != null) - { - nextOp.Dispatch(s_processingCallback); - } + nextOp?.Dispatch(); } // Called when the socket is closed. @@ -1255,6 +1266,10 @@ private bool ShouldRetrySyncOperation(out SocketError errorCode) return false; } + private void ProcessAsyncReadOperation(ReadOperation op) => _receiveQueue.ProcessAsyncOperation(op); + + private void ProcessAsyncWriteOperation(WriteOperation op) => _sendQueue.ProcessAsyncOperation(op); + public SocketError Accept(byte[] socketAddress, ref int socketAddressLen, out IntPtr acceptedFd) { Debug.Assert(socketAddress != null, "Expected non-null socketAddress");