Skip to content

Commit

Permalink
mcs/transfer: Added checks for available tso nodes (tikv#8530)
Browse files Browse the repository at this point in the history
close tikv#8529

Signed-off-by: husharp <ihusharp@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
HuSharp and ti-chi-bot[bot] authored Aug 26, 2024
1 parent 61a85e5 commit 6998fb5
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1508,7 +1508,7 @@ func transferPrimary(c *gin.Context) {
}

if err := mcsutils.TransferPrimary(svr.GetClient(), svr.GetParticipant().GetExpectedPrimaryLease(),
constant.SchedulingServiceName, svr.Name(), newPrimary, 0); err != nil {
constant.SchedulingServiceName, svr.Name(), newPrimary, 0, nil); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/mcs/tso/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,15 @@ func transferPrimary(c *gin.Context) {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
// only members of specific group are valid primary candidates.
group := svr.GetKeyspaceGroupManager().GetKeyspaceGroups()[keyspaceGroupID]
memberMap := make(map[string]bool, len(group.Members))
for _, member := range group.Members {
memberMap[member.Address] = true
}

if err := utils.TransferPrimary(svr.GetClient(), globalAllocator.(*tso.GlobalTSOAllocator).GetExpectedPrimaryLease(),
constant.TSOServiceName, svr.Name(), newPrimary, keyspaceGroupID); err != nil {
constant.TSOServiceName, svr.Name(), newPrimary, keyspaceGroupID, memberMap); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/mcs/utils/expected_primary.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func watchExpectedPrimary(ctx context.Context,
// TransferPrimary transfers the primary of the specified service.
// keyspaceGroupID is optional, only used for TSO service.
func TransferPrimary(client *clientv3.Client, lease *election.Lease, serviceName,
oldPrimary, newPrimary string, keyspaceGroupID uint32) error {
oldPrimary, newPrimary string, keyspaceGroupID uint32, tsoMembersMap map[string]bool) error {
if lease == nil {
return errors.New("current lease is nil, please check leadership")
}
Expand All @@ -139,6 +139,10 @@ func TransferPrimary(client *clientv3.Client, lease *election.Lease, serviceName

var primaryIDs []string
for _, member := range entries {
// only members of specific group are valid primary candidates for TSO service.
if tsoMembersMap != nil && !tsoMembersMap[member.ServiceAddr] {
continue
}
if (newPrimary == "" && member.Name != oldPrimary) || (newPrimary != "" && member.Name == newPrimary) {
primaryIDs = append(primaryIDs, member.ServiceAddr)
}
Expand Down
68 changes: 55 additions & 13 deletions tests/integrations/mcs/members/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pdClient "github.com/tikv/pd/client/http"
bs "github.com/tikv/pd/pkg/basicserver"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/tests"
Expand All @@ -41,6 +46,9 @@ type memberTestSuite struct {
backendEndpoints string
pdClient pdClient.Client

// We only test `DefaultKeyspaceGroupID` here.
// tsoAvailMembers is used to check the tso members which in the DefaultKeyspaceGroupID.
tsoAvailMembers map[string]bool
tsoNodes map[string]bs.Server
schedulingNodes map[string]bs.Server
}
Expand All @@ -51,6 +59,7 @@ func TestMemberTestSuite(t *testing.T) {

func (suite *memberTestSuite) SetupTest() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
suite.ctx = ctx
cluster, err := tests.NewTestAPICluster(suite.ctx, 1)
Expand All @@ -65,15 +74,24 @@ func (suite *memberTestSuite) SetupTest() {

// TSO
nodes := make(map[string]bs.Server)
// mock 3 tso nodes, which is more than the default replica count(DefaultKeyspaceGroupReplicaCount).
for i := 0; i < 3; i++ {
s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
nodes[s.GetAddr()] = s
suite.cleanupFunc = append(suite.cleanupFunc, func() {
cleanup()
})
}
tests.WaitForPrimaryServing(re, nodes)
primary := tests.WaitForPrimaryServing(re, nodes)
members := mustGetKeyspaceGroupMembers(re, nodes[primary].(*tso.Server))
// Get the tso nodes
suite.tsoNodes = nodes
// We only test `DefaultKeyspaceGroupID` here.
// tsoAvailMembers is used to check the tso members which in the DefaultKeyspaceGroupID.
suite.tsoAvailMembers = make(map[string]bool)
for _, member := range members[constant.DefaultKeyspaceGroupID].Group.Members {
suite.tsoAvailMembers[member.Address] = true
}

// Scheduling
nodes = make(map[string]bs.Server)
Expand All @@ -100,6 +118,8 @@ func (suite *memberTestSuite) TearDownTest() {
suite.pdClient.Close()
}
suite.cluster.Destroy()
re := suite.Require()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}

func (suite *memberTestSuite) TestMembers() {
Expand All @@ -124,7 +144,7 @@ func (suite *memberTestSuite) TestPrimary() {
re.NotEmpty(primary)
}

func (suite *memberTestSuite) TestCampaignPrimaryWhileServerClose() {
func (suite *memberTestSuite) TestPrimaryWorkWhileOtherServerClose() {
re := suite.Require()
primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, "tso")
re.NoError(err)
Expand All @@ -143,20 +163,18 @@ func (suite *memberTestSuite) TestCampaignPrimaryWhileServerClose() {
primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service)
re.NoError(err)

// Close old and new primary to mock campaign primary
// Close non-primary node.
for _, member := range nodes {
if member.GetAddr() != primary {
nodes[member.Name()].Close()
break
}
}
nodes[primary].Close()
tests.WaitForPrimaryServing(re, nodes)

// primary should be different with before
onlyPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service)
// primary should be same with before.
curPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service)
re.NoError(err)
re.NotEqual(primary, onlyPrimary)
re.Equal(primary, curPrimary)
}
}

Expand Down Expand Up @@ -200,6 +218,9 @@ func (suite *memberTestSuite) TestTransferPrimary() {
// Test transfer primary to a specific node
var newPrimary string
for _, member := range nodes {
if service == "tso" && !suite.tsoAvailMembers[member.GetAddr()] {
continue
}
if member.GetAddr() != primary {
newPrimary = member.Name()
break
Expand Down Expand Up @@ -251,6 +272,9 @@ func (suite *memberTestSuite) TestCampaignPrimaryAfterTransfer() {
// Test transfer primary to a specific node
var newPrimary string
for _, member := range nodes {
if service == "tso" && !suite.tsoAvailMembers[member.GetAddr()] {
continue
}
if member.GetAddr() != primary {
newPrimary = member.Name()
break
Expand All @@ -270,15 +294,13 @@ func (suite *memberTestSuite) TestCampaignPrimaryAfterTransfer() {
re.NoError(err)
re.NotEqual(primary, newPrimary)

// Close old and new primary to mock campaign primary
nodes[primary].Close()
// Close primary to push other nodes campaign primary
nodes[newPrimary].Close()
tests.WaitForPrimaryServing(re, nodes)
// Primary should be different with before
onlyPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service)
anotherPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service)
re.NoError(err)
re.NotEqual(primary, onlyPrimary)
re.NotEqual(newPrimary, onlyPrimary)
re.NotEqual(newPrimary, anotherPrimary)
}
}

Expand All @@ -304,6 +326,9 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpired() {
// Test transfer primary to a specific node
var newPrimary string
for _, member := range nodes {
if service == "tso" && !suite.tsoAvailMembers[member.GetAddr()] {
continue
}
if member.GetAddr() != primary {
newPrimary = member.Name()
break
Expand Down Expand Up @@ -356,6 +381,9 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpiredAndServerDown(
// Test transfer primary to a specific node
var newPrimary string
for _, member := range nodes {
if service == "tso" && !suite.tsoAvailMembers[member.GetAddr()] {
continue
}
if member.GetAddr() != primary {
newPrimary = member.Name()
break
Expand Down Expand Up @@ -390,3 +418,17 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpiredAndServerDown(
re.NotEqual(newPrimary, onlyPrimary)
}
}

func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map[uint32]*apis.KeyspaceGroupMember {
httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+"/tso/api/v1/keyspace-groups/members", nil)
re.NoError(err)
httpResp, err := tests.TestDialClient.Do(httpReq)
re.NoError(err)
defer httpResp.Body.Close()
data, err := io.ReadAll(httpResp.Body)
re.NoError(err)
re.Equal(http.StatusOK, httpResp.StatusCode, string(data))
var resp map[uint32]*apis.KeyspaceGroupMember
re.NoError(json.Unmarshal(data, &resp))
return resp
}

0 comments on commit 6998fb5

Please sign in to comment.