diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index 8b9427a8896..c374456a6eb 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -1508,7 +1508,7 @@ func transferPrimary(c *gin.Context) { } if err := mcsutils.TransferPrimary(svr.GetClient(), svr.GetParticipant().GetExpectedPrimaryLease(), - constant.SchedulingServiceName, svr.Name(), newPrimary, 0); err != nil { + constant.SchedulingServiceName, svr.Name(), newPrimary, 0, nil); err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) return } diff --git a/pkg/mcs/tso/server/apis/v1/api.go b/pkg/mcs/tso/server/apis/v1/api.go index 19b3a1be612..68a654c7315 100644 --- a/pkg/mcs/tso/server/apis/v1/api.go +++ b/pkg/mcs/tso/server/apis/v1/api.go @@ -307,9 +307,15 @@ func transferPrimary(c *gin.Context) { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) return } + // only members of specific group are valid primary candidates. + group := svr.GetKeyspaceGroupManager().GetKeyspaceGroups()[keyspaceGroupID] + memberMap := make(map[string]bool, len(group.Members)) + for _, member := range group.Members { + memberMap[member.Address] = true + } if err := utils.TransferPrimary(svr.GetClient(), globalAllocator.(*tso.GlobalTSOAllocator).GetExpectedPrimaryLease(), - constant.TSOServiceName, svr.Name(), newPrimary, keyspaceGroupID); err != nil { + constant.TSOServiceName, svr.Name(), newPrimary, keyspaceGroupID, memberMap); err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) return } diff --git a/pkg/mcs/utils/expected_primary.go b/pkg/mcs/utils/expected_primary.go index c65d0a1cc5e..102bb8d785c 100644 --- a/pkg/mcs/utils/expected_primary.go +++ b/pkg/mcs/utils/expected_primary.go @@ -122,7 +122,7 @@ func watchExpectedPrimary(ctx context.Context, // TransferPrimary transfers the primary of the specified service. // keyspaceGroupID is optional, only used for TSO service. func TransferPrimary(client *clientv3.Client, lease *election.Lease, serviceName, - oldPrimary, newPrimary string, keyspaceGroupID uint32) error { + oldPrimary, newPrimary string, keyspaceGroupID uint32, tsoMembersMap map[string]bool) error { if lease == nil { return errors.New("current lease is nil, please check leadership") } @@ -139,6 +139,10 @@ func TransferPrimary(client *clientv3.Client, lease *election.Lease, serviceName var primaryIDs []string for _, member := range entries { + // only members of specific group are valid primary candidates for TSO service. + if tsoMembersMap != nil && !tsoMembersMap[member.ServiceAddr] { + continue + } if (newPrimary == "" && member.Name != oldPrimary) || (newPrimary != "" && member.Name == newPrimary) { primaryIDs = append(primaryIDs, member.ServiceAddr) } diff --git a/tests/integrations/mcs/members/member_test.go b/tests/integrations/mcs/members/member_test.go index dffa6305d0b..4e1e6534416 100644 --- a/tests/integrations/mcs/members/member_test.go +++ b/tests/integrations/mcs/members/member_test.go @@ -19,14 +19,19 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" "testing" "time" "github.com/pingcap/failpoint" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pdClient "github.com/tikv/pd/client/http" bs "github.com/tikv/pd/pkg/basicserver" + tso "github.com/tikv/pd/pkg/mcs/tso/server" + "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" + "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" @@ -41,6 +46,9 @@ type memberTestSuite struct { backendEndpoints string pdClient pdClient.Client + // We only test `DefaultKeyspaceGroupID` here. + // tsoAvailMembers is used to check the tso members which in the DefaultKeyspaceGroupID. + tsoAvailMembers map[string]bool tsoNodes map[string]bs.Server schedulingNodes map[string]bs.Server } @@ -51,6 +59,7 @@ func TestMemberTestSuite(t *testing.T) { func (suite *memberTestSuite) SetupTest() { re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) ctx, cancel := context.WithCancel(context.Background()) suite.ctx = ctx cluster, err := tests.NewTestAPICluster(suite.ctx, 1) @@ -65,6 +74,7 @@ func (suite *memberTestSuite) SetupTest() { // TSO nodes := make(map[string]bs.Server) + // mock 3 tso nodes, which is more than the default replica count(DefaultKeyspaceGroupReplicaCount). for i := 0; i < 3; i++ { s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) nodes[s.GetAddr()] = s @@ -72,8 +82,16 @@ func (suite *memberTestSuite) SetupTest() { cleanup() }) } - tests.WaitForPrimaryServing(re, nodes) + primary := tests.WaitForPrimaryServing(re, nodes) + members := mustGetKeyspaceGroupMembers(re, nodes[primary].(*tso.Server)) + // Get the tso nodes suite.tsoNodes = nodes + // We only test `DefaultKeyspaceGroupID` here. + // tsoAvailMembers is used to check the tso members which in the DefaultKeyspaceGroupID. + suite.tsoAvailMembers = make(map[string]bool) + for _, member := range members[constant.DefaultKeyspaceGroupID].Group.Members { + suite.tsoAvailMembers[member.Address] = true + } // Scheduling nodes = make(map[string]bs.Server) @@ -100,6 +118,8 @@ func (suite *memberTestSuite) TearDownTest() { suite.pdClient.Close() } suite.cluster.Destroy() + re := suite.Require() + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) } func (suite *memberTestSuite) TestMembers() { @@ -124,7 +144,7 @@ func (suite *memberTestSuite) TestPrimary() { re.NotEmpty(primary) } -func (suite *memberTestSuite) TestCampaignPrimaryWhileServerClose() { +func (suite *memberTestSuite) TestPrimaryWorkWhileOtherServerClose() { re := suite.Require() primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, "tso") re.NoError(err) @@ -143,20 +163,18 @@ func (suite *memberTestSuite) TestCampaignPrimaryWhileServerClose() { primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service) re.NoError(err) - // Close old and new primary to mock campaign primary + // Close non-primary node. for _, member := range nodes { if member.GetAddr() != primary { nodes[member.Name()].Close() - break } } - nodes[primary].Close() tests.WaitForPrimaryServing(re, nodes) - // primary should be different with before - onlyPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service) + // primary should be same with before. + curPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service) re.NoError(err) - re.NotEqual(primary, onlyPrimary) + re.Equal(primary, curPrimary) } } @@ -200,6 +218,9 @@ func (suite *memberTestSuite) TestTransferPrimary() { // Test transfer primary to a specific node var newPrimary string for _, member := range nodes { + if service == "tso" && !suite.tsoAvailMembers[member.GetAddr()] { + continue + } if member.GetAddr() != primary { newPrimary = member.Name() break @@ -251,6 +272,9 @@ func (suite *memberTestSuite) TestCampaignPrimaryAfterTransfer() { // Test transfer primary to a specific node var newPrimary string for _, member := range nodes { + if service == "tso" && !suite.tsoAvailMembers[member.GetAddr()] { + continue + } if member.GetAddr() != primary { newPrimary = member.Name() break @@ -270,15 +294,13 @@ func (suite *memberTestSuite) TestCampaignPrimaryAfterTransfer() { re.NoError(err) re.NotEqual(primary, newPrimary) - // Close old and new primary to mock campaign primary - nodes[primary].Close() + // Close primary to push other nodes campaign primary nodes[newPrimary].Close() tests.WaitForPrimaryServing(re, nodes) // Primary should be different with before - onlyPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service) + anotherPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service) re.NoError(err) - re.NotEqual(primary, onlyPrimary) - re.NotEqual(newPrimary, onlyPrimary) + re.NotEqual(newPrimary, anotherPrimary) } } @@ -304,6 +326,9 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpired() { // Test transfer primary to a specific node var newPrimary string for _, member := range nodes { + if service == "tso" && !suite.tsoAvailMembers[member.GetAddr()] { + continue + } if member.GetAddr() != primary { newPrimary = member.Name() break @@ -356,6 +381,9 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpiredAndServerDown( // Test transfer primary to a specific node var newPrimary string for _, member := range nodes { + if service == "tso" && !suite.tsoAvailMembers[member.GetAddr()] { + continue + } if member.GetAddr() != primary { newPrimary = member.Name() break @@ -390,3 +418,17 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpiredAndServerDown( re.NotEqual(newPrimary, onlyPrimary) } } + +func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map[uint32]*apis.KeyspaceGroupMember { + httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+"/tso/api/v1/keyspace-groups/members", nil) + re.NoError(err) + httpResp, err := tests.TestDialClient.Do(httpReq) + re.NoError(err) + defer httpResp.Body.Close() + data, err := io.ReadAll(httpResp.Body) + re.NoError(err) + re.Equal(http.StatusOK, httpResp.StatusCode, string(data)) + var resp map[uint32]*apis.KeyspaceGroupMember + re.NoError(json.Unmarshal(data, &resp)) + return resp +}