Skip to content

Commit

Permalink
*: move keyspace group primary path code to key_path.go (tikv#6755)
Browse files Browse the repository at this point in the history
ref tikv#5895

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Aug 2, 2023
1 parent 5f51460 commit c6cd2a2
Show file tree
Hide file tree
Showing 14 changed files with 124 additions and 104 deletions.
9 changes: 5 additions & 4 deletions pkg/mcs/discovery/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@ package discovery
import (
"strconv"
"strings"

"github.com/tikv/pd/pkg/mcs/utils"
)

const (
registryPrefix = "/ms"
registryKey = "registry"
registryKey = "registry"
)

// RegistryPath returns the full path to store microservice addresses.
func RegistryPath(clusterID, serviceName, serviceAddr string) string {
return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey, serviceAddr}, "/")
return strings.Join([]string{utils.MicroserviceRootPath, clusterID, serviceName, registryKey, serviceAddr}, "/")
}

// ServicePath returns the path to store microservice addresses.
func ServicePath(clusterID, serviceName string) string {
return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey}, "/")
return strings.Join([]string{utils.MicroserviceRootPath, clusterID, serviceName, registryKey}, "/")
}

// TSOPath returns the path to store TSO addresses.
Expand Down
5 changes: 3 additions & 2 deletions pkg/mcs/resource_manager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
Expand Down Expand Up @@ -366,10 +367,10 @@ func (s *Server) startServer() (err error) {
uniqueName := s.cfg.ListenAddr
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
resourceManagerPrimaryPrefix := fmt.Sprintf("/ms/%d/resource_manager", s.clusterID)
resourceManagerPrimaryPrefix := endpoint.ResourceManagerSvcRootPath(s.clusterID)
s.participant = member.NewParticipant(s.etcdClient)
s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)),
"primary", "keyspace group primary election", s.cfg.AdvertiseListenAddr)
utils.KeyspaceGroupsPrimaryKey, "keyspace group primary election", s.cfg.AdvertiseListenAddr)

