diff --git a/pkg/mcs/discovery/key_path.go b/pkg/mcs/discovery/key_path.go index 4eb339dd5dbe..d77aa98f3a9c 100644 --- a/pkg/mcs/discovery/key_path.go +++ b/pkg/mcs/discovery/key_path.go @@ -17,21 +17,22 @@ package discovery import ( "strconv" "strings" + + "github.com/tikv/pd/pkg/mcs/utils" ) const ( - registryPrefix = "/ms" - registryKey = "registry" + registryKey = "registry" ) // RegistryPath returns the full path to store microservice addresses. func RegistryPath(clusterID, serviceName, serviceAddr string) string { - return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey, serviceAddr}, "/") + return strings.Join([]string{utils.MicroserviceRootPath, clusterID, serviceName, registryKey, serviceAddr}, "/") } // ServicePath returns the path to store microservice addresses. func ServicePath(clusterID, serviceName string) string { - return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey}, "/") + return strings.Join([]string{utils.MicroserviceRootPath, clusterID, serviceName, registryKey}, "/") } // TSOPath returns the path to store TSO addresses. diff --git a/pkg/mcs/resource_manager/server/server.go b/pkg/mcs/resource_manager/server/server.go index a0ccb52ed38d..a4a1423ee40d 100644 --- a/pkg/mcs/resource_manager/server/server.go +++ b/pkg/mcs/resource_manager/server/server.go @@ -39,6 +39,7 @@ import ( "github.com/tikv/pd/pkg/mcs/discovery" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" + "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/memberutil" @@ -366,10 +367,10 @@ func (s *Server) startServer() (err error) { uniqueName := s.cfg.ListenAddr uniqueID := memberutil.GenerateUniqueID(uniqueName) log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) - resourceManagerPrimaryPrefix := fmt.Sprintf("/ms/%d/resource_manager", s.clusterID) + resourceManagerPrimaryPrefix := endpoint.ResourceManagerSvcRootPath(s.clusterID) s.participant = member.NewParticipant(s.etcdClient) s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)), - "primary", "keyspace group primary election", s.cfg.AdvertiseListenAddr) + utils.KeyspaceGroupsPrimaryKey, "keyspace group primary election", s.cfg.AdvertiseListenAddr) s.service = &Service{ ctx: s.ctx, diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 67e9c33f65ca..c94b5f00f782 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -23,7 +23,6 @@ import ( "net/url" "os" "os/signal" - "path" "strconv" "strings" "sync" @@ -46,6 +45,7 @@ import ( "github.com/tikv/pd/pkg/mcs/discovery" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" + "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/systimemon" "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/etcdutil" @@ -63,12 +63,6 @@ import ( ) const ( - // pdRootPath is the old path for storing the tso related root path. - pdRootPath = "/pd" - msServiceRootPath = "/ms" - // tsoSvcRootPathFormat defines the root path for all etcd paths used for different purposes. - // format: "/ms/{cluster_id}/tso". - tsoSvcRootPathFormat = msServiceRootPath + "/%d/" + mcsutils.TSOServiceName // maxRetryTimesWaitAPIService is the max retry times for initializing the cluster ID. maxRetryTimesWaitAPIService = 360 // retryIntervalWaitAPIService is the interval to retry. @@ -535,8 +529,8 @@ func (s *Server) startServer() (err error) { // Initialize the TSO service. s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) - legacySvcRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10)) - tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID) + legacySvcRootPath := endpoint.LegacyRootPath(s.clusterID) + tsoSvcRootPath := endpoint.TSOSvcRootPath(s.clusterID) s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} s.keyspaceGroupManager = tso.NewKeyspaceGroupManager( s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr, diff --git a/pkg/mcs/utils/constant.go b/pkg/mcs/utils/constant.go index c87cec16a64c..26c0f3698196 100644 --- a/pkg/mcs/utils/constant.go +++ b/pkg/mcs/utils/constant.go @@ -49,8 +49,8 @@ const ( // We also reserved 0 for the keyspace group for the same purpose. DefaultKeyspaceGroupID = uint32(0) - // MicroserviceKey is the key of microservice. - MicroserviceKey = "ms" + // MicroserviceRootPath is the root path of microservice in etcd. + MicroserviceRootPath = "/ms" // APIServiceName is the name of api server. APIServiceName = "api" // TSOServiceName is the name of tso server. @@ -59,6 +59,8 @@ const ( ResourceManagerServiceName = "resource_manager" // KeyspaceGroupsKey is the path component of keyspace groups. KeyspaceGroupsKey = "keyspace_groups" + // KeyspaceGroupsPrimaryKey is the path component of primary for keyspace groups. + KeyspaceGroupsPrimaryKey = "primary" // MaxKeyspaceGroupCount is the max count of keyspace groups. keyspace group in tso // is the sharding unit, i.e., by the definition here, the max count of the shards diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index 019e75dc12e2..52a09ae381d1 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -25,6 +25,7 @@ import ( ) const ( +<<<<<<< HEAD clusterPath = "raft" configPath = "config" serviceMiddlewarePath = "service_middleware" @@ -36,6 +37,21 @@ const ( replicationPath = "replication_mode" customScheduleConfigPath = "scheduler_config" gcWorkerServiceSafePointID = "gc_worker" +======= + pdRootPath = "/pd" + clusterPath = "raft" + configPath = "config" + serviceMiddlewarePath = "service_middleware" + schedulePath = "schedule" + gcPath = "gc" + rulesPath = "rules" + ruleGroupPath = "rule_group" + regionLabelPath = "region_label" + replicationPath = "replication_mode" + customScheduleConfigPath = "scheduler_config" + // GCWorkerServiceSafePointID is the service id of GC worker. + GCWorkerServiceSafePointID = "gc_worker" +>>>>>>> dbc936698... *: move keyspace group primary path code to key_path.go (#6755) minResolvedTS = "min_resolved_ts" externalTimeStamp = "external_timestamp" keyspaceSafePointPrefix = "keyspaces/gc_safepoint" @@ -55,8 +71,9 @@ const ( // TimestampKey is the key of timestamp oracle used for the suffix. TimestampKey = "timestamp" - tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey - keyspaceGroupMembershipKey = "membership" + tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey + keyspaceGroupsMembershipKey = "membership" + keyspaceGroupsElectionKey = "election" // we use uint64 to represent ID, the max length of uint64 is 20. keyLen = 20 @@ -232,13 +249,13 @@ func encodeKeyspaceID(spaceID uint32) string { // KeyspaceGroupIDPrefix returns the prefix of keyspace group id. // Path: tso/keyspace_groups/membership func KeyspaceGroupIDPrefix() string { - return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey) + return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupsMembershipKey) } // KeyspaceGroupIDPath returns the path to keyspace id from the given name. // Path: tso/keyspace_groups/membership/{id} func KeyspaceGroupIDPath(id uint32) string { - return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id)) + return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupsMembershipKey, encodeKeyspaceGroupID(id)) } // GetCompiledKeyspaceGroupIDRegexp returns the compiled regular expression for matching keyspace group id. @@ -247,6 +264,54 @@ func GetCompiledKeyspaceGroupIDRegexp() *regexp.Regexp { return regexp.MustCompile(pattern) } +// ResourceManagerSvcRootPath returns the root path of resource manager service. +// Path: /ms/{cluster_id}/resource_manager +func ResourceManagerSvcRootPath(clusterID uint64) string { + return svcRootPath(clusterID, utils.ResourceManagerServiceName) +} + +// TSOSvcRootPath returns the root path of tso service. +// Path: /ms/{cluster_id}/tso +func TSOSvcRootPath(clusterID uint64) string { + return svcRootPath(clusterID, utils.TSOServiceName) +} + +func svcRootPath(clusterID uint64, svcName string) string { + c := strconv.FormatUint(clusterID, 10) + return path.Join(utils.MicroserviceRootPath, c, svcName) +} + +// LegacyRootPath returns the root path of legacy pd service. +// Path: /pd/{cluster_id} +func LegacyRootPath(clusterID uint64) string { + return path.Join(pdRootPath, strconv.FormatUint(clusterID, 10)) +} + +// KeyspaceGroupPrimaryPath returns the path of keyspace group primary. +// default keyspace group: "/ms/{cluster_id}/tso/00000/primary". +// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary". +func KeyspaceGroupPrimaryPath(rootPath string, keyspaceGroupID uint32) string { + electionPath := KeyspaceGroupsElectionPath(rootPath, keyspaceGroupID) + return path.Join(electionPath, utils.KeyspaceGroupsPrimaryKey) +} + +// KeyspaceGroupsElectionPath returns the path of keyspace groups election. +// default keyspace group: "/ms/{cluster_id}/tso/00000". +// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}". +func KeyspaceGroupsElectionPath(rootPath string, keyspaceGroupID uint32) string { + if keyspaceGroupID == utils.DefaultKeyspaceGroupID { + return path.Join(rootPath, "00000") + } + return path.Join(rootPath, utils.KeyspaceGroupsKey, keyspaceGroupsElectionKey, fmt.Sprintf("%05d", keyspaceGroupID)) +} + +// GetCompiledNonDefaultIDRegexp returns the compiled regular expression for matching non-default keyspace group id. +func GetCompiledNonDefaultIDRegexp(clusterID uint64) *regexp.Regexp { + rootPath := TSOSvcRootPath(clusterID) + pattern := strings.Join([]string{rootPath, utils.KeyspaceGroupsKey, keyspaceGroupsElectionKey, `(\d{5})`, utils.KeyspaceGroupsPrimaryKey + `$`}, "/") + return regexp.MustCompile(pattern) +} + // encodeKeyspaceGroupID from uint32 to string. func encodeKeyspaceGroupID(groupID uint32) string { return fmt.Sprintf("%05d", groupID) @@ -266,19 +331,33 @@ func buildPath(withSuffix bool, str ...string) string { return sb.String() } -// GetKeyspaceGroupTSPath constructs the timestampOracle path prefix, which is: +// KeyspaceGroupTSPath constructs the timestampOracle path prefix, which is: // 1. for the default keyspace group: // "" in /pd/{cluster_id}/timestamp // 2. for the non-default keyspace groups: // {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp -func GetKeyspaceGroupTSPath(groupID uint32) string { +func KeyspaceGroupTSPath(groupID uint32) string { if groupID == utils.DefaultKeyspaceGroupID { return "" } return path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix) } -// GetTimestampPath returns the timestamp path for the given timestamp oracle path prefix. -func GetTimestampPath(tsPath string) string { +// TimestampPath returns the timestamp path for the given timestamp oracle path prefix. +func TimestampPath(tsPath string) string { return path.Join(tsPath, TimestampKey) } + +// FullTimestampPath returns the full timestamp path. +// 1. for the default keyspace group: +// /pd/{cluster_id}/timestamp +// 2. for the non-default keyspace groups: +// /ms/{cluster_id}/tso/{group}/gta/timestamp +func FullTimestampPath(clusterID uint64, groupID uint32) string { + rootPath := TSOSvcRootPath(clusterID) + tsPath := TimestampPath(KeyspaceGroupTSPath(groupID)) + if groupID == utils.DefaultKeyspaceGroupID { + rootPath = LegacyRootPath(clusterID) + } + return path.Join(rootPath, tsPath) +} diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 07eec5490c8a..e81c05f9707b 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -97,7 +97,7 @@ func NewGlobalTSOAllocator( member: am.member, timestampOracle: ×tampOracle{ client: am.member.GetLeadership().GetClient(), - tsPath: endpoint.GetKeyspaceGroupTSPath(am.kgID), + tsPath: endpoint.KeyspaceGroupTSPath(am.kgID), storage: am.storage, saveInterval: am.saveInterval, updatePhysicalInterval: am.updatePhysicalInterval, diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 892deec65e48..04f5945901be 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -20,7 +20,6 @@ import ( "fmt" "math" "net/http" - "path" "regexp" "sort" "strings" @@ -51,9 +50,6 @@ import ( ) const ( - keyspaceGroupsElectionPath = mcsutils.KeyspaceGroupsKey + "/election" - // primaryKey is the key for keyspace group primary election. - primaryKey = "primary" // mergingCheckInterval is the interval for merging check to see if the keyspace groups // merging process could be moved forward. mergingCheckInterval = 5 * time.Second @@ -256,32 +252,6 @@ func (s *state) getNextPrimaryToReset( return nil, nil, 0, groupID } -// kgPrimaryPathBuilder builds the path for keyspace group primary election. -// default keyspace group: "/ms/{cluster_id}/tso/00000/primary". -// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary". -type kgPrimaryPathBuilder struct { - // rootPath is "/ms/{cluster_id}/tso". - rootPath string - // defaultKeyspaceGroupIDPath is "/ms/{cluster_id}/tso/00000". - defaultKeyspaceGroupIDPath string -} - -// getKeyspaceGroupIDPath returns the keyspace group primary ID path. -// default keyspace group: "/ms/{cluster_id}/tso/00000". -// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}". -func (p *kgPrimaryPathBuilder) getKeyspaceGroupIDPath(keyspaceGroupID uint32) string { - if keyspaceGroupID == mcsutils.DefaultKeyspaceGroupID { - return p.defaultKeyspaceGroupIDPath - } - return path.Join(p.rootPath, keyspaceGroupsElectionPath, fmt.Sprintf("%05d", keyspaceGroupID)) -} - -// getCompiledNonDefaultIDRegexp returns the compiled regular expression for matching non-default keyspace group id. -func (p *kgPrimaryPathBuilder) getCompiledNonDefaultIDRegexp() *regexp.Regexp { - pattern := strings.Join([]string{p.rootPath, keyspaceGroupsElectionPath, `(\d{5})`, primaryKey + `$`}, "/") - return regexp.MustCompile(pattern) -} - // KeyspaceGroupManager manages the members of the keyspace groups assigned to this host. // The replicas campaign for the leaders which provide the tso service for the corresponding // keyspace groups. @@ -355,7 +325,6 @@ type KeyspaceGroupManager struct { // mergeCheckerCancelMap is the cancel function map for the merge checker of each keyspace group. mergeCheckerCancelMap sync.Map // GroupID -> context.CancelFunc - primaryPathBuilder *kgPrimaryPathBuilder primaryPriorityCheckInterval time.Duration // tsoNodes is the registered tso servers. @@ -406,10 +375,6 @@ func NewKeyspaceGroupManager( kgm.tsoSvcStorage = endpoint.NewStorageEndpoint( kv.NewEtcdKVBase(kgm.etcdClient, kgm.tsoSvcRootPath), nil) kgm.compiledKGMembershipIDRegexp = endpoint.GetCompiledKeyspaceGroupIDRegexp() - kgm.primaryPathBuilder = &kgPrimaryPathBuilder{ - rootPath: kgm.tsoSvcRootPath, - defaultKeyspaceGroupIDPath: path.Join(kgm.tsoSvcRootPath, "00000"), - } kgm.state.initialize() return kgm } @@ -718,8 +683,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro // Initialize the participant info to join the primary election. participant := member.NewParticipant(kgm.etcdClient) participant.InitInfo( - uniqueName, uniqueID, kgm.primaryPathBuilder.getKeyspaceGroupIDPath(group.ID), - primaryKey, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr()) + uniqueName, uniqueID, endpoint.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID), + mcsutils.KeyspaceGroupsPrimaryKey, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr()) // If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group // is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot // be broken until the entire split process is completed. @@ -1276,7 +1241,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget // Check if the keyspace group primaries in the merge map are all gone. if len(mergeMap) != 0 { for id := range mergeMap { - leaderPath := path.Join(kgm.primaryPathBuilder.getKeyspaceGroupIDPath(id), primaryKey) + leaderPath := endpoint.KeyspaceGroupPrimaryPath(kgm.tsoSvcRootPath, id) val, err := kgm.tsoSvcStorage.Load(leaderPath) if err != nil { log.Error("failed to check if the keyspace group primary in the merge list has gone", @@ -1305,7 +1270,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget // calculate the newly merged TSO to make sure it is greater than the original ones. var mergedTS time.Time for _, id := range mergeList { - ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.GetKeyspaceGroupTSPath(id)) + ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(id)) if err != nil { log.Error("failed to load the keyspace group TSO", zap.String("member", kgm.tsoServiceID.ServiceAddr), @@ -1455,8 +1420,8 @@ func (kgm *KeyspaceGroupManager) deletedGroupCleaner() { // Clean up the remaining TSO keys. // TODO: support the Local TSO Allocator clean up. err := kgm.tsoSvcStorage.DeleteTimestamp( - endpoint.GetTimestampPath( - endpoint.GetKeyspaceGroupTSPath(groupID), + endpoint.TimestampPath( + endpoint.KeyspaceGroupTSPath(groupID), ), ) if err != nil { diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 09baac9a5924..600c46348a4b 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -110,7 +110,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() { suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{svcAddr})}) // Check if the TSO key is created. testutil.Eventually(re, func() bool { - ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.GetKeyspaceGroupTSPath(1)) + ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(1)) re.NoError(err) return ts != typeutil.ZeroTime }) @@ -118,7 +118,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() { suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupDeleteEvent(1)}) // Check if the TSO key is deleted. testutil.Eventually(re, func() bool { - ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.GetKeyspaceGroupTSPath(1)) + ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(1)) re.NoError(err) return ts == typeutil.ZeroTime }) @@ -137,7 +137,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() { re.NotContains(mgr.deletedGroups, mcsutils.DefaultKeyspaceGroupID) mgr.RUnlock() // Default keyspace group TSO key should NOT be deleted. - ts, err := mgr.legacySvcStorage.LoadTimestamp(endpoint.GetKeyspaceGroupTSPath(mcsutils.DefaultKeyspaceGroupID)) + ts, err := mgr.legacySvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(mcsutils.DefaultKeyspaceGroupID)) re.NoError(err) re.NotEmpty(ts) @@ -153,7 +153,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { guid := uuid.New().String() tsoServiceKey := discovery.ServicePath(guid, "tso") + "/" legacySvcRootPath := path.Join("/pd", guid) - tsoSvcRootPath := path.Join("/ms", guid, "tso") + tsoSvcRootPath := path.Join(mcsutils.MicroserviceRootPath, guid, "tso") electionNamePrefix := "tso-server-" + guid kgm := NewKeyspaceGroupManager( @@ -819,7 +819,7 @@ func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager( tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: cfg.GetAdvertiseListenAddr()} tsoServiceKey := discovery.ServicePath(uniqueStr, "tso") + "/" legacySvcRootPath := path.Join("/pd", uniqueStr) - tsoSvcRootPath := path.Join("/ms", uniqueStr, "tso") + tsoSvcRootPath := path.Join(mcsutils.MicroserviceRootPath, uniqueStr, "tso") electionNamePrefix := "kgm-test-" + cfg.GetAdvertiseListenAddr() kgm := NewKeyspaceGroupManager( diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index 757f7df8d8d2..33da6e8d11f9 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -140,7 +140,7 @@ func (t *timestampOracle) calibrateLogical(rawLogical int64, suffixBits int) int // GetTimestampPath returns the timestamp path in etcd. func (t *timestampOracle) GetTimestampPath() string { - return endpoint.GetTimestampPath(t.tsPath) + return endpoint.TimestampPath(t.tsPath) } // SyncTimestamp is used to synchronize the timestamp. diff --git a/pkg/tso/util_test.go b/pkg/tso/util_test.go index 8b7c7a4ce0c0..8e4971797d59 100644 --- a/pkg/tso/util_test.go +++ b/pkg/tso/util_test.go @@ -15,7 +15,6 @@ package tso import ( - "path" "testing" "github.com/stretchr/testify/require" @@ -75,13 +74,7 @@ func TestExtractKeyspaceGroupIDFromKeyspaceGroupMembershipPath(t *testing.T) { func TestExtractKeyspaceGroupIDFromKeyspaceGroupPrimaryPath(t *testing.T) { re := require.New(t) - tsoSvcRootPath := "/ms/111/tso" - primaryPathBuilder := &kgPrimaryPathBuilder{ - rootPath: tsoSvcRootPath, - defaultKeyspaceGroupIDPath: path.Join(tsoSvcRootPath, "00000"), - } - - compiledRegexp := primaryPathBuilder.getCompiledNonDefaultIDRegexp() + compiledRegexp := endpoint.GetCompiledNonDefaultIDRegexp(uint64(111)) rightCases := []struct { path string diff --git a/server/server.go b/server/server.go index 94d3d6e29b09..8a512531d087 100644 --- a/server/server.go +++ b/server/server.go @@ -1835,13 +1835,10 @@ func (s *Server) SetServicePrimaryAddr(serviceName, addr string) { s.servicePrimaryMap.Store(serviceName, addr) } -func (s *Server) servicePrimaryKey(serviceName string) string { - return fmt.Sprintf("/ms/%d/%s/%s/%s", s.clusterID, serviceName, fmt.Sprintf("%05d", 0), "primary") -} - func (s *Server) initTSOPrimaryWatcher() { serviceName := mcs.TSOServiceName - tsoServicePrimaryKey := s.servicePrimaryKey(serviceName) + tsoRootPath := endpoint.TSOSvcRootPath(s.clusterID) + tsoServicePrimaryKey := endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, mcs.DefaultKeyspaceGroupID) putFn := func(kv *mvccpb.KeyValue) error { primary := &tsopb.Participant{} // TODO: use Generics if err := proto.Unmarshal(kv.Value, primary); err != nil { diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 3f00f85cacb7..ed4a1b964b0d 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -16,8 +16,6 @@ package tso import ( "context" - "fmt" - "strconv" "strings" "sync" "testing" @@ -200,20 +198,10 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe // Make sure every keyspace group is using the right timestamp path // for loading/saving timestamp from/to etcd and the right primary path // for primary election. - var ( - timestampPath string - primaryPath string - ) - clusterID := strconv.FormatUint(suite.pdLeaderServer.GetClusterID(), 10) - if param.keyspaceGroupID == mcsutils.DefaultKeyspaceGroupID { - timestampPath = fmt.Sprintf("/pd/%s/timestamp", clusterID) - primaryPath = fmt.Sprintf("/ms/%s/tso/00000/primary", clusterID) - } else { - timestampPath = fmt.Sprintf("/ms/%s/tso/%05d/gta/timestamp", - clusterID, param.keyspaceGroupID) - primaryPath = fmt.Sprintf("/ms/%s/tso/%s/election/%05d/primary", - clusterID, mcsutils.KeyspaceGroupsKey, param.keyspaceGroupID) - } + clusterID := suite.pdLeaderServer.GetClusterID() + rootPath := endpoint.TSOSvcRootPath(clusterID) + primaryPath := endpoint.KeyspaceGroupPrimaryPath(rootPath, param.keyspaceGroupID) + timestampPath := endpoint.FullTimestampPath(clusterID, param.keyspaceGroupID) re.Equal(timestampPath, am.GetTimestampPath(tsopkg.GlobalDCLocation)) re.Equal(primaryPath, am.GetMember().GetLeaderPath()) diff --git a/tools/pd-backup/pdbackup/backup.go b/tools/pd-backup/pdbackup/backup.go index eeb5ee3cf302..f0b17e482673 100644 --- a/tools/pd-backup/pdbackup/backup.go +++ b/tools/pd-backup/pdbackup/backup.go @@ -75,7 +75,7 @@ func GetBackupInfo(client *clientv3.Client, pdAddr string) (*BackupInfo, error) backInfo.AllocIDMax = allocIDMax - resp, err = etcdutil.EtcdKVGet(client, endpoint.GetTimestampPath(rootPath)) + resp, err = etcdutil.EtcdKVGet(client, endpoint.TimestampPath(rootPath)) if err != nil { return nil, err } diff --git a/tools/pd-backup/pdbackup/backup_test.go b/tools/pd-backup/pdbackup/backup_test.go index 8e3ca1eaaac4..ccbefbe4fe85 100644 --- a/tools/pd-backup/pdbackup/backup_test.go +++ b/tools/pd-backup/pdbackup/backup_test.go @@ -136,7 +136,7 @@ func (s *backupTestSuite) BeforeTest(suiteName, testName string) { rootPath = path.Join(pdRootPath, strconv.FormatUint(clusterID, 10)) allocTimestampMaxBytes = typeutil.Uint64ToBytes(allocTimestampMax) ) - _, err = s.etcdClient.Put(ctx, endpoint.GetTimestampPath(rootPath), string(allocTimestampMaxBytes)) + _, err = s.etcdClient.Put(ctx, endpoint.TimestampPath(rootPath), string(allocTimestampMaxBytes)) s.NoError(err) var (