reduce unnecessary tikvServerBusy backoff when able to try next replica #1184

Merged Mar 4, 2024 · 21 commits · Changes shown from 6 commits
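For context, the core idea of the change can be sketched as a standalone check. This is a minimal sketch with hypothetical, simplified types; the authoritative logic is `replicaSelector.canSkipServerIsBusyBackoff` in the diff below. When a store replies ServerIsBusy, the backoff is skipped only if some replica has not been attempted yet, its store is reachable, its region epoch is not stale, and the store is not marked slow.

```go
package main

import "fmt"

// Simplified, hypothetical stand-ins for the replica and store types in
// internal/locate; the real check is replicaSelector.canSkipServerIsBusyBackoff.
type store struct {
	reachable bool // liveness probe says the store is up
	slow      bool // slow score has marked the store as busy/slow
}

type replica struct {
	attempts   int  // attempts already made on this replica for the request
	epochStale bool // the replica's region epoch is outdated
	store      store
}

// canSkipServerIsBusyBackoff returns true when some replica is still untried,
// reachable, not epoch-stale, and not slow, so the request can be retried on
// it immediately instead of sleeping in the tikvServerBusy backoff.
func canSkipServerIsBusyBackoff(replicas []replica) bool {
	for _, r := range replicas {
		if r.attempts == 0 && r.store.reachable && !r.epochStale && !r.store.slow {
			return true
		}
	}
	return false
}

func main() {
	replicas := []replica{
		{attempts: 1, store: store{reachable: true}},  // busy leader, already tried
		{attempts: 0, store: store{reachable: true}},  // healthy follower, untried
		{attempts: 0, store: store{reachable: false}}, // unreachable follower
	}
	fmt.Println(canSkipServerIsBusyBackoff(replicas)) // true: try the healthy follower next
}
```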
33 changes: 15 additions & 18 deletions internal/locate/region_request.go
@@ -1265,21 +1265,15 @@ func (s *replicaSelector) onServerIsBusy(
// Clear attempt history of the leader, so the leader can be accessed again.
s.replicas[state.leaderIdx].attempts = 0
s.state = &tryIdleReplica{leaderIdx: state.leaderIdx}
return true, nil
case *tryIdleReplica:
if s.targetIdx != state.leaderIdx {
return true, nil
}
// backoff if still receiving ServerIsBusy after accessing leader again
}
}
} else if ctx != nil && ctx.Store != nil {
// Mark the server is busy (the next incoming READs could be redirect
// to expected followers. )
ctx.Store.markAlreadySlow()
if s.canFallback2Follower() {
return true, nil
}
}
if s.canSkipServerIsBusyBackoff() {
return true, nil
}
err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
if err != nil {
@@ -1288,21 +1282,24 @@ func (s *replicaSelector) onServerIsBusy(
return true, nil
}

// For some reasons, the leader is unreachable by now, try followers instead.
// the state is changed in accessFollower.next when leader is unavailable.
func (s *replicaSelector) canFallback2Follower() bool {
// canSkipServerIsBusyBackoff returns true if the request can be sent to next replica and can skip ServerIsBusy backoff.
func (s *replicaSelector) canSkipServerIsBusyBackoff() bool {
Contributor comment: Maybe we can reuse this function to handle region unavailable and change its name in the future. Not this PR's work.

if s == nil || s.state == nil {
return false
}
state, ok := s.state.(*accessFollower)
if !ok {
accessLeader, ok := s.state.(*accessKnownLeader)
if ok && accessLeader.isCandidate(s.replicas[accessLeader.leaderIdx]) {
// If leader is still candidate, the request will be sent to leader again,
// so don't skip since the leader is still busy.
return false
}
if !state.isStaleRead {
return false
for _, replica := range s.replicas {
if replica.attempts == 0 && replica.store.getLivenessState() == reachable &&
!replica.isEpochStale() && !replica.store.isSlow() {
return true
}
}
// can fallback to follower only when the leader is exhausted.
return state.lastIdx == state.leaderIdx && state.IsLeaderExhausted(s.replicas[state.leaderIdx])
return false
}

func (s *replicaSelector) onDataIsNotReady() {
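One nuance from the hunk above, exercised by the leader-read loop in TestCanSkipServerIsBusyBackoff below: when the selector is in the accessKnownLeader state and the leader is still a retry candidate, the backoff is kept, because the next attempt would hit the same busy leader. Below is a self-contained sketch of that guard, again with simplified hypothetical types; the real isCandidate also considers liveness and epoch staleness.

```go
package main

import "fmt"

// Hypothetical, simplified mirror of the leader-read guard in the diff above.
type replica struct {
	attempts int
	busy     bool
}

type accessKnownLeader struct {
	leaderIdx   int
	maxAttempts int
}

// isCandidate: the leader can still be retried while its attempts are under the cap.
func (st accessKnownLeader) isCandidate(r replica) bool {
	return r.attempts < st.maxAttempts
}

// canSkipBackoffForLeaderRead keeps the ServerIsBusy backoff while the leader
// remains a candidate, and only afterwards looks for an untried, healthy replica.
func canSkipBackoffForLeaderRead(st accessKnownLeader, replicas []replica) bool {
	if st.isCandidate(replicas[st.leaderIdx]) {
		return false // the request goes back to the same busy leader, so back off
	}
	for _, r := range replicas {
		if r.attempts == 0 && !r.busy {
			return true // a fresh replica can absorb the retry without backing off
		}
	}
	return false
}

func main() {
	st := accessKnownLeader{leaderIdx: 0, maxAttempts: 10}
	replicas := []replica{{attempts: 3, busy: true}, {}, {}}
	fmt.Println(canSkipBackoffForLeaderRead(st, replicas)) // false: leader is still retriable
	replicas[0].attempts = 10
	fmt.Println(canSkipBackoffForLeaderRead(st, replicas)) // true: followers are untried and healthy
}
```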
6 changes: 6 additions & 0 deletions internal/locate/region_request3_test.go
@@ -373,6 +373,12 @@ func refreshLivenessStates(regionStore *regionStore) {
}
}

func refreshStoreScores(regionStore *regionStore) {
for _, store := range regionStore.stores {
store.slowScore.resetSlowScore()
}
}

func AssertRPCCtxEqual(s *testRegionRequestToThreeStoresSuite, rpcCtx *RPCContext, target *replica, proxy *replica) {
s.Equal(rpcCtx.Store, target.store)
s.Equal(rpcCtx.Peer, target.peer)
101 changes: 93 additions & 8 deletions internal/locate/replica_selector_test.go
@@ -114,8 +114,8 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
},
respErr: "",
respRegionError: nil,
backoffCnt: 1,
backoffDetail: []string{"tikvServerBusy+1"},
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
}
@@ -319,8 +319,8 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
"{addr: store2, replica-read: true, stale-read: false}"},
respErr: "",
respRegionError: nil,
backoffCnt: 1,
backoffDetail: []string{"tikvServerBusy+1"},
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
}
@@ -341,8 +341,8 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
},
respErr: "",
respRegionError: fakeEpochNotMatch,
backoffCnt: 2,
backoffDetail: []string{"tikvServerBusy+2"},
backoffCnt: 1,
backoffDetail: []string{"tikvServerBusy+1"},
regionIsValid: false,
},
}
@@ -385,14 +385,99 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
"{addr: store3, replica-read: true, stale-read: false}"},
respErr: "",
respRegionError: nil,
backoffCnt: 1,
backoffDetail: []string{"tikvServerBusy+1"},
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
}
s.True(s.runCaseAndCompare(ca))

ca = replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
readType: kv.ReplicaReadMixed,
staleRead: true,
timeout: time.Microsecond * 100,
accessErr: []RegionErrorType{ServerIsBusyErr, ServerIsBusyWithEstimatedWaitMsErr},
expect: &accessPathResult{
accessPath: []string{
"{addr: store1, replica-read: false, stale-read: true}",
"{addr: store2, replica-read: true, stale-read: false}",
"{addr: store3, replica-read: true, stale-read: false}",
},
respErr: "",
respRegionError: nil,
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
}
s.True(s.runCaseAndCompare(ca))
}

func TestCanSkipServerIsBusyBackoff(t *testing.T) {
s := new(testReplicaSelectorSuite)
s.SetupTest(t)
defer s.TearDownTest()

loc, err := s.cache.LocateKey(s.bo, []byte("key"))
s.Nil(err)
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
req.EnableStaleWithMixedReplicaRead()
selector, err := newReplicaSelector(s.cache, loc.Region, req)
s.Nil(err)
for i := 0; i < 3; i++ {
_, err = selector.next(s.bo)
s.Nil(err)
skip := selector.canSkipServerIsBusyBackoff()
if i < 2 {
s.True(skip)
} else {
s.False(skip)
}
}

selector, err = newReplicaSelector(s.cache, loc.Region, req)
s.Nil(err)
_, err = selector.next(s.bo)
s.Nil(err)
// mock all replica's store is unreachable.
for _, replica := range selector.replicas {
atomic.StoreUint32(&replica.store.livenessState, uint32(unreachable))
}
skip := selector.canSkipServerIsBusyBackoff()
s.False(skip) // can't skip since no replica is available.
refreshLivenessStates(selector.regionStore)
skip = selector.canSkipServerIsBusyBackoff()
s.True(skip)
// mock all replica's store is slow.
for _, replica := range selector.replicas {
replica.store.markAlreadySlow()
}
skip = selector.canSkipServerIsBusyBackoff()
s.False(skip)
refreshStoreScores(selector.regionStore)
skip = selector.canSkipServerIsBusyBackoff()
s.True(skip)

// Test for leader read.
req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
req.ReplicaReadType = kv.ReplicaReadLeader
selector, err = newReplicaSelector(s.cache, loc.Region, req)
s.Nil(err)
for i := 0; i < 12; i++ {
_, err = selector.next(s.bo)
s.Nil(err)
skip := selector.canSkipServerIsBusyBackoff()
if i <= 8 {
s.False(skip) // can't skip since leader is available.
} else if i <= 10 {
s.True(skip)
} else {
s.False(skip)
}
}
}

func (s *testReplicaSelectorSuite) changeRegionLeader(storeId uint64) {
loc, err := s.cache.LocateKey(s.bo, []byte("key"))
s.Nil(err)