Skip to content

Commit

Permalink
Fix tso server close stuck issue (#6529)
Browse files Browse the repository at this point in the history
ref #5895, close #6304

Rewrite TSO gRPC/HTTP server Close().

Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing authored May 29, 2023
1 parent 778d6aa commit b39188f
Showing 1 changed file with 102 additions and 48 deletions.
150 changes: 102 additions & 48 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ const (
// tsoSvcRootPathFormat defines the root path for all etcd paths used for different purposes.
// format: "/ms/{cluster_id}/tso".
tsoSvcRootPathFormat = msServiceRootPath + "/%d/" + mcsutils.TSOServiceName

// maxRetryTimesWaitAPIService is the max retry times for initializing the cluster ID.
maxRetryTimesWaitAPIService = 360
// retryIntervalWaitAPIService is the interval to retry.
Expand Down Expand Up @@ -106,7 +105,11 @@ type Server struct {
// http client
httpClient *http.Client

secure bool
muxListener net.Listener
httpListener net.Listener
grpcServer *grpc.Server
httpServer *http.Server
service *Service
keyspaceGroupManager *tso.KeyspaceGroupManager
// Store as map[string]*grpc.ClientConn
Expand Down Expand Up @@ -187,6 +190,8 @@ func (s *Server) Close() {
// close tso service loops in the keyspace group manager
s.keyspaceGroupManager.Close()
s.serviceRegister.Deregister()
s.stopHTTPServer()
s.stopGRPCServer()
s.muxListener.Close()
s.serverLoopCancel()
s.serverLoopWg.Wait()
Expand Down Expand Up @@ -389,83 +394,128 @@ func (s *Server) startGRPCServer(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

gs := grpc.NewServer()
s.service.RegisterGRPCService(gs)
diagnosticspb.RegisterDiagnosticsServer(gs, s)
serverr := gs.Serve(l)
log.Info("grpc server stopped serving")

// Attempt graceful stop (waits for pending RPCs), but force a stop if
// it doesn't happen in a reasonable amount of time.
done := make(chan struct{})
go func() {
defer logutil.LogPanic()
log.Info("try to gracefully stop the server now")
gs.GracefulStop()
close(done)
}()
select {
case <-done:
case <-time.After(mcsutils.DefaultGRPCGracefulStopTimeout):
log.Info("stopping grpc gracefully is taking longer than expected and force stopping now")
gs.Stop()
}

log.Info("grpc server starts serving", zap.String("address", l.Addr().String()))
err := s.grpcServer.Serve(l)
if s.IsClosed() {
log.Info("grpc server stopped")
} else {
log.Fatal("grpc server stopped unexpectedly", errs.ZapError(serverr))
log.Fatal("grpc server stopped unexpectedly", errs.ZapError(err))
}
}

func (s *Server) startHTTPServer(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

handler, _ := SetUpRestHandler(s.service)
hs := &http.Server{
Handler: handler,
ReadTimeout: 5 * time.Minute,
ReadHeaderTimeout: 5 * time.Second,
}
serverr := hs.Serve(l)
log.Info("http server stopped serving")

ctx, cancel := context.WithTimeout(context.Background(), mcsutils.DefaultHTTPGracefulShutdownTimeout)
defer cancel()
if err := hs.Shutdown(ctx); err != nil {
log.Error("http server shutdown encountered problem", errs.ZapError(err))
} else {
log.Info("all http(s) requests finished")
}
log.Info("http server starts serving", zap.String("address", l.Addr().String()))
err := s.httpServer.Serve(l)
if s.IsClosed() {
log.Info("http server stopped")
} else {
log.Fatal("http server stopped unexpectedly", errs.ZapError(serverr))
log.Fatal("http server stopped unexpectedly", errs.ZapError(err))
}
}

func (s *Server) startGRPCAndHTTPServers(l net.Listener) {
func (s *Server) startGRPCAndHTTPServers(serverReadyChan chan<- struct{}, l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

mux := cmux.New(l)
// Don't hang on matcher after closing listener
mux.SetReadTimeout(3 * time.Second)
grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
httpL := mux.Match(cmux.Any())
if s.secure {
s.httpListener = mux.Match(cmux.Any())
} else {
s.httpListener = mux.Match(cmux.HTTP1())
}

s.serverLoopWg.Add(2)
s.grpcServer = grpc.NewServer()
s.service.RegisterGRPCService(s.grpcServer)
diagnosticspb.RegisterDiagnosticsServer(s.grpcServer, s)
s.serverLoopWg.Add(1)
go s.startGRPCServer(grpcL)
go s.startHTTPServer(httpL)

handler, _ := SetUpRestHandler(s.service)
s.httpServer = &http.Server{
Handler: handler,
ReadTimeout: 3 * time.Second,
}
s.serverLoopWg.Add(1)
go s.startHTTPServer(s.httpListener)

serverReadyChan <- struct{}{}
if err := mux.Serve(); err != nil {
if s.IsClosed() {
log.Info("mux stop serving", errs.ZapError(err))
log.Info("mux stopped serving", errs.ZapError(err))
} else {
log.Panic("mux stop serving unexpectedly", errs.ZapError(err))
log.Fatal("mux stopped serving unexpectedly", errs.ZapError(err))
}
}
}

// stopHTTPServer shuts the HTTP server down. It first attempts a graceful
// shutdown bounded by mcsutils.DefaultHTTPGracefulShutdownTimeout; if that
// attempt reports an error (for example because the deadline expired), the
// server is force-closed so that open connections cannot keep it alive.
func (s *Server) stopHTTPServer() {
	log.Info("stopping http server")
	defer log.Info("http server stopped")

	// s.httpServer is only assigned in startGRPCAndHTTPServers; there is
	// nothing to stop if that never ran.
	if s.httpServer == nil {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), mcsutils.DefaultHTTPGracefulShutdownTimeout)
	defer cancel()

	// Shutdown blocks until every connection is idle or ctx expires, so it can
	// be called directly. Checking its result (instead of racing a helper
	// goroutine against ctx.Done()) guarantees the forced close below is never
	// skipped when the graceful attempt times out.
	err := s.httpServer.Shutdown(ctx)
	if err == nil {
		return
	}

	// Took too long (or failed), manually close open transports.
	log.Warn("http server graceful shutdown timeout, forcing close", errs.ZapError(err))
	if closeErr := s.httpServer.Close(); closeErr != nil {
		log.Warn("http server force close encountered problem", errs.ZapError(closeErr))
	}
}

// stopGRPCServer stops the gRPC server. When TLS is enabled the server is
// stopped immediately; otherwise a graceful stop is attempted and, if it does
// not finish within mcsutils.DefaultGRPCGracefulStopTimeout, the server is
// stopped forcibly.
func (s *Server) stopGRPCServer() {
	log.Info("stopping grpc server")
	defer log.Info("grpc server stopped")

	if s.secure {
		// GracefulStop is deliberately avoided when TLS is enabled.
		// See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
		// and https://github.com/etcd-io/etcd/issues/8916
		s.grpcServer.Stop()
		return
	}

	deadline, release := context.WithTimeout(context.Background(), mcsutils.DefaultGRPCGracefulStopTimeout)
	defer release()

	// Run the graceful stop in the background: it stops accepting new
	// connections and blocks until the existing transports are done.
	gracefulDone := make(chan struct{})
	go func() {
		defer close(gracefulDone)
		s.grpcServer.GracefulStop()
	}()

	// Give pending RPCs until the deadline to finish.
	select {
	case <-gracefulDone:
		return
	case <-deadline.Done():
	}

	// Pending RPCs (e.g. watch streams) did not finish in time, so close the
	// open transports manually.
	log.Warn("grpc server graceful shutdown timeout, forcing close")
	s.grpcServer.Stop()
	// Stop should interrupt the concurrent GracefulStop; wait for it to return.
	<-gracefulDone
}

func (s *Server) startServer() (err error) {
if s.clusterID, err = mcsutils.InitClusterID(s.ctx, s.etcdClient); err != nil {
return err
Expand Down Expand Up @@ -502,6 +552,7 @@ func (s *Server) startServer() (err error) {
return err
}
if tlsConfig != nil {
s.secure = true
s.muxListener, err = tls.Listen(mcsutils.TCPNetworkStr, s.listenURL.Host, tlsConfig)
} else {
s.muxListener, err = net.Listen(mcsutils.TCPNetworkStr, s.listenURL.Host)
Expand All @@ -510,8 +561,11 @@ func (s *Server) startServer() (err error) {
return err
}

serverReadyChan := make(chan struct{})
defer close(serverReadyChan)
s.serverLoopWg.Add(1)
go s.startGRPCAndHTTPServers(s.muxListener)
go s.startGRPCAndHTTPServers(serverReadyChan, s.muxListener)
<-serverReadyChan

// Run callbacks
log.Info("triggering the start callback functions")
Expand Down

0 comments on commit b39188f

Please sign in to comment.