From bb3953a54e367a9316bca4d8151f41e4b04ce640 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Tue, 1 Nov 2022 19:54:00 +0800
Subject: [PATCH] client: fix Stream timeout logic (#5551) (#5583)

close tikv/pd#5207, ref tikv/pd#5551

fix Stream timeout logic

Signed-off-by: ti-chi-bot
Signed-off-by: Cabinfever_B
Co-authored-by: Yongbo Jiang
Co-authored-by: Cabinfever_B
---
 client/client.go            | 42 ++++++++++++-----------
 tests/client/client_test.go | 66 +++++++++++++++++++++++++++++++++++++
 2 files changed, 89 insertions(+), 19 deletions(-)

diff --git a/client/client.go b/client/client.go
index 46eab397412..18d170e8bed 100644
--- a/client/client.go
+++ b/client/client.go
@@ -280,8 +280,8 @@ const (
     updateMemberTimeout    = time.Second // Use a shorter timeout to recover faster from network isolation.
     tsLoopDCCheckInterval  = time.Minute
     defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst
-    retryInterval          = 1 * time.Second
-    maxRetryTimes          = 5
+    retryInterval          = 500 * time.Millisecond
+    maxRetryTimes          = 6
 )
 
 // LeaderHealthCheckInterval might be changed in the unit to shorten the testing time.
@@ -653,11 +653,10 @@ func (c *client) handleDispatcher(
     dc string,
     tbc *tsoBatchController) {
     var (
-        retryTimeConsuming time.Duration
-        err                error
-        streamAddr         string
-        stream             pdpb.PD_TsoClient
-        cancel             context.CancelFunc
+        err        error
+        streamAddr string
+        stream     pdpb.PD_TsoClient
+        cancel     context.CancelFunc
         // addr -> connectionContext
         connectionCtxs sync.Map
         opts           []opentracing.StartSpanOption
@@ -714,6 +713,7 @@ func (c *client) handleDispatcher(
     }
 
     // Loop through each batch of TSO requests and send them for processing.
+    streamLoopTimer := time.NewTimer(c.option.timeout)
     for {
         select {
         case <-dispatcherCtx.Done():
@@ -728,24 +728,22 @@ func (c *client) handleDispatcher(
         // Check stream and retry if necessary.
         if stream == nil {
             log.Info("[pd] tso stream is not ready", zap.String("dc", dc))
-            c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs)
-            if retryTimeConsuming >= c.option.timeout {
-                err = errs.ErrClientCreateTSOStream.FastGenByArgs()
-                log.Error("[pd] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
-                c.ScheduleCheckLeader()
-                c.revokeTSORequest(errors.WithStack(err), tbc.tsoRequestCh)
-                retryTimeConsuming = 0
+            if c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) {
                 continue
             }
             select {
             case <-dispatcherCtx.Done():
                 return
-            case <-time.After(time.Second):
-                retryTimeConsuming += time.Second
+            case <-streamLoopTimer.C:
+                err = errs.ErrClientCreateTSOStream.FastGenByArgs()
+                log.Error("[pd] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
+                c.ScheduleCheckLeader()
+                c.revokeTSORequest(errors.WithStack(err), tbc.tsoRequestCh)
+                continue
+            case <-time.After(retryInterval):
                 continue
             }
         }
-        retryTimeConsuming = 0
         // Start to collect the TSO requests.
         maxBatchWaitInterval := c.option.getMaxTSOBatchWaitInterval()
         if err = tbc.fetchPendingRequests(dispatcherCtx, maxBatchWaitInterval); err != nil {
@@ -835,7 +833,7 @@ type connectionContext struct {
     cancel context.CancelFunc
 }
 
-func (c *client) updateConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) {
+func (c *client) updateConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool {
     // Normal connection creating, it will be affected by the `enableForwarding`.
     createTSOConnection := c.tryConnect
     if c.allowTSOFollowerProxy(dc) {
@@ -843,7 +841,9 @@ func (c *client) updateConnectionCtxs(updaterCtx context.Context, dc string, con
     }
     if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil {
         log.Error("[pd] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err))
+        return false
     }
+    return true
 }
 
 // tryConnect will try to connect to the TSO allocator leader. If the connection becomes unreachable
@@ -859,6 +859,8 @@ func (c *client) tryConnect(
         networkErrNum uint64
         err           error
         stream        pdpb.PD_TsoClient
+        url           string
+        cc            *grpc.ClientConn
     )
     updateAndClear := func(newAddr string, connectionCtx *connectionContext) {
         if cc, loaded := connectionCtxs.LoadOrStore(newAddr, connectionCtx); loaded {
@@ -874,9 +876,11 @@ func (c *client) tryConnect(
             return true
         })
     }
-    cc, url := c.getAllocatorClientConnByDCLocation(dc)
     // retry several times before falling back to the follower when the network problem happens
+
     for i := 0; i < maxRetryTimes; i++ {
+        c.ScheduleCheckLeader()
+        cc, url = c.getAllocatorClientConnByDCLocation(dc)
         cctx, cancel := context.WithCancel(dispatcherCtx)
         stream, err = c.createTsoStream(cctx, cancel, pdpb.NewPDClient(cc))
         failpoint.Inject("unreachableNetwork", func() {
diff --git a/tests/client/client_test.go b/tests/client/client_test.go
index b3f5f6df15e..7f952a4bacb 100644
--- a/tests/client/client_test.go
+++ b/tests/client/client_test.go
@@ -316,6 +316,72 @@ func (s *clientTestSuite) TestTSOFollowerProxy(c *C) {
     wg.Wait()
 }
 
+// TestUnavailableTimeAfterLeaderIsReady is used to test https://github.com/tikv/pd/issues/5207
+func (s *clientTestSuite) TestUnavailableTimeAfterLeaderIsReady(c *C) {
+    cluster, err := tests.NewTestCluster(s.ctx, 3)
+    c.Assert(err, IsNil)
+    defer cluster.Destroy()
+
+    endpoints := s.runServer(c, cluster)
+    cli := setupCli(c, s.ctx, endpoints)
+
+    var wg sync.WaitGroup
+    var maxUnavailableTime, leaderReadyTime time.Time
+    getTsoFunc := func() {
+        defer wg.Done()
+        var lastTS uint64
+        for i := 0; i < tsoRequestRound; i++ {
+            var physical, logical int64
+            var ts uint64
+            physical, logical, err = cli.GetTS(context.Background())
+            ts = tsoutil.ComposeTS(physical, logical)
+            if err != nil {
+                maxUnavailableTime = time.Now()
+                continue
+            }
+            c.Assert(err, IsNil)
+            c.Assert(lastTS, Less, ts)
+            lastTS = ts
+        }
+    }
+
+    // test resign pd leader or stop pd leader
+    wg.Add(1 + 1)
+    go getTsoFunc()
+    go func() {
+        defer wg.Done()
+        leader := cluster.GetServer(cluster.GetLeader())
+        leader.Stop()
+        cluster.WaitLeader()
+        leaderReadyTime = time.Now()
+        cluster.RunServers([]*tests.TestServer{leader})
+    }()
+    wg.Wait()
+    c.Assert(maxUnavailableTime.Unix(), LessEqual, leaderReadyTime.Add(1*time.Second).Unix())
+    if maxUnavailableTime.Unix() == leaderReadyTime.Add(1*time.Second).Unix() {
+        c.Assert(maxUnavailableTime.Nanosecond(), Less, leaderReadyTime.Add(1*time.Second).Nanosecond())
+    }
+
+    // test kill pd leader pod or network of leader is unreachable
+    wg.Add(1 + 1)
+    maxUnavailableTime, leaderReadyTime = time.Time{}, time.Time{}
+    go getTsoFunc()
+    go func() {
+        defer wg.Done()
+        leader := cluster.GetServer(cluster.GetLeader())
+        c.Assert(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"), IsNil)
+        leader.Stop()
+        cluster.WaitLeader()
+        c.Assert(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork"), IsNil)
+        leaderReadyTime = time.Now()
+    }()
+    wg.Wait()
+    c.Assert(maxUnavailableTime.Unix(), LessEqual, leaderReadyTime.Add(1*time.Second).Unix())
+    if maxUnavailableTime.Unix() == leaderReadyTime.Add(1*time.Second).Unix() {
+        c.Assert(maxUnavailableTime.Nanosecond(), Less, leaderReadyTime.Add(1*time.Second).Nanosecond())
+    }
+}
+
 func (s *clientTestSuite) TestGlobalAndLocalTSO(c *C) {
     dcLocationConfig := map[string]string{
         "pd1": "dc-1",
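
For readers skimming the patch, the heart of the fix is the dispatcher's stream-creation retry loop: instead of accumulating a retryTimeConsuming counter in one-second steps, a single streamLoopTimer bounds the whole wait at c.option.timeout while the loop retries every retryInterval (now 500ms) and schedules a leader check on each attempt. The standalone Go sketch below illustrates that timer-bounded retry pattern; it is not the pd client itself, and it simplifies the timeout branch to return an error instead of revoking pending TSO requests and continuing as the real dispatcher does. clientTimeout, tryCreateStream, and dispatch are illustrative stand-ins, not pd APIs.

// Minimal sketch (not the pd client itself) of the timer-bounded retry pattern
// introduced above: one timer caps the total wait for a usable TSO stream,
// while the loop keeps retrying every retryInterval.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

const (
	retryInterval = 500 * time.Millisecond // matches the new constant in client.go
	clientTimeout = 3 * time.Second        // stand-in for c.option.timeout
)

// tryCreateStream is an illustrative stand-in for updateConnectionCtxs /
// createTsoStream; it always fails here so the timeout branch is exercised.
func tryCreateStream(ctx context.Context) bool {
	return false
}

// dispatch mirrors the shape of the stream-retry part of handleDispatcher:
// retry quickly, but give up once the overall timer fires.
func dispatch(ctx context.Context) error {
	streamLoopTimer := time.NewTimer(clientTimeout)
	defer streamLoopTimer.Stop()
	for {
		if tryCreateStream(ctx) {
			return nil // stream ready; the real code goes on to batch TSO requests
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-streamLoopTimer.C:
			// Total wait exceeded the client timeout. The real dispatcher logs
			// ErrClientCreateTSOStream, schedules a leader check, revokes the
			// pending requests, and keeps looping; simplified to a return here.
			return errors.New("create tso stream timed out")
		case <-time.After(retryInterval):
			// Retry soon; the 500ms step recovers faster than the previous 1s step.
		}
	}
}

func main() {
	fmt.Println(dispatch(context.Background())) // prints the timeout error after ~3s
}

The shorter retry interval plus the per-attempt leader check is what lets the client recover quickly once a new leader is ready, which is what TestUnavailableTimeAfterLeaderIsReady asserts: the last observed unavailability must not outlast leader readiness by more than about one second.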