reduce unnecessary tikvServerBusy backoff when able to try next replica (#1184)

* reduce unnecessary tikvServerBusy backoff when able to try next replica

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix lint

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine code and add test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine code

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add comment and refine code

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add comment and refine code

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix lint

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add more comment

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* address comment

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine comment

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* address comment

Signed-off-by: crazycs520 <crazycs520@gmail.com>

---------

Signed-off-by: crazycs520 <crazycs520@gmail.com>
crazycs520 authored Mar 4, 2024
1 parent 02cd637 commit 50c4085
Showing 4 changed files with 265 additions and 70 deletions.
5 changes: 5 additions & 0 deletions config/retry/config.go
@@ -96,6 +96,11 @@ func NewConfig(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFn
}
}

// Base returns the base time of the backoff function.
func (c *Config) Base() int {
return c.fnCfg.base
}

func (c *Config) String() string {
return c.name
}
10 changes: 10 additions & 0 deletions internal/locate/region_cache.go
@@ -3252,6 +3252,16 @@ func (s *Store) GetPeerAddr() string {
return s.peerAddr
}

func (s *Store) updateServerLoadStats(estimatedWaitMs uint32) {
estimatedWait := time.Duration(estimatedWaitMs) * time.Millisecond
// Update the estimated wait time of the store.
loadStats := &storeLoadStats{
estimatedWait: estimatedWait,
waitTimeUpdatedAt: time.Now(),
}
s.loadStats.Store(loadStats)
}

// EstimatedWaitTime returns an optimistic estimation of how long a request will wait in the store.
// It's calculated by subtracting the time since the last update from the wait time returned from TiKV.
func (s *Store) EstimatedWaitTime() time.Duration {
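The body of EstimatedWaitTime is collapsed in this diff. Below is a minimal sketch of the computation described by the comment above (illustration only, assuming storeLoadStats carries the estimatedWait and waitTimeUpdatedAt fields set by updateServerLoadStats; not the verbatim method body):

// Sketch: remaining wait = reported wait minus the time elapsed since it was reported.
func estimatedWaitTime(stats *storeLoadStats) time.Duration {
	if stats == nil {
		return 0
	}
	elapsed := time.Since(stats.waitTimeUpdatedAt)
	if stats.estimatedWait <= elapsed {
		// The reported wait has already elapsed; assume no queueing remains.
		return 0
	}
	return stats.estimatedWait - elapsed
}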
159 changes: 109 additions & 50 deletions internal/locate/region_request.go
@@ -296,6 +296,23 @@ type replicaSelector struct {
// TiKV can reject the request when its estimated wait duration exceeds busyThreshold.
// Then, the client will receive a ServerIsBusy error and choose another replica to retry.
busyThreshold time.Duration
// pendingBackoffs records the pending backoffs by store ID for fast retry. Here are some examples to show how it works:
// Example-1, fast retry and succeed:
// 1. send req to store 1, get a ServerIsBusy region error, record a `store1 -> BoTiKVServerBusy` backoff in pendingBackoffs and fast-retry the next replica.
// 2. retry in store 2 and succeed.
// Since the request succeeded, we can skip the pending backoff and return the result to the user right away.
// Example-2, fast retry different replicas but all of them fail:
// 1. send req to store 1, get a ServerIsBusy region error, record a `store1 -> BoTiKVServerBusy` backoff in pendingBackoffs and fast-retry the next replica.
// 2. send req to store 2, get a ServerIsBusy region error, record a `store2 -> BoTiKVServerBusy` backoff in pendingBackoffs and fast-retry the next replica.
// 3. send req to store 3, get a ServerIsBusy region error, record a `store3 -> BoTiKVServerBusy` backoff in pendingBackoffs and fast-retry the next replica.
// 4. no candidate is left since all stores are busy. Before returning a no-candidate error to the upper layer, call backoffOnNoCandidate
// to apply the largest pending backoff; this backoff avoids frequent retries that would add pressure on the cluster.
// Example-3, fast retry the same replica:
// 1. send req to store 1, get a ServerIsBusy region error, record a `store1 -> BoTiKVServerBusy` backoff in pendingBackoffs and fast-retry the next replica.
// 2. assume store 2 and store 3 are unreachable.
// 3. re-send the req to store 1 with replica-read. Before re-sending to store 1, call backoffOnRetry
// to apply the pending BoTiKVServerBusy backoff; this backoff avoids frequent retries that would add pressure on the cluster.
pendingBackoffs map[uint64]*backoffArgs
}

func selectorStateToString(state selectorState) string {
@@ -1041,6 +1058,7 @@ func newReplicaSelector(
-1,
-1,
time.Duration(req.BusyThresholdMs) * time.Millisecond,
nil,
}, nil
}

@@ -1293,66 +1311,41 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
func (s *replicaSelector) onServerIsBusy(
bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, serverIsBusy *errorpb.ServerIsBusy,
) (shouldRetry bool, err error) {
if serverIsBusy.EstimatedWaitMs != 0 && ctx != nil && ctx.Store != nil {
estimatedWait := time.Duration(serverIsBusy.EstimatedWaitMs) * time.Millisecond
// Update the estimated wait time of the store.
loadStats := &storeLoadStats{
estimatedWait: estimatedWait,
waitTimeUpdatedAt: time.Now(),
}
ctx.Store.loadStats.Store(loadStats)

if s.busyThreshold != 0 && isReadReq(req.Type) {
// Do not retry batched coprocessor requests;
// sending the tasks to a replica would cause region misses.
if req.Type == tikvrpc.CmdCop && len(req.Cop().Tasks) > 0 {
return false, nil
}
switch state := s.state.(type) {
case *accessKnownLeader:
// Clear attempt history of the leader, so the leader can be accessed again.
s.replicas[state.leaderIdx].attempts = 0
s.state = &tryIdleReplica{leaderIdx: state.leaderIdx}
return true, nil
case *tryIdleReplica:
if s.targetIdx != state.leaderIdx {
return true, nil
var store *Store
if ctx != nil && ctx.Store != nil {
store = ctx.Store
if serverIsBusy.EstimatedWaitMs != 0 {
ctx.Store.updateServerLoadStats(serverIsBusy.EstimatedWaitMs)
if s.busyThreshold != 0 && isReadReq(req.Type) {
// Do not retry batched coprocessor requests;
// sending the tasks to a replica would cause region misses.
if req.Type == tikvrpc.CmdCop && len(req.Cop().Tasks) > 0 {
return false, nil
}
switch state := s.state.(type) {
case *accessKnownLeader:
// Clear attempt history of the leader, so the leader can be accessed again.
s.replicas[state.leaderIdx].attempts = 0
s.state = &tryIdleReplica{leaderIdx: state.leaderIdx}
}
// backoff if still receiving ServerIsBusy after accessing leader again
}
}
} else if ctx != nil && ctx.Store != nil {
// Mark the server as busy (the next incoming reads could be redirected
// to expected followers).
ctx.Store.healthStatus.markAlreadySlow()
if s.canFallback2Follower() {
return true, nil
} else {
// Mark the server as busy (the next incoming reads could be redirected to expected followers).
ctx.Store.healthStatus.markAlreadySlow()
}
}
err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
backoffErr := errors.Errorf("server is busy, ctx: %v", ctx)
if s.canFastRetry() {
s.addPendingBackoff(store, retry.BoTiKVServerBusy, backoffErr)
return true, nil
}
err = bo.Backoff(retry.BoTiKVServerBusy, backoffErr)
if err != nil {
return false, err
}
return true, nil
}

// For some reason, the leader is unreachable by now; try followers instead.
// The state is changed in accessFollower.next when the leader is unavailable.
func (s *replicaSelector) canFallback2Follower() bool {
if s == nil || s.state == nil {
return false
}
state, ok := s.state.(*accessFollower)
if !ok {
return false
}
if !state.isStaleRead {
return false
}
// Can fall back to a follower only when the leader is exhausted.
return state.lastIdx == state.leaderIdx && state.IsLeaderExhausted(s.replicas[state.leaderIdx])
}

func (s *replicaSelector) onDataIsNotReady() {
if target := s.targetReplica(); target != nil {
target.dataIsNotReady = true
@@ -1525,6 +1518,11 @@ func (s *RegionRequestSender) SendReqCtx(
// TODO: Change the returned error to something like "region missing in cache",
// and handle this error like EpochNotMatch, which means to re-split the request and retry.
s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, retryTimes, req, totalErrors)
if s.replicaSelector != nil {
if err := s.replicaSelector.backoffOnNoCandidate(bo); err != nil {
return nil, nil, retryTimes, err
}
}
resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}})
return resp, nil, retryTimes, err
}
@@ -1554,6 +1552,11 @@ func (s *RegionRequestSender) SendReqCtx(
if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil {
return nil, nil, retryTimes, err
}
if s.replicaSelector != nil {
if err := s.replicaSelector.backoffOnRetry(rpcCtx.Store, bo); err != nil {
return nil, nil, retryTimes, err
}
}

var retry bool
resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
@@ -2500,3 +2503,59 @@ func (s *replicaSelector) recordAttemptedTime(duration time.Duration) {
proxyReplica.attemptedTime += duration
}
}

// canFastRetry returns true if the request can be sent to the next replica.
func (s *replicaSelector) canFastRetry() bool {
accessLeader, ok := s.state.(*accessKnownLeader)
if ok && isLeaderCandidate(s.replicas[accessLeader.leaderIdx]) {
// If the leader is still a candidate, the request will be sent to the leader again,
// so don't skip the backoff since the leader is still busy.
return false
}
return true
}

type backoffArgs struct {
cfg *retry.Config
err error
}

// addPendingBackoff adds a pending backoff for the store.
func (s *replicaSelector) addPendingBackoff(store *Store, cfg *retry.Config, err error) {
storeId := uint64(0)
if store != nil {
storeId = store.storeID
}
if s.pendingBackoffs == nil {
s.pendingBackoffs = make(map[uint64]*backoffArgs)
}
s.pendingBackoffs[storeId] = &backoffArgs{cfg, err}
}

// backoffOnRetry applies the pending backoff for the store when retrying on the same store.
func (s *replicaSelector) backoffOnRetry(store *Store, bo *retry.Backoffer) error {
storeId := uint64(0)
if store != nil {
storeId = store.storeID
}
args, ok := s.pendingBackoffs[storeId]
if !ok {
return nil
}
delete(s.pendingBackoffs, storeId)
return bo.Backoff(args.cfg, args.err)
}

// backoffOnNoCandidate applies the pending backoff with the largest base when no candidate replica is left.
func (s *replicaSelector) backoffOnNoCandidate(bo *retry.Backoffer) error {
var args *backoffArgs
for _, pbo := range s.pendingBackoffs {
if args == nil || args.cfg.Base() < pbo.cfg.Base() {
args = pbo
}
}
if args == nil {
return nil
}
return bo.Backoff(args.cfg, args.err)
}
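The struct comment on pendingBackoffs above describes the deferred-backoff flow that addPendingBackoff, backoffOnRetry, and backoffOnNoCandidate implement together. The following is a self-contained toy model of that bookkeeping (simplified names and types, not the client-go API), useful for seeing the three examples from the comment in isolation:

package main

import (
	"fmt"
	"time"
)

// backoffArg is a stand-in for the real backoffArgs (config + error).
type backoffArg struct {
	base time.Duration
	err  error
}

// toySelector models only the pendingBackoffs map of replicaSelector.
type toySelector struct {
	pending map[uint64]backoffArg // store ID -> deferred ServerIsBusy backoff
}

// onServerIsBusy records the backoff instead of sleeping, so the caller can
// immediately try the next replica ("fast retry").
func (s *toySelector) onServerIsBusy(storeID uint64, base time.Duration, err error) {
	if s.pending == nil {
		s.pending = make(map[uint64]backoffArg)
	}
	s.pending[storeID] = backoffArg{base, err}
}

// onRetrySameStore applies the deferred backoff only when the request is about
// to be sent to the same store again (Example-3 in the struct comment).
func (s *toySelector) onRetrySameStore(storeID uint64) {
	if b, ok := s.pending[storeID]; ok {
		delete(s.pending, storeID)
		time.Sleep(b.base) // stand-in for bo.Backoff(cfg, err)
	}
}

// onNoCandidate applies the single largest deferred backoff before giving up
// (Example-2 in the struct comment).
func (s *toySelector) onNoCandidate() {
	var largest backoffArg
	for _, b := range s.pending {
		if b.base > largest.base {
			largest = b
		}
	}
	time.Sleep(largest.base)
}

func main() {
	var s toySelector
	s.onServerIsBusy(1, 2*time.Millisecond, fmt.Errorf("store 1 is busy"))
	s.onServerIsBusy(2, 10*time.Millisecond, fmt.Errorf("store 2 is busy"))
	// All replicas exhausted: sleep once for the heaviest pending backoff (~10ms).
	s.onNoCandidate()
	fmt.Println("applied the largest pending backoff exactly once")
}

In a successful fast retry (Example-1 in the struct comment) none of the recorded entries are ever applied, which is exactly the unnecessary tikvServerBusy backoff this commit removes.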
