diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index cac835a1cb7..19d57b399f2 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -606,11 +606,6 @@ func (o *PersistConfig) GetRegionMaxKeys() uint64 { return o.GetStoreConfig().GetRegionMaxKeys() } -// IsSynced returns true if the cluster config is synced. -func (o *PersistConfig) IsSynced() bool { - return o.GetStoreConfig().IsSynced() -} - // IsEnableRegionBucket return true if the region bucket is enabled. func (o *PersistConfig) IsEnableRegionBucket() bool { return o.GetStoreConfig().IsEnableRegionBucket() diff --git a/pkg/schedule/config/config_provider.go b/pkg/schedule/config/config_provider.go index eeda9fbeb2c..00f11a5950f 100644 --- a/pkg/schedule/config/config_provider.go +++ b/pkg/schedule/config/config_provider.go @@ -139,7 +139,6 @@ type StoreConfigProvider interface { GetRegionMaxKeys() uint64 CheckRegionSize(uint64, uint64) error CheckRegionKeys(uint64, uint64) error - IsSynced() bool IsEnableRegionBucket() bool IsRaftKV2() bool } diff --git a/pkg/schedule/config/store_config.go b/pkg/schedule/config/store_config.go index d7c873fb9c9..5f898ef17c4 100644 --- a/pkg/schedule/config/store_config.go +++ b/pkg/schedule/config/store_config.go @@ -48,8 +48,6 @@ type StoreConfig struct { RegionMaxSizeMB uint64 `json:"-"` RegionSplitSizeMB uint64 `json:"-"` RegionBucketSizeMB uint64 `json:"-"` - - Sync bool `json:"sync"` } // Storage is the config for the tikv storage. @@ -127,21 +125,6 @@ func (c *StoreConfig) GetRegionMaxKeys() uint64 { return uint64(c.RegionMaxKeys) } -// SetSynced marks StoreConfig has been synced. -func (c *StoreConfig) SetSynced() { - if c != nil { - c.Sync = true - } -} - -// IsSynced returns whether the StoreConfig is synced or not. 
-func (c *StoreConfig) IsSynced() bool { - if c == nil { - return false - } - return c.Sync -} - // IsEnableRegionBucket return true if the region bucket is enabled. func (c *StoreConfig) IsEnableRegionBucket() bool { if c == nil { diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 5b2815271ec..0e6c2281f2f 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -19,7 +19,6 @@ import ( "context" "strconv" "sync" - "testing" "time" "github.com/pingcap/errors" @@ -313,6 +312,42 @@ func (c *Coordinator) drivePushOperator() { } } +// driveSlowNodeScheduler is used to enable slow node scheduler when using `raft-kv2`. +func (c *Coordinator) driveSlowNodeScheduler() { + defer logutil.LogPanic() + defer c.wg.Done() + + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case <-c.ctx.Done(): + log.Info("drive slow node scheduler is stopped") + return + case <-ticker.C: + { + // If enabled, exit. + if exists, _ := c.schedulers.IsSchedulerExisted(schedulers.EvictSlowTrendName); exists { + return + } + // If the cluster was set up with `raft-kv2` engine, this cluster should + // enable `evict-slow-trend` scheduler as default. + if c.GetCluster().GetStoreConfig().IsRaftKV2() { + typ := schedulers.EvictSlowTrendType + args := []string{} + + s, err := schedulers.CreateScheduler(typ, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(typ, args), c.schedulers.RemoveScheduler) + if err != nil { + log.Warn("initializing evict-slow-trend scheduler failed", errs.ZapError(err)) + } else if err = c.schedulers.AddScheduler(s, args...); err != nil { + log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err)) + } + } + } + } + } +} + // RunUntilStop runs the coordinator until receiving the stop signal. 
func (c *Coordinator) RunUntilStop() { c.Run() @@ -346,12 +381,14 @@ func (c *Coordinator) Run() { log.Info("Coordinator starts to run schedulers") c.initSchedulers() - c.wg.Add(3) + c.wg.Add(4) // Starts to patrol regions. go c.PatrolRegions() // Checks suspect key ranges go c.checkSuspectRanges() go c.drivePushOperator() + // Checks whether to create evict-slow-trend scheduler. + go c.driveSlowNodeScheduler() } func (c *Coordinator) initSchedulers() { @@ -439,20 +476,6 @@ func (c *Coordinator) initSchedulers() { if err := c.cluster.GetSchedulerConfig().Persist(c.cluster.GetStorage()); err != nil { log.Error("cannot persist schedule config", errs.ZapError(err)) } - - // If the cluster was set up with `raft-kv2` engine, this cluster should - // enable `evict-slow-trend` scheduler as default. - if c.GetCluster().GetStoreConfig().IsRaftKV2() { - name := schedulers.EvictSlowTrendType - args := []string{} - - s, err := schedulers.CreateScheduler(name, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(name, args), c.schedulers.RemoveScheduler) - if err != nil { - log.Warn("initializing evict-slow-trend scheduler failed", errs.ZapError(err)) - } else if err = c.schedulers.AddScheduler(s, args...); err != nil { - log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err)) - } - } } // LoadPlugin load user plugin @@ -639,11 +662,7 @@ func (c *Coordinator) ResetHotSpotMetrics() { // ShouldRun returns true if the coordinator should run. func (c *Coordinator) ShouldRun() bool { - isSynced := c.cluster.GetStoreConfig().IsSynced() - if testing.Testing() { - isSynced = true - } - return c.prepareChecker.check(c.cluster.GetBasicCluster()) && isSynced + return c.prepareChecker.check(c.cluster.GetBasicCluster()) } // GetSchedulersController returns the schedulers controller. 
diff --git a/pkg/schedule/schedulers/metrics.go b/pkg/schedule/schedulers/metrics.go index 6ff5b2edac1..2052bc923af 100644 --- a/pkg/schedule/schedulers/metrics.go +++ b/pkg/schedule/schedulers/metrics.go @@ -123,7 +123,7 @@ var ( Namespace: "pd", Subsystem: "scheduler", Name: "store_slow_trend_misc", - Help: "Store trend internal uncatelogued values", + Help: "Store trend internal uncatalogued values", }, []string{"type", "dim"}) // HotPendingSum is the sum of pending influence in hot region scheduler. diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 8afb1796068..efdf767b704 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -25,7 +25,6 @@ import ( "strconv" "strings" "sync" - "testing" "time" "github.com/coreos/go-semver/semver" @@ -329,7 +328,6 @@ func (c *RaftCluster) Start(s Server) error { } c.wg.Add(10) - go c.runStoreConfigSync() go c.runCoordinator() go c.runMetricsCollectionJob() go c.runNodeStateCheckJob() @@ -337,6 +335,7 @@ func (c *RaftCluster) Start(s Server) error { go c.syncRegions() go c.runReplicationMode() go c.runMinResolvedTSJob() + go c.runStoreConfigSync() go c.runUpdateStoreStats() go c.startGCTuner() @@ -424,8 +423,7 @@ func (c *RaftCluster) runStoreConfigSync() { ) - // Start the ticker with a second-level timer to accelerate - // the bootstrap stage. - init := false - ticker := time.NewTicker(time.Second) + // The config no longer tracks a synced flag, so a minute-level + // ticker is sufficient from the start. + ticker := time.NewTicker(time.Minute) defer ticker.Stop() for { synced, switchRaftV2Config = c.syncStoreConfig(stores) @@ -440,13 +438,6 @@ func (c *RaftCluster) runStoreConfigSync() { } else if err := c.opt.Persist(c.storage); err != nil { log.Warn("store config persisted failed", zap.Error(err)) } - // If the config has been synced, the interval should be added - // up to minute level. 
- if testing.Testing() || (!init && c.opt.GetStoreConfig().IsSynced()) { - init = true - ticker.Stop() - ticker = time.NewTicker(time.Minute) - } select { case <-c.ctx.Done(): log.Info("sync store config job is stopped") @@ -505,7 +496,7 @@ func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (b return false, err } oldCfg := c.opt.GetStoreConfig() - if cfg == nil || (oldCfg.IsSynced() && oldCfg.Equal(cfg)) { + if cfg == nil || oldCfg.Equal(cfg) { return false, nil } log.Info("sync the store config successful", @@ -518,8 +509,6 @@ func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (b // updateStoreConfig updates the store config. This is extracted for testing. func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *sc.StoreConfig) (bool, error) { cfg.Adjust() - // Mark config has been synced. - cfg.SetSynced() c.opt.SetStoreConfig(cfg) return oldCfg.Storage.Engine != sc.RaftstoreV2 && cfg.Storage.Engine == sc.RaftstoreV2, nil } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 68aafd0e402..e5bf862174b 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -1354,14 +1354,12 @@ func TestStoreConfigUpdate(t *testing.T) { "perf-level": 2 }}` var config sc.StoreConfig - re.False(config.IsSynced()) re.NoError(json.Unmarshal([]byte(body), &config)) tc.updateStoreConfig(opt.GetStoreConfig(), &config) re.Equal(uint64(144000000), opt.GetRegionMaxKeys()) re.Equal(uint64(96000000), opt.GetRegionSplitKeys()) re.Equal(uint64(15*units.GiB/units.MiB), opt.GetRegionMaxSize()) re.Equal(uint64(10*units.GiB/units.MiB), opt.GetRegionSplitSize()) - re.True(opt.IsSynced()) } // Case2: empty config. { @@ -1373,7 +1371,6 @@ func TestStoreConfigUpdate(t *testing.T) { re.Equal(uint64(960000), opt.GetRegionSplitKeys()) re.Equal(uint64(144), opt.GetRegionMaxSize()) re.Equal(uint64(96), opt.GetRegionSplitSize()) - re.True(opt.IsSynced()) } // Case3: raft-kv2 config. 
{ diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 2b1f3edc266..1ea0b79424f 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -1012,11 +1012,6 @@ func (o *PersistOptions) CheckRegionKeys(keys, mergeKeys uint64) error { return o.GetStoreConfig().CheckRegionKeys(keys, mergeKeys) } -// IsSynced returns true if the store config is synced. -func (o *PersistOptions) IsSynced() bool { - return o.GetStoreConfig().IsSynced() -} - // IsEnableRegionBucket return true if the region bucket is enabled. func (o *PersistOptions) IsEnableRegionBucket() bool { return o.GetStoreConfig().IsEnableRegionBucket()