proposal: pluggable PD scheduler #1799

Merged: 51 commits, Dec 18, 2019
Changes from 44 commits

Commits (51):
812d090 Multi-plugin hot loading (CheneyDing, Oct 9, 2019)
86a11a7 Simplify plugin design (CheneyDing, Oct 12, 2019)
5876dbc Merge remote-tracking branch 'upstream/master' (CheneyDing, Oct 12, 2019)
73cb1fc remove unused files (CheneyDing, Oct 12, 2019)
8148de7 resolve some conversation (CheneyDing, Oct 14, 2019)
87a1f66 Update plugin/evict_leader/base_scheduler.go (CheneyDing, Oct 15, 2019)
fdbf749 Update server/api/plugin.go (CheneyDing, Oct 15, 2019)
229df0a Update server/schedule/plugin_interface.go (CheneyDing, Oct 15, 2019)
ebae046 Update server/schedule/plugin_interface.go (CheneyDing, Oct 15, 2019)
9838ece resolve some conversation (CheneyDing, Oct 15, 2019)
311658d resolve merge conflict (CheneyDing, Oct 15, 2019)
aee7903 resolve some conversation (CheneyDing, Oct 21, 2019)
bde6bae resolve some conversation (CheneyDing, Oct 22, 2019)
23f1a5c resolve some conversation (CheneyDing, Oct 22, 2019)
d119695 resolve conflict (CheneyDing, Oct 23, 2019)
a12ac54 resolve conflict (CheneyDing, Oct 23, 2019)
0fd6b59 resolve some ci problem (CheneyDing, Oct 23, 2019)
22f979e add unit test for plugin (CheneyDing, Oct 31, 2019)
8ca41d8 Merge remote-tracking branch 'upstream/master' into plug-sche (CheneyDing, Oct 31, 2019)
d78315a add plugin test (CheneyDing, Nov 20, 2019)
557a0fa resolve conflict (CheneyDing, Nov 20, 2019)
afcd58d resolve some bug (CheneyDing, Nov 22, 2019)
f240cb1 Merge remote-tracking branch 'upstream/master' into plug-sche (CheneyDing, Nov 22, 2019)
564fd3d Merge remote-tracking branch 'upstream/master' into plug-sche (CheneyDing, Nov 27, 2019)
cd2f546 delete grant_leader plugin (CheneyDing, Nov 28, 2019)
81bb6a8 Merge remote-tracking branch 'upstream/master' into plug-sche (CheneyDing, Nov 28, 2019)
c0dda63 remove unused files (CheneyDing, Nov 28, 2019)
b4c2776 fix evict_leader plugin (CheneyDing, Nov 29, 2019)
1a84b3b resolve some bug (CheneyDing, Nov 29, 2019)
f0f87de remove base_scheduler for plugin (CheneyDing, Nov 29, 2019)
f3ab59d Merge remote-tracking branch 'upstream/master' into plug-sche (CheneyDing, Nov 29, 2019)
b32c991 Update plugin/scheduler_example/Makefile (CheneyDing, Dec 2, 2019)
9d4f031 comment for ci (CheneyDing, Dec 4, 2019)
f70c2be Merge remote-tracking branch 'origin/plug-sche' into plug-sche (CheneyDing, Dec 4, 2019)
a2cb7f6 add comment (CheneyDing, Dec 4, 2019)
a2e42dd Merge remote-tracking branch 'upstream/master' into plug-sche (CheneyDing, Dec 4, 2019)
4eb027b Update server/api/plugin.go (CheneyDing, Dec 4, 2019)
252889d Update server/api/plugin.go (CheneyDing, Dec 4, 2019)
c0d446f Update server/handler.go (CheneyDing, Dec 4, 2019)
0d1a45c resolve conflict (CheneyDing, Dec 4, 2019)
abdd00b modify coordinator (CheneyDing, Dec 5, 2019)
3abccb6 Update server/schedule/plugin_interface.go (CheneyDing, Dec 6, 2019)
56f88af remove unused code (CheneyDing, Dec 6, 2019)
3d15a2e add lock for pluginChMap (CheneyDing, Dec 10, 2019)
1390342 resolve some comment (CheneyDing, Dec 12, 2019)
3399dd3 Merge branch 'master' into plug-sche (nolouch, Dec 16, 2019)
5387f8e Update evict_leader.go (nolouch, Dec 16, 2019)
e6f00ba *: fix make plugin and add check (nolouch, Dec 16, 2019)
c6a47c9 Merge branch 'master' into plug-sche (nolouch, Dec 16, 2019)
d52cbaa Merge branch 'master' into plug-sche (nolouch, Dec 16, 2019)
d37ecef Merge branch 'master' into plug-sche (sre-bot, Dec 18, 2019)
7 changes: 7 additions & 0 deletions plugin/scheduler_example/Makefile
@@ -0,0 +1,7 @@
evictLeaderPlugin.so: *.go
	GO111MODULE=on CGO_ENABLED=1 go build -buildmode=plugin -o evictLeaderPlugin.so *.go

.PHONY: clean

clean:
	rm evictLeaderPlugin.so
344 changes: 344 additions & 0 deletions plugin/scheduler_example/evict_leader.go
@@ -0,0 +1,344 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"net/http"
"net/url"
"strconv"
"sync"

"github.com/gorilla/mux"
"github.com/pingcap/pd/pkg/apiutil"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule"
"github.com/pingcap/pd/server/schedule/filter"
"github.com/pingcap/pd/server/schedule/operator"
"github.com/pingcap/pd/server/schedule/opt"
"github.com/pingcap/pd/server/schedule/selector"
"github.com/pingcap/pd/server/schedulers"
"github.com/pkg/errors"
"github.com/unrolled/render"
)

const (
// EvictLeaderName is evict leader scheduler name.
EvictLeaderName = "user-evict-leader-scheduler"
// EvictLeaderType is evict leader scheduler type.
EvictLeaderType = "user-evict-leader"
noStoreInSchedulerInfo = "No store in user-evict-leader-scheduler-config"
)

func init() {
schedule.RegisterSliceDecoderBuilder(EvictLeaderType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
if len(args) != 1 {
return errors.New("should specify the store-id")
}
conf, ok := v.(*evictLeaderSchedulerConfig)
if !ok {
return errors.New("the config does not exist")
}

id, err := strconv.ParseUint(args[0], 10, 64)
if err != nil {
return errors.WithStack(err)
}
ranges, err := getKeyRanges(args[1:])
if err != nil {
return errors.WithStack(err)
}
conf.StoreIDWitRanges[id] = ranges
return nil

}
})

schedule.RegisterScheduler(EvictLeaderType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
conf := &evictLeaderSchedulerConfig{StoreIDWitRanges: make(map[uint64][]core.KeyRange), storage: storage}
if err := decoder(conf); err != nil {
return nil, err
}
conf.cluster = opController.GetCluster()
return newEvictLeaderScheduler(opController, conf), nil
})
}
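
For orientation: once init has populated the registry, the host side is expected to construct the scheduler through PD's registry rather than by calling the constructor below directly. A minimal sketch, assuming schedule.CreateScheduler and schedule.ConfigSliceDecoder exist with signatures mirroring the builder registered above (both names are assumptions, not verified against this PD revision):

// Hypothetical host-side sketch: instantiate the plugin scheduler via the
// registry populated by init(). Signatures are assumed, not confirmed.
func createUserEvictLeader(oc *schedule.OperatorController, storage *core.Storage) (schedule.Scheduler, error) {
	// Decode the slice form of the config; "1" is a store ID, matching SchedulerArgs below.
	dec := schedule.ConfigSliceDecoder(EvictLeaderType, []string{"1"})
	return schedule.CreateScheduler(EvictLeaderType, oc, storage, dec)
}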

// SchedulerType returns the type of the scheduler
// nolint
func SchedulerType() string {
return EvictLeaderType
}

// SchedulerArgs returns the args for the scheduler
// nolint
func SchedulerArgs() []string {
args := []string{"1"}
return args
}
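
SchedulerType and SchedulerArgs are the symbols a loader resolves after opening the .so built by the Makefile above. A minimal sketch of that handshake using only the standard library plugin package (loadSchedulerPlugin is an illustrative name; PD's real loader lives in server/schedule/plugin_interface.go, touched by several commits above):

// Illustrative loader: open the shared object and resolve the two exported
// symbols by name. Requires importing the standard library "plugin" package.
func loadSchedulerPlugin(path string) (typ string, args []string, err error) {
	p, err := plugin.Open(path) // e.g. "evictLeaderPlugin.so"
	if err != nil {
		return "", nil, err
	}
	typSym, err := p.Lookup("SchedulerType")
	if err != nil {
		return "", nil, err
	}
	argsSym, err := p.Lookup("SchedulerArgs")
	if err != nil {
		return "", nil, err
	}
	return typSym.(func() string)(), argsSym.(func() []string)(), nil
}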

type evictLeaderSchedulerConfig struct {
mu sync.RWMutex
storage *core.Storage
StoreIDWitRanges map[uint64][]core.KeyRange `json:"store-id-ranges"`
cluster opt.Cluster
}

func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error {
if len(args) != 1 {
return errors.New("should specify the store-id")
}

id, err := strconv.ParseUint(args[0], 10, 64)
if err != nil {
return errors.WithStack(err)
}
ranges, err := getKeyRanges(args[1:])
if err != nil {
return errors.WithStack(err)
}
conf.mu.Lock()
defer conf.mu.Unlock()
conf.StoreIDWitRanges[id] = ranges
return nil
}

func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig {
conf.mu.RLock()
defer conf.mu.RUnlock()
return &evictLeaderSchedulerConfig{
StoreIDWitRanges: conf.StoreIDWitRanges,
}
}

func (conf *evictLeaderSchedulerConfig) Persist() error {
name := conf.getScheduleName()
conf.mu.RLock()
defer conf.mu.RUnlock()
data, err := schedule.EncodeConfig(conf)
if err != nil {
return err
}
return conf.storage.SaveScheduleConfig(name, data)
}

func (conf *evictLeaderSchedulerConfig) getScheduleName() string {
return EvictLeaderName
}

func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string {
conf.mu.RLock()
defer conf.mu.RUnlock()
var res []string
for index := range conf.StoreIDWitRanges[id] {
res = append(res, (string)(conf.StoreIDWitRanges[id][index].StartKey))
res = append(res, (string)(conf.StoreIDWitRanges[id][index].EndKey))
}
return res
}

type evictLeaderScheduler struct {
*schedulers.BaseScheduler
conf *evictLeaderSchedulerConfig
selector *selector.RandomSelector
handler http.Handler
}

// newEvictLeaderScheduler creates an admin scheduler that transfers all leaders
// out of a store.
func newEvictLeaderScheduler(opController *schedule.OperatorController, conf *evictLeaderSchedulerConfig) schedule.Scheduler {
filters := []filter.Filter{
filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true},
}

base := schedulers.NewBaseScheduler(opController)
handler := newEvictLeaderHandler(conf)
return &evictLeaderScheduler{
BaseScheduler: base,
conf: conf,
selector: selector.NewRandomSelector(filters),
handler: handler,
}
}

func (s *evictLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handler.ServeHTTP(w, r)
}

func (s *evictLeaderScheduler) GetName() string {
return EvictLeaderName
}

func (s *evictLeaderScheduler) GetType() string {
return EvictLeaderType
}

func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
return schedule.EncodeConfig(s.conf)
}

func (s *evictLeaderScheduler) Prepare(cluster opt.Cluster) error {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
var res error
for id := range s.conf.StoreIDWitRanges {
if err := cluster.BlockStore(id); err != nil {
res = err
}
}
return res
}

func (s *evictLeaderScheduler) Cleanup(cluster opt.Cluster) {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
for id := range s.conf.StoreIDWitRanges {
cluster.UnblockStore(id)
}
}

func (s *evictLeaderScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
return s.OpController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit()
}

func (s *evictLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
var ops []*operator.Operator
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
for id, ranges := range s.conf.StoreIDWitRanges {
region := cluster.RandLeaderRegion(id, ranges, opt.HealthRegion(cluster))
if region == nil {
continue
}
target := s.selector.SelectTarget(cluster, cluster.GetFollowerStores(region))
if target == nil {
continue
}
op := operator.CreateTransferLeaderOperator(EvictLeaderType, region, region.GetLeader().GetStoreId(), target.GetID(), operator.OpLeader)
op.SetPriorityLevel(core.HighPriority)
ops = append(ops, op)
}

return ops
}

type evictLeaderHandler struct {
rd *render.Render
config *evictLeaderSchedulerConfig
}

func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) {
var input map[string]interface{}
if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil {
return
}
var args []string
var exists bool
var id uint64
idFloat, ok := input["store_id"].(float64)
if ok {
id = (uint64)(idFloat)
if _, exists = handler.config.StoreIDWitRanges[id]; !exists {
if err := handler.config.cluster.BlockStore(id); err != nil {
handler.rd.JSON(w, http.StatusInternalServerError, err)
return
}
}
args = append(args, strconv.FormatUint(id, 10))
}

ranges, ok := (input["ranges"]).([]string)
if ok {
args = append(args, ranges...)
} else if exists {
args = append(args, handler.config.getRanges(id)...)
}

if err := handler.config.BuildWithArgs(args); err != nil {
handler.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
err := handler.config.Persist()
if err != nil {
handler.rd.JSON(w, http.StatusInternalServerError, err)
return
}
handler.rd.JSON(w, http.StatusOK, nil)
}

func (handler *evictLeaderHandler) ListConfig(w http.ResponseWriter, r *http.Request) {
conf := handler.config.Clone()
handler.rd.JSON(w, http.StatusOK, conf)
}

func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.Request) {
idStr := mux.Vars(r)["store_id"]
id, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
handler.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}

handler.config.mu.Lock()
defer handler.config.mu.Unlock()
_, exists := handler.config.StoreIDWitRanges[id]
if exists {
delete(handler.config.StoreIDWitRanges, id)
handler.config.cluster.UnblockStore(id)
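// Persist takes the read lock itself, and sync.RWMutex is not reentrant,
// hence the unlock/relock dance around the Persist call below.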

handler.config.mu.Unlock()
handler.config.Persist()
handler.config.mu.Lock()

var resp interface{}
if len(handler.config.StoreIDWitRanges) == 0 {
resp = noStoreInSchedulerInfo
}
handler.rd.JSON(w, http.StatusOK, resp)
return
}

handler.rd.JSON(w, http.StatusInternalServerError, errors.New("the config does not exist"))
}

func newEvictLeaderHandler(config *evictLeaderSchedulerConfig) http.Handler {
h := &evictLeaderHandler{
config: config,
rd: render.New(render.Options{IndentJSON: true}),
}
router := mux.NewRouter()
router.HandleFunc("/config", h.UpdateConfig).Methods("POST")
router.HandleFunc("/list", h.ListConfig).Methods("GET")
router.HandleFunc("/delete/{store_id}", h.DeleteConfig).Methods("DELETE")
return router
}
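
Since newEvictLeaderHandler returns a plain http.Handler, the read-only /list route can be exercised without a PD server at all. A small sketch (assumes net/http/httptest, io/ioutil, and fmt are imported; only ListConfig and Clone run, so the cluster and storage fields can stay nil):

// Sketch: drive the /list route with httptest.
func exampleListConfig() {
	conf := &evictLeaderSchedulerConfig{
		StoreIDWitRanges: map[uint64][]core.KeyRange{
			1: {core.NewKeyRange("", "")},
		},
	}
	srv := httptest.NewServer(newEvictLeaderHandler(conf))
	defer srv.Close()

	resp, err := http.Get(srv.URL + "/list")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(string(body)) // the scheduler's store-id-ranges as indented JSON
}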

func getKeyRanges(args []string) ([]core.KeyRange, error) {
var ranges []core.KeyRange
for len(args) > 1 {
startKey, err := url.QueryUnescape(args[0])
if err != nil {
return nil, err
}
endKey, err := url.QueryUnescape(args[1])
if err != nil {
return nil, err
}
args = args[2:]
ranges = append(ranges, core.NewKeyRange(startKey, endKey))
}
if len(ranges) == 0 {
return []core.KeyRange{core.NewKeyRange("", "")}, nil
}
return ranges, nil
}
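
To make the args convention concrete: keys arrive URL-escaped, are consumed in start/end pairs, and an empty argument list falls back to the full key space. A small usage sketch (assumes fmt is imported):

func exampleKeyRanges() {
	// No args: one full-keyspace range ["", "").
	full, _ := getKeyRanges(nil)
	fmt.Println(len(full)) // 1

	// Args are unescaped and paired: ["a","z") and ["k1","k9").
	ranges, err := getKeyRanges([]string{"a", "z", "k1", "k9"})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(ranges)) // 2
}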