Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backend, router: fix errors when closing and redirecting concurrently #72

Merged
merged 2 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/manager/namespace/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewNamespaceManager() *NamespaceManager {
}
func (mgr *NamespaceManager) buildNamespace(cfg *config.Namespace, client *clientv3.Client) (*Namespace, error) {
logger := mgr.logger.With(zap.String("namespace", cfg.Namespace))
rt, err := router.NewRandomRouter(&cfg.Backend, client)
rt, err := router.NewScoreBasedRouter(&cfg.Backend, client)
if err != nil {
return nil, errors.Errorf("build router error: %w", err)
}
Expand Down
139 changes: 84 additions & 55 deletions pkg/manager/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.uber.org/zap"
)

// Router routes client connections to backends.
type Router interface {
Route(RedirectableConn) (string, error)
RedirectConnections() error
Expand All @@ -50,45 +51,55 @@ const (
rebalanceMaxScoreRatio = 1.1
)

// ConnEventReceiver receives connection events.
type ConnEventReceiver interface {
OnRedirectSucceed(from, to string, conn RedirectableConn)
OnRedirectFail(from, to string, conn RedirectableConn)
OnConnClosed(addr string, conn RedirectableConn)
}

// RedirectableConn indicates a redirect-able connection.
type RedirectableConn interface {
SetEventReceiver(receiver ConnEventReceiver)
Redirect(addr string)
GetRedirectingAddr() string
ConnectionID() uint64
}

type BackendWrapper struct {
// backendWrapper contains the connections on the backend.
type backendWrapper struct {
status BackendStatus
addr string
// A list of *ConnWrapper and is ordered by the connecting or redirecting time.
// A list of *connWrapper and is ordered by the connecting or redirecting time.
// connList and connMap include moving out connections but not moving in connections.
connList *list.List
connMap map[uint64]*list.Element
}

func (b *BackendWrapper) score() int {
// score calculates the score of the backend. Larger score indicates higher load.
func (b *backendWrapper) score() int {
return b.status.ToScore() + b.connList.Len()
}

type ConnWrapper struct {
// connWrapper wraps RedirectableConn.
type connWrapper struct {
RedirectableConn
phase int
}

type RandomRouter struct {
// ScoreBasedRouter is an implementation of Router interface.
// It routes a connection based on score.
type ScoreBasedRouter struct {
sync.Mutex
observer *BackendObserver
cancelFunc context.CancelFunc
// A list of *BackendWrapper and ordered by the score of the backends.
// A list of *backendWrapper. The backends are in descending order of scores.
backends *list.List
}

func NewRandomRouter(cfg *config.BackendNamespace, client *clientv3.Client) (*RandomRouter, error) {
router := &RandomRouter{
// NewScoreBasedRouter creates a ScoreBasedRouter.
func NewScoreBasedRouter(cfg *config.BackendNamespace, client *clientv3.Client) (*ScoreBasedRouter, error) {
router := &ScoreBasedRouter{
backends: list.New(),
}
router.Lock()
Expand All @@ -104,19 +115,20 @@ func NewRandomRouter(cfg *config.BackendNamespace, client *clientv3.Client) (*Ra
return router, err
}

func (router *RandomRouter) Route(conn RedirectableConn) (string, error) {
// Route implements Router.Route interface.
func (router *ScoreBasedRouter) Route(conn RedirectableConn) (string, error) {
router.Lock()
defer router.Unlock()
be := router.backends.Back()
if be == nil {
return "", ErrNoInstanceToSelect
}
backend := be.Value.(*BackendWrapper)
backend := be.Value.(*backendWrapper)
switch backend.status {
case StatusCannotConnect, StatusSchemaOutdated:
return "", ErrNoInstanceToSelect
}
connWrapper := &ConnWrapper{
connWrapper := &connWrapper{
RedirectableConn: conn,
phase: phaseNotRedirected,
}
Expand All @@ -125,27 +137,28 @@ func (router *RandomRouter) Route(conn RedirectableConn) (string, error) {
return backend.addr, nil
}

func (router *RandomRouter) removeConn(be *list.Element, ce *list.Element) {
backend := be.Value.(*BackendWrapper)
conn := ce.Value.(*ConnWrapper)
func (router *ScoreBasedRouter) removeConn(be *list.Element, ce *list.Element) {
backend := be.Value.(*backendWrapper)
conn := ce.Value.(*connWrapper)
backend.connList.Remove(ce)
delete(backend.connMap, conn.ConnectionID())
router.adjustBackendList(be)
}

func (router *RandomRouter) addConn(be *list.Element, conn *ConnWrapper) {
backend := be.Value.(*BackendWrapper)
func (router *ScoreBasedRouter) addConn(be *list.Element, conn *connWrapper) {
backend := be.Value.(*backendWrapper)
ce := backend.connList.PushBack(conn)
backend.connMap[conn.ConnectionID()] = ce
router.adjustBackendList(be)
}

func (router *RandomRouter) adjustBackendList(be *list.Element) {
backend := be.Value.(*BackendWrapper)
// adjustBackendList moves `be` after the score of `be` changes to keep the list ordered.
func (router *ScoreBasedRouter) adjustBackendList(be *list.Element) {
backend := be.Value.(*backendWrapper)
curScore := backend.score()
var mark *list.Element
for ele := be.Prev(); ele != nil; ele = ele.Prev() {
b := ele.Value.(*BackendWrapper)
b := ele.Value.(*backendWrapper)
if b.score() >= curScore {
break
}
Expand All @@ -156,7 +169,7 @@ func (router *RandomRouter) adjustBackendList(be *list.Element) {
return
}
for ele := be.Next(); ele != nil; ele = ele.Next() {
b := ele.Value.(*BackendWrapper)
b := ele.Value.(*backendWrapper)
if b.score() <= curScore {
break
}
Expand All @@ -167,14 +180,16 @@ func (router *RandomRouter) adjustBackendList(be *list.Element) {
}
}

func (router *RandomRouter) RedirectConnections() error {
// RedirectConnections implements Router.RedirectConnections interface.
// It redirects all connections compulsively. It's only used for testing.
func (router *ScoreBasedRouter) RedirectConnections() error {
router.Lock()
defer router.Unlock()
for be := router.backends.Front(); be != nil; be = be.Next() {
backend := be.Value.(*BackendWrapper)
backend := be.Value.(*backendWrapper)
for ce := backend.connList.Front(); ce != nil; ce = ce.Next() {
// This is only for test, so we allow it to reconnect to the same backend.
connWrapper := ce.Value.(*ConnWrapper)
connWrapper := ce.Value.(*connWrapper)
if connWrapper.phase != phaseRedirectNotify {
connWrapper.phase = phaseRedirectNotify
connWrapper.Redirect(backend.addr)
Expand All @@ -184,17 +199,18 @@ func (router *RandomRouter) RedirectConnections() error {
return nil
}

func (router *RandomRouter) lookupBackend(addr string, forward bool) *list.Element {
// forward is a hint to speed up searching.
func (router *ScoreBasedRouter) lookupBackend(addr string, forward bool) *list.Element {
if forward {
for be := router.backends.Front(); be != nil; be = be.Next() {
backend := be.Value.(*BackendWrapper)
backend := be.Value.(*backendWrapper)
if backend.addr == addr {
return be
}
}
} else {
for be := router.backends.Back(); be != nil; be = be.Prev() {
backend := be.Value.(*BackendWrapper)
backend := be.Value.(*backendWrapper)
if backend.addr == addr {
return be
}
Expand All @@ -203,36 +219,40 @@ func (router *RandomRouter) lookupBackend(addr string, forward bool) *list.Eleme
return nil
}

func (router *RandomRouter) OnRedirectSucceed(from, to string, conn RedirectableConn) {
// OnRedirectSucceed implements ConnEventReceiver.OnRedirectSucceed interface.
func (router *ScoreBasedRouter) OnRedirectSucceed(from, to string, conn RedirectableConn) {
router.Lock()
defer router.Unlock()
be := router.lookupBackend(to, false)
if be == nil {
// impossible here
logutil.BgLogger().Error("backend not found in the backend", zap.String("addr", to))
return
}
toBackend := be.Value.(*BackendWrapper)
toBackend := be.Value.(*backendWrapper)
e, ok := toBackend.connMap[conn.ConnectionID()]
if !ok {
// impossible here
logutil.BgLogger().Error("connection not found in the backend", zap.String("addr", to),
zap.Uint64("conn", conn.ConnectionID()))
return
}
connWrapper := e.Value.(*ConnWrapper)
connWrapper := e.Value.(*connWrapper)
connWrapper.phase = phaseRedirectEnd
}

func (router *RandomRouter) OnRedirectFail(from, to string, conn RedirectableConn) {
// OnRedirectFail implements ConnEventReceiver.OnRedirectFail interface.
func (router *ScoreBasedRouter) OnRedirectFail(from, to string, conn RedirectableConn) {
router.Lock()
defer router.Unlock()
be := router.lookupBackend(to, false)
if be == nil {
// impossible here
logutil.BgLogger().Error("backend not found in the backend", zap.String("addr", to))
return
}
toBackend := be.Value.(*BackendWrapper)
toBackend := be.Value.(*backendWrapper)
ce, ok := toBackend.connMap[conn.ConnectionID()]
if !ok {
// impossible here
logutil.BgLogger().Error("connection not found in the backend", zap.String("addr", to),
zap.Uint64("conn", conn.ConnectionID()))
return
}
router.removeConn(be, ce)
Expand All @@ -242,46 +262,54 @@ func (router *RandomRouter) OnRedirectFail(from, to string, conn RedirectableCon
if be == nil {
return
}
connWrapper := ce.Value.(*ConnWrapper)
connWrapper := ce.Value.(*connWrapper)
connWrapper.phase = phaseRedirectFail
router.addConn(be, connWrapper)
}

func (router *RandomRouter) OnConnClosed(addr string, conn RedirectableConn) {
connID := conn.ConnectionID()
// OnConnClosed implements ConnEventReceiver.OnConnClosed interface.
func (router *ScoreBasedRouter) OnConnClosed(addr string, conn RedirectableConn) {
router.Lock()
defer router.Unlock()
// Get the redirecting address in the lock, rather than letting the connection pass it in.
// While the connection closes, the router may also send a new redirection signal concurrently
// and move it to another backendWrapper.
if toAddr := conn.GetRedirectingAddr(); len(toAddr) > 0 {
addr = toAddr
}
be := router.lookupBackend(addr, true)
if be != nil {
// impossible here
if be == nil {
logutil.BgLogger().Error("backend not found in the router", zap.String("addr", addr))
return
}
backend := be.Value.(*BackendWrapper)
ce, ok := backend.connMap[connID]
backend := be.Value.(*backendWrapper)
ce, ok := backend.connMap[conn.ConnectionID()]
if !ok {
// impossible here
logutil.BgLogger().Error("connection not found in the backend", zap.String("addr", addr),
zap.Uint64("conn", conn.ConnectionID()))
return
}
router.removeConn(be, ce)
router.removeBackendIfEmpty(be)
}

func (router *RandomRouter) OnBackendChanged(backends map[string]BackendStatus) {
// OnBackendChanged implements BackendEventReceiver.OnBackendChanged interface.
func (router *ScoreBasedRouter) OnBackendChanged(backends map[string]BackendStatus) {
router.Lock()
defer router.Unlock()
for addr, status := range backends {
be := router.lookupBackend(addr, true)
if be == nil {
logutil.BgLogger().Info("find new backend", zap.String("url", addr),
zap.String("status", status.String()))
be = router.backends.PushBack(&BackendWrapper{
be = router.backends.PushBack(&backendWrapper{
status: status,
addr: addr,
connList: list.New(),
connMap: make(map[uint64]*list.Element),
})
} else {
backend := be.Value.(*BackendWrapper)
backend := be.Value.(*backendWrapper)
logutil.BgLogger().Info("update backend", zap.String("url", addr),
zap.String("prev_status", backend.status.String()), zap.String("cur_status", status.String()))
backend.status = status
Expand All @@ -291,7 +319,7 @@ func (router *RandomRouter) OnBackendChanged(backends map[string]BackendStatus)
}
}

func (router *RandomRouter) rebalanceLoop(ctx context.Context) {
func (router *ScoreBasedRouter) rebalanceLoop(ctx context.Context) {
for {
router.rebalance(rebalanceConnsPerLoop)
select {
Expand All @@ -302,13 +330,13 @@ func (router *RandomRouter) rebalanceLoop(ctx context.Context) {
}
}

func (router *RandomRouter) rebalance(maxNum int) {
func (router *ScoreBasedRouter) rebalance(maxNum int) {
router.Lock()
defer router.Unlock()
for i := 0; i < maxNum; i++ {
var busiestEle *list.Element
for be := router.backends.Front(); be != nil; be = be.Next() {
backend := be.Value.(*BackendWrapper)
backend := be.Value.(*backendWrapper)
if backend.connList.Len() > 0 {
busiestEle = be
break
Expand All @@ -317,29 +345,30 @@ func (router *RandomRouter) rebalance(maxNum int) {
if busiestEle == nil {
break
}
busiestBackend := busiestEle.Value.(*BackendWrapper)
busiestBackend := busiestEle.Value.(*backendWrapper)
idlestEle := router.backends.Back()
idlestBackend := idlestEle.Value.(*BackendWrapper)
idlestBackend := idlestEle.Value.(*backendWrapper)
if float64(busiestBackend.score())/float64(idlestBackend.score()+1) <= rebalanceMaxScoreRatio {
break
}
ce := busiestBackend.connList.Front()
router.removeConn(busiestEle, ce)
conn := ce.Value.(*ConnWrapper)
conn := ce.Value.(*connWrapper)
conn.phase = phaseRedirectNotify
router.addConn(idlestEle, conn)
conn.Redirect(idlestBackend.addr)
}
}

func (router *RandomRouter) removeBackendIfEmpty(be *list.Element) {
backend := be.Value.(*BackendWrapper)
func (router *ScoreBasedRouter) removeBackendIfEmpty(be *list.Element) {
backend := be.Value.(*backendWrapper)
if backend.status == StatusCannotConnect && backend.connList.Len() == 0 {
router.backends.Remove(be)
}
}

func (router *RandomRouter) Close() {
// Close implements Router.Close interface.
func (router *ScoreBasedRouter) Close() {
router.Lock()
defer router.Unlock()
if router.cancelFunc != nil {
Expand Down
Loading