@@ -278,6 +278,12 @@ func refreshEpochs(regionStore *regionStore) {
278
278
}
279
279
}
280
280
281
+ func refreshLivenessStates (regionStore * regionStore ) {
282
+ for _ , store := range regionStore .stores {
283
+ atomic .StoreUint32 (& store .livenessState , uint32 (reachable ))
284
+ }
285
+ }
286
+
281
287
func AssertRPCCtxEqual (s * testRegionRequestToThreeStoresSuite , rpcCtx * RPCContext , target * replica , proxy * replica ) {
282
288
s .Equal (rpcCtx .Store , target .store )
283
289
s .Equal (rpcCtx .Peer , target .peer )
@@ -567,6 +573,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
567
573
// Test accessFollower state with kv.ReplicaReadFollower request type.
568
574
req = tikvrpc .NewReplicaReadRequest (tikvrpc .CmdGet , & kvrpcpb.GetRequest {}, kv .ReplicaReadFollower , nil )
569
575
refreshEpochs (regionStore )
576
+ refreshLivenessStates (regionStore )
570
577
replicaSelector , err = newReplicaSelector (cache , regionLoc .Region , req )
571
578
s .Nil (err )
572
579
s .NotNil (replicaSelector )
@@ -680,10 +687,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
680
687
region , err := s .cache .LocateRegionByID (s .bo , s .regionID )
681
688
s .Nil (err )
682
689
s .NotNil (region )
690
+ regionStore := s .cache .GetCachedRegionWithRLock (region .Region ).getStore ()
691
+ s .NotNil (regionStore )
683
692
684
693
reloadRegion := func () {
685
694
s .regionRequestSender .replicaSelector .region .invalidate (Other )
686
695
region , _ = s .cache .LocateRegionByID (s .bo , s .regionID )
696
+ regionStore = s .cache .GetCachedRegionWithRLock (region .Region ).getStore ()
687
697
}
688
698
689
699
hasFakeRegionError := func (resp * tikvrpc.Response ) bool {
@@ -715,6 +725,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
715
725
s .Equal (sender .replicaSelector .targetIdx , AccessIndex (1 ))
716
726
s .True (bo .GetTotalBackoffTimes () == 1 )
717
727
s .cluster .StartStore (s .storeIDs [0 ])
728
+ atomic .StoreUint32 (& regionStore .stores [0 ].livenessState , uint32 (reachable ))
718
729
719
730
// Leader is updated because of send success, so no backoff.
720
731
bo = retry .NewBackoffer (context .Background (), - 1 )
@@ -734,6 +745,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
734
745
s .True (hasFakeRegionError (resp ))
735
746
s .Equal (bo .GetTotalBackoffTimes (), 1 )
736
747
s .cluster .StartStore (s .storeIDs [1 ])
748
+ atomic .StoreUint32 (& regionStore .stores [1 ].livenessState , uint32 (reachable ))
737
749
738
750
// Leader is changed. No backoff.
739
751
reloadRegion ()
@@ -750,7 +762,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
750
762
resp , _ , err = sender .SendReq (bo , req , region .Region , time .Second )
751
763
s .Nil (err )
752
764
s .True (hasFakeRegionError (resp ))
753
- s .Equal (bo .GetTotalBackoffTimes (), 2 ) // The unreachable leader is skipped
765
+ s .Equal (bo .GetTotalBackoffTimes (), 3 )
754
766
s .False (sender .replicaSelector .region .isValid ())
755
767
s .cluster .ChangeLeader (s .regionID , s .peerIDs [0 ])
756
768
@@ -1098,3 +1110,81 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderReg
1098
1110
// after region error returned, the region should be invalidated.
1099
1111
s .False (region .isValid ())
1100
1112
}
1113
+
1114
+ func (s * testRegionRequestToThreeStoresSuite ) TestAccessFollowerAfter1TiKVDown () {
1115
+ var leaderAddr string
1116
+ s .regionRequestSender .client = & fnClient {fn : func (ctx context.Context , addr string , req * tikvrpc.Request , timeout time.Duration ) (response * tikvrpc.Response , err error ) {
1117
+ // Returns error when accesses non-leader.
1118
+ if leaderAddr != addr {
1119
+ return nil , context .DeadlineExceeded
1120
+ }
1121
+ return & tikvrpc.Response {Resp : & kvrpcpb.GetResponse {
1122
+ Value : []byte ("value" ),
1123
+ }}, nil
1124
+ }}
1125
+
1126
+ req := tikvrpc .NewRequest (tikvrpc .CmdGet , & kvrpcpb.GetRequest {
1127
+ Key : []byte ("key" ),
1128
+ })
1129
+ req .ReplicaReadType = kv .ReplicaReadMixed
1130
+
1131
+ loc , err := s .cache .LocateKey (s .bo , []byte ("key" ))
1132
+ s .Nil (err )
1133
+ region := s .cache .GetCachedRegionWithRLock (loc .Region )
1134
+ s .NotNil (region )
1135
+ regionStore := region .getStore ()
1136
+ leaderAddr = regionStore .stores [regionStore .workTiKVIdx ].addr
1137
+ s .NotEqual (leaderAddr , "" )
1138
+ for i := 0 ; i < 10 ; i ++ {
1139
+ bo := retry .NewBackofferWithVars (context .Background (), 100 , nil )
1140
+ resp , _ , _ , err := s .regionRequestSender .SendReqCtx (bo , req , loc .Region , time .Second , tikvrpc .TiKV )
1141
+ s .Nil (err )
1142
+ s .NotNil (resp )
1143
+
1144
+ // Since send req to follower will receive error, then all follower will be marked as unreachable and epoch stale.
1145
+ allFollowerStoreEpochStale := true
1146
+ for i , store := range regionStore .stores {
1147
+ if i == int (regionStore .workTiKVIdx ) {
1148
+ continue
1149
+ }
1150
+ if store .epoch == regionStore .storeEpochs [i ] {
1151
+ allFollowerStoreEpochStale = false
1152
+ break
1153
+ } else {
1154
+ s .Equal (store .getLivenessState (), unreachable )
1155
+ }
1156
+ }
1157
+ if allFollowerStoreEpochStale {
1158
+ break
1159
+ }
1160
+ }
1161
+
1162
+ // mock for GC leader reload all regions.
1163
+ bo := retry .NewBackofferWithVars (context .Background (), 10 , nil )
1164
+ _ , err = s .cache .BatchLoadRegionsWithKeyRange (bo , []byte ("" ), nil , 1 )
1165
+ s .Nil (err )
1166
+
1167
+ loc , err = s .cache .LocateKey (s .bo , []byte ("key" ))
1168
+ s .Nil (err )
1169
+ region = s .cache .GetCachedRegionWithRLock (loc .Region )
1170
+ s .NotNil (region )
1171
+ regionStore = region .getStore ()
1172
+ for i , store := range regionStore .stores {
1173
+ if i == int (regionStore .workTiKVIdx ) {
1174
+ continue
1175
+ }
1176
+ // After reload region, the region epoch will be updated, but the store liveness state is still unreachable.
1177
+ s .Equal (store .epoch , regionStore .storeEpochs [i ])
1178
+ s .Equal (store .getLivenessState (), unreachable )
1179
+ }
1180
+
1181
+ for i := 0 ; i < 100 ; i ++ {
1182
+ bo := retry .NewBackofferWithVars (context .Background (), 1 , nil )
1183
+ resp , _ , retryTimes , err := s .regionRequestSender .SendReqCtx (bo , req , loc .Region , time .Second , tikvrpc .TiKV )
1184
+ s .Nil (err )
1185
+ s .NotNil (resp )
1186
+ // since all follower'store is unreachable, the request will be sent to leader, the backoff times should be 0.
1187
+ s .Equal (0 , bo .GetTotalBackoffTimes ())
1188
+ s .Equal (0 , retryTimes )
1189
+ }
1190
+ }
0 commit comments