Skip to content

Commit

Permalink
cherry pick pingcap#15177 to release-3.1
Browse files Browse the repository at this point in the history
Signed-off-by: sre-bot <sre-bot@pingcap.com>
  • Loading branch information
zimulala authored and sre-bot committed Apr 10, 2020
1 parent b2d6f52 commit ae1bb61
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 1 deletion.
65 changes: 64 additions & 1 deletion server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,33 @@ func sleepWithCtx(ctx context.Context, d time.Duration) {
}
}

func (s *Server) listenStatusHTTPServer() error {
s.statusAddr = fmt.Sprintf("%s:%d", s.cfg.Status.StatusHost, s.cfg.Status.StatusPort)
if s.cfg.Status.StatusPort == 0 {
s.statusAddr = fmt.Sprintf("%s:%d", s.cfg.Status.StatusHost, defaultStatusPort)
}

logutil.BgLogger().Info("for status and metrics report", zap.String("listening on addr", s.statusAddr))
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
logutil.BgLogger().Error("invalid TLS config", zap.Error(err))
return errors.Trace(err)
}
tlsConfig = s.setCNChecker(tlsConfig)

if tlsConfig != nil {
// we need to manage TLS here for cmux to distinguish between HTTP and gRPC.
s.statusListener, err = tls.Listen("tcp", s.statusAddr, tlsConfig)
} else {
s.statusListener, err = net.Listen("tcp", s.statusAddr)
}
if err != nil {
logutil.BgLogger().Info("listen failed", zap.Error(err))
return errors.Trace(err)
}
return nil
}

