diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index 5cd59583767..d29128e9147 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -144,7 +144,25 @@ func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last succ = true last = len(conf.StoreIDWithRanges) == 0 } +<<<<<<< HEAD return succ, last +======= + return false, errs.ErrScheduleConfigNotExist.FastGenByArgs() +} + +func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) { + conf.Lock() + defer conf.Unlock() + // if the store is not existed, no need to resume leader transfer + _, _ = conf.removeStoreLocked(id) +} + +func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) { + if err := conf.cluster.PauseLeaderTransfer(id); err != nil { + log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err)) + } + conf.StoreIDWithRanges[id] = keyRange +>>>>>>> 6b927e117 (*: reset config if the input is invalid (#8632)) } func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) { @@ -365,6 +383,7 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R var id uint64 idFloat, ok := input["store_id"].(float64) if ok { +<<<<<<< HEAD id = (uint64)(idFloat) handler.config.RLock() if _, exists = handler.config.StoreIDWithRanges[id]; !exists { @@ -373,6 +392,12 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } +======= + if batchFloat < 1 || batchFloat > 10 { + handler.config.removeStore(id) + handler.rd.JSON(w, http.StatusBadRequest, "batch is invalid, it should be in [1, 10]") + return +>>>>>>> 6b927e117 (*: reset config if the input is invalid (#8632)) } handler.config.RUnlock() args = append(args, strconv.FormatUint(id, 10)) @@ -380,6 +405,7 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R ranges, ok := (input["ranges"]).([]string) if ok { +<<<<<<< HEAD args = append(args, ranges...) } else if exists { args = append(args, handler.config.getRanges(id)...) @@ -387,6 +413,25 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R handler.config.BuildWithArgs(args) err := handler.config.Persist() +======= + if !inputHasStoreID { + handler.config.removeStore(id) + handler.rd.JSON(w, http.StatusInternalServerError, errs.ErrSchedulerConfig.FastGenByArgs("id")) + return + } + } else if exist { + ranges = handler.config.getRanges(id) + } + + newRanges, err = getKeyRanges(ranges) + if err != nil { + handler.config.removeStore(id) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + + err = handler.config.update(id, newRanges, batch) +>>>>>>> 6b927e117 (*: reset config if the input is invalid (#8632)) if err != nil { handler.config.removeStore(id) handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index 8d36a5ae1c3..8fd302d4006 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -296,10 +296,20 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R args = append(args, handler.config.getRanges(id)...) } +<<<<<<< HEAD handler.config.BuildWithArgs(args) err := handler.config.Persist() +======= + err := handler.config.buildWithArgs(args) if err != nil { - handler.config.removeStore(id) + _, _ = handler.config.removeStore(id) + handler.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + err = handler.config.persist() +>>>>>>> 6b927e117 (*: reset config if the input is invalid (#8632)) + if err != nil { + _, _ = handler.config.removeStore(id) handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index eaeb449f4a6..372ed5a168f 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -275,9 +275,25 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R args = append(args, handler.config.getRanges(id)...) } +<<<<<<< HEAD handler.config.BuildWithArgs(args) err := handler.config.Persist() +======= + err := handler.config.BuildWithArgs(args) if err != nil { + handler.config.mu.Lock() + handler.config.cluster.ResumeLeaderTransfer(id) + handler.config.mu.Unlock() + handler.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + err = handler.config.Persist() +>>>>>>> 6b927e117 (*: reset config if the input is invalid (#8632)) + if err != nil { + handler.config.mu.Lock() + delete(handler.config.StoreIDWitRanges, id) + handler.config.cluster.ResumeLeaderTransfer(id) + handler.config.mu.Unlock() handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) } handler.rd.JSON(w, http.StatusOK, nil) @@ -298,6 +314,7 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R handler.config.mu.Lock() defer handler.config.mu.Unlock() +<<<<<<< HEAD _, exists := handler.config.StoreIDWitRanges[id] if exists { delete(handler.config.StoreIDWitRanges, id) @@ -316,6 +333,27 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R } handler.rd.JSON(w, http.StatusInternalServerError, errors.New("the config does not exist")) +======= + ranges, exists := handler.config.StoreIDWitRanges[id] + if !exists { + handler.rd.JSON(w, http.StatusInternalServerError, errors.New("the config does not exist")) + return + } + delete(handler.config.StoreIDWitRanges, id) + handler.config.cluster.ResumeLeaderTransfer(id) + + if err := handler.config.Persist(); err != nil { + handler.config.StoreIDWitRanges[id] = ranges + _ = handler.config.cluster.PauseLeaderTransfer(id) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + var resp any + if len(handler.config.StoreIDWitRanges) == 0 { + resp = noStoreInSchedulerInfo + } + handler.rd.JSON(w, http.StatusOK, resp) +>>>>>>> 6b927e117 (*: reset config if the input is invalid (#8632)) } func newEvictLeaderHandler(config *evictLeaderSchedulerConfig) http.Handler { diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go index b5a2128752b..037595c1ff5 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -722,6 +722,60 @@ func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *pdTests.TestC checkSchedulerDescribeCommand("balance-leader-scheduler", "normal", "") } +func (suite *schedulerTestSuite) TestEvictLeaderScheduler() { + // FIXME: API mode may have the problem + suite.env.RunTestInPDMode(suite.checkEvictLeaderScheduler) +} + +func (suite *schedulerTestSuite) checkEvictLeaderScheduler(cluster *pdTests.TestCluster) { + re := suite.Require() + pdAddr := cluster.GetConfig().GetClientURL() + cmd := ctl.GetRootCmd() + + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 3, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 4, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + } + for _, store := range stores { + pdTests.MustPutStore(re, cluster, store) + } + + pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b")) + output, err := tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "2"}...) + re.NoError(err) + re.Contains(string(output), "Success!") + output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...) + re.NoError(err) + re.Contains(string(output), "Success!") + output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler"}...) + re.NoError(err) + re.Contains(string(output), "Success!") + output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...) + re.NoError(err) + re.Contains(string(output), "Success!") + output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}...) + re.NoError(err) + re.Contains(string(output), "Success!") +} + func mustExec(re *require.Assertions, cmd *cobra.Command, args []string, v any) string { output, err := tests.ExecuteCommand(cmd, args...) re.NoError(err)