diff --git a/config/retry/config.go b/config/retry/config.go
index c95d2cbd1..0f89eaf26 100644
--- a/config/retry/config.go
+++ b/config/retry/config.go
@@ -96,6 +96,11 @@ func NewConfig(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFn
 	}
 }
 
+// Base returns the base time of the backoff function.
+func (c *Config) Base() int {
+	return c.fnCfg.base
+}
+
 func (c *Config) String() string {
 	return c.name
 }
diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index 7e071d4c8..f2e574876 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -3252,6 +3252,16 @@ func (s *Store) GetPeerAddr() string {
 	return s.peerAddr
 }
 
+func (s *Store) updateServerLoadStats(estimatedWaitMs uint32) {
+	estimatedWait := time.Duration(estimatedWaitMs) * time.Millisecond
+	// Update the estimated wait time of the store.
+	loadStats := &storeLoadStats{
+		estimatedWait:     estimatedWait,
+		waitTimeUpdatedAt: time.Now(),
+	}
+	s.loadStats.Store(loadStats)
+}
+
 // EstimatedWaitTime returns an optimistic estimation of how long a request will wait in the store.
 // It's calculated by subtracting the time since the last update from the wait time returned from TiKV.
 func (s *Store) EstimatedWaitTime() time.Duration {
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 3702ab15c..c8934f31f 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -296,6 +296,23 @@ type replicaSelector struct {
 	// TiKV can reject the request when its estimated wait duration exceeds busyThreshold.
 	// Then, the client will receive a ServerIsBusy error and choose another replica to retry.
 	busyThreshold time.Duration
+	// pendingBackoffs records the pending backoffs by store ID for fast retry. Some examples of how it works:
+	// Example 1, fast retry and succeed:
+	//   1. Send the request to store 1, get a ServerIsBusy region error, record `store1 -> BoTiKVServerBusy` in pendingBackoffs and fast-retry the next replica.
+	//   2. Retry in store 2 and succeed.
+	//      Since the request succeeds, we can skip the backoff and return the result to the user immediately.
+	// Example 2, fast retry different replicas but all of them fail:
+	//   1. Send the request to store 1, get a ServerIsBusy region error, record `store1 -> BoTiKVServerBusy` in pendingBackoffs and fast-retry the next replica.
+	//   2. Send the request to store 2, get a ServerIsBusy region error, record `store2 -> BoTiKVServerBusy` in pendingBackoffs and fast-retry the next replica.
+	//   3. Send the request to store 3, get a ServerIsBusy region error, record `store3 -> BoTiKVServerBusy` in pendingBackoffs and fast-retry the next replica.
+	//   4. No candidate is left since all stores are busy. Before returning the no-candidate error to the upper layer, we need to call backoffOnNoCandidate
+	//      to apply the largest pending backoff; the backoff avoids frequent accesses that would increase the pressure on the cluster.
+	// Example 3, fast retry the same replica:
+	//   1. Send the request to store 1, get a ServerIsBusy region error, record `store1 -> BoTiKVServerBusy` in pendingBackoffs and fast-retry the next replica.
+	//   2. Assume store 2 and store 3 are unreachable.
+	//   3. Re-send the request to store 1 with replica-read. Before re-sending to store 1, we need to call backoffOnRetry
+	//      to apply the pending BoTiKVServerBusy backoff; the backoff avoids frequent accesses that would increase the pressure on the cluster.
+	pendingBackoffs map[uint64]*backoffArgs
 }
 
 func selectorStateToString(state selectorState) string {
@@ -1041,6 +1058,7 @@ func newReplicaSelector(
 		-1,
 		-1,
 		time.Duration(req.BusyThresholdMs) * time.Millisecond,
+		nil,
 	}, nil
 }
 
@@ -1293,66 +1311,41 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
 func (s *replicaSelector) onServerIsBusy(
 	bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, serverIsBusy *errorpb.ServerIsBusy,
 ) (shouldRetry bool, err error) {
-	if serverIsBusy.EstimatedWaitMs != 0 && ctx != nil && ctx.Store != nil {
-		estimatedWait := time.Duration(serverIsBusy.EstimatedWaitMs) * time.Millisecond
-		// Update the estimated wait time of the store.
-		loadStats := &storeLoadStats{
-			estimatedWait:     estimatedWait,
-			waitTimeUpdatedAt: time.Now(),
-		}
-		ctx.Store.loadStats.Store(loadStats)
-
-		if s.busyThreshold != 0 && isReadReq(req.Type) {
-			// do not retry with batched coprocessor requests.
-			// it'll be region misses if we send the tasks to replica.
-			if req.Type == tikvrpc.CmdCop && len(req.Cop().Tasks) > 0 {
-				return false, nil
-			}
-			switch state := s.state.(type) {
-			case *accessKnownLeader:
-				// Clear attempt history of the leader, so the leader can be accessed again.
-				s.replicas[state.leaderIdx].attempts = 0
-				s.state = &tryIdleReplica{leaderIdx: state.leaderIdx}
-				return true, nil
-			case *tryIdleReplica:
-				if s.targetIdx != state.leaderIdx {
-					return true, nil
+	var store *Store
+	if ctx != nil && ctx.Store != nil {
+		store = ctx.Store
+		if serverIsBusy.EstimatedWaitMs != 0 {
+			ctx.Store.updateServerLoadStats(serverIsBusy.EstimatedWaitMs)
+			if s.busyThreshold != 0 && isReadReq(req.Type) {
+				// do not retry with batched coprocessor requests.
+				// it'll be region misses if we send the tasks to replica.
+				if req.Type == tikvrpc.CmdCop && len(req.Cop().Tasks) > 0 {
+					return false, nil
+				}
+				switch state := s.state.(type) {
+				case *accessKnownLeader:
+					// Clear attempt history of the leader, so the leader can be accessed again.
+					s.replicas[state.leaderIdx].attempts = 0
+					s.state = &tryIdleReplica{leaderIdx: state.leaderIdx}
 				}
-				// backoff if still receiving ServerIsBusy after accessing leader again
 			}
-		}
-	} else if ctx != nil && ctx.Store != nil {
-		// Mark the server is busy (the next incoming READs could be redirect
-		// to expected followers. )
-		ctx.Store.healthStatus.markAlreadySlow()
-		if s.canFallback2Follower() {
-			return true, nil
+		} else {
+			// Mark the server is busy (the next incoming READs could be redirect to expected followers.)
+			ctx.Store.healthStatus.markAlreadySlow()
 		}
 	}
-	err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
+	backoffErr := errors.Errorf("server is busy, ctx: %v", ctx)
+	if s.canFastRetry() {
+		s.addPendingBackoff(store, retry.BoTiKVServerBusy, backoffErr)
+		return true, nil
+	}
+	err = bo.Backoff(retry.BoTiKVServerBusy, backoffErr)
 	if err != nil {
 		return false, err
 	}
 	return true, nil
 }
 
-// For some reasons, the leader is unreachable by now, try followers instead.
-// the state is changed in accessFollower.next when leader is unavailable.
-func (s *replicaSelector) canFallback2Follower() bool {
-	if s == nil || s.state == nil {
-		return false
-	}
-	state, ok := s.state.(*accessFollower)
-	if !ok {
-		return false
-	}
-	if !state.isStaleRead {
-		return false
-	}
-	// can fallback to follower only when the leader is exhausted.
-	return state.lastIdx == state.leaderIdx && state.IsLeaderExhausted(s.replicas[state.leaderIdx])
-}
-
 func (s *replicaSelector) onDataIsNotReady() {
 	if target := s.targetReplica(); target != nil {
 		target.dataIsNotReady = true
@@ -1525,6 +1518,11 @@ func (s *RegionRequestSender) SendReqCtx(
 		// TODO: Change the returned error to something like "region missing in cache",
 		// and handle this error like EpochNotMatch, which means to re-split the request and retry.
 		s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, retryTimes, req, totalErrors)
+		if s.replicaSelector != nil {
+			if err := s.replicaSelector.backoffOnNoCandidate(bo); err != nil {
+				return nil, nil, retryTimes, err
+			}
+		}
 		resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}})
 		return resp, nil, retryTimes, err
 	}
@@ -1554,6 +1552,11 @@ func (s *RegionRequestSender) SendReqCtx(
 		if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil {
 			return nil, nil, retryTimes, err
 		}
+		if s.replicaSelector != nil {
+			if err := s.replicaSelector.backoffOnRetry(rpcCtx.Store, bo); err != nil {
+				return nil, nil, retryTimes, err
+			}
+		}
 
 		var retry bool
 		resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
@@ -2500,3 +2503,59 @@ func (s *replicaSelector) recordAttemptedTime(duration time.Duration) {
 		proxyReplica.attemptedTime += duration
 	}
 }
+
+// canFastRetry returns true if the request can be sent to the next replica.
+func (s *replicaSelector) canFastRetry() bool {
+	accessLeader, ok := s.state.(*accessKnownLeader)
+	if ok && isLeaderCandidate(s.replicas[accessLeader.leaderIdx]) {
+		// If the leader is still a candidate, the request will be sent to the leader again,
+		// so don't skip the backoff since the leader is still busy.
+		return false
+	}
+	return true
+}
+
+type backoffArgs struct {
+	cfg *retry.Config
+	err error
+}
+
+// addPendingBackoff adds a pending backoff for the store.
+func (s *replicaSelector) addPendingBackoff(store *Store, cfg *retry.Config, err error) {
+	storeId := uint64(0)
+	if store != nil {
+		storeId = store.storeID
+	}
+	if s.pendingBackoffs == nil {
+		s.pendingBackoffs = make(map[uint64]*backoffArgs)
+	}
+	s.pendingBackoffs[storeId] = &backoffArgs{cfg, err}
+}
+
+// backoffOnRetry applies the pending backoff of the store before retrying a request on it.
+func (s *replicaSelector) backoffOnRetry(store *Store, bo *retry.Backoffer) error {
+	storeId := uint64(0)
+	if store != nil {
+		storeId = store.storeID
+	}
+	args, ok := s.pendingBackoffs[storeId]
+	if !ok {
+		return nil
+	}
+	delete(s.pendingBackoffs, storeId)
+	return bo.Backoff(args.cfg, args.err)
+}
+
+// backoffOnNoCandidate applies the pending backoff with the largest base when no candidate replica is available.
+func (s *replicaSelector) backoffOnNoCandidate(bo *retry.Backoffer) error {
+	var args *backoffArgs
+	for _, pbo := range s.pendingBackoffs {
+		if args == nil || args.cfg.Base() < pbo.cfg.Base() {
+			args = pbo
+		}
+	}
+	if args == nil {
+		return nil
+	}
+	return bo.Backoff(args.cfg, args.err)
+}
diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go
index f5034110a..f667bbd98 100644
--- a/internal/locate/replica_selector_test.go
+++ b/internal/locate/replica_selector_test.go
@@ -15,6 +15,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pkg/errors"
 	"github.com/stretchr/testify/suite"
 	"github.com/tikv/client-go/v2/config/retry"
 	"github.com/tikv/client-go/v2/internal/apicodec"
@@ -115,8 +116,8 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
 			},
 			respErr:         "",
 			respRegionError: nil,
-			backoffCnt:      1,
-			backoffDetail:   []string{"tikvServerBusy+1"},
+			backoffCnt:      0,
+			backoffDetail:   []string{},
 			regionIsValid:   true,
 		},
 	}
@@ -299,8 +300,8 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
 				"{addr: store3, replica-read: true, stale-read: false}"},
 			respErr:         "",
 			respRegionError: fakeEpochNotMatch,
-			backoffCnt:      2,
-			backoffDetail:   []string{"tikvServerBusy+2"},
+			backoffCnt:      1,
+			backoffDetail:   []string{"tikvServerBusy+1"},
 			regionIsValid:   true,
 		},
 	}
@@ -321,8 +322,8 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
 				"{addr: store3, replica-read: true, stale-read: false}"},
 			respErr:         "",
 			respRegionError: fakeEpochNotMatch,
-			backoffCnt:      2,
-			backoffDetail:   []string{"tikvServerBusy+2"},
+			backoffCnt:      1,
+			backoffDetail:   []string{"tikvServerBusy+1"},
 			regionIsValid:   true,
 		},
 	}
@@ -342,8 +343,8 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
 				"{addr: store2, replica-read: true, stale-read: false}"},
 			respErr:         "",
 			respRegionError: nil,
-			backoffCnt:      1,
-			backoffDetail:   []string{"tikvServerBusy+1"},
+			backoffCnt:      0,
+			backoffDetail:   []string{},
 			regionIsValid:   true,
 		},
 	}
@@ -364,14 +365,35 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
 			},
 			respErr:         "",
 			respRegionError: fakeEpochNotMatch,
-			backoffCnt:      2,
-			backoffDetail:   []string{"tikvServerBusy+2"},
+			backoffCnt:      1,
+			backoffDetail:   []string{"tikvServerBusy+1"},
 			regionIsValid:   false,
 		},
 	}
 	s.True(s.runCaseAndCompare(ca))
 
 	s.changeRegionLeader(2)
+	ca = replicaSelectorAccessPathCase{
+		reqType:   tikvrpc.CmdGet,
+		readType:  kv.ReplicaReadMixed,
+		staleRead: true,
+		accessErr: []RegionErrorType{DataIsNotReadyErr, ServerIsBusyErr, ServerIsBusyErr},
+		expect: &accessPathResult{
+			accessPath: []string{
+				"{addr: store1, replica-read: false, stale-read: true}",
+				"{addr: store2, replica-read: false, stale-read: false}", // try leader with leader read.
+				"{addr: store3, replica-read: true, stale-read: false}",
+				"{addr: store1, replica-read: true, stale-read: false}",
+			},
+			respErr:         "",
+			respRegionError: nil,
+			backoffCnt:      0, // no backoff since the request succeeds.
+			backoffDetail:   []string{},
+			regionIsValid:   true,
+		},
+	}
+	s.True(s.runCaseAndCompare(ca))
+
 	ca = replicaSelectorAccessPathCase{
 		reqType:   tikvrpc.CmdGet,
 		readType:  kv.ReplicaReadMixed,
@@ -386,8 +408,8 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
 			},
 			respErr:         "",
 			respRegionError: fakeEpochNotMatch,
-			backoffCnt:      2,
-			backoffDetail:   []string{"tikvServerBusy+2"},
+			backoffCnt:      1,
+			backoffDetail:   []string{"tikvServerBusy+1"},
 			regionIsValid:   false,
 		},
 	}
@@ -408,14 +430,113 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
 				"{addr: store3, replica-read: true, stale-read: false}"},
 			respErr:         "",
 			respRegionError: nil,
-			backoffCnt:      1,
-			backoffDetail:   []string{"tikvServerBusy+1"},
+			backoffCnt:      0,
+			backoffDetail:   []string{},
+			regionIsValid:   true,
+		},
+	}
+	s.True(s.runCaseAndCompare(ca))
+
+	ca = replicaSelectorAccessPathCase{
+		reqType:   tikvrpc.CmdGet,
+		readType:  kv.ReplicaReadMixed,
+		staleRead: true,
+		timeout:   time.Microsecond * 100,
+		accessErr: []RegionErrorType{ServerIsBusyErr, ServerIsBusyWithEstimatedWaitMsErr},
+		expect: &accessPathResult{
+			accessPath: []string{
+				"{addr: store1, replica-read: false, stale-read: true}",
+				"{addr: store2, replica-read: true, stale-read: false}",
+				"{addr: store3, replica-read: true, stale-read: false}",
+			},
+			respErr:         "",
+			respRegionError: nil,
+			backoffCnt:      0,
+			backoffDetail:   []string{},
 			regionIsValid:   true,
 		},
 	}
 	s.True(s.runCaseAndCompare(ca))
 }
 
+func TestCanFastRetry(t *testing.T) {
+	s := new(testReplicaSelectorSuite)
+	s.SetupTest(t)
+	defer s.TearDownTest()
+
+	// Test for non-leader read.
+	loc, err := s.cache.LocateKey(s.bo, []byte("key"))
+	s.Nil(err)
+	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
+	req.EnableStaleWithMixedReplicaRead()
+	selector, err := newReplicaSelector(s.cache, loc.Region, req)
+	s.Nil(err)
+	for i := 0; i < 3; i++ {
+		_, err = selector.next(s.bo)
+		s.Nil(err)
+		selector.canFastRetry()
+		s.True(selector.canFastRetry())
+	}
+
+	// Test for leader read.
+	req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
+	req.ReplicaReadType = kv.ReplicaReadLeader
+	selector, err = newReplicaSelector(s.cache, loc.Region, req)
+	s.Nil(err)
+	for i := 0; i < 12; i++ {
+		_, err = selector.next(s.bo)
+		s.Nil(err)
+		ok := selector.canFastRetry()
+		if i <= 8 {
+			s.False(ok) // can't skip since leader is available.
+		} else {
+			s.True(ok)
+		}
+	}
+}
+
+func TestPendingBackoff(t *testing.T) {
+	s := new(testReplicaSelectorSuite)
+	s.SetupTest(t)
+	defer s.TearDownTest()
+
+	loc, err := s.cache.LocateKey(s.bo, []byte("key"))
+	s.Nil(err)
+	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
+	req.EnableStaleWithMixedReplicaRead()
+	selector, err := newReplicaSelector(s.cache, loc.Region, req)
+	s.Nil(err)
+	bo := retry.NewNoopBackoff(context.Background())
+	err = selector.backoffOnRetry(nil, bo)
+	s.Nil(err)
+	err = selector.backoffOnRetry(&Store{storeID: 1}, bo)
+	s.Nil(err)
+	err = selector.backoffOnNoCandidate(bo)
+	s.Nil(err)
+	selector.addPendingBackoff(nil, retry.BoRegionScheduling, errors.New("err-0"))
+	s.Equal(1, len(selector.pendingBackoffs))
+	selector.addPendingBackoff(&Store{storeID: 1}, retry.BoTiKVRPC, errors.New("err-1"))
+	s.Equal(2, len(selector.pendingBackoffs))
+	selector.addPendingBackoff(&Store{storeID: 2}, retry.BoTiKVDiskFull, errors.New("err-2"))
+	s.Equal(3, len(selector.pendingBackoffs))
+	selector.addPendingBackoff(&Store{storeID: 1}, retry.BoTiKVServerBusy, errors.New("err-3"))
+	s.Equal(3, len(selector.pendingBackoffs))
+	_, ok := selector.pendingBackoffs[0]
+	s.True(ok)
+	err = selector.backoffOnRetry(nil, bo)
+	s.NotNil(err)
+	s.Equal("err-0", err.Error())
+	_, ok = selector.pendingBackoffs[0]
+	s.False(ok)
+	s.Equal(2, len(selector.pendingBackoffs))
+	err = selector.backoffOnRetry(&Store{storeID: 10}, bo)
+	s.Nil(err)
+	s.Equal(2, len(selector.pendingBackoffs))
+	err = selector.backoffOnNoCandidate(bo)
+	s.NotNil(err)
+	s.Equal("err-3", err.Error())
+}
+
 func TestReplicaReadAccessPathByTryIdleReplicaCase(t *testing.T) {
 	s := new(testReplicaSelectorSuite)
 	s.SetupTest(t)
@@ -458,8 +579,8 @@ func TestReplicaReadAccessPathByTryIdleReplicaCase(t *testing.T) {
 			},
 			respErr:         "",
 			respRegionError: nil,
-			backoffCnt:      0,
-			backoffDetail:   []string{},
+			backoffCnt:      1,
+			backoffDetail:   []string{"tikvServerBusy+1"},
 			regionIsValid:   true,
 		},
 	}
@@ -478,8 +599,8 @@ func TestReplicaReadAccessPathByTryIdleReplicaCase(t *testing.T) {
 				"{addr: store3, replica-read: false, stale-read: false}"},
 			respErr:         "",
 			respRegionError: fakeEpochNotMatch,
-			backoffCnt:      2,
-			backoffDetail:   []string{"regionScheduling+2"},
+			backoffCnt:      3,
+			backoffDetail:   []string{"regionScheduling+2", "tikvServerBusy+1"},
 			regionIsValid:   false,
 		},
 	}
@@ -499,8 +620,8 @@ func TestReplicaReadAccessPathByTryIdleReplicaCase(t *testing.T) {
 				"{addr: store3, replica-read: false, stale-read: false}"},
 			respErr:         "",
 			respRegionError: nil,
-			backoffCnt:      2,
-			backoffDetail:   []string{"regionScheduling+1", "tikvServerBusy+1"},
+			backoffCnt:      3,
+			backoffDetail:   []string{"regionScheduling+1", "tikvServerBusy+2"},
 			regionIsValid:   true,
 		},
 	}
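For readers who want a feel for the control flow this patch introduces without reading through replicaSelector, here is a minimal, self-contained Go sketch of the pending-backoff idea. It is not client-go code: the selector type and the recordBackoff/backoffOnRetry/backoffOnNoCandidate names below are hypothetical stand-ins for the real addPendingBackoff, backoffOnRetry, and backoffOnNoCandidate methods added above, and the durations are arbitrary.

```go
package main

import (
	"fmt"
	"time"
)

// deferred mirrors backoffArgs from the patch: a backoff base and the error
// that triggered it, recorded per store ID instead of being slept on immediately.
type deferred struct {
	base time.Duration
	err  error
}

type selector struct {
	pending map[uint64]deferred // store ID -> deferred backoff
}

// recordBackoff notes the backoff for a busy store and lets the caller
// fast-retry the next replica right away (analogous to addPendingBackoff).
func (s *selector) recordBackoff(storeID uint64, base time.Duration, err error) {
	if s.pending == nil {
		s.pending = make(map[uint64]deferred)
	}
	s.pending[storeID] = deferred{base, err}
}

// backoffOnRetry pays the deferred backoff only when the selector is about to
// send to the same store again, then clears it.
func (s *selector) backoffOnRetry(storeID uint64) {
	if d, ok := s.pending[storeID]; ok {
		delete(s.pending, storeID)
		fmt.Printf("backoff %v before retrying store %d: %v\n", d.base, storeID, d.err)
		time.Sleep(d.base)
	}
}

// backoffOnNoCandidate pays the largest deferred backoff once when every
// replica has been tried and rejected, to avoid hammering a busy cluster.
func (s *selector) backoffOnNoCandidate() {
	var worst deferred
	for _, d := range s.pending {
		if d.base > worst.base {
			worst = d
		}
	}
	if worst.err != nil {
		fmt.Printf("backoff %v before giving up: %v\n", worst.base, worst.err)
		time.Sleep(worst.base)
	}
}

func main() {
	s := &selector{}
	// Store 1 reports ServerIsBusy: record the backoff and try store 2 immediately.
	s.recordBackoff(1, 2*time.Millisecond, fmt.Errorf("server is busy, store 1"))
	// Suppose stores 2 and 3 are unreachable and we come back to store 1:
	// the deferred backoff is applied exactly once before the re-send.
	s.backoffOnRetry(1)
	// If every store had been busy instead, the largest pending backoff
	// would be applied once before returning the error to the caller.
	s.backoffOnNoCandidate()
}
```

The point of the pattern is that a busy replica costs nothing as long as another replica can serve the request; the sleep is only paid when the client is about to hit the same busy store again, or when every replica is busy and the largest recorded backoff is applied once before surfacing the error.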