diff --git a/client/client.go b/client/client.go
index 07598910af4..8cabcc2b3ed 100644
--- a/client/client.go
+++ b/client/client.go
@@ -320,8 +320,8 @@ const (
 	updateMemberTimeout    = time.Second // Use a shorter timeout to recover faster from network isolation.
 	tsLoopDCCheckInterval  = time.Minute
 	defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst
-	retryInterval          = 1 * time.Second
-	maxRetryTimes          = 5
+	retryInterval          = 500 * time.Millisecond
+	maxRetryTimes          = 6
 )
 
 // LeaderHealthCheckInterval might be changed in the unit to shorten the testing time.
@@ -694,12 +694,11 @@ func (c *client) handleDispatcher(
 	dc string,
 	tbc *tsoBatchController) {
 	var (
-		retryTimeConsuming time.Duration
-		err                error
-		streamAddr         string
-		stream             pdpb.PD_TsoClient
-		streamCtx          context.Context
-		cancel             context.CancelFunc
+		err        error
+		streamAddr string
+		stream     pdpb.PD_TsoClient
+		streamCtx  context.Context
+		cancel     context.CancelFunc
 		// addr -> connectionContext
 		connectionCtxs sync.Map
 		opts           []opentracing.StartSpanOption
@@ -756,6 +755,7 @@ func (c *client) handleDispatcher(
 	}
 
 	// Loop through each batch of TSO requests and send them for processing.
+	streamLoopTimer := time.NewTimer(c.option.timeout)
 tsoBatchLoop:
 	for {
 		select {
@@ -778,6 +778,7 @@ tsoBatchLoop:
 		if maxBatchWaitInterval >= 0 {
 			tbc.adjustBestBatchSize()
 		}
+		streamLoopTimer.Reset(c.option.timeout)
 		// Choose a stream to send the TSO gRPC request.
 	streamChoosingLoop:
 		for {
@@ -788,24 +789,22 @@ tsoBatchLoop:
 			// Check stream and retry if necessary.
 			if stream == nil {
 				log.Info("[pd] tso stream is not ready", zap.String("dc", dc))
-				c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs)
-				if retryTimeConsuming >= c.option.timeout {
-					err = errs.ErrClientCreateTSOStream.FastGenByArgs("retry timeout")
-					log.Error("[pd] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
-					c.ScheduleCheckLeader()
-					c.finishTSORequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
-					retryTimeConsuming = 0
-					continue tsoBatchLoop
+				if c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) {
+					continue streamChoosingLoop
 				}
 				select {
 				case <-dispatcherCtx.Done():
 					return
-				case <-time.After(time.Second):
-					retryTimeConsuming += time.Second
-					continue
+				case <-streamLoopTimer.C:
+					err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr)
+					log.Error("[pd] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
+					c.ScheduleCheckLeader()
+					c.finishTSORequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
+					continue tsoBatchLoop
+				case <-time.After(retryInterval):
+					continue streamChoosingLoop
 				}
 			}
-			retryTimeConsuming = 0
 			select {
 			case <-streamCtx.Done():
 				log.Info("[pd] tso stream is canceled", zap.String("dc", dc), zap.String("stream-addr", streamAddr))
@@ -899,7 +898,7 @@ type connectionContext struct {
 	cancel context.CancelFunc
 }
 
-func (c *client) updateConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) {
+func (c *client) updateConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool {
 	// Normal connection creating, it will be affected by the `enableForwarding`.
 	createTSOConnection := c.tryConnect
 	if c.allowTSOFollowerProxy(dc) {
@@ -907,7 +906,9 @@ func (c *client) updateConnectionCtxs(updaterCtx context.Context, dc string, con
 	}
 	if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil {
 		log.Error("[pd] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err))
+		return false
 	}
+	return true
 }
 
 // tryConnect will try to connect to the TSO allocator leader. If the connection becomes unreachable
@@ -923,6 +924,8 @@ func (c *client) tryConnect(
 		networkErrNum uint64
 		err           error
 		stream        pdpb.PD_TsoClient
+		url           string
+		cc            *grpc.ClientConn
 	)
 	updateAndClear := func(newAddr string, connectionCtx *connectionContext) {
 		if cc, loaded := connectionCtxs.LoadOrStore(newAddr, connectionCtx); loaded {
@@ -938,9 +941,11 @@ func (c *client) tryConnect(
 			return true
 		})
 	}
-	cc, url := c.getAllocatorClientConnByDCLocation(dc)
 	// retry several times before falling back to the follower when the network problem happens
+
 	for i := 0; i < maxRetryTimes; i++ {
+		c.ScheduleCheckLeader()
+		cc, url = c.getAllocatorClientConnByDCLocation(dc)
 		cctx, cancel := context.WithCancel(dispatcherCtx)
 		stream, err = c.createTsoStream(cctx, cancel, pdpb.NewPDClient(cc))
 		failpoint.Inject("unreachableNetwork", func() {
diff --git a/client/errs/errno.go b/client/errs/errno.go
index 118b92f4127..f42128f7d99 100644
--- a/client/errs/errno.go
+++ b/client/errs/errno.go
@@ -21,6 +21,7 @@ const (
 	NotLeaderErr = "is not leader"
 	// MismatchLeaderErr indicates the the non-leader member received the requests which should be received by leader.
 	MismatchLeaderErr = "mismatch leader id"
+	RetryTimeoutErr   = "retry timeout"
 )
 
 // client errors
diff --git a/tests/client/client_test.go b/tests/client/client_test.go
index c4c6c25d50e..0beccf8a505 100644
--- a/tests/client/client_test.go
+++ b/tests/client/client_test.go
@@ -317,7 +317,73 @@ func (s *clientTestSuite) TestTSOFollowerProxy(c *C) {
 	wg.Wait()
 }
 
-func (s *clientTestSuite) TestGlobalAndLocalTSO(c *C) {
+// TestUnavailableTimeAfterLeaderIsReady is used to test https://github.com/tikv/pd/issues/5207
+func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	cluster, err := tests.NewTestCluster(ctx, 3)
+	re.NoError(err)
+	defer cluster.Destroy()
+
+	endpoints := runServer(re, cluster)
+	cli := setupCli(re, ctx, endpoints)
+
+	var wg sync.WaitGroup
+	var maxUnavailableTime, leaderReadyTime time.Time
+	getTsoFunc := func() {
+		defer wg.Done()
+		var lastTS uint64
+		for i := 0; i < tsoRequestRound; i++ {
+			var physical, logical int64
+			var ts uint64
+			physical, logical, err = cli.GetTS(context.Background())
+			ts = tsoutil.ComposeTS(physical, logical)
+			if err != nil {
+				maxUnavailableTime = time.Now()
+				continue
+			}
+			re.NoError(err)
+			re.Less(lastTS, ts)
+			lastTS = ts
+		}
+	}
+
+	// test resign pd leader or stop pd leader
+	wg.Add(1 + 1)
+	go getTsoFunc()
+	go func() {
+		defer wg.Done()
+		leader := cluster.GetServer(cluster.GetLeader())
+		leader.Stop()
+		cluster.WaitLeader()
+		leaderReadyTime = time.Now()
+		cluster.RunServers([]*tests.TestServer{leader})
+	}()
+	wg.Wait()
+	re.Less(maxUnavailableTime.UnixMilli(), leaderReadyTime.Add(1*time.Second).UnixMilli())
+
+	// test kill pd leader pod or network of leader is unreachable
+	wg.Add(1 + 1)
+	maxUnavailableTime, leaderReadyTime = time.Time{}, time.Time{}
+	go getTsoFunc()
+	go func() {
+		defer wg.Done()
+		leader := cluster.GetServer(cluster.GetLeader())
+		re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"))
+		leader.Stop()
+		cluster.WaitLeader()
+		re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork"))
+		leaderReadyTime = time.Now()
+	}()
+	wg.Wait()
+	re.Less(maxUnavailableTime.UnixMilli(), leaderReadyTime.Add(1*time.Second).UnixMilli())
+}
+
+func TestGlobalAndLocalTSO(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 	dcLocationConfig := map[string]string{
 		"pd1": "dc-1",
 		"pd2": "dc-2",