ReadMode: introduce AutoFollowerRead mechanism by adding a new `ReadMode == PreferLeader` (#671)

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>
LykxSassinator authored Feb 7, 2023
1 parent 50e86f7 commit bce56a5
Showing 7 changed files with 313 additions and 12 deletions.
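Before the diff, a minimal usage sketch of the entry point this commit extends. The helper below is hypothetical (it is not part of the commit) and is assumed to live inside the `locate` package; `bo`, `id`, and `seed` stand in for values a caller already has.

// preferLeaderContext is an illustrative helper only: it asks the region cache
// for a replica under the new PreferLeader mode. The leader is returned while
// it is healthy; once it is marked slow, a label-matching follower is chosen.
func preferLeaderContext(bo *retry.Backoffer, c *RegionCache, id RegionVerID, seed uint32) (*RPCContext, error) {
	return c.GetTiKVRPCContext(bo, id, kv.ReplicaReadPreferLeader, seed)
}

In practice the mode is driven by setting `req.ReplicaReadType = kv.ReplicaReadPreferLeader` on the request, as the region_request.go changes below show.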
87 changes: 83 additions & 4 deletions internal/locate/region_cache.go
@@ -233,8 +233,8 @@ func (r *regionStore) kvPeer(seed uint32, op *storeSelectorOp) AccessIndex {

func (r *regionStore) filterStoreCandidate(aidx AccessIndex, op *storeSelectorOp) bool {
_, s := r.accessStore(tiKVOnly, aidx)
// filter label unmatched store
return s.IsLabelsMatch(op.labels)
// filter out label-unmatched stores, and slow stores when ReplicaReadMode == PreferLeader
return s.IsLabelsMatch(op.labels) && (!op.preferLeader || (aidx == r.workTiKVIdx && !s.isSlow()))
}

func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Region, error) {
@@ -430,6 +430,8 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)
go c.cacheGC()
c.enableForwarding = config.GetGlobalConfig().EnableForwarding
// By default, use 15s as the update interval.
go c.asyncUpdateStoreSlowScore(time.Duration(interval/4) * time.Second)
return c
}

@@ -549,8 +551,9 @@ func (c *RPCContext) String() string {
}

type storeSelectorOp struct {
leaderOnly bool
labels []*metapb.StoreLabel
leaderOnly bool
preferLeader bool
labels []*metapb.StoreLabel
}

// StoreSelectorOption configures storeSelectorOp.
@@ -570,6 +573,13 @@ func WithLeaderOnly() StoreSelectorOption {
}
}

// WithPerferLeader indicates selecting stores with the leader as the first priority, until the leader becomes inaccessible.
func WithPerferLeader() StoreSelectorOption {
return func(op *storeSelectorOp) {
op.preferLeader = true
}
}

// GetTiKVRPCContext returns RPCContext for a region. If it returns nil, the region
// must be out of date and already dropped from cache.
func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, replicaRead kv.ReplicaReadType, followerStoreSeed uint32, opts ...StoreSelectorOption) (*RPCContext, error) {
@@ -605,6 +615,9 @@ func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, rep
store, peer, accessIdx, storeIdx = cachedRegion.FollowerStorePeer(regionStore, followerStoreSeed, options)
case kv.ReplicaReadMixed:
store, peer, accessIdx, storeIdx = cachedRegion.AnyStorePeer(regionStore, followerStoreSeed, options)
case kv.ReplicaReadPreferLeader:
options.preferLeader = true
store, peer, accessIdx, storeIdx = cachedRegion.AnyStorePeer(regionStore, followerStoreSeed, options)
default:
isLeaderReq = true
store, peer, accessIdx, storeIdx = cachedRegion.WorkStorePeer(regionStore)
@@ -2230,6 +2243,9 @@ type Store struct {
// this mechanism is currently only applicable for TiKV stores.
livenessState uint32
unreachableSince time.Time

// A statistic for counting the request latency to this store
slowScore SlowScoreStat
}

type resolveState uint64
@@ -2352,6 +2368,9 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
c.storeMu.Lock()
if s.addr == addr {
newStore.slowScore = s.slowScore
}
c.storeMu.stores[newStore.storeID] = newStore
c.storeMu.Unlock()
s.setResolveState(deleted)
@@ -2629,6 +2648,66 @@ func invokeKVStatusAPI(addr string, timeout time.Duration) (l livenessState) {
return
}

// getSlowScore returns the slow score of store.
func (s *Store) getSlowScore() uint64 {
return s.slowScore.getSlowScore()
}

// isSlow returns whether current Store is slow or not.
func (s *Store) isSlow() bool {
return s.slowScore.isSlow()
}

// updateSlowScoreStat updates the slow score of this store according to the recorded request latencies.
func (s *Store) updateSlowScoreStat() {
s.slowScore.updateSlowScore()
}

// recordSlowScoreStat records timecost of each request.
func (s *Store) recordSlowScoreStat(timecost time.Duration) {
s.slowScore.recordSlowScoreStat(timecost)
}

// markAlreadySlow marks this store as slow immediately, without waiting for the periodic score update.
func (s *Store) markAlreadySlow() {
s.slowScore.markAlreadySlow()
}

// asyncUpdateStoreSlowScore updates the slow score of each store periodically.
func (c *RegionCache) asyncUpdateStoreSlowScore(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
// update store slowScores
c.checkAndUpdateStoreSlowScores()
}
}
}

func (c *RegionCache) checkAndUpdateStoreSlowScores() {
defer func() {
r := recover()
if r != nil {
logutil.BgLogger().Error("panic in the checkAndUpdateStoreSlowScores goroutine",
zap.Reflect("r", r),
zap.Stack("stack trace"))
}
}()
slowScoreMetrics := make(map[string]float64)
c.storeMu.RLock()
for _, store := range c.storeMu.stores {
store.updateSlowScoreStat()
slowScoreMetrics[store.addr] = float64(store.getSlowScore())
}
c.storeMu.RUnlock()
for store, score := range slowScoreMetrics {
metrics.TiKVStoreSlowScoreGauge.WithLabelValues(store).Set(score)
}
}

func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, healthpb.HealthClient, error) {
// Temporarily directly load the config from the global config, however it's not a good idea to let RegionCache to
// access it.
26 changes: 26 additions & 0 deletions internal/locate/region_cache_test.go
@@ -1702,3 +1702,29 @@ func (s *testRegionCacheSuite) TestBackgroundCacheGC() {
}, 3*time.Second, 200*time.Millisecond)
s.checkCache(remaining)
}

func (s *testRegionCacheSuite) TestSlowScoreStat() {
slowScore := SlowScoreStat{
avgScore: 1,
}
s.False(slowScore.isSlow())
slowScore.recordSlowScoreStat(time.Millisecond * 1)
slowScore.updateSlowScore()
s.False(slowScore.isSlow())
for i := 2; i <= 100; i++ {
slowScore.recordSlowScoreStat(time.Millisecond * time.Duration(i))
if i%5 == 0 {
slowScore.updateSlowScore()
s.False(slowScore.isSlow())
}
}
for i := 100; i >= 2; i-- {
slowScore.recordSlowScoreStat(time.Millisecond * time.Duration(i))
if i%5 == 0 {
slowScore.updateSlowScore()
s.False(slowScore.isSlow())
}
}
slowScore.markAlreadySlow()
s.True(slowScore.isSlow())
}
37 changes: 29 additions & 8 deletions internal/locate/region_request.go
@@ -264,9 +264,8 @@ type replicaSelector struct {
// selectorState is the interface of states of the replicaSelector.
// Here is the main state transition diagram:
//
// exceeding maxReplicaAttempt
// +-------------------+ || RPC failure && unreachable && no forwarding
//
// exceeding maxReplicaAttempt
// +-------------------+ || RPC failure && unreachable && no forwarding
// +-------->+ accessKnownLeader +----------------+
// | +------+------------+ |
// | | |
@@ -283,8 +282,7 @@ type replicaSelector struct {
// | leader becomes v +---+---+
// | reachable +-----+-----+ all proxies are tried ^
// +------------+tryNewProxy+-------------------------+
//
// +-----------+
// +-----------+
type selectorState interface {
next(*retry.Backoffer, *replicaSelector) (*RPCContext, error)
onSendSuccess(*replicaSelector)
@@ -516,7 +514,7 @@ func (state *tryNewProxy) onNoLeader(selector *replicaSelector) {
// If there is no suitable follower, requests will be sent to the leader as a fallback.
type accessFollower struct {
stateBase
// If tryLeader is true, the request can also be sent to the leader.
// If tryLeader is true, the request can also be sent to the leader, as long as the leader is not marked slow.
tryLeader bool
isGlobalStaleRead bool
option storeSelectorOp
@@ -551,6 +549,10 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
state.lastIdx++
}

// If the selector is under `ReplicaReadPreferLeader` mode, the leader should be chosen with the highest priority.
if state.option.preferLeader {
state.lastIdx = state.leaderIdx
}
for i := 0; i < replicaSize && !state.option.leaderOnly; i++ {
idx := AccessIndex((int(state.lastIdx) + i) % replicaSize)
// If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader
@@ -592,7 +594,10 @@ func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool
// The request can only be sent to the leader.
((state.option.leaderOnly && idx == state.leaderIdx) ||
// Choose a replica with matched labels.
(!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels) && (!state.learnerOnly || replica.peer.Role == metapb.PeerRole_Learner)))
(!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels) && (!state.learnerOnly || replica.peer.Role == metapb.PeerRole_Learner)) &&
// And if the leader store is not suitable to be accessed under `ReplicaReadPreferLeader` mode, other valid followers should be chosen
// as candidates to serve the read request.
(!state.option.preferLeader || !replica.store.isSlow()))
}

type invalidStore struct {
@@ -632,6 +637,7 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
attempts: 0,
})
}

var state selectorState
if !req.ReplicaReadType.IsFollowerRead() {
if regionCache.enableForwarding && regionStore.proxyTiKVIdx >= 0 {
@@ -644,8 +650,12 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
for _, op := range opts {
op(&option)
}
if req.ReplicaReadType == kv.ReplicaReadPreferLeader {
WithPerferLeader()(&option)
}
tryLeader := req.ReplicaReadType == kv.ReplicaReadMixed || req.ReplicaReadType == kv.ReplicaReadPreferLeader
state = &accessFollower{
tryLeader: req.ReplicaReadType == kv.ReplicaReadMixed,
tryLeader: tryLeader,
isGlobalStaleRead: req.IsGlobalStaleRead(),
option: option,
leaderIdx: regionStore.workTiKVIdx,
@@ -805,6 +815,7 @@ func (s *replicaSelector) invalidateReplicaStore(replica *replica, cause error)
metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
// schedule a store addr resolve.
store.markNeedCheck(s.regionCache.notifyCheckCh)
store.markAlreadySlow()
}
}

@@ -1202,6 +1213,10 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo
if !injectFailOnSend {
start := time.Now()
resp, err = s.client.SendRequest(ctx, sendToAddr, req, timeout)
// Record the latency of external requests on the related Store when ReplicaReadType == PreferLeader.
if req.ReplicaReadType == kv.ReplicaReadPreferLeader && !util.IsInternalRequest(req.RequestSource) {
rpcCtx.Store.recordSlowScoreStat(time.Since(start))
}
if s.Stats != nil {
RecordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start))
if val, fpErr := util.EvalFailpoint("tikvStoreRespResult"); fpErr == nil {
@@ -1522,6 +1537,12 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
}

if regionErr.GetServerIsBusy() != nil {
// Mark the store as busy (the next incoming reads can be redirected
// to suitable followers).
if ctx != nil && ctx.Store != nil {
ctx.Store.markAlreadySlow()
}

logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later",
zap.String("reason", regionErr.GetServerIsBusy().GetReason()),
zap.Stringer("ctx", ctx))
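Taken together, the region_request.go hunks close the feedback loop: request latencies feed the per-store slow score, and a `ServerIsBusy` region error degrades the store immediately. The sketch below is an assumed illustration of that fast path, not code from the commit.

// onLeaderBusy illustrates the intended effect of the ServerIsBusy handling
// above: the leader store is marked slow at once, so the next PreferLeader read
// fails the !isSlow() candidate checks and is served by a follower until the
// periodic updater lets the score recover.
func onLeaderBusy(leader *Store) {
	leader.markAlreadySlow()
}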