Skip to content

Commit

Permalink
cherry pick #2946 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
rleungx authored and ti-srebot committed Oct 26, 2020
1 parent 775b6a5 commit d694360
Show file tree
Hide file tree
Showing 16 changed files with 485 additions and 10 deletions.
6 changes: 6 additions & 0 deletions plugin/scheduler_example/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,13 @@ func (s *evictLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operato
if region == nil {
continue
}
<<<<<<< HEAD
target := s.selector.SelectTarget(cluster, cluster.GetFollowerStores(region))
=======
target := filter.NewCandidates(cluster.GetFollowerStores(region)).
FilterTarget(cluster.GetOpts(), &filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true}).
RandomPick()
>>>>>>> 6bf18ce0... metrics: optimize the store state filter metrics (#2946)
if target == nil {
continue
}
Expand Down
128 changes: 128 additions & 0 deletions server/schedule/checker/replica_strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright 2020 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package checker

import (
"github.com/pingcap/log"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule/filter"
"github.com/tikv/pd/server/schedule/opt"
"go.uber.org/zap"
)

// ReplicaStrategy collects some utilities to manipulate region peers. It
// exists to allow replica_checker and rule_checker to reuse common logics.
type ReplicaStrategy struct {
checkerName string // replica-checker / rule-checker
cluster opt.Cluster
locationLabels []string
isolationLevel string
region *core.RegionInfo
extraFilters []filter.Filter
}

// SelectStoreToAdd returns the store to add a replica to a region.
// `coLocationStores` are the stores used to compare location with target
// store.
// `extraFilters` is used to set up more filters based on the context that
// calling this method.
//
// For example, to select a target store to replace a region's peer, we can use
// the peer list with the peer removed as `coLocationStores`.
// Meanwhile, we need to provide more constraints to ensure that the isolation
// level cannot be reduced after replacement.
func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, extraFilters ...filter.Filter) uint64 {
// The selection process uses a two-stage fashion. The first stage
// ignores the temporary state of the stores and selects the stores
// with the highest score according to the location label. The second
// stage considers all temporary states and capacity factors to select
// the most suitable target.
//
// The reason for it is to prevent the non-optimal replica placement due
// to the short-term state, resulting in redundant scheduling.
filters := []filter.Filter{
filter.NewExcludedFilter(s.checkerName, nil, s.region.GetStoreIds()),
filter.NewStorageThresholdFilter(s.checkerName),
filter.NewSpecialUseFilter(s.checkerName),
&filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowTemporaryStates: true},
}
if len(s.locationLabels) > 0 && s.isolationLevel != "" {
filters = append(filters, filter.NewIsolationFilter(s.checkerName, s.isolationLevel, s.locationLabels, coLocationStores))
}
if len(extraFilters) > 0 {
filters = append(filters, extraFilters...)
}
if len(s.extraFilters) > 0 {
filters = append(filters, s.extraFilters...)
}

isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true}
target := filter.NewCandidates(s.cluster.GetStores()).
FilterTarget(s.cluster.GetOpts(), filters...).
Sort(isolationComparer).Reverse().Top(isolationComparer). // greater isolation score is better
Sort(filter.RegionScoreComparer(s.cluster.GetOpts())). // less region score is better
FilterTarget(s.cluster.GetOpts(), strictStateFilter).PickFirst() // the filter does not ignore temp states
if target == nil {
return 0
}
return target.GetID()
}

// SelectStoreToReplace returns a store to replace oldStore. The location
// placement after scheduling should be not worse than original.
func (s *ReplicaStrategy) SelectStoreToReplace(coLocationStores []*core.StoreInfo, old uint64) uint64 {
// trick to avoid creating a slice with `old` removed.
s.swapStoreToFirst(coLocationStores, old)
safeGuard := filter.NewLocationSafeguard(s.checkerName, s.locationLabels, coLocationStores, s.cluster.GetStore(old))
return s.SelectStoreToAdd(coLocationStores[1:], safeGuard)
}

// SelectStoreToImprove returns a store to replace oldStore. The location
// placement after scheduling should be better than original.
func (s *ReplicaStrategy) SelectStoreToImprove(coLocationStores []*core.StoreInfo, old uint64) uint64 {
// trick to avoid creating a slice with `old` removed.
s.swapStoreToFirst(coLocationStores, old)
filters := []filter.Filter{
filter.NewLocationImprover(s.checkerName, s.locationLabels, coLocationStores, s.cluster.GetStore(old)),
}
if len(s.locationLabels) > 0 && s.isolationLevel != "" {
filters = append(filters, filter.NewIsolationFilter(s.checkerName, s.isolationLevel, s.locationLabels, coLocationStores[1:]))
}
return s.SelectStoreToAdd(coLocationStores[1:], filters...)
}

