Merge branch 'master' into fix-leak1
ti-chi-bot[bot] authored Apr 23, 2023
2 parents 90b3bb0 + a746a3e commit 6017620
Showing 7 changed files with 57 additions and 46 deletions.
2 changes: 1 addition & 1 deletion pkg/keyspace/tso_keyspace_group.go
@@ -164,7 +164,7 @@ func (m *GroupManager) startWatchLoop(parentCtx context.Context) {
 		}
 		resp, err = etcdutil.EtcdKVGet(m.client, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey))
 		if err == nil {
-			revision = resp.Header.Revision
+			revision = resp.Header.Revision + 1
 			for _, item := range resp.Kvs {
 				s := &discovery.ServiceRegistryEntry{}
 				if err := json.Unmarshal(item.Value, s); err != nil {
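Why the +1 matters: an etcd watch opened with clientv3.WithRev(rev) replays every event at revision >= rev, while the Get above already reflects the state at resp.Header.Revision, so a watch resumed at the snapshot's own revision would redeliver events this loop has just applied. A minimal sketch of the load-then-watch pattern (hypothetical prefix and apply helper, not PD code; imports are go.etcd.io/etcd/client/v3 and go.etcd.io/etcd/api/v3/mvccpb):

	// loadThenWatch snapshots a prefix, then watches only events newer than the snapshot.
	func loadThenWatch(ctx context.Context, cli *clientv3.Client, prefix string, apply func(*mvccpb.KeyValue)) error {
		resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix())
		if err != nil {
			return err
		}
		for _, kv := range resp.Kvs {
			apply(kv) // process the snapshot entries
		}
		// Resume strictly after the snapshot revision; starting at the
		// revision itself would redeliver what the snapshot already contains.
		wch := cli.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(resp.Header.Revision+1))
		for wresp := range wch {
			if err := wresp.Err(); err != nil {
				return err
			}
			for _, ev := range wresp.Events {
				apply(ev.Kv) // only deltas newer than the snapshot
			}
		}
		return ctx.Err()
	}
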
11 changes: 11 additions & 0 deletions pkg/mcs/tso/server/server.go
@@ -244,6 +244,17 @@ func (s *Server) GetMember(keyspaceID, keyspaceGroupID uint32) (tso.ElectionMember, error) {
 	return member, nil
 }
 
+// ResignPrimary resigns the primary of the default keyspace group.
+func (s *Server) ResignPrimary() error {
+	member, err := s.keyspaceGroupManager.GetElectionMember(
+		mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
+	if err != nil {
+		return err
+	}
+	member.ResetLeader()
+	return nil
+}
+
 // AddServiceReadyCallback implements basicserver.
 // It adds callbacks when it's ready for providing tso service.
 func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
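ResetLeader is what actually forces the switchover here: it drops the member's current leadership so the remaining TSO servers campaign again and one of them takes over as primary. Note that the method is hard-wired to mcsutils.DefaultKeyspaceID and mcsutils.DefaultKeyspaceGroupID, which is all the integration tests below need; the TestTSOCluster.ResignPrimary helper added further down wraps exactly this call.
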
3 changes: 2 additions & 1 deletion pkg/storage/endpoint/tso.go
@@ -19,6 +19,7 @@ import (
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/typeutil"
@@ -82,7 +83,7 @@ func (se *StorageEndpoint) SaveTimestamp(key string, ts time.Time) error {
 			}
 		}
 		if previousTS != typeutil.ZeroTime && typeutil.SubRealTimeByWallClock(ts, previousTS) <= 0 {
-			return nil
+			return errors.Errorf("saving timestamp %d is less than or equal to the previous one %d", ts.UnixNano(), previousTS.UnixNano())
 		}
 		data := typeutil.Uint64ToBytes(uint64(ts.UnixNano()))
 		return txn.Save(key, string(data))
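The behavioral change is the important part of this hunk: previously a timestamp at or below the last persisted one was silently swallowed with return nil, so the caller proceeded as if the write had happened. A save that tries to move time backwards almost certainly signals a clock or leadership anomaly, and surfacing it as an error lets the caller retry with a larger timestamp or fail loudly. The invariant in isolation (hypothetical helper using the same github.com/pingcap/errors package, not PD code):

	// saveMonotonic mirrors the rule SaveTimestamp now enforces:
	// a persisted timestamp may only move strictly forward in wall-clock time.
	func saveMonotonic(prev, next time.Time) error {
		if !prev.IsZero() && !next.After(prev) {
			return errors.Errorf("saving timestamp %d is less than or equal to the previous one %d",
				next.UnixNano(), prev.UnixNano())
		}
		// persist next here (a kv transaction in the real implementation)
		return nil
	}
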
2 changes: 1 addition & 1 deletion pkg/storage/storage_tso_test.go
@@ -116,7 +116,7 @@ func TestTimestampTxn(t *testing.T) {

 	globalTS2 := globalTS1.Add(-time.Millisecond).Round(0)
 	err = storage.SaveTimestamp(timestampKey, globalTS2)
-	re.NoError(err)
+	re.Error(err)
 
 	ts, err := storage.LoadTimestamp("")
 	re.NoError(err)
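One detail worth noting in this test: Round(0) strips Go's monotonic clock reading from globalTS2, so the comparison inside SaveTimestamp is by wall clock alone and the one-millisecond step backwards is deterministically rejected, which is exactly what the strengthened re.Error assertion expects.
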
2 changes: 1 addition & 1 deletion pkg/tso/keyspace_group_manager.go
@@ -415,7 +415,7 @@ func (kgm *KeyspaceGroupManager) loadKeyspaceGroups(
 	}
 
 	if resp.Header != nil {
-		revision = resp.Header.Revision
+		revision = resp.Header.Revision + 1
 	}
 
 	return revision, kgs, resp.More, nil
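This is the same off-by-one fix as in startWatchLoop above: loadKeyspaceGroups returns the revision at which the caller's subsequent watch should resume, so it must point one past the snapshot rather than at it.
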
5 changes: 5 additions & 0 deletions tests/integrations/mcs/cluster.go
@@ -91,6 +91,11 @@ func (tc *TestTSOCluster) DestroyServer(addr string) {
 	delete(tc.servers, addr)
 }
 
+// ResignPrimary resigns the primary TSO server.
+func (tc *TestTSOCluster) ResignPrimary() {
+	tc.GetPrimary(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID).ResignPrimary()
+}
+
 // GetPrimary returns the primary TSO server.
 func (tc *TestTSOCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server {
 	for _, server := range tc.servers {
78 changes: 36 additions & 42 deletions tests/integrations/tso/client_test.go
@@ -28,13 +28,14 @@ import (
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/testutil"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
)

var r = rand.New(rand.NewSource(time.Now().UnixNano()))

type tsoClientTestSuite struct {
suite.Suite
legacy bool
@@ -44,8 +45,7 @@ type tsoClientTestSuite struct {
 	// The PD cluster.
 	cluster *tests.TestCluster
 	// The TSO service in microservice mode.
-	tsoServer        *tso.Server
-	tsoServerCleanup func()
+	tsoCluster *mcs.TestTSOCluster
 
 	backendEndpoints string
 
@@ -85,15 +85,16 @@ func (suite *tsoClientTestSuite) SetupSuite() {
 		suite.client, err = pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{})
 		re.NoError(err)
 	} else {
-		suite.tsoServer, suite.tsoServerCleanup = mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
+		suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 3, suite.backendEndpoints)
+		re.NoError(err)
 		suite.client = mcs.SetupClientWithKeyspace(suite.ctx, re, strings.Split(suite.backendEndpoints, ","))
 	}
 }
 
 func (suite *tsoClientTestSuite) TearDownSuite() {
 	suite.cancel()
 	if !suite.legacy {
-		suite.tsoServerCleanup()
+		suite.tsoCluster.Destroy()
 	}
 	suite.cluster.Destroy()
 }
@@ -175,27 +176,30 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
 	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp"))
 }
 
-func (suite *tsoClientTestSuite) TestRandomTransferLeader() {
+func (suite *tsoClientTestSuite) TestRandomResignLeader() {
 	re := suite.Require()
 
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
 	defer re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
-	r := rand.New(rand.NewSource(time.Now().UnixNano()))
 
 	ctx, cancel := context.WithCancel(suite.ctx)
 	var wg sync.WaitGroup
-	wg.Add(tsoRequestConcurrencyNumber + 1)
+	checkTSO(ctx, re, &wg, suite.backendEndpoints)
+	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		n := r.Intn(2) + 1
 		time.Sleep(time.Duration(n) * time.Second)
-		err := suite.cluster.ResignLeader()
-		re.NoError(err)
-		suite.cluster.WaitLeader()
+		if !suite.legacy {
+			suite.tsoCluster.ResignPrimary()
+			suite.tsoCluster.WaitForDefaultPrimaryServing(re)
+		} else {
+			err := suite.cluster.ResignLeader()
+			re.NoError(err)
+			suite.cluster.WaitLeader()
+		}
 		time.Sleep(time.Duration(n) * time.Second)
 		cancel()
 	}()
-
-	checkTSO(ctx, re, &wg, suite.backendEndpoints)
 	wg.Wait()
 }

@@ -204,32 +208,22 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() {
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
 	defer re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
 
-	tsoSvr, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
-	defer cleanup()
-
 	ctx, cancel := context.WithCancel(suite.ctx)
 	var wg sync.WaitGroup
-	wg.Add(tsoRequestConcurrencyNumber + 1)
+	checkTSO(ctx, re, &wg, suite.backendEndpoints)
+	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		r := rand.New(rand.NewSource(time.Now().UnixNano()))
 		n := r.Intn(2) + 1
 		time.Sleep(time.Duration(n) * time.Second)
 		if !suite.legacy {
-			// random close one of the tso servers
-			if r.Intn(2) == 0 {
-				tsoSvr.Close()
-			} else {
-				suite.tsoServer.Close()
-			}
+			suite.tsoCluster.WaitForDefaultPrimaryServing(re).Close()
 		} else {
 			// close pd leader server
 			suite.cluster.GetServer(suite.cluster.GetLeader()).GetServer().Close()
 		}
 		time.Sleep(time.Duration(n) * time.Second)
 		cancel()
 	}()
-
-	checkTSO(ctx, re, &wg, suite.backendEndpoints)
 	wg.Wait()
 	suite.TearDownSuite()
 	suite.SetupSuite()
@@ -266,10 +260,10 @@ func TestMixedTSODeployment(t *testing.T) {

 	ctx1, cancel1 := context.WithCancel(context.Background())
 	var wg sync.WaitGroup
-	wg.Add(tsoRequestConcurrencyNumber + 1)
+	checkTSO(ctx1, re, &wg, backendEndpoints)
+	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		r := rand.New(rand.NewSource(time.Now().UnixNano()))
 		for i := 0; i < 2; i++ {
 			n := r.Intn(2) + 1
 			time.Sleep(time.Duration(n) * time.Second)
@@ -278,32 +272,32 @@ }
 		}
 		cancel1()
 	}()
-	checkTSO(ctx1, re, &wg, backendEndpoints)
 	wg.Wait()
 }
 
 func checkTSO(ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, backendEndpoints string) {
+	wg.Add(tsoRequestConcurrencyNumber)
 	for i := 0; i < tsoRequestConcurrencyNumber; i++ {
 		go func() {
 			defer wg.Done()
-			cli := mcs.SetupClientWithKeyspace(context.Background(), re, strings.Split(backendEndpoints, ","))
+			cli := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ","))
 			var ts, lastTS uint64
 			for {
-				physical, logical, err := cli.GetTS(context.Background())
-				// omit the error check since there are many kinds of errors
-				if err == nil {
-					ts = tsoutil.ComposeTS(physical, logical)
-					re.Less(lastTS, ts)
-					lastTS = ts
-				}
 				select {
 				case <-ctx.Done():
-					physical, logical, _ := cli.GetTS(context.Background())
-					ts = tsoutil.ComposeTS(physical, logical)
-					re.Less(lastTS, ts)
+					// Make sure the lastTS is not empty
+					re.NotEmpty(lastTS)
 					return
 				default:
 				}
+				physical, logical, err := cli.GetTS(ctx)
+				// omit the error check since there are many kinds of errors
+				if err != nil {
+					continue
+				}
+				ts = tsoutil.ComposeTS(physical, logical)
+				re.Less(lastTS, ts)
+				lastTS = ts
 			}
 		}()
 	}
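A note on why re.Less(lastTS, ts) in checkTSO is a complete monotonicity check: tsoutil.ComposeTS packs the physical and logical parts of a TSO into a single ordered uint64, with the logical counter in the low bits. Roughly (an illustration of the layout, not PD's exact implementation):

	// composeTS sketches the packing tsoutil.ComposeTS performs;
	// PD reserves the low 18 bits for the logical counter.
	func composeTS(physical, logical int64) uint64 {
		return uint64(physical)<<18 + uint64(logical)
	}

Any advance of the physical clock or the logical counter strictly increases the composed value, so a single integer comparison catches a TSO regression across the resigns, shutdowns, and leader switches these tests exercise.
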
