Skip to content

Commit 6bb6771

Browse files
cfzjywxkyou06
authored andcommitted
Resume max retry time check for stale read retry with leader option(tikv#903) (tikv#911)
* Resume max retry time check for stale read retry with leader option Signed-off-by: cfzjywxk <lsswxrxr@163.com> * add cancel Signed-off-by: cfzjywxk <lsswxrxr@163.com> --------- Signed-off-by: cfzjywxk <lsswxrxr@163.com>
1 parent 669c634 commit 6bb6771

File tree

2 files changed

+70
-0
lines changed

2 files changed

+70
-0
lines changed

internal/locate/region_request.go

+13
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,19 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
703703
return rpcCtx, nil
704704
}
705705

706+
func (state *accessFollower) IsLeaderExhausted(leader *replica) bool {
707+
// Allow another extra retry for the following case:
708+
// 1. The stale read is enabled and leader peer is selected as the target peer at first.
709+
// 2. Data is not ready is returned from the leader peer.
710+
// 3. Stale read flag is removed and processing falls back to snapshot read on the leader peer.
711+
// 4. The leader peer should be retried again using snapshot read.
712+
if state.isStaleRead && state.option.leaderOnly {
713+
return leader.isExhausted(2)
714+
} else {
715+
return leader.isExhausted(1)
716+
}
717+
}
718+
706719
func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
707720
if selector.checkLiveness(bo, selector.targetReplica()) != reachable {
708721
selector.invalidateReplicaStore(selector.targetReplica(), cause)

internal/locate/region_request3_test.go

+57
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ package locate
3737
import (
3838
"context"
3939
"fmt"
40+
"strconv"
4041
"sync/atomic"
4142
"testing"
4243
"time"
@@ -1190,3 +1191,59 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() {
11901191
}
11911192
}
11921193
}
1194+
1195+
func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
1196+
leaderStore, _ := s.loadAndGetLeaderStore()
1197+
leaderLabel := []*metapb.StoreLabel{
1198+
{
1199+
Key: "id",
1200+
Value: strconv.FormatUint(leaderStore.StoreID(), 10),
1201+
},
1202+
}
1203+
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
1204+
s.Nil(err)
1205+
s.NotNil(regionLoc)
1206+
value := []byte("value")
1207+
isFirstReq := true
1208+
1209+
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
1210+
select {
1211+
case <-ctx.Done():
1212+
return nil, errors.New("timeout")
1213+
default:
1214+
}
1215+
// Return `DataIsNotReady` for the first time on leader.
1216+
if isFirstReq {
1217+
isFirstReq = false
1218+
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
1219+
DataIsNotReady: &errorpb.DataIsNotReady{},
1220+
}}}, nil
1221+
}
1222+
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
1223+
}}
1224+
1225+
region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID())
1226+
s.True(region.isValid())
1227+
1228+
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
1229+
req.ReadReplicaScope = oracle.GlobalTxnScope
1230+
req.TxnScope = oracle.GlobalTxnScope
1231+
req.EnableStaleRead()
1232+
req.ReplicaReadType = kv.ReplicaReadMixed
1233+
var ops []StoreSelectorOption
1234+
ops = append(ops, WithMatchLabels(leaderLabel))
1235+
1236+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
1237+
defer cancel()
1238+
bo := retry.NewBackoffer(ctx, -1)
1239+
s.Nil(err)
1240+
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
1241+
s.Nil(err)
1242+
1243+
regionErr, err := resp.GetRegionError()
1244+
s.Nil(err)
1245+
s.Nil(regionErr)
1246+
getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
1247+
s.True(ok)
1248+
s.Equal(getResp.Value, value)
1249+
}

0 commit comments

Comments
 (0)