reload region cache when store is resolved from invalid status (#843) #846
@@ -126,6 +126,7 @@ type Region struct {
 	syncFlag      int32         // region need be sync in next turn
 	lastAccess    int64         // last region access time, see checkRegionCacheTTL
 	invalidReason InvalidReason // the reason why the region is invalidated
+	asyncReload   int32         // the region need to be reloaded in async mode
 }

 // AccessIndex represent the index for accessIndex array
@@ -363,6 +364,8 @@ func (r *Region) isValid() bool {
 	return r != nil && !r.checkNeedReload() && r.checkRegionCacheTTL(time.Now().Unix())
 }

+type livenessFunc func(s *Store, bo *retry.Backoffer) livenessState
+
 // RegionCache caches Regions loaded from PD.
 // All public methods of this struct should be thread-safe, unless explicitly pointed out or the method is for testing
 // purposes only.
@@ -395,7 +398,13 @@ type RegionCache struct {
 	testingKnobs struct {
 		// Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set,
 		// requestLiveness always returns unreachable.
-		mockRequestLiveness func(s *Store, bo *retry.Backoffer) livenessState
+		mockRequestLiveness atomic.Value
 	}
+
+	regionsNeedReload struct {
+		sync.Mutex
+		regions  []uint64
+		toReload map[uint64]struct{}
+	}
 }

@@ -419,6 +428,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
 	c.tiflashComputeStoreMu.needReload = true
 	c.tiflashComputeStoreMu.stores = make([]*Store, 0)
 	c.notifyCheckCh = make(chan struct{}, 1)
+	c.regionsNeedReload.toReload = make(map[uint64]struct{})
 	c.ctx, c.cancelFunc = context.WithCancel(context.Background())
 	interval := config.GetGlobalConfig().StoresRefreshInterval
 	go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)

@@ -447,7 +457,11 @@ func (c *RegionCache) Close() {
 // asyncCheckAndResolveLoop with
 func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
 	ticker := time.NewTicker(interval)
-	defer ticker.Stop()
+	reloadRegionTicker := time.NewTicker(10 * time.Second)
+	defer func() {
+		ticker.Stop()
+		reloadRegionTicker.Stop()
+	}()
 	var needCheckStores []*Store
 	for {
 		needCheckStores = needCheckStores[:0]

@@ -466,6 +480,22 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
 				// there's a deleted store in the stores map which guaranteed by reReslve().
 				return state != unresolved && state != tombstone && state != deleted
 			})
+
+		case <-reloadRegionTicker.C:
+			for regionID := range c.regionsNeedReload.toReload {
+				c.reloadRegion(regionID)
+				delete(c.regionsNeedReload.toReload, regionID)
+			}
Review comment: Looks like this code should be put after line #498?

Reply: Delaying the reload to the next tick avoids some errors and backoffs; see the comment at lines 491-494 below. (A minimal sketch of this two-stage handoff follows the hunk.)
+			c.regionsNeedReload.Lock()
+			for _, regionID := range c.regionsNeedReload.regions {
+				// will reload in next tick, wait a while for two reasons:
+				// 1. there may an unavailable duration while recreating the connection.
+				// 2. the store may just be started, and wait safe ts synced to avoid the
+				//    possible dataIsNotReady error.
+				c.regionsNeedReload.toReload[regionID] = struct{}{}
+			}
+			c.regionsNeedReload.regions = c.regionsNeedReload.regions[:0]
+			c.regionsNeedReload.Unlock()
 		}
 	}
 }

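To make the delayed-reload behavior concrete, here is a minimal, self-contained sketch of the same two-stage handoff. The names (`reloadScheduler`, `pending`, `tick`) are illustrative only, not the actual client-go types: IDs scheduled by any goroutine go into a mutex-protected pending slice; on each tick the loop first reloads whatever was staged on the previous tick, then stages the currently pending IDs, so every region waits at least one full tick before it is reloaded.

```go
package main

import (
	"fmt"
	"sync"
)

// reloadScheduler mirrors the shape of regionsNeedReload in the diff: a
// mutex-protected pending slice plus a staging map owned by the loop goroutine.
type reloadScheduler struct {
	mu       sync.Mutex
	pending  []uint64            // appended to by any goroutine via scheduleReload
	toReload map[uint64]struct{} // drained only by the ticker loop
}

func (s *reloadScheduler) scheduleReload(id uint64) {
	s.mu.Lock()
	s.pending = append(s.pending, id)
	s.mu.Unlock()
}

// tick first reloads whatever was staged on the previous tick, then moves the
// currently pending IDs into the staging map, so a region is reloaded at least
// one full tick after it was scheduled.
func (s *reloadScheduler) tick(reload func(uint64)) {
	for id := range s.toReload {
		reload(id)
		delete(s.toReload, id)
	}
	s.mu.Lock()
	for _, id := range s.pending {
		s.toReload[id] = struct{}{}
	}
	s.pending = s.pending[:0]
	s.mu.Unlock()
}

func main() {
	s := &reloadScheduler{toReload: make(map[uint64]struct{})}
	s.scheduleReload(42)
	reload := func(id uint64) { fmt.Println("reloading region", id) }
	s.tick(reload) // first tick: 42 is only staged, nothing is reloaded yet
	s.tick(reload) // second tick: 42 is reloaded
}
```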
@@ -1142,6 +1172,36 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*KeyLocation, error) {
 	}, nil
 }

+func (c *RegionCache) scheduleReloadRegion(region *Region) {
+	if region == nil || !atomic.CompareAndSwapInt32(&region.asyncReload, 0, 1) {
+		// async reload triggered by other thread.
+		return
+	}
+	regionID := region.GetID()
+	if regionID > 0 {
+		c.regionsNeedReload.Lock()
+		c.regionsNeedReload.regions = append(c.regionsNeedReload.regions, regionID)
+		c.regionsNeedReload.Unlock()
+	}
+}
+
+func (c *RegionCache) reloadRegion(regionID uint64) {
+	bo := retry.NewNoopBackoff(context.Background())
+	lr, err := c.loadRegionByID(bo, regionID)
+	if err != nil {
+		// ignore error and use old region info.
+		logutil.Logger(bo.GetCtx()).Error("load region failure",
+			zap.Uint64("regionID", regionID), zap.Error(err))
+		if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil {
+			atomic.StoreInt32(&oldRegion.asyncReload, 0)
+		}
+		return
+	}
+	c.mu.Lock()
+	c.insertRegionToCache(lr)
+	c.mu.Unlock()
+}
+
 // GroupKeysByRegion separates keys into groups by their belonging Regions.
 // Specially it also returns the first key's region which may be used as the
 // 'PrimaryLockKey' and should be committed ahead of others.

@@ -1315,8 +1375,11 @@ func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
 		if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader {
 			store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly))
 		}
-		// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
-		oldRegion.invalidate(Other)
+		// If the region info is async reloaded, the old region is still valid.
+		if atomic.LoadInt32(&oldRegion.asyncReload) == 0 {
+			// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
+			oldRegion.invalidate(Other)
+		}
 		// Don't refresh TiFlash work idx for region. Otherwise, it will always goto a invalid store which
 		// is under transferring regions.
 		store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load())

@@ -2371,8 +2434,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
 }

 func (s *Store) getResolveState() resolveState {
-	var state resolveState
 	if s == nil {
+		var state resolveState
 		return state
 	}
 	return resolveState(atomic.LoadUint64(&s.state))

@@ -2544,8 +2607,12 @@ func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l livenessState) {
 			return unknown
 		}
 	}
-	if c != nil && c.testingKnobs.mockRequestLiveness != nil {
-		return c.testingKnobs.mockRequestLiveness(s, bo)
+
+	if c != nil {
+		lf := c.testingKnobs.mockRequestLiveness.Load()
+		if lf != nil {
+			return (*lf.(*livenessFunc))(s, bo)
+		}
 	}

 	if storeLivenessTimeout == 0 {

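Because the knob is now an `atomic.Value` that is loaded and invoked as a `*livenessFunc`, test code has to store a pointer to a `livenessFunc` rather than a bare function value. A hedged, in-package sketch of what the test-side setup might look like; the test function, the `pdClient` test double, and the use of the `reachable` state are assumptions for illustration, not part of this diff:

```go
func TestMockLivenessKnob(t *testing.T) {
	cache := NewRegionCache(pdClient) // pdClient: any pd.Client test double (assumed)
	defer cache.Close()

	var mock livenessFunc = func(s *Store, bo *retry.Backoffer) livenessState {
		return reachable // assumed healthy liveness state for the mock
	}
	// Must store a *livenessFunc, because requestLiveness calls
	// (*lf.(*livenessFunc))(s, bo) on the loaded value.
	cache.testingKnobs.mockRequestLiveness.Store(&mock)

	// ... exercise code paths that call requestLiveness ...
}
```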
Review comment: How about using a channel, so the `Mutex` could be saved? Operations on the `RegionCache` are already synchronized.

Reply: A channel is bounded; if it's full when trying to schedule a region into it, the sender would have to wait (maybe `asyncCheckAndResolveLoop` is doing something and cannot pull from the channel immediately). A short sketch contrasting the two approaches follows.
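The sketch below illustrates that trade-off: a buffered channel send must either block or drop when the consumer loop is busy, while appending to a mutex-protected slice (the approach taken in this PR) only contends briefly for the lock and never blocks the scheduler. Names here are illustrative, not the actual `RegionCache` fields.

```go
package main

import (
	"fmt"
	"sync"
)

// Channel-based scheduling: when the buffer is full the producer must either
// block or drop the request (non-blocking send shown here).
func scheduleViaChannel(ch chan uint64, id uint64) bool {
	select {
	case ch <- id:
		return true
	default:
		return false // buffer full, e.g. the consumer loop is busy resolving stores
	}
}

// Mutex-based scheduling: the append always succeeds, the slice simply grows
// and is drained on the next tick of the loop.
type pendingRegions struct {
	mu  sync.Mutex
	ids []uint64
}

func (p *pendingRegions) schedule(id uint64) {
	p.mu.Lock()
	p.ids = append(p.ids, id)
	p.mu.Unlock()
}

func main() {
	ch := make(chan uint64, 1)
	fmt.Println(scheduleViaChannel(ch, 1)) // true
	fmt.Println(scheduleViaChannel(ch, 2)) // false: would otherwise block or drop

	var p pendingRegions
	p.schedule(1)
	p.schedule(2)           // never blocks
	fmt.Println(len(p.ids)) // 2
}
```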