diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
index 917831ba9cad..d0ddf6f2f9cc 100644
--- a/pkg/mcs/scheduling/server/cluster.go
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -213,7 +213,7 @@ func (c *Cluster) updateScheduler() {
 	)
 	// Create the newly added schedulers.
 	for _, scheduler := range latestSchedulersConfig {
-		s, err := schedulers.CreateScheduler(
+		s, err := schedulers.CreateSchedulerWithoutSave(
 			scheduler.Type,
 			c.coordinator.GetOperatorController(),
 			c.storage,
@@ -234,7 +234,7 @@ func (c *Cluster) updateScheduler() {
 				zap.Strings("scheduler-args", scheduler.Args))
 			continue
 		}
-		if err := schedulersController.AddScheduler(s, scheduler.Args...); err != nil {
+		if err := schedulersController.AddScheduler(c.storage, s, scheduler.Args...); err != nil {
 			log.Error("failed to add scheduler",
 				zap.String("scheduler-name", name),
 				zap.Strings("scheduler-args", scheduler.Args),
diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go
index 8cd5567b75c5..7960a730892d 100644
--- a/pkg/schedule/coordinator.go
+++ b/pkg/schedule/coordinator.go
@@ -354,10 +354,10 @@ func (c *Coordinator) driveSlowNodeScheduler() {
 			typ := schedulers.EvictSlowTrendType
 			args := []string{}

-			s, err := schedulers.CreateScheduler(typ, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(typ, args), c.schedulers.RemoveScheduler)
+			s, err := schedulers.CreateSchedulerWithoutSave(typ, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(typ, args), c.schedulers.RemoveScheduler)
 			if err != nil {
 				log.Warn("initializing evict-slow-trend scheduler failed", errs.ZapError(err))
-			} else if err = c.schedulers.AddScheduler(s, args...); err != nil {
+			} else if err = c.schedulers.AddScheduler(c.cluster.GetStorage(), s, args...); err != nil {
 				log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err))
 			}
 		}
@@ -452,17 +452,17 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
 			log.Info("skip create scheduler with independent configuration", zap.String("scheduler-name", name), zap.String("scheduler-type", cfg.Type), zap.Strings("scheduler-args", cfg.Args))
 			continue
 		}
-		s, err := schedulers.CreateScheduler(cfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigJSONDecoder([]byte(data)), c.schedulers.RemoveScheduler)
+		s, err := schedulers.CreateSchedulerWithoutSave(cfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigJSONDecoder([]byte(data)), c.schedulers.RemoveScheduler)
 		if err != nil {
 			log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
 			continue
 		}
 		log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName()))
 		if needRun {
-			if err = c.schedulers.AddScheduler(s); err != nil {
+			if err = c.schedulers.AddScheduler(c.cluster.GetStorage(), s); err != nil {
 				log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
 			}
-		} else if err = c.schedulers.AddSchedulerHandler(s); err != nil {
+		} else if err = c.schedulers.AddSchedulerHandler(c.cluster.GetStorage(), s); err != nil {
 			log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
 		}
 	}
@@ -477,7 +477,7 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
 			continue
 		}

-		s, err := schedulers.CreateScheduler(schedulerCfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args), c.schedulers.RemoveScheduler)
+		s, err := schedulers.CreateSchedulerWithoutSave(schedulerCfg.Type, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args), c.schedulers.RemoveScheduler)
 		if err != nil {
 			log.Error("can not create scheduler", zap.String("scheduler-type", schedulerCfg.Type), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
 			continue
@@ -485,14 +485,14 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
 		log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args))
 		if needRun {
-			if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
+			if err = c.schedulers.AddScheduler(c.cluster.GetStorage(), s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
 				log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
 			} else {
 				// Only records the valid scheduler config.
 				scheduleCfg.Schedulers[k] = schedulerCfg
 				k++
 			}
-		} else if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
+		} else if err = c.schedulers.AddSchedulerHandler(c.cluster.GetStorage(), s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
 			log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
 		}
 	}
@@ -525,14 +525,14 @@ func (c *Coordinator) LoadPlugin(pluginPath string, ch chan string) {
 	}
 	schedulerArgs := SchedulerArgs.(func() []string)
 	// create and add user scheduler
-	s, err := schedulers.CreateScheduler(schedulerType(), c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerType(), schedulerArgs()), c.schedulers.RemoveScheduler)
+	s, err := schedulers.CreateSchedulerWithoutSave(schedulerType(), c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(schedulerType(), schedulerArgs()), c.schedulers.RemoveScheduler)
 	if err != nil {
 		log.Error("can not create scheduler", zap.String("scheduler-type", schedulerType()), errs.ZapError(err))
 		return
 	}
 	log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
 	// TODO: handle the plugin in API service mode.
-	if err = c.schedulers.AddScheduler(s); err != nil {
+	if err = c.schedulers.AddScheduler(c.cluster.GetStorage(), s); err != nil {
 		log.Error("can't add scheduler", zap.String("scheduler-name", s.GetName()), errs.ZapError(err))
 		return
 	}
diff --git a/pkg/schedule/schedulers/balance_test.go b/pkg/schedule/schedulers/balance_test.go
index 54fe8ff489bc..b9979fd08c01 100644
--- a/pkg/schedule/schedulers/balance_test.go
+++ b/pkg/schedule/schedulers/balance_test.go
@@ -238,7 +238,7 @@ func TestBalanceLeaderSchedulerTestSuite(t *testing.T) {

 func (suite *balanceLeaderSchedulerTestSuite) SetupTest() {
 	suite.cancel, suite.conf, suite.tc, suite.oc = prepareSchedulersTest()
-	lb, err := CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""}))
+	lb, err := CreateSchedulerWithoutSave(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""}))
 	suite.NoError(err)
 	suite.lb = lb
 }
@@ -572,34 +572,34 @@ func (suite *balanceLeaderRangeSchedulerTestSuite) TestSingleRangeBalance() {
 	suite.tc.UpdateStoreLeaderWeight(3, 1)
 	suite.tc.UpdateStoreLeaderWeight(4, 2)
 	suite.tc.AddLeaderRegionWithRange(1, "a", "g", 1, 2, 3, 4)
-	lb, err := CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""}))
+	lb, err := CreateSchedulerWithoutSave(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""}))
 	suite.NoError(err)
 	ops, _ := lb.Schedule(suite.tc, false)
 	suite.NotEmpty(ops)
 	suite.Len(ops, 1)
 	suite.Len(ops[0].Counters, 1)
 	suite.Len(ops[0].FinishedCounters, 1)
-	lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"h", "n"}))
+	lb, err = CreateSchedulerWithoutSave(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"h", "n"}))
 	suite.NoError(err)
 	ops, _ = lb.Schedule(suite.tc, false)
 	suite.Empty(ops)
-	lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"b", "f"}))
+	lb, err = CreateSchedulerWithoutSave(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"b", "f"}))
 	suite.NoError(err)
 	ops, _ = lb.Schedule(suite.tc, false)
 	suite.Empty(ops)
-	lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", "a"}))
+	lb, err = CreateSchedulerWithoutSave(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", "a"}))
 	suite.NoError(err)
 	ops, _ = lb.Schedule(suite.tc, false)
 	suite.Empty(ops)
-	lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"g", ""}))
+	lb, err = CreateSchedulerWithoutSave(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"g", ""}))
 	suite.NoError(err)
 	ops, _ = lb.Schedule(suite.tc, false)
 	suite.Empty(ops)
-	lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", "f"}))
+	lb, err = CreateSchedulerWithoutSave(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", "f"}))
 	suite.NoError(err)
 	ops, _ = lb.Schedule(suite.tc, false)
 	suite.Empty(ops)
-	lb, err = CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"b", ""}))
+	lb, err = CreateSchedulerWithoutSave(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"b", ""}))
 	suite.NoError(err)
 	ops, _ = lb.Schedule(suite.tc, false)
 	suite.Empty(ops)
@@ -618,7 +618,7 @@ func (suite *balanceLeaderRangeSchedulerTestSuite) TestMultiRangeBalance() {
 	suite.tc.UpdateStoreLeaderWeight(3, 1)
 	suite.tc.UpdateStoreLeaderWeight(4, 2)
 	suite.tc.AddLeaderRegionWithRange(1, "a", "g", 1, 2, 3, 4)
-	lb, err := CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", "g", "o", "t"}))
+	lb, err := CreateSchedulerWithoutSave(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", "g", "o", "t"}))
 	suite.NoError(err)
 	ops, _ := lb.Schedule(suite.tc, false)
 	suite.Equal(uint64(1), ops[0].RegionID())
@@ -656,7 +656,7 @@ func (suite *balanceLeaderRangeSchedulerTestSuite) TestBatchBalance() {
 	suite.tc.AddLeaderRegionWithRange(uint64(102), "102a", "102z", 1, 2, 3)
 	suite.tc.AddLeaderRegionWithRange(uint64(103), "103a", "103z", 4, 5, 6)

-	lb, err := CreateScheduler(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""}))
+	lb, err := CreateSchedulerWithoutSave(BalanceLeaderType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""}))
 	suite.NoError(err)
 	ops, _ := lb.Schedule(suite.tc, false)
 	suite.Len(ops, 2)
@@ -747,7 +747,7 @@ func checkBalanceRegionSchedule1(re *require.Assertions, enablePlacementRules bo
 	tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
 	tc.SetEnablePlacementRules(enablePlacementRules)
 	tc.SetMaxReplicasWithLabel(enablePlacementRules, 1)
-	sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
+	sb, err := CreateSchedulerWithoutSave(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
 	re.NoError(err)
 	// Add stores 1,2,3,4.
 	tc.AddRegionStore(1, 6)
@@ -802,7 +802,7 @@ func checkReplica3(re *require.Assertions, enablePlacementRules bool) {
 	tc.SetEnablePlacementRules(enablePlacementRules)
 	tc.SetMaxReplicasWithLabel(enablePlacementRules, 3)

-	sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
+	sb, err := CreateSchedulerWithoutSave(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
 	re.NoError(err)
 	// Store 1 has the largest region score, so the balance scheduler tries to replace peer in store 1.
 	tc.AddLabelsStore(1, 16, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
@@ -876,7 +876,7 @@ func checkReplica5(re *require.Assertions, enablePlacementRules bool) {
 	tc.SetEnablePlacementRules(enablePlacementRules)
 	tc.SetMaxReplicasWithLabel(enablePlacementRules, 5)

-	sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
+	sb, err := CreateSchedulerWithoutSave(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
 	re.NoError(err)
 	tc.AddLabelsStore(1, 4, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
 	tc.AddLabelsStore(2, 5, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"})
@@ -977,7 +977,7 @@ func checkBalanceRegionSchedule2(re *require.Assertions, enablePlacementRules bo
 		core.SetApproximateKeys(200),
 	)

-	sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
+	sb, err := CreateSchedulerWithoutSave(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
 	re.NoError(err)

 	tc.AddRegionStore(1, 11)
@@ -1033,7 +1033,7 @@ func checkBalanceRegionStoreWeight(re *require.Assertions, enablePlacementRules
 	tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
 	tc.SetEnablePlacementRules(enablePlacementRules)
 	tc.SetMaxReplicasWithLabel(enablePlacementRules, 1)
-	sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
+	sb, err := CreateSchedulerWithoutSave(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
 	re.NoError(err)

 	tc.AddRegionStore(1, 10)
@@ -1068,7 +1068,7 @@ func checkBalanceRegionOpInfluence(re *require.Assertions, enablePlacementRules
 	tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
 	tc.SetEnablePlacementRules(enablePlacementRules)
 	tc.SetMaxReplicasWithLabel(enablePlacementRules, 1)
-	sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
+	sb, err := CreateSchedulerWithoutSave(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
 	re.NoError(err)
 	// Add stores 1,2,3,4.
 	tc.AddRegionStoreWithLeader(1, 2)
@@ -1104,7 +1104,7 @@ func checkReplacePendingRegion(re *require.Assertions, enablePlacementRules bool
 	tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
 	tc.SetEnablePlacementRules(enablePlacementRules)
 	tc.SetMaxReplicasWithLabel(enablePlacementRules, 3)
-	sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
+	sb, err := CreateSchedulerWithoutSave(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
 	re.NoError(err)
 	// Store 1 has the largest region score, so the balance scheduler try to replace peer in store 1.
 	tc.AddLabelsStore(1, 16, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
@@ -1134,7 +1134,7 @@ func TestBalanceRegionShouldNotBalance(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()
 	tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
-	sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
+	sb, err := CreateSchedulerWithoutSave(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
 	re.NoError(err)
 	region := tc.MockRegionInfo(1, 0, []uint64{2, 3, 4}, nil, nil)
 	tc.PutRegion(region)
@@ -1147,7 +1147,7 @@ func TestBalanceRegionEmptyRegion(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()
 	tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
-	sb, err := CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
+	sb, err := CreateSchedulerWithoutSave(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
 	re.NoError(err)
 	tc.AddRegionStore(1, 10)
 	tc.AddRegionStore(2, 9)
@@ -1193,7 +1193,7 @@ func checkRandomMergeSchedule(re *require.Assertions, enablePlacementRules bool)
 	tc.SetMaxReplicasWithLabel(enablePlacementRules, 3)
 	tc.SetMergeScheduleLimit(1)

-	mb, err := CreateScheduler(RandomMergeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(RandomMergeType, []string{"", ""}))
+	mb, err := CreateSchedulerWithoutSave(RandomMergeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(RandomMergeType, []string{"", ""}))
 	re.NoError(err)

 	tc.AddRegionStore(1, 4)
@@ -1275,7 +1275,7 @@ func checkScatterRangeBalance(re *require.Assertions, enablePlacementRules bool)
 		tc.UpdateStoreStatus(uint64(i))
 	}

-	hb, err := CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"}))
+	hb, err := CreateSchedulerWithoutSave(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"}))
 	re.NoError(err)

 	scheduleAndApplyOperator(tc, hb, 100)
@@ -1349,7 +1349,7 @@ func checkBalanceLeaderLimit(re *require.Assertions, enablePlacementRules bool)
 	// test not allow schedule leader
 	tc.SetLeaderScheduleLimit(0)

-	hb, err := CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"}))
+	hb, err := CreateSchedulerWithoutSave(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"}))
 	re.NoError(err)

 	scheduleAndApplyOperator(tc, hb, 100)
@@ -1373,7 +1373,7 @@ func TestConcurrencyUpdateConfig(t *testing.T) {
 	re := require.New(t)
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()
-	hb, err := CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"}))
+	hb, err := CreateSchedulerWithoutSave(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_50", "t"}))
 	sche := hb.(*scatterRangeScheduler)
 	re.NoError(err)
 	ch := make(chan struct{})
@@ -1446,7 +1446,7 @@ func TestBalanceWhenRegionNotHeartbeat(t *testing.T) {
 		tc.UpdateStoreStatus(uint64(i))
 	}

-	hb, err := CreateScheduler(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_09", "t"}))
+	hb, err := CreateSchedulerWithoutSave(ScatterRangeType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ScatterRangeType, []string{"s_00", "s_09", "t"}))
 	re.NoError(err)

 	scheduleAndApplyOperator(tc, hb, 100)
diff --git a/pkg/schedule/schedulers/balance_witness_test.go b/pkg/schedule/schedulers/balance_witness_test.go
index abd4a3b3bbab..56a080fc5eb9 100644
--- a/pkg/schedule/schedulers/balance_witness_test.go
+++ b/pkg/schedule/schedulers/balance_witness_test.go
@@ -49,7 +49,7 @@ func (suite *balanceWitnessSchedulerTestSuite) SetupTest() {
 			Count: 4,
 		},
 	})
-	lb, err := CreateScheduler(BalanceWitnessType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceWitnessType, []string{"", ""}), nil)
+	lb, err := CreateSchedulerWithoutSave(BalanceWitnessType, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceWitnessType, []string{"", ""}), nil)
 	suite.NoError(err)
 	suite.lb = lb
 }
diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go
index 1989c42ba6f9..963b897a682f 100644
--- a/pkg/schedule/schedulers/evict_leader.go
+++ b/pkg/schedule/schedulers/evict_leader.go
@@ -181,7 +181,7 @@ func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeade
 	}
 }

-// EvictStores returns the IDs of the evict-stores.
+// EvictStoreIDs returns the IDs of the evict-stores.
 func (s *evictLeaderScheduler) EvictStoreIDs() []uint64 {
 	return s.conf.getStores()
 }
diff --git a/pkg/schedule/schedulers/evict_leader_test.go b/pkg/schedule/schedulers/evict_leader_test.go
index d804561f11c5..77f6cbd76cd4 100644
--- a/pkg/schedule/schedulers/evict_leader_test.go
+++ b/pkg/schedule/schedulers/evict_leader_test.go
@@ -41,7 +41,7 @@ func TestEvictLeader(t *testing.T) {
 	tc.AddLeaderRegion(2, 2, 1)
 	tc.AddLeaderRegion(3, 3, 1)

-	sl, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"}), func(string) error { return nil })
+	sl, err := CreateSchedulerWithoutSave(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"}), func(string) error { return nil })
 	re.NoError(err)
 	re.True(sl.IsScheduleAllowed(tc))
 	ops, _ := sl.Schedule(tc, false)
@@ -54,7 +54,7 @@ func TestEvictLeaderWithUnhealthyPeer(t *testing.T) {
 	re := require.New(t)
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()
-	sl, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"}), func(string) error { return nil })
+	sl, err := CreateSchedulerWithoutSave(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"}), func(string) error { return nil })
 	re.NoError(err)

 	// Add stores 1, 2, 3
diff --git a/pkg/schedule/schedulers/evict_slow_store_test.go b/pkg/schedule/schedulers/evict_slow_store_test.go
index 0b0c1d9ad393..8e87ee40ae74 100644
--- a/pkg/schedule/schedulers/evict_slow_store_test.go
+++ b/pkg/schedule/schedulers/evict_slow_store_test.go
@@ -56,9 +56,9 @@ func (suite *evictSlowStoreTestSuite) SetupTest() {
 	storage := storage.NewStorageWithMemoryBackend()
 	var err error

-	suite.es, err = CreateScheduler(EvictSlowStoreType, suite.oc, storage, ConfigSliceDecoder(EvictSlowStoreType, []string{}), nil)
+	suite.es, err = CreateSchedulerWithoutSave(EvictSlowStoreType, suite.oc, storage, ConfigSliceDecoder(EvictSlowStoreType, []string{}), nil)
 	suite.NoError(err)
-	suite.bs, err = CreateScheduler(BalanceLeaderType, suite.oc, storage, ConfigSliceDecoder(BalanceLeaderType, []string{}), nil)
+	suite.bs, err = CreateSchedulerWithoutSave(BalanceLeaderType, suite.oc, storage, ConfigSliceDecoder(BalanceLeaderType, []string{}), nil)
 	suite.NoError(err)
 }
diff --git a/pkg/schedule/schedulers/evict_slow_trend_test.go b/pkg/schedule/schedulers/evict_slow_trend_test.go
index 2ff86524bdcb..68a45e2cb5af 100644
--- a/pkg/schedule/schedulers/evict_slow_trend_test.go
+++ b/pkg/schedule/schedulers/evict_slow_trend_test.go
@@ -69,9 +69,9 @@ func (suite *evictSlowTrendTestSuite) SetupTest() {
 	storage := storage.NewStorageWithMemoryBackend()
 	var err error

-	suite.es, err = CreateScheduler(EvictSlowTrendType, suite.oc, storage, ConfigSliceDecoder(EvictSlowTrendType, []string{}))
+	suite.es, err = CreateSchedulerWithoutSave(EvictSlowTrendType, suite.oc, storage, ConfigSliceDecoder(EvictSlowTrendType, []string{}))
 	suite.NoError(err)
-	suite.bs, err = CreateScheduler(BalanceLeaderType, suite.oc, storage, ConfigSliceDecoder(BalanceLeaderType, []string{}))
+	suite.bs, err = CreateSchedulerWithoutSave(BalanceLeaderType, suite.oc, storage, ConfigSliceDecoder(BalanceLeaderType, []string{}))
 	suite.NoError(err)
 }
diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go
index d8f9bbc532c6..64bd17559039 100644
--- a/pkg/schedule/schedulers/hot_region_test.go
+++ b/pkg/schedule/schedulers/hot_region_test.go
@@ -73,7 +73,7 @@ func TestUpgrade(t *testing.T) {
 	cancel, _, _, oc := prepareSchedulersTest()
 	defer cancel()
 	// new
-	sche, err := CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(HotRegionType, nil))
+	sche, err := CreateSchedulerWithoutSave(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(HotRegionType, nil))
 	re.NoError(err)
 	hb := sche.(*hotScheduler)
 	re.Equal([]string{utils.QueryPriority, utils.BytePriority}, hb.conf.GetReadPriorities())
@@ -81,7 +81,7 @@ func TestUpgrade(t *testing.T) {
 	re.Equal([]string{utils.BytePriority, utils.KeyPriority}, hb.conf.GetWritePeerPriorities())
 	re.Equal("v2", hb.conf.GetRankFormulaVersion())
 	// upgrade from json(null)
-	sche, err = CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null")))
+	sche, err = CreateSchedulerWithoutSave(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null")))
 	re.NoError(err)
 	hb = sche.(*hotScheduler)
 	re.Equal([]string{utils.QueryPriority, utils.BytePriority}, hb.conf.GetReadPriorities())
@@ -90,7 +90,7 @@ func TestUpgrade(t *testing.T) {
 	re.Equal("v2", hb.conf.GetRankFormulaVersion())
 	// upgrade from < 5.2
 	config51 := `{"min-hot-byte-rate":100,"min-hot-key-rate":10,"min-hot-query-rate":10,"max-zombie-rounds":5,"max-peer-number":1000,"byte-rate-rank-step-ratio":0.05,"key-rate-rank-step-ratio":0.05,"query-rate-rank-step-ratio":0.05,"count-rank-step-ratio":0.01,"great-dec-ratio":0.95,"minor-dec-ratio":0.99,"src-tolerance-ratio":1.05,"dst-tolerance-ratio":1.05,"strict-picking-store":"true","enable-for-tiflash":"true"}`
-	sche, err = CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte(config51)))
+	sche, err = CreateSchedulerWithoutSave(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte(config51)))
 	re.NoError(err)
 	hb = sche.(*hotScheduler)
 	re.Equal([]string{utils.BytePriority, utils.KeyPriority}, hb.conf.GetReadPriorities())
@@ -99,7 +99,7 @@ func TestUpgrade(t *testing.T) {
 	re.Equal("v1", hb.conf.GetRankFormulaVersion())
 	// upgrade from < 6.4
 	config54 := `{"min-hot-byte-rate":100,"min-hot-key-rate":10,"min-hot-query-rate":10,"max-zombie-rounds":5,"max-peer-number":1000,"byte-rate-rank-step-ratio":0.05,"key-rate-rank-step-ratio":0.05,"query-rate-rank-step-ratio":0.05,"count-rank-step-ratio":0.01,"great-dec-ratio":0.95,"minor-dec-ratio":0.99,"src-tolerance-ratio":1.05,"dst-tolerance-ratio":1.05,"read-priorities":["query","byte"],"write-leader-priorities":["query","byte"],"write-peer-priorities":["byte","key"],"strict-picking-store":"true","enable-for-tiflash":"true","forbid-rw-type":"none"}`
-	sche, err = CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte(config54)))
+	sche, err = CreateSchedulerWithoutSave(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte(config54)))
 	re.NoError(err)
 	hb = sche.(*hotScheduler)
 	re.Equal([]string{utils.QueryPriority, utils.BytePriority}, hb.conf.GetReadPriorities())
@@ -125,7 +125,7 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) {
 		tc.PutStoreWithLabels(id)
 	}

-	sche, err := CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null")))
+	sche, err := CreateSchedulerWithoutSave(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null")))
 	re.NoError(err)
 	hb := sche.(*hotScheduler)
@@ -211,7 +211,7 @@ func TestSplitIfRegionTooHot(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()
 	tc.SetHotRegionCacheHitsThreshold(1)
-	hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	b := &metapb.Buckets{
 		RegionId: 1,
@@ -266,7 +266,7 @@ func TestSplitIfRegionTooHot(t *testing.T) {
 	addRegionInfo(tc, utils.Write, []testRegionInfo{
 		{1, []uint64{1, 2, 3}, 4 * units.MiB, 0, 0},
 	})
-	hb, _ = CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, _ = CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	ops, _ = hb.Schedule(tc, false)
 	re.Len(ops, 1)
 	expectOp, _ = operator.CreateSplitRegionOperator(splitHotReadBuckets, tc.GetRegion(1), operator.OpSplit,
@@ -286,7 +286,7 @@ func TestSplitBucketsBySize(t *testing.T) {
 	tc.SetHotRegionCacheHitsThreshold(1)
 	tc.SetRegionBucketEnabled(true)
 	defer cancel()
-	hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	solve := newBalanceSolver(hb.(*hotScheduler), tc, utils.Read, transferLeader)
 	solve.cur = &solution{}
@@ -339,7 +339,7 @@ func TestSplitBucketsByLoad(t *testing.T) {
 	tc.SetHotRegionCacheHitsThreshold(1)
 	tc.SetRegionBucketEnabled(true)
 	defer cancel()
-	hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	solve := newBalanceSolver(hb.(*hotScheduler), tc, utils.Read, transferLeader)
 	solve.cur = &solution{}
@@ -400,7 +400,7 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace
 	tc.SetEnablePlacementRules(enablePlacementRules)
 	labels := []string{"zone", "host"}
 	tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...)
-	hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	tc.SetHotRegionCacheHitsThreshold(0)
@@ -603,7 +603,7 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
 			},
 		},
 	}))
-	sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	sche, err := CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	hb := sche.(*hotScheduler)
@@ -792,7 +792,7 @@ func TestHotWriteRegionScheduleWithQuery(t *testing.T) {
 	statistics.Denoising = false
 	statisticsInterval = 0

-	hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
 	hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
@@ -828,7 +828,7 @@ func TestHotWriteRegionScheduleWithKeyRate(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()

-	hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
 	hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
@@ -887,7 +887,7 @@ func TestHotWriteRegionScheduleUnhealthyStore(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()

-	hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
 	hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
@@ -935,7 +935,7 @@ func TestHotWriteRegionScheduleCheckHot(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()

-	hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
 	hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
@@ -970,7 +970,7 @@ func TestHotWriteRegionScheduleWithLeader(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()

-	hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority}
 	re.NoError(err)
@@ -1036,7 +1036,7 @@ func TestHotWriteRegionScheduleWithPendingInfluence(t *testing.T) {
 func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim int) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()
-	hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority}
 	hb.(*hotScheduler).conf.RankFormulaVersion = "v1"
@@ -1124,7 +1124,7 @@ func TestHotWriteRegionScheduleWithRuleEnabled(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()
 	tc.SetEnablePlacementRules(true)
-	hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority}
@@ -1206,7 +1206,7 @@ func TestHotReadRegionScheduleByteRateOnly(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()
 	tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
-	scheduler, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	scheduler, err := CreateSchedulerWithoutSave(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	hb := scheduler.(*hotScheduler)
 	hb.conf.ReadPriorities = []string{utils.BytePriority, utils.KeyPriority}
@@ -1329,7 +1329,7 @@ func TestHotReadRegionScheduleWithQuery(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()

-	hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
 	hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
@@ -1364,7 +1364,7 @@ func TestHotReadRegionScheduleWithKeyRate(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()

-	hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	hb.(*hotScheduler).conf.RankFormulaVersion = "v1"
 	hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
@@ -1426,7 +1426,7 @@ func TestHotReadRegionScheduleWithPendingInfluence(t *testing.T) {
 func checkHotReadRegionScheduleWithPendingInfluence(re *require.Assertions, dim int) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()
-	hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	// For test
 	hb.(*hotScheduler).conf.RankFormulaVersion = "v1"
@@ -1544,7 +1544,7 @@ func TestHotReadWithEvictLeaderScheduler(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()

-	hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
 	hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
@@ -1852,7 +1852,7 @@ func checkHotCacheCheckRegionFlow(re *require.Assertions, testCase testHotCacheC
 	tc.SetEnablePlacementRules(enablePlacementRules)
 	labels := []string{"zone", "host"}
 	tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...)
-	sche, err := CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null")))
+	sche, err := CreateSchedulerWithoutSave(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null")))
 	re.NoError(err)
 	hb := sche.(*hotScheduler)
 	heartbeat := tc.AddLeaderRegionWithWriteInfo
@@ -1959,7 +1959,7 @@ func TestHotCacheSortHotPeer(t *testing.T) {
 	re := require.New(t)
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()
-	sche, err := CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null")))
+	sche, err := CreateSchedulerWithoutSave(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null")))
 	re.NoError(err)
 	hb := sche.(*hotScheduler)
 	leaderSolver := newBalanceSolver(hb, tc, utils.Read, transferLeader)
@@ -2021,7 +2021,7 @@ func TestInfluenceByRWType(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()

-	hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
 	hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
@@ -2122,7 +2122,7 @@ func checkHotReadPeerSchedule(re *require.Assertions, enablePlacementRules bool)
 		tc.PutStoreWithLabels(id)
 	}

-	sche, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null")))
+	sche, err := CreateSchedulerWithoutSave(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null")))
 	re.NoError(err)
 	hb := sche.(*hotScheduler)
 	hb.conf.ReadPriorities = []string{utils.BytePriority, utils.KeyPriority}
@@ -2144,7 +2144,7 @@ func TestHotScheduleWithPriority(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()

-	hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	hb.(*hotScheduler).conf.SetDstToleranceRatio(1.05)
 	hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.05)
@@ -2186,7 +2186,7 @@ func TestHotScheduleWithPriority(t *testing.T) {
 	clearPendingInfluence(hb.(*hotScheduler))

 	// assert read priority schedule
-	hb, err = CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err = CreateSchedulerWithoutSave(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	tc.UpdateStorageReadStats(5, 10*units.MiB*utils.StoreHeartBeatReportInterval, 10*units.MiB*utils.StoreHeartBeatReportInterval)
 	tc.UpdateStorageReadStats(4, 10*units.MiB*utils.StoreHeartBeatReportInterval, 10*units.MiB*utils.StoreHeartBeatReportInterval)
@@ -2206,7 +2206,7 @@ func TestHotScheduleWithPriority(t *testing.T) {
 	re.Len(ops, 1)
 	operatorutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 1, 3)

-	hb, err = CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err = CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{utils.KeyPriority, utils.BytePriority}
 	hb.(*hotScheduler).conf.RankFormulaVersion = "v1"
 	re.NoError(err)
@@ -2250,7 +2250,7 @@ func TestHotScheduleWithStddev(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()

-	hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	hb.(*hotScheduler).conf.SetDstToleranceRatio(1.0)
 	hb.(*hotScheduler).conf.SetSrcToleranceRatio(1.0)
@@ -2310,7 +2310,7 @@ func TestHotWriteLeaderScheduleWithPriority(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()

-	hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
 	hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
@@ -2353,7 +2353,7 @@ func TestCompatibility(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()

-	hb, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
 	re.NoError(err)
 	// default
 	checkPriority(re, hb.(*hotScheduler), tc, [3][2]int{
@@ -2421,7 +2421,7 @@ func TestCompatibilityConfig(t *testing.T) {
 	defer cancel()

 	// From new or 3.x cluster, it will use new config
-	hb, err := CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder("hot-region", nil))
+	hb, err := CreateSchedulerWithoutSave(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder("hot-region", nil))
 	re.NoError(err)
 	checkPriority(re, hb.(*hotScheduler), tc, [3][2]int{
 		{utils.QueryDim, utils.ByteDim},
@@ -2430,7 +2430,7 @@ func TestCompatibilityConfig(t *testing.T) {
 	})

 	// Config file is not currently supported
-	hb, err = CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(),
+	hb, err = CreateSchedulerWithoutSave(HotRegionType, oc, storage.NewStorageWithMemoryBackend(),
 		ConfigSliceDecoder("hot-region", []string{"read-priorities=byte,query"}))
 	re.NoError(err)
 	checkPriority(re, hb.(*hotScheduler), tc, [3][2]int{
@@ -2458,7 +2458,7 @@ func TestCompatibilityConfig(t *testing.T) {
 	re.NoError(err)
 	err = storage.SaveSchedulerConfig(HotRegionName, data)
 	re.NoError(err)
-	hb, err = CreateScheduler(HotRegionType, oc, storage, ConfigJSONDecoder(data))
+	hb, err = CreateSchedulerWithoutSave(HotRegionType, oc, storage, ConfigJSONDecoder(data))
 	re.NoError(err)
 	checkPriority(re, hb.(*hotScheduler), tc, [3][2]int{
 		{utils.ByteDim, utils.KeyDim},
@@ -2474,7 +2474,7 @@ func TestCompatibilityConfig(t *testing.T) {
 	re.NoError(err)
 	err = storage.SaveSchedulerConfig(HotRegionName, data)
 	re.NoError(err)
-	hb, err = CreateScheduler(HotRegionType, oc, storage, ConfigJSONDecoder(data))
+	hb, err = CreateSchedulerWithoutSave(HotRegionType, oc, storage, ConfigJSONDecoder(data))
 	re.NoError(err)
 	checkPriority(re, hb.(*hotScheduler), tc, [3][2]int{
 		{utils.KeyDim, utils.QueryDim},
@@ -2588,7 +2588,7 @@ func TestMaxZombieDuration(t *testing.T) {
 	re := require.New(t)
 	cancel, _, _, oc := prepareSchedulersTest()
 	defer cancel()
-	hb, err := CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder("hot-region", nil))
+	hb, err := CreateSchedulerWithoutSave(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder("hot-region", nil))
 	re.NoError(err)
 	maxZombieDur := hb.(*hotScheduler).conf.getValidConf().MaxZombieRounds
 	testCases := []maxZombieDurTestCase{
@@ -2641,7 +2641,7 @@ func TestExpect(t *testing.T) {
 	re := require.New(t)
 	cancel, _, _, oc := prepareSchedulersTest()
 	defer cancel()
-	hb, err := CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder("hot-region", nil))
+	hb, err := CreateSchedulerWithoutSave(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder("hot-region", nil))
 	re.NoError(err)
 	testCases := []struct {
 		initFunc func(*balanceSolver)
@@ -2944,7 +2944,7 @@ func TestEncodeConfig(t *testing.T) {
 	re := require.New(t)
 	cancel, _, _, oc := prepareSchedulersTest()
 	defer cancel()
-	sche, err := CreateScheduler(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null")))
+	sche, err := CreateSchedulerWithoutSave(HotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigJSONDecoder([]byte("null")))
 	re.NoError(err)
 	data, err := sche.EncodeConfig()
 	re.NoError(err)
diff --git a/pkg/schedule/schedulers/hot_region_v2_test.go b/pkg/schedule/schedulers/hot_region_v2_test.go
index d11ac44dde9d..2ec01beb66de 100644
--- a/pkg/schedule/schedulers/hot_region_v2_test.go
+++ b/pkg/schedule/schedulers/hot_region_v2_test.go
@@ -36,7 +36,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) {
 	statistics.Denoising = false
 	statisticsInterval = 0
 	statistics.HistorySampleDuration = 0
-	sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
+	sche, err := CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
 	re.NoError(err)
 	hb := sche.(*hotScheduler)
 	hb.conf.SetDstToleranceRatio(0.0)
@@ -98,7 +98,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirst(t *testing.T) {
 	defer cancel()
 	statistics.Denoising = false

-	sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
+	sche, err := CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
 	re.NoError(err)
 	hb := sche.(*hotScheduler)
 	hb.conf.SetDstToleranceRatio(0.0)
@@ -153,7 +153,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()

-	sche, err := CreateScheduler(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
+	sche, err := CreateSchedulerWithoutSave(utils.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
 	re.NoError(err)
 	hb := sche.(*hotScheduler)
 	hb.conf.SetDstToleranceRatio(0.0)
@@ -217,7 +217,7 @@ func TestHotReadRegionScheduleWithRevertRegionsDimSecond(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()

-	sche, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
+	sche, err := CreateSchedulerWithoutSave(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
 	re.NoError(err)
 	hb := sche.(*hotScheduler)
 	hb.conf.SetDstToleranceRatio(0.0)
@@ -280,7 +280,7 @@ func TestSkipUniformStore(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()

-	hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
+	hb, err := CreateSchedulerWithoutSave(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
 	re.NoError(err)
 	hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
 	hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
@@ -439,7 +439,7 @@ func checkHotReadRegionScheduleWithSmallHotRegion(re *require.Assertions, highLo
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()
 	statistics.Denoising = false
-	sche, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
+	sche, err := CreateSchedulerWithoutSave(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil, nil)
 	re.NoError(err)
 	hb := sche.(*hotScheduler)
 	hb.conf.SetSrcToleranceRatio(1)
diff --git a/pkg/schedule/schedulers/scheduler.go b/pkg/schedule/schedulers/scheduler.go
index 1c624dcd9167..083c738260fd 100644
--- a/pkg/schedule/schedulers/scheduler.go
+++ b/pkg/schedule/schedulers/scheduler.go
@@ -117,23 +117,22 @@ func RegisterSliceDecoderBuilder(typ string, builder ConfigSliceDecoderBuilder)
 	config.RegisterScheduler(typ)
 }

-// CreateScheduler creates a scheduler with registered creator func.
-func CreateScheduler(typ string, oc *operator.Controller, storage endpoint.ConfigStorage, dec ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
+// CreateSchedulerWithoutSave creates a scheduler with registered creator func.
+func CreateSchedulerWithoutSave(typ string, oc *operator.Controller, storage endpoint.ConfigStorage, dec ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
 	fn, ok := schedulerMap[typ]
 	if !ok {
 		return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ)
 	}

-	s, err := fn(oc, storage, dec, removeSchedulerCb...)
-	if err != nil {
-		return nil, err
-	}
+	return fn(oc, storage, dec, removeSchedulerCb...)
+}
+
+func SaveSchedulerConfig(storage endpoint.ConfigStorage, s Scheduler) error {
 	data, err := s.EncodeConfig()
 	if err != nil {
-		return nil, err
+		return err
 	}
-	err = storage.SaveSchedulerConfig(s.GetName(), data)
-	return s, err
+	return storage.SaveSchedulerConfig(s.GetName(), data)
 }

 // FindSchedulerTypeByName finds the type of the specified name.
diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go
index 4d72699b0feb..80e88b87989c 100644
--- a/pkg/schedule/schedulers/scheduler_controller.go
+++ b/pkg/schedule/schedulers/scheduler_controller.go
@@ -127,7 +127,7 @@ func (c *Controller) ResetSchedulerMetrics() {
 }

 // AddSchedulerHandler adds the HTTP handler for a scheduler.
-func (c *Controller) AddSchedulerHandler(scheduler Scheduler, args ...string) error {
+func (c *Controller) AddSchedulerHandler(storage endpoint.ConfigStorage, scheduler Scheduler, args ...string) error {
 	c.Lock()
 	defer c.Unlock()

@@ -137,6 +137,10 @@ func (c *Controller) AddSchedulerHandler(scheduler Scheduler, args ...string) er
 	}

 	c.schedulerHandlers[name] = scheduler
+	if err := SaveSchedulerConfig(storage, scheduler); err != nil {
+		log.Error("can not save scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err))
+		return err
+	}
 	c.cluster.GetSchedulerConfig().AddSchedulerCfg(scheduler.GetType(), args)
 	return nil
 }
@@ -171,7 +175,7 @@ func (c *Controller) RemoveSchedulerHandler(name string) error {
 }

 // AddScheduler adds a scheduler.
-func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error {
+func (c *Controller) AddScheduler(storage endpoint.ConfigStorage, scheduler Scheduler, args ...string) error {
 	c.Lock()
 	defer c.Unlock()

@@ -187,6 +191,10 @@ func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error {
 	c.wg.Add(1)
 	go c.runScheduler(s)
 	c.schedulers[s.Scheduler.GetName()] = s
+	if err := SaveSchedulerConfig(storage, scheduler); err != nil {
+		log.Error("can not save scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err))
+		return err
+	}
 	c.cluster.GetSchedulerConfig().AddSchedulerCfg(s.Scheduler.GetType(), args)
 	return nil
 }
diff --git a/pkg/schedule/schedulers/scheduler_test.go b/pkg/schedule/schedulers/scheduler_test.go
index 12ab9f8aa2fd..48acd0586044 100644
--- a/pkg/schedule/schedulers/scheduler_test.go
+++ b/pkg/schedule/schedulers/scheduler_test.go
@@ -54,7 +54,7 @@ func TestShuffleLeader(t *testing.T) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()

-	sl, err := CreateScheduler(ShuffleLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ShuffleLeaderType, []string{"", ""}))
+	sl, err := CreateSchedulerWithoutSave(ShuffleLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ShuffleLeaderType, []string{"", ""}))
 	re.NoError(err)
 	ops, _ := sl.Schedule(tc, false)
 	re.Empty(ops)
@@ -92,7 +92,7 @@ func TestRejectLeader(t *testing.T) {
 	tc.AddLeaderRegion(2, 2, 1, 3)

 	// The label scheduler transfers leader out of store1.
-	sl, err := CreateScheduler(LabelType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(LabelType, []string{"", ""}))
+	sl, err := CreateSchedulerWithoutSave(LabelType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(LabelType, []string{"", ""}))
 	re.NoError(err)
 	ops, _ := sl.Schedule(tc, false)
 	operatorutil.CheckTransferLeaderFrom(re, ops[0], operator.OpLeader, 1)
@@ -104,13 +104,13 @@ func TestRejectLeader(t *testing.T) {

 	// As store3 is disconnected, store1 rejects leader. Balancer will not create
 	// any operators.
-	bs, err := CreateScheduler(BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""}))
+	bs, err := CreateSchedulerWithoutSave(BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""}))
 	re.NoError(err)
 	ops, _ = bs.Schedule(tc, false)
 	re.Empty(ops)

 	// Can't evict leader from store2, neither.
- el, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"2"}), func(string) error { return nil }) + el, err := CreateSchedulerWithoutSave(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"2"}), func(string) error { return nil }) re.NoError(err) ops, _ = el.Schedule(tc, false) re.Empty(ops) @@ -136,7 +136,7 @@ func TestRemoveRejectLeader(t *testing.T) { defer cancel() tc.AddRegionStore(1, 0) tc.AddRegionStore(2, 1) - el, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"}), func(string) error { return nil }) + el, err := CreateSchedulerWithoutSave(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"}), func(string) error { return nil }) re.NoError(err) tc.DeleteStore(tc.GetStore(1)) succ, _ := el.(*evictLeaderScheduler).conf.removeStore(1) @@ -156,7 +156,7 @@ func checkBalance(re *require.Assertions, enablePlacementRules bool) { tc.SetEnablePlacementRules(enablePlacementRules) labels := []string{"zone", "host"} tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...) - hb, err := CreateScheduler(ShuffleHotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder("shuffle-hot-region", []string{"", ""})) + hb, err := CreateSchedulerWithoutSave(ShuffleHotRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder("shuffle-hot-region", []string{"", ""})) re.NoError(err) // Add stores 1, 2, 3, 4, 5, 6 with hot peer counts 3, 2, 2, 2, 0, 0. tc.AddLabelsStore(1, 3, map[string]string{"zone": "z1", "host": "h1"}) @@ -203,7 +203,7 @@ func TestHotRegionScheduleAbnormalReplica(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() tc.SetHotRegionScheduleLimit(0) - hb, err := CreateScheduler(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + hb, err := CreateSchedulerWithoutSave(utils.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) tc.AddRegionStore(1, 3) @@ -228,7 +228,7 @@ func TestShuffleRegion(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - sl, err := CreateScheduler(ShuffleRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ShuffleRegionType, []string{"", ""})) + sl, err := CreateSchedulerWithoutSave(ShuffleRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ShuffleRegionType, []string{"", ""})) re.NoError(err) re.True(sl.IsScheduleAllowed(tc)) ops, _ := sl.Schedule(tc, false) @@ -292,7 +292,7 @@ func TestShuffleRegionRole(t *testing.T) { }, peers[0]) tc.PutRegion(region) - sl, err := CreateScheduler(ShuffleRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ShuffleRegionType, []string{"", ""})) + sl, err := CreateSchedulerWithoutSave(ShuffleRegionType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(ShuffleRegionType, []string{"", ""})) re.NoError(err) conf := sl.(*shuffleRegionScheduler).conf @@ -313,9 +313,9 @@ func TestSpecialUseHotRegion(t *testing.T) { storage := storage.NewStorageWithMemoryBackend() cd := ConfigSliceDecoder(BalanceRegionType, []string{"", ""}) - bs, err := CreateScheduler(BalanceRegionType, oc, storage, cd) + bs, err := CreateSchedulerWithoutSave(BalanceRegionType, oc, storage, cd) re.NoError(err) - hs, err := CreateScheduler(utils.Write.String(), oc, storage, cd) + hs, err := 
CreateSchedulerWithoutSave(utils.Write.String(), oc, storage, cd) re.NoError(err) tc.SetHotRegionCacheHitsThreshold(0) @@ -365,7 +365,7 @@ func TestSpecialUseReserved(t *testing.T) { storage := storage.NewStorageWithMemoryBackend() cd := ConfigSliceDecoder(BalanceRegionType, []string{"", ""}) - bs, err := CreateScheduler(BalanceRegionType, oc, storage, cd) + bs, err := CreateSchedulerWithoutSave(BalanceRegionType, oc, storage, cd) re.NoError(err) tc.SetHotRegionCacheHitsThreshold(0) @@ -399,7 +399,7 @@ func TestBalanceLeaderWithConflictRule(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() tc.SetEnablePlacementRules(true) - lb, err := CreateScheduler(BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""})) + lb, err := CreateSchedulerWithoutSave(BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(BalanceLeaderType, []string{"", ""})) re.NoError(err) tc.AddLeaderStore(1, 1) diff --git a/pkg/schedule/schedulers/transfer_witness_leader_test.go b/pkg/schedule/schedulers/transfer_witness_leader_test.go index 1da968d8dc2f..5f4057428e60 100644 --- a/pkg/schedule/schedulers/transfer_witness_leader_test.go +++ b/pkg/schedule/schedulers/transfer_witness_leader_test.go @@ -38,7 +38,7 @@ func TestTransferWitnessLeader(t *testing.T) { // Add regions 1 with leader in stores 1 tc.AddLeaderRegion(1, 1, 2, 3) - sl, err := CreateScheduler(TransferWitnessLeaderType, oc, storage.NewStorageWithMemoryBackend(), nil) + sl, err := CreateSchedulerWithoutSave(TransferWitnessLeaderType, oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) RecvRegionInfo(sl) <- tc.GetRegion(1) re.True(sl.IsScheduleAllowed(tc)) @@ -53,7 +53,7 @@ func TestTransferWitnessLeaderWithUnhealthyPeer(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() - sl, err := CreateScheduler(TransferWitnessLeaderType, oc, storage.NewStorageWithMemoryBackend(), nil) + sl, err := CreateSchedulerWithoutSave(TransferWitnessLeaderType, oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) // Add stores 1, 2, 3 diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 29a8709bdac1..f086fd5ede40 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -790,8 +790,8 @@ func (c *RaftCluster) GetSchedulerHandlers() map[string]http.Handler { } // AddSchedulerHandler adds a scheduler handler. -func (c *RaftCluster) AddSchedulerHandler(scheduler schedulers.Scheduler, args ...string) error { - return c.coordinator.GetSchedulersController().AddSchedulerHandler(scheduler, args...) +func (c *RaftCluster) AddSchedulerHandler(storage endpoint.ConfigStorage, scheduler schedulers.Scheduler, args ...string) error { + return c.coordinator.GetSchedulersController().AddSchedulerHandler(storage, scheduler, args...) } // RemoveSchedulerHandler removes a scheduler handler. @@ -800,8 +800,8 @@ func (c *RaftCluster) RemoveSchedulerHandler(name string) error { } // AddScheduler adds a scheduler. -func (c *RaftCluster) AddScheduler(scheduler schedulers.Scheduler, args ...string) error { - return c.coordinator.GetSchedulersController().AddScheduler(scheduler, args...) +func (c *RaftCluster) AddScheduler(storage endpoint.ConfigStorage, scheduler schedulers.Scheduler, args ...string) error { + return c.coordinator.GetSchedulersController().AddScheduler(storage, scheduler, args...) } // RemoveScheduler removes a scheduler. 
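
[Editor's note] The thread running through all of these call sites is the same: CreateSchedulerWithoutSave builds a scheduler without touching storage, and persistence moves into Controller.AddScheduler, which calls SaveSchedulerConfig only once the scheduler has actually been registered and started. A minimal sketch of the resulting caller pattern follows; the helper name addEvictLeader is hypothetical, the parameter types are inferred from this diff's call sites, and the endpoint import path is assumed:

	package sketch

	import (
		"github.com/tikv/pd/pkg/schedule/operator"
		"github.com/tikv/pd/pkg/schedule/schedulers"
		"github.com/tikv/pd/pkg/storage/endpoint" // assumed path for endpoint.ConfigStorage
	)

	// addEvictLeader is a hypothetical helper illustrating the revised
	// create-then-add flow shown throughout this diff.
	func addEvictLeader(c *schedulers.Controller, oc *operator.Controller,
		storage endpoint.ConfigStorage, storeID string) error {
		// Step 1: build the scheduler. Unlike the old CreateScheduler,
		// nothing is written to storage at this point.
		s, err := schedulers.CreateSchedulerWithoutSave(
			schedulers.EvictLeaderType, oc, storage,
			schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{storeID}),
			c.RemoveScheduler)
		if err != nil {
			return err
		}
		// Step 2: AddScheduler starts and registers the scheduler, then
		// persists its config via SaveSchedulerConfig(storage, s); a failed
		// save surfaces as an error instead of leaving a silently unsaved
		// scheduler behind.
		return c.AddScheduler(storage, s, storeID)
	}

The practical effect, which TestTransferLeaderForScheduler below exercises, appears to be that a scheduler added at runtime is persisted under the storage a newly elected PD leader reads on start-up, so its config (for example, the evicted store IDs) survives a leader transfer.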
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index ea8d27b155f2..5793e9b4a7dc 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -324,11 +324,11 @@ func TestSetOfflineWithReplica(t *testing.T) {
 
 func addEvictLeaderScheduler(cluster *RaftCluster, storeID uint64) (evictScheduler schedulers.Scheduler, err error) {
 	args := []string{fmt.Sprintf("%d", storeID)}
-	evictScheduler, err = schedulers.CreateScheduler(schedulers.EvictLeaderType, cluster.GetOperatorController(), cluster.storage, schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, args), cluster.GetCoordinator().GetSchedulersController().RemoveScheduler)
+	evictScheduler, err = schedulers.CreateSchedulerWithoutSave(schedulers.EvictLeaderType, cluster.GetOperatorController(), cluster.storage, schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, args), cluster.GetCoordinator().GetSchedulersController().RemoveScheduler)
 	if err != nil {
 		return
 	}
-	if err = cluster.AddScheduler(evictScheduler, args...); err != nil {
+	if err = cluster.AddScheduler(cluster.storage, evictScheduler, args...); err != nil {
 		return
 	} else if err = cluster.opt.Persist(cluster.GetStorage()); err != nil {
 		return
@@ -3020,7 +3020,7 @@ func TestAddScheduler(t *testing.T) {
 	oc := co.GetOperatorController()
 
 	// test ConfigJSONDecoder create
-	bl, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}")))
+	bl, err := schedulers.CreateSchedulerWithoutSave(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}")))
 	re.NoError(err)
 	conf, err := bl.EncodeConfig()
 	re.NoError(err)
@@ -3029,16 +3029,16 @@ func TestAddScheduler(t *testing.T) {
 	re.NoError(err)
 	batch := data["batch"].(float64)
 	re.Equal(4, int(batch))
-	gls, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"0"}), controller.RemoveScheduler)
+	gls, err := schedulers.CreateSchedulerWithoutSave(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"0"}), controller.RemoveScheduler)
 	re.NoError(err)
-	re.NotNil(controller.AddScheduler(gls))
+	re.NotNil(controller.AddScheduler(storage.NewStorageWithMemoryBackend(), gls))
 	re.NotNil(controller.RemoveScheduler(gls.GetName()))
-	gls, err = schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler)
+	gls, err = schedulers.CreateSchedulerWithoutSave(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler)
 	re.NoError(err)
-	re.NoError(controller.AddScheduler(gls))
+	re.NoError(controller.AddScheduler(storage.NewStorageWithMemoryBackend(), gls))
 
-	hb, err := schedulers.CreateScheduler(schedulers.HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}")))
+	hb, err := schedulers.CreateSchedulerWithoutSave(schedulers.HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}")))
 	re.NoError(err)
 	conf, err = hb.EncodeConfig()
 	re.NoError(err)
@@ -3081,12 +3081,12 @@ func TestPersistScheduler(t *testing.T) {
 	oc := co.GetOperatorController()
 	storage := tc.RaftCluster.storage
 
-	gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler)
+	gls1, err := schedulers.CreateSchedulerWithoutSave(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler)
 	re.NoError(err)
-	re.NoError(controller.AddScheduler(gls1, "1"))
-	evict, err := schedulers.CreateScheduler(schedulers.EvictLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{"2"}), controller.RemoveScheduler)
+	re.NoError(controller.AddScheduler(storage, gls1, "1"))
+	evict, err := schedulers.CreateSchedulerWithoutSave(schedulers.EvictLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{"2"}), controller.RemoveScheduler)
 	re.NoError(err)
-	re.NoError(controller.AddScheduler(evict, "2"))
+	re.NoError(controller.AddScheduler(storage, evict, "2"))
 	re.Len(controller.GetSchedulerNames(), defaultCount+2)
 	sches, _, err := storage.LoadAllSchedulerConfigs()
 	re.NoError(err)
@@ -3107,7 +3107,7 @@ func TestPersistScheduler(t *testing.T) {
 	// whether the schedulers added or removed in dynamic way are recorded in opt
 	_, newOpt, err := newTestScheduleConfig()
 	re.NoError(err)
-	_, err = schedulers.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedulers.ConfigJSONDecoder([]byte("null")))
+	_, err = schedulers.CreateSchedulerWithoutSave(schedulers.ShuffleRegionType, oc, storage, schedulers.ConfigJSONDecoder([]byte("null")))
 	re.NoError(err)
 	// suppose we add a new default enable scheduler
 	sc.DefaultSchedulers = append(sc.DefaultSchedulers, sc.SchedulerConfig{Type: "shuffle-region"})
@@ -3142,12 +3142,12 @@ func TestPersistScheduler(t *testing.T) {
 	co.Run()
 	controller = co.GetSchedulersController()
 	re.Len(controller.GetSchedulerNames(), 3)
-	bls, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}))
+	bls, err := schedulers.CreateSchedulerWithoutSave(schedulers.BalanceLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}))
 	re.NoError(err)
-	re.NoError(controller.AddScheduler(bls))
-	brs, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""}))
+	re.NoError(controller.AddScheduler(storage, bls))
+	brs, err := schedulers.CreateSchedulerWithoutSave(schedulers.BalanceRegionType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""}))
 	re.NoError(err)
-	re.NoError(controller.AddScheduler(brs))
+	re.NoError(controller.AddScheduler(storage, brs))
 	re.Len(controller.GetSchedulerNames(), defaultCount)
 	// the scheduler option should contain 6 items
@@ -3194,9 +3194,9 @@ func TestRemoveScheduler(t *testing.T) {
 	oc := co.GetOperatorController()
 	storage := tc.RaftCluster.storage
 
-	gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler)
+	gls1, err := schedulers.CreateSchedulerWithoutSave(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler)
 	re.NoError(err)
-	re.NoError(controller.AddScheduler(gls1, "1"))
+	re.NoError(controller.AddScheduler(storage, gls1, "1"))
 	re.Len(controller.GetSchedulerNames(), defaultCount+1)
 	sches, _, err := storage.LoadAllSchedulerConfigs()
 	re.NoError(err)
@@ -3383,7 +3383,7 @@ func TestStoreOverloaded(t *testing.T) {
 	tc, co, cleanup := prepare(nil, nil, nil, re)
 	defer cleanup()
 	oc := co.GetOperatorController()
-	lb, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""}))
+	lb, err := schedulers.CreateSchedulerWithoutSave(schedulers.BalanceRegionType, oc, tc.storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""}))
 	re.NoError(err)
 	opt := tc.GetOpts()
 	re.NoError(tc.addRegionStore(4, 100))
@@ -3437,7 +3437,7 @@ func TestStoreOverloadedWithReplace(t *testing.T) {
 	tc, co, cleanup := prepare(nil, nil, nil, re)
 	defer cleanup()
 	oc := co.GetOperatorController()
-	lb, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""}))
+	lb, err := schedulers.CreateSchedulerWithoutSave(schedulers.BalanceRegionType, oc, tc.storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""}))
 	re.NoError(err)
 
 	re.NoError(tc.addRegionStore(4, 100))
@@ -3529,7 +3529,7 @@ func TestController(t *testing.T) {
 	re.NoError(tc.addLeaderRegion(1, 1))
 	re.NoError(tc.addLeaderRegion(2, 2))
 
-	scheduler, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}))
+	scheduler, err := schedulers.CreateSchedulerWithoutSave(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}))
 	re.NoError(err)
 	lb := &mockLimitScheduler{
 		Scheduler: scheduler,
@@ -3615,7 +3615,7 @@ func TestInterval(t *testing.T) {
 	tc, co, cleanup := prepare(nil, nil, nil, re)
 	defer cleanup()
 
-	lb, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, co.GetOperatorController(), storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}))
+	lb, err := schedulers.CreateSchedulerWithoutSave(schedulers.BalanceLeaderType, co.GetOperatorController(), storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}))
 	re.NoError(err)
 	sc := schedulers.NewScheduleController(tc.ctx, co.GetCluster(), co.GetOperatorController(), lb)
diff --git a/server/handler.go b/server/handler.go
index a90f8e3f04f3..ffa699cd7a13 100644
--- a/server/handler.go
+++ b/server/handler.go
@@ -231,19 +231,19 @@ func (h *Handler) AddScheduler(name string, args ...string) error {
 		return err
 	}
 
-	s, err := schedulers.CreateScheduler(name, c.GetOperatorController(), h.s.storage, schedulers.ConfigSliceDecoder(name, args), c.GetCoordinator().GetSchedulersController().RemoveScheduler)
+	s, err := schedulers.CreateSchedulerWithoutSave(name, c.GetOperatorController(), h.s.storage, schedulers.ConfigSliceDecoder(name, args), c.GetCoordinator().GetSchedulersController().RemoveScheduler)
 	if err != nil {
 		return err
 	}
 	log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args))
 	if h.s.IsAPIServiceMode() {
-		if err = c.AddSchedulerHandler(s, args...); err != nil {
+		if err = c.AddSchedulerHandler(h.s.storage, s, args...); err != nil {
 			log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err))
 			return err
 		}
 		log.Info("add scheduler handler successfully", zap.String("scheduler-name", name), zap.Strings("scheduler-args", args))
 	} else {
-		if err = c.AddScheduler(s, args...); err != nil {
+		if err = c.AddScheduler(h.s.storage, s, args...); err != nil {
 			log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err))
 			return err
 		}
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index f22a754b8bf4..31a7cc1fa363 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -37,6 +37,7 @@ import (
 	"github.com/tikv/pd/pkg/mock/mockid"
 	sc "github.com/tikv/pd/pkg/schedule/config"
 	"github.com/tikv/pd/pkg/schedule/operator"
+	"github.com/tikv/pd/pkg/schedule/schedulers"
 	"github.com/tikv/pd/pkg/storage"
 	"github.com/tikv/pd/pkg/syncer"
 	"github.com/tikv/pd/pkg/tso"
@@ -47,6 +48,7 @@ import (
 	"github.com/tikv/pd/server/cluster"
 	"github.com/tikv/pd/server/config"
 	"github.com/tikv/pd/tests"
+	"github.com/tikv/pd/tests/server/api"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -1275,6 +1277,119 @@ func TestStaleTermHeartbeat(t *testing.T) {
 	re.NoError(err)
 }
 
+func TestTransferLeaderForScheduler(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`))
+	tc, err := tests.NewTestCluster(ctx, 2)
+	defer tc.Destroy()
+	re.NoError(err)
+	err = tc.RunInitialServers()
+	re.NoError(err)
+	tc.WaitLeader()
+	// start
+	leaderServer := tc.GetServer(tc.GetLeader())
+	re.NoError(leaderServer.BootstrapCluster())
+	rc := leaderServer.GetServer().GetRaftCluster()
+	re.NotNil(rc)
+
+	storesNum := 2
+	grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
+	for i := 1; i <= storesNum; i++ {
+		store := &metapb.Store{
+			Id:      uint64(i),
+			Address: "127.0.0.1:" + strconv.Itoa(i),
+		}
+		resp, err := putStore(grpcPDClient, leaderServer.GetClusterID(), store)
+		re.NoError(err)
+		re.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
+	}
+	// region heartbeat
+	id := leaderServer.GetAllocator()
+	putRegionWithLeader(re, rc, id, 1)
+
+	time.Sleep(time.Second)
+	re.True(leaderServer.GetRaftCluster().IsPrepared())
+	// Add evict leader scheduler
+	api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
+		"store_id": 1,
+	})
+	api.MustAddScheduler(re, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{
+		"store_id": 2,
+	})
+	// Check scheduler updated.
+	schedulersController := rc.GetCoordinator().GetSchedulersController()
+	re.Len(schedulersController.GetSchedulerNames(), 6)
+	checkEvictLeaderSchedulerExist(re, schedulersController, true)
+	checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2})
+
+	// transfer PD leader to another PD
+	tc.ResignLeader()
+	rc.Stop()
+	tc.WaitLeader()
+	leaderServer = tc.GetServer(tc.GetLeader())
+	rc1 := leaderServer.GetServer().GetRaftCluster()
+	rc1.Start(leaderServer.GetServer())
+	re.NoError(err)
+	re.NotNil(rc1)
+	// region heartbeat
+	id = leaderServer.GetAllocator()
+	putRegionWithLeader(re, rc1, id, 1)
+	time.Sleep(time.Second)
+	re.True(leaderServer.GetRaftCluster().IsPrepared())
+	// Check scheduler updated.
+	schedulersController = rc1.GetCoordinator().GetSchedulersController()
+	re.Len(schedulersController.GetSchedulerNames(), 6)
+	checkEvictLeaderSchedulerExist(re, schedulersController, true)
+	checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2})
+
+	// transfer PD leader back to the previous PD
+	tc.ResignLeader()
+	rc1.Stop()
+	tc.WaitLeader()
+	leaderServer = tc.GetServer(tc.GetLeader())
+	rc = leaderServer.GetServer().GetRaftCluster()
+	rc.Start(leaderServer.GetServer())
+	re.NotNil(rc)
+	// region heartbeat
+	id = leaderServer.GetAllocator()
+	putRegionWithLeader(re, rc, id, 1)
+	time.Sleep(time.Second)
+	re.True(leaderServer.GetRaftCluster().IsPrepared())
+	// Check scheduler updated
+	schedulersController = rc.GetCoordinator().GetSchedulersController()
+	re.Len(schedulersController.GetSchedulerNames(), 6)
+	checkEvictLeaderSchedulerExist(re, schedulersController, true)
+	checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2})
+
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
+}
+
+func checkEvictLeaderSchedulerExist(re *require.Assertions, sc *schedulers.Controller, exist bool) {
+	testutil.Eventually(re, func() bool {
+		if !exist {
+			return sc.GetScheduler(schedulers.EvictLeaderName) == nil
+		}
+		return sc.GetScheduler(schedulers.EvictLeaderName) != nil
+	})
+}
+
+func checkEvictLeaderStoreIDs(re *require.Assertions, sc *schedulers.Controller, expected []uint64) {
+	handler, ok := sc.GetSchedulerHandlers()[schedulers.EvictLeaderName]
+	re.True(ok)
+	h, ok := handler.(interface {
+		EvictStoreIDs() []uint64
+	})
+	re.True(ok)
+	var evictStoreIDs []uint64
+	testutil.Eventually(re, func() bool {
+		evictStoreIDs = h.EvictStoreIDs()
+		return len(evictStoreIDs) == len(expected)
+	})
+	re.ElementsMatch(evictStoreIDs, expected)
+}
+
 func putRegionWithLeader(re *require.Assertions, rc *cluster.RaftCluster, id id.Allocator, storeID uint64) {
 	for i := 0; i < 3; i++ {
 		regionID, err := id.Alloc()