tikv: invalidate store's regions when send store fail (#11344) (#11498)
lysu authored and jackysp committed Jul 30, 2019
1 parent 40ef5e1 commit aeeeb15
Showing 3 changed files with 70 additions and 11 deletions.
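The gist of the change: every Store now carries an atomic fail counter, each cached region snapshots that counter for its stores (RegionStore.storeFails), and GetRPCContext invalidates the cached region when the live counter has moved past the snapshot, so one send failure on a store evicts every region cached against it. Below is a minimal, self-contained Go sketch of that epoch-compare idea using simplified stand-in types; Store, RegionStore, needsReload and markSendFail here are illustrative names, not the actual store/tikv definitions (the real bump also uses a compare-and-swap, shown in a later sketch).

package main

import (
	"fmt"
	"sync/atomic"
)

// Store stands in for store/tikv.Store: an address plus an atomic failure
// epoch that is bumped whenever a send to this store fails.
type Store struct {
	addr string
	fail uint32
}

// RegionStore stands in for the per-region store view: the stores serving the
// region plus a snapshot of each store's fail epoch taken when it was cached.
type RegionStore struct {
	workStoreIdx int32
	stores       []*Store
	storeFails   []uint32
}

// newRegionStore snapshots the current fail epochs, as Region.init does.
func newRegionStore(stores []*Store) *RegionStore {
	rs := &RegionStore{stores: stores, storeFails: make([]uint32, len(stores))}
	for i, s := range stores {
		rs.storeFails[i] = atomic.LoadUint32(&s.fail)
	}
	return rs
}

// needsReload is the check GetRPCContext now performs: a mismatch between the
// live epoch and the snapshot means some request failed on this store after
// the region was cached, so the cached region should be invalidated.
func (rs *RegionStore) needsReload() bool {
	s := rs.stores[rs.workStoreIdx]
	return atomic.LoadUint32(&s.fail) != rs.storeFails[rs.workStoreIdx]
}

// markSendFail models the epoch bump done on a send failure (simplified to a
// plain increment here; the real switchNextPeer uses CompareAndSwap).
func markSendFail(s *Store) {
	atomic.AddUint32(&s.fail, 1)
}

func main() {
	store := &Store{addr: "tikv-1:20160"}
	regionA := newRegionStore([]*Store{store})
	regionB := newRegionStore([]*Store{store}) // cached before any failure

	markSendFail(store) // a request to regionA's peer on this store fails

	// Both cached regions now observe a stale epoch and must be reloaded.
	fmt.Println(regionA.needsReload(), regionB.needsReload()) // true true
}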
43 changes: 35 additions & 8 deletions store/tikv/region_cache.go
@@ -67,13 +67,19 @@ type Region struct {
type RegionStore struct {
workStoreIdx int32 // point to current work peer in meta.Peers and work store in stores(same idx)
stores []*Store // stores in this region
storeFails []uint32 // snapshots of store's fail, need reload when `storeFails[curr] != stores[cur].fail`
}

// clone clones region store struct.
func (r *RegionStore) clone() *RegionStore {
storeFails := make([]uint32, len(r.stores))
for i, e := range r.storeFails {
storeFails[i] = e
}
return &RegionStore{
workStoreIdx: r.workStoreIdx,
stores: r.stores,
storeFails: storeFails,
}
}

@@ -84,6 +90,7 @@ func (r *Region) init(c *RegionCache) {
rs := &RegionStore{
workStoreIdx: 0,
stores: make([]*Store, 0, len(r.meta.Peers)),
storeFails: make([]uint32, 0, len(r.meta.Peers)),
}
for _, p := range r.meta.Peers {
c.storeMu.RLock()
@@ -93,6 +100,7 @@ func (r *Region) init(c *RegionCache) {
store = c.getStoreByStoreID(p.StoreId)
}
rs.stores = append(rs.stores, store)
rs.storeFails = append(rs.storeFails, atomic.LoadUint32(&store.fail))
}
atomic.StorePointer(&r.store, unsafe.Pointer(rs))

@@ -270,6 +278,15 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext,
return nil, nil
}

storeFailEpoch := atomic.LoadUint32(&store.fail)
if storeFailEpoch != regionStore.storeFails[regionStore.workStoreIdx] {
cachedRegion.invalidate()
logutil.Logger(context.Background()).Info("invalidate current region, because others failed on same store",
zap.Uint64("region", id.GetID()),
zap.String("store", store.addr))
return nil, nil
}

return &RPCContext{
Region: id,
Meta: cachedRegion.meta,
@@ -366,7 +383,7 @@ func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload
tikvRegionCacheCounterWithSendFail.Inc()
r := c.getCachedRegionWithRLock(ctx.Region)
if r != nil {
c.switchNextPeer(r, ctx.PeerIdx)
c.switchNextPeer(r, ctx.PeerIdx, err)
if scheduleReload {
r.scheduleReload()
}
@@ -499,7 +516,7 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, c
}

if leaderStoreID == 0 {
c.switchNextPeer(r, currentPeerIdx)
c.switchNextPeer(r, currentPeerIdx, nil)
logutil.Logger(context.Background()).Info("switch region peer to next due to NotLeader with NULL leader",
zap.Int("currIdx", currentPeerIdx),
zap.Uint64("regionID", regionID.GetID()))
@@ -857,15 +874,24 @@ func (c *RegionCache) switchToPeer(r *Region, targetStoreID uint64) (found bool)
return
}

func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int) {
regionStore := r.getStore()
if int(regionStore.workStoreIdx) != currentPeerIdx {
func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int, err error) {
rs := r.getStore()
if int(rs.workStoreIdx) != currentPeerIdx {
return
}
nextIdx := (currentPeerIdx + 1) % len(regionStore.stores)
newRegionStore := regionStore.clone()

if err != nil { // TODO: refine err, only do this for some errors.
s := rs.stores[rs.workStoreIdx]
epoch := rs.storeFails[rs.workStoreIdx]
if atomic.CompareAndSwapUint32(&s.fail, epoch, epoch+1) {
logutil.Logger(context.Background()).Info("mark store's regions need be refill", zap.String("store", s.addr))
}
}

nextIdx := (currentPeerIdx + 1) % len(rs.stores)
newRegionStore := rs.clone()
newRegionStore.workStoreIdx = int32(nextIdx)
r.compareAndSwapStore(regionStore, newRegionStore)
r.compareAndSwapStore(rs, newRegionStore)
}
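One detail in the new switchNextPeer: the epoch is advanced with CompareAndSwapUint32 against the snapshot the failing request saw, so when several in-flight requests fail on the same store at roughly the same time, only one of them wins the CAS, bumps the counter, and emits the "mark store's regions need be refill" log; the rest do nothing. A small standalone sketch of that pattern (variable names are illustrative):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var fail uint32                   // the store's live fail epoch
	epoch := atomic.LoadUint32(&fail) // snapshot held by the cached region

	var bumps int32
	var wg sync.WaitGroup
	// Simulate several requests failing on the same store while holding the
	// same snapshot: only the first CAS succeeds, so the epoch moves by one.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if atomic.CompareAndSwapUint32(&fail, epoch, epoch+1) {
				atomic.AddInt32(&bumps, 1) // where the real code logs once
			}
		}()
	}
	wg.Wait()

	fmt.Println(atomic.LoadUint32(&fail), atomic.LoadInt32(&bumps)) // 1 1
}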

func (c *RegionCache) getPeerStoreIndex(r *Region, id uint64) (idx int, found bool) {
@@ -918,6 +944,7 @@ type Store struct {
storeID uint64 // store's id
state uint64 // unsafe store storeState
resolveMutex sync.Mutex // protect pd from concurrent init requests
fail uint32 // store fail count, see RegionStore.storeFails
}

type resolveState uint64
28 changes: 27 additions & 1 deletion store/tikv/region_cache_test.go
@@ -15,6 +15,7 @@ package tikv

import (
"context"
"errors"
"fmt"
"testing"
"time"
@@ -289,6 +290,31 @@ func (s *testRegionCacheSuite) TestSendFailedInHibernateRegion(c *C) {
c.Assert(ctx.Peer.Id, Equals, s.peer1)
}

func (s *testRegionCacheSuite) TestSendFailInvalidateRegionsInSameStore(c *C) {
// key range: ['' - 'm' - 'z']
region2 := s.cluster.AllocID()
newPeers := s.cluster.AllocIDs(2)
s.cluster.Split(s.region1, region2, []byte("m"), newPeers, newPeers[0])

// Check the two regions.
loc1, err := s.cache.LocateKey(s.bo, []byte("a"))
c.Assert(err, IsNil)
c.Assert(loc1.Region.id, Equals, s.region1)
loc2, err := s.cache.LocateKey(s.bo, []byte("x"))
c.Assert(err, IsNil)
c.Assert(loc2.Region.id, Equals, region2)

// Send fail on region1
ctx, _ := s.cache.GetRPCContext(s.bo, loc1.Region)
s.checkCache(c, 2)
s.cache.OnSendFail(s.bo, ctx, false, errors.New("test error"))

// Get region2 cache will get nil then reload.
ctx2, err := s.cache.GetRPCContext(s.bo, loc2.Region)
c.Assert(ctx2, IsNil)
c.Assert(err, IsNil)
}
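This new test exercises the cross-region effect the commit is named for: both regions created by the split are served by the same store in this test cluster, so the simulated send failure on region1 bumps that store's fail epoch, and the subsequent GetRPCContext for region2 sees a stale snapshot, returns a nil context, and forces region2 to be reloaded as well.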

func (s *testRegionCacheSuite) TestSendFailedInMultipleNode(c *C) {
// 3 nodes and no.1 is leader.
store3 := s.cluster.AllocID()
@@ -543,7 +569,7 @@ func BenchmarkOnRequestFail(b *testing.B) {
}
r := cache.getCachedRegionWithRLock(rpcCtx.Region)
if r == nil {
cache.switchNextPeer(r, rpcCtx.PeerIdx)
cache.switchNextPeer(r, rpcCtx.PeerIdx, nil)
}
}
})
10 changes: 8 additions & 2 deletions store/tikv/region_request_test.go
@@ -118,7 +118,10 @@ func (s *testRegionRequestSuite) TestOnSendFailedWithCloseKnownStoreThenUseNewOn

// send to failed store
resp, err = s.regionRequestSender.SendReq(NewBackoffer(context.Background(), 100), req, region.Region, time.Second)
c.Assert(err, NotNil)
c.Assert(err, IsNil)
regionErr, err := resp.GetRegionError()
c.Assert(err, IsNil)
c.Assert(regionErr, NotNil)

// retry to send store by old region info
region, err = s.cache.LocateRegionByID(s.bo, s.region)
@@ -127,7 +130,10 @@

// retry again, reload region info and send to new store.
resp, err = s.regionRequestSender.SendReq(NewBackoffer(context.Background(), 100), req, region.Region, time.Second)
c.Assert(err, NotNil)
c.Assert(err, IsNil)
regionErr, err = resp.GetRegionError()
c.Assert(err, IsNil)
c.Assert(regionErr, NotNil)
}
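The relaxed assertions reflect how failures now surface: with the dead store's regions invalidated, sending to it comes back as a region error carried in the response (checked via resp.GetRegionError()) rather than as an error returned from SendReq, so the caller is expected to refresh its region info and retry instead of treating the request as failed outright.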

func (s *testRegionRequestSuite) TestSendReqCtx(c *C) {
