diff --git a/server/tso/tso.go b/server/tso/tso.go
index 2cf2e559f0a..578bab1f2b6 100644
--- a/server/tso/tso.go
+++ b/server/tso/tso.go
@@ -44,7 +44,8 @@ type TimestampOracle struct {
 	// For tso, set after pd becomes leader.
 	ts            unsafe.Pointer
 	lastSavedTime atomic.Value
-	lease         *member.LeaderLease
+
+	lease *member.LeaderLease
 
 	rootPath string
 	member   string
@@ -255,7 +256,7 @@ func (t *TimestampOracle) ResetTimestamp() {
 	atomic.StorePointer(&t.ts, unsafe.Pointer(zero))
 }
 
-var maxRetryCount = 100
+var maxRetryCount = 10
 
 // GetRespTS is used to get a timestamp.
 func (t *TimestampOracle) GetRespTS(count uint32) (pdpb.Timestamp, error) {
@@ -272,9 +273,7 @@ func (t *TimestampOracle) GetRespTS(count uint32) (pdpb.Timestamp, error) {
 	for i := 0; i < maxRetryCount; i++ {
 		current := (*atomicObject)(atomic.LoadPointer(&t.ts))
 		if current == nil || current.physical == typeutil.ZeroTime {
-			log.Error("we haven't synced timestamp ok, wait and retry", zap.Int("retry-count", i))
-			time.Sleep(200 * time.Millisecond)
-			continue
+			return pdpb.Timestamp{}, errors.New("can not get timestamp, may be not leader")
 		}
 
 		resp.Physical = current.physical.UnixNano() / int64(time.Millisecond)
@@ -287,6 +286,7 @@ func (t *TimestampOracle) GetRespTS(count uint32) (pdpb.Timestamp, error) {
 			time.Sleep(UpdateTimestampStep)
 			continue
 		}
+		// In case lease expired after the first check.
 		if t.lease == nil || t.lease.IsExpired() {
 			return pdpb.Timestamp{}, errors.New("alloc timestamp failed, lease expired")
 		}
diff --git a/tests/server/tso/tso_test.go b/tests/server/tso/tso_test.go
index 974f198950c..4fec5331ffd 100644
--- a/tests/server/tso/tso_test.go
+++ b/tests/server/tso/tso_test.go
@@ -170,6 +170,46 @@ func (s *testTsoSuite) TestTsoCount0(c *C) {
 	c.Assert(err, NotNil)
 }
 
+func (s *testTsoSuite) TestRequestFollower(c *C) {
+	cluster, err := tests.NewTestCluster(s.ctx, 2)
+	c.Assert(err, IsNil)
+	defer cluster.Destroy()
+
+	err = cluster.RunInitialServers()
+	c.Assert(err, IsNil)
+	cluster.WaitLeader()
+
+	var followerServer *tests.TestServer
+	for _, s := range cluster.GetServers() {
+		if s.GetConfig().Name != cluster.GetLeader() {
+			followerServer = s
+		}
+	}
+	c.Assert(followerServer, NotNil)
+
+	grpcPDClient := testutil.MustNewGrpcClient(c, followerServer.GetAddr())
+	clusterID := followerServer.GetClusterID()
+	req := &pdpb.TsoRequest{
+		Header: testutil.NewRequestHeader(clusterID),
+		Count:  1,
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	tsoClient, err := grpcPDClient.Tso(ctx)
+	c.Assert(err, IsNil)
+	defer tsoClient.CloseSend()
+
+	start := time.Now()
+	err = tsoClient.Send(req)
+	c.Assert(err, IsNil)
+	_, err = tsoClient.Recv()
+	c.Assert(err, NotNil)
+
+	// Requesting follower should fail fast, or the unavailable time will be
+	// too long.
+	c.Assert(time.Since(start), Less, time.Second)
+}
+
 var _ = Suite(&testTimeFallBackSuite{})
 
 type testTimeFallBackSuite struct {