diff --git a/go.sum b/go.sum index 0140da7f349..8f35a008e45 100644 --- a/go.sum +++ b/go.sum @@ -196,6 +196,7 @@ github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I= github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pelletier/go-toml v1.3.0 h1:e5+lF2E4Y2WCIxBefVowBuB0iHrUH4HZ8q+6mGF7fJc= github.com/pelletier/go-toml v1.3.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo= github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d h1:U+PMnTlV2tu7RuMK5etusZG3Cf+rpow5hqQByeCzJ2g= github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d/go.mod h1:lXfE4PvvTW5xOjO6Mba8zDPyw8M93B6AQ7frTGnMlA8= @@ -333,7 +334,6 @@ golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20181005035420-146acd28ed58 h1:otZG8yDCO4LVps5+9bxOeNiCvgmOyt96J3roHTYs7oE= golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -404,7 +404,6 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f h1:FU37niK8AQ59mHcskRyQL7H0ErSeNh650vdcj8HqdSI= google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= diff --git a/pkg/typeutil/duration.go b/pkg/typeutil/duration.go index c29c8de8ed6..f69aca02196 100644 --- a/pkg/typeutil/duration.go +++ b/pkg/typeutil/duration.go @@ -56,3 +56,8 @@ func (d *Duration) UnmarshalText(text []byte) error { d.Duration, err = time.ParseDuration(string(text)) return errors.WithStack(err) } + +// MarshalText returns the duration as a JSON string. +func (d Duration) MarshalText() ([]byte, error) { + return []byte(d.String()), nil +} diff --git a/pkg/typeutil/string_slice.go b/pkg/typeutil/string_slice.go index 8cf6816bf41..6b23aeddf89 100644 --- a/pkg/typeutil/string_slice.go +++ b/pkg/typeutil/string_slice.go @@ -35,7 +35,7 @@ func (s *StringSlice) UnmarshalJSON(text []byte) error { return errors.WithStack(err) } if len(data) == 0 { - *s = nil + *s = []string{} return nil } *s = strings.Split(data, ",") diff --git a/pkg/typeutil/string_slice_test.go b/pkg/typeutil/string_slice_test.go index ce8e8ddbe3f..9e3f8c7bd98 100644 --- a/pkg/typeutil/string_slice_test.go +++ b/pkg/typeutil/string_slice_test.go @@ -37,7 +37,7 @@ func (s *testStringSliceSuite) TestJSON(c *C) { } func (s *testStringSliceSuite) TestEmpty(c *C) { - var ss StringSlice + ss := StringSlice([]string{}) b, err := json.Marshal(ss) c.Assert(err, IsNil) c.Assert(string(b), Equals, "\"\"") diff --git a/server/api/config.go b/server/api/config.go index 116f4d8f7c7..91ed7fe3a29 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -21,12 +21,16 @@ import ( "reflect" "strings" + "github.com/BurntSushi/toml" "github.com/pingcap/errcode" + "github.com/pingcap/kvproto/pkg/configpb" + "github.com/pingcap/log" "github.com/pingcap/pd/pkg/apiutil" "github.com/pingcap/pd/server" "github.com/pingcap/pd/server/config" "github.com/pkg/errors" "github.com/unrolled/render" + "go.uber.org/zap" ) type confHandler struct { @@ -46,6 +50,27 @@ func (h *confHandler) Get(w http.ResponseWriter, r *http.Request) { } func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) { + if h.svr.GetConfig().EnableConfigManager { + client := h.svr.GetConfigClient() + if client == nil { + h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") + } + cm := h.svr.GetConfigManager() + m := make(map[string]interface{}) + json.NewDecoder(r.Body).Decode(&m) + entries, err := transToEntries(m) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + err = redirectUpdateReq(h.svr.Context(), client, cm, entries) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, nil) + return + } config := h.svr.GetConfig() data, err := ioutil.ReadAll(r.Body) r.Body.Close() @@ -139,6 +164,27 @@ func (h *confHandler) GetSchedule(w http.ResponseWriter, r *http.Request) { } func (h *confHandler) SetSchedule(w http.ResponseWriter, r *http.Request) { + if h.svr.GetConfig().EnableConfigManager { + client := h.svr.GetConfigClient() + if client == nil { + h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") + } + cm := h.svr.GetConfigManager() + m := make(map[string]interface{}) + json.NewDecoder(r.Body).Decode(&m) + entries, err := transToEntries(m) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + err = redirectUpdateReq(h.svr.Context(), client, cm, entries) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, nil) + return + } config := h.svr.GetScheduleConfig() if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &config); err != nil { return @@ -156,6 +202,27 @@ func (h *confHandler) GetReplication(w http.ResponseWriter, r *http.Request) { } func (h *confHandler) SetReplication(w http.ResponseWriter, r *http.Request) { + if h.svr.GetConfig().EnableConfigManager { + client := h.svr.GetConfigClient() + if client == nil { + h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") + } + cm := h.svr.GetConfigManager() + m := make(map[string]interface{}) + json.NewDecoder(r.Body).Decode(&m) + entries, err := transToEntries(m) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + err = redirectUpdateReq(h.svr.Context(), client, cm, entries) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, nil) + return + } config := h.svr.GetReplicationConfig() if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &config); err != nil { return @@ -177,6 +244,53 @@ func (h *confHandler) SetLabelProperty(w http.ResponseWriter, r *http.Request) { if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { return } + + if h.svr.GetConfig().EnableConfigManager { + client := h.svr.GetConfigClient() + if client == nil { + h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") + } + cm := h.svr.GetConfigManager() + typ := input["type"] + labelKey, labelValue := input["label-key"], input["label-value"] + cfg := h.svr.GetScheduleOption().LoadLabelPropertyConfig().Clone() + switch input["action"] { + case "set": + for _, l := range cfg[typ] { + if l.Key == labelKey && l.Value == labelValue { + return + } + } + cfg[typ] = append(cfg[typ], config.StoreLabel{Key: labelKey, Value: labelValue}) + case "delete": + oldLabels := cfg[typ] + cfg[typ] = []config.StoreLabel{} + for _, l := range oldLabels { + if l.Key == labelKey && l.Value == labelValue { + continue + } + cfg[typ] = append(cfg[typ], l) + } + if len(cfg[typ]) == 0 { + delete(cfg, typ) + } + default: + err := errors.Errorf("unknown action %v", input["action"]) + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + var buf bytes.Buffer + toml.NewEncoder(&buf).Encode(cfg) + entries := []*entry{{key: "label-property", value: buf.String()}} + err := redirectUpdateReq(h.svr.Context(), client, cm, entries) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, nil) + return + } + var err error switch input["action"] { case "set": @@ -207,6 +321,23 @@ func (h *confHandler) SetClusterVersion(w http.ResponseWriter, r *http.Request) apiutil.ErrorResp(h.rd, w, errcode.NewInvalidInputErr(errors.New("not set cluster-version"))) return } + + if h.svr.GetConfig().EnableConfigManager { + kind := &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: server.Component}}} + v := &configpb.Version{Global: h.svr.GetConfigManager().GlobalCfgs[server.Component].GetVersion()} + entry := &configpb.ConfigEntry{Name: "cluster-version", Value: version} + client := h.svr.GetConfigClient() + if client == nil { + h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") + } + _, _, err := h.svr.GetConfigClient().Update(h.svr.Context(), v, kind, []*configpb.ConfigEntry{entry}) + if err != nil { + log.Error("update cluster version meet error", zap.Error(err)) + } + h.rd.JSON(w, http.StatusOK, nil) + return + } + err := h.svr.SetClusterVersion(version) if err != nil { apiutil.ErrorResp(h.rd, w, errcode.NewInternalErr(err)) diff --git a/server/api/config_test.go b/server/api/config_test.go index d2c456d9118..cbd0448ff47 100644 --- a/server/api/config_test.go +++ b/server/api/config_test.go @@ -32,11 +32,14 @@ type testConfigSuite struct { } func (s *testConfigSuite) SetUpSuite(c *C) { - s.svr, s.cleanup = mustNewServer(c) + server.ConfigCheckInterval = 10 * time.Millisecond + s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) { cfg.EnableConfigManager = true }) mustWaitLeader(c, []*server.Server{s.svr}) addr := s.svr.GetAddr() s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix) + // make sure the config client is initialized + time.Sleep(20 * time.Millisecond) } func (s *testConfigSuite) TearDownSuite(c *C) { @@ -71,6 +74,7 @@ func (s *testConfigSuite) TestConfigAll(c *C) { err = postJSON(addr, postData) c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) newCfg := &config.Config{} err = readJSON(addr, newCfg) c.Assert(err, IsNil) @@ -92,6 +96,7 @@ func (s *testConfigSuite) TestConfigSchedule(c *C) { err = postJSON(addr, postData) c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) sc1 := &config.ScheduleConfig{} c.Assert(readJSON(addr, sc1), IsNil) c.Assert(*sc, DeepEquals, *sc1) @@ -118,6 +123,7 @@ func (s *testConfigSuite) TestConfigReplication(c *C) { err = postJSON(addr, postData) c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) rc3 := &config.ReplicationConfig{} err = readJSON(addr, rc3) c.Assert(err, IsNil) @@ -146,7 +152,9 @@ func (s *testConfigSuite) TestConfigLabelProperty(c *C) { for _, cmd := range cmds { err := postJSON(addr, []byte(cmd)) c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) } + cfg = loadProperties() c.Assert(cfg, HasLen, 2) c.Assert(cfg["foo"], DeepEquals, []config.StoreLabel{ @@ -162,7 +170,9 @@ func (s *testConfigSuite) TestConfigLabelProperty(c *C) { for _, cmd := range cmds { err := postJSON(addr, []byte(cmd)) c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) } + cfg = loadProperties() c.Assert(cfg, HasLen, 1) c.Assert(cfg["foo"], DeepEquals, []config.StoreLabel{{Key: "zone", Value: "cn2"}}) diff --git a/server/api/label_test.go b/server/api/label_test.go index 203f5c8f00e..8a66cc9ca2d 100644 --- a/server/api/label_test.go +++ b/server/api/label_test.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "strings" + "time" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" @@ -107,7 +108,11 @@ func (s *testLabelsStoreSuite) SetUpSuite(c *C) { }, } - s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) { cfg.Replication.StrictlyMatchLabel = false }) + server.ConfigCheckInterval = 10 * time.Millisecond + s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) { + cfg.Replication.StrictlyMatchLabel = false + cfg.EnableConfigManager = true + }) mustWaitLeader(c, []*server.Server{s.svr}) addr := s.svr.GetAddr() @@ -117,6 +122,8 @@ func (s *testLabelsStoreSuite) SetUpSuite(c *C) { for _, store := range s.stores { mustPutStore(c, s.svr, store.Id, store.State, store.Labels) } + // make sure the config client is initialized + time.Sleep(20 * time.Millisecond) } func (s *testLabelsStoreSuite) TearDownSuite(c *C) { @@ -183,9 +190,11 @@ type testStrictlyLabelsStoreSuite struct { } func (s *testStrictlyLabelsStoreSuite) SetUpSuite(c *C) { + server.ConfigCheckInterval = 10 * time.Millisecond s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) { cfg.Replication.LocationLabels = []string{"zone", "disk"} cfg.Replication.StrictlyMatchLabel = true + cfg.EnableConfigManager = true }) mustWaitLeader(c, []*server.Server{s.svr}) @@ -193,6 +202,8 @@ func (s *testStrictlyLabelsStoreSuite) SetUpSuite(c *C) { s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix) mustBootstrapCluster(c, s.svr) + // make sure the config client is initialized + time.Sleep(20 * time.Millisecond) } func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) { @@ -277,6 +288,7 @@ func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) { // enable placement rules. Report no error any more. c.Assert(postJSON(fmt.Sprintf("%s/config", s.urlPrefix), []byte(`{"enable-placement-rules":"true"}`)), IsNil) + time.Sleep(20 * time.Millisecond) for _, t := range cases { _, err := s.svr.PutStore(context.Background(), &pdpb.PutStoreRequest{ Header: &pdpb.RequestHeader{ClusterId: s.svr.ClusterID()}, diff --git a/server/api/log.go b/server/api/log.go index a2053589e0a..e5a2094738c 100644 --- a/server/api/log.go +++ b/server/api/log.go @@ -15,6 +15,7 @@ package api import ( "encoding/json" + "fmt" "io/ioutil" "net/http" @@ -37,6 +38,23 @@ func newlogHandler(svr *server.Server, rd *render.Render) *logHandler { } func (h *logHandler) Handle(w http.ResponseWriter, r *http.Request) { + if h.svr.GetConfig().EnableConfigManager { + client := h.svr.GetConfigClient() + if client == nil { + h.rd.JSON(w, http.StatusServiceUnavailable, "no leader") + } + cm := h.svr.GetConfigManager() + var str string + json.NewDecoder(r.Body).Decode(&str) + entries := []*entry{{key: "log.level", value: fmt.Sprintf("level = \"%v\"", str)}} + err := redirectUpdateReq(h.svr.Context(), client, cm, entries) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, nil) + return + } var level string data, err := ioutil.ReadAll(r.Body) r.Body.Close() diff --git a/server/api/middleware.go b/server/api/middleware.go index 861f20c9fb7..a774e7f1517 100644 --- a/server/api/middleware.go +++ b/server/api/middleware.go @@ -14,10 +14,16 @@ package api import ( + "bytes" "net/http" + "reflect" + "strings" + "github.com/BurntSushi/toml" "github.com/pingcap/pd/server" "github.com/pingcap/pd/server/cluster" + "github.com/pingcap/pd/server/config" + "github.com/pkg/errors" "github.com/unrolled/render" ) @@ -44,3 +50,52 @@ func (m clusterMiddleware) Middleware(h http.Handler) http.Handler { h.ServeHTTP(w, r.WithContext(ctx)) }) } + +type entry struct { + key string + value string +} + +func transToEntries(req map[string]interface{}) ([]*entry, error) { + mapKeys := reflect.ValueOf(req).MapKeys() + var entries []*entry + for _, k := range mapKeys { + if config.IsDeprecated(k.String()) { + return nil, errors.New("config item has already been deprecated") + } + itemMap := make(map[string]interface{}) + itemMap[k.String()] = req[k.String()] + var buf bytes.Buffer + if err := toml.NewEncoder(&buf).Encode(itemMap); err != nil { + return nil, err + } + value := buf.String() + key := findTag(reflect.TypeOf(&config.Config{}).Elem(), k.String()) + if key == "" { + return nil, errors.New("config item not found") + } + entries = append(entries, &entry{key, value}) + } + return entries, nil +} + +func findTag(t reflect.Type, tag string) string { + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + + column := field.Tag.Get("json") + c := strings.Split(column, ",") + if c[0] == tag { + return c[0] + } + + if field.Type.Kind() == reflect.Struct { + path := findTag(field.Type, tag) + if path == "" { + continue + } + return field.Tag.Get("json") + "." + path + } + } + return "" +} diff --git a/server/api/router.go b/server/api/router.go index e6b9d5ff69a..32e1a32201e 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -40,144 +40,146 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { rootRouter := mux.NewRouter().PathPrefix(prefix).Subrouter() handler := svr.GetHandler() - clusterRouter := rootRouter.NewRoute().Subrouter() + apiRouter := rootRouter.PathPrefix("/api/v1").Subrouter() + + clusterRouter := apiRouter.NewRoute().Subrouter() clusterRouter.Use(newClusterMiddleware(svr).Middleware) operatorHandler := newOperatorHandler(handler, rd) - rootRouter.HandleFunc("/api/v1/operators", operatorHandler.List).Methods("GET") - rootRouter.HandleFunc("/api/v1/operators", operatorHandler.Post).Methods("POST") - rootRouter.HandleFunc("/api/v1/operators/{region_id}", operatorHandler.Get).Methods("GET") - rootRouter.HandleFunc("/api/v1/operators/{region_id}", operatorHandler.Delete).Methods("DELETE") + apiRouter.HandleFunc("/operators", operatorHandler.List).Methods("GET") + apiRouter.HandleFunc("/operators", operatorHandler.Post).Methods("POST") + apiRouter.HandleFunc("/operators/{region_id}", operatorHandler.Get).Methods("GET") + apiRouter.HandleFunc("/operators/{region_id}", operatorHandler.Delete).Methods("DELETE") schedulerHandler := newSchedulerHandler(handler, rd) - rootRouter.HandleFunc("/api/v1/schedulers", schedulerHandler.List).Methods("GET") - rootRouter.HandleFunc("/api/v1/schedulers", schedulerHandler.Post).Methods("POST") - rootRouter.HandleFunc("/api/v1/schedulers/{name}", schedulerHandler.Delete).Methods("DELETE") - rootRouter.HandleFunc("/api/v1/schedulers/{name}", schedulerHandler.PauseOrResume).Methods("POST") + apiRouter.HandleFunc("/schedulers", schedulerHandler.List).Methods("GET") + apiRouter.HandleFunc("/schedulers", schedulerHandler.Post).Methods("POST") + apiRouter.HandleFunc("/schedulers/{name}", schedulerHandler.Delete).Methods("DELETE") + apiRouter.HandleFunc("/schedulers/{name}", schedulerHandler.PauseOrResume).Methods("POST") schedulerConfigHandler := newSchedulerConfigHandler(svr, rd) rootRouter.PathPrefix(server.SchedulerConfigHandlerPath).Handler(schedulerConfigHandler) clusterHandler := newClusterHandler(svr, rd) - rootRouter.Handle("/api/v1/cluster", clusterHandler).Methods("GET") - rootRouter.HandleFunc("/api/v1/cluster/status", clusterHandler.GetClusterStatus).Methods("GET") + apiRouter.Handle("/cluster", clusterHandler).Methods("GET") + apiRouter.HandleFunc("/cluster/status", clusterHandler.GetClusterStatus).Methods("GET") confHandler := newConfHandler(svr, rd) - rootRouter.HandleFunc("/api/v1/config", confHandler.Get).Methods("GET") - rootRouter.HandleFunc("/api/v1/config", confHandler.Post).Methods("POST") - rootRouter.HandleFunc("/api/v1/config/schedule", confHandler.SetSchedule).Methods("POST") - rootRouter.HandleFunc("/api/v1/config/schedule", confHandler.GetSchedule).Methods("GET") - rootRouter.HandleFunc("/api/v1/config/replicate", confHandler.SetReplication).Methods("POST") - rootRouter.HandleFunc("/api/v1/config/replicate", confHandler.GetReplication).Methods("GET") - rootRouter.HandleFunc("/api/v1/config/label-property", confHandler.GetLabelProperty).Methods("GET") - rootRouter.HandleFunc("/api/v1/config/label-property", confHandler.SetLabelProperty).Methods("POST") - rootRouter.HandleFunc("/api/v1/config/cluster-version", confHandler.GetClusterVersion).Methods("GET") - rootRouter.HandleFunc("/api/v1/config/cluster-version", confHandler.SetClusterVersion).Methods("POST") + apiRouter.HandleFunc("/config", confHandler.Get).Methods("GET") + apiRouter.HandleFunc("/config", confHandler.Post).Methods("POST") + apiRouter.HandleFunc("/config/schedule", confHandler.GetSchedule).Methods("GET") + apiRouter.HandleFunc("/config/schedule", confHandler.SetSchedule).Methods("POST") + apiRouter.HandleFunc("/config/replicate", confHandler.GetReplication).Methods("GET") + apiRouter.HandleFunc("/config/replicate", confHandler.SetReplication).Methods("POST") + apiRouter.HandleFunc("/config/label-property", confHandler.GetLabelProperty).Methods("GET") + apiRouter.HandleFunc("/config/label-property", confHandler.SetLabelProperty).Methods("POST") + apiRouter.HandleFunc("/config/cluster-version", confHandler.GetClusterVersion).Methods("GET") + apiRouter.HandleFunc("/config/cluster-version", confHandler.SetClusterVersion).Methods("POST") rulesHandler := newRulesHandler(svr, rd) - clusterRouter.HandleFunc("/api/v1/config/rules", rulesHandler.GetAll).Methods("GET") - clusterRouter.HandleFunc("/api/v1/config/rules/group/{group}", rulesHandler.GetAllByGroup).Methods("GET") - clusterRouter.HandleFunc("/api/v1/config/rules/region/{region}", rulesHandler.GetAllByRegion).Methods("GET") - clusterRouter.HandleFunc("/api/v1/config/rules/key/{key}", rulesHandler.GetAllByKey).Methods("GET") - clusterRouter.HandleFunc("/api/v1/config/rule/{group}/{id}", rulesHandler.Get).Methods("GET") - clusterRouter.HandleFunc("/api/v1/config/rule", rulesHandler.Set).Methods("POST") - clusterRouter.HandleFunc("/api/v1/config/rule/{group}/{id}", rulesHandler.Delete).Methods("DELETE") + clusterRouter.HandleFunc("/config/rules", rulesHandler.GetAll).Methods("GET") + clusterRouter.HandleFunc("/config/rules/group/{group}", rulesHandler.GetAllByGroup).Methods("GET") + clusterRouter.HandleFunc("/config/rules/region/{region}", rulesHandler.GetAllByRegion).Methods("GET") + clusterRouter.HandleFunc("/config/rules/key/{key}", rulesHandler.GetAllByKey).Methods("GET") + clusterRouter.HandleFunc("/config/rule/{group}/{id}", rulesHandler.Get).Methods("GET") + clusterRouter.HandleFunc("/config/rule", rulesHandler.Set).Methods("POST") + clusterRouter.HandleFunc("/config/rule/{group}/{id}", rulesHandler.Delete).Methods("DELETE") storeHandler := newStoreHandler(handler, rd) - clusterRouter.HandleFunc("/api/v1/store/{id}", storeHandler.Get).Methods("GET") - clusterRouter.HandleFunc("/api/v1/store/{id}", storeHandler.Delete).Methods("DELETE") - clusterRouter.HandleFunc("/api/v1/store/{id}/state", storeHandler.SetState).Methods("POST") - clusterRouter.HandleFunc("/api/v1/store/{id}/label", storeHandler.SetLabels).Methods("POST") - clusterRouter.HandleFunc("/api/v1/store/{id}/weight", storeHandler.SetWeight).Methods("POST") - clusterRouter.HandleFunc("/api/v1/store/{id}/limit", storeHandler.SetLimit).Methods("POST") + clusterRouter.HandleFunc("/store/{id}", storeHandler.Get).Methods("GET") + clusterRouter.HandleFunc("/store/{id}", storeHandler.Delete).Methods("DELETE") + clusterRouter.HandleFunc("/store/{id}/state", storeHandler.SetState).Methods("POST") + clusterRouter.HandleFunc("/store/{id}/label", storeHandler.SetLabels).Methods("POST") + clusterRouter.HandleFunc("/store/{id}/weight", storeHandler.SetWeight).Methods("POST") + clusterRouter.HandleFunc("/store/{id}/limit", storeHandler.SetLimit).Methods("POST") storesHandler := newStoresHandler(handler, rd) - clusterRouter.Handle("/api/v1/stores", storesHandler).Methods("GET") - clusterRouter.HandleFunc("/api/v1/stores/remove-tombstone", storesHandler.RemoveTombStone).Methods("DELETE") - clusterRouter.HandleFunc("/api/v1/stores/limit", storesHandler.GetAllLimit).Methods("GET") - clusterRouter.HandleFunc("/api/v1/stores/limit", storesHandler.SetAllLimit).Methods("POST") - clusterRouter.HandleFunc("/api/v1/stores/limit/scene", storesHandler.SetStoreLimitScene).Methods("POST") - clusterRouter.HandleFunc("/api/v1/stores/limit/scene", storesHandler.GetStoreLimitScene).Methods("GET") + clusterRouter.Handle("/stores", storesHandler).Methods("GET") + clusterRouter.HandleFunc("/stores/remove-tombstone", storesHandler.RemoveTombStone).Methods("DELETE") + clusterRouter.HandleFunc("/stores/limit", storesHandler.GetAllLimit).Methods("GET") + clusterRouter.HandleFunc("/stores/limit", storesHandler.SetAllLimit).Methods("POST") + clusterRouter.HandleFunc("/stores/limit/scene", storesHandler.SetStoreLimitScene).Methods("POST") + clusterRouter.HandleFunc("/stores/limit/scene", storesHandler.GetStoreLimitScene).Methods("GET") labelsHandler := newLabelsHandler(svr, rd) - clusterRouter.HandleFunc("/api/v1/labels", labelsHandler.Get).Methods("GET") - clusterRouter.HandleFunc("/api/v1/labels/stores", labelsHandler.GetStores).Methods("GET") + clusterRouter.HandleFunc("/labels", labelsHandler.Get).Methods("GET") + clusterRouter.HandleFunc("/labels/stores", labelsHandler.GetStores).Methods("GET") hotStatusHandler := newHotStatusHandler(handler, rd) - rootRouter.HandleFunc("/api/v1/hotspot/regions/write", hotStatusHandler.GetHotWriteRegions).Methods("GET") - rootRouter.HandleFunc("/api/v1/hotspot/regions/read", hotStatusHandler.GetHotReadRegions).Methods("GET") - rootRouter.HandleFunc("/api/v1/hotspot/stores", hotStatusHandler.GetHotStores).Methods("GET") + apiRouter.HandleFunc("/hotspot/regions/write", hotStatusHandler.GetHotWriteRegions).Methods("GET") + apiRouter.HandleFunc("/hotspot/regions/read", hotStatusHandler.GetHotReadRegions).Methods("GET") + apiRouter.HandleFunc("/hotspot/stores", hotStatusHandler.GetHotStores).Methods("GET") regionHandler := newRegionHandler(svr, rd) - clusterRouter.HandleFunc("/api/v1/region/id/{id}", regionHandler.GetRegionByID).Methods("GET") - clusterRouter.HandleFunc("/api/v1/region/key/{key}", regionHandler.GetRegionByKey).Methods("GET") + clusterRouter.HandleFunc("/region/id/{id}", regionHandler.GetRegionByID).Methods("GET") + clusterRouter.HandleFunc("/region/key/{key}", regionHandler.GetRegionByKey).Methods("GET") srd := createStreamingRender() regionsAllHandler := newRegionsHandler(svr, srd) - clusterRouter.HandleFunc("/api/v1/regions", regionsAllHandler.GetAll).Methods("GET") + clusterRouter.HandleFunc("/regions", regionsAllHandler.GetAll).Methods("GET") regionsHandler := newRegionsHandler(svr, rd) - clusterRouter.HandleFunc("/api/v1/regions/key", regionsHandler.ScanRegions).Methods("GET") - clusterRouter.HandleFunc("/api/v1/regions/count", regionsHandler.GetRegionCount).Methods("GET") - clusterRouter.HandleFunc("/api/v1/regions/store/{id}", regionsHandler.GetStoreRegions).Methods("GET") - clusterRouter.HandleFunc("/api/v1/regions/writeflow", regionsHandler.GetTopWriteFlow).Methods("GET") - clusterRouter.HandleFunc("/api/v1/regions/readflow", regionsHandler.GetTopReadFlow).Methods("GET") - clusterRouter.HandleFunc("/api/v1/regions/confver", regionsHandler.GetTopConfVer).Methods("GET") - clusterRouter.HandleFunc("/api/v1/regions/version", regionsHandler.GetTopVersion).Methods("GET") - clusterRouter.HandleFunc("/api/v1/regions/size", regionsHandler.GetTopSize).Methods("GET") - clusterRouter.HandleFunc("/api/v1/regions/check/miss-peer", regionsHandler.GetMissPeerRegions).Methods("GET") - clusterRouter.HandleFunc("/api/v1/regions/check/extra-peer", regionsHandler.GetExtraPeerRegions).Methods("GET") - clusterRouter.HandleFunc("/api/v1/regions/check/pending-peer", regionsHandler.GetPendingPeerRegions).Methods("GET") - clusterRouter.HandleFunc("/api/v1/regions/check/down-peer", regionsHandler.GetDownPeerRegions).Methods("GET") - clusterRouter.HandleFunc("/api/v1/regions/check/offline-peer", regionsHandler.GetOfflinePeer).Methods("GET") - clusterRouter.HandleFunc("/api/v1/regions/check/empty-region", regionsHandler.GetEmptyRegion).Methods("GET") - clusterRouter.HandleFunc("/api/v1/regions/check/hist-size", regionsHandler.GetSizeHistogram).Methods("GET") - clusterRouter.HandleFunc("/api/v1/regions/check/hist-keys", regionsHandler.GetKeysHistogram).Methods("GET") - clusterRouter.HandleFunc("/api/v1/regions/sibling/{id}", regionsHandler.GetRegionSiblings).Methods("GET") - - rootRouter.Handle("/api/v1/version", newVersionHandler(rd)).Methods("GET") - rootRouter.Handle("/api/v1/status", newStatusHandler(rd)).Methods("GET") + clusterRouter.HandleFunc("/regions/key", regionsHandler.ScanRegions).Methods("GET") + clusterRouter.HandleFunc("/regions/count", regionsHandler.GetRegionCount).Methods("GET") + clusterRouter.HandleFunc("/regions/store/{id}", regionsHandler.GetStoreRegions).Methods("GET") + clusterRouter.HandleFunc("/regions/writeflow", regionsHandler.GetTopWriteFlow).Methods("GET") + clusterRouter.HandleFunc("/regions/readflow", regionsHandler.GetTopReadFlow).Methods("GET") + clusterRouter.HandleFunc("/regions/confver", regionsHandler.GetTopConfVer).Methods("GET") + clusterRouter.HandleFunc("/regions/version", regionsHandler.GetTopVersion).Methods("GET") + clusterRouter.HandleFunc("/regions/size", regionsHandler.GetTopSize).Methods("GET") + clusterRouter.HandleFunc("/regions/check/miss-peer", regionsHandler.GetMissPeerRegions).Methods("GET") + clusterRouter.HandleFunc("/regions/check/extra-peer", regionsHandler.GetExtraPeerRegions).Methods("GET") + clusterRouter.HandleFunc("/regions/check/pending-peer", regionsHandler.GetPendingPeerRegions).Methods("GET") + clusterRouter.HandleFunc("/regions/check/down-peer", regionsHandler.GetDownPeerRegions).Methods("GET") + clusterRouter.HandleFunc("/regions/check/offline-peer", regionsHandler.GetOfflinePeer).Methods("GET") + clusterRouter.HandleFunc("/regions/check/empty-region", regionsHandler.GetEmptyRegion).Methods("GET") + clusterRouter.HandleFunc("/regions/check/hist-size", regionsHandler.GetSizeHistogram).Methods("GET") + clusterRouter.HandleFunc("/regions/check/hist-keys", regionsHandler.GetKeysHistogram).Methods("GET") + clusterRouter.HandleFunc("/regions/sibling/{id}", regionsHandler.GetRegionSiblings).Methods("GET") + + apiRouter.Handle("/version", newVersionHandler(rd)).Methods("GET") + apiRouter.Handle("/status", newStatusHandler(rd)).Methods("GET") memberHandler := newMemberHandler(svr, rd) - rootRouter.HandleFunc("/api/v1/members", memberHandler.ListMembers).Methods("GET") - rootRouter.HandleFunc("/api/v1/members/name/{name}", memberHandler.DeleteByName).Methods("DELETE") - rootRouter.HandleFunc("/api/v1/members/id/{id}", memberHandler.DeleteByID).Methods("DELETE") - rootRouter.HandleFunc("/api/v1/members/name/{name}", memberHandler.SetMemberPropertyByName).Methods("POST") + apiRouter.HandleFunc("/members", memberHandler.ListMembers).Methods("GET") + apiRouter.HandleFunc("/members/name/{name}", memberHandler.DeleteByName).Methods("DELETE") + apiRouter.HandleFunc("/members/id/{id}", memberHandler.DeleteByID).Methods("DELETE") + apiRouter.HandleFunc("/members/name/{name}", memberHandler.SetMemberPropertyByName).Methods("POST") leaderHandler := newLeaderHandler(svr, rd) - rootRouter.HandleFunc("/api/v1/leader", leaderHandler.Get).Methods("GET") - rootRouter.HandleFunc("/api/v1/leader/resign", leaderHandler.Resign).Methods("POST") - rootRouter.HandleFunc("/api/v1/leader/transfer/{next_leader}", leaderHandler.Transfer).Methods("POST") + apiRouter.HandleFunc("/leader", leaderHandler.Get).Methods("GET") + apiRouter.HandleFunc("/leader/resign", leaderHandler.Resign).Methods("POST") + apiRouter.HandleFunc("/leader/transfer/{next_leader}", leaderHandler.Transfer).Methods("POST") statsHandler := newStatsHandler(svr, rd) - clusterRouter.HandleFunc("/api/v1/stats/region", statsHandler.Region).Methods("GET") + clusterRouter.HandleFunc("/stats/region", statsHandler.Region).Methods("GET") trendHandler := newTrendHandler(svr, rd) - rootRouter.HandleFunc("/api/v1/trend", trendHandler.Handle).Methods("GET") + apiRouter.HandleFunc("/trend", trendHandler.Handle).Methods("GET") adminHandler := newAdminHandler(svr, rd) - clusterRouter.HandleFunc("/api/v1/admin/cache/region/{id}", adminHandler.HandleDropCacheRegion).Methods("DELETE") - clusterRouter.HandleFunc("/api/v1/admin/reset-ts", adminHandler.ResetTS).Methods("POST") + clusterRouter.HandleFunc("/admin/cache/region/{id}", adminHandler.HandleDropCacheRegion).Methods("DELETE") + clusterRouter.HandleFunc("/admin/reset-ts", adminHandler.ResetTS).Methods("POST") logHandler := newlogHandler(svr, rd) - rootRouter.HandleFunc("/api/v1/admin/log", logHandler.Handle).Methods("POST") + apiRouter.HandleFunc("/admin/log", logHandler.Handle).Methods("POST") pluginHandler := newPluginHandler(handler, rd) - rootRouter.HandleFunc("/api/v1/plugin", pluginHandler.LoadPlugin).Methods("POST") - rootRouter.HandleFunc("/api/v1/plugin", pluginHandler.UnloadPlugin).Methods("DELETE") + apiRouter.HandleFunc("/plugin", pluginHandler.LoadPlugin).Methods("POST") + apiRouter.HandleFunc("/plugin", pluginHandler.UnloadPlugin).Methods("DELETE") - rootRouter.Handle("/api/v1/health", newHealthHandler(svr, rd)).Methods("GET") - rootRouter.Handle("/api/v1/diagnose", newDiagnoseHandler(svr, rd)).Methods("GET") - rootRouter.HandleFunc("/api/v1/ping", func(w http.ResponseWriter, r *http.Request) {}).Methods("GET") + apiRouter.Handle("/health", newHealthHandler(svr, rd)).Methods("GET") + apiRouter.Handle("/diagnose", newDiagnoseHandler(svr, rd)).Methods("GET") + apiRouter.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {}).Methods("GET") // metric query use to query metric data, the protocol is compatible with prometheus. - rootRouter.Handle("/api/v1/metric/query", newQueryMetric(svr)).Methods("GET", "POST") - rootRouter.Handle("/api/v1/metric/query_range", newQueryMetric(svr)).Methods("GET", "POST") + apiRouter.Handle("/metric/query", newQueryMetric(svr)).Methods("GET", "POST") + apiRouter.Handle("/metric/query_range", newQueryMetric(svr)).Methods("GET", "POST") // profile API - rootRouter.HandleFunc("/api/v1/debug/pprof/profile", pprof.Profile) - rootRouter.Handle("/api/v1/debug/pprof/heap", pprof.Handler("heap")) - rootRouter.Handle("/api/v1/debug/pprof/mutex", pprof.Handler("mutex")) - rootRouter.Handle("/api/v1/debug/pprof/allocs", pprof.Handler("allocs")) - rootRouter.Handle("/api/v1/debug/pprof/block", pprof.Handler("block")) - rootRouter.Handle("/api/v1/debug/pprof/goroutine", pprof.Handler("goroutine")) + apiRouter.HandleFunc("/debug/pprof/profile", pprof.Profile) + apiRouter.Handle("/debug/pprof/heap", pprof.Handler("heap")) + apiRouter.Handle("/debug/pprof/mutex", pprof.Handler("mutex")) + apiRouter.Handle("/debug/pprof/allocs", pprof.Handler("allocs")) + apiRouter.Handle("/debug/pprof/block", pprof.Handler("block")) + apiRouter.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine")) // Deprecated rootRouter.Handle("/health", newHealthHandler(svr, rd)).Methods("GET") diff --git a/server/api/store_test.go b/server/api/store_test.go index 88b715aeeac..24ae6f60f98 100644 --- a/server/api/store_test.go +++ b/server/api/store_test.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/server" + "github.com/pingcap/pd/server/config" "github.com/pingcap/pd/server/core" ) @@ -82,8 +83,8 @@ func (s *testStoreSuite) SetUpSuite(c *C) { Version: "2.0.0", }, } - - s.svr, s.cleanup = mustNewServer(c) + server.ConfigCheckInterval = 10 * time.Millisecond + s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) { cfg.EnableConfigManager = true }) mustWaitLeader(c, []*server.Server{s.svr}) addr := s.svr.GetAddr() @@ -94,6 +95,8 @@ func (s *testStoreSuite) SetUpSuite(c *C) { for _, store := range s.stores { mustPutStore(c, s.svr, store.Id, store.State, nil) } + // make sure the config client is initialized + time.Sleep(20 * time.Millisecond) } func (s *testStoreSuite) TearDownSuite(c *C) { @@ -171,6 +174,7 @@ func (s *testStoreSuite) TestStoreLabel(c *C) { lc, _ := json.Marshal(labelCheck) err = postJSON(s.urlPrefix+"/config", lc) c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) // Test set. labels := map[string]string{"zone": "cn", "host": "local"} b, err := json.Marshal(labels) @@ -181,6 +185,7 @@ func (s *testStoreSuite) TestStoreLabel(c *C) { ll, _ := json.Marshal(locationLabels) err = postJSON(s.urlPrefix+"/config", ll) c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) err = postJSON(url+"/label", b) c.Assert(err, IsNil) @@ -198,6 +203,7 @@ func (s *testStoreSuite) TestStoreLabel(c *C) { err = postJSON(s.urlPrefix+"/config", lc) c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) labels = map[string]string{"zack": "zack1", "Host": "host1"} b, err = json.Marshal(labels) c.Assert(err, IsNil) diff --git a/server/api/util.go b/server/api/util.go index 5c5fd6df8b2..2964b4a37a1 100644 --- a/server/api/util.go +++ b/server/api/util.go @@ -15,11 +15,16 @@ package api import ( "bytes" + "context" "encoding/json" "io/ioutil" "net/http" "net/url" + "github.com/pingcap/kvproto/pkg/configpb" + pd "github.com/pingcap/pd/client" + "github.com/pingcap/pd/server" + "github.com/pingcap/pd/server/config_manager" "github.com/pkg/errors" ) @@ -115,3 +120,21 @@ func doDelete(url string) (*http.Response, error) { res.Body.Close() return res, nil } + +func redirectUpdateReq(ctx context.Context, client pd.ConfigClient, cm *configmanager.ConfigManager, entries []*entry) error { + var configEntries []*configpb.ConfigEntry + for _, e := range entries { + configEntry := &configpb.ConfigEntry{Name: e.key, Value: e.value} + configEntries = append(configEntries, configEntry) + } + version := &configpb.Version{Global: cm.GlobalCfgs[server.Component].GetVersion()} + kind := &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: server.Component}}} + status, _, err := client.Update(ctx, version, kind, configEntries) + if err != nil { + return err + } + if status.GetCode() != configpb.StatusCode_OK { + return errors.New(status.GetMessage()) + } + return nil +} diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 3283c21b81d..656c1ea14b4 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1151,7 +1151,7 @@ func (c *RaftCluster) AllocID() (uint64, error) { } // OnStoreVersionChange changes the version of the cluster when needed. -func (c *RaftCluster) OnStoreVersionChange() { +func (c *RaftCluster) OnStoreVersionChange() *semver.Version { c.RLock() defer c.RUnlock() var ( @@ -1188,7 +1188,9 @@ func (c *RaftCluster) OnStoreVersionChange() { log.Info("cluster version changed", zap.Stringer("old-cluster-version", clusterVersion), zap.Stringer("new-cluster-version", minVersion)) + return minVersion } + return nil } func (c *RaftCluster) changedRegionNotifier() <-chan *core.RegionInfo { diff --git a/server/config/config.go b/server/config/config.go index 4b397f20c2f..1bc00f3b187 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -84,7 +84,7 @@ type Config struct { PDServerCfg PDServerConfig `toml:"pd-server" json:"pd-server"` - ClusterVersion semver.Version `json:"cluster-version"` + ClusterVersion semver.Version `toml:"cluster-version" json:"cluster-version"` // QuotaBackendBytes Raise alarms when backend size exceeds the given quota. 0 means use the default quota. // the default size is 2GB, the maximum is 8GB. @@ -207,7 +207,10 @@ const ( defaultDisableErrorVerbose = true ) -var defaultRuntimeServices = []string{} +var ( + defaultRuntimeServices = []string{} + defaultLocationLabels = []string{} +) func adjustString(v *string, defValue string) { if len(*v) == 0 { @@ -573,7 +576,7 @@ type ScheduleConfig struct { Schedulers SchedulerConfigs `toml:"schedulers" json:"schedulers-v2"` // json v2 is for the sake of compatible upgrade // Only used to display - SchedulersPayload map[string]string `json:"schedulers,omitempty"` + SchedulersPayload map[string]string `toml:"schedulers-payload" json:"schedulers-payload,omitempty"` // StoreLimitMode can be auto or manual, when set to auto, // PD tries to change the store limit values according to @@ -801,6 +804,24 @@ func (c *ScheduleConfig) Deprecated() error { return nil } +var deprecateConfigs = []string{ + "disable-remove-down-replica", + "disable-replace-offline-replica", + "disable-make-up-replica", + "disable-remove-extra-replica", + "disable-location-replacement", +} + +// IsDeprecated returns if a config is deprecated. +func IsDeprecated(config string) bool { + for _, t := range deprecateConfigs { + if t == config { + return true + } + } + return false +} + // SchedulerConfigs is a slice of customized scheduler configuration. type SchedulerConfigs []SchedulerConfig @@ -873,6 +894,9 @@ func (c *ReplicationConfig) adjust(meta *configMetaData) error { if !meta.IsDefined("strictly-match-label") { c.StrictlyMatchLabel = defaultStrictlyMatchLabel } + if !meta.IsDefined("location-labels") { + c.LocationLabels = defaultLocationLabels + } return c.Validate() } diff --git a/server/config_manager/config_manager.go b/server/config_manager/config_manager.go index aaabef58604..f9b47eb651c 100644 --- a/server/config_manager/config_manager.go +++ b/server/config_manager/config_manager.go @@ -16,10 +16,10 @@ package configmanager import ( "bytes" "fmt" - "reflect" "strconv" "strings" "sync" + "time" "github.com/BurntSushi/toml" "github.com/pingcap/kvproto/pkg/configpb" @@ -73,6 +73,14 @@ func NewConfigManager(svr Server) *ConfigManager { } } +// GetGlobalConfigs returns the global config for a given component. +func (c *ConfigManager) GetGlobalConfigs(component string) *GlobalConfig { + if _, ok := c.GlobalCfgs[component]; ok { + return c.GlobalCfgs[component] + } + return nil +} + // Persist saves the configuration to the storage. func (c *ConfigManager) Persist(storage *core.Storage) error { c.Lock() @@ -88,8 +96,8 @@ func (c *ConfigManager) Reload(storage *core.Storage) error { return err } -// getComponent returns the component from a given component ID. -func (c *ConfigManager) getComponent(id string) string { +// GetComponent returns the component from a given component ID. +func (c *ConfigManager) GetComponent(id string) string { for component, cfgs := range c.LocalCfgs { if _, ok := cfgs[id]; ok { return component @@ -124,7 +132,7 @@ func (c *ConfigManager) GetConfig(version *configpb.Version, component, componen Message: errEncode(err), } } - if versionEqual(cfg.getVersion(), version) { + if versionEqual(cfg.GetVersion(), version) { status = &configpb.Status{Code: configpb.StatusCode_OK} } else { status = &configpb.Status{Code: configpb.StatusCode_WRONG_VERSION} @@ -174,7 +182,7 @@ func (c *ConfigManager) CreateConfig(version *configpb.Version, component, compo globalCfg := c.GlobalCfgs[component] if globalCfg != nil { entries := globalCfg.GetConfigEntries() - if err := c.ApplyGlobalConifg(globalCfg, component, globalCfg.getVersion(), entries); err != nil { + if err := c.applyGlobalConifg(globalCfg, component, globalCfg.GetVersion(), entries); err != nil { return latestVersion, "", &configpb.Status{Code: configpb.StatusCode_UNKNOWN, Message: err.Error()} } } @@ -190,8 +198,8 @@ func (c *ConfigManager) CreateConfig(version *configpb.Version, component, compo func (c *ConfigManager) getLatestVersion(component, componentID string) *configpb.Version { v := &configpb.Version{ - Global: c.GlobalCfgs[component].getVersion(), - Local: c.LocalCfgs[component][componentID].getVersion().GetLocal(), + Global: c.GlobalCfgs[component].GetVersion(), + Local: c.LocalCfgs[component][componentID].GetVersion().GetLocal(), } return v } @@ -208,7 +216,7 @@ func (c *ConfigManager) UpdateConfig(kind *configpb.ConfigKind, version *configp global := kind.GetGlobal() if global != nil { - return c.updateGlobal(global.GetComponent(), version, entries) + return c.UpdateGlobal(global.GetComponent(), version, entries) } local := kind.GetLocal() @@ -218,8 +226,8 @@ func (c *ConfigManager) UpdateConfig(kind *configpb.ConfigKind, version *configp return &configpb.Version{Global: 0, Local: 0}, &configpb.Status{Code: configpb.StatusCode_UNKNOWN, Message: errUnknownKind(kind)} } -// ApplyGlobalConifg applies the global change to each local component. -func (c *ConfigManager) ApplyGlobalConifg(globalCfg *GlobalConfig, component string, newGlobalVersion uint64, entries []*configpb.ConfigEntry) error { +// applyGlobalConifg applies the global change to each local component. +func (c *ConfigManager) applyGlobalConifg(globalCfg *GlobalConfig, component string, newGlobalVersion uint64, entries []*configpb.ConfigEntry) error { // get the global config updateEntries := make(map[string]*EntryValue) for _, entry := range entries { @@ -230,7 +238,6 @@ func (c *ConfigManager) ApplyGlobalConifg(globalCfg *GlobalConfig, component str for k, v := range globalUpdateEntries { updateEntries[k] = v } - // update all local config // merge the global config with each local config and update it for _, LocalCfg := range c.LocalCfgs[component] { @@ -246,15 +253,16 @@ func (c *ConfigManager) ApplyGlobalConifg(globalCfg *GlobalConfig, component str return nil } -func (c *ConfigManager) updateGlobal(component string, version *configpb.Version, entries []*configpb.ConfigEntry) (*configpb.Version, *configpb.Status) { +// UpdateGlobal is used to update the global config. +func (c *ConfigManager) UpdateGlobal(component string, version *configpb.Version, entries []*configpb.ConfigEntry) (*configpb.Version, *configpb.Status) { // if the global config of the component is existed. if globalCfg, ok := c.GlobalCfgs[component]; ok { - globalLatestVersion := globalCfg.getVersion() + globalLatestVersion := globalCfg.GetVersion() if globalLatestVersion != version.GetGlobal() { return &configpb.Version{Global: globalLatestVersion, Local: version.GetLocal()}, &configpb.Status{Code: configpb.StatusCode_WRONG_VERSION} } - if err := c.ApplyGlobalConifg(globalCfg, component, version.GetGlobal()+1, entries); err != nil { + if err := c.applyGlobalConifg(globalCfg, component, version.GetGlobal()+1, entries); err != nil { return &configpb.Version{Global: globalLatestVersion, Local: version.GetLocal()}, &configpb.Status{Code: configpb.StatusCode_UNKNOWN, Message: err.Error()} } @@ -268,12 +276,12 @@ func (c *ConfigManager) updateGlobal(component string, version *configpb.Version globalCfg := NewGlobalConfig(entries, &configpb.Version{Global: 0, Local: 0}) c.GlobalCfgs[component] = globalCfg - if err := c.ApplyGlobalConifg(globalCfg, component, 1, entries); err != nil { + if err := c.applyGlobalConifg(globalCfg, component, 1, entries); err != nil { return &configpb.Version{Global: 0, Local: version.GetLocal()}, &configpb.Status{Code: configpb.StatusCode_UNKNOWN, Message: err.Error()} } } - return &configpb.Version{Global: c.GlobalCfgs[component].getVersion(), Local: 0}, &configpb.Status{Code: configpb.StatusCode_OK} + return &configpb.Version{Global: c.GlobalCfgs[component].GetVersion(), Local: 0}, &configpb.Status{Code: configpb.StatusCode_OK} } func mergeAndUpdateConfig(localCfg *LocalConfig, updateEntries map[string]*EntryValue) (string, error) { @@ -305,7 +313,7 @@ func mergeAndUpdateConfig(localCfg *LocalConfig, updateEntries map[string]*Entry } func (c *ConfigManager) updateLocal(componentID string, version *configpb.Version, entries []*configpb.ConfigEntry) (*configpb.Version, *configpb.Status) { - component := c.getComponent(componentID) + component := c.GetComponent(componentID) if component == "" { return &configpb.Version{Global: 0, Local: 0}, &configpb.Status{Code: configpb.StatusCode_COMPONENT_NOT_FOUND} } @@ -317,7 +325,7 @@ func (c *ConfigManager) updateLocal(componentID string, version *configpb.Versio } } if localCfg, ok := c.LocalCfgs[component][componentID]; ok { - localLatestVersion := localCfg.getVersion() + localLatestVersion := localCfg.GetVersion() if !versionEqual(localLatestVersion, version) { return localLatestVersion, &configpb.Status{Code: configpb.StatusCode_WRONG_VERSION} } @@ -332,7 +340,7 @@ func (c *ConfigManager) updateLocal(componentID string, version *configpb.Versio } else { return version, &configpb.Status{Code: configpb.StatusCode_COMPONENT_ID_NOT_FOUND} } - return c.LocalCfgs[component][componentID].getVersion(), &configpb.Status{Code: configpb.StatusCode_OK} + return c.LocalCfgs[component][componentID].GetVersion(), &configpb.Status{Code: configpb.StatusCode_OK} } func (c *ConfigManager) deleteEntry(component, e string) { @@ -368,12 +376,12 @@ func (c *ConfigManager) deleteGlobal(component string, version *configpb.Version } func (c *ConfigManager) deleteLocal(componentID string, version *configpb.Version) *configpb.Status { - component := c.getComponent(componentID) + component := c.GetComponent(componentID) if component == "" { return &configpb.Status{Code: configpb.StatusCode_COMPONENT_NOT_FOUND} } if localCfg, ok := c.LocalCfgs[component][componentID]; ok { - localLatestVersion := localCfg.getVersion() + localLatestVersion := localCfg.GetVersion() if !versionEqual(localLatestVersion, version) { return &configpb.Status{Code: configpb.StatusCode_WRONG_VERSION} } @@ -421,8 +429,8 @@ func (gc *GlobalConfig) updateEntry(entry *configpb.ConfigEntry, version *config entries[entry.GetName()] = NewEntryValue(entry, version) } -// getVersion returns the global version. -func (gc *GlobalConfig) getVersion() uint64 { +// GetVersion returns the global version. +func (gc *GlobalConfig) GetVersion() uint64 { if gc == nil { return 0 } @@ -474,8 +482,8 @@ func (lc *LocalConfig) updateEntry(entry *configpb.ConfigEntry, version *configp entries[entry.GetName()] = NewEntryValue(entry, version) } -// getVersion return the local config version for a component. -func (lc *LocalConfig) getVersion() *configpb.Version { +// GetVersion return the local config version for a component. +func (lc *LocalConfig) GetVersion() *configpb.Version { if lc == nil { return nil } @@ -486,51 +494,121 @@ func (lc *LocalConfig) getConfigs() map[string]interface{} { return lc.Configs } -// TODO: need to consider the redundant label. func update(config map[string]interface{}, configName []string, value string) error { - res := config - for len(configName) >= 2 { - if _, ok := config[configName[0]]; !ok { - config[configName[0]] = make(map[string]interface{}) + if len(configName) > 1 { + sub, ok := config[configName[0]] + if !ok { + return errors.Errorf("cannot find the config item: %v", configName[0]) + } + s, ok := sub.(map[string]interface{}) + if ok { + return update(s, configName[1:], value) } - config = config[configName[0]].(map[string]interface{}) - configName = configName[1:] - res = config } - t := reflect.TypeOf(res[configName[0]]) - if t == nil { - return errors.Errorf("config item %s is not existed", configName[0]) + _, ok := config[configName[0]] + if !ok { + // TODO: remove it + if configName[0] != "schedulers-v2" { + return errors.Errorf("cannot find the config item: %v", configName[0]) + } } - // TODO: support more types - var v interface{} - var err error - switch t.Kind() { - case reflect.Bool: - v, err = strconv.ParseBool(value) - case reflect.Int: - v, err = strconv.Atoi(value) - case reflect.Int64: - v, err = strconv.ParseInt(value, 10, 64) - case reflect.Float64: - v, err = strconv.ParseFloat(value, 64) - case reflect.String: - v = value - case reflect.Slice: - // TODO: make slice work for any other type - v = strings.Split(value, ",") - default: - return errors.Errorf("unsupported type") + + container := make(map[string]interface{}) + + // TODO: remove it + if configName[0] == "cluster-version" { + cv, err := cluster.ParseVersion(value) + if err != nil { + return errors.Errorf("failed to parse version: %v", err.Error()) + } + container[configName[0]] = cv + } else if _, err := toml.Decode(value, &container); err != nil { + if !strings.Contains(err.Error(), "bare keys") { + return errors.Errorf("failed to decode value: %v", err.Error()) + } + container[configName[0]] = value + } else if configName[0] == "label-property" { + config[configName[0]] = container + return nil } + v, err := getUpdateValue(config[configName[0]], container[configName[0]]) if err != nil { return err } - res[configName[0]] = v + config[configName[0]] = v return nil } +func getUpdateValue(item, updateItem interface{}) (interface{}, error) { + var err error + var v interface{} + var tmp float64 + switch t := item.(type) { + case bool: + switch t1 := updateItem.(type) { + case string: + v, err = strconv.ParseBool(updateItem.(string)) + case bool: + v = updateItem + default: + return nil, errors.Errorf("unexpected type: %T\n", t1) + } + case int64: + switch t1 := updateItem.(type) { + case string: + tmp, err = strconv.ParseFloat(updateItem.(string), 64) + v = int64(tmp) + case float64: + v = int64(updateItem.(float64)) + case int64: + v = updateItem + default: + return nil, errors.Errorf("unexpected type: %T\n", t1) + } + case []interface{}: + strSlice := strings.Split(updateItem.(string), ",") + slice := make([]interface{}, 0) + for _, str := range strSlice { + slice = append(slice, str) + } + v = slice + case float64: + switch t1 := updateItem.(type) { + case string: + v, err = strconv.ParseFloat(updateItem.(string), 64) + case float64: + v = updateItem + default: + return nil, errors.Errorf("unexpected type: %T\n", t1) + } + case string: + switch t1 := updateItem.(type) { + case string: + v = updateItem + default: + return nil, errors.Errorf("unexpected type: %T\n", t1) + } + case time.Time: + switch t1 := updateItem.(type) { + case string: + v, err = time.Parse(time.RFC3339, updateItem.(string)) + default: + return nil, errors.Errorf("unexpected type: %T\n", t1) + } + case nil: + default: + return nil, errors.Errorf("unsupported type: %T\n", t) + } + + if err != nil { + return nil, err + } + return v, nil +} + func encodeConfigs(configs map[string]interface{}) (string, error) { buf := new(bytes.Buffer) if err := toml.NewEncoder(buf).Encode(configs); err != nil { diff --git a/server/config_manager/config_manager_test.go b/server/config_manager/config_manager_test.go index 3f76c305163..87315f985e3 100644 --- a/server/config_manager/config_manager_test.go +++ b/server/config_manager/config_manager_test.go @@ -178,7 +178,8 @@ func (s *testComponentsConfigSuite) TestUpdateConfig(c *C) { "defaultcf": defaultcfMap, } cfg["rocksdb"] = rocksdbMap - update(cfg, strings.Split("rocksdb.defaultcf.titan.discardable-ratio", "."), "0.002") + err := update(cfg, strings.Split("rocksdb.defaultcf.titan.discardable-ratio", "."), "0.002") + c.Assert(err, IsNil) c.Assert(defaultcfTitanMap["discardable-ratio"], Equals, 0.002) } @@ -280,7 +281,7 @@ discardable-ratio = 0.00156 cfg.GlobalCfgs["tikv"] = gc cfg.LocalCfgs["tikv"] = make(map[string]*LocalConfig) cfg.LocalCfgs["tikv"]["tikv1"] = lc - err = cfg.ApplyGlobalConifg(cfg.GlobalCfgs["tikv"], "tikv", 1, entry) + err = cfg.applyGlobalConifg(cfg.GlobalCfgs["tikv"], "tikv", 1, entry) c.Assert(err, IsNil) str, err := cfg.getComponentCfg("tikv", "tikv1") c.Assert(err, IsNil) diff --git a/server/config_manager/grpc_service.go b/server/config_manager/grpc_service.go index 9e9bb85f5ef..42c02e933a5 100644 --- a/server/config_manager/grpc_service.go +++ b/server/config_manager/grpc_service.go @@ -17,7 +17,9 @@ import ( "context" "github.com/pingcap/kvproto/pkg/configpb" + "github.com/pingcap/log" "github.com/pkg/errors" + "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -32,6 +34,7 @@ func (c *ConfigManager) Create(ctx context.Context, request *configpb.CreateRequ version, config, status := c.CreateConfig(request.GetVersion(), request.GetComponent(), request.GetComponentId(), request.GetConfig()) if status.GetCode() == configpb.StatusCode_OK { + log.Info("component has registered", zap.String("component", request.GetComponent()), zap.String("component-id", request.GetComponentId())) c.Persist(c.svr.GetStorage()) } diff --git a/server/grpc_service.go b/server/grpc_service.go index 378a0503112..ba98310feef 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -21,6 +21,7 @@ import ( "sync/atomic" "time" + "github.com/pingcap/kvproto/pkg/configpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -227,7 +228,13 @@ func (s *Server) PutStore(ctx context.Context, request *pdpb.PutStoreRequest) (* } log.Info("put store ok", zap.Stringer("store", store)) - rc.OnStoreVersionChange() + v := rc.OnStoreVersionChange() + if s.GetConfig().EnableConfigManager && v != nil { + status := s.updateConfigManager("cluster-version", v.String()) + if status.GetCode() != configpb.StatusCode_OK { + log.Error("failed to update the cluster version", zap.Error(errors.New(status.GetMessage()))) + } + } CheckPDVersion(s.scheduleOpt) return &pdpb.PutStoreResponse{ diff --git a/server/server.go b/server/server.go index 7870948eb9a..b4867018639 100644 --- a/server/server.go +++ b/server/server.go @@ -183,7 +183,6 @@ func combineBuilderServerHTTPService(svr *Server, apiBuilders ...HandlerBuilder) log.Info("register REST path", zap.String("path", pathPrefix)) registerMap[pathPrefix] = struct{}{} router.PathPrefix(pathPrefix).Handler(handler) - if info.IsCore { // Deprecated router.Path("/pd/health").Handler(handler) @@ -193,7 +192,6 @@ func combineBuilderServerHTTPService(svr *Server, apiBuilders ...HandlerBuilder) router.Path("/pd/ping").Handler(handler) } } - engine.UseHandler(router) return engine, nil } @@ -598,6 +596,16 @@ func (s *Server) GetClient() *clientv3.Client { return s.client } +// GetConfigManager returns the config manager of server. +func (s *Server) GetConfigManager() *configmanager.ConfigManager { + return s.cfgManager +} + +// GetConfigClient returns the config client of server. +func (s *Server) GetConfigClient() pd.ConfigClient { + return s.configClient +} + // GetLeader returns leader of etcd. func (s *Server) GetLeader() *pdpb.Member { return s.member.GetLeader() @@ -656,6 +664,7 @@ func (s *Server) GetConfig() *config.Config { cfg.LabelProperty = s.scheduleOpt.LoadLabelPropertyConfig().Clone() cfg.ClusterVersion = *s.scheduleOpt.LoadClusterVersion() cfg.PDServerCfg = *s.scheduleOpt.LoadPDServerConfig() + cfg.Log = *s.scheduleOpt.LoadLogConfig() storage := s.GetStorage() if storage == nil { return cfg @@ -815,6 +824,7 @@ func (s *Server) SetLabelProperty(typ, labelKey, labelValue string) error { zap.Error(err)) return err } + log.Info("label property config is updated", zap.Reflect("config", s.scheduleOpt.LoadLabelPropertyConfig())) return nil } @@ -833,10 +843,22 @@ func (s *Server) DeleteLabelProperty(typ, labelKey, labelValue string) error { zap.Error(err)) return err } + log.Info("label property config is deleted", zap.Reflect("config", s.scheduleOpt.LoadLabelPropertyConfig())) return nil } +func (s *Server) updateConfigManager(name, value string) *configpb.Status { + configManager := s.GetConfigManager() + globalVersion := configManager.GetGlobalConfigs(Component).GetVersion() + version := &configpb.Version{Global: globalVersion} + entries := []*configpb.ConfigEntry{{Name: name, Value: value}} + configManager.Lock() + defer configManager.Unlock() + _, status := configManager.UpdateGlobal(Component, version, entries) + return status +} + // GetLabelProperty returns the whole label property config. func (s *Server) GetLabelProperty() config.LabelPropertyConfig { return s.scheduleOpt.LoadLabelPropertyConfig().Clone() @@ -1129,7 +1151,7 @@ func (s *Server) configCheckLoop() { return case <-ticker.C: version := s.GetConfigVersion() - config, err := s.getComponentConfig(ctx, version, addr) + config, err := s.getComponentConfig(ctx, version, compoenntID) if err != nil { log.Error("failed to get config", zap.Error(err)) } diff --git a/server/server_test.go b/server/server_test.go index 6faff24253a..87e54e6ebc6 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -201,7 +201,6 @@ func (s *testServerHandlerSuite) TestRegisterServerHandler(c *C) { } return mux, info } - cfg := NewTestSingleConfig(c) ctx, cancel := context.WithCancel(context.Background()) svr, err := CreateServer(ctx, cfg, mokHandler) diff --git a/tests/client/config_client_test.go b/tests/client/config_client_test.go index 2e51351c0e9..39f9a931b2c 100644 --- a/tests/client/config_client_test.go +++ b/tests/client/config_client_test.go @@ -89,7 +89,7 @@ func (s *configClientTestSuite) TestUpdateWrongEntry(c *C) { []*configpb.ConfigEntry{{Name: "aaa.xxx-xxx", Value: "2"}}, ) c.Assert(status.GetCode(), Equals, configpb.StatusCode_UNKNOWN) - c.Assert(strings.Contains(status.GetMessage(), "is not existed"), IsTrue) + c.Assert(strings.Contains(status.GetMessage(), "cannot find the config item"), IsTrue) c.Assert(version, DeepEquals, &configpb.Version{Global: 0, Local: 0}) c.Assert(err, IsNil) diff --git a/tests/cluster.go b/tests/cluster.go index 7c60cf53663..f159abe39bc 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -433,6 +433,7 @@ func (c *TestCluster) WaitLeader() string { } for name, num := range counter { if num == running && c.GetServer(name).IsLeader() { + time.Sleep(20 * time.Millisecond) return name } } diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 72e38855693..bd93a5dd738 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -19,6 +19,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/coreos/go-semver/semver" . "github.com/pingcap/check" @@ -39,6 +40,7 @@ type configTestSuite struct{} func (s *configTestSuite) SetUpSuite(c *C) { server.EnableZap = true + server.ConfigCheckInterval = 10 * time.Millisecond } type testItem struct { @@ -114,6 +116,7 @@ func (s *configTestSuite) TestConfig(c *C) { args2 := []string{"-u", pdAddr, "config", "set", "cluster-version", "2.1.0-rc.5"} _, _, err = pdctl.ExecuteCommandC(cmd, args2...) c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) c.Assert(clusterVersion, Not(DeepEquals), svr.GetClusterVersion()) _, output, err = pdctl.ExecuteCommandC(cmd, args1...) c.Assert(err, IsNil) @@ -133,6 +136,7 @@ func (s *configTestSuite) TestConfig(c *C) { args2 = []string{"-u", pdAddr, "config", "set", "label-property", "reject-leader", "zone", "cn"} _, _, err = pdctl.ExecuteCommandC(cmd, args2...) c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) c.Assert(labelPropertyCfg, Not(DeepEquals), svr.GetLabelProperty()) _, output, err = pdctl.ExecuteCommandC(cmd, args1...) c.Assert(err, IsNil) @@ -144,6 +148,7 @@ func (s *configTestSuite) TestConfig(c *C) { args3 := []string{"-u", pdAddr, "config", "delete", "label-property", "reject-leader", "zone", "cn"} _, _, err = pdctl.ExecuteCommandC(cmd, args3...) c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) c.Assert(labelPropertyCfg, Not(DeepEquals), svr.GetLabelProperty()) _, output, err = pdctl.ExecuteCommandC(cmd, args1...) c.Assert(err, IsNil) diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go index aaabdcf7794..46ab23a3789 100644 --- a/tests/pdctl/hot/hot_test.go +++ b/tests/pdctl/hot/hot_test.go @@ -41,6 +41,7 @@ type hotTestSuite struct{} func (s *hotTestSuite) SetUpSuite(c *C) { server.EnableZap = true + server.ConfigCheckInterval = 10 * time.Millisecond } func (s *hotTestSuite) TestHot(c *C) { diff --git a/tests/pdctl/label/label_test.go b/tests/pdctl/label/label_test.go index 274eb1c4d43..ff6fa0fcbc1 100644 --- a/tests/pdctl/label/label_test.go +++ b/tests/pdctl/label/label_test.go @@ -18,6 +18,7 @@ import ( "encoding/json" "strings" "testing" + "time" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" @@ -38,6 +39,7 @@ type labelTestSuite struct{} func (s *labelTestSuite) SetUpSuite(c *C) { server.EnableZap = true + server.ConfigCheckInterval = 10 * time.Millisecond } func (s *labelTestSuite) TestLabel(c *C) { diff --git a/tests/pdctl/log/log_test.go b/tests/pdctl/log/log_test.go index 7abad7a4cc3..4935d09b940 100644 --- a/tests/pdctl/log/log_test.go +++ b/tests/pdctl/log/log_test.go @@ -16,6 +16,7 @@ package log_test import ( "context" "testing" + "time" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" @@ -34,6 +35,7 @@ type logTestSuite struct{} func (s *logTestSuite) SetUpSuite(c *C) { server.EnableZap = true + server.ConfigCheckInterval = 10 * time.Millisecond } func (s *logTestSuite) TestLog(c *C) { @@ -87,6 +89,7 @@ func (s *logTestSuite) TestLog(c *C) { for _, testCase := range testCases { _, _, err = pdctl.ExecuteCommandC(cmd, testCase.cmd...) c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) c.Assert(svr.GetConfig().Log.Level, Equals, testCase.expect) } } diff --git a/tests/pdctl/operator/operator_test.go b/tests/pdctl/operator/operator_test.go index 03e64b605df..8d5ad665737 100644 --- a/tests/pdctl/operator/operator_test.go +++ b/tests/pdctl/operator/operator_test.go @@ -38,6 +38,7 @@ type operatorTestSuite struct{} func (s *operatorTestSuite) SetUpSuite(c *C) { server.EnableZap = true + server.ConfigCheckInterval = 10 * time.Millisecond } func (s *operatorTestSuite) TestOperator(c *C) { @@ -209,6 +210,7 @@ func (s *operatorTestSuite) TestOperator(c *C) { _, _, err = pdctl.ExecuteCommandC(cmd, "config", "set", "enable-placement-rules", "true") c.Assert(err, IsNil) + time.Sleep(20 * time.Millisecond) _, output, err = pdctl.ExecuteCommandC(cmd, "operator", "add", "transfer-region", "1", "2", "3") c.Assert(err, IsNil) c.Assert(strings.Contains(string(output), "not supported"), IsTrue) diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 5613816ce7a..3ec010b6a78 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -533,7 +533,7 @@ func (s *clusterTestSuite) TestConcurrentHandleRegion(c *C) { } func (s *clusterTestSuite) TestSetScheduleOpt(c *C) { - tc, err := tests.NewTestCluster(s.ctx, 1) + tc, err := tests.NewTestCluster(s.ctx, 1, func(cfg *config.Config) { cfg.EnableConfigManager = true }) defer tc.Destroy() c.Assert(err, IsNil)