Skip to content

Commit

Permalink
server, backend: graceful shutdown (#159)
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 authored Dec 26, 2022
1 parent 2c9d13a commit f725184
Show file tree
Hide file tree
Showing 11 changed files with 337 additions and 28 deletions.
1 change: 1 addition & 0 deletions conf/proxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ proxy:
pd-addrs: "127.0.0.1:2379"
# require-backend-tls: true
# proxy-protocol: "v2"
# graceful-wait-before-shutdown: 10
metrics:
api:
addr: "0.0.0.0:3080"
Expand Down
7 changes: 4 additions & 3 deletions lib/config/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ type Metrics struct {
}

type ProxyServerOnline struct {
MaxConnections uint64 `yaml:"max-connections,omitempty" toml:"max-connections,omitempty" json:"max-connections,omitempty"`
TCPKeepAlive bool `yaml:"tcp-keep-alive,omitempty" toml:"tcp-keep-alive,omitempty" json:"tcp-keep-alive,omitempty"`
ProxyProtocol string `yaml:"proxy-protocol,omitempty" toml:"proxy-protocol,omitempty" json:"proxy-protocol,omitempty"`
MaxConnections uint64 `yaml:"max-connections,omitempty" toml:"max-connections,omitempty" json:"max-connections,omitempty"`
TCPKeepAlive bool `yaml:"tcp-keep-alive,omitempty" toml:"tcp-keep-alive,omitempty" json:"tcp-keep-alive,omitempty"`
ProxyProtocol string `yaml:"proxy-protocol,omitempty" toml:"proxy-protocol,omitempty" json:"proxy-protocol,omitempty"`
GracefulWaitBeforeShutdown int `yaml:"graceful-wait-before-shutdown,omitempty" toml:"graceful-wait-before-shutdown,omitempty" json:"graceful-wait-before-shutdown,omitempty"`
}

type ProxyServer struct {
Expand Down
7 changes: 4 additions & 3 deletions lib/config/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ var testProxyConfig = Config{
PDAddrs: "127.0.0.1:4089",
RequireBackendTLS: true,
ProxyServerOnline: ProxyServerOnline{
MaxConnections: 1,
TCPKeepAlive: true,
ProxyProtocol: "v2",
MaxConnections: 1,
TCPKeepAlive: true,
ProxyProtocol: "v2",
GracefulWaitBeforeShutdown: 10,
},
},
API: API{
Expand Down
96 changes: 79 additions & 17 deletions pkg/proxy/backend/backend_conn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ const (
currentDBKey = "current-db"
)

type signalType int

const (
signalTypeRedirect signalType = iota
signalTypeGracefulClose
signalTypeNums
)

type signalRedirect struct {
newAddr string
}
Expand All @@ -59,6 +67,14 @@ type redirectResult struct {
to string
}

const (
statusConnected int32 = iota
statusHandshaked
statusNotifyClose // notified to graceful close
statusClosing // really closing
statusClosed
)

type backendIOGetter func(ctx ConnContext, auth *Authenticator, resp *pnet.HandshakeResp) (*pnet.PacketIO, error)

// BackendConnManager migrates a session from one BackendConnection to another.
Expand All @@ -75,7 +91,7 @@ type BackendConnManager struct {
processLock sync.Mutex
wg waitgroup.WaitGroup
// signalReceived is used to notify the signal processing goroutine.
signalReceived chan struct{}
signalReceived chan signalType
authenticator *Authenticator
cmdProcessor *CmdProcessor
eventReceiver unsafe.Pointer
Expand All @@ -85,6 +101,7 @@ type BackendConnManager struct {
signal unsafe.Pointer
// redirectResCh is used to notify the event receiver asynchronously.
redirectResCh chan *redirectResult
connStatus atomic.Int32
// cancelFunc is used to cancel the signal processing goroutine.
cancelFunc context.CancelFunc
backendConn *BackendConnection
Expand All @@ -107,7 +124,8 @@ func NewBackendConnManager(logger *zap.Logger, handshakeHandler HandshakeHandler
requireBackendTLS: requireBackendTLS,
salt: GenerateSalt(20),
},
signalReceived: make(chan struct{}, 1),
// There are 2 types of signals, which may be sent concurrently.
signalReceived: make(chan signalType, signalTypeNums),
redirectResCh: make(chan *redirectResult, 1),
}
mgr.getBackendIO = func(ctx ConnContext, auth *Authenticator, resp *pnet.HandshakeResp) (*pnet.PacketIO, error) {
Expand Down Expand Up @@ -142,6 +160,7 @@ func NewBackendConnManager(logger *zap.Logger, handshakeHandler HandshakeHandler
return nil, err
}

mgr.connStatus.Store(statusConnected)
auth.serverAddr = addr
backendIO := mgr.backendConn.PacketIO()
return backendIO, nil
Expand Down Expand Up @@ -172,6 +191,7 @@ func (mgr *BackendConnManager) Connect(ctx context.Context, clientIO *pnet.Packe
return err
}

mgr.connStatus.Store(statusHandshaked)
mgr.cmdProcessor.capability = mgr.authenticator.capability
childCtx, cancelFunc := context.WithCancel(ctx)
mgr.cancelFunc = cancelFunc
Expand All @@ -192,6 +212,10 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte, c
mgr.processLock.Lock()
defer mgr.processLock.Unlock()

switch mgr.connStatus.Load() {
case statusClosing, statusClosed:
return nil
}
waitingRedirect := atomic.LoadPointer(&mgr.signal) != nil
holdRequest, err := mgr.cmdProcessor.executeCmd(request, clientIO, mgr.backendConn.PacketIO(), waitingRedirect)
if !holdRequest {
Expand Down Expand Up @@ -227,15 +251,19 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte, c
}
}
// Even if it meets an MySQL error, it may have changed the status, such as when executing multi-statements.
if waitingRedirect && mgr.cmdProcessor.canRedirect() {
mgr.tryRedirect(ctx, clientIO)
// Execute the held request no matter redirection succeeds or not.
if holdRequest {
if mgr.cmdProcessor.finishedTxn() {
if waitingRedirect && holdRequest {
mgr.tryRedirect(ctx, clientIO)
// Execute the held request no matter redirection succeeds or not.
_, err = mgr.cmdProcessor.executeCmd(request, clientIO, mgr.backendConn.PacketIO(), false)
addCmdMetrics(cmd, mgr.backendConn.Addr(), startTime)
}
if err != nil && !IsMySQLError(err) {
return err
if err != nil && !IsMySQLError(err) {
return err
}
} else if mgr.connStatus.Load() == statusNotifyClose {
mgr.tryGracefulClose(ctx, clientIO)
} else if waitingRedirect {
mgr.tryRedirect(ctx, clientIO)
}
}
// Ignore MySQL errors, only return unexpected errors.
Expand Down Expand Up @@ -284,10 +312,15 @@ func (mgr *BackendConnManager) querySessionStates() (sessionStates, sessionToken
func (mgr *BackendConnManager) processSignals(ctx context.Context, clientIO *pnet.PacketIO) {
for {
select {
case <-mgr.signalReceived:
// Redirect the session immediately just in case the session is idle.
case s := <-mgr.signalReceived:
// Redirect the session immediately just in case the session is finishedTxn.
mgr.processLock.Lock()
mgr.tryRedirect(ctx, clientIO)
switch s {
case signalTypeGracefulClose:
mgr.tryGracefulClose(ctx, clientIO)
case signalTypeRedirect:
mgr.tryRedirect(ctx, clientIO)
}
mgr.processLock.Unlock()
case rs := <-mgr.redirectResCh:
mgr.notifyRedirectResult(ctx, rs)
Expand All @@ -300,11 +333,15 @@ func (mgr *BackendConnManager) processSignals(ctx context.Context, clientIO *pne
// tryRedirect tries to migrate the session if the session is redirect-able.
// NOTE: processLock should be held before calling this function.
func (mgr *BackendConnManager) tryRedirect(ctx context.Context, clientIO *pnet.PacketIO) {
switch mgr.connStatus.Load() {
case statusNotifyClose, statusClosing, statusClosed:
return
}
signal := (*signalRedirect)(atomic.LoadPointer(&mgr.signal))
if signal == nil {
return
}
if !mgr.cmdProcessor.canRedirect() {
if !mgr.cmdProcessor.finishedTxn() {
return
}

Expand Down Expand Up @@ -371,14 +408,17 @@ func (mgr *BackendConnManager) updateAuthInfoFromSessionStates(sessionStates []b
// Redirect implements RedirectableConn.Redirect interface. It redirects the current session to the newAddr.
// Note that the caller requires the function to be non-blocking.
func (mgr *BackendConnManager) Redirect(newAddr string) {
// We do not use `chan signalRedirect` to avoid blocking. We cannot discard the signal when it blocks,
// because only the latest signal matters.
// NOTE: BackendConnManager may be closing concurrently because of no lock.
// The eventReceiver may read the new address even after BackendConnManager is closed.
atomic.StorePointer(&mgr.signal, unsafe.Pointer(&signalRedirect{
newAddr: newAddr,
}))
// Generally, it won't wait because the caller won't send another signal because the last one finishes.
mgr.signalReceived <- struct{}{}
switch mgr.connStatus.Load() {
case statusNotifyClose, statusClosing, statusClosed:
return
}
// Generally, it won't wait because the caller won't send another signal before the previous one finishes.
mgr.signalReceived <- signalTypeRedirect
}

// GetRedirectingAddr implements RedirectableConn.GetRedirectingAddr interface.
Expand Down Expand Up @@ -410,8 +450,29 @@ func (mgr *BackendConnManager) notifyRedirectResult(ctx context.Context, rs *red
}
}

// GracefulClose waits for the end of the transaction and closes the session.
func (mgr *BackendConnManager) GracefulClose() {
mgr.connStatus.Store(statusNotifyClose)
mgr.signalReceived <- signalTypeGracefulClose
}

func (mgr *BackendConnManager) tryGracefulClose(ctx context.Context, clientIO *pnet.PacketIO) {
if mgr.connStatus.Load() != statusNotifyClose {
return
}
if !mgr.cmdProcessor.finishedTxn() {
return
}
// Closing clientIO will cause the whole connection to be closed.
if err := clientIO.GracefulClose(); err != nil {
mgr.logger.Warn("graceful close client IO error", zap.Stringer("addr", clientIO.SourceAddr()), zap.Error(err))
}
mgr.connStatus.Store(statusClosing)
}

// Close releases all resources.
func (mgr *BackendConnManager) Close() error {
mgr.connStatus.Store(statusClosing)
if mgr.cancelFunc != nil {
mgr.cancelFunc()
mgr.cancelFunc = nil
Expand Down Expand Up @@ -443,5 +504,6 @@ func (mgr *BackendConnManager) Close() error {
}
}
}
mgr.connStatus.Store(statusClosed)
return errors.Collect(ErrCloseConnMgr, connErr, handErr)
}
77 changes: 76 additions & 1 deletion pkg/proxy/backend/backend_conn_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/TiProxy/lib/util/errors"
"github.com/pingcap/TiProxy/lib/util/waitgroup"
"github.com/pingcap/TiProxy/pkg/manager/router"
pnet "github.com/pingcap/TiProxy/pkg/proxy/net"
Expand Down Expand Up @@ -192,7 +194,7 @@ func (ts *backendMgrTester) checkNotRedirected4Proxy(clientIO, backendIO *pnet.P
// The buffer size of channel signalReceived is 0, so after the second redirect signal is sent,
// we can ensure that the first signal is already processed.
ts.mp.Redirect(ts.tc.backendListener.Addr().String())
ts.mp.signalReceived <- struct{}{}
ts.mp.signalReceived <- signalTypeRedirect
// The backend connection is still the same.
require.Equal(ts.t, backend1, ts.mp.backendConn)
return nil
Expand All @@ -217,6 +219,17 @@ func (ts *backendMgrTester) redirectFail4Proxy(clientIO, backendIO *pnet.PacketI
return nil
}

func (ts *backendMgrTester) checkConnClosed(_, _ *pnet.PacketIO) error {
for i := 0; i < 30; i++ {
switch ts.mp.connStatus.Load() {
case statusClosing, statusClosed:
return nil
}
time.Sleep(100 * time.Millisecond)
}
return errors.New("timeout")
}

func (ts *backendMgrTester) runTests(runners []runner) {
for _, runner := range runners {
ts.runAndCheck(ts.t, nil, runner.client, runner.backend, runner.proxy)
Expand Down Expand Up @@ -593,3 +606,65 @@ func TestCustomHandshake(t *testing.T) {
}
ts.runTests(runners)
}

func TestGracefulCloseWhenIdle(t *testing.T) {
ts := newBackendMgrTester(t)
runners := []runner{
// 1st handshake
{
client: ts.mc.authenticate,
proxy: ts.firstHandshake4Proxy,
backend: ts.handshake4Backend,
},
// graceful close
{
proxy: func(_, _ *pnet.PacketIO) error {
ts.mp.GracefulClose()
return nil
},
},
// really closed
{
proxy: ts.checkConnClosed,
},
}
ts.runTests(runners)
}

func TestGracefulCloseWhenActive(t *testing.T) {
ts := newBackendMgrTester(t)
runners := []runner{
// 1st handshake
{
client: ts.mc.authenticate,
proxy: ts.firstHandshake4Proxy,
backend: ts.handshake4Backend,
},
// start a transaction to make it active
{
client: ts.mc.request,
proxy: ts.forwardCmd4Proxy,
backend: ts.startTxn4Backend,
},
// try to gracefully close but it doesn't close
{
proxy: func(_, _ *pnet.PacketIO) error {
ts.mp.GracefulClose()
time.Sleep(300 * time.Millisecond)
require.Equal(t, statusNotifyClose, ts.mp.connStatus.Load())
return nil
},
},
// finish the transaction
{
client: ts.mc.request,
proxy: ts.forwardCmd4Proxy,
backend: ts.respondWithNoTxn4Backend,
},
// it will then automatically close
{
proxy: ts.checkConnClosed,
},
}
ts.runTests(runners)
}
2 changes: 1 addition & 1 deletion pkg/proxy/backend/cmd_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (cp *CmdProcessor) updatePrepStmtStatus(request []byte, serverStatus uint16
}
}

func (cp *CmdProcessor) canRedirect() bool {
func (cp *CmdProcessor) finishedTxn() bool {
if cp.serverStatus&(StatusInTrans|StatusQuit) > 0 {
return false
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/proxy/backend/cmd_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ func TestPreparedStmts(t *testing.T) {
for _, test := range tests {
ts, clean := newTestSuite(t, tc)
c := func(t *testing.T, ts *testSuite) {
require.Equal(t, test.canRedirect, ts.mp.cmdProcessor.canRedirect())
require.Equal(t, test.canRedirect, ts.mp.cmdProcessor.finishedTxn())
}
ts.executeMultiCmd(t, test.cfgs, c)
clean()
Expand Down Expand Up @@ -708,7 +708,7 @@ func TestTxnStatus(t *testing.T) {
for _, test := range tests {
ts, clean := newTestSuite(t, tc)
c := func(t *testing.T, ts *testSuite) {
require.Equal(t, test.canRedirect, ts.mp.cmdProcessor.canRedirect())
require.Equal(t, test.canRedirect, ts.mp.cmdProcessor.finishedTxn())
}
ts.executeMultiCmd(t, test.cfgs, c)
clean()
Expand Down Expand Up @@ -820,7 +820,7 @@ func TestMixPrepAndTxnStatus(t *testing.T) {
for _, test := range tests {
ts, clean := newTestSuite(t, tc)
c := func(t *testing.T, ts *testSuite) {
require.Equal(t, test.canRedirect, ts.mp.cmdProcessor.canRedirect())
require.Equal(t, test.canRedirect, ts.mp.cmdProcessor.finishedTxn())
}
ts.executeMultiCmd(t, test.cfgs, c)
clean()
Expand Down
Loading

0 comments on commit f725184

Please sign in to comment.