Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make /api/debug/health return error when no backends are available #692

Merged
merged 3 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/balance/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Router interface {
ConnEventReceiver

GetBackendSelector() BackendSelector
HealthyBackendCount() int
RefreshBackend()
RedirectConnections() error
ConnCount() int
Expand Down
16 changes: 16 additions & 0 deletions pkg/balance/router/router_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,22 @@ func (router *ScoreBasedRouter) GetBackendSelector() BackendSelector {
}
}

func (router *ScoreBasedRouter) HealthyBackendCount() int {
router.Lock()
defer router.Unlock()
if router.observeError != nil {
return 0
}

count := 0
for _, backend := range router.backends {
if backend.Healthy() {
count++
}
}
return count
}

func (router *ScoreBasedRouter) getConnWrapper(conn RedirectableConn) *glist.Element[*connWrapper] {
return conn.Value(_routerKey).(*glist.Element[*connWrapper])
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/balance/router/router_score_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,14 @@ func TestNoBackends(t *testing.T) {
conn := tester.createConn()
backend := tester.simpleRoute(conn)
require.True(t, backend == nil || reflect.ValueOf(backend).IsNil())
require.Equal(t, 0, tester.router.HealthyBackendCount())
tester.addBackends(1)
require.Equal(t, 1, tester.router.HealthyBackendCount())
tester.addConnections(10)
tester.killBackends(1)
backend = tester.simpleRoute(conn)
require.True(t, backend == nil || reflect.ValueOf(backend).IsNil())
require.Equal(t, 0, tester.router.HealthyBackendCount())
}

// Test that the backends returned by the BackendSelector are complete and different.
Expand Down
4 changes: 4 additions & 0 deletions pkg/balance/router/router_static.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (r *StaticRouter) GetBackendSelector() BackendSelector {
}
}

func (r *StaticRouter) HealthyBackendCount() int {
return len(r.backends)
}

func (r *StaticRouter) RefreshBackend() {}

func (r *StaticRouter) RedirectConnections() error {
Expand Down
50 changes: 38 additions & 12 deletions pkg/manager/namespace/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,19 @@ import (
"go.uber.org/zap"
)

type NamespaceManager struct {
type NamespaceManager interface {
Init(logger *zap.Logger, nscs []*config.Namespace, tpFetcher observer.TopologyFetcher,
promFetcher metricsreader.PromInfoFetcher, httpCli *http.Client, cfgMgr *mconfig.ConfigManager,
metricsReader metricsreader.MetricsReader) error
CommitNamespaces(nss []*config.Namespace, nssDelete []bool) error
GetNamespace(nm string) (*Namespace, bool)
GetNamespaceByUser(user string) (*Namespace, bool)
RedirectConnections() []error
Ready() bool
Close() error
}

type namespaceManager struct {
sync.RWMutex
nsm map[string]*Namespace
tpFetcher observer.TopologyFetcher
Expand All @@ -33,17 +45,17 @@ type NamespaceManager struct {
cfgMgr *mconfig.ConfigManager
}

func NewNamespaceManager() *NamespaceManager {
return &NamespaceManager{}
func NewNamespaceManager() *namespaceManager {
return &namespaceManager{}
}

func (mgr *NamespaceManager) buildNamespace(cfg *config.Namespace) (*Namespace, error) {
func (mgr *namespaceManager) buildNamespace(cfg *config.Namespace) (*Namespace, error) {
logger := mgr.logger.With(zap.String("namespace", cfg.Namespace))

// init BackendFetcher
var fetcher observer.BackendFetcher
healthCheckCfg := config.NewDefaultHealthCheckConfig()
if !reflect.ValueOf(mgr.tpFetcher).IsNil() {
if mgr.tpFetcher != nil && !reflect.ValueOf(mgr.tpFetcher).IsNil() {
fetcher = observer.NewPDFetcher(mgr.tpFetcher, logger.Named("be_fetcher"), healthCheckCfg)
} else {
fetcher = observer.NewStaticFetcher(cfg.Backend.Instances)
Expand All @@ -65,7 +77,7 @@ func (mgr *NamespaceManager) buildNamespace(cfg *config.Namespace) (*Namespace,
}, nil
}

func (mgr *NamespaceManager) CommitNamespaces(nss []*config.Namespace, nss_delete []bool) error {
func (mgr *namespaceManager) CommitNamespaces(nss []*config.Namespace, nssDelete []bool) error {
nsm := make(map[string]*Namespace)
mgr.RLock()
for k, v := range mgr.nsm {
Expand All @@ -74,7 +86,7 @@ func (mgr *NamespaceManager) CommitNamespaces(nss []*config.Namespace, nss_delet
mgr.RUnlock()

for i, nsc := range nss {
if nss_delete != nil && nss_delete[i] {
if nssDelete != nil && nssDelete[i] {
delete(nsm, nsc.Namespace)
continue
}
Expand All @@ -92,7 +104,7 @@ func (mgr *NamespaceManager) CommitNamespaces(nss []*config.Namespace, nss_delet
return nil
}

func (mgr *NamespaceManager) Init(logger *zap.Logger, nscs []*config.Namespace, tpFetcher observer.TopologyFetcher,
func (mgr *namespaceManager) Init(logger *zap.Logger, nscs []*config.Namespace, tpFetcher observer.TopologyFetcher,
promFetcher metricsreader.PromInfoFetcher, httpCli *http.Client, cfgMgr *mconfig.ConfigManager,
metricsReader metricsreader.MetricsReader) error {
mgr.Lock()
Expand All @@ -106,15 +118,15 @@ func (mgr *NamespaceManager) Init(logger *zap.Logger, nscs []*config.Namespace,
return mgr.CommitNamespaces(nscs, nil)
}

func (mgr *NamespaceManager) GetNamespace(nm string) (*Namespace, bool) {
func (mgr *namespaceManager) GetNamespace(nm string) (*Namespace, bool) {
mgr.RLock()
defer mgr.RUnlock()

ns, ok := mgr.nsm[nm]
return ns, ok
}

func (mgr *NamespaceManager) GetNamespaceByUser(user string) (*Namespace, bool) {
func (mgr *namespaceManager) GetNamespaceByUser(user string) (*Namespace, bool) {
mgr.RLock()
defer mgr.RUnlock()

Expand All @@ -126,7 +138,7 @@ func (mgr *NamespaceManager) GetNamespaceByUser(user string) (*Namespace, bool)
return nil, false
}

func (mgr *NamespaceManager) RedirectConnections() []error {
func (mgr *namespaceManager) RedirectConnections() []error {
mgr.RLock()
defer mgr.RUnlock()

Expand All @@ -140,7 +152,21 @@ func (mgr *NamespaceManager) RedirectConnections() []error {
return errs
}

func (mgr *NamespaceManager) Close() error {
func (mgr *namespaceManager) Ready() bool {
mgr.RLock()
defer mgr.RUnlock()
if len(mgr.nsm) == 0 {
return false
}
for _, ns := range mgr.nsm {
if ns.GetRouter().HealthyBackendCount() <= 0 {
return false
}
}
return true
}

func (mgr *namespaceManager) Close() error {
mgr.RLock()
for _, ns := range mgr.nsm {
ns.Close()
Expand Down
32 changes: 32 additions & 0 deletions pkg/manager/namespace/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2024 PingCAP, Inc.
// SPDX-License-Identifier: Apache-2.0

package namespace

import (
"testing"

"github.com/pingcap/tiproxy/pkg/balance/router"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestReady(t *testing.T) {
nsMgr := NewNamespaceManager()
require.NoError(t, nsMgr.Init(zap.NewNop(), nil, nil, nil, nil, nil, nil))
require.False(t, nsMgr.Ready())

rt := router.NewStaticRouter([]string{})
nsMgr.nsm = map[string]*Namespace{
"test": {
router: rt,
},
}
require.False(t, nsMgr.Ready())

rt = router.NewStaticRouter([]string{"127.0.0.1:4000"})
ns, ok := nsMgr.GetNamespace("test")
require.True(t, ok)
ns.router = rt
require.True(t, nsMgr.Ready())
}
4 changes: 2 additions & 2 deletions pkg/proxy/backend/handshake_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ type HandshakeHandler interface {
}

type DefaultHandshakeHandler struct {
nsManager *namespace.NamespaceManager
nsManager namespace.NamespaceManager
}

func NewDefaultHandshakeHandler(nsManager *namespace.NamespaceManager) *DefaultHandshakeHandler {
func NewDefaultHandshakeHandler(nsManager namespace.NamespaceManager) *DefaultHandshakeHandler {
return &DefaultHandshakeHandler{
nsManager: nsManager,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/api/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

func (h *Server) DebugHealth(c *gin.Context) {
status := http.StatusOK
if h.isClosing.Load() {
if h.isClosing.Load() || !h.mgr.NsMgr.Ready() {
status = http.StatusBadGateway
}
c.JSON(status, config.HealthInfo{
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/api/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ func TestDebug(t *testing.T) {
require.Equal(t, http.StatusOK, r.StatusCode)
})

server.mgr.NsMgr.(*mockNamespaceManager).success.Store(false)
doHTTP(t, http.MethodGet, "/api/debug/health", httpOpts{}, func(t *testing.T, r *http.Response) {
require.Equal(t, http.StatusBadGateway, r.StatusCode)
})

server.mgr.NsMgr.(*mockNamespaceManager).success.Store(true)
doHTTP(t, http.MethodGet, "/api/debug/health", httpOpts{}, func(t *testing.T, r *http.Response) {
require.Equal(t, http.StatusOK, r.StatusCode)
})
Expand Down
76 changes: 76 additions & 0 deletions pkg/server/api/mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2024 PingCAP, Inc.
// SPDX-License-Identifier: Apache-2.0

package api

import (
"context"
"errors"
"sync/atomic"

"github.com/pingcap/tiproxy/lib/config"
"github.com/pingcap/tiproxy/pkg/balance/metricsreader"
"github.com/pingcap/tiproxy/pkg/balance/observer"
mconfig "github.com/pingcap/tiproxy/pkg/manager/config"
"github.com/pingcap/tiproxy/pkg/manager/namespace"
"github.com/pingcap/tiproxy/pkg/util/http"
"go.uber.org/zap"
)

var _ namespace.NamespaceManager = (*mockNamespaceManager)(nil)

type mockNamespaceManager struct {
success atomic.Bool
}

func newMockNamespaceManager() *mockNamespaceManager {
mgr := &mockNamespaceManager{}
mgr.success.Store(true)
return mgr
}

func (m *mockNamespaceManager) Init(_ *zap.Logger, _ []*config.Namespace, _ observer.TopologyFetcher,
_ metricsreader.PromInfoFetcher, _ *http.Client, _ *mconfig.ConfigManager, _ metricsreader.MetricsReader) error {
return nil
}

func (m *mockNamespaceManager) GetNamespace(_ string) (*namespace.Namespace, bool) {
return nil, false
}

func (m *mockNamespaceManager) GetNamespaceByUser(_ string) (*namespace.Namespace, bool) {
return nil, false
}

func (m *mockNamespaceManager) SetNamespace(_ context.Context, _ string, _ *config.Namespace) error {
if m.success.Load() {
return nil
}
return errors.New("mock error")
}

func (m *mockNamespaceManager) GetConfigChecksum() string {
return ""
}

func (m *mockNamespaceManager) Ready() bool {
return m.success.Load()
}

func (m *mockNamespaceManager) RedirectConnections() []error {
if m.success.Load() {
return nil
}
return []error{errors.New("mock error")}
}

func (m *mockNamespaceManager) Close() error {
return nil
}

func (m *mockNamespaceManager) CommitNamespaces(_ []*config.Namespace, _ []bool) error {
if m.success.Load() {
return nil
}
return errors.New("mock error")
}
5 changes: 3 additions & 2 deletions pkg/server/api/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

func TestNamespace(t *testing.T) {
_, doHTTP := createServer(t)
srv, doHTTP := createServer(t)

// test list
doHTTP(t, http.MethodGet, "/api/admin/namespace", httpOpts{}, func(t *testing.T, r *http.Response) {
Expand Down Expand Up @@ -51,7 +51,8 @@ func TestNamespace(t *testing.T) {
doHTTP(t, http.MethodPost, "/api/admin/namespace/commit?namespace=xx", httpOpts{}, func(t *testing.T, r *http.Response) {
require.Equal(t, http.StatusInternalServerError, r.StatusCode)
})
srv.mgr.NsMgr.(*mockNamespaceManager).success.Store(true)
doHTTP(t, http.MethodPost, "/api/admin/namespace/commit", httpOpts{}, func(t *testing.T, r *http.Response) {
require.Equal(t, http.StatusInternalServerError, r.StatusCode)
require.Equal(t, http.StatusOK, r.StatusCode)
})
}
2 changes: 1 addition & 1 deletion pkg/server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type HTTPHandler interface {

type Managers struct {
CfgMgr *mgrcfg.ConfigManager
NsMgr *mgrns.NamespaceManager
NsMgr mgrns.NamespaceManager
CertMgr *mgrcrt.CertManager
BackendReader BackendReader
ReplayJobMgr mgrrp.JobManager
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/pingcap/tiproxy/lib/util/logger"
mgrcrt "github.com/pingcap/tiproxy/pkg/manager/cert"
mgrcfg "github.com/pingcap/tiproxy/pkg/manager/config"
mgrns "github.com/pingcap/tiproxy/pkg/manager/namespace"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/grpc"
Expand All @@ -33,11 +32,12 @@ func createServer(t *testing.T) (*Server, func(t *testing.T, method string, path
require.NoError(t, cfgmgr.Init(context.Background(), lg, "", ""))
crtmgr := mgrcrt.NewCertManager()
require.NoError(t, crtmgr.Init(cfgmgr.GetConfig(), lg, cfgmgr.WatchConfig()))
nsMgr := newMockNamespaceManager()
srv, err := NewServer(config.API{
Addr: "0.0.0.0:0",
}, lg, Managers{
CfgMgr: cfgmgr,
NsMgr: mgrns.NewNamespaceManager(),
NsMgr: nsMgr,
CertMgr: crtmgr,
BackendReader: &mockBackendReader{},
ReplayJobMgr: &mockReplayJobManager{},
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Server struct {
wg waitgroup.WaitGroup
// managers
configManager *mgrcfg.ConfigManager
namespaceManager *mgrns.NamespaceManager
namespaceManager mgrns.NamespaceManager
metricsManager *metrics.MetricsManager
loggerManager *logger.LoggerManager
certManager *cert.CertManager
Expand Down