func (s *Server) startHTTPServer() {
router := mux.NewRouter()

Expand Down Expand Up @@ -113,13 +140,22 @@ func (s *Server) startHTTPServer() {
router.Handle("/mvcc/hex/{hexKey}", mvccTxnHandler{tikvHandlerTool, opMvccGetByHex})
router.Handle("/mvcc/index/{db}/{table}/{index}/{handle}", mvccTxnHandler{tikvHandlerTool, opMvccGetByIdx})
}
<<<<<<< HEAD
addr := fmt.Sprintf("%s:%d", s.cfg.Status.StatusHost, s.cfg.Status.StatusPort)
if s.cfg.Status.StatusPort == 0 {
addr = fmt.Sprintf("%s:%d", s.cfg.Status.StatusHost, defaultStatusPort)
}
=======

// HTTP path for get MVCC info
router.Handle("/mvcc/key/{db}/{table}/{handle}", mvccTxnHandler{tikvHandlerTool, opMvccGetByKey})
router.Handle("/mvcc/txn/{startTS}/{db}/{table}", mvccTxnHandler{tikvHandlerTool, opMvccGetByTxn})
router.Handle("/mvcc/hex/{hexKey}", mvccTxnHandler{tikvHandlerTool, opMvccGetByHex})
router.Handle("/mvcc/index/{db}/{table}/{index}/{handle}", mvccTxnHandler{tikvHandlerTool, opMvccGetByIdx})
>>>>>>> f8b2d96... server: if status address already in use, return an error (#15177)

// HTTP path for web UI.
if host, port, err := net.SplitHostPort(addr); err == nil {
if host, port, err := net.SplitHostPort(s.statusAddr); err == nil {
if host == "" {
host = "localhost"
}
Expand Down Expand Up @@ -254,6 +290,7 @@ func (s *Server) startHTTPServer() {
logutil.Logger(context.Background()).Error("write HTTP index page failed", zap.Error(err))
}
})
<<<<<<< HEAD

logutil.Logger(context.Background()).Info("for status and metrics report", zap.String("listening on addr", addr))
s.statusServer = &http.Server{Addr: addr, Handler: CorsHandler{handler: serverMux, cfg: s.cfg}}
Expand All @@ -276,6 +313,32 @@ func (s *Server) startHTTPServer() {
}

err = s.statusServer.Serve(ln)
=======
s.startStatusServerAndRPCServer(serverMux)
}

func (s *Server) startStatusServerAndRPCServer(serverMux *http.ServeMux) {
m := cmux.New(s.statusListener)
// Match connections in order:
// First HTTP, and otherwise grpc.
httpL := m.Match(cmux.HTTP1Fast())
grpcL := m.Match(cmux.Any())

s.statusServer = &http.Server{Addr: s.statusAddr, Handler: CorsHandler{handler: serverMux, cfg: s.cfg}}
s.grpcServer = NewRPCServer(s.cfg, s.dom, s)

go util.WithRecovery(func() {
err := s.grpcServer.Serve(grpcL)
logutil.BgLogger().Error("grpc server error", zap.Error(err))
}, nil)

go util.WithRecovery(func() {
err := s.statusServer.Serve(httpL)
logutil.BgLogger().Error("http server error", zap.Error(err))
}, nil)

err := m.Serve()
>>>>>>> f8b2d96... server: if status address already in use, return an error (#15177)
if err != nil {
logutil.Logger(context.Background()).Info("serve status port failed", zap.Error(err))
}
Expand Down
12 changes: 12 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,16 @@ type Server struct {
concurrentLimiter *TokenLimiter
clients map[uint32]*clientConn
capability uint32
<<<<<<< HEAD
statusServer *http.Server
=======
dom *domain.Domain

statusAddr string
statusListener net.Listener
statusServer *http.Server
grpcServer *grpc.Server
>>>>>>> f8b2d96... server: if status address already in use, return an error (#15177)
}

// ConnectionCount gets current connection count.
Expand Down Expand Up @@ -247,6 +256,9 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
s.listener = pplistener
}

if s.cfg.Status.ReportStatus && err == nil {
err = s.listenStatusHTTPServer()
}
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
59 changes: 59 additions & 0 deletions server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,68 @@ func (ts *TidbTestSuite) TestResultFieldTableIsNull(c *C) {
runTestResultFieldTableIsNull(c)
}

<<<<<<< HEAD
func (ts *TidbTestSuite) TestStatusAPI(c *C) {
c.Parallel()
runTestStatusAPI(c)
=======
func (ts *tidbTestSuite) TestStatusPort(c *C) {
var err error
ts.store, err = mockstore.NewMockTikvStore()
session.DisableStats4Test()
c.Assert(err, IsNil)
ts.domain, err = session.BootstrapSession(ts.store)
c.Assert(err, IsNil)
ts.tidbdrv = NewTiDBDriver(ts.store)
cfg := config.NewConfig()
cfg.Port = genPort()
cfg.Status.ReportStatus = true
cfg.Status.StatusPort = ts.statusPort
cfg.Performance.TCPKeepAlive = true

server, err := NewServer(cfg, ts.tidbdrv)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals,
fmt.Sprintf("listen tcp 0.0.0.0:%d: bind: address already in use", ts.statusPort))
c.Assert(server, IsNil)
}

func (ts *tidbTestSuite) TestStatusAPIWithTLS(c *C) {
caCert, caKey, err := generateCert(0, "TiDB CA 2", nil, nil, "/tmp/ca-key-2.pem", "/tmp/ca-cert-2.pem")
c.Assert(err, IsNil)
_, _, err = generateCert(1, "tidb-server-2", caCert, caKey, "/tmp/server-key-2.pem", "/tmp/server-cert-2.pem")
c.Assert(err, IsNil)

defer func() {
os.Remove("/tmp/ca-key-2.pem")
os.Remove("/tmp/ca-cert-2.pem")
os.Remove("/tmp/server-key-2.pem")
os.Remove("/tmp/server-cert-2.pem")
}()

cli := newTestServerClient()
cli.statusScheme = "https"
cfg := config.NewConfig()
cfg.Port = cli.port
cfg.Status.StatusPort = cli.statusPort
cfg.Security.ClusterSSLCA = "/tmp/ca-cert-2.pem"
cfg.Security.ClusterSSLCert = "/tmp/server-cert-2.pem"
cfg.Security.ClusterSSLKey = "/tmp/server-key-2.pem"
server, err := NewServer(cfg, ts.tidbdrv)
c.Assert(err, IsNil)
go server.Run()
time.Sleep(time.Millisecond * 100)

// https connection should work.
ts.runTestStatusAPI(c)

// but plain http connection should fail.
cli.statusScheme = "http"
_, err = cli.fetchStatus("/status")
c.Assert(err, NotNil)

server.Close()
>>>>>>> f8b2d96... server: if status address already in use, return an error (#15177)
}

func (ts *TidbTestSuite) TestStatusAPIWithTLSCNCheck(c *C) {
Expand Down

0 comments on commit ae1bb61

Please sign in to comment.