From dcf71cdeffdf6821ee8b2e50ac46351676b286f8 Mon Sep 17 00:00:00 2001
From: JmPotato
Date: Fri, 21 Apr 2023 18:43:20 +0800
Subject: [PATCH] mcs, tests: use TSO cluster to do the failover test (#6356)

ref tikv/pd#5895

Use TSO cluster to do the failover test.

Signed-off-by: JmPotato
---
 pkg/mcs/tso/server/server.go          | 11 ++++
 tests/integrations/mcs/cluster.go     |  5 ++
 tests/integrations/tso/client_test.go | 78 +++++++++++++--------------
 3 files changed, 52 insertions(+), 42 deletions(-)

diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go
index 67498409371..be4f2be00a5 100644
--- a/pkg/mcs/tso/server/server.go
+++ b/pkg/mcs/tso/server/server.go
@@ -244,6 +244,17 @@ func (s *Server) GetMember(keyspaceID, keyspaceGroupID uint32) (tso.ElectionMemb
 	return member, nil
 }
 
+// ResignPrimary resigns the primary of the default keyspace and keyspace group.
+func (s *Server) ResignPrimary() error {
+	member, err := s.keyspaceGroupManager.GetElectionMember(
+		mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
+	if err != nil {
+		return err
+	}
+	member.ResetLeader()
+	return nil
+}
+
 // AddServiceReadyCallback implements basicserver.
 // It adds callbacks when it's ready for providing tso service.
 func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
diff --git a/tests/integrations/mcs/cluster.go b/tests/integrations/mcs/cluster.go
index baad22c9148..e0802b9f974 100644
--- a/tests/integrations/mcs/cluster.go
+++ b/tests/integrations/mcs/cluster.go
@@ -91,6 +91,11 @@ func (tc *TestTSOCluster) DestroyServer(addr string) {
 	delete(tc.servers, addr)
 }
 
+// ResignPrimary resigns the primary TSO server.
+func (tc *TestTSOCluster) ResignPrimary() {
+	tc.GetPrimary(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID).ResignPrimary()
+}
+
 // GetPrimary returns the primary TSO server.
 func (tc *TestTSOCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server {
 	for _, server := range tc.servers {
diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go
index c98caa9db7f..0bde4f1a18b 100644
--- a/tests/integrations/tso/client_test.go
+++ b/tests/integrations/tso/client_test.go
@@ -28,13 +28,14 @@ import (
 	"github.com/stretchr/testify/suite"
 	pd "github.com/tikv/pd/client"
 	"github.com/tikv/pd/client/testutil"
-	tso "github.com/tikv/pd/pkg/mcs/tso/server"
 	"github.com/tikv/pd/pkg/utils/tempurl"
 	"github.com/tikv/pd/pkg/utils/tsoutil"
 	"github.com/tikv/pd/tests"
 	"github.com/tikv/pd/tests/integrations/mcs"
 )
 
+var r = rand.New(rand.NewSource(time.Now().UnixNano()))
+
 type tsoClientTestSuite struct {
 	suite.Suite
 	legacy bool
 
 	// The PD cluster.
 	cluster *tests.TestCluster
 	// The TSO service in microservice mode.
-	tsoServer        *tso.Server
-	tsoServerCleanup func()
+	tsoCluster *mcs.TestTSOCluster
 
 	backendEndpoints string
@@ -85,7 +85,8 @@ func (suite *tsoClientTestSuite) SetupSuite() {
 		suite.client, err = pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{})
 		re.NoError(err)
 	} else {
-		suite.tsoServer, suite.tsoServerCleanup = mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
+		suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 3, suite.backendEndpoints)
+		re.NoError(err)
 		suite.client = mcs.SetupClientWithKeyspace(suite.ctx, re, strings.Split(suite.backendEndpoints, ","))
 	}
 }
@@ -93,7 +94,7 @@ func (suite *tsoClientTestSuite) SetupSuite() {
 func (suite *tsoClientTestSuite) TearDownSuite() {
 	suite.cancel()
 	if !suite.legacy {
-		suite.tsoServerCleanup()
+		suite.tsoCluster.Destroy()
 	}
 	suite.cluster.Destroy()
 }
@@ -175,27 +176,30 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
 	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp"))
 }
 
-func (suite *tsoClientTestSuite) TestRandomTransferLeader() {
+func (suite *tsoClientTestSuite) TestRandomResignLeader() {
 	re := suite.Require()
-
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
 	defer re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
 
-	r := rand.New(rand.NewSource(time.Now().UnixNano()))
 	ctx, cancel := context.WithCancel(suite.ctx)
 	var wg sync.WaitGroup
-	wg.Add(tsoRequestConcurrencyNumber + 1)
+	checkTSO(ctx, re, &wg, suite.backendEndpoints)
+	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		n := r.Intn(2) + 1
 		time.Sleep(time.Duration(n) * time.Second)
-		err := suite.cluster.ResignLeader()
-		re.NoError(err)
-		suite.cluster.WaitLeader()
+		if !suite.legacy {
+			suite.tsoCluster.ResignPrimary()
+			suite.tsoCluster.WaitForDefaultPrimaryServing(re)
+		} else {
+			err := suite.cluster.ResignLeader()
+			re.NoError(err)
+			suite.cluster.WaitLeader()
+		}
+		time.Sleep(time.Duration(n) * time.Second)
 		cancel()
 	}()
-
-	checkTSO(ctx, re, &wg, suite.backendEndpoints)
 	wg.Wait()
 }
 
@@ -204,32 +208,22 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() {
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
 	defer re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
 
-	tsoSvr, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
-	defer cleanup()
-
 	ctx, cancel := context.WithCancel(suite.ctx)
 	var wg sync.WaitGroup
-	wg.Add(tsoRequestConcurrencyNumber + 1)
+	checkTSO(ctx, re, &wg, suite.backendEndpoints)
+	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		r := rand.New(rand.NewSource(time.Now().UnixNano()))
 		n := r.Intn(2) + 1
 		time.Sleep(time.Duration(n) * time.Second)
 		if !suite.legacy {
-			// random close one of the tso servers
-			if r.Intn(2) == 0 {
-				tsoSvr.Close()
-			} else {
-				suite.tsoServer.Close()
-			}
+			suite.tsoCluster.WaitForDefaultPrimaryServing(re).Close()
 		} else {
-			// close pd leader server
 			suite.cluster.GetServer(suite.cluster.GetLeader()).GetServer().Close()
 		}
+		time.Sleep(time.Duration(n) * time.Second)
 		cancel()
 	}()
-
-	checkTSO(ctx, re, &wg, suite.backendEndpoints)
 	wg.Wait()
 	suite.TearDownSuite()
 	suite.SetupSuite()
@@ -266,10 +260,10 @@ func TestMixedTSODeployment(t *testing.T) {
 
 	ctx1, cancel1 := context.WithCancel(context.Background())
 	var wg sync.WaitGroup
-	wg.Add(tsoRequestConcurrencyNumber + 1)
+	checkTSO(ctx1, re, &wg, backendEndpoints)
+	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		r := rand.New(rand.NewSource(time.Now().UnixNano()))
 		for i := 0; i < 2; i++ {
 			n := r.Intn(2) + 1
 			time.Sleep(time.Duration(n) * time.Second)
@@ -278,32 +272,32 @@ func TestMixedTSODeployment(t *testing.T) {
 		}
 		cancel1()
 	}()
-	checkTSO(ctx1, re, &wg, backendEndpoints)
 	wg.Wait()
 }
 
 func checkTSO(ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, backendEndpoints string) {
+	wg.Add(tsoRequestConcurrencyNumber)
 	for i := 0; i < tsoRequestConcurrencyNumber; i++ {
 		go func() {
 			defer wg.Done()
-			cli := mcs.SetupClientWithKeyspace(context.Background(), re, strings.Split(backendEndpoints, ","))
+			cli := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ","))
 			var ts, lastTS uint64
 			for {
-				physical, logical, err := cli.GetTS(context.Background())
-				// omit the error check since there are many kinds of errors
-				if err == nil {
-					ts = tsoutil.ComposeTS(physical, logical)
-					re.Less(lastTS, ts)
-					lastTS = ts
-				}
 				select {
 				case <-ctx.Done():
-					physical, logical, _ := cli.GetTS(context.Background())
-					ts = tsoutil.ComposeTS(physical, logical)
-					re.Less(lastTS, ts)
+					// Make sure the lastTS is not empty
+					re.NotEmpty(lastTS)
 					return
 				default:
 				}
+				physical, logical, err := cli.GetTS(ctx)
+				// omit the error check since there are many kinds of errors
+				if err != nil {
+					continue
+				}
+				ts = tsoutil.ComposeTS(physical, logical)
+				re.Less(lastTS, ts)
+				lastTS = ts
 			}
 		}()
 	}
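
Note (not part of the patch): for readers tracing the new failover path, the following is a minimal sketch of how the pieces added above fit together end to end. It is illustrative only: the test name TestTSOFailoverSketch is hypothetical, the imports are assumed to be the ones already present in client_test.go, and it assumes tests.NewTestAPICluster and the helpers introduced by this patch (mcs.NewTestTSOCluster, TestTSOCluster.ResignPrimary, WaitForDefaultPrimaryServing) behave the way the rewritten SetupSuite and TestRandomResignLeader use them.

// Hypothetical example, not part of this patch: exercises the same
// resign-primary failover path that TestRandomResignLeader now covers.
func TestTSOFailoverSketch(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Back the TSO microservice with a single-node API cluster,
	// mirroring what SetupSuite does in the non-legacy mode.
	cluster, err := tests.NewTestAPICluster(ctx, 1)
	re.NoError(err)
	defer cluster.Destroy()
	re.NoError(cluster.RunInitialServers())
	cluster.WaitLeader()
	backendEndpoints := cluster.GetServer(cluster.GetLeader()).GetAddr()

	// Start a three-node TSO cluster and wait for a primary to serve.
	tsoCluster, err := mcs.NewTestTSOCluster(ctx, 3, backendEndpoints)
	re.NoError(err)
	defer tsoCluster.Destroy()
	tsoCluster.WaitForDefaultPrimaryServing(re)

	cli := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ","))
	defer cli.Close()
	getTS := func() uint64 {
		// Retry instead of asserting on the error, as checkTSO does:
		// requests may fail transiently while the client rediscovers
		// the primary.
		for {
			physical, logical, err := cli.GetTS(ctx)
			if err == nil {
				return tsoutil.ComposeTS(physical, logical)
			}
		}
	}

	lastTS := getTS()
	// Trigger the failover under test: resign the default primary and
	// wait until another member takes over.
	tsoCluster.ResignPrimary()
	tsoCluster.WaitForDefaultPrimaryServing(re)
	// Timestamps must remain strictly monotonic across the failover.
	re.Less(lastTS, getTS())
}

The retry loop around GetTS mirrors checkTSO's approach: during a failover the client may return transient errors while it switches to the new primary, so the sketch only asserts on timestamps it actually obtains.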