Skip to content

Commit

Permalink
Add TSO primary/leader election loop.
Browse files Browse the repository at this point in the history
Add tso.Config.Adjust() to fill in default values for settings that are neither set in the configuration file nor passed on the command line.

Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Feb 20, 2023
1 parent 34bc29d commit 6362b67
Show file tree
Hide file tree
Showing 2 changed files with 260 additions and 17 deletions.
142 changes: 137 additions & 5 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"time"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
Expand All @@ -59,6 +60,7 @@ import (
)

const (
leaderTickInterval = 50 * time.Millisecond
// tsoRootPath for all tso servers.
tsoRootPath = "/tso"
tsoClusterIDPath = "/tso/cluster_id"
Expand Down Expand Up @@ -108,6 +110,8 @@ type Server struct {
// Callback functions for different stages
// startCallbacks will be called after the server is started.
startCallbacks []func()
// primaryCallbacks will be called after the server becomes the primary.
primaryCallbacks []func(context.Context)
}

// Implement the following methods defined in bs.Server
Expand All @@ -133,7 +137,134 @@ func (s *Server) Run() error {
}

s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
return s.startServer(s.ctx)
if err := s.startServer(s.ctx); err != nil {
return err
}

s.startServerLoop(s.ctx)

return nil
}

// startServerLoop spawns the background goroutines of the server: the primary
// election loop and the TSO allocator loop. Each of them is registered in
// serverLoopWg before it is started and calls Done on it when it exits.
// The context argument is currently unused.
func (s *Server) startServerLoop(context.Context) {
	loops := []func(){
		s.primaryElectionLoop,
		s.tsoAllocatorLoop,
	}

	s.serverLoopWg.Add(len(loops))
	for _, loop := range loops {
		go loop()
	}
}

// tsoAllocatorLoop runs the TSO allocator daemon on a context derived from
// serverLoopCtx and returns once AllocatorDaemon returns. It marks itself as
// done in serverLoopWg on exit and logs any panic through logutil.LogPanic.
func (s *Server) tsoAllocatorLoop() {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()

	// A child context so that the daemon's context is always released when
	// this loop exits.
	daemonCtx, stopDaemon := context.WithCancel(s.serverLoopCtx)
	defer stopDaemon()

	s.tsoAllocatorManager.AllocatorDaemon(daemonCtx)
	log.Info("tso server is closed, exit allocator loop")
}

// primaryElectionLoop keeps the server participating in the TSO primary/leader
// election until the server is closed. On every round it checks the current
// primary; if one exists it watches it until WatchLeader returns, and then it
// campaigns for the primary role itself.
func (s *Server) primaryElectionLoop() {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()

	for !s.IsClosed() {
		current, revision, retry := s.participant.CheckLeader()
		if retry {
			// The check asked to be repeated; start the next round right away.
			continue
		}

		if current != nil {
			// TODO: if enable-local-tso is true, check the cluster dc-location after the primary/leader is elected
			// go s.tsoAllocatorManager.ClusterDCLocationChecker()

			log.Info("start to watch the leader", zap.Stringer("tso-leader", current))
			// WatchLeader will keep looping and never return unless the primary/leader has changed.
			s.participant.WatchLeader(s.serverLoopCtx, current, revision)
			log.Info("the tso primary/leader has changed, try to re-campaign a primary/leader")
		}

		s.campaignLeader()
	}

	log.Info("server is closed, exit TSO primary election loop")
}

// campaignLeader campaigns for the TSO primary/leader role and, when the
// campaign succeeds, serves as the primary until the leadership is lost or the
// server loop context is cancelled.
//
// On success it, in order:
//  1. starts keeping the leadership alive,
//  2. initializes the global TSO allocator,
//  3. invokes the registered primary callbacks,
//  4. polls IsLeader every leaderTickInterval until it reports false or the
//     context is done.
//
// On return (from any point after a successful campaign) the deferred calls
// reset the global allocator group (if it was initialized), cancel the
// keep-alive context and reset the leader.
func (s *Server) campaignLeader() {
	log.Info("start to campaign the primary/leader", zap.String("campaign-tso-primary-name", s.Name()))
	if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil {
		if err.Error() == errs.ErrEtcdTxnConflict.Error() {
			log.Info("campaign tso primary/leader meets error due to txn conflict, another tso server may campaign successfully",
				zap.String("campaign-tso-primary-name", s.Name()))
		} else {
			log.Error("campaign tso primary/leader meets error due to etcd error",
				zap.String("campaign-tso-primary-name", s.Name()),
				errs.ZapError(err))
		}
		return
	}

	// Start keepalive the leadership and enable TSO service.
	// TSO service is strictly enabled/disabled by the leader lease for 2 reasons:
	// 1. lease based approach is not affected by thread pause, slow runtime schedule, etc.
	// 2. load region could be slow. Based on lease we can recover TSO service faster.
	// NOTE(review): reason 2 looks inherited from the PD leader loop; confirm it
	// still applies to the TSO server.
	ctx, cancel := context.WithCancel(s.serverLoopCtx)
	var resetLeaderOnce sync.Once
	defer resetLeaderOnce.Do(func() {
		cancel()
		s.participant.ResetLeader()
	})

	// Maintain the leadership; after this, the TSO service can be provided.
	s.participant.KeepLeader(ctx)
	log.Info("campaign tso leader ok", zap.String("campaign-tso-leader-name", s.Name()))

	allocator, err := s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
	if err != nil {
		log.Error("failed to get the global TSO allocator", errs.ZapError(err))
		return
	}
	log.Info("initializing the global TSO allocator")
	if err := allocator.Initialize(0); err != nil {
		log.Error("failed to initialize the global TSO allocator", errs.ZapError(err))
		return
	}
	// Registered after a successful Initialize, so the allocator group is only
	// reset when it was actually initialized by this campaign.
	defer func() {
		s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation)
		failpoint.Inject("updateAfterResetTSO", func() {
			if err = allocator.UpdateTSO(); err != nil {
				panic(err)
			}
		})
	}()

	log.Info("triggering the primary callback functions")
	for _, cb := range s.primaryCallbacks {
		cb(ctx)
	}

	// TODO: if enable-local-tso is true, check the cluster dc-location after the primary/leader is elected
	// go s.tsoAllocatorManager.ClusterDCLocationChecker()

	log.Info("TSO cluster primary server is ready to serve", zap.String("tso-primary-name", s.Name()))

	leaderTicker := time.NewTicker(leaderTickInterval)
	defer leaderTicker.Stop()

	for {
		select {
		case <-leaderTicker.C:
			if !s.participant.IsLeader() {
				log.Info("no longer a leader because lease has expired, tso primary/leader will step down")
				return
			}
		case <-ctx.Done():
			// ctx is only cancelled here through its parent serverLoopCtx
			// (the local cancel runs in the deferred reset), i.e. the server
			// is shutting down.
			log.Info("server is closed")
			return
		}
	}
}

// Close closes the server.
Expand All @@ -144,9 +275,11 @@ func (s *Server) Close() {
}

log.Info("Closing TSO Server ...")

// TODO: double check when muxListener is closed, grpc.Server.serve() and http.Server.serve()
// will also close with error cmux.ErrListenerClosed.
s.muxListener.Close()
s.serverLoopCancel()
s.serverLoopWg.Wait()

if s.etcdClient != nil {
Expand Down Expand Up @@ -184,11 +317,10 @@ func (s *Server) IsServing() bool {
return false
}

// AddServiceReadyCallback implements basicserver. It registers the given
// callbacks in primaryCallbacks; they are invoked after the server becomes
// the primary.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
s.primaryCallbacks = append(s.primaryCallbacks, callbacks...)
}

// Implement the other methods
Expand Down
135 changes: 123 additions & 12 deletions pkg/tso/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@
package tso

