mcs, tests: use TSO cluster to do the failover test (#6356)
ref #5895

Use TSO cluster to do the failover test.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
JmPotato authored Apr 21, 2023
1 parent 4b8162f commit a746a3e
Showing 3 changed files with 52 additions and 42 deletions.
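
At a high level, the non-legacy suite now brings up a multi-server TSO microservice cluster and triggers failover by resigning the primary, instead of starting and closing a single standalone TSO server. A condensed sketch of that flow, pieced together from the diffs below (not a drop-in test: it assumes a cancellable ctx/cancel pair, a testify re, and the suite's backendEndpoints):

// Sketch only; the helper names (NewTestTSOCluster, checkTSO, ResignPrimary,
// WaitForDefaultPrimaryServing) all come from the changes in this commit.
tsoCluster, err := mcs.NewTestTSOCluster(ctx, 3, backendEndpoints) // start 3 TSO servers
re.NoError(err)
defer tsoCluster.Destroy()

var wg sync.WaitGroup
checkTSO(ctx, re, &wg, backendEndpoints) // concurrent clients asserting monotonic TSOs

tsoCluster.ResignPrimary()                  // force a failover
tsoCluster.WaitForDefaultPrimaryServing(re) // wait until a new primary is serving

cancel() // stop the TSO checkers
wg.Wait()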
11 changes: 11 additions & 0 deletions pkg/mcs/tso/server/server.go
@@ -244,6 +244,17 @@ func (s *Server) GetMember(keyspaceID, keyspaceGroupID uint32) (tso.ElectionMemb
return member, nil
}

// ResignPrimary resigns the primary of the default keyspace and keyspace group.
func (s *Server) ResignPrimary() error {
member, err := s.keyspaceGroupManager.GetElectionMember(
mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
if err != nil {
return err
}
member.ResetLeader()
return nil
}

// AddServiceReadyCallback implements basicserver.
// It adds callbacks when it's ready for providing tso service.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
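A hedged usage sketch of the new method, assuming a tsoCluster of the TestTSOCluster type extended below and a testify re from the surrounding test; unlike the wrapper added in the next file, it checks the returned error:

// Fetch the server currently holding primaryship for the default keyspace group
// and ask it to step down; ResetLeader clears its leadership so another TSO
// server can campaign in the next election.
primary := tsoCluster.GetPrimary(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
re.NoError(primary.ResignPrimary())
tsoCluster.WaitForDefaultPrimaryServing(re)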
5 changes: 5 additions & 0 deletions tests/integrations/mcs/cluster.go
@@ -91,6 +91,11 @@ func (tc *TestTSOCluster) DestroyServer(addr string) {
delete(tc.servers, addr)
}

// ResignPrimary resigns the primary TSO server.
func (tc *TestTSOCluster) ResignPrimary() {
tc.GetPrimary(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID).ResignPrimary()
}

// GetPrimary returns the primary TSO server.
func (tc *TestTSOCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server {
for _, server := range tc.servers {
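Together with the existing DestroyServer and GetPrimary helpers, the test cluster now covers both failover paths exercised in client_test.go below. A rough illustration of the distinction (triggerFailover is a hypothetical helper, not part of this commit):

// triggerFailover shows the two ways the suite forces a new election:
// graceful resignation versus closing the serving primary.
func triggerFailover(re *require.Assertions, tc *mcs.TestTSOCluster, graceful bool) {
    if graceful {
        // TestRandomResignLeader path: the primary steps down voluntarily.
        tc.ResignPrimary()
    } else {
        // TestRandomShutdown path: the serving primary is closed outright.
        tc.WaitForDefaultPrimaryServing(re).Close()
    }
    // Either way, the cluster should elect and expose a new serving primary.
    tc.WaitForDefaultPrimaryServing(re)
}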
78 changes: 36 additions & 42 deletions tests/integrations/tso/client_test.go
@@ -28,13 +28,14 @@ import (
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/testutil"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
)

var r = rand.New(rand.NewSource(time.Now().UnixNano()))

type tsoClientTestSuite struct {
suite.Suite
legacy bool
@@ -44,8 +45,7 @@ type tsoClientTestSuite struct {
// The PD cluster.
cluster *tests.TestCluster
// The TSO service in microservice mode.
tsoServer *tso.Server
tsoServerCleanup func()
tsoCluster *mcs.TestTSOCluster

backendEndpoints string

@@ -85,15 +85,16 @@ func (suite *tsoClientTestSuite) SetupSuite() {
suite.client, err = pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{})
re.NoError(err)
} else {
suite.tsoServer, suite.tsoServerCleanup = mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 3, suite.backendEndpoints)
re.NoError(err)
suite.client = mcs.SetupClientWithKeyspace(suite.ctx, re, strings.Split(suite.backendEndpoints, ","))
}
}

func (suite *tsoClientTestSuite) TearDownSuite() {
suite.cancel()
if !suite.legacy {
suite.tsoServerCleanup()
suite.tsoCluster.Destroy()
}
suite.cluster.Destroy()
}
@@ -175,27 +176,30 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp"))
}

func (suite *tsoClientTestSuite) TestRandomTransferLeader() {
func (suite *tsoClientTestSuite) TestRandomResignLeader() {
re := suite.Require()

re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
defer re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
r := rand.New(rand.NewSource(time.Now().UnixNano()))

ctx, cancel := context.WithCancel(suite.ctx)
var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber + 1)
checkTSO(ctx, re, &wg, suite.backendEndpoints)
wg.Add(1)
go func() {
defer wg.Done()
n := r.Intn(2) + 1
time.Sleep(time.Duration(n) * time.Second)
err := suite.cluster.ResignLeader()
re.NoError(err)
suite.cluster.WaitLeader()
if !suite.legacy {
suite.tsoCluster.ResignPrimary()
suite.tsoCluster.WaitForDefaultPrimaryServing(re)
} else {
err := suite.cluster.ResignLeader()
re.NoError(err)
suite.cluster.WaitLeader()
}
time.Sleep(time.Duration(n) * time.Second)
cancel()
}()

checkTSO(ctx, re, &wg, suite.backendEndpoints)
wg.Wait()
}

@@ -204,32 +208,22 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() {
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
defer re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))

tsoSvr, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
defer cleanup()

ctx, cancel := context.WithCancel(suite.ctx)
var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber + 1)
checkTSO(ctx, re, &wg, suite.backendEndpoints)
wg.Add(1)
go func() {
defer wg.Done()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
n := r.Intn(2) + 1
time.Sleep(time.Duration(n) * time.Second)
if !suite.legacy {
// random close one of the tso servers
if r.Intn(2) == 0 {
tsoSvr.Close()
} else {
suite.tsoServer.Close()
}
suite.tsoCluster.WaitForDefaultPrimaryServing(re).Close()
} else {
// close pd leader server
suite.cluster.GetServer(suite.cluster.GetLeader()).GetServer().Close()
}
time.Sleep(time.Duration(n) * time.Second)
cancel()
}()

checkTSO(ctx, re, &wg, suite.backendEndpoints)
wg.Wait()
suite.TearDownSuite()
suite.SetupSuite()
@@ -266,10 +260,10 @@ func TestMixedTSODeployment(t *testing.T) {

ctx1, cancel1 := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber + 1)
checkTSO(ctx1, re, &wg, backendEndpoints)
wg.Add(1)
go func() {
defer wg.Done()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < 2; i++ {
n := r.Intn(2) + 1
time.Sleep(time.Duration(n) * time.Second)
@@ -278,32 +272,32 @@
}
cancel1()
}()
checkTSO(ctx1, re, &wg, backendEndpoints)
wg.Wait()
}

func checkTSO(ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, backendEndpoints string) {
wg.Add(tsoRequestConcurrencyNumber)
for i := 0; i < tsoRequestConcurrencyNumber; i++ {
go func() {
defer wg.Done()
cli := mcs.SetupClientWithKeyspace(context.Background(), re, strings.Split(backendEndpoints, ","))
cli := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ","))
var ts, lastTS uint64
for {
physical, logical, err := cli.GetTS(context.Background())
// omit the error check since there are many kinds of errors
if err == nil {
ts = tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
}
select {
case <-ctx.Done():
physical, logical, _ := cli.GetTS(context.Background())
ts = tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
// Make sure the lastTS is not empty
re.NotEmpty(lastTS)
return
default:
}
physical, logical, err := cli.GetTS(ctx)
// omit the error check since there are many kinds of errors
if err != nil {
continue
}
ts = tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
}
}()
}
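As a side note on what checkTSO asserts: a PD timestamp packs a physical millisecond part and a logical counter into a single uint64, so re.Less(lastTS, ts) checks strict monotonicity across the failover. A tiny self-contained illustration, assuming the usual 18-bit logical suffix behind tsoutil.ComposeTS:

package main

import (
    "fmt"

    "github.com/tikv/pd/pkg/utils/tsoutil"
)

func main() {
    // Assuming ComposeTS is physical<<18 + logical, a later physical time always
    // compares greater, even if the logical counter has wrapped back to zero.
    earlier := tsoutil.ComposeTS(1682000000000, 5)
    later := tsoutil.ComposeTS(1682000000001, 0) // next millisecond, logical reset
    fmt.Println(later > earlier)                 // prints: true
}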
