store/tikv: always switch to a different peer when meets no-leader (#…
youjiali1995 authored Mar 26, 2021
1 parent fe250b9 commit 2df2ca0
Showing 5 changed files with 116 additions and 22 deletions.
14 changes: 10 additions & 4 deletions session/pessimistic_test.go
@@ -47,6 +47,12 @@ func (s *testPessimisticSuite) newAsyncCommitTestKitWithInit(c *C) *testkit.Test
return tk
}

func (s *testPessimisticSuite) new1PCTestKitWithInit(c *C) *testkit.TestKit {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.Se.GetSessionVars().Enable1PC = true
return tk
}

type testPessimisticSuite struct {
testSessionSuiteBase
}
@@ -2122,9 +2128,9 @@ func (s *testPessimisticSuite) Test1PCWithSchemaChange(c *C) {
conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0
})

tk := s.newAsyncCommitTestKitWithInit(c)
tk2 := s.newAsyncCommitTestKitWithInit(c)
tk3 := s.newAsyncCommitTestKitWithInit(c)
tk := s.new1PCTestKitWithInit(c)
tk2 := s.new1PCTestKitWithInit(c)
tk3 := s.new1PCTestKitWithInit(c)

tk.MustExec("drop table if exists tk")
tk.MustExec("create table tk (c1 int primary key, c2 int)")
@@ -2160,7 +2166,7 @@ func (s *testPessimisticSuite) Test1PCWithSchemaChange(c *C) {
time.Sleep(200 * time.Millisecond)
tk2.MustExec("alter table tk add index k2(c2)")
}()
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "1*sleep(1000)"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "1*sleep(1200)"), IsNil)
tk.MustExec("commit")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil)
tk3.MustExec("admin check table tk")
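The test above drives the race with a failpoint: the term "1*sleep(1200)" makes the beforePrewrite hook fire once and sleep for 1.2 seconds, long enough for the ALTER TABLE in the background goroutine to land in the middle of the commit. Below is a minimal sketch of that Enable/Inject/Disable pattern; the package path and failpoint name are invented for illustration, and, as in TiDB, the Inject marker only takes effect when the failpoint code rewrite (failpoint-ctl) is enabled for the build.

package sketch

import "github.com/pingcap/failpoint"

func doCommit() {
	// Library side: a no-op marker unless the failpoint is enabled.
	failpoint.Inject("beforePrewrite", func() {})
	// ... the real prewrite work would go here ...
}

func runWithDelayedPrewrite() error {
	// Test side: fire the failpoint once with a 1200ms sleep, then clean up.
	if err := failpoint.Enable("example.com/sketch/beforePrewrite", "1*sleep(1200)"); err != nil {
		return err
	}
	defer failpoint.Disable("example.com/sketch/beforePrewrite")
	doCommit()
	return nil
}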
75 changes: 58 additions & 17 deletions store/tikv/region_cache.go
@@ -58,15 +58,38 @@ var RegionCacheTTLSec int64 = 600

const (
updated int32 = iota // region is updated and no need to reload.
needSync // need sync new region info.
needSync // need sync new region info.
)

// InvalidReason is the reason why a cached region is invalidated.
// The region cache may take different strategies to handle different reasons.
// For example, when a cached region is invalidated because it has no leader,
// the region cache always switches to a different peer.
type InvalidReason int32

const (
// Ok indicates the cached region is valid
Ok InvalidReason = iota
// NoLeader indicates it's invalidated due to no leader
NoLeader
// RegionNotFound indicates it's invalidated due to region not found in the store
RegionNotFound
// EpochNotMatch indicates it's invalidated due to epoch not match
EpochNotMatch
// StoreNotFound indicates it's invalidated due to store not found in PD
StoreNotFound
// Other indicates it's invalidated for other reasons, e.g., the store has been
// removed from the cluster, or requests to the store failed.
Other
)

// Region represents a KV region
type Region struct {
meta *metapb.Region // raw region meta from PD immutable after init
store unsafe.Pointer // point to region store info, see RegionStore
syncFlag int32 // region need be sync in next turn
lastAccess int64 // last region access time, see checkRegionCacheTTL
meta *metapb.Region // raw region meta from PD immutable after init
store unsafe.Pointer // point to region store info, see RegionStore
syncFlag int32 // region need be sync in next turn
lastAccess int64 // last region access time, see checkRegionCacheTTL
invalidReason InvalidReason // the reason why the region is invalidated
}

// AccessIndex represents the index into the accessIndex array
@@ -203,7 +226,7 @@ func (r *Region) compareAndSwapStore(oldStore, newStore *RegionStore) bool {
func (r *Region) checkRegionCacheTTL(ts int64) bool {
// Only percentage-based terms should be used with this failpoint, for example, "2%return"
failpoint.Inject("invalidateRegionCache", func() {
r.invalidate()
r.invalidate(Other)
})
for {
lastAccess := atomic.LoadInt64(&r.lastAccess)
@@ -217,8 +240,9 @@ func (r *Region) checkRegionCacheTTL(ts int64) bool {
}

// invalidate invalidates a region; the next lookup will get a null result.
func (r *Region) invalidate() {
func (r *Region) invalidate(reason InvalidReason) {
metrics.RegionCacheCounterWithInvalidateRegionFromCacheOK.Inc()
atomic.StoreInt32((*int32)(&r.invalidReason), int32(reason))
atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime)
}
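
A standalone sketch of the atomic store/load pattern behind the new invalidReason field; the names below are illustrative, not the commit's code. Because InvalidReason is backed by an int32, the reason can be written here in invalidate and read back later (for example by insertRegionToCache) with plain atomics, without taking any extra lock on the Region.

package sketch

import "sync/atomic"

// InvalidReason mirrors the enum added in region_cache.go.
type InvalidReason int32

const (
	Ok InvalidReason = iota
	NoLeader
)

type region struct {
	invalidReason InvalidReason
}

// markInvalid records why the region was dropped; safe for concurrent callers.
func (r *region) markInvalid(reason InvalidReason) {
	atomic.StoreInt32((*int32)(&r.invalidReason), int32(reason))
}

// lastReason reads the recorded reason without holding any lock.
func (r *region) lastReason() InvalidReason {
	return InvalidReason(atomic.LoadInt32((*int32)(&r.invalidReason)))
}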

@@ -456,13 +480,13 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe
})
if store == nil || len(addr) == 0 {
// Store not found, region must be out of date.
cachedRegion.invalidate()
cachedRegion.invalidate(StoreNotFound)
return nil, nil
}

storeFailEpoch := atomic.LoadUint32(&store.epoch)
if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
cachedRegion.invalidate()
cachedRegion.invalidate(Other)
logutil.BgLogger().Info("invalidate current region, because others failed on same store",
zap.Uint64("region", id.GetID()),
zap.String("store", store.addr))
@@ -529,7 +553,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCC
return nil, err
}
if len(addr) == 0 {
cachedRegion.invalidate()
cachedRegion.invalidate(StoreNotFound)
return nil, nil
}
if store.getResolveState() == needCheck {
@@ -540,7 +564,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCC
peer := cachedRegion.meta.Peers[storeIdx]
storeFailEpoch := atomic.LoadUint32(&store.epoch)
if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
cachedRegion.invalidate()
cachedRegion.invalidate(Other)
logutil.BgLogger().Info("invalidate current region, because others failed on same store",
zap.Uint64("region", id.GetID()),
zap.String("store", store.addr))
@@ -559,7 +583,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCC
}, nil
}

cachedRegion.invalidate()
cachedRegion.invalidate(Other)
return nil, nil
}

@@ -901,11 +925,16 @@ func (c *RegionCache) BatchLoadRegionsFromKey(bo *Backoffer, startKey []byte, co

// InvalidateCachedRegion removes a cached Region.
func (c *RegionCache) InvalidateCachedRegion(id RegionVerID) {
c.InvalidateCachedRegionWithReason(id, Other)
}

// InvalidateCachedRegionWithReason removes a cached Region with the reason why it's invalidated.
func (c *RegionCache) InvalidateCachedRegionWithReason(id RegionVerID, reason InvalidReason) {
cachedRegion := c.getCachedRegionWithRLock(id)
if cachedRegion == nil {
return
}
cachedRegion.invalidate()
cachedRegion.invalidate(reason)
}

// UpdateLeader update some region cache with newer leader info.
@@ -932,7 +961,7 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, c
zap.Uint64("regionID", regionID.GetID()),
zap.Int("currIdx", int(currentPeerIdx)),
zap.Uint64("leaderStoreID", leaderStoreID))
r.invalidate()
r.invalidate(StoreNotFound)
} else {
logutil.BgLogger().Info("switch region leader to specific leader due to kv return NotLeader",
zap.Uint64("regionID", regionID.GetID()),
@@ -942,13 +971,25 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, c
}

// insertRegionToCache tries to insert the Region into the cache.
// It should be protected by c.mu.Lock().
func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
old := c.mu.sorted.ReplaceOrInsert(newBtreeItem(cachedRegion))
if old != nil {
store := cachedRegion.getStore()
oldRegion := old.(*btreeItem).cachedRegion
oldRegionStore := oldRegion.getStore()
// Joint consensus is enabled in v5.0, which makes it possible for a leader to step down as a learner during a conf change.
// If hibernate region is also enabled, the region can then stay leaderless for a long time, and the leader info in PD
// remains stale until requests are sent to followers or the hibernate timeout expires.
// To handle this, always try a different peer when the invalidation reason of the old cached region is no-leader.
// There is a small chance that the peer that reported no-leader has since become the leader, in which case TiDB has to retry once.
if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader {
store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(TiKVOnly))
}
// Don't refresh the TiFlash work idx for the region. Otherwise, it will always go to an invalid store
// whose regions are being transferred away.
atomic.StoreInt32(&cachedRegion.getStore().workTiFlashIdx, atomic.LoadInt32(&old.(*btreeItem).cachedRegion.getStore().workTiFlashIdx))
delete(c.mu.regions, old.(*btreeItem).cachedRegion.VerID())
store.workTiFlashIdx = atomic.LoadInt32(&oldRegionStore.workTiFlashIdx)
delete(c.mu.regions, oldRegion.VerID())
}
c.mu.regions[cachedRegion.VerID()] = cachedRegion
}
@@ -1364,7 +1405,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, curr
if needInvalidateOld {
cachedRegion, ok := c.mu.regions[ctx.Region]
if ok {
cachedRegion.invalidate()
cachedRegion.invalidate(EpochNotMatch)
}
}
return nil
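A simplified sketch of the peer rotation that the new code in insertRegionToCache performs; the type and helper below are hypothetical, not TiDB's API. When the evicted region was invalidated with NoLeader, the working TiKV access index is advanced by one, modulo the number of TiKV peers, so the refreshed region starts from a different peer instead of re-asking the one that reported no leader.

package sketch

// AccessIndex stands in for the index into a region's TiKV peer list.
type AccessIndex int

// nextWorkTiKVIdx returns the index the refreshed region should start from,
// given the old region's working index, its number of TiKV peers, and whether
// the old region was invalidated because it had no leader.
func nextWorkTiKVIdx(oldIdx AccessIndex, tikvPeers int, noLeader bool) AccessIndex {
	if !noLeader || tikvPeers == 0 {
		return oldIdx
	}
	return (oldIdx + 1) % AccessIndex(tikvPeers)
}

With three peers and a current index of 2, a NoLeader invalidation moves the next lookup to index 0. In the worst case the peer that reported no-leader has just won the election, and TiDB pays one extra retry, matching the trade-off noted in the comment above.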
19 changes: 19 additions & 0 deletions store/tikv/region_cache_test.go
@@ -1392,6 +1392,25 @@ func (s *testRegionCacheSuite) TestContainsByEnd(c *C) {
c.Assert(createSampleRegion([]byte{10}, []byte{20}).ContainsByEnd([]byte{30}), IsFalse)
}

func (s *testRegionCacheSuite) TestSwitchPeerWhenNoLeader(c *C) {
var prevCtx *RPCContext
for i := 0; i <= len(s.cluster.GetAllStores()); i++ {
loc, err := s.cache.LocateKey(s.bo, []byte("a"))
c.Assert(err, IsNil)
ctx, err := s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0)
c.Assert(err, IsNil)
if prevCtx == nil {
c.Assert(i, Equals, 0)
} else {
c.Assert(ctx.AccessIdx, Not(Equals), prevCtx.AccessIdx)
c.Assert(ctx.Peer, Not(DeepEquals), prevCtx.Peer)
}
s.cache.InvalidateCachedRegionWithReason(loc.Region, NoLeader)
c.Assert(s.cache.getCachedRegionWithRLock(loc.Region).invalidReason, Equals, NoLeader)
prevCtx = ctx
}
}

func BenchmarkOnRequestFail(b *testing.B) {
/*
This benchmark simulates many concurrent requests calling the OnSendRequestFail method
2 changes: 1 addition & 1 deletion store/tikv/region_request.go
@@ -655,7 +655,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
// the Raft group is in an election, but it's possible that the peer is
// isolated and removed from the Raft group. So it's necessary to reload
// the region from PD.
s.regionCache.InvalidateCachedRegion(ctx.Region)
s.regionCache.InvalidateCachedRegionWithReason(ctx.Region, NoLeader)
if err = bo.Backoff(BoRegionMiss, errors.Errorf("not leader: %v, ctx: %v", notLeader, ctx)); err != nil {
return false, errors.Trace(err)
}
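A condensed sketch of the NotLeader path changed above, with simplified interfaces standing in for TiDB's RegionCache and Backoffer; it is not the sender's real code. When the error names a new leader, the cache is updated and the request is retried immediately; when it carries no leader hint, the region is invalidated with the NoLeader reason and the retry happens only after a region-miss backoff, so the reloaded region (see insertRegionToCache above) starts from a different peer.

package sketch

import "errors"

type InvalidReason int

const (
	Other InvalidReason = iota
	NoLeader
)

// regionCache and backoffer are stand-ins for the real RegionCache and Backoffer.
type regionCache interface {
	UpdateLeader(regionID uint64, leaderStoreID uint64)
	InvalidateCachedRegionWithReason(regionID uint64, reason InvalidReason)
}

type backoffer interface {
	Backoff(kind string, reason error) error
}

// onNotLeader reports whether the request should be retried.
func onNotLeader(c regionCache, bo backoffer, regionID, newLeaderStoreID uint64) (bool, error) {
	if newLeaderStoreID != 0 {
		// The error tells us who the new leader is: switch to it and retry at once.
		c.UpdateLeader(regionID, newLeaderStoreID)
		return true, nil
	}
	// No leader hint: the region may be leaderless (e.g. mid-election), so drop it
	// with the NoLeader reason and wait before reloading it from PD on retry.
	c.InvalidateCachedRegionWithReason(regionID, NoLeader)
	if err := bo.Backoff("regionMiss", errors.New("no leader")); err != nil {
		return false, err
	}
	return true, nil
}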
28 changes: 28 additions & 0 deletions store/tikv/region_request_test.go
@@ -572,6 +572,34 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnMaxTimestampNotSyncedError(c
}()
}

func (s *testRegionRequestToThreeStoresSuite) TestSwitchPeerWhenNoLeader(c *C) {
var leaderAddr string
s.regionRequestSender.client = &fnClient{func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
if leaderAddr == "" {
leaderAddr = addr
}
// Returns OK once the sender switches to a different peer.
if leaderAddr != addr {
return &tikvrpc.Response{Resp: &kvrpcpb.RawPutResponse{}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.RawPutResponse{
RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}},
}}, nil
}}

req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Value: []byte("value"),
})

bo := NewBackofferWithVars(context.Background(), 5, nil)
loc, err := s.cache.LocateKey(s.bo, []byte("key"))
c.Assert(err, IsNil)
resp, err := s.regionRequestSender.SendReq(bo, req, loc.Region, time.Second)
c.Assert(err, IsNil)
c.Assert(resp, NotNil)
}

func (s *testRegionRequestToThreeStoresSuite) loadAndGetLeaderStore(c *C) (*Store, string) {
region, err := s.regionRequestSender.regionCache.findRegionByKey(s.bo, []byte("a"), false)
c.Assert(err, IsNil)
