From 6362b67a208bfe02e62fd361e818824ddbe7dced Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Sun, 19 Feb 2023 21:44:25 -0800 Subject: [PATCH] Add TSO primary/leader election loop. Add tso.Config.Adjust() to adjust config with default values if they're not in configuration file or passed from commandline. Signed-off-by: Bin Shi --- pkg/mcs/tso/server/server.go | 142 +++++++++++++++++++++++++++++++++-- pkg/tso/config.go | 135 ++++++++++++++++++++++++++++++--- 2 files changed, 260 insertions(+), 17 deletions(-) diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index e9f93286cfbf..db042290eda1 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -33,6 +33,7 @@ import ( "time" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/kvproto/pkg/tsopb" "github.com/pingcap/log" @@ -59,6 +60,7 @@ import ( ) const ( + leaderTickInterval = 50 * time.Millisecond // tsoRootPath for all tso servers. tsoRootPath = "/tso" tsoClusterIDPath = "/tso/cluster_id" @@ -108,6 +110,8 @@ type Server struct { // Callback functions for different stages // startCallbacks will be called after the server is started. startCallbacks []func() + // primaryCallbacks will be called after the server becomes the primary. + primaryCallbacks []func(context.Context) } // Implement the following methods defined in bs.Server @@ -133,7 +137,134 @@ func (s *Server) Run() error { } s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) - return s.startServer(s.ctx) + if err := s.startServer(s.ctx); err != nil { + return err + } + + s.startServerLoop(s.ctx) + + return nil +} + +func (s *Server) startServerLoop(context.Context) { + s.serverLoopWg.Add(2) + go s.primaryElectionLoop() + go s.tsoAllocatorLoop() +} + +// tsoAllocatorLoop is used to run the TSO Allocator updating daemon. +func (s *Server) tsoAllocatorLoop() { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + ctx, cancel := context.WithCancel(s.serverLoopCtx) + defer cancel() + s.tsoAllocatorManager.AllocatorDaemon(ctx) + log.Info("tso server is closed, exit allocator loop") +} + +func (s *Server) primaryElectionLoop() { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + for { + if s.IsClosed() { + log.Info("server is closed, exit TSO primary election loop") + return + } + + primary, rev, checkAgain := s.participant.CheckLeader() + if checkAgain { + continue + } + if primary != nil { + // TODO: if enable-local-tso is true, check the cluster dc-location after the primary/leader is elected + // go s.tsoAllocatorManager.ClusterDCLocationChecker() + + log.Info("start to watch the leader", zap.Stringer("tso-leader", primary)) + // WatchLeader will keep looping and never return unless the primary/leader has changed. + s.participant.WatchLeader(s.serverLoopCtx, primary, rev) + log.Info("the tso primary/leader has changed, try to re-campaign a primary/leader") + } + + s.campaignLeader() + } +} + +func (s *Server) campaignLeader() { + log.Info("start to campaign the primary/leader", zap.String("campaign-tso-primary-name", s.Name())) + if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil { + if err.Error() == errs.ErrEtcdTxnConflict.Error() { + log.Info("campaign pd leader meets error due to txn conflict, another PD server may campaign successfully", + zap.String("campaign-pd-leader-name", s.Name())) + } else { + log.Error("campaign pd leader meets error due to etcd error", + zap.String("campaign-pd-leader-name", s.Name()), + errs.ZapError(err)) + } + return + } + + // Start keepalive the leadership and enable TSO service. + // TSO service is strictly enabled/disabled by the leader lease for 2 reasons: + // 1. lease based approach is not affected by thread pause, slow runtime schedule, etc. + // 2. load region could be slow. Based on lease we can recover TSO service faster. + ctx, cancel := context.WithCancel(s.serverLoopCtx) + var resetLeaderOnce sync.Once + defer resetLeaderOnce.Do(func() { + cancel() + s.participant.ResetLeader() + }) + + // maintain the the leadership, after this, TSO can be service. + s.participant.KeepLeader(ctx) + log.Info("campaign tso leader ok", zap.String("campaign-tso-leader-name", s.Name())) + + allocator, err := s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation) + if err != nil { + log.Error("failed to get the global TSO allocator", errs.ZapError(err)) + return + } + log.Info("initializing the global TSO allocator") + if err := allocator.Initialize(0); err != nil { + log.Error("failed to initialize the global TSO allocator", errs.ZapError(err)) + return + } + defer func() { + s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation) + failpoint.Inject("updateAfterResetTSO", func() { + if err = allocator.UpdateTSO(); err != nil { + panic(err) + } + }) + }() + + log.Info("triggering the primary callback functions") + for _, cb := range s.primaryCallbacks { + cb(ctx) + } + + // TODO: if enable-local-tso is true, check the cluster dc-location after the primary/leader is elected + // go s.tsoAllocatorManager.ClusterDCLocationChecker() + + log.Info("TSO cluster primary server is ready to serve", zap.String("tso-primary-name", s.Name())) + + leaderTicker := time.NewTicker(leaderTickInterval) + defer leaderTicker.Stop() + + for { + select { + case <-leaderTicker.C: + if !s.participant.IsLeader() { + log.Info("no longer a leader because lease has expired, pd leader will step down") + return + } + case <-ctx.Done(): + // Server is closed and it should return nil. + log.Info("server is closed") + return + } + } } // Close closes the server. @@ -144,9 +275,11 @@ func (s *Server) Close() { } log.Info("Closing TSO Server ...") + // TODO: double check when muxListener is closed, grpc.Server.serve() and http.Server.serve() // will also close with error cmux.ErrListenerClosed. s.muxListener.Close() + s.serverLoopCancel() s.serverLoopWg.Wait() if s.etcdClient != nil { @@ -184,11 +317,10 @@ func (s *Server) IsServing() bool { return false } -// AddServiceReadyCallback implments basicserver. it adds a callback when the server -// becomes the leader if there is embedded etcd, or the primary otherwise. +// AddServiceReadyCallback implments basicserver. It adds a callback when the server +// becomes the primary. func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { - // Do nothing here before integrating with cross-dc txn feature and electing - // the Global TSO Allocator. + s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) } // Implement the other methods diff --git a/pkg/tso/config.go b/pkg/tso/config.go index 4bfaffba57b1..0e05a0d71f31 100644 --- a/pkg/tso/config.go +++ b/pkg/tso/config.go @@ -15,9 +15,15 @@ package tso import ( + "fmt" + "os" + "path/filepath" + "strings" "time" + "github.com/BurntSushi/toml" "github.com/pingcap/log" + "github.com/pkg/errors" "github.com/spf13/pflag" "github.com/tikv/pd/pkg/utils/configutil" "github.com/tikv/pd/pkg/utils/grpcutil" @@ -26,17 +32,15 @@ import ( "go.uber.org/zap" ) -const ( - // defaultTSOUpdatePhysicalInterval is the default value of the config `TSOUpdatePhysicalInterval`. - defaultTSOUpdatePhysicalInterval = 50 * time.Millisecond - defaultMaxResetTSGap = 24 * time.Hour -) - // Config is the configuration for the TSO. type Config struct { BackendEndpoints string `toml:"backend-endpoints" json:"backend-endpoints"` ListenAddr string `toml:"listen-addr" json:"listen-addr"` + Name string `toml:"name" json:"name"` + DataDir string `toml:"data-dir" json:"data-dir"` + EnableGRPCGateway bool `json:"enable-grpc-gateway"` + // EnableLocalTSO is used to enable the Local TSO Allocator feature, // which allows the PD server to generate Local TSO for certain DC-level transactions. // To make this feature meaningful, user has to set the "zone" label for the PD server @@ -61,6 +65,11 @@ type Config struct { // WarningMsgs contains all warnings during parsing. WarningMsgs []string + // LeaderLease defines the time within which a TSO primary/leader must update its TTL + // in etcd, otherwise etcd will expire the leader key and other servers can campaign + // the primary/leader again. Etcd only supports seconds TTL, so here is second too. + LeaderLease int64 `toml:"lease" json:"lease"` + // Log related config. Log log.Config `toml:"log" json:"log"` @@ -75,6 +84,25 @@ func NewConfig() *Config { return &Config{} } +const ( + defaultLeaderLease = int64(3) + defaultMaxResetTSGap = 24 * time.Hour + + defaultName = "TSO" + defaultBackendEndpoints = "http://127.0.0.1:2379" + defaultListenAddr = "http://127.0.0.1:2381" + defaultEnableGRPCGateway = true + + defaultTSOSaveInterval = time.Duration(defaultLeaderLease) * time.Second + defaultTSOUpdatePhysicalInterval = 50 * time.Millisecond + maxTSOUpdatePhysicalInterval = 10 * time.Second + minTSOUpdatePhysicalInterval = 1 * time.Millisecond + + defaultDashboardAddress = "auto" + defaultLogFormat = "text" + defaultDisableErrorVerbose = true +) + // IsLocalTSOEnabled returns if the local TSO is enabled. func (c *Config) IsLocalTSOEnabled() bool { return c.EnableLocalTSO @@ -98,17 +126,18 @@ func (c *Config) GetTLSConfig() *grpcutil.TLSConfig { // Parse parses flag definitions from the argument list. func (c *Config) Parse(flagSet *pflag.FlagSet) error { // Load config file if specified. + var ( + meta *toml.MetaData + err error + ) if configFile, _ := flagSet.GetString("config"); configFile != "" { - _, err := configutil.ConfigFromFile(c, configFile) + meta, err = configutil.ConfigFromFile(c, configFile) if err != nil { return err } } - // TODO: add commandline parameter - configutil.AdjustDuration(&c.MaxResetTSGap, defaultMaxResetTSGap) - - // ignore the error check here + // Ignore the error check here configutil.AdjustCommandlineString(flagSet, &c.Log.Level, "log-level") configutil.AdjustCommandlineString(flagSet, &c.Log.File.Filename, "log-file") configutil.AdjustCommandlineString(flagSet, &c.Metric.PushAddress, "metrics-addr") @@ -118,6 +147,88 @@ func (c *Config) Parse(flagSet *pflag.FlagSet) error { configutil.AdjustCommandlineString(flagSet, &c.BackendEndpoints, "backend-endpoints") configutil.AdjustCommandlineString(flagSet, &c.ListenAddr, "listen-addr") - // TODO: Implement the main function body + return c.Adjust(meta, false) +} + +// Adjust is used to adjust the PD configurations. +func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { + configMetaData := configutil.NewConfigMetadata(meta) + if err := configMetaData.CheckUndecoded(); err != nil { + c.WarningMsgs = append(c.WarningMsgs, err.Error()) + } + + if c.Name == "" { + hostname, err := os.Hostname() + if err != nil { + return err + } + configutil.AdjustString(&c.Name, fmt.Sprintf("%s-%s", defaultName, hostname)) + } + configutil.AdjustString(&c.DataDir, fmt.Sprintf("default.%s", c.Name)) + adjustPath(&c.DataDir) + + if err := c.Validate(); err != nil { + return err + } + + configutil.AdjustString(&c.BackendEndpoints, defaultBackendEndpoints) + configutil.AdjustString(&c.ListenAddr, defaultListenAddr) + + configutil.AdjustDuration(&c.MaxResetTSGap, defaultMaxResetTSGap) + configutil.AdjustInt64(&c.LeaderLease, defaultLeaderLease) + configutil.AdjustDuration(&c.TSOSaveInterval, defaultTSOSaveInterval) + configutil.AdjustDuration(&c.TSOUpdatePhysicalInterval, defaultTSOUpdatePhysicalInterval) + + if c.TSOUpdatePhysicalInterval.Duration > maxTSOUpdatePhysicalInterval { + c.TSOUpdatePhysicalInterval.Duration = maxTSOUpdatePhysicalInterval + } else if c.TSOUpdatePhysicalInterval.Duration < minTSOUpdatePhysicalInterval { + c.TSOUpdatePhysicalInterval.Duration = minTSOUpdatePhysicalInterval + } + + if !configMetaData.IsDefined("enable-grpc-gateway") { + c.EnableGRPCGateway = defaultEnableGRPCGateway + } + + c.adjustLog(configMetaData.Child("log")) + c.Security.Encryption.Adjust() + + if len(c.Log.Format) == 0 { + c.Log.Format = defaultLogFormat + } + + return nil +} + +func adjustPath(p *string) { + absPath, err := filepath.Abs(*p) + if err == nil { + *p = absPath + } +} + +func (c *Config) adjustLog(meta *configutil.ConfigMetaData) { + if !meta.IsDefined("disable-error-verbose") { + c.Log.DisableErrorVerbose = defaultDisableErrorVerbose + } +} + +// Validate is used to validate if some configurations are right. +func (c *Config) Validate() error { + dataDir, err := filepath.Abs(c.DataDir) + if err != nil { + return errors.WithStack(err) + } + logFile, err := filepath.Abs(c.Log.File.Filename) + if err != nil { + return errors.WithStack(err) + } + rel, err := filepath.Rel(dataDir, filepath.Dir(logFile)) + if err != nil { + return errors.WithStack(err) + } + if !strings.HasPrefix(rel, "..") { + return errors.New("log directory shouldn't be the subdirectory of data directory") + } + return nil }