Skip to content

Commit cfd5507

Browse files
committed
Resume max retry time check for stale read retry with leader option
Signed-off-by: cfzjywxk <lsswxrxr@163.com>
1 parent 719e645 commit cfd5507

File tree

2 files changed

+70
-1
lines changed

2 files changed

+70
-1
lines changed

internal/locate/region_request.go

+14-1
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
610610
// If there is no candidate, fallback to the leader.
611611
if selector.targetIdx < 0 {
612612
leader := selector.replicas[state.leaderIdx]
613-
leaderInvalid := leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1))
613+
leaderInvalid := leader.isEpochStale() || state.IsLeaderExhausted(leader)
614614
if len(state.option.labels) > 0 {
615615
logutil.Logger(bo.GetCtx()).Warn("unable to find stores with given labels",
616616
zap.Uint64("region", selector.region.GetID()),
@@ -644,6 +644,19 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
644644
return rpcCtx, nil
645645
}
646646

647+
func (state *accessFollower) IsLeaderExhausted(leader *replica) bool {
648+
// Allow another extra retry for the following case:
649+
// 1. The stale read is enabled and leader peer is selected as the target peer at first.
650+
// 2. Data is not ready is returned from the leader peer.
651+
// 3. Stale read flag is removed and processing falls back to snapshot read on the leader peer.
652+
// 4. The leader peer should be retried again using snapshot read.
653+
if state.isStaleRead && state.option.leaderOnly {
654+
return leader.isExhausted(2)
655+
} else {
656+
return leader.isExhausted(1)
657+
}
658+
}
659+
647660
func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
648661
if selector.checkLiveness(bo, selector.targetReplica()) != reachable {
649662
selector.invalidateReplicaStore(selector.targetReplica(), cause)

internal/locate/region_request3_test.go

+56
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ package locate
3737
import (
3838
"context"
3939
"fmt"
40+
"strconv"
4041
"sync/atomic"
4142
"testing"
4243
"time"
@@ -1142,3 +1143,58 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
11421143
s.Equal(0, retryTimes)
11431144
}
11441145
}
1146+
1147+
func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
1148+
leaderStore, _ := s.loadAndGetLeaderStore()
1149+
leaderLabel := []*metapb.StoreLabel{
1150+
{
1151+
Key: "id",
1152+
Value: strconv.FormatUint(leaderStore.StoreID(), 10),
1153+
},
1154+
}
1155+
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
1156+
s.Nil(err)
1157+
s.NotNil(regionLoc)
1158+
value := []byte("value")
1159+
isFirstReq := true
1160+
1161+
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
1162+
select {
1163+
case <-ctx.Done():
1164+
return nil, errors.New("timeout")
1165+
default:
1166+
}
1167+
// Return `DataIsNotReady` for the first time on leader.
1168+
if isFirstReq {
1169+
isFirstReq = false
1170+
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
1171+
DataIsNotReady: &errorpb.DataIsNotReady{},
1172+
}}}, nil
1173+
}
1174+
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
1175+
}}
1176+
1177+
region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID())
1178+
s.True(region.isValid())
1179+
1180+
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
1181+
req.ReadReplicaScope = oracle.GlobalTxnScope
1182+
req.TxnScope = oracle.GlobalTxnScope
1183+
req.EnableStaleRead()
1184+
req.ReplicaReadType = kv.ReplicaReadMixed
1185+
var ops []StoreSelectorOption
1186+
ops = append(ops, WithMatchLabels(leaderLabel))
1187+
1188+
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
1189+
bo := retry.NewBackoffer(ctx, -1)
1190+
s.Nil(err)
1191+
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
1192+
s.Nil(err)
1193+
1194+
regionErr, err := resp.GetRegionError()
1195+
s.Nil(err)
1196+
s.Nil(regionErr)
1197+
getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
1198+
s.True(ok)
1199+
s.Equal(getResp.Value, value)
1200+
}

0 commit comments

Comments
 (0)