Skip to content

Commit

Permalink
router, backend: retry connecting to different backends (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 authored Jan 5, 2023
1 parent f41b593 commit 66e47e3
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 53 deletions.
38 changes: 38 additions & 0 deletions pkg/manager/router/backend_selector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package router

type BackendSelector struct {
excluded []string
cur string
routeOnce func(excluded []string) string
addConn func(addr string, conn RedirectableConn) error
}

func (bs *BackendSelector) Reset() {
bs.excluded = bs.excluded[:0]
}

func (bs *BackendSelector) Next() string {
if len(bs.cur) > 0 {
bs.excluded = append(bs.excluded, bs.cur)
}
bs.cur = bs.routeOnce(bs.excluded)
return bs.cur
}

func (bs *BackendSelector) Succeed(conn RedirectableConn) error {
return bs.addConn(bs.cur, conn)
}
2 changes: 1 addition & 1 deletion pkg/manager/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Router interface {
// Router will handle connection events to balance connections if possible.
ConnEventReceiver

Route(RedirectableConn) (string, error)
GetBackendSelector() BackendSelector
RedirectConnections() error
ConnCount() int
Close()
Expand Down
51 changes: 39 additions & 12 deletions pkg/manager/router/router_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,27 +61,54 @@ func NewScoreBasedRouter(logger *zap.Logger, httpCli *http.Client, fetcher Backe
return router, nil
}

// Route implements Router.Route interface.
func (router *ScoreBasedRouter) Route(conn RedirectableConn) (string, error) {
// GetBackendSelector implements Router.GetBackendSelector interface.
func (router *ScoreBasedRouter) GetBackendSelector() BackendSelector {
return BackendSelector{
routeOnce: router.routeOnce,
addConn: router.addNewConn,
}
}

func (router *ScoreBasedRouter) routeOnce(excluded []string) string {
router.Lock()
defer router.Unlock()
be := router.backends.Back()
if be == nil {
return "", ErrNoInstanceToSelect
}
backend := be.Value.(*backendWrapper)
switch backend.status {
case StatusCannotConnect, StatusSchemaOutdated:
return "", ErrNoInstanceToSelect
for be := router.backends.Back(); be != nil; be = be.Prev() {
backend := be.Value.(*backendWrapper)
// These backends may be recycled, so we should not connect to them again.
switch backend.status {
case StatusCannotConnect, StatusSchemaOutdated:
continue
}
found := false
for _, ex := range excluded {
if ex == backend.addr {
found = true
break
}
}
if !found {
return backend.addr
}
}
return ""
}

func (router *ScoreBasedRouter) addNewConn(addr string, conn RedirectableConn) error {
connWrapper := &connWrapper{
RedirectableConn: conn,
phase: phaseNotRedirected,
}
router.Lock()
be := router.lookupBackend(addr, true)
if be == nil {
router.Unlock()
return errors.WithStack(errors.Errorf("backend %s is not found in the router", addr))
}
router.addConn(be, connWrapper)
addBackendConnMetrics(backend.addr)
router.Unlock()
addBackendConnMetrics(addr)
conn.SetEventReceiver(router)
return backend.addr, nil
return nil
}

func (router *ScoreBasedRouter) removeConn(be *list.Element, ce *list.Element) {
Expand Down
29 changes: 24 additions & 5 deletions pkg/manager/router/router_static.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,43 @@ package router
var _ Router = &StaticRouter{}

type StaticRouter struct {
addr string
addr []string
cnt int
}

func NewStaticRouter(addr string) *StaticRouter {
func NewStaticRouter(addr []string) *StaticRouter {
return &StaticRouter{addr: addr}
}

func (r *StaticRouter) Route(c RedirectableConn) (string, error) {
return r.addr, nil
func (r *StaticRouter) GetBackendSelector() BackendSelector {
return BackendSelector{
routeOnce: func(excluded []string) string {
for _, addr := range r.addr {
found := false
for _, e := range excluded {
if e == addr {
found = true
break
}
}
if !found {
return addr
}
}
return ""
},
addConn: func(addr string, conn RedirectableConn) error {
r.cnt++
return nil
},
}
}

func (r *StaticRouter) RedirectConnections() error {
return nil
}

func (r *StaticRouter) ConnCount() int {
r.cnt++
return r.cnt
}

Expand Down
66 changes: 57 additions & 9 deletions pkg/manager/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,21 @@ func (tester *routerTester) checkBackendOrder() {
}
}

func (tester *routerTester) simpleRoute(conn RedirectableConn) string {
selector := tester.router.GetBackendSelector()
addr := selector.Next()
if len(addr) > 0 {
err := selector.Succeed(conn)
require.NoError(tester.t, err)
}
return addr
}

func (tester *routerTester) addConnections(num int) {
for i := 0; i < num; i++ {
conn := tester.createConn()
addr, err := tester.router.Route(conn)
require.NoError(tester.t, err)
addr := tester.simpleRoute(conn)
require.True(tester.t, len(addr) > 0)
conn.from = addr
tester.conns[conn.connID] = conn
}
Expand Down Expand Up @@ -355,13 +365,49 @@ func TestConnBalanced(t *testing.T) {
func TestNoBackends(t *testing.T) {
tester := newRouterTester(t)
conn := tester.createConn()
_, err := tester.router.Route(conn)
require.ErrorIs(t, err, ErrNoInstanceToSelect)
addr := tester.simpleRoute(conn)
require.True(t, len(addr) == 0)
tester.addBackends(1)
tester.addConnections(10)
tester.killBackends(1)
_, err = tester.router.Route(conn)
require.ErrorIs(t, err, ErrNoInstanceToSelect)
addr = tester.simpleRoute(conn)
require.True(t, len(addr) == 0)
}

// Test that the backends returned by the BackendSelector are complete and different.
func TestSelectorReturnOrder(t *testing.T) {
tester := newRouterTester(t)
tester.addBackends(3)
selector := tester.router.GetBackendSelector()
for i := 0; i < 3; i++ {
addrs := make(map[string]struct{}, 3)
for j := 0; j < 3; j++ {
addr := selector.Next()
addrs[addr] = struct{}{}
}
// All 3 addresses are different.
require.Equal(t, 3, len(addrs))
addr := selector.Next()
require.True(t, len(addr) == 0)
selector.Reset()
}

tester.killBackends(1)
for i := 0; i < 2; i++ {
addr := selector.Next()
require.True(t, len(addr) > 0)
}
addr := selector.Next()
require.True(t, len(addr) == 0)
selector.Reset()

tester.addBackends(1)
for i := 0; i < 3; i++ {
addr := selector.Next()
require.True(t, len(addr) > 0)
}
addr = selector.Next()
require.True(t, len(addr) == 0)
}

// Test that the backends are balanced during rolling restart.
Expand Down Expand Up @@ -578,12 +624,14 @@ func TestConcurrency(t *testing.T) {
t: t,
connID: connID,
}
addr, err := router.Route(conn)
if err != nil {
require.ErrorIs(t, err, ErrNoInstanceToSelect)
selector := router.GetBackendSelector()
addr := selector.Next()
if len(addr) == 0 {
conn = nil
continue
}
err = selector.Succeed(conn)
require.NoError(t, err)
conn.from = addr
} else if len(conn.GetRedirectingAddr()) > 0 {
// redirecting, 70% success, 20% fail, 10% close
Expand Down
5 changes: 3 additions & 2 deletions pkg/proxy/backend/authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"sync"
"time"

"github.com/pingcap/TiProxy/lib/util/errors"
pnet "github.com/pingcap/TiProxy/pkg/proxy/net"
Expand Down Expand Up @@ -101,7 +102,7 @@ func (auth *Authenticator) verifyBackendCaps(logger *zap.Logger, backendCapabili
return nil
}

type backendIOGetter func(ctx ConnContext, auth *Authenticator, resp *pnet.HandshakeResp) (*pnet.PacketIO, error)
type backendIOGetter func(ctx ConnContext, auth *Authenticator, resp *pnet.HandshakeResp, timeout time.Duration) (*pnet.PacketIO, error)

func (auth *Authenticator) handshakeFirstTime(logger *zap.Logger, clientIO *pnet.PacketIO, handshakeHandler HandshakeHandler,
getBackendIO backendIOGetter, frontendTLSConfig, backendTLSConfig *tls.Config) error {
Expand Down Expand Up @@ -160,7 +161,7 @@ func (auth *Authenticator) handshakeFirstTime(logger *zap.Logger, clientIO *pnet
auth.attrs = resp.Attrs

// In case of testing, backendIO is passed manually that we don't want to bother with the routing logic.
backendIO, err := getBackendIO(auth, auth, resp)
backendIO, err := getBackendIO(auth, auth, resp, 5*time.Second)
if err != nil {
return err
}
Expand Down
53 changes: 31 additions & 22 deletions pkg/proxy/backend/backend_conn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,40 +153,49 @@ func (mgr *BackendConnManager) Connect(ctx context.Context, clientIO *pnet.Packe
return nil
}

func (mgr *BackendConnManager) getBackendIO(ctx ConnContext, auth *Authenticator, resp *pnet.HandshakeResp) (*pnet.PacketIO, error) {
func (mgr *BackendConnManager) getBackendIO(ctx ConnContext, auth *Authenticator, resp *pnet.HandshakeResp, timeout time.Duration) (*pnet.PacketIO, error) {
r, err := mgr.handshakeHandler.GetRouter(auth, resp)
if err != nil {
return nil, err
}
// wait for initialize
bctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
addr, err := backoff.RetryNotifyWithData(
func() (string, error) {
addr, err := r.Route(mgr)
if !errors.Is(err, router.ErrNoInstanceToSelect) {
return addr, backoff.Permanent(err)
// Reasons to wait:
// - The TiDB instances may not be initialized yet
// - One TiDB may be just shut down and another is just started but not ready yet
bctx, cancel := context.WithTimeout(context.Background(), timeout)
selector := r.GetBackendSelector()
io, err := backoff.RetryNotifyWithData(
func() (*pnet.PacketIO, error) {
// Try to connect to all backup backends one by one.
selector.Reset()
for {
addr := selector.Next()
if len(addr) == 0 {
return nil, router.ErrNoInstanceToSelect
}
backendConn := NewBackendConnection(addr)
err := backendConn.Connect()
mgr.handshakeHandler.OnHandshake(auth, addr, err)
if err == nil {
if err = selector.Succeed(mgr); err == nil {
mgr.logger.Info("connected to backend", zap.String("addr", addr))
mgr.backendConn = backendConn
auth.serverAddr = addr
return mgr.backendConn.PacketIO(), nil
}
// Bad luck: the backend has been recycled or shut down just after the selector returns it.
if ignoredErr := backendConn.Close(); ignoredErr != nil {
mgr.logger.Error("close backend connection failed", zap.String("addr", addr), zap.Error(ignoredErr))
}
}
}
return addr, err
},
backoff.WithContext(backoff.NewConstantBackOff(200*time.Millisecond), bctx),
func(err error, d time.Duration) {
mgr.handshakeHandler.OnHandshake(auth, "", err)
},
)
cancel()
if err != nil {
return nil, err
}

mgr.logger.Info("found", zap.String("addr", addr))
mgr.backendConn = NewBackendConnection(addr)
if err := mgr.backendConn.Connect(); err != nil {
mgr.handshakeHandler.OnHandshake(auth, addr, err)
return nil, err
}

auth.serverAddr = addr
return mgr.backendConn.PacketIO(), nil
return io, err
}

// ExecuteCmd forwards messages between the client and the backend.
Expand Down
Loading

0 comments on commit 66e47e3

Please sign in to comment.