From dff471aa3e3d74b38db0001d901e96dc5d03b8db Mon Sep 17 00:00:00 2001
From: you06
Date: Wed, 26 Jul 2023 20:11:24 +0800
Subject: [PATCH] fallback to follower when leader is busy

Signed-off-by: you06
---
 internal/locate/region_cache.go         | 10 +++-
 internal/locate/region_request.go       | 44 ++++++++++++++--
 internal/locate/region_request3_test.go | 68 ++++++++++++++++++++++++-
 3 files changed, 114 insertions(+), 8 deletions(-)

diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index 8482c81d0..c12dd436f 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -572,13 +572,17 @@ func (c *RPCContext) String() string {
 }
 
 type contextPatcher struct {
-	staleRead *bool
+	staleRead   *bool
+	replicaRead *bool
 }
 
 func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) {
 	if patcher.staleRead != nil {
 		pbCtx.StaleRead = *patcher.staleRead
 	}
+	if patcher.replicaRead != nil {
+		pbCtx.ReplicaRead = *patcher.replicaRead
+	}
 }
 
 type storeSelectorOp struct {
@@ -1191,9 +1195,11 @@ func (c *RegionCache) reloadRegion(regionID uint64) {
 		// ignore error and use old region info.
 		logutil.Logger(bo.GetCtx()).Error("load region failure",
 			zap.Uint64("regionID", regionID), zap.Error(err))
+		c.mu.RLock()
 		if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil {
-			atomic.StoreInt32(&oldRegion.asyncReload, 0)
+			atomic.CompareAndSwapInt32(&oldRegion.asyncReload, 1, 0)
 		}
+		c.mu.RUnlock()
 		return
 	}
 	c.mu.Lock()
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index f54320c08..8e3aca793 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -371,8 +371,9 @@ func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
 // the leader will be updated to replicas[0] and give it another chance.
 type tryFollower struct {
 	stateBase
-	leaderIdx AccessIndex
-	lastIdx   AccessIndex
+	fallbackFromLeader bool
+	leaderIdx          AccessIndex
+	lastIdx            AccessIndex
 }
 
 func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
@@ -397,12 +398,25 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 		selector.invalidateRegion()
 		return nil, nil
 	}
-	return selector.buildRPCContext(bo)
+	rpcCtx, err := selector.buildRPCContext(bo)
+	if err != nil || rpcCtx == nil {
+		return rpcCtx, err
+	}
+	if state.fallbackFromLeader {
+		replicaRead := true
+		rpcCtx.contextPatcher.replicaRead = &replicaRead
+	}
+	return rpcCtx, err
 }
 
 func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
-	if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) {
-		panic("the store must exist")
+	if !state.fallbackFromLeader {
+		peer := selector.targetReplica().peer
+		if !selector.region.switchWorkLeaderToPeer(peer) {
+			logutil.BgLogger().Warn("the store must exist",
+				zap.Uint64("store", peer.StoreId),
+				zap.Uint64("peer", peer.Id))
+		}
 	}
 }
 
@@ -888,6 +902,22 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
 	s.region.invalidate(StoreNotFound)
 }
 
+// The leader is unreachable for now; fall back to the followers instead.
+func (s *replicaSelector) fallback2Follower(ctx *RPCContext) bool {
+	if ctx == nil || s == nil || s.state == nil {
+		return false
+	}
+	state, ok := s.state.(*accessFollower)
+	if !ok {
+		return false
+	}
+	if state.lastIdx != state.leaderIdx {
+		return false
+	}
+	s.state = &tryFollower{fallbackFromLeader: true, leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
+	return true
+}
+
 func (s *replicaSelector) invalidateRegion() {
 	if s.region != nil {
 		s.region.invalidate(Other)
@@ -1566,6 +1596,10 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
 		logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later",
 			zap.String("reason", regionErr.GetServerIsBusy().GetReason()),
 			zap.Stringer("ctx", ctx))
+		if s.replicaSelector.fallback2Follower(ctx) {
+			// Retry immediately on a follower instead of backing off.
+			return true, nil
+		}
 		if ctx != nil && ctx.Store != nil && ctx.Store.storeType.IsTiFlashRelatedType() {
 			err = bo.Backoff(retry.BoTiFlashServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
 		} else {
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index 850f24d30..4b63641a8 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -1020,7 +1020,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
 	}
 }
 
-func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
+func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Leader() {
 	leaderStore, _ := s.loadAndGetLeaderStore()
 	leaderLabel := []*metapb.StoreLabel{
 		{
@@ -1100,3 +1100,69 @@
 	s.NotNil(regionErr.GetEpochNotMatch())
 	s.Nil(regionErr.GetDiskFull())
 }
+
+func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() {
+	leaderStore, _ := s.loadAndGetLeaderStore()
+	leaderLabel := []*metapb.StoreLabel{
+		{
+			Key:   "id",
+			Value: strconv.FormatUint(leaderStore.StoreID(), 10),
+		},
+	}
+	regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
+	s.Nil(err)
+	s.NotNil(regionLoc)
+	value := []byte("value")
+
+	dataIsNotReady := false
+	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
+		select {
+		case <-ctx.Done():
+			return nil, errors.New("timeout")
+		default:
+		}
+		if addr == leaderStore.addr {
+			if dataIsNotReady && req.StaleRead {
+				dataIsNotReady = false
+				return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
+					DataIsNotReady: &errorpb.DataIsNotReady{},
+				}}}, nil
+			}
+			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
+				ServerIsBusy: &errorpb.ServerIsBusy{},
+			}}}, nil
+		}
+		if !req.ReplicaRead {
+			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
+				NotLeader: &errorpb.NotLeader{},
+			}}}, nil
+		}
+		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
+	}}
+
+	dataIsNotReady = true
+	// First round: the stale read hits DataIsNotReady on the leader, then the retried leader read hits ServerIsBusy.
+	// Second round: the leader reports ServerIsBusy directly.
+	for i := 0; i < 2; i++ {
+		req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
+		req.ReadReplicaScope = oracle.GlobalTxnScope
+		req.TxnScope = oracle.GlobalTxnScope
+		req.EnableStaleRead()
+		req.ReplicaReadType = kv.ReplicaReadMixed
+		var ops []StoreSelectorOption
+		ops = append(ops, WithMatchLabels(leaderLabel))
+
+		ctx, _ := context.WithTimeout(context.Background(), 10000*time.Second)
+		bo := retry.NewBackoffer(ctx, -1)
+		s.Nil(err)
+		resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
+		s.Nil(err)
+
+		regionErr, err := resp.GetRegionError()
+		s.Nil(err)
+		s.Nil(regionErr)
+		getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
+		s.True(ok)
+		s.Equal(getResp.Value, value)
+	}
+}
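Note (not part of the patch): the contextPatcher change above follows a pointer-as-optional-override pattern, where a nil field means "keep the value the selector already built" and a non-nil field forces it on the outgoing RPC; tryFollower.next uses this to mark the retried request as a replica read when it falls back from a busy leader. The sketch below illustrates only that pattern, using a simplified local Context struct as a stand-in for kvrpcpb.Context; the types here are illustrative assumptions, not the library's actual API.

package main

import "fmt"

// Context is a simplified stand-in for kvrpcpb.Context, used only for this sketch.
type Context struct {
	StaleRead   bool
	ReplicaRead bool
}

// contextPatcher mirrors the pattern in the patch: a nil field leaves the
// request context untouched, a non-nil field overrides it before sending.
type contextPatcher struct {
	staleRead   *bool
	replicaRead *bool
}

func (p *contextPatcher) applyTo(ctx *Context) {
	if p.staleRead != nil {
		ctx.StaleRead = *p.staleRead
	}
	if p.replicaRead != nil {
		ctx.ReplicaRead = *p.replicaRead
	}
}

func main() {
	// A retry that fell back from a busy leader sets only the replica-read
	// override; fields without an override (here StaleRead) keep their values.
	ctx := Context{StaleRead: true}
	replicaRead := true
	patcher := contextPatcher{replicaRead: &replicaRead}
	patcher.applyTo(&ctx)
	fmt.Printf("StaleRead=%v ReplicaRead=%v\n", ctx.StaleRead, ctx.ReplicaRead) // StaleRead=true ReplicaRead=true
}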