From daca134a31e40bbf517dd37615e71efc90b84df6 Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Mon, 24 Jul 2023 17:25:08 +0800 Subject: [PATCH] add test case to cover the fallback path when leader peer returns data not ready error at first Signed-off-by: cfzjywxk --- internal/locate/region_request3_test.go | 56 +++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index 4a7b7dee6..ef21359f5 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -36,6 +36,7 @@ package locate import ( "context" + "strconv" "sync/atomic" "testing" "time" @@ -929,3 +930,58 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderReg // after region error returned, the region should be invalidated. s.False(region.isValid()) } + +func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() { + leaderStore, _ := s.loadAndGetLeaderStore() + leaderLabel := []*metapb.StoreLabel{ + { + Key: "id", + Value: strconv.FormatUint(leaderStore.StoreID(), 10), + }, + } + regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID) + s.Nil(err) + s.NotNil(regionLoc) + value := []byte("value") + isFirstReq := true + + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + select { + case <-ctx.Done(): + return nil, errors.New("timeout") + default: + } + // Return `DataIsNotReady` for the first time on leader. + if isFirstReq { + isFirstReq = false + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ + DataIsNotReady: &errorpb.DataIsNotReady{}, + }}}, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil + }} + + region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID()) + s.True(region.isValid()) + + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil) + req.ReadReplicaScope = oracle.GlobalTxnScope + req.TxnScope = oracle.GlobalTxnScope + req.EnableStaleRead() + req.ReplicaReadType = kv.ReplicaReadMixed + var ops []StoreSelectorOption + ops = append(ops, WithMatchLabels(leaderLabel)) + + ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) + bo := retry.NewBackoffer(ctx, -1) + s.Nil(err) + resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...) + s.Nil(err) + + regionErr, err := resp.GetRegionError() + s.Nil(err) + s.Nil(regionErr) + getResp, ok := resp.Resp.(*kvrpcpb.GetResponse) + s.True(ok) + s.Equal(getResp.Value, value) +}