diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go new file mode 100644 index 00000000000..46b4947b6cd --- /dev/null +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -0,0 +1,582 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schedulers + +import ( + "context" + "net/http" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/errs" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/plan" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/syncutil" + "go.uber.org/zap" +) + +const maxScheduleRetries = 10 + +var denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("schedulers", "deny") + +// Controller is used to manage all schedulers. +type Controller struct { + syncutil.RWMutex + wg sync.WaitGroup + ctx context.Context + cluster sche.SchedulerCluster + storage endpoint.ConfigStorage + // schedulers is used to manage all schedulers, which will only be initialized + // and used in the PD leader service mode now. + schedulers map[string]*ScheduleController + // schedulerHandlers is used to manage the HTTP handlers of schedulers, + // which will only be initialized and used in the API service mode now. + schedulerHandlers map[string]http.Handler + opController *operator.Controller +} + +// NewController creates a scheduler controller. +func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage endpoint.ConfigStorage, opController *operator.Controller) *Controller { + return &Controller{ + ctx: ctx, + cluster: cluster, + storage: storage, + schedulers: make(map[string]*ScheduleController), + schedulerHandlers: make(map[string]http.Handler), + opController: opController, + } +} + +// Wait waits on all schedulers to exit. +func (c *Controller) Wait() { + c.wg.Wait() +} + +// GetScheduler returns a schedule controller by name. +func (c *Controller) GetScheduler(name string) *ScheduleController { + c.RLock() + defer c.RUnlock() + return c.schedulers[name] +} + +// GetSchedulerNames returns all names of schedulers. +func (c *Controller) GetSchedulerNames() []string { + c.RLock() + defer c.RUnlock() + names := make([]string, 0, len(c.schedulers)) + for name := range c.schedulers { + names = append(names, name) + } + return names +} + +// GetSchedulerHandlers returns all handlers of schedulers. 
+func (c *Controller) GetSchedulerHandlers() map[string]http.Handler { + c.RLock() + defer c.RUnlock() + if len(c.schedulerHandlers) > 0 { + return c.schedulerHandlers + } + handlers := make(map[string]http.Handler, len(c.schedulers)) + for name, scheduler := range c.schedulers { + handlers[name] = scheduler.Scheduler + } + return handlers +} + +// CollectSchedulerMetrics collects metrics of all schedulers. +func (c *Controller) CollectSchedulerMetrics() { + c.RLock() + defer c.RUnlock() + for _, s := range c.schedulers { + var allowScheduler float64 + // If the scheduler is not allowed to schedule, it will disappear in Grafana panel. + // See issue #1341. + if !s.IsPaused() && !c.isSchedulingHalted() { + allowScheduler = 1 + } + schedulerStatusGauge.WithLabelValues(s.Scheduler.GetName(), "allow").Set(allowScheduler) + } +} + +func (c *Controller) isSchedulingHalted() bool { + return c.cluster.GetSchedulerConfig().IsSchedulingHalted() +} + +// ResetSchedulerMetrics resets metrics of all schedulers. +func (c *Controller) ResetSchedulerMetrics() { + schedulerStatusGauge.Reset() +} + +// AddSchedulerHandler adds the HTTP handler for a scheduler. +func (c *Controller) AddSchedulerHandler(scheduler Scheduler, args ...string) error { + c.Lock() + defer c.Unlock() + + name := scheduler.GetName() + if _, ok := c.schedulerHandlers[name]; ok { + return errs.ErrSchedulerExisted.FastGenByArgs() + } + + c.schedulerHandlers[name] = scheduler + if err := SaveSchedulerConfig(c.storage, scheduler); err != nil { + log.Error("can not save HTTP scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err)) + return err + } + c.cluster.GetSchedulerConfig().AddSchedulerCfg(scheduler.GetType(), args) + return nil +} + +// RemoveSchedulerHandler removes the HTTP handler for a scheduler. +func (c *Controller) RemoveSchedulerHandler(name string) error { + c.Lock() + defer c.Unlock() + if c.cluster == nil { + return errs.ErrNotBootstrapped.FastGenByArgs() + } + s, ok := c.schedulerHandlers[name] + if !ok { + return errs.ErrSchedulerNotFound.FastGenByArgs() + } + + conf := c.cluster.GetSchedulerConfig() + conf.RemoveSchedulerCfg(s.(Scheduler).GetType()) + if err := conf.Persist(c.storage); err != nil { + log.Error("the option can not persist scheduler config", errs.ZapError(err)) + return err + } + + if err := c.storage.RemoveSchedulerConfig(name); err != nil { + log.Error("can not remove the scheduler config", errs.ZapError(err)) + return err + } + + delete(c.schedulerHandlers, name) + + return nil +} + +// AddScheduler adds a scheduler. +func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error { + c.Lock() + defer c.Unlock() + + if _, ok := c.schedulers[scheduler.GetName()]; ok { + return errs.ErrSchedulerExisted.FastGenByArgs() + } + + s := NewScheduleController(c.ctx, c.cluster, c.opController, scheduler) + if err := s.Scheduler.Prepare(c.cluster); err != nil { + return err + } + + c.wg.Add(1) + go c.runScheduler(s) + c.schedulers[s.Scheduler.GetName()] = s + if err := SaveSchedulerConfig(c.storage, scheduler); err != nil { + log.Error("can not save scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err)) + return err + } + c.cluster.GetSchedulerConfig().AddSchedulerCfg(s.Scheduler.GetType(), args) + return nil +} + +// RemoveScheduler removes a scheduler by name. 
+func (c *Controller) RemoveScheduler(name string) error { + c.Lock() + defer c.Unlock() + if c.cluster == nil { + return errs.ErrNotBootstrapped.FastGenByArgs() + } + s, ok := c.schedulers[name] + if !ok { + return errs.ErrSchedulerNotFound.FastGenByArgs() + } + + conf := c.cluster.GetSchedulerConfig() + conf.RemoveSchedulerCfg(s.Scheduler.GetType()) + if err := conf.Persist(c.storage); err != nil { + log.Error("the option can not persist scheduler config", errs.ZapError(err)) + return err + } + + if err := c.storage.RemoveSchedulerConfig(name); err != nil { + log.Error("can not remove the scheduler config", errs.ZapError(err)) + return err + } + + s.Stop() + schedulerStatusGauge.DeleteLabelValues(name, "allow") + delete(c.schedulers, name) + + return nil +} + +// PauseOrResumeScheduler pauses or resumes a scheduler by name. +func (c *Controller) PauseOrResumeScheduler(name string, t int64) error { + c.Lock() + defer c.Unlock() + if c.cluster == nil { + return errs.ErrNotBootstrapped.FastGenByArgs() + } + var s []*ScheduleController + if name != "all" { + sc, ok := c.schedulers[name] + if !ok { + return errs.ErrSchedulerNotFound.FastGenByArgs() + } + s = append(s, sc) + } else { + for _, sc := range c.schedulers { + s = append(s, sc) + } + } + var err error + for _, sc := range s { + var delayAt, delayUntil int64 + if t > 0 { + delayAt = time.Now().Unix() + delayUntil = delayAt + t + } + sc.SetDelay(delayAt, delayUntil) + } + return err +} + +// ReloadSchedulerConfig reloads a scheduler's config if it exists. +func (c *Controller) ReloadSchedulerConfig(name string) error { + if exist, _ := c.IsSchedulerExisted(name); !exist { + return nil + } + return c.GetScheduler(name).ReloadConfig() +} + +// IsSchedulerAllowed returns whether a scheduler is allowed to schedule. A scheduler is not allowed to schedule if it is paused or blocked by unsafe recovery. +func (c *Controller) IsSchedulerAllowed(name string) (bool, error) { + c.RLock() + defer c.RUnlock() + if c.cluster == nil { + return false, errs.ErrNotBootstrapped.FastGenByArgs() + } + s, ok := c.schedulers[name] + if !ok { + return false, errs.ErrSchedulerNotFound.FastGenByArgs() + } + return s.AllowSchedule(false), nil +} + +// IsSchedulerPaused returns whether a scheduler is paused. +func (c *Controller) IsSchedulerPaused(name string) (bool, error) { + c.RLock() + defer c.RUnlock() + if c.cluster == nil { + return false, errs.ErrNotBootstrapped.FastGenByArgs() + } + s, ok := c.schedulers[name] + if !ok { + return false, errs.ErrSchedulerNotFound.FastGenByArgs() + } + return s.IsPaused(), nil +} + +// IsSchedulerDisabled returns whether a scheduler is disabled. +func (c *Controller) IsSchedulerDisabled(name string) (bool, error) { + c.RLock() + defer c.RUnlock() + if c.cluster == nil { + return false, errs.ErrNotBootstrapped.FastGenByArgs() + } + s, ok := c.schedulers[name] + if !ok { + return false, errs.ErrSchedulerNotFound.FastGenByArgs() + } + return c.cluster.GetSchedulerConfig().IsSchedulerDisabled(s.Scheduler.GetType()), nil +} + +// IsSchedulerExisted returns whether a scheduler exists.
+func (c *Controller) IsSchedulerExisted(name string) (bool, error) { + c.RLock() + defer c.RUnlock() + if c.cluster == nil { + return false, errs.ErrNotBootstrapped.FastGenByArgs() + } + _, existScheduler := c.schedulers[name] + _, existHandler := c.schedulerHandlers[name] + if !existScheduler && !existHandler { + return false, errs.ErrSchedulerNotFound.FastGenByArgs() + } + return true, nil +} + +func (c *Controller) runScheduler(s *ScheduleController) { + defer logutil.LogPanic() + defer c.wg.Done() + defer s.Scheduler.Cleanup(c.cluster) + + ticker := time.NewTicker(s.GetInterval()) + defer ticker.Stop() + for { + select { + case <-ticker.C: + diagnosable := s.IsDiagnosticAllowed() + if !s.AllowSchedule(diagnosable) { + continue + } + if op := s.Schedule(diagnosable); len(op) > 0 { + added := c.opController.AddWaitingOperator(op...) + log.Debug("add operator", zap.Int("added", added), zap.Int("total", len(op)), zap.String("scheduler", s.Scheduler.GetName())) + } + // Note: we reset the ticker here to support updating configuration dynamically. + ticker.Reset(s.GetInterval()) + case <-s.Ctx().Done(): + log.Info("scheduler has been stopped", + zap.String("scheduler-name", s.Scheduler.GetName()), + errs.ZapError(s.Ctx().Err())) + return + } + } +} + +// GetPausedSchedulerDelayAt returns paused timestamp of a paused scheduler +func (c *Controller) GetPausedSchedulerDelayAt(name string) (int64, error) { + c.RLock() + defer c.RUnlock() + if c.cluster == nil { + return -1, errs.ErrNotBootstrapped.FastGenByArgs() + } + s, ok := c.schedulers[name] + if !ok { + return -1, errs.ErrSchedulerNotFound.FastGenByArgs() + } + return s.GetDelayAt(), nil +} + +// GetPausedSchedulerDelayUntil returns the delay time until the scheduler is paused. +func (c *Controller) GetPausedSchedulerDelayUntil(name string) (int64, error) { + c.RLock() + defer c.RUnlock() + if c.cluster == nil { + return -1, errs.ErrNotBootstrapped.FastGenByArgs() + } + s, ok := c.schedulers[name] + if !ok { + return -1, errs.ErrSchedulerNotFound.FastGenByArgs() + } + return s.GetDelayUntil(), nil +} + +// CheckTransferWitnessLeader determines if transfer leader is required, then sends to the scheduler if needed +func (c *Controller) CheckTransferWitnessLeader(region *core.RegionInfo) { + if core.NeedTransferWitnessLeader(region) { + c.RLock() + s, ok := c.schedulers[TransferWitnessLeaderName] + c.RUnlock() + if ok { + select { + case RecvRegionInfo(s.Scheduler) <- region: + default: + log.Warn("drop transfer witness leader due to recv region channel full", zap.Uint64("region-id", region.GetID())) + } + } + } +} + +// ScheduleController is used to manage a scheduler. +type ScheduleController struct { + Scheduler + cluster sche.SchedulerCluster + opController *operator.Controller + nextInterval time.Duration + ctx context.Context + cancel context.CancelFunc + delayAt int64 + delayUntil int64 + diagnosticRecorder *DiagnosticRecorder +} + +// NewScheduleController creates a new ScheduleController. 
+func NewScheduleController(ctx context.Context, cluster sche.SchedulerCluster, opController *operator.Controller, s Scheduler) *ScheduleController { + ctx, cancel := context.WithCancel(ctx) + return &ScheduleController{ + Scheduler: s, + cluster: cluster, + opController: opController, + nextInterval: s.GetMinInterval(), + ctx: ctx, + cancel: cancel, + diagnosticRecorder: NewDiagnosticRecorder(s.GetName(), cluster.GetSchedulerConfig()), + } +} + +// Ctx returns the context of ScheduleController +func (s *ScheduleController) Ctx() context.Context { + return s.ctx +} + +// Stop stops the ScheduleController +func (s *ScheduleController) Stop() { + s.cancel() +} + +// Schedule tries to create some operators. +func (s *ScheduleController) Schedule(diagnosable bool) []*operator.Operator { + for i := 0; i < maxScheduleRetries; i++ { + // no need to retry if schedule should stop to speed exit + select { + case <-s.ctx.Done(): + return nil + default: + } + cacheCluster := newCacheCluster(s.cluster) + // we need only process diagnostic once in the retry loop + diagnosable = diagnosable && i == 0 + ops, plans := s.Scheduler.Schedule(cacheCluster, diagnosable) + if diagnosable { + s.diagnosticRecorder.SetResultFromPlans(ops, plans) + } + foundDisabled := false + for _, op := range ops { + if labelMgr := s.cluster.GetRegionLabeler(); labelMgr != nil { + region := s.cluster.GetRegion(op.RegionID()) + if region == nil { + continue + } + if labelMgr.ScheduleDisabled(region) { + denySchedulersByLabelerCounter.Inc() + foundDisabled = true + break + } + } + } + if len(ops) > 0 { + // If we have schedule, reset interval to the minimal interval. + s.nextInterval = s.Scheduler.GetMinInterval() + // try regenerating operators + if foundDisabled { + continue + } + return ops + } + } + s.nextInterval = s.Scheduler.GetNextInterval(s.nextInterval) + return nil +} + +// DiagnoseDryRun returns the operators and plans of a scheduler. +func (s *ScheduleController) DiagnoseDryRun() ([]*operator.Operator, []plan.Plan) { + cacheCluster := newCacheCluster(s.cluster) + return s.Scheduler.Schedule(cacheCluster, true) +} + +// GetInterval returns the interval of scheduling for a scheduler. +func (s *ScheduleController) GetInterval() time.Duration { + return s.nextInterval +} + +// SetInterval sets the interval of scheduling for a scheduler. For test purpose only. +func (s *ScheduleController) SetInterval(interval time.Duration) { + s.nextInterval = interval +} + +// AllowSchedule returns if a scheduler is allowed to schedule. +func (s *ScheduleController) AllowSchedule(diagnosable bool) bool { + if !s.Scheduler.IsScheduleAllowed(s.cluster) { + if diagnosable { + s.diagnosticRecorder.SetResultFromStatus(Pending) + } + return false + } + if s.isSchedulingHalted() { + if diagnosable { + s.diagnosticRecorder.SetResultFromStatus(Halted) + } + return false + } + if s.IsPaused() { + if diagnosable { + s.diagnosticRecorder.SetResultFromStatus(Paused) + } + return false + } + return true +} + +func (s *ScheduleController) isSchedulingHalted() bool { + return s.cluster.GetSchedulerConfig().IsSchedulingHalted() +} + +// IsPaused returns if a scheduler is paused.
+func (s *ScheduleController) IsPaused() bool { + delayUntil := atomic.LoadInt64(&s.delayUntil) + return time.Now().Unix() < delayUntil +} + +// GetDelayAt returns paused timestamp of a paused scheduler +func (s *ScheduleController) GetDelayAt() int64 { + if s.IsPaused() { + return atomic.LoadInt64(&s.delayAt) + } + return 0 +} + +// GetDelayUntil returns resume timestamp of a paused scheduler +func (s *ScheduleController) GetDelayUntil() int64 { + if s.IsPaused() { + return atomic.LoadInt64(&s.delayUntil) + } + return 0 +} + +// SetDelay sets the delay of a scheduler. +func (s *ScheduleController) SetDelay(delayAt, delayUntil int64) { + atomic.StoreInt64(&s.delayAt, delayAt) + atomic.StoreInt64(&s.delayUntil, delayUntil) +} + +// GetDiagnosticRecorder returns the diagnostic recorder of a scheduler. +func (s *ScheduleController) GetDiagnosticRecorder() *DiagnosticRecorder { + return s.diagnosticRecorder +} + +// IsDiagnosticAllowed returns if a scheduler is allowed to do diagnostic. +func (s *ScheduleController) IsDiagnosticAllowed() bool { + return s.diagnosticRecorder.IsAllowed() +} + +// cacheCluster include cache info to improve the performance. +type cacheCluster struct { + sche.SchedulerCluster + stores []*core.StoreInfo +} + +// GetStores returns store infos from cache +func (c *cacheCluster) GetStores() []*core.StoreInfo { + return c.stores +} + +// newCacheCluster constructor for cache +func newCacheCluster(c sche.SchedulerCluster) *cacheCluster { + return &cacheCluster{ + SchedulerCluster: c, + stores: c.GetStores(), + } +} diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index b3ce608ab37..385374e97eb 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -1275,3 +1275,1441 @@ func checkStaleRegion(origin *metapb.Region, region *metapb.Region) error { return nil } + +func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind operator.OpKind, steps ...operator.OpStep) *operator.Operator { + return operator.NewTestOperator(regionID, regionEpoch, kind, steps...)
+} + +func (c *testCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { + id, err := c.AllocID() + if err != nil { + return nil, err + } + return &metapb.Peer{Id: id, StoreId: storeID}, nil +} + +func (c *testCluster) addRegionStore(storeID uint64, regionCount int, regionSizes ...uint64) error { + var regionSize uint64 + if len(regionSizes) == 0 { + regionSize = uint64(regionCount) * 10 + } else { + regionSize = regionSizes[0] + } + + stats := &pdpb.StoreStats{} + stats.Capacity = 100 * units.GiB + stats.UsedSize = regionSize * units.MiB + stats.Available = stats.Capacity - stats.UsedSize + newStore := core.NewStoreInfo(&metapb.Store{Id: storeID}, + core.SetStoreStats(stats), + core.SetRegionCount(regionCount), + core.SetRegionSize(int64(regionSize)), + core.SetLastHeartbeatTS(time.Now()), + ) + + c.SetStoreLimit(storeID, storelimit.AddPeer, 60) + c.SetStoreLimit(storeID, storelimit.RemovePeer, 60) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) addLeaderRegion(regionID uint64, leaderStoreID uint64, followerStoreIDs ...uint64) error { + region := newTestRegionMeta(regionID) + leader, _ := c.AllocPeer(leaderStoreID) + region.Peers = []*metapb.Peer{leader} + for _, followerStoreID := range followerStoreIDs { + peer, _ := c.AllocPeer(followerStoreID) + region.Peers = append(region.Peers, peer) + } + regionInfo := core.NewRegionInfo(region, leader, core.SetApproximateSize(10), core.SetApproximateKeys(10)) + return c.putRegion(regionInfo) +} + +func (c *testCluster) updateLeaderCount(storeID uint64, leaderCount int) error { + store := c.GetStore(storeID) + newStore := store.Clone( + core.SetLeaderCount(leaderCount), + core.SetLeaderSize(int64(leaderCount)*10), + ) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) addLeaderStore(storeID uint64, leaderCount int) error { + stats := &pdpb.StoreStats{} + newStore := core.NewStoreInfo(&metapb.Store{Id: storeID}, + core.SetStoreStats(stats), + core.SetLeaderCount(leaderCount), + core.SetLeaderSize(int64(leaderCount)*10), + core.SetLastHeartbeatTS(time.Now()), + ) + + c.SetStoreLimit(storeID, storelimit.AddPeer, 60) + c.SetStoreLimit(storeID, storelimit.RemovePeer, 60) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) setStoreDown(storeID uint64) error { + store := c.GetStore(storeID) + newStore := store.Clone( + core.SetStoreState(metapb.StoreState_Up), + core.SetLastHeartbeatTS(typeutil.ZeroTime), + ) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) setStoreOffline(storeID uint64) error { + store := c.GetStore(storeID) + newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline, false)) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) LoadRegion(regionID uint64, followerStoreIDs ...uint64) error { + // regions load from etcd will have no leader + region := newTestRegionMeta(regionID) + region.Peers = []*metapb.Peer{} + for _, id := range followerStoreIDs { + peer, _ := c.AllocPeer(id) + region.Peers = append(region.Peers, peer) + } + return c.putRegion(core.NewRegionInfo(region, nil)) +} + +func TestBasic(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + + re.NoError(tc.addLeaderRegion(1, 1)) + + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader) + oc.AddWaitingOperator(op1) + 
re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) + re.Equal(op1.RegionID(), oc.GetOperator(1).RegionID()) + + // Region 1 already has an operator, cannot add another one. + op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) + oc.AddWaitingOperator(op2) + re.Equal(uint64(0), oc.OperatorCount(operator.OpRegion)) + + // Remove the operator manually, then we can add a new operator. + re.True(oc.RemoveOperator(op1)) + op3 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) + oc.AddWaitingOperator(op3) + re.Equal(uint64(1), oc.OperatorCount(operator.OpRegion)) + re.Equal(op3.RegionID(), oc.GetOperator(1).RegionID()) +} + +func TestDispatch(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + co.GetPrepareChecker().SetPrepared() + // Transfer peer from store 4 to store 1. + re.NoError(tc.addRegionStore(4, 40)) + re.NoError(tc.addRegionStore(3, 30)) + re.NoError(tc.addRegionStore(2, 20)) + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + + // Transfer leader from store 4 to store 2. + re.NoError(tc.updateLeaderCount(4, 50)) + re.NoError(tc.updateLeaderCount(3, 50)) + re.NoError(tc.updateLeaderCount(2, 20)) + re.NoError(tc.updateLeaderCount(1, 10)) + re.NoError(tc.addLeaderRegion(2, 4, 3, 2)) + + go co.RunUntilStop() + + // Wait for schedule and turn off balance. + waitOperator(re, co, 1) + controller := co.GetSchedulersController() + operatorutil.CheckTransferPeer(re, co.GetOperatorController().GetOperator(1), operator.OpKind(0), 4, 1) + re.NoError(controller.RemoveScheduler(schedulers.BalanceRegionName)) + waitOperator(re, co, 2) + operatorutil.CheckTransferLeader(re, co.GetOperatorController().GetOperator(2), operator.OpKind(0), 4, 2) + re.NoError(controller.RemoveScheduler(schedulers.BalanceLeaderName)) + + stream := mockhbstream.NewHeartbeatStream() + + // Transfer peer. + region := tc.GetRegion(1).Clone() + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitRemovePeer(re, stream, region, 4) + re.NoError(dispatchHeartbeat(co, region, stream)) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Transfer leader. 
+ region = tc.GetRegion(2).Clone() + re.NoError(dispatchHeartbeat(co, region, stream)) + waitTransferLeader(re, stream, region, 2) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) +} + +func dispatchHeartbeat(co *schedule.Coordinator, region *core.RegionInfo, stream hbstream.HeartbeatStream) error { + co.GetHeartbeatStreams().BindStream(region.GetLeader().GetStoreId(), stream) + if err := co.GetCluster().(*RaftCluster).putRegion(region.Clone()); err != nil { + return err + } + co.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, nil) + return nil +} + +func TestCollectMetricsConcurrent(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, func(tc *testCluster) { + tc.regionStats = statistics.NewRegionStatistics( + tc.GetBasicCluster(), + tc.GetOpts(), + nil) + }, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + + // Make sure there are no problem when concurrent write and read + var wg sync.WaitGroup + count := 10 + wg.Add(count + 1) + for i := 0; i <= count; i++ { + go func(i int) { + defer wg.Done() + for j := 0; j < 1000; j++ { + re.NoError(tc.addRegionStore(uint64(i%5), rand.Intn(200))) + } + }(i) + } + controller := co.GetSchedulersController() + for i := 0; i < 1000; i++ { + co.CollectHotSpotMetrics() + controller.CollectSchedulerMetrics() + co.GetCluster().(*RaftCluster).collectClusterMetrics() + } + co.ResetHotSpotMetrics() + controller.ResetSchedulerMetrics() + co.GetCluster().(*RaftCluster).resetClusterMetrics() + wg.Wait() +} + +func TestCollectMetrics(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, func(tc *testCluster) { + tc.regionStats = statistics.NewRegionStatistics( + tc.GetBasicCluster(), + tc.GetOpts(), + nil) + }, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + count := 10 + for i := 0; i <= count; i++ { + for k := 0; k < 200; k++ { + item := &statistics.HotPeerStat{ + StoreID: uint64(i % 5), + RegionID: uint64(i*1000 + k), + Loads: []float64{10, 20, 30}, + HotDegree: 10, + AntiCount: utils.HotRegionAntiCount, // for write + } + tc.hotStat.HotCache.Update(item, utils.Write) + } + } + controller := co.GetSchedulersController() + for i := 0; i < 1000; i++ { + co.CollectHotSpotMetrics() + controller.CollectSchedulerMetrics() + co.GetCluster().(*RaftCluster).collectClusterMetrics() + } + stores := co.GetCluster().GetStores() + regionStats := co.GetCluster().RegionWriteStats() + status1 := statistics.CollectHotPeerInfos(stores, regionStats) + status2 := statistics.GetHotStatus(stores, co.GetCluster().GetStoresLoads(), regionStats, utils.Write, co.GetCluster().GetSchedulerConfig().IsTraceRegionFlow()) + for _, s := range status2.AsLeader { + s.Stats = nil + } + for _, s := range status2.AsPeer { + s.Stats = nil + } + re.Equal(status1, status2) + co.ResetHotSpotMetrics() + controller.ResetSchedulerMetrics() + co.GetCluster().(*RaftCluster).resetClusterMetrics() +} + +func prepare(setCfg func(*sc.ScheduleConfig), setTc func(*testCluster), run func(*schedule.Coordinator), re *require.Assertions) (*testCluster, *schedule.Coordinator, func()) { + ctx, cancel := context.WithCancel(context.Background()) + cfg, opt, err := newTestScheduleConfig() + re.NoError(err) + if setCfg != nil { + setCfg(cfg) + } + tc := newTestCluster(ctx, opt) + hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.meta.GetId(), tc, true /* need to run */) + if setTc != nil { + setTc(tc) + } + co := schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + if run != 
nil { + run(co) + } + return tc, co, func() { + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + hbStreams.Close() + cancel() + } +} + +func checkRegionAndOperator(re *require.Assertions, tc *testCluster, co *schedule.Coordinator, regionID uint64, expectAddOperator int) { + ops := co.GetCheckerController().CheckRegion(tc.GetRegion(regionID)) + if ops == nil { + re.Equal(0, expectAddOperator) + } else { + re.Equal(expectAddOperator, co.GetOperatorController().AddWaitingOperator(ops...)) + } +} + +func TestCheckRegion(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(nil, nil, nil, re) + hbStreams, opt := co.GetHeartbeatStreams(), tc.opt + defer cleanup() + + re.NoError(tc.addRegionStore(4, 4)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addLeaderRegion(1, 2, 3)) + checkRegionAndOperator(re, tc, co, 1, 1) + operatorutil.CheckAddPeer(re, co.GetOperatorController().GetOperator(1), operator.OpReplica, 1) + checkRegionAndOperator(re, tc, co, 1, 0) + + r := tc.GetRegion(1) + p := &metapb.Peer{Id: 1, StoreId: 1, Role: metapb.PeerRole_Learner} + r = r.Clone( + core.WithAddPeer(p), + core.WithPendingPeers(append(r.GetPendingPeers(), p)), + ) + re.NoError(tc.putRegion(r)) + checkRegionAndOperator(re, tc, co, 1, 0) + + tc = newTestCluster(ctx, opt) + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + + re.NoError(tc.addRegionStore(4, 4)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.putRegion(r)) + checkRegionAndOperator(re, tc, co, 1, 0) + r = r.Clone(core.WithPendingPeers(nil)) + re.NoError(tc.putRegion(r)) + checkRegionAndOperator(re, tc, co, 1, 1) + op := co.GetOperatorController().GetOperator(1) + re.Equal(1, op.Len()) + re.Equal(uint64(1), op.Step(0).(operator.PromoteLearner).ToStore) + checkRegionAndOperator(re, tc, co, 1, 0) +} + +func TestCheckRegionWithScheduleDeny(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + + re.NoError(tc.addRegionStore(4, 4)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addLeaderRegion(1, 2, 3)) + region := tc.GetRegion(1) + re.NotNil(region) + // test with label schedule=deny + labelerManager := tc.GetRegionLabeler() + labelerManager.SetLabelRule(&labeler.LabelRule{ + ID: "schedulelabel", + Labels: []labeler.RegionLabel{{Key: "schedule", Value: "deny"}}, + RuleType: labeler.KeyRange, + Data: []interface{}{map[string]interface{}{"start_key": "", "end_key": ""}}, + }) + + // should allow to do rule checker + re.True(labelerManager.ScheduleDisabled(region)) + checkRegionAndOperator(re, tc, co, 1, 1) + + // should not allow to merge + tc.opt.SetSplitMergeInterval(time.Duration(0)) + re.NoError(tc.addLeaderRegion(2, 2, 3, 4)) + re.NoError(tc.addLeaderRegion(3, 2, 3, 4)) + region = tc.GetRegion(2) + re.True(labelerManager.ScheduleDisabled(region)) + checkRegionAndOperator(re, tc, co, 2, 0) + // delete label rule, should allow to do merge + labelerManager.DeleteLabelRule("schedulelabel") + re.False(labelerManager.ScheduleDisabled(region)) + checkRegionAndOperator(re, tc, co, 2, 2) +} + +func TestCheckerIsBusy(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) { + 
cfg.ReplicaScheduleLimit = 0 // ensure replica checker is busy + cfg.MergeScheduleLimit = 10 + }, nil, nil, re) + defer cleanup() + + re.NoError(tc.addRegionStore(1, 0)) + num := 1 + typeutil.MaxUint64(tc.opt.GetReplicaScheduleLimit(), tc.opt.GetMergeScheduleLimit()) + var operatorKinds = []operator.OpKind{ + operator.OpReplica, operator.OpRegion | operator.OpMerge, + } + for i, operatorKind := range operatorKinds { + for j := uint64(0); j < num; j++ { + regionID := j + uint64(i+1)*num + re.NoError(tc.addLeaderRegion(regionID, 1)) + switch operatorKind { + case operator.OpReplica: + op := newTestOperator(regionID, tc.GetRegion(regionID).GetRegionEpoch(), operatorKind) + re.Equal(1, co.GetOperatorController().AddWaitingOperator(op)) + case operator.OpRegion | operator.OpMerge: + if regionID%2 == 1 { + ops, err := operator.CreateMergeRegionOperator("merge-region", co.GetCluster(), tc.GetRegion(regionID), tc.GetRegion(regionID-1), operator.OpMerge) + re.NoError(err) + re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...)) + } + } + } + } + checkRegionAndOperator(re, tc, co, num, 0) +} + +func TestMergeRegionCancelOneOperator(t *testing.T) { + re := require.New(t) + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + + source := core.NewRegionInfo( + &metapb.Region{ + Id: 1, + StartKey: []byte(""), + EndKey: []byte("a"), + }, + nil, + ) + target := core.NewRegionInfo( + &metapb.Region{ + Id: 2, + StartKey: []byte("a"), + EndKey: []byte("t"), + }, + nil, + ) + re.NoError(tc.putRegion(source)) + re.NoError(tc.putRegion(target)) + + // Cancel source region. + ops, err := operator.CreateMergeRegionOperator("merge-region", tc, source, target, operator.OpMerge) + re.NoError(err) + re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...)) + // Cancel source operator. + co.GetOperatorController().RemoveOperator(co.GetOperatorController().GetOperator(source.GetID())) + re.Len(co.GetOperatorController().GetOperators(), 0) + + // Cancel target region. + ops, err = operator.CreateMergeRegionOperator("merge-region", tc, source, target, operator.OpMerge) + re.NoError(err) + re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...)) + // Cancel target operator. + co.GetOperatorController().RemoveOperator(co.GetOperatorController().GetOperator(target.GetID())) + re.Len(co.GetOperatorController().GetOperators(), 0) +} + +func TestReplica(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) { + // Turn off balance. + cfg.LeaderScheduleLimit = 0 + cfg.RegionScheduleLimit = 0 + }, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(4, 4)) + + stream := mockhbstream.NewHeartbeatStream() + + // Add peer to store 1. + re.NoError(tc.addLeaderRegion(1, 2, 3)) + region := tc.GetRegion(1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Peer in store 3 is down, remove peer in store 3 and add peer to store 4. 
+ re.NoError(tc.setStoreDown(3)) + downPeer := &pdpb.PeerStats{ + Peer: region.GetStorePeer(3), + DownSeconds: 24 * 60 * 60, + } + region = region.Clone( + core.WithDownPeers(append(region.GetDownPeers(), downPeer)), + ) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 4) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 4) + region = region.Clone(core.WithDownPeers(nil)) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Remove peer from store 4. + re.NoError(tc.addLeaderRegion(2, 1, 2, 3, 4)) + region = tc.GetRegion(2) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitRemovePeer(re, stream, region, 4) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Remove offline peer directly when it's pending. + re.NoError(tc.addLeaderRegion(3, 1, 2, 3)) + re.NoError(tc.setStoreOffline(3)) + region = tc.GetRegion(3) + region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetStorePeer(3)})) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) +} + +func TestCheckCache(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) { + // Turn off replica scheduling. + cfg.ReplicaScheduleLimit = 0 + }, nil, nil, re) + defer cleanup() + + re.NoError(tc.addRegionStore(1, 0)) + re.NoError(tc.addRegionStore(2, 0)) + re.NoError(tc.addRegionStore(3, 0)) + + // Add a peer with two replicas. + re.NoError(tc.addLeaderRegion(1, 2, 3)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/break-patrol", `return`)) + + // case 1: operator cannot be created due to replica-schedule-limit restriction + co.GetWaitGroup().Add(1) + co.PatrolRegions() + re.Len(co.GetCheckerController().GetWaitingRegions(), 1) + + // cancel the replica-schedule-limit restriction + cfg := tc.GetScheduleConfig() + cfg.ReplicaScheduleLimit = 10 + tc.SetScheduleConfig(cfg) + co.GetWaitGroup().Add(1) + co.PatrolRegions() + oc := co.GetOperatorController() + re.Len(oc.GetOperators(), 1) + re.Empty(co.GetCheckerController().GetWaitingRegions()) + + // case 2: operator cannot be created due to store limit restriction + oc.RemoveOperator(oc.GetOperator(1)) + tc.SetStoreLimit(1, storelimit.AddPeer, 0) + co.GetWaitGroup().Add(1) + co.PatrolRegions() + re.Len(co.GetCheckerController().GetWaitingRegions(), 1) + + // cancel the store limit restriction + tc.SetStoreLimit(1, storelimit.AddPeer, 10) + time.Sleep(time.Second) + co.GetWaitGroup().Add(1) + co.PatrolRegions() + re.Len(oc.GetOperators(), 1) + re.Empty(co.GetCheckerController().GetWaitingRegions()) + + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/break-patrol")) +} + +func TestPeerState(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + + // Transfer peer from store 4 to store 1. + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addRegionStore(2, 10)) + re.NoError(tc.addRegionStore(3, 10)) + re.NoError(tc.addRegionStore(4, 40)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + + stream := mockhbstream.NewHeartbeatStream() + + // Wait for schedule. + waitOperator(re, co, 1) + operatorutil.CheckTransferPeer(re, co.GetOperatorController().GetOperator(1), operator.OpKind(0), 4, 1) + + region := tc.GetRegion(1).Clone() + + // Add new peer. 
+ re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 1) + + // If the new peer is pending, the operator will not finish. + region = region.Clone(core.WithPendingPeers(append(region.GetPendingPeers(), region.GetStorePeer(1)))) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + re.NotNil(co.GetOperatorController().GetOperator(region.GetID())) + + // The new peer is not pending now, the operator will finish. + // And we will proceed to remove peer in store 4. + region = region.Clone(core.WithPendingPeers(nil)) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitRemovePeer(re, stream, region, 4) + re.NoError(tc.addLeaderRegion(1, 1, 2, 3)) + region = tc.GetRegion(1).Clone() + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) +} + +func TestShouldRun(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + tc.RaftCluster.coordinator = co + defer cleanup() + + re.NoError(tc.addLeaderStore(1, 5)) + re.NoError(tc.addLeaderStore(2, 2)) + re.NoError(tc.addLeaderStore(3, 0)) + re.NoError(tc.addLeaderStore(4, 0)) + re.NoError(tc.LoadRegion(1, 1, 2, 3)) + re.NoError(tc.LoadRegion(2, 1, 2, 3)) + re.NoError(tc.LoadRegion(3, 1, 2, 3)) + re.NoError(tc.LoadRegion(4, 1, 2, 3)) + re.NoError(tc.LoadRegion(5, 1, 2, 3)) + re.NoError(tc.LoadRegion(6, 2, 1, 4)) + re.NoError(tc.LoadRegion(7, 2, 1, 4)) + re.False(co.ShouldRun()) + re.Equal(2, tc.GetStoreRegionCount(4)) + + testCases := []struct { + regionID uint64 + ShouldRun bool + }{ + {1, false}, + {2, false}, + {3, false}, + {4, false}, + {5, false}, + // store4 needs Collect two region + {6, false}, + {7, true}, + } + + for _, testCase := range testCases { + r := tc.GetRegion(testCase.regionID) + nr := r.Clone(core.WithLeader(r.GetPeers()[0])) + re.NoError(tc.processRegionHeartbeat(nr)) + re.Equal(testCase.ShouldRun, co.ShouldRun()) + } + nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}} + newRegion := core.NewRegionInfo(nr, nil) + re.Error(tc.processRegionHeartbeat(newRegion)) + re.Equal(7, co.GetPrepareChecker().GetSum()) +} + +func TestShouldRunWithNonLeaderRegions(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + tc.RaftCluster.coordinator = co + defer cleanup() + + re.NoError(tc.addLeaderStore(1, 10)) + re.NoError(tc.addLeaderStore(2, 0)) + re.NoError(tc.addLeaderStore(3, 0)) + for i := 0; i < 10; i++ { + re.NoError(tc.LoadRegion(uint64(i+1), 1, 2, 3)) + } + re.False(co.ShouldRun()) + re.Equal(10, tc.GetStoreRegionCount(1)) + + testCases := []struct { + regionID uint64 + ShouldRun bool + }{ + {1, false}, + {2, false}, + {3, false}, + {4, false}, + {5, false}, + {6, false}, + {7, false}, + {8, false}, + {9, true}, + } + + for _, testCase := range testCases { + r := tc.GetRegion(testCase.regionID) + nr := r.Clone(core.WithLeader(r.GetPeers()[0])) + re.NoError(tc.processRegionHeartbeat(nr)) + re.Equal(testCase.ShouldRun, co.ShouldRun()) + } + nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}} + newRegion := core.NewRegionInfo(nr, nil) + re.Error(tc.processRegionHeartbeat(newRegion)) + re.Equal(9, co.GetPrepareChecker().GetSum()) + + // Now, after server is prepared, there exist some regions with no leader. 
+ re.Equal(uint64(0), tc.GetRegion(10).GetLeader().GetStoreId()) +} + +func TestAddScheduler(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + controller := co.GetSchedulersController() + re.Len(controller.GetSchedulerNames(), len(sc.DefaultSchedulers)) + re.NoError(controller.RemoveScheduler(schedulers.BalanceLeaderName)) + re.NoError(controller.RemoveScheduler(schedulers.BalanceRegionName)) + re.NoError(controller.RemoveScheduler(schedulers.HotRegionName)) + re.NoError(controller.RemoveScheduler(schedulers.BalanceWitnessName)) + re.NoError(controller.RemoveScheduler(schedulers.TransferWitnessLeaderName)) + re.Empty(controller.GetSchedulerNames()) + + stream := mockhbstream.NewHeartbeatStream() + + // Add stores 1,2,3 + re.NoError(tc.addLeaderStore(1, 1)) + re.NoError(tc.addLeaderStore(2, 1)) + re.NoError(tc.addLeaderStore(3, 1)) + // Add regions 1 with leader in store 1 and followers in stores 2,3 + re.NoError(tc.addLeaderRegion(1, 1, 2, 3)) + // Add regions 2 with leader in store 2 and followers in stores 1,3 + re.NoError(tc.addLeaderRegion(2, 2, 1, 3)) + // Add regions 3 with leader in store 3 and followers in stores 1,2 + re.NoError(tc.addLeaderRegion(3, 3, 1, 2)) + + oc := co.GetOperatorController() + + // test ConfigJSONDecoder create + bl, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) + re.NoError(err) + conf, err := bl.EncodeConfig() + re.NoError(err) + data := make(map[string]interface{}) + err = json.Unmarshal(conf, &data) + re.NoError(err) + batch := data["batch"].(float64) + re.Equal(4, int(batch)) + gls, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"0"}), controller.RemoveScheduler) + re.NoError(err) + re.NotNil(controller.AddScheduler(gls)) + re.NotNil(controller.RemoveScheduler(gls.GetName())) + + gls, err = schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler) + re.NoError(err) + re.NoError(controller.AddScheduler(gls)) + + hb, err := schedulers.CreateScheduler(schedulers.HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) + re.NoError(err) + conf, err = hb.EncodeConfig() + re.NoError(err) + data = make(map[string]interface{}) + re.NoError(json.Unmarshal(conf, &data)) + re.Contains(data, "enable-for-tiflash") + re.Equal("true", data["enable-for-tiflash"].(string)) + + // Transfer all leaders to store 1. 
+ waitOperator(re, co, 2) + region2 := tc.GetRegion(2) + re.NoError(dispatchHeartbeat(co, region2, stream)) + region2 = waitTransferLeader(re, stream, region2, 1) + re.NoError(dispatchHeartbeat(co, region2, stream)) + waitNoResponse(re, stream) + + waitOperator(re, co, 3) + region3 := tc.GetRegion(3) + re.NoError(dispatchHeartbeat(co, region3, stream)) + region3 = waitTransferLeader(re, stream, region3, 1) + re.NoError(dispatchHeartbeat(co, region3, stream)) + waitNoResponse(re, stream) +} + +func TestPersistScheduler(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + hbStreams := co.GetHeartbeatStreams() + defer cleanup() + defaultCount := len(sc.DefaultSchedulers) + // Add stores 1,2 + re.NoError(tc.addLeaderStore(1, 1)) + re.NoError(tc.addLeaderStore(2, 1)) + + controller := co.GetSchedulersController() + re.Len(controller.GetSchedulerNames(), defaultCount) + oc := co.GetOperatorController() + storage := tc.RaftCluster.storage + + gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler) + re.NoError(err) + re.NoError(controller.AddScheduler(gls1, "1")) + evict, err := schedulers.CreateScheduler(schedulers.EvictLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{"2"}), controller.RemoveScheduler) + re.NoError(err) + re.NoError(controller.AddScheduler(evict, "2")) + re.Len(controller.GetSchedulerNames(), defaultCount+2) + sches, _, err := storage.LoadAllSchedulerConfigs() + re.NoError(err) + re.Len(sches, defaultCount+2) + + // remove 5 schedulers + re.NoError(controller.RemoveScheduler(schedulers.BalanceLeaderName)) + re.NoError(controller.RemoveScheduler(schedulers.BalanceRegionName)) + re.NoError(controller.RemoveScheduler(schedulers.HotRegionName)) + re.NoError(controller.RemoveScheduler(schedulers.BalanceWitnessName)) + re.NoError(controller.RemoveScheduler(schedulers.TransferWitnessLeaderName)) + re.Len(controller.GetSchedulerNames(), defaultCount-3) + re.NoError(co.GetCluster().GetSchedulerConfig().Persist(storage)) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + // make a new coordinator for testing + // whether the schedulers added or removed in dynamic way are recorded in opt + _, newOpt, err := newTestScheduleConfig() + re.NoError(err) + shuffle, err := schedulers.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedulers.ConfigJSONDecoder([]byte("null"))) + re.NoError(err) + re.NoError(controller.AddScheduler(shuffle)) + // suppose we add a new default enable scheduler + sc.DefaultSchedulers = append(sc.DefaultSchedulers, sc.SchedulerConfig{Type: "shuffle-region"}) + defer func() { + sc.DefaultSchedulers = sc.DefaultSchedulers[:len(sc.DefaultSchedulers)-1] + }() + re.Len(newOpt.GetSchedulers(), defaultCount) + re.NoError(newOpt.Reload(storage)) + // only remains 3 items with independent config. + sches, _, err = storage.LoadAllSchedulerConfigs() + re.NoError(err) + re.Len(sches, 3) + + // option have 6 items because the default scheduler do not remove. 
+ re.Len(newOpt.GetSchedulers(), defaultCount+3) + re.NoError(newOpt.Persist(storage)) + tc.RaftCluster.opt = newOpt + + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + co.Run() + controller = co.GetSchedulersController() + re.Len(controller.GetSchedulerNames(), 3) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + // suppose restart PD again + _, newOpt, err = newTestScheduleConfig() + re.NoError(err) + re.NoError(newOpt.Reload(storage)) + tc.RaftCluster.opt = newOpt + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + co.Run() + controller = co.GetSchedulersController() + re.Len(controller.GetSchedulerNames(), 3) + bls, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""})) + re.NoError(err) + re.NoError(controller.AddScheduler(bls)) + brs, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) + re.NoError(err) + re.NoError(controller.AddScheduler(brs)) + re.Len(controller.GetSchedulerNames(), defaultCount) + + // the scheduler option should contain 6 items + // the `hot scheduler` are disabled + re.Len(co.GetCluster().GetSchedulerConfig().(*config.PersistOptions).GetSchedulers(), defaultCount+3) + re.NoError(controller.RemoveScheduler(schedulers.GrantLeaderName)) + // the scheduler that is not enable by default will be completely deleted + re.Len(co.GetCluster().GetSchedulerConfig().(*config.PersistOptions).GetSchedulers(), defaultCount+2) + re.Len(controller.GetSchedulerNames(), 4) + re.NoError(co.GetCluster().GetSchedulerConfig().Persist(co.GetCluster().GetStorage())) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + _, newOpt, err = newTestScheduleConfig() + re.NoError(err) + re.NoError(newOpt.Reload(co.GetCluster().GetStorage())) + tc.RaftCluster.opt = newOpt + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + + co.Run() + controller = co.GetSchedulersController() + re.Len(controller.GetSchedulerNames(), defaultCount-1) + re.NoError(controller.RemoveScheduler(schedulers.EvictLeaderName)) + re.Len(controller.GetSchedulerNames(), defaultCount-2) +} + +func TestRemoveScheduler(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) { + cfg.ReplicaScheduleLimit = 0 + }, nil, func(co *schedule.Coordinator) { co.Run() }, re) + hbStreams := co.GetHeartbeatStreams() + defer cleanup() + + // Add stores 1,2 + re.NoError(tc.addLeaderStore(1, 1)) + re.NoError(tc.addLeaderStore(2, 1)) + defaultCount := len(sc.DefaultSchedulers) + controller := co.GetSchedulersController() + re.Len(controller.GetSchedulerNames(), defaultCount) + oc := co.GetOperatorController() + storage := tc.RaftCluster.storage + + gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler) + re.NoError(err) + re.NoError(controller.AddScheduler(gls1, "1")) + re.Len(controller.GetSchedulerNames(), defaultCount+1) + sches, _, err := storage.LoadAllSchedulerConfigs() + re.NoError(err) + re.Len(sches, defaultCount+1) + + // remove all schedulers + re.NoError(controller.RemoveScheduler(schedulers.BalanceLeaderName)) + re.NoError(controller.RemoveScheduler(schedulers.BalanceRegionName)) + 
re.NoError(controller.RemoveScheduler(schedulers.HotRegionName)) + re.NoError(controller.RemoveScheduler(schedulers.GrantLeaderName)) + re.NoError(controller.RemoveScheduler(schedulers.BalanceWitnessName)) + re.NoError(controller.RemoveScheduler(schedulers.TransferWitnessLeaderName)) + // all removed + sches, _, err = storage.LoadAllSchedulerConfigs() + re.NoError(err) + re.Empty(sches) + re.Empty(controller.GetSchedulerNames()) + re.NoError(co.GetCluster().GetSchedulerConfig().Persist(co.GetCluster().GetStorage())) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + + // suppose restart PD again + _, newOpt, err := newTestScheduleConfig() + re.NoError(err) + re.NoError(newOpt.Reload(tc.storage)) + tc.RaftCluster.opt = newOpt + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + co.Run() + re.Empty(controller.GetSchedulerNames()) + // the option remains default scheduler + re.Len(co.GetCluster().GetSchedulerConfig().(*config.PersistOptions).GetSchedulers(), defaultCount) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() +} + +func TestRestart(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) { + // Turn off balance, we test add replica only. + cfg.LeaderScheduleLimit = 0 + cfg.RegionScheduleLimit = 0 + }, nil, func(co *schedule.Coordinator) { co.Run() }, re) + hbStreams := co.GetHeartbeatStreams() + defer cleanup() + + // Add 3 stores (1, 2, 3) and a region with 1 replica on store 1. + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addLeaderRegion(1, 1)) + region := tc.GetRegion(1) + co.GetPrepareChecker().Collect(region) + + // Add 1 replica on store 2. + stream := mockhbstream.NewHeartbeatStream() + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 2) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 2) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + + // Recreate coordinator then add another replica on store 3. 
+ co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + co.GetPrepareChecker().Collect(region) + co.Run() + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 3) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitPromoteLearner(re, stream, region, 3) +} + +func TestPauseScheduler(t *testing.T) { + re := require.New(t) + + _, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + controller := co.GetSchedulersController() + _, err := controller.IsSchedulerAllowed("test") + re.Error(err) + controller.PauseOrResumeScheduler(schedulers.BalanceLeaderName, 60) + paused, _ := controller.IsSchedulerPaused(schedulers.BalanceLeaderName) + re.True(paused) + pausedAt, err := controller.GetPausedSchedulerDelayAt(schedulers.BalanceLeaderName) + re.NoError(err) + resumeAt, err := controller.GetPausedSchedulerDelayUntil(schedulers.BalanceLeaderName) + re.NoError(err) + re.Equal(int64(60), resumeAt-pausedAt) + allowed, _ := controller.IsSchedulerAllowed(schedulers.BalanceLeaderName) + re.False(allowed) +} + +func BenchmarkPatrolRegion(b *testing.B) { + re := require.New(b) + + mergeLimit := uint64(4100) + regionNum := 10000 + + tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) { + cfg.MergeScheduleLimit = mergeLimit + }, nil, nil, re) + defer cleanup() + + tc.opt.SetSplitMergeInterval(time.Duration(0)) + for i := 1; i < 4; i++ { + if err := tc.addRegionStore(uint64(i), regionNum, 96); err != nil { + return + } + } + for i := 0; i < regionNum; i++ { + if err := tc.addLeaderRegion(uint64(i), 1, 2, 3); err != nil { + return + } + } + + listen := make(chan int) + go func() { + oc := co.GetOperatorController() + listen <- 0 + for { + if oc.OperatorCount(operator.OpMerge) == mergeLimit { + co.Stop() + return + } + } + }() + <-listen + + co.GetWaitGroup().Add(1) + b.ResetTimer() + co.PatrolRegions() +} + +func waitOperator(re *require.Assertions, co *schedule.Coordinator, regionID uint64) { + testutil.Eventually(re, func() bool { + return co.GetOperatorController().GetOperator(regionID) != nil + }) +} + +func TestOperatorCount(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + re.Equal(uint64(0), oc.OperatorCount(operator.OpLeader)) + re.Equal(uint64(0), oc.OperatorCount(operator.OpRegion)) + + re.NoError(tc.addLeaderRegion(1, 1)) + re.NoError(tc.addLeaderRegion(2, 2)) + { + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader) + oc.AddWaitingOperator(op1) + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) // 1:leader + op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader) + oc.AddWaitingOperator(op2) + re.Equal(uint64(2), oc.OperatorCount(operator.OpLeader)) // 1:leader, 2:leader + re.True(oc.RemoveOperator(op1)) + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) // 2:leader + } + + { + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) + oc.AddWaitingOperator(op1) + re.Equal(uint64(1), oc.OperatorCount(operator.OpRegion)) // 1:region 2:leader + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) + op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpRegion) + op2.SetPriorityLevel(constant.High) + oc.AddWaitingOperator(op2) + re.Equal(uint64(2), oc.OperatorCount(operator.OpRegion)) // 1:region 2:region + re.Equal(uint64(0), oc.OperatorCount(operator.OpLeader)) + } +} + +func 
TestStoreOverloaded(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + lb, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) + re.NoError(err) + opt := tc.GetOpts() + re.NoError(tc.addRegionStore(4, 100)) + re.NoError(tc.addRegionStore(3, 100)) + re.NoError(tc.addRegionStore(2, 100)) + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + region := tc.GetRegion(1).Clone(core.SetApproximateSize(60)) + tc.putRegion(region) + start := time.Now() + { + ops, _ := lb.Schedule(tc, false /* dryRun */) + re.Len(ops, 1) + op1 := ops[0] + re.NotNil(op1) + re.True(oc.AddOperator(op1)) + re.True(oc.RemoveOperator(op1)) + } + for { + time.Sleep(time.Millisecond * 10) + ops, _ := lb.Schedule(tc, false /* dryRun */) + if time.Since(start) > time.Second { + break + } + re.Empty(ops) + } + + // reset all stores' limit + // scheduling one time needs 1/10 seconds + opt.SetAllStoresLimit(storelimit.AddPeer, 600) + opt.SetAllStoresLimit(storelimit.RemovePeer, 600) + time.Sleep(time.Second) + for i := 0; i < 10; i++ { + ops, _ := lb.Schedule(tc, false /* dryRun */) + re.Len(ops, 1) + op := ops[0] + re.True(oc.AddOperator(op)) + re.True(oc.RemoveOperator(op)) + } + // sleep 1 seconds to make sure that the token is filled up + time.Sleep(time.Second) + for i := 0; i < 100; i++ { + ops, _ := lb.Schedule(tc, false /* dryRun */) + re.Greater(len(ops), 0) + } +} + +func TestStoreOverloadedWithReplace(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + lb, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) + re.NoError(err) + + re.NoError(tc.addRegionStore(4, 100)) + re.NoError(tc.addRegionStore(3, 100)) + re.NoError(tc.addRegionStore(2, 100)) + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + re.NoError(tc.addLeaderRegion(2, 1, 3, 4)) + region := tc.GetRegion(1).Clone(core.SetApproximateSize(60)) + tc.putRegion(region) + region = tc.GetRegion(2).Clone(core.SetApproximateSize(60)) + tc.putRegion(region) + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 1, PeerID: 1}) + re.True(oc.AddOperator(op1)) + op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 2}) + op2.SetPriorityLevel(constant.High) + re.True(oc.AddOperator(op2)) + op3 := newTestOperator(1, tc.GetRegion(2).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 1, PeerID: 3}) + re.False(oc.AddOperator(op3)) + ops, _ := lb.Schedule(tc, false /* dryRun */) + re.Empty(ops) + // sleep 2 seconds to make sure that token is filled up + time.Sleep(2 * time.Second) + ops, _ = lb.Schedule(tc, false /* dryRun */) + re.Greater(len(ops), 0) +} + +func TestDownStoreLimit(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + rc := co.GetCheckerController().GetRuleChecker() + + tc.addRegionStore(1, 100) + tc.addRegionStore(2, 100) + tc.addRegionStore(3, 100) + tc.addLeaderRegion(1, 1, 2, 3) + + region := tc.GetRegion(1) + tc.setStoreDown(1) + tc.SetStoreLimit(1, storelimit.RemovePeer, 1) + + region = 
+	region = region.Clone(core.WithDownPeers([]*pdpb.PeerStats{
+		{
+			Peer:        region.GetStorePeer(1),
+			DownSeconds: 24 * 60 * 60,
+		},
+	}), core.SetApproximateSize(1))
+	tc.putRegion(region)
+	for i := uint64(1); i < 20; i++ {
+		tc.addRegionStore(i+3, 100)
+		op := rc.Check(region)
+		re.NotNil(op)
+		re.True(oc.AddOperator(op))
+		oc.RemoveOperator(op)
+	}
+
+	region = region.Clone(core.SetApproximateSize(100))
+	tc.putRegion(region)
+	for i := uint64(20); i < 25; i++ {
+		tc.addRegionStore(i+3, 100)
+		op := rc.Check(region)
+		re.NotNil(op)
+		re.True(oc.AddOperator(op))
+		oc.RemoveOperator(op)
+	}
+}
+
+// FIXME: remove after moving into the schedulers package
+type mockLimitScheduler struct {
+	schedulers.Scheduler
+	limit   uint64
+	counter *operator.Controller
+	kind    operator.OpKind
+}
+
+func (s *mockLimitScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
+	return s.counter.OperatorCount(s.kind) < s.limit
+}
+
+func TestController(t *testing.T) {
+	re := require.New(t)
+
+	tc, co, cleanup := prepare(nil, nil, nil, re)
+	defer cleanup()
+	oc := co.GetOperatorController()
+
+	re.NoError(tc.addLeaderRegion(1, 1))
+	re.NoError(tc.addLeaderRegion(2, 2))
+	scheduler, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}))
+	re.NoError(err)
+	lb := &mockLimitScheduler{
+		Scheduler: scheduler,
+		counter:   oc,
+		kind:      operator.OpLeader,
+	}
+
+	sc := schedulers.NewScheduleController(tc.ctx, co.GetCluster(), co.GetOperatorController(), lb)
+
+	for i := schedulers.MinScheduleInterval; sc.GetInterval() != schedulers.MaxScheduleInterval; i = sc.GetNextInterval(i) {
+		re.Equal(i, sc.GetInterval())
+		re.Empty(sc.Schedule(false))
+	}
+	// limit = 2
+	lb.limit = 2
+	// count = 0
+	{
+		re.True(sc.AllowSchedule(false))
+		op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader)
+		re.Equal(1, oc.AddWaitingOperator(op1))
+		// count = 1
+		re.True(sc.AllowSchedule(false))
+		op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader)
+		re.Equal(1, oc.AddWaitingOperator(op2))
+		// count = 2
+		re.False(sc.AllowSchedule(false))
+		re.True(oc.RemoveOperator(op1))
+		// count = 1
+		re.True(sc.AllowSchedule(false))
+	}
+
+	op11 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader)
+	// adding a PriorityKind operator will remove the old operator
+	{
+		op3 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpHotRegion)
+		op3.SetPriorityLevel(constant.High)
+		re.Equal(1, oc.AddWaitingOperator(op11))
+		re.False(sc.AllowSchedule(false))
+		re.Equal(1, oc.AddWaitingOperator(op3))
+		re.True(sc.AllowSchedule(false))
+		re.True(oc.RemoveOperator(op3))
+	}
+
+	// adding an admin operator will remove the old operator
+	{
+		op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader)
+		re.Equal(1, oc.AddWaitingOperator(op2))
+		re.False(sc.AllowSchedule(false))
+		op4 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpAdmin)
+		op4.SetPriorityLevel(constant.High)
+		re.Equal(1, oc.AddWaitingOperator(op4))
+		re.True(sc.AllowSchedule(false))
+		re.True(oc.RemoveOperator(op4))
+	}
+
+	// test wrong region id.
+	{
+		op5 := newTestOperator(3, &metapb.RegionEpoch{}, operator.OpHotRegion)
+		re.Equal(0, oc.AddWaitingOperator(op5))
+	}
+
+	// test wrong region epoch.
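+	// An operator built with an epoch whose version is ahead of the cached region's is rejected;
+	// once the version matches again, it is accepted.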
+	re.True(oc.RemoveOperator(op11))
+	epoch := &metapb.RegionEpoch{
+		Version: tc.GetRegion(1).GetRegionEpoch().GetVersion() + 1,
+		ConfVer: tc.GetRegion(1).GetRegionEpoch().GetConfVer(),
+	}
+	{
+		op6 := newTestOperator(1, epoch, operator.OpLeader)
+		re.Equal(0, oc.AddWaitingOperator(op6))
+	}
+	epoch.Version--
+	{
+		op6 := newTestOperator(1, epoch, operator.OpLeader)
+		re.Equal(1, oc.AddWaitingOperator(op6))
+		re.True(oc.RemoveOperator(op6))
+	}
+}
+
+func TestInterval(t *testing.T) {
+	re := require.New(t)
+
+	tc, co, cleanup := prepare(nil, nil, nil, re)
+	defer cleanup()
+
+	lb, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, co.GetOperatorController(), storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}))
+	re.NoError(err)
+	sc := schedulers.NewScheduleController(tc.ctx, co.GetCluster(), co.GetOperatorController(), lb)
+
+	// After x seconds without producing an operator, the schedule interval should still be less than x/2 seconds.
+	idleSeconds := []int{5, 10, 20, 30, 60}
+	for _, n := range idleSeconds {
+		sc.SetInterval(schedulers.MinScheduleInterval)
+		for totalSleep := time.Duration(0); totalSleep <= time.Second*time.Duration(n); totalSleep += sc.GetInterval() {
+			re.Empty(sc.Schedule(false))
+		}
+		re.Less(sc.GetInterval(), time.Second*time.Duration(n/2))
+	}
+}
+
+func waitAddLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
+	var res *pdpb.RegionHeartbeatResponse
+	testutil.Eventually(re, func() bool {
+		if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil {
+			return res.GetRegionId() == region.GetID() &&
+				res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddLearnerNode &&
+				res.GetChangePeer().GetPeer().GetStoreId() == storeID
+		}
+		return false
+	})
+	return region.Clone(
+		core.WithAddPeer(res.GetChangePeer().GetPeer()),
+		core.WithIncConfVer(),
+	)
+}
+
+func waitPromoteLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
+	var res *pdpb.RegionHeartbeatResponse
+	testutil.Eventually(re, func() bool {
+		if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil {
+			return res.GetRegionId() == region.GetID() &&
+				res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddNode &&
+				res.GetChangePeer().GetPeer().GetStoreId() == storeID
+		}
+		return false
+	})
+	// Remove the learner, then add the voter.
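+	// Note: unlike waitAddLearner, the conf version is not bumped here; the learner peer is
+	// simply swapped for the voter peer returned in the response.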
+ return region.Clone( + core.WithRemoveStorePeer(storeID), + core.WithAddPeer(res.GetChangePeer().GetPeer()), + ) +} + +func waitRemovePeer(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { + var res *pdpb.RegionHeartbeatResponse + testutil.Eventually(re, func() bool { + if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil { + return res.GetRegionId() == region.GetID() && + res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_RemoveNode && + res.GetChangePeer().GetPeer().GetStoreId() == storeID + } + return false + }) + return region.Clone( + core.WithRemoveStorePeer(storeID), + core.WithIncConfVer(), + ) +} + +func waitTransferLeader(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { + var res *pdpb.RegionHeartbeatResponse + testutil.Eventually(re, func() bool { + if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil { + if res.GetRegionId() == region.GetID() { + for _, peer := range append(res.GetTransferLeader().GetPeers(), res.GetTransferLeader().GetPeer()) { + if peer.GetStoreId() == storeID { + return true + } + } + } + } + return false + }) + return region.Clone( + core.WithLeader(region.GetStorePeer(storeID)), + ) +} + +func waitNoResponse(re *require.Assertions, stream mockhbstream.HeartbeatStream) { + testutil.Eventually(re, func() bool { + res := stream.Recv() + return res == nil + }) +} +>>>>>>> 67529748f (scheduler: fix scheduler save config (#7108)) diff --git a/server/schedule/scheduler.go b/server/schedule/scheduler.go index d5dbede2ffc..7f036f1aaf7 100644 --- a/server/schedule/scheduler.go +++ b/server/schedule/scheduler.go @@ -119,16 +119,28 @@ func CreateScheduler(typ string, opController *OperatorController, storage *core return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ) } +<<<<<<< HEAD:server/schedule/scheduler.go s, err := fn(opController, storage, dec) if err != nil { return nil, err } +======= + return fn(oc, storage, dec, removeSchedulerCb...) +} + +// SaveSchedulerConfig saves the config of the specified scheduler. +func SaveSchedulerConfig(storage endpoint.ConfigStorage, s Scheduler) error { +>>>>>>> 67529748f (scheduler: fix scheduler save config (#7108)):pkg/schedule/schedulers/scheduler.go data, err := s.EncodeConfig() if err != nil { - return nil, err + return err } +<<<<<<< HEAD:server/schedule/scheduler.go err = storage.SaveScheduleConfig(s.GetName(), data) return s, err +======= + return storage.SaveSchedulerConfig(s.GetName(), data) +>>>>>>> 67529748f (scheduler: fix scheduler save config (#7108)):pkg/schedule/schedulers/scheduler.go } // FindSchedulerTypeByName finds the type of the specified name. diff --git a/server/schedulers/evict_leader.go b/server/schedulers/evict_leader.go index 16eb5231b13..261235a0b87 100644 --- a/server/schedulers/evict_leader.go +++ b/server/schedulers/evict_leader.go @@ -189,6 +189,14 @@ func newEvictLeaderScheduler(opController *schedule.OperatorController, conf *ev } } +<<<<<<< HEAD:server/schedulers/evict_leader.go +======= +// EvictStoreIDs returns the IDs of the evict-stores. 
+func (s *evictLeaderScheduler) EvictStoreIDs() []uint64 { + return s.conf.getStores() +} + +>>>>>>> 67529748f (scheduler: fix scheduler save config (#7108)):pkg/schedule/schedulers/evict_leader.go func (s *evictLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.handler.ServeHTTP(w, r) } diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 3a1868c3746..5187af9de68 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -30,7 +30,19 @@ import ( "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/tikv/pd/pkg/dashboard" "github.com/tikv/pd/pkg/mock/mockid" +<<<<<<< HEAD "github.com/tikv/pd/pkg/testutil" +======= + sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/syncer" + "github.com/tikv/pd/pkg/tso" + "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/pkg/utils/tsoutil" + "github.com/tikv/pd/pkg/utils/typeutil" +>>>>>>> 67529748f (scheduler: fix scheduler save config (#7108)) "github.com/tikv/pd/server" "github.com/tikv/pd/server/cluster" "github.com/tikv/pd/server/config" @@ -40,6 +52,7 @@ import ( syncer "github.com/tikv/pd/server/region_syncer" "github.com/tikv/pd/server/schedule/operator" "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/server/api" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -1132,7 +1145,281 @@ func (s *clusterTestSuite) TestStaleTermHeartbeat(c *C) { regionReq.Region.RegionEpoch.ConfVer = 1 region = core.RegionFromHeartbeat(regionReq) err = rc.HandleRegionHeartbeat(region) +<<<<<<< HEAD c.Assert(err, IsNil) +======= + re.NoError(err) +} + +func TestTransferLeaderForScheduler(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`)) + tc, err := tests.NewTestCluster(ctx, 2) + defer tc.Destroy() + re.NoError(err) + err = tc.RunInitialServers() + re.NoError(err) + tc.WaitLeader() + // start + leaderServer := tc.GetServer(tc.GetLeader()) + re.NoError(leaderServer.BootstrapCluster()) + rc := leaderServer.GetServer().GetRaftCluster() + re.NotNil(rc) + + storesNum := 2 + grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) + for i := 1; i <= storesNum; i++ { + store := &metapb.Store{ + Id: uint64(i), + Address: "127.0.0.1:" + strconv.Itoa(i), + } + resp, err := putStore(grpcPDClient, leaderServer.GetClusterID(), store) + re.NoError(err) + re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType()) + } + // region heartbeat + id := leaderServer.GetAllocator() + putRegionWithLeader(re, rc, id, 1) + + time.Sleep(time.Second) + re.True(leaderServer.GetRaftCluster().IsPrepared()) + // Add evict leader scheduler + api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 1, + }) + api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 2, + }) + // Check scheduler updated. 
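+	// Adding the evict-leader scheduler twice should result in a single scheduler whose config covers both store 1 and store 2.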
+ schedulersController := rc.GetCoordinator().GetSchedulersController() + re.Len(schedulersController.GetSchedulerNames(), 6) + checkEvictLeaderSchedulerExist(re, schedulersController, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2}) + + // transfer PD leader to another PD + tc.ResignLeader() + rc.Stop() + tc.WaitLeader() + leaderServer = tc.GetServer(tc.GetLeader()) + rc1 := leaderServer.GetServer().GetRaftCluster() + rc1.Start(leaderServer.GetServer()) + re.NoError(err) + re.NotNil(rc1) + // region heartbeat + id = leaderServer.GetAllocator() + putRegionWithLeader(re, rc1, id, 1) + time.Sleep(time.Second) + re.True(leaderServer.GetRaftCluster().IsPrepared()) + // Check scheduler updated. + schedulersController = rc1.GetCoordinator().GetSchedulersController() + re.Len(schedulersController.GetSchedulerNames(), 6) + checkEvictLeaderSchedulerExist(re, schedulersController, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2}) + + // transfer PD leader back to the previous PD + tc.ResignLeader() + rc1.Stop() + tc.WaitLeader() + leaderServer = tc.GetServer(tc.GetLeader()) + rc = leaderServer.GetServer().GetRaftCluster() + rc.Start(leaderServer.GetServer()) + re.NotNil(rc) + // region heartbeat + id = leaderServer.GetAllocator() + putRegionWithLeader(re, rc, id, 1) + time.Sleep(time.Second) + re.True(leaderServer.GetRaftCluster().IsPrepared()) + // Check scheduler updated + schedulersController = rc.GetCoordinator().GetSchedulersController() + re.Len(schedulersController.GetSchedulerNames(), 6) + checkEvictLeaderSchedulerExist(re, schedulersController, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2}) + + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker")) +} + +func checkEvictLeaderSchedulerExist(re *require.Assertions, sc *schedulers.Controller, exist bool) { + testutil.Eventually(re, func() bool { + if !exist { + return sc.GetScheduler(schedulers.EvictLeaderName) == nil + } + return sc.GetScheduler(schedulers.EvictLeaderName) != nil + }) +} + +func checkEvictLeaderStoreIDs(re *require.Assertions, sc *schedulers.Controller, expected []uint64) { + handler, ok := sc.GetSchedulerHandlers()[schedulers.EvictLeaderName] + re.True(ok) + h, ok := handler.(interface { + EvictStoreIDs() []uint64 + }) + re.True(ok) + var evictStoreIDs []uint64 + testutil.Eventually(re, func() bool { + evictStoreIDs = h.EvictStoreIDs() + return len(evictStoreIDs) == len(expected) + }) + re.ElementsMatch(evictStoreIDs, expected) +} + +func putRegionWithLeader(re *require.Assertions, rc *cluster.RaftCluster, id id.Allocator, storeID uint64) { + for i := 0; i < 3; i++ { + regionID, err := id.Alloc() + re.NoError(err) + peerID, err := id.Alloc() + re.NoError(err) + region := &metapb.Region{ + Id: regionID, + Peers: []*metapb.Peer{{Id: peerID, StoreId: storeID}}, + StartKey: []byte{byte(i)}, + EndKey: []byte{byte(i + 1)}, + } + rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0])) + } + + time.Sleep(50 * time.Millisecond) + re.Equal(3, rc.GetStore(storeID).GetLeaderCount()) +} + +func checkMinResolvedTS(re *require.Assertions, rc *cluster.RaftCluster, expect uint64) { + re.Eventually(func() bool { + ts := rc.GetMinResolvedTS() + return expect == ts + }, time.Second*10, time.Millisecond*50) +} + +func checkStoreMinResolvedTS(re *require.Assertions, rc *cluster.RaftCluster, expectTS, storeID uint64) { + re.Eventually(func() bool { + ts := rc.GetStoreMinResolvedTS(storeID) + return expectTS == ts + }, 
time.Second*10, time.Millisecond*50)
+}
+
+func checkMinResolvedTSFromStorage(re *require.Assertions, rc *cluster.RaftCluster, expect uint64) {
+	re.Eventually(func() bool {
+		ts2, err := rc.GetStorage().LoadMinResolvedTS()
+		re.NoError(err)
+		return expect == ts2
+	}, time.Second*10, time.Millisecond*50)
+}
+
+func setMinResolvedTSPersistenceInterval(re *require.Assertions, rc *cluster.RaftCluster, svr *server.Server, interval time.Duration) {
+	cfg := rc.GetPDServerConfig().Clone()
+	cfg.MinResolvedTSPersistenceInterval = typeutil.NewDuration(interval)
+	err := svr.SetPDServerConfig(*cfg)
+	re.NoError(err)
+}
+
+func TestMinResolvedTS(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	cluster.DefaultMinResolvedTSPersistenceInterval = time.Millisecond
+	tc, err := tests.NewTestCluster(ctx, 1)
+	defer tc.Destroy()
+	re.NoError(err)
+	err = tc.RunInitialServers()
+	re.NoError(err)
+	tc.WaitLeader()
+	leaderServer := tc.GetLeaderServer()
+	id := leaderServer.GetAllocator()
+	grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
+	clusterID := leaderServer.GetClusterID()
+	bootstrapCluster(re, clusterID, grpcPDClient)
+	rc := leaderServer.GetRaftCluster()
+	re.NotNil(rc)
+	svr := leaderServer.GetServer()
+	addStoreAndCheckMinResolvedTS := func(re *require.Assertions, isTiflash bool, minResolvedTS, expect uint64) uint64 {
+		storeID, err := id.Alloc()
+		re.NoError(err)
+		store := &metapb.Store{
+			Id:      storeID,
+			Version: "v6.0.0",
+			Address: "127.0.0.1:" + strconv.Itoa(int(storeID)),
+		}
+		if isTiflash {
+			store.Labels = []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}
+		}
+		resp, err := putStore(grpcPDClient, clusterID, store)
+		re.NoError(err)
+		re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
+		req := &pdpb.ReportMinResolvedTsRequest{
+			Header:        testutil.NewRequestHeader(clusterID),
+			StoreId:       storeID,
+			MinResolvedTs: minResolvedTS,
+		}
+		_, err = grpcPDClient.ReportMinResolvedTS(context.Background(), req)
+		re.NoError(err)
+		ts := rc.GetMinResolvedTS()
+		re.Equal(expect, ts)
+		return storeID
+	}
+
+	// the persistence job is enabled by default
+	re.NotEqual(rc.GetPDServerConfig().MinResolvedTSPersistenceInterval.Duration, 0)
+	setMinResolvedTSPersistenceInterval(re, rc, svr, 0)
+	re.Equal(time.Duration(0), rc.GetPDServerConfig().MinResolvedTSPersistenceInterval.Duration)
+
+	// case1: cluster is not initialized
+	// min resolved ts should not be available
+	status, err := rc.LoadClusterStatus()
+	re.NoError(err)
+	re.False(status.IsInitialized)
+	store1TS := uint64(233)
+	store1 := addStoreAndCheckMinResolvedTS(re, false /* not tiflash */, store1TS, math.MaxUint64)
+
+	// case2: add leader peer to store1 but do not run the job
+	// min resolved ts should be zero
+	putRegionWithLeader(re, rc, id, store1)
+	checkMinResolvedTS(re, rc, 0)
+
+	// case3: add leader peer to store1 and run the job
+	// min resolved ts should be store1TS
+	setMinResolvedTSPersistenceInterval(re, rc, svr, time.Millisecond)
+	checkMinResolvedTS(re, rc, store1TS)
+	checkMinResolvedTSFromStorage(re, rc, store1TS)
+
+	// case4: add tiflash store
+	// min resolved ts should not change
+	addStoreAndCheckMinResolvedTS(re, true /* is tiflash */, 0, store1TS)
+
+	// case5: add a new store with a larger min resolved ts
+	// min resolved ts should not change
+	store3TS := store1TS + 10
+	store3 := addStoreAndCheckMinResolvedTS(re, false /* not tiflash */, store3TS, store1TS)
+	putRegionWithLeader(re, rc, id, store3)
+
+	// case6: set store1 to tombstone
+	// min resolved ts should change to store3's ts
+	resetStoreState(re, rc, store1, metapb.StoreState_Tombstone)
+	checkMinResolvedTS(re, rc, store3TS)
+	checkMinResolvedTSFromStorage(re, rc, store3TS)
+	checkStoreMinResolvedTS(re, rc, store3TS, store3)
+	// check a non-existent store
+	checkStoreMinResolvedTS(re, rc, math.MaxUint64, 100)
+
+	// case7: add a store with a leader peer but do not report min resolved ts
+	// min resolved ts should not change
+	store4 := addStoreAndCheckMinResolvedTS(re, false /* not tiflash */, 0, store3TS)
+	putRegionWithLeader(re, rc, id, store4)
+	checkMinResolvedTS(re, rc, store3TS)
+	checkMinResolvedTSFromStorage(re, rc, store3TS)
+	resetStoreState(re, rc, store4, metapb.StoreState_Tombstone)
+
+	// case8: set min resolved ts persist interval to zero
+	// although min resolved ts increases, it should not be persisted until the job runs.
+	store5TS := store3TS + 10
+	setMinResolvedTSPersistenceInterval(re, rc, svr, 0)
+	store5 := addStoreAndCheckMinResolvedTS(re, false /* not tiflash */, store5TS, store3TS)
+	resetStoreState(re, rc, store3, metapb.StoreState_Tombstone)
+	putRegionWithLeader(re, rc, id, store5)
+	checkMinResolvedTS(re, rc, store3TS)
+	setMinResolvedTSPersistenceInterval(re, rc, svr, time.Millisecond)
+	checkMinResolvedTS(re, rc, store5TS)
+	checkStoreMinResolvedTS(re, rc, store5TS, store5)
+>>>>>>> 67529748f (scheduler: fix scheduler save config (#7108))
 }
 // See https://github.com/tikv/pd/issues/4941