client: fix Stream timeout logic (tikv#5551) (tikv#5583)
close tikv#5207, ref tikv#5551

fix Stream timeout logic

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

Co-authored-by: Yongbo Jiang <cabinfeveroier@gmail.com>
Co-authored-by: Cabinfever_B <cabinfeveroier@gmail.com>
Authored by ti-chi-bot and CabinfeverB on Nov 1, 2022
Parent: a66761b · Commit: bb3953a
Showing 2 changed files with 89 additions and 19 deletions.
42 changes: 23 additions & 19 deletions client/client.go
@@ -280,8 +280,8 @@ const (
     updateMemberTimeout    = time.Second // Use a shorter timeout to recover faster from network isolation.
     tsLoopDCCheckInterval  = time.Minute
     defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst
-    retryInterval          = 1 * time.Second
-    maxRetryTimes          = 5
+    retryInterval          = 500 * time.Millisecond
+    maxRetryTimes          = 6
 )
 
 // LeaderHealthCheckInterval might be changed in the unit to shorten the testing time.
@@ -653,11 +653,10 @@ func (c *client) handleDispatcher(
     dc string,
     tbc *tsoBatchController) {
     var (
-        retryTimeConsuming time.Duration
-        err                error
-        streamAddr         string
-        stream             pdpb.PD_TsoClient
-        cancel             context.CancelFunc
+        err        error
+        streamAddr string
+        stream     pdpb.PD_TsoClient
+        cancel     context.CancelFunc
         // addr -> connectionContext
         connectionCtxs sync.Map
         opts           []opentracing.StartSpanOption
@@ -714,6 +713,7 @@ func (c *client) handleDispatcher(
     }
 
     // Loop through each batch of TSO requests and send them for processing.
+    streamLoopTimer := time.NewTimer(c.option.timeout)
     for {
         select {
         case <-dispatcherCtx.Done():
@@ -728,24 +728,22 @@
         // Check stream and retry if necessary.
         if stream == nil {
             log.Info("[pd] tso stream is not ready", zap.String("dc", dc))
-            c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs)
-            if retryTimeConsuming >= c.option.timeout {
-                err = errs.ErrClientCreateTSOStream.FastGenByArgs()
-                log.Error("[pd] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
-                c.ScheduleCheckLeader()
-                c.revokeTSORequest(errors.WithStack(err), tbc.tsoRequestCh)
-                retryTimeConsuming = 0
+            if c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) {
                 continue
             }
             select {
             case <-dispatcherCtx.Done():
                 return
-            case <-time.After(time.Second):
-                retryTimeConsuming += time.Second
+            case <-streamLoopTimer.C:
+                err = errs.ErrClientCreateTSOStream.FastGenByArgs()
+                log.Error("[pd] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
+                c.ScheduleCheckLeader()
+                c.revokeTSORequest(errors.WithStack(err), tbc.tsoRequestCh)
+                continue
+            case <-time.After(retryInterval):
                 continue
             }
         }
-        retryTimeConsuming = 0
         // Start to collect the TSO requests.
         maxBatchWaitInterval := c.option.getMaxTSOBatchWaitInterval()
         if err = tbc.fetchPendingRequests(dispatcherCtx, maxBatchWaitInterval); err != nil {
@@ -835,15 +833,17 @@ type connectionContext struct {
     cancel context.CancelFunc
 }
 
-func (c *client) updateConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) {
+func (c *client) updateConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool {
     // Normal connection creating, it will be affected by the `enableForwarding`.
     createTSOConnection := c.tryConnect
     if c.allowTSOFollowerProxy(dc) {
         createTSOConnection = c.tryConnectWithProxy
     }
     if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil {
         log.Error("[pd] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err))
+        return false
     }
+    return true
 }
 
 // tryConnect will try to connect to the TSO allocator leader. If the connection becomes unreachable
@@ -859,6 +859,8 @@ func (c *client) tryConnect(
         networkErrNum uint64
         err           error
         stream        pdpb.PD_TsoClient
+        url           string
+        cc            *grpc.ClientConn
     )
     updateAndClear := func(newAddr string, connectionCtx *connectionContext) {
         if cc, loaded := connectionCtxs.LoadOrStore(newAddr, connectionCtx); loaded {
@@ -874,9 +876,11 @@
             return true
         })
     }
-    cc, url := c.getAllocatorClientConnByDCLocation(dc)
     // retry several times before falling back to the follower when the network problem happens
+
     for i := 0; i < maxRetryTimes; i++ {
+        c.ScheduleCheckLeader()
+        cc, url = c.getAllocatorClientConnByDCLocation(dc)
         cctx, cancel := context.WithCancel(dispatcherCtx)
         stream, err = c.createTsoStream(cctx, cancel, pdpb.NewPDClient(cc))
         failpoint.Inject("unreachableNetwork", func() {
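For readers skimming the change: the old code counted only the one-second sleeps into retryTimeConsuming, so wall-clock time spent inside updateConnectionCtxs and createTsoStream never reduced the budget and the client could stay unavailable well past c.option.timeout. The new code arms streamLoopTimer once for the whole stream-creation attempt, lets updateConnectionCtxs report success so the loop can continue immediately, and otherwise retries every retryInterval (now 500ms) until the timer fires. The following standalone Go sketch illustrates that pattern; it is not the PD client code — connect, overallTimeout, and main are stand-ins for createTsoStream/updateConnectionCtxs, c.option.timeout, and the dispatcher loop.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// Stand-in values; the real client uses c.option.timeout and the
// retryInterval constant changed to 500ms in this commit.
const (
    overallTimeout = 3 * time.Second
    retryInterval  = 500 * time.Millisecond
)

// connect is a placeholder for createTsoStream / updateConnectionCtxs.
func connect() bool { return false }

// connectWithDeadline retries every retryInterval and reports a hard failure
// only once the overall timer fires, mirroring the streamLoopTimer.C /
// time.After(retryInterval) select in the diff above.
func connectWithDeadline(ctx context.Context) error {
    deadline := time.NewTimer(overallTimeout)
    defer deadline.Stop()
    for {
        if connect() {
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-deadline.C:
            return errors.New("create tso stream timed out")
        case <-time.After(retryInterval):
            // Retry; the overall deadline keeps running across attempts,
            // so connection-setup time also counts toward the budget.
        }
    }
}

func main() {
    fmt.Println(connectWithDeadline(context.Background()))
}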
66 changes: 66 additions & 0 deletions tests/client/client_test.go
@@ -316,6 +316,72 @@ func (s *clientTestSuite) TestTSOFollowerProxy(c *C) {
     wg.Wait()
 }
 
+// TestUnavailableTimeAfterLeaderIsReady is used to test https://github.com/tikv/pd/issues/5207
+func (s *clientTestSuite) TestUnavailableTimeAfterLeaderIsReady(c *C) {
+    cluster, err := tests.NewTestCluster(s.ctx, 3)
+    c.Assert(err, IsNil)
+    defer cluster.Destroy()
+
+    endpoints := s.runServer(c, cluster)
+    cli := setupCli(c, s.ctx, endpoints)
+
+    var wg sync.WaitGroup
+    var maxUnavailableTime, leaderReadyTime time.Time
+    getTsoFunc := func() {
+        defer wg.Done()
+        var lastTS uint64
+        for i := 0; i < tsoRequestRound; i++ {
+            var physical, logical int64
+            var ts uint64
+            physical, logical, err = cli.GetTS(context.Background())
+            ts = tsoutil.ComposeTS(physical, logical)
+            if err != nil {
+                maxUnavailableTime = time.Now()
+                continue
+            }
+            c.Assert(err, IsNil)
+            c.Assert(lastTS, Less, ts)
+            lastTS = ts
+        }
+    }
+
+    // test resign pd leader or stop pd leader
+    wg.Add(1 + 1)
+    go getTsoFunc()
+    go func() {
+        defer wg.Done()
+        leader := cluster.GetServer(cluster.GetLeader())
+        leader.Stop()
+        cluster.WaitLeader()
+        leaderReadyTime = time.Now()
+        cluster.RunServers([]*tests.TestServer{leader})
+    }()
+    wg.Wait()
+    c.Assert(maxUnavailableTime.Unix(), LessEqual, leaderReadyTime.Add(1*time.Second).Unix())
+    if maxUnavailableTime.Unix() == leaderReadyTime.Add(1*time.Second).Unix() {
+        c.Assert(maxUnavailableTime.Nanosecond(), Less, leaderReadyTime.Add(1*time.Second).Nanosecond())
+    }
+
+    // test kill pd leader pod or network of leader is unreachable
+    wg.Add(1 + 1)
+    maxUnavailableTime, leaderReadyTime = time.Time{}, time.Time{}
+    go getTsoFunc()
+    go func() {
+        defer wg.Done()
+        leader := cluster.GetServer(cluster.GetLeader())
+        c.Assert(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"), IsNil)
+        leader.Stop()
+        cluster.WaitLeader()
+        c.Assert(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork"), IsNil)
+        leaderReadyTime = time.Now()
+    }()
+    wg.Wait()
+    c.Assert(maxUnavailableTime.Unix(), LessEqual, leaderReadyTime.Add(1*time.Second).Unix())
+    if maxUnavailableTime.Unix() == leaderReadyTime.Add(1*time.Second).Unix() {
+        c.Assert(maxUnavailableTime.Nanosecond(), Less, leaderReadyTime.Add(1*time.Second).Nanosecond())
+    }
+}
+
 func (s *clientTestSuite) TestGlobalAndLocalTSO(c *C) {
     dcLocationConfig := map[string]string{
         "pd1": "dc-1",
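A note on the test's assertions: comparing Unix() first and Nanosecond() only on a tie is just a second-resolution-safe way of checking that the last failed GetTS call happened before leaderReadyTime plus one second — the recovery budget the test enforces. A minimal sketch of the same predicate, using a hypothetical helper name that is not part of the test:

package main

import (
    "fmt"
    "time"
)

// withinRecoveryBudget reports whether the last failed TSO request happened
// before leaderReady + 1s, which is the property the Unix()/Nanosecond()
// assertions above encode.
func withinRecoveryBudget(maxUnavailable, leaderReady time.Time) bool {
    return maxUnavailable.Before(leaderReady.Add(time.Second))
}

func main() {
    leaderReady := time.Now()
    lastFailure := leaderReady.Add(300 * time.Millisecond)
    fmt.Println(withinRecoveryBudget(lastFailure, leaderReady)) // true
}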
