diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go new file mode 100644 index 00000000000..79715a6fd44 --- /dev/null +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -0,0 +1,343 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schedulers + +import ( + "net/http" + "time" + + "github.com/gorilla/mux" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core" + sche "github.com/tikv/pd/pkg/schedule/core" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/plan" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/syncutil" + "github.com/unrolled/render" + "go.uber.org/zap" +) + +const ( + // EvictSlowStoreName is evict leader scheduler name. + EvictSlowStoreName = "evict-slow-store-scheduler" + // EvictSlowStoreType is evict leader scheduler type. + EvictSlowStoreType = "evict-slow-store" + + slowStoreEvictThreshold = 100 + slowStoreRecoverThreshold = 1 +) + +// WithLabelValues is a heavy operation, define variable to avoid call it every time. 
+var evictSlowStoreCounter = schedulerCounter.WithLabelValues(EvictSlowStoreName, "schedule")
+
+// evictSlowStoreSchedulerConfig holds the settings and runtime state of the
+// evict-slow-store scheduler. The embedded RWMutex guards the fields below;
+// methods suffixed with "Locked" expect the caller to already hold it.
+type evictSlowStoreSchedulerConfig struct {
+	syncutil.RWMutex
+	cluster *core.BasicCluster
+	storage endpoint.ConfigStorage
+	// Last timestamp of the chosen slow store for eviction.
+	lastSlowStoreCaptureTS time.Time
+	// Duration gap for recovering the candidate, unit: s.
+	RecoveryDurationGap uint64   `json:"recovery-duration"`
+	EvictedStores       []uint64 `json:"evict-stores"`
+}
+
+// initEvictSlowStoreSchedulerConfig returns a config bound to storage, with the
+// default recovery gap, a zero capture timestamp and no evicted store. The
+// cluster field is not set here.
+func initEvictSlowStoreSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowStoreSchedulerConfig {
+	return &evictSlowStoreSchedulerConfig{
+		storage:                storage,
+		lastSlowStoreCaptureTS: time.Time{},
+		RecoveryDurationGap:    defaultRecoveryDurationGap,
+		EvictedStores:          make([]uint64, 0),
+	}
+}
+
+// Clone returns a new config carrying only RecoveryDurationGap; the evicted
+// stores, storage, cluster and timestamp are intentionally not copied.
+func (conf *evictSlowStoreSchedulerConfig) Clone() *evictSlowStoreSchedulerConfig {
+	conf.RLock()
+	defer conf.RUnlock()
+	return &evictSlowStoreSchedulerConfig{
+		RecoveryDurationGap: conf.RecoveryDurationGap,
+	}
+}
+
+// persistLocked encodes the config and saves it under EvictSlowStoreName.
+// The caller must hold the lock. The "persistFail" failpoint forces an error.
+func (conf *evictSlowStoreSchedulerConfig) persistLocked() error {
+	name := EvictSlowStoreName
+	data, err := EncodeConfig(conf)
+	failpoint.Inject("persistFail", func() {
+		err = errors.New("fail to persist")
+	})
+	if err != nil {
+		return err
+	}
+	return conf.storage.SaveSchedulerConfig(name, data)
+}
+
+// getStores returns the current evicted-store slice under the read lock.
+// The slice is shared, not copied; writers in this file replace it wholesale
+// instead of mutating it in place.
+func (conf *evictSlowStoreSchedulerConfig) getStores() []uint64 {
+	conf.RLock()
+	defer conf.RUnlock()
+	return conf.EvictedStores
+}
+
+// getKeyRangesByID returns the whole key space when id is the evicted store,
+// and nil for any other store.
+func (conf *evictSlowStoreSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange {
+	if conf.evictStore() != id {
+		return nil
+	}
+	return []core.KeyRange{core.NewKeyRange("", "")}
+}
+
+// evictStore returns the ID of the store being evicted, or 0 when there is none.
+func (conf *evictSlowStoreSchedulerConfig) evictStore() uint64 {
+	// Take a single snapshot: reading the slice twice (once for the length and
+	// once for the element) lets a concurrent clear slip in between and turn
+	// the index into an out-of-range panic.
+	stores := conf.getStores()
+	if len(stores) == 0 {
+		return 0
+	}
+	return stores[0]
+}
+
+// readyForRecovery checks whether the last captured candidate is ready for recovery.
+func (conf *evictSlowStoreSchedulerConfig) readyForRecovery() bool {
+	conf.RLock()
+	defer conf.RUnlock()
+	recoveryDurationGap := conf.RecoveryDurationGap
+	// The failpoint lets tests skip the waiting period.
+	failpoint.Inject("transientRecoveryGap", func() {
+		recoveryDurationGap = 0
+	})
+	return uint64(time.Since(conf.lastSlowStoreCaptureTS).Seconds()) >= recoveryDurationGap
+}
+
+// setStoreAndPersist records id as the single evicted store, stamps the
+// capture time and persists the config.
+func (conf *evictSlowStoreSchedulerConfig) setStoreAndPersist(id uint64) error {
+	conf.Lock()
+	defer conf.Unlock()
+	conf.EvictedStores = []uint64{id}
+	conf.lastSlowStoreCaptureTS = time.Now()
+	return conf.persistLocked()
+}
+
+// clearAndPersist removes the evicted store (if any), resets the capture time
+// and persists the config. It returns the ID that was evicted, or 0 when there
+// was nothing to clear.
+func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err error) {
+	// Read the old ID under the same write lock that clears it, so a concurrent
+	// setStoreAndPersist cannot change the store between the check and the clear.
+	conf.Lock()
+	defer conf.Unlock()
+	if len(conf.EvictedStores) > 0 {
+		oldID = conf.EvictedStores[0]
+	}
+	if oldID > 0 {
+		conf.EvictedStores = []uint64{}
+		conf.lastSlowStoreCaptureTS = time.Time{}
+		err = conf.persistLocked()
+	}
+	return oldID, err
+}
+
+// evictSlowStoreHandler serves the scheduler's HTTP config endpoints.
+type evictSlowStoreHandler struct {
+	rd     *render.Render
+	config *evictSlowStoreSchedulerConfig
+}
+
+// newEvictSlowStoreHandler builds a router exposing POST /config and GET /list.
+func newEvictSlowStoreHandler(config *evictSlowStoreSchedulerConfig) http.Handler {
+	h := &evictSlowStoreHandler{
+		config: config,
+		rd:     render.New(render.Options{IndentJSON: true}),
+	}
+	router := mux.NewRouter()
+	router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost)
+	router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet)
+	return router
+}
+
+// UpdateConfig sets 'recovery-duration' (seconds) from the JSON request body
+// and persists it. On a persist failure the previous value is restored.
+func (handler *evictSlowStoreHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) {
+	var input map[string]interface{}
+	if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil {
+		return
+	}
+	recoveryDurationGapFloat, ok := input["recovery-duration"].(float64)
+	// Converting a negative float to uint64 is implementation-dependent in Go,
+	// so negative values are rejected as invalid instead of being converted.
+	// NOTE(review): 500 is kept for compatibility; 400 would describe this better.
+	if !ok || recoveryDurationGapFloat < 0 {
+		handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error())
+		return
+	}
+	handler.config.Lock()
+	defer handler.config.Unlock()
+	prevRecoveryDurationGap := handler.config.RecoveryDurationGap
+	recoveryDurationGap := uint64(recoveryDurationGapFloat)
+	handler.config.RecoveryDurationGap = recoveryDurationGap
+	if err := handler.config.persistLocked(); err != nil {
+		handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
+		handler.config.RecoveryDurationGap = prevRecoveryDurationGap
+		return
+	}
+	log.Info("evict-slow-store-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap))
+	handler.rd.JSON(w, http.StatusOK, "Config updated.")
+}
+
+// ListConfig responds with a clone of the config, which carries only
+// 'recovery-duration'.
+func (handler *evictSlowStoreHandler) ListConfig(w http.ResponseWriter, r *http.Request) {
+	conf := handler.config.Clone()
+	handler.rd.JSON(w, http.StatusOK, conf)
+}
+
+// evictSlowStoreScheduler evicts leaders from a single store detected as slow.
+type evictSlowStoreScheduler struct {
+	*BaseScheduler
+	conf    *evictSlowStoreSchedulerConfig
+	handler http.Handler
+}
+
+// ServeHTTP delegates to the scheduler's config handler.
+func (s *evictSlowStoreScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	s.handler.ServeHTTP(w, r)
+}
+
+// GetName returns EvictSlowStoreName.
+func (s *evictSlowStoreScheduler) GetName() string {
+	return EvictSlowStoreName
+}
+
+// GetType returns EvictSlowStoreType.
+func (s *evictSlowStoreScheduler) GetType() string {
+	return EvictSlowStoreType
+}
+
+// EncodeConfig serializes the scheduler config.
+func (s *evictSlowStoreScheduler) EncodeConfig() ([]byte, error) {
+	return EncodeConfig(s.conf)
+}
+
+// ReloadConfig loads the persisted config and applies its recovery gap and
+// evicted stores to the in-memory config, passing the old and new store sets
+// to pauseAndResumeLeaderTransfer. An empty persisted config is a no-op.
+func (s *evictSlowStoreScheduler) ReloadConfig() error {
+	s.conf.Lock()
+	defer s.conf.Unlock()
+	cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName())
+	if err != nil {
+		return err
+	}
+	if len(cfgData) == 0 {
+		return nil
+	}
+	newCfg := &evictSlowStoreSchedulerConfig{}
+	if err = DecodeConfig([]byte(cfgData), newCfg); err != nil {
+		return err
+	}
+	oldStores := make(map[uint64]struct{}, len(s.conf.EvictedStores))
+	for _, id := range s.conf.EvictedStores {
+		oldStores[id] = struct{}{}
+	}
+	newStores := make(map[uint64]struct{}, len(newCfg.EvictedStores))
+	for _, id := range newCfg.EvictedStores {
+		newStores[id] = struct{}{}
+	}
+	pauseAndResumeLeaderTransfer(s.conf.cluster, oldStores, newStores)
+	s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap
+	s.conf.EvictedStores = newCfg.EvictedStores
+	return nil
+}
+
+// PrepareConfig re-registers the evicted store with the cluster, if there is one.
+func (s *evictSlowStoreScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
+	evictStore := s.conf.evictStore()
+	if evictStore != 0 {
+		return cluster.SlowStoreEvicted(evictStore)
+	}
+	return nil
+}
+
+// CleanConfig clears the evicted store and notifies the cluster.
+func (s *evictSlowStoreScheduler) CleanConfig(cluster sche.SchedulerCluster) {
+	s.cleanupEvictLeader(cluster)
+}
+
+// prepareEvictLeader persists storeID as the evicted store and then marks it
+// as evicted in the cluster.
+func (s *evictSlowStoreScheduler) prepareEvictLeader(cluster sche.SchedulerCluster, storeID uint64) error {
+	err := s.conf.setStoreAndPersist(storeID)
+	if err != nil {
+		log.Info("evict-slow-store-scheduler persist config failed", zap.Uint64("store-id", storeID), zap.Error(err))
+		return err
+	}
+
+	return cluster.SlowStoreEvicted(storeID)
+}
+
+// cleanupEvictLeader clears the evicted store from the config and, when there
+// was one, marks it as recovered in the cluster. A persist failure is logged
+// but does not prevent the recovery notification.
+func (s *evictSlowStoreScheduler) cleanupEvictLeader(cluster sche.SchedulerCluster) {
+	evictSlowStore, err := s.conf.clearAndPersist()
+	if err != nil {
+		log.Info("evict-slow-store-scheduler persist config failed", zap.Uint64("store-id", evictSlowStore), zap.Error(err))
+	}
+	if evictSlowStore == 0 {
+		return
+	}
+	cluster.SlowStoreRecovered(evictSlowStore)
+}
+
+// schedulerEvictLeader builds one batch of evict-leader operators for the
+// evicted store.
+func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster sche.SchedulerCluster) []*operator.Operator {
+	return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize)
+}
+
+// IsScheduleAllowed always allows scheduling while no store is evicted (only
+// detection runs then); otherwise it enforces the leader schedule limit.
+func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
+	if s.conf.evictStore() == 0 {
+		return true
+	}
+	allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit()
+	if !allowed {
+		operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpLeader.String()).Inc()
+	}
+	return allowed
+}
+
+// Schedule either keeps evicting leaders from the already-chosen slow store,
+// releases it once it is removed or recovered, or looks for a new slow store.
+// The returned plan slice is always nil.
+func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
+	evictSlowStoreCounter.Inc()
+	var ops []*operator.Operator
+
+	// Read the evicted store ID once so the lookup and the logs agree.
+	if evictedID := s.conf.evictStore(); evictedID != 0 {
+		store := cluster.GetStore(evictedID)
+		switch {
+		case store == nil || store.IsRemoved():
+			// Previous slow store had been removed, remove the scheduler and check
+			// slow node next time. The ID comes from the config because store may
+			// be nil here.
+			log.Info("slow store has been removed",
+				zap.Uint64("store-id", evictedID))
+		case store.GetSlowScore() <= slowStoreRecoverThreshold && s.conf.readyForRecovery():
+			log.Info("slow store has been recovered",
+				zap.Uint64("store-id", store.GetID()))
+		default:
+			return s.schedulerEvictLeader(cluster), nil
+		}
+		s.cleanupEvictLeader(cluster)
+		return ops, nil
+	}
+
+	var slowStore *core.StoreInfo
+
+	for _, store := range cluster.GetStores() {
+		if store.IsRemoved() {
+			continue
+		}
+
+		if (store.IsPreparing() || store.IsServing()) && store.IsSlow() {
+			// Do nothing if there is more than one slow store.
+			if slowStore != nil {
+				return ops, nil
+			}
+			slowStore = store
+		}
+	}
+
+	if slowStore == nil || slowStore.GetSlowScore() < slowStoreEvictThreshold {
+		return ops, nil
+	}
+
+	// If there is only one slow store, evict leaders from that store.
+	log.Info("detected slow store, start to evict leaders",
+		zap.Uint64("store-id", slowStore.GetID()))
+	err := s.prepareEvictLeader(cluster, slowStore.GetID())
+	if err != nil {
+		log.Info("prepare for evicting leader failed", zap.Error(err), zap.Uint64("store-id", slowStore.GetID()))
+		return ops, nil
+	}
+	return s.schedulerEvictLeader(cluster), nil
+}
+
+// newEvictSlowStoreScheduler creates a scheduler that detects and evicts slow stores.
+func newEvictSlowStoreScheduler(opController *operator.Controller, conf *evictSlowStoreSchedulerConfig) Scheduler {
+	handler := newEvictSlowStoreHandler(conf)
+	return &evictSlowStoreScheduler{
+		BaseScheduler: NewBaseScheduler(opController),
+		conf:          conf,
+		handler:       handler,
+	}
+}
diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go
new file mode 100644
index 00000000000..20c53219765
--- /dev/null
+++ b/pkg/schedule/schedulers/evict_slow_trend.go
@@ -0,0 +1,667 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schedulers
+
+import (
+	"net/http"
+	"strconv"
+	"time"
+
+	"github.com/gorilla/mux"
+	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/log"
+	"github.com/tikv/pd/pkg/core"
+	sche "github.com/tikv/pd/pkg/schedule/core"
+	"github.com/tikv/pd/pkg/schedule/operator"
+	"github.com/tikv/pd/pkg/schedule/plan"
+	"github.com/tikv/pd/pkg/storage/endpoint"
+	"github.com/tikv/pd/pkg/utils/apiutil"
+	"github.com/tikv/pd/pkg/utils/syncutil"
+	"github.com/unrolled/render"
+	"go.uber.org/zap"
+)
+
+const (
+	// EvictSlowTrendName is evict leader by slow trend scheduler name.
+	EvictSlowTrendName = "evict-slow-trend-scheduler"
+	// EvictSlowTrendType is evict leader by slow trend scheduler type.
+	EvictSlowTrendType = "evict-slow-trend"
+)
+
+const (
+	alterEpsilon               = 1e-9
+	minReCheckDurationGap      = 120 // default gap for re-check the slow node, unit: s
+	defaultRecoveryDurationGap = 600 // default gap for recovery, unit: s.
+)
+
+// slowCandidate describes a store suspected of being slow, with the time it
+// was captured and the time it was last marked as recovered.
+type slowCandidate struct {
+	storeID   uint64
+	captureTS time.Time
+	recoverTS time.Time
+}
+
+// evictSlowTrendSchedulerConfig holds the settings and runtime state of the
+// evict-slow-trend scheduler. The embedded RWMutex guards the fields below;
+// methods suffixed with "Locked" expect the caller to already hold it.
+type evictSlowTrendSchedulerConfig struct {
+	syncutil.RWMutex
+	cluster *core.BasicCluster
+	storage endpoint.ConfigStorage
+	// Candidate for eviction in current tick.
+	evictCandidate slowCandidate
+	// Last chosen candidate for eviction.
+	lastEvictCandidate slowCandidate
+	// Duration gap for recovering the candidate, unit: s.
+	RecoveryDurationGap uint64 `json:"recovery-duration"`
+	// Only evict one store for now
+	EvictedStores []uint64 `json:"evict-by-trend-stores"`
+}
+
+// initEvictSlowTrendSchedulerConfig returns a config bound to storage, with
+// the default recovery gap, no candidate and no evicted store. The cluster
+// field is not set here.
+func initEvictSlowTrendSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowTrendSchedulerConfig {
+	return &evictSlowTrendSchedulerConfig{
+		storage:             storage,
+		evictCandidate:      slowCandidate{},
+		lastEvictCandidate:  slowCandidate{},
+		RecoveryDurationGap: defaultRecoveryDurationGap,
+		EvictedStores:       make([]uint64, 0),
+	}
+}
+
+// Clone returns a new config carrying only RecoveryDurationGap; candidates,
+// evicted stores, storage and cluster are intentionally not copied.
+func (conf *evictSlowTrendSchedulerConfig) Clone() *evictSlowTrendSchedulerConfig {
+	conf.RLock()
+	defer conf.RUnlock()
+	return &evictSlowTrendSchedulerConfig{
+		RecoveryDurationGap: conf.RecoveryDurationGap,
+	}
+}
+
+// persistLocked encodes the config and saves it under EvictSlowTrendName.
+// The caller must hold the lock. The "persistFail" failpoint forces an error.
+func (conf *evictSlowTrendSchedulerConfig) persistLocked() error {
+	name := EvictSlowTrendName
+	data, err := EncodeConfig(conf)
+	failpoint.Inject("persistFail", func() {
+		err = errors.New("fail to persist")
+	})
+	if err != nil {
+		return err
+	}
+	return conf.storage.SaveSchedulerConfig(name, data)
+}
+
+// getStores returns the current evicted-store slice under the read lock.
+// The slice is shared, not copied; writers in this file replace it wholesale
+// instead of mutating it in place.
+func (conf *evictSlowTrendSchedulerConfig) getStores() []uint64 {
+	conf.RLock()
+	defer conf.RUnlock()
+	return conf.EvictedStores
+}
+
+// getKeyRangesByID returns the whole key space when id is the evicted store,
+// and nil for any other store.
+func (conf *evictSlowTrendSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange {
+	if conf.evictedStore() != id {
+		return nil
+	}
+	return []core.KeyRange{core.NewKeyRange("", "")}
+}
+
+// hasEvictedStores reports whether a store is currently being evicted.
+func (conf *evictSlowTrendSchedulerConfig) hasEvictedStores() bool {
+	conf.RLock()
+	defer conf.RUnlock()
+	return len(conf.EvictedStores) > 0
+}
+
+// evictedStore returns the ID of the store being evicted, or 0 when there is none.
+func (conf *evictSlowTrendSchedulerConfig) evictedStore() uint64 {
+	// The emptiness check and the index happen under one read lock: releasing
+	// the lock between them lets a concurrent clear turn the index into an
+	// out-of-range panic.
+	conf.RLock()
+	defer conf.RUnlock()
+	if len(conf.EvictedStores) == 0 {
+		return 0
+	}
+	// If a candidate passes all checks and proved to be slow, it will be
+	// recorded in `conf.EvictedStores`, and `conf.lastEvictCandidate` will record
+	// the captured timestamp of this store.
+	return conf.EvictedStores[0]
+}
+
+// candidate returns the store ID of the current candidate, or 0 when unset.
+func (conf *evictSlowTrendSchedulerConfig) candidate() uint64 {
+	conf.RLock()
+	defer conf.RUnlock()
+	return conf.evictCandidate.storeID
+}
+
+// captureTS returns the time the current candidate was captured.
+func (conf *evictSlowTrendSchedulerConfig) captureTS() time.Time {
+	conf.RLock()
+	defer conf.RUnlock()
+	return conf.evictCandidate.captureTS
+}
+
+// candidateCapturedSecs returns how long ago, in seconds, the current
+// candidate was captured.
+func (conf *evictSlowTrendSchedulerConfig) candidateCapturedSecs() uint64 {
+	conf.RLock()
+	defer conf.RUnlock()
+	return DurationSinceAsSecs(conf.evictCandidate.captureTS)
+}
+
+// lastCapturedCandidate returns a snapshot of the last chosen candidate.
+func (conf *evictSlowTrendSchedulerConfig) lastCapturedCandidate() *slowCandidate {
+	conf.RLock()
+	defer conf.RUnlock()
+	// Return a copy: a pointer to the guarded field would be read by callers
+	// after the lock is released, racing with popCandidate and
+	// markCandidateRecovered.
+	last := conf.lastEvictCandidate
+	return &last
+}
+
+// lastCandidateCapturedSecs returns how long ago, in seconds, the last chosen
+// candidate was captured. It takes no lock; the caller must hold one.
+func (conf *evictSlowTrendSchedulerConfig) lastCandidateCapturedSecs() uint64 {
+	return DurationSinceAsSecs(conf.lastEvictCandidate.captureTS)
+}
+
+// readyForRecovery checks whether the last captured candidate is ready for recovery.
+func (conf *evictSlowTrendSchedulerConfig) readyForRecovery() bool {
+	conf.RLock()
+	defer conf.RUnlock()
+	recoveryDurationGap := conf.RecoveryDurationGap
+	// The failpoint lets tests skip the waiting period.
+	failpoint.Inject("transientRecoveryGap", func() {
+		recoveryDurationGap = 0
+	})
+	return conf.lastCandidateCapturedSecs() >= recoveryDurationGap
+}
+
+// captureCandidate records id as the current candidate, captured now. When no
+// last candidate exists yet, it is initialised to the same value.
+func (conf *evictSlowTrendSchedulerConfig) captureCandidate(id uint64) {
+	conf.Lock()
+	defer conf.Unlock()
+	now := time.Now()
+	conf.evictCandidate = slowCandidate{
+		storeID:   id,
+		captureTS: now,
+		recoverTS: now,
+	}
+	if conf.lastEvictCandidate == (slowCandidate{}) {
+		conf.lastEvictCandidate = conf.evictCandidate
+	}
+}
+
+// popCandidate clears the current candidate and returns its store ID. When
+// updLast is true the candidate is first saved as the last chosen candidate.
+func (conf *evictSlowTrendSchedulerConfig) popCandidate(updLast bool) uint64 {
+	conf.Lock()
+	defer conf.Unlock()
+	id := conf.evictCandidate.storeID
+	if updLast {
+		conf.lastEvictCandidate = conf.evictCandidate
+	}
+	conf.evictCandidate = slowCandidate{}
+	return id
+}
+
+// markCandidateRecovered stamps the last chosen candidate, if any, with the
+// current time as its recovery time.
+func (conf *evictSlowTrendSchedulerConfig) markCandidateRecovered() {
+	conf.Lock()
+	defer conf.Unlock()
+	if conf.lastEvictCandidate != (slowCandidate{}) {
+		conf.lastEvictCandidate.recoverTS = time.Now()
+	}
+}
+
+// setStoreAndPersist records id as the single evicted store and persists the config.
+func (conf *evictSlowTrendSchedulerConfig) setStoreAndPersist(id uint64) error {
+	conf.Lock()
+	defer conf.Unlock()
+	conf.EvictedStores = []uint64{id}
+	return conf.persistLocked()
+}
+
+// clearAndPersist removes the evicted store (if any), persists the config and
+// resets the store's evicted-status gauge. It returns the ID that was evicted,
+// or 0 when there was nothing to clear.
+func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.SchedulerCluster) (oldID uint64, err error) {
+	// Read and clear under one write lock so a concurrent setStoreAndPersist
+	// cannot change the store between the check and the clear.
+	conf.Lock()
+	if len(conf.EvictedStores) > 0 {
+		oldID = conf.EvictedStores[0]
+	}
+	if oldID == 0 {
+		conf.Unlock()
+		return 0, nil
+	}
+	conf.EvictedStores = []uint64{}
+	err = conf.persistLocked()
+	conf.Unlock()
+
+	// The gauge is reset even when persisting failed, as before. The cluster
+	// lookup is done outside the config lock.
+	address := "?"
+	if store := cluster.GetStore(oldID); store != nil {
+		address = store.GetAddress()
+	}
+	storeSlowTrendEvictedStatusGauge.WithLabelValues(address, strconv.FormatUint(oldID, 10)).Set(0)
+	return oldID, err
+}
+
+// evictSlowTrendHandler serves the scheduler's HTTP config endpoints.
+type evictSlowTrendHandler struct {
+	rd     *render.Render
+	config *evictSlowTrendSchedulerConfig
+}
+
+// newEvictSlowTrendHandler builds a router exposing POST /config and GET /list.
+func newEvictSlowTrendHandler(config *evictSlowTrendSchedulerConfig) http.Handler {
+	h := &evictSlowTrendHandler{
+		config: config,
+		rd:     render.New(render.Options{IndentJSON: true}),
+	}
+	router := mux.NewRouter()
+	router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost)
+	router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet)
+	return router
+}
+
+// UpdateConfig sets 'recovery-duration' (seconds) from the JSON request body
+// and persists it. On a persist failure the previous value is restored.
+func (handler *evictSlowTrendHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) {
+	var input map[string]interface{}
+	if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil {
+		return
+	}
+	recoveryDurationGapFloat, ok := input["recovery-duration"].(float64)
+	// Converting a negative float to uint64 is implementation-dependent in Go,
+	// so negative values are rejected as invalid instead of being converted.
+	// NOTE(review): 500 is kept for compatibility; 400 would describe this better.
+	if !ok || recoveryDurationGapFloat < 0 {
+		handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error())
+		return
+	}
+	handler.config.Lock()
+	defer handler.config.Unlock()
+	prevRecoveryDurationGap := handler.config.RecoveryDurationGap
+	recoveryDurationGap := uint64(recoveryDurationGapFloat)
+	handler.config.RecoveryDurationGap = recoveryDurationGap
+	if err := handler.config.persistLocked(); err != nil {
+		handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
+		handler.config.RecoveryDurationGap = prevRecoveryDurationGap
+		return
+	}
+	log.Info("evict-slow-trend-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap))
+	handler.rd.JSON(w, http.StatusOK, "Config updated.")
+}
+
+// ListConfig responds with a clone of the config, which carries only
+// 'recovery-duration'.
+func (handler *evictSlowTrendHandler) ListConfig(w http.ResponseWriter, r *http.Request) {
+	conf := handler.config.Clone()
+	handler.rd.JSON(w, http.StatusOK, conf)
+}
+
+// evictSlowTrendScheduler evicts leaders from a single store whose slow trend
+// marks it as the cause of cluster-wide slowness.
+type evictSlowTrendScheduler struct {
+	*BaseScheduler
+	conf    *evictSlowTrendSchedulerConfig
+	handler http.Handler
+}
+
+// GetNextInterval returns the interval until the next scheduling round. The
+// interval argument is not used.
+func (s *evictSlowTrendScheduler) GetNextInterval(interval time.Duration) time.Duration {
+	var growthType intervalGrowthType
+	// While a slow store is being evicted, the next interval should be shorter
+	// to make the next scheduling as soon as possible. This adjustment will decrease the
+	// response time, as heartbeats from other nodes will be received and updated more quickly.
+	if s.conf.hasEvictedStores() {
+		growthType = zeroGrowth
+	} else {
+		growthType = exponentialGrowth
+	}
+	return intervalGrow(s.GetMinInterval(), MaxScheduleInterval, growthType)
+}
+
+// ServeHTTP delegates to the scheduler's config handler.
+func (s *evictSlowTrendScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	s.handler.ServeHTTP(w, r)
+}
+
+// GetName returns EvictSlowTrendName.
+func (s *evictSlowTrendScheduler) GetName() string {
+	return EvictSlowTrendName
+}
+
+// GetType returns EvictSlowTrendType.
+func (s *evictSlowTrendScheduler) GetType() string {
+	return EvictSlowTrendType
+}
+
+// EncodeConfig serializes the scheduler config.
+func (s *evictSlowTrendScheduler) EncodeConfig() ([]byte, error) {
+	return EncodeConfig(s.conf)
+}
+
+// ReloadConfig loads the persisted config and applies its recovery gap and
+// evicted stores to the in-memory config, passing the old and new store sets
+// to pauseAndResumeLeaderTransfer. An empty persisted config is a no-op.
+func (s *evictSlowTrendScheduler) ReloadConfig() error {
+	s.conf.Lock()
+	defer s.conf.Unlock()
+	cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName())
+	if err != nil {
+		return err
+	}
+	if len(cfgData) == 0 {
+		return nil
+	}
+	newCfg := &evictSlowTrendSchedulerConfig{}
+	if err = DecodeConfig([]byte(cfgData), newCfg); err != nil {
+		return err
+	}
+	oldStores := make(map[uint64]struct{}, len(s.conf.EvictedStores))
+	for _, id := range s.conf.EvictedStores {
+		oldStores[id] = struct{}{}
+	}
+	newStores := make(map[uint64]struct{}, len(newCfg.EvictedStores))
+	for _, id := range newCfg.EvictedStores {
+		newStores[id] = struct{}{}
+	}
+	pauseAndResumeLeaderTransfer(s.conf.cluster, oldStores, newStores)
+	s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap
+	s.conf.EvictedStores = newCfg.EvictedStores
+	return nil
+}
+
+// PrepareConfig re-registers the evicted store with the cluster, if there is one.
+func (s *evictSlowTrendScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
+	evictedStoreID := s.conf.evictedStore()
+	if evictedStoreID == 0 {
+		return nil
+	}
+	return cluster.SlowTrendEvicted(evictedStoreID)
+}
+
+// CleanConfig clears the evicted store and notifies the cluster.
+func (s *evictSlowTrendScheduler) CleanConfig(cluster sche.SchedulerCluster) {
+	s.cleanupEvictLeader(cluster)
+}
+
+// prepareEvictLeader persists storeID as the evicted store and then marks it
+// as evicted in the cluster.
+func (s *evictSlowTrendScheduler) prepareEvictLeader(cluster sche.SchedulerCluster, storeID uint64) error {
+	err := s.conf.setStoreAndPersist(storeID)
+	if err != nil {
+		log.Info("evict-slow-trend-scheduler persist config failed", zap.Uint64("store-id", storeID), zap.Error(err))
+		return err
+	}
+	return cluster.SlowTrendEvicted(storeID)
+}
+
+// cleanupEvictLeader clears the evicted store from the config and, when there
+// was one, stamps its recovery time and marks it as recovered in the cluster.
+// A persist failure is logged but does not prevent the recovery notification.
+func (s *evictSlowTrendScheduler) cleanupEvictLeader(cluster sche.SchedulerCluster) {
+	evictedStoreID, err := s.conf.clearAndPersist(cluster)
+	if err != nil {
+		log.Info("evict-slow-trend-scheduler persist config failed", zap.Uint64("store-id", evictedStoreID), zap.Error(err))
+	}
+	if evictedStoreID != 0 {
+		// Assertion: evictStoreID == s.conf.LastEvictCandidate.storeID
+		s.conf.markCandidateRecovered()
+		cluster.SlowTrendRecovered(evictedStoreID)
+	}
+}
+
+// scheduleEvictLeader sets the evicted-status gauge for the evicted store and
+// builds one batch of evict-leader operators. It returns nil when the store is
+// no longer known to the cluster.
+func (s *evictSlowTrendScheduler) scheduleEvictLeader(cluster sche.SchedulerCluster) []*operator.Operator {
+	store := cluster.GetStore(s.conf.evictedStore())
+	if store == nil {
+		return nil
+	}
+	storeSlowTrendEvictedStatusGauge.WithLabelValues(store.GetAddress(), strconv.FormatUint(store.GetID(), 10)).Set(1)
+	return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize)
+}
+
+// IsScheduleAllowed always allows scheduling while no store is evicted (only
+// detection runs then); otherwise it enforces the leader schedule limit.
+func (s *evictSlowTrendScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
+	if s.conf.evictedStore() == 0 {
+		return true
+	}
+	allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit()
+	if !allowed {
+		operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpLeader.String()).Inc()
+	}
+	return allowed
+}
+
+// Schedule drives the scheduler's state machine:
+//  1. If a store is being evicted, keep evicting, or release it once it is
+//     removed or recovered.
+//  2. Otherwise capture a candidate (or keep the existing one), cancel it if
+//     it turned out faster than the others, wait until enough stores have
+//     reported heartbeats after the capture, then start evicting it.
+//
+// The returned plan slice is always nil.
+func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
+	schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()
+
+	var ops []*operator.Operator
+
+	// Read the evicted store ID once so the lookup and the logs agree.
+	if evictedID := s.conf.evictedStore(); evictedID != 0 {
+		store := cluster.GetStore(evictedID)
+		switch {
+		case store == nil || store.IsRemoved():
+			// Previous slow store had been removed, remove the scheduler and check
+			// slow node next time. The ID comes from the config because store may
+			// be nil here.
+			log.Info("store evicted by slow trend has been removed", zap.Uint64("store-id", evictedID))
+			storeSlowTrendActionStatusGauge.WithLabelValues("evict", "stop_removed").Inc()
+		case checkStoreCanRecover(cluster, store) && s.conf.readyForRecovery():
+			log.Info("store evicted by slow trend has been recovered", zap.Uint64("store-id", store.GetID()))
+			storeSlowTrendActionStatusGauge.WithLabelValues("evict", "stop_recovered").Inc()
+		default:
+			storeSlowTrendActionStatusGauge.WithLabelValues("evict", "continue").Inc()
+			return s.scheduleEvictLeader(cluster), nil
+		}
+		s.cleanupEvictLeader(cluster)
+		return ops, nil
+	}
+
+	candFreshCaptured := false
+	if s.conf.candidate() == 0 {
+		candidate := chooseEvictCandidate(cluster, s.conf.lastCapturedCandidate())
+		if candidate != nil {
+			storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "captured").Inc()
+			s.conf.captureCandidate(candidate.GetID())
+			candFreshCaptured = true
+		}
+	} else {
+		storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "continue").Inc()
+	}
+
+	slowStoreID := s.conf.candidate()
+	if slowStoreID == 0 {
+		storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none").Inc()
+		return ops, nil
+	}
+
+	slowStore := cluster.GetStore(slowStoreID)
+	if slowStore == nil {
+		// The candidate is no longer known to the cluster; drop it instead of
+		// handing a nil store to the comparison helpers below.
+		s.conf.popCandidate(false)
+		log.Info("slow store candidate by trend has been removed", zap.Uint64("store-id", slowStoreID))
+		return ops, nil
+	}
+	if !candFreshCaptured && checkStoreFasterThanOthers(cluster, slowStore) {
+		s.conf.popCandidate(false)
+		log.Info("slow store candidate by trend has been cancel", zap.Uint64("store-id", slowStoreID))
+		storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "canceled_too_faster").Inc()
+		return ops, nil
+	}
+	if slowStoreRecordTS := s.conf.captureTS(); !checkStoresAreUpdated(cluster, slowStoreID, slowStoreRecordTS) {
+		log.Info("slow store candidate waiting for other stores to update heartbeats", zap.Uint64("store-id", slowStoreID))
+		storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "wait").Inc()
+		return ops, nil
+	}
+
+	candCapturedSecs := s.conf.candidateCapturedSecs()
+	log.Info("detected slow store by trend, start to evict leaders",
+		zap.Uint64("store-id", slowStoreID),
+		zap.Uint64("candidate-captured-secs", candCapturedSecs))
+	storeSlowTrendMiscGauge.WithLabelValues("candidate", "captured_secs").Set(float64(candCapturedSecs))
+	if err := s.prepareEvictLeader(cluster, s.conf.popCandidate(true)); err != nil {
+		log.Info("prepare for evicting leader by slow trend failed", zap.Error(err), zap.Uint64("store-id", slowStoreID))
+		storeSlowTrendActionStatusGauge.WithLabelValues("evict", "prepare_err").Inc()
+		return ops, nil
+	}
+	storeSlowTrendActionStatusGauge.WithLabelValues("evict", "start").Inc()
+	return s.scheduleEvictLeader(cluster), nil
+}
+
+// newEvictSlowTrendScheduler creates a scheduler that detects slow stores by
+// trend and evicts leaders from them.
+func newEvictSlowTrendScheduler(opController *operator.Controller, conf *evictSlowTrendSchedulerConfig) Scheduler {
+	handler := newEvictSlowTrendHandler(conf)
+	return &evictSlowTrendScheduler{
+		BaseScheduler: NewBaseScheduler(opController),
+		conf:          conf,
+		handler:       handler,
+	}
+}
+
+// chooseEvictCandidate scans the cluster for exactly one store whose slow
+// trend marks it as a slowness source. It returns nil when the cluster has
+// fewer than three stores, when zero or several stores match, when too few
+// stores are affected, or when the match is not slower than the others.
+// lastEvictCandidate may be nil; it is only consulted in raft-kv2 clusters.
+func chooseEvictCandidate(cluster sche.SchedulerCluster, lastEvictCandidate *slowCandidate) (slowStore *core.StoreInfo) {
+	isRaftKV2 := cluster.GetStoreConfig().IsRaftKV2()
+	failpoint.Inject("mockRaftKV2", func() {
+		isRaftKV2 = true
+	})
+	stores := cluster.GetStores()
+	if len(stores) < 3 {
+		storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_too_few").Inc()
+		return
+	}
+
+	var candidates []*core.StoreInfo
+	var affectedStoreCount int
+	for _, store := range stores {
+		if store.IsRemoved() {
+			continue
+		}
+		if !(store.IsPreparing() || store.IsServing()) {
+			continue
+		}
+		if slowTrend := store.GetSlowTrend(); slowTrend != nil {
+			if slowTrend.ResultRate < -alterEpsilon {
+				affectedStoreCount++
+			}
+			// For the cases of disk io jitters.
+			// Normally, if there exists jitters on disk io or network io, the slow store must have a descending
+			// trend on QPS and ascending trend on duration. So, the slowTrend must match the following pattern.
+			if slowTrend.CauseRate > alterEpsilon && slowTrend.ResultRate < -alterEpsilon {
+				candidates = append(candidates, store)
+				storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "add").Inc()
+				log.Info("evict-slow-trend-scheduler pre-captured candidate",
+					zap.Uint64("store-id", store.GetID()),
+					zap.Float64("cause-rate", slowTrend.CauseRate),
+					zap.Float64("result-rate", slowTrend.ResultRate),
+					zap.Float64("cause-value", slowTrend.CauseValue),
+					zap.Float64("result-value", slowTrend.ResultValue))
+			} else if isRaftKV2 && slowTrend.CauseRate > alterEpsilon {
+				// Meanwhile, if the store was previously experiencing slowness in the `Duration` dimension, it should
+				// re-check whether this node is still encountering network I/O-related jitters. And If this node matches
+				// the last identified candidate, it indicates that the node is still being affected by delays in network I/O,
+				// and consequently, it should be re-designated as slow once more.
+				// Prerequisite: `raft-kv2` engine has the ability to percept the slow trend on network io jitters.
+				// TODO: maybe make it compatible to `raft-kv` later.
+				if lastEvictCandidate != nil && lastEvictCandidate.storeID == store.GetID() && DurationSinceAsSecs(lastEvictCandidate.recoverTS) <= minReCheckDurationGap {
+					candidates = append(candidates, store)
+					storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "add").Inc()
+					log.Info("evict-slow-trend-scheduler pre-captured candidate in raft-kv2 cluster",
+						zap.Uint64("store-id", store.GetID()),
+						zap.Float64("cause-rate", slowTrend.CauseRate),
+						zap.Float64("result-rate", slowTrend.ResultRate),
+						zap.Float64("cause-value", slowTrend.CauseValue),
+						zap.Float64("result-value", slowTrend.ResultValue))
+				}
+			}
+		}
+	}
+	if len(candidates) == 0 {
+		storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_no_fit").Inc()
+		return
+	}
+	// TODO: Calculate to judge if one store is way slower than the others
+	if len(candidates) > 1 {
+		storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_too_many").Inc()
+		return
+	}
+
+	store := candidates[0]
+
+	affectedStoreThreshold := int(float64(len(stores)) * cluster.GetSchedulerConfig().GetSlowStoreEvictingAffectedStoreRatioThreshold())
+	if affectedStoreCount < affectedStoreThreshold {
+		log.Info("evict-slow-trend-scheduler failed to confirm candidate: it only affect a few stores", zap.Uint64("store-id", store.GetID()))
+		storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_affect_a_few").Inc()
+		return
+	}
+
+	if !checkStoreSlowerThanOthers(cluster, store) {
+		log.Info("evict-slow-trend-scheduler failed to confirm candidate: it's not slower than others", zap.Uint64("store-id", store.GetID()))
+		storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_not_slower").Inc()
+		return
+	}
+
+	storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "add").Inc()
+	log.Info("evict-slow-trend-scheduler captured candidate", zap.Uint64("store-id", store.GetID()))
+	return store
+}
+
+// checkStoresAreUpdated reports whether at least half of the stores (rounded
+// up) count as updated since slowStoreRecordTS. Removed stores, stores that
+// are neither preparing nor serving, and the slow store itself always count;
+// any other store counts when its last heartbeat is not older than
+// slowStoreRecordTS. It returns false for clusters of one store or fewer.
+func checkStoresAreUpdated(cluster sche.SchedulerCluster, slowStoreID uint64, slowStoreRecordTS time.Time) bool {
+	stores := cluster.GetStores()
+	if len(stores) <= 1 {
+		return false
+	}
+	expected := (len(stores) + 1) / 2
+	updatedStores := 0
+	for _, store := range stores {
+		if store.IsRemoved() || !(store.IsPreparing() || store.IsServing()) || store.GetID() == slowStoreID {
+			updatedStores++
+			continue
+		}
+		if slowStoreRecordTS.Compare(store.GetLastHeartbeatTS()) <= 0 {
+			updatedStores++
+		}
+	}
+	storeSlowTrendMiscGauge.WithLabelValues("store", "check_updated_count").Set(float64(updatedStores))
+	storeSlowTrendMiscGauge.WithLabelValues("store", "check_updated_expected").Set(float64(expected))
+	return updatedStores >= expected
+}
+
+// checkStoreSlowerThanOthers reports whether target's CauseValue exceeds that
+// of at least two thirds of all stores (rounded up). Only live stores other
+// than target that have a slow trend with a positive CauseValue are compared.
+// It returns false when target has no slow trend.
+func checkStoreSlowerThanOthers(cluster sche.SchedulerCluster, target *core.StoreInfo) bool {
+	stores := cluster.GetStores()
+	expected := (len(stores)*2 + 1) / 3
+	targetSlowTrend := target.GetSlowTrend()
+	if targetSlowTrend == nil {
+		storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "check_slower_no_data").Inc()
+		return false
+	}
+	slowerThanStoresNum := 0
+	for _, store := range stores {
+		if store.IsRemoved() {
+			continue
+		}
+		if !(store.IsPreparing() || store.IsServing()) {
+			continue
+		}
+		if store.GetID() == target.GetID() {
+			continue
+		}
+		slowTrend := store.GetSlowTrend()
+		// Use `SlowTrend.ResultValue` at first, but not good, `CauseValue` is better
+		// Greater `CauseValue` means slower
+		if slowTrend != nil && (targetSlowTrend.CauseValue-slowTrend.CauseValue) > alterEpsilon && slowTrend.CauseValue > alterEpsilon {
+			slowerThanStoresNum++
+		}
+	}
+	storeSlowTrendMiscGauge.WithLabelValues("store", "check_slower_count").Set(float64(slowerThanStoresNum))
+	storeSlowTrendMiscGauge.WithLabelValues("store", "check_slower_expected").Set(float64(expected))
+	return slowerThanStoresNum >= expected
+}
+
+// checkStoreCanRecover reports whether the evicted store may be released.
+// It currently only checks that the store is faster than the others.
+func checkStoreCanRecover(cluster sche.SchedulerCluster, target *core.StoreInfo) bool {
+	/*
+		//
+		// This might not be necessary,
+		//   and it also have tiny chances to cause `stuck in evicted`
+		//   status when this store restarted,
+		//   the `become fast` might be ignore on tikv side
+		//   because of the detecting windows are not fully filled yet.
+		// Hence, we disabled this event capturing by now but keep the code here for further checking.
+		//
+
+		// Wait for the evicted store's `become fast` event
+		slowTrend := target.GetSlowTrend()
+		if slowTrend == nil || slowTrend.CauseRate >= 0 && slowTrend.ResultRate <= 0 {
+			storeSlowTrendActionStatusGauge.WithLabelValues("recover.reject:no-event").Inc()
+			return false
+		} else {
+			storeSlowTrendActionStatusGauge.WithLabelValues("recover.judging:got-event").Inc()
+		}
+	*/
+	return checkStoreFasterThanOthers(cluster, target)
+}
+
+// checkStoreFasterThanOthers reports whether target's CauseValue is at most
+// 1.1x that of at least half of all stores (rounded up). Only live stores
+// other than target that have a slow trend are compared, and both CauseValues
+// must be positive. It returns false when target has no slow trend.
+func checkStoreFasterThanOthers(cluster sche.SchedulerCluster, target *core.StoreInfo) bool {
+	stores := cluster.GetStores()
+	expected := (len(stores) + 1) / 2
+	targetSlowTrend := target.GetSlowTrend()
+	if targetSlowTrend == nil {
+		storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "check_faster_no_data").Inc()
+		return false
+	}
+	fasterThanStores := 0
+	for _, store := range stores {
+		if store.IsRemoved() {
+			continue
+		}
+		if !(store.IsPreparing() || store.IsServing()) {
+			continue
+		}
+		if store.GetID() == target.GetID() {
+			continue
+		}
+		slowTrend := store.GetSlowTrend()
+		// Greater `CauseValue` means slower
+		if slowTrend != nil && targetSlowTrend.CauseValue <= slowTrend.CauseValue*1.1 &&
+			slowTrend.CauseValue > alterEpsilon && targetSlowTrend.CauseValue > alterEpsilon {
+			fasterThanStores++
+		}
+	}
+	storeSlowTrendMiscGauge.WithLabelValues("store", "check_faster_count").Set(float64(fasterThanStores))
+	storeSlowTrendMiscGauge.WithLabelValues("store", "check_faster_expected").Set(float64(expected))
+	return fasterThanStores >= expected
+}
+
+// DurationSinceAsSecs returns the duration gap since the given startTS, unit: s.
// DurationSinceAsSecs returns the duration gap since the given startTS, unit: s.
//
// The result is truncated to whole seconds. When startTS is not in the past
// (the elapsed duration is zero or negative) the function returns 0: converting
// a negative floating-point value to uint64 is implementation-dependent in Go,
// so the elapsed time is clamped before conversion instead.
func DurationSinceAsSecs(startTS time.Time) uint64 {
	elapsed := time.Since(startTS)
	if elapsed <= 0 {
		return 0
	}
	// Integer division truncates toward zero and avoids a float round-trip.
	return uint64(elapsed / time.Second)
}
createRouter(prefix string, svr *server.Server) *mux.Router { unsafeOperationHandler.GetFailedStoresRemovalStatus, setMethods(http.MethodGet), setAuditBackend(prometheus)) // API to set or unset failpoints +<<<<<<< HEAD failpoint.Inject("enableFailpointAPI", func() { // this function will be named to "func2". It may be used in test registerPrefix(apiRouter, "/fail", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +======= + if enableFailPointAPI { + registerPrefix(apiRouter, "/fail", "FailPoint", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +>>>>>>> 8b8c78a78 (scheduler: add aduit log for scheduler config API and add resp msg for evict-leader (#7674)) // The HTTP handler of failpoint requires the full path to be the failpoint path. r.URL.Path = strings.TrimPrefix(r.URL.Path, prefix+apiPrefix+"/fail") new(failpoint.HttpHandler).ServeHTTP(w, r) diff --git a/server/api/scheduler.go b/server/api/scheduler.go index 9b690a93249..8cbd19f167c 100644 --- a/server/api/scheduler.go +++ b/server/api/scheduler.go @@ -357,7 +357,7 @@ func newSchedulerConfigHandler(svr *server.Server, rd *render.Render) *scheduler } } -func (h *schedulerConfigHandler) GetSchedulerConfig(w http.ResponseWriter, r *http.Request) { +func (h *schedulerConfigHandler) HandleSchedulerConfig(w http.ResponseWriter, r *http.Request) { handler := h.svr.GetHandler() sh, err := handler.GetSchedulerConfigHandler() if err == nil && sh != nil { diff --git a/server/api/server_test.go b/server/api/server_test.go index bdf929b17b3..6471e344414 100644 --- a/server/api/server_test.go +++ b/server/api/server_test.go @@ -172,9 +172,18 @@ func (suite *serviceTestSuite) TestServiceLabels() { suite.Equal("Profile", serviceLabel) accessPaths = suite.svr.GetServiceLabels("GetSchedulerConfig") +<<<<<<< HEAD suite.Len(accessPaths, 1) suite.Equal("/pd/api/v1/scheduler-config", accessPaths[0].Path) suite.Equal("", accessPaths[0].Method) +======= + re.Len(accessPaths, 1) + 
re.Equal("/pd/api/v1/scheduler-config", accessPaths[0].Path) + re.Equal("GET", accessPaths[0].Method) + accessPaths = suite.svr.GetServiceLabels("HandleSchedulerConfig") + re.Len(accessPaths, 4) + re.Equal("/pd/api/v1/scheduler-config", accessPaths[0].Path) +>>>>>>> 8b8c78a78 (scheduler: add aduit log for scheduler config API and add resp msg for evict-leader (#7674)) accessPaths = suite.svr.GetServiceLabels("ResignLeader") suite.Len(accessPaths, 1) diff --git a/server/schedulers/evict_leader.go b/server/schedulers/evict_leader.go index 716b848b402..cbefceeb020 100644 --- a/server/schedulers/evict_leader.go +++ b/server/schedulers/evict_leader.go @@ -397,7 +397,7 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - handler.rd.JSON(w, http.StatusOK, nil) + handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.") } func (handler *evictLeaderHandler) ListConfig(w http.ResponseWriter, r *http.Request) { diff --git a/server/schedulers/grant_leader.go b/server/schedulers/grant_leader.go index bac48675323..18be38f16a2 100644 --- a/server/schedulers/grant_leader.go +++ b/server/schedulers/grant_leader.go @@ -301,7 +301,7 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - handler.rd.JSON(w, http.StatusOK, nil) + handler.rd.JSON(w, http.StatusOK, "The scheduler has been applied to the store.") } func (handler *grantLeaderHandler) ListConfig(w http.ResponseWriter, r *http.Request) { diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go new file mode 100644 index 00000000000..ab96d430523 --- /dev/null +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -0,0 +1,716 @@ +// Copyright 2019 TiKV Project Authors. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler_test + +import ( + "encoding/json" + "fmt" + "reflect" + "strings" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/spf13/cobra" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/core" + sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/slice" + "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/pkg/versioninfo" + pdTests "github.com/tikv/pd/tests" + ctl "github.com/tikv/pd/tools/pd-ctl/pdctl" + "github.com/tikv/pd/tools/pd-ctl/tests" +) + +type schedulerTestSuite struct { + suite.Suite + env *pdTests.SchedulingTestEnvironment + defaultSchedulers []string +} + +func TestSchedulerTestSuite(t *testing.T) { + suite.Run(t, new(schedulerTestSuite)) +} + +func (suite *schedulerTestSuite) SetupSuite() { + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/skipStoreConfigSync", `return(true)`)) + suite.env = pdTests.NewSchedulingTestEnvironment(suite.T()) + suite.defaultSchedulers = []string{ + "balance-leader-scheduler", + "balance-region-scheduler", + "balance-hot-region-scheduler", + "balance-witness-scheduler", + "transfer-witness-leader-scheduler", + "evict-slow-store-scheduler", + } +} + +func (suite *schedulerTestSuite) TearDownSuite() { + re := suite.Require() + suite.env.Cleanup() + 
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/skipStoreConfigSync")) +} + +func (suite *schedulerTestSuite) TearDownTest() { + cleanFunc := func(cluster *pdTests.TestCluster) { + re := suite.Require() + pdAddr := cluster.GetConfig().GetClientURL() + cmd := ctl.GetRootCmd() + + var currentSchedulers []string + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, ¤tSchedulers) + for _, scheduler := range suite.defaultSchedulers { + if slice.NoneOf(currentSchedulers, func(i int) bool { + return currentSchedulers[i] == scheduler + }) { + echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", scheduler}, nil) + re.Contains(echo, "Success!") + } + } + for _, scheduler := range currentSchedulers { + if slice.NoneOf(suite.defaultSchedulers, func(i int) bool { + return suite.defaultSchedulers[i] == scheduler + }) { + echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", scheduler}, nil) + re.Contains(echo, "Success!") + } + } + } + suite.env.RunFuncInTwoModes(cleanFunc) +} + +func (suite *schedulerTestSuite) TestScheduler() { + suite.env.RunTestInTwoModes(suite.checkScheduler) +} + +func (suite *schedulerTestSuite) checkScheduler(cluster *pdTests.TestCluster) { + re := suite.Require() + pdAddr := cluster.GetConfig().GetClientURL() + cmd := ctl.GetRootCmd() + + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 3, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 4, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + } + + mustUsage := func(args []string) { + output, err := tests.ExecuteCommand(cmd, args...) 
+ re.NoError(err) + re.Contains(string(output), "Usage") + } + + checkSchedulerCommand := func(args []string, expected map[string]bool) { + if args != nil { + echo := mustExec(re, cmd, args, nil) + re.Contains(echo, "Success!") + } + testutil.Eventually(re, func() bool { + var schedulers []string + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, &schedulers) + if len(schedulers) != len(expected) { + return false + } + for _, scheduler := range schedulers { + if _, ok := expected[scheduler]; !ok { + return false + } + } + return true + }) + } + + checkSchedulerConfigCommand := func(expectedConfig map[string]interface{}, schedulerName string) { + testutil.Eventually(re, func() bool { + configInfo := make(map[string]interface{}) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName}, &configInfo) + return reflect.DeepEqual(expectedConfig, configInfo) + }) + } + + leaderServer := cluster.GetLeaderServer() + for _, store := range stores { + pdTests.MustPutStore(re, cluster, store) + } + + // note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region. 
+ pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10)) + + // scheduler show command + expected := map[string]bool{ + "balance-region-scheduler": true, + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(nil, expected) + + // scheduler delete command + args := []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(args, expected) + + // avoid the influence of the scheduler order + schedulers := []string{"evict-leader-scheduler", "grant-leader-scheduler", "evict-leader-scheduler", "grant-leader-scheduler"} + + checkStorePause := func(changedStores []uint64, schedulerName string) { + status := func() string { + switch schedulerName { + case "evict-leader-scheduler": + return "paused" + case "grant-leader-scheduler": + return "resumed" + default: + re.Fail(fmt.Sprintf("unknown scheduler %s", schedulerName)) + return "" + } + }() + for _, store := range stores { + isStorePaused := !cluster.GetLeaderServer().GetRaftCluster().GetStore(store.GetId()).AllowLeaderTransfer() + if slice.AnyOf(changedStores, func(i int) bool { + return store.GetId() == changedStores[i] + }) { + re.True(isStorePaused, + fmt.Sprintf("store %d should be %s with %s", store.GetId(), status, schedulerName)) + } else { + re.False(isStorePaused, + fmt.Sprintf("store %d should not be %s with %s", store.GetId(), status, schedulerName)) + } + if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { + re.Equal(isStorePaused, !sche.GetCluster().GetStore(store.GetId()).AllowLeaderTransfer()) + } + } + } + + for idx := 
range schedulers { + checkStorePause([]uint64{}, schedulers[idx]) + // scheduler add command + args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "2"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + schedulers[idx]: true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(args, expected) + + // scheduler config show command + expectedConfig := make(map[string]interface{}) + expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} + checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2}, schedulers[idx]) + + // scheduler config update command + args = []string{"-u", pdAddr, "scheduler", "config", schedulers[idx], "add-store", "3"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + schedulers[idx]: true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + + // check update success + checkSchedulerCommand(args, expected) + expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "3": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} + checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2, 3}, schedulers[idx]) + + // scheduler delete command + args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx]} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(args, expected) + checkStorePause([]uint64{}, schedulers[idx]) + + 
// scheduler add command + args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "2"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + schedulers[idx]: true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(args, expected) + checkStorePause([]uint64{2}, schedulers[idx]) + + // scheduler add command twice + args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "4"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + schedulers[idx]: true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(args, expected) + + // check add success + expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "4": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} + checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2, 4}, schedulers[idx]) + + // scheduler remove command [old] + args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx] + "-4"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + schedulers[idx]: true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(args, expected) + + // check remove success + expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} + checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2}, schedulers[idx]) + + // scheduler remove command, when remove the last store, it should remove whole scheduler + args = 
[]string{"-u", pdAddr, "scheduler", "remove", schedulers[idx] + "-2"} + expected = map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + } + checkSchedulerCommand(args, expected) + checkStorePause([]uint64{}, schedulers[idx]) + } + + // test shuffle region config + checkSchedulerCommand([]string{"-u", pdAddr, "scheduler", "add", "shuffle-region-scheduler"}, map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "shuffle-region-scheduler": true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + }) + var roles []string + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) + re.Equal([]string{"leader", "follower", "learner"}, roles) + echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "set-roles", "learner"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) + return reflect.DeepEqual([]string{"learner"}, roles) + }) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler"}, &roles) + re.Equal([]string{"learner"}, roles) + + // test grant hot region scheduler config + checkSchedulerCommand([]string{"-u", pdAddr, "scheduler", "add", "grant-hot-region-scheduler", "1", "1,2,3"}, map[string]bool{ + "balance-leader-scheduler": true, + "balance-hot-region-scheduler": true, + "shuffle-region-scheduler": true, + "grant-hot-region-scheduler": true, + "transfer-witness-leader-scheduler": true, + "balance-witness-scheduler": true, + "evict-slow-store-scheduler": true, + }) + var conf3 map[string]interface{} + expected3 := 
map[string]interface{}{ + "store-id": []interface{}{float64(1), float64(2), float64(3)}, + "store-leader-id": float64(1), + } + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) + re.Equal(expected3, conf3) + + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler", "set", "2", "1,2,3"}, nil) + re.Contains(echo, "Success!") + expected3["store-leader-id"] = float64(2) + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) + return reflect.DeepEqual(expected3, conf3) + }) + + // test remove and add scheduler + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) + re.NotContains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}, nil) + re.Contains(echo, "Success! The scheduler is created.") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "2"}, nil) + re.Contains(echo, "Success! 
The scheduler has been applied to the store.") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-2"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) + re.Contains(echo, "404") + testutil.Eventually(re, func() bool { // wait for removed scheduler to be synced to scheduling server. + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler"}, nil) + return strings.Contains(echo, "[404] scheduler not found") + }) + + // test hot region config + expected1 := map[string]interface{}{ + "min-hot-byte-rate": float64(100), + "min-hot-key-rate": float64(10), + "min-hot-query-rate": float64(10), + "max-zombie-rounds": float64(3), + "max-peer-number": float64(1000), + "byte-rate-rank-step-ratio": 0.05, + "key-rate-rank-step-ratio": 0.05, + "query-rate-rank-step-ratio": 0.05, + "count-rank-step-ratio": 0.01, + "great-dec-ratio": 0.95, + "minor-dec-ratio": 0.99, + "src-tolerance-ratio": 1.05, + "dst-tolerance-ratio": 1.05, + "read-priorities": []interface{}{"byte", "key"}, + "write-leader-priorities": []interface{}{"key", "byte"}, + "write-peer-priorities": []interface{}{"byte", "key"}, + "strict-picking-store": "true", + "enable-for-tiflash": "true", + "rank-formula-version": "v2", + "split-thresholds": 0.2, + } + checkHotSchedulerConfig := func(expect map[string]interface{}) { + testutil.Eventually(re, func() bool { + var conf1 map[string]interface{} + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + return reflect.DeepEqual(expect, conf1) + }) + } + + var conf map[string]interface{} + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "list"}, &conf) + 
re.Equal(expected1, conf) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "show"}, &conf) + re.Equal(expected1, conf) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "src-tolerance-ratio", "1.02"}, nil) + re.Contains(echo, "Success!") + expected1["src-tolerance-ratio"] = 1.02 + checkHotSchedulerConfig(expected1) + + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,key"}, nil) + re.Contains(echo, "Success!") + expected1["read-priorities"] = []interface{}{"byte", "key"} + checkHotSchedulerConfig(expected1) + + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key"}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,byte"}, nil) + re.Contains(echo, "Success!") + expected1["read-priorities"] = []interface{}{"key", "byte"} + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "foo,bar"}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", ""}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key"}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,byte"}, nil) + 
re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key,byte"}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + + // write-priorities is divided into write-leader-priorities and write-peer-priorities + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil) + re.Contains(echo, "Failed!") + re.Contains(echo, "Config item is not found.") + checkHotSchedulerConfig(expected1) + + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v0"}, nil) + re.Contains(echo, "Failed!") + checkHotSchedulerConfig(expected1) + expected1["rank-formula-version"] = "v2" + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v2"}, nil) + re.Contains(echo, "Success!") + checkHotSchedulerConfig(expected1) + expected1["rank-formula-version"] = "v1" + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v1"}, nil) + re.Contains(echo, "Success!") + checkHotSchedulerConfig(expected1) + + expected1["forbid-rw-type"] = "read" + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "forbid-rw-type", "read"}, nil) + re.Contains(echo, "Success!") + checkHotSchedulerConfig(expected1) + + // test compatibility + re.Equal("2.0.0", leaderServer.GetClusterVersion().String()) + for _, store := range stores { + version := versioninfo.HotScheduleWithQuery + store.Version = versioninfo.MinSupportedVersion(version).String() + store.LastHeartbeat = time.Now().UnixNano() + pdTests.MustPutStore(re, cluster, store) + } + re.Equal("5.2.0", 
leaderServer.GetClusterVersion().String()) + // After upgrading, we should not use query. + checkHotSchedulerConfig(expected1) + // cannot set qps as write-peer-priorities + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil) + re.Contains(echo, "query is not allowed to be set in priorities for write-peer-priorities") + checkHotSchedulerConfig(expected1) + + // test remove and add + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil) + re.Contains(echo, "Success") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-hot-region-scheduler"}, nil) + re.Contains(echo, "Success") + + // test balance leader config + conf = make(map[string]interface{}) + conf1 := make(map[string]interface{}) + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "show"}, &conf) + re.Equal(4., conf["batch"]) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "set", "batch", "3"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, &conf1) + return conf1["batch"] == 3. 
+ }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil) + re.NotContains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-leader-scheduler"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-leader-scheduler"}, nil) + re.Contains(echo, "404") + re.Contains(echo, "PD:scheduler:ErrSchedulerNotFound]scheduler not found") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, nil) + re.Contains(echo, "404") + re.Contains(echo, "scheduler not found") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil) + re.Contains(echo, "Success!") + + // test evict-slow-store && evict-slow-trend schedulers config + evictSlownessSchedulers := []string{"evict-slow-store-scheduler", "evict-slow-trend-scheduler"} + for _, schedulerName := range evictSlownessSchedulers { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", schedulerName}, nil) + if strings.Contains(echo, "Success!") { + re.Contains(echo, "Success!") + } else { + re.Contains(echo, "scheduler existed") + } + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return strings.Contains(echo, schedulerName) + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "set", "recovery-duration", "100"}, nil) + re.Contains(echo, "Success! Config updated.") + conf = make(map[string]interface{}) + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "show"}, &conf) + return conf["recovery-duration"] == 100. 
+ }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", schedulerName}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return !strings.Contains(echo, schedulerName) + }) + } + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-slow-store-scheduler"}, nil) + re.Contains(echo, "Success!") + + // test shuffle hot region scheduler + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "shuffle-hot-region-scheduler"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return strings.Contains(echo, "shuffle-hot-region-scheduler") + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-hot-region-scheduler", "set", "limit", "127"}, nil) + re.Contains(echo, "Success!") + conf = make(map[string]interface{}) + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-hot-region-scheduler", "show"}, &conf) + return conf["limit"] == 127. + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "shuffle-hot-region-scheduler"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return !strings.Contains(echo, "shuffle-hot-region-scheduler") + }) + + // test show scheduler with paused and disabled status. 
+ checkSchedulerWithStatusCommand := func(status string, expected []string) { + testutil.Eventually(re, func() bool { + var schedulers []string + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show", "--status", status}, &schedulers) + return reflect.DeepEqual(expected, schedulers) + }) + } + + mustUsage([]string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler"}) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + re.Contains(echo, "Success!") + checkSchedulerWithStatusCommand("paused", []string{ + "balance-leader-scheduler", + }) + result := make(map[string]interface{}) + testutil.Eventually(re, func() bool { + mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", "balance-leader-scheduler"}, &result) + return len(result) != 0 && result["status"] == "paused" && result["summary"] == "" + }, testutil.WithWaitFor(30*time.Second)) + + mustUsage([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler", "60"}) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) + re.Contains(echo, "Success!") + checkSchedulerWithStatusCommand("paused", []string{}) + + // set label scheduler to disabled manually. 
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "label-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+	cfg := leaderServer.GetServer().GetScheduleConfig()
+	origin := cfg.Schedulers
+	cfg.Schedulers = sc.SchedulerConfigs{{Type: "label", Disable: true}}
+	err := leaderServer.GetServer().SetScheduleConfig(*cfg)
+	re.NoError(err)
+	checkSchedulerWithStatusCommand("disabled", []string{"label-scheduler"})
+	// reset Schedulers in ScheduleConfig
+	cfg.Schedulers = origin
+	err = leaderServer.GetServer().SetScheduleConfig(*cfg)
+	re.NoError(err)
+	checkSchedulerWithStatusCommand("disabled", []string{})
+}
+
+// TestSchedulerDiagnostic runs checkSchedulerDiagnostic through
+// suite.env.RunTestInTwoModes.
+func (suite *schedulerTestSuite) TestSchedulerDiagnostic() {
+	suite.env.RunTestInTwoModes(suite.checkSchedulerDiagnostic)
+}
+
+// checkSchedulerDiagnostic drives the pd-ctl `scheduler describe` command
+// against the given test cluster and verifies the reported status and summary
+// in three situations: after diagnostics are enabled, after
+// balance-region-scheduler is removed, and after balance-leader-scheduler is
+// paused and then resumed.
+func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *pdTests.TestCluster) {
+	re := suite.Require()
+	pdAddr := cluster.GetConfig().GetClientURL()
+	cmd := ctl.GetRootCmd()
+
+	// checkSchedulerDescribeCommand repeatedly runs `scheduler describe
+	// <schedulerName>` (50ms tick interval) until the decoded reply is
+	// non-empty and its "status" and "summary" entries equal the expected
+	// values. mightExec is used so that output which does not decode as JSON
+	// does not fail the test; the condition is simply evaluated again on the
+	// next tick.
+	checkSchedulerDescribeCommand := func(schedulerName, expectedStatus, expectedSummary string) {
+		// result is allocated once and reused by every poll.
+		// NOTE(review): json.Unmarshal keeps existing entries when decoding
+		// into a non-nil map, so keys from an earlier poll persist unless a
+		// later reply overwrites them.
+		result := make(map[string]interface{})
+		testutil.Eventually(re, func() bool {
+			mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", schedulerName}, &result)
+			return len(result) != 0 && expectedStatus == result["status"] && expectedSummary == result["summary"]
+		}, testutil.WithTickInterval(50*time.Millisecond))
+	}
+
+	// Register four stores (ids 1-4), all in the Up state with the current
+	// time as their last heartbeat.
+	stores := []*metapb.Store{
+		{
+			Id:            1,
+			State:         metapb.StoreState_Up,
+			LastHeartbeat: time.Now().UnixNano(),
+		},
+		{
+			Id:            2,
+			State:         metapb.StoreState_Up,
+			LastHeartbeat: time.Now().UnixNano(),
+		},
+		{
+			Id:            3,
+			State:         metapb.StoreState_Up,
+			LastHeartbeat: time.Now().UnixNano(),
+		},
+		{
+			Id:            4,
+			State:         metapb.StoreState_Up,
+			LastHeartbeat: time.Now().UnixNano(),
+		},
+	}
+	for _, store := range stores {
+		pdTests.MustPutStore(re, cluster, store)
+	}
+
+	// note: because pdqsort is an unstable sort algorithm, set ApproximateSize for this region.
+	pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10))
+
+	// Turn the diagnostic feature on, then expect balance-region-scheduler to
+	// be described as "pending" with the summary below.
+	echo := mustExec(re, cmd, []string{"-u", pdAddr, "config", "set", "enable-diagnostic", "true"}, nil)
+	re.Contains(echo, "Success!")
+	checkSchedulerDescribeCommand("balance-region-scheduler", "pending", "1 store(s) RegionNotMatchRule; ")
+
+	// scheduler delete command
+	// Once removed, the scheduler is expected to be described as "disabled"
+	// with an empty summary.
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+	checkSchedulerDescribeCommand("balance-region-scheduler", "disabled", "")
+
+	// Pause balance-leader-scheduler and resume it right away; afterwards it
+	// is expected to be described as "normal" with an empty summary.
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil)
+	re.Contains(echo, "Success!")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+	checkSchedulerDescribeCommand("balance-leader-scheduler", "normal", "")
+}
+
+// mustExec runs cmd with args through tests.ExecuteCommand and requires the
+// execution to succeed.
+//
+// When v is nil the raw command output is returned as a string. Otherwise the
+// output is JSON-decoded into v, the test fails if decoding fails (the raw
+// output is attached as the failure message), and the empty string is
+// returned.
+func mustExec(re *require.Assertions, cmd *cobra.Command, args []string, v interface{}) string {
+	output, err := tests.ExecuteCommand(cmd, args...)
+	re.NoError(err)
+	if v == nil {
+		return string(output)
+	}
+	re.NoError(json.Unmarshal(output, v), string(output))
+	return ""
+}
+
+// mightExec is the lenient counterpart of mustExec: executing cmd with args
+// must still succeed, but when v is non-nil the output is decoded into v on a
+// best-effort basis. It returns nothing; callers inspect v themselves.
+func mightExec(re *require.Assertions, cmd *cobra.Command, args []string, v interface{}) {
+	output, err := tests.ExecuteCommand(cmd, args...)
+	re.NoError(err)
+	if v == nil {
+		return
+	}
+	// The decode error is discarded: output that is not valid JSON for v is
+	// not treated as a test failure here.
+	json.Unmarshal(output, v)
+}