diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 60ba03234..5b55f07da 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "hash/maphash" "net" "net/netip" "path/filepath" @@ -31,7 +32,8 @@ import ( var ( status = newAtomicStatus(Suspend) tcpQueue = make(chan C.ConnContext, 200) - udpQueue = make(chan C.PacketAdapter, 200) + udpQueues []chan C.PacketAdapter + udpHashSeed = maphash.MakeSeed() natTable = nat.New() rules []C.Rule listeners = make(map[string]C.InboundListener) @@ -70,8 +72,17 @@ func (t tunnel) HandleTCPConn(conn net.Conn, metadata *C.Metadata) { func (t tunnel) HandleUDPPacket(packet C.UDPPacket, metadata *C.Metadata) { packetAdapter := C.NewPacketAdapter(packet, metadata) + + var h maphash.Hash + + h.SetSeed(udpHashSeed) + h.WriteString(metadata.SourceAddress()) + h.WriteString(metadata.RemoteAddress()) + + queueNo := uint(h.Sum64()) % uint(len(udpQueues)) + select { - case udpQueue <- packetAdapter: + case udpQueues[queueNo] <- packetAdapter: default: } } @@ -141,7 +152,8 @@ func TCPIn() chan<- C.ConnContext { // UDPIn return fan-in udp queue // Deprecated: using Tunnel instead func UDPIn() chan<- C.PacketAdapter { - return udpQueue + // compatibility: first queue is always available for external callers + return udpQueues[0] } // NatTable return nat table @@ -243,8 +255,8 @@ func isHandle(t C.Type) bool { } // processUDP starts a loop to handle udp packet -func processUDP() { - queue := udpQueue +func processUDP(queueNo int) { + queue := udpQueues[queueNo] for conn := range queue { handleUDPConn(conn) } @@ -255,8 +267,11 @@ func process() { if num := runtime.GOMAXPROCS(0); num > numUDPWorkers { numUDPWorkers = num } + + udpQueues = make([]chan C.PacketAdapter, numUDPWorkers) for i := 0; i < numUDPWorkers; i++ { - go processUDP() + udpQueues[i] = make(chan C.PacketAdapter, 200) + go processUDP(i) } queue := tcpQueue