diff --git a/server/http_status.go b/server/http_status.go index 65caf4213a67c..6cec8cfe93efe 100644 --- a/server/http_status.go +++ b/server/http_status.go @@ -68,6 +68,33 @@ func sleepWithCtx(ctx context.Context, d time.Duration) { } } +func (s *Server) listenStatusHTTPServer() error { + s.statusAddr = fmt.Sprintf("%s:%d", s.cfg.Status.StatusHost, s.cfg.Status.StatusPort) + if s.cfg.Status.StatusPort == 0 { + s.statusAddr = fmt.Sprintf("%s:%d", s.cfg.Status.StatusHost, defaultStatusPort) + } + + logutil.BgLogger().Info("for status and metrics report", zap.String("listening on addr", s.statusAddr)) + tlsConfig, err := s.cfg.Security.ToTLSConfig() + if err != nil { + logutil.BgLogger().Error("invalid TLS config", zap.Error(err)) + return errors.Trace(err) + } + tlsConfig = s.setCNChecker(tlsConfig) + + if tlsConfig != nil { + // we need to manage TLS here for cmux to distinguish between HTTP and gRPC. + s.statusListener, err = tls.Listen("tcp", s.statusAddr, tlsConfig) + } else { + s.statusListener, err = net.Listen("tcp", s.statusAddr) + } + if err != nil { + logutil.BgLogger().Info("listen failed", zap.Error(err)) + return errors.Trace(err) + } + return nil +} + func (s *Server) startHTTPServer() { router := mux.NewRouter() @@ -111,13 +138,22 @@ func (s *Server) startHTTPServer() { router.Handle("/mvcc/hex/{hexKey}", mvccTxnHandler{tikvHandlerTool, opMvccGetByHex}) router.Handle("/mvcc/index/{db}/{table}/{index}/{handle}", mvccTxnHandler{tikvHandlerTool, opMvccGetByIdx}) } +<<<<<<< HEAD addr := fmt.Sprintf("%s:%d", s.cfg.Status.StatusHost, s.cfg.Status.StatusPort) if s.cfg.Status.StatusPort == 0 { addr = fmt.Sprintf("%s:%d", s.cfg.Status.StatusHost, defaultStatusPort) } +======= + + // HTTP path for get MVCC info + router.Handle("/mvcc/key/{db}/{table}/{handle}", mvccTxnHandler{tikvHandlerTool, opMvccGetByKey}) + router.Handle("/mvcc/txn/{startTS}/{db}/{table}", mvccTxnHandler{tikvHandlerTool, opMvccGetByTxn}) + router.Handle("/mvcc/hex/{hexKey}", mvccTxnHandler{tikvHandlerTool, opMvccGetByHex}) + router.Handle("/mvcc/index/{db}/{table}/{index}/{handle}", mvccTxnHandler{tikvHandlerTool, opMvccGetByIdx}) +>>>>>>> f8b2d96... server: if status address already in use, return an error (#15177) // HTTP path for web UI. - if host, port, err := net.SplitHostPort(addr); err == nil { + if host, port, err := net.SplitHostPort(s.statusAddr); err == nil { if host == "" { host = "localhost" } @@ -252,6 +288,7 @@ func (s *Server) startHTTPServer() { logutil.Logger(context.Background()).Error("write HTTP index page failed", zap.Error(err)) } }) +<<<<<<< HEAD logutil.Logger(context.Background()).Info("for status and metrics report", zap.String("listening on addr", addr)) s.statusServer = &http.Server{Addr: addr, Handler: CorsHandler{handler: serverMux, cfg: s.cfg}} @@ -274,6 +311,32 @@ func (s *Server) startHTTPServer() { } err = s.statusServer.Serve(ln) +======= + s.startStatusServerAndRPCServer(serverMux) +} + +func (s *Server) startStatusServerAndRPCServer(serverMux *http.ServeMux) { + m := cmux.New(s.statusListener) + // Match connections in order: + // First HTTP, and otherwise grpc. + httpL := m.Match(cmux.HTTP1Fast()) + grpcL := m.Match(cmux.Any()) + + s.statusServer = &http.Server{Addr: s.statusAddr, Handler: CorsHandler{handler: serverMux, cfg: s.cfg}} + s.grpcServer = NewRPCServer(s.cfg, s.dom, s) + + go util.WithRecovery(func() { + err := s.grpcServer.Serve(grpcL) + logutil.BgLogger().Error("grpc server error", zap.Error(err)) + }, nil) + + go util.WithRecovery(func() { + err := s.statusServer.Serve(httpL) + logutil.BgLogger().Error("http server error", zap.Error(err)) + }, nil) + + err := m.Serve() +>>>>>>> f8b2d96... server: if status address already in use, return an error (#15177) if err != nil { logutil.Logger(context.Background()).Info("serve status port failed", zap.Error(err)) } diff --git a/server/server.go b/server/server.go index 145ab54b16bb3..9e0cde3dcc53b 100644 --- a/server/server.go +++ b/server/server.go @@ -112,7 +112,16 @@ type Server struct { concurrentLimiter *TokenLimiter clients map[uint32]*clientConn capability uint32 +<<<<<<< HEAD statusServer *http.Server +======= + dom *domain.Domain + + statusAddr string + statusListener net.Listener + statusServer *http.Server + grpcServer *grpc.Server +>>>>>>> f8b2d96... server: if status address already in use, return an error (#15177) } // ConnectionCount gets current connection count. @@ -247,6 +256,9 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) { s.listener = pplistener } + if s.cfg.Status.ReportStatus && err == nil { + err = s.listenStatusHTTPServer() + } if err != nil { return nil, errors.Trace(err) } diff --git a/server/tidb_test.go b/server/tidb_test.go index 8e628aee3e8b6..1c4575d21ba4b 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -155,9 +155,68 @@ func (ts *TidbTestSuite) TestResultFieldTableIsNull(c *C) { runTestResultFieldTableIsNull(c) } +<<<<<<< HEAD func (ts *TidbTestSuite) TestStatusAPI(c *C) { c.Parallel() runTestStatusAPI(c) +======= +func (ts *tidbTestSuite) TestStatusPort(c *C) { + var err error + ts.store, err = mockstore.NewMockTikvStore() + session.DisableStats4Test() + c.Assert(err, IsNil) + ts.domain, err = session.BootstrapSession(ts.store) + c.Assert(err, IsNil) + ts.tidbdrv = NewTiDBDriver(ts.store) + cfg := config.NewConfig() + cfg.Port = genPort() + cfg.Status.ReportStatus = true + cfg.Status.StatusPort = ts.statusPort + cfg.Performance.TCPKeepAlive = true + + server, err := NewServer(cfg, ts.tidbdrv) + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, + fmt.Sprintf("listen tcp 0.0.0.0:%d: bind: address already in use", ts.statusPort)) + c.Assert(server, IsNil) +} + +func (ts *tidbTestSuite) TestStatusAPIWithTLS(c *C) { + caCert, caKey, err := generateCert(0, "TiDB CA 2", nil, nil, "/tmp/ca-key-2.pem", "/tmp/ca-cert-2.pem") + c.Assert(err, IsNil) + _, _, err = generateCert(1, "tidb-server-2", caCert, caKey, "/tmp/server-key-2.pem", "/tmp/server-cert-2.pem") + c.Assert(err, IsNil) + + defer func() { + os.Remove("/tmp/ca-key-2.pem") + os.Remove("/tmp/ca-cert-2.pem") + os.Remove("/tmp/server-key-2.pem") + os.Remove("/tmp/server-cert-2.pem") + }() + + cli := newTestServerClient() + cli.statusScheme = "https" + cfg := config.NewConfig() + cfg.Port = cli.port + cfg.Status.StatusPort = cli.statusPort + cfg.Security.ClusterSSLCA = "/tmp/ca-cert-2.pem" + cfg.Security.ClusterSSLCert = "/tmp/server-cert-2.pem" + cfg.Security.ClusterSSLKey = "/tmp/server-key-2.pem" + server, err := NewServer(cfg, ts.tidbdrv) + c.Assert(err, IsNil) + go server.Run() + time.Sleep(time.Millisecond * 100) + + // https connection should work. + ts.runTestStatusAPI(c) + + // but plain http connection should fail. + cli.statusScheme = "http" + _, err = cli.fetchStatus("/status") + c.Assert(err, NotNil) + + server.Close() +>>>>>>> f8b2d96... server: if status address already in use, return an error (#15177) } func (ts *TidbTestSuite) TestStatusAPIWithTLSCNCheck(c *C) {