diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index b434c7ad706..ef254ee6950 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -32,7 +32,6 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/reflectutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -56,8 +55,9 @@ const ( type balanceLeaderSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage - Ranges []core.KeyRange `json:"ranges"` + schedulerConfig + + Ranges []core.KeyRange `json:"ranges"` // Batch is used to generate multiple operators by one scheduling Batch int `json:"batch"` } @@ -79,7 +79,7 @@ func (conf *balanceLeaderSchedulerConfig) update(data []byte) (int, any) { } return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10" } - if err := conf.persistLocked(); err != nil { + if err := conf.save(); err != nil { log.Warn("failed to save balance-leader-scheduler config", errs.ZapError(err)) } log.Info("balance-leader-scheduler config is updated", zap.ByteString("old", oldConfig), zap.ByteString("new", newConfig)) @@ -111,14 +111,6 @@ func (conf *balanceLeaderSchedulerConfig) clone() *balanceLeaderSchedulerConfig } } -func (conf *balanceLeaderSchedulerConfig) persistLocked() error { - data, err := EncodeConfig(conf) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(BalanceLeaderName, data) -} - func (conf *balanceLeaderSchedulerConfig) getBatch() int { conf.RLock() defer conf.RUnlock() @@ -216,15 +208,9 @@ func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) { func (l *balanceLeaderScheduler) ReloadConfig() error { l.conf.Lock() defer l.conf.Unlock() - cfgData, err := l.conf.storage.LoadSchedulerConfig(l.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } + newCfg := &balanceLeaderSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := l.conf.load(newCfg); err != nil { return err } l.conf.Ranges = newCfg.Ranges diff --git a/pkg/schedule/schedulers/balance_witness.go b/pkg/schedule/schedulers/balance_witness.go index 599af6df637..450c43647cf 100644 --- a/pkg/schedule/schedulers/balance_witness.go +++ b/pkg/schedule/schedulers/balance_witness.go @@ -33,7 +33,6 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/reflectutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -53,8 +52,9 @@ const ( type balanceWitnessSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage - Ranges []core.KeyRange `json:"ranges"` + schedulerConfig + + Ranges []core.KeyRange `json:"ranges"` // Batch is used to generate multiple operators by one scheduling Batch int `json:"batch"` } @@ -76,7 +76,7 @@ func (conf *balanceWitnessSchedulerConfig) update(data []byte) (int, any) { } return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10" } - if err := conf.persistLocked(); err != nil { + if err := conf.save(); err != nil { log.Warn("failed to persist config", zap.Error(err)) } log.Info("balance-witness-scheduler config is updated", zap.ByteString("old", oldc), 
zap.ByteString("new", newc)) @@ -108,14 +108,6 @@ func (conf *balanceWitnessSchedulerConfig) clone() *balanceWitnessSchedulerConfi } } -func (conf *balanceWitnessSchedulerConfig) persistLocked() error { - data, err := EncodeConfig(conf) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(BalanceWitnessName, data) -} - func (conf *balanceWitnessSchedulerConfig) getBatch() int { conf.RLock() defer conf.RUnlock() @@ -215,15 +207,9 @@ func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) { func (b *balanceWitnessScheduler) ReloadConfig() error { b.conf.Lock() defer b.conf.Unlock() - cfgData, err := b.conf.storage.LoadSchedulerConfig(b.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } + newCfg := &balanceWitnessSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := b.conf.load(newCfg); err != nil { return err } b.conf.Ranges = newCfg.Ranges diff --git a/pkg/schedule/schedulers/config.go b/pkg/schedule/schedulers/config.go new file mode 100644 index 00000000000..0c7caf686c3 --- /dev/null +++ b/pkg/schedule/schedulers/config.go @@ -0,0 +1,60 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schedulers + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/tikv/pd/pkg/storage/endpoint" +) + +type schedulerConfig interface { + save() error + load(any) error + init(name string, storage endpoint.ConfigStorage, data any) +} + +type baseSchedulerConfig struct { + name string + storage endpoint.ConfigStorage + + // data is the config of the scheduler. + data any +} + +func (b *baseSchedulerConfig) init(name string, storage endpoint.ConfigStorage, data any) { + b.name = name + b.storage = storage + b.data = data +} + +func (b *baseSchedulerConfig) save() error { + data, err := EncodeConfig(b.data) + failpoint.Inject("persistFail", func() { + err = errors.New("fail to persist") + }) + if err != nil { + return err + } + return b.storage.SaveSchedulerConfig(b.name, data) +} + +func (b *baseSchedulerConfig) load(v any) error { + data, err := b.storage.LoadSchedulerConfig(b.name) + if err != nil { + return err + } + return DecodeConfig([]byte(data), v) +} diff --git a/pkg/schedule/schedulers/config_test.go b/pkg/schedule/schedulers/config_test.go new file mode 100644 index 00000000000..31858bd7c10 --- /dev/null +++ b/pkg/schedule/schedulers/config_test.go @@ -0,0 +1,50 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package schedulers + +import ( + "testing" + + "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/storage" +) + +func TestSchedulerConfig(t *testing.T) { + s := storage.NewStorageWithMemoryBackend() + + type testConfig struct { + schedulerConfig + Value string `json:"value"` + } + + cfg := &testConfig{ + schedulerConfig: &baseSchedulerConfig{}, + } + cfg.init("test", s, cfg) + + cfg.Value = "test" + require.NoError(t, cfg.save()) + newTc := &testConfig{} + require.NoError(t, cfg.load(newTc)) + require.Equal(t, cfg.Value, newTc.Value) + + // config with another name cannot loaded the previous config + cfg2 := &testConfig{ + schedulerConfig: &baseSchedulerConfig{}, + } + cfg2.init("test2", s, cfg2) + // report error because the config is empty and cannot be decoded + require.Error(t, cfg2.load(newTc)) +} diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index d4e26cb1b68..85f861f0082 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -20,7 +20,6 @@ import ( "github.com/gorilla/mux" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/constant" @@ -30,7 +29,6 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -48,7 +46,8 @@ const ( type evictLeaderSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage + schedulerConfig + StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"` // Batch is used to generate multiple operators by one scheduling Batch int `json:"batch"` @@ -85,17 +84,6 @@ func (conf *evictLeaderSchedulerConfig) clone() *evictLeaderSchedulerConfig { } } -func (conf *evictLeaderSchedulerConfig) persistLocked() error { - data, err := EncodeConfig(conf) - failpoint.Inject("persistFail", func() { - err = errors.New("fail to persist") - }) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(types.EvictLeaderScheduler.String(), data) -} - func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string { conf.RLock() defer conf.RUnlock() @@ -145,18 +133,11 @@ func (conf *evictLeaderSchedulerConfig) encodeConfig() ([]byte, error) { return EncodeConfig(conf) } -func (conf *evictLeaderSchedulerConfig) reloadConfig(name string) error { +func (conf *evictLeaderSchedulerConfig) reloadConfig() error { conf.Lock() defer conf.Unlock() - cfgData, err := conf.storage.LoadSchedulerConfig(name) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } newCfg := &evictLeaderSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := conf.load(newCfg); err != nil { return err } pauseAndResumeLeaderTransfer(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges) @@ -203,7 +184,7 @@ func (conf *evictLeaderSchedulerConfig) update(id uint64, newRanges []core.KeyRa conf.StoreIDWithRanges[id] = newRanges } conf.Batch = batch - err := conf.persistLocked() + err := conf.save() if err != nil && id != 0 { _, _ = conf.removeStoreLocked(id) } @@ -220,7 +201,7 @@ func (conf *evictLeaderSchedulerConfig) delete(id uint64) (any, error) { } keyRanges := 
conf.StoreIDWithRanges[id] - err = conf.persistLocked() + err = conf.save() if err != nil { conf.resetStoreLocked(id, keyRanges) conf.Unlock() @@ -275,7 +256,7 @@ func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) { // ReloadConfig reloads the config from the storage. func (s *evictLeaderScheduler) ReloadConfig() error { - return s.conf.reloadConfig(s.GetName()) + return s.conf.reloadConfig() } // PrepareConfig implements the Scheduler interface. diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index d0fb963bd52..de581f597bb 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -27,7 +27,6 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -44,8 +43,9 @@ const ( type evictSlowStoreSchedulerConfig struct { syncutil.RWMutex + schedulerConfig + cluster *core.BasicCluster - storage endpoint.ConfigStorage // Last timestamp of the chosen slow store for eviction. lastSlowStoreCaptureTS time.Time // Duration gap for recovering the candidate, unit: s. @@ -53,9 +53,9 @@ type evictSlowStoreSchedulerConfig struct { EvictedStores []uint64 `json:"evict-stores"` } -func initEvictSlowStoreSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowStoreSchedulerConfig { +func initEvictSlowStoreSchedulerConfig() *evictSlowStoreSchedulerConfig { return &evictSlowStoreSchedulerConfig{ - storage: storage, + schedulerConfig: &baseSchedulerConfig{}, lastSlowStoreCaptureTS: time.Time{}, RecoveryDurationGap: defaultRecoveryDurationGap, EvictedStores: make([]uint64, 0), @@ -70,17 +70,6 @@ func (conf *evictSlowStoreSchedulerConfig) clone() *evictSlowStoreSchedulerConfi } } -func (conf *evictSlowStoreSchedulerConfig) persistLocked() error { - data, err := EncodeConfig(conf) - failpoint.Inject("persistFail", func() { - err = errors.New("fail to persist") - }) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(types.EvictSlowStoreScheduler.String(), data) -} - func (conf *evictSlowStoreSchedulerConfig) getStores() []uint64 { conf.RLock() defer conf.RUnlock() @@ -121,7 +110,7 @@ func (conf *evictSlowStoreSchedulerConfig) setStoreAndPersist(id uint64) error { defer conf.Unlock() conf.EvictedStores = []uint64{id} conf.lastSlowStoreCaptureTS = time.Now() - return conf.persistLocked() + return conf.save() } func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err error) { @@ -131,7 +120,7 @@ func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err if oldID > 0 { conf.EvictedStores = []uint64{} conf.lastSlowStoreCaptureTS = time.Time{} - err = conf.persistLocked() + err = conf.save() } return } @@ -162,12 +151,13 @@ func (handler *evictSlowStoreHandler) updateConfig(w http.ResponseWriter, r *htt handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error()) return } + handler.config.Lock() defer handler.config.Unlock() prevRecoveryDurationGap := handler.config.RecoveryDurationGap recoveryDurationGap := uint64(recoveryDurationGapFloat) handler.config.RecoveryDurationGap = recoveryDurationGap - if err := handler.config.persistLocked(); err != nil { + if err := handler.config.save(); err != nil { handler.rd.JSON(w, http.StatusInternalServerError, 
err.Error()) handler.config.RecoveryDurationGap = prevRecoveryDurationGap return @@ -201,15 +191,9 @@ func (s *evictSlowStoreScheduler) EncodeConfig() ([]byte, error) { func (s *evictSlowStoreScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } + newCfg := &evictSlowStoreSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } old := make(map[uint64]struct{}) diff --git a/pkg/schedule/schedulers/evict_slow_store_test.go b/pkg/schedule/schedulers/evict_slow_store_test.go index ad5b16e8ca3..406a08b9c99 100644 --- a/pkg/schedule/schedulers/evict_slow_store_test.go +++ b/pkg/schedule/schedulers/evict_slow_store_test.go @@ -16,8 +16,6 @@ package schedulers import ( "context" - "encoding/json" - "strings" "testing" "github.com/pingcap/failpoint" @@ -103,18 +101,10 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStore() { re.Zero(es2.conf.evictStore()) // check the value from storage. - sches, vs, err := es2.conf.storage.LoadAllSchedulerConfigs() - re.NoError(err) - valueStr := "" - for id, sche := range sches { - if strings.EqualFold(sche, EvictSlowStoreName) { - valueStr = vs[id] - } - } - var persistValue evictSlowStoreSchedulerConfig - err = json.Unmarshal([]byte(valueStr), &persistValue) + err := es2.conf.load(&persistValue) re.NoError(err) + re.Equal(es2.conf.EvictedStores, persistValue.EvictedStores) re.Zero(persistValue.evictStore()) re.True(persistValue.readyForRecovery()) diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index 206791900c6..427787016a2 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -28,7 +28,6 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -54,8 +53,9 @@ type slowCandidate struct { type evictSlowTrendSchedulerConfig struct { syncutil.RWMutex + schedulerConfig + cluster *core.BasicCluster - storage endpoint.ConfigStorage // Candidate for eviction in current tick. evictCandidate slowCandidate // Last chosen candidate for eviction. 
@@ -66,9 +66,9 @@ type evictSlowTrendSchedulerConfig struct { EvictedStores []uint64 `json:"evict-by-trend-stores"` } -func initEvictSlowTrendSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowTrendSchedulerConfig { +func initEvictSlowTrendSchedulerConfig() *evictSlowTrendSchedulerConfig { return &evictSlowTrendSchedulerConfig{ - storage: storage, + schedulerConfig: &baseSchedulerConfig{}, evictCandidate: slowCandidate{}, lastEvictCandidate: slowCandidate{}, RecoveryDurationGap: defaultRecoveryDurationGap, @@ -84,17 +84,6 @@ func (conf *evictSlowTrendSchedulerConfig) clone() *evictSlowTrendSchedulerConfi } } -func (conf *evictSlowTrendSchedulerConfig) persistLocked() error { - data, err := EncodeConfig(conf) - failpoint.Inject("persistFail", func() { - err = errors.New("fail to persist") - }) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(types.EvictSlowTrendScheduler.String(), data) -} - func (conf *evictSlowTrendSchedulerConfig) getStores() []uint64 { conf.RLock() defer conf.RUnlock() @@ -205,7 +194,7 @@ func (conf *evictSlowTrendSchedulerConfig) setStoreAndPersist(id uint64) error { conf.Lock() defer conf.Unlock() conf.EvictedStores = []uint64{id} - return conf.persistLocked() + return conf.save() } func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.SchedulerCluster) (oldID uint64, err error) { @@ -222,7 +211,7 @@ func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.Schedule conf.Lock() defer conf.Unlock() conf.EvictedStores = []uint64{} - return oldID, conf.persistLocked() + return oldID, conf.save() } type evictSlowTrendHandler struct { @@ -256,7 +245,7 @@ func (handler *evictSlowTrendHandler) updateConfig(w http.ResponseWriter, r *htt prevRecoveryDurationGap := handler.config.RecoveryDurationGap recoveryDurationGap := uint64(recoveryDurationGapFloat) handler.config.RecoveryDurationGap = recoveryDurationGap - if err := handler.config.persistLocked(); err != nil { + if err := handler.config.save(); err != nil { handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) handler.config.RecoveryDurationGap = prevRecoveryDurationGap return @@ -304,17 +293,11 @@ func (s *evictSlowTrendScheduler) EncodeConfig() ([]byte, error) { func (s *evictSlowTrendScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } newCfg := &evictSlowTrendSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } + old := make(map[uint64]struct{}) for _, id := range s.conf.EvictedStores { old[id] = struct{}{} @@ -456,11 +439,12 @@ func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, _ bool func newEvictSlowTrendScheduler(opController *operator.Controller, conf *evictSlowTrendSchedulerConfig) Scheduler { handler := newEvictSlowTrendHandler(conf) - return &evictSlowTrendScheduler{ + sche := &evictSlowTrendScheduler{ BaseScheduler: NewBaseScheduler(opController, types.EvictSlowTrendScheduler), conf: conf, handler: handler, } + return sche } func chooseEvictCandidate(cluster sche.SchedulerCluster, lastEvictCandidate *slowCandidate) (slowStore *core.StoreInfo) { diff --git a/pkg/schedule/schedulers/evict_slow_trend_test.go b/pkg/schedule/schedulers/evict_slow_trend_test.go index 10da5c91565..02cb65021eb 100644 --- a/pkg/schedule/schedulers/evict_slow_trend_test.go +++ 
b/pkg/schedule/schedulers/evict_slow_trend_test.go @@ -16,8 +16,6 @@ package schedulers import ( "context" - "encoding/json" - "strings" "testing" "time" @@ -186,17 +184,8 @@ func (suite *evictSlowTrendTestSuite) TestEvictSlowTrend() { re.Zero(es2.conf.evictedStore()) // check the value from storage. - sches, vs, err := es2.conf.storage.LoadAllSchedulerConfigs() - re.NoError(err) - valueStr := "" - for id, sche := range sches { - if strings.EqualFold(sche, EvictSlowTrendName) { - valueStr = vs[id] - } - } - var persistValue evictSlowTrendSchedulerConfig - err = json.Unmarshal([]byte(valueStr), &persistValue) + err := es2.conf.load(&persistValue) re.NoError(err) re.Equal(es2.conf.EvictedStores, persistValue.EvictedStores) re.Zero(persistValue.evictedStore()) diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go index 15a520f95d0..a441f41062a 100644 --- a/pkg/schedule/schedulers/grant_hot_region.go +++ b/pkg/schedule/schedulers/grant_hot_region.go @@ -34,7 +34,6 @@ import ( "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/utils" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -48,7 +47,8 @@ const ( type grantHotRegionSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage + schedulerConfig + cluster *core.BasicCluster StoreIDs []uint64 `json:"store-id"` StoreLeaderID uint64 `json:"store-leader-id"` @@ -93,11 +93,7 @@ func (conf *grantHotRegionSchedulerConfig) clone() *grantHotRegionSchedulerConfi func (conf *grantHotRegionSchedulerConfig) persist() error { conf.RLock() defer conf.RUnlock() - data, err := EncodeConfig(conf) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(types.GrantHotRegionScheduler.String(), data) + return conf.save() } func (conf *grantHotRegionSchedulerConfig) has(storeID uint64) bool { @@ -146,15 +142,8 @@ func (s *grantHotRegionScheduler) EncodeConfig() ([]byte, error) { func (s *grantHotRegionScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } newCfg := &grantHotRegionSchedulerConfig{} - if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } s.conf.StoreIDs = newCfg.StoreIDs diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index 134eddda880..d70c0b19d67 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -29,7 +29,6 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -43,7 +42,8 @@ const ( type grantLeaderSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage + schedulerConfig + StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"` cluster *core.BasicCluster removeSchedulerCb func(name string) error @@ -83,11 +83,7 @@ func (conf *grantLeaderSchedulerConfig) clone() *grantLeaderSchedulerConfig { func (conf *grantLeaderSchedulerConfig) persist() error { conf.RLock() defer conf.RUnlock() - data, err := EncodeConfig(conf) - if err 
!= nil { - return err - } - return conf.storage.SaveSchedulerConfig(types.GrantLeaderScheduler.String(), data) + return conf.save() } func (conf *grantLeaderSchedulerConfig) getRanges(id uint64) []string { @@ -176,15 +172,8 @@ func (s *grantLeaderScheduler) EncodeConfig() ([]byte, error) { func (s *grantLeaderScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } newCfg := &grantLeaderSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } pauseAndResumeLeaderTransfer(s.conf.cluster, s.conf.StoreIDWithRanges, newCfg.StoreIDWithRanges) diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 717c1413ac4..ab595ec9058 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -222,15 +222,9 @@ func (h *hotScheduler) EncodeConfig() ([]byte, error) { func (h *hotScheduler) ReloadConfig() error { h.conf.Lock() defer h.conf.Unlock() - cfgData, err := h.conf.storage.LoadSchedulerConfig(h.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } + newCfg := &hotRegionSchedulerConfig{} - if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := h.conf.load(newCfg); err != nil { return err } h.conf.MinHotByteRate = newCfg.MinHotByteRate diff --git a/pkg/schedule/schedulers/hot_region_config.go b/pkg/schedule/schedulers/hot_region_config.go index 83121254cc0..0424a582bf4 100644 --- a/pkg/schedule/schedulers/hot_region_config.go +++ b/pkg/schedule/schedulers/hot_region_config.go @@ -28,7 +28,6 @@ import ( "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/utils" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/reflectutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -59,6 +58,7 @@ var compatiblePrioritiesConfig = prioritiesConfig{ // params about hot region. 
func initHotRegionScheduleConfig() *hotRegionSchedulerConfig { cfg := &hotRegionSchedulerConfig{ + schedulerConfig: &baseSchedulerConfig{}, MinHotByteRate: 100, MinHotKeyRate: 10, MinHotQueryRate: 10, @@ -114,7 +114,8 @@ func (conf *hotRegionSchedulerConfig) getValidConf() *hotRegionSchedulerConfig { type hotRegionSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage + schedulerConfig + lastQuerySupported bool MinHotByteRate float64 `json:"min-hot-byte-rate"` @@ -455,7 +456,7 @@ func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r * } newc, _ := json.Marshal(conf) if !bytes.Equal(oldc, newc) { - if err := conf.persistLocked(); err != nil { + if err := conf.save(); err != nil { log.Warn("failed to persist config", zap.Error(err)) } log.Info("hot-region-scheduler config is updated", zap.String("old", string(oldc)), zap.String("new", string(newc))) @@ -477,14 +478,6 @@ func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r * rd.Text(w, http.StatusBadRequest, "Config item is not found.") } -func (conf *hotRegionSchedulerConfig) persistLocked() error { - data, err := EncodeConfig(conf) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(HotRegionName, data) -} - func (conf *hotRegionSchedulerConfig) checkQuerySupport(cluster sche.SchedulerCluster) bool { version := cluster.GetSchedulerConfig().GetClusterVersion() querySupport := versioninfo.IsFeatureSupported(version, versioninfo.HotScheduleWithQuery) diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 0e1917e8552..16f78284cf8 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -55,14 +55,18 @@ func schedulersRegister() { RegisterScheduler(types.BalanceLeaderScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { - conf := &balanceLeaderSchedulerConfig{storage: storage} + conf := &balanceLeaderSchedulerConfig{ + schedulerConfig: &baseSchedulerConfig{}, + } if err := decoder(conf); err != nil { return nil, err } if conf.Batch == 0 { conf.Batch = BalanceLeaderBatchSize } - return newBalanceLeaderScheduler(opController, conf), nil + sche := newBalanceLeaderScheduler(opController, conf) + conf.init(sche.GetName(), storage, conf) + return sche, nil }) // balance region @@ -109,14 +113,18 @@ func schedulersRegister() { RegisterScheduler(types.BalanceWitnessScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { - conf := &balanceWitnessSchedulerConfig{storage: storage} + conf := &balanceWitnessSchedulerConfig{ + schedulerConfig: &baseSchedulerConfig{}, + } if err := decoder(conf); err != nil { return nil, err } if conf.Batch == 0 { conf.Batch = balanceWitnessBatchSize } - return newBalanceWitnessScheduler(opController, conf), nil + sche := newBalanceWitnessScheduler(opController, conf) + conf.init(sche.GetName(), storage, conf) + return sche, nil }) // evict leader @@ -147,13 +155,18 @@ func schedulersRegister() { RegisterScheduler(types.EvictLeaderScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) { - conf := &evictLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage} + conf := &evictLeaderSchedulerConfig{ + schedulerConfig: &baseSchedulerConfig{}, + 
StoreIDWithRanges: make(map[uint64][]core.KeyRange), + } if err := decoder(conf); err != nil { return nil, err } conf.cluster = opController.GetCluster() conf.removeSchedulerCb = removeSchedulerCb[0] - return newEvictLeaderScheduler(opController, conf), nil + sche := newEvictLeaderScheduler(opController, conf) + conf.init(sche.GetName(), storage, conf) + return sche, nil }) // evict slow store @@ -165,12 +178,14 @@ func schedulersRegister() { RegisterScheduler(types.EvictSlowStoreScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { - conf := initEvictSlowStoreSchedulerConfig(storage) + conf := initEvictSlowStoreSchedulerConfig() if err := decoder(conf); err != nil { return nil, err } conf.cluster = opController.GetCluster() - return newEvictSlowStoreScheduler(opController, conf), nil + sche := newEvictSlowStoreScheduler(opController, conf) + conf.init(sche.GetName(), storage, conf) + return sche, nil }) // grant hot region @@ -206,12 +221,17 @@ func schedulersRegister() { RegisterScheduler(types.GrantHotRegionScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { - conf := &grantHotRegionSchedulerConfig{StoreIDs: make([]uint64, 0), storage: storage} + conf := &grantHotRegionSchedulerConfig{ + schedulerConfig: &baseSchedulerConfig{}, + StoreIDs: make([]uint64, 0), + } conf.cluster = opController.GetCluster() if err := decoder(conf); err != nil { return nil, err } - return newGrantHotRegionScheduler(opController, conf), nil + sche := newGrantHotRegionScheduler(opController, conf) + conf.init(sche.GetName(), storage, conf) + return sche, nil }) // hot region @@ -238,8 +258,9 @@ func schedulersRegister() { return nil, err } } - conf.storage = storage - return newHotScheduler(opController, conf), nil + sche := newHotScheduler(opController, conf) + conf.init(sche.GetName(), storage, conf) + return sche, nil }) // grant leader @@ -269,13 +290,18 @@ func schedulersRegister() { RegisterScheduler(types.GrantLeaderScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) { - conf := &grantLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage} + conf := &grantLeaderSchedulerConfig{ + schedulerConfig: &baseSchedulerConfig{}, + StoreIDWithRanges: make(map[uint64][]core.KeyRange), + } conf.cluster = opController.GetCluster() conf.removeSchedulerCb = removeSchedulerCb[0] if err := decoder(conf); err != nil { return nil, err } - return newGrantLeaderScheduler(opController, conf), nil + sche := newGrantLeaderScheduler(opController, conf) + conf.init(sche.GetName(), storage, conf) + return sche, nil }) // label @@ -352,7 +378,7 @@ func schedulersRegister() { RegisterScheduler(types.ScatterRangeScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { conf := &scatterRangeSchedulerConfig{ - storage: storage, + schedulerConfig: &baseSchedulerConfig{}, } if err := decoder(conf); err != nil { return nil, err @@ -361,7 +387,9 @@ func schedulersRegister() { if len(rangeName) == 0 { return nil, errs.ErrSchedulerConfig.FastGenByArgs("range name") } - return newScatterRangeScheduler(opController, conf), nil + sche := newScatterRangeScheduler(opController, conf) + conf.init(sche.GetName(), 
storage, conf) + return sche, nil }) // shuffle hot region @@ -385,12 +413,16 @@ func schedulersRegister() { RegisterScheduler(types.ShuffleHotRegionScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { - conf := &shuffleHotRegionSchedulerConfig{Limit: uint64(1)} + conf := &shuffleHotRegionSchedulerConfig{ + schedulerConfig: &baseSchedulerConfig{}, + Limit: uint64(1), + } if err := decoder(conf); err != nil { return nil, err } - conf.storage = storage - return newShuffleHotRegionScheduler(opController, conf), nil + sche := newShuffleHotRegionScheduler(opController, conf) + conf.init(sche.GetName(), storage, conf) + return sche, nil }) // shuffle leader @@ -405,7 +437,6 @@ func schedulersRegister() { return err } conf.Ranges = ranges - conf.Name = ShuffleLeaderName return nil } }) @@ -438,11 +469,15 @@ func schedulersRegister() { RegisterScheduler(types.ShuffleRegionScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { - conf := &shuffleRegionSchedulerConfig{storage: storage} + conf := &shuffleRegionSchedulerConfig{ + schedulerConfig: &baseSchedulerConfig{}, + } if err := decoder(conf); err != nil { return nil, err } - return newShuffleRegionScheduler(opController, conf), nil + sche := newShuffleRegionScheduler(opController, conf) + conf.init(sche.GetName(), storage, conf) + return sche, nil }) // split bucket @@ -458,8 +493,9 @@ func schedulersRegister() { if err := decoder(conf); err != nil { return nil, err } - conf.storage = storage - return newSplitBucketScheduler(opController, conf), nil + sche := newSplitBucketScheduler(opController, conf) + conf.init(sche.GetName(), storage, conf) + return sche, nil }) // transfer witness leader @@ -483,10 +519,13 @@ func schedulersRegister() { RegisterScheduler(types.EvictSlowTrendScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { - conf := initEvictSlowTrendSchedulerConfig(storage) + conf := initEvictSlowTrendSchedulerConfig() if err := decoder(conf); err != nil { return nil, err } - return newEvictSlowTrendScheduler(opController, conf), nil + + sche := newEvictSlowTrendScheduler(opController, conf) + conf.init(sche.GetName(), storage, conf) + return sche, nil }) } diff --git a/pkg/schedule/schedulers/scatter_range.go b/pkg/schedule/schedulers/scatter_range.go index 9c9606b29a9..e86785fcc19 100644 --- a/pkg/schedule/schedulers/scatter_range.go +++ b/pkg/schedule/schedulers/scatter_range.go @@ -26,7 +26,6 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -39,7 +38,8 @@ const ( type scatterRangeSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage + schedulerConfig + RangeName string `json:"range-name"` StartKey string `json:"start-key"` EndKey string `json:"end-key"` @@ -69,14 +69,9 @@ func (conf *scatterRangeSchedulerConfig) clone() *scatterRangeSchedulerConfig { } func (conf *scatterRangeSchedulerConfig) persist() error { - name := conf.getSchedulerName() conf.RLock() defer conf.RUnlock() - data, err := EncodeConfig(conf) - if err != nil { - return err - } - return 
conf.storage.SaveSchedulerConfig(name, data) + return conf.save() } func (conf *scatterRangeSchedulerConfig) getRangeName() string { @@ -153,15 +148,8 @@ func (l *scatterRangeScheduler) EncodeConfig() ([]byte, error) { func (l *scatterRangeScheduler) ReloadConfig() error { l.config.Lock() defer l.config.Unlock() - cfgData, err := l.config.storage.LoadSchedulerConfig(l.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } newCfg := &scatterRangeSchedulerConfig{} - if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := l.config.load(newCfg); err != nil { return err } l.config.RangeName = newCfg.RangeName diff --git a/pkg/schedule/schedulers/shuffle_hot_region.go b/pkg/schedule/schedulers/shuffle_hot_region.go index 686322961cb..f8544fff48d 100644 --- a/pkg/schedule/schedulers/shuffle_hot_region.go +++ b/pkg/schedule/schedulers/shuffle_hot_region.go @@ -28,7 +28,6 @@ import ( "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" "github.com/tikv/pd/pkg/statistics" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -42,8 +41,9 @@ const ( type shuffleHotRegionSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage - Limit uint64 `json:"limit"` + schedulerConfig + + Limit uint64 `json:"limit"` } func (conf *shuffleHotRegionSchedulerConfig) clone() *shuffleHotRegionSchedulerConfig { @@ -54,14 +54,6 @@ func (conf *shuffleHotRegionSchedulerConfig) clone() *shuffleHotRegionSchedulerC } } -func (conf *shuffleHotRegionSchedulerConfig) persistLocked() error { - data, err := EncodeConfig(conf) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(types.ShuffleHotRegionScheduler.String(), data) -} - func (conf *shuffleHotRegionSchedulerConfig) getLimit() uint64 { conf.RLock() defer conf.RUnlock() @@ -106,15 +98,8 @@ func (s *shuffleHotRegionScheduler) EncodeConfig() ([]byte, error) { func (s *shuffleHotRegionScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } newCfg := &shuffleHotRegionSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } s.conf.Limit = newCfg.Limit @@ -224,11 +209,12 @@ func (handler *shuffleHotRegionHandler) updateConfig(w http.ResponseWriter, r *h handler.rd.JSON(w, http.StatusBadRequest, "invalid limit") return } + handler.config.Lock() defer handler.config.Unlock() previous := handler.config.Limit handler.config.Limit = uint64(limit) - err := handler.config.persistLocked() + err := handler.config.save() if err != nil { handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) handler.config.Limit = previous diff --git a/pkg/schedule/schedulers/shuffle_leader.go b/pkg/schedule/schedulers/shuffle_leader.go index 2cd6c231a11..4270613667b 100644 --- a/pkg/schedule/schedulers/shuffle_leader.go +++ b/pkg/schedule/schedulers/shuffle_leader.go @@ -32,7 +32,6 @@ const ( ) type shuffleLeaderSchedulerConfig struct { - Name string `json:"name"` Ranges []core.KeyRange `json:"ranges"` // TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler. 
} @@ -46,11 +45,11 @@ type shuffleLeaderScheduler struct { // newShuffleLeaderScheduler creates an admin scheduler that shuffles leaders // between stores. func newShuffleLeaderScheduler(opController *operator.Controller, conf *shuffleLeaderSchedulerConfig) Scheduler { + base := NewBaseScheduler(opController, types.ShuffleLeaderScheduler) filters := []filter.Filter{ - &filter.StoreStateFilter{ActionScope: conf.Name, TransferLeader: true, OperatorLevel: constant.Low}, - filter.NewSpecialUseFilter(conf.Name), + &filter.StoreStateFilter{ActionScope: base.GetName(), TransferLeader: true, OperatorLevel: constant.Low}, + filter.NewSpecialUseFilter(base.GetName()), } - base := NewBaseScheduler(opController, types.ShuffleLeaderScheduler) return &shuffleLeaderScheduler{ BaseScheduler: base, conf: conf, diff --git a/pkg/schedule/schedulers/shuffle_region.go b/pkg/schedule/schedulers/shuffle_region.go index c179efd32c1..5d4c49e0fcc 100644 --- a/pkg/schedule/schedulers/shuffle_region.go +++ b/pkg/schedule/schedulers/shuffle_region.go @@ -67,15 +67,8 @@ func (s *shuffleRegionScheduler) EncodeConfig() ([]byte, error) { func (s *shuffleRegionScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } newCfg := &shuffleRegionSchedulerConfig{} - if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } s.conf.Roles = newCfg.Roles diff --git a/pkg/schedule/schedulers/shuffle_region_config.go b/pkg/schedule/schedulers/shuffle_region_config.go index fbf53cfeb4d..2e3394a58df 100644 --- a/pkg/schedule/schedulers/shuffle_region_config.go +++ b/pkg/schedule/schedulers/shuffle_region_config.go @@ -21,7 +21,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/slice" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -37,7 +36,7 @@ var allRoles = []string{roleLeader, roleFollower, roleLearner} type shuffleRegionSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage + schedulerConfig Ranges []core.KeyRange `json:"ranges"` Roles []string `json:"roles"` // can include `leader`, `follower`, `learner`. 
@@ -100,18 +99,10 @@ func (conf *shuffleRegionSchedulerConfig) handleSetRoles(w http.ResponseWriter, defer conf.Unlock() old := conf.Roles conf.Roles = roles - if err := conf.persist(); err != nil { + if err := conf.save(); err != nil { conf.Roles = old // revert rd.Text(w, http.StatusInternalServerError, err.Error()) return } rd.Text(w, http.StatusOK, "Config is updated.") } - -func (conf *shuffleRegionSchedulerConfig) persist() error { - data, err := EncodeConfig(conf) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(ShuffleRegionName, data) -} diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index 7b238890107..0d8fa614aef 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -30,7 +30,6 @@ import ( "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" "github.com/tikv/pd/pkg/statistics/buckets" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/reflectutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -46,14 +45,15 @@ const ( func initSplitBucketConfig() *splitBucketSchedulerConfig { return &splitBucketSchedulerConfig{ - Degree: defaultHotDegree, - SplitLimit: defaultSplitLimit, + schedulerConfig: &baseSchedulerConfig{}, + Degree: defaultHotDegree, + SplitLimit: defaultSplitLimit, } } type splitBucketSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage + schedulerConfig Degree int `json:"degree"` SplitLimit uint64 `json:"split-limit"` } @@ -66,14 +66,6 @@ func (conf *splitBucketSchedulerConfig) clone() *splitBucketSchedulerConfig { } } -func (conf *splitBucketSchedulerConfig) persistLocked() error { - data, err := EncodeConfig(conf) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(SplitBucketName, data) -} - func (conf *splitBucketSchedulerConfig) getDegree() int { conf.RLock() defer conf.RUnlock() @@ -120,7 +112,7 @@ func (h *splitBucketHandler) updateConfig(w http.ResponseWriter, r *http.Request } newc, _ := json.Marshal(h.conf) if !bytes.Equal(oldc, newc) { - if err := h.conf.persistLocked(); err != nil { + if err := h.conf.save(); err != nil { log.Warn("failed to save config", errs.ZapError(err)) } rd.Text(w, http.StatusOK, "Config is updated.") @@ -167,15 +159,8 @@ func newSplitBucketScheduler(opController *operator.Controller, conf *splitBucke func (s *splitBucketScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } newCfg := &splitBucketSchedulerConfig{} - if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } s.conf.SplitLimit = newCfg.SplitLimit
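
For reference, a minimal sketch of the pattern this diff applies to every scheduler: the config struct embeds the new `schedulerConfig` interface from `config.go`, the registration code calls `init(name, storage, conf)` once the scheduler name is known, and `save()`/`load()` replace the per-scheduler `persistLocked()` and the hand-rolled `LoadSchedulerConfig`/`DecodeConfig` sequence. `demoSchedulerConfig` and its methods below are illustrative only, not part of the change; they assume `schedulerConfig` and `baseSchedulerConfig` as defined in `pkg/schedule/schedulers/config.go` above.

```go
// Hypothetical scheduler config showing the pattern introduced by this PR.
package schedulers

import (
	"github.com/tikv/pd/pkg/storage/endpoint"
	"github.com/tikv/pd/pkg/utils/syncutil"
)

type demoSchedulerConfig struct {
	syncutil.RWMutex
	schedulerConfig // embedded save()/load()/init(), backed by baseSchedulerConfig

	Batch int `json:"batch"`
}

// newDemoSchedulerConfig wires the shared persistence helper to this
// scheduler's name and storage, mirroring what schedulersRegister now does
// via conf.init(sche.GetName(), storage, conf).
func newDemoSchedulerConfig(name string, storage endpoint.ConfigStorage) *demoSchedulerConfig {
	conf := &demoSchedulerConfig{schedulerConfig: &baseSchedulerConfig{}}
	conf.init(name, storage, conf) // conf itself is the payload that gets encoded on save()
	return conf
}

// setBatch updates a field and persists it, replacing the old persistLocked().
func (conf *demoSchedulerConfig) setBatch(batch int) error {
	conf.Lock()
	defer conf.Unlock()
	conf.Batch = batch
	return conf.save()
}

// reload mirrors the updated ReloadConfig implementations: decode into a
// fresh struct via load(), then copy the fields over under the lock.
func (conf *demoSchedulerConfig) reload() error {
	conf.Lock()
	defer conf.Unlock()
	newCfg := &demoSchedulerConfig{}
	if err := conf.load(newCfg); err != nil {
		return err
	}
	conf.Batch = newCfg.Batch
	return nil
}
```

As in the updated `update()`/`ReloadConfig()` code above, `save()` and `load()` take no locks themselves, so callers keep holding the config mutex; and the `persistFail` failpoint formerly duplicated across several `persistLocked()` methods now lives only in `baseSchedulerConfig.save()`.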