Skip to content

Commit

Permalink
scheduler: move init function (#5934)
Browse files Browse the repository at this point in the history
ref #5837

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
lhy1024 and ti-chi-bot authored Feb 8, 2023
1 parent efa3e31 commit 76c0c10
Show file tree
Hide file tree
Showing 26 changed files with 460 additions and 467 deletions.
4 changes: 2 additions & 2 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ import (
"github.com/tikv/pd/server/join"
"go.uber.org/zap"

// Register schedulers.
_ "github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/schedulers"

// Register Service
_ "github.com/tikv/pd/pkg/mcs/registry"
Expand Down Expand Up @@ -136,6 +135,7 @@ func createServerWrapper(args []string) (context.Context, context.CancelFunc, ba
if err != nil {
log.Fatal("create server failed", errs.ZapError(err))
}
schedulers.Register()

return ctx, cancel, svr
}
Expand Down
1 change: 0 additions & 1 deletion server/api/hot_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/tikv/pd/pkg/storage/kv"
tu "github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server"
_ "github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/storage"
)

Expand Down
1 change: 0 additions & 1 deletion server/api/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
tu "github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/config"
_ "github.com/tikv/pd/server/schedulers"
)

type scheduleTestSuite struct {
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mock/mockid"
_ "github.com/tikv/pd/server/schedulers"
_ "github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server/storage"
)

Expand Down
29 changes: 0 additions & 29 deletions server/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,35 +66,6 @@ var (
balanceLeaderNewOpCounter = schedulerCounter.WithLabelValues(BalanceLeaderName, "new-operator")
)

func init() {
schedule.RegisterSliceDecoderBuilder(BalanceLeaderType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
conf, ok := v.(*balanceLeaderSchedulerConfig)
if !ok {
return errs.ErrScheduleConfigNotExist.FastGenByArgs()
}
ranges, err := getKeyRanges(args)
if err != nil {
return err
}
conf.Ranges = ranges
conf.Batch = BalanceLeaderBatchSize
return nil
}
})

schedule.RegisterScheduler(BalanceLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &balanceLeaderSchedulerConfig{storage: storage}
if err := decoder(conf); err != nil {
return nil, err
}
if conf.Batch == 0 {
conf.Batch = BalanceLeaderBatchSize
}
return newBalanceLeaderScheduler(opController, conf), nil
})
}

type balanceLeaderSchedulerConfig struct {
mu syncutil.RWMutex
storage endpoint.ConfigStorage
Expand Down
27 changes: 0 additions & 27 deletions server/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,40 +22,13 @@ import (
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/filter"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/plan"
"go.uber.org/zap"
)

func init() {
schedule.RegisterSliceDecoderBuilder(BalanceRegionType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
conf, ok := v.(*balanceRegionSchedulerConfig)
if !ok {
return errs.ErrScheduleConfigNotExist.FastGenByArgs()
}
ranges, err := getKeyRanges(args)
if err != nil {
return err
}
conf.Ranges = ranges
conf.Name = BalanceRegionName
return nil
}
})
schedule.RegisterScheduler(BalanceRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &balanceRegionSchedulerConfig{}
if err := decoder(conf); err != nil {
return nil, err
}
return newBalanceRegionScheduler(opController, conf), nil
})
}

const (
// BalanceRegionName is balance region scheduler name.
BalanceRegionName = "balance-region-scheduler"
Expand Down
29 changes: 0 additions & 29 deletions server/schedulers/balance_witness.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,35 +51,6 @@ const (
MaxBalanceWitnessBatchSize = 10
)

func init() {
schedule.RegisterSliceDecoderBuilder(BalanceWitnessType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
conf, ok := v.(*balanceWitnessSchedulerConfig)
if !ok {
return errs.ErrScheduleConfigNotExist.FastGenByArgs()
}
ranges, err := getKeyRanges(args)
if err != nil {
return err
}
conf.Ranges = ranges
conf.Batch = balanceWitnessBatchSize
return nil
}
})

schedule.RegisterScheduler(BalanceWitnessType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &balanceWitnessSchedulerConfig{storage: storage}
if err := decoder(conf); err != nil {
return nil, err
}
if conf.Batch == 0 {
conf.Batch = balanceWitnessBatchSize
}
return newBalanceWitnessScheduler(opController, conf), nil
})
}

type balanceWitnessSchedulerConfig struct {
mu syncutil.RWMutex
storage endpoint.ConfigStorage
Expand Down
35 changes: 0 additions & 35 deletions server/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,41 +54,6 @@ var (
evictLeaderNewOperatorCounter = schedulerCounter.WithLabelValues(EvictLeaderName, "new-operator")
)

func init() {
schedule.RegisterSliceDecoderBuilder(EvictLeaderType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
if len(args) != 1 {
return errs.ErrSchedulerConfig.FastGenByArgs("id")
}
conf, ok := v.(*evictLeaderSchedulerConfig)
if !ok {
return errs.ErrScheduleConfigNotExist.FastGenByArgs()
}

id, err := strconv.ParseUint(args[0], 10, 64)
if err != nil {
return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause()
}

ranges, err := getKeyRanges(args[1:])
if err != nil {
return err
}
conf.StoreIDWithRanges[id] = ranges
return nil
}
})

schedule.RegisterScheduler(EvictLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &evictLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage}
if err := decoder(conf); err != nil {
return nil, err
}
conf.cluster = opController.GetCluster()
return newEvictLeaderScheduler(opController, conf), nil
})
}

type evictLeaderSchedulerConfig struct {
mu syncutil.RWMutex
storage endpoint.ConfigStorage
Expand Down
16 changes: 0 additions & 16 deletions server/schedulers/evict_slow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,6 @@ const (
// WithLabelValues is a heavy operation, define variable to avoid call it every time.
var evictSlowStoreCounter = schedulerCounter.WithLabelValues(EvictSlowStoreName, "schedule")

func init() {
schedule.RegisterSliceDecoderBuilder(EvictSlowStoreType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
return nil
}
})

schedule.RegisterScheduler(EvictSlowStoreType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &evictSlowStoreSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0)}
if err := decoder(conf); err != nil {
return nil, err
}
return newEvictSlowStoreScheduler(opController, conf), nil
})
}

type evictSlowStoreSchedulerConfig struct {
storage endpoint.ConfigStorage
EvictedStores []uint64 `json:"evict-stores"`
Expand Down
41 changes: 0 additions & 41 deletions server/schedulers/grant_hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,47 +51,6 @@ var (
grantHotRegionSkipCounter = schedulerCounter.WithLabelValues(GrantHotRegionName, "skip")
)

func init() {
schedule.RegisterSliceDecoderBuilder(GrantHotRegionType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
if len(args) != 2 {
return errs.ErrSchedulerConfig.FastGenByArgs("id")
}

conf, ok := v.(*grantHotRegionSchedulerConfig)
if !ok {
return errs.ErrScheduleConfigNotExist.FastGenByArgs()
}
leaderID, err := strconv.ParseUint(args[0], 10, 64)
if err != nil {
return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause()
}

storeIDs := make([]uint64, 0)
for _, id := range strings.Split(args[1], ",") {
storeID, err := strconv.ParseUint(id, 10, 64)
if err != nil {
return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause()
}
storeIDs = append(storeIDs, storeID)
}
if !conf.setStore(leaderID, storeIDs) {
return errs.ErrSchedulerConfig
}
return nil
}
})

schedule.RegisterScheduler(GrantHotRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &grantHotRegionSchedulerConfig{StoreIDs: make([]uint64, 0), storage: storage}
conf.cluster = opController.GetCluster()
if err := decoder(conf); err != nil {
return nil, err
}
return newGrantHotRegionScheduler(opController, conf), nil
})
}

type grantHotRegionSchedulerConfig struct {
mu syncutil.RWMutex
storage endpoint.ConfigStorage
Expand Down
35 changes: 0 additions & 35 deletions server/schedulers/grant_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,41 +47,6 @@ var (
grantLeaderNewOperatorCounter = schedulerCounter.WithLabelValues(GrantLeaderName, "new-operator")
)

func init() {
schedule.RegisterSliceDecoderBuilder(GrantLeaderType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
if len(args) != 1 {
return errs.ErrSchedulerConfig.FastGenByArgs("id")
}

conf, ok := v.(*grantLeaderSchedulerConfig)
if !ok {
return errs.ErrScheduleConfigNotExist.FastGenByArgs()
}

id, err := strconv.ParseUint(args[0], 10, 64)
if err != nil {
return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause()
}
ranges, err := getKeyRanges(args[1:])
if err != nil {
return err
}
conf.StoreIDWithRanges[id] = ranges
return nil
}
})

schedule.RegisterScheduler(GrantLeaderType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &grantLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage}
conf.cluster = opController.GetCluster()
if err := decoder(conf); err != nil {
return nil, err
}
return newGrantLeaderScheduler(opController, conf), nil
})
}

type grantLeaderSchedulerConfig struct {
mu syncutil.RWMutex
storage endpoint.ConfigStorage
Expand Down
32 changes: 0 additions & 32 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/filter"
Expand Down Expand Up @@ -177,37 +176,6 @@ func (h *baseHotScheduler) randomRWType() statistics.RWType {
return h.types[h.r.Int()%len(h.types)]
}

func init() {
schedule.RegisterSliceDecoderBuilder(HotRegionType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
return nil
}
})
schedule.RegisterScheduler(HotRegionType, func(opController *schedule.OperatorController, storage endpoint.ConfigStorage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := initHotRegionScheduleConfig()

var data map[string]interface{}
if err := decoder(&data); err != nil {
return nil, err
}
if len(data) != 0 {
// After upgrading, use compatible config.

// For clusters with the initial version >= v5.2, it will be overwritten by the default config.
conf.applyPrioritiesConfig(compatiblePrioritiesConfig)
// For clusters with the initial version >= v6.4, it will be overwritten by the default config.
conf.SetRankFormulaVersion("")

if err := decoder(conf); err != nil {
return nil, err
}
}

conf.storage = storage
return newHotScheduler(opController, conf), nil
})
}

const (
// HotRegionName is balance hot region scheduler name.
HotRegionName = "balance-hot-region-scheduler"
Expand Down
Loading

0 comments on commit 76c0c10

Please sign in to comment.