Skip to content

Commit

Permalink
backend: use atomic.Pointer for *signalRedirect (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
xhebox authored May 18, 2023
1 parent 04120b6 commit 1538fcd
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 17 deletions.
3 changes: 2 additions & 1 deletion pkg/manager/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ type RedirectableConn interface {
SetEventReceiver(receiver ConnEventReceiver)
SetValue(key, val any)
Value(key any) any
Redirect(addr string)
// Redirect returns false if the current conn is not redirectable.
Redirect(addr string) bool
NotifyBackendStatus(status BackendStatus)
ConnectionID() uint64
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/manager/router/router_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ func (router *ScoreBasedRouter) RedirectConnections() error {
connWrapper := ce.Value
if connWrapper.phase != phaseRedirectNotify {
connWrapper.phase = phaseRedirectNotify
connWrapper.Redirect(backend.addr)
// we dont care the results
_ = connWrapper.Redirect(backend.addr)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/manager/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ func (conn *mockRedirectableConn) Value(k any) any {
return v
}

func (conn *mockRedirectableConn) Redirect(addr string) {
func (conn *mockRedirectableConn) Redirect(addr string) bool {
conn.Lock()
require.Len(conn.t, conn.to, 0)
conn.to = addr
conn.Unlock()
return true
}

func (conn *mockRedirectableConn) GetRedirectingAddr() string {
Expand Down
19 changes: 8 additions & 11 deletions pkg/proxy/backend/backend_conn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,8 @@ type BackendConnManager struct {
eventReceiver unsafe.Pointer
config *BCConfig
logger *zap.Logger
// type *signalRedirect, it saves the last signal if there are multiple signals.
// It will be set to nil after migration.
signal unsafe.Pointer
redirectInfo atomic.Pointer[signalRedirect]
// redirectResCh is used to notify the event receiver asynchronously.
redirectResCh chan *redirectResult
closeStatus atomic.Int32
Expand Down Expand Up @@ -282,7 +281,7 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) (
return
}
defer mgr.resetCheckBackendTicker()
waitingRedirect := atomic.LoadPointer(&mgr.signal) != nil
waitingRedirect := mgr.redirectInfo.Load() != nil
var holdRequest bool
holdRequest, err = mgr.cmdProcessor.executeCmd(request, mgr.clientIO, mgr.backendIO.Load(), waitingRedirect)
if !holdRequest {
Expand Down Expand Up @@ -409,7 +408,7 @@ func (mgr *BackendConnManager) tryRedirect(ctx context.Context) {
case statusNotifyClose, statusClosing, statusClosed:
return
}
signal := (*signalRedirect)(atomic.LoadPointer(&mgr.signal))
signal := mgr.redirectInfo.Load()
if signal == nil {
return
}
Expand All @@ -423,7 +422,7 @@ func (mgr *BackendConnManager) tryRedirect(ctx context.Context) {
}
defer func() {
// The `mgr` won't be notified again before it calls `OnRedirectSucceed`, so simply `StorePointer` is also fine.
atomic.CompareAndSwapPointer(&mgr.signal, unsafe.Pointer(signal), nil)
mgr.redirectInfo.Store(nil)
// Notifying may block. Notify the receiver asynchronously to:
// - Reduce the latency of session migration
// - Avoid the risk of deadlock
Expand Down Expand Up @@ -493,18 +492,16 @@ func (mgr *BackendConnManager) updateAuthInfoFromSessionStates(sessionStates []b

// Redirect implements RedirectableConn.Redirect interface. It redirects the current session to the newAddr.
// Note that the caller requires the function to be non-blocking.
func (mgr *BackendConnManager) Redirect(newAddr string) {
func (mgr *BackendConnManager) Redirect(newAddr string) bool {
// NOTE: BackendConnManager may be closing concurrently because of no lock.
// The eventReceiver may read the new address even after BackendConnManager is closed.
atomic.StorePointer(&mgr.signal, unsafe.Pointer(&signalRedirect{
newAddr: newAddr,
}))
switch mgr.closeStatus.Load() {
case statusNotifyClose, statusClosing, statusClosed:
return
return false
}
mgr.redirectInfo.Store(&signalRedirect{newAddr: newAddr})
// Generally, it won't wait because the caller won't send another signal before the previous one finishes.
mgr.signalReceived <- signalTypeRedirect
return true
}

func (mgr *BackendConnManager) notifyRedirectResult(ctx context.Context, rs *redirectResult) {
Expand Down
5 changes: 2 additions & 3 deletions pkg/proxy/backend/backend_conn_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"net"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -179,8 +178,8 @@ func (ts *backendMgrTester) startTxn4Backend(packetIO *pnet.PacketIO) error {
}

func (ts *backendMgrTester) checkNotRedirected4Proxy(clientIO, backendIO *pnet.PacketIO) error {
signal := (*signalRedirect)(atomic.LoadPointer(&ts.mp.signal))
require.Nil(ts.t, signal)
redirInfo := ts.mp.redirectInfo.Load()
require.Nil(ts.t, redirInfo)
backend1 := ts.mp.backendIO.Load()
// There is no other way to verify it's not redirected.
// The buffer size of channel signalReceived is 0, so after the second redirect signal is sent,
Expand Down

0 comments on commit 1538fcd

Please sign in to comment.