diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index f208cd96b79..c5e78d6b797 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -173,7 +173,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) { case <-ticker.C: } countOfNodes := m.GetNodesCount() - if countOfNodes < utils.KeyspaceGroupDefaultReplicaCount { + if countOfNodes < utils.DefaultKeyspaceGroupReplicaCount { continue } groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0) @@ -187,8 +187,8 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) { } withError := false for _, group := range groups { - if len(group.Members) < utils.KeyspaceGroupDefaultReplicaCount { - nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.KeyspaceGroupDefaultReplicaCount) + if len(group.Members) < utils.DefaultKeyspaceGroupReplicaCount { + nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.DefaultKeyspaceGroupReplicaCount) if err != nil { withError = true log.Error("failed to alloc nodes for keyspace group", zap.Uint32("keyspace-group-id", group.ID), zap.Error(err)) @@ -531,7 +531,7 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3 return ErrKeyspaceGroupInMerging } // Check if the source keyspace group has enough replicas. - if len(splitSourceKg.Members) < utils.KeyspaceGroupDefaultReplicaCount { + if len(splitSourceKg.Members) < utils.DefaultKeyspaceGroupReplicaCount { return ErrKeyspaceGroupNotEnoughReplicas } // Check if the new keyspace group already exists. @@ -702,7 +702,10 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount continue } exists[addr] = struct{}{} - nodes = append(nodes, endpoint.KeyspaceGroupMember{Address: addr}) + nodes = append(nodes, endpoint.KeyspaceGroupMember{ + Address: addr, + Priority: utils.DefaultKeyspaceGroupReplicaPriority, + }) } kg.Members = nodes return m.store.SaveKeyspaceGroup(txn, kg) @@ -737,7 +740,52 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error } members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes)) for _, node := range nodes { - members = append(members, endpoint.KeyspaceGroupMember{Address: node}) + members = append(members, endpoint.KeyspaceGroupMember{ + Address: node, + Priority: utils.DefaultKeyspaceGroupReplicaPriority, + }) + } + kg.Members = members + return m.store.SaveKeyspaceGroup(txn, kg) + }) + if err != nil { + return err + } + m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg) + return nil +} + +// SetPriorityForKeyspaceGroup sets the priority of node for the keyspace group. +func (m *GroupManager) SetPriorityForKeyspaceGroup(id uint32, node string, priority int) error { + m.Lock() + defer m.Unlock() + var kg *endpoint.KeyspaceGroup + err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error { + var err error + kg, err = m.store.LoadKeyspaceGroup(txn, id) + if err != nil { + return err + } + if kg == nil { + return ErrKeyspaceGroupNotExists + } + if kg.IsSplitting() { + return ErrKeyspaceGroupInSplit + } + if kg.IsMerging() { + return ErrKeyspaceGroupInMerging + } + inKeyspaceGroup := false + members := make([]endpoint.KeyspaceGroupMember, 0, len(kg.Members)) + for _, member := range kg.Members { + if member.Address == node { + inKeyspaceGroup = true + member.Priority = priority + } + members = append(members, member) + } + if !inKeyspaceGroup { + return ErrNodeNotInKeyspaceGroup } kg.Members = members return m.store.SaveKeyspaceGroup(txn, kg) diff --git a/pkg/keyspace/tso_keyspace_group_test.go b/pkg/keyspace/tso_keyspace_group_test.go index 8d2f6f6f034..60f2793b8bb 100644 --- a/pkg/keyspace/tso_keyspace_group_test.go +++ b/pkg/keyspace/tso_keyspace_group_test.go @@ -244,7 +244,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() { ID: uint32(2), UserKind: endpoint.Standard.String(), Keyspaces: []uint32{111, 222, 333}, - Members: make([]endpoint.KeyspaceGroupMember, utils.KeyspaceGroupDefaultReplicaCount), + Members: make([]endpoint.KeyspaceGroupMember, utils.DefaultKeyspaceGroupReplicaCount), }, } err := suite.kgm.CreateKeyspaceGroups(keyspaceGroups) @@ -330,7 +330,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() { ID: uint32(1), UserKind: endpoint.Basic.String(), Keyspaces: []uint32{111, 222, 333}, - Members: make([]endpoint.KeyspaceGroupMember, utils.KeyspaceGroupDefaultReplicaCount), + Members: make([]endpoint.KeyspaceGroupMember, utils.DefaultKeyspaceGroupReplicaCount), }, { ID: uint32(3), diff --git a/pkg/keyspace/util.go b/pkg/keyspace/util.go index fa661780a99..8e29e728328 100644 --- a/pkg/keyspace/util.go +++ b/pkg/keyspace/util.go @@ -60,6 +60,8 @@ var ( ErrKeyspaceGroupNotInMerging = errors.New("keyspace group is not in merging state") // ErrKeyspaceNotInKeyspaceGroup is used to indicate target keyspace is not in this keyspace group. ErrKeyspaceNotInKeyspaceGroup = errors.New("keyspace is not in this keyspace group") + // ErrNodeNotInKeyspaceGroup is used to indicate the tso node is not in this keyspace group. + ErrNodeNotInKeyspaceGroup = errors.New("the tso node is not in this keyspace group") // ErrKeyspaceGroupNotEnoughReplicas is used to indicate not enough replicas in the keyspace group. ErrKeyspaceGroupNotEnoughReplicas = errors.New("not enough replicas in the keyspace group") // ErrModifyDefaultKeyspaceGroup is used to indicate that default keyspace group cannot be modified. diff --git a/pkg/mcs/utils/constant.go b/pkg/mcs/utils/constant.go index b73ddd7ed8f..21a4a655afe 100644 --- a/pkg/mcs/utils/constant.go +++ b/pkg/mcs/utils/constant.go @@ -72,6 +72,9 @@ const ( // foreseen future, and the former is just for extensibility in theory. MaxKeyspaceGroupCountInUse = uint32(4096) - // KeyspaceGroupDefaultReplicaCount is the default replica count of keyspace group. - KeyspaceGroupDefaultReplicaCount = 2 + // DefaultKeyspaceGroupReplicaCount is the default replica count of keyspace group. + DefaultKeyspaceGroupReplicaCount = 2 + + // DefaultKeyspaceGroupReplicaPriority is the default priority of a keyspace group replica. + DefaultKeyspaceGroupReplicaPriority = 0 ) diff --git a/pkg/storage/endpoint/tso_keyspace_group.go b/pkg/storage/endpoint/tso_keyspace_group.go index 5b0e45a547c..498cd878887 100644 --- a/pkg/storage/endpoint/tso_keyspace_group.go +++ b/pkg/storage/endpoint/tso_keyspace_group.go @@ -72,8 +72,12 @@ func IsUserKindValid(kind string) bool { } // KeyspaceGroupMember defines an election member which campaigns for the primary of the keyspace group. +// Its `Priority` is used in keyspace group primary weighted-election to balance primaries' distribution. +// Among multiple replicas of a keyspace group, the higher the priority, the more likely +// the replica is to be elected as primary. type KeyspaceGroupMember struct { - Address string `json:"address"` + Address string `json:"address"` + Priority int `json:"priority"` } // SplitState defines the split state of a keyspace group. diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index d962cfd0306..62a6986422c 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -355,8 +355,11 @@ func (kgm *KeyspaceGroupManager) Initialize() error { if !defaultKGConfigured { log.Info("initializing default keyspace group") group := &endpoint.KeyspaceGroup{ - ID: mcsutils.DefaultKeyspaceGroupID, - Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}}, + ID: mcsutils.DefaultKeyspaceGroupID, + Members: []endpoint.KeyspaceGroupMember{{ + Address: kgm.tsoServiceID.ServiceAddr, + Priority: mcsutils.DefaultKeyspaceGroupReplicaPriority, + }}, Keyspaces: []uint32{mcsutils.DefaultKeyspaceID}, } kgm.updateKeyspaceGroup(group) @@ -400,7 +403,10 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro // If the default keyspace group isn't assigned to any tso node/pod, assign it to everyone. if group.ID == mcsutils.DefaultKeyspaceGroupID && len(group.Members) == 0 { // TODO: fill members with all tso nodes/pods. - group.Members = []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}} + group.Members = []endpoint.KeyspaceGroupMember{{ + Address: kgm.tsoServiceID.ServiceAddr, + Priority: mcsutils.DefaultKeyspaceGroupReplicaPriority, + }} } if !kgm.isAssignedToMe(group) { @@ -493,12 +499,12 @@ func validateSplit( // could not be modified during the split process, so we can only check the // member count of the source group here. memberCount := len(sourceGroup.Members) - if memberCount < mcsutils.KeyspaceGroupDefaultReplicaCount { + if memberCount < mcsutils.DefaultKeyspaceGroupReplicaCount { log.Error("the split source keyspace group does not have enough members", zap.Uint32("target", targetGroup.ID), zap.Uint32("source", splitSourceID), zap.Int("member-count", memberCount), - zap.Int("replica-count", mcsutils.KeyspaceGroupDefaultReplicaCount)) + zap.Int("replica-count", mcsutils.DefaultKeyspaceGroupReplicaCount)) return false } return true @@ -611,8 +617,11 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) { log.Info("removed default keyspace group meta config from the storage. " + "now every tso node/pod will initialize it") group := &endpoint.KeyspaceGroup{ - ID: mcsutils.DefaultKeyspaceGroupID, - Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}}, + ID: mcsutils.DefaultKeyspaceGroupID, + Members: []endpoint.KeyspaceGroupMember{{ + Address: kgm.tsoServiceID.ServiceAddr, + Priority: mcsutils.DefaultKeyspaceGroupReplicaPriority, + }}, Keyspaces: []uint32{mcsutils.DefaultKeyspaceID}, } kgm.updateKeyspaceGroup(group) diff --git a/server/apiv2/handlers/tso_keyspace_group.go b/server/apiv2/handlers/tso_keyspace_group.go index 38bb9b86e74..2d16f6ac360 100644 --- a/server/apiv2/handlers/tso_keyspace_group.go +++ b/server/apiv2/handlers/tso_keyspace_group.go @@ -23,6 +23,7 @@ import ( "github.com/pkg/errors" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/server" "github.com/tikv/pd/server/apiv2/middlewares" @@ -40,6 +41,7 @@ func RegisterTSOKeyspaceGroup(r *gin.RouterGroup) { router.DELETE("/:id", DeleteKeyspaceGroupByID) router.POST("/:id/alloc", AllocNodesForKeyspaceGroup) router.POST("/:id/nodes", SetNodesForKeyspaceGroup) + router.POST("/:id/priority", SetPriorityForKeyspaceGroup) router.POST("/:id/split", SplitKeyspaceGroupByID) router.DELETE("/:id/split", FinishSplitKeyspaceByID) router.POST("/:id/merge", MergeKeyspaceGroups) @@ -325,7 +327,7 @@ func AllocNodesForKeyspaceGroup(c *gin.Context) { c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause()) return } - if manager.GetNodesCount() < allocParams.Replica || allocParams.Replica < utils.KeyspaceGroupDefaultReplicaCount { + if manager.GetNodesCount() < allocParams.Replica || allocParams.Replica < utils.DefaultKeyspaceGroupReplicaCount { c.AbortWithStatusJSON(http.StatusBadRequest, "invalid replica, should be in [2, nodes_num]") return } @@ -347,7 +349,7 @@ func AllocNodesForKeyspaceGroup(c *gin.Context) { c.JSON(http.StatusOK, nodes) } -// SetNodesForKeyspaceGroupParams defines the params for setting nodes for keyspace groups. +// SetNodesForKeyspaceGroupParams defines the params for setting nodes for keyspace group. // Notes: it should be used carefully. type SetNodesForKeyspaceGroupParams struct { Nodes []string `json:"nodes"` @@ -379,7 +381,7 @@ func SetNodesForKeyspaceGroup(c *gin.Context) { return } // check if nodes is less than default replica count - if len(setParams.Nodes) < utils.KeyspaceGroupDefaultReplicaCount { + if len(setParams.Nodes) < utils.DefaultKeyspaceGroupReplicaCount { c.AbortWithStatusJSON(http.StatusBadRequest, "invalid num of nodes") return } @@ -399,6 +401,53 @@ func SetNodesForKeyspaceGroup(c *gin.Context) { c.JSON(http.StatusOK, nil) } +// SetPriorityForKeyspaceGroupParams defines the params for setting priority of tso node for the keyspace group. +type SetPriorityForKeyspaceGroupParams struct { + Node string `json:"node"` + Priority int `json:"priority"` +} + +// SetPriorityForKeyspaceGroup sets priority of tso node for the keyspace group. +func SetPriorityForKeyspaceGroup(c *gin.Context) { + id, err := validateKeyspaceGroupID(c) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id") + return + } + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) + manager := svr.GetKeyspaceGroupManager() + if manager == nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + return + } + setParams := &SetPriorityForKeyspaceGroupParams{} + err = c.BindJSON(setParams) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause()) + return + } + // check if keyspace group exists + kg, err := manager.GetKeyspaceGroupByID(id) + if err != nil || kg == nil { + c.AbortWithStatusJSON(http.StatusBadRequest, "keyspace group does not exist") + return + } + // check if node exists + members := kg.Members + if slice.NoneOf(members, func(i int) bool { + return members[i].Address == setParams.Node + }) { + c.AbortWithStatusJSON(http.StatusBadRequest, "tso node does not exist in the keyspace group") + } + // set priority + err = manager.SetPriorityForKeyspaceGroup(id, setParams.Node, setParams.Priority) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + c.JSON(http.StatusOK, nil) +} + func validateKeyspaceGroupID(c *gin.Context) (uint32, error) { id, err := strconv.ParseUint(c.Param("id"), 10, 64) if err != nil { diff --git a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go index 6d2c51e1dd0..a284f8a44ac 100644 --- a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go +++ b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go @@ -82,7 +82,7 @@ func (suite *keyspaceGroupTestSuite) TearDownTest() { func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() { // add three nodes. nodes := make(map[string]bs.Server) - for i := 0; i < utils.KeyspaceGroupDefaultReplicaCount+1; i++ { + for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount+1; i++ { s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) defer cleanup() nodes[s.GetAddr()] = s @@ -102,11 +102,11 @@ func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() { // alloc nodes for the keyspace group. id := 1 params := &handlers.AllocNodesForKeyspaceGroupParams{ - Replica: utils.KeyspaceGroupDefaultReplicaCount, + Replica: utils.DefaultKeyspaceGroupReplicaCount, } got, code := suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusOK, code) - suite.Equal(utils.KeyspaceGroupDefaultReplicaCount, len(got)) + suite.Equal(utils.DefaultKeyspaceGroupReplicaCount, len(got)) oldMembers := make(map[string]struct{}) for _, member := range got { suite.Contains(nodes, member.Address) @@ -114,7 +114,7 @@ func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() { } // alloc node update to 3. - params.Replica = utils.KeyspaceGroupDefaultReplicaCount + 1 + params.Replica = utils.DefaultKeyspaceGroupReplicaCount + 1 got, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusOK, code) suite.Equal(params.Replica, len(got)) @@ -131,7 +131,7 @@ func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() { func (suite *keyspaceGroupTestSuite) TestAllocReplica() { nodes := make(map[string]bs.Server) - for i := 0; i < utils.KeyspaceGroupDefaultReplicaCount; i++ { + for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount; i++ { s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) defer cleanup() nodes[s.GetAddr()] = s @@ -147,14 +147,14 @@ func (suite *keyspaceGroupTestSuite) TestAllocReplica() { // replica is less than default replica. params = &handlers.AllocNodesForKeyspaceGroupParams{ - Replica: utils.KeyspaceGroupDefaultReplicaCount - 1, + Replica: utils.DefaultKeyspaceGroupReplicaCount - 1, } _, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusBadRequest, code) // there is no any keyspace group. params = &handlers.AllocNodesForKeyspaceGroupParams{ - Replica: utils.KeyspaceGroupDefaultReplicaCount, + Replica: utils.DefaultKeyspaceGroupReplicaCount, } _, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusBadRequest, code) @@ -169,7 +169,7 @@ func (suite *keyspaceGroupTestSuite) TestAllocReplica() { code = suite.tryCreateKeyspaceGroup(kgs) suite.Equal(http.StatusOK, code) params = &handlers.AllocNodesForKeyspaceGroupParams{ - Replica: utils.KeyspaceGroupDefaultReplicaCount, + Replica: utils.DefaultKeyspaceGroupReplicaCount, } got, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusOK, code) @@ -179,7 +179,7 @@ func (suite *keyspaceGroupTestSuite) TestAllocReplica() { // the keyspace group is exist, but the replica is more than the num of nodes. params = &handlers.AllocNodesForKeyspaceGroupParams{ - Replica: utils.KeyspaceGroupDefaultReplicaCount + 1, + Replica: utils.DefaultKeyspaceGroupReplicaCount + 1, } _, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusBadRequest, code) @@ -190,7 +190,7 @@ func (suite *keyspaceGroupTestSuite) TestAllocReplica() { nodes[s2.GetAddr()] = s2 mcs.WaitForPrimaryServing(suite.Require(), nodes) params = &handlers.AllocNodesForKeyspaceGroupParams{ - Replica: utils.KeyspaceGroupDefaultReplicaCount + 1, + Replica: utils.DefaultKeyspaceGroupReplicaCount + 1, } got, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusOK, code) @@ -200,14 +200,14 @@ func (suite *keyspaceGroupTestSuite) TestAllocReplica() { // the keyspace group is exist, the new replica is equal to the old replica. params = &handlers.AllocNodesForKeyspaceGroupParams{ - Replica: utils.KeyspaceGroupDefaultReplicaCount + 1, + Replica: utils.DefaultKeyspaceGroupReplicaCount + 1, } _, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusBadRequest, code) // the keyspace group is exist, the new replica is less than the old replica. params = &handlers.AllocNodesForKeyspaceGroupParams{ - Replica: utils.KeyspaceGroupDefaultReplicaCount, + Replica: utils.DefaultKeyspaceGroupReplicaCount, } _, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusBadRequest, code) @@ -215,7 +215,7 @@ func (suite *keyspaceGroupTestSuite) TestAllocReplica() { // the keyspace group is not exist. id = 2 params = &handlers.AllocNodesForKeyspaceGroupParams{ - Replica: utils.KeyspaceGroupDefaultReplicaCount, + Replica: utils.DefaultKeyspaceGroupReplicaCount, } _, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusBadRequest, code) @@ -224,7 +224,7 @@ func (suite *keyspaceGroupTestSuite) TestAllocReplica() { func (suite *keyspaceGroupTestSuite) TestSetNodes() { nodes := make(map[string]bs.Server) nodesList := []string{} - for i := 0; i < utils.KeyspaceGroupDefaultReplicaCount; i++ { + for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount; i++ { s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) defer cleanup() nodes[s.GetAddr()] = s @@ -284,7 +284,7 @@ func (suite *keyspaceGroupTestSuite) TestSetNodes() { func (suite *keyspaceGroupTestSuite) TestDefaultKeyspaceGroup() { nodes := make(map[string]bs.Server) - for i := 0; i < utils.KeyspaceGroupDefaultReplicaCount; i++ { + for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount; i++ { s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) defer cleanup() nodes[s.GetAddr()] = s @@ -296,7 +296,7 @@ func (suite *keyspaceGroupTestSuite) TestDefaultKeyspaceGroup() { kg, code := suite.tryGetKeyspaceGroup(utils.DefaultKeyspaceGroupID) suite.Equal(http.StatusOK, code) suite.Equal(utils.DefaultKeyspaceGroupID, kg.ID) - suite.Len(kg.Members, utils.KeyspaceGroupDefaultReplicaCount) + suite.Len(kg.Members, utils.DefaultKeyspaceGroupReplicaCount) for _, member := range kg.Members { suite.Contains(nodes, member.Address) } diff --git a/tests/pdctl/keyspace/keyspace_group_test.go b/tests/pdctl/keyspace/keyspace_group_test.go index 133edb78579..927ce803d30 100644 --- a/tests/pdctl/keyspace/keyspace_group_test.go +++ b/tests/pdctl/keyspace/keyspace_group_test.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + "strconv" "strings" "testing" "time" @@ -66,7 +67,7 @@ func TestKeyspaceGroup(t *testing.T) { { ID: 1, UserKind: endpoint.Standard.String(), - Members: make([]endpoint.KeyspaceGroupMember, utils.KeyspaceGroupDefaultReplicaCount), + Members: make([]endpoint.KeyspaceGroupMember, utils.DefaultKeyspaceGroupReplicaCount), Keyspaces: []uint32{111, 222, 333}, }, }, @@ -177,3 +178,105 @@ func TestExternalAllocNodeWhenStart(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop")) } + +func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + keyspaces := make([]string, 0) + for i := 0; i < 10; i++ { + keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) + } + tc, err := tests.NewTestAPICluster(ctx, 3, func(conf *config.Config, serverName string) { + conf.Keyspace.PreAlloc = keyspaces + }) + re.NoError(err) + err = tc.RunInitialServers() + re.NoError(err) + pdAddr := tc.GetConfig().GetClientURL() + + s1, tsoServerCleanup1, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc()) + defer tsoServerCleanup1() + re.NoError(err) + s2, tsoServerCleanup2, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc()) + defer tsoServerCleanup2() + re.NoError(err) + cmd := pdctlCmd.GetRootCmd() + + tc.WaitLeader() + leaderServer := tc.GetServer(tc.GetLeader()) + re.NoError(leaderServer.BootstrapCluster()) + + // set-node keyspace group. + defaultKeyspaceGroupID := fmt.Sprintf("%d", utils.DefaultKeyspaceGroupID) + testutil.Eventually(re, func() bool { + args := []string{"-u", pdAddr, "keyspace-group", "set-node", defaultKeyspaceGroupID, s1.GetAddr(), s2.GetAddr()} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + return strings.Contains(string(output), "Success") + }) + + // set-priority keyspace group. + checkPriority := func(p int) { + testutil.Eventually(re, func() bool { + args := []string{"-u", pdAddr, "keyspace-group", "set-priority", defaultKeyspaceGroupID, s1.GetAddr()} + if p >= 0 { + args = append(args, strconv.Itoa(p)) + } else { + args = append(args, "--", strconv.Itoa(p)) + } + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + return strings.Contains(string(output), "Success") + }) + + // check keyspace group information. + args := []string{"-u", pdAddr, "keyspace-group"} + output, err := pdctl.ExecuteCommand(cmd, append(args, defaultKeyspaceGroupID)...) + re.NoError(err) + var keyspaceGroup endpoint.KeyspaceGroup + err = json.Unmarshal(output, &keyspaceGroup) + re.NoError(err) + re.Equal(utils.DefaultKeyspaceGroupID, keyspaceGroup.ID) + re.Len(keyspaceGroup.Members, 2) + for _, member := range keyspaceGroup.Members { + re.Contains([]string{s1.GetAddr(), s2.GetAddr()}, member.Address) + if member.Address == s1.GetAddr() { + re.Equal(p, member.Priority) + } else { + re.Equal(0, member.Priority) + } + } + } + + checkPriority(200) + checkPriority(-200) + + // params error for set-node. + args := []string{"-u", pdAddr, "keyspace-group", "set-node", defaultKeyspaceGroupID, s1.GetAddr()} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Contains(string(output), "invalid num of nodes") + args = []string{"-u", pdAddr, "keyspace-group", "set-node", defaultKeyspaceGroupID, "", ""} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Contains(string(output), "Failed to parse the tso node address") + args = []string{"-u", pdAddr, "keyspace-group", "set-node", defaultKeyspaceGroupID, s1.GetAddr(), "http://pingcap.com"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Contains(string(output), "node does not exist") + + // params error for set-priority. + args = []string{"-u", pdAddr, "keyspace-group", "set-priority", defaultKeyspaceGroupID, "", "200"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Contains(string(output), "Failed to parse the tso node address") + args = []string{"-u", pdAddr, "keyspace-group", "set-priority", defaultKeyspaceGroupID, "http://pingcap.com", "200"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Contains(string(output), "node does not exist") + args = []string{"-u", pdAddr, "keyspace-group", "set-priority", defaultKeyspaceGroupID, s1.GetAddr(), "xxx"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.Contains(string(output), "Failed to parse the priority") +} diff --git a/tests/server/apiv2/handlers/tso_keyspace_group_test.go b/tests/server/apiv2/handlers/tso_keyspace_group_test.go index 630b745adb1..1f0189c532f 100644 --- a/tests/server/apiv2/handlers/tso_keyspace_group_test.go +++ b/tests/server/apiv2/handlers/tso_keyspace_group_test.go @@ -137,7 +137,7 @@ func (suite *keyspaceGroupTestSuite) TestSplitKeyspaceGroup() { ID: uint32(1), UserKind: endpoint.Standard.String(), Keyspaces: []uint32{111, 222, 333}, - Members: make([]endpoint.KeyspaceGroupMember, utils.KeyspaceGroupDefaultReplicaCount), + Members: make([]endpoint.KeyspaceGroupMember, utils.DefaultKeyspaceGroupReplicaCount), }, }} diff --git a/tools/pd-ctl/pdctl/command/keyspace_group_command.go b/tools/pd-ctl/pdctl/command/keyspace_group_command.go index 6fbb8170461..662a4aa157e 100644 --- a/tools/pd-ctl/pdctl/command/keyspace_group_command.go +++ b/tools/pd-ctl/pdctl/command/keyspace_group_command.go @@ -17,6 +17,7 @@ package command import ( "fmt" "net/http" + "net/url" "strconv" "github.com/spf13/cobra" @@ -32,6 +33,8 @@ func NewKeyspaceGroupCommand() *cobra.Command { Run: showKeyspaceGroupCommandFunc, } cmd.AddCommand(newSplitKeyspaceGroupCommand()) + cmd.AddCommand(newSetNodesKeyspaceGroupCommand()) + cmd.AddCommand(newSetPriorityKeyspaceGroupCommand()) return cmd } @@ -44,6 +47,24 @@ func newSplitKeyspaceGroupCommand() *cobra.Command { return r } +func newSetNodesKeyspaceGroupCommand() *cobra.Command { + r := &cobra.Command{ + Use: "set-node [...]", + Short: "set the address of tso nodes for keyspace group with the given ID", + Run: setNodesKeyspaceGroupCommandFunc, + } + return r +} + +func newSetPriorityKeyspaceGroupCommand() *cobra.Command { + r := &cobra.Command{ + Use: "set-priority ", + Short: "set the priority of tso nodes for keyspace group with the given ID. If the priority is negative, it need to add a prefix with -- to avoid identified as flag.", + Run: setPriorityKeyspaceGroupCommandFunc, + } + return r +} + func showKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) { if len(args) < 1 { cmd.Usage() @@ -62,6 +83,11 @@ func splitKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) { cmd.Usage() return } + _, err := strconv.ParseUint(args[0], 10, 32) + if err != nil { + cmd.Printf("Failed to parse the old keyspace group ID: %s\n", err) + return + } newID, err := strconv.ParseUint(args[1], 10, 32) if err != nil { cmd.Printf("Failed to parse the new keyspace group ID: %s\n", err) @@ -81,3 +107,57 @@ func splitKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) { "keyspaces": keyspaces, }) } + +func setNodesKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) { + if len(args) < 2 { + cmd.Usage() + return + } + _, err := strconv.ParseUint(args[0], 10, 32) + if err != nil { + cmd.Printf("Failed to parse the keyspace group ID: %s\n", err) + return + } + addresses := make([]string, 0, len(args)-1) + for _, arg := range args[1:] { + u, err := url.ParseRequestURI(arg) + if u == nil || err != nil { + cmd.Printf("Failed to parse the tso node address: %s\n", err) + return + } + addresses = append(addresses, arg) + } + postJSON(cmd, fmt.Sprintf("%s/%s/nodes", keyspaceGroupsPrefix, args[0]), map[string]interface{}{ + "Nodes": addresses, + }) +} + +func setPriorityKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) { + if len(args) < 3 { + cmd.Usage() + return + } + _, err := strconv.ParseUint(args[0], 10, 32) + if err != nil { + cmd.Printf("Failed to parse the keyspace group ID: %s\n", err) + return + } + + address := args[1] + u, err := url.ParseRequestURI(address) + if u == nil || err != nil { + cmd.Printf("Failed to parse the tso node address: %s\n", err) + return + } + + priority, err := strconv.ParseInt(args[2], 10, 32) + if err != nil { + cmd.Printf("Failed to parse the priority: %s\n", err) + return + } + + postJSON(cmd, fmt.Sprintf("%s/%s/priority", keyspaceGroupsPrefix, args[0]), map[string]interface{}{ + "Node": address, + "Priority": priority, + }) +}