Refactor PacketProxy to PacketRelay (Side-by-Side)#618
Conversation
Greptile SummaryThis PR introduces the new
Confidence Score: 3/5The new PacketRelay abstraction layer is architecturally sound and well-tested, but two concrete defects warrant attention before the code is relied on in production: a broken interface contract and a global-lock bottleneck in the lwIP UDP path. The
Important Files Changed
Sequence DiagramsequenceDiagram
participant LwIP as lwIP Stack
participant URH as udpRelayHandler
participant PR as PacketRelay
participant PS as PacketSender
participant RCV as PacketReceiver
participant FWD as udpRelayPacketForwarder
LwIP->>URH: ReceiveTo(tunConn, data, dest)
URH->>URH: lock h.mu
alt new local address
URH->>PR: NewAssociation()
PR-->>URH: PacketSender, PacketReceiver
URH->>URH: store sender in map
URH->>URH: spawn goroutine for ReceivePackets
end
URH->>URH: unlock h.mu
URH->>PS: SendPacket(data, dest)
PS-->>LwIP: error or nil
RCV->>FWD: HandlePacket(p, source)
FWD->>LwIP: conn.WriteFrom(p, srcAddr)
note over PS,RCV: On timeout or explicit Close
PS->>PS: Close triggers inner.Close
RCV-->>URH: ReceivePackets returns
URH->>URH: closeSession removes from map
Reviews (1): Last reviewed commit: "Revert outline-cli" | Re-trigger Greptile |
| // | ||
| // Before returning, ReceivePackets MUST call handler.Close() to indicate the end of the stream. | ||
| ReceivePackets(handler PacketHandler) error |
There was a problem hiding this comment.
Unreachable contract:
PacketHandler has no Close() method
The contract says "Before returning, ReceivePackets MUST call handler.Close() to indicate the end of the stream," but PacketHandler only defines HandlePacket. No implementation in this PR can satisfy this requirement, and any future implementor following the contract will get a compile error. If a cleanup signal is needed, either add Close() to PacketHandler or remove the sentence from the docstring.
| h.mu.Lock() | ||
| sender, ok := h.senders[laddr] | ||
| if !ok { | ||
| // Synchronize new session creation completely under the lock to prevent proxy resource leaks | ||
| // when concurrent packets arrive on a new local port. | ||
| var err error | ||
| sender, err = h.newSession(tunConn) | ||
| if err != nil { | ||
| h.mu.Unlock() | ||
| return err | ||
| } | ||
| h.senders[laddr] = sender | ||
| } | ||
| h.mu.Unlock() | ||
|
|
||
| return sender.SendPacket(data, destAddr.AddrPort()) | ||
| } |
There was a problem hiding this comment.
NewAssociation() called while holding h.mu, blocking all active UDP sessions
newSession (which calls h.relay.NewAssociation()) runs entirely inside the h.mu lock. If the underlying relay's NewAssociation() involves I/O — such as PacketListenerRelay calling ListenPacket over a proxy connection — every concurrent ReceiveTo call for already-established sessions stalls waiting for the lock. A single slow handshake can drop or delay packets across all active UDP flows. The lock guards map access, but NewAssociation() itself only needs to happen once per new address; the map write is the only part that truly requires h.mu.
| // Push to channel inside the lock using select with default to avoid deadlocks | ||
| select { | ||
| case s.ch <- dnsPacket{payload: buf, source: destination}: | ||
| return nil | ||
| default: | ||
| // Queue is full! | ||
| return errors.New("DNS truncation queue full") | ||
| } |
There was a problem hiding this comment.
Queue-full error is not a sentinel, making it impossible to detect with
errors.Is
errors.New("DNS truncation queue full") creates a new, anonymous error value on every call. Callers that need to distinguish a full queue from network errors or a closed state cannot do so. Defining a package-level error variable lets callers use errors.Is for targeted handling.
| // Push to channel inside the lock using select with default to avoid deadlocks | |
| select { | |
| case s.ch <- dnsPacket{payload: buf, source: destination}: | |
| return nil | |
| default: | |
| // Queue is full! | |
| return errors.New("DNS truncation queue full") | |
| } | |
| // Push to channel inside the lock using select with default to avoid deadlocks | |
| select { | |
| case s.ch <- dnsPacket{payload: buf, source: destination}: | |
| return nil | |
| default: | |
| // Queue is full! | |
| return ErrQueueFull | |
| } |
| // SendPacket implements [packetrelay.PacketSender].SendPacket(). It parses a packet from p, and determines whether it is | ||
| // a valid DNS request. If so, it will push a DNS response with TC (truncated) bit set to the receiver. | ||
| func (s *dnsTruncateSender) SendPacket(p []byte, destination netip.AddrPort) error { | ||
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
|
|
||
| if s.closed { | ||
| return packetrelay.ErrClosed | ||
| } | ||
|
|
||
| if destination.Port() != standardDNSPort { | ||
| return 0, fmt.Errorf("UDP traffic to non-DNS port %v is not supported: %w", destination.Port(), network.ErrPortUnreachable) | ||
| return fmt.Errorf("UDP traffic to non-DNS port %v is not supported: %w", destination.Port(), network.ErrPortUnreachable) | ||
| } | ||
| if len(p) < dnsUdpMinMsgLen { | ||
| return 0, fmt.Errorf("invalid DNS message of length %v, it must be at least %v bytes", len(p), dnsUdpMinMsgLen) | ||
| return fmt.Errorf("invalid DNS message of length %v, it must be at least %v bytes", len(p), dnsUdpMinMsgLen) | ||
| } | ||
|
|
||
| // Allocate buffer from slicepool, because `go build -gcflags="-m"` shows a local array will escape to heap | ||
| slice := packetBufferPool.LazySlice() | ||
| buf := slice.Acquire() | ||
| defer slice.Release() | ||
|
|
||
| // We need to copy p into buf because "WriteTo must not modify p, even temporarily". | ||
| n := copy(buf, p) | ||
| // We need to copy p into a new buffer because we pass it through a channel | ||
| buf := make([]byte, len(p)) | ||
| copy(buf, p) | ||
|
|
||
| // Set "Response", "Truncated" and "NoError" | ||
| // Note: gopacket is a good library doing this kind of things. But it will increase the binary size a lot. | ||
| // If we decide to use gopacket in the future, please evaluate the binary size and runtime memory consumption. | ||
| buf[dnsUdpAnswerByte] |= (dnsUdpResponseBit | dnsUdpTruncatedBit) | ||
| buf[dnsUdpRCodeByte] &= ^dnsUdpRCodeMask | ||
|
|
||
| // Copy QDCOUNT to ANCOUNT. This is an incorrect workaround for some DNS clients (such as Windows 7); | ||
| // because without these clients won't retry over TCP. | ||
| // | ||
| // For reference: https://github.com/eycorsican/go-tun2socks/blob/master/proxy/dnsfallback/udp.go#L59-L63 | ||
| copy(buf[dnsARCntStartByte:dnsARCntEndByte+1], buf[dnsQDCntStartByte:dnsQDCntEndByte+1]) | ||
|
|
||
| return h.respWriter.WriteFrom(buf[:n], net.UDPAddrFromAddrPort(destination)) | ||
| // Push to channel inside the lock using select with default to avoid deadlocks | ||
| select { | ||
| case s.ch <- dnsPacket{payload: buf, source: destination}: | ||
| return nil | ||
| default: | ||
| // Queue is full! | ||
| return errors.New("DNS truncation queue full") | ||
| } | ||
| } |
There was a problem hiding this comment.
SendPacket holds s.mu during buffer allocation, copy, and header modification
The mutex is acquired before make([]byte, ...) + copy + bit manipulation, all of which are CPU-local work that needs no mutual exclusion. Only the closed check and the channel send require the lock. Holding it through the full body serialises all concurrent SendPacket calls unnecessarily.
This PR introduces the new flow-based
PacketRelayAPI side-by-side with the legacyPacketProxyAPI to avoid breaking existing consumers, migrating it to a dedicated standalone subpackage for perfect API isolation. It also modernizeslwip2transportto natively adopt the new abstractions.Key Architectural Benefits:
PacketSender) and receive path (PacketReceiver) explicitly. The receive path blocks and processes packets using the caller's own goroutine loop, eliminating "callback hell" and unnecessary background goroutines in the core components.TimeoutPacketRelay,LazyPacketRelay) that can wrap anyPacketRelayimplementation.TimeoutPacketRelay(only resetting the timer on outgoingSendPacketwrites, as recommended by RFC 4787 Section 4.3) to prevent remote attackers from flooding the relay to hold mappings open indefinitely.network/packetrelaycomplete with comprehensive package-level documentation, resolving all import cycles and maintaining zero dependencies on the legacy types.NewPacketProxyFromPacketRelay) and wrappers in thenetworkpackage to allow existing consumers (such aslwip2transport) to easily adopt and bridge the new abstractions.Core Component & Consumer Migrations:
PacketListenerRelay: Pure transport packet listener adapter with synchronizedClose()mechanics safely unblocking active blocking reads.TimeoutPacketRelay: Managed timeout decorator using a high-performance, race-freelastActivitytimestamp to safely avoid standardAfterFunc/Resetraces.LazyPacketRelay[NEW]: Deferred association decorator that postpones underlying association creation until the first packet is sent, optimizing resource allocation.DelegatePacketRelay: Thread-safe hot-swappable wrapper usingatomic.Pointerto avoid autogenerated type consistency panic issues.dnstruncate.PacketRelay: Refactored to channel-basedPacketRelaydesign using a non-blocking lock-protectedselect{default:}queue to guarantee zero panics on channel closures during concurrent sends.lwip2transport.ConfigureDeviceWithRelay[NEW]: Added nativepacketrelaysupport to the LwIP stack, eliminating response callbacks and using allocation-freenet.UDPAddrFromAddrPortaddress conversions. Resolves a concurrent UDP session creation leak under mutex lock.