diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index b76f9fbc2fdf..ebcb88b2768c 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -69,7 +69,6 @@ const ( // tsoSvcRootPathFormat defines the root path for all etcd paths used for different purposes. // format: "/ms/{cluster_id}/tso". tsoSvcRootPathFormat = msServiceRootPath + "/%d/" + mcsutils.TSOServiceName - // maxRetryTimesWaitAPIService is the max retry times for initializing the cluster ID. maxRetryTimesWaitAPIService = 360 // retryIntervalWaitAPIService is the interval to retry. @@ -106,7 +105,11 @@ type Server struct { // http client httpClient *http.Client + secure bool muxListener net.Listener + httpListener net.Listener + grpcServer *grpc.Server + httpServer *http.Server service *Service keyspaceGroupManager *tso.KeyspaceGroupManager // Store as map[string]*grpc.ClientConn @@ -187,6 +190,8 @@ func (s *Server) Close() { // close tso service loops in the keyspace group manager s.keyspaceGroupManager.Close() s.serviceRegister.Deregister() + s.stopHTTPServer() + s.stopGRPCServer() s.muxListener.Close() s.serverLoopCancel() s.serverLoopWg.Wait() @@ -389,32 +394,11 @@ func (s *Server) startGRPCServer(l net.Listener) { defer logutil.LogPanic() defer s.serverLoopWg.Done() - gs := grpc.NewServer() - s.service.RegisterGRPCService(gs) - diagnosticspb.RegisterDiagnosticsServer(gs, s) - serverr := gs.Serve(l) - log.Info("grpc server stopped serving") - - // Attempt graceful stop (waits for pending RPCs), but force a stop if - // it doesn't happen in a reasonable amount of time. - done := make(chan struct{}) - go func() { - defer logutil.LogPanic() - log.Info("try to gracefully stop the server now") - gs.GracefulStop() - close(done) - }() - select { - case <-done: - case <-time.After(mcsutils.DefaultGRPCGracefulStopTimeout): - log.Info("stopping grpc gracefully is taking longer than expected and force stopping now") - gs.Stop() - } - + err := s.grpcServer.Serve(l) if s.IsClosed() { - log.Info("grpc server stopped") + log.Info("[tso] grpc server stopped") } else { - log.Fatal("grpc server stopped unexpectedly", errs.ZapError(serverr)) + log.Fatal("[tso] grpc server stopped unexpectedly", errs.ZapError(err)) } } @@ -422,26 +406,11 @@ func (s *Server) startHTTPServer(l net.Listener) { defer logutil.LogPanic() defer s.serverLoopWg.Done() - handler, _ := SetUpRestHandler(s.service) - hs := &http.Server{ - Handler: handler, - ReadTimeout: 5 * time.Minute, - ReadHeaderTimeout: 5 * time.Second, - } - serverr := hs.Serve(l) - log.Info("http server stopped serving") - - ctx, cancel := context.WithTimeout(context.Background(), mcsutils.DefaultHTTPGracefulShutdownTimeout) - defer cancel() - if err := hs.Shutdown(ctx); err != nil { - log.Error("http server shutdown encountered problem", errs.ZapError(err)) - } else { - log.Info("all http(s) requests finished") - } + err := s.httpServer.Serve(l) if s.IsClosed() { log.Info("http server stopped") } else { - log.Fatal("http server stopped unexpectedly", errs.ZapError(serverr)) + log.Fatal("http server stopped unexpectedly", errs.ZapError(err)) } } @@ -451,21 +420,98 @@ func (s *Server) startGRPCAndHTTPServers(l net.Listener) { mux := cmux.New(l) grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) - httpL := mux.Match(cmux.Any()) + if s.secure { + s.httpListener = mux.Match(cmux.Any()) + } else { + s.httpListener = mux.Match(cmux.HTTP1()) + } - s.serverLoopWg.Add(2) + s.grpcServer = grpc.NewServer() + s.service.RegisterGRPCService(s.grpcServer) + diagnosticspb.RegisterDiagnosticsServer(s.grpcServer, s) + s.serverLoopWg.Add(1) go s.startGRPCServer(grpcL) - go s.startHTTPServer(httpL) + + handler, _ := SetUpRestHandler(s.service) + s.httpServer = &http.Server{ + Handler: handler, + ReadTimeout: 5 * time.Minute, + ReadHeaderTimeout: 5 * time.Second, + } + s.serverLoopWg.Add(1) + go s.startHTTPServer(s.httpListener) if err := mux.Serve(); err != nil { if s.IsClosed() { - log.Info("mux stop serving", errs.ZapError(err)) + log.Info("[tso] mux stopped serving", errs.ZapError(err)) } else { - log.Panic("mux stop serving unexpectedly", errs.ZapError(err)) + log.Fatal("[tso] mux stopped serving unexpectedly", errs.ZapError(err)) } } } +func (s *Server) stopHTTPServer() { + log.Info("[tso] stopping http server") + defer log.Info("[tso] http server stopped") + + ctx, cancel := context.WithTimeout(context.Background(), mcsutils.DefaultHTTPGracefulShutdownTimeout) + defer cancel() + + // First, try to gracefully shutdown the http server + ch := make(chan struct{}) + go func() { + defer close(ch) + log.Warn("[tso] http server graceful shutdown timeout, forcing close") + s.httpServer.Shutdown(ctx) + }() + + select { + case <-ch: + case <-ctx.Done(): + // Took too long, manually close open transports + s.httpServer.Close() + // concurrent Graceful Shutdown should be interrupted + <-ch + } +} + +func (s *Server) stopGRPCServer() { + log.Info("[tso] stopping grpc server") + defer log.Info("[tso] grpc server stopped") + + // Do not grpc.Server.GracefulStop with TLS enabled etcd server + // See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531 + // and https://github.com/etcd-io/etcd/issues/8916 + if s.secure { + s.grpcServer.Stop() + return + } + + ctx, cancel := context.WithTimeout(context.Background(), mcsutils.DefaultGRPCGracefulStopTimeout) + defer cancel() + + // First, try to gracefully shutdown the grpc server + ch := make(chan struct{}) + go func() { + defer close(ch) + // Close listeners to stop accepting new connections, + // will block on any existing transports + s.grpcServer.GracefulStop() + }() + + // Wait until all pending RPCs are finished + select { + case <-ch: + case <-ctx.Done(): + // Took too long, manually close open transports + // e.g. watch streams + log.Warn("[tso] grpc server graceful shutdown timeout, forcing close") + s.grpcServer.Stop() + // concurrent GracefulStop should be interrupted + <-ch + } +} + func (s *Server) startServer() (err error) { if s.clusterID, err = mcsutils.InitClusterID(s.ctx, s.etcdClient); err != nil { return err @@ -502,6 +548,7 @@ func (s *Server) startServer() (err error) { return err } if tlsConfig != nil { + s.secure = true s.muxListener, err = tls.Listen(mcsutils.TCPNetworkStr, s.listenURL.Host, tlsConfig) } else { s.muxListener, err = net.Listen(mcsutils.TCPNetworkStr, s.listenURL.Host)