schedule: placement rules cache reduce cache low usage fit
Signed-off-by: HunDunDM <hundundm@gmail.com>
HunDunDM committed Jan 20, 2023
1 parent 5c518d0 commit 1f809e2
Showing 5 changed files with 51 additions and 43 deletions.
26 changes: 20 additions & 6 deletions server/schedule/checker/checker_controller.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	"github.com/tikv/pd/pkg/cache"
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/errs"
@@ -91,13 +92,26 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator {
 	}
 
 	if c.opts.IsPlacementRulesEnabled() {
-		fit := c.priorityInspector.Inspect(region)
-		if op := c.ruleChecker.CheckWithFit(region, fit); op != nil {
-			if opController.OperatorCount(operator.OpReplica) < c.opts.GetReplicaScheduleLimit() {
-				return []*operator.Operator{op}
+		skipRuleCheck := c.cluster.GetOpts().IsPlacementRulesCacheEnabled() &&
+			c.cluster.GetRuleManager().IsRegionFitCached(c.cluster, region)
+		if skipRuleCheck {
+			// If the fit is fetched from cache, it seems that the region doesn't need check
+			failpoint.Inject("assertShouldNotCache", func() {
+				panic("cached shouldn't be used")
+			})
+			ruleCheckerGetCacheCounter.Inc()
+		} else {
+			failpoint.Inject("assertShouldCache", func() {
+				panic("cached should be used")
+			})
+			fit := c.priorityInspector.Inspect(region)
+			if op := c.ruleChecker.CheckWithFit(region, fit); op != nil {
+				if opController.OperatorCount(operator.OpReplica) < c.opts.GetReplicaScheduleLimit() {
+					return []*operator.Operator{op}
+				}
+				operator.OperatorLimitCounter.WithLabelValues(c.ruleChecker.GetType(), operator.OpReplica.String()).Inc()
+				c.regionWaitingList.Put(region.GetID(), nil)
 			}
-			operator.OperatorLimitCounter.WithLabelValues(c.ruleChecker.GetType(), operator.OpReplica.String()).Inc()
-			c.regionWaitingList.Put(region.GetID(), nil)
 		}
 	} else {
 		if op := c.learnerChecker.Check(region); op != nil {
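The failpoints moved into CheckRegion above are the hooks tests use to assert which path the checker takes. A minimal sketch of toggling them with the pingcap/failpoint API follows; the full failpoint path and the test name are assumptions (derived from the checker package's import path at this commit), and the cluster setup is left as a placeholder.

// Sketch only: assert that CheckRegion takes the cached fast path.
// The failpoint path below is assumed, not taken from this commit.
package checker_test

import (
	"testing"

	"github.com/pingcap/failpoint"
)

func TestCheckRegionUsesCachedFit(t *testing.T) {
	// "assertShouldCache" panics inside CheckRegion if the full rule check
	// runs even though the cached fit was expected to be reused.
	fpPath := "github.com/tikv/pd/server/schedule/checker/assertShouldCache" // assumed path
	if err := failpoint.Enable(fpPath, "return(true)"); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := failpoint.Disable(fpPath); err != nil {
			t.Fatal(err)
		}
	}()
	// ... build a mock cluster with the placement-rules cache enabled, warm the
	// rule-fit cache for a region, then call Controller.CheckRegion(region) ...
}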
12 changes: 0 additions & 12 deletions server/schedule/checker/rule_checker.go
@@ -20,7 +20,6 @@ import (
 	"math"
 	"time"
 
-	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
@@ -128,17 +127,6 @@ func (c *RuleChecker) CheckWithFit(region *core.RegionInfo, fit *placement.Regio
 		log.Debug("fail to check region", zap.Uint64("region-id", region.GetID()), zap.Error(errRegionNoLeader))
 		return
 	}
-	// If the fit is fetched from cache, it seems that the region doesn't need cache
-	if c.cluster.GetOpts().IsPlacementRulesCacheEnabled() && fit.IsCached() {
-		failpoint.Inject("assertShouldNotCache", func() {
-			panic("cached shouldn't be used")
-		})
-		ruleCheckerGetCacheCounter.Inc()
-		return nil
-	}
-	failpoint.Inject("assertShouldCache", func() {
-		panic("cached should be used")
-	})
 
 	// If the fit is calculated by FitRegion, which means we get a new fit result, thus we should
 	// invalid the cache if it exists
19 changes: 0 additions & 19 deletions server/schedule/placement/fit.go
@@ -21,7 +21,6 @@ import (
 
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/tikv/pd/pkg/core"
-	"github.com/tikv/pd/pkg/utils/syncutil"
 )
 
 const replicaBaseScore = 100
@@ -30,30 +29,12 @@ const replicaBaseScore = 100
 // All peers are divided into corresponding rules according to the matching
 // rules, and the remaining Peers are placed in the OrphanPeers list.
 type RegionFit struct {
-	mu struct {
-		syncutil.RWMutex
-		cached bool
-	}
 	RuleFits []*RuleFit `json:"rule-fits"`
 	OrphanPeers []*metapb.Peer `json:"orphan-peers"`
 	regionStores []*core.StoreInfo
 	rules []*Rule
 }
 
-// SetCached indicates this RegionFit is fetch form cache
-func (f *RegionFit) SetCached(cached bool) {
-	f.mu.Lock()
-	defer f.mu.Unlock()
-	f.mu.cached = cached
-}
-
-// IsCached indicates whether this result is fetched from caches
-func (f *RegionFit) IsCached() bool {
-	f.mu.RLock()
-	defer f.mu.RUnlock()
-	return f.mu.cached
-}
-
 // Replace return true if the replacement store is fit all constraints and isolation score is not less than the origin.
 func (f *RegionFit) Replace(srcStoreID uint64, dstStore *core.StoreInfo) bool {
 	fit := f.getRuleFitByStoreID(srcStoreID)
19 changes: 16 additions & 3 deletions server/schedule/placement/region_rule_cache.go
@@ -15,12 +15,18 @@
 package placement
 
 import (
+	"sync/atomic"
+
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/slice"
 	"github.com/tikv/pd/pkg/utils/syncutil"
 )
 
+const (
+	minHitCountToCacheHit = 10 // RegionHit is cached only when the number of hits exceeds this
+)
+
 // RegionRuleFitCacheManager stores each region's RegionFit Result and involving variables
 // only when the RegionFit result is satisfied with its rules
 // RegionRuleFitCacheManager caches RegionFit result for each region only when:
@@ -65,7 +71,7 @@ func (manager *RegionRuleFitCacheManager) CheckAndGetCache(region *core.RegionIn
 	}
 	manager.mu.RLock()
 	defer manager.mu.RUnlock()
-	if cache, ok := manager.regionCaches[region.GetID()]; ok && cache.bestFit != nil {
+	if cache, ok := manager.regionCaches[region.GetID()]; ok {
 		if cache.IsUnchanged(region, rules, stores) {
 			return true, cache.bestFit
 		}
@@ -80,7 +86,12 @@ func (manager *RegionRuleFitCacheManager) SetCache(region *core.RegionInfo, fit
 	}
 	manager.mu.Lock()
 	defer manager.mu.Unlock()
-	fit.SetCached(true)
+	if cache, ok := manager.regionCaches[region.GetID()]; ok {
+		if atomic.AddUint32(&cache.hitCount, 1) >= minHitCountToCacheHit {
+			cache.bestFit = fit
+		}
+		return
+	}
 	manager.regionCaches[region.GetID()] = manager.toRegionRuleFitCache(region, fit)
 }
 
@@ -90,6 +101,7 @@ type regionRuleFitCache struct {
 	regionStores []*storeCache
 	rules []ruleCache
 	bestFit *RegionFit
+	hitCount uint32
 }
 
 // IsUnchanged checks whether the region and rules unchanged for the cache
@@ -132,7 +144,8 @@ func (manager *RegionRuleFitCacheManager) toRegionRuleFitCache(region *core.Regi
 		region: toRegionCache(region),
 		regionStores: manager.toStoreCacheList(fit.regionStores),
 		rules: toRuleCacheList(fit.rules),
-		bestFit: fit,
+		bestFit: nil,
+		hitCount: 0,
 	}
 }
 
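The SetCache change above is the heart of the commit: a region's full RegionFit is kept in the cache only after its entry has been hit minHitCountToCacheHit (10) times, so fits for rarely re-checked regions no longer stay resident. A small self-contained sketch of this hit-count-gated promotion pattern follows; all names are illustrative and none of them are the PD API.

// Illustrative sketch of hit-count-gated caching: the expensive payload (the
// analogue of bestFit) is only retained once an entry has been refreshed
// enough times. Names are invented for the example.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const minHitsToKeepPayload = 10 // mirrors minHitCountToCacheHit in the diff above

type entry struct {
	hitCount uint32
	payload  string // stands in for the cached *RegionFit
}

type hitGatedCache struct {
	mu      sync.Mutex
	entries map[uint64]*entry
}

// Set remembers that the key was seen; the payload itself is stored only after
// the entry has accumulated enough hits, so one-off keys never hold a payload.
func (c *hitGatedCache) Set(key uint64, payload string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if e, ok := c.entries[key]; ok {
		// The atomic add mirrors the diff; the mutex already serializes callers.
		if atomic.AddUint32(&e.hitCount, 1) >= minHitsToKeepPayload {
			e.payload = payload
		}
		return
	}
	c.entries[key] = &entry{} // first sighting: record the key, drop the payload
}

func main() {
	c := &hitGatedCache{entries: map[uint64]*entry{}}
	for i := 0; i < 12; i++ {
		c.Set(42, fmt.Sprintf("fit-%d", i))
	}
	fmt.Println(c.entries[42].payload) // non-empty only after enough hits
}

The same trade-off is visible in the diff: toRegionRuleFitCache now stores bestFit as nil, and only SetCache promotes it once the region has proven to be hot.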
18 changes: 15 additions & 3 deletions server/schedule/placement/rule_manager.go
@@ -325,18 +325,30 @@ func (m *RuleManager) GetRulesForApplyRange(start, end []byte) []*Rule {
 	return m.ruleList.getRulesForApplyRange(start, end)
 }
 
+// IsRegionFitCached returns whether the RegionFit can be cached.
+func (m *RuleManager) IsRegionFitCached(storeSet StoreSet, region *core.RegionInfo) bool {
+	regionStores := getStoresByRegion(storeSet, region)
+	rules := m.GetRulesForApplyRegion(region)
+	isCached, _ := m.cache.CheckAndGetCache(region, rules, regionStores)
+	return isCached
+}
+
 // FitRegion fits a region to the rules it matches.
-func (m *RuleManager) FitRegion(storeSet StoreSet, region *core.RegionInfo) *RegionFit {
+func (m *RuleManager) FitRegion(storeSet StoreSet, region *core.RegionInfo) (fit *RegionFit) {
 	regionStores := getStoresByRegion(storeSet, region)
 	rules := m.GetRulesForApplyRegion(region)
+	var isCached bool
 	if m.opt.IsPlacementRulesCacheEnabled() {
-		if ok, fit := m.cache.CheckAndGetCache(region, rules, regionStores); fit != nil && ok {
+		if isCached, fit = m.cache.CheckAndGetCache(region, rules, regionStores); isCached && fit != nil {
 			return fit
 		}
 	}
-	fit := fitRegion(regionStores, region, rules, m.opt.IsWitnessAllowed())
+	fit = fitRegion(regionStores, region, rules, m.opt.IsWitnessAllowed())
 	fit.regionStores = regionStores
 	fit.rules = rules
+	if isCached {
+		m.SetRegionFitCache(region, fit)
+	}
 	return fit
 }
 
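Read together with the CheckRegion change above, the caller-side flow roughly becomes the sketch below: IsRegionFitCached lets the checker skip the rule check outright while the cached fit is still valid, and FitRegion writes a freshly computed fit back only when the lookup found an unchanged entry whose bestFit has not been promoted yet. The helper name is hypothetical and the placement-rules-cache option check is elided.

// Illustrative only: a compressed view of how a checker consults RuleManager
// after this commit. checkWithRuleCache is not a real PD function; the types
// and methods come from the diff above.
package example

import (
	"github.com/tikv/pd/pkg/core"
	"github.com/tikv/pd/server/schedule/placement"
)

func checkWithRuleCache(rm *placement.RuleManager, stores placement.StoreSet, region *core.RegionInfo) *placement.RegionFit {
	// Fast path: a still-valid cached fit exists, so rule checking is skipped
	// entirely (see the skipRuleCheck branch in checker_controller.go).
	if rm.IsRegionFitCached(stores, region) {
		return nil
	}
	// Slow path: compute a fresh fit. FitRegion stores it back into the cache
	// only when the entry was unchanged but its bestFit had not yet been
	// promoted by the hit counter in region_rule_cache.go.
	return rm.FitRegion(stores, region)
}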
