Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce unnecessary tikvServerBusy backoff when able to try next replica #1184

Merged
merged 21 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions config/retry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ func NewConfig(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFn
}
}

// Base returns the base time of the backoff function.
func (c *Config) Base() int {
return c.fnCfg.base
}

func (c *Config) String() string {
return c.name
}
Expand Down
10 changes: 10 additions & 0 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3252,6 +3252,16 @@ func (s *Store) GetPeerAddr() string {
return s.peerAddr
}

func (s *Store) updateServerLoadStats(estimatedWaitMs uint32) {
estimatedWait := time.Duration(estimatedWaitMs) * time.Millisecond
// Update the estimated wait time of the store.
loadStats := &storeLoadStats{
estimatedWait: estimatedWait,
waitTimeUpdatedAt: time.Now(),
}
s.loadStats.Store(loadStats)
}

// EstimatedWaitTime returns an optimistic estimation of how long a request will wait in the store.
// It's calculated by subtracting the time since the last update from the wait time returned from TiKV.
func (s *Store) EstimatedWaitTime() time.Duration {
Expand Down
159 changes: 109 additions & 50 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,23 @@ type replicaSelector struct {
// TiKV can reject the request when its estimated wait duration exceeds busyThreshold.
// Then, the client will receive a ServerIsBusy error and choose another replica to retry.
busyThreshold time.Duration
// pendingBackoffs records the pending backoff by store_id for fast retry. Here are some examples to show how it works:
// Example-1, fast retry and success:
// 1. send req to store 1, got ServerIsBusy region error, record `store1 -> BoTiKVServerBusy` backoff in pendingBackoffs and fast retry next replica.
// 2. retry in store2, and success.
// Since the request is success, we can skip the backoff and fast return result to user.
// Example-2: fast retry different replicas but all failed:
// 1. send req to store 1, got ServerIsBusy region error, record `store1 -> BoTiKVServerBusy` backoff in pendingBackoffs and fast retry next replica.
// 2. send req to store 2, got ServerIsBusy region error, record `store2 -> BoTiKVServerBusy` backoff in pendingBackoffs and fast retry next replica.
// 3. send req to store 3, got ServerIsBusy region error, record `store3 -> BoTiKVServerBusy` backoff in pendingBackoffs and fast retry next replica.
// 4. no candidate since all stores are busy. But before return no candidate error to up layer, we need to call backoffOnNoCandidate function
// to apply a max pending backoff, the backoff is to avoid frequent access and increase the pressure on the cluster.
// Example-3: fast retry same replica:
// 1. send req to store 1, got ServerIsBusy region error, record `store1 -> BoTiKVServerBusy` backoff in pendingBackoffs and fast retry next replica.
// 2. assume store 2 and store 3 are unreachable.
// 3. re-send req to store 1 with replica-read. But before re-send to store1, we need to call backoffOnRetry function
// to apply pending BoTiKVServerBusy backoff, the backoff is to avoid frequent access and increase the pressure on the cluster.
pendingBackoffs map[uint64]*backoffArgs
}

func selectorStateToString(state selectorState) string {
Expand Down Expand Up @@ -1041,6 +1058,7 @@ func newReplicaSelector(
-1,
-1,
time.Duration(req.BusyThresholdMs) * time.Millisecond,
nil,
}, nil
}

Expand Down Expand Up @@ -1293,66 +1311,41 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
func (s *replicaSelector) onServerIsBusy(
bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, serverIsBusy *errorpb.ServerIsBusy,
) (shouldRetry bool, err error) {
if serverIsBusy.EstimatedWaitMs != 0 && ctx != nil && ctx.Store != nil {
estimatedWait := time.Duration(serverIsBusy.EstimatedWaitMs) * time.Millisecond
// Update the estimated wait time of the store.
loadStats := &storeLoadStats{
estimatedWait: estimatedWait,
waitTimeUpdatedAt: time.Now(),
}
ctx.Store.loadStats.Store(loadStats)

if s.busyThreshold != 0 && isReadReq(req.Type) {
// do not retry with batched coprocessor requests.
// it'll be region misses if we send the tasks to replica.
if req.Type == tikvrpc.CmdCop && len(req.Cop().Tasks) > 0 {
return false, nil
}
switch state := s.state.(type) {
case *accessKnownLeader:
// Clear attempt history of the leader, so the leader can be accessed again.
s.replicas[state.leaderIdx].attempts = 0
s.state = &tryIdleReplica{leaderIdx: state.leaderIdx}
return true, nil
case *tryIdleReplica:
if s.targetIdx != state.leaderIdx {
return true, nil
var store *Store
if ctx != nil && ctx.Store != nil {
store = ctx.Store
if serverIsBusy.EstimatedWaitMs != 0 {
ctx.Store.updateServerLoadStats(serverIsBusy.EstimatedWaitMs)
if s.busyThreshold != 0 && isReadReq(req.Type) {
// do not retry with batched coprocessor requests.
// it'll be region misses if we send the tasks to replica.
if req.Type == tikvrpc.CmdCop && len(req.Cop().Tasks) > 0 {
return false, nil
}
switch state := s.state.(type) {
case *accessKnownLeader:
// Clear attempt history of the leader, so the leader can be accessed again.
s.replicas[state.leaderIdx].attempts = 0
s.state = &tryIdleReplica{leaderIdx: state.leaderIdx}
}
// backoff if still receiving ServerIsBusy after accessing leader again
}
}
} else if ctx != nil && ctx.Store != nil {
// Mark the server is busy (the next incoming READs could be redirect
// to expected followers. )
ctx.Store.healthStatus.markAlreadySlow()
if s.canFallback2Follower() {
return true, nil
} else {
// Mark the server is busy (the next incoming READs could be redirect to expected followers.)
ctx.Store.healthStatus.markAlreadySlow()
}
}
err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
backoffErr := errors.Errorf("server is busy, ctx: %v", ctx)
if s.canFastRetry() {
s.addPendingBackoff(store, retry.BoTiKVServerBusy, backoffErr)
return true, nil
}
err = bo.Backoff(retry.BoTiKVServerBusy, backoffErr)
if err != nil {
return false, err
}
return true, nil
}

// For some reasons, the leader is unreachable by now, try followers instead.
// the state is changed in accessFollower.next when leader is unavailable.
func (s *replicaSelector) canFallback2Follower() bool {
if s == nil || s.state == nil {
return false
}
state, ok := s.state.(*accessFollower)
if !ok {
return false
}
if !state.isStaleRead {
return false
}
// can fallback to follower only when the leader is exhausted.
return state.lastIdx == state.leaderIdx && state.IsLeaderExhausted(s.replicas[state.leaderIdx])
}

func (s *replicaSelector) onDataIsNotReady() {
if target := s.targetReplica(); target != nil {
target.dataIsNotReady = true
Expand Down Expand Up @@ -1525,6 +1518,11 @@ func (s *RegionRequestSender) SendReqCtx(
// TODO: Change the returned error to something like "region missing in cache",
// and handle this error like EpochNotMatch, which means to re-split the request and retry.
s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, retryTimes, req, totalErrors)
if s.replicaSelector != nil {
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
if err := s.replicaSelector.backoffOnNoCandidate(bo); err != nil {
return nil, nil, retryTimes, err
}
}
resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}})
return resp, nil, retryTimes, err
}
Expand Down Expand Up @@ -1554,6 +1552,11 @@ func (s *RegionRequestSender) SendReqCtx(
if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil {
return nil, nil, retryTimes, err
}
if s.replicaSelector != nil {
if err := s.replicaSelector.backoffOnRetry(rpcCtx.Store, bo); err != nil {
return nil, nil, retryTimes, err
}
}

var retry bool
resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
Expand Down Expand Up @@ -2500,3 +2503,59 @@ func (s *replicaSelector) recordAttemptedTime(duration time.Duration) {
proxyReplica.attemptedTime += duration
}
}

// canFastRetry returns true if the request can be sent to next replica.
func (s *replicaSelector) canFastRetry() bool {
accessLeader, ok := s.state.(*accessKnownLeader)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just do fast retry when tikv_client_read_timeout is configured? It looks a little bit dangerous to use

if not fast retry
   return false
return true

as most of the request would be fast retried .

Besides, maybe we could consult slow information to decide whether fast retry should work. /cc @zyguan @MyonKeminta

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the original logic of canFallback2Follower is trying to do fast backoff when retry next replica, so fast backoff is not only work for tikv_client_read_timeout is configured.
cc @you06

Copy link
Contributor

@cfzjywxk cfzjywxk Mar 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the original logic of canFallback2Follower is trying to do fast backoff when retry next replica, so fast backoff is not only work for tikv_client_read_timeout is configured. cc @you06

@crazycs520 It the current logic the same? If so we could continue first.

Copy link
Contributor Author

@crazycs520 crazycs520 Mar 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic is not same with before, this PR is for the fix issue #1166, and keep some optimization from #990 and #923

if ok && isLeaderCandidate(s.replicas[accessLeader.leaderIdx]) {
// If leader is still candidate, the request will be sent to leader again,
// so don't skip since the leader is still busy.
return false
}
return true
}

type backoffArgs struct {
cfg *retry.Config
err error
}

// addPendingBackoff adds pending backoff for the store.
func (s *replicaSelector) addPendingBackoff(store *Store, cfg *retry.Config, err error) {
storeId := uint64(0)
if store != nil {
storeId = store.storeID
}
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
if s.pendingBackoffs == nil {
s.pendingBackoffs = make(map[uint64]*backoffArgs)
}
s.pendingBackoffs[storeId] = &backoffArgs{cfg, err}
}

// backoffOnRetry apply pending backoff on the store when retry in this store.
func (s *replicaSelector) backoffOnRetry(store *Store, bo *retry.Backoffer) error {
storeId := uint64(0)
if store != nil {
storeId = store.storeID
}
args, ok := s.pendingBackoffs[storeId]
if !ok {
return nil
}
delete(s.pendingBackoffs, storeId)
return bo.Backoff(args.cfg, args.err)
}

// backoffOnNoCandidate apply the largest base pending backoff when no candidate.
func (s *replicaSelector) backoffOnNoCandidate(bo *retry.Backoffer) error {
var args *backoffArgs
for _, pbo := range s.pendingBackoffs {
if args == nil || args.cfg.Base() < pbo.cfg.Base() {
args = pbo
}
}
if args == nil {
return nil
}
return bo.Backoff(args.cfg, args.err)
}
Loading
Loading