resolve conflicts
Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed Oct 18, 2024
1 parent 4a6219e commit bf66e8f
Showing 15 changed files with 29 additions and 188 deletions.
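
Every conflict below resolves the same way: the branch keeps its existing types and calls (`schedule.Cluster`, `config.Config`, `regionWaitingList`, `GetOpts()`), while adopting the shared `*rand.Rand` introduced by #8675 (*: reduce rand NewSource), so each checker and scheduler seeds one random source at construction instead of re-creating `rand.NewSource` on every pass. A minimal, self-contained sketch of that pattern (the `checker` type and `pick` helper are illustrative stand-ins, not PD code):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// checker mirrors the resolution applied across this commit: seed a single
// *rand.Rand once at construction and reuse it on every scheduling pass,
// rather than calling rand.New(rand.NewSource(...)) at each call site.
type checker struct {
	r *rand.Rand
}

func newChecker() *checker {
	return &checker{r: rand.New(rand.NewSource(time.Now().UnixNano()))}
}

// pick reuses the stored source, as filter.NewCandidates(c.r, ...) does in PD.
func (c *checker) pick(storeIDs []uint64) uint64 {
	return storeIDs[c.r.Intn(len(storeIDs))]
}

func main() {
	c := newChecker()
	fmt.Println(c.pick([]uint64{1, 2, 3}))
}
```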
41 changes: 5 additions & 36 deletions pkg/schedule/checker/replica_checker.go
@@ -63,31 +63,19 @@ var (
 // Location management, mainly used for cross data center deployment.
 type ReplicaChecker struct {
 	PauseController
-<<<<<<< HEAD
 	cluster           schedule.Cluster
 	conf              config.Config
 	regionWaitingList cache.Cache
-=======
-	cluster                 sche.CheckerCluster
-	conf                    config.CheckerConfigProvider
-	pendingProcessedRegions *cache.TTLUint64
-	r                       *rand.Rand
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+	r                 *rand.Rand
 }
 
 // NewReplicaChecker creates a replica checker.
 func NewReplicaChecker(cluster schedule.Cluster, conf config.Config, regionWaitingList cache.Cache) *ReplicaChecker {
 	return &ReplicaChecker{
-<<<<<<< HEAD
 		cluster:           cluster,
 		conf:              conf,
 		regionWaitingList: regionWaitingList,
-=======
-		cluster:                 cluster,
-		conf:                    conf,
-		pendingProcessedRegions: pendingProcessedRegions,
-		r:                       rand.New(rand.NewSource(time.Now().UnixNano())),
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+		r:                 rand.New(rand.NewSource(time.Now().UnixNano())),
 	}
 }
 
@@ -195,11 +183,7 @@ func (c *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.O
 		log.Debug("no store to add replica", zap.Uint64("region-id", region.GetID()))
 		replicaCheckerNoTargetStoreCounter.Inc()
 		if filterByTempState {
-<<<<<<< HEAD
-			r.regionWaitingList.Put(region.GetID(), nil)
-=======
-			c.pendingProcessedRegions.Put(region.GetID(), nil)
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+			c.regionWaitingList.Put(region.GetID(), nil)
 		}
 		return nil
 	}
@@ -226,11 +210,7 @@ func (c *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *opera
 	old := c.strategy(c.r, region).SelectStoreToRemove(regionStores)
 	if old == 0 {
 		replicaCheckerNoWorstPeerCounter.Inc()
-<<<<<<< HEAD
-		r.regionWaitingList.Put(region.GetID(), nil)
-=======
-		c.pendingProcessedRegions.Put(region.GetID(), nil)
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+		c.regionWaitingList.Put(region.GetID(), nil)
 		return nil
 	}
 	op, err := operator.CreateRemovePeerOperator("remove-extra-replica", c.cluster, operator.OpReplica, region, old)
@@ -295,11 +275,7 @@ func (c *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
 	}
 	log.Debug("no best store to add replica", zap.Uint64("region-id", region.GetID()))
 	if filterByTempState {
-<<<<<<< HEAD
-		r.regionWaitingList.Put(region.GetID(), nil)
-=======
-		c.pendingProcessedRegions.Put(region.GetID(), nil)
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+		c.regionWaitingList.Put(region.GetID(), nil)
 	}
 	return nil
 }
@@ -319,17 +295,10 @@ func (c *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
 
 func (c *ReplicaChecker) strategy(r *rand.Rand, region *core.RegionInfo) *ReplicaStrategy {
 	return &ReplicaStrategy{
-<<<<<<< HEAD
-		checkerName:    replicaCheckerName,
-		cluster:        r.cluster,
-		locationLabels: r.conf.GetLocationLabels(),
-		isolationLevel: r.conf.GetIsolationLevel(),
-=======
 		checkerName:    c.Name(),
 		cluster:        c.cluster,
 		locationLabels: c.conf.GetLocationLabels(),
 		isolationLevel: c.conf.GetIsolationLevel(),
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
 		region:         region,
 		r:              r,
 	}
14 changes: 2 additions & 12 deletions pkg/schedule/checker/replica_strategy.go
@@ -79,13 +79,8 @@ func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, e
 
 	isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
 	strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowFastFailover: s.fastFailover, OperatorLevel: level}
-<<<<<<< HEAD
-	targetCandidate := filter.NewCandidates(s.cluster.GetStores()).
-		FilterTarget(s.cluster.GetOpts(), nil, nil, filters...).
-=======
 	targetCandidate := filter.NewCandidates(s.r, s.cluster.GetStores()).
-		FilterTarget(s.cluster.GetCheckerConfig(), nil, nil, filters...).
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+		FilterTarget(s.cluster.GetOpts(), nil, nil, filters...).
 		KeepTheTopStores(isolationComparer, false) // greater isolation score is better
 	if targetCandidate.Len() == 0 {
 		return 0, false
@@ -145,13 +140,8 @@ func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo
 	if s.fastFailover {
 		level = constant.Urgent
 	}
-<<<<<<< HEAD
-	source := filter.NewCandidates(coLocationStores).
-		FilterSource(s.cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, OperatorLevel: level}).
-=======
 	source := filter.NewCandidates(s.r, coLocationStores).
-		FilterSource(s.cluster.GetCheckerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, OperatorLevel: level}).
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+		FilterSource(s.cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, OperatorLevel: level}).
 		KeepTheTopStores(isolationComparer, true).
 		PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetOpts()), false)
 	if source == nil {
22 changes: 2 additions & 20 deletions pkg/schedule/checker/rule_checker.go
@@ -85,45 +85,27 @@ var (
 // RuleChecker fix/improve region by placement rules.
 type RuleChecker struct {
 	PauseController
-<<<<<<< HEAD
 	cluster            schedule.Cluster
 	ruleManager        *placement.RuleManager
 	name               string
 	regionWaitingList  cache.Cache
 	pendingList        cache.Cache
 	switchWitnessCache *cache.TTLUint64
 	record             *recorder
-=======
-	cluster                 sche.CheckerCluster
-	ruleManager             *placement.RuleManager
-	pendingProcessedRegions *cache.TTLUint64
-	pendingList             cache.Cache
-	switchWitnessCache      *cache.TTLUint64
-	record                  *recorder
-	r                       *rand.Rand
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+	r                  *rand.Rand
 }
 
 // NewRuleChecker creates a checker instance.
 func NewRuleChecker(ctx context.Context, cluster schedule.Cluster, ruleManager *placement.RuleManager, regionWaitingList cache.Cache) *RuleChecker {
 	return &RuleChecker{
-<<<<<<< HEAD
 		cluster:            cluster,
 		ruleManager:        ruleManager,
 		name:               ruleCheckerName,
 		regionWaitingList:  regionWaitingList,
 		pendingList:        cache.NewDefaultCache(maxPendingListLen),
 		switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetOpts().GetSwitchWitnessInterval()),
 		record:             newRecord(),
-=======
-		cluster:                 cluster,
-		ruleManager:             ruleManager,
-		pendingProcessedRegions: pendingProcessedRegions,
-		pendingList:             cache.NewDefaultCache(maxPendingListLen),
-		switchWitnessCache:      cache.NewIDTTL(ctx, time.Minute, cluster.GetCheckerConfig().GetSwitchWitnessInterval()),
-		record:                  newRecord(),
-		r:                       rand.New(rand.NewSource(time.Now().UnixNano())),
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+		r:                  rand.New(rand.NewSource(time.Now().UnixNano())),
 	}
 }
6 changes: 1 addition & 5 deletions pkg/schedule/schedulers/balance_leader.go
@@ -512,11 +512,7 @@ func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla
 	if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.region, solver.source, false /*allowMoveLeader*/); leaderFilter != nil {
 		finalFilters = append(l.filters, leaderFilter)
 	}
-<<<<<<< HEAD
-	target := filter.NewCandidates([]*core.StoreInfo{solver.target}).
-=======
-	target := filter.NewCandidates(l.R, []*core.StoreInfo{solver.Target}).
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+	target := filter.NewCandidates(l.R, []*core.StoreInfo{solver.target}).
 		FilterTarget(conf, nil, l.filterCounter, finalFilters...).
 		PickFirst()
 	if target == nil {
6 changes: 1 addition & 5 deletions pkg/schedule/schedulers/balance_region.go
@@ -253,11 +253,7 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co
 		filter.NewPlacementSafeguard(s.GetName(), solver.GetOpts(), solver.GetBasicCluster(), solver.GetRuleManager(),
 			solver.region, solver.source, solver.fit),
 	}
-<<<<<<< HEAD
-	candidates := filter.NewCandidates(dstStores).FilterTarget(solver.GetOpts(), collector, s.filterCounter, filters...)
-=======
-	candidates := filter.NewCandidates(s.R, dstStores).FilterTarget(conf, collector, s.filterCounter, filters...)
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+	candidates := filter.NewCandidates(s.R, dstStores).FilterTarget(solver.GetOpts(), collector, s.filterCounter, filters...)
 	if len(candidates.Stores) != 0 {
 		solver.step++
 	}
21 changes: 2 additions & 19 deletions pkg/schedule/schedulers/base_scheduler.go
@@ -60,30 +60,13 @@ func intervalGrow(x time.Duration, maxInterval time.Duration, typ intervalGrowth
 
 // BaseScheduler is a basic scheduler for all other complex scheduler
 type BaseScheduler struct {
-<<<<<<< HEAD
 	OpController *schedule.OperatorController
-}
-
-// NewBaseScheduler returns a basic scheduler
-func NewBaseScheduler(opController *schedule.OperatorController) *BaseScheduler {
-	return &BaseScheduler{OpController: opController}
-=======
-	OpController *operator.Controller
 	R            *rand.Rand
-
-	name string
-	tp   types.CheckerSchedulerType
-	conf schedulerConfig
 }
 
 // NewBaseScheduler returns a basic scheduler
-func NewBaseScheduler(
-	opController *operator.Controller,
-	tp types.CheckerSchedulerType,
-	conf schedulerConfig,
-) *BaseScheduler {
-	return &BaseScheduler{OpController: opController, tp: tp, conf: conf, R: rand.New(rand.NewSource(time.Now().UnixNano()))}
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+func NewBaseScheduler(opController *schedule.OperatorController) *BaseScheduler {
+	return &BaseScheduler{OpController: opController, R: rand.New(rand.NewSource(time.Now().UnixNano()))}
 }
 
 func (s *BaseScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
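
The resolved `BaseScheduler` is where schedulers get their shared source: concrete schedulers embed it and reach the seeded generator as `s.R` (or `l.R`), as the hunks below do. A hedged sketch of that wiring, with the embedding type and its method hypothetical:

```go
package sketch

import (
	"math/rand"
	"time"
)

// BaseScheduler mirrors only the relevant slice of the resolved struct:
// one source, seeded once, shared by everything that embeds it.
type BaseScheduler struct {
	R *rand.Rand
}

func NewBaseScheduler() *BaseScheduler {
	return &BaseScheduler{R: rand.New(rand.NewSource(time.Now().UnixNano()))}
}

// shuffleLeader is a hypothetical embedder standing in for the concrete
// schedulers touched by this commit (balance_leader, shuffle_region, ...).
type shuffleLeader struct {
	*BaseScheduler
}

// pickStore consumes the embedded source instead of reseeding locally.
func (s *shuffleLeader) pickStore(stores []string) string {
	return stores[s.R.Intn(len(stores))]
}
```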
31 changes: 5 additions & 26 deletions pkg/schedule/schedulers/evict_leader.go
@@ -234,11 +234,7 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool
 
 func (s *evictLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
 	evictLeaderCounter.Inc()
-<<<<<<< HEAD
-	return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize), nil
-=======
-	return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf), nil
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+	return scheduleEvictLeaderBatch(s.R, s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize), nil
 }
 
 func uniqueAppendOperator(dst []*operator.Operator, src ...*operator.Operator) []*operator.Operator {
@@ -261,18 +257,10 @@ type evictLeaderStoresConf interface {
 	getKeyRangesByID(id uint64) []core.KeyRange
 }
 
-<<<<<<< HEAD
-func scheduleEvictLeaderBatch(name, typ string, cluster schedule.Cluster, conf evictLeaderStoresConf, batchSize int) []*operator.Operator {
-=======
-func scheduleEvictLeaderBatch(r *rand.Rand, name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator {
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+func scheduleEvictLeaderBatch(r *rand.Rand, name, typ string, cluster schedule.Cluster, conf evictLeaderStoresConf, batchSize int) []*operator.Operator {
 	var ops []*operator.Operator
 	for i := 0; i < batchSize; i++ {
-<<<<<<< HEAD
-		once := scheduleEvictLeaderOnce(name, typ, cluster, conf)
-=======
-		once := scheduleEvictLeaderOnce(r, name, cluster, conf)
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+		once := scheduleEvictLeaderOnce(r, name, typ, cluster, conf)
 		// no more regions
 		if len(once) == 0 {
 			break
@@ -286,11 +274,7 @@ func scheduleEvictLeaderBatch(r *rand.Rand, name string, cluster sche.SchedulerC
 	return ops
 }
 
-<<<<<<< HEAD
-func scheduleEvictLeaderOnce(name, typ string, cluster schedule.Cluster, conf evictLeaderStoresConf) []*operator.Operator {
-=======
-func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator {
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+func scheduleEvictLeaderOnce(r *rand.Rand, name, typ string, cluster schedule.Cluster, conf evictLeaderStoresConf) []*operator.Operator {
 	stores := conf.getStores()
 	ops := make([]*operator.Operator, 0, len(stores))
 	for _, storeID := range stores {
@@ -321,13 +305,8 @@ func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCl
 	}
 
 	filters = append(filters, &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent})
-<<<<<<< HEAD
-	candidates := filter.NewCandidates(cluster.GetFollowerStores(region)).
-		FilterTarget(cluster.GetOpts(), nil, nil, filters...)
-=======
 	candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)).
-		FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...)
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+		FilterTarget(cluster.GetOpts(), nil, nil, filters...)
 	// Compatible with old TiKV transfer leader logic.
 	target := candidates.RandomPick()
 	targets := candidates.PickAll()
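
The resolved batch helper above keeps this branch's `typ` and `batchSize` parameters while threading the shared `r` into every per-store pass. A toy version of its control flow (the `once` callback and the early cap are simplifying assumptions; the real helper builds `*operator.Operator` values via `scheduleEvictLeaderOnce` and deduplicates with `uniqueAppendOperator`):

```go
package sketch

import "math/rand"

// op stands in for *operator.Operator.
type op struct{ desc string }

// scheduleBatch mirrors scheduleEvictLeaderBatch's loop: run up to batchSize
// passes with the shared source, stopping early once a pass yields nothing.
func scheduleBatch(r *rand.Rand, batchSize int, once func(*rand.Rand) []op) []op {
	var ops []op
	for i := 0; i < batchSize; i++ {
		batch := once(r)
		if len(batch) == 0 {
			break // no more regions to evict
		}
		ops = append(ops, batch...)
		if len(ops) >= batchSize { // assumed cap; the real code bounds the batch similarly
			break
		}
	}
	return ops
}
```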
7 changes: 1 addition & 6 deletions pkg/schedule/schedulers/evict_slow_store.go
@@ -142,13 +142,8 @@ func (s *evictSlowStoreScheduler) cleanupEvictLeader(cluster schedule.Cluster) {
 	cluster.SlowStoreRecovered(evictSlowStore)
 }
 
-<<<<<<< HEAD
 func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster schedule.Cluster) []*operator.Operator {
-	return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize)
-=======
-func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster sche.SchedulerCluster) []*operator.Operator {
-	return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf)
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+	return scheduleEvictLeaderBatch(s.R, s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize)
 }
 
 func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
6 changes: 1 addition & 5 deletions pkg/schedule/schedulers/evict_slow_trend.go
@@ -177,11 +177,7 @@ func (s *evictSlowTrendScheduler) scheduleEvictLeader(cluster schedule.Cluster)
 		return nil
 	}
 	storeSlowTrendEvictedStatusGauge.WithLabelValues(store.GetAddress(), strconv.FormatUint(store.GetID(), 10)).Set(1)
-<<<<<<< HEAD
-	return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize)
-=======
-	return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf)
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+	return scheduleEvictLeaderBatch(s.R, s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize)
 }
 
 func (s *evictSlowTrendScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
7 changes: 1 addition & 6 deletions pkg/schedule/schedulers/label.go
@@ -109,13 +109,8 @@ func (s *labelScheduler) Schedule(cluster schedule.Cluster, dryRun bool) ([]*ope
 	}
 	f := filter.NewExcludedFilter(s.GetName(), nil, excludeStores)
 
-<<<<<<< HEAD
-	target := filter.NewCandidates(cluster.GetFollowerStores(region)).
-		FilterTarget(cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: LabelName, TransferLeader: true, OperatorLevel: constant.Medium}, f).
-=======
 	target := filter.NewCandidates(s.R, cluster.GetFollowerStores(region)).
-		FilterTarget(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.Medium}, f).
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+		FilterTarget(cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: LabelName, TransferLeader: true, OperatorLevel: constant.Medium}, f).
 		RandomPick()
 	if target == nil {
 		log.Debug("label scheduler no target found for region", zap.Uint64("region-id", region.GetID()))
7 changes: 1 addition & 6 deletions pkg/schedule/schedulers/random_merge.go
@@ -88,13 +88,8 @@ func (s *randomMergeScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool
 func (s *randomMergeScheduler) Schedule(cluster schedule.Cluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
 	randomMergeCounter.Inc()
 
-<<<<<<< HEAD
-	store := filter.NewCandidates(cluster.GetStores()).
-		FilterSource(cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: s.conf.Name, MoveRegion: true, OperatorLevel: constant.Low}).
-=======
 	store := filter.NewCandidates(s.R, cluster.GetStores()).
-		FilterSource(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.GetName(), MoveRegion: true, OperatorLevel: constant.Low}).
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+		FilterSource(cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: s.conf.Name, MoveRegion: true, OperatorLevel: constant.Low}).
 		RandomPick()
 	if store == nil {
 		randomMergeNoSourceStoreCounter.Inc()
7 changes: 1 addition & 6 deletions pkg/schedule/schedulers/shuffle_leader.go
@@ -91,13 +91,8 @@ func (s *shuffleLeaderScheduler) Schedule(cluster schedule.Cluster, dryRun bool)
 	// 1. random select a valid store.
 	// 2. transfer a leader to the store.
 	shuffleLeaderCounter.Inc()
-<<<<<<< HEAD
-	targetStore := filter.NewCandidates(cluster.GetStores()).
-		FilterTarget(cluster.GetOpts(), nil, nil, s.filters...).
-=======
 	targetStore := filter.NewCandidates(s.R, cluster.GetStores()).
-		FilterTarget(cluster.GetSchedulerConfig(), nil, nil, s.filters...).
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+		FilterTarget(cluster.GetOpts(), nil, nil, s.filters...).
 		RandomPick()
 	if targetStore == nil {
 		shuffleLeaderNoTargetStoreCounter.Inc()
15 changes: 2 additions & 13 deletions pkg/schedule/schedulers/shuffle_region.go
@@ -112,15 +112,9 @@ func (s *shuffleRegionScheduler) Schedule(cluster schedule.Cluster, dryRun bool)
 	return []*operator.Operator{op}, nil
 }
 
-<<<<<<< HEAD
 func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster schedule.Cluster) (*core.RegionInfo, *metapb.Peer) {
-	candidates := filter.NewCandidates(cluster.GetStores()).
-		FilterSource(cluster.GetOpts(), nil, nil, s.filters...).
-=======
-func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster sche.SchedulerCluster) (*core.RegionInfo, *metapb.Peer) {
 	candidates := filter.NewCandidates(s.R, cluster.GetStores()).
-		FilterSource(cluster.GetSchedulerConfig(), nil, nil, s.filters...).
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+		FilterSource(cluster.GetOpts(), nil, nil, s.filters...).
 		Shuffle()
 
 	pendingFilter := filter.NewRegionPendingFilter()
@@ -158,13 +152,8 @@ func (s *shuffleRegionScheduler) scheduleAddPeer(cluster schedule.Cluster, regio
 	scoreGuard := filter.NewPlacementSafeguard(s.GetName(), cluster.GetOpts(), cluster.GetBasicCluster(), cluster.GetRuleManager(), region, store, nil)
 	excludedFilter := filter.NewExcludedFilter(s.GetName(), nil, region.GetStoreIDs())
 
-<<<<<<< HEAD
-	target := filter.NewCandidates(cluster.GetStores()).
-		FilterTarget(cluster.GetOpts(), nil, nil, append(s.filters, scoreGuard, excludedFilter)...).
-=======
 	target := filter.NewCandidates(s.R, cluster.GetStores()).
-		FilterTarget(cluster.GetSchedulerConfig(), nil, nil, append(s.filters, scoreGuard, excludedFilter)...).
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+		FilterTarget(cluster.GetOpts(), nil, nil, append(s.filters, scoreGuard, excludedFilter)...).
 		RandomPick()
 	if target == nil {
 		return nil
