Commit 59adec2

fix unexpected slow query during GC running after stop 1 tikv-server (#899)

* fix unexpected slow query during GC running after stop 1 tikv-server
* add test
* address comment
* address comment
* fix test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

1 parent c7e214f commit 59adec2

File tree: 2 files changed, +94 −3 lines
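Before the diffs, here is a minimal, hedged sketch of the idea behind the fix. The types and helpers below (livenessState, store, replica, pickReadTarget) are simplified stand-ins invented for illustration, not the real client-go structures; only the liveness check itself (getLivenessState() != unreachable in the real code) comes from this commit. The point is that a replica whose store is already known to be unreachable is skipped during replica selection, so a read falls back to a healthy follower or the leader instead of repeatedly timing out and backing off against a stopped tikv-server, which is what made GC-driven queries unexpectedly slow.

// A minimal, self-contained sketch of the idea behind this fix, using
// simplified stand-in types rather than the real client-go structures.
package main

import "fmt"

type livenessState int

const (
	reachable livenessState = iota
	unreachable
)

type store struct {
	addr     string
	liveness livenessState
}

type replica struct {
	store    *store
	attempts int
}

// isExhausted mirrors the "each follower is only tried once" rule from
// tryFollower.next.
func (r *replica) isExhausted(max int) bool { return r.attempts >= max }

// pickReadTarget returns the index of the first follower that is neither
// exhausted nor backed by an unreachable store; if no follower qualifies,
// it falls back to the leader.
func pickReadTarget(replicas []*replica, leaderIdx int) int {
	for i, r := range replicas {
		if i == leaderIdx {
			continue
		}
		if !r.isExhausted(1) && r.store.liveness != unreachable {
			return i
		}
	}
	return leaderIdx
}

func main() {
	replicas := []*replica{
		{store: &store{addr: "tikv-0", liveness: reachable}},   // leader
		{store: &store{addr: "tikv-1", liveness: unreachable}}, // stopped tikv-server
		{store: &store{addr: "tikv-2", liveness: unreachable}}, // also down
	}
	// Both followers are unreachable, so the read goes straight to the
	// leader (index 0) without burning backoff time on dead stores.
	fmt.Println(pickReadTarget(replicas, 0))
}

With both followers unreachable, the example prints 0: the read goes straight to the leader with no backoff, which mirrors what the new TestAccessFollowerAfter1TiKVDown test asserts below.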

internal/locate/region_request.go

+4 −2

@@ -385,7 +385,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 		}
 		targetReplica = selector.replicas[idx]
 		// Each follower is only tried once
-		if !targetReplica.isExhausted(1) {
+		if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable {
 			state.lastIdx = idx
 			selector.targetIdx = idx
 			break
@@ -604,7 +604,9 @@ func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool
 		// The request can only be sent to the leader.
 		((state.option.leaderOnly && idx == state.leaderIdx) ||
 			// Choose a replica with matched labels.
-			(!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels)))
+			(!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels))) &&
+			// Make sure the replica is not unreachable.
+			replica.store.getLivenessState() != unreachable
 }
 
 type invalidStore struct {
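The subtle part of the isCandidate change is the operator grouping: the new liveness clause is ANDed with the whole existing condition, so it filters both the leader-only branch and the label-matching branch. Below is a hedged restatement of just the visible part of that predicate, with plain booleans standing in for the real state, option, and replica fields (the parameter names are assumptions for illustration only).

package main

import "fmt"

// isCandidateShape restates the visible part of accessFollower.isCandidate
// after this change, with booleans standing in for the real fields.
func isCandidateShape(leaderOnly, isLeader, tryLeader, labelsMatch, storeUnreachable bool) bool {
	return ((leaderOnly && isLeader) ||
		// Choose a replica with matched labels.
		(!leaderOnly && (tryLeader || !isLeader) && labelsMatch)) &&
		// Make sure the replica is not unreachable.
		!storeUnreachable
}

func main() {
	// A label-matched follower on an unreachable store is no longer a candidate.
	fmt.Println(isCandidateShape(false, false, false, true, true)) // false
	// The same follower on a reachable store still is.
	fmt.Println(isCandidateShape(false, false, false, true, false)) // true
}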

internal/locate/region_request3_test.go

+90 −1
@@ -276,6 +276,12 @@ func refreshEpochs(regionStore *regionStore) {
 	}
 }
 
+func refreshLivenessStates(regionStore *regionStore) {
+	for _, store := range regionStore.stores {
+		atomic.StoreUint32(&store.livenessState, uint32(reachable))
+	}
+}
+
 func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 	regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
 	s.Nil(err)
@@ -511,6 +517,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 	// Test accessFollower state with kv.ReplicaReadFollower request type.
 	req = tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadFollower, nil)
 	refreshEpochs(regionStore)
+	refreshLivenessStates(regionStore)
 	replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
 	s.Nil(err)
 	s.NotNil(replicaSelector)
@@ -625,10 +632,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	region, err := s.cache.LocateRegionByID(s.bo, s.regionID)
 	s.Nil(err)
 	s.NotNil(region)
+	regionStore := s.cache.GetCachedRegionWithRLock(region.Region).getStore()
+	s.NotNil(regionStore)
 
 	reloadRegion := func() {
 		s.regionRequestSender.replicaSelector.region.invalidate(Other)
 		region, _ = s.cache.LocateRegionByID(s.bo, s.regionID)
+		regionStore = s.cache.GetCachedRegionWithRLock(region.Region).getStore()
 	}
 
 	hasFakeRegionError := func(resp *tikvrpc.Response) bool {
@@ -660,6 +670,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
 	s.True(bo.GetTotalBackoffTimes() == 1)
 	s.cluster.StartStore(s.storeIDs[0])
+	atomic.StoreUint32(&regionStore.stores[0].livenessState, uint32(reachable))
 
 	// Leader is updated because of send success, so no backoff.
 	bo = retry.NewBackoffer(context.Background(), -1)
@@ -679,6 +690,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	s.True(hasFakeRegionError(resp))
 	s.Equal(bo.GetTotalBackoffTimes(), 1)
 	s.cluster.StartStore(s.storeIDs[1])
+	atomic.StoreUint32(&regionStore.stores[1].livenessState, uint32(reachable))
 
 	// Leader is changed. No backoff.
 	reloadRegion()
@@ -695,7 +707,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	resp, err = sender.SendReq(bo, req, region.Region, time.Second)
 	s.Nil(err)
 	s.True(hasFakeRegionError(resp))
-	s.Equal(bo.GetTotalBackoffTimes(), 2) // The unreachable leader is skipped
+	s.Equal(bo.GetTotalBackoffTimes(), 3)
 	s.False(sender.replicaSelector.region.isValid())
 	s.cluster.ChangeLeader(s.regionID, s.peerIDs[0])
 
@@ -929,3 +941,80 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderReg
 	// after region error returned, the region should be invalidated.
 	s.False(region.isValid())
 }
+
+func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown() {
+	var leaderAddr string
+	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
+		// Return an error when a non-leader store is accessed.
+		if leaderAddr != addr {
+			return nil, context.DeadlineExceeded
+		}
+		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
+			Value: []byte("value"),
+		}}, nil
+	}}
+
+	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
+		Key: []byte("key"),
+	})
+	req.ReplicaReadType = kv.ReplicaReadMixed
+
+	loc, err := s.cache.LocateKey(s.bo, []byte("key"))
+	s.Nil(err)
+	region := s.cache.GetCachedRegionWithRLock(loc.Region)
+	s.NotNil(region)
+	regionStore := region.getStore()
+	leaderAddr = regionStore.stores[regionStore.workTiKVIdx].addr
+	s.NotEqual(leaderAddr, "")
+	for i := 0; i < 10; i++ {
+		bo := retry.NewBackofferWithVars(context.Background(), 100, nil)
+		resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
+		s.Nil(err)
+		s.NotNil(resp)
+
+		// Since requests sent to followers fail, all followers should eventually be marked unreachable and their store epochs become stale.
+		allFollowerStoreEpochStale := true
+		for i, store := range regionStore.stores {
+			if i == int(regionStore.workTiKVIdx) {
+				continue
+			}
+			if store.epoch == regionStore.storeEpochs[i] {
+				allFollowerStoreEpochStale = false
+				break
+			} else {
+				s.Equal(store.getLivenessState(), unreachable)
+			}
+		}
+		if allFollowerStoreEpochStale {
+			break
+		}
+	}
+
+	// Mock the GC leader reloading all regions.
+	bo := retry.NewBackofferWithVars(context.Background(), 10, nil)
+	_, err = s.cache.BatchLoadRegionsWithKeyRange(bo, []byte(""), nil, 1)
+	s.Nil(err)
+
+	loc, err = s.cache.LocateKey(s.bo, []byte("key"))
+	s.Nil(err)
+	region = s.cache.GetCachedRegionWithRLock(loc.Region)
+	s.NotNil(region)
+	regionStore = region.getStore()
+	for i, store := range regionStore.stores {
+		if i == int(regionStore.workTiKVIdx) {
+			continue
+		}
+		// After reloading the region, the region epoch is updated, but the store liveness state is still unreachable.
+		s.Equal(store.epoch, regionStore.storeEpochs[i])
+		s.Equal(store.getLivenessState(), unreachable)
+	}
+
+	for i := 0; i < 100; i++ {
+		bo := retry.NewBackofferWithVars(context.Background(), 1, nil)
+		resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
+		s.Nil(err)
+		s.NotNil(resp)
+		// Since all follower stores are unreachable, the request is sent to the leader, so the backoff count should be 0.
+		s.Equal(0, bo.GetTotalBackoffTimes())
+	}
+}
