From 64c96d14e819ed8fcd17122e4ad59df055e225e6 Mon Sep 17 00:00:00 2001 From: CWen Date: Mon, 18 Mar 2019 19:52:51 +0800 Subject: [PATCH] stability: add fault-trigger client (#326) * stability: add fault-trigger client --- tests/actions.go | 8 +- tests/pkg/fault-trigger/api/response.go | 27 ++- tests/pkg/fault-trigger/api/router.go | 5 +- tests/pkg/fault-trigger/client/client.go | 194 +++++++++++++++++ tests/pkg/fault-trigger/client/client_test.go | 201 ++++++++++++++++++ tests/pkg/fault-trigger/manager/etcd.go | 4 +- tests/pkg/fault-trigger/manager/kubelet.go | 4 +- tests/pkg/fault-trigger/manager/types.go | 10 + tests/pkg/util/http.go | 15 ++ 9 files changed, 457 insertions(+), 11 deletions(-) create mode 100644 tests/pkg/fault-trigger/client/client.go create mode 100644 tests/pkg/fault-trigger/client/client_test.go create mode 100644 tests/pkg/util/http.go diff --git a/tests/actions.go b/tests/actions.go index 43276080637..dc32d9de3b7 100644 --- a/tests/actions.go +++ b/tests/actions.go @@ -419,7 +419,7 @@ func (oa *operatorActions) pdMembersReadyFn(tc *v1alpha1.TidbCluster) (bool, err replicas := tc.Spec.PD.Replicas + int32(failureCount) if *pdSet.Spec.Replicas != replicas { glog.Infof("statefulset: %s/%s .spec.Replicas(%d) != %d", - ns, pdSetName, *pdSet.Spec.Replicas, ns, tcName, replicas) + ns, pdSetName, *pdSet.Spec.Replicas, replicas) return false, nil } if pdSet.Status.ReadyReplicas != tc.Spec.PD.Replicas { @@ -589,7 +589,7 @@ func (oa *operatorActions) reclaimPolicySyncFn(tc *v1alpha1.TidbCluster) (bool, for _, pvc := range pvcList.Items { pvName := pvc.Spec.VolumeName if pv, err := oa.kubeCli.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{}); err != nil { - glog.Errorf("failed to get pv: %s", pvName, err) + glog.Errorf("failed to get pv: %s, error: %v", pvName, err) return false, nil } else if pv.Spec.PersistentVolumeReclaimPolicy != tc.Spec.PVReclaimPolicy { glog.Errorf("pv: %s's reclaimPolicy is not Retain", pvName) @@ -608,7 +608,7 @@ func (oa *operatorActions) metaSyncFn(tc *v1alpha1.TidbCluster) (bool, error) { var cluster *metapb.Cluster var err error if cluster, err = pdCli.GetCluster(); err != nil { - glog.Errorf("failed to get cluster from pdControl: %s/%s", ns, tcName, err) + glog.Errorf("failed to get cluster from pdControl: %s/%s, error: %v", ns, tcName, err) return false, nil } @@ -808,7 +808,7 @@ func (oa *operatorActions) schedulerHAFn(tc *v1alpha1.TidbCluster) (bool, error) } nodeMap[nodeName] = append(nodeMap[nodeName], pod.GetName()) if len(nodeMap[nodeName]) > totalCount/2 { - return false, fmt.Errorf("node % have %d pods, greater than %d/2", + return false, fmt.Errorf("node %s have %d pods, greater than %d/2", nodeName, len(nodeMap[nodeName]), totalCount) } } diff --git a/tests/pkg/fault-trigger/api/response.go b/tests/pkg/fault-trigger/api/response.go index 9280cf66f71..cd9a45070b0 100644 --- a/tests/pkg/fault-trigger/api/response.go +++ b/tests/pkg/fault-trigger/api/response.go @@ -13,7 +13,13 @@ package api -import "net/http" +import ( + "encoding/json" + "net/http" + + "github.com/golang/glog" + "github.com/juju/errors" +) // Response defines a new response struct for http type Response struct { @@ -41,3 +47,22 @@ func (r *Response) payload(payload interface{}) *Response { r.Payload = payload return r } + +// ExtractResponse extract response from api +func ExtractResponse(data []byte) ([]byte, error) { + respData := &Response{} + if err := json.Unmarshal(data, respData); err != nil { + return nil, errors.Trace(err) + } + + if respData.StatusCode != http.StatusOK { + d, err := json.Marshal(respData.Payload) + if err != nil { + glog.Errorf("marshal data failed %v", d) + } + + return d, errors.New(respData.Message) + } + + return json.Marshal(respData.Payload) +} diff --git a/tests/pkg/fault-trigger/api/router.go b/tests/pkg/fault-trigger/api/router.go index bb58405babd..9b39afcfa73 100644 --- a/tests/pkg/fault-trigger/api/router.go +++ b/tests/pkg/fault-trigger/api/router.go @@ -16,13 +16,14 @@ package api import restful "github.com/emicklei/go-restful" const ( - apiPrefix = "/pingcap.com/api/v1" + // APIPrefix defines a prefix string for fault-trigger api + APIPrefix = "/pingcap.com/api/v1" ) func (s *Server) newService() *restful.WebService { ws := new(restful.WebService) ws. - Path(apiPrefix). + Path(APIPrefix). Consumes(restful.MIME_JSON). Produces(restful.MIME_JSON) diff --git a/tests/pkg/fault-trigger/client/client.go b/tests/pkg/fault-trigger/client/client.go new file mode 100644 index 00000000000..17a812cbe6c --- /dev/null +++ b/tests/pkg/fault-trigger/client/client.go @@ -0,0 +1,194 @@ +package client + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + + "github.com/pingcap/tidb-operator/tests/pkg/fault-trigger/api" + "github.com/pingcap/tidb-operator/tests/pkg/fault-trigger/manager" + "github.com/pingcap/tidb-operator/tests/pkg/util" +) + +// Client is a fault-trigger client +type Client interface { + // ListVMs lists all virtual machines + ListVMs() ([]*manager.VM, error) + // StartVM start a specified virtual machine + StartVM(vm *manager.VM) error + // StopVM stops a specified virtual machine + StopVM(vm *manager.VM) error + // StartETCD starts the etcd service + StartETCD() error + // StopETCD stops the etcd service + StopETCD() error + // StartKubelet starts the kubelet service + StartKubelet() error + // StopKubelet stops the kubelet service + StopKubelet() error +} + +// client is used to communicate with the fault-trigger +type client struct { + cfg Config + httpCli *http.Client +} + +// Config defines for fault-trigger client +type Config struct { + Addr string +} + +// NewClient creates a new fault-trigger client from a given address +func NewClient(cfg Config) Client { + return &client{ + cfg: cfg, + httpCli: http.DefaultClient, + } +} + +type clientError struct { + code int + msg string +} + +func (e *clientError) Error() string { + return fmt.Sprintf("%s (code: %d)", e.msg, e.code) +} + +func (c client) do(req *http.Request) (*http.Response, []byte, error) { + resp, err := c.httpCli.Do(req) + if err != nil { + return nil, nil, err + } + defer resp.Body.Close() + + code := resp.StatusCode + + if code != http.StatusOK { + return resp, nil, &clientError{ + code: code, + msg: "fail to request to http service", + } + } + + bodyByte, err := ioutil.ReadAll(resp.Body) + if err != nil { + return resp, nil, &clientError{ + code: code, + msg: fmt.Sprintf("failed to read data from resp body, error: %v", err), + } + } + + data, err := api.ExtractResponse(bodyByte) + if err != nil { + return resp, nil, &clientError{ + code: code, + msg: err.Error(), + } + } + + return resp, data, err +} + +func (c client) get(url string) ([]byte, error) { + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("error creating request: %v", err) + } + + _, body, err := c.do(req) + if err != nil { + return nil, err + } + + return body, nil +} + +func (c *client) ListVMs() ([]*manager.VM, error) { + url := util.GenURL(fmt.Sprintf("%s%s/vms", c.cfg.Addr, api.APIPrefix)) + data, err := c.get(url) + if err != nil { + return nil, err + } + + var vms []*manager.VM + if err = json.Unmarshal(data, &vms); err != nil { + return nil, err + } + + return vms, nil +} + +func (c *client) StartVM(vm *manager.VM) error { + if err := vm.Verify(); err != nil { + return err + } + + vmName := vm.Name + if len(vmName) == 0 { + vmName = vm.IP + } + + url := util.GenURL(fmt.Sprintf("%s/%s/vm/%s/start", c.cfg.Addr, api.APIPrefix, vmName)) + if _, err := c.get(url); err != nil { + return err + } + + return nil +} + +func (c *client) StopVM(vm *manager.VM) error { + if err := vm.Verify(); err != nil { + return err + } + + vmName := vm.Name + if len(vmName) == 0 { + vmName = vm.IP + } + + url := util.GenURL(fmt.Sprintf("%s/%s/vm/%s/stop", c.cfg.Addr, api.APIPrefix, vmName)) + if _, err := c.get(url); err != nil { + return err + } + + return nil +} + +func (c *client) StartETCD() error { + url := util.GenURL(fmt.Sprintf("%s/%s/etcd/start", c.cfg.Addr, api.APIPrefix)) + if _, err := c.get(url); err != nil { + return err + } + + return nil +} + +func (c *client) StopETCD() error { + url := util.GenURL(fmt.Sprintf("%s/%s/etcd/stop", c.cfg.Addr, api.APIPrefix)) + if _, err := c.get(url); err != nil { + return err + } + + return nil +} + +func (c *client) StartKubelet() error { + url := util.GenURL(fmt.Sprintf("%s/%s/kubelet/start", c.cfg.Addr, api.APIPrefix)) + if _, err := c.get(url); err != nil { + return err + } + + return nil +} + +func (c *client) StopKubelet() error { + url := util.GenURL(fmt.Sprintf("%s/%s/kubelet/stop", c.cfg.Addr, api.APIPrefix)) + if _, err := c.get(url); err != nil { + return err + } + + return nil +} diff --git a/tests/pkg/fault-trigger/client/client_test.go b/tests/pkg/fault-trigger/client/client_test.go new file mode 100644 index 00000000000..0cdb497b818 --- /dev/null +++ b/tests/pkg/fault-trigger/client/client_test.go @@ -0,0 +1,201 @@ +package client + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + . "github.com/onsi/gomega" + "github.com/pingcap/tidb-operator/tests/pkg/fault-trigger/api" + "github.com/pingcap/tidb-operator/tests/pkg/fault-trigger/manager" +) + +func TestListVMs(t *testing.T) { + g := NewGomegaWithT(t) + + vms := []*manager.VM{ + { + Name: "vm1", + IP: "10.16.30.11", + }, + { + Name: "vm2", + IP: "10.16.30.12", + }, + } + + resp := &api.Response{ + Action: "listVMs", + StatusCode: 200, + Payload: vms, + } + + respJSON, _ := json.Marshal(resp) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, string(respJSON)) + })) + defer ts.Close() + + cli := NewClient(Config{ + Addr: ts.URL, + }) + + vms2, err := cli.ListVMs() + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(vms).To(Equal(vms2)) +} + +func TestStartVM(t *testing.T) { + g := NewGomegaWithT(t) + + resp := &api.Response{ + Action: "startVM", + StatusCode: 200, + Message: "OK", + } + + respJSON, _ := json.Marshal(resp) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, string(respJSON)) + })) + defer ts.Close() + + cli := NewClient(Config{ + Addr: ts.URL, + }) + + err := cli.StartVM(&manager.VM{ + Name: "vm1", + }) + g.Expect(err).NotTo(HaveOccurred()) + + err = cli.StartVM(&manager.VM{ + Host: "127.0.0.1", + }) + g.Expect(err).To(HaveOccurred()) +} + +func TestStopVM(t *testing.T) { + g := NewGomegaWithT(t) + + resp := &api.Response{ + Action: "stopVM", + StatusCode: 200, + Message: "OK", + } + + respJSON, _ := json.Marshal(resp) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, string(respJSON)) + })) + defer ts.Close() + + cli := NewClient(Config{ + Addr: ts.URL, + }) + + err := cli.StopVM(&manager.VM{ + Name: "vm1", + }) + + g.Expect(err).NotTo(HaveOccurred()) + + err = cli.StopVM(&manager.VM{ + Host: "127.0.0.1", + }) + g.Expect(err).To(HaveOccurred()) +} + +func TestStartETCD(t *testing.T) { + g := NewGomegaWithT(t) + + resp := &api.Response{ + Action: "startETCD", + StatusCode: 200, + Message: "OK", + } + + respJSON, _ := json.Marshal(resp) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, string(respJSON)) + })) + defer ts.Close() + + cli := NewClient(Config{ + Addr: ts.URL, + }) + + err := cli.StartETCD() + g.Expect(err).NotTo(HaveOccurred()) +} + +func TestStopETCD(t *testing.T) { + g := NewGomegaWithT(t) + + resp := &api.Response{ + Action: "stopETCD", + StatusCode: 200, + Message: "OK", + } + + respJSON, _ := json.Marshal(resp) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, string(respJSON)) + })) + defer ts.Close() + + cli := NewClient(Config{ + Addr: ts.URL, + }) + + err := cli.StopETCD() + g.Expect(err).NotTo(HaveOccurred()) +} + +func TestStartKubelet(t *testing.T) { + g := NewGomegaWithT(t) + + resp := &api.Response{ + Action: "startKubelet", + StatusCode: 200, + Message: "OK", + } + + respJSON, _ := json.Marshal(resp) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, string(respJSON)) + })) + defer ts.Close() + + cli := NewClient(Config{ + Addr: ts.URL, + }) + + err := cli.StartKubelet() + g.Expect(err).NotTo(HaveOccurred()) +} + +func TestStopKubelet(t *testing.T) { + g := NewGomegaWithT(t) + + resp := &api.Response{ + Action: "stopKubelet", + StatusCode: 200, + Message: "OK", + } + + respJSON, _ := json.Marshal(resp) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, string(respJSON)) + })) + defer ts.Close() + + cli := NewClient(Config{ + Addr: ts.URL, + }) + + err := cli.StopKubelet() + g.Expect(err).NotTo(HaveOccurred()) +} diff --git a/tests/pkg/fault-trigger/manager/etcd.go b/tests/pkg/fault-trigger/manager/etcd.go index 13c0df2f4c3..cd16a527ae9 100644 --- a/tests/pkg/fault-trigger/manager/etcd.go +++ b/tests/pkg/fault-trigger/manager/etcd.go @@ -19,7 +19,7 @@ import ( "github.com/golang/glog" ) -// StartETCD starts etcd +// StartETCD starts the etcd service func (m *Manager) StartETCD() error { shell := "systemctl start etcd" cmd := exec.Command("/bin/sh", "-c", shell) @@ -31,7 +31,7 @@ func (m *Manager) StartETCD() error { return nil } -// StopETCD stops etcd +// StopETCD stops the etcd service func (m *Manager) StopETCD() error { shell := "systemctl stop etcd" cmd := exec.Command("/bin/sh", "-c", shell) diff --git a/tests/pkg/fault-trigger/manager/kubelet.go b/tests/pkg/fault-trigger/manager/kubelet.go index 6cdbf112275..4dad0c0c60c 100644 --- a/tests/pkg/fault-trigger/manager/kubelet.go +++ b/tests/pkg/fault-trigger/manager/kubelet.go @@ -19,7 +19,7 @@ import ( "github.com/golang/glog" ) -// StartKubelet starts kubelet +// StartKubelet starts the kubelet service func (m *Manager) StartKubelet() error { shell := "systemctl start kubelet" cmd := exec.Command("/bin/sh", "-c", shell) @@ -31,7 +31,7 @@ func (m *Manager) StartKubelet() error { return nil } -// StopKubelet stops kubelet +// StopKubelet stops the kubelet service func (m *Manager) StopKubelet() error { shell := "systemctl stop kubelet" cmd := exec.Command("/bin/sh", "-c", shell) diff --git a/tests/pkg/fault-trigger/manager/types.go b/tests/pkg/fault-trigger/manager/types.go index fa2fa099ec6..b2b7e7f1a0f 100644 --- a/tests/pkg/fault-trigger/manager/types.go +++ b/tests/pkg/fault-trigger/manager/types.go @@ -13,6 +13,8 @@ package manager +import "errors" + // VM defines the descriptive information of a virtual machine type VM struct { Host string `json:"host"` @@ -21,3 +23,11 @@ type VM struct { IP string `json:"ip"` Role []string `json:"role"` } + +func (v *VM) Verify() error { + if len(v.Name) == 0 && len(v.IP) == 0 { + return errors.New("name or ip must be provided") + } + + return nil +} diff --git a/tests/pkg/util/http.go b/tests/pkg/util/http.go new file mode 100644 index 00000000000..2b97fffc7dc --- /dev/null +++ b/tests/pkg/util/http.go @@ -0,0 +1,15 @@ +package util + +import ( + "fmt" + "strings" +) + +// GenURL adds 'http' prefix for URL +func GenURL(url string) string { + if strings.Contains(url, "http") { + return url + } + + return fmt.Sprintf("http://%s", url) +}