import (
"fmt"
"os"
"path/filepath"
"strings"
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
Expand All @@ -26,17 +32,15 @@ import (
"go.uber.org/zap"
)

const (
// defaultTSOUpdatePhysicalInterval is the default value of the config `TSOUpdatePhysicalInterval`.
defaultTSOUpdatePhysicalInterval = 50 * time.Millisecond
// defaultMaxResetTSGap is the default value of the config `MaxResetTSGap`.
defaultMaxResetTSGap = 24 * time.Hour
)

// Config is the configuration for the TSO.
type Config struct {
BackendEndpoints string `toml:"backend-endpoints" json:"backend-endpoints"`
ListenAddr string `toml:"listen-addr" json:"listen-addr"`

Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"`
EnableGRPCGateway bool `json:"enable-grpc-gateway"`

// EnableLocalTSO is used to enable the Local TSO Allocator feature,
// which allows the PD server to generate Local TSO for certain DC-level transactions.
// To make this feature meaningful, user has to set the "zone" label for the PD server
Expand All @@ -61,6 +65,11 @@ type Config struct {
// WarningMsgs contains all warnings during parsing.
WarningMsgs []string

// LeaderLease defines the time within which a TSO primary/leader must update its TTL
// in etcd, otherwise etcd will expire the leader key and other servers can campaign
// the primary/leader again. Etcd only supports seconds TTL, so here is second too.
LeaderLease int64 `toml:"lease" json:"lease"`

// Log related config.
Log log.Config `toml:"log" json:"log"`

Expand All @@ -75,6 +84,25 @@ func NewConfig() *Config {
return &Config{}
}

// Default values and bounds applied to Config fields that were left unset.
const (
// defaultLeaderLease is the default value of the config `LeaderLease`, in seconds.
defaultLeaderLease = int64(3)
// defaultMaxResetTSGap is the default value of the config `MaxResetTSGap`.
defaultMaxResetTSGap = 24 * time.Hour

// defaultName is the prefix of the generated server name ("<defaultName>-<hostname>")
// used when the config `Name` is empty.
defaultName = "TSO"
// defaultBackendEndpoints is the default value of the config `BackendEndpoints`.
defaultBackendEndpoints = "http://127.0.0.1:2379"
// defaultListenAddr is the default value of the config `ListenAddr`.
defaultListenAddr = "http://127.0.0.1:2381"
// defaultEnableGRPCGateway is the value of the config `EnableGRPCGateway` when
// "enable-grpc-gateway" is not defined in the configuration file.
defaultEnableGRPCGateway = true

// defaultTSOSaveInterval is the default value of the config `TSOSaveInterval`;
// it equals the default leader lease expressed as a duration.
defaultTSOSaveInterval = time.Duration(defaultLeaderLease) * time.Second
// defaultTSOUpdatePhysicalInterval is the default value of the config `TSOUpdatePhysicalInterval`.
defaultTSOUpdatePhysicalInterval = 50 * time.Millisecond
// maxTSOUpdatePhysicalInterval is the upper bound the config `TSOUpdatePhysicalInterval` is clamped to.
maxTSOUpdatePhysicalInterval = 10 * time.Second
// minTSOUpdatePhysicalInterval is the lower bound the config `TSOUpdatePhysicalInterval` is clamped to.
minTSOUpdatePhysicalInterval = 1 * time.Millisecond

// defaultDashboardAddress is not referenced in the visible part of this file.
defaultDashboardAddress = "auto"
// defaultLogFormat is the log format used when the config `Log.Format` is empty.
defaultLogFormat = "text"
// defaultDisableErrorVerbose is the value of the config `Log.DisableErrorVerbose` when
// "disable-error-verbose" is not defined in the log section of the configuration file.
defaultDisableErrorVerbose = true
)

// IsLocalTSOEnabled returns if the local TSO is enabled.
func (c *Config) IsLocalTSOEnabled() bool {
return c.EnableLocalTSO
Expand All @@ -98,17 +126,18 @@ func (c *Config) GetTLSConfig() *grpcutil.TLSConfig {
// Parse parses flag definitions from the argument list.
func (c *Config) Parse(flagSet *pflag.FlagSet) error {
// Load config file if specified.
var (
meta *toml.MetaData
err error
)
if configFile, _ := flagSet.GetString("config"); configFile != "" {
_, err := configutil.ConfigFromFile(c, configFile)
meta, err = configutil.ConfigFromFile(c, configFile)
if err != nil {
return err
}
}

// TODO: add commandline parameter
configutil.AdjustDuration(&c.MaxResetTSGap, defaultMaxResetTSGap)

// ignore the error check here
// Ignore the error check here
configutil.AdjustCommandlineString(flagSet, &c.Log.Level, "log-level")
configutil.AdjustCommandlineString(flagSet, &c.Log.File.Filename, "log-file")
configutil.AdjustCommandlineString(flagSet, &c.Metric.PushAddress, "metrics-addr")
Expand All @@ -118,6 +147,88 @@ func (c *Config) Parse(flagSet *pflag.FlagSet) error {
configutil.AdjustCommandlineString(flagSet, &c.BackendEndpoints, "backend-endpoints")
configutil.AdjustCommandlineString(flagSet, &c.ListenAddr, "listen-addr")

// TODO: Implement the main function body
return c.Adjust(meta, false)
}

// Adjust completes the TSO configuration after it has been loaded: it records
// a warning for undecoded configuration items, derives the server name and the
// data directory when they are empty, validates the result and then applies
// the default values through the configutil.Adjust* helpers.
//
// meta is the metadata of the decoded configuration file; Parse passes it as
// nil when no configuration file was given. reloading is currently unused.
// It returns an error if the hostname cannot be obtained or Validate fails.
func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
	metadata := configutil.NewConfigMetadata(meta)
	if undecodedErr := metadata.CheckUndecoded(); undecodedErr != nil {
		// Undecoded items are reported as warnings only; they do not abort.
		c.WarningMsgs = append(c.WarningMsgs, undecodedErr.Error())
	}

	if c.Name == "" {
		host, err := os.Hostname()
		if err != nil {
			return err
		}
		configutil.AdjustString(&c.Name, fmt.Sprintf("%s-%s", defaultName, host))
	}
	// The data directory default depends on the (possibly generated) name.
	configutil.AdjustString(&c.DataDir, fmt.Sprintf("default.%s", c.Name))
	adjustPath(&c.DataDir)

	if err := c.Validate(); err != nil {
		return err
	}

	configutil.AdjustString(&c.BackendEndpoints, defaultBackendEndpoints)
	configutil.AdjustString(&c.ListenAddr, defaultListenAddr)

	configutil.AdjustDuration(&c.MaxResetTSGap, defaultMaxResetTSGap)
	configutil.AdjustInt64(&c.LeaderLease, defaultLeaderLease)
	configutil.AdjustDuration(&c.TSOSaveInterval, defaultTSOSaveInterval)
	configutil.AdjustDuration(&c.TSOUpdatePhysicalInterval, defaultTSOUpdatePhysicalInterval)

	// Clamp the physical update interval into [min, max].
	switch interval := &c.TSOUpdatePhysicalInterval.Duration; {
	case *interval > maxTSOUpdatePhysicalInterval:
		*interval = maxTSOUpdatePhysicalInterval
	case *interval < minTSOUpdatePhysicalInterval:
		*interval = minTSOUpdatePhysicalInterval
	}

	if !metadata.IsDefined("enable-grpc-gateway") {
		c.EnableGRPCGateway = defaultEnableGRPCGateway
	}

	c.adjustLog(metadata.Child("log"))
	c.Security.Encryption.Adjust()

	if c.Log.Format == "" {
		c.Log.Format = defaultLogFormat
	}

	return nil
}

// adjustPath replaces *p with its absolute form as computed by filepath.Abs.
// If the absolute path cannot be determined, *p is left unchanged.
func adjustPath(p *string) {
	if resolved, err := filepath.Abs(*p); err == nil {
		*p = resolved
	}
}

// adjustLog applies defaults to the log configuration. meta is the metadata of
// the "log" section; when "disable-error-verbose" is not defined there,
// Log.DisableErrorVerbose is set to defaultDisableErrorVerbose.
func (c *Config) adjustLog(meta *configutil.ConfigMetaData) {
	if meta.IsDefined("disable-error-verbose") {
		// An explicitly configured value is kept as is.
		return
	}
	c.Log.DisableErrorVerbose = defaultDisableErrorVerbose
}

// Validate checks that the directory of the log file is neither the data
// directory itself nor located inside it.
//
// Both paths are made absolute first. An error is returned when a path cannot
// be made absolute, when the log directory cannot be expressed relative to the
// data directory, or when the log directory is the data directory or one of
// its subdirectories.
func (c *Config) Validate() error {
	dataDir, err := filepath.Abs(c.DataDir)
	if err != nil {
		return errors.WithStack(err)
	}
	logFile, err := filepath.Abs(c.Log.File.Filename)
	if err != nil {
		return errors.WithStack(err)
	}
	rel, err := filepath.Rel(dataDir, filepath.Dir(logFile))
	if err != nil {
		return errors.WithStack(err)
	}

	// The log directory is outside the data directory only when the relative
	// path climbs out of it, i.e. its first element is exactly "..". A plain
	// prefix check would also accept a real subdirectory whose name merely
	// starts with two dots (for example "..logs").
	outside := rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator))
	if !outside {
		return errors.New("log directory shouldn't be the subdirectory of data directory")
	}

	return nil
}

0 comments on commit 6362b67

Please sign in to comment.