diff --git a/tests/actions.go b/tests/actions.go index 9863c9bd41c..1d0163fc9e6 100644 --- a/tests/actions.go +++ b/tests/actions.go @@ -17,7 +17,6 @@ import ( "database/sql" "encoding/json" "fmt" - "github.com/pingcap/tidb-operator/tests/pkg/metrics" "io/ioutil" "net/http" "net/url" @@ -32,7 +31,6 @@ import ( "github.com/golang/glog" pingcapErrors "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb-operator/tests/pkg/webhook" admissionV1beta1 "k8s.io/api/admissionregistration/v1beta1" "k8s.io/api/apps/v1beta1" batchv1 "k8s.io/api/batch/v1" @@ -49,7 +47,9 @@ import ( "github.com/pingcap/tidb-operator/pkg/controller" "github.com/pingcap/tidb-operator/pkg/label" "github.com/pingcap/tidb-operator/tests/pkg/blockwriter" + "github.com/pingcap/tidb-operator/tests/pkg/metrics" "github.com/pingcap/tidb-operator/tests/pkg/util" + "github.com/pingcap/tidb-operator/tests/pkg/webhook" ) const ( @@ -443,7 +443,7 @@ func (oa *operatorActions) CheckTidbClusterStatus(info *TidbClusterConfig) error ns := info.Namespace tcName := info.ClusterName - if err := wait.Poll(oa.pollInterval, DefaultPollTimeout, func() (bool, error) { + if err := wait.Poll(oa.pollInterval, 30*time.Minute, func() (bool, error) { var tc *v1alpha1.TidbCluster var err error if tc, err = oa.cli.PingcapV1alpha1().TidbClusters(ns).Get(tcName, metav1.GetOptions{}); err != nil { @@ -1524,7 +1524,7 @@ func (oa *operatorActions) CheckRestore(from *TidbClusterConfig, to *TidbCluster return true, nil } - err := wait.Poll(oa.pollInterval, DefaultPollTimeout, fn) + err := wait.Poll(oa.pollInterval, 30*time.Minute, fn) if err != nil { return fmt.Errorf("failed to launch scheduler backup job: %v", err) } diff --git a/tests/backup/backupcase.go b/tests/backup/backupcase.go index 4c39178f6e5..649839e251f 100644 --- a/tests/backup/backupcase.go +++ b/tests/backup/backupcase.go @@ -19,6 +19,7 @@ import ( "github.com/golang/glog" "github.com/pingcap/tidb-operator/tests" + "k8s.io/apimachinery/pkg/util/wait" ) type BackupCase struct { @@ -110,21 +111,7 @@ func (bc *BackupCase) Run() error { glog.Infof("cluster[%s] stop insert data", bc.srcCluster.ClusterName) bc.operator.StopInsertDataTo(bc.srcCluster) - time.Sleep(5 * time.Second) - - srcCount, err := bc.srcCluster.QueryCount() - if err != nil { - return err - } - desCount, err := bc.desCluster.QueryCount() - if err != nil { - return err - } - if srcCount != desCount { - return fmt.Errorf("cluster:[%s] the src cluster data[%d] is not equals des cluster data[%d]", bc.srcCluster.FullName(), srcCount, desCount) - } - - return nil + return bc.EnsureBackupDataIsCorrect() } func (bc *BackupCase) RunOrDie() { @@ -132,3 +119,28 @@ func (bc *BackupCase) RunOrDie() { panic(err) } } + +func (bc *BackupCase) EnsureBackupDataIsCorrect() error { + fn := func() (bool, error) { + srcCount, err := bc.srcCluster.QueryCount() + if err != nil { + glog.Infof("failed to query count from src cluster: %s/%s", + bc.srcCluster.Namespace, bc.srcCluster.ClusterName) + return false, nil + } + desCount, err := bc.desCluster.QueryCount() + if err != nil { + glog.Infof("failed to query count from dest cluster: %s/%s", + bc.desCluster.Namespace, bc.desCluster.ClusterName) + return false, nil + } + + if srcCount != desCount { + return false, fmt.Errorf("cluster:[%s] the src cluster data[%d] is not equals des cluster data[%d]", bc.srcCluster.FullName(), srcCount, desCount) + } + + return true, nil + } + + return wait.Poll(tests.DefaultPollInterval, tests.DefaultPollTimeout, fn) +} diff --git a/tests/cmd/stability/main.go b/tests/cmd/stability/main.go index 6e196634331..ab43aa1d826 100644 --- a/tests/cmd/stability/main.go +++ b/tests/cmd/stability/main.go @@ -32,12 +32,12 @@ func main() { logs.InitLogs() defer logs.FlushLogs() go func() { - glog.Info(http.ListenAndServe("localhost:6060", nil)) + glog.Info(http.ListenAndServe(":6060", nil)) }() conf := tests.ParseConfigOrDie() cli, kubeCli := client.NewCliOrDie() - oa := tests.NewOperatorActions(cli, kubeCli, tests.DefaultPollTimeout, conf) + oa := tests.NewOperatorActions(cli, kubeCli, tests.DefaultPollInterval, conf) fta := tests.NewFaultTriggerAction(cli, kubeCli, conf) fta.CheckAndRecoverEnvOrDie() diff --git a/tests/failover.go b/tests/failover.go index 5939830605d..8915d18519a 100644 --- a/tests/failover.go +++ b/tests/failover.go @@ -13,7 +13,6 @@ import ( "github.com/pingcap/tidb-operator/pkg/label" "github.com/pingcap/tidb-operator/tests/pkg/client" "github.com/pingcap/tidb-operator/tests/pkg/ops" - "github.com/pingcap/tidb-operator/tests/pkg/util" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -52,7 +51,7 @@ func (oa *operatorActions) TruncateSSTFileThenCheckFailover(info *TidbClusterCon return err } maxStoreDownTime := pdCfg.Schedule.MaxStoreDownTime.Duration - glog.Infof("failover config: maxStoreDownTime=%v tikvFailoverPeriod=%v", maxStoreDownTime, tikvFailoverPeriod) + glog.Infof("truncate sst file failover config: maxStoreDownTime=%v tikvFailoverPeriod=%v", maxStoreDownTime, tikvFailoverPeriod) // find an up store var store v1alpha1.TiKVStore @@ -67,53 +66,16 @@ func (oa *operatorActions) TruncateSSTFileThenCheckFailover(info *TidbClusterCon glog.Errorf("failed to find an up store") return errors.New("no up store for truncating sst file") } else { - glog.Infof("target store: id=%s pod=%s", store.ID, store.PodName) + glog.Infof("truncate sst file target store: id=%s pod=%s", store.ID, store.PodName) } - // checkout pod status - podBeforeRestart, err := cli.CoreV1().Pods(info.Namespace).Get(store.PodName, metav1.GetOptions{}) + glog.Infof("deleting pod: [%s/%s] and wait 1 minute for the pod to terminate", info.Namespace, store.PodName) + err = cli.CoreV1().Pods(info.Namespace).Delete(store.PodName, nil) if err != nil { - glog.Errorf("failed to get target pod: pod=%s err=%s", store.PodName, err.Error()) return err } - var rc int32 - if c := util.GetContainerStatusFromPod(podBeforeRestart, func(status corev1.ContainerStatus) bool { - return status.Name == "tikv" - }); c != nil { - rc = c.RestartCount - } else { - glog.Errorf("failed to get container status from tikv pod") - return errors.New("failed to get container status from tikv pod") - } - - oa.emitEvent(info, fmt.Sprintf("TruncateSSTFile: tikv: %s", store.PodName)) - // restart tikv to ensure sst files - err = tikvOps.KillProcess(info.Namespace, store.PodName, "tikv", "tikv-server") - if err != nil { - glog.Errorf("kill tikv: pod=%s err=%s", store.PodName, err.Error()) - return err - } - - err = tikvOps.PollPod(info.Namespace, store.PodName, - func(pod *corev1.Pod, err error) (bool, error) { - if pod == nil { - glog.Warningf("pod is nil: err=%s", err.Error()) - return false, nil - } - tikv := util.GetContainerStatusFromPod(pod, func(status corev1.ContainerStatus) bool { - return status.Name == "tikv" - }) - - if pod.Status.Phase == corev1.PodRunning && tikv != nil && tikv.RestartCount > rc { - return true, nil - } - return false, nil - }) - if err != nil { - glog.Errorf("tikv process hasn't been restarted: err=%s", err.Error()) - return err - } + time.Sleep(1 * time.Minute) // truncate the sst file and wait for failover err = tikvOps.TruncateSSTFile(ops.TruncateOptions{ @@ -121,15 +83,16 @@ func (oa *operatorActions) TruncateSSTFileThenCheckFailover(info *TidbClusterCon Cluster: info.ClusterName, Store: store.ID, }) + if err != nil { + return err + } + oa.emitEvent(info, fmt.Sprintf("TruncateSSTFile: tikv: %s/%s", info.Namespace, store.PodName)) - // make tikv crash - //err = tikvOps.KillProcess(info.Namespace, store.PodName, "tikv", "tikv-server") - //if err != nil { - // glog.Errorf("cluster: [%s/%s] kill tikv: pod=%s err=%s", - // info.Namespace, info.ClusterName, - // store.PodName, err.Error()) - // return err - //} + glog.Infof("deleting pod: [%s/%s] again", info.Namespace, store.PodName) + err = cli.CoreV1().Pods(info.Namespace).Delete(store.PodName, nil) + if err != nil { + return err + } tikvOps.SetPoll(DefaultPollInterval, maxStoreDownTime+tikvFailoverPeriod+failoverTimeout) diff --git a/tests/pkg/blockwriter/blockwriter.go b/tests/pkg/blockwriter/blockwriter.go index 978c391a23b..265823496c9 100644 --- a/tests/pkg/blockwriter/blockwriter.go +++ b/tests/pkg/blockwriter/blockwriter.go @@ -29,7 +29,7 @@ import ( ) const ( - queryChanSize int = 10000 + queryChanSize int = 100 ) // BlockWriterCase is for concurrent writing blocks. diff --git a/tests/pkg/metrics/annotation_util.go b/tests/pkg/metrics/annotation_util.go index d7ea217beb5..bcd7c59f8e2 100644 --- a/tests/pkg/metrics/annotation_util.go +++ b/tests/pkg/metrics/annotation_util.go @@ -17,8 +17,6 @@ import ( "bytes" "encoding/json" "fmt" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" "io/ioutil" "net" "net/http" @@ -26,6 +24,9 @@ import ( "os" "path" "sync" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" ) //Client request grafana API on a set of resource paths. @@ -133,11 +134,11 @@ func (cli *Client) AddAnnotation(annotation Annotation) error { if resp.StatusCode != http.StatusOK { return fmt.Errorf("add annotation faield, statusCode=%v", resp.Status) } - all, err := ioutil.ReadAll(resp.Body) + _, err = ioutil.ReadAll(resp.Body) if err != nil { return err } - fmt.Println(all) + // fmt.Println(all) return nil }