reload region cache when store is resolved from invalid status (#843) #846

Merged
merged 12 commits
Jun 28, 2023
2 changes: 1 addition & 1 deletion error/error.go
@@ -244,7 +244,7 @@ type ErrAssertionFailed struct {
*kvrpcpb.AssertionFailed
}

// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues`` is not.
// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues` is not.
type ErrLockOnlyIfExistsNoReturnValue struct {
StartTS uint64
ForUpdateTs uint64
3 changes: 3 additions & 0 deletions integration_tests/store_test.go
@@ -118,6 +118,9 @@ type checkRequestClient struct {

func (c *checkRequestClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
resp, err := c.Client.SendRequest(ctx, addr, req, timeout)
if err != nil {
return resp, err
}
if c.priority != req.Priority {
if resp.Resp != nil {
if getResp, ok := resp.Resp.(*kvrpcpb.GetResponse); ok {
81 changes: 74 additions & 7 deletions internal/locate/region_cache.go
@@ -126,6 +126,7 @@ type Region struct {
syncFlag int32 // region need be sync in next turn
lastAccess int64 // last region access time, see checkRegionCacheTTL
invalidReason InvalidReason // the reason why the region is invalidated
asyncReload int32 // the region needs to be reloaded in async mode
}

// AccessIndex represent the index for accessIndex array
@@ -363,6 +364,8 @@ func (r *Region) isValid() bool {
return r != nil && !r.checkNeedReload() && r.checkRegionCacheTTL(time.Now().Unix())
}

type livenessFunc func(s *Store, bo *retry.Backoffer) livenessState

// RegionCache caches Regions loaded from PD.
// All public methods of this struct should be thread-safe, unless explicitly pointed out or the method is for testing
// purposes only.
@@ -395,7 +398,13 @@ type RegionCache struct {
testingKnobs struct {
// Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set,
// requestLiveness always returns unreachable.
mockRequestLiveness func(s *Store, bo *retry.Backoffer) livenessState
mockRequestLiveness atomic.Value
}

regionsNeedReload struct {
Contributor: How about using a channel, so the Mutex could be saved? Operations on the RegionCache are already synchronized.

Contributor (Author): A channel is bounded: if it is full when we try to schedule a region onto it, the caller will block (the asyncCheckAndResolveLoop may be busy and unable to drain the channel immediately). See the sketch below the struct definition for the trade-off.

sync.Mutex
regions []uint64
toReload map[uint64]struct{}
}
}
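To make the trade-off from the thread above concrete, here is a small standalone sketch (illustrative types only, not code from this PR): a bounded channel forces the scheduling side to either block or drop a region when the consumer is busy, while a mutex-guarded slice, as used in this change, always accepts the region and lets the background loop drain it on its own ticker.

package main

import (
	"fmt"
	"sync"
)

// chanQueue is a bounded channel: when it is full, the producer must either
// block or drop the reload request.
type chanQueue struct {
	ch chan uint64
}

func (q *chanQueue) schedule(regionID uint64) bool {
	select {
	case q.ch <- regionID:
		return true
	default:
		return false // full: the request is lost unless the caller blocks
	}
}

// sliceQueue is a mutex-guarded slice, as in this PR: appending never blocks,
// and the background loop drains it whenever it gets around to it.
type sliceQueue struct {
	mu      sync.Mutex
	regions []uint64
}

func (q *sliceQueue) schedule(regionID uint64) {
	q.mu.Lock()
	q.regions = append(q.regions, regionID)
	q.mu.Unlock()
}

func (q *sliceQueue) drain() []uint64 {
	q.mu.Lock()
	defer q.mu.Unlock()
	out := q.regions
	q.regions = nil
	return out
}

func main() {
	cq := &chanQueue{ch: make(chan uint64, 1)}
	fmt.Println(cq.schedule(1), cq.schedule(2)) // true false: the second region is dropped

	sq := &sliceQueue{}
	sq.schedule(1)
	sq.schedule(2)
	fmt.Println(sq.drain()) // [1 2]: nothing is dropped and no caller blocks
}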

@@ -419,6 +428,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
c.tiflashComputeStoreMu.needReload = true
c.tiflashComputeStoreMu.stores = make([]*Store, 0)
c.notifyCheckCh = make(chan struct{}, 1)
c.regionsNeedReload.toReload = make(map[uint64]struct{})
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
interval := config.GetGlobalConfig().StoresRefreshInterval
go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)
@@ -447,7 +457,11 @@ func (c *RegionCache) Close() {
// asyncCheckAndResolveLoop with
func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
reloadRegionTicker := time.NewTicker(10 * time.Second)
defer func() {
ticker.Stop()
reloadRegionTicker.Stop()
}()
var needCheckStores []*Store
for {
needCheckStores = needCheckStores[:0]
@@ -466,6 +480,22 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
// there's a deleted store in the stores map which guaranteed by reReslve().
return state != unresolved && state != tombstone && state != deleted
})

case <-reloadRegionTicker.C:
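// Two-stage handoff: IDs queued by scheduleReloadRegion accumulate in
// regionsNeedReload.regions; below they are moved into toReload and are only
// reloaded on the next tick, giving the newly resolved store time to become usable.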
for regionID := range c.regionsNeedReload.toReload {
c.reloadRegion(regionID)
delete(c.regionsNeedReload.toReload, regionID)
}
Contributor: Looks like this code should be placed after line #498?

Contributor (Author): The reload is deliberately delayed to the next tick, which avoids some errors and backoffs; see the comment at lines 491 to 494 below.

c.regionsNeedReload.Lock()
for _, regionID := range c.regionsNeedReload.regions {
// will reload in the next tick; wait a while for two reasons:
// 1. there may be an unavailable duration while the connection is being recreated.
// 2. the store may have just started, so wait for the safe ts to be synced to
// avoid a possible dataIsNotReady error.
c.regionsNeedReload.toReload[regionID] = struct{}{}
}
c.regionsNeedReload.regions = c.regionsNeedReload.regions[:0]
c.regionsNeedReload.Unlock()
}
}
}
@@ -1142,6 +1172,36 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
}, nil
}

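// scheduleReloadRegion queues an asynchronous reload of the given region. The
// asyncReload flag ensures each region is queued at most once; the queued IDs are
// picked up by asyncCheckAndResolveLoop on its reload ticker.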
func (c *RegionCache) scheduleReloadRegion(region *Region) {
if region == nil || !atomic.CompareAndSwapInt32(&region.asyncReload, 0, 1) {
// async reload already triggered by another thread.
return
}
regionID := region.GetID()
if regionID > 0 {
c.regionsNeedReload.Lock()
c.regionsNeedReload.regions = append(c.regionsNeedReload.regions, regionID)
c.regionsNeedReload.Unlock()
}
}

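// reloadRegion loads the latest region info from PD and inserts it into the cache.
// On failure it keeps the old (possibly stale) region and clears its asyncReload
// flag so that a reload can be scheduled again later.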
func (c *RegionCache) reloadRegion(regionID uint64) {
bo := retry.NewNoopBackoff(context.Background())
lr, err := c.loadRegionByID(bo, regionID)
if err != nil {
// ignore error and use old region info.
logutil.Logger(bo.GetCtx()).Error("load region failure",
zap.Uint64("regionID", regionID), zap.Error(err))
if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil {
atomic.StoreInt32(&oldRegion.asyncReload, 0)
}
return
}
c.mu.Lock()
c.insertRegionToCache(lr)
c.mu.Unlock()
}

// GroupKeysByRegion separates keys into groups by their belonging Regions.
// Specially it also returns the first key's region which may be used as the
// 'PrimaryLockKey' and should be committed ahead of others.
@@ -1315,8 +1375,11 @@ func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader {
store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly))
}
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
oldRegion.invalidate(Other)
// If the region info is async reloaded, the old region is still valid.
if atomic.LoadInt32(&oldRegion.asyncReload) == 0 {
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
oldRegion.invalidate(Other)
}
// Don't refresh TiFlash work idx for region. Otherwise, it will always goto a invalid store which
// is under transferring regions.
store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load())
@@ -2371,8 +2434,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
}

func (s *Store) getResolveState() resolveState {
var state resolveState
if s == nil {
var state resolveState
return state
}
return resolveState(atomic.LoadUint64(&s.state))
@@ -2544,8 +2607,12 @@ func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l liveness
return unknown
}
}
if c != nil && c.testingKnobs.mockRequestLiveness != nil {
return c.testingKnobs.mockRequestLiveness(s, bo)

if c != nil {
lf := c.testingKnobs.mockRequestLiveness.Load()
if lf != nil {
return (*lf.(*livenessFunc))(s, bo)
}
}

if storeLivenessTimeout == 0 {
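The mockRequestLiveness knob above changed from a plain function field to an atomic.Value, presumably so the knob can be swapped by tests without a data race. atomic.Value requires every Store call to use the same concrete type, so the tests wrap their closures in the named livenessFunc type and store a pointer to it, which is why the call sites read (*livenessFunc)(&tf). A minimal standalone sketch of that pattern, with illustrative names that are not part of this PR:

package main

import (
	"fmt"
	"sync/atomic"
)

// livenessFn plays the role of livenessFunc: a swappable probe function.
type livenessFn func(addr string) string

type testingKnobs struct {
	mockLiveness atomic.Value // always holds a *livenessFn
}

func main() {
	var knobs testingKnobs

	fn := livenessFn(func(addr string) string { return "reachable: " + addr })
	knobs.mockLiveness.Store(&fn) // the stored concrete type is always *livenessFn

	if v := knobs.mockLiveness.Load(); v != nil {
		// Type-assert back to *livenessFn, dereference, and call it.
		fmt.Println((*v.(*livenessFn))("store-1"))
	}
}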
12 changes: 11 additions & 1 deletion internal/locate/region_request.go
@@ -551,13 +551,23 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
state.lastIdx++
}

reloadRegion := false
for i := 0; i < len(selector.replicas) && !state.option.leaderOnly; i++ {
idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
if state.isCandidate(idx, selector.replicas[idx]) {
selectReplica := selector.replicas[idx]
if state.isCandidate(idx, selectReplica) {
state.lastIdx = idx
selector.targetIdx = idx
break
}
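// The replica's epoch is stale but its store has been re-resolved and is
// reachable again, which suggests the cached region info is outdated, so
// schedule an asynchronous reload of this region after the loop.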
if selectReplica.isEpochStale() &&
selectReplica.store.getResolveState() == resolved &&
selectReplica.store.getLivenessState() == reachable {
reloadRegion = true
}
}
if reloadRegion {
selector.regionCache.scheduleReloadRegion(selector.region)
}
// If there is no candidate, fallback to the leader.
if selector.targetIdx < 0 {
32 changes: 20 additions & 12 deletions internal/locate/region_request3_test.go
@@ -164,12 +164,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
return innerClient.SendRequest(ctx, addr, req, timeout)
}}
var storeState = uint32(unreachable)
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
tf := func(s *Store, bo *retry.Backoffer) livenessState {
if s.addr == leaderAddr {
return livenessState(atomic.LoadUint32(&storeState))
}
return reachable
}
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))

loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("k"))
s.Nil(err)
@@ -298,7 +299,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
region = &Region{
meta: region.GetMeta(),
}
region.lastAccess = time.Now().Unix()
atomic.StoreInt64(&region.lastAccess, time.Now().Unix())
region.meta.Peers = append(region.meta.Peers, peer)
atomic.StorePointer(&region.store, unsafe.Pointer(regionStore))

@@ -373,13 +374,14 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.False(replicaSelector.region.isValid())

// Test switching to tryFollower if leader is unreachable
region.lastAccess = time.Now().Unix()
atomic.StoreInt64(&region.lastAccess, time.Now().Unix())
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
tf := func(s *Store, bo *retry.Backoffer) livenessState {
return unreachable
}
cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
s.IsType(&accessKnownLeader{}, replicaSelector.state)
_, err = replicaSelector.next(s.bo)
s.Nil(err)
@@ -415,9 +417,11 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// Do not try to use proxy if livenessState is unknown instead of unreachable.
refreshEpochs(regionStore)
cache.enableForwarding = true
cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
tf = func(s *Store, bo *retry.Backoffer) livenessState {
return unknown
}
cache.testingKnobs.mockRequestLiveness.Store(
(*livenessFunc)(&tf))
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
@@ -439,9 +443,10 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
tf = func(s *Store, bo *retry.Backoffer) livenessState {
return unreachable
}
cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
s.Eventually(func() bool {
return regionStore.stores[regionStore.workTiKVIdx].getLivenessState() == unreachable
}, 3*time.Second, 200*time.Millisecond)
@@ -558,7 +563,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)

// Test accessFollower state filtering label-not-match stores.
region.lastAccess = time.Now().Unix()
atomic.StoreInt64(&region.lastAccess, time.Now().Unix())
refreshEpochs(regionStore)
labels := []*metapb.StoreLabel{
{
@@ -580,7 +585,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
}

// Test accessFollower state with leaderOnly option
region.lastAccess = time.Now().Unix()
atomic.StoreInt64(&region.lastAccess, time.Now().Unix())
refreshEpochs(regionStore)
for i := 0; i < 5; i++ {
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req, WithLeaderOnly())
@@ -593,15 +598,15 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
}

// Test accessFollower state with kv.ReplicaReadMixed request type.
region.lastAccess = time.Now().Unix()
atomic.StoreInt64(&region.lastAccess, time.Now().Unix())
refreshEpochs(regionStore)
req.ReplicaReadType = kv.ReplicaReadMixed
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.NotNil(replicaSelector)
s.Nil(err)

// Invalidate the region if the leader is not in the region.
region.lastAccess = time.Now().Unix()
atomic.StoreInt64(&region.lastAccess, time.Now().Unix())
replicaSelector.updateLeader(&metapb.Peer{Id: s.cluster.AllocID(), StoreId: s.cluster.AllocID()})
s.False(region.isValid())
// Don't try next replica if the region is invalidated.
@@ -695,9 +700,11 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.cluster.ChangeLeader(s.regionID, s.peerIDs[0])

// The leader store is alive but can't provide service.
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {

tf := func(s *Store, bo *retry.Backoffer) livenessState {
return reachable
}
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
s.Eventually(func() bool {
stores := s.regionRequestSender.replicaSelector.regionStore.stores
return stores[0].getLivenessState() == reachable &&
@@ -823,9 +830,10 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
}

// Runs out of all replicas and then returns a send error.
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
tf = func(s *Store, bo *retry.Backoffer) livenessState {
return unreachable
}
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
reloadRegion()
for _, store := range s.storeIDs {
s.cluster.StopStore(store)