Skip to content

Commit

Permalink
reduce rand NewSource
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Sep 29, 2024
1 parent 76dc560 commit dac2d7f
Show file tree
Hide file tree
Showing 17 changed files with 89 additions and 71 deletions.
89 changes: 47 additions & 42 deletions pkg/schedule/checker/replica_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package checker

import (
"fmt"
"math/rand"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -45,6 +47,7 @@ type ReplicaChecker struct {
cluster sche.CheckerCluster
conf config.CheckerConfigProvider
pendingProcessedRegions *cache.TTLUint64
r *rand.Rand
}

// NewReplicaChecker creates a replica checker.
Expand All @@ -53,6 +56,7 @@ func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigPro
cluster: cluster,
conf: conf,
pendingProcessedRegions: pendingProcessedRegions,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}

Expand All @@ -67,32 +71,32 @@ func (*ReplicaChecker) GetType() types.CheckerSchedulerType {
}

// Check verifies a region's replicas, creating an operator.Operator if needed.
func (r *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator {
func (c *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator {

Check failure on line 74 in pkg/schedule/checker/replica_checker.go

View workflow job for this annotation

GitHub Actions / statics

ST1016: methods on the same type should have the same receiver name (seen 1x "r", 7x "c") (stylecheck)
replicaCheckerCounter.Inc()
if r.IsPaused() {
if c.IsPaused() {
replicaCheckerPausedCounter.Inc()
return nil
}
if op := r.checkDownPeer(region); op != nil {
if op := c.checkDownPeer(region); op != nil {
replicaCheckerNewOpCounter.Inc()
op.SetPriorityLevel(constant.High)
return op
}
if op := r.checkOfflinePeer(region); op != nil {
if op := c.checkOfflinePeer(region); op != nil {
replicaCheckerNewOpCounter.Inc()
op.SetPriorityLevel(constant.High)
return op
}
if op := r.checkMakeUpReplica(region); op != nil {
if op := c.checkMakeUpReplica(region); op != nil {
replicaCheckerNewOpCounter.Inc()
op.SetPriorityLevel(constant.High)
return op
}
if op := r.checkRemoveExtraReplica(region); op != nil {
if op := c.checkRemoveExtraReplica(region); op != nil {
replicaCheckerNewOpCounter.Inc()
return op
}
if op := r.checkLocationReplacement(region); op != nil {
if op := c.checkLocationReplacement(region); op != nil {
replicaCheckerNewOpCounter.Inc()
return op
}
Expand Down Expand Up @@ -124,8 +128,8 @@ func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operat
return nil
}

func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsReplaceOfflineReplicaEnabled() {
func (c *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsReplaceOfflineReplicaEnabled() {
return nil
}

Expand All @@ -136,7 +140,7 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope

for _, peer := range region.GetPeers() {
storeID := peer.GetStoreId()
store := r.cluster.GetStore(storeID)
store := c.cluster.GetStore(storeID)
if store == nil {
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return nil
Expand All @@ -145,71 +149,71 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope
continue
}

return r.fixPeer(region, storeID, offlineStatus)
return c.fixPeer(region, storeID, offlineStatus)
}

return nil
}

func (r *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsMakeUpReplicaEnabled() {
func (c *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsMakeUpReplicaEnabled() {
return nil
}
if len(region.GetPeers()) >= r.conf.GetMaxReplicas() {
if len(region.GetPeers()) >= c.conf.GetMaxReplicas() {
return nil
}
log.Debug("region has fewer than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers())))
regionStores := r.cluster.GetRegionStores(region)
target, filterByTempState := r.strategy(region).SelectStoreToAdd(regionStores)
regionStores := c.cluster.GetRegionStores(region)
target, filterByTempState := c.strategy(c.r, region).SelectStoreToAdd(regionStores)
if target == 0 {
log.Debug("no store to add replica", zap.Uint64("region-id", region.GetID()))
replicaCheckerNoTargetStoreCounter.Inc()
if filterByTempState {
r.pendingProcessedRegions.Put(region.GetID(), nil)
c.pendingProcessedRegions.Put(region.GetID(), nil)
}
return nil
}
newPeer := &metapb.Peer{StoreId: target}
op, err := operator.CreateAddPeerOperator("make-up-replica", r.cluster, region, newPeer, operator.OpReplica)
op, err := operator.CreateAddPeerOperator("make-up-replica", c.cluster, region, newPeer, operator.OpReplica)
if err != nil {
log.Debug("create make-up-replica operator fail", errs.ZapError(err))
return nil
}
return op
}

func (r *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsRemoveExtraReplicaEnabled() {
func (c *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsRemoveExtraReplicaEnabled() {
return nil
}
// when adding a learner peer, the number of peers will exceed max replicas for a while,
// just compare the number of voters to avoid too many cancel-add-operator logs.
if len(region.GetVoters()) <= r.conf.GetMaxReplicas() {
if len(region.GetVoters()) <= c.conf.GetMaxReplicas() {
return nil
}
log.Debug("region has more than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers())))
regionStores := r.cluster.GetRegionStores(region)
old := r.strategy(region).SelectStoreToRemove(regionStores)
regionStores := c.cluster.GetRegionStores(region)
old := c.strategy(c.r, region).SelectStoreToRemove(regionStores)
if old == 0 {
replicaCheckerNoWorstPeerCounter.Inc()
r.pendingProcessedRegions.Put(region.GetID(), nil)
c.pendingProcessedRegions.Put(region.GetID(), nil)
return nil
}
op, err := operator.CreateRemovePeerOperator("remove-extra-replica", r.cluster, operator.OpReplica, region, old)
op, err := operator.CreateRemovePeerOperator("remove-extra-replica", c.cluster, operator.OpReplica, region, old)
if err != nil {
replicaCheckerCreateOpFailedCounter.Inc()
return nil
}
return op
}

func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsLocationReplacementEnabled() {
func (c *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsLocationReplacementEnabled() {
return nil
}

strategy := r.strategy(region)
regionStores := r.cluster.GetRegionStores(region)
strategy := c.strategy(c.r, region)
regionStores := c.cluster.GetRegionStores(region)
oldStore := strategy.SelectStoreToRemove(regionStores)
if oldStore == 0 {
replicaCheckerAllRightCounter.Inc()
Expand All @@ -223,19 +227,19 @@ func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *oper
}

newPeer := &metapb.Peer{StoreId: newStore}
op, err := operator.CreateMovePeerOperator("move-to-better-location", r.cluster, region, operator.OpReplica, oldStore, newPeer)
op, err := operator.CreateMovePeerOperator("move-to-better-location", c.cluster, region, operator.OpReplica, oldStore, newPeer)
if err != nil {
replicaCheckerCreateOpFailedCounter.Inc()
return nil
}
return op
}

func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator {
func (c *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator {
// Check the number of replicas first.
if len(region.GetVoters()) > r.conf.GetMaxReplicas() {
if len(region.GetVoters()) > c.conf.GetMaxReplicas() {
removeExtra := fmt.Sprintf("remove-extra-%s-replica", status)
op, err := operator.CreateRemovePeerOperator(removeExtra, r.cluster, operator.OpReplica, region, storeID)
op, err := operator.CreateRemovePeerOperator(removeExtra, c.cluster, operator.OpReplica, region, storeID)
if err != nil {
if status == offlineStatus {
replicaCheckerRemoveExtraOfflineFailedCounter.Inc()
Expand All @@ -247,8 +251,8 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
return op
}

regionStores := r.cluster.GetRegionStores(region)
target, filterByTempState := r.strategy(region).SelectStoreToFix(regionStores, storeID)
regionStores := c.cluster.GetRegionStores(region)
target, filterByTempState := c.strategy(c.r, region).SelectStoreToFix(regionStores, storeID)
if target == 0 {
if status == offlineStatus {
replicaCheckerNoStoreOfflineCounter.Inc()
Expand All @@ -257,13 +261,13 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
}
log.Debug("no best store to add replica", zap.Uint64("region-id", region.GetID()))
if filterByTempState {
r.pendingProcessedRegions.Put(region.GetID(), nil)
c.pendingProcessedRegions.Put(region.GetID(), nil)
}
return nil
}
newPeer := &metapb.Peer{StoreId: target}
replace := fmt.Sprintf("replace-%s-replica", status)
op, err := operator.CreateMovePeerOperator(replace, r.cluster, region, operator.OpReplica, storeID, newPeer)
op, err := operator.CreateMovePeerOperator(replace, c.cluster, region, operator.OpReplica, storeID, newPeer)
if err != nil {
if status == offlineStatus {
replicaCheckerReplaceOfflineFailedCounter.Inc()
Expand All @@ -275,12 +279,13 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
return op
}

func (r *ReplicaChecker) strategy(region *core.RegionInfo) *ReplicaStrategy {
func (c *ReplicaChecker) strategy(r *rand.Rand, region *core.RegionInfo) *ReplicaStrategy {
return &ReplicaStrategy{
checkerName: r.Name(),
cluster: r.cluster,
locationLabels: r.conf.GetLocationLabels(),
isolationLevel: r.conf.GetIsolationLevel(),
checkerName: c.Name(),
cluster: c.cluster,
locationLabels: c.conf.GetLocationLabels(),
isolationLevel: c.conf.GetIsolationLevel(),
region: region,
r: r,
}
}
7 changes: 5 additions & 2 deletions pkg/schedule/checker/replica_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package checker

import (
"math/rand"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
Expand All @@ -26,6 +28,7 @@ import (
// ReplicaStrategy collects some utilities to manipulate region peers. It
// exists to allow replica_checker and rule_checker to reuse common logics.
type ReplicaStrategy struct {
r *rand.Rand
checkerName string // replica-checker / rule-checker
cluster sche.CheckerCluster
locationLabels []string
Expand Down Expand Up @@ -76,7 +79,7 @@ func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, e

isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowFastFailover: s.fastFailover, OperatorLevel: level}
targetCandidate := filter.NewCandidates(s.cluster.GetStores()).
targetCandidate := filter.NewCandidates(s.r, s.cluster.GetStores()).
FilterTarget(s.cluster.GetCheckerConfig(), nil, nil, filters...).
KeepTheTopStores(isolationComparer, false) // greater isolation score is better
if targetCandidate.Len() == 0 {
Expand Down Expand Up @@ -143,7 +146,7 @@ func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo
if s.fastFailover {
level = constant.Urgent
}
source := filter.NewCandidates(coLocationStores).
source := filter.NewCandidates(s.r, coLocationStores).
FilterSource(s.cluster.GetCheckerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, OperatorLevel: level}).
KeepTheTopStores(isolationComparer, true).
PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetCheckerConfig()), false)
Expand Down
12 changes: 8 additions & 4 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"math"
"math/rand"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -56,6 +57,7 @@ type RuleChecker struct {
pendingList cache.Cache
switchWitnessCache *cache.TTLUint64
record *recorder
r *rand.Rand
}

// NewRuleChecker creates a checker instance.
Expand All @@ -67,6 +69,7 @@ func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManage
pendingList: cache.NewDefaultCache(maxPendingListLen),
switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetCheckerConfig().GetSwitchWitnessInterval()),
record: newRecord(),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}

Expand Down Expand Up @@ -201,7 +204,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region
ruleStores := c.getRuleFitStores(rf)
isWitness := rf.Rule.IsWitness && c.isWitnessEnabled()
// If the peer to be added is a witness, since no snapshot is needed, we also reuse the fast failover logic.
store, filterByTempState := c.strategy(region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores)
store, filterByTempState := c.strategy(c.r, region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores)
if store == 0 {
ruleCheckerNoStoreAddCounter.Inc()
c.handleFilterState(region, filterByTempState)
Expand Down Expand Up @@ -252,7 +255,7 @@ func (c *RuleChecker) replaceUnexpectedRulePeer(region *core.RegionInfo, rf *pla
fastFailover = false
}
ruleStores := c.getRuleFitStores(rf)
store, filterByTempState := c.strategy(region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId())
store, filterByTempState := c.strategy(c.r, region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId())
if store == 0 {
ruleCheckerNoStoreReplaceCounter.Inc()
c.handleFilterState(region, filterByTempState)
Expand Down Expand Up @@ -393,7 +396,7 @@ func (c *RuleChecker) fixBetterLocation(region *core.RegionInfo, rf *placement.R

isWitness := rf.Rule.IsWitness && c.isWitnessEnabled()
// If the peer to be moved is a witness, since no snapshot is needed, we also reuse the fast failover logic.
strategy := c.strategy(region, rf.Rule, isWitness)
strategy := c.strategy(c.r, region, rf.Rule, isWitness)
ruleStores := c.getRuleFitStores(rf)
oldStore := strategy.SelectStoreToRemove(ruleStores)
if oldStore == 0 {
Expand Down Expand Up @@ -618,7 +621,7 @@ func (c *RuleChecker) hasAvailableWitness(region *core.RegionInfo, peer *metapb.
return nil, false
}

func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy {
func (c *RuleChecker) strategy(r *rand.Rand, region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy {
return &ReplicaStrategy{
checkerName: c.Name(),
cluster: c.cluster,
Expand All @@ -627,6 +630,7 @@ func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fa
region: region,
extraFilters: []filter.Filter{filter.NewLabelConstraintFilter(c.Name(), rule.LabelConstraints)},
fastFailover: fastFailover,
r: r,
}
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/schedule/filter/candidates.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package filter
import (
"math/rand"
"sort"
"time"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule/config"
Expand All @@ -32,8 +31,8 @@ type StoreCandidates struct {
}

// NewCandidates creates StoreCandidates with store list.
func NewCandidates(stores []*core.StoreInfo) *StoreCandidates {
return &StoreCandidates{r: rand.New(rand.NewSource(time.Now().UnixNano())), Stores: stores}
func NewCandidates(r *rand.Rand, stores []*core.StoreInfo) *StoreCandidates {
return &StoreCandidates{r: r, Stores: stores}
}

// FilterSource keeps stores that can pass all source filters.
Expand Down
4 changes: 3 additions & 1 deletion pkg/schedule/filter/candidates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package filter

import (
"math/rand"
"testing"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -112,7 +114,7 @@ func newTestCandidates(ids ...uint64) *StoreCandidates {
for _, id := range ids {
stores = append(stores, core.NewStoreInfo(&metapb.Store{Id: id}))
}
return NewCandidates(stores)
return NewCandidates(rand.New(rand.NewSource(time.Now().UnixNano())), stores)
}

func check(re *require.Assertions, candidates *StoreCandidates, ids ...uint64) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla
if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil {
finalFilters = append(l.filters, leaderFilter)
}
target := filter.NewCandidates([]*core.StoreInfo{solver.Target}).
target := filter.NewCandidates(l.R, []*core.StoreInfo{solver.Target}).
FilterTarget(conf, nil, l.filterCounter, finalFilters...).
PickFirst()
if target == nil {
Expand Down
Loading

0 comments on commit dac2d7f

Please sign in to comment.