From 16e99ba78aaa4ca9424f9259554366d68fcd5b80 Mon Sep 17 00:00:00 2001
From: lysu
Date: Wed, 24 Jul 2019 13:18:28 +0800
Subject: [PATCH] tikv: invalidate store's regions when send store fail
 (#11344)

# Conflicts:
#	store/tikv/region_cache.go
---
 store/tikv/region_cache.go        | 43 +++++++++++++++++++++++++------
 store/tikv/region_cache_test.go   | 28 +++++++++++++++++++-
 store/tikv/region_request_test.go | 10 +++++--
 3 files changed, 70 insertions(+), 11 deletions(-)

diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index 971c72c10becd..3a69d67c47484 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -67,13 +67,19 @@ type Region struct {
 type RegionStore struct {
 	workStoreIdx int32    // point to current work peer in meta.Peers and work store in stores(same idx)
 	stores       []*Store // stores in this region
+	storeFails   []uint32 // snapshots of store's fail, need reload when `storeFails[curr] != stores[cur].fail`
 }
 
 // clone clones region store struct.
 func (r *RegionStore) clone() *RegionStore {
+	storeFails := make([]uint32, len(r.stores))
+	for i, e := range r.storeFails {
+		storeFails[i] = e
+	}
 	return &RegionStore{
 		workStoreIdx: r.workStoreIdx,
 		stores:       r.stores,
+		storeFails:   storeFails,
 	}
 }
 
@@ -84,6 +90,7 @@ func (r *Region) init(c *RegionCache) {
 	rs := &RegionStore{
 		workStoreIdx: 0,
 		stores:       make([]*Store, 0, len(r.meta.Peers)),
+		storeFails:   make([]uint32, 0, len(r.meta.Peers)),
 	}
 	for _, p := range r.meta.Peers {
 		c.storeMu.RLock()
@@ -93,6 +100,7 @@ func (r *Region) init(c *RegionCache) {
 			store = c.getStoreByStoreID(p.StoreId)
 		}
 		rs.stores = append(rs.stores, store)
+		rs.storeFails = append(rs.storeFails, atomic.LoadUint32(&store.fail))
 	}
 	atomic.StorePointer(&r.store, unsafe.Pointer(rs))
 
@@ -270,6 +278,15 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext,
 		return nil, nil
 	}
 
+	storeFailEpoch := atomic.LoadUint32(&store.fail)
+	if storeFailEpoch != regionStore.storeFails[regionStore.workStoreIdx] {
+		cachedRegion.invalidate()
+		logutil.Logger(context.Background()).Info("invalidate current region, because others failed on same store",
+			zap.Uint64("region", id.GetID()),
+			zap.String("store", store.addr))
+		return nil, nil
+	}
+
 	return &RPCContext{
 		Region: id,
 		Meta:   cachedRegion.meta,
@@ -366,7 +383,7 @@ func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload
 	tikvRegionCacheCounterWithSendFail.Inc()
 	r := c.getCachedRegionWithRLock(ctx.Region)
 	if r != nil {
-		c.switchNextPeer(r, ctx.PeerIdx)
+		c.switchNextPeer(r, ctx.PeerIdx, err)
 		if scheduleReload {
 			r.scheduleReload()
 		}
@@ -499,7 +516,7 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, c
 	}
 
 	if leaderStoreID == 0 {
-		c.switchNextPeer(r, currentPeerIdx)
+		c.switchNextPeer(r, currentPeerIdx, nil)
 		logutil.Logger(context.Background()).Info("switch region peer to next due to NotLeader with NULL leader",
 			zap.Int("currIdx", currentPeerIdx),
 			zap.Uint64("regionID", regionID.GetID()))
@@ -857,15 +874,24 @@ func (c *RegionCache) switchToPeer(r *Region, targetStoreID uint64) (found bool)
 	return
 }
 
-func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int) {
-	regionStore := r.getStore()
-	if int(regionStore.workStoreIdx) != currentPeerIdx {
+func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int, err error) {
+	rs := r.getStore()
+	if int(rs.workStoreIdx) != currentPeerIdx {
 		return
 	}
-	nextIdx := (currentPeerIdx + 1) % len(regionStore.stores)
-	newRegionStore := regionStore.clone()
+
+	if err != nil { // TODO: refine err, only do this for some errors.
+		s := rs.stores[rs.workStoreIdx]
+		epoch := rs.storeFails[rs.workStoreIdx]
+		if atomic.CompareAndSwapUint32(&s.fail, epoch, epoch+1) {
+			logutil.Logger(context.Background()).Info("mark store's regions need be refill", zap.String("store", s.addr))
+		}
+	}
+
+	nextIdx := (currentPeerIdx + 1) % len(rs.stores)
+	newRegionStore := rs.clone()
 	newRegionStore.workStoreIdx = int32(nextIdx)
-	r.compareAndSwapStore(regionStore, newRegionStore)
+	r.compareAndSwapStore(rs, newRegionStore)
 }
 
 func (c *RegionCache) getPeerStoreIndex(r *Region, id uint64) (idx int, found bool) {
@@ -918,6 +944,7 @@ type Store struct {
 	storeID      uint64     // store's id
 	state        uint64     // unsafe store storeState
 	resolveMutex sync.Mutex // protect pd from concurrent init requests
+	fail         uint32     // store fail count, see RegionStore.storeFails
 }
 
 type resolveState uint64
diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go
index e04eb891b8ac9..0431cf034d896 100644
--- a/store/tikv/region_cache_test.go
+++ b/store/tikv/region_cache_test.go
@@ -15,6 +15,7 @@ package tikv
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"testing"
 	"time"
@@ -289,6 +290,31 @@ func (s *testRegionCacheSuite) TestSendFailedInHibernateRegion(c *C) {
 	c.Assert(ctx.Peer.Id, Equals, s.peer1)
 }
 
+func (s *testRegionCacheSuite) TestSendFailInvalidateRegionsInSameStore(c *C) {
+	// key range: ['' - 'm' - 'z']
+	region2 := s.cluster.AllocID()
+	newPeers := s.cluster.AllocIDs(2)
+	s.cluster.Split(s.region1, region2, []byte("m"), newPeers, newPeers[0])
+
+	// Check the two regions.
+	loc1, err := s.cache.LocateKey(s.bo, []byte("a"))
+	c.Assert(err, IsNil)
+	c.Assert(loc1.Region.id, Equals, s.region1)
+	loc2, err := s.cache.LocateKey(s.bo, []byte("x"))
+	c.Assert(err, IsNil)
+	c.Assert(loc2.Region.id, Equals, region2)
+
+	// Send fail on region1
+	ctx, _ := s.cache.GetRPCContext(s.bo, loc1.Region)
+	s.checkCache(c, 2)
+	s.cache.OnSendFail(s.bo, ctx, false, errors.New("test error"))
+
+	// Get region2 cache will get nil then reload.
+	ctx2, err := s.cache.GetRPCContext(s.bo, loc2.Region)
+	c.Assert(ctx2, IsNil)
+	c.Assert(err, IsNil)
+}
+
 func (s *testRegionCacheSuite) TestSendFailedInMultipleNode(c *C) {
 	// 3 nodes and no.1 is leader.
 	store3 := s.cluster.AllocID()
@@ -543,7 +569,7 @@ func BenchmarkOnRequestFail(b *testing.B) {
 			}
 			r := cache.getCachedRegionWithRLock(rpcCtx.Region)
 			if r == nil {
-				cache.switchNextPeer(r, rpcCtx.PeerIdx)
+				cache.switchNextPeer(r, rpcCtx.PeerIdx, nil)
 			}
 		}
 	})
diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go
index 52cc1636bef7e..c9de5927b79ee 100644
--- a/store/tikv/region_request_test.go
+++ b/store/tikv/region_request_test.go
@@ -118,7 +118,10 @@ func (s *testRegionRequestSuite) TestOnSendFailedWithCloseKnownStoreThenUseNewOn
 
 	// send to failed store
 	resp, err = s.regionRequestSender.SendReq(NewBackoffer(context.Background(), 100), req, region.Region, time.Second)
-	c.Assert(err, NotNil)
+	c.Assert(err, IsNil)
+	regionErr, err := resp.GetRegionError()
+	c.Assert(err, IsNil)
+	c.Assert(regionErr, NotNil)
 
 	// retry to send store by old region info
 	region, err = s.cache.LocateRegionByID(s.bo, s.region)
@@ -127,7 +130,10 @@ func (s *testRegionRequestSuite) TestOnSendFailedWithCloseKnownStoreThenUseNewOn
 
 	// retry again, reload region info and send to new store.
 	resp, err = s.regionRequestSender.SendReq(NewBackoffer(context.Background(), 100), req, region.Region, time.Second)
-	c.Assert(err, NotNil)
+	c.Assert(err, IsNil)
+	regionErr, err = resp.GetRegionError()
+	c.Assert(err, IsNil)
+	c.Assert(regionErr, NotNil)
 }
 
 func (s *testRegionRequestSuite) TestSendReqCtx(c *C) {
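
How the pieces above fit together: each Store carries an atomic failure counter (fail), each cached RegionStore snapshots that counter per store (storeFails), switchNextPeer bumps the counter with a compare-and-swap when a send fails, and GetRPCContext invalidates any cached region whose snapshot no longer matches the store's current counter, so every region served by the failed store is reloaded lazily on its next use. Below is a minimal, standalone Go sketch of this fail-epoch pattern. It is illustrative only: the names in it (store, cachedRegion, newCachedRegion, usable, onSendFail) are invented for the example and are not part of the TiDB code.

package main

import (
	"fmt"
	"sync/atomic"
)

// store stands in for a TiKV store entry: fail is a monotonically
// increasing failure epoch, bumped once per observed send failure.
type store struct {
	addr string
	fail uint32
}

// cachedRegion stands in for a cached region entry: it remembers the
// store's failure epoch at the moment the entry was built, the way
// RegionStore.storeFails snapshots Store.fail in the patch.
type cachedRegion struct {
	id       uint64
	store    *store
	failSnap uint32 // snapshot of store.fail taken when the entry was cached
	valid    bool
}

func newCachedRegion(id uint64, s *store) *cachedRegion {
	return &cachedRegion{
		id:       id,
		store:    s,
		failSnap: atomic.LoadUint32(&s.fail),
		valid:    true,
	}
}

// usable reports whether the cache entry may still be used. If the store's
// failure epoch has moved past the snapshot, some request to that store
// failed after this entry was cached, so the entry is invalidated and the
// caller must reload it (the analogue of GetRPCContext returning nil).
func (r *cachedRegion) usable() bool {
	if !r.valid {
		return false
	}
	if atomic.LoadUint32(&r.store.fail) != r.failSnap {
		r.valid = false
		return false
	}
	return true
}

// onSendFail bumps the store's failure epoch, but only once per observed
// epoch value, like the CompareAndSwapUint32 call in switchNextPeer.
func onSendFail(s *store, observedEpoch uint32) {
	if atomic.CompareAndSwapUint32(&s.fail, observedEpoch, observedEpoch+1) {
		fmt.Printf("store %s failed; its cached regions will be reloaded\n", s.addr)
	}
}

func main() {
	s := &store{addr: "127.0.0.1:20160"}
	r1 := newCachedRegion(1, s)
	r2 := newCachedRegion(2, s)

	// A request to region 1 fails, so the store's epoch is bumped once.
	onSendFail(s, r1.failSnap)

	// Region 2 was never touched, but it lives on the same store, so its
	// snapshot is stale and it is invalidated lazily on the next lookup.
	fmt.Println(r1.usable(), r2.usable()) // prints: false false
}

As in switchNextPeer, the compare-and-swap lets concurrent failures that observed the same epoch bump it only once, so a burst of send errors against one store invalidates each cached region a single time rather than repeatedly.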