Skip to content

Commit

Permalink
add fail path test
Browse files Browse the repository at this point in the history
Signed-off-by: cfzjywxk <lsswxrxr@163.com>
  • Loading branch information
cfzjywxk committed Jul 26, 2023
1 parent d7ecec6 commit a54a4d9
Showing 1 changed file with 30 additions and 4 deletions.
34 changes: 30 additions & 4 deletions internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,27 +1156,40 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
s.Nil(err)
s.NotNil(regionLoc)
value := []byte("value")
isFirstReq := true

type testState struct {
tryTimes uint8
succ bool
}

state := &testState{}
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
select {
case <-ctx.Done():
return nil, errors.New("timeout")
default:
}
// Return `DataIsNotReady` for the first time on leader.
if isFirstReq {
isFirstReq = false
if state.tryTimes == 0 {
state.tryTimes++
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
DataIsNotReady: &errorpb.DataIsNotReady{},
}}}, nil
} else if state.tryTimes == 1 && state.succ {
state.tryTimes++
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
state.tryTimes++
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
DiskFull: &errorpb.DiskFull{},
}}}, nil
}}

region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID())
s.True(region.isValid())

// Test the successful path.
state.succ = true
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
Expand All @@ -1197,4 +1210,17 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
s.True(ok)
s.Equal(getResp.Value, value)

// Test the fail path leader retry limit is reached, epoch not match error would be returned.
state.tryTimes = 0
state.succ = false
req.EnableStaleRead()
resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
s.Nil(err)

regionErr, err = resp.GetRegionError()
s.Nil(err)
s.NotNil(regionErr)
s.NotNil(regionErr.GetEpochNotMatch())
s.Nil(regionErr.GetDiskFull())
}

0 comments on commit a54a4d9

Please sign in to comment.