Skip to content

Commit

Permalink
Feat: Add workers based on CPU threads
Browse files Browse the repository at this point in the history
  • Loading branch information
Musixal committed Oct 13, 2024
1 parent abff19f commit 0675482
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 16 deletions.
6 changes: 3 additions & 3 deletions internal/server/transport/quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (s *QuicTransport) channelHandshake(qConn quic.Connection) {
qConn.CloseWithError(1, "failed to set deadline")
return
}
msg, _, err := utils.ReceiveBinaryString(stream)
msg, err := utils.ReceiveBinaryString(stream)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
s.logger.Warn("timeout while waiting for control channel signal")
Expand All @@ -274,7 +274,7 @@ func (s *QuicTransport) channelHandshake(qConn quic.Connection) {
return
}

err = utils.SendBinaryString(stream, s.config.Token, utils.SG_TCP)
err = utils.SendBinaryString(stream, s.config.Token)
if err != nil {
s.logger.Errorf("failed to send security token: %v", err)
stream.Close()
Expand Down Expand Up @@ -489,7 +489,7 @@ func (s *QuicTransport) handleSession(session quic.Connection, next chan struct{
}

// Send the target port over the tunnel connection
err = utils.SendBinaryString(stream, incomingConn.remoteAddr, utils.SG_TCP)
err = utils.SendBinaryString(stream, incomingConn.remoteAddr)
if err != nil {
s.logger.Errorf("failed to send address %v over stream: %v", incomingConn.remoteAddr, err)

Expand Down
20 changes: 15 additions & 5 deletions internal/server/transport/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"runtime"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -79,10 +80,19 @@ func (s *TcpTransport) Start() {
if s.controlChannel != nil {
s.config.TunnelStatus = "Connected (TCP)"

numCPU := runtime.NumCPU()
if numCPU > 4 {
numCPU = 4 // Max allowed handler is 4
}

go s.parsePortMappings()
go s.channelHandler()
go s.handleLoop()

s.logger.Infof("starting %d handle loops on each CPU thread", numCPU)

for i := 0; i < numCPU; i++ {
go s.handleLoop()
}
}
}
func (s *TcpTransport) Restart() {
Expand Down Expand Up @@ -133,9 +143,9 @@ func (s *TcpTransport) channelHandshake() {
continue
}

msg, transport, err := utils.ReceiveBinaryString(conn)
msg, transport, err := utils.ReceiveBinaryTransportString(conn)
if transport != utils.SG_Chan {
s.logger.Errorf("invalid signal for channel, discard the connection")
s.logger.Errorf("invalid signal received for channel, Discarding connection")
conn.Close()
continue
} else if err != nil {
Expand All @@ -157,7 +167,7 @@ func (s *TcpTransport) channelHandshake() {
continue
}

err = utils.SendBinaryString(conn, s.config.Token, utils.SG_TCP)
err = utils.SendBinaryTransportString(conn, s.config.Token, utils.SG_Chan)
if err != nil {
s.logger.Errorf("failed to send security token: %v", err)
conn.Close()
Expand Down Expand Up @@ -409,7 +419,7 @@ func (s *TcpTransport) handleLoop() {

case tunnelConn := <-s.tunnelChannel:
// Send the target addr over the connection
if err := utils.SendBinaryString(tunnelConn, localConn.remoteAddr, utils.SG_TCP); err != nil {
if err := utils.SendBinaryTransportString(tunnelConn, localConn.remoteAddr, utils.SG_TCP); err != nil {
s.logger.Errorf("%v", err)
tunnelConn.Close()
continue loop
Expand Down
22 changes: 17 additions & 5 deletions internal/server/transport/tcpmux.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"runtime"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -95,9 +96,20 @@ func (s *TcpMuxTransport) Start() {
if s.controlChannel != nil {
s.config.TunnelStatus = "Connected (TCPMux)"

numCPU := runtime.NumCPU()
if numCPU > 4 {
numCPU = 4 // Max allowed handler is 4
}

go s.parsePortMappings()
go s.channelHandler()
go s.handleLoop()

s.logger.Infof("starting %d handle loops on each CPU thread", numCPU)

for i := 0; i < numCPU; i++ {
go s.handleLoop()
}

}

}
Expand Down Expand Up @@ -148,9 +160,9 @@ func (s *TcpMuxTransport) channelHandshake() {
conn.Close()
continue
}
msg, transport, err := utils.ReceiveBinaryString(conn)
msg, transport, err := utils.ReceiveBinaryTransportString(conn)
if transport != utils.SG_Chan {
s.logger.Errorf("invalid signal for channel, discard the connection")
s.logger.Errorf("invalid signal received for channel, Discarding connection")
conn.Close()
continue
} else if err != nil {
Expand All @@ -172,7 +184,7 @@ func (s *TcpMuxTransport) channelHandshake() {
continue
}

err = utils.SendBinaryString(conn, s.config.Token, utils.SG_TCP)
err = utils.SendBinaryTransportString(conn, s.config.Token, utils.SG_Chan)
if err != nil {
s.logger.Errorf("failed to send security token: %v", err)
conn.Close()
Expand Down Expand Up @@ -453,7 +465,7 @@ func (s *TcpMuxTransport) handleSession(session *smux.Session, next chan struct{
}

// Send the target port over the tunnel connection
if err := utils.SendBinaryString(stream, incomingConn.remoteAddr, utils.SG_TCP); err != nil {
if err := utils.SendBinaryString(stream, incomingConn.remoteAddr); err != nil {
s.handleSessionError(session, &incomingConn, next, done, counter, err)
return
}
Expand Down
13 changes: 12 additions & 1 deletion internal/server/transport/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net"
"net/http"
"runtime"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -208,9 +209,19 @@ func (s *WsTransport) tunnelListener() {

s.logger.Info("control channel established successfully")

numCPU := runtime.NumCPU()
if numCPU > 4 {
numCPU = 4 // Max allowed handler is 4
}

go s.channelHandler()
go s.parsePortMappings()
go s.handleLoop()

s.logger.Infof("starting %d handle loops on each CPU thread", numCPU)

for i := 0; i < numCPU; i++ {
go s.handleLoop()
}

s.config.TunnelStatus = fmt.Sprintf("Connected (%s)", s.config.Mode)

Expand Down
15 changes: 13 additions & 2 deletions internal/server/transport/wsmux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net"
"net/http"
"runtime"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -229,9 +230,19 @@ func (s *WsMuxTransport) tunnelListener() {

s.logger.Info("control channel established successfully")

numCPU := runtime.NumCPU()
if numCPU > 4 {
numCPU = 4 // Max allowed handler is 4
}

go s.channelHandler()
go s.parsePortMappings()
go s.handleLoop()

s.logger.Infof("starting %d handle loops on each CPU thread", numCPU)

for i := 0; i < numCPU; i++ {
go s.handleLoop()
}

s.config.TunnelStatus = fmt.Sprintf("Connected (%s)", s.config.Mode)

Expand Down Expand Up @@ -412,7 +423,7 @@ func (s *WsMuxTransport) handleSession(session *smux.Session, next chan struct{}
time.Sleep(1 * time.Millisecond)

// Send the target port over the tunnel connection
if err := utils.SendBinaryString(stream, incomingConn.remoteAddr, utils.SG_TCP); err != nil {
if err := utils.SendBinaryString(stream, incomingConn.remoteAddr); err != nil {
s.handleSessionError(session, &incomingConn, next, done, counter, err)
return
}
Expand Down

0 comments on commit 0675482

Please sign in to comment.