From 6699b3f6af601d304c79b9e724b3aafed6d214be Mon Sep 17 00:00:00 2001
From: Cheney Ding <1017472599@qq.com>
Date: Wed, 18 Dec 2019 11:23:56 +0800
Subject: [PATCH] proposal: pluggable PD scheduler (#1799)

---
 Makefile                                     |   6 +-
 plugin/scheduler_example/Makefile            |   7 +
 plugin/scheduler_example/evict_leader.go     | 351 +++++++++++++++++++
 server/api/plugin.go                         |  90 +++++
 server/api/router.go                         |   4 +
 server/cluster/coordinator.go                |  64 ++++
 server/handler.go                            |  39 ++-
 server/schedule/plugin_interface.go          |  63 ++++
 server/schedulers/adjacent_region.go         |  10 +-
 server/schedulers/balance_leader.go          |   6 +-
 server/schedulers/balance_region.go          |   6 +-
 server/schedulers/base_scheduler.go          |  27 +-
 server/schedulers/evict_leader.go            |   8 +-
 server/schedulers/grant_leader.go            |   8 +-
 server/schedulers/hot_region.go              |  20 +-
 server/schedulers/label.go                   |   6 +-
 server/schedulers/random_merge.go            |   8 +-
 server/schedulers/scatter_range.go           |   8 +-
 server/schedulers/shuffle_hot_region.go      |  12 +-
 server/schedulers/shuffle_leader.go          |   8 +-
 server/schedulers/shuffle_region.go          |   8 +-
 tests/pdctl/helper.go                        |   1 +
 tools/pd-ctl/pdctl/command/plugin_command.go |  95 +++++
 tools/pd-ctl/pdctl/ctl.go                    |   1 +
 24 files changed, 788 insertions(+), 68 deletions(-)
 create mode 100644 plugin/scheduler_example/Makefile
 create mode 100644 plugin/scheduler_example/evict_leader.go
 create mode 100644 server/api/plugin.go
 create mode 100644 server/schedule/plugin_interface.go
 create mode 100644 tools/pd-ctl/pdctl/command/plugin_command.go

diff --git a/Makefile b/Makefile
index d92192300cf7..d25ed4a0b453 100644
--- a/Makefile
+++ b/Makefile
@@ -106,7 +106,11 @@ retool-setup:
 	@which retool >/dev/null 2>&1 || go get github.com/twitchtv/retool
 	@./scripts/retool sync
 
-check: retool-setup check-all
+check: retool-setup check-all check-plugin
+
+check-plugin:
+	@echo "checking plugin"
+	cd ./plugin/scheduler_example && make evictLeaderPlugin.so && rm evictLeaderPlugin.so
 
 static: export GO111MODULE=on
 static:

diff --git a/plugin/scheduler_example/Makefile b/plugin/scheduler_example/Makefile
new file mode 100644
index 000000000000..46aa676bd030
--- /dev/null
+++ b/plugin/scheduler_example/Makefile
@@ -0,0 +1,7 @@
+evictLeaderPlugin.so: *.go
+	GO111MODULE=on CGO_ENABLED=1 go build -buildmode=plugin -o evictLeaderPlugin.so *.go
+
+.PHONY : clean
+
+clean:
+	rm evictLeaderPlugin.so
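For orientation: the example scheduler is compiled into a shared object with `-buildmode=plugin` (which is why cgo is enabled above) and loaded by PD at runtime through the standard library `plugin` package, wrapped further down in `server/schedule/plugin_interface.go`. A minimal sketch of that consumption, assuming an illustrative plugin path:

```go
package main

import (
	"fmt"
	"plugin"
)

func main() {
	// Open the shared object produced by `make evictLeaderPlugin.so`.
	// The path is illustrative; PD itself requires it to begin with ./pd/plugin/.
	p, err := plugin.Open("./pd/plugin/evictLeaderPlugin.so")
	if err != nil {
		panic(err)
	}
	// Look up an exported symbol and assert its type, exactly as the
	// coordinator does for SchedulerType and SchedulerArgs.
	sym, err := p.Lookup("SchedulerType")
	if err != nil {
		panic(err)
	}
	schedulerType := sym.(func() string)
	fmt.Println("scheduler type:", schedulerType()) // "user-evict-leader"
}
```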
diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go
new file mode 100644
index 000000000000..6f0731b7419d
--- /dev/null
+++ b/plugin/scheduler_example/evict_leader.go
@@ -0,0 +1,351 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"net/http"
+	"net/url"
+	"strconv"
+	"sync"
+
+	"github.com/gorilla/mux"
+	"github.com/pingcap/log"
+	"github.com/pingcap/pd/pkg/apiutil"
+	"github.com/pingcap/pd/server/core"
+	"github.com/pingcap/pd/server/schedule"
+	"github.com/pingcap/pd/server/schedule/filter"
+	"github.com/pingcap/pd/server/schedule/operator"
+	"github.com/pingcap/pd/server/schedule/opt"
+	"github.com/pingcap/pd/server/schedule/selector"
+	"github.com/pingcap/pd/server/schedulers"
+	"github.com/pkg/errors"
+	"github.com/unrolled/render"
+	"go.uber.org/zap"
+)
+
+const (
+	// EvictLeaderName is evict leader scheduler name.
+	EvictLeaderName = "user-evict-leader-scheduler"
+	// EvictLeaderType is evict leader scheduler type.
+	EvictLeaderType = "user-evict-leader"
+	noStoreInSchedulerInfo = "No store in user-evict-leader-scheduler-config"
+)
+
+func init() {
+	schedule.RegisterSliceDecoderBuilder(EvictLeaderType, func(args []string) schedule.ConfigDecoder {
+		return func(v interface{}) error {
+			if len(args) != 1 {
+				return errors.New("should specify the store-id")
+			}
+			conf, ok := v.(*evictLeaderSchedulerConfig)
+			if !ok {
+				return errors.New("the config does not exist")
+			}
+
+			id, err := strconv.ParseUint(args[0], 10, 64)
+			if err != nil {
+				return errors.WithStack(err)
+			}
+			ranges, err := getKeyRanges(args[1:])
+			if err != nil {
+				return errors.WithStack(err)
+			}
+			conf.StoreIDWitRanges[id] = ranges
+			return nil
+
+		}
+	})
+
+	schedule.RegisterScheduler(EvictLeaderType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) {
+		conf := &evictLeaderSchedulerConfig{StoreIDWitRanges: make(map[uint64][]core.KeyRange), storage: storage}
+		if err := decoder(conf); err != nil {
+			return nil, err
+		}
+		conf.cluster = opController.GetCluster()
+		return newEvictLeaderScheduler(opController, conf), nil
+	})
+}
+
+// SchedulerType returns the type of the scheduler
+// nolint
+func SchedulerType() string {
+	return EvictLeaderType
+}
+
+// SchedulerArgs returns the args for the scheduler
+// nolint
+func SchedulerArgs() []string {
+	args := []string{"1"}
+	return args
+}
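Taken together, the two `schedule.Register…` calls above are the whole plugin contract: the decoder turns CLI-style string arguments into the typed config, and the registered constructor consumes it. A sketch (not part of the patch) of how a caller wires them up — PD's coordinator does exactly this in `server/cluster/coordinator.go` below; `opController` and `storage` are assumed to come from a running cluster:

```go
// Assumes the package and imports of the file above.
func createUserEvictLeader(opController *schedule.OperatorController, storage *core.Storage) (schedule.Scheduler, error) {
	// ConfigSliceDecoder picks the decoder registered for EvictLeaderType;
	// SchedulerArgs() supplies {"1"}, i.e. evict leaders from store 1.
	decoder := schedule.ConfigSliceDecoder(EvictLeaderType, SchedulerArgs())
	return schedule.CreateScheduler(EvictLeaderType, opController, storage, decoder)
}
```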
+type evictLeaderSchedulerConfig struct {
+	mu               sync.RWMutex
+	storage          *core.Storage
+	StoreIDWitRanges map[uint64][]core.KeyRange `json:"store-id-ranges"`
+	cluster          opt.Cluster
+}
+
+func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error {
+	if len(args) != 1 {
+		return errors.New("should specify the store-id")
+	}
+
+	id, err := strconv.ParseUint(args[0], 10, 64)
+	if err != nil {
+		return errors.WithStack(err)
+	}
+	ranges, err := getKeyRanges(args[1:])
+	if err != nil {
+		return errors.WithStack(err)
+	}
+	conf.mu.Lock()
+	defer conf.mu.Unlock()
+	conf.StoreIDWitRanges[id] = ranges
+	return nil
+}
+
+func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig {
+	conf.mu.RLock()
+	defer conf.mu.RUnlock()
+	return &evictLeaderSchedulerConfig{
+		StoreIDWitRanges: conf.StoreIDWitRanges,
+	}
+}
+
+func (conf *evictLeaderSchedulerConfig) Persist() error {
+	name := conf.getScheduleName()
+	conf.mu.RLock()
+	defer conf.mu.RUnlock()
+	data, err := schedule.EncodeConfig(conf)
+	if err != nil {
+		return err
+	}
+	conf.storage.SaveScheduleConfig(name, data)
+	return nil
+}
+
+func (conf *evictLeaderSchedulerConfig) getScheduleName() string {
+	return EvictLeaderName
+}
+
+func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string {
+	conf.mu.RLock()
+	defer conf.mu.RUnlock()
+	var res []string
+	for index := range conf.StoreIDWitRanges[id] {
+		res = append(res, (string)(conf.StoreIDWitRanges[id][index].StartKey))
+		res = append(res, (string)(conf.StoreIDWitRanges[id][index].EndKey))
+	}
+	return res
+}
+
+type evictLeaderScheduler struct {
+	*schedulers.BaseScheduler
+	conf     *evictLeaderSchedulerConfig
+	selector *selector.RandomSelector
+	handler  http.Handler
+}
+
+// newEvictLeaderScheduler creates an admin scheduler that transfers all leaders
+// out of a store.
+func newEvictLeaderScheduler(opController *schedule.OperatorController, conf *evictLeaderSchedulerConfig) schedule.Scheduler {
+	filters := []filter.Filter{
+		filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true},
+	}
+
+	base := schedulers.NewBaseScheduler(opController)
+	handler := newEvictLeaderHandler(conf)
+	return &evictLeaderScheduler{
+		BaseScheduler: base,
+		conf:          conf,
+		selector:      selector.NewRandomSelector(filters),
+		handler:       handler,
+	}
+}
+
+func (s *evictLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	s.handler.ServeHTTP(w, r)
+}
+
+func (s *evictLeaderScheduler) GetName() string {
+	return EvictLeaderName
+}
+
+func (s *evictLeaderScheduler) GetType() string {
+	return EvictLeaderType
+}
+
+func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) {
+	s.conf.mu.RLock()
+	defer s.conf.mu.RUnlock()
+	return schedule.EncodeConfig(s.conf)
+}
+
+func (s *evictLeaderScheduler) Prepare(cluster opt.Cluster) error {
+	s.conf.mu.RLock()
+	defer s.conf.mu.RUnlock()
+	var res error
+	for id := range s.conf.StoreIDWitRanges {
+		if err := cluster.BlockStore(id); err != nil {
+			res = err
+		}
+	}
+	return res
+}
+
+func (s *evictLeaderScheduler) Cleanup(cluster opt.Cluster) {
+	s.conf.mu.RLock()
+	defer s.conf.mu.RUnlock()
+	for id := range s.conf.StoreIDWitRanges {
+		cluster.UnblockStore(id)
+	}
+}
+
+func (s *evictLeaderScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
+	return s.OpController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit()
+}
+
+func (s *evictLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
+	var ops []*operator.Operator
+	s.conf.mu.RLock()
+	defer s.conf.mu.RUnlock()
+	for id, ranges := range s.conf.StoreIDWitRanges {
+		region := cluster.RandLeaderRegion(id, ranges, opt.HealthRegion(cluster))
+		if region == nil {
+			continue
+		}
+		target := s.selector.SelectTarget(cluster, cluster.GetFollowerStores(region))
+		if target == nil {
+			continue
+		}
+		op, err := operator.CreateTransferLeaderOperator(EvictLeaderType, cluster, region, region.GetLeader().GetStoreId(), target.GetID(), operator.OpLeader)
+		if err != nil {
+			log.Debug("fail to create evict leader operator", zap.Error(err))
+			continue
+
+		}
+		op.SetPriorityLevel(core.HighPriority)
+		ops = append(ops, op)
+	}
+
+	return ops
+}
+
+type evictLeaderHandler struct {
+	rd     *render.Render
+	config *evictLeaderSchedulerConfig
+}
+func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) {
+	var input map[string]interface{}
+	if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil {
+		return
+	}
+	var args []string
+	var exists bool
+	var id uint64
+	idFloat, ok := input["store_id"].(float64)
+	if ok {
+		id = (uint64)(idFloat)
+		if _, exists = handler.config.StoreIDWitRanges[id]; !exists {
+			if err := handler.config.cluster.BlockStore(id); err != nil {
+				handler.rd.JSON(w, http.StatusInternalServerError, err)
+				return
+			}
+		}
+		args = append(args, strconv.FormatUint(id, 10))
+	}
+
+	ranges, ok := (input["ranges"]).([]string)
+	if ok {
+		args = append(args, ranges...)
+	} else if exists {
+		args = append(args, handler.config.getRanges(id)...)
+	}
+
+	handler.config.BuildWithArgs(args)
+	err := handler.config.Persist()
+	if err != nil {
+		handler.rd.JSON(w, http.StatusInternalServerError, err)
+		return
+	}
+	handler.rd.JSON(w, http.StatusOK, nil)
+}
+
+func (handler *evictLeaderHandler) ListConfig(w http.ResponseWriter, r *http.Request) {
+	conf := handler.config.Clone()
+	handler.rd.JSON(w, http.StatusOK, conf)
+}
+
+func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.Request) {
+	idStr := mux.Vars(r)["store_id"]
+	id, err := strconv.ParseUint(idStr, 10, 64)
+	if err != nil {
+		handler.rd.JSON(w, http.StatusBadRequest, err.Error())
+		return
+	}
+
+	handler.config.mu.Lock()
+	defer handler.config.mu.Unlock()
+	_, exists := handler.config.StoreIDWitRanges[id]
+	if exists {
+		delete(handler.config.StoreIDWitRanges, id)
+		handler.config.cluster.UnblockStore(id)
+
+		handler.config.mu.Unlock()
+		handler.config.Persist()
+		handler.config.mu.Lock()
+
+		var resp interface{}
+		if len(handler.config.StoreIDWitRanges) == 0 {
+			resp = noStoreInSchedulerInfo
+		}
+		handler.rd.JSON(w, http.StatusOK, resp)
+		return
+	}
+
+	handler.rd.JSON(w, http.StatusInternalServerError, errors.New("the config does not exist"))
+}
+
+func newEvictLeaderHandler(config *evictLeaderSchedulerConfig) http.Handler {
+	h := &evictLeaderHandler{
+		config: config,
+		rd:     render.New(render.Options{IndentJSON: true}),
+	}
+	router := mux.NewRouter()
+	router.HandleFunc("/config", h.UpdateConfig).Methods("POST")
+	router.HandleFunc("/list", h.ListConfig).Methods("GET")
+	router.HandleFunc("/delete/{store_id}", h.DeleteConfig).Methods("DELETE")
+	return router
+}
+
+func getKeyRanges(args []string) ([]core.KeyRange, error) {
+	var ranges []core.KeyRange
+	for len(args) > 1 {
+		startKey, err := url.QueryUnescape(args[0])
+		if err != nil {
+			return nil, err
+		}
+		endKey, err := url.QueryUnescape(args[1])
+		if err != nil {
+			return nil, err
+		}
+		args = args[2:]
+		ranges = append(ranges, core.NewKeyRange(startKey, endKey))
+	}
+	if len(ranges) == 0 {
+		return []core.KeyRange{core.NewKeyRange("", "")}, nil
+	}
+	return ranges, nil
+}
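Because the handler returned by `newEvictLeaderHandler` is a plain `http.Handler`, it can be exercised in-process without a PD server. A hedged test sketch of the read-only `/list` route (the mount point inside PD is not part of this file; `UpdateConfig` and `DeleteConfig` need a live cluster for `BlockStore`/`UnblockStore`, so they are not driven here):

```go
package main

import (
	"io/ioutil"
	"net/http"
	"net/http/httptest"
	"testing"

	"github.com/pingcap/pd/server/core"
)

// TestListConfigSketch is illustrative only, not part of the patch.
func TestListConfigSketch(t *testing.T) {
	conf := &evictLeaderSchedulerConfig{
		StoreIDWitRanges: map[uint64][]core.KeyRange{1: {core.NewKeyRange("", "")}},
	}
	srv := httptest.NewServer(newEvictLeaderHandler(conf))
	defer srv.Close()

	resp, err := http.Get(srv.URL + "/list")
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	t.Logf("config: %s", body) // expect {"store-id-ranges": {"1": [...]}}
}
```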
diff --git a/server/api/plugin.go b/server/api/plugin.go
new file mode 100644
index 000000000000..8e14362c1eec
--- /dev/null
+++ b/server/api/plugin.go
@@ -0,0 +1,90 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package api
+
+import (
+	"errors"
+	"net/http"
+	"os"
+	"strings"
+
+	"github.com/pingcap/pd/pkg/apiutil"
+	"github.com/pingcap/pd/server"
+	"github.com/pingcap/pd/server/cluster"
+	"github.com/unrolled/render"
+)
+
+type pluginHandler struct {
+	*server.Handler
+	rd *render.Render
+}
+
+func newPluginHandler(handler *server.Handler, rd *render.Render) *pluginHandler {
+	return &pluginHandler{
+		Handler: handler,
+		rd:      rd,
+	}
+}
+
+func (h *pluginHandler) LoadPlugin(w http.ResponseWriter, r *http.Request) {
+	h.processPluginCommand(w, r, cluster.PluginLoad)
+}
+
+func (h *pluginHandler) UnloadPlugin(w http.ResponseWriter, r *http.Request) {
+	h.processPluginCommand(w, r, cluster.PluginUnload)
+}
+
+func (h *pluginHandler) processPluginCommand(w http.ResponseWriter, r *http.Request, action string) {
+	data := make(map[string]string)
+	if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &data); err != nil {
+		return
+	}
+	path := data["plugin-path"]
+	if !strings.HasPrefix(path, "./pd/plugin/") {
+		err := errors.New("plugin path must begin with ./pd/plugin/")
+		h.rd.JSON(w, http.StatusInternalServerError, err.Error())
+		return
+	}
+	if exist, err := pathExists(path); !exist {
+		h.rd.JSON(w, http.StatusInternalServerError, err.Error())
+		return
+	}
+	var err error
+	switch action {
+	case cluster.PluginLoad:
+		err = h.PluginLoad(path)
+	case cluster.PluginUnload:
+		err = h.PluginUnload(path)
+	default:
+		h.rd.JSON(w, http.StatusBadRequest, "unknown action")
+		return
+	}
+	if err != nil {
+		h.rd.JSON(w, http.StatusInternalServerError, err.Error())
+		return
+	}
+	h.rd.JSON(w, http.StatusOK, nil)
+}
+
+func pathExists(path string) (bool, error) {
+	_, err := os.Stat(path)
+	if err == nil {
+		return true, nil
+	}
+	if os.IsNotExist(err) {
+		err = errors.New("file does not exist")
+		return false, err
+	}
+	return false, err
+}

diff --git a/server/api/router.go b/server/api/router.go
index 0849386410f6..ae6172a9b5e7 100644
--- a/server/api/router.go
+++ b/server/api/router.go
@@ -158,6 +158,10 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
 	logHandler := newlogHandler(svr, rd)
 	rootRouter.HandleFunc("/api/v1/admin/log", logHandler.Handle).Methods("POST")
 
+	pluginHandler := newPluginHandler(handler, rd)
+	rootRouter.HandleFunc("/api/v1/plugin", pluginHandler.LoadPlugin).Methods("POST")
+	rootRouter.HandleFunc("/api/v1/plugin", pluginHandler.UnloadPlugin).Methods("DELETE")
+
 	rootRouter.Handle("/api/v1/health", newHealthHandler(svr, rd)).Methods("GET")
 	rootRouter.Handle("/api/v1/diagnose", newDiagnoseHandler(svr, rd)).Methods("GET")
 	rootRouter.HandleFunc("/api/v1/ping", func(w http.ResponseWriter, r *http.Request) {}).Methods("GET")
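Both routes share one handler and differ only in HTTP method, so a client addresses plugins purely by path in the JSON body. A hedged client sketch against an assumed local PD endpoint (the `pd/api/v1/plugin` prefix matches what pd-ctl uses at the end of this patch):

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// pdAddr is an assumed local PD address.
const pdAddr = "http://127.0.0.1:2379"

func pluginRequest(method, path string) error {
	body := bytes.NewBufferString(fmt.Sprintf(`{"plugin-path": %q}`, path))
	req, err := http.NewRequest(method, pdAddr+"/pd/api/v1/plugin", body)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("%s plugin failed: %s", method, resp.Status)
	}
	return nil
}

func main() {
	// POST loads, DELETE unloads; the path must begin with ./pd/plugin/.
	if err := pluginRequest(http.MethodPost, "./pd/plugin/evictLeaderPlugin.so"); err != nil {
		panic(err)
	}
}
```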
diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go
index 32f952ef1d7a..b7e7819d5b25 100644
--- a/server/cluster/coordinator.go
+++ b/server/cluster/coordinator.go
@@ -41,6 +41,10 @@ const (
 	maxLoadConfigRetries  = 10
 
 	patrolScanRegionLimit = 128 // It takes about 14 minutes to iterate 1 million regions.
+	// PluginLoad is the action of loading a plugin
+	PluginLoad = "PluginLoad"
+	// PluginUnload is the action of unloading a plugin
+	PluginUnload = "PluginUnload"
 )
 
 var (
@@ -63,6 +67,7 @@ type coordinator struct {
 	schedulers       map[string]*scheduleController
 	opController     *schedule.OperatorController
 	hbStreams        opt.HeartbeatStreams
+	pluginInterface  *schedule.PluginInterface
 }
 
 // newCoordinator creates a new coordinator.
@@ -78,6 +83,7 @@ func newCoordinator(ctx context.Context, cluster *RaftCluster, hbStreams opt.HeartbeatStreams) *coordinator {
 		schedulers:      make(map[string]*scheduleController),
 		opController:    opController,
 		hbStreams:       hbStreams,
+		pluginInterface: schedule.NewPluginInterface(),
 	}
 }
 
@@ -256,6 +262,64 @@ func (c *coordinator) run() {
 	go c.drivePushOperator()
 }
 
+// LoadPlugin loads a user plugin
+func (c *coordinator) LoadPlugin(pluginPath string, ch chan string) {
+	log.Info("load plugin", zap.String("plugin-path", pluginPath))
+	// get func: SchedulerType from plugin
+	SchedulerType, err := c.pluginInterface.GetFunction(pluginPath, "SchedulerType")
+	if err != nil {
+		log.Error("GetFunction SchedulerType error", zap.Error(err))
+		return
+	}
+	schedulerType := SchedulerType.(func() string)
+	// get func: SchedulerArgs from plugin
+	SchedulerArgs, err := c.pluginInterface.GetFunction(pluginPath, "SchedulerArgs")
+	if err != nil {
+		log.Error("GetFunction SchedulerArgs error", zap.Error(err))
+		return
+	}
+	schedulerArgs := SchedulerArgs.(func() []string)
+	// create and add user scheduler
+	s, err := schedule.CreateScheduler(schedulerType(), c.opController, c.cluster.storage, schedule.ConfigSliceDecoder(schedulerType(), schedulerArgs()))
+	if err != nil {
+		log.Error("can not create scheduler", zap.String("scheduler-type", schedulerType()), zap.Error(err))
+		return
+	}
+	log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
+	if err = c.addScheduler(s); err != nil {
+		log.Error("can't add scheduler", zap.String("scheduler-name", s.GetName()), zap.Error(err))
+		return
+	}
+
+	c.wg.Add(1)
+	go c.waitPluginUnload(pluginPath, s.GetName(), ch)
+}
+
+func (c *coordinator) waitPluginUnload(pluginPath, schedulerName string, ch chan string) {
+	defer logutil.LogPanic()
+	defer c.wg.Done()
+	// Wait for a signal from the channel, which means the user has unloaded the plugin
+	for {
+		select {
+		case action := <-ch:
+			if action == PluginUnload {
+				err := c.removeScheduler(schedulerName)
+				if err != nil {
+					log.Error("can not remove scheduler", zap.String("scheduler-name", schedulerName), zap.Error(err))
+				} else {
+					log.Info("unload plugin", zap.String("plugin", pluginPath))
+					return
+				}
+			} else {
+				log.Error("unknown action", zap.String("action", action))
+			}
+		case <-c.ctx.Done():
+			log.Info("unload plugin has been stopped")
+			return
+		}
+	}
+}
+
 func (c *coordinator) stop() {
 	c.cancel()
 }
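The unload path is worth calling out: `PluginUnload` is delivered as a message on a per-plugin channel, and the goroutine started by `LoadPlugin` reacts to it, so removing a scheduler never blocks an HTTP handler. The same pattern, distilled into a self-contained sketch (names here are hypothetical):

```go
package main

import "context"

// watchUnload mirrors waitPluginUnload above: "unload" is a message
// rather than a direct call, and the goroutine owns only the channel.
func watchUnload(ctx context.Context, ch <-chan string, remove func() error) {
	for {
		select {
		case action := <-ch:
			// Retry on failure: the goroutine exits only once removal succeeds.
			if action == "PluginUnload" {
				if err := remove(); err == nil {
					return
				}
			}
		case <-ctx.Done():
			return
		}
	}
}
```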
diff --git a/server/handler.go b/server/handler.go
index 0f139480a8db..5fc3f9241cd9 100644
--- a/server/handler.go
+++ b/server/handler.go
@@ -20,6 +20,7 @@ import (
 	"path"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/pingcap/errcode"
@@ -62,16 +63,22 @@ var (
 	ErrStoreNotFound = func(storeID uint64) error {
 		return errors.Errorf("store %v not found", storeID)
 	}
+	// ErrPluginNotFound is the error info for a plugin that is not found.
+	ErrPluginNotFound = func(pluginPath string) error {
+		return errors.Errorf("plugin is not found: %s", pluginPath)
+	}
 )
 
 // Handler is a helper to export methods to handle API/RPC requests.
 type Handler struct {
-	s   *Server
-	opt *config.ScheduleOption
+	s               *Server
+	opt             *config.ScheduleOption
+	pluginChMap     map[string]chan string
+	pluginChMapLock sync.RWMutex
 }
 
 func newHandler(s *Server) *Handler {
-	return &Handler{s: s, opt: s.scheduleOpt}
+	return &Handler{s: s, opt: s.scheduleOpt, pluginChMap: make(map[string]chan string), pluginChMapLock: sync.RWMutex{}}
 }
 
 // GetRaftCluster returns RaftCluster.
@@ -835,3 +842,29 @@ func (h *Handler) ResetTS(ts uint64) error {
 	}
 	return tsoServer.ResetUserTimestamp(ts)
 }
+
+// PluginLoad loads the plugin referenced by the pluginPath
+func (h *Handler) PluginLoad(pluginPath string) error {
+	h.pluginChMapLock.Lock()
+	defer h.pluginChMapLock.Unlock()
+	cluster, err := h.GetRaftCluster()
+	if err != nil {
+		return err
+	}
+	c := cluster.GetCoordinator()
+	ch := make(chan string)
+	h.pluginChMap[pluginPath] = ch
+	c.LoadPlugin(pluginPath, ch)
+	return nil
+}
+
+// PluginUnload unloads the plugin referenced by the pluginPath
+func (h *Handler) PluginUnload(pluginPath string) error {
+	h.pluginChMapLock.Lock()
+	defer h.pluginChMapLock.Unlock()
+	if ch, ok := h.pluginChMap[pluginPath]; ok {
+		ch <- cluster.PluginUnload
+		return nil
+	}
+	return ErrPluginNotFound(pluginPath)
+}

diff --git a/server/schedule/plugin_interface.go b/server/schedule/plugin_interface.go
new file mode 100644
index 000000000000..32d140beff5e
--- /dev/null
+++ b/server/schedule/plugin_interface.go
@@ -0,0 +1,63 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schedule
+
+import (
+	"path/filepath"
+	"plugin"
+	"sync"
+
+	"github.com/pingcap/log"
+	"go.uber.org/zap"
+)
+
+// PluginInterface is used to manage all plugins.
+type PluginInterface struct {
+	pluginMap     map[string]*plugin.Plugin
+	pluginMapLock sync.RWMutex
+}
+
+// NewPluginInterface creates a plugin interface
+func NewPluginInterface() *PluginInterface {
+	return &PluginInterface{
+		pluginMap:     make(map[string]*plugin.Plugin),
+		pluginMapLock: sync.RWMutex{},
+	}
+}
+
+// GetFunction gets func by funcName from plugin(.so)
+func (p *PluginInterface) GetFunction(path string, funcName string) (plugin.Symbol, error) {
+	p.pluginMapLock.Lock()
+	defer p.pluginMapLock.Unlock()
+	if _, ok := p.pluginMap[path]; !ok {
+		// open plugin
+		filePath, err := filepath.Abs(path)
+		if err != nil {
+			return nil, err
+		}
+		log.Info("open plugin file", zap.String("file-path", filePath))
+		plugin, err := plugin.Open(filePath)
+		if err != nil {
+			return nil, err
+		}
+		p.pluginMap[path] = plugin
+	}
+	// get func from plugin
+	f, err := p.pluginMap[path].Lookup(funcName)
+	if err != nil {
+		log.Error("lookup func error", zap.Error(err))
+		return nil, err
+	}
+	return f, nil
+}
diff --git a/server/schedulers/adjacent_region.go b/server/schedulers/adjacent_region.go
index 3ba3016e6004..f46cc41c9a39 100644
--- a/server/schedulers/adjacent_region.go
+++ b/server/schedulers/adjacent_region.go
@@ -92,7 +92,7 @@ type balanceAdjacentRegionConfig struct {
 // 1. any two adjacent regions' leader will not in the same store
 // 2. the two regions' leader will not in the public store of this two regions
 type balanceAdjacentRegionScheduler struct {
-	*baseScheduler
+	*BaseScheduler
 	selector     *selector.RandomSelector
 	lastKey      []byte
 	cacheRegions *adjacentState
@@ -122,9 +122,9 @@ func newBalanceAdjacentRegionScheduler(opController *schedule.OperatorController
 	filters := []filter.Filter{
 		filter.StoreStateFilter{ActionScope: AdjacentRegionName, TransferLeader: true, MoveRegion: true},
 	}
-	base := newBaseScheduler(opController)
+	base := NewBaseScheduler(opController)
 	s := &balanceAdjacentRegionScheduler{
-		baseScheduler: base,
+		BaseScheduler: base,
 		selector:      selector.NewRandomSelector(filters),
 		conf:          conf,
 		lastKey:       []byte(""),
@@ -157,11 +157,11 @@ func (l *balanceAdjacentRegionScheduler) IsScheduleAllowed(cluster opt.Cluster)
 }
 
 func (l *balanceAdjacentRegionScheduler) allowBalanceLeader() bool {
-	return l.opController.OperatorCount(operator.OpAdjacent|operator.OpLeader) < l.conf.LeaderLimit
+	return l.OpController.OperatorCount(operator.OpAdjacent|operator.OpLeader) < l.conf.LeaderLimit
 }
 
 func (l *balanceAdjacentRegionScheduler) allowBalancePeer() bool {
-	return l.opController.OperatorCount(operator.OpAdjacent|operator.OpRegion) < l.conf.PeerLimit
+	return l.OpController.OperatorCount(operator.OpAdjacent|operator.OpRegion) < l.conf.PeerLimit
 }
 
 func (l *balanceAdjacentRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {

diff --git a/server/schedulers/balance_leader.go b/server/schedulers/balance_leader.go
index 23d558142863..83e97fcf4ce0 100644
--- a/server/schedulers/balance_leader.go
+++ b/server/schedulers/balance_leader.go
@@ -69,7 +69,7 @@ type balanceLeaderSchedulerConfig struct {
 }
 
 type balanceLeaderScheduler struct {
-	*baseScheduler
+	*BaseScheduler
 	conf         *balanceLeaderSchedulerConfig
 	opController *schedule.OperatorController
 	filters      []filter.Filter
@@ -79,10 +79,10 @@ type balanceLeaderScheduler struct {
 
 // newBalanceLeaderScheduler creates a scheduler that tends to keep leaders on
 // each store balanced.
 func newBalanceLeaderScheduler(opController *schedule.OperatorController, conf *balanceLeaderSchedulerConfig, opts ...BalanceLeaderCreateOption) schedule.Scheduler {
-	base := newBaseScheduler(opController)
+	base := NewBaseScheduler(opController)
 
 	s := &balanceLeaderScheduler{
-		baseScheduler: base,
+		BaseScheduler: base,
 		conf:          conf,
 		opController:  opController,
 		counter:       balanceLeaderCounter,
diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go
index 7afc15ac6436..252deaf732f8 100644
--- a/server/schedulers/balance_region.go
+++ b/server/schedulers/balance_region.go
@@ -70,7 +70,7 @@ type balanceRegionSchedulerConfig struct {
 }
 
 type balanceRegionScheduler struct {
-	*baseScheduler
+	*BaseScheduler
 	conf         *balanceRegionSchedulerConfig
 	opController *schedule.OperatorController
 	filters      []filter.Filter
@@ -80,9 +80,9 @@ type balanceRegionScheduler struct {
 
 // newBalanceRegionScheduler creates a scheduler that tends to keep regions on
 // each store balanced.
 func newBalanceRegionScheduler(opController *schedule.OperatorController, conf *balanceRegionSchedulerConfig, opts ...BalanceRegionCreateOption) schedule.Scheduler {
-	base := newBaseScheduler(opController)
+	base := NewBaseScheduler(opController)
 	scheduler := &balanceRegionScheduler{
-		baseScheduler: base,
+		BaseScheduler: base,
 		conf:          conf,
 		opController:  opController,
 		counter:       balanceRegionCounter,

diff --git a/server/schedulers/base_scheduler.go b/server/schedulers/base_scheduler.go
index fbc0be2db621..d085a0dedec8 100644
--- a/server/schedulers/base_scheduler.go
+++ b/server/schedulers/base_scheduler.go
@@ -55,30 +55,37 @@ func intervalGrow(x time.Duration, maxInterval time.Duration, typ intervalGrowth
 	return 0
 }
 
-type baseScheduler struct {
-	opController *schedule.OperatorController
+// BaseScheduler is a basic scheduler for all other complex schedulers
+type BaseScheduler struct {
+	OpController *schedule.OperatorController
 }
 
-func newBaseScheduler(opController *schedule.OperatorController) *baseScheduler {
-	return &baseScheduler{opController: opController}
+// NewBaseScheduler returns a basic scheduler
+func NewBaseScheduler(opController *schedule.OperatorController) *BaseScheduler {
+	return &BaseScheduler{OpController: opController}
 }
 
-func (s *baseScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+func (s *BaseScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	fmt.Fprintf(w, "not implements")
 }
 
-func (s *baseScheduler) GetMinInterval() time.Duration {
+// GetMinInterval returns the minimal interval for the scheduler
+func (s *BaseScheduler) GetMinInterval() time.Duration {
 	return MinScheduleInterval
 }
 
-func (s *baseScheduler) EncodeConfig() ([]byte, error) {
+// EncodeConfig encodes the config for the scheduler
+func (s *BaseScheduler) EncodeConfig() ([]byte, error) {
 	return schedule.EncodeConfig(nil)
 }
 
-func (s *baseScheduler) GetNextInterval(interval time.Duration) time.Duration {
+// GetNextInterval returns the next interval for the scheduler
+func (s *BaseScheduler) GetNextInterval(interval time.Duration) time.Duration {
 	return intervalGrow(interval, MaxScheduleInterval, exponentialGrowth)
 }
 
-func (s *baseScheduler) Prepare(cluster opt.Cluster) error { return nil }
+// Prepare does some prepare work
+func (s *BaseScheduler) Prepare(cluster opt.Cluster) error { return nil }
 
-func (s *baseScheduler) Cleanup(cluster opt.Cluster) {}
+// Cleanup does some cleanup work
+func (s *BaseScheduler) Cleanup(cluster opt.Cluster) {}
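Exporting `BaseScheduler` is the core enabling change for out-of-tree schedulers: embedding it supplies `ServeHTTP`, `GetMinInterval`, `GetNextInterval`, `EncodeConfig`, `Prepare`, and `Cleanup` for free, so a plugin only writes the methods it cares about. A minimal hedged sketch (the scheduler name and behavior are hypothetical):

```go
package main

import (
	"github.com/pingcap/pd/server/schedule"
	"github.com/pingcap/pd/server/schedule/operator"
	"github.com/pingcap/pd/server/schedule/opt"
	"github.com/pingcap/pd/server/schedulers"
)

// noopScheduler shows the minimum surface a third-party scheduler
// implements once BaseScheduler is exported.
type noopScheduler struct {
	*schedulers.BaseScheduler
}

func newNoopScheduler(opController *schedule.OperatorController) schedule.Scheduler {
	return &noopScheduler{BaseScheduler: schedulers.NewBaseScheduler(opController)}
}

func (s *noopScheduler) GetName() string { return "noop-scheduler" }
func (s *noopScheduler) GetType() string { return "noop" }

func (s *noopScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
	// OpController is now reachable from outside the schedulers package.
	return s.OpController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit()
}

func (s *noopScheduler) Schedule(cluster opt.Cluster) []*operator.Operator { return nil }
```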
diff --git a/server/schedulers/evict_leader.go b/server/schedulers/evict_leader.go
index 147cfdfd0f44..7313298cfa3c 100644
--- a/server/schedulers/evict_leader.go
+++ b/server/schedulers/evict_leader.go
@@ -152,7 +152,7 @@ func (conf *evictLeaderSchedulerConfig) mayBeRemoveStoreFromConfig(id uint64) (s
 }
 
 type evictLeaderScheduler struct {
-	*baseScheduler
+	*BaseScheduler
 	conf     *evictLeaderSchedulerConfig
 	selector *selector.RandomSelector
 	handler  http.Handler
@@ -165,10 +165,10 @@ type evictLeaderScheduler struct {
 
 // newEvictLeaderScheduler creates an admin scheduler that transfers all leaders
 // to a store.
 func newEvictLeaderScheduler(opController *schedule.OperatorController, conf *evictLeaderSchedulerConfig) schedule.Scheduler {
 	filters := []filter.Filter{
 		filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true},
 	}
-	base := newBaseScheduler(opController)
+	base := NewBaseScheduler(opController)
 	handler := newEvictLeaderHandler(conf)
 	return &evictLeaderScheduler{
-		baseScheduler: base,
+		BaseScheduler: base,
 		conf:          conf,
 		selector:      selector.NewRandomSelector(filters),
 		handler:       handler,
@@ -214,7 +214,7 @@ func (s *evictLeaderScheduler) Cleanup(cluster opt.Cluster) {
 }
 
 func (s *evictLeaderScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
-	return s.opController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit()
+	return s.OpController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit()
 }
 
 func (s *evictLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {

diff --git a/server/schedulers/grant_leader.go b/server/schedulers/grant_leader.go
index 8e85b0ab48c9..20664c5e9270 100644
--- a/server/schedulers/grant_leader.go
+++ b/server/schedulers/grant_leader.go
@@ -150,7 +150,7 @@ func (conf *grantLeaderSchedulerConfig) mayBeRemoveStoreFromConfig(id uint64) (s
 
 // grantLeaderScheduler transfers all leaders to peers in the store.
 type grantLeaderScheduler struct {
-	*baseScheduler
+	*BaseScheduler
 	conf    *grantLeaderSchedulerConfig
 	handler http.Handler
 }
@@ -158,10 +158,10 @@ type grantLeaderScheduler struct {
 
 // newGrantLeaderScheduler creates an admin scheduler that transfers all leaders
 // to a store.
 func newGrantLeaderScheduler(opController *schedule.OperatorController, conf *grantLeaderSchedulerConfig) schedule.Scheduler {
-	base := newBaseScheduler(opController)
+	base := NewBaseScheduler(opController)
 	handler := newGrantLeaderHandler(conf)
 	return &grantLeaderScheduler{
-		baseScheduler: base,
+		BaseScheduler: base,
 		conf:          conf,
 		handler:       handler,
 	}
@@ -204,7 +204,7 @@ func (s *grantLeaderScheduler) Cleanup(cluster opt.Cluster) {
 }
 
 func (s *grantLeaderScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
-	return s.opController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit()
+	return s.OpController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit()
 }
 
 func (s *grantLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index 0c82b4b8bfd2..06284f75b8a3 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -89,7 +89,7 @@ func newStoreStaticstics() *storeStatistics {
 
 type balanceHotRegionsScheduler struct {
 	name string
-	*baseScheduler
+	*BaseScheduler
 	sync.RWMutex
 	leaderLimit uint64
 	peerLimit   uint64
@@ -107,10 +107,10 @@ type balanceHotRegionsScheduler struct {
 }
 
 func newBalanceHotRegionsScheduler(opController *schedule.OperatorController) *balanceHotRegionsScheduler {
-	base := newBaseScheduler(opController)
+	base := NewBaseScheduler(opController)
 	return &balanceHotRegionsScheduler{
 		name:          HotRegionName,
-		baseScheduler: base,
+		BaseScheduler: base,
 		leaderLimit:   1,
 		peerLimit:     1,
 		stats:         newStoreStaticstics(),
@@ -123,9 +123,9 @@ func newBalanceHotRegionsScheduler(opController *schedule.OperatorController) *balanceHotRegionsScheduler {
 }
 
 func newBalanceHotReadRegionsScheduler(opController *schedule.OperatorController) *balanceHotRegionsScheduler {
-	base := newBaseScheduler(opController)
+	base := NewBaseScheduler(opController)
 	return &balanceHotRegionsScheduler{
-		baseScheduler: base,
+		BaseScheduler: base,
 		leaderLimit:   1,
 		peerLimit:     1,
 		stats:         newStoreStaticstics(),
@@ -138,9 +138,9 @@ func newBalanceHotReadRegionsScheduler(opController *schedule.OperatorController
 }
 
 func newBalanceHotWriteRegionsScheduler(opController *schedule.OperatorController) *balanceHotRegionsScheduler {
-	base := newBaseScheduler(opController)
+	base := NewBaseScheduler(opController)
 	return &balanceHotRegionsScheduler{
-		baseScheduler: base,
+		BaseScheduler: base,
 		leaderLimit:   1,
 		peerLimit:     1,
 		stats:         newStoreStaticstics(),
@@ -165,12 +165,12 @@ func (h *balanceHotRegionsScheduler) IsScheduleAllowed(cluster opt.Cluster) bool
 }
 
 func (h *balanceHotRegionsScheduler) allowBalanceLeader(cluster opt.Cluster) bool {
-	return h.opController.OperatorCount(operator.OpHotRegion) < minUint64(h.leaderLimit, cluster.GetHotRegionScheduleLimit()) &&
-		h.opController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit()
+	return h.OpController.OperatorCount(operator.OpHotRegion) < minUint64(h.leaderLimit, cluster.GetHotRegionScheduleLimit()) &&
+		h.OpController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit()
 }
 
 func (h *balanceHotRegionsScheduler) allowBalanceRegion(cluster opt.Cluster) bool {
-	return h.opController.OperatorCount(operator.OpHotRegion) < minUint64(h.peerLimit, cluster.GetHotRegionScheduleLimit())
+	return h.OpController.OperatorCount(operator.OpHotRegion) < minUint64(h.peerLimit, cluster.GetHotRegionScheduleLimit())
 }
 
 func (h *balanceHotRegionsScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {

diff --git a/server/schedulers/label.go b/server/schedulers/label.go
index 401b31252d05..c515bec9c300 100644
--- a/server/schedulers/label.go
+++ b/server/schedulers/label.go
@@ -64,7 +64,7 @@ type labelSchedulerConfig struct {
 }
 
 type labelScheduler struct {
-	*baseScheduler
+	*BaseScheduler
 	conf     *labelSchedulerConfig
 	selector *selector.BalanceSelector
 }
@@ -78,7 +78,7 @@ func newLabelScheduler(opController *schedule.OperatorController, conf *labelSchedulerConfig) schedule.Scheduler {
 	}
 	kind := core.NewScheduleKind(core.LeaderKind, core.ByCount)
 	return &labelScheduler{
-		baseScheduler: newBaseScheduler(opController),
+		BaseScheduler: NewBaseScheduler(opController),
 		conf:          conf,
 		selector:      selector.NewBalanceSelector(kind, filters),
 	}
@@ -97,7 +97,7 @@ func (s *labelScheduler) EncodeConfig() ([]byte, error) {
 }
 
 func (s *labelScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
-	return s.opController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit()
+	return s.OpController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit()
 }
 
 func (s *labelScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {

diff --git a/server/schedulers/random_merge.go b/server/schedulers/random_merge.go
index b5e4f2c41e8b..d928ff339063 100644
--- a/server/schedulers/random_merge.go
+++ b/server/schedulers/random_merge.go
@@ -65,7 +65,7 @@ type randomMergeSchedulerConfig struct {
 }
 
 type randomMergeScheduler struct {
-	*baseScheduler
+	*BaseScheduler
 	conf     *randomMergeSchedulerConfig
 	selector *selector.RandomSelector
 }
@@ -76,9 +76,9 @@ func newRandomMergeScheduler(opController *schedule.OperatorController, conf *randomMergeSchedulerConfig) schedule.Scheduler {
 	filters := []filter.Filter{
 		filter.StoreStateFilter{ActionScope: conf.Name, MoveRegion: true},
 	}
-	base := newBaseScheduler(opController)
+	base := NewBaseScheduler(opController)
 	return &randomMergeScheduler{
-		baseScheduler: base,
+		BaseScheduler: base,
 		conf:          conf,
 		selector:      selector.NewRandomSelector(filters),
 	}
@@ -97,7 +97,7 @@ func (s *randomMergeScheduler) EncodeConfig() ([]byte, error) {
 }
 
 func (s *randomMergeScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
-	return s.opController.OperatorCount(operator.OpMerge) < cluster.GetMergeScheduleLimit()
+	return s.OpController.OperatorCount(operator.OpMerge) < cluster.GetMergeScheduleLimit()
 }
 
 func (s *randomMergeScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
diff --git a/server/schedulers/scatter_range.go b/server/schedulers/scatter_range.go
index 0796314eede3..71a12d42ea39 100644
--- a/server/schedulers/scatter_range.go
+++ b/server/schedulers/scatter_range.go
@@ -139,7 +139,7 @@ func (conf *scatterRangeSchedulerConfig) getSchedulerName() string {
 }
 
 type scatterRangeScheduler struct {
-	*baseScheduler
+	*BaseScheduler
 	name          string
 	config        *scatterRangeSchedulerConfig
 	balanceLeader schedule.Scheduler
@@ -149,12 +149,12 @@ type scatterRangeScheduler struct {
 
 // newScatterRangeScheduler creates a scheduler that balances the distribution of leaders and regions that in the specified key range.
 func newScatterRangeScheduler(opController *schedule.OperatorController, config *scatterRangeSchedulerConfig) schedule.Scheduler {
-	base := newBaseScheduler(opController)
+	base := NewBaseScheduler(opController)
 
 	name := config.getSchedulerName()
 	handler := newScatterRangeHandler(config)
 	scheduler := &scatterRangeScheduler{
-		baseScheduler: base,
+		BaseScheduler: base,
 		config:        config,
 		handler:       handler,
 		name:          name,
@@ -193,7 +193,7 @@ func (l *scatterRangeScheduler) EncodeConfig() ([]byte, error) {
 }
 
 func (l *scatterRangeScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
-	return l.opController.OperatorCount(operator.OpRange) < cluster.GetRegionScheduleLimit()
+	return l.OpController.OperatorCount(operator.OpRange) < cluster.GetRegionScheduleLimit()
 }
 
 func (l *scatterRangeScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {

diff --git a/server/schedulers/shuffle_hot_region.go b/server/schedulers/shuffle_hot_region.go
index 736472da62bf..57b72076ac5e 100644
--- a/server/schedulers/shuffle_hot_region.go
+++ b/server/schedulers/shuffle_hot_region.go
@@ -74,7 +74,7 @@ type shuffleHotRegionSchedulerConfig struct {
 // to a random store, and then transfer the leader to
 // the hot peer.
 type shuffleHotRegionScheduler struct {
-	*baseScheduler
+	*BaseScheduler
 	stats *storeStatistics
 	r     *rand.Rand
 	conf  *shuffleHotRegionSchedulerConfig
@@ -83,9 +83,9 @@ type shuffleHotRegionScheduler struct {
 
 // newShuffleHotRegionScheduler creates an admin scheduler that random balance hot regions
 func newShuffleHotRegionScheduler(opController *schedule.OperatorController, conf *shuffleHotRegionSchedulerConfig) schedule.Scheduler {
-	base := newBaseScheduler(opController)
+	base := NewBaseScheduler(opController)
 	return &shuffleHotRegionScheduler{
-		baseScheduler: base,
+		BaseScheduler: base,
 		conf:          conf,
 		stats:         newStoreStaticstics(),
 		types:         []BalanceType{hotReadRegionBalance, hotWriteRegionBalance},
@@ -106,9 +106,9 @@ func (s *shuffleHotRegionScheduler) EncodeConfig() ([]byte, error) {
 }
 
 func (s *shuffleHotRegionScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
-	return s.opController.OperatorCount(operator.OpHotRegion) < s.conf.Limit &&
-		s.opController.OperatorCount(operator.OpRegion) < cluster.GetRegionScheduleLimit() &&
-		s.opController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit()
+	return s.OpController.OperatorCount(operator.OpHotRegion) < s.conf.Limit &&
+		s.OpController.OperatorCount(operator.OpRegion) < cluster.GetRegionScheduleLimit() &&
+		s.OpController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit()
 }
 
 func (s *shuffleHotRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
diff --git a/server/schedulers/shuffle_leader.go b/server/schedulers/shuffle_leader.go
index 0913779d4d94..3ea5d224fef3 100644
--- a/server/schedulers/shuffle_leader.go
+++ b/server/schedulers/shuffle_leader.go
@@ -64,7 +64,7 @@ type shuffleLeaderSchedulerConfig struct {
 }
 
 type shuffleLeaderScheduler struct {
-	*baseScheduler
+	*BaseScheduler
 	conf     *shuffleLeaderSchedulerConfig
 	selector *selector.RandomSelector
 }
@@ -75,9 +75,9 @@ func newShuffleLeaderScheduler(opController *schedule.OperatorController, conf *shuffleLeaderSchedulerConfig) schedule.Scheduler {
 	filters := []filter.Filter{
 		filter.StoreStateFilter{ActionScope: conf.Name, TransferLeader: true},
 	}
-	base := newBaseScheduler(opController)
+	base := NewBaseScheduler(opController)
 	return &shuffleLeaderScheduler{
-		baseScheduler: base,
+		BaseScheduler: base,
 		conf:          conf,
 		selector:      selector.NewRandomSelector(filters),
 	}
@@ -96,7 +96,7 @@ func (s *shuffleLeaderScheduler) EncodeConfig() ([]byte, error) {
 }
 
 func (s *shuffleLeaderScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
-	return s.opController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit()
+	return s.OpController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit()
 }
 
 func (s *shuffleLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {

diff --git a/server/schedulers/shuffle_region.go b/server/schedulers/shuffle_region.go
index f1f4a2af4a0f..016ec0837ac3 100644
--- a/server/schedulers/shuffle_region.go
+++ b/server/schedulers/shuffle_region.go
@@ -64,7 +64,7 @@ type shuffleRegionSchedulerConfig struct {
 }
 
 type shuffleRegionScheduler struct {
-	*baseScheduler
+	*BaseScheduler
 	conf     *shuffleRegionSchedulerConfig
 	selector *selector.RandomSelector
 }
@@ -75,9 +75,9 @@ func newShuffleRegionScheduler(opController *schedule.OperatorController, conf *shuffleRegionSchedulerConfig) schedule.Scheduler {
 	filters := []filter.Filter{
 		filter.StoreStateFilter{ActionScope: conf.Name, MoveRegion: true},
 	}
-	base := newBaseScheduler(opController)
+	base := NewBaseScheduler(opController)
 	return &shuffleRegionScheduler{
-		baseScheduler: base,
+		BaseScheduler: base,
 		conf:          conf,
 		selector:      selector.NewRandomSelector(filters),
 	}
@@ -96,7 +96,7 @@ func (s *shuffleRegionScheduler) EncodeConfig() ([]byte, error) {
 }
 
 func (s *shuffleRegionScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
-	return s.opController.OperatorCount(operator.OpRegion) < cluster.GetRegionScheduleLimit()
+	return s.OpController.OperatorCount(operator.OpRegion) < cluster.GetRegionScheduleLimit()
 }
 
 func (s *shuffleRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {

diff --git a/tests/pdctl/helper.go b/tests/pdctl/helper.go
index 6caec6b54430..143283559fad 100644
--- a/tests/pdctl/helper.go
+++ b/tests/pdctl/helper.go
@@ -60,6 +60,7 @@ func InitCommand() *cobra.Command {
 		command.NewClusterCommand(),
 		command.NewHealthCommand(),
 		command.NewLogCommand(),
+		command.NewPluginCommand(),
 	)
 	return rootCmd
 }
diff --git a/tools/pd-ctl/pdctl/command/plugin_command.go b/tools/pd-ctl/pdctl/command/plugin_command.go
new file mode 100644
index 000000000000..4f374a099334
--- /dev/null
+++ b/tools/pd-ctl/pdctl/command/plugin_command.go
@@ -0,0 +1,95 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package command
+
+import (
+	"bytes"
+	"encoding/json"
+	"net/http"
+
+	"github.com/pingcap/pd/server/cluster"
+	"github.com/spf13/cobra"
+)
+
+var (
+	pluginPrefix = "pd/api/v1/plugin"
+)
+
+// NewPluginCommand returns the plugin command and its subcommands
+func NewPluginCommand() *cobra.Command {
+	r := &cobra.Command{
+		Use:   "plugin <subcommand>",
+		Short: "plugin commands",
+	}
+	r.AddCommand(NewLoadPluginCommand())
+	r.AddCommand(NewUnloadPluginCommand())
+	return r
+}
+
+// NewLoadPluginCommand returns the load subcommand of the plugin command
+func NewLoadPluginCommand() *cobra.Command {
+	r := &cobra.Command{
+		Use:   "load <plugin_path>",
+		Short: "load a plugin, path must begin with ./pd/plugin/",
+		Run:   loadPluginCommandFunc,
+	}
+	return r
+}
+
+// NewUnloadPluginCommand returns the unload subcommand of the plugin command
+func NewUnloadPluginCommand() *cobra.Command {
+	r := &cobra.Command{
+		Use:   "unload <plugin_path>",
+		Short: "unload a plugin, path must begin with ./pd/plugin/",
+		Run:   unloadPluginCommandFunc,
+	}
+	return r
+}
+
+func loadPluginCommandFunc(cmd *cobra.Command, args []string) {
+	sendPluginCommand(cmd, cluster.PluginLoad, args)
+}
+
+func unloadPluginCommandFunc(cmd *cobra.Command, args []string) {
+	sendPluginCommand(cmd, cluster.PluginUnload, args)
+}
+
+func sendPluginCommand(cmd *cobra.Command, action string, args []string) {
+	if len(args) != 1 {
+		cmd.Println(cmd.Usage())
+		return
+	}
+	data := map[string]interface{}{
+		"plugin-path": args[0],
+	}
+	reqData, err := json.Marshal(data)
+	if err != nil {
+		cmd.Println(err)
+		return
+	}
+	switch action {
+	case cluster.PluginLoad:
+		_, err = doRequest(cmd, pluginPrefix, http.MethodPost, WithBody("application/json", bytes.NewBuffer(reqData)))
+	case cluster.PluginUnload:
+		_, err = doRequest(cmd, pluginPrefix, http.MethodDelete, WithBody("application/json", bytes.NewBuffer(reqData)))
+	default:
+		cmd.Printf("Unknown action %s\n", action)
+		return
+	}
+	if err != nil {
+		cmd.Printf("Failed to %s plugin %s: %s\n", action, args[0], err)
+		return
+	}
+	cmd.Println("Success!")
+}

diff --git a/tools/pd-ctl/pdctl/ctl.go b/tools/pd-ctl/pdctl/ctl.go
index 39d262689a13..6001f0361a1c 100644
--- a/tools/pd-ctl/pdctl/ctl.go
+++ b/tools/pd-ctl/pdctl/ctl.go
@@ -64,6 +64,7 @@ func Start(args []string) {
 		command.NewClusterCommand(),
 		command.NewHealthCommand(),
 		command.NewLogCommand(),
+		command.NewPluginCommand(),
 	)
 
 	rootCmd.SetArgs(args)
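To close the loop, the new subcommand can also be driven programmatically through the exported test helper above; this sketch assumes the root command wires the usual `-u` address flag:

```go
package main

import (
	"github.com/pingcap/pd/tests/pdctl"
)

func main() {
	cmd := pdctl.InitCommand()
	// Equivalent to: pd-ctl -u http://127.0.0.1:2379 plugin load ./pd/plugin/evictLeaderPlugin.so
	cmd.SetArgs([]string{"-u", "http://127.0.0.1:2379", "plugin", "load", "./pd/plugin/evictLeaderPlugin.so"})
	if err := cmd.Execute(); err != nil {
		panic(err)
	}
}
```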