Skip to content

Commit

Permalink
server: if status address already in use, return an error (#15177)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored Mar 18, 2020
1 parent de474b4 commit f8b2d96
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 36 deletions.
67 changes: 33 additions & 34 deletions server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,33 @@ func sleepWithCtx(ctx context.Context, d time.Duration) {
}
}

func (s *Server) listenStatusHTTPServer() error {
s.statusAddr = fmt.Sprintf("%s:%d", s.cfg.Status.StatusHost, s.cfg.Status.StatusPort)
if s.cfg.Status.StatusPort == 0 {
s.statusAddr = fmt.Sprintf("%s:%d", s.cfg.Status.StatusHost, defaultStatusPort)
}

logutil.BgLogger().Info("for status and metrics report", zap.String("listening on addr", s.statusAddr))
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
logutil.BgLogger().Error("invalid TLS config", zap.Error(err))
return errors.Trace(err)
}
tlsConfig = s.setCNChecker(tlsConfig)

if tlsConfig != nil {
// we need to manage TLS here for cmux to distinguish between HTTP and gRPC.
s.statusListener, err = tls.Listen("tcp", s.statusAddr, tlsConfig)
} else {
s.statusListener, err = net.Listen("tcp", s.statusAddr)
}
if err != nil {
logutil.BgLogger().Info("listen failed", zap.Error(err))
return errors.Trace(err)
}
return nil
}

func (s *Server) startHTTPServer() {
router := mux.NewRouter()

Expand Down Expand Up @@ -123,13 +150,8 @@ func (s *Server) startHTTPServer() {
router.Handle("/mvcc/hex/{hexKey}", mvccTxnHandler{tikvHandlerTool, opMvccGetByHex})
router.Handle("/mvcc/index/{db}/{table}/{index}/{handle}", mvccTxnHandler{tikvHandlerTool, opMvccGetByIdx})

addr := fmt.Sprintf("%s:%d", s.cfg.Status.StatusHost, s.cfg.Status.StatusPort)
if s.cfg.Status.StatusPort == 0 {
addr = fmt.Sprintf("%s:%d", s.cfg.Status.StatusHost, defaultStatusPort)
}

// HTTP path for web UI.
if host, port, err := net.SplitHostPort(addr); err == nil {
if host, port, err := net.SplitHostPort(s.statusAddr); err == nil {
if host == "" {
host = "localhost"
}
Expand Down Expand Up @@ -271,40 +293,17 @@ func (s *Server) startHTTPServer() {
logutil.BgLogger().Error("write HTTP index page failed", zap.Error(err))
}
})

logutil.BgLogger().Info("for status and metrics report", zap.String("listening on addr", addr))
s.setupStatusServerAndRPCServer(addr, serverMux)
s.startStatusServerAndRPCServer(serverMux)
}

func (s *Server) setupStatusServerAndRPCServer(addr string, serverMux *http.ServeMux) {
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
logutil.BgLogger().Error("invalid TLS config", zap.Error(err))
return
}
tlsConfig = s.setCNChecker(tlsConfig)

var l net.Listener
if tlsConfig != nil {
// we need to manage TLS here for cmux to distinguish between HTTP and gRPC.
l, err = tls.Listen("tcp", addr, tlsConfig)
} else {
l, err = net.Listen("tcp", addr)
}
if err != nil {
logutil.BgLogger().Info("listen failed", zap.Error(err))
return
}
if tlsConfig != nil {
logutil.BgLogger().Info("HTTP/gRPC status server secure connection is enabled", zap.Bool("CN verification enabled", tlsConfig.VerifyPeerCertificate != nil))
}
m := cmux.New(l)
func (s *Server) startStatusServerAndRPCServer(serverMux *http.ServeMux) {
m := cmux.New(s.statusListener)
// Match connections in order:
// First HTTP, and otherwise grpc.
httpL := m.Match(cmux.HTTP1Fast())
grpcL := m.Match(cmux.Any())

s.statusServer = &http.Server{Addr: addr, Handler: CorsHandler{handler: serverMux, cfg: s.cfg}}
s.statusServer = &http.Server{Addr: s.statusAddr, Handler: CorsHandler{handler: serverMux, cfg: s.cfg}}
s.grpcServer = NewRPCServer(s.cfg, s.dom, s)

go util.WithRecovery(func() {
Expand All @@ -317,7 +316,7 @@ func (s *Server) setupStatusServerAndRPCServer(addr string, serverMux *http.Serv
logutil.BgLogger().Error("http server error", zap.Error(err))
}, nil)

err = m.Serve()
err := m.Serve()
if err != nil {
logutil.BgLogger().Error("start status/rpc server error", zap.Error(err))
}
Expand Down
10 changes: 8 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,11 @@ type Server struct {
clients map[uint32]*clientConn
capability uint32
dom *domain.Domain
statusServer *http.Server
grpcServer *grpc.Server

statusAddr string
statusListener net.Listener
statusServer *http.Server
grpcServer *grpc.Server
}

// ConnectionCount gets current connection count.
Expand Down Expand Up @@ -256,6 +259,9 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
s.listener = pplistener
}

if s.cfg.Status.ReportStatus && err == nil {
err = s.listenStatusHTTPServer()
}
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
22 changes: 22 additions & 0 deletions server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"io/ioutil"
"math/big"
"net/http"
Expand Down Expand Up @@ -180,6 +181,27 @@ func (ts *tidbTestSuite) TestStatusAPI(c *C) {
ts.runTestStatusAPI(c)
}

func (ts *tidbTestSuite) TestStatusPort(c *C) {
var err error
ts.store, err = mockstore.NewMockTikvStore()
session.DisableStats4Test()
c.Assert(err, IsNil)
ts.domain, err = session.BootstrapSession(ts.store)
c.Assert(err, IsNil)
ts.tidbdrv = NewTiDBDriver(ts.store)
cfg := config.NewConfig()
cfg.Port = genPort()
cfg.Status.ReportStatus = true
cfg.Status.StatusPort = ts.statusPort
cfg.Performance.TCPKeepAlive = true

server, err := NewServer(cfg, ts.tidbdrv)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals,
fmt.Sprintf("listen tcp 0.0.0.0:%d: bind: address already in use", ts.statusPort))
c.Assert(server, IsNil)
}

func (ts *tidbTestSuite) TestStatusAPIWithTLS(c *C) {
caCert, caKey, err := generateCert(0, "TiDB CA 2", nil, nil, "/tmp/ca-key-2.pem", "/tmp/ca-cert-2.pem")
c.Assert(err, IsNil)
Expand Down

0 comments on commit f8b2d96

Please sign in to comment.