From eb150821b5b6f6cecec6021118adc6dec5a542c4 Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Mon, 20 Mar 2023 18:44:44 +0800
Subject: [PATCH] add more tso tests

Signed-off-by: Ryan Leung
---
 client/pd_service_discovery.go    |   3 +
 pkg/tso/allocator_manager.go      |   4 +
 tests/cluster.go                  |  14 +++
 tests/mcs/testutil.go             |   7 ++
 tests/mcs/tso/tso_service_test.go | 145 ++++++++++++++++++++++++++++--
 5 files changed, 167 insertions(+), 6 deletions(-)

diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go
index 98f51f0a47b1..02f3e080cd13 100644
--- a/client/pd_service_discovery.go
+++ b/client/pd_service_discovery.go
@@ -216,6 +216,9 @@ func (c *pdServiceDiscovery) updateMemberLoop() {
 }
 
 func (c *pdServiceDiscovery) updateServiceModeLoop() {
 	defer c.wg.Done()
+	failpoint.Inject("skipUpdateServiceMode", func() {
+		failpoint.Return()
+	})
 
 	ctx, cancel := context.WithCancel(c.ctx)
diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go
index a3de87311bb8..6f08eea5b9ad 100644
--- a/pkg/tso/allocator_manager.go
+++ b/pkg/tso/allocator_manager.go
@@ -610,6 +610,10 @@ func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context) {
 		defer patrolTicker.Stop()
 	}
 	tsTicker := time.NewTicker(am.updatePhysicalInterval)
+	failpoint.Inject("fastUpdatePhysicalInterval", func() {
+		tsTicker.Stop()
+		tsTicker = time.NewTicker(100 * time.Millisecond)
+	})
 	defer tsTicker.Stop()
 	checkerTicker := time.NewTicker(PriorityCheck)
 	defer checkerTicker.Stop()
diff --git a/tests/cluster.go b/tests/cluster.go
index dbb15c199b89..1f27e72bcada 100644
--- a/tests/cluster.go
+++ b/tests/cluster.go
@@ -717,6 +717,20 @@ func (c *TestCluster) Join(ctx context.Context, opts ...ConfigOption) (*TestServ
 	return s, nil
 }
 
