Commit 2cf75b1

fix unexpected slow query during GC running after stop 1 tikv-server (#899) (#930)
Signed-off-by: crazycs520 <crazycs520@gmail.com>
1 parent a3875bc commit 2cf75b1

2 files changed (+94, -4 lines)

internal/locate/region_request.go (+3, -3)
@@ -402,7 +402,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 		}
 		targetReplica = selector.replicas[idx]
 		// Each follower is only tried once
-		if !targetReplica.isExhausted(1) {
+		if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable {
 			state.lastIdx = idx
 			selector.targetIdx = idx
 			break
@@ -628,8 +628,8 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic
 }
 
 func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool {
-	// the epoch is staled or retry exhausted.
-	if replica.isEpochStale() || replica.isExhausted(1) {
+	// the epoch is staled or retry exhausted, or the store is unreachable.
+	if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable {
 		return false
 	}
 	// The request can only be sent to the leader.
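
The substance of the fix is the added liveness check above: both tryFollower.next and accessFollower.isCandidate now skip any replica whose store is marked unreachable, so the selector falls back to the leader instead of backing off against a stopped TiKV. The following is a minimal, self-contained sketch of that selection rule only; the livenessState, store, and replica types and the pickFollower helper are simplified stand-ins for illustration, not client-go's actual structs.

package main

import "fmt"

// livenessState mimics the reachable/unreachable store state used in the patch.
type livenessState int

const (
	reachable livenessState = iota
	unreachable
)

type store struct {
	addr     string
	liveness livenessState
}

type replica struct {
	store    *store
	attempts int
	isLeader bool
}

// isExhausted reports whether this replica has already been tried maxAttempts times.
func (r *replica) isExhausted(maxAttempts int) bool { return r.attempts >= maxAttempts }

// pickFollower mirrors the changed condition: a follower is a candidate only if
// its retry budget is not exhausted AND its store is not known to be unreachable.
// If no follower qualifies, the caller falls back to the leader.
func pickFollower(replicas []*replica) *replica {
	for _, r := range replicas {
		if r.isLeader {
			continue
		}
		if !r.isExhausted(1) && r.store.liveness != unreachable {
			return r
		}
	}
	return nil
}

func main() {
	replicas := []*replica{
		{store: &store{addr: "leader:20160"}, isLeader: true},
		{store: &store{addr: "follower1:20160", liveness: unreachable}},
		{store: &store{addr: "follower2:20160", liveness: unreachable}},
	}
	if f := pickFollower(replicas); f == nil {
		fmt.Println("no usable follower, falling back to the leader")
	} else {
		fmt.Println("sending to follower", f.store.addr)
	}
}

When every follower is filtered out this way, the read degrades to a plain leader read with no backoff, which is exactly what the new TestAccessFollowerAfter1TiKVDown test in region_request3_test.go asserts.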

internal/locate/region_request3_test.go (+91, -1)
@@ -278,6 +278,12 @@ func refreshEpochs(regionStore *regionStore) {
 	}
 }
 
+func refreshLivenessStates(regionStore *regionStore) {
+	for _, store := range regionStore.stores {
+		atomic.StoreUint32(&store.livenessState, uint32(reachable))
+	}
+}
+
 func AssertRPCCtxEqual(s *testRegionRequestToThreeStoresSuite, rpcCtx *RPCContext, target *replica, proxy *replica) {
 	s.Equal(rpcCtx.Store, target.store)
 	s.Equal(rpcCtx.Peer, target.peer)
@@ -567,6 +573,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 	// Test accessFollower state with kv.ReplicaReadFollower request type.
 	req = tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadFollower, nil)
 	refreshEpochs(regionStore)
+	refreshLivenessStates(regionStore)
 	replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
 	s.Nil(err)
 	s.NotNil(replicaSelector)
@@ -680,10 +687,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	region, err := s.cache.LocateRegionByID(s.bo, s.regionID)
 	s.Nil(err)
 	s.NotNil(region)
+	regionStore := s.cache.GetCachedRegionWithRLock(region.Region).getStore()
+	s.NotNil(regionStore)
 
 	reloadRegion := func() {
 		s.regionRequestSender.replicaSelector.region.invalidate(Other)
 		region, _ = s.cache.LocateRegionByID(s.bo, s.regionID)
+		regionStore = s.cache.GetCachedRegionWithRLock(region.Region).getStore()
 	}
 
 	hasFakeRegionError := func(resp *tikvrpc.Response) bool {
@@ -715,6 +725,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
 	s.True(bo.GetTotalBackoffTimes() == 1)
 	s.cluster.StartStore(s.storeIDs[0])
+	atomic.StoreUint32(&regionStore.stores[0].livenessState, uint32(reachable))
 
 	// Leader is updated because of send success, so no backoff.
 	bo = retry.NewBackoffer(context.Background(), -1)
@@ -734,6 +745,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	s.True(hasFakeRegionError(resp))
 	s.Equal(bo.GetTotalBackoffTimes(), 1)
 	s.cluster.StartStore(s.storeIDs[1])
+	atomic.StoreUint32(&regionStore.stores[1].livenessState, uint32(reachable))
 
 	// Leader is changed. No backoff.
 	reloadRegion()
@@ -750,7 +762,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
 	s.Nil(err)
 	s.True(hasFakeRegionError(resp))
-	s.Equal(bo.GetTotalBackoffTimes(), 2) // The unreachable leader is skipped
+	s.Equal(bo.GetTotalBackoffTimes(), 3)
 	s.False(sender.replicaSelector.region.isValid())
 	s.cluster.ChangeLeader(s.regionID, s.peerIDs[0])
 
@@ -1098,3 +1110,81 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderReg
 	// after region error returned, the region should be invalidated.
 	s.False(region.isValid())
 }
+
+func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown() {
+	var leaderAddr string
+	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
+		// Returns error when accesses non-leader.
+		if leaderAddr != addr {
+			return nil, context.DeadlineExceeded
+		}
+		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
+			Value: []byte("value"),
+		}}, nil
+	}}
+
+	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
+		Key: []byte("key"),
+	})
+	req.ReplicaReadType = kv.ReplicaReadMixed
+
+	loc, err := s.cache.LocateKey(s.bo, []byte("key"))
+	s.Nil(err)
+	region := s.cache.GetCachedRegionWithRLock(loc.Region)
+	s.NotNil(region)
+	regionStore := region.getStore()
+	leaderAddr = regionStore.stores[regionStore.workTiKVIdx].addr
+	s.NotEqual(leaderAddr, "")
+	for i := 0; i < 10; i++ {
+		bo := retry.NewBackofferWithVars(context.Background(), 100, nil)
+		resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
+		s.Nil(err)
+		s.NotNil(resp)
+
+		// Since send req to follower will receive error, then all follower will be marked as unreachable and epoch stale.
+		allFollowerStoreEpochStale := true
+		for i, store := range regionStore.stores {
+			if i == int(regionStore.workTiKVIdx) {
+				continue
+			}
+			if store.epoch == regionStore.storeEpochs[i] {
+				allFollowerStoreEpochStale = false
+				break
+			} else {
+				s.Equal(store.getLivenessState(), unreachable)
+			}
+		}
+		if allFollowerStoreEpochStale {
+			break
+		}
+	}
+
+	// mock for GC leader reload all regions.
+	bo := retry.NewBackofferWithVars(context.Background(), 10, nil)
+	_, err = s.cache.BatchLoadRegionsWithKeyRange(bo, []byte(""), nil, 1)
+	s.Nil(err)
+
+	loc, err = s.cache.LocateKey(s.bo, []byte("key"))
+	s.Nil(err)
+	region = s.cache.GetCachedRegionWithRLock(loc.Region)
+	s.NotNil(region)
+	regionStore = region.getStore()
+	for i, store := range regionStore.stores {
+		if i == int(regionStore.workTiKVIdx) {
+			continue
+		}
+		// After reload region, the region epoch will be updated, but the store liveness state is still unreachable.
+		s.Equal(store.epoch, regionStore.storeEpochs[i])
+		s.Equal(store.getLivenessState(), unreachable)
+	}
+
+	for i := 0; i < 100; i++ {
+		bo := retry.NewBackofferWithVars(context.Background(), 1, nil)
+		resp, _, retryTimes, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
+		s.Nil(err)
+		s.NotNil(resp)
+		// since all follower'store is unreachable, the request will be sent to leader, the backoff times should be 0.
+		s.Equal(0, bo.GetTotalBackoffTimes())
+		s.Equal(0, retryTimes)
+	}
+}
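
The test's bookkeeping depends on the store liveness state being a plain uint32 updated through sync/atomic, which is why refreshLivenessStates and the StoreUint32 calls above can reset stores to reachable between sub-tests without racing any background health check. Below is a small standalone sketch of that pattern; the store type and the constants here are simplified stand-ins, not client-go's real internals.

package main

import (
	"fmt"
	"sync/atomic"
)

// Simplified liveness values mirroring reachable/unreachable in the diff above.
const (
	reachable uint32 = iota
	unreachable
)

// store is a cut-down stand-in; only the atomically accessed liveness field matters here.
type store struct {
	addr          string
	livenessState uint32 // always read/written via sync/atomic
}

// getLivenessState reads the current liveness value atomically.
func (s *store) getLivenessState() uint32 {
	return atomic.LoadUint32(&s.livenessState)
}

// refreshLivenessStates resets every store to reachable, the same pattern the
// new test helper uses so later sub-tests start from a clean state.
func refreshLivenessStates(stores []*store) {
	for _, s := range stores {
		atomic.StoreUint32(&s.livenessState, reachable)
	}
}

func main() {
	stores := []*store{
		{addr: "store1", livenessState: unreachable},
		{addr: "store2", livenessState: unreachable},
	}
	refreshLivenessStates(stores)
	for _, s := range stores {
		fmt.Printf("%s reachable=%v\n", s.addr, s.getLivenessState() == reachable)
	}
}

This is also the crux of the slow-query scenario in the commit title: after the GC leader reloads every region the epochs are refreshed, but the followers on the stopped TiKV remain unreachable, so with the fix those replicas are never selected and each read goes straight to the leader with zero backoff and zero retries, as the final loop of the test asserts.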
