Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private BufferListSendOperation RentBufferListSendOperation() =>
Interlocked.Exchange(ref _cachedBufferListSendOperation, null) ??
new BufferListSendOperation(this);

private abstract class AsyncOperation
private abstract class AsyncOperation : IThreadPoolWorkItem
{
private enum State
{
Expand Down Expand Up @@ -259,7 +259,7 @@ public bool TryCancel()
return true;
}

public void Dispatch(WaitCallback processingCallback)
public void Dispatch()
{
ManualResetEventSlim e = Event;
if (e != null)
Expand All @@ -270,10 +270,28 @@ public void Dispatch(WaitCallback processingCallback)
else
{
// Async operation. Process the IO on the threadpool.
ThreadPool.UnsafeQueueUserWorkItem(processingCallback, this);
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}
}

void IThreadPoolWorkItem.Execute()
{
// ReadOperation and WriteOperation, the only two types derived from
// AsyncOperation, implement IThreadPoolWorkItem.Execute to call
// ProcessAsyncOperation(this) on the appropriate receive or send queue.
// However, this base class needs to be able to queue them without
// additional allocation, so it also implements the interface in order
// to pass the compiler's static checking for the interface, but then
// when the runtime queries for the interface, it'll use the derived
// type's interface implementation. We could instead just make this
// an abstract and have the derived types override it, but that adds
// "Execute" as a public method, which could easily be misunderstood.
// We could also add an abstract method that the base interface implementation
// invokes, but that adds an extra virtual dispatch.
Debug.Fail("Expected derived type to implement IThreadPoolWorkItem");
throw new InvalidOperationException();
}

// Called when op is not in the queue yet, so can't be otherwise executing
public void DoAbort()
{
Expand Down Expand Up @@ -302,14 +320,18 @@ public void TraceWithContext(SocketAsyncContext context, string message, [Caller

// These two abstract classes differentiate the operations that go in the
// read queue vs the ones that go in the write queue.
private abstract class ReadOperation : AsyncOperation
private abstract class ReadOperation : AsyncOperation, IThreadPoolWorkItem
{
public ReadOperation(SocketAsyncContext context) : base(context) { }

void IThreadPoolWorkItem.Execute() => AssociatedContext.ProcessAsyncReadOperation(this);
}

private abstract class WriteOperation : AsyncOperation
private abstract class WriteOperation : AsyncOperation, IThreadPoolWorkItem
{
public WriteOperation(SocketAsyncContext context) : base(context) { }

void IThreadPoolWorkItem.Execute() => AssociatedContext.ProcessAsyncWriteOperation(this);
}

private abstract class SendOperation : WriteOperation
Expand Down Expand Up @@ -693,11 +715,6 @@ private enum QueueState : byte

private LockToken Lock() => new LockToken(_queueLock);

private static readonly WaitCallback s_processingCallback =
typeof(TOperation) == typeof(ReadOperation) ? ((op) => { var operation = ((ReadOperation)op); operation.AssociatedContext._receiveQueue.ProcessAsyncOperation(operation); }) :
typeof(TOperation) == typeof(WriteOperation) ? ((op) => { var operation = ((WriteOperation)op); operation.AssociatedContext._sendQueue.ProcessAsyncOperation(operation); }) :
(WaitCallback)null;

public void Init()
{
Debug.Assert(_queueLock == null);
Expand Down Expand Up @@ -836,10 +853,10 @@ public void HandleEvent(SocketAsyncContext context)
}

// Dispatch the op so we can try to process it.
op.Dispatch(s_processingCallback);
op.Dispatch();
}

private void ProcessAsyncOperation(TOperation op)
internal void ProcessAsyncOperation(TOperation op)
{
OperationResult result = ProcessQueuedOperation(op);

Expand Down Expand Up @@ -968,10 +985,7 @@ public OperationResult ProcessQueuedOperation(TOperation op)
}
}

if (nextOp != null)
{
nextOp.Dispatch(s_processingCallback);
}
nextOp?.Dispatch();

return (wasCompleted ? OperationResult.Completed : OperationResult.Cancelled);
}
Expand Down Expand Up @@ -1050,10 +1064,7 @@ public void CancelAndContinueProcessing(TOperation op)
}
}

if (nextOp != null)
{
nextOp.Dispatch(s_processingCallback);
}
nextOp?.Dispatch();
}

// Called when the socket is closed.
Expand Down Expand Up @@ -1255,6 +1266,10 @@ private bool ShouldRetrySyncOperation(out SocketError errorCode)
return false;
}

private void ProcessAsyncReadOperation(ReadOperation op) => _receiveQueue.ProcessAsyncOperation(op);

private void ProcessAsyncWriteOperation(WriteOperation op) => _sendQueue.ProcessAsyncOperation(op);

public SocketError Accept(byte[] socketAddress, ref int socketAddressLen, out IntPtr acceptedFd)
{
Debug.Assert(socketAddress != null, "Expected non-null socketAddress");
Expand Down