Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso, server, tests: add a test case for tso allocation bug (#2675) #2678

Merged
merged 1 commit into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ type Server struct {
serverLoopCancel func()
serverLoopWg sync.WaitGroup

// leader lease
lease *member.LeaderLease
leaseMu sync.RWMutex

member *member.Member
// etcd client
client *clientv3.Client
Expand Down Expand Up @@ -640,6 +644,20 @@ func (s *Server) GetMember() *member.Member {
return s.member
}

// GetLease returns the lease of member and only leader server's lease is not nil.
func (s *Server) GetLease() *member.LeaderLease {
s.leaseMu.RLock()
defer s.leaseMu.RUnlock()
return s.lease
}

// SetLease changes the lease.
func (s *Server) SetLease(lease *member.LeaderLease) {
s.leaseMu.Lock()
defer s.leaseMu.Unlock()
s.lease = lease
}

// GetStorage returns the backend storage of server.
func (s *Server) GetStorage() *core.Storage {
return s.storage
Expand Down Expand Up @@ -1101,6 +1119,8 @@ func (s *Server) campaignLeader() {
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
go lease.KeepAlive(ctx)
s.SetLease(lease)
defer s.SetLease(nil)
log.Info("campaign leader ok", zap.String("campaign-leader-name", s.Name()))

log.Debug("sync timestamp for tso")
Expand Down
18 changes: 15 additions & 3 deletions server/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ func (t *TimestampOracle) checkLease() bool {
return t.lease != nil && !t.lease.IsExpired()
}

func (t *TimestampOracle) setLease(lease *member.LeaderLease) {
t.mu.Lock()
defer t.mu.Unlock()
t.lease = lease
}

// save timestamp, if lastTs is 0, we think the timestamp doesn't exist, so create it,
// otherwise, update it.
func (t *TimestampOracle) saveTimestamp(ts time.Time) error {
Expand All @@ -119,6 +125,12 @@ func (t *TimestampOracle) saveTimestamp(ts time.Time) error {
func (t *TimestampOracle) SyncTimestamp(lease *member.LeaderLease) error {
tsoCounter.WithLabelValues("sync").Inc()

t.setLease(lease)

failpoint.Inject("delaySyncTimestamp", func() {
time.Sleep(time.Second)
})

last, err := t.loadTimestamp()
if err != nil {
return err
Expand Down Expand Up @@ -148,9 +160,6 @@ func (t *TimestampOracle) SyncTimestamp(lease *member.LeaderLease) error {
current := &atomicObject{
physical: next,
}
t.mu.Lock()
t.lease = lease
t.mu.Unlock()
atomic.StorePointer(&t.ts, unsafe.Pointer(current))

return nil
Expand Down Expand Up @@ -264,6 +273,7 @@ func (t *TimestampOracle) ResetTimestamp() {
physical: typeutil.ZeroTime,
}
atomic.StorePointer(&t.ts, unsafe.Pointer(zero))
t.setLease(nil)
}

var maxRetryCount = 10
Expand All @@ -285,9 +295,11 @@ func (t *TimestampOracle) GetRespTS(count uint32) (pdpb.Timestamp, error) {
if current == nil || current.physical == typeutil.ZeroTime {
// If it's leader, maybe SyncTimestamp hasn't completed yet
if t.checkLease() {
log.Info("sync hasn't completed yet, wait for a while")
time.Sleep(200 * time.Millisecond)
continue
}
log.Error("invalid timestamp", zap.Any("timestamp", current))
return pdpb.Timestamp{}, errors.New("can not get timestamp, may be not leader")
}

Expand Down
14 changes: 14 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/id"
"github.com/pingcap/pd/v4/server/join"
"github.com/pingcap/pd/v4/server/member"
"github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
Expand Down Expand Up @@ -328,6 +329,19 @@ func (s *TestServer) BootstrapCluster() error {
return nil
}

// WaitLease is used to get leader lease.
// If it exceeds the maximum number of loops, it will return nil.
func (s *TestServer) WaitLease() *member.LeaderLease {
for i := 0; i < 100; i++ {
lease := s.server.GetLease()
if lease != nil {
return lease
}
time.Sleep(WaitLeaderCheckInterval)
}
return nil
}

// TestCluster is only for test.
type TestCluster struct {
config *clusterConfig
Expand Down
47 changes: 47 additions & 0 deletions tests/server/tso/tso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,53 @@ func (s *testTsoSuite) TestRequestFollower(c *C) {
c.Assert(time.Since(start), Less, time.Second)
}

// In some cases, when a TSO request arrives, the SyncTimestamp may not finish yet.
// This test is used to simulate this situation and verify that the retry mechanism.
func (s *testTsoSuite) TestDeplaySyncTimestamp(c *C) {
cluster, err := tests.NewTestCluster(s.ctx, 2)
c.Assert(err, IsNil)
defer cluster.Destroy()

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()

var leaderServer, nextLeaderServer *tests.TestServer
leaderServer = cluster.GetServer(cluster.GetLeader())
c.Assert(leaderServer, NotNil)
for _, s := range cluster.GetServers() {
if s.GetConfig().Name != cluster.GetLeader() {
nextLeaderServer = s
}
}
c.Assert(nextLeaderServer, NotNil)

grpcPDClient := testutil.MustNewGrpcClient(c, nextLeaderServer.GetAddr())
clusterID := nextLeaderServer.GetClusterID()
req := &pdpb.TsoRequest{
Header: testutil.NewRequestHeader(clusterID),
Count: 1,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

c.Assert(failpoint.Enable("github.com/pingcap/pd/v4/server/tso/delaySyncTimestamp", `return(true)`), IsNil)

// Make the old leader resign and wait for the new leader to get a lease
leaderServer.ResignLeader()
c.Assert(nextLeaderServer.WaitLease(), NotNil)

tsoClient, err := grpcPDClient.Tso(ctx)
c.Assert(err, IsNil)
defer tsoClient.CloseSend()
err = tsoClient.Send(req)
c.Assert(err, IsNil)
resp, err := tsoClient.Recv()
c.Assert(err, IsNil)
c.Assert(resp.GetCount(), Equals, uint32(1))
failpoint.Disable("github.com/pingcap/pd/v4/server/tso/delaySyncTimestamp")
}

var _ = Suite(&testTimeFallBackSuite{})

type testTimeFallBackSuite struct {
Expand Down