s.service = &Service{
ctx: s.ctx,
Expand Down
12 changes: 3 additions & 9 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"net/url"
"os"
"os/signal"
"path"
"strconv"
"strings"
"sync"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/tikv/pd/pkg/mcs/discovery"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/systimemon"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/etcdutil"
Expand All @@ -63,12 +63,6 @@ import (
)

const (
// pdRootPath is the old path for storing the tso related root path.
pdRootPath = "/pd"
msServiceRootPath = "/ms"
// tsoSvcRootPathFormat defines the root path for all etcd paths used for different purposes.
// format: "/ms/{cluster_id}/tso".
tsoSvcRootPathFormat = msServiceRootPath + "/%d/" + mcsutils.TSOServiceName
// maxRetryTimesWaitAPIService is the max retry times for initializing the cluster ID.
maxRetryTimesWaitAPIService = 360
// retryIntervalWaitAPIService is the interval to retry.
Expand Down Expand Up @@ -535,8 +529,8 @@ func (s *Server) startServer() (err error) {

// Initialize the TSO service.
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
legacySvcRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
legacySvcRootPath := endpoint.LegacyRootPath(s.clusterID)
tsoSvcRootPath := endpoint.TSOSvcRootPath(s.clusterID)
s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr,
Expand Down
6 changes: 4 additions & 2 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ const (
// We also reserved 0 for the keyspace group for the same purpose.
DefaultKeyspaceGroupID = uint32(0)

// MicroserviceKey is the key of microservice.
MicroserviceKey = "ms"
// MicroserviceRootPath is the root path of microservice in etcd.
MicroserviceRootPath = "/ms"
// APIServiceName is the name of api server.
APIServiceName = "api"
// TSOServiceName is the name of tso server.
Expand All @@ -59,6 +59,8 @@ const (
ResourceManagerServiceName = "resource_manager"
// KeyspaceGroupsKey is the path component of keyspace groups.
KeyspaceGroupsKey = "keyspace_groups"
// KeyspaceGroupsPrimaryKey is the path component of primary for keyspace groups.
KeyspaceGroupsPrimaryKey = "primary"

// MaxKeyspaceGroupCount is the max count of keyspace groups. keyspace group in tso
// is the sharding unit, i.e., by the definition here, the max count of the shards
Expand Down
95 changes: 87 additions & 8 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
)

const (
<<<<<<< HEAD
clusterPath = "raft"
configPath = "config"
serviceMiddlewarePath = "service_middleware"
Expand All @@ -36,6 +37,21 @@ const (
replicationPath = "replication_mode"
customScheduleConfigPath = "scheduler_config"
gcWorkerServiceSafePointID = "gc_worker"
=======
pdRootPath = "/pd"
clusterPath = "raft"
configPath = "config"
serviceMiddlewarePath = "service_middleware"
schedulePath = "schedule"
gcPath = "gc"
rulesPath = "rules"
ruleGroupPath = "rule_group"
regionLabelPath = "region_label"
replicationPath = "replication_mode"
customScheduleConfigPath = "scheduler_config"
// GCWorkerServiceSafePointID is the service id of GC worker.
GCWorkerServiceSafePointID = "gc_worker"
>>>>>>> dbc936698... *: move keyspace group primary path code to key_path.go (#6755)
minResolvedTS = "min_resolved_ts"
externalTimeStamp = "external_timestamp"
keyspaceSafePointPrefix = "keyspaces/gc_safepoint"
Expand All @@ -55,8 +71,9 @@ const (
// TimestampKey is the key of timestamp oracle used for the suffix.
TimestampKey = "timestamp"

tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey
keyspaceGroupMembershipKey = "membership"
tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey
keyspaceGroupsMembershipKey = "membership"
keyspaceGroupsElectionKey = "election"

// we use uint64 to represent ID, the max length of uint64 is 20.
keyLen = 20
Expand Down Expand Up @@ -232,13 +249,13 @@ func encodeKeyspaceID(spaceID uint32) string {
// KeyspaceGroupIDPrefix returns the prefix of keyspace group id.
// Path: tso/keyspace_groups/membership
func KeyspaceGroupIDPrefix() string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey)
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupsMembershipKey)
}

// KeyspaceGroupIDPath returns the path to keyspace id from the given name.
// Path: tso/keyspace_groups/membership/{id}
func KeyspaceGroupIDPath(id uint32) string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id))
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupsMembershipKey, encodeKeyspaceGroupID(id))
}

// GetCompiledKeyspaceGroupIDRegexp returns the compiled regular expression for matching keyspace group id.
Expand All @@ -247,6 +264,54 @@ func GetCompiledKeyspaceGroupIDRegexp() *regexp.Regexp {
return regexp.MustCompile(pattern)
}

// ResourceManagerSvcRootPath returns the root path of resource manager service.
// Path: /ms/{cluster_id}/resource_manager
func ResourceManagerSvcRootPath(clusterID uint64) string {
return svcRootPath(clusterID, utils.ResourceManagerServiceName)
}

// TSOSvcRootPath returns the root path of tso service.
// Path: /ms/{cluster_id}/tso
func TSOSvcRootPath(clusterID uint64) string {
return svcRootPath(clusterID, utils.TSOServiceName)
}

func svcRootPath(clusterID uint64, svcName string) string {
c := strconv.FormatUint(clusterID, 10)
return path.Join(utils.MicroserviceRootPath, c, svcName)
}

// LegacyRootPath returns the root path of legacy pd service.
// Path: /pd/{cluster_id}
func LegacyRootPath(clusterID uint64) string {
return path.Join(pdRootPath, strconv.FormatUint(clusterID, 10))
}

// KeyspaceGroupPrimaryPath returns the path of keyspace group primary.
// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
func KeyspaceGroupPrimaryPath(rootPath string, keyspaceGroupID uint32) string {
electionPath := KeyspaceGroupsElectionPath(rootPath, keyspaceGroupID)
return path.Join(electionPath, utils.KeyspaceGroupsPrimaryKey)
}

// KeyspaceGroupsElectionPath returns the path of keyspace groups election.
// default keyspace group: "/ms/{cluster_id}/tso/00000".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}".
func KeyspaceGroupsElectionPath(rootPath string, keyspaceGroupID uint32) string {
if keyspaceGroupID == utils.DefaultKeyspaceGroupID {
return path.Join(rootPath, "00000")
}
return path.Join(rootPath, utils.KeyspaceGroupsKey, keyspaceGroupsElectionKey, fmt.Sprintf("%05d", keyspaceGroupID))
}

// GetCompiledNonDefaultIDRegexp returns the compiled regular expression for matching non-default keyspace group id.
func GetCompiledNonDefaultIDRegexp(clusterID uint64) *regexp.Regexp {
rootPath := TSOSvcRootPath(clusterID)
pattern := strings.Join([]string{rootPath, utils.KeyspaceGroupsKey, keyspaceGroupsElectionKey, `(\d{5})`, utils.KeyspaceGroupsPrimaryKey + `$`}, "/")
return regexp.MustCompile(pattern)
}

// encodeKeyspaceGroupID from uint32 to string.
func encodeKeyspaceGroupID(groupID uint32) string {
return fmt.Sprintf("%05d", groupID)
Expand All @@ -266,19 +331,33 @@ func buildPath(withSuffix bool, str ...string) string {
return sb.String()
}

// GetKeyspaceGroupTSPath constructs the timestampOracle path prefix, which is:
// KeyspaceGroupTSPath constructs the timestampOracle path prefix, which is:
// 1. for the default keyspace group:
// "" in /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp
func GetKeyspaceGroupTSPath(groupID uint32) string {
func KeyspaceGroupTSPath(groupID uint32) string {
if groupID == utils.DefaultKeyspaceGroupID {
return ""
}
return path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix)
}

// GetTimestampPath returns the timestamp path for the given timestamp oracle path prefix.
func GetTimestampPath(tsPath string) string {
// TimestampPath returns the timestamp path for the given timestamp oracle path prefix.
func TimestampPath(tsPath string) string {
return path.Join(tsPath, TimestampKey)
}

// FullTimestampPath returns the full timestamp path.
// 1. for the default keyspace group:
// /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// /ms/{cluster_id}/tso/{group}/gta/timestamp
func FullTimestampPath(clusterID uint64, groupID uint32) string {
rootPath := TSOSvcRootPath(clusterID)
tsPath := TimestampPath(KeyspaceGroupTSPath(groupID))
if groupID == utils.DefaultKeyspaceGroupID {
rootPath = LegacyRootPath(clusterID)
}
return path.Join(rootPath, tsPath)
}
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func NewGlobalTSOAllocator(
member: am.member,
timestampOracle: &timestampOracle{
client: am.member.GetLeadership().GetClient(),
tsPath: endpoint.GetKeyspaceGroupTSPath(am.kgID),
tsPath: endpoint.KeyspaceGroupTSPath(am.kgID),
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
Expand Down
47 changes: 6 additions & 41 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"math"
"net/http"
"path"
"regexp"
"sort"
"strings"
Expand Down Expand Up @@ -51,9 +50,6 @@ import (
)

const (
keyspaceGroupsElectionPath = mcsutils.KeyspaceGroupsKey + "/election"
// primaryKey is the key for keyspace group primary election.
primaryKey = "primary"
// mergingCheckInterval is the interval for merging check to see if the keyspace groups
// merging process could be moved forward.
mergingCheckInterval = 5 * time.Second
Expand Down Expand Up @@ -256,32 +252,6 @@ func (s *state) getNextPrimaryToReset(
return nil, nil, 0, groupID
}

// kgPrimaryPathBuilder builds the path for keyspace group primary election.
// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
type kgPrimaryPathBuilder struct {
// rootPath is "/ms/{cluster_id}/tso".
rootPath string
// defaultKeyspaceGroupIDPath is "/ms/{cluster_id}/tso/00000".
defaultKeyspaceGroupIDPath string
}

// getKeyspaceGroupIDPath returns the keyspace group primary ID path.
// default keyspace group: "/ms/{cluster_id}/tso/00000".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}".
func (p *kgPrimaryPathBuilder) getKeyspaceGroupIDPath(keyspaceGroupID uint32) string {
if keyspaceGroupID == mcsutils.DefaultKeyspaceGroupID {
return p.defaultKeyspaceGroupIDPath
}
return path.Join(p.rootPath, keyspaceGroupsElectionPath, fmt.Sprintf("%05d", keyspaceGroupID))
}

// getCompiledNonDefaultIDRegexp returns the compiled regular expression for matching non-default keyspace group id.
func (p *kgPrimaryPathBuilder) getCompiledNonDefaultIDRegexp() *regexp.Regexp {
pattern := strings.Join([]string{p.rootPath, keyspaceGroupsElectionPath, `(\d{5})`, primaryKey + `$`}, "/")
return regexp.MustCompile(pattern)
}

// KeyspaceGroupManager manages the members of the keyspace groups assigned to this host.
// The replicas campaign for the leaders which provide the tso service for the corresponding
// keyspace groups.
Expand Down Expand Up @@ -355,7 +325,6 @@ type KeyspaceGroupManager struct {
// mergeCheckerCancelMap is the cancel function map for the merge checker of each keyspace group.
mergeCheckerCancelMap sync.Map // GroupID -> context.CancelFunc

primaryPathBuilder *kgPrimaryPathBuilder
primaryPriorityCheckInterval time.Duration

// tsoNodes is the registered tso servers.
Expand Down Expand Up @@ -406,10 +375,6 @@ func NewKeyspaceGroupManager(
kgm.tsoSvcStorage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.tsoSvcRootPath), nil)
kgm.compiledKGMembershipIDRegexp = endpoint.GetCompiledKeyspaceGroupIDRegexp()
kgm.primaryPathBuilder = &kgPrimaryPathBuilder{
rootPath: kgm.tsoSvcRootPath,
defaultKeyspaceGroupIDPath: path.Join(kgm.tsoSvcRootPath, "00000"),
}
kgm.state.initialize()
return kgm
}
Expand Down Expand Up @@ -718,8 +683,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
// Initialize the participant info to join the primary election.
participant := member.NewParticipant(kgm.etcdClient)
participant.InitInfo(
uniqueName, uniqueID, kgm.primaryPathBuilder.getKeyspaceGroupIDPath(group.ID),
primaryKey, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())
uniqueName, uniqueID, endpoint.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID),
mcsutils.KeyspaceGroupsPrimaryKey, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())
// If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group
// is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot
// be broken until the entire split process is completed.
Expand Down Expand Up @@ -1276,7 +1241,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
// Check if the keyspace group primaries in the merge map are all gone.
if len(mergeMap) != 0 {
for id := range mergeMap {
leaderPath := path.Join(kgm.primaryPathBuilder.getKeyspaceGroupIDPath(id), primaryKey)
leaderPath := endpoint.KeyspaceGroupPrimaryPath(kgm.tsoSvcRootPath, id)
val, err := kgm.tsoSvcStorage.Load(leaderPath)
if err != nil {
log.Error("failed to check if the keyspace group primary in the merge list has gone",
Expand Down Expand Up @@ -1305,7 +1270,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
// calculate the newly merged TSO to make sure it is greater than the original ones.
var mergedTS time.Time
for _, id := range mergeList {
ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.GetKeyspaceGroupTSPath(id))
ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(id))
if err != nil {
log.Error("failed to load the keyspace group TSO",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
Expand Down Expand Up @@ -1455,8 +1420,8 @@ func (kgm *KeyspaceGroupManager) deletedGroupCleaner() {
// Clean up the remaining TSO keys.
// TODO: support the Local TSO Allocator clean up.
err := kgm.tsoSvcStorage.DeleteTimestamp(
endpoint.GetTimestampPath(
endpoint.GetKeyspaceGroupTSPath(groupID),
endpoint.TimestampPath(
endpoint.KeyspaceGroupTSPath(groupID),
),
)
if err != nil {
Expand Down
Loading

0 comments on commit c6cd2a2

Please sign in to comment.