diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 0b2ad56a0e4..d488e4fe4e3 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -45,6 +45,7 @@ import ( "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/hbstream" + "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/apiutil" @@ -162,6 +163,7 @@ func (s *Server) updateAPIServerMemberLoop() { members, err := s.GetClient().MemberList(ctx) if err != nil { log.Warn("failed to list members", errs.ZapError(err)) + continue } for _, ep := range members.Members { status, err := s.GetClient().Status(ctx, ep.ClientURLs[0]) @@ -438,6 +440,7 @@ func CreateServer(ctx context.Context, cfg *config.Config) *Server { // CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server func CreateServerWrapper(cmd *cobra.Command, args []string) { + schedulers.Register() cmd.Flags().Parse(args) cfg := config.NewConfig() flagSet := cmd.Flags() diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 0e6c2281f2f..7e21919b214 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -379,7 +379,7 @@ func (c *Coordinator) Run() { } } log.Info("Coordinator starts to run schedulers") - c.initSchedulers() + c.InitSchedulers(true) c.wg.Add(4) // Starts to patrol regions. @@ -391,7 +391,8 @@ func (c *Coordinator) Run() { go c.driveSlowNodeScheduler() } -func (c *Coordinator) initSchedulers() { +// InitSchedulers initializes schedulers. +func (c *Coordinator) InitSchedulers(needRun bool) { var ( scheduleNames []string configs []string @@ -401,7 +402,7 @@ func (c *Coordinator) initSchedulers() { scheduleNames, configs, err = c.cluster.GetStorage().LoadAllScheduleConfig() select { case <-c.ctx.Done(): - log.Info("Coordinator stops running") + log.Info("init schedulers has been stopped") return default: } @@ -439,8 +440,10 @@ func (c *Coordinator) initSchedulers() { continue } log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName())) - if err = c.schedulers.AddScheduler(s); err != nil { - log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) + if needRun { + if err = c.schedulers.AddScheduler(s); err != nil { + log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err)) + } } } @@ -461,12 +464,14 @@ func (c *Coordinator) initSchedulers() { } log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args)) - if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { - log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) - } else { - // Only records the valid scheduler config. - scheduleCfg.Schedulers[k] = schedulerCfg - k++ + if needRun { + if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) { + log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err)) + } else { + // Only records the valid scheduler config. + scheduleCfg.Schedulers[k] = schedulerCfg + k++ + } } } diff --git a/pkg/statistics/store_collection.go b/pkg/statistics/store_collection.go index 2b695ed2923..74008014ddb 100644 --- a/pkg/statistics/store_collection.go +++ b/pkg/statistics/store_collection.go @@ -61,7 +61,7 @@ func newStoreStatistics(opt *config.PersistOptions) *storeStatistics { } } -func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) { +func (s *storeStatistics) Observe(store *core.StoreInfo) { for _, k := range s.opt.GetLocationLabels() { v := store.GetLabelValue(k) if v == "" { @@ -146,8 +146,12 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) { storeStatusGauge.WithLabelValues(storeAddress, id, "store_slow_trend_result_value").Set(slowTrend.ResultValue) storeStatusGauge.WithLabelValues(storeAddress, id, "store_slow_trend_result_rate").Set(slowTrend.ResultRate) } +} +func (s *storeStatistics) ObserveHotStat(store *core.StoreInfo, stats *StoresStats) { // Store flows. + storeAddress := store.GetAddress() + id := strconv.FormatUint(store.GetID(), 10) storeFlowStats := stats.GetRollingStoreStats(store.GetID()) if storeFlowStats == nil { return @@ -298,8 +302,12 @@ func NewStoreStatisticsMap(opt *config.PersistOptions) *storeStatisticsMap { } } -func (m *storeStatisticsMap) Observe(store *core.StoreInfo, stats *StoresStats) { - m.stats.Observe(store, stats) +func (m *storeStatisticsMap) Observe(store *core.StoreInfo) { + m.stats.Observe(store) +} + +func (m *storeStatisticsMap) ObserveHotStat(store *core.StoreInfo, stats *StoresStats) { + m.stats.ObserveHotStat(store, stats) } func (m *storeStatisticsMap) Collect() { diff --git a/pkg/statistics/store_collection_test.go b/pkg/statistics/store_collection_test.go index 229339cb4c4..74ef12c54a9 100644 --- a/pkg/statistics/store_collection_test.go +++ b/pkg/statistics/store_collection_test.go @@ -67,7 +67,8 @@ func TestStoreStatistics(t *testing.T) { stores[5] = store5 storeStats := NewStoreStatisticsMap(opt) for _, store := range stores { - storeStats.Observe(store, storesStats) + storeStats.Observe(store) + storeStats.ObserveHotStat(store, storesStats) } stats := storeStats.stats diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 36bec2fc1c1..dc07b260d87 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -143,11 +143,12 @@ type RaftCluster struct { etcdClient *clientv3.Client httpClient *http.Client - running bool - meta *metapb.Cluster - storage storage.Storage - minResolvedTS uint64 - externalTS uint64 + running bool + isAPIServiceMode bool + meta *metapb.Cluster + storage storage.Storage + minResolvedTS uint64 + externalTS uint64 // Keep the previous store limit settings when removing a store. prevStoreLimit map[uint64]map[storelimit.Type]float64 @@ -287,6 +288,7 @@ func (c *RaftCluster) Start(s Server) error { return nil } + c.isAPIServiceMode = s.IsAPIServiceMode() c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster(), s.GetKeyspaceGroupManager()) cluster, err := c.LoadClusterInfo() if err != nil { @@ -312,6 +314,7 @@ func (c *RaftCluster) Start(s Server) error { if err != nil { return err } + c.coordinator = schedule.NewCoordinator(c.ctx, cluster, s.GetHBStreams()) c.regionStats = statistics.NewRegionStatistics(c.core, c.opt, c.ruleManager) c.limiter = NewStoreLimiter(s.GetPersistOptions()) @@ -320,20 +323,23 @@ func (c *RaftCluster) Start(s Server) error { log.Error("load external timestamp meets error", zap.Error(err)) } - // bootstrap keyspace group manager after starting other parts successfully. - // This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster. if s.IsAPIServiceMode() { + // bootstrap keyspace group manager after starting other parts successfully. + // This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster. err = c.keyspaceGroupManager.Bootstrap(c.ctx) if err != nil { return err } + c.initSchedulers() + } else { + c.wg.Add(3) + go c.runCoordinator() + go c.runStatsBackgroundJobs() + go c.runMetricsCollectionJob() } - c.wg.Add(10) - go c.runCoordinator() - go c.runMetricsCollectionJob() + c.wg.Add(7) go c.runNodeStateCheckJob() - go c.runStatsBackgroundJobs() go c.syncRegions() go c.runReplicationMode() go c.runMinResolvedTSJob() @@ -588,10 +594,12 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) { zap.Int("count", c.core.GetTotalRegionCount()), zap.Duration("cost", time.Since(start)), ) - for _, store := range c.GetStores() { - storeID := store.GetID() - c.hotStat.GetOrCreateRollingStoreStats(storeID) - c.slowStat.ObserveSlowStoreStatus(storeID, store.IsSlow()) + if !c.isAPIServiceMode { + for _, store := range c.GetStores() { + storeID := store.GetID() + c.hotStat.GetOrCreateRollingStoreStats(storeID) + c.slowStat.ObserveSlowStoreStatus(storeID, store.IsSlow()) + } } return c, nil } @@ -713,7 +721,9 @@ func (c *RaftCluster) Stop() { return } c.running = false - c.coordinator.Stop() + if !c.isAPIServiceMode { + c.coordinator.Stop() + } c.cancel() c.Unlock() @@ -839,6 +849,10 @@ func (c *RaftCluster) GetOpts() sc.ConfProvider { return c.opt } +func (c *RaftCluster) initSchedulers() { + c.coordinator.InitSchedulers(false) +} + // GetScheduleConfig returns scheduling configurations. func (c *RaftCluster) GetScheduleConfig() *sc.ScheduleConfig { return c.opt.GetScheduleConfig() @@ -934,11 +948,15 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest nowTime := time.Now() var newStore *core.StoreInfo // If this cluster has slow stores, we should awaken hibernated regions in other stores. - if needAwaken, slowStoreIDs := c.NeedAwakenAllRegionsInStore(storeID); needAwaken { - log.Info("forcely awaken hibernated regions", zap.Uint64("store-id", storeID), zap.Uint64s("slow-stores", slowStoreIDs)) - newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), core.SetLastAwakenTime(nowTime), opt) - resp.AwakenRegions = &pdpb.AwakenRegions{ - AbnormalStores: slowStoreIDs, + if !c.isAPIServiceMode { + if needAwaken, slowStoreIDs := c.NeedAwakenAllRegionsInStore(storeID); needAwaken { + log.Info("forcely awaken hibernated regions", zap.Uint64("store-id", storeID), zap.Uint64s("slow-stores", slowStoreIDs)) + newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), core.SetLastAwakenTime(nowTime), opt) + resp.AwakenRegions = &pdpb.AwakenRegions{ + AbnormalStores: slowStoreIDs, + } + } else { + newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), opt) } } else { newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), opt) @@ -961,41 +979,47 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest statistics.UpdateStoreHeartbeatMetrics(store) } c.core.PutStore(newStore) - c.hotStat.Observe(storeID, newStore.GetStoreStats()) - c.hotStat.FilterUnhealthyStore(c) - c.slowStat.ObserveSlowStoreStatus(storeID, newStore.IsSlow()) - reportInterval := stats.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - - regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats())) - for _, peerStat := range stats.GetPeerStats() { - regionID := peerStat.GetRegionId() - region := c.GetRegion(regionID) - regions[regionID] = region - if region == nil { - log.Warn("discard hot peer stat for unknown region", - zap.Uint64("region-id", regionID), - zap.Uint64("store-id", storeID)) - continue - } - peer := region.GetStorePeer(storeID) - if peer == nil { - log.Warn("discard hot peer stat for unknown region peer", - zap.Uint64("region-id", regionID), - zap.Uint64("store-id", storeID)) - continue - } - readQueryNum := core.GetReadQueryNum(peerStat.GetQueryStats()) - loads := []float64{ - utils.RegionReadBytes: float64(peerStat.GetReadBytes()), - utils.RegionReadKeys: float64(peerStat.GetReadKeys()), - utils.RegionReadQueryNum: float64(readQueryNum), - utils.RegionWriteBytes: 0, - utils.RegionWriteKeys: 0, - utils.RegionWriteQueryNum: 0, + var ( + regions map[uint64]*core.RegionInfo + interval uint64 + ) + if !c.isAPIServiceMode { + c.hotStat.Observe(storeID, newStore.GetStoreStats()) + c.hotStat.FilterUnhealthyStore(c) + c.slowStat.ObserveSlowStoreStatus(storeID, newStore.IsSlow()) + reportInterval := stats.GetInterval() + interval = reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + + regions = make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats())) + for _, peerStat := range stats.GetPeerStats() { + regionID := peerStat.GetRegionId() + region := c.GetRegion(regionID) + regions[regionID] = region + if region == nil { + log.Warn("discard hot peer stat for unknown region", + zap.Uint64("region-id", regionID), + zap.Uint64("store-id", storeID)) + continue + } + peer := region.GetStorePeer(storeID) + if peer == nil { + log.Warn("discard hot peer stat for unknown region peer", + zap.Uint64("region-id", regionID), + zap.Uint64("store-id", storeID)) + continue + } + readQueryNum := core.GetReadQueryNum(peerStat.GetQueryStats()) + loads := []float64{ + utils.RegionReadBytes: float64(peerStat.GetReadBytes()), + utils.RegionReadKeys: float64(peerStat.GetReadKeys()), + utils.RegionReadQueryNum: float64(readQueryNum), + utils.RegionWriteBytes: 0, + utils.RegionWriteKeys: 0, + utils.RegionWriteQueryNum: 0, + } + peerInfo := core.NewPeerInfo(peer, loads, interval) + c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) } - peerInfo := core.NewPeerInfo(peer, loads, interval) - c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) } for _, stat := range stats.GetSnapshotStats() { // the duration of snapshot is the sum between to send and generate snapshot. @@ -1015,8 +1039,10 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest e := int64(dur)*2 - int64(stat.GetTotalDurationSec()) store.Feedback(float64(e)) } - // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. - c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval)) + if !c.isAPIServiceMode { + // Here we will compare the reported regions with the previous hot peers to decide if it is still hot. + c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval)) + } return nil } @@ -1064,22 +1090,24 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) - c.hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) - c.hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval) - c.hotStat.CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region)) + if !c.isAPIServiceMode { + c.hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) + c.hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + for _, peer := range region.GetPeers() { + peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval) + c.hotStat.CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region)) + } + c.coordinator.GetSchedulersController().CheckTransferWitnessLeader(region) } - c.coordinator.GetSchedulersController().CheckTransferWitnessLeader(region) hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. // Mark isNew if the region in cache does not have leader. isNew, saveKV, saveCache, needSync := regionGuide(region, origin) - if !saveKV && !saveCache && !isNew { + if !c.isAPIServiceMode && !saveKV && !saveCache && !isNew { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { @@ -1106,23 +1134,25 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } for _, item := range overlaps { - if c.regionStats != nil { - c.regionStats.ClearDefunctRegion(item.GetID()) + if !c.isAPIServiceMode { + if c.regionStats != nil { + c.regionStats.ClearDefunctRegion(item.GetID()) + } + c.labelLevelStats.ClearDefunctRegion(item.GetID()) } - c.labelLevelStats.ClearDefunctRegion(item.GetID()) c.ruleManager.InvalidCache(item.GetID()) } regionUpdateCacheEventCounter.Inc() } - if hasRegionStats { - c.regionStats.Observe(region, c.getRegionStoresLocked(region)) + if !c.isAPIServiceMode { + if hasRegionStats { + c.regionStats.Observe(region, c.getRegionStoresLocked(region)) + } } - if !c.IsPrepared() && isNew { c.coordinator.GetPrepareChecker().Collect(region) } - if c.storage != nil { // If there are concurrent heartbeats from the same region, the last write will win even if // writes to storage in the critical area. So don't use mutex to protect it. @@ -1598,8 +1628,10 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error { delete(c.prevStoreLimit, storeID) c.RemoveStoreLimit(storeID) c.resetProgress(storeID, store.GetAddress()) - c.hotStat.RemoveRollingStoreStats(storeID) - c.slowStat.RemoveSlowStoreStatus(storeID) + if !c.isAPIServiceMode { + c.hotStat.RemoveRollingStoreStats(storeID) + c.slowStat.RemoveSlowStoreStatus(storeID) + } } return err } @@ -1773,8 +1805,10 @@ func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error { } } c.core.PutStore(store) - c.hotStat.GetOrCreateRollingStoreStats(store.GetID()) - c.slowStat.ObserveSlowStoreStatus(store.GetID(), store.IsSlow()) + if !c.isAPIServiceMode { + c.hotStat.GetOrCreateRollingStoreStats(store.GetID()) + c.slowStat.ObserveSlowStoreStatus(store.GetID(), store.IsSlow()) + } return nil } @@ -2124,13 +2158,18 @@ func (c *RaftCluster) collectMetrics() { statsMap := statistics.NewStoreStatisticsMap(c.opt) stores := c.GetStores() for _, s := range stores { - statsMap.Observe(s, c.hotStat.StoresStats) + statsMap.Observe(s) + if !c.isAPIServiceMode { + statsMap.ObserveHotStat(s, c.hotStat.StoresStats) + } } statsMap.Collect() - c.coordinator.GetSchedulersController().CollectSchedulerMetrics() - c.coordinator.CollectHotSpotMetrics() - c.collectClusterMetrics() + if !c.isAPIServiceMode { + c.coordinator.GetSchedulersController().CollectSchedulerMetrics() + c.coordinator.CollectHotSpotMetrics() + c.collectClusterMetrics() + } c.collectHealthStatus() } @@ -2138,9 +2177,11 @@ func (c *RaftCluster) resetMetrics() { statsMap := statistics.NewStoreStatisticsMap(c.opt) statsMap.Reset() - c.coordinator.GetSchedulersController().ResetSchedulerMetrics() - c.coordinator.ResetHotSpotMetrics() - c.resetClusterMetrics() + if !c.isAPIServiceMode { + c.coordinator.GetSchedulersController().ResetSchedulerMetrics() + c.coordinator.ResetHotSpotMetrics() + c.resetClusterMetrics() + } c.resetHealthStatus() c.resetProgressIndicator() } diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 82113bc1656..c1da97363b5 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -250,6 +250,8 @@ func (c *RaftCluster) HandleReportBuckets(b *metapb.Buckets) error { if err := c.processReportBuckets(b); err != nil { return err } - c.hotStat.CheckAsync(buckets.NewCheckPeerTask(b)) + if !c.isAPIServiceMode { + c.hotStat.CheckAsync(buckets.NewCheckPeerTask(b)) + } return nil } diff --git a/server/handler.go b/server/handler.go index 02ec6da4808..2e4b88b20e2 100644 --- a/server/handler.go +++ b/server/handler.go @@ -236,14 +236,20 @@ func (h *Handler) AddScheduler(name string, args ...string) error { return err } log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args)) - if err = c.AddScheduler(s, args...); err != nil { - log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err)) - } else if err = h.opt.Persist(c.GetStorage()); err != nil { - log.Error("can not persist scheduler config", errs.ZapError(err)) + if !h.s.IsAPIServiceMode() { + if err = c.AddScheduler(s, args...); err != nil { + log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err)) + return err + } } else { - log.Info("add scheduler successfully", zap.String("scheduler-name", name), zap.Strings("scheduler-args", args)) + c.GetSchedulerConfig().AddSchedulerCfg(s.GetType(), args) } - return err + if err = h.opt.Persist(c.GetStorage()); err != nil { + log.Error("can not persist scheduler config", errs.ZapError(err)) + return err + } + log.Info("add scheduler successfully", zap.String("scheduler-name", name), zap.Strings("scheduler-args", args)) + return nil } // RemoveScheduler removes a scheduler by name. @@ -252,10 +258,24 @@ func (h *Handler) RemoveScheduler(name string) error { if err != nil { return err } - if err = c.RemoveScheduler(name); err != nil { - log.Error("can not remove scheduler", zap.String("scheduler-name", name), errs.ZapError(err)) + if !h.s.IsAPIServiceMode() { + if err = c.RemoveScheduler(name); err != nil { + log.Error("can not remove scheduler", zap.String("scheduler-name", name), errs.ZapError(err)) + } else { + log.Info("remove scheduler successfully", zap.String("scheduler-name", name)) + } } else { - log.Info("remove scheduler successfully", zap.String("scheduler-name", name)) + conf := c.GetSchedulerConfig() + c.GetSchedulerConfig().RemoveSchedulerCfg(schedulers.FindSchedulerTypeByName(name)) + if err := conf.Persist(c.GetStorage()); err != nil { + log.Error("the option can not persist scheduler config", errs.ZapError(err)) + return err + } + + if err := c.GetStorage().RemoveScheduleConfig(name); err != nil { + log.Error("can not remove the scheduler config", errs.ZapError(err)) + return err + } } return err } diff --git a/tests/integrations/mcs/scheduling/config_test.go b/tests/integrations/mcs/scheduling/config_test.go index ef52203e349..032cb4ad7ae 100644 --- a/tests/integrations/mcs/scheduling/config_test.go +++ b/tests/integrations/mcs/scheduling/config_test.go @@ -136,18 +136,10 @@ func (suite *configTestSuite) TestSchedulerConfigWatch() { ) re.NoError(err) // Get all default scheduler names. - var ( - schedulerNames []string - schedulerController = suite.pdLeaderServer.GetRaftCluster().GetCoordinator().GetSchedulersController() - ) + var schedulerNames, _, _ = suite.pdLeaderServer.GetRaftCluster().GetStorage().LoadAllScheduleConfig() + testutil.Eventually(re, func() bool { - schedulerNames = schedulerController.GetSchedulerNames() targetCount := len(sc.DefaultSchedulers) - // In the previous case, StoreConfig of raft-kv2 has been persisted. So, it might - // have EvictSlowTrendName. - if exists, _ := schedulerController.IsSchedulerExisted(schedulers.EvictSlowTrendName); exists { - targetCount += 1 - } return len(schedulerNames) == targetCount }) // Check all default schedulers' configs.