diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 3c93eae8a85..8709f0d7645 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -143,6 +143,16 @@ type ElectionMember interface { PrecheckLeader() error } +// ConfigProvider is used to provide TSO configuration. +type ConfigProvider interface { + IsLocalTSOEnabled() bool + GetLeaderLease() int64 + GetTSOSaveInterval() time.Duration + GetTSOUpdatePhysicalInterval() time.Duration + GetMaxResetTSGap() time.Duration + GetTLSConfig() *grpcutil.TLSConfig +} + // AllocatorManager is used to manage the TSO Allocators a PD server holds. // It is in charge of maintaining TSO allocators' leadership, checking election // priority, and forwarding TSO allocation requests to correct TSO Allocators. @@ -193,17 +203,12 @@ type AllocatorManager struct { // NewAllocatorManager creates a new TSO Allocator Manager. func NewAllocatorManager( ctx context.Context, - startGlobalLeaderLoop bool, keyspaceGroupID uint32, member ElectionMember, rootPath string, storage endpoint.TSOStorage, - enableLocalTSO bool, - saveInterval time.Duration, - updatePhysicalInterval time.Duration, - leaderLease int64, - tlsConfig *grpcutil.TLSConfig, - maxResetTSGap func() time.Duration, + configProvider ConfigProvider, + startGlobalLeaderLoop bool, ) *AllocatorManager { ctx, cancel := context.WithCancel(ctx) am := &AllocatorManager{ @@ -213,12 +218,12 @@ func NewAllocatorManager( member: member, rootPath: rootPath, storage: storage, - enableLocalTSO: enableLocalTSO, - saveInterval: saveInterval, - updatePhysicalInterval: updatePhysicalInterval, - leaderLease: leaderLease, - maxResetTSGap: maxResetTSGap, - securityConfig: tlsConfig, + enableLocalTSO: configProvider.IsLocalTSOEnabled(), + saveInterval: configProvider.GetTSOSaveInterval(), + updatePhysicalInterval: configProvider.GetTSOUpdatePhysicalInterval(), + leaderLease: configProvider.GetLeaderLease(), + maxResetTSGap: configProvider.GetMaxResetTSGap, + securityConfig: configProvider.GetTLSConfig(), } am.mu.allocatorGroups = make(map[string]*allocatorGroup) am.mu.clusterDCLocations = make(map[string]*DCLocationInfo) @@ -226,6 +231,8 @@ func NewAllocatorManager( // Set up the Global TSO Allocator here, it will be initialized once the member campaigns leader successfully. am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership(), startGlobalLeaderLoop) + am.svcLoopWG.Add(1) + go am.tsoAllocatorLoop() return am } @@ -247,11 +254,6 @@ func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership leadership: leadership, allocator: allocator, } - - if startGlobalLeaderLoop { - am.svcLoopWG.Add(1) - go am.tsoAllocatorLoop() - } } // setUpLocalAllocator is used to set up an allocator, which will initialize the allocator and put it into allocator daemon. @@ -279,7 +281,6 @@ func (am *AllocatorManager) setUpLocalAllocator(parentCtx context.Context, dcLoc } // tsoAllocatorLoop is used to run the TSO Allocator updating daemon. -// tso service starts the loop here, but pd starts its own loop. func (am *AllocatorManager) tsoAllocatorLoop() { defer logutil.LogPanic() defer am.svcLoopWG.Done() diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index ccd5ddce169..2598da4a6b0 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "path" - "time" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -74,8 +73,7 @@ type KeyspaceGroupManager struct { // Note: The {group} is 5 digits integer with leading zeros. tsoSvcRootPath string // cfg is the TSO config - cfg ServiceConfig - maxResetTSGap func() time.Duration + cfg ServiceConfig } // NewKeyspaceGroupManager creates a new Keyspace Group Manager. @@ -102,7 +100,6 @@ func NewKeyspaceGroupManager( defaultKsgStorageTSRootPath: defaultKsgStorageTSRootPath, tsoSvcRootPath: tsoSvcRootPath, cfg: cfg, - maxResetTSGap: func() time.Duration { return cfg.GetMaxResetTSGap() }, } return ksgMgr @@ -127,11 +124,9 @@ func (kgm *KeyspaceGroupManager) initDefaultKeyspaceGroup() { defaultKsgGroupStorage := endpoint.NewStorageEndpoint(kv.NewEtcdKVBase(kgm.etcdClient, kgm.defaultKsgStorageTSRootPath), nil) kgm.ksgAllocatorManagers[mcsutils.DefaultKeySpaceGroupID] = NewAllocatorManager( - kgm.ctx, true, mcsutils.DefaultKeySpaceGroupID, participant, + kgm.ctx, mcsutils.DefaultKeySpaceGroupID, participant, kgm.defaultKsgStorageTSRootPath, defaultKsgGroupStorage, - kgm.cfg.IsLocalTSOEnabled(), kgm.cfg.GetTSOSaveInterval(), - kgm.cfg.GetTSOUpdatePhysicalInterval(), kgm.cfg.GetLeaderLease(), - kgm.cfg.GetTLSConfig(), kgm.maxResetTSGap) + kgm.cfg, true) } // GetAllocatorManager returns the AllocatorManager of the given keyspace group diff --git a/server/server.go b/server/server.go index 23be7dba826..b2573032d7f 100644 --- a/server/server.go +++ b/server/server.go @@ -405,12 +405,7 @@ func (s *Server) startServer(ctx context.Context) error { s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} s.pdProtoFactory = &tsoutil.PDProtoFactory{} if !s.IsAPIServiceMode() { - s.tsoAllocatorManager = tso.NewAllocatorManager( - s.ctx, false, mcs.DefaultKeySpaceGroupID, s.member, s.rootPath, s.storage, s.cfg.IsLocalTSOEnabled(), - s.cfg.GetTSOSaveInterval(), s.cfg.GetTSOUpdatePhysicalInterval(), s.cfg.GetLeaderLease(), s.cfg.GetTLSConfig(), - func() time.Duration { return s.persistOptions.GetMaxResetTSGap() }) - // Set up the Global TSO Allocator here, it will be initialized once the PD campaigns leader successfully. - s.tsoAllocatorManager.SetUpGlobalAllocator(ctx, s.member.GetLeadership(), false) + s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, mcs.DefaultKeySpaceGroupID, s.member, s.rootPath, s.storage, s, false) // When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists. if !s.cfg.EnableLocalTSO { if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil { @@ -564,9 +559,6 @@ func (s *Server) startServerLoop(ctx context.Context) { if s.IsAPIServiceMode() { // disable tso service in api server s.serverLoopWg.Add(1) go s.watchServicePrimaryAddrLoop(mcs.TSOServiceName) - } else { // enable tso service - s.serverLoopWg.Add(1) - go s.tsoAllocatorLoop() } } @@ -592,17 +584,6 @@ func (s *Server) serverMetricsLoop() { } } -// tsoAllocatorLoop is used to run the TSO Allocator updating daemon. -func (s *Server) tsoAllocatorLoop() { - defer logutil.LogPanic() - defer s.serverLoopWg.Done() - - ctx, cancel := context.WithCancel(s.serverLoopCtx) - defer cancel() - s.tsoAllocatorManager.AllocatorDaemon(ctx) - log.Info("server is closed, exit allocator loop") -} - // encryptionKeyManagerLoop is used to start monitor encryption key changes. func (s *Server) encryptionKeyManagerLoop() { defer logutil.LogPanic() @@ -1813,3 +1794,28 @@ func (s *Server) SetExternalTS(externalTS, globalTS uint64) error { s.GetRaftCluster().SetExternalTS(externalTS) return nil } + +// IsLocalTSOEnabled returns if the local TSO is enabled. +func (s *Server) IsLocalTSOEnabled() bool { + return s.cfg.IsLocalTSOEnabled() +} + +// GetLeaderLease returns the leader lease. +func (s *Server) GetLeaderLease() int64 { + return s.cfg.GetLeaderLease() +} + +// GetTSOSaveInterval returns TSO save interval. +func (s *Server) GetTSOSaveInterval() time.Duration { + return s.cfg.GetTSOSaveInterval() +} + +// GetTSOUpdatePhysicalInterval returns TSO update physical interval. +func (s *Server) GetTSOUpdatePhysicalInterval() time.Duration { + return s.cfg.GetTSOUpdatePhysicalInterval() +} + +// GetMaxResetTSGap gets the max gap to reset the tso. +func (s *Server) GetMaxResetTSGap() time.Duration { + return s.persistOptions.GetMaxResetTSGap() +}