store/tikv: always switch to a different peer when meets no-leader (#22449) #23595

Merged (1 commit) on Mar 26, 2021
14 changes: 10 additions & 4 deletions session/pessimistic_test.go
@@ -45,6 +45,12 @@ func (s *testPessimisticSuite) newAsyncCommitTestKitWithInit(c *C) *testkit.Test
return tk
}

+func (s *testPessimisticSuite) new1PCTestKitWithInit(c *C) *testkit.TestKit {
+tk := testkit.NewTestKitWithInit(c, s.store)
+tk.Se.GetSessionVars().Enable1PC = true
+return tk
+}
+
type testPessimisticSuite struct {
testSessionSuiteBase
}
@@ -2120,9 +2126,9 @@ func (s *testPessimisticSuite) Test1PCWithSchemaChange(c *C) {
conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0
})

-tk := s.newAsyncCommitTestKitWithInit(c)
-tk2 := s.newAsyncCommitTestKitWithInit(c)
-tk3 := s.newAsyncCommitTestKitWithInit(c)
+tk := s.new1PCTestKitWithInit(c)
+tk2 := s.new1PCTestKitWithInit(c)
+tk3 := s.new1PCTestKitWithInit(c)

tk.MustExec("drop table if exists tk")
tk.MustExec("create table tk (c1 int primary key, c2 int)")
@@ -2158,7 +2164,7 @@ func (s *testPessimisticSuite) Test1PCWithSchemaChange(c *C) {
time.Sleep(200 * time.Millisecond)
tk2.MustExec("alter table tk add index k2(c2)")
}()
-c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "1*sleep(1000)"), IsNil)
+c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "1*sleep(1200)"), IsNil)
tk.MustExec("commit")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil)
tk3.MustExec("admin check table tk")
75 changes: 58 additions & 17 deletions store/tikv/region_cache.go
@@ -57,15 +57,38 @@ var RegionCacheTTLSec int64 = 600

const (
updated int32 = iota // region is updated and no need to reload.
-needSync // need sync new region info.
+needSync // need sync new region info.
)

+// InvalidReason is the reason why a cached region is invalidated.
+// The region cache may take different strategies to handle different reasons.
+// For example, when a cached region is invalidated due to no leader, the region cache
+// will always switch to a different peer.
+type InvalidReason int32
+
+const (
+// Ok indicates the cached region is valid
+Ok InvalidReason = iota
+// NoLeader indicates it's invalidated due to no leader
+NoLeader
+// RegionNotFound indicates it's invalidated due to region not found in the store
+RegionNotFound
+// EpochNotMatch indicates it's invalidated due to epoch not match
+EpochNotMatch
+// StoreNotFound indicates it's invalidated due to store not found in PD
+StoreNotFound
+// Other indicates it's invalidated due to other reasons, e.g., the store
+// is removed from the cluster, fail to send requests to the store.
+Other
+)
+
// Region presents kv region
type Region struct {
-meta *metapb.Region // raw region meta from PD immutable after init
-store unsafe.Pointer // point to region store info, see RegionStore
-syncFlag int32 // region need be sync in next turn
-lastAccess int64 // last region access time, see checkRegionCacheTTL
+meta *metapb.Region // raw region meta from PD immutable after init
+store unsafe.Pointer // point to region store info, see RegionStore
+syncFlag int32 // region need be sync in next turn
+lastAccess int64 // last region access time, see checkRegionCacheTTL
+invalidReason InvalidReason // the reason why the region is invalidated
}

// AccessIndex represent the index for accessIndex array
@@ -202,7 +225,7 @@ func (r *Region) compareAndSwapStore(oldStore, newStore *RegionStore) bool {
func (r *Region) checkRegionCacheTTL(ts int64) bool {
// Only consider use percentage on this failpoint, for example, "2%return"
failpoint.Inject("invalidateRegionCache", func() {
-r.invalidate()
+r.invalidate(Other)
})
for {
lastAccess := atomic.LoadInt64(&r.lastAccess)
@@ -216,8 +239,9 @@ func (r *Region) checkRegionCacheTTL(ts int64) bool {
}

// invalidate invalidates a region, next time it will get a null result.
-func (r *Region) invalidate() {
+func (r *Region) invalidate(reason InvalidReason) {
metrics.RegionCacheCounterWithInvalidateRegionFromCacheOK.Inc()
+atomic.StoreInt32((*int32)(&r.invalidReason), int32(reason))
atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime)
}

@@ -455,13 +479,13 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe
})
if store == nil || len(addr) == 0 {
// Store not found, region must be out of date.
-cachedRegion.invalidate()
+cachedRegion.invalidate(StoreNotFound)
return nil, nil
}

storeFailEpoch := atomic.LoadUint32(&store.epoch)
if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
-cachedRegion.invalidate()
+cachedRegion.invalidate(Other)
logutil.BgLogger().Info("invalidate current region, because others failed on same store",
zap.Uint64("region", id.GetID()),
zap.String("store", store.addr))
@@ -528,7 +552,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCC
return nil, err
}
if len(addr) == 0 {
-cachedRegion.invalidate()
+cachedRegion.invalidate(StoreNotFound)
return nil, nil
}
if store.getResolveState() == needCheck {
@@ -539,7 +563,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCC
peer := cachedRegion.meta.Peers[storeIdx]
storeFailEpoch := atomic.LoadUint32(&store.epoch)
if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
-cachedRegion.invalidate()
+cachedRegion.invalidate(Other)
logutil.BgLogger().Info("invalidate current region, because others failed on same store",
zap.Uint64("region", id.GetID()),
zap.String("store", store.addr))
@@ -558,7 +582,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCC
}, nil
}

-cachedRegion.invalidate()
+cachedRegion.invalidate(Other)
return nil, nil
}

@@ -900,11 +924,16 @@ func (c *RegionCache) BatchLoadRegionsFromKey(bo *Backoffer, startKey []byte, co

// InvalidateCachedRegion removes a cached Region.
func (c *RegionCache) InvalidateCachedRegion(id RegionVerID) {
+c.InvalidateCachedRegionWithReason(id, Other)
+}
+
+// InvalidateCachedRegionWithReason removes a cached Region with the reason why it's invalidated.
+func (c *RegionCache) InvalidateCachedRegionWithReason(id RegionVerID, reason InvalidReason) {
cachedRegion := c.getCachedRegionWithRLock(id)
if cachedRegion == nil {
return
}
-cachedRegion.invalidate()
+cachedRegion.invalidate(reason)
}

// UpdateLeader update some region cache with newer leader info.
@@ -931,7 +960,7 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, c
zap.Uint64("regionID", regionID.GetID()),
zap.Int("currIdx", int(currentPeerIdx)),
zap.Uint64("leaderStoreID", leaderStoreID))
-r.invalidate()
+r.invalidate(StoreNotFound)
} else {
logutil.BgLogger().Info("switch region leader to specific leader due to kv return NotLeader",
zap.Uint64("regionID", regionID.GetID()),
@@ -941,13 +970,25 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, c
}

// insertRegionToCache tries to insert the Region to cache.
+// It should be protected by c.mu.Lock().
func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
old := c.mu.sorted.ReplaceOrInsert(newBtreeItem(cachedRegion))
if old != nil {
+store := cachedRegion.getStore()
+oldRegion := old.(*btreeItem).cachedRegion
+oldRegionStore := oldRegion.getStore()
+// Joint consensus is enabled in v5.0, which makes it possible for a leader to step down as a learner during a conf change.
+// And if hibernate region is enabled, after the leader steps down, there can be a long time with no leader
+// in the region, and the leader info in PD is stale until requests are sent to followers or the hibernate timeout fires.
+// To solve it, one solution is to always try a different peer if the invalid reason of the old cached region is no-leader.
+// There is a small probability that the peer that reported no-leader has become the leader, in which case TiDB has to retry once.
+if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader {
+store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(TiKVOnly))
+}
// Don't refresh TiFlash work idx for region. Otherwise, it will always go to an invalid store which
// is under transferring regions.
-atomic.StoreInt32(&cachedRegion.getStore().workTiFlashIdx, atomic.LoadInt32(&old.(*btreeItem).cachedRegion.getStore().workTiFlashIdx))
-delete(c.mu.regions, old.(*btreeItem).cachedRegion.VerID())
+store.workTiFlashIdx = atomic.LoadInt32(&oldRegionStore.workTiFlashIdx)
+delete(c.mu.regions, oldRegion.VerID())
}
c.mu.regions[cachedRegion.VerID()] = cachedRegion
}
@@ -1363,7 +1404,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, curr
if needInvalidateOld {
cachedRegion, ok := c.mu.regions[ctx.Region]
if ok {
-cachedRegion.invalidate()
+cachedRegion.invalidate(EpochNotMatch)
}
}
return nil
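
The region_cache.go changes above amount to two ideas: remember why a cached Region was dropped, and, when the old entry was dropped because no leader was known, let the refreshed entry start from the next peer. Below is a minimal, self-contained sketch of that logic; cachedRegion, insertNewRegion, and the peerCount field are simplified stand-ins invented for illustration, not the real RegionCache types.

package main

import (
	"fmt"
	"sync/atomic"
)

// InvalidReason mirrors (in simplified form) the enum added in region_cache.go.
type InvalidReason int32

const (
	Ok InvalidReason = iota
	NoLeader
	RegionNotFound
	EpochNotMatch
	StoreNotFound
	Other
)

// cachedRegion is a stand-in for *Region: it keeps only what the sketch needs,
// the recorded invalidation reason and the index of the peer to try next.
type cachedRegion struct {
	invalidReason int32 // holds an InvalidReason, accessed atomically
	workTiKVIdx   int32 // which peer requests are currently sent to
	peerCount     int32
}

// invalidate records why the entry was dropped, like Region.invalidate(reason).
func (r *cachedRegion) invalidate(reason InvalidReason) {
	atomic.StoreInt32(&r.invalidReason, int32(reason))
}

// insertNewRegion models the NoLeader branch of insertRegionToCache: if the
// entry being replaced was dropped because no leader was known, rotate to a
// different peer so the next request does not hit the same stepped-down peer.
func insertNewRegion(old, fresh *cachedRegion) {
	fresh.workTiKVIdx = old.workTiKVIdx
	if InvalidReason(atomic.LoadInt32(&old.invalidReason)) == NoLeader {
		fresh.workTiKVIdx = (old.workTiKVIdx + 1) % old.peerCount
	}
}

func main() {
	old := &cachedRegion{workTiKVIdx: 0, peerCount: 3}
	old.invalidate(NoLeader)

	fresh := &cachedRegion{peerCount: 3}
	insertNewRegion(old, fresh)
	fmt.Println("next peer index:", fresh.workTiKVIdx) // prints 1, not 0
}

The real insertRegionToCache rotates RegionStore.workTiKVIdx with AccessIndex arithmetic and leaves the TiFlash work index alone; the sketch only keeps the NoLeader branch.
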
19 changes: 19 additions & 0 deletions store/tikv/region_cache_test.go
@@ -1392,6 +1392,25 @@ func (s *testRegionCacheSuite) TestContainsByEnd(c *C) {
c.Assert(createSampleRegion([]byte{10}, []byte{20}).ContainsByEnd([]byte{30}), IsFalse)
}

+func (s *testRegionCacheSuite) TestSwitchPeerWhenNoLeader(c *C) {
+var prevCtx *RPCContext
+for i := 0; i <= len(s.cluster.GetAllStores()); i++ {
+loc, err := s.cache.LocateKey(s.bo, []byte("a"))
+c.Assert(err, IsNil)
+ctx, err := s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0)
+c.Assert(err, IsNil)
+if prevCtx == nil {
+c.Assert(i, Equals, 0)
+} else {
+c.Assert(ctx.AccessIdx, Not(Equals), prevCtx.AccessIdx)
+c.Assert(ctx.Peer, Not(DeepEquals), prevCtx.Peer)
+}
+s.cache.InvalidateCachedRegionWithReason(loc.Region, NoLeader)
+c.Assert(s.cache.getCachedRegionWithRLock(loc.Region).invalidReason, Equals, NoLeader)
+prevCtx = ctx
+}
+}
+
func BenchmarkOnRequestFail(b *testing.B) {
/*
This benchmark simulate many concurrent requests call OnSendRequestFail method
2 changes: 1 addition & 1 deletion store/tikv/region_request.go
@@ -655,7 +655,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
// the Raft group is in an election, but it's possible that the peer is
// isolated and removed from the Raft group. So it's necessary to reload
// the region from PD.
-s.regionCache.InvalidateCachedRegion(ctx.Region)
+s.regionCache.InvalidateCachedRegionWithReason(ctx.Region, NoLeader)
if err = bo.Backoff(BoRegionMiss, errors.Errorf("not leader: %v, ctx: %v", notLeader, ctx)); err != nil {
return false, errors.Trace(err)
}
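
The comment above describes the case this one-line change targets: a NotLeader error that carries no new-leader hint, either because an election is in progress or because the peer was removed from the Raft group. The sketch below shows that branch with stand-in types invented for illustration; the real onRegionError also backs off on BoRegionMiss and handles many other region errors, all of which is omitted here.

package main

import "fmt"

// fakeCache is a stand-in for RegionCache; it only records what the error
// handler asked it to do.
type fakeCache struct {
	leaderStoreID uint64
	invalidated   bool
	reason        string
}

func (c *fakeCache) UpdateLeader(regionID, storeID uint64) { c.leaderStoreID = storeID }

func (c *fakeCache) InvalidateCachedRegionWithReason(regionID uint64, reason string) {
	c.invalidated, c.reason = true, reason
}

// onNotLeader sketches the branch touched by the one-line change above.
// newLeaderStoreID == 0 stands for a NotLeader error without a leader hint.
func onNotLeader(cache *fakeCache, regionID, newLeaderStoreID uint64) {
	if newLeaderStoreID != 0 {
		// The error names the new leader: switch the cached leader to it.
		cache.UpdateLeader(regionID, newLeaderStoreID)
		return
	}
	// No leader reported: the region may be mid-election, or this peer may have
	// been removed from the Raft group. Drop the cached region with the NoLeader
	// reason so it is reloaded from PD and the refreshed entry starts from a
	// different peer (see insertRegionToCache above).
	cache.InvalidateCachedRegionWithReason(regionID, "NoLeader")
}

func main() {
	c := &fakeCache{}
	onNotLeader(c, 1, 0)
	fmt.Println(c.invalidated, c.reason) // true NoLeader

	onNotLeader(c, 1, 42)
	fmt.Println(c.leaderStoreID) // 42
}
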
28 changes: 28 additions & 0 deletions store/tikv/region_request_test.go
@@ -572,6 +572,34 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnMaxTimestampNotSyncedError(c
}()
}

+func (s *testRegionRequestToThreeStoresSuite) TestSwitchPeerWhenNoLeader(c *C) {
+var leaderAddr string
+s.regionRequestSender.client = &fnClient{func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
+if leaderAddr == "" {
+leaderAddr = addr
+}
+// Returns OK when the request switches to a different peer.
+if leaderAddr != addr {
+return &tikvrpc.Response{Resp: &kvrpcpb.RawPutResponse{}}, nil
+}
+return &tikvrpc.Response{Resp: &kvrpcpb.RawPutResponse{
+RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}},
+}}, nil
+}}
+
+req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
+Key: []byte("key"),
+Value: []byte("value"),
+})
+
+bo := NewBackofferWithVars(context.Background(), 5, nil)
+loc, err := s.cache.LocateKey(s.bo, []byte("key"))
+c.Assert(err, IsNil)
+resp, err := s.regionRequestSender.SendReq(bo, req, loc.Region, time.Second)
+c.Assert(err, IsNil)
+c.Assert(resp, NotNil)
+}
+
func (s *testRegionRequestToThreeStoresSuite) loadAndGetLeaderStore(c *C) (*Store, string) {
region, err := s.regionRequestSender.regionCache.findRegionByKey(s.bo, []byte("a"), false)
c.Assert(err, IsNil)
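
To see why the mocked client in the test above lets SendReq finish, here is a toy reenactment under assumed names (three peers and a stub that only misbehaves on the first address it saw); it is not the real retry path, only its shape: the first contacted peer keeps answering NotLeader with no leader hint, the NoLeader invalidation rotates the cache to the next peer, and the next attempt reaches a peer that answers OK.

package main

import "fmt"

func main() {
	// Three peers, as in the three-store test suite. The first address that is
	// contacted keeps answering NotLeader with no leader hint; every other peer
	// answers OK, mirroring the fnClient stub above.
	peers := []string{"store1", "store2", "store3"}
	var stuck string // plays the role of leaderAddr in the test

	idx := 0
	for attempt := 1; attempt <= len(peers); attempt++ {
		addr := peers[idx]
		if stuck == "" {
			stuck = addr
		}
		if addr != stuck {
			fmt.Printf("attempt %d: %s -> OK\n", attempt, addr)
			return
		}
		fmt.Printf("attempt %d: %s -> NotLeader, no leader hint\n", attempt, addr)
		// Invalidate with reason NoLeader; on reload the cache starts from the
		// next peer instead of retrying the same one.
		idx = (idx + 1) % len(peers)
	}
}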