add more tso tests
Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed Mar 20, 2023
1 parent 220dbed commit eb15082
Showing 5 changed files with 167 additions and 6 deletions.
3 changes: 3 additions & 0 deletions client/pd_service_discovery.go
@@ -216,6 +216,9 @@ func (c *pdServiceDiscovery) updateMemberLoop() {
}

func (c *pdServiceDiscovery) updateServiceModeLoop() {
	defer c.wg.Done()

	failpoint.Inject("skipUpdateServiceMode", func() {
		// Bail out before starting the loop when the failpoint is armed;
		// wg.Done is already deferred above, so Close() will not block.
		failpoint.Return()
	})

ctx, cancel := context.WithCancel(c.ctx)
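For context: a failpoint.Inject body like the one above is a dormant no-op until the named failpoint is activated at runtime; PD's Makefile provides a failpoint-enable target that rewrites these markers into real control flow before such tests run. Tests then arm a point with failpoint.Enable and disarm it when done. A minimal sketch of the pattern, mirroring the tests later in this commit (the closure matters: deferring re.NoError(failpoint.Disable(...)) directly would evaluate Disable immediately):

	re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateServiceMode", "return(true)"))
	defer func() {
		// Disarm the failpoint once the test finishes.
		re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateServiceMode"))
	}()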
4 changes: 4 additions & 0 deletions pkg/tso/allocator_manager.go
@@ -610,6 +610,10 @@ func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context) {
defer patrolTicker.Stop()
}
tsTicker := time.NewTicker(am.updatePhysicalInterval)
failpoint.Inject("fastUpdatePhysicalInterval", func() {
tsTicker.Stop()
tsTicker = time.NewTicker(100 * time.Millisecond)
})
defer tsTicker.Stop()
checkerTicker := time.NewTicker(PriorityCheck)
defer checkerTicker.Stop()
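A note on the ticker swap above: the original ticker is stopped before the 100 ms one replaces it, so the first ticker does not keep firing into its channel. Since Go 1.15, Ticker.Reset offers an in-place equivalent; a hypothetical alternative sketch (illustrative only, not what this commit does):

	failpoint.Inject("fastUpdatePhysicalInterval", func() {
		// Reuse the existing ticker rather than allocating a second one.
		tsTicker.Reset(100 * time.Millisecond)
	})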
14 changes: 14 additions & 0 deletions tests/cluster.go
@@ -717,6 +717,20 @@ func (c *TestCluster) Join(ctx context.Context, opts ...ConfigOption) (*TestServ
return s, nil
}

// JoinAPIServer is used to add a new TestAPIServer into the cluster.
func (c *TestCluster) JoinAPIServer(ctx context.Context, opts ...ConfigOption) (*TestServer, error) {
conf, err := c.config.Join().Generate(opts...)
if err != nil {
return nil, err
}
s, err := NewTestAPIServer(ctx, conf)
if err != nil {
return nil, err
}
c.servers[conf.Name] = s
return s, nil
}

// Destroy is used to destroy a TestCluster.
func (c *TestCluster) Destroy() {
for _, s := range c.servers {
7 changes: 7 additions & 0 deletions tests/mcs/testutil.go
@@ -37,6 +37,13 @@ func SetupTSOClient(ctx context.Context, re *require.Assertions, endpoints []str
return cli
}

// SetupClient creates a PD client for test.
func SetupClient(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client {
cli, err := pd.NewClientWithContext(ctx, endpoints, pd.SecurityOption{}, opts...)
re.NoError(err)
return cli
}

// StartSingleResourceManagerTestServer creates and starts a resource manager server with default config for testing.
func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints string) (*rm.Server, func()) {
cfg, err := rm.NewTestDefaultConfig()
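The test file below asserts TSO monotonicity via tsoutil.ComposeTS. A TSO is a single uint64: the physical time in milliseconds sits in the high bits and an 18-bit logical counter in the low bits, so composed values compare correctly as long as the physical/logical pair keeps growing. A self-contained sketch of the layout (illustrative; the real helper lives in pkg/utils/tsoutil):

	package main

	import "fmt"

	const logicalBits = 18 // low 18 bits hold the logical counter

	// composeTS mirrors tsoutil.ComposeTS: physical time in the high bits,
	// logical counter in the low bits.
	func composeTS(physical, logical int64) uint64 {
		return uint64(physical<<logicalBits + logical)
	}

	func main() {
		ts1 := composeTS(1679280000000, 0)
		ts2 := composeTS(1679280000000, 1) // same millisecond, next logical tick
		fmt.Println(ts1 < ts2)             // true: timestamps strictly increase
	}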
145 changes: 139 additions & 6 deletions tests/mcs/tso/tso_service_test.go
@@ -16,10 +16,14 @@ package tso

import (
"context"
"math/rand"
"strings"
"sync"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
tsosvr "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/utils/tsoutil"
@@ -28,8 +32,8 @@ import (
)

const (
-	tsoRequestConcurrencyNumber = 1
-	tsoRequestRound             = 30
+	tsoRequestConcurrencyNumber = 5
+	tsoRequestRound             = 100
)

type tsoServiceTestSuite struct {
@@ -39,8 +43,8 @@ type tsoServiceTestSuite struct {
cluster *tests.TestCluster
pdLeader *tests.TestServer
backendEndpoints string
-	tsoSvr1        *tsosvr.Server
-	tsoSvrCleanup1 func()
+	tsoSvr        *tsosvr.Server
+	tsoSvrCleanup func()
}

func TestTSOServiceTestSuite(t *testing.T) {
@@ -62,11 +66,11 @@ func (suite *tsoServiceTestSuite) SetupSuite() {
suite.pdLeader = suite.cluster.GetServer(leaderName)
suite.backendEndpoints = suite.pdLeader.GetAddr()

-	suite.tsoSvr1, suite.tsoSvrCleanup1 = mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints)
+	suite.tsoSvr, suite.tsoSvrCleanup = mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints)
}

func (suite *tsoServiceTestSuite) TearDownSuite() {
-	suite.tsoSvrCleanup1()
+	suite.tsoSvrCleanup()
suite.cluster.Destroy()
suite.cancel()
}
@@ -100,3 +104,132 @@ func (suite *tsoServiceTestSuite) TestTSOServerRegister() {
}
wg.Wait()
}

func TestRandomTransferAPILeader(t *testing.T) {
re := require.New(t)

	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
	defer func() {
		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
	}()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
ctx, cancel := context.WithCancel(context.Background())
cluster, err := tests.NewTestAPICluster(ctx, 3)
re.NoError(err)
defer cancel()
defer cluster.Destroy()

err = cluster.RunInitialServers()
re.NoError(err)

leaderServer := cluster.GetServer(cluster.WaitLeader())
backendEndpoints := leaderServer.GetAddr()

_, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints)
defer cleanup()

cli1 := mcs.SetupTSOClient(ctx, re, strings.Split(backendEndpoints, ","))
cli2 := mcs.SetupTSOClient(ctx, re, strings.Split(backendEndpoints, ","))

var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber + 1)
go func() {
defer wg.Done()
n := r.Intn(3) + 1
time.Sleep(time.Duration(n) * time.Second)
leaderServer.ResignLeader()
leaderServer = cluster.GetServer(cluster.WaitLeader())
}()

for i := 0; i < tsoRequestConcurrencyNumber; i++ {
go func() {
defer wg.Done()
var lastTS uint64
for i := 0; i < tsoRequestRound; i++ {
time.Sleep(time.Millisecond * 50)
physical, logical, err := cli1.GetTS(context.Background())
re.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
physical, logical, err = cli2.GetTS(context.Background())
re.NoError(err)
ts = tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
}
}()
}
wg.Wait()
}

// TestMixedTSODeployment tests the mixed deployment that can appear while a
// PD cluster is being upgraded: PD servers and a standalone TSO server
// coexist, and TSO requests keep being served across leader transfers.
func TestMixedTSODeployment(t *testing.T) {
re := require.New(t)

	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
	defer func() {
		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
	}()
	re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateServiceMode", "return(true)"))
	defer func() {
		re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateServiceMode"))
	}()

r := rand.New(rand.NewSource(time.Now().UnixNano()))
ctx, cancel := context.WithCancel(context.Background())
cluster, err := tests.NewTestCluster(ctx, 1)
re.NoError(err)
defer cancel()
defer cluster.Destroy()

err = cluster.RunInitialServers()
re.NoError(err)

leaderServer := cluster.GetServer(cluster.WaitLeader())
backendEndpoints := leaderServer.GetAddr()

apiSvr, err := cluster.JoinAPIServer(ctx)
re.NoError(err)
err = apiSvr.Run()
re.NoError(err)

_, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints)
defer cleanup()

cli1 := mcs.SetupTSOClient(ctx, re, strings.Split(backendEndpoints, ","))
cli2 := mcs.SetupTSOClient(ctx, re, strings.Split(backendEndpoints, ","))

var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber + 1)
go func() {
defer wg.Done()
for i := 0; i < 2; i++ {
n := r.Intn(3) + 1
time.Sleep(time.Duration(n) * time.Second)
leaderServer.ResignLeader()
leaderServer = cluster.GetServer(cluster.WaitLeader())
}
}()

for i := 0; i < tsoRequestConcurrencyNumber; i++ {
go func() {
defer wg.Done()
var ts, lastTS uint64
for i := 0; i < tsoRequestRound; i++ {
time.Sleep(time.Millisecond * 100)
physical, logical, err := cli1.GetTS(context.Background())
if err != nil {
re.ErrorContains(err, "not leader")
} else {
ts = tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
}
			physical, logical, err = cli2.GetTS(context.Background())
if err != nil {
re.ErrorContains(err, "not leader")
} else {
ts = tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
}
}
}()
}
wg.Wait()
}
