diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go
index 90146de91594d..3341484d1cccb 100644
--- a/session/pessimistic_test.go
+++ b/session/pessimistic_test.go
@@ -47,6 +47,12 @@ func (s *testPessimisticSuite) newAsyncCommitTestKitWithInit(c *C) *testkit.Test
     return tk
 }
 
+func (s *testPessimisticSuite) new1PCTestKitWithInit(c *C) *testkit.TestKit {
+    tk := testkit.NewTestKitWithInit(c, s.store)
+    tk.Se.GetSessionVars().Enable1PC = true
+    return tk
+}
+
 type testPessimisticSuite struct {
     testSessionSuiteBase
 }
@@ -2122,9 +2128,9 @@ func (s *testPessimisticSuite) Test1PCWithSchemaChange(c *C) {
         conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0
     })
 
-    tk := s.newAsyncCommitTestKitWithInit(c)
-    tk2 := s.newAsyncCommitTestKitWithInit(c)
-    tk3 := s.newAsyncCommitTestKitWithInit(c)
+    tk := s.new1PCTestKitWithInit(c)
+    tk2 := s.new1PCTestKitWithInit(c)
+    tk3 := s.new1PCTestKitWithInit(c)
 
     tk.MustExec("drop table if exists tk")
     tk.MustExec("create table tk (c1 int primary key, c2 int)")
@@ -2160,7 +2166,7 @@ func (s *testPessimisticSuite) Test1PCWithSchemaChange(c *C) {
         time.Sleep(200 * time.Millisecond)
         tk2.MustExec("alter table tk add index k2(c2)")
     }()
-    c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "1*sleep(1000)"), IsNil)
+    c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "1*sleep(1200)"), IsNil)
     tk.MustExec("commit")
     c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil)
     tk3.MustExec("admin check table tk")
diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index 4429e35cd102a..bd513f8a71e5c 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -58,15 +58,38 @@ var RegionCacheTTLSec int64 = 600
 
 const (
     updated  int32 = iota // region is updated and no need to reload.
-    needSync              // need sync new region info.
+    needSync              // need sync new region info.
+)
+
+// InvalidReason is the reason why a cached region is invalidated.
+// The region cache may take different strategies to handle different reasons.
+// For example, when a cached region is invalidated due to no leader, region cache
+// will always access to a different peer.
+type InvalidReason int32
+
+const (
+    // Ok indicates the cached region is valid
+    Ok InvalidReason = iota
+    // NoLeader indicates it's invalidated due to no leader
+    NoLeader
+    // RegionNotFound indicates it's invalidated due to region not found in the store
+    RegionNotFound
+    // EpochNotMatch indicates it's invalidated due to epoch not match
+    EpochNotMatch
+    // StoreNotFound indicates it's invalidated due to store not found in PD
+    StoreNotFound
+    // Other indicates it's invalidated due to other reasons, e.g., the store
+    // is removed from the cluster, fail to send requests to the store.
+    Other
 )
 
 // Region presents kv region
 type Region struct {
-    meta       *metapb.Region // raw region meta from PD immutable after init
-    store      unsafe.Pointer // point to region store info, see RegionStore
-    syncFlag   int32          // region need be sync in next turn
-    lastAccess int64          // last region access time, see checkRegionCacheTTL
+    meta          *metapb.Region // raw region meta from PD immutable after init
+    store         unsafe.Pointer // point to region store info, see RegionStore
+    syncFlag      int32          // region need be sync in next turn
+    lastAccess    int64          // last region access time, see checkRegionCacheTTL
+    invalidReason InvalidReason  // the reason why the region is invalidated
 }
 
 // AccessIndex represent the index for accessIndex array
@@ -203,7 +226,7 @@ func (r *Region) compareAndSwapStore(oldStore, newStore *RegionStore) bool {
 func (r *Region) checkRegionCacheTTL(ts int64) bool {
     // Only consider use percentage on this failpoint, for example, "2%return"
     failpoint.Inject("invalidateRegionCache", func() {
-        r.invalidate()
+        r.invalidate(Other)
     })
     for {
         lastAccess := atomic.LoadInt64(&r.lastAccess)
@@ -217,8 +240,9 @@ func (r *Region) checkRegionCacheTTL(ts int64) bool {
 }
 
 // invalidate invalidates a region, next time it will got null result.
-func (r *Region) invalidate() {
+func (r *Region) invalidate(reason InvalidReason) {
     metrics.RegionCacheCounterWithInvalidateRegionFromCacheOK.Inc()
+    atomic.StoreInt32((*int32)(&r.invalidReason), int32(reason))
     atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime)
 }
 
@@ -456,13 +480,13 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe
     })
     if store == nil || len(addr) == 0 {
         // Store not found, region must be out of date.
-        cachedRegion.invalidate()
+        cachedRegion.invalidate(StoreNotFound)
         return nil, nil
     }
 
     storeFailEpoch := atomic.LoadUint32(&store.epoch)
     if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
-        cachedRegion.invalidate()
+        cachedRegion.invalidate(Other)
         logutil.BgLogger().Info("invalidate current region, because others failed on same store",
             zap.Uint64("region", id.GetID()),
             zap.String("store", store.addr))
@@ -529,7 +553,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCC
             return nil, err
         }
         if len(addr) == 0 {
-            cachedRegion.invalidate()
+            cachedRegion.invalidate(StoreNotFound)
             return nil, nil
         }
         if store.getResolveState() == needCheck {
@@ -540,7 +564,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCC
         peer := cachedRegion.meta.Peers[storeIdx]
         storeFailEpoch := atomic.LoadUint32(&store.epoch)
         if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
-            cachedRegion.invalidate()
+            cachedRegion.invalidate(Other)
             logutil.BgLogger().Info("invalidate current region, because others failed on same store",
                 zap.Uint64("region", id.GetID()),
                 zap.String("store", store.addr))
@@ -559,7 +583,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCC
         }, nil
     }
 
-    cachedRegion.invalidate()
+    cachedRegion.invalidate(Other)
     return nil, nil
 }
 
@@ -901,11 +925,16 @@ func (c *RegionCache) BatchLoadRegionsFromKey(bo *Backoffer, startKey []byte, co
 
 // InvalidateCachedRegion removes a cached Region.
 func (c *RegionCache) InvalidateCachedRegion(id RegionVerID) {
+    c.InvalidateCachedRegionWithReason(id, Other)
+}
+
+// InvalidateCachedRegionWithReason removes a cached Region with the reason why it's invalidated.
+func (c *RegionCache) InvalidateCachedRegionWithReason(id RegionVerID, reason InvalidReason) {
     cachedRegion := c.getCachedRegionWithRLock(id)
     if cachedRegion == nil {
         return
     }
-    cachedRegion.invalidate()
+    cachedRegion.invalidate(reason)
 }
 
 // UpdateLeader update some region cache with newer leader info.
@@ -932,7 +961,7 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, c
             zap.Uint64("regionID", regionID.GetID()),
             zap.Int("currIdx", int(currentPeerIdx)),
             zap.Uint64("leaderStoreID", leaderStoreID))
-        r.invalidate()
+        r.invalidate(StoreNotFound)
     } else {
         logutil.BgLogger().Info("switch region leader to specific leader due to kv return NotLeader",
             zap.Uint64("regionID", regionID.GetID()),
@@ -942,13 +971,25 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, c
 }
 
 // insertRegionToCache tries to insert the Region to cache.
+// It should be protected by c.mu.Lock().
 func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
     old := c.mu.sorted.ReplaceOrInsert(newBtreeItem(cachedRegion))
     if old != nil {
+        store := cachedRegion.getStore()
+        oldRegion := old.(*btreeItem).cachedRegion
+        oldRegionStore := oldRegion.getStore()
+        // Joint consensus is enabled in v5.0, which is possible to make a leader step down as a learner during a conf change.
+        // And if hibernate region is enabled, after the leader step down, there can be a long time that there is no leader
+        // in the region and the leader info in PD is stale until requests are sent to followers or hibernate timeout.
+        // To solve it, one solution is always to try a different peer if the invalid reason of the old cached region is no-leader.
+        // There is a small probability that the current peer who reports no-leader becomes a leader and TiDB has to retry once in this case.
+        if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader {
+            store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(TiKVOnly))
+        }
         // Don't refresh TiFlash work idx for region. Otherwise, it will always goto a invalid store which
         // is under transferring regions.
-        atomic.StoreInt32(&cachedRegion.getStore().workTiFlashIdx, atomic.LoadInt32(&old.(*btreeItem).cachedRegion.getStore().workTiFlashIdx))
-        delete(c.mu.regions, old.(*btreeItem).cachedRegion.VerID())
+        store.workTiFlashIdx = atomic.LoadInt32(&oldRegionStore.workTiFlashIdx)
+        delete(c.mu.regions, oldRegion.VerID())
     }
     c.mu.regions[cachedRegion.VerID()] = cachedRegion
 }
@@ -1364,7 +1405,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, curr
     if needInvalidateOld {
         cachedRegion, ok := c.mu.regions[ctx.Region]
         if ok {
-            cachedRegion.invalidate()
+            cachedRegion.invalidate(EpochNotMatch)
         }
     }
     return nil
diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go
index c47b154ba0bac..8a09cfb996304 100644
--- a/store/tikv/region_cache_test.go
+++ b/store/tikv/region_cache_test.go
@@ -1392,6 +1392,25 @@ func (s *testRegionCacheSuite) TestContainsByEnd(c *C) {
     c.Assert(createSampleRegion([]byte{10}, []byte{20}).ContainsByEnd([]byte{30}), IsFalse)
 }
 
+func (s *testRegionCacheSuite) TestSwitchPeerWhenNoLeader(c *C) {
+    var prevCtx *RPCContext
+    for i := 0; i <= len(s.cluster.GetAllStores()); i++ {
+        loc, err := s.cache.LocateKey(s.bo, []byte("a"))
+        c.Assert(err, IsNil)
+        ctx, err := s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0)
+        c.Assert(err, IsNil)
+        if prevCtx == nil {
+            c.Assert(i, Equals, 0)
+        } else {
+            c.Assert(ctx.AccessIdx, Not(Equals), prevCtx.AccessIdx)
+            c.Assert(ctx.Peer, Not(DeepEquals), prevCtx.Peer)
+        }
+        s.cache.InvalidateCachedRegionWithReason(loc.Region, NoLeader)
+        c.Assert(s.cache.getCachedRegionWithRLock(loc.Region).invalidReason, Equals, NoLeader)
+        prevCtx = ctx
+    }
+}
+
 func BenchmarkOnRequestFail(b *testing.B) {
     /*
         This benchmark simulate many concurrent requests call OnSendRequestFail method
diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go
index 59db371394170..09ac9c1e7635e 100644
--- a/store/tikv/region_request.go
+++ b/store/tikv/region_request.go
@@ -655,7 +655,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
         // the Raft group is in an election, but it's possible that the peer is
         // isolated and removed from the Raft group. So it's necessary to reload
         // the region from PD.
-        s.regionCache.InvalidateCachedRegion(ctx.Region)
+        s.regionCache.InvalidateCachedRegionWithReason(ctx.Region, NoLeader)
         if err = bo.Backoff(BoRegionMiss, errors.Errorf("not leader: %v, ctx: %v", notLeader, ctx)); err != nil {
             return false, errors.Trace(err)
         }
diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go
index 7fcfa4d21a855..2b488861077f5 100644
--- a/store/tikv/region_request_test.go
+++ b/store/tikv/region_request_test.go
@@ -572,6 +572,34 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnMaxTimestampNotSyncedError(c
     }()
 }
 
+func (s *testRegionRequestToThreeStoresSuite) TestSwitchPeerWhenNoLeader(c *C) {
+    var leaderAddr string
+    s.regionRequestSender.client = &fnClient{func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
+        if leaderAddr == "" {
+            leaderAddr = addr
+        }
+        // Returns OK when switches to a different peer.
+        if leaderAddr != addr {
+            return &tikvrpc.Response{Resp: &kvrpcpb.RawPutResponse{}}, nil
+        }
+        return &tikvrpc.Response{Resp: &kvrpcpb.RawPutResponse{
+            RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}},
+        }}, nil
+    }}
+
+    req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
+        Key:   []byte("key"),
+        Value: []byte("value"),
+    })
+
+    bo := NewBackofferWithVars(context.Background(), 5, nil)
+    loc, err := s.cache.LocateKey(s.bo, []byte("key"))
+    c.Assert(err, IsNil)
+    resp, err := s.regionRequestSender.SendReq(bo, req, loc.Region, time.Second)
+    c.Assert(err, IsNil)
+    c.Assert(resp, NotNil)
+}
+
 func (s *testRegionRequestToThreeStoresSuite) loadAndGetLeaderStore(c *C) (*Store, string) {
     region, err := s.regionRequestSender.regionCache.findRegionByKey(s.bo, []byte("a"), false)
     c.Assert(err, IsNil)
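
Reviewer note (not part of the patch): the sketch below illustrates how a caller is expected to use the new NoLeader reason introduced above. It is a minimal sketch assuming it sits in package tikv with the package's existing imports; switchPeerOnNoLeader, cache, bo, and key are hypothetical names chosen for illustration only.

    package tikv

    import "github.com/pingcap/tidb/kv"

    // switchPeerOnNoLeader is a hypothetical helper, not part of this diff. It shows the
    // intended flow: record the NoLeader reason, reload the region, and obtain an RPC
    // context that now targets a different TiKV peer.
    func switchPeerOnNoLeader(cache *RegionCache, bo *Backoffer, key []byte) (*RPCContext, error) {
        loc, err := cache.LocateKey(bo, key)
        if err != nil {
            return nil, err
        }
        // The current peer reported NotLeader and PD has no newer leader info:
        // invalidate with the NoLeader reason so the cache remembers why.
        cache.InvalidateCachedRegionWithReason(loc.Region, NoLeader)

        // Re-locating the key reloads the region and re-inserts it into the cache;
        // insertRegionToCache sees the NoLeader reason on the old entry and advances
        // workTiKVIdx, so the returned context points at the next peer.
        loc, err = cache.LocateKey(bo, key)
        if err != nil {
            return nil, err
        }
        return cache.GetTiKVRPCContext(bo, loc.Region, kv.ReplicaReadLeader, 0)
    }

This is the same behavior TestSwitchPeerWhenNoLeader exercises: each NoLeader invalidation followed by a reload yields an RPCContext with a different AccessIdx and Peer.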