func (s *ReplicaStrategy) swapStoreToFirst(stores []*core.StoreInfo, id uint64) {
for i, s := range stores {
if s.GetID() == id {
stores[0], stores[i] = stores[i], stores[0]
return
}
}
}

// SelectStoreToRemove returns the best option to remove from the region.
func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo) uint64 {
isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
source := filter.NewCandidates(coLocationStores).
FilterSource(s.cluster.GetOpts(), &filter.StoreStateFilter{ActionScope: replicaCheckerName, MoveRegion: true}).
Sort(isolationComparer).Top(isolationComparer).
Sort(filter.RegionScoreComparer(s.cluster.GetOpts())).Reverse().
PickFirst()
if source == nil {
log.Debug("no removable store", zap.Uint64("region-id", s.region.GetID()))
return 0
}
return source.GetID()
}
5 changes: 5 additions & 0 deletions server/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,13 @@ func (c *RuleChecker) allowLeader(fit *placement.RegionFit, peer *metapb.Peer) b
if s == nil {
return false
}
<<<<<<< HEAD
stateFilter := filter.StoreStateFilter{ActionScope: "rule-checker", TransferLeader: true}
if !stateFilter.Target(c.cluster, s) {
=======
stateFilter := &filter.StoreStateFilter{ActionScope: "rule-checker", TransferLeader: true}
if !stateFilter.Target(c.cluster.GetOpts(), s) {
>>>>>>> 6bf18ce0... metrics: optimize the store state filter metrics (#2946)
return false
}
for _, rf := range fit.RuleFits {
Expand Down
117 changes: 114 additions & 3 deletions server/schedule/filter/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,16 +361,23 @@ type StoreStateFilter struct {
TransferLeader bool
// Set true if the schedule involves any move region operation.
MoveRegion bool
<<<<<<< HEAD
=======
// Set true if allows temporary states.
AllowTemporaryStates bool
// Reason is used to distinguish the reason of store state filter
Reason string
>>>>>>> 6bf18ce0... metrics: optimize the store state filter metrics (#2946)
}

// Scope returns the scheduler or the checker which the filter acts on.
func (f StoreStateFilter) Scope() string {
func (f *StoreStateFilter) Scope() string {
return f.ActionScope
}

// Type returns the type of the Filter.
func (f StoreStateFilter) Type() string {
return "store-state-filter"
func (f *StoreStateFilter) Type() string {
return fmt.Sprintf("store-state-%s-filter", f.Reason)
}

// Source returns true when the store can be selected as the schedule
Expand All @@ -384,6 +391,7 @@ func (f StoreStateFilter) Source(opt opt.Options, store *core.StoreInfo) bool {
return false
}

<<<<<<< HEAD
if f.MoveRegion && !f.filterMoveRegion(opt, true, store) {
return false
}
Expand Down Expand Up @@ -461,6 +469,65 @@ func NewBlacklistStoreFilter(scope string, typ BlacklistType) *BlacklistStoreFil
blacklist: make(map[uint64]struct{}),
flag: typ,
}
=======
func (f *StoreStateFilter) isTombstone(opt *config.PersistOptions, store *core.StoreInfo) bool {
f.Reason = "tombstone"
return store.IsTombstone()
}

func (f *StoreStateFilter) isDown(opt *config.PersistOptions, store *core.StoreInfo) bool {
f.Reason = "down"
return store.DownTime() > opt.GetMaxStoreDownTime()
}

func (f *StoreStateFilter) isOffline(opt *config.PersistOptions, store *core.StoreInfo) bool {
f.Reason = "offline"
return store.IsOffline()
}

func (f *StoreStateFilter) pauseLeaderTransfer(opt *config.PersistOptions, store *core.StoreInfo) bool {
f.Reason = "pause-leader"
return !store.AllowLeaderTransfer()
}

func (f *StoreStateFilter) isDisconnected(opt *config.PersistOptions, store *core.StoreInfo) bool {
f.Reason = "disconnected"
return !f.AllowTemporaryStates && store.IsDisconnected()
}

func (f *StoreStateFilter) isBusy(opt *config.PersistOptions, store *core.StoreInfo) bool {
f.Reason = "busy"
return !f.AllowTemporaryStates && store.IsBusy()
}

func (f *StoreStateFilter) exceedRemoveLimit(opt *config.PersistOptions, store *core.StoreInfo) bool {
f.Reason = "exceed-remove-limit"
return !f.AllowTemporaryStates && !store.IsAvailable(storelimit.RemovePeer)
}

func (f *StoreStateFilter) exceedAddLimit(opt *config.PersistOptions, store *core.StoreInfo) bool {
f.Reason = "exceed-add-limit"
return !f.AllowTemporaryStates && !store.IsAvailable(storelimit.AddPeer)
}

func (f *StoreStateFilter) tooManySnapshots(opt *config.PersistOptions, store *core.StoreInfo) bool {
f.Reason = "too-many-snapshot"
return !f.AllowTemporaryStates && (uint64(store.GetSendingSnapCount()) > opt.GetMaxSnapshotCount() ||
uint64(store.GetReceivingSnapCount()) > opt.GetMaxSnapshotCount() ||
uint64(store.GetApplyingSnapCount()) > opt.GetMaxSnapshotCount())
}

func (f *StoreStateFilter) tooManyPendingPeers(opt *config.PersistOptions, store *core.StoreInfo) bool {
f.Reason = "too-many-pending-peer"
return !f.AllowTemporaryStates &&
opt.GetMaxPendingPeerCount() > 0 &&
store.GetPendingPeerCount() > int(opt.GetMaxPendingPeerCount())
}

func (f *StoreStateFilter) hasRejectLeaderProperty(opts *config.PersistOptions, store *core.StoreInfo) bool {
f.Reason = "reject-leader"
return opts.CheckLabelProperty(opt.RejectLeader, store.GetLabels())
>>>>>>> 6bf18ce0... metrics: optimize the store state filter metrics (#2946)
}

// Scope returns the scheduler or the checker which the filter acts on.
Expand All @@ -473,14 +540,36 @@ func (f *BlacklistStoreFilter) Type() string {
return "blacklist-store-filter"
}

<<<<<<< HEAD
// Source implements the Filter.
func (f *BlacklistStoreFilter) Source(opt opt.Options, store *core.StoreInfo) bool {
if f.flag&BlacklistSource != BlacklistSource {
return true
=======
func (f *StoreStateFilter) anyConditionMatch(typ int, opt *config.PersistOptions, store *core.StoreInfo) bool {
var funcs []conditionFunc
switch typ {
case leaderSource:
funcs = []conditionFunc{f.isTombstone, f.isDown, f.pauseLeaderTransfer, f.isDisconnected}
case regionSource:
funcs = []conditionFunc{f.isBusy, f.exceedRemoveLimit, f.tooManySnapshots}
case leaderTarget:
funcs = []conditionFunc{f.isTombstone, f.isOffline, f.isDown, f.pauseLeaderTransfer,
f.isDisconnected, f.isBusy, f.hasRejectLeaderProperty}
case regionTarget:
funcs = []conditionFunc{f.isTombstone, f.isOffline, f.isDown, f.isDisconnected, f.isBusy,
f.exceedAddLimit, f.tooManySnapshots, f.tooManyPendingPeers}
}
for _, cf := range funcs {
if cf(opt, store) {
return true
}
>>>>>>> 6bf18ce0... metrics: optimize the store state filter metrics (#2946)
}
return f.filter(store)
}

<<<<<<< HEAD
// Add adds the store to the blacklist.
func (f *BlacklistStoreFilter) Add(storeID uint64) {
f.blacklist[storeID] = struct{}{}
Expand All @@ -490,6 +579,28 @@ func (f *BlacklistStoreFilter) Add(storeID uint64) {
func (f *BlacklistStoreFilter) Target(opt opt.Options, store *core.StoreInfo) bool {
if f.flag&BlacklistTarget != BlacklistTarget {
return true
=======
// Source returns true when the store can be selected as the schedule
// source.
func (f *StoreStateFilter) Source(opts *config.PersistOptions, store *core.StoreInfo) bool {
if f.TransferLeader && f.anyConditionMatch(leaderSource, opts, store) {
return false
}
if f.MoveRegion && f.anyConditionMatch(regionSource, opts, store) {
return false
}
return true
}

// Target returns true when the store can be selected as the schedule
// target.
func (f *StoreStateFilter) Target(opts *config.PersistOptions, store *core.StoreInfo) bool {
if f.TransferLeader && f.anyConditionMatch(leaderTarget, opts, store) {
return false
}
if f.MoveRegion && f.anyConditionMatch(regionTarget, opts, store) {
return false
>>>>>>> 6bf18ce0... metrics: optimize the store state filter metrics (#2946)
}
return f.filter(store)
}
Expand Down
Loading

0 comments on commit d694360

Please sign in to comment.