+// JoinAPIServer is used to add a new TestAPIServer into the cluster.
+func (c *TestCluster) JoinAPIServer(ctx context.Context, opts ...ConfigOption) (*TestServer, error) {
+	conf, err := c.config.Join().Generate(opts...)
+	if err != nil {
+		return nil, err
+	}
+	s, err := NewTestAPIServer(ctx, conf)
+	if err != nil {
+		return nil, err
+	}
+	c.servers[conf.Name] = s
+	return s, nil
+}
+
 // Destroy is used to destroy a TestCluster.
 func (c *TestCluster) Destroy() {
 	for _, s := range c.servers {
diff --git a/tests/mcs/testutil.go b/tests/mcs/testutil.go
index c1c27c59a320..5545866fd47e 100644
--- a/tests/mcs/testutil.go
+++ b/tests/mcs/testutil.go
@@ -37,6 +37,13 @@ func SetupTSOClient(ctx context.Context, re *require.Assertions, endpoints []str
 	return cli
 }
 
+// SetupClient creates a PD client for test.
+func SetupClient(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client {
+	cli, err := pd.NewClientWithContext(ctx, endpoints, pd.SecurityOption{}, opts...)
+	re.NoError(err)
+	return cli
+}
+
 // StartSingleResourceManagerTestServer creates and starts a resource manager server with default config for testing.
 func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints string) (*rm.Server, func()) {
 	cfg, err := rm.NewTestDefaultConfig()
diff --git a/tests/mcs/tso/tso_service_test.go b/tests/mcs/tso/tso_service_test.go
index 69ec6870ba7d..bd49936382a4 100644
--- a/tests/mcs/tso/tso_service_test.go
+++ b/tests/mcs/tso/tso_service_test.go
@@ -16,10 +16,14 @@ package tso
 
 import (
 	"context"
+	"math/rand"
 	"strings"
 	"sync"
 	"testing"
+	"time"
 
+	"github.com/pingcap/failpoint"
+	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 	tsosvr "github.com/tikv/pd/pkg/mcs/tso/server"
 	"github.com/tikv/pd/pkg/utils/tsoutil"
@@ -28,8 +32,8 @@ import (
 )
 
 const (
-	tsoRequestConcurrencyNumber = 1
-	tsoRequestRound             = 30
+	tsoRequestConcurrencyNumber = 5
+	tsoRequestRound             = 100
 )
 
 type tsoServiceTestSuite struct {
@@ -39,8 +43,8 @@ type tsoServiceTestSuite struct {
 	cluster          *tests.TestCluster
 	pdLeader         *tests.TestServer
 	backendEndpoints string
-	tsoSvr1          *tsosvr.Server
-	tsoSvrCleanup1   func()
+	tsoSvr           *tsosvr.Server
+	tsoSvrCleanup    func()
 }
 
 func TestTSOServiceTestSuite(t *testing.T) {
@@ -62,11 +66,11 @@ func (suite *tsoServiceTestSuite) SetupSuite() {
 	suite.pdLeader = suite.cluster.GetServer(leaderName)
 	suite.backendEndpoints = suite.pdLeader.GetAddr()
 
-	suite.tsoSvr1, suite.tsoSvrCleanup1 = mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints)
+	suite.tsoSvr, suite.tsoSvrCleanup = mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints)
 }
 
 func (suite *tsoServiceTestSuite) TearDownSuite() {
-	suite.tsoSvrCleanup1()
+	suite.tsoSvrCleanup()
 	suite.cluster.Destroy()
 	suite.cancel()
 }
@@ -100,3 +104,132 @@ func (suite *tsoServiceTestSuite) TestTSOServerRegister() {
 	}
 	wg.Wait()
 }
+
+func TestRandomTransferAPILeader(t *testing.T) {
+	re := require.New(t)
+
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
+	defer func() { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval")) }()
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	ctx, cancel := context.WithCancel(context.Background())
+	cluster, err := tests.NewTestAPICluster(ctx, 3)
+	re.NoError(err)
+	defer cancel()
+	defer cluster.Destroy()
+
+	err = cluster.RunInitialServers()
+	re.NoError(err)
+
+	leaderServer := cluster.GetServer(cluster.WaitLeader())
+	backendEndpoints := leaderServer.GetAddr()
+
+	_, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints)
+	defer cleanup()
+
+	cli1 := mcs.SetupTSOClient(ctx, re, strings.Split(backendEndpoints, ","))
+	cli2 := mcs.SetupTSOClient(ctx, re, strings.Split(backendEndpoints, ","))
+
+	var wg sync.WaitGroup
+	wg.Add(tsoRequestConcurrencyNumber + 1)
+	go func() {
+		defer wg.Done()
+		n := r.Intn(3) + 1
+		time.Sleep(time.Duration(n) * time.Second)
+		leaderServer.ResignLeader()
+		leaderServer = cluster.GetServer(cluster.WaitLeader())
+	}()
+
+	for i := 0; i < tsoRequestConcurrencyNumber; i++ {
+		go func() {
+			defer wg.Done()
+			var lastTS uint64
+			for i := 0; i < tsoRequestRound; i++ {
+				time.Sleep(time.Millisecond * 50)
+				physical, logical, err := cli1.GetTS(context.Background())
+				re.NoError(err)
+				ts := tsoutil.ComposeTS(physical, logical)
+				re.Less(lastTS, ts)
+				lastTS = ts
+				physical, logical, err = cli2.GetTS(context.Background())
+				re.NoError(err)
+				ts = tsoutil.ComposeTS(physical, logical)
+				re.Less(lastTS, ts)
+				lastTS = ts
+			}
+		}()
+	}
+	wg.Wait()
+}
+
+// TestMixedTSODeployment tests TSO requests when the PD cluster is upgraded and the legacy PD TSO runs alongside the independent TSO service.
+func TestMixedTSODeployment(t *testing.T) {
+	re := require.New(t)
+
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
+	defer func() { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval")) }()
+	re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateServiceMode", "return(true)"))
+	defer func() { re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateServiceMode")) }()
+
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	ctx, cancel := context.WithCancel(context.Background())
+	cluster, err := tests.NewTestCluster(ctx, 1)
+	re.NoError(err)
+	defer cancel()
+	defer cluster.Destroy()
+
+	err = cluster.RunInitialServers()
+	re.NoError(err)
+
+	leaderServer := cluster.GetServer(cluster.WaitLeader())
+	backendEndpoints := leaderServer.GetAddr()
+
+	apiSvr, err := cluster.JoinAPIServer(ctx)
+	re.NoError(err)
+	err = apiSvr.Run()
+	re.NoError(err)
+
+	_, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints)
+	defer cleanup()
+
+	cli1 := mcs.SetupTSOClient(ctx, re, strings.Split(backendEndpoints, ","))
+	cli2 := mcs.SetupTSOClient(ctx, re, strings.Split(backendEndpoints, ","))
+
+	var wg sync.WaitGroup
+	wg.Add(tsoRequestConcurrencyNumber + 1)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 2; i++ {
+			n := r.Intn(3) + 1
+			time.Sleep(time.Duration(n) * time.Second)
+			leaderServer.ResignLeader()
+			leaderServer = cluster.GetServer(cluster.WaitLeader())
+		}
+	}()
+
+	for i := 0; i < tsoRequestConcurrencyNumber; i++ {
+		go func() {
+			defer wg.Done()
+			var ts, lastTS uint64
+			for i := 0; i < tsoRequestRound; i++ {
+				time.Sleep(time.Millisecond * 100)
+				physical, logical, err := cli1.GetTS(context.Background())
+				if err != nil {
+					re.ErrorContains(err, "not leader")
+				} else {
+					ts = tsoutil.ComposeTS(physical, logical)
+					re.Less(lastTS, ts)
+					lastTS = ts
+				}
+				physical, logical, err = cli2.GetTS(context.Background())
+				if err != nil {
+					re.ErrorContains(err, "not leader")
+				} else {
+					ts = tsoutil.ComposeTS(physical, logical)
+					re.Less(lastTS, ts)
+					lastTS = ts
+				}
+			}
+		}()
+	}
+	wg.Wait()
+}