Skip to content

Commit

Permalink
config: make original HTTP API compatible with config manager (#2080)
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx authored Feb 5, 2020
1 parent 9a034e0 commit 5169308
Show file tree
Hide file tree
Showing 28 changed files with 581 additions and 170 deletions.
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pelletier/go-toml v1.3.0 h1:e5+lF2E4Y2WCIxBefVowBuB0iHrUH4HZ8q+6mGF7fJc=
github.com/pelletier/go-toml v1.3.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo=
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d h1:U+PMnTlV2tu7RuMK5etusZG3Cf+rpow5hqQByeCzJ2g=
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d/go.mod h1:lXfE4PvvTW5xOjO6Mba8zDPyw8M93B6AQ7frTGnMlA8=
Expand Down Expand Up @@ -333,7 +334,6 @@ golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181005035420-146acd28ed58 h1:otZG8yDCO4LVps5+9bxOeNiCvgmOyt96J3roHTYs7oE=
golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -404,7 +404,6 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f h1:FU37niK8AQ59mHcskRyQL7H0ErSeNh650vdcj8HqdSI=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
Expand Down
5 changes: 5 additions & 0 deletions pkg/typeutil/duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,8 @@ func (d *Duration) UnmarshalText(text []byte) error {
d.Duration, err = time.ParseDuration(string(text))
return errors.WithStack(err)
}

// MarshalText returns the duration as a JSON string.
func (d Duration) MarshalText() ([]byte, error) {
return []byte(d.String()), nil
}
2 changes: 1 addition & 1 deletion pkg/typeutil/string_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *StringSlice) UnmarshalJSON(text []byte) error {
return errors.WithStack(err)
}
if len(data) == 0 {
*s = nil
*s = []string{}
return nil
}
*s = strings.Split(data, ",")
Expand Down
2 changes: 1 addition & 1 deletion pkg/typeutil/string_slice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (s *testStringSliceSuite) TestJSON(c *C) {
}

func (s *testStringSliceSuite) TestEmpty(c *C) {
var ss StringSlice
ss := StringSlice([]string{})
b, err := json.Marshal(ss)
c.Assert(err, IsNil)
c.Assert(string(b), Equals, "\"\"")
Expand Down
131 changes: 131 additions & 0 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ import (
"reflect"
"strings"

"github.com/BurntSushi/toml"
"github.com/pingcap/errcode"
"github.com/pingcap/kvproto/pkg/configpb"
"github.com/pingcap/log"
"github.com/pingcap/pd/pkg/apiutil"
"github.com/pingcap/pd/server"
"github.com/pingcap/pd/server/config"
"github.com/pkg/errors"
"github.com/unrolled/render"
"go.uber.org/zap"
)

type confHandler struct {
Expand All @@ -46,6 +50,27 @@ func (h *confHandler) Get(w http.ResponseWriter, r *http.Request) {
}

func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) {
if h.svr.GetConfig().EnableConfigManager {
client := h.svr.GetConfigClient()
if client == nil {
h.rd.JSON(w, http.StatusServiceUnavailable, "no leader")
}
cm := h.svr.GetConfigManager()
m := make(map[string]interface{})
json.NewDecoder(r.Body).Decode(&m)
entries, err := transToEntries(m)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
err = redirectUpdateReq(h.svr.Context(), client, cm, entries)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, nil)
return
}
config := h.svr.GetConfig()
data, err := ioutil.ReadAll(r.Body)
r.Body.Close()
Expand Down Expand Up @@ -139,6 +164,27 @@ func (h *confHandler) GetSchedule(w http.ResponseWriter, r *http.Request) {
}

func (h *confHandler) SetSchedule(w http.ResponseWriter, r *http.Request) {
if h.svr.GetConfig().EnableConfigManager {
client := h.svr.GetConfigClient()
if client == nil {
h.rd.JSON(w, http.StatusServiceUnavailable, "no leader")
}
cm := h.svr.GetConfigManager()
m := make(map[string]interface{})
json.NewDecoder(r.Body).Decode(&m)
entries, err := transToEntries(m)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
err = redirectUpdateReq(h.svr.Context(), client, cm, entries)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, nil)
return
}
config := h.svr.GetScheduleConfig()
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &config); err != nil {
return
Expand All @@ -156,6 +202,27 @@ func (h *confHandler) GetReplication(w http.ResponseWriter, r *http.Request) {
}

func (h *confHandler) SetReplication(w http.ResponseWriter, r *http.Request) {
if h.svr.GetConfig().EnableConfigManager {
client := h.svr.GetConfigClient()
if client == nil {
h.rd.JSON(w, http.StatusServiceUnavailable, "no leader")
}
cm := h.svr.GetConfigManager()
m := make(map[string]interface{})
json.NewDecoder(r.Body).Decode(&m)
entries, err := transToEntries(m)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
err = redirectUpdateReq(h.svr.Context(), client, cm, entries)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, nil)
return
}
config := h.svr.GetReplicationConfig()
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &config); err != nil {
return
Expand All @@ -177,6 +244,53 @@ func (h *confHandler) SetLabelProperty(w http.ResponseWriter, r *http.Request) {
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
return
}

if h.svr.GetConfig().EnableConfigManager {
client := h.svr.GetConfigClient()
if client == nil {
h.rd.JSON(w, http.StatusServiceUnavailable, "no leader")
}
cm := h.svr.GetConfigManager()
typ := input["type"]
labelKey, labelValue := input["label-key"], input["label-value"]
cfg := h.svr.GetScheduleOption().LoadLabelPropertyConfig().Clone()
switch input["action"] {
case "set":
for _, l := range cfg[typ] {
if l.Key == labelKey && l.Value == labelValue {
return
}
}
cfg[typ] = append(cfg[typ], config.StoreLabel{Key: labelKey, Value: labelValue})
case "delete":
oldLabels := cfg[typ]
cfg[typ] = []config.StoreLabel{}
for _, l := range oldLabels {
if l.Key == labelKey && l.Value == labelValue {
continue
}
cfg[typ] = append(cfg[typ], l)
}
if len(cfg[typ]) == 0 {
delete(cfg, typ)
}
default:
err := errors.Errorf("unknown action %v", input["action"])
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
var buf bytes.Buffer
toml.NewEncoder(&buf).Encode(cfg)
entries := []*entry{{key: "label-property", value: buf.String()}}
err := redirectUpdateReq(h.svr.Context(), client, cm, entries)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, nil)
return
}

var err error
switch input["action"] {
case "set":
Expand Down Expand Up @@ -207,6 +321,23 @@ func (h *confHandler) SetClusterVersion(w http.ResponseWriter, r *http.Request)
apiutil.ErrorResp(h.rd, w, errcode.NewInvalidInputErr(errors.New("not set cluster-version")))
return
}

if h.svr.GetConfig().EnableConfigManager {
kind := &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: server.Component}}}
v := &configpb.Version{Global: h.svr.GetConfigManager().GlobalCfgs[server.Component].GetVersion()}
entry := &configpb.ConfigEntry{Name: "cluster-version", Value: version}
client := h.svr.GetConfigClient()
if client == nil {
h.rd.JSON(w, http.StatusServiceUnavailable, "no leader")
}
_, _, err := h.svr.GetConfigClient().Update(h.svr.Context(), v, kind, []*configpb.ConfigEntry{entry})
if err != nil {
log.Error("update cluster version meet error", zap.Error(err))
}
h.rd.JSON(w, http.StatusOK, nil)
return
}

err := h.svr.SetClusterVersion(version)
if err != nil {
apiutil.ErrorResp(h.rd, w, errcode.NewInternalErr(err))
Expand Down
12 changes: 11 additions & 1 deletion server/api/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ type testConfigSuite struct {
}

func (s *testConfigSuite) SetUpSuite(c *C) {
s.svr, s.cleanup = mustNewServer(c)
server.ConfigCheckInterval = 10 * time.Millisecond
s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) { cfg.EnableConfigManager = true })
mustWaitLeader(c, []*server.Server{s.svr})

addr := s.svr.GetAddr()
s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
// make sure the config client is initialized
time.Sleep(20 * time.Millisecond)
}

func (s *testConfigSuite) TearDownSuite(c *C) {
Expand Down Expand Up @@ -71,6 +74,7 @@ func (s *testConfigSuite) TestConfigAll(c *C) {
err = postJSON(addr, postData)
c.Assert(err, IsNil)

time.Sleep(20 * time.Millisecond)
newCfg := &config.Config{}
err = readJSON(addr, newCfg)
c.Assert(err, IsNil)
Expand All @@ -92,6 +96,7 @@ func (s *testConfigSuite) TestConfigSchedule(c *C) {
err = postJSON(addr, postData)
c.Assert(err, IsNil)

time.Sleep(20 * time.Millisecond)
sc1 := &config.ScheduleConfig{}
c.Assert(readJSON(addr, sc1), IsNil)
c.Assert(*sc, DeepEquals, *sc1)
Expand All @@ -118,6 +123,7 @@ func (s *testConfigSuite) TestConfigReplication(c *C) {
err = postJSON(addr, postData)
c.Assert(err, IsNil)

time.Sleep(20 * time.Millisecond)
rc3 := &config.ReplicationConfig{}
err = readJSON(addr, rc3)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -146,7 +152,9 @@ func (s *testConfigSuite) TestConfigLabelProperty(c *C) {
for _, cmd := range cmds {
err := postJSON(addr, []byte(cmd))
c.Assert(err, IsNil)
time.Sleep(20 * time.Millisecond)
}

cfg = loadProperties()
c.Assert(cfg, HasLen, 2)
c.Assert(cfg["foo"], DeepEquals, []config.StoreLabel{
Expand All @@ -162,7 +170,9 @@ func (s *testConfigSuite) TestConfigLabelProperty(c *C) {
for _, cmd := range cmds {
err := postJSON(addr, []byte(cmd))
c.Assert(err, IsNil)
time.Sleep(20 * time.Millisecond)
}

cfg = loadProperties()
c.Assert(cfg, HasLen, 1)
c.Assert(cfg["foo"], DeepEquals, []config.StoreLabel{{Key: "zone", Value: "cn2"}})
Expand Down
14 changes: 13 additions & 1 deletion server/api/label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"strings"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -107,7 +108,11 @@ func (s *testLabelsStoreSuite) SetUpSuite(c *C) {
},
}

s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) { cfg.Replication.StrictlyMatchLabel = false })
server.ConfigCheckInterval = 10 * time.Millisecond
s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) {
cfg.Replication.StrictlyMatchLabel = false
cfg.EnableConfigManager = true
})
mustWaitLeader(c, []*server.Server{s.svr})

addr := s.svr.GetAddr()
Expand All @@ -117,6 +122,8 @@ func (s *testLabelsStoreSuite) SetUpSuite(c *C) {
for _, store := range s.stores {
mustPutStore(c, s.svr, store.Id, store.State, store.Labels)
}
// make sure the config client is initialized
time.Sleep(20 * time.Millisecond)
}

func (s *testLabelsStoreSuite) TearDownSuite(c *C) {
Expand Down Expand Up @@ -183,16 +190,20 @@ type testStrictlyLabelsStoreSuite struct {
}

func (s *testStrictlyLabelsStoreSuite) SetUpSuite(c *C) {
server.ConfigCheckInterval = 10 * time.Millisecond
s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) {
cfg.Replication.LocationLabels = []string{"zone", "disk"}
cfg.Replication.StrictlyMatchLabel = true
cfg.EnableConfigManager = true
})
mustWaitLeader(c, []*server.Server{s.svr})

addr := s.svr.GetAddr()
s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)

mustBootstrapCluster(c, s.svr)
// make sure the config client is initialized
time.Sleep(20 * time.Millisecond)
}

func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) {
Expand Down Expand Up @@ -277,6 +288,7 @@ func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) {

// enable placement rules. Report no error any more.
c.Assert(postJSON(fmt.Sprintf("%s/config", s.urlPrefix), []byte(`{"enable-placement-rules":"true"}`)), IsNil)
time.Sleep(20 * time.Millisecond)
for _, t := range cases {
_, err := s.svr.PutStore(context.Background(), &pdpb.PutStoreRequest{
Header: &pdpb.RequestHeader{ClusterId: s.svr.ClusterID()},
Expand Down
18 changes: 18 additions & 0 deletions server/api/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package api

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

Expand All @@ -37,6 +38,23 @@ func newlogHandler(svr *server.Server, rd *render.Render) *logHandler {
}

func (h *logHandler) Handle(w http.ResponseWriter, r *http.Request) {
if h.svr.GetConfig().EnableConfigManager {
client := h.svr.GetConfigClient()
if client == nil {
h.rd.JSON(w, http.StatusServiceUnavailable, "no leader")
}
cm := h.svr.GetConfigManager()
var str string
json.NewDecoder(r.Body).Decode(&str)
entries := []*entry{{key: "log.level", value: fmt.Sprintf("level = \"%v\"", str)}}
err := redirectUpdateReq(h.svr.Context(), client, cm, entries)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, nil)
return
}
var level string
data, err := ioutil.ReadAll(r.Body)
r.Body.Close()
Expand Down
Loading

0 comments on commit 5169308

Please sign in to comment.