From 8d3c945bce6a7ab33573ecaa36faf1b6accfb6d2 Mon Sep 17 00:00:00 2001
From: disksing
Date: Tue, 7 Jul 2020 19:09:12 +0800
Subject: [PATCH 1/4] tso: make follower fail fast for tso requests

Signed-off-by: disksing
---
 server/tso/tso.go            |  5 +++++
 tests/server/tso/tso_test.go | 40 ++++++++++++++++++++++++++++++++++++
 2 files changed, 45 insertions(+)

diff --git a/server/tso/tso.go b/server/tso/tso.go
index 2cf2e559f0a..927462b3e55 100644
--- a/server/tso/tso.go
+++ b/server/tso/tso.go
@@ -270,6 +270,10 @@ func (t *TimestampOracle) GetRespTS(count uint32) (pdpb.Timestamp, error) {
 	})
 
 	for i := 0; i < maxRetryCount; i++ {
+		if t.lease == nil || t.lease.IsExpired() {
+			return pdpb.Timestamp{}, errors.New("alloc timestamp failed, lease expired")
+		}
+
 		current := (*atomicObject)(atomic.LoadPointer(&t.ts))
 		if current == nil || current.physical == typeutil.ZeroTime {
 			log.Error("we haven't synced timestamp ok, wait and retry", zap.Int("retry-count", i))
@@ -287,6 +291,7 @@ func (t *TimestampOracle) GetRespTS(count uint32) (pdpb.Timestamp, error) {
 			time.Sleep(UpdateTimestampStep)
 			continue
 		}
+		// double check in case lease expired after the first check.
 		if t.lease == nil || t.lease.IsExpired() {
 			return pdpb.Timestamp{}, errors.New("alloc timestamp failed, lease expired")
 		}
diff --git a/tests/server/tso/tso_test.go b/tests/server/tso/tso_test.go
index 974f198950c..4e0b243e82e 100644
--- a/tests/server/tso/tso_test.go
+++ b/tests/server/tso/tso_test.go
@@ -53,6 +53,46 @@ func (s *testTsoSuite) TearDownSuite(c *C) {
 	s.cancel()
 }
 
+func (s *testTsoSuite) TestRequestFollower(c *C) {
+	cluster, err := tests.NewTestCluster(s.ctx, 2)
+	c.Assert(err, IsNil)
+	defer cluster.Destroy()
+
+	err = cluster.RunInitialServers()
+	c.Assert(err, IsNil)
+	cluster.WaitLeader()
+
+	var followerServer *tests.TestServer
+	for _, s := range cluster.GetServers() {
+		if s.GetConfig().Name != cluster.GetLeader() {
+			followerServer = s
+		}
+	}
+	c.Assert(followerServer, NotNil)
+
+	grpcPDClient := testutil.MustNewGrpcClient(c, followerServer.GetAddr())
+	clusterID := followerServer.GetClusterID()
+	req := &pdpb.TsoRequest{
+		Header: testutil.NewRequestHeader(clusterID),
+		Count:  1,
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	tsoClient, err := grpcPDClient.Tso(ctx)
+	c.Assert(err, IsNil)
+	defer tsoClient.CloseSend()
+
+	start := time.Now()
+	err = tsoClient.Send(req)
+	c.Assert(err, IsNil)
+	_, err = tsoClient.Recv()
+	c.Assert(err, NotNil)
+
+	// Requesting follower should fail fast, or the unavailable time will be
+	// too long.
+	c.Assert(time.Since(start), Less, time.Second)
+}
+
 func (s *testTsoSuite) testGetTimestamp(c *C, n int) *pdpb.Timestamp {
 	var err error
 	cluster, err := tests.NewTestCluster(s.ctx, 1)

From 21fbf7aea78d2faa32cd0cb1b171a4c1148fc873 Mon Sep 17 00:00:00 2001
From: disksing
Date: Wed, 8 Jul 2020 16:20:03 +0800
Subject: [PATCH 2/4] another method

Signed-off-by: disksing
---
 server/tso/tso.go | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/server/tso/tso.go b/server/tso/tso.go
index 927462b3e55..93e48ac5ca6 100644
--- a/server/tso/tso.go
+++ b/server/tso/tso.go
@@ -15,6 +15,7 @@ package tso
 
 import (
 	"path"
+	"sync"
 	"sync/atomic"
 	"time"
 	"unsafe"
@@ -44,7 +45,9 @@ type TimestampOracle struct {
 	// For tso, set after pd becomes leader.
 	ts            unsafe.Pointer
 	lastSavedTime atomic.Value
-	lease         *member.LeaderLease
+
+	mu    sync.RWMutex
+	lease *member.LeaderLease
 
 	rootPath string
 	member   string
@@ -255,7 +258,7 @@ func (t *TimestampOracle) ResetTimestamp() {
 	atomic.StorePointer(&t.ts, unsafe.Pointer(zero))
 }
 
-var maxRetryCount = 100
+var maxRetryCount = 10
 
 // GetRespTS is used to get a timestamp.
 func (t *TimestampOracle) GetRespTS(count uint32) (pdpb.Timestamp, error) {
@@ -270,15 +273,9 @@ func (t *TimestampOracle) GetRespTS(count uint32) (pdpb.Timestamp, error) {
 	})
 
 	for i := 0; i < maxRetryCount; i++ {
-		if t.lease == nil || t.lease.IsExpired() {
-			return pdpb.Timestamp{}, errors.New("alloc timestamp failed, lease expired")
-		}
-
 		current := (*atomicObject)(atomic.LoadPointer(&t.ts))
 		if current == nil || current.physical == typeutil.ZeroTime {
-			log.Error("we haven't synced timestamp ok, wait and retry", zap.Int("retry-count", i))
-			time.Sleep(200 * time.Millisecond)
-			continue
+			return pdpb.Timestamp{}, errors.New("alloc timestamp failed, may be not leader")
 		}
 
 		resp.Physical = current.physical.UnixNano() / int64(time.Millisecond)
@@ -291,7 +288,7 @@ func (t *TimestampOracle) GetRespTS(count uint32) (pdpb.Timestamp, error) {
 			time.Sleep(UpdateTimestampStep)
 			continue
 		}
-		// double check in case lease expired after the first check.
+		// In case lease expired after the first check.
 		if t.lease == nil || t.lease.IsExpired() {
 			return pdpb.Timestamp{}, errors.New("alloc timestamp failed, lease expired")
 		}

From 08b7141547f64d36adfa04e84329dd5d6461d40a Mon Sep 17 00:00:00 2001
From: disksing
Date: Wed, 8 Jul 2020 16:23:37 +0800
Subject: [PATCH 3/4] minor update

Signed-off-by: disksing
---
 server/tso/tso.go            |  2 --
 tests/server/tso/tso_test.go | 80 ++++++++++++++++++------------------
 2 files changed, 40 insertions(+), 42 deletions(-)

diff --git a/server/tso/tso.go b/server/tso/tso.go
index 93e48ac5ca6..a7b645773ae 100644
--- a/server/tso/tso.go
+++ b/server/tso/tso.go
@@ -15,7 +15,6 @@ package tso
 
 import (
 	"path"
-	"sync"
 	"sync/atomic"
 	"time"
 	"unsafe"
@@ -46,7 +45,6 @@ type TimestampOracle struct {
 	ts            unsafe.Pointer
 	lastSavedTime atomic.Value
 
-	mu    sync.RWMutex
 	lease *member.LeaderLease
 
 	rootPath string
diff --git a/tests/server/tso/tso_test.go b/tests/server/tso/tso_test.go
index 4e0b243e82e..4fec5331ffd 100644
--- a/tests/server/tso/tso_test.go
+++ b/tests/server/tso/tso_test.go
@@ -53,46 +53,6 @@ func (s *testTsoSuite) TearDownSuite(c *C) {
 	s.cancel()
 }
 
-func (s *testTsoSuite) TestRequestFollower(c *C) {
-	cluster, err := tests.NewTestCluster(s.ctx, 2)
-	c.Assert(err, IsNil)
-	defer cluster.Destroy()
-
-	err = cluster.RunInitialServers()
-	c.Assert(err, IsNil)
-	cluster.WaitLeader()
-
-	var followerServer *tests.TestServer
-	for _, s := range cluster.GetServers() {
-		if s.GetConfig().Name != cluster.GetLeader() {
-			followerServer = s
-		}
-	}
-	c.Assert(followerServer, NotNil)
-
-	grpcPDClient := testutil.MustNewGrpcClient(c, followerServer.GetAddr())
-	clusterID := followerServer.GetClusterID()
-	req := &pdpb.TsoRequest{
-		Header: testutil.NewRequestHeader(clusterID),
-		Count:  1,
-	}
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	tsoClient, err := grpcPDClient.Tso(ctx)
-	c.Assert(err, IsNil)
-	defer tsoClient.CloseSend()
-
-	start := time.Now()
-	err = tsoClient.Send(req)
-	c.Assert(err, IsNil)
-	_, err = tsoClient.Recv()
-	c.Assert(err, NotNil)
-
-	// Requesting follower should fail fast, or the unavailable time will be
-	// too long.
-	c.Assert(time.Since(start), Less, time.Second)
-}
-
 func (s *testTsoSuite) testGetTimestamp(c *C, n int) *pdpb.Timestamp {
 	var err error
 	cluster, err := tests.NewTestCluster(s.ctx, 1)
@@ -210,6 +170,46 @@ func (s *testTsoSuite) TestTsoCount0(c *C) {
 	c.Assert(err, NotNil)
 }
 
+func (s *testTsoSuite) TestRequestFollower(c *C) {
+	cluster, err := tests.NewTestCluster(s.ctx, 2)
+	c.Assert(err, IsNil)
+	defer cluster.Destroy()
+
+	err = cluster.RunInitialServers()
+	c.Assert(err, IsNil)
+	cluster.WaitLeader()
+
+	var followerServer *tests.TestServer
+	for _, s := range cluster.GetServers() {
+		if s.GetConfig().Name != cluster.GetLeader() {
+			followerServer = s
+		}
+	}
+	c.Assert(followerServer, NotNil)
+
+	grpcPDClient := testutil.MustNewGrpcClient(c, followerServer.GetAddr())
+	clusterID := followerServer.GetClusterID()
+	req := &pdpb.TsoRequest{
+		Header: testutil.NewRequestHeader(clusterID),
+		Count:  1,
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	tsoClient, err := grpcPDClient.Tso(ctx)
+	c.Assert(err, IsNil)
+	defer tsoClient.CloseSend()
+
+	start := time.Now()
+	err = tsoClient.Send(req)
+	c.Assert(err, IsNil)
+	_, err = tsoClient.Recv()
+	c.Assert(err, NotNil)
+
+	// Requesting follower should fail fast, or the unavailable time will be
+	// too long.
+	c.Assert(time.Since(start), Less, time.Second)
+}
+
 var _ = Suite(&testTimeFallBackSuite{})
 
 type testTimeFallBackSuite struct {

From d3d4a94ada682dc836dc9820e90c787b3730fafa Mon Sep 17 00:00:00 2001
From: disksing
Date: Wed, 8 Jul 2020 16:58:56 +0800
Subject: [PATCH 4/4] minor update

Signed-off-by: disksing
---
 server/tso/tso.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/tso/tso.go b/server/tso/tso.go
index a7b645773ae..578bab1f2b6 100644
--- a/server/tso/tso.go
+++ b/server/tso/tso.go
@@ -273,7 +273,7 @@ func (t *TimestampOracle) GetRespTS(count uint32) (pdpb.Timestamp, error) {
 	for i := 0; i < maxRetryCount; i++ {
 		current := (*atomicObject)(atomic.LoadPointer(&t.ts))
 		if current == nil || current.physical == typeutil.ZeroTime {
-			return pdpb.Timestamp{}, errors.New("alloc timestamp failed, may be not leader")
+			return pdpb.Timestamp{}, errors.New("can not get timestamp, may be not leader")
 		}
 
 		resp.Physical = current.physical.UnixNano() / int64(time.Millisecond)