Skip to content

Commit

Permalink
backend: add onTraffic callback for serverless (#276)
Browse files Browse the repository at this point in the history
Signed-off-by: xhe <xw897002528@gmail.com>
  • Loading branch information
xhebox authored Apr 25, 2023
1 parent e776585 commit 396efd1
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ golangci-lint:
GOBIN=$(GOBIN) go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest

lint: golangci-lint tidy
$(GOBIN)/golangci-lint run
cd lib && $(GOBIN)/golangci-lint run
$(GOBIN)/golangci-lint run

gocovmerge:
GOBIN=$(GOBIN) go install github.com/wadey/gocovmerge@master
Expand Down
1 change: 1 addition & 0 deletions pkg/proxy/backend/backend_conn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ func (mgr *BackendConnManager) getBackendIO(cctx ConnContext, auth *Authenticato
func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) (err error) {
defer func() {
mgr.setQuitSourceByErr(err)
mgr.handshakeHandler.OnTraffic(mgr)
}()
if len(request) < 1 {
err = mysql.ErrMalformPacket
Expand Down
40 changes: 40 additions & 0 deletions pkg/proxy/backend/backend_conn_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,46 @@ func TestHandlerReturnError(t *testing.T) {
}
}

func TestOnTraffic(t *testing.T) {
i := 0
inbytes, outbytes := []int{
0x99,
}, []int{
0xce,
}
ts := newBackendMgrTester(t, func(config *testConfig) {
config.proxyConfig.checkBackendInterval = 10 * time.Millisecond
config.proxyConfig.handler.onTraffic = func(cc ConnContext) {
require.Equal(t, uint64(inbytes[i]), cc.ClientInBytes())
require.Equal(t, uint64(outbytes[i]), cc.ClientOutBytes())
i++
}
})
runners := []runner{
// 1st handshake
{
client: ts.mc.authenticate,
proxy: ts.firstHandshake4Proxy,
backend: ts.handshake4Backend,
},
// query
{
client: func(packetIO *pnet.PacketIO) error {
ts.mc.sql = "select 1"
return ts.mc.request(packetIO)
},
proxy: ts.forwardCmd4Proxy,
backend: func(packetIO *pnet.PacketIO) error {
ts.mb.respondType = responseTypeResultSet
ts.mb.columns = 1
ts.mb.rows = 1
return ts.mb.respond(packetIO)
},
},
}
ts.runTests(runners)
}

func TestGetBackendIO(t *testing.T) {
addrs := make([]string, 0, 3)
listeners := make([]net.Listener, 0, cap(addrs))
Expand Down
12 changes: 12 additions & 0 deletions pkg/proxy/backend/handshake_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (es ErrorSource) String() string {
}

var _ HandshakeHandler = (*DefaultHandshakeHandler)(nil)
var _ HandshakeHandler = (*CustomHandshakeHandler)(nil)

type ConnContext interface {
ClientAddr() string
Expand All @@ -82,6 +83,7 @@ type HandshakeHandler interface {
GetRouter(ctx ConnContext, resp *pnet.HandshakeResp) (router.Router, error)
OnHandshake(ctx ConnContext, to string, err error)
OnConnClose(ctx ConnContext) error
OnTraffic(ctx ConnContext)
GetCapability() pnet.Capability
GetServerVersion() string
}
Expand Down Expand Up @@ -117,6 +119,9 @@ func (handler *DefaultHandshakeHandler) GetRouter(ctx ConnContext, resp *pnet.Ha
func (handler *DefaultHandshakeHandler) OnHandshake(ConnContext, string, error) {
}

func (handler *DefaultHandshakeHandler) OnTraffic(ConnContext) {
}

func (handler *DefaultHandshakeHandler) OnConnClose(ConnContext) error {
return nil
}
Expand All @@ -135,6 +140,7 @@ func (handler *DefaultHandshakeHandler) GetServerVersion() string {
type CustomHandshakeHandler struct {
getRouter func(ctx ConnContext, resp *pnet.HandshakeResp) (router.Router, error)
onHandshake func(ConnContext, string, error)
onTraffic func(ConnContext)
onConnClose func(ConnContext) error
handleHandshakeResp func(ctx ConnContext, resp *pnet.HandshakeResp) error
getCapability func() pnet.Capability
Expand All @@ -154,6 +160,12 @@ func (h *CustomHandshakeHandler) OnHandshake(ctx ConnContext, addr string, err e
}
}

func (h *CustomHandshakeHandler) OnTraffic(ctx ConnContext) {
if h.onTraffic != nil {
h.onTraffic(ctx)
}
}

func (h *CustomHandshakeHandler) OnConnClose(ctx ConnContext) error {
if h.onConnClose != nil {
return h.onConnClose(ctx)
Expand Down

0 comments on commit 396efd1

Please sign in to comment.