Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stability cases: stop kubelet and etcd #665

Merged
merged 11 commits into from
Jul 25, 2019
80 changes: 75 additions & 5 deletions tests/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"os/exec"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -159,7 +160,8 @@ type OperatorActions interface {
CheckTidbClustersAvailable(infos []*TidbClusterConfig) error
CheckOperatorDownOrDie(infos []*TidbClusterConfig)
CheckTidbClustersAvailableOrDie(infos []*TidbClusterConfig)
CheckOneEtcdDownOrDie(operatorConfig *OperatorConfig, clusters []*TidbClusterConfig, faultNode string)
CheckEtcdDownOrDie(operatorConfig *OperatorConfig, clusters []*TidbClusterConfig, faultNode string)
CheckKubeletDownOrDie(operatorConfig *OperatorConfig, clusters []*TidbClusterConfig, faultNode string)
CheckOneApiserverDownOrDie(operatorConfig *OperatorConfig, clusters []*TidbClusterConfig, faultNode string)
CheckKubeProxyDownOrDie(operatorConfig *OperatorConfig, clusters []*TidbClusterConfig)
CheckKubeSchedulerDownOrDie(operatorConfig *OperatorConfig, clusters []*TidbClusterConfig)
Expand Down Expand Up @@ -296,7 +298,6 @@ func (tc *TidbClusterConfig) TidbClusterHelmSetString(m map[string]string) strin
"tikv.storageClassName": tc.StorageClassName,
"tidb.storageClassName": tc.StorageClassName,
"tidb.password": tc.Password,
"pd.maxStoreDownTime": "5m",
"pd.image": tc.PDImage,
"tikv.image": tc.TiKVImage,
"tidb.image": tc.TiDBImage,
Expand Down Expand Up @@ -523,6 +524,43 @@ func (oa *operatorActions) DeployTidbClusterOrDie(info *TidbClusterConfig) {
func (oa *operatorActions) CleanTidbCluster(info *TidbClusterConfig) error {
glog.Infof("cleaning tidbcluster %s/%s", info.Namespace, info.ClusterName)
oa.EmitEvent(info, "CleanTidbCluster")
ns := info.Namespace
tcName := info.ClusterName

selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: map[string]string{
label.InstanceLabelKey: tcName,
},
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: label.ComponentLabelKey,
Operator: metav1.LabelSelectorOpIn,
Values: []string{label.PDLabelVal, label.TiKVLabelVal},
},
},
})
if err != nil {
return err
}
pvcList, err := oa.kubeCli.CoreV1().PersistentVolumeClaims(ns).List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return err
}
var beforePVCNames []string
for _, pvc := range pvcList.Items {
beforePVCNames = append(beforePVCNames, pvc.GetName())
}
glog.V(4).Info(beforePVCNames)

pvList, err := oa.kubeCli.CoreV1().PersistentVolumes().List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return err
}
var beforePVNames []string
for _, pv := range pvList.Items {
beforePVNames = append(beforePVNames, pv.GetName())
}
glog.V(4).Info(beforePVNames)

charts := []string{
info.ClusterName,
Expand All @@ -538,7 +576,38 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterConfig) error {
}
}

err := oa.kubeCli.CoreV1().Pods(info.Namespace).Delete(getBackupDirPodName, &metav1.DeleteOptions{})
time.Sleep(time.Minute)

pvcList, err = oa.kubeCli.CoreV1().PersistentVolumeClaims(ns).List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return err
}
var afterPVCNames []string
for _, pvc := range pvcList.Items {
afterPVCNames = append(afterPVCNames, pvc.GetName())
}
glog.V(4).Info(afterPVCNames)

pvList, err = oa.kubeCli.CoreV1().PersistentVolumes().List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return err
}
var afterPVNames []string
for _, pv := range pvList.Items {
afterPVNames = append(afterPVNames, pv.GetName())
}
glog.V(4).Info(afterPVNames)

if !reflect.DeepEqual(beforePVCNames, afterPVCNames) {
return fmt.Errorf("pvc changed when we delete cluster: %s/%s, before: %v, after: %v",
ns, tcName, beforePVCNames, afterPVCNames)
}
if !reflect.DeepEqual(beforePVNames, afterPVNames) {
return fmt.Errorf("pv changed when we delete cluster: %s/%s, before: %v, after: %v",
ns, tcName, beforePVNames, afterPVNames)
}

err = oa.kubeCli.CoreV1().Pods(info.Namespace).Delete(getBackupDirPodName, &metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete dir pod %v", err)
}
Expand Down Expand Up @@ -934,7 +1003,8 @@ func (oa *operatorActions) CheckUpgrade(ctx context.Context, info *TidbClusterCo
}
glog.V(4).Infof("index:%d,schedulers:%v,error:%v", i, schedulers, err)
if len(schedulers) > 1 {
return true, fmt.Errorf("there are too many evict leader schedulers: %v", schedulers)
glog.Errorf("there are too many evict leader schedulers: %v", schedulers)
return false, nil
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be two schedulers at the same time

}
if len(schedulers) == 0 {
return false, nil
Expand Down Expand Up @@ -1098,7 +1168,7 @@ func (oa *operatorActions) tikvMembersReadyFn(tc *v1alpha1.TidbCluster) (bool, e
}
if len(tc.Status.TiKV.Stores) != int(replicas) {
glog.Infof("tidbcluster: %s/%s .status.TiKV.Stores.count(%d) != %d",
ns, tcName, len(tc.Status.TiKV.Stores), tc.Spec.TiKV.Replicas)
ns, tcName, len(tc.Status.TiKV.Stores), replicas)
return false, nil
}
if tikvSet.Status.ReadyReplicas != tikvSet.Status.Replicas {
Expand Down
16 changes: 14 additions & 2 deletions tests/cmd/stability/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,26 @@ func run() {
// truncate tikv sst file
oa.TruncateSSTFileThenCheckFailoverOrDie(clusters[0], 5*time.Minute)

// stop etcd
// stop one etcd
faultEtcd := tests.SelectNode(cfg.ETCDs)
fta.StopETCDOrDie(faultEtcd)
defer fta.StartETCDOrDie(faultEtcd)
time.Sleep(3 * time.Minute)
oa.CheckOneEtcdDownOrDie(ocfg, deployedClusters, faultEtcd)
oa.CheckEtcdDownOrDie(ocfg, deployedClusters, faultEtcd)
fta.StartETCDOrDie(faultEtcd)

// stop all etcds
fta.StopETCDOrDie()
time.Sleep(10 * time.Minute)
fta.StartETCDOrDie()
oa.CheckEtcdDownOrDie(ocfg, deployedClusters, "")

// stop all kubelets
fta.StopKubeletOrDie()
time.Sleep(10 * time.Minute)
fta.StartKubeletOrDie()
oa.CheckKubeletDownOrDie(ocfg, deployedClusters, "")

// stop all kube-proxy and k8s/operator/tidbcluster is available
fta.StopKubeProxyOrDie()
oa.CheckKubeProxyDownOrDie(ocfg, clusters)
Expand Down
52 changes: 44 additions & 8 deletions tests/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (oa *operatorActions) CheckFailoverPending(info *TidbClusterConfig, node st
if _, exist := affectedPods[failureStore.PodName]; exist {
err := fmt.Errorf("cluster: [%s] the tikv store[%s] should be mark failure after %s", info.FullName(), failureStore.PodName, deadline.Format(time.RFC3339))
glog.Errorf(err.Error())
return false, err
return false, nil
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may have been a failover before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a comment in the code?

}
}

Expand Down Expand Up @@ -276,7 +276,7 @@ func (oa *operatorActions) getPodsByNode(info *TidbClusterConfig, node string) (
}

func (oa *operatorActions) CheckFailoverOrDie(clusters []*TidbClusterConfig, faultNode string) {
if err := wait.Poll(1*time.Minute, 30*time.Minute, func() (bool, error) {
if err := wait.Poll(1*time.Minute, 60*time.Minute, func() (bool, error) {
var passes []bool
for i := range clusters {
pass, err := oa.CheckFailover(clusters[i], faultNode)
Expand Down Expand Up @@ -409,7 +409,7 @@ func (oa *operatorActions) tidbFailover(pod *corev1.Pod, tc *v1alpha1.TidbCluste
failure := false
for _, failureMember := range tc.Status.TiDB.FailureMembers {
if failureMember.PodName == pod.GetName() {
glog.Infof("tidbCluster:[%s/%s]'s store pod:[%s] have not become failuremember", tc.Namespace, tc.Name, pod.Name)
glog.Infof("tidbCluster:[%s/%s]'s store pod:[%s] have become failuremember", tc.Namespace, tc.Name, pod.Name)
failure = true
break
}
Expand Down Expand Up @@ -472,8 +472,33 @@ func (oa *operatorActions) GetNodeMap(info *TidbClusterConfig, component string)
return nodeMap, nil
}

func (oa *operatorActions) CheckOneEtcdDownOrDie(operatorConfig *OperatorConfig, clusters []*TidbClusterConfig, faultNode string) {
glog.Infof("check k8s/operator/tidbCluster status when one etcd down")
// CheckKubeletDownOrDie waits ten minutes and then repeatedly verifies, via
// KeepOrDie (3s interval, 10m duration arguments), that the k8s cluster, the
// tidb operator and all given tidb clusters are available.
//
// operatorConfig is forwarded to CheckOperatorAvailable and clusters to
// CheckTidbClustersAvailable. faultNode is not used by this function.
func (oa *operatorActions) CheckKubeletDownOrDie(operatorConfig *OperatorConfig, clusters []*TidbClusterConfig, faultNode string) {
	glog.Infof("check k8s/operator/tidbCluster status when kubelet down")
	time.Sleep(10 * time.Minute)

	// verify runs the three availability checks in order and stops at the
	// first one that fails.
	verify := func() error {
		if k8sErr := oa.CheckK8sAvailable(nil, nil); k8sErr != nil {
			return k8sErr
		}
		glog.V(4).Infof("k8s cluster is available.")

		if operatorErr := oa.CheckOperatorAvailable(operatorConfig); operatorErr != nil {
			return operatorErr
		}
		glog.V(4).Infof("tidb operator is available.")

		if clustersErr := oa.CheckTidbClustersAvailable(clusters); clustersErr != nil {
			return clustersErr
		}
		glog.V(4).Infof("all clusters are available")

		return nil
	}

	KeepOrDie(3*time.Second, 10*time.Minute, verify)
}

func (oa *operatorActions) CheckEtcdDownOrDie(operatorConfig *OperatorConfig, clusters []*TidbClusterConfig, faultNode string) {
glog.Infof("check k8s/operator/tidbCluster status when etcd down")
// kube-apiserver may block 15 min
time.Sleep(20 * time.Minute)
KeepOrDie(3*time.Second, 10*time.Minute, func() error {
err := oa.CheckK8sAvailable(nil, nil)
if err != nil {
Expand Down Expand Up @@ -687,22 +712,33 @@ func (oa *operatorActions) CheckK8sAvailable(excludeNodes map[string]string, exc
}

func (oa *operatorActions) CheckOperatorAvailable(operatorConfig *OperatorConfig) error {
return wait.Poll(3*time.Second, 3*time.Minute, func() (bool, error) {
var errCount int
var e error
return wait.Poll(10*time.Second, 3*time.Minute, func() (bool, error) {
if errCount >= 10 {
return true, e
}
controllerDeployment, err := oa.kubeCli.AppsV1().Deployments(operatorConfig.Namespace).Get(tidbControllerName, metav1.GetOptions{})
if err != nil {
glog.Errorf("failed to get deployment:%s failed,error:%v", tidbControllerName, err)
return false, nil
}
if controllerDeployment.Status.AvailableReplicas != *controllerDeployment.Spec.Replicas {
return false, fmt.Errorf("the %s is not available", tidbControllerName)
e = fmt.Errorf("the %s is not available", tidbControllerName)
glog.Error(e)
errCount++
return false, nil
}
schedulerDeployment, err := oa.kubeCli.AppsV1().Deployments(operatorConfig.Namespace).Get(tidbSchedulerName, metav1.GetOptions{})
if err != nil {
glog.Errorf("failed to get deployment:%s failed,error:%v", tidbSchedulerName, err)
return false, nil
}
if schedulerDeployment.Status.AvailableReplicas != *schedulerDeployment.Spec.Replicas {
return false, fmt.Errorf("the %s is not available", tidbSchedulerName)
e = fmt.Errorf("the %s is not available", tidbSchedulerName)
glog.Error(e)
errCount++
return false, nil
}
return true, nil
})
Expand Down
77 changes: 62 additions & 15 deletions tests/fault.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math/rand"
"os"
"sync"
"time"

"github.com/pingcap/tidb-operator/tests/slack"
Expand Down Expand Up @@ -37,8 +38,10 @@ type FaultTriggerActions interface {
StopETCDOrDie(nodes ...string)
StartETCD(nodes ...string) error
StartETCDOrDie(nodes ...string)
StopKubelet(node string) error
StartKubelet(node string) error
StopKubelet(nodes ...string) error
StopKubeletOrDie(nodes ...string)
StartKubelet(nodes ...string) error
StartKubeletOrDie(nodes ...string)
StopKubeAPIServer(node string) error
StopKubeAPIServerOrDie(node string)
StartKubeAPIServer(node string) error
Expand Down Expand Up @@ -333,43 +336,87 @@ func (fa *faultTriggerActions) StopETCD(nodes ...string) error {
}

// StopETCDOrDie logs the requested nodes, calls StopETCD with them, and hands
// any returned error to slack.NotifyAndPanic.
func (fa *faultTriggerActions) StopETCDOrDie(nodes ...string) {
	glog.Infof("stopping %v etcds", nodes)

	stopErr := fa.StopETCD(nodes...)
	if stopErr == nil {
		return
	}
	slack.NotifyAndPanic(stopErr)
}

// StartETCD starts the etcd service.
// If the `nodes` is empty, StartETCD will start all etcd service.
func (fa *faultTriggerActions) StartETCD(nodes ...string) error {
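// StopKubelet stops the kubelet service on the given nodes.
// If `nodes` is empty, StopKubelet stops the kubelet on every node listed in fa.cfg.Nodes.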
func (fa *faultTriggerActions) StopKubelet(nodes ...string) error {
if len(nodes) == 0 {
for _, ns := range fa.cfg.ETCDs {
for _, ns := range fa.cfg.Nodes {
nodes = append(nodes, ns.Nodes...)
}
}

for _, node := range nodes {
if err := fa.serviceAction(node, manager.ETCDService, startAction); err != nil {
if err := fa.serviceAction(node, manager.KubeletService, stopAction); err != nil {
return err
}
}

return nil
}

func (fa *faultTriggerActions) StartETCDOrDie(nodes ...string) {
if err := fa.StartETCD(nodes...); err != nil {
// StopKubeletOrDie logs the requested nodes, calls StopKubelet with them, and
// hands any returned error to slack.NotifyAndPanic.
func (fa *faultTriggerActions) StopKubeletOrDie(nodes ...string) {
	glog.Infof("stopping %v kubelets", nodes)

	stopErr := fa.StopKubelet(nodes...)
	if stopErr == nil {
		return
	}
	slack.NotifyAndPanic(stopErr)
}

// StopKubelet stops the kubelet service.
func (fa *faultTriggerActions) StopKubelet(node string) error {
return fa.serviceAction(node, manager.KubeletService, stopAction)
// StartKubelet starts the kubelet service on each of the given nodes, one
// after another, and returns the first error reported by serviceAction.
// If `nodes` is empty, the node names are gathered from every entry of
// fa.cfg.Nodes instead.
func (fa *faultTriggerActions) StartKubelet(nodes ...string) error {
	targets := nodes
	if len(targets) == 0 {
		for _, group := range fa.cfg.Nodes {
			targets = append(targets, group.Nodes...)
		}
	}

	for _, target := range targets {
		startErr := fa.serviceAction(target, manager.KubeletService, startAction)
		if startErr != nil {
			// Stop at the first failure; remaining nodes are not attempted.
			return startErr
		}
	}

	return nil
}

// StartKubelet starts the kubelet service.
func (fa *faultTriggerActions) StartKubelet(node string) error {
return fa.serviceAction(node, manager.KubeletService, startAction)
// StartKubeletOrDie calls StartKubelet with the given nodes and hands any
// returned error to slack.NotifyAndPanic.
func (fa *faultTriggerActions) StartKubeletOrDie(nodes ...string) {
	startErr := fa.StartKubelet(nodes...)
	if startErr == nil {
		return
	}
	slack.NotifyAndPanic(startErr)
}

// StartETCD starts the etcd service.
// If the `nodes` is empty, StartETCD will start all etcd service.
//
// The start action is issued for every node concurrently and StartETCD waits
// for all of them to finish. Failures are collected and returned to the
// caller instead of panicking inside a worker goroutine: a panic there cannot
// be recovered by the caller and would make the error return value
// meaningless. Callers that want to abort on failure should use
// StartETCDOrDie.
//
// Returns nil when every node started, the single wrapped error when exactly
// one node failed, or an error listing all failures otherwise.
func (fa *faultTriggerActions) StartETCD(nodes ...string) error {
	if len(nodes) == 0 {
		for _, ns := range fa.cfg.ETCDs {
			nodes = append(nodes, ns.Nodes...)
		}
	}

	var (
		wg   sync.WaitGroup
		mu   sync.Mutex // guards errs, which is appended to from the workers
		errs []error
	)
	for _, node := range nodes {
		wg.Add(1)
		// node is passed as an argument so each goroutine gets its own copy.
		go func(n string) {
			defer wg.Done()
			if err := fa.serviceAction(n, manager.ETCDService, startAction); err != nil {
				mu.Lock()
				errs = append(errs, fmt.Errorf("failed to start %s etcd, %v", n, err))
				mu.Unlock()
			}
		}(node)
	}
	wg.Wait()

	switch len(errs) {
	case 0:
		return nil
	case 1:
		return errs[0]
	default:
		return fmt.Errorf("failed to start %d etcds: %v", len(errs), errs)
	}
}

// StartETCDOrDie calls StartETCD with the given nodes and hands any returned
// error to slack.NotifyAndPanic.
func (fa *faultTriggerActions) StartETCDOrDie(nodes ...string) {
	startErr := fa.StartETCD(nodes...)
	if startErr == nil {
		return
	}
	slack.NotifyAndPanic(startErr)
}

// StopKubeScheduler stops the kube-scheduler service.
Expand Down