From 26c66bcb543874aa7de9ee79cc41aea2aa999508 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Fri, 27 Sep 2024 16:12:29 +0800 Subject: [PATCH] Make /api/debug/health return error when no backends are available (#692) --- pkg/balance/router/router.go | 1 + pkg/balance/router/router_score.go | 16 ++++++ pkg/balance/router/router_score_test.go | 3 + pkg/balance/router/router_static.go | 4 ++ pkg/manager/namespace/manager.go | 50 ++++++++++++---- pkg/manager/namespace/manager_test.go | 32 +++++++++++ pkg/proxy/backend/handshake_handler.go | 4 +- pkg/server/api/debug.go | 2 +- pkg/server/api/debug_test.go | 6 ++ pkg/server/api/mock_test.go | 76 +++++++++++++++++++++++++ pkg/server/api/namespace_test.go | 5 +- pkg/server/api/server.go | 2 +- pkg/server/api/server_test.go | 4 +- pkg/server/server.go | 2 +- 14 files changed, 186 insertions(+), 21 deletions(-) create mode 100644 pkg/manager/namespace/manager_test.go create mode 100644 pkg/server/api/mock_test.go diff --git a/pkg/balance/router/router.go b/pkg/balance/router/router.go index de1a3b0f..8a24df88 100644 --- a/pkg/balance/router/router.go +++ b/pkg/balance/router/router.go @@ -29,6 +29,7 @@ type Router interface { ConnEventReceiver GetBackendSelector() BackendSelector + HealthyBackendCount() int RefreshBackend() RedirectConnections() error ConnCount() int diff --git a/pkg/balance/router/router_score.go b/pkg/balance/router/router_score.go index 6b6911c9..f4e26e27 100644 --- a/pkg/balance/router/router_score.go +++ b/pkg/balance/router/router_score.go @@ -75,6 +75,22 @@ func (router *ScoreBasedRouter) GetBackendSelector() BackendSelector { } } +func (router *ScoreBasedRouter) HealthyBackendCount() int { + router.Lock() + defer router.Unlock() + if router.observeError != nil { + return 0 + } + + count := 0 + for _, backend := range router.backends { + if backend.Healthy() { + count++ + } + } + return count +} + func (router *ScoreBasedRouter) getConnWrapper(conn RedirectableConn) *glist.Element[*connWrapper] { return conn.Value(_routerKey).(*glist.Element[*connWrapper]) } diff --git a/pkg/balance/router/router_score_test.go b/pkg/balance/router/router_score_test.go index df3e24fd..b7058803 100644 --- a/pkg/balance/router/router_score_test.go +++ b/pkg/balance/router/router_score_test.go @@ -322,11 +322,14 @@ func TestNoBackends(t *testing.T) { conn := tester.createConn() backend := tester.simpleRoute(conn) require.True(t, backend == nil || reflect.ValueOf(backend).IsNil()) + require.Equal(t, 0, tester.router.HealthyBackendCount()) tester.addBackends(1) + require.Equal(t, 1, tester.router.HealthyBackendCount()) tester.addConnections(10) tester.killBackends(1) backend = tester.simpleRoute(conn) require.True(t, backend == nil || reflect.ValueOf(backend).IsNil()) + require.Equal(t, 0, tester.router.HealthyBackendCount()) } // Test that the backends returned by the BackendSelector are complete and different. diff --git a/pkg/balance/router/router_static.go b/pkg/balance/router/router_static.go index d9fe4772..c76d11dd 100644 --- a/pkg/balance/router/router_static.go +++ b/pkg/balance/router/router_static.go @@ -45,6 +45,10 @@ func (r *StaticRouter) GetBackendSelector() BackendSelector { } } +func (r *StaticRouter) HealthyBackendCount() int { + return len(r.backends) +} + func (r *StaticRouter) RefreshBackend() {} func (r *StaticRouter) RedirectConnections() error { diff --git a/pkg/manager/namespace/manager.go b/pkg/manager/namespace/manager.go index b033d2c9..3057aaa3 100644 --- a/pkg/manager/namespace/manager.go +++ b/pkg/manager/namespace/manager.go @@ -22,7 +22,19 @@ import ( "go.uber.org/zap" ) -type NamespaceManager struct { +type NamespaceManager interface { + Init(logger *zap.Logger, nscs []*config.Namespace, tpFetcher observer.TopologyFetcher, + promFetcher metricsreader.PromInfoFetcher, httpCli *http.Client, cfgMgr *mconfig.ConfigManager, + metricsReader metricsreader.MetricsReader) error + CommitNamespaces(nss []*config.Namespace, nssDelete []bool) error + GetNamespace(nm string) (*Namespace, bool) + GetNamespaceByUser(user string) (*Namespace, bool) + RedirectConnections() []error + Ready() bool + Close() error +} + +type namespaceManager struct { sync.RWMutex nsm map[string]*Namespace tpFetcher observer.TopologyFetcher @@ -33,17 +45,17 @@ type NamespaceManager struct { cfgMgr *mconfig.ConfigManager } -func NewNamespaceManager() *NamespaceManager { - return &NamespaceManager{} +func NewNamespaceManager() *namespaceManager { + return &namespaceManager{} } -func (mgr *NamespaceManager) buildNamespace(cfg *config.Namespace) (*Namespace, error) { +func (mgr *namespaceManager) buildNamespace(cfg *config.Namespace) (*Namespace, error) { logger := mgr.logger.With(zap.String("namespace", cfg.Namespace)) // init BackendFetcher var fetcher observer.BackendFetcher healthCheckCfg := config.NewDefaultHealthCheckConfig() - if !reflect.ValueOf(mgr.tpFetcher).IsNil() { + if mgr.tpFetcher != nil && !reflect.ValueOf(mgr.tpFetcher).IsNil() { fetcher = observer.NewPDFetcher(mgr.tpFetcher, logger.Named("be_fetcher"), healthCheckCfg) } else { fetcher = observer.NewStaticFetcher(cfg.Backend.Instances) @@ -65,7 +77,7 @@ func (mgr *NamespaceManager) buildNamespace(cfg *config.Namespace) (*Namespace, }, nil } -func (mgr *NamespaceManager) CommitNamespaces(nss []*config.Namespace, nss_delete []bool) error { +func (mgr *namespaceManager) CommitNamespaces(nss []*config.Namespace, nssDelete []bool) error { nsm := make(map[string]*Namespace) mgr.RLock() for k, v := range mgr.nsm { @@ -74,7 +86,7 @@ func (mgr *NamespaceManager) CommitNamespaces(nss []*config.Namespace, nss_delet mgr.RUnlock() for i, nsc := range nss { - if nss_delete != nil && nss_delete[i] { + if nssDelete != nil && nssDelete[i] { delete(nsm, nsc.Namespace) continue } @@ -92,7 +104,7 @@ func (mgr *NamespaceManager) CommitNamespaces(nss []*config.Namespace, nss_delet return nil } -func (mgr *NamespaceManager) Init(logger *zap.Logger, nscs []*config.Namespace, tpFetcher observer.TopologyFetcher, +func (mgr *namespaceManager) Init(logger *zap.Logger, nscs []*config.Namespace, tpFetcher observer.TopologyFetcher, promFetcher metricsreader.PromInfoFetcher, httpCli *http.Client, cfgMgr *mconfig.ConfigManager, metricsReader metricsreader.MetricsReader) error { mgr.Lock() @@ -106,7 +118,7 @@ func (mgr *NamespaceManager) Init(logger *zap.Logger, nscs []*config.Namespace, return mgr.CommitNamespaces(nscs, nil) } -func (mgr *NamespaceManager) GetNamespace(nm string) (*Namespace, bool) { +func (mgr *namespaceManager) GetNamespace(nm string) (*Namespace, bool) { mgr.RLock() defer mgr.RUnlock() @@ -114,7 +126,7 @@ func (mgr *NamespaceManager) GetNamespace(nm string) (*Namespace, bool) { return ns, ok } -func (mgr *NamespaceManager) GetNamespaceByUser(user string) (*Namespace, bool) { +func (mgr *namespaceManager) GetNamespaceByUser(user string) (*Namespace, bool) { mgr.RLock() defer mgr.RUnlock() @@ -126,7 +138,7 @@ func (mgr *NamespaceManager) GetNamespaceByUser(user string) (*Namespace, bool) return nil, false } -func (mgr *NamespaceManager) RedirectConnections() []error { +func (mgr *namespaceManager) RedirectConnections() []error { mgr.RLock() defer mgr.RUnlock() @@ -140,7 +152,21 @@ func (mgr *NamespaceManager) RedirectConnections() []error { return errs } -func (mgr *NamespaceManager) Close() error { +func (mgr *namespaceManager) Ready() bool { + mgr.RLock() + defer mgr.RUnlock() + if len(mgr.nsm) == 0 { + return false + } + for _, ns := range mgr.nsm { + if ns.GetRouter().HealthyBackendCount() <= 0 { + return false + } + } + return true +} + +func (mgr *namespaceManager) Close() error { mgr.RLock() for _, ns := range mgr.nsm { ns.Close() diff --git a/pkg/manager/namespace/manager_test.go b/pkg/manager/namespace/manager_test.go new file mode 100644 index 00000000..fb260997 --- /dev/null +++ b/pkg/manager/namespace/manager_test.go @@ -0,0 +1,32 @@ +// Copyright 2024 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package namespace + +import ( + "testing" + + "github.com/pingcap/tiproxy/pkg/balance/router" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestReady(t *testing.T) { + nsMgr := NewNamespaceManager() + require.NoError(t, nsMgr.Init(zap.NewNop(), nil, nil, nil, nil, nil, nil)) + require.False(t, nsMgr.Ready()) + + rt := router.NewStaticRouter([]string{}) + nsMgr.nsm = map[string]*Namespace{ + "test": { + router: rt, + }, + } + require.False(t, nsMgr.Ready()) + + rt = router.NewStaticRouter([]string{"127.0.0.1:4000"}) + ns, ok := nsMgr.GetNamespace("test") + require.True(t, ok) + ns.router = rt + require.True(t, nsMgr.Ready()) +} diff --git a/pkg/proxy/backend/handshake_handler.go b/pkg/proxy/backend/handshake_handler.go index 1fec497d..f7ff7f97 100644 --- a/pkg/proxy/backend/handshake_handler.go +++ b/pkg/proxy/backend/handshake_handler.go @@ -52,10 +52,10 @@ type HandshakeHandler interface { } type DefaultHandshakeHandler struct { - nsManager *namespace.NamespaceManager + nsManager namespace.NamespaceManager } -func NewDefaultHandshakeHandler(nsManager *namespace.NamespaceManager) *DefaultHandshakeHandler { +func NewDefaultHandshakeHandler(nsManager namespace.NamespaceManager) *DefaultHandshakeHandler { return &DefaultHandshakeHandler{ nsManager: nsManager, } diff --git a/pkg/server/api/debug.go b/pkg/server/api/debug.go index 42eb0a2d..27cc4fbd 100644 --- a/pkg/server/api/debug.go +++ b/pkg/server/api/debug.go @@ -13,7 +13,7 @@ import ( func (h *Server) DebugHealth(c *gin.Context) { status := http.StatusOK - if h.isClosing.Load() { + if h.isClosing.Load() || !h.mgr.NsMgr.Ready() { status = http.StatusBadGateway } c.JSON(status, config.HealthInfo{ diff --git a/pkg/server/api/debug_test.go b/pkg/server/api/debug_test.go index eb6c0949..006739d9 100644 --- a/pkg/server/api/debug_test.go +++ b/pkg/server/api/debug_test.go @@ -24,6 +24,12 @@ func TestDebug(t *testing.T) { require.Equal(t, http.StatusOK, r.StatusCode) }) + server.mgr.NsMgr.(*mockNamespaceManager).success.Store(false) + doHTTP(t, http.MethodGet, "/api/debug/health", httpOpts{}, func(t *testing.T, r *http.Response) { + require.Equal(t, http.StatusBadGateway, r.StatusCode) + }) + + server.mgr.NsMgr.(*mockNamespaceManager).success.Store(true) doHTTP(t, http.MethodGet, "/api/debug/health", httpOpts{}, func(t *testing.T, r *http.Response) { require.Equal(t, http.StatusOK, r.StatusCode) }) diff --git a/pkg/server/api/mock_test.go b/pkg/server/api/mock_test.go new file mode 100644 index 00000000..758a285a --- /dev/null +++ b/pkg/server/api/mock_test.go @@ -0,0 +1,76 @@ +// Copyright 2024 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "context" + "errors" + "sync/atomic" + + "github.com/pingcap/tiproxy/lib/config" + "github.com/pingcap/tiproxy/pkg/balance/metricsreader" + "github.com/pingcap/tiproxy/pkg/balance/observer" + mconfig "github.com/pingcap/tiproxy/pkg/manager/config" + "github.com/pingcap/tiproxy/pkg/manager/namespace" + "github.com/pingcap/tiproxy/pkg/util/http" + "go.uber.org/zap" +) + +var _ namespace.NamespaceManager = (*mockNamespaceManager)(nil) + +type mockNamespaceManager struct { + success atomic.Bool +} + +func newMockNamespaceManager() *mockNamespaceManager { + mgr := &mockNamespaceManager{} + mgr.success.Store(true) + return mgr +} + +func (m *mockNamespaceManager) Init(_ *zap.Logger, _ []*config.Namespace, _ observer.TopologyFetcher, + _ metricsreader.PromInfoFetcher, _ *http.Client, _ *mconfig.ConfigManager, _ metricsreader.MetricsReader) error { + return nil +} + +func (m *mockNamespaceManager) GetNamespace(_ string) (*namespace.Namespace, bool) { + return nil, false +} + +func (m *mockNamespaceManager) GetNamespaceByUser(_ string) (*namespace.Namespace, bool) { + return nil, false +} + +func (m *mockNamespaceManager) SetNamespace(_ context.Context, _ string, _ *config.Namespace) error { + if m.success.Load() { + return nil + } + return errors.New("mock error") +} + +func (m *mockNamespaceManager) GetConfigChecksum() string { + return "" +} + +func (m *mockNamespaceManager) Ready() bool { + return m.success.Load() +} + +func (m *mockNamespaceManager) RedirectConnections() []error { + if m.success.Load() { + return nil + } + return []error{errors.New("mock error")} +} + +func (m *mockNamespaceManager) Close() error { + return nil +} + +func (m *mockNamespaceManager) CommitNamespaces(_ []*config.Namespace, _ []bool) error { + if m.success.Load() { + return nil + } + return errors.New("mock error") +} diff --git a/pkg/server/api/namespace_test.go b/pkg/server/api/namespace_test.go index 2461a153..b81f3650 100644 --- a/pkg/server/api/namespace_test.go +++ b/pkg/server/api/namespace_test.go @@ -13,7 +13,7 @@ import ( ) func TestNamespace(t *testing.T) { - _, doHTTP := createServer(t) + srv, doHTTP := createServer(t) // test list doHTTP(t, http.MethodGet, "/api/admin/namespace", httpOpts{}, func(t *testing.T, r *http.Response) { @@ -51,7 +51,8 @@ func TestNamespace(t *testing.T) { doHTTP(t, http.MethodPost, "/api/admin/namespace/commit?namespace=xx", httpOpts{}, func(t *testing.T, r *http.Response) { require.Equal(t, http.StatusInternalServerError, r.StatusCode) }) + srv.mgr.NsMgr.(*mockNamespaceManager).success.Store(true) doHTTP(t, http.MethodPost, "/api/admin/namespace/commit", httpOpts{}, func(t *testing.T, r *http.Response) { - require.Equal(t, http.StatusInternalServerError, r.StatusCode) + require.Equal(t, http.StatusOK, r.StatusCode) }) } diff --git a/pkg/server/api/server.go b/pkg/server/api/server.go index 5aeeb73f..0e13909f 100644 --- a/pkg/server/api/server.go +++ b/pkg/server/api/server.go @@ -45,7 +45,7 @@ type HTTPHandler interface { type Managers struct { CfgMgr *mgrcfg.ConfigManager - NsMgr *mgrns.NamespaceManager + NsMgr mgrns.NamespaceManager CertMgr *mgrcrt.CertManager BackendReader BackendReader ReplayJobMgr mgrrp.JobManager diff --git a/pkg/server/api/server_test.go b/pkg/server/api/server_test.go index 4b7044de..6a1cfead 100644 --- a/pkg/server/api/server_test.go +++ b/pkg/server/api/server_test.go @@ -14,7 +14,6 @@ import ( "github.com/pingcap/tiproxy/lib/util/logger" mgrcrt "github.com/pingcap/tiproxy/pkg/manager/cert" mgrcfg "github.com/pingcap/tiproxy/pkg/manager/config" - mgrns "github.com/pingcap/tiproxy/pkg/manager/namespace" "github.com/stretchr/testify/require" "go.uber.org/atomic" "google.golang.org/grpc" @@ -33,11 +32,12 @@ func createServer(t *testing.T) (*Server, func(t *testing.T, method string, path require.NoError(t, cfgmgr.Init(context.Background(), lg, "", "")) crtmgr := mgrcrt.NewCertManager() require.NoError(t, crtmgr.Init(cfgmgr.GetConfig(), lg, cfgmgr.WatchConfig())) + nsMgr := newMockNamespaceManager() srv, err := NewServer(config.API{ Addr: "0.0.0.0:0", }, lg, Managers{ CfgMgr: cfgmgr, - NsMgr: mgrns.NewNamespaceManager(), + NsMgr: nsMgr, CertMgr: crtmgr, BackendReader: &mockBackendReader{}, ReplayJobMgr: &mockReplayJobManager{}, diff --git a/pkg/server/server.go b/pkg/server/server.go index e94d1db8..b700e532 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -37,7 +37,7 @@ type Server struct { wg waitgroup.WaitGroup // managers configManager *mgrcfg.ConfigManager - namespaceManager *mgrns.NamespaceManager + namespaceManager mgrns.NamespaceManager metricsManager *metrics.MetricsManager loggerManager *logger.LoggerManager certManager *cert.CertManager