Skip to content

Commit

Permalink
fallback to follower when leader is busy
Browse files Browse the repository at this point in the history
Signed-off-by: you06 <you1474600@gmail.com>
  • Loading branch information
you06 committed Jul 26, 2023
1 parent bf9390f commit dff471a
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 8 deletions.
10 changes: 8 additions & 2 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,13 +572,17 @@ func (c *RPCContext) String() string {
}

type contextPatcher struct {
staleRead *bool
staleRead *bool
replicaRead *bool
}

func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) {
if patcher.staleRead != nil {
pbCtx.StaleRead = *patcher.staleRead
}
if patcher.replicaRead != nil {
pbCtx.ReplicaRead = *patcher.replicaRead
}
}

type storeSelectorOp struct {
Expand Down Expand Up @@ -1191,9 +1195,11 @@ func (c *RegionCache) reloadRegion(regionID uint64) {
// ignore error and use old region info.
logutil.Logger(bo.GetCtx()).Error("load region failure",
zap.Uint64("regionID", regionID), zap.Error(err))
c.mu.RLock()
if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil {
atomic.StoreInt32(&oldRegion.asyncReload, 0)
atomic.CompareAndSwapInt32(&oldRegion.asyncReload, 1, 0)
}
c.mu.RUnlock()
return
}
c.mu.Lock()
Expand Down
44 changes: 39 additions & 5 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,9 @@ func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
// the leader will be updated to replicas[0] and give it another chance.
type tryFollower struct {
stateBase
leaderIdx AccessIndex
lastIdx AccessIndex
fallbackFromLeader bool
leaderIdx AccessIndex
lastIdx AccessIndex
}

func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
Expand All @@ -397,12 +398,25 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
selector.invalidateRegion()
return nil, nil
}
return selector.buildRPCContext(bo)
rpcCtx, err := selector.buildRPCContext(bo)
if err != nil || rpcCtx == nil {
return rpcCtx, err
}
if state.fallbackFromLeader {
replicaRead := true
rpcCtx.contextPatcher.replicaRead = &replicaRead
}
return rpcCtx, err
}

func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) {
panic("the store must exist")
if !state.fallbackFromLeader {
peer := selector.targetReplica().peer
if !selector.region.switchWorkLeaderToPeer(peer) {
logutil.BgLogger().Warn("the store must exist",
zap.Uint64("store", peer.StoreId),
zap.Uint64("peer", peer.Id))
}
}
}

Expand Down Expand Up @@ -888,6 +902,22 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
s.region.invalidate(StoreNotFound)
}

// For some reason, the leader is unreachable by now, try followers instead.
func (s *replicaSelector) fallback2Follower(ctx *RPCContext) bool {
if ctx == nil || s == nil || s.state == nil {
return false
}
state, ok := s.state.(*accessFollower)
if !ok {
return false
}
if state.lastIdx != state.leaderIdx {
return false
}
s.state = &tryFollower{fallbackFromLeader: true, leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
return true
}

func (s *replicaSelector) invalidateRegion() {
if s.region != nil {
s.region.invalidate(Other)
Expand Down Expand Up @@ -1566,6 +1596,10 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later",
zap.String("reason", regionErr.GetServerIsBusy().GetReason()),
zap.Stringer("ctx", ctx))
if s.replicaSelector.fallback2Follower(ctx) {
// immediately retry on followers.
return true, nil
}
if ctx != nil && ctx.Store != nil && ctx.Store.storeType.IsTiFlashRelatedType() {
err = bo.Backoff(retry.BoTiFlashServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
} else {
Expand Down
68 changes: 67 additions & 1 deletion internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
}
}

func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Leader() {
leaderStore, _ := s.loadAndGetLeaderStore()
leaderLabel := []*metapb.StoreLabel{
{
Expand Down Expand Up @@ -1100,3 +1100,69 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
s.NotNil(regionErr.GetEpochNotMatch())
s.Nil(regionErr.GetDiskFull())
}

func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() {
leaderStore, _ := s.loadAndGetLeaderStore()
leaderLabel := []*metapb.StoreLabel{
{
Key: "id",
Value: strconv.FormatUint(leaderStore.StoreID(), 10),
},
}
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
s.NotNil(regionLoc)
value := []byte("value")

dataIsNotReady := false
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
select {
case <-ctx.Done():
return nil, errors.New("timeout")
default:
}
if addr == leaderStore.addr {
if dataIsNotReady && req.StaleRead {
dataIsNotReady = false
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
DataIsNotReady: &errorpb.DataIsNotReady{},
}}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
ServerIsBusy: &errorpb.ServerIsBusy{},
}}}, nil
}
if !req.ReplicaRead {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
NotLeader: &errorpb.NotLeader{},
}}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
}}

dataIsNotReady = true
// data is not ready, then server is busy in the first round,
// directly server is busy in the second round.
for i := 0; i < 2; i++ {
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
req.EnableStaleRead()
req.ReplicaReadType = kv.ReplicaReadMixed
var ops []StoreSelectorOption
ops = append(ops, WithMatchLabels(leaderLabel))

ctx, _ := context.WithTimeout(context.Background(), 10000*time.Second)
bo := retry.NewBackoffer(ctx, -1)
s.Nil(err)
resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
s.Nil(err)

regionErr, err := resp.GetRegionError()
s.Nil(err)
s.Nil(regionErr)
getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
s.True(ok)
s.Equal(getResp.Value, value)
}
}

0 comments on commit dff471a

Please sign in to comment.