Skip to content

Commit

Permalink
router, backend: stop retrying connecting to backends if the fetcher …
Browse files Browse the repository at this point in the history
…fails (#182)
  • Loading branch information
djshow832 authored Jan 11, 2023
1 parent dec59b4 commit 17cf8c8
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 61 deletions.
13 changes: 8 additions & 5 deletions pkg/manager/router/backend_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ import (
"google.golang.org/grpc/keepalive"
)

var _ BackendFetcher = (*PDFetcher)(nil)
var _ BackendFetcher = (*StaticFetcher)(nil)

// BackendFetcher is an interface to fetch the backend list.
type BackendFetcher interface {
GetBackendList(context.Context) map[string]*BackendInfo
GetBackendList(context.Context) (map[string]*BackendInfo, error)
}

// InitEtcdClient initializes an etcd client that fetches TiDB instance topology from PD.
Expand Down Expand Up @@ -99,10 +102,10 @@ func NewPDFetcher(client *clientv3.Client, logger *zap.Logger, config *HealthChe
}
}

func (pf *PDFetcher) GetBackendList(ctx context.Context) map[string]*BackendInfo {
func (pf *PDFetcher) GetBackendList(ctx context.Context) (map[string]*BackendInfo, error) {
pf.fetchBackendList(ctx)
backendInfo := pf.filterTombstoneBackends()
return backendInfo
return backendInfo, nil
}

func (pf *PDFetcher) fetchBackendList(ctx context.Context) {
Expand Down Expand Up @@ -208,8 +211,8 @@ func NewStaticFetcher(staticAddrs []string) *StaticFetcher {
}
}

func (sf *StaticFetcher) GetBackendList(context.Context) map[string]*BackendInfo {
return sf.backends
func (sf *StaticFetcher) GetBackendList(context.Context) (map[string]*BackendInfo, error) {
return sf.backends, nil
}

func backendListToMap(addrs []string) map[string]*BackendInfo {
Expand Down
22 changes: 13 additions & 9 deletions pkg/manager/router/backend_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func NewDefaultHealthCheckConfig() *HealthCheckConfig {
// BackendEventReceiver receives the event of backend status change.
type BackendEventReceiver interface {
// OnBackendChanged is called when the backend list changes.
OnBackendChanged(backends map[string]BackendStatus)
OnBackendChanged(backends map[string]BackendStatus, err error)
}

// BackendInfo stores the status info of each backend.
Expand Down Expand Up @@ -179,12 +179,17 @@ func (bo *BackendObserver) Refresh() {

func (bo *BackendObserver) observe(ctx context.Context) {
for ctx.Err() == nil {
backendInfo := bo.fetcher.GetBackendList(ctx)
backendStatus := bo.checkHealth(ctx, backendInfo)
if ctx.Err() != nil {
return
backendInfo, err := bo.fetcher.GetBackendList(ctx)
if err != nil {
bo.logger.Warn("fetching backends encounters error", zap.Error(err))
bo.eventReceiver.OnBackendChanged(nil, err)
} else {
backendStatus := bo.checkHealth(ctx, backendInfo)
if ctx.Err() != nil {
return
}
bo.notifyIfChanged(backendStatus)
}
bo.notifyIfChanged(backendStatus)
select {
case <-time.After(bo.config.healthCheckInterval):
case <-bo.refreshChan:
Expand Down Expand Up @@ -286,9 +291,8 @@ func (bo *BackendObserver) notifyIfChanged(backendStatus map[string]BackendStatu
}
}
}
if len(updatedBackends) > 0 {
bo.eventReceiver.OnBackendChanged(updatedBackends)
}
// Notify it even when the updatedBackends is empty, in order to clear the last error.
bo.eventReceiver.OnBackendChanged(updatedBackends, nil)
bo.curBackendInfo = backendStatus
}

Expand Down
47 changes: 34 additions & 13 deletions pkg/manager/router/backend_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/pingcap/TiProxy/lib/util/errors"
"github.com/pingcap/TiProxy/lib/util/logger"
"github.com/pingcap/TiProxy/lib/util/waitgroup"
"github.com/stretchr/testify/require"
Expand All @@ -33,15 +34,21 @@ import (

type mockEventReceiver struct {
backendChan chan map[string]BackendStatus
errChan chan error
}

func (mer *mockEventReceiver) OnBackendChanged(backends map[string]BackendStatus) {
mer.backendChan <- backends
func (mer *mockEventReceiver) OnBackendChanged(backends map[string]BackendStatus, err error) {
if err != nil {
mer.errChan <- err
} else if len(backends) > 0 {
mer.backendChan <- backends
}
}

func newMockEventReceiver(backendChan chan map[string]BackendStatus) *mockEventReceiver {
func newMockEventReceiver(backendChan chan map[string]BackendStatus, errChan chan error) *mockEventReceiver {
return &mockEventReceiver{
backendChan: backendChan,
errChan: errChan,
}
}

Expand Down Expand Up @@ -94,7 +101,8 @@ func TestCancelObserver(t *testing.T) {
for i := 0; i < 3; i++ {
backends = append(backends, addBackend(t, kv))
}
backendInfo := bo.fetcher.GetBackendList(context.Background())
backendInfo, err := bo.fetcher.GetBackendList(context.Background())
require.NoError(t, err)
require.Equal(t, 3, len(backendInfo))

// Try 10 times.
Expand Down Expand Up @@ -133,11 +141,12 @@ func TestEtcdUnavailable(t *testing.T) {
// Test that the notified backend status is correct for external fetcher.
func TestExternalFetcher(t *testing.T) {
backendAddrs := make([]string, 0)
var observeError error
var mutex sync.Mutex
backendGetter := func() []string {
backendGetter := func() ([]string, error) {
mutex.Lock()
defer mutex.Unlock()
return backendAddrs
return backendAddrs, observeError
}
addBackend := func() *backendServer {
backend := newBackendServer(t)
Expand All @@ -152,9 +161,15 @@ func TestExternalFetcher(t *testing.T) {
backendAddrs = append(backendAddrs[:idx], backendAddrs[idx+1:]...)
mutex.Unlock()
}
mockError := func() {
mutex.Lock()
observeError = errors.New("mock observe error")
mutex.Unlock()
}

backendChan := make(chan map[string]BackendStatus, 1)
mer := newMockEventReceiver(backendChan)
errChan := make(chan error, 1)
mer := newMockEventReceiver(backendChan, errChan)
fetcher := NewExternalFetcher(backendGetter)
bo, err := NewBackendObserver(logger.CreateLoggerForTest(t), mer, nil, newHealthCheckConfigForTest(), fetcher)
require.NoError(t, err)
Expand All @@ -175,14 +190,20 @@ func TestExternalFetcher(t *testing.T) {
backend2.close()
checkStatus(t, backendChan, backend2, StatusCannotConnect)
removeBackend(backend2)

// returns observe error
require.Len(t, errChan, 0)
mockError()
err = <-errChan
require.Error(t, err)
}

func runETCDTest(t *testing.T, f func(etcd *embed.Etcd, kv clientv3.KV, bo *BackendObserver, backendChan chan map[string]BackendStatus)) {
etcd := createEtcdServer(t, "127.0.0.1:0")
client := createEtcdClient(t, etcd)
kv := clientv3.NewKV(client)
backendChan := make(chan map[string]BackendStatus, 1)
mer := newMockEventReceiver(backendChan)
mer := newMockEventReceiver(backendChan, make(chan error, 1))
fetcher := NewPDFetcher(client, logger.CreateLoggerForTest(t), newHealthCheckConfigForTest())
bo, err := NewBackendObserver(logger.CreateLoggerForTest(t), mer, nil, newHealthCheckConfigForTest(), fetcher)
require.NoError(t, err)
Expand Down Expand Up @@ -293,16 +314,16 @@ func startListener(t *testing.T, addr string) (net.Listener, string) {

// ExternalFetcher fetches backend list from a given callback.
type ExternalFetcher struct {
backendGetter func() []string
backendGetter func() ([]string, error)
}

func NewExternalFetcher(backendGetter func() []string) *ExternalFetcher {
func NewExternalFetcher(backendGetter func() ([]string, error)) *ExternalFetcher {
return &ExternalFetcher{
backendGetter: backendGetter,
}
}

func (ef *ExternalFetcher) GetBackendList(context.Context) map[string]*BackendInfo {
addrs := ef.backendGetter()
return backendListToMap(addrs)
func (ef *ExternalFetcher) GetBackendList(context.Context) (map[string]*BackendInfo, error) {
addrs, err := ef.backendGetter()
return backendListToMap(addrs), err
}
12 changes: 8 additions & 4 deletions pkg/manager/router/backend_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,24 @@ package router
type BackendSelector struct {
excluded []string
cur string
routeOnce func(excluded []string) string
routeOnce func(excluded []string) (string, error)
addConn func(addr string, conn RedirectableConn) error
}

func (bs *BackendSelector) Reset() {
bs.excluded = bs.excluded[:0]
}

func (bs *BackendSelector) Next() string {
bs.cur = bs.routeOnce(bs.excluded)
func (bs *BackendSelector) Next() (string, error) {
addr, err := bs.routeOnce(bs.excluded)
if err != nil {
return addr, err
}
bs.cur = addr
if bs.cur != "" {
bs.excluded = append(bs.excluded, bs.cur)
}
return bs.cur
return bs.cur, nil
}

func (bs *BackendSelector) Succeed(conn RedirectableConn) error {
Expand Down
15 changes: 10 additions & 5 deletions pkg/manager/router/router_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ type ScoreBasedRouter struct {
cancelFunc context.CancelFunc
wg waitgroup.WaitGroup
// A list of *backendWrapper. The backends are in descending order of scores.
backends *list.List
backends *list.List
observeError error
}

// NewScoreBasedRouter creates a ScoreBasedRouter.
Expand Down Expand Up @@ -69,9 +70,12 @@ func (router *ScoreBasedRouter) GetBackendSelector() BackendSelector {
}
}

func (router *ScoreBasedRouter) routeOnce(excluded []string) string {
func (router *ScoreBasedRouter) routeOnce(excluded []string) (string, error) {
router.Lock()
defer router.Unlock()
if router.observeError != nil {
return "", router.observeError
}
for be := router.backends.Back(); be != nil; be = be.Prev() {
backend := be.Value.(*backendWrapper)
// These backends may be recycled, so we should not connect to them again.
Expand All @@ -87,15 +91,15 @@ func (router *ScoreBasedRouter) routeOnce(excluded []string) string {
}
}
if !found {
return backend.addr
return backend.addr, nil
}
}
// No available backends, maybe the health check result is outdated during rolling restart.
// Refresh the backends asynchronously in this case.
if router.observer != nil {
router.observer.Refresh()
}
return ""
return "", nil
}

func (router *ScoreBasedRouter) addNewConn(addr string, conn RedirectableConn) error {
Expand Down Expand Up @@ -278,9 +282,10 @@ func (router *ScoreBasedRouter) OnConnClosed(addr string, conn RedirectableConn)
}

// OnBackendChanged implements BackendEventReceiver.OnBackendChanged interface.
func (router *ScoreBasedRouter) OnBackendChanged(backends map[string]BackendStatus) {
func (router *ScoreBasedRouter) OnBackendChanged(backends map[string]BackendStatus, err error) {
router.Lock()
defer router.Unlock()
router.observeError = err
for addr, status := range backends {
be := router.lookupBackend(addr, true)
if be == nil && status != StatusCannotConnect {
Expand Down
6 changes: 3 additions & 3 deletions pkg/manager/router/router_static.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewStaticRouter(addr []string) *StaticRouter {

func (r *StaticRouter) GetBackendSelector() BackendSelector {
return BackendSelector{
routeOnce: func(excluded []string) string {
routeOnce: func(excluded []string) (string, error) {
for _, addr := range r.addrs {
found := false
for _, e := range excluded {
Expand All @@ -37,10 +37,10 @@ func (r *StaticRouter) GetBackendSelector() BackendSelector {
}
}
if !found {
return addr
return addr, nil
}
}
return ""
return "", nil
},
addConn: func(addr string, conn RedirectableConn) error {
r.cnt++
Expand Down
Loading

0 comments on commit 17cf8c8

Please sign in to comment.