Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert the initialization of evict-slow-trend scheduler in init_schedulers. #6994

Merged
merged 4 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,11 +606,6 @@ func (o *PersistConfig) GetRegionMaxKeys() uint64 {
return o.GetStoreConfig().GetRegionMaxKeys()
}

// IsSynced reports whether the store config held by this PersistConfig
// has been marked as synced. It delegates to the store config's IsSynced.
func (o *PersistConfig) IsSynced() bool {
	storeCfg := o.GetStoreConfig()
	return storeCfg.IsSynced()
}

// IsEnableRegionBucket returns true if the region bucket is enabled.
func (o *PersistConfig) IsEnableRegionBucket() bool {
return o.GetStoreConfig().IsEnableRegionBucket()
Expand Down
1 change: 0 additions & 1 deletion pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ type StoreConfigProvider interface {
GetRegionMaxKeys() uint64
CheckRegionSize(uint64, uint64) error
CheckRegionKeys(uint64, uint64) error
IsSynced() bool
IsEnableRegionBucket() bool
IsRaftKV2() bool
}
17 changes: 0 additions & 17 deletions pkg/schedule/config/store_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ type StoreConfig struct {
RegionMaxSizeMB uint64 `json:"-"`
RegionSplitSizeMB uint64 `json:"-"`
RegionBucketSizeMB uint64 `json:"-"`

Sync bool `json:"sync"`
}

// Storage is the config for the tikv storage.
Expand Down Expand Up @@ -127,21 +125,6 @@ func (c *StoreConfig) GetRegionMaxKeys() uint64 {
return uint64(c.RegionMaxKeys)
}

// SetSynced flags the StoreConfig as synced by setting its Sync field.
// Calling it on a nil receiver is a no-op.
func (c *StoreConfig) SetSynced() {
	if c == nil {
		return
	}
	c.Sync = true
}

// IsSynced reports whether the StoreConfig has been marked as synced.
// A nil receiver is treated as not synced.
func (c *StoreConfig) IsSynced() bool {
	return c != nil && c.Sync
}

// IsEnableRegionBucket returns true if the region bucket is enabled.
func (c *StoreConfig) IsEnableRegionBucket() bool {
if c == nil {
Expand Down
61 changes: 40 additions & 21 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"context"
"strconv"
"sync"
"testing"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -313,6 +312,42 @@ func (c *Coordinator) drivePushOperator() {
}
}

// driveSlowNodeScheduler is used to enable slow node scheduler when using `raft-kv2`.
func (c *Coordinator) driveSlowNodeScheduler() {
defer logutil.LogPanic()
defer c.wg.Done()

ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
log.Info("drive slow node scheduler is stopped")
return
case <-ticker.C:
{
// If enabled, exit.
if exists, _ := c.schedulers.IsSchedulerExisted(schedulers.EvictSlowTrendName); exists {
return
}
// If the cluster was set up with `raft-kv2` engine, this cluster should
// enable `evict-slow-trend` scheduler as default.
if c.GetCluster().GetStoreConfig().IsRaftKV2() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is unbelievable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, it will not influence the current strategy when initializing.

typ := schedulers.EvictSlowTrendType
args := []string{}

s, err := schedulers.CreateScheduler(typ, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(typ, args), c.schedulers.RemoveScheduler)
if err != nil {
log.Warn("initializing evict-slow-trend scheduler failed", errs.ZapError(err))
} else if err = c.schedulers.AddScheduler(s, args...); err != nil {
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err))
}
}
}
}
}
}

// RunUntilStop runs the coordinator until receiving the stop signal.
func (c *Coordinator) RunUntilStop() {
c.Run()
Expand Down Expand Up @@ -346,12 +381,14 @@ func (c *Coordinator) Run() {
log.Info("Coordinator starts to run schedulers")
c.initSchedulers()

c.wg.Add(3)
c.wg.Add(4)
// Starts to patrol regions.
go c.PatrolRegions()
// Checks suspect key ranges
go c.checkSuspectRanges()
go c.drivePushOperator()
// Checks whether to create evict-slow-trend scheduler.
LykxSassinator marked this conversation as resolved.
Show resolved Hide resolved
go c.driveSlowNodeScheduler()
}

func (c *Coordinator) initSchedulers() {
Expand Down Expand Up @@ -439,20 +476,6 @@ func (c *Coordinator) initSchedulers() {
if err := c.cluster.GetSchedulerConfig().Persist(c.cluster.GetStorage()); err != nil {
log.Error("cannot persist schedule config", errs.ZapError(err))
}

// If the cluster was set up with `raft-kv2` engine, this cluster should
// enable `evict-slow-trend` scheduler as default.
if c.GetCluster().GetStoreConfig().IsRaftKV2() {
name := schedulers.EvictSlowTrendType
args := []string{}

s, err := schedulers.CreateScheduler(name, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(name, args), c.schedulers.RemoveScheduler)
if err != nil {
log.Warn("initializing evict-slow-trend scheduler failed", errs.ZapError(err))
} else if err = c.schedulers.AddScheduler(s, args...); err != nil {
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err))
}
}
}

// LoadPlugin load user plugin
Expand Down Expand Up @@ -639,11 +662,7 @@ func (c *Coordinator) ResetHotSpotMetrics() {

// ShouldRun returns true if the coordinator should run.
func (c *Coordinator) ShouldRun() bool {
isSynced := c.cluster.GetStoreConfig().IsSynced()
if testing.Testing() {
isSynced = true
}
return c.prepareChecker.check(c.cluster.GetBasicCluster()) && isSynced
return c.prepareChecker.check(c.cluster.GetBasicCluster())
}

// GetSchedulersController returns the schedulers controller.
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ var (
Namespace: "pd",
Subsystem: "scheduler",
Name: "store_slow_trend_misc",
Help: "Store trend internal uncatelogued values",
Help: "Store trend internal uncatalogued values",
}, []string{"type", "dim"})

// HotPendingSum is the sum of pending influence in hot region scheduler.
Expand Down
17 changes: 3 additions & 14 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/coreos/go-semver/semver"
Expand Down Expand Up @@ -329,14 +328,14 @@ func (c *RaftCluster) Start(s Server) error {
}

c.wg.Add(10)
go c.runStoreConfigSync()
go c.runCoordinator()
go c.runMetricsCollectionJob()
go c.runNodeStateCheckJob()
go c.runStatsBackgroundJobs()
go c.syncRegions()
go c.runReplicationMode()
go c.runMinResolvedTSJob()
go c.runStoreConfigSync()
go c.runUpdateStoreStats()
go c.startGCTuner()

Expand Down Expand Up @@ -424,8 +423,7 @@ func (c *RaftCluster) runStoreConfigSync() {
)
// Start the ticker with a second-level timer to accelerate
// the bootstrap stage.
init := false
ticker := time.NewTicker(time.Second)
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
synced, switchRaftV2Config = c.syncStoreConfig(stores)
Expand All @@ -440,13 +438,6 @@ func (c *RaftCluster) runStoreConfigSync() {
} else if err := c.opt.Persist(c.storage); err != nil {
log.Warn("store config persisted failed", zap.Error(err))
}
// If the config has been synced, the interval should be added
// up to minute level.
if testing.Testing() || (!init && c.opt.GetStoreConfig().IsSynced()) {
init = true
ticker.Stop()
ticker = time.NewTicker(time.Minute)
}
select {
case <-c.ctx.Done():
log.Info("sync store config job is stopped")
Expand Down Expand Up @@ -505,7 +496,7 @@ func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (b
return false, err
}
oldCfg := c.opt.GetStoreConfig()
if cfg == nil || (oldCfg.IsSynced() && oldCfg.Equal(cfg)) {
if cfg == nil || oldCfg.Equal(cfg) {
return false, nil
}
log.Info("sync the store config successful",
Expand All @@ -518,8 +509,6 @@ func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (b
// updateStoreConfig adjusts cfg, marks it as synced and installs it as the
// current store config. This is extracted for testing.
//
// The returned bool is true only when the storage engine changes from
// something other than sc.RaftstoreV2 in oldCfg to sc.RaftstoreV2 in cfg.
// The returned error is always nil.
func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *sc.StoreConfig) (bool, error) {
	cfg.Adjust()
	// Mark config has been synced.
	cfg.SetSynced()
	c.opt.SetStoreConfig(cfg)

	wasRaftV2 := oldCfg.Storage.Engine == sc.RaftstoreV2
	isRaftV2 := cfg.Storage.Engine == sc.RaftstoreV2
	switchedToRaftV2 := !wasRaftV2 && isRaftV2
	return switchedToRaftV2, nil
}
Expand Down
3 changes: 0 additions & 3 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1354,14 +1354,12 @@ func TestStoreConfigUpdate(t *testing.T) {
"perf-level": 2
}}`
var config sc.StoreConfig
re.False(config.IsSynced())
re.NoError(json.Unmarshal([]byte(body), &config))
tc.updateStoreConfig(opt.GetStoreConfig(), &config)
re.Equal(uint64(144000000), opt.GetRegionMaxKeys())
re.Equal(uint64(96000000), opt.GetRegionSplitKeys())
re.Equal(uint64(15*units.GiB/units.MiB), opt.GetRegionMaxSize())
re.Equal(uint64(10*units.GiB/units.MiB), opt.GetRegionSplitSize())
re.True(opt.IsSynced())
}
// Case2: empty config.
{
Expand All @@ -1373,7 +1371,6 @@ func TestStoreConfigUpdate(t *testing.T) {
re.Equal(uint64(960000), opt.GetRegionSplitKeys())
re.Equal(uint64(144), opt.GetRegionMaxSize())
re.Equal(uint64(96), opt.GetRegionSplitSize())
re.True(opt.IsSynced())
}
// Case3: raft-kv2 config.
{
Expand Down
5 changes: 0 additions & 5 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,11 +1012,6 @@ func (o *PersistOptions) CheckRegionKeys(keys, mergeKeys uint64) error {
return o.GetStoreConfig().CheckRegionKeys(keys, mergeKeys)
}

// IsSynced reports whether the store config held by these PersistOptions
// has been marked as synced. It delegates to the store config's IsSynced.
func (o *PersistOptions) IsSynced() bool {
	storeCfg := o.GetStoreConfig()
	return storeCfg.IsSynced()
}

// IsEnableRegionBucket returns true if the region bucket is enabled.
func (o *PersistOptions) IsEnableRegionBucket() bool {
return o.GetStoreConfig().IsEnableRegionBucket()
Expand Down
Loading