Skip to content

Commit

Permalink
Fix tso server close stuck issue
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed May 28, 2023
1 parent 8b16b71 commit 9c1366e
Showing 1 changed file with 94 additions and 47 deletions.
141 changes: 94 additions & 47 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ const (
// tsoSvcRootPathFormat defines the root path for all etcd paths used for different purposes.
// format: "/ms/{cluster_id}/tso".
tsoSvcRootPathFormat = msServiceRootPath + "/%d/" + mcsutils.TSOServiceName

// maxRetryTimesWaitAPIService is the max retry times for initializing the cluster ID.
maxRetryTimesWaitAPIService = 360
// retryIntervalWaitAPIService is the interval to retry.
Expand Down Expand Up @@ -106,7 +105,11 @@ type Server struct {
// http client
httpClient *http.Client

secure bool
muxListener net.Listener
httpListener net.Listener
grpcServer *grpc.Server
httpServer *http.Server
service *Service
keyspaceGroupManager *tso.KeyspaceGroupManager
// Store as map[string]*grpc.ClientConn
Expand Down Expand Up @@ -187,6 +190,8 @@ func (s *Server) Close() {
// close tso service loops in the keyspace group manager
s.keyspaceGroupManager.Close()
s.serviceRegister.Deregister()
s.stopHTTPServer()
s.stopGRPCServer()
s.muxListener.Close()
s.serverLoopCancel()
s.serverLoopWg.Wait()
Expand Down Expand Up @@ -389,59 +394,23 @@ func (s *Server) startGRPCServer(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

gs := grpc.NewServer()
s.service.RegisterGRPCService(gs)
diagnosticspb.RegisterDiagnosticsServer(gs, s)
serverr := gs.Serve(l)
log.Info("grpc server stopped serving")

// Attempt graceful stop (waits for pending RPCs), but force a stop if
// it doesn't happen in a reasonable amount of time.
done := make(chan struct{})
go func() {
defer logutil.LogPanic()
log.Info("try to gracefully stop the server now")
gs.GracefulStop()
close(done)
}()
select {
case <-done:
case <-time.After(mcsutils.DefaultGRPCGracefulStopTimeout):
log.Info("stopping grpc gracefully is taking longer than expected and force stopping now")
gs.Stop()
}

err := s.grpcServer.Serve(l)
if s.IsClosed() {
log.Info("grpc server stopped")
log.Info("[tso] grpc server stopped")
} else {
log.Fatal("grpc server stopped unexpectedly", errs.ZapError(serverr))
log.Fatal("[tso] grpc server stopped unexpectedly", errs.ZapError(err))
}
}

func (s *Server) startHTTPServer(l net.Listener) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

handler, _ := SetUpRestHandler(s.service)
hs := &http.Server{
Handler: handler,
ReadTimeout: 5 * time.Minute,
ReadHeaderTimeout: 5 * time.Second,
}
serverr := hs.Serve(l)
log.Info("http server stopped serving")

ctx, cancel := context.WithTimeout(context.Background(), mcsutils.DefaultHTTPGracefulShutdownTimeout)
defer cancel()
if err := hs.Shutdown(ctx); err != nil {
log.Error("http server shutdown encountered problem", errs.ZapError(err))
} else {
log.Info("all http(s) requests finished")
}
err := s.httpServer.Serve(l)
if s.IsClosed() {
log.Info("http server stopped")
} else {
log.Fatal("http server stopped unexpectedly", errs.ZapError(serverr))
log.Fatal("http server stopped unexpectedly", errs.ZapError(err))
}
}

Expand All @@ -451,21 +420,98 @@ func (s *Server) startGRPCAndHTTPServers(l net.Listener) {

mux := cmux.New(l)
grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
httpL := mux.Match(cmux.Any())
if s.secure {
s.httpListener = mux.Match(cmux.Any())
} else {
s.httpListener = mux.Match(cmux.HTTP1())
}

s.serverLoopWg.Add(2)
s.grpcServer = grpc.NewServer()
s.service.RegisterGRPCService(s.grpcServer)
diagnosticspb.RegisterDiagnosticsServer(s.grpcServer, s)
s.serverLoopWg.Add(1)
go s.startGRPCServer(grpcL)
go s.startHTTPServer(httpL)

handler, _ := SetUpRestHandler(s.service)
s.httpServer = &http.Server{
Handler: handler,
ReadTimeout: 5 * time.Minute,
ReadHeaderTimeout: 5 * time.Second,
}
s.serverLoopWg.Add(1)
go s.startHTTPServer(s.httpListener)

if err := mux.Serve(); err != nil {
if s.IsClosed() {
log.Info("mux stop serving", errs.ZapError(err))
log.Info("[tso] mux stopped serving", errs.ZapError(err))
} else {
log.Panic("mux stop serving unexpectedly", errs.ZapError(err))
log.Fatal("[tso] mux stopped serving unexpectedly", errs.ZapError(err))
}
}
}

func (s *Server) stopHTTPServer() {
log.Info("[tso] stopping http server")
defer log.Info("[tso] http server stopped")

ctx, cancel := context.WithTimeout(context.Background(), mcsutils.DefaultHTTPGracefulShutdownTimeout)
defer cancel()

// First, try to gracefully shutdown the http server
ch := make(chan struct{})
go func() {
defer close(ch)
log.Warn("[tso] http server graceful shutdown timeout, forcing close")
s.httpServer.Shutdown(ctx)
}()

select {
case <-ch:
case <-ctx.Done():
// Took too long, manually close open transports
s.httpServer.Close()
// concurrent Graceful Shutdown should be interrupted
<-ch
}
}

func (s *Server) stopGRPCServer() {
log.Info("[tso] stopping grpc server")
defer log.Info("[tso] grpc server stopped")

// Do not grpc.Server.GracefulStop with TLS enabled etcd server
// See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
// and https://github.com/etcd-io/etcd/issues/8916
if s.secure {
s.grpcServer.Stop()
return
}

ctx, cancel := context.WithTimeout(context.Background(), mcsutils.DefaultGRPCGracefulStopTimeout)
defer cancel()

// First, try to gracefully shutdown the grpc server
ch := make(chan struct{})
go func() {
defer close(ch)
// Close listeners to stop accepting new connections,
// will block on any existing transports
s.grpcServer.GracefulStop()
}()

// Wait until all pending RPCs are finished
select {
case <-ch:
case <-ctx.Done():
// Took too long, manually close open transports
// e.g. watch streams
log.Warn("[tso] grpc server graceful shutdown timeout, forcing close")
s.grpcServer.Stop()
// concurrent GracefulStop should be interrupted
<-ch
}
}

func (s *Server) startServer() (err error) {
if s.clusterID, err = mcsutils.InitClusterID(s.ctx, s.etcdClient); err != nil {
return err
Expand Down Expand Up @@ -502,6 +548,7 @@ func (s *Server) startServer() (err error) {
return err
}
if tlsConfig != nil {
s.secure = true
s.muxListener, err = tls.Listen(mcsutils.TCPNetworkStr, s.listenURL.Host, tlsConfig)
} else {
s.muxListener, err = net.Listen(mcsutils.TCPNetworkStr, s.listenURL.Host)
Expand Down

0 comments on commit 9c1366e

Please sign in to comment.