Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proxy, config: add config graceful-close-conn-timeout #400

Merged
merged 3 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions conf/proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,19 @@
# "v2" => accept proxy protocol if any, require backends to support proxy protocol.
# proxy-protocol = ""

# graceful-wait-before-shutdown is recommanded to be set to 0 when there's no other proxy(e.g. NLB) between the client and TiProxy.
# possible values:
# 0 => disable graceful shutdown.
# 30 => graceful shutdown waiting time in 30 seconds.
# 0 => begin to drain clients immediately.
# 30 => HTTP status returns unhealthy and the SQL port accepts new connections for the last 30 seconds. After that, refuse new connections and drain clients.
# graceful-wait-before-shutdown = 0

# graceful-close-conn-timeout is recommanded to be set longer than the lifecycle of a transaction.
# possible values:
# 0 => force closing connections immediately.
# 15 => close connections when they have finished current transactions (AKA drain clients). After 15s, force closing all the connections.

graceful-close-conn-timeout = 15

# possible values:
# "" => enable static routing.
# "pd-addr:pd-port" => automatically tidb discovery.
Expand Down
2 changes: 2 additions & 0 deletions lib/config/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type ProxyServerOnline struct {
BackendUnhealthyKeepalive KeepAlive `yaml:"backend-unhealthy-keepalive" toml:"backend-unhealthy-keepalive" json:"backend-unhealthy-keepalive"`
ProxyProtocol string `yaml:"proxy-protocol,omitempty" toml:"proxy-protocol,omitempty" json:"proxy-protocol,omitempty"`
GracefulWaitBeforeShutdown int `yaml:"graceful-wait-before-shutdown,omitempty" toml:"graceful-wait-before-shutdown,omitempty" json:"graceful-wait-before-shutdown,omitempty"`
GracefulCloseConnTimeout int `yaml:"graceful-close-conn-timeout,omitempty" toml:"graceful-close-conn-timeout,omitempty" json:"graceful-close-conn-timeout,omitempty"`
}

type ProxyServer struct {
Expand Down Expand Up @@ -140,6 +141,7 @@ func NewConfig() *Config {
cfg.Proxy.FrontendKeepalive, cfg.Proxy.BackendHealthyKeepalive, cfg.Proxy.BackendUnhealthyKeepalive = DefaultKeepAlive()
cfg.Proxy.RequireBackendTLS = true
cfg.Proxy.PDAddrs = "127.0.0.1:2379"
cfg.Proxy.GracefulCloseConnTimeout = 15

cfg.API.Addr = "0.0.0.0:3080"

Expand Down
61 changes: 41 additions & 20 deletions pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ import (
"go.uber.org/zap"
)

type serverStatus int

const (
// statusNormal: normal status
statusNormal serverStatus = iota
// statusWaitShutdown: during graceful-wait-before-shutdown
statusWaitShutdown
// statusDrainClient: during graceful-close-conn-timeout
statusDrainClient
)

type serverState struct {
sync.RWMutex
healthyKeepAlive config.KeepAlive
Expand All @@ -34,8 +45,9 @@ type serverState struct {
requireBackendTLS bool
tcpKeepAlive bool
proxyProtocol bool
gracefulWait int
inShutdown bool
gracefulWait int // graceful-wait-before-shutdown
gracefulClose int // graceful-close-conn-timeout
status serverStatus
}

type SQLServer struct {
Expand All @@ -60,6 +72,7 @@ func NewSQLServer(logger *zap.Logger, cfg config.ProxyServer, certMgr *cert.Cert
mu: serverState{
connID: 0,
clients: make(map[uint64]*client.ClientConnection),
status: statusNormal,
},
}

Expand All @@ -84,6 +97,7 @@ func (s *SQLServer) reset(cfg *config.ProxyServerOnline) {
s.mu.requireBackendTLS = cfg.RequireBackendTLS
s.mu.proxyProtocol = cfg.ProxyProtocol != ""
s.mu.gracefulWait = cfg.GracefulWaitBeforeShutdown
s.mu.gracefulClose = cfg.GracefulCloseConnTimeout
s.mu.healthyKeepAlive = cfg.BackendHealthyKeepalive
s.mu.unhealthyKeepAlive = cfg.BackendUnhealthyKeepalive
s.mu.connBufferSize = cfg.ConnBufferSize
Expand Down Expand Up @@ -148,11 +162,6 @@ func (s *SQLServer) onConn(ctx context.Context, conn net.Conn, addr string) {
s.logger.Warn("too many connections", zap.Uint64("max connections", maxConns), zap.String("client_addr", conn.RemoteAddr().Network()), zap.Error(conn.Close()))
return
}
if s.mu.inShutdown {
s.mu.Unlock()
s.logger.Warn("in shutdown", zap.String("client_addr", conn.RemoteAddr().Network()), zap.Error(conn.Close()))
return
}

connID := s.mu.connID
s.mu.connID++
Expand Down Expand Up @@ -192,29 +201,44 @@ func (s *SQLServer) onConn(ctx context.Context, conn net.Conn, addr string) {
clientConn.Run(ctx)
}

// IsClosing tells the HTTP API whether it should return healthy status.
func (s *SQLServer) IsClosing() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.mu.inShutdown
return s.mu.status >= statusWaitShutdown
}

// Graceful shutdown doesn't close the listener but rejects new connections.
// Whether this affects NLB is to be tested.
func (s *SQLServer) gracefulShutdown() {
// Step 1: HTTP status returns unhealthy so that NLB takes this instance offline and then new connections won't come.
s.mu.Lock()
gracefulWait := s.mu.gracefulWait
if gracefulWait == 0 {
s.mu.status = statusWaitShutdown
s.mu.Unlock()
s.logger.Info("SQL server prepares for shutdown", zap.Int("graceful_wait", gracefulWait))
if gracefulWait > 0 {
time.Sleep(time.Duration(gracefulWait) * time.Second)
}

// Step 2: reject new connections and drain clients.
for i := range s.listeners {
if err := s.listeners[i].Close(); err != nil {
s.logger.Warn("closing listener fails", zap.Error(err))
}
}
s.mu.Lock()
s.mu.status = statusDrainClient
gracefulClose := s.mu.gracefulClose
s.logger.Info("SQL server is shutting down", zap.Int("graceful_close", gracefulClose), zap.Int("conn_count", len(s.mu.clients)))
if gracefulClose <= 0 {
s.mu.Unlock()
return
}
s.mu.inShutdown = true
for _, conn := range s.mu.clients {
conn.GracefulClose()
}
s.mu.Unlock()
s.logger.Info("SQL server is shutting down", zap.Int("graceful_wait", gracefulWait))

timer := time.NewTimer(time.Duration(gracefulWait) * time.Second)
timer := time.NewTimer(time.Duration(gracefulClose) * time.Second)
defer timer.Stop()
for {
select {
Expand All @@ -239,19 +263,16 @@ func (s *SQLServer) Close() error {
s.cancelFunc()
s.cancelFunc = nil
}
errs := make([]error, 0, 4)
for i := range s.listeners {
errs = append(errs, s.listeners[i].Close())
}

s.mu.RLock()
s.logger.Info("force closing connections", zap.Int("conn_count", len(s.mu.clients)))
for _, conn := range s.mu.clients {
if err := conn.Close(); err != nil {
errs = append(errs, err)
s.logger.Warn("close connection error", zap.Error(err))
}
}
s.mu.RUnlock()

s.wg.Wait()
return errors.Collect(ErrCloseServer, errs...)
return nil
}
68 changes: 48 additions & 20 deletions pkg/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/pingcap/tiproxy/lib/config"
"github.com/pingcap/tiproxy/lib/util/errors"
"github.com/pingcap/tiproxy/lib/util/logger"
"github.com/pingcap/tiproxy/lib/util/waitgroup"
"github.com/pingcap/tiproxy/pkg/manager/cert"
"github.com/pingcap/tiproxy/pkg/manager/router"
"github.com/pingcap/tiproxy/pkg/proxy/backend"
Expand All @@ -24,15 +25,16 @@ import (
"github.com/stretchr/testify/require"
)

func TestGracefulShutdown(t *testing.T) {
func TestGracefulCloseConn(t *testing.T) {
// Graceful shutdown finishes immediately if there's no connection.
lg, _ := logger.CreateLoggerForTest(t)
hsHandler := backend.NewDefaultHandshakeHandler(nil)
server, err := NewSQLServer(lg, config.ProxyServer{
cfg := config.ProxyServer{
ProxyServerOnline: config.ProxyServerOnline{
GracefulWaitBeforeShutdown: 10,
GracefulCloseConnTimeout: 10,
},
}, nil, hsHandler)
}
server, err := NewSQLServer(lg, cfg, nil, hsHandler)
require.NoError(t, err)
finish := make(chan struct{})
go func() {
Expand Down Expand Up @@ -62,11 +64,7 @@ func TestGracefulShutdown(t *testing.T) {
}

// Graceful shutdown will be blocked if there are alive connections.
server, err = NewSQLServer(lg, config.ProxyServer{
ProxyServerOnline: config.ProxyServerOnline{
GracefulWaitBeforeShutdown: 10,
},
}, nil, hsHandler)
server, err = NewSQLServer(lg, cfg, nil, hsHandler)
require.NoError(t, err)
clientConn := createClientConn()
go func() {
Expand All @@ -89,12 +87,9 @@ func TestGracefulShutdown(t *testing.T) {
case <-finish:
}

// Graceful shutdown will shut down after GracefulWaitBeforeShutdown.
server, err = NewSQLServer(lg, config.ProxyServer{
ProxyServerOnline: config.ProxyServerOnline{
GracefulWaitBeforeShutdown: 1,
},
}, nil, hsHandler)
// Graceful shutdown will shut down after GracefulCloseConnTimeout.
cfg.GracefulCloseConnTimeout = 1
server, err = NewSQLServer(lg, cfg, nil, hsHandler)
require.NoError(t, err)
createClientConn()
go func() {
Expand All @@ -108,6 +103,39 @@ func TestGracefulShutdown(t *testing.T) {
}
}

func TestGracefulShutDown(t *testing.T) {
lg, _ := logger.CreateLoggerForTest(t)
hsHandler := backend.NewDefaultHandshakeHandler(nil)
cfg := config.ProxyServer{
ProxyServerOnline: config.ProxyServerOnline{
GracefulWaitBeforeShutdown: 2,
GracefulCloseConnTimeout: 10,
},
}
server, err := NewSQLServer(lg, cfg, nil, hsHandler)
require.NoError(t, err)

var wg waitgroup.WaitGroup
wg.Run(func() {
// Wait until the server begins to shut down.
require.Eventually(t, server.IsClosing, 500*time.Millisecond, 10*time.Millisecond)
// The listener should be open.
conn1, err := net.Dial("tcp", server.listeners[0].Addr().String())
require.NoError(t, err)
// The listener should be closed after GracefulWaitBeforeShutdown.
require.Eventually(t, func() bool {
conn, err := net.Dial("tcp", server.listeners[0].Addr().String())
if err == nil {
require.NoError(t, conn.Close())
}
return err != nil
}, 3*time.Second, 100*time.Millisecond)
require.NoError(t, conn1.Close())
})
require.NoError(t, server.Close())
wg.Wait()
}

func TestMultiAddr(t *testing.T) {
lg, _ := logger.CreateLoggerForTest(t)
certManager := cert.NewCertManager()
Expand Down Expand Up @@ -140,11 +168,11 @@ func TestWatchCfg(t *testing.T) {
cfg := &config.Config{
Proxy: config.ProxyServer{
ProxyServerOnline: config.ProxyServerOnline{
RequireBackendTLS: true,
MaxConnections: 100,
ConnBufferSize: 1024 * 1024,
ProxyProtocol: "v2",
GracefulWaitBeforeShutdown: 100,
RequireBackendTLS: true,
MaxConnections: 100,
ConnBufferSize: 1024 * 1024,
ProxyProtocol: "v2",
GracefulCloseConnTimeout: 100,
},
},
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/api/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestConfig(t *testing.T) {
addr = '0.0.0.0:6000'
pd-addrs = '127.0.0.1:2379'
require-backend-tls = true
graceful-close-conn-timeout = 15

[proxy.frontend-keepalive]
enabled = true
Expand Down Expand Up @@ -75,7 +76,7 @@ max-backups = 3
doHTTP(t, http.MethodGet, "/api/admin/config?format=json", nil, func(t *testing.T, r *http.Response) {
all, err := io.ReadAll(r.Body)
require.NoError(t, err)
require.Equal(t, `{"proxy":{"addr":"0.0.0.0:6000","pd-addrs":"127.0.0.1:2379","require-backend-tls":true,"frontend-keepalive":{"enabled":true},"backend-healthy-keepalive":{"enabled":true,"idle":60000000000,"cnt":5,"intvl":3000000000,"timeout":15000000000},"backend-unhealthy-keepalive":{"enabled":true,"idle":10000000000,"cnt":5,"intvl":1000000000,"timeout":5000000000}},"api":{"addr":"0.0.0.0:3080"},"advance":{"ignore-wrong-namespace":true},"security":{"server-tls":{"min-tls-version":"1.1"},"peer-tls":{"min-tls-version":"1.1"},"cluster-tls":{"min-tls-version":"1.1"},"sql-tls":{"min-tls-version":"1.1"}},"metrics":{"metrics-addr":"","metrics-interval":0},"log":{"encoder":"tidb","level":"info","log-file":{"max-size":300,"max-days":3,"max-backups":3}}}`,
require.Equal(t, `{"proxy":{"addr":"0.0.0.0:6000","pd-addrs":"127.0.0.1:2379","require-backend-tls":true,"frontend-keepalive":{"enabled":true},"backend-healthy-keepalive":{"enabled":true,"idle":60000000000,"cnt":5,"intvl":3000000000,"timeout":15000000000},"backend-unhealthy-keepalive":{"enabled":true,"idle":10000000000,"cnt":5,"intvl":1000000000,"timeout":5000000000},"graceful-close-conn-timeout":15},"api":{"addr":"0.0.0.0:3080"},"advance":{"ignore-wrong-namespace":true},"security":{"server-tls":{"min-tls-version":"1.1"},"peer-tls":{"min-tls-version":"1.1"},"cluster-tls":{"min-tls-version":"1.1"},"sql-tls":{"min-tls-version":"1.1"}},"metrics":{"metrics-addr":"","metrics-interval":0},"log":{"encoder":"tidb","level":"info","log-file":{"max-size":300,"max-days":3,"max-backups":3}}}`,
string(regexp.MustCompile(`"workdir":"[^"]+",`).ReplaceAll(all, nil)))
require.Equal(t, http.StatusOK, r.StatusCode)
})
Expand Down
Loading