Skip to content

Commit

Permalink
scheduler: extract storage operations to schedulerConfig interface (#…
Browse files Browse the repository at this point in the history
…8515)

ref #8379, close #8514

Signed-off-by: okJiang <819421878@qq.com>
  • Loading branch information
okJiang authored Aug 14, 2024
1 parent 1c1cd99 commit 3b37572
Show file tree
Hide file tree
Showing 20 changed files with 254 additions and 298 deletions.
26 changes: 6 additions & 20 deletions pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/reflectutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand All @@ -56,8 +55,9 @@ const (

type balanceLeaderSchedulerConfig struct {
syncutil.RWMutex
storage endpoint.ConfigStorage
Ranges []core.KeyRange `json:"ranges"`
schedulerConfig

Ranges []core.KeyRange `json:"ranges"`
// Batch is used to generate multiple operators by one scheduling
Batch int `json:"batch"`
}
Expand All @@ -79,7 +79,7 @@ func (conf *balanceLeaderSchedulerConfig) update(data []byte) (int, any) {
}
return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10"
}
if err := conf.persistLocked(); err != nil {
if err := conf.save(); err != nil {
log.Warn("failed to save balance-leader-scheduler config", errs.ZapError(err))
}
log.Info("balance-leader-scheduler config is updated", zap.ByteString("old", oldConfig), zap.ByteString("new", newConfig))
Expand Down Expand Up @@ -111,14 +111,6 @@ func (conf *balanceLeaderSchedulerConfig) clone() *balanceLeaderSchedulerConfig
}
}

func (conf *balanceLeaderSchedulerConfig) persistLocked() error {
data, err := EncodeConfig(conf)
if err != nil {
return err
}
return conf.storage.SaveSchedulerConfig(BalanceLeaderName, data)
}

func (conf *balanceLeaderSchedulerConfig) getBatch() int {
conf.RLock()
defer conf.RUnlock()
Expand Down Expand Up @@ -216,15 +208,9 @@ func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) {
func (l *balanceLeaderScheduler) ReloadConfig() error {
l.conf.Lock()
defer l.conf.Unlock()
cfgData, err := l.conf.storage.LoadSchedulerConfig(l.GetName())
if err != nil {
return err
}
if len(cfgData) == 0 {
return nil
}

newCfg := &balanceLeaderSchedulerConfig{}
if err = DecodeConfig([]byte(cfgData), newCfg); err != nil {
if err := l.conf.load(newCfg); err != nil {
return err
}
l.conf.Ranges = newCfg.Ranges
Expand Down
26 changes: 6 additions & 20 deletions pkg/schedule/schedulers/balance_witness.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/reflectutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/unrolled/render"
Expand All @@ -53,8 +52,9 @@ const (

type balanceWitnessSchedulerConfig struct {
syncutil.RWMutex
storage endpoint.ConfigStorage
Ranges []core.KeyRange `json:"ranges"`
schedulerConfig

Ranges []core.KeyRange `json:"ranges"`
// Batch is used to generate multiple operators by one scheduling
Batch int `json:"batch"`
}
Expand All @@ -76,7 +76,7 @@ func (conf *balanceWitnessSchedulerConfig) update(data []byte) (int, any) {
}
return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10"
}
if err := conf.persistLocked(); err != nil {
if err := conf.save(); err != nil {
log.Warn("failed to persist config", zap.Error(err))
}
log.Info("balance-witness-scheduler config is updated", zap.ByteString("old", oldc), zap.ByteString("new", newc))
Expand Down Expand Up @@ -108,14 +108,6 @@ func (conf *balanceWitnessSchedulerConfig) clone() *balanceWitnessSchedulerConfi
}
}

func (conf *balanceWitnessSchedulerConfig) persistLocked() error {
data, err := EncodeConfig(conf)
if err != nil {
return err
}
return conf.storage.SaveSchedulerConfig(BalanceWitnessName, data)
}

func (conf *balanceWitnessSchedulerConfig) getBatch() int {
conf.RLock()
defer conf.RUnlock()
Expand Down Expand Up @@ -215,15 +207,9 @@ func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) {
func (b *balanceWitnessScheduler) ReloadConfig() error {
b.conf.Lock()
defer b.conf.Unlock()
cfgData, err := b.conf.storage.LoadSchedulerConfig(b.GetName())
if err != nil {
return err
}
if len(cfgData) == 0 {
return nil
}

newCfg := &balanceWitnessSchedulerConfig{}
if err = DecodeConfig([]byte(cfgData), newCfg); err != nil {
if err := b.conf.load(newCfg); err != nil {
return err
}
b.conf.Ranges = newCfg.Ranges
Expand Down
60 changes: 60 additions & 0 deletions pkg/schedule/schedulers/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schedulers

import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/tikv/pd/pkg/storage/endpoint"
)

type schedulerConfig interface {
save() error
load(any) error
init(name string, storage endpoint.ConfigStorage, data any)
}

type baseSchedulerConfig struct {
name string
storage endpoint.ConfigStorage

// data is the config of the scheduler.
data any
}

func (b *baseSchedulerConfig) init(name string, storage endpoint.ConfigStorage, data any) {
b.name = name
b.storage = storage
b.data = data
}

func (b *baseSchedulerConfig) save() error {
data, err := EncodeConfig(b.data)
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
})
if err != nil {
return err
}
return b.storage.SaveSchedulerConfig(b.name, data)
}

func (b *baseSchedulerConfig) load(v any) error {
data, err := b.storage.LoadSchedulerConfig(b.name)
if err != nil {
return err
}
return DecodeConfig([]byte(data), v)
}
50 changes: 50 additions & 0 deletions pkg/schedule/schedulers/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schedulers

import (
"testing"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/storage"
)

func TestSchedulerConfig(t *testing.T) {
s := storage.NewStorageWithMemoryBackend()

type testConfig struct {
schedulerConfig
Value string `json:"value"`
}

cfg := &testConfig{
schedulerConfig: &baseSchedulerConfig{},
}
cfg.init("test", s, cfg)

cfg.Value = "test"
require.NoError(t, cfg.save())
newTc := &testConfig{}
require.NoError(t, cfg.load(newTc))
require.Equal(t, cfg.Value, newTc.Value)

// config with another name cannot loaded the previous config
cfg2 := &testConfig{
schedulerConfig: &baseSchedulerConfig{},
}
cfg2.init("test2", s, cfg2)
// report error because the config is empty and cannot be decoded
require.Error(t, cfg2.load(newTc))
}
33 changes: 7 additions & 26 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
Expand All @@ -30,7 +29,6 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/unrolled/render"
Expand All @@ -48,7 +46,8 @@ const (

type evictLeaderSchedulerConfig struct {
syncutil.RWMutex
storage endpoint.ConfigStorage
schedulerConfig

StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"`
// Batch is used to generate multiple operators by one scheduling
Batch int `json:"batch"`
Expand Down Expand Up @@ -85,17 +84,6 @@ func (conf *evictLeaderSchedulerConfig) clone() *evictLeaderSchedulerConfig {
}
}

func (conf *evictLeaderSchedulerConfig) persistLocked() error {
data, err := EncodeConfig(conf)
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
})
if err != nil {
return err
}
return conf.storage.SaveSchedulerConfig(types.EvictLeaderScheduler.String(), data)
}

func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string {
conf.RLock()
defer conf.RUnlock()
Expand Down Expand Up @@ -145,18 +133,11 @@ func (conf *evictLeaderSchedulerConfig) encodeConfig() ([]byte, error) {
return EncodeConfig(conf)
}

func (conf *evictLeaderSchedulerConfig) reloadConfig(name string) error {
func (conf *evictLeaderSchedulerConfig) reloadConfig() error {
conf.Lock()
defer conf.Unlock()
cfgData, err := conf.storage.LoadSchedulerConfig(name)
if err != nil {
return err
}
if len(cfgData) == 0 {
return nil
}
newCfg := &evictLeaderSchedulerConfig{}
if err = DecodeConfig([]byte(cfgData), newCfg); err != nil {
if err := conf.load(newCfg); err != nil {
return err
}
pauseAndResumeLeaderTransfer(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
Expand Down Expand Up @@ -203,7 +184,7 @@ func (conf *evictLeaderSchedulerConfig) update(id uint64, newRanges []core.KeyRa
conf.StoreIDWithRanges[id] = newRanges
}
conf.Batch = batch
err := conf.persistLocked()
err := conf.save()
if err != nil && id != 0 {
_, _ = conf.removeStoreLocked(id)
}
Expand All @@ -220,7 +201,7 @@ func (conf *evictLeaderSchedulerConfig) delete(id uint64) (any, error) {
}

keyRanges := conf.StoreIDWithRanges[id]
err = conf.persistLocked()
err = conf.save()
if err != nil {
conf.resetStoreLocked(id, keyRanges)
conf.Unlock()
Expand Down Expand Up @@ -275,7 +256,7 @@ func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) {

// ReloadConfig reloads the config from the storage.
func (s *evictLeaderScheduler) ReloadConfig() error {
return s.conf.reloadConfig(s.GetName())
return s.conf.reloadConfig()
}

// PrepareConfig implements the Scheduler interface.
Expand Down
Loading

0 comments on commit 3b37572

Please sign in to comment.