Skip to content

Commit

Permalink
Make /api/debug/health return error when no backends are available (#692
Browse files Browse the repository at this point in the history
)
  • Loading branch information
djshow832 authored Sep 27, 2024
1 parent 4243fb5 commit 26c66bc
Show file tree
Hide file tree
Showing 14 changed files with 186 additions and 21 deletions.
1 change: 1 addition & 0 deletions pkg/balance/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Router interface {
ConnEventReceiver

GetBackendSelector() BackendSelector
HealthyBackendCount() int
RefreshBackend()
RedirectConnections() error
ConnCount() int
Expand Down
16 changes: 16 additions & 0 deletions pkg/balance/router/router_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,22 @@ func (router *ScoreBasedRouter) GetBackendSelector() BackendSelector {
}
}

func (router *ScoreBasedRouter) HealthyBackendCount() int {
router.Lock()
defer router.Unlock()
if router.observeError != nil {
return 0
}

count := 0
for _, backend := range router.backends {
if backend.Healthy() {
count++
}
}
return count
}

func (router *ScoreBasedRouter) getConnWrapper(conn RedirectableConn) *glist.Element[*connWrapper] {
return conn.Value(_routerKey).(*glist.Element[*connWrapper])
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/balance/router/router_score_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,14 @@ func TestNoBackends(t *testing.T) {
conn := tester.createConn()
backend := tester.simpleRoute(conn)
require.True(t, backend == nil || reflect.ValueOf(backend).IsNil())
require.Equal(t, 0, tester.router.HealthyBackendCount())
tester.addBackends(1)
require.Equal(t, 1, tester.router.HealthyBackendCount())
tester.addConnections(10)
tester.killBackends(1)
backend = tester.simpleRoute(conn)
require.True(t, backend == nil || reflect.ValueOf(backend).IsNil())
require.Equal(t, 0, tester.router.HealthyBackendCount())
}

// Test that the backends returned by the BackendSelector are complete and different.
Expand Down
4 changes: 4 additions & 0 deletions pkg/balance/router/router_static.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (r *StaticRouter) GetBackendSelector() BackendSelector {
}
}

func (r *StaticRouter) HealthyBackendCount() int {
return len(r.backends)
}

func (r *StaticRouter) RefreshBackend() {}

func (r *StaticRouter) RedirectConnections() error {
Expand Down
50 changes: 38 additions & 12 deletions pkg/manager/namespace/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,19 @@ import (
"go.uber.org/zap"
)

type NamespaceManager struct {
type NamespaceManager interface {
Init(logger *zap.Logger, nscs []*config.Namespace, tpFetcher observer.TopologyFetcher,
promFetcher metricsreader.PromInfoFetcher, httpCli *http.Client, cfgMgr *mconfig.ConfigManager,
metricsReader metricsreader.MetricsReader) error
CommitNamespaces(nss []*config.Namespace, nssDelete []bool) error
GetNamespace(nm string) (*Namespace, bool)
GetNamespaceByUser(user string) (*Namespace, bool)
RedirectConnections() []error
Ready() bool
Close() error
}

type namespaceManager struct {
sync.RWMutex
nsm map[string]*Namespace
tpFetcher observer.TopologyFetcher
Expand All @@ -33,17 +45,17 @@ type NamespaceManager struct {
cfgMgr *mconfig.ConfigManager
}

func NewNamespaceManager() *NamespaceManager {
return &NamespaceManager{}
func NewNamespaceManager() *namespaceManager {
return &namespaceManager{}
}

func (mgr *NamespaceManager) buildNamespace(cfg *config.Namespace) (*Namespace, error) {
func (mgr *namespaceManager) buildNamespace(cfg *config.Namespace) (*Namespace, error) {
logger := mgr.logger.With(zap.String("namespace", cfg.Namespace))

// init BackendFetcher
var fetcher observer.BackendFetcher
healthCheckCfg := config.NewDefaultHealthCheckConfig()
if !reflect.ValueOf(mgr.tpFetcher).IsNil() {
if mgr.tpFetcher != nil && !reflect.ValueOf(mgr.tpFetcher).IsNil() {
fetcher = observer.NewPDFetcher(mgr.tpFetcher, logger.Named("be_fetcher"), healthCheckCfg)
} else {
fetcher = observer.NewStaticFetcher(cfg.Backend.Instances)
Expand All @@ -65,7 +77,7 @@ func (mgr *NamespaceManager) buildNamespace(cfg *config.Namespace) (*Namespace,
}, nil
}

func (mgr *NamespaceManager) CommitNamespaces(nss []*config.Namespace, nss_delete []bool) error {
func (mgr *namespaceManager) CommitNamespaces(nss []*config.Namespace, nssDelete []bool) error {
nsm := make(map[string]*Namespace)
mgr.RLock()
for k, v := range mgr.nsm {
Expand All @@ -74,7 +86,7 @@ func (mgr *NamespaceManager) CommitNamespaces(nss []*config.Namespace, nss_delet
mgr.RUnlock()

for i, nsc := range nss {
if nss_delete != nil && nss_delete[i] {
if nssDelete != nil && nssDelete[i] {
delete(nsm, nsc.Namespace)
continue
}
Expand All @@ -92,7 +104,7 @@ func (mgr *NamespaceManager) CommitNamespaces(nss []*config.Namespace, nss_delet
return nil
}

func (mgr *NamespaceManager) Init(logger *zap.Logger, nscs []*config.Namespace, tpFetcher observer.TopologyFetcher,
func (mgr *namespaceManager) Init(logger *zap.Logger, nscs []*config.Namespace, tpFetcher observer.TopologyFetcher,
promFetcher metricsreader.PromInfoFetcher, httpCli *http.Client, cfgMgr *mconfig.ConfigManager,
metricsReader metricsreader.MetricsReader) error {
mgr.Lock()
Expand All @@ -106,15 +118,15 @@ func (mgr *NamespaceManager) Init(logger *zap.Logger, nscs []*config.Namespace,
return mgr.CommitNamespaces(nscs, nil)
}

func (mgr *NamespaceManager) GetNamespace(nm string) (*Namespace, bool) {
func (mgr *namespaceManager) GetNamespace(nm string) (*Namespace, bool) {
mgr.RLock()
defer mgr.RUnlock()

ns, ok := mgr.nsm[nm]
return ns, ok
}

func (mgr *NamespaceManager) GetNamespaceByUser(user string) (*Namespace, bool) {
func (mgr *namespaceManager) GetNamespaceByUser(user string) (*Namespace, bool) {
mgr.RLock()
defer mgr.RUnlock()

Expand All @@ -126,7 +138,7 @@ func (mgr *NamespaceManager) GetNamespaceByUser(user string) (*Namespace, bool)
return nil, false
}

func (mgr *NamespaceManager) RedirectConnections() []error {
func (mgr *namespaceManager) RedirectConnections() []error {
mgr.RLock()
defer mgr.RUnlock()

Expand All @@ -140,7 +152,21 @@ func (mgr *NamespaceManager) RedirectConnections() []error {
return errs
}

func (mgr *NamespaceManager) Close() error {
func (mgr *namespaceManager) Ready() bool {
mgr.RLock()
defer mgr.RUnlock()
if len(mgr.nsm) == 0 {
return false
}
for _, ns := range mgr.nsm {
if ns.GetRouter().HealthyBackendCount() <= 0 {
return false
}
}
return true
}

func (mgr *namespaceManager) Close() error {
mgr.RLock()
for _, ns := range mgr.nsm {
ns.Close()
Expand Down
32 changes: 32 additions & 0 deletions pkg/manager/namespace/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2024 PingCAP, Inc.
// SPDX-License-Identifier: Apache-2.0

package namespace

import (
"testing"

"github.com/pingcap/tiproxy/pkg/balance/router"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestReady(t *testing.T) {
nsMgr := NewNamespaceManager()
require.NoError(t, nsMgr.Init(zap.NewNop(), nil, nil, nil, nil, nil, nil))
require.False(t, nsMgr.Ready())

rt := router.NewStaticRouter([]string{})
nsMgr.nsm = map[string]*Namespace{
"test": {
router: rt,
},
}
require.False(t, nsMgr.Ready())

rt = router.NewStaticRouter([]string{"127.0.0.1:4000"})
ns, ok := nsMgr.GetNamespace("test")
require.True(t, ok)
ns.router = rt
require.True(t, nsMgr.Ready())
}
4 changes: 2 additions & 2 deletions pkg/proxy/backend/handshake_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ type HandshakeHandler interface {
}

type DefaultHandshakeHandler struct {
nsManager *namespace.NamespaceManager
nsManager namespace.NamespaceManager
}

func NewDefaultHandshakeHandler(nsManager *namespace.NamespaceManager) *DefaultHandshakeHandler {
func NewDefaultHandshakeHandler(nsManager namespace.NamespaceManager) *DefaultHandshakeHandler {
return &DefaultHandshakeHandler{
nsManager: nsManager,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/api/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

func (h *Server) DebugHealth(c *gin.Context) {
status := http.StatusOK
if h.isClosing.Load() {
if h.isClosing.Load() || !h.mgr.NsMgr.Ready() {
status = http.StatusBadGateway
}
c.JSON(status, config.HealthInfo{
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/api/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ func TestDebug(t *testing.T) {
require.Equal(t, http.StatusOK, r.StatusCode)
})

server.mgr.NsMgr.(*mockNamespaceManager).success.Store(false)
doHTTP(t, http.MethodGet, "/api/debug/health", httpOpts{}, func(t *testing.T, r *http.Response) {
require.Equal(t, http.StatusBadGateway, r.StatusCode)
})

server.mgr.NsMgr.(*mockNamespaceManager).success.Store(true)
doHTTP(t, http.MethodGet, "/api/debug/health", httpOpts{}, func(t *testing.T, r *http.Response) {
require.Equal(t, http.StatusOK, r.StatusCode)
})
Expand Down
76 changes: 76 additions & 0 deletions pkg/server/api/mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2024 PingCAP, Inc.
// SPDX-License-Identifier: Apache-2.0

package api

import (
"context"
"errors"
"sync/atomic"

"github.com/pingcap/tiproxy/lib/config"
"github.com/pingcap/tiproxy/pkg/balance/metricsreader"
"github.com/pingcap/tiproxy/pkg/balance/observer"
mconfig "github.com/pingcap/tiproxy/pkg/manager/config"
"github.com/pingcap/tiproxy/pkg/manager/namespace"
"github.com/pingcap/tiproxy/pkg/util/http"
"go.uber.org/zap"
)

var _ namespace.NamespaceManager = (*mockNamespaceManager)(nil)

type mockNamespaceManager struct {
success atomic.Bool
}

func newMockNamespaceManager() *mockNamespaceManager {
mgr := &mockNamespaceManager{}
mgr.success.Store(true)
return mgr
}

func (m *mockNamespaceManager) Init(_ *zap.Logger, _ []*config.Namespace, _ observer.TopologyFetcher,
_ metricsreader.PromInfoFetcher, _ *http.Client, _ *mconfig.ConfigManager, _ metricsreader.MetricsReader) error {
return nil
}

func (m *mockNamespaceManager) GetNamespace(_ string) (*namespace.Namespace, bool) {
return nil, false
}

func (m *mockNamespaceManager) GetNamespaceByUser(_ string) (*namespace.Namespace, bool) {
return nil, false
}

func (m *mockNamespaceManager) SetNamespace(_ context.Context, _ string, _ *config.Namespace) error {
if m.success.Load() {
return nil
}
return errors.New("mock error")
}

func (m *mockNamespaceManager) GetConfigChecksum() string {
return ""
}

func (m *mockNamespaceManager) Ready() bool {
return m.success.Load()
}

func (m *mockNamespaceManager) RedirectConnections() []error {
if m.success.Load() {
return nil
}
return []error{errors.New("mock error")}
}

func (m *mockNamespaceManager) Close() error {
return nil
}

func (m *mockNamespaceManager) CommitNamespaces(_ []*config.Namespace, _ []bool) error {
if m.success.Load() {
return nil
}
return errors.New("mock error")
}
5 changes: 3 additions & 2 deletions pkg/server/api/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

func TestNamespace(t *testing.T) {
_, doHTTP := createServer(t)
srv, doHTTP := createServer(t)

// test list
doHTTP(t, http.MethodGet, "/api/admin/namespace", httpOpts{}, func(t *testing.T, r *http.Response) {
Expand Down Expand Up @@ -51,7 +51,8 @@ func TestNamespace(t *testing.T) {
doHTTP(t, http.MethodPost, "/api/admin/namespace/commit?namespace=xx", httpOpts{}, func(t *testing.T, r *http.Response) {
require.Equal(t, http.StatusInternalServerError, r.StatusCode)
})
srv.mgr.NsMgr.(*mockNamespaceManager).success.Store(true)
doHTTP(t, http.MethodPost, "/api/admin/namespace/commit", httpOpts{}, func(t *testing.T, r *http.Response) {
require.Equal(t, http.StatusInternalServerError, r.StatusCode)
require.Equal(t, http.StatusOK, r.StatusCode)
})
}
2 changes: 1 addition & 1 deletion pkg/server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type HTTPHandler interface {

type Managers struct {
CfgMgr *mgrcfg.ConfigManager
NsMgr *mgrns.NamespaceManager
NsMgr mgrns.NamespaceManager
CertMgr *mgrcrt.CertManager
BackendReader BackendReader
ReplayJobMgr mgrrp.JobManager
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/pingcap/tiproxy/lib/util/logger"
mgrcrt "github.com/pingcap/tiproxy/pkg/manager/cert"
mgrcfg "github.com/pingcap/tiproxy/pkg/manager/config"
mgrns "github.com/pingcap/tiproxy/pkg/manager/namespace"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/grpc"
Expand All @@ -33,11 +32,12 @@ func createServer(t *testing.T) (*Server, func(t *testing.T, method string, path
require.NoError(t, cfgmgr.Init(context.Background(), lg, "", ""))
crtmgr := mgrcrt.NewCertManager()
require.NoError(t, crtmgr.Init(cfgmgr.GetConfig(), lg, cfgmgr.WatchConfig()))
nsMgr := newMockNamespaceManager()
srv, err := NewServer(config.API{
Addr: "0.0.0.0:0",
}, lg, Managers{
CfgMgr: cfgmgr,
NsMgr: mgrns.NewNamespaceManager(),
NsMgr: nsMgr,
CertMgr: crtmgr,
BackendReader: &mockBackendReader{},
ReplayJobMgr: &mockReplayJobManager{},
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Server struct {
wg waitgroup.WaitGroup
// managers
configManager *mgrcfg.ConfigManager
namespaceManager *mgrns.NamespaceManager
namespaceManager mgrns.NamespaceManager
metricsManager *metrics.MetricsManager
loggerManager *logger.LoggerManager
certManager *cert.CertManager
Expand Down

0 comments on commit 26c66bc

Please sign in to comment.