From c4d3fa9003ef95444d6a7843fed19a8c50cf27c5 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Mon, 5 Sep 2022 15:17:40 +0800 Subject: [PATCH] router: add tests for router (#74) --- pkg/manager/router/router.go | 124 +++-- pkg/manager/router/router_test.go | 594 +++++++++++++++++++++ pkg/proxy/backend/backend_conn_mgr.go | 14 +- pkg/proxy/backend/backend_conn_mgr_test.go | 12 +- 4 files changed, 695 insertions(+), 49 deletions(-) create mode 100644 pkg/manager/router/router_test.go diff --git a/pkg/manager/router/router.go b/pkg/manager/router/router.go index ab3ec2b0..3ec6d4c9 100644 --- a/pkg/manager/router/router.go +++ b/pkg/manager/router/router.go @@ -17,11 +17,12 @@ package router import ( "container/list" "context" - "errors" "sync" "time" "github.com/pingcap/TiProxy/pkg/config" + "github.com/pingcap/TiProxy/pkg/util/errors" + "github.com/pingcap/TiProxy/pkg/util/waitgroup" "github.com/pingcap/tidb/util/logutil" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -38,24 +39,38 @@ var ( ErrNoInstanceToSelect = errors.New("no instances to route") ) +type connPhase int + const ( - phaseNotRedirected int = iota + // The session is never redirected. + phaseNotRedirected connPhase = iota + // The session is redirecting. phaseRedirectNotify + // The session redirected successfully last time. phaseRedirectEnd + // The session failed to redirect last time. phaseRedirectFail ) const ( - rebalanceInterval = 10 * time.Millisecond - rebalanceConnsPerLoop = 10 - rebalanceMaxScoreRatio = 1.1 + // The interval to rebalance connections. + rebalanceInterval = 10 * time.Millisecond + // The number of connections to rebalance during each interval. + // Limit the number to avoid creating too many connections suddenly on a backend. + rebalanceConnsPerLoop = 10 + // The threshold of ratio of the highest score and lowest score. + // If the ratio exceeds the threshold, the proxy will rebalance connections. + rebalanceMaxScoreRatio = 1.2 + // After a connection fails to redirect, it may contain some unmigratable status. + // Limit its redirection interval to avoid unnecessary retrial to reduce latency jitter. + redirectFailMinInterval = 3 * time.Second ) // ConnEventReceiver receives connection events. type ConnEventReceiver interface { - OnRedirectSucceed(from, to string, conn RedirectableConn) - OnRedirectFail(from, to string, conn RedirectableConn) - OnConnClosed(addr string, conn RedirectableConn) + OnRedirectSucceed(from, to string, conn RedirectableConn) error + OnRedirectFail(from, to string, conn RedirectableConn) error + OnConnClosed(addr string, conn RedirectableConn) error } // RedirectableConn indicates a redirect-able connection. @@ -84,7 +99,9 @@ func (b *backendWrapper) score() int { // connWrapper wraps RedirectableConn. type connWrapper struct { RedirectableConn - phase int + phase connPhase + // Last redirect start time of this connection. + lastRedirect time.Time } // ScoreBasedRouter is an implementation of Router interface. @@ -93,6 +110,7 @@ type ScoreBasedRouter struct { sync.Mutex observer *BackendObserver cancelFunc context.CancelFunc + wg waitgroup.WaitGroup // A list of *backendWrapper. The backends are in descending order of scores. backends *list.List } @@ -111,7 +129,9 @@ func NewScoreBasedRouter(cfg *config.BackendNamespace, client *clientv3.Client) router.observer = observer childCtx, cancelFunc := context.WithCancel(context.Background()) router.cancelFunc = cancelFunc - go router.rebalanceLoop(childCtx) + router.wg.Run(func() { + router.rebalanceLoop(childCtx) + }) return router, err } @@ -142,7 +162,9 @@ func (router *ScoreBasedRouter) removeConn(be *list.Element, ce *list.Element) { conn := ce.Value.(*connWrapper) backend.connList.Remove(ce) delete(backend.connMap, conn.ConnectionID()) - router.adjustBackendList(be) + if !router.removeBackendIfEmpty(be) { + router.adjustBackendList(be) + } } func (router *ScoreBasedRouter) addConn(be *list.Element, conn *connWrapper) { @@ -220,55 +242,56 @@ func (router *ScoreBasedRouter) lookupBackend(addr string, forward bool) *list.E } // OnRedirectSucceed implements ConnEventReceiver.OnRedirectSucceed interface. -func (router *ScoreBasedRouter) OnRedirectSucceed(from, to string, conn RedirectableConn) { +func (router *ScoreBasedRouter) OnRedirectSucceed(from, to string, conn RedirectableConn) error { router.Lock() defer router.Unlock() be := router.lookupBackend(to, false) if be == nil { - logutil.BgLogger().Error("backend not found in the backend", zap.String("addr", to)) - return + return errors.WithStack(errors.Errorf("backend %s is not found in the router", to)) } toBackend := be.Value.(*backendWrapper) e, ok := toBackend.connMap[conn.ConnectionID()] if !ok { - logutil.BgLogger().Error("connection not found in the backend", zap.String("addr", to), - zap.Uint64("conn", conn.ConnectionID())) - return + return errors.WithStack(errors.Errorf("connection %d is not found on the backend %s", conn.ConnectionID(), to)) } connWrapper := e.Value.(*connWrapper) connWrapper.phase = phaseRedirectEnd + return nil } // OnRedirectFail implements ConnEventReceiver.OnRedirectFail interface. -func (router *ScoreBasedRouter) OnRedirectFail(from, to string, conn RedirectableConn) { +func (router *ScoreBasedRouter) OnRedirectFail(from, to string, conn RedirectableConn) error { router.Lock() defer router.Unlock() be := router.lookupBackend(to, false) if be == nil { - logutil.BgLogger().Error("backend not found in the backend", zap.String("addr", to)) - return + return errors.WithStack(errors.Errorf("backend %s is not found in the router", to)) } toBackend := be.Value.(*backendWrapper) ce, ok := toBackend.connMap[conn.ConnectionID()] if !ok { - logutil.BgLogger().Error("connection not found in the backend", zap.String("addr", to), - zap.Uint64("conn", conn.ConnectionID())) - return + return errors.WithStack(errors.Errorf("connection %d is not found on the backend %s", conn.ConnectionID(), to)) } router.removeConn(be, ce) be = router.lookupBackend(from, true) - // If the backend has already been removed, the connection is discarded from the router. + // The backend may have been removed because it's empty. Add it back. if be == nil { - return + be = router.backends.PushBack(&backendWrapper{ + status: StatusCannotConnect, + addr: from, + connList: list.New(), + connMap: make(map[uint64]*list.Element), + }) } connWrapper := ce.Value.(*connWrapper) connWrapper.phase = phaseRedirectFail router.addConn(be, connWrapper) + return nil } // OnConnClosed implements ConnEventReceiver.OnConnClosed interface. -func (router *ScoreBasedRouter) OnConnClosed(addr string, conn RedirectableConn) { +func (router *ScoreBasedRouter) OnConnClosed(addr string, conn RedirectableConn) error { router.Lock() defer router.Unlock() // Get the redirecting address in the lock, rather than letting the connection pass it in. @@ -279,18 +302,15 @@ func (router *ScoreBasedRouter) OnConnClosed(addr string, conn RedirectableConn) } be := router.lookupBackend(addr, true) if be == nil { - logutil.BgLogger().Error("backend not found in the router", zap.String("addr", addr)) - return + return errors.WithStack(errors.Errorf("backend %s is not found in the router", addr)) } backend := be.Value.(*backendWrapper) ce, ok := backend.connMap[conn.ConnectionID()] if !ok { - logutil.BgLogger().Error("connection not found in the backend", zap.String("addr", addr), - zap.Uint64("conn", conn.ConnectionID())) - return + return errors.WithStack(errors.Errorf("connection %d is not found on the backend %s", conn.ConnectionID(), addr)) } router.removeConn(be, ce) - router.removeBackendIfEmpty(be) + return nil } // OnBackendChanged implements BackendEventReceiver.OnBackendChanged interface. @@ -299,8 +319,8 @@ func (router *ScoreBasedRouter) OnBackendChanged(backends map[string]BackendStat defer router.Unlock() for addr, status := range backends { be := router.lookupBackend(addr, true) - if be == nil { - logutil.BgLogger().Info("find new backend", zap.String("url", addr), + if be == nil && status != StatusCannotConnect { + logutil.BgLogger().Info("find new backend", zap.String("addr", addr), zap.String("status", status.String())) be = router.backends.PushBack(&backendWrapper{ status: status, @@ -310,12 +330,13 @@ func (router *ScoreBasedRouter) OnBackendChanged(backends map[string]BackendStat }) } else { backend := be.Value.(*backendWrapper) - logutil.BgLogger().Info("update backend", zap.String("url", addr), + logutil.BgLogger().Info("update backend", zap.String("addr", addr), zap.String("prev_status", backend.status.String()), zap.String("cur_status", status.String())) backend.status = status } - router.adjustBackendList(be) - router.removeBackendIfEmpty(be) + if !router.removeBackendIfEmpty(be) { + router.adjustBackendList(be) + } } } @@ -331,6 +352,7 @@ func (router *ScoreBasedRouter) rebalanceLoop(ctx context.Context) { } func (router *ScoreBasedRouter) rebalance(maxNum int) { + curTime := time.Now() router.Lock() defer router.Unlock() for i := 0; i < maxNum; i++ { @@ -348,23 +370,44 @@ func (router *ScoreBasedRouter) rebalance(maxNum int) { busiestBackend := busiestEle.Value.(*backendWrapper) idlestEle := router.backends.Back() idlestBackend := idlestEle.Value.(*backendWrapper) - if float64(busiestBackend.score())/float64(idlestBackend.score()+1) <= rebalanceMaxScoreRatio { + if float64(busiestBackend.score())/float64(idlestBackend.score()+1) < rebalanceMaxScoreRatio { + break + } + var ce *list.Element + for ele := busiestBackend.connList.Front(); ele != nil; ele = ele.Next() { + conn := ele.Value.(*connWrapper) + switch conn.phase { + case phaseRedirectNotify: + // A connection cannot be redirected again when it has not finished redirecting. + continue + case phaseRedirectFail: + // If it failed recently, it will probably fail this time. + if conn.lastRedirect.Add(redirectFailMinInterval).After(curTime) { + continue + } + } + ce = ele + break + } + if ce == nil { break } - ce := busiestBackend.connList.Front() router.removeConn(busiestEle, ce) conn := ce.Value.(*connWrapper) conn.phase = phaseRedirectNotify + conn.lastRedirect = curTime router.addConn(idlestEle, conn) conn.Redirect(idlestBackend.addr) } } -func (router *ScoreBasedRouter) removeBackendIfEmpty(be *list.Element) { +func (router *ScoreBasedRouter) removeBackendIfEmpty(be *list.Element) bool { backend := be.Value.(*backendWrapper) if backend.status == StatusCannotConnect && backend.connList.Len() == 0 { router.backends.Remove(be) + return true } + return false } // Close implements Router.Close interface. @@ -379,5 +422,6 @@ func (router *ScoreBasedRouter) Close() { router.observer.Close() router.observer = nil } + router.wg.Wait() // Router only refers to RedirectableConn, it doesn't manage RedirectableConn. } diff --git a/pkg/manager/router/router_test.go b/pkg/manager/router/router_test.go new file mode 100644 index 00000000..e8a11e95 --- /dev/null +++ b/pkg/manager/router/router_test.go @@ -0,0 +1,594 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package router + +import ( + "container/list" + "context" + "math" + "math/rand" + "strconv" + "sync" + "testing" + "time" + + "github.com/pingcap/TiProxy/pkg/config" + "github.com/pingcap/TiProxy/pkg/util/waitgroup" + "github.com/stretchr/testify/require" +) + +type mockRedirectableConn struct { + sync.Mutex + t *testing.T + connID uint64 + from, to string + receiver ConnEventReceiver +} + +func (conn *mockRedirectableConn) SetEventReceiver(receiver ConnEventReceiver) { + conn.Lock() + conn.receiver = receiver + conn.Unlock() +} + +func (conn *mockRedirectableConn) Redirect(addr string) { + conn.Lock() + require.Len(conn.t, conn.to, 0) + conn.to = addr + conn.Unlock() +} + +func (conn *mockRedirectableConn) GetRedirectingAddr() string { + conn.Lock() + defer conn.Unlock() + return conn.to +} + +func (conn *mockRedirectableConn) ConnectionID() uint64 { + return conn.connID +} + +func (conn *mockRedirectableConn) getAddr() (string, string) { + conn.Lock() + defer conn.Unlock() + return conn.from, conn.to +} + +func (conn *mockRedirectableConn) redirectSucceed() { + conn.Lock() + require.Greater(conn.t, len(conn.to), 0) + conn.from = conn.to + conn.to = "" + conn.Unlock() +} + +func (conn *mockRedirectableConn) redirectFail() { + conn.Lock() + require.Greater(conn.t, len(conn.to), 0) + conn.to = "" + conn.Unlock() +} + +type routerTester struct { + t *testing.T + router *ScoreBasedRouter + connID uint64 + conns map[uint64]*mockRedirectableConn + backendID int +} + +func newRouterTester(t *testing.T) *routerTester { + router := &ScoreBasedRouter{ + backends: list.New(), + } + t.Cleanup(router.Close) + return &routerTester{ + t: t, + router: router, + conns: make(map[uint64]*mockRedirectableConn), + } +} + +func (tester *routerTester) createConn() *mockRedirectableConn { + tester.connID++ + return &mockRedirectableConn{ + t: tester.t, + connID: tester.connID, + } +} + +func (tester *routerTester) addBackends(num int) { + backends := make(map[string]BackendStatus) + for i := 0; i < num; i++ { + tester.backendID++ + addr := strconv.Itoa(tester.backendID) + backends[addr] = StatusHealthy + } + tester.router.OnBackendChanged(backends) + tester.checkBackendOrder() +} + +func (tester *routerTester) killBackends(num int) { + backends := make(map[string]BackendStatus) + indexes := rand.Perm(tester.router.backends.Len()) + for _, index := range indexes { + if len(backends) >= num { + break + } + // set the ith backend as unhealthy + backend := tester.getBackendByIndex(index) + if backend.status == StatusCannotConnect { + continue + } + backends[backend.addr] = StatusCannotConnect + } + tester.router.OnBackendChanged(backends) + tester.checkBackendOrder() +} + +func (tester *routerTester) updateBackendStatusByAddr(addr string, status BackendStatus) { + backends := map[string]BackendStatus{ + addr: status, + } + tester.router.OnBackendChanged(backends) + tester.checkBackendOrder() +} + +func (tester *routerTester) getBackendByIndex(index int) *backendWrapper { + be := tester.router.backends.Front() + for i := 0; be != nil && i < index; be, i = be.Next(), i+1 { + } + require.NotNil(tester.t, be) + return be.Value.(*backendWrapper) +} + +func (tester *routerTester) checkBackendOrder() { + score := math.MaxInt + for be := tester.router.backends.Front(); be != nil; be = be.Next() { + backend := be.Value.(*backendWrapper) + // Empty unhealthy backends should be removed. + if backend.status == StatusCannotConnect { + require.NotEqual(tester.t, 0, backend.connList.Len()) + } + curScore := backend.score() + require.GreaterOrEqual(tester.t, score, curScore) + score = curScore + } +} + +func (tester *routerTester) addConnections(num int) { + for i := 0; i < num; i++ { + conn := tester.createConn() + addr, err := tester.router.Route(conn) + require.NoError(tester.t, err) + conn.from = addr + tester.conns[conn.connID] = conn + } + tester.checkBackendOrder() +} + +func (tester *routerTester) closeConnections(num int, redirecting bool) { + conns := make(map[uint64]*mockRedirectableConn, num) + for id, conn := range tester.conns { + if redirecting { + if len(conn.GetRedirectingAddr()) == 0 { + continue + } + } else { + if len(conn.GetRedirectingAddr()) > 0 { + continue + } + } + conns[id] = conn + if len(conns) >= num { + break + } + } + for _, conn := range conns { + err := tester.router.OnConnClosed(conn.from, conn) + require.NoError(tester.t, err) + delete(tester.conns, conn.connID) + } + tester.checkBackendOrder() +} + +func (tester *routerTester) rebalance(num int) { + tester.router.rebalance(num) + tester.checkBackendOrder() +} + +func (tester *routerTester) redirectFinish(num int, succeed bool) { + i := 0 + for _, conn := range tester.conns { + if len(conn.GetRedirectingAddr()) == 0 { + continue + } + if succeed { + err := tester.router.OnRedirectSucceed(conn.from, conn.to, conn) + require.NoError(tester.t, err) + conn.redirectSucceed() + } else { + err := tester.router.OnRedirectFail(conn.from, conn.to, conn) + require.NoError(tester.t, err) + conn.redirectFail() + } + i++ + if i >= num { + break + } + } + tester.checkBackendOrder() +} + +func (tester *routerTester) checkBalanced() { + maxNum, minNum := 0, math.MaxInt + for be := tester.router.backends.Front(); be != nil; be = be.Next() { + backend := be.Value.(*backendWrapper) + // Empty unhealthy backends should be removed. + require.Equal(tester.t, StatusHealthy, backend.status) + curScore := backend.score() + if curScore > maxNum { + maxNum = curScore + } + if curScore < minNum { + minNum = curScore + } + } + ratio := float64(maxNum) / float64(minNum+1) + require.LessOrEqual(tester.t, ratio, rebalanceMaxScoreRatio) +} + +func (tester *routerTester) checkRedirectingNum(num int) { + redirectingNum := 0 + for _, conn := range tester.conns { + if len(conn.GetRedirectingAddr()) > 0 { + redirectingNum++ + } + } + require.Equal(tester.t, num, redirectingNum) +} + +func (tester *routerTester) checkBackendNum(num int) { + require.Equal(tester.t, num, tester.router.backends.Len()) +} + +func (tester *routerTester) clear() { + tester.conns = make(map[uint64]*mockRedirectableConn) + tester.router.backends = list.New() +} + +// Test that the backends are always ordered by scores. +func TestBackendScore(t *testing.T) { + tester := newRouterTester(t) + tester.addBackends(3) + tester.killBackends(2) + tester.addConnections(100) + // 90 not redirecting + tester.closeConnections(10, false) + // make sure rebalance will work + tester.addBackends(3) + // 40 not redirecting, 50 redirecting + tester.rebalance(50) + tester.checkRedirectingNum(50) + // 40 not redirecting, 40 redirecting + tester.closeConnections(10, true) + tester.checkRedirectingNum(40) + // 50 not redirecting, 30 redirecting + tester.redirectFinish(10, true) + tester.checkRedirectingNum(30) + // 60 not redirecting, 20 redirecting + tester.redirectFinish(10, false) + tester.checkRedirectingNum(20) + // 50 not redirecting, 20 redirecting + tester.closeConnections(10, false) + tester.checkRedirectingNum(20) +} + +// Test that the connections are always balanced after rebalance and routing. +func TestConnBalanced(t *testing.T) { + tester := newRouterTester(t) + tester.addBackends(3) + + // balanced after routing + tester.addConnections(100) + tester.checkBalanced() + + // balanced after scale in + tester.killBackends(1) + tester.rebalance(100) + tester.redirectFinish(100, true) + tester.checkBalanced() + + // balanced after scale out + tester.addBackends(1) + tester.rebalance(100) + tester.redirectFinish(100, true) + tester.checkBalanced() + + // balanced after closing connections + tester.closeConnections(10, false) + tester.rebalance(100) + tester.checkBalanced() +} + +// Test that routing fails when there's no healthy backends. +func TestNoBackends(t *testing.T) { + tester := newRouterTester(t) + conn := tester.createConn() + _, err := tester.router.Route(conn) + require.ErrorIs(t, err, ErrNoInstanceToSelect) + tester.addBackends(1) + tester.addConnections(10) + tester.killBackends(1) + _, err = tester.router.Route(conn) + require.ErrorIs(t, err, ErrNoInstanceToSelect) +} + +// Test that the backends are balanced during rolling restart. +func TestRollingRestart(t *testing.T) { + tester := newRouterTester(t) + backendNum := 3 + tester.addBackends(backendNum) + tester.addConnections(100) + tester.checkBalanced() + + backendAddrs := make([]string, 0, backendNum) + for i := 0; i < backendNum; i++ { + backendAddrs = append(backendAddrs, tester.getBackendByIndex(i).addr) + } + + for i := 0; i < backendNum+1; i++ { + if i > 0 { + tester.updateBackendStatusByAddr(backendAddrs[i-1], StatusHealthy) + tester.rebalance(100) + tester.redirectFinish(100, true) + tester.checkBalanced() + } + if i < backendNum { + tester.updateBackendStatusByAddr(backendAddrs[i], StatusCannotConnect) + tester.rebalance(100) + tester.redirectFinish(100, true) + tester.checkBalanced() + } + } +} + +// Test the corner cases of rebalance. +func TestRebalanceCornerCase(t *testing.T) { + tester := newRouterTester(t) + tests := []func(){ + func() { + // Balancer won't work when there's no backend. + tester.rebalance(10) + tester.checkRedirectingNum(0) + }, + func() { + // Balancer won't work when there's only one backend. + tester.addBackends(1) + tester.addConnections(10) + tester.rebalance(10) + tester.checkRedirectingNum(0) + }, + func() { + // Router should have already balanced it. + tester.addBackends(1) + tester.addConnections(10) + tester.addBackends(1) + tester.addConnections(10) + tester.rebalance(10) + tester.checkRedirectingNum(0) + }, + func() { + // Balancer won't work when all the backends are unhealthy. + tester.addBackends(2) + tester.addConnections(20) + tester.killBackends(2) + tester.rebalance(10) + tester.checkRedirectingNum(0) + }, + func() { + // The parameter limits the redirecting num. + tester.addBackends(2) + tester.addConnections(50) + tester.killBackends(1) + tester.rebalance(5) + tester.checkRedirectingNum(5) + }, + func() { + // All the connections are redirected to the new healthy one and the unhealthy backends are removed. + tester.addBackends(1) + tester.addConnections(10) + tester.killBackends(1) + tester.addBackends(1) + tester.rebalance(10) + tester.checkRedirectingNum(10) + tester.checkBackendNum(1) + backend := tester.getBackendByIndex(0) + require.Len(t, backend.connMap, 10) + }, + func() { + // Connections won't be redirected again before redirection finishes. + tester.addBackends(1) + tester.addConnections(10) + tester.killBackends(1) + tester.addBackends(1) + tester.rebalance(10) + tester.checkRedirectingNum(10) + backend := tester.getBackendByIndex(0) + tester.killBackends(1) + tester.addBackends(1) + tester.rebalance(10) + tester.checkRedirectingNum(10) + require.Len(t, backend.connMap, 10) + }, + func() { + // After redirection fails, the connections are moved back to the unhealthy backends. + tester.addBackends(1) + tester.addConnections(10) + tester.killBackends(1) + tester.addBackends(1) + tester.rebalance(10) + tester.checkBackendNum(1) + tester.redirectFinish(10, false) + tester.checkBackendNum(2) + }, + func() { + // It won't rebalance when there's no connection. + tester.addBackends(1) + tester.addConnections(10) + tester.closeConnections(10, false) + tester.rebalance(10) + tester.checkRedirectingNum(0) + }, + func() { + // It won't rebalance when there's only 1 connection. + tester.addBackends(1) + tester.addConnections(1) + tester.addBackends(1) + tester.rebalance(1) + tester.checkRedirectingNum(0) + }, + func() { + // It won't rebalance when only 2 connections are on 3 backends. + tester.addBackends(2) + tester.addConnections(2) + tester.addBackends(1) + tester.rebalance(1) + tester.checkRedirectingNum(0) + }, + func() { + // Connections will be redirected again immediately after failure. + tester.addBackends(1) + tester.addConnections(10) + tester.killBackends(1) + tester.addBackends(1) + tester.rebalance(10) + tester.redirectFinish(10, false) + tester.killBackends(1) + tester.addBackends(1) + tester.rebalance(10) + tester.checkRedirectingNum(0) + }, + } + + for _, test := range tests { + test() + tester.clear() + } +} + +// Test all kinds of events occur concurrently. +func TestConcurrency(t *testing.T) { + cfg := &config.BackendNamespace{} + // Router.observer doesn't work because the etcd is always empty. + // We create other goroutines to change backends easily. + etcd := createEtcdServer(t, "127.0.0.1:0") + client := createEtcdClient(t, etcd) + router, err := NewScoreBasedRouter(cfg, client) + require.NoError(t, err) + + var wg waitgroup.WaitGroup + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + // Create 3 backends. + backends := map[string]BackendStatus{ + "0": StatusHealthy, + "1": StatusHealthy, + "2": StatusHealthy, + } + router.OnBackendChanged(backends) + for addr, status := range backends { + func(addr string, status BackendStatus) { + wg.Run(func() { + for { + waitTime := rand.Intn(50) + 30 + select { + case <-time.After(time.Duration(waitTime) * time.Millisecond): + case <-ctx.Done(): + return + } + if status == StatusHealthy { + status = StatusCannotConnect + } else { + status = StatusHealthy + } + router.OnBackendChanged(map[string]BackendStatus{ + addr: status, + }) + } + }) + }(addr, status) + } + + // Create 20 connections. + for i := 0; i < 20; i++ { + func(connID uint64) { + wg.Run(func() { + var conn *mockRedirectableConn + for { + waitTime := rand.Intn(20) + 10 + select { + case <-time.After(time.Duration(waitTime) * time.Millisecond): + case <-ctx.Done(): + return + } + + if conn == nil { + // not connected, connect + conn = &mockRedirectableConn{ + t: t, + connID: connID, + } + addr, err := router.Route(conn) + if err != nil { + require.ErrorIs(t, err, ErrNoInstanceToSelect) + conn = nil + continue + } + conn.from = addr + } else if len(conn.GetRedirectingAddr()) > 0 { + // redirecting, 70% success, 20% fail, 10% close + i := rand.Intn(10) + from, to := conn.getAddr() + var err error + if i < 1 { + err = router.OnConnClosed(from, conn) + conn = nil + } else if i < 3 { + conn.redirectFail() + err = router.OnRedirectFail(from, to, conn) + } else { + conn.redirectSucceed() + err = router.OnRedirectSucceed(from, to, conn) + } + require.NoError(t, err) + } else { + // not redirecting, 20% close + i := rand.Intn(10) + if i < 2 { + // The balancer may happen to redirect it concurrently - that's exactly what may happen. + from, _ := conn.getAddr() + err := router.OnConnClosed(from, conn) + require.NoError(t, err) + conn = nil + } + } + } + }) + }(uint64(i)) + } + wg.Wait() + cancel() + router.Close() +} diff --git a/pkg/proxy/backend/backend_conn_mgr.go b/pkg/proxy/backend/backend_conn_mgr.go index 158da8f9..e3db96fe 100644 --- a/pkg/proxy/backend/backend_conn_mgr.go +++ b/pkg/proxy/backend/backend_conn_mgr.go @@ -315,13 +315,14 @@ func (mgr *BackendConnManager) notifyRedirectResult(ctx context.Context, rs *red return } if rs.err != nil { - eventReceiver.OnRedirectFail(rs.from, rs.to, mgr) + err := eventReceiver.OnRedirectFail(rs.from, rs.to, mgr) logutil.Logger(ctx).Warn("redirect connection failed", zap.String("from", rs.from), - zap.String("to", rs.to), zap.Uint64("conn", mgr.connectionID), zap.Error(rs.err)) + zap.String("to", rs.to), zap.Uint64("conn", mgr.connectionID), + zap.NamedError("redirect_err", rs.err), zap.NamedError("notify_err", err)) } else { - eventReceiver.OnRedirectSucceed(rs.from, rs.to, mgr) + err := eventReceiver.OnRedirectSucceed(rs.from, rs.to, mgr) logutil.Logger(ctx).Info("redirect connection succeeds", zap.String("from", rs.from), - zap.String("to", rs.to), zap.Uint64("conn", mgr.connectionID)) + zap.String("to", rs.to), zap.Uint64("conn", mgr.connectionID), zap.NamedError("notify_err", err)) } } @@ -351,7 +352,10 @@ func (mgr *BackendConnManager) Close() error { } // Just notify it with the current address. if len(addr) > 0 { - eventReceiver.OnConnClosed(addr, mgr) + if err := eventReceiver.OnConnClosed(addr, mgr); err != nil { + logutil.BgLogger().Error("close connection error", zap.String("addr", addr), + zap.Uint64("conn", mgr.connectionID), zap.NamedError("notify_err", err)) + } } } return err diff --git a/pkg/proxy/backend/backend_conn_mgr_test.go b/pkg/proxy/backend/backend_conn_mgr_test.go index 9c9e9811..c51426ee 100644 --- a/pkg/proxy/backend/backend_conn_mgr_test.go +++ b/pkg/proxy/backend/backend_conn_mgr_test.go @@ -47,27 +47,30 @@ func newMockEventReceiver() *mockEventReceiver { } } -func (mer *mockEventReceiver) OnRedirectSucceed(from, to string, conn router.RedirectableConn) { +func (mer *mockEventReceiver) OnRedirectSucceed(from, to string, conn router.RedirectableConn) error { mer.eventCh <- event{ from: from, to: to, eventName: eventSucceed, } + return nil } -func (mer *mockEventReceiver) OnRedirectFail(from, to string, conn router.RedirectableConn) { +func (mer *mockEventReceiver) OnRedirectFail(from, to string, conn router.RedirectableConn) error { mer.eventCh <- event{ from: from, to: to, eventName: eventFail, } + return nil } -func (mer *mockEventReceiver) OnConnClosed(from string, conn router.RedirectableConn) { +func (mer *mockEventReceiver) OnConnClosed(from string, conn router.RedirectableConn) error { mer.eventCh <- event{ from: from, eventName: eventClose, } + return nil } func (mer *mockEventReceiver) checkEvent(t *testing.T, eventName int) { @@ -502,7 +505,8 @@ func TestCloseWhileRedirect(t *testing.T) { // Send an event to make Close() block at notifying. addr := ts.tc.backendListener.Addr().String() eventReceiver := ts.mp.getEventReceiver().(*mockEventReceiver) - eventReceiver.OnRedirectSucceed(addr, addr, ts.mp) + err := eventReceiver.OnRedirectSucceed(addr, addr, ts.mp) + require.NoError(t, err) var wg waitgroup.WaitGroup wg.Run(func() { _ = ts.mp.Close()