
schedule: placement rules cache reduce cache low usage fit #5879

Merged
merged 3 commits on Feb 9, 2023
Changes from 2 commits
26 changes: 20 additions & 6 deletions server/schedule/checker/checker_controller.go
@@ -18,6 +18,7 @@ import (
"context"
"time"

"github.com/pingcap/failpoint"
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
@@ -91,13 +92,26 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator {
}

if c.opts.IsPlacementRulesEnabled() {
fit := c.priorityInspector.Inspect(region)
if op := c.ruleChecker.CheckWithFit(region, fit); op != nil {
if opController.OperatorCount(operator.OpReplica) < c.opts.GetReplicaScheduleLimit() {
return []*operator.Operator{op}
skipRuleCheck := c.cluster.GetOpts().IsPlacementRulesCacheEnabled() &&
c.cluster.GetRuleManager().IsRegionFitCached(c.cluster, region)
if skipRuleCheck {
// If the fit is fetched from the cache, the region doesn't need to be checked
failpoint.Inject("assertShouldNotCache", func() {
panic("cached shouldn't be used")
})
ruleCheckerGetCacheCounter.Inc()
} else {
failpoint.Inject("assertShouldCache", func() {
panic("cached should be used")
})
fit := c.priorityInspector.Inspect(region)
if op := c.ruleChecker.CheckWithFit(region, fit); op != nil {
if opController.OperatorCount(operator.OpReplica) < c.opts.GetReplicaScheduleLimit() {
return []*operator.Operator{op}
}
operator.OperatorLimitCounter.WithLabelValues(c.ruleChecker.GetType(), operator.OpReplica.String()).Inc()
c.regionWaitingList.Put(region.GetID(), nil)
}
operator.OperatorLimitCounter.WithLabelValues(c.ruleChecker.GetType(), operator.OpReplica.String()).Inc()
c.regionWaitingList.Put(region.GetID(), nil)
}
} else {
if op := c.learnerChecker.Check(region); op != nil {
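Reviewer note: this hunk moves the cache short-circuit out of RuleChecker.CheckWithFit and into the checker controller. When the placement-rules cache is enabled and the rule manager reports a still-valid cached fit, the priority inspection and the full rule check are skipped entirely; otherwise the original path runs. Below is a minimal, self-contained sketch of that control flow under stated assumptions: `ruleCache`, `mapCache`, and `checkRegion` are illustrative stand-ins, not pd APIs.

```go
package main

import "fmt"

// ruleCache is an illustrative stand-in for the rule manager's fit cache.
type ruleCache interface {
	IsRegionFitCached(regionID uint64) bool
}

// mapCache is a toy cache backed by a set of region IDs.
type mapCache map[uint64]struct{}

func (c mapCache) IsRegionFitCached(id uint64) bool {
	_, ok := c[id]
	return ok
}

// checkRegion mirrors the new control flow: skip the full rule check entirely
// when a still-valid fit is cached for the region.
func checkRegion(cache ruleCache, cacheEnabled bool, regionID uint64, fullCheck func(uint64) string) string {
	if cacheEnabled && cache.IsRegionFitCached(regionID) {
		return "skipped: cached fit is still valid"
	}
	return fullCheck(regionID)
}

func main() {
	cache := mapCache{1: {}}
	full := func(id uint64) string { return fmt.Sprintf("ran full rule check for region %d", id) }
	fmt.Println(checkRegion(cache, true, 1, full)) // cache hit: check skipped
	fmt.Println(checkRegion(cache, true, 2, full)) // cache miss: full check runs
}
```

Deciding this at the controller level also lets the failpoint assertions verify in tests that the cached path and the full path are taken exactly when expected.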
12 changes: 0 additions & 12 deletions server/schedule/checker/rule_checker.go
@@ -20,7 +20,6 @@ import (
"math"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
@@ -128,17 +127,6 @@ func (c *RuleChecker) CheckWithFit(region *core.RegionInfo, fit *placement.Regio
log.Debug("fail to check region", zap.Uint64("region-id", region.GetID()), zap.Error(errRegionNoLeader))
return
}
// If the fit is fetched from cache, it seems that the region doesn't need cache
if c.cluster.GetOpts().IsPlacementRulesCacheEnabled() && fit.IsCached() {
failpoint.Inject("assertShouldNotCache", func() {
panic("cached shouldn't be used")
})
ruleCheckerGetCacheCounter.Inc()
return nil
}
failpoint.Inject("assertShouldCache", func() {
panic("cached should be used")
})

// If the fit is calculated by FitRegion, which means we got a new fit result, we should
// invalidate the cache if it exists
19 changes: 0 additions & 19 deletions server/schedule/placement/fit.go
@@ -21,7 +21,6 @@ import (

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/utils/syncutil"
)

const replicaBaseScore = 100
@@ -30,30 +29,12 @@ const replicaBaseScore = 100
// All peers are divided into corresponding rules according to the matching
// rules, and the remaining Peers are placed in the OrphanPeers list.
type RegionFit struct {
mu struct {
syncutil.RWMutex
cached bool
}
RuleFits []*RuleFit `json:"rule-fits"`
OrphanPeers []*metapb.Peer `json:"orphan-peers"`
regionStores []*core.StoreInfo
rules []*Rule
}

// SetCached indicates this RegionFit is fetch form cache
func (f *RegionFit) SetCached(cached bool) {
f.mu.Lock()
defer f.mu.Unlock()
f.mu.cached = cached
}

// IsCached indicates whether this result is fetched from caches
func (f *RegionFit) IsCached() bool {
f.mu.RLock()
defer f.mu.RUnlock()
return f.mu.cached
}

// Replace returns true if the replacement store fits all constraints and its isolation score is not less than the original.
func (f *RegionFit) Replace(srcStoreID uint64, dstStore *core.StoreInfo) bool {
fit := f.getRuleFitByStoreID(srcStoreID)
4 changes: 3 additions & 1 deletion server/schedule/placement/fit_test.go
@@ -19,6 +19,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/assert"
@@ -28,6 +29,7 @@ import (

func makeStores() StoreSet {
stores := core.NewStoresInfo()
now := time.Now()
for zone := 1; zone <= 5; zone++ {
for rack := 1; rack <= 5; rack++ {
for host := 1; host <= 5; host++ {
@@ -42,7 +44,7 @@ func makeStores() StoreSet {
if x == 5 {
labels["engine"] = "tiflash"
}
stores.SetStore(core.NewStoreInfoWithLabel(id, labels))
stores.SetStore(core.NewStoreInfoWithLabel(id, labels).Clone(core.SetLastHeartbeatTS(now)))
}
}
}
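Reviewer note: the only change to makeStores is stamping each test store with a current heartbeat, presumably so the stores look alive to the new cache-related checks, since a store with no recent heartbeat is normally treated as down. The snippet below is only a hedged illustration of that idea; the threshold constant and helper are invented for the example and are not pd's configuration.

```go
package main

import (
	"fmt"
	"time"
)

// storeDownThreshold is illustrative; pd derives store liveness from its own
// configured limits, not from this constant.
const storeDownThreshold = 30 * time.Minute

// isStoreAlive treats a store as healthy only if it heartbeated recently.
func isStoreAlive(lastHeartbeat, now time.Time) bool {
	return now.Sub(lastHeartbeat) < storeDownThreshold
}

func main() {
	now := time.Now()
	fmt.Println(isStoreAlive(now, now))                 // true: fresh heartbeat
	fmt.Println(isStoreAlive(now.Add(-time.Hour), now)) // false: stale store
	fmt.Println(isStoreAlive(time.Time{}, now))         // false: never heartbeated
}
```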
18 changes: 15 additions & 3 deletions server/schedule/placement/region_rule_cache.go
@@ -21,6 +21,10 @@ import (
"github.com/tikv/pd/pkg/utils/syncutil"
)

const (
minHitCountToCacheHit = 10 // a RegionFit is cached only when the number of hits reaches this threshold
)

// RegionRuleFitCacheManager stores each region's RegionFit result and the variables involved,
// but only when the RegionFit result satisfies its rules.
// RegionRuleFitCacheManager caches the RegionFit result for each region only when:
@@ -65,7 +69,7 @@ func (manager *RegionRuleFitCacheManager) CheckAndGetCache(region *core.RegionIn
}
manager.mu.RLock()
defer manager.mu.RUnlock()
if cache, ok := manager.regionCaches[region.GetID()]; ok && cache.bestFit != nil {
if cache, ok := manager.regionCaches[region.GetID()]; ok {
if cache.IsUnchanged(region, rules, stores) {
return true, cache.bestFit
}
@@ -80,7 +84,13 @@ func (manager *RegionRuleFitCacheManager) SetCache(region *core.RegionInfo, fit
}
manager.mu.Lock()
defer manager.mu.Unlock()
fit.SetCached(true)
if cache, ok := manager.regionCaches[region.GetID()]; ok {
cache.hitCount++
if cache.hitCount >= minHitCountToCacheHit {
cache.bestFit = fit
}
return
}
manager.regionCaches[region.GetID()] = manager.toRegionRuleFitCache(region, fit)
}

@@ -90,6 +100,7 @@ type regionRuleFitCache struct {
regionStores []*storeCache
rules []ruleCache
bestFit *RegionFit
hitCount uint32
}

// IsUnchanged checks whether the region and rules are unchanged for the cache
@@ -132,7 +143,8 @@ func (manager *RegionRuleFitCacheManager) toRegionRuleFitCache(region *core.Regi
region: toRegionCache(region),
regionStores: manager.toStoreCacheList(fit.regionStores),
rules: toRuleCacheList(fit.rules),
bestFit: fit,
bestFit: nil,
hitCount: 0,
}
}

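Reviewer note: this file carries the core of the change. SetCache no longer stores the fit on first sight: a new entry starts empty, and an existing entry only receives a bestFit after it has been hit minHitCountToCacheHit times, so cold regions never pay the memory cost of a retained RegionFit. CheckAndGetCache, in turn, now reports a hit for any unchanged entry even while bestFit is still nil. A self-contained sketch of that promotion policy, with toy types (the `entry` struct and `setCache` function are illustrative, not pd's):

```go
package main

import "fmt"

const minHitCountToCacheHit = 10 // threshold mirrored from the diff

// entry is a stripped-down stand-in for regionRuleFitCache.
type entry struct {
	hitCount uint32
	bestFit  *string // stand-in for *RegionFit
}

// setCache mirrors the new SetCache logic: a brand-new entry starts empty, and
// an existing entry only records the fit once it has been hit often enough.
func setCache(cache map[uint64]*entry, regionID uint64, fit *string) {
	if e, ok := cache[regionID]; ok {
		e.hitCount++
		if e.hitCount >= minHitCountToCacheHit {
			e.bestFit = fit
		}
		return
	}
	cache[regionID] = &entry{} // bestFit stays nil until the region proves hot
}

func main() {
	cache := map[uint64]*entry{}
	fit := "fit-result"
	setCache(cache, 1, &fit) // creates an empty entry
	for i := 0; i < minHitCountToCacheHit; i++ {
		setCache(cache, 1, &fit) // each later call counts as a hit
	}
	e := cache[1]
	fmt.Printf("hits=%d fitRetained=%v\n", e.hitCount, e.bestFit != nil) // hits=10 fitRetained=true
}
```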
18 changes: 15 additions & 3 deletions server/schedule/placement/rule_manager.go
@@ -325,18 +325,30 @@ func (m *RuleManager) GetRulesForApplyRange(start, end []byte) []*Rule {
return m.ruleList.getRulesForApplyRange(start, end)
}

// IsRegionFitCached returns whether the region's fit result is cached and still valid.
func (m *RuleManager) IsRegionFitCached(storeSet StoreSet, region *core.RegionInfo) bool {
regionStores := getStoresByRegion(storeSet, region)
rules := m.GetRulesForApplyRegion(region)
isCached, _ := m.cache.CheckAndGetCache(region, rules, regionStores)
return isCached
}

// FitRegion fits a region to the rules it matches.
func (m *RuleManager) FitRegion(storeSet StoreSet, region *core.RegionInfo) *RegionFit {
func (m *RuleManager) FitRegion(storeSet StoreSet, region *core.RegionInfo) (fit *RegionFit) {
regionStores := getStoresByRegion(storeSet, region)
rules := m.GetRulesForApplyRegion(region)
var isCached bool
if m.opt.IsPlacementRulesCacheEnabled() {
if ok, fit := m.cache.CheckAndGetCache(region, rules, regionStores); fit != nil && ok {
if isCached, fit = m.cache.CheckAndGetCache(region, rules, regionStores); isCached && fit != nil {
return fit
}
}
fit := fitRegion(regionStores, region, rules, m.opt.IsWitnessAllowed())
fit = fitRegion(regionStores, region, rules, m.opt.IsWitnessAllowed())
fit.regionStores = regionStores
fit.rules = rules
if isCached {
m.SetRegionFitCache(region, fit)
}
return fit
}

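Reviewer note: putting the pieces together, FitRegion now serves a fully warmed cache entry directly, and only writes a freshly computed fit back when a still-valid entry already exists, so the write simply feeds the hit counter until the threshold is reached. The sketch below restates that branch structure only; the callbacks are illustrative stand-ins, not the pd API.

```go
package main

import "fmt"

// fitRegion mirrors the branch structure of the new RuleManager.FitRegion:
// the cache lookup and the write-back are injected as callbacks so the sketch
// stays focused on the control flow rather than on cache internals.
func fitRegion(lookup func() (bool, *string), compute func() *string, writeBack func(*string)) *string {
	isCached, fit := lookup()
	if isCached && fit != nil {
		return fit // warm cache: reuse the stored fit, skip computation
	}
	fit = compute()
	if isCached {
		writeBack(fit) // valid but still-cold entry: feed the hit counter
	}
	return fit
}

func main() {
	cached := "cached-fit"
	compute := func() *string { s := "computed-fit"; return &s }
	report := func(label string) func(*string) {
		return func(*string) { fmt.Println(label, "write-back") }
	}

	// 1. No valid entry: compute, no write-back.
	fmt.Println(*fitRegion(func() (bool, *string) { return false, nil }, compute, report("miss")))
	// 2. Valid entry, fit not yet retained: compute and write back (counts a hit).
	fmt.Println(*fitRegion(func() (bool, *string) { return true, nil }, compute, report("cold")))
	// 3. Valid entry with a retained fit: served from cache, nothing recomputed.
	fmt.Println(*fitRegion(func() (bool, *string) { return true, &cached }, compute, report("warm")))
}
```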
46 changes: 45 additions & 1 deletion server/schedule/placement/rule_manager_test.go
@@ -24,13 +24,14 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/server/config"
)

func newTestManager(t *testing.T) (endpoint.RuleStorage, *RuleManager) {
re := require.New(t)
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
var err error
manager := NewRuleManager(store, nil, nil)
manager := NewRuleManager(store, nil, config.NewTestOptions())
err = manager.Initialize(3, []string{"zone", "rack", "host"})
re.NoError(err)
return store, manager
@@ -421,6 +422,49 @@ func TestCheckApplyRules(t *testing.T) {
re.Regexp("needs at least one leader or voter", err.Error())
}

func TestCacheManager(t *testing.T) {
re := require.New(t)
_, manager := newTestManager(t)
manager.opt.SetPlacementRulesCacheEnabled(true)
rules := addExtraRules(0)
re.NoError(manager.SetRules(rules))
stores := makeStores()

regionMeta := &metapb.Region{
Id: 1,
StartKey: []byte(""),
EndKey: []byte(""),
RegionEpoch: &metapb.RegionEpoch{ConfVer: 0, Version: 0},
Peers: []*metapb.Peer{
{Id: 11, StoreId: 1111, Role: metapb.PeerRole_Voter},
{Id: 12, StoreId: 2111, Role: metapb.PeerRole_Voter},
{Id: 13, StoreId: 3111, Role: metapb.PeerRole_Voter},
},
}
region := core.NewRegionInfo(regionMeta, regionMeta.Peers[0])
fit := manager.FitRegion(stores, region)
manager.SetRegionFitCache(region, fit)
// bestFit is not stored when the total number of hits is insufficient.
for i := 1; i < minHitCountToCacheHit/2; i++ {
manager.FitRegion(stores, region)
re.True(manager.IsRegionFitCached(stores, region))
cache := manager.cache.regionCaches[1]
re.Equal(uint32(i), cache.hitCount)
re.Nil(cache.bestFit)
}
// Store bestFit when the total number of hits is sufficient.
for i := 0; i < minHitCountToCacheHit; i++ {
manager.FitRegion(stores, region)
}
cache := manager.cache.regionCaches[1]
re.Equal(uint32(minHitCountToCacheHit), cache.hitCount)
re.NotNil(cache.bestFit)
// Cache invalidation after change
regionMeta.Peers[2] = &metapb.Peer{Id: 14, StoreId: 4111, Role: metapb.PeerRole_Voter}
region = core.NewRegionInfo(regionMeta, regionMeta.Peers[0])
re.False(manager.IsRegionFitCached(stores, region))
}

func dhex(hk string) []byte {
k, err := hex.DecodeString(hk)
if err != nil {