From 863aad25e4ba640a9d6a92e41be122fc5e156b83 Mon Sep 17 00:00:00 2001 From: weekface Date: Thu, 13 Jun 2019 17:43:28 +0800 Subject: [PATCH] add operator upgrade case --- tests/actions.go | 22 ++ tests/cmd/stability/main.go | 433 ++++++++++++------------------- tests/cmd/stability/stability.go | 74 ++++++ tests/config.go | 24 +- tests/failover.go | 28 +- tests/pkg/ops/tikv.go | 29 +++ 6 files changed, 335 insertions(+), 275 deletions(-) create mode 100644 tests/cmd/stability/stability.go diff --git a/tests/actions.go b/tests/actions.go index 5e92ca8ffe5..27c35ea48a7 100644 --- a/tests/actions.go +++ b/tests/actions.go @@ -113,6 +113,7 @@ type OperatorActions interface { CleanOperator(info *OperatorConfig) error CleanOperatorOrDie(info *OperatorConfig) UpgradeOperator(info *OperatorConfig) error + UpgradeOperatorOrDie(info *OperatorConfig) DumpAllLogs(info *OperatorConfig, clusterInfos []*TidbClusterConfig) error DeployTidbCluster(info *TidbClusterConfig) error DeployTidbClusterOrDie(info *TidbClusterConfig) @@ -160,6 +161,7 @@ type OperatorActions interface { RegisterWebHookAndService(context *apimachinery.CertContext, info *OperatorConfig) error RegisterWebHookAndServiceOrDie(context *apimachinery.CertContext, info *OperatorConfig) CleanWebHookAndService(info *OperatorConfig) error + CleanWebHookAndServiceOrDie(info *OperatorConfig) EventWorker() EmitEvent(info *TidbClusterConfig, msg string) BackupRestore(from, to *TidbClusterConfig) error @@ -463,7 +465,20 @@ func (oa *operatorActions) UpgradeOperator(info *OperatorConfig) error { return nil } +func (oa *operatorActions) UpgradeOperatorOrDie(info *OperatorConfig) { + if err := oa.UpgradeOperator(info); err != nil { + slack.NotifyAndPanic(err) + } +} + func (oa *operatorActions) DeployTidbCluster(info *TidbClusterConfig) error { + ns := info.Namespace + tcName := info.ClusterName + if _, err := oa.cli.PingcapV1alpha1().TidbClusters(ns).Get(tcName, metav1.GetOptions{}); err == nil { + // already deployed + return nil + } + glog.Infof("deploying tidb cluster [%s/%s]", info.Namespace, info.ClusterName) oa.EmitEvent(info, "DeployTidbCluster") @@ -2230,6 +2245,13 @@ func (oa *operatorActions) CleanWebHookAndService(info *OperatorConfig) error { return nil } +func (oa *operatorActions) CleanWebHookAndServiceOrDie(info *OperatorConfig) { + err := oa.CleanWebHookAndService(info) + if err != nil { + slack.NotifyAndPanic(err) + } +} + type pumpStatus struct { StatusMap map[string]*nodeStatus } diff --git a/tests/cmd/stability/main.go b/tests/cmd/stability/main.go index 5e1be7042a0..c1483794d0e 100644 --- a/tests/cmd/stability/main.go +++ b/tests/cmd/stability/main.go @@ -21,13 +21,9 @@ import ( "strconv" "time" - "github.com/pingcap/tidb-operator/tests/pkg/apimachinery" - - "k8s.io/api/core/v1" - "github.com/golang/glog" - "github.com/jinzhu/copier" "github.com/pingcap/tidb-operator/tests" + "github.com/pingcap/tidb-operator/tests/pkg/apimachinery" "github.com/pingcap/tidb-operator/tests/pkg/client" "github.com/pingcap/tidb-operator/tests/slack" "github.com/robfig/cron" @@ -38,6 +34,7 @@ import ( var successCount int var cfg *tests.Config var context *apimachinery.CertContext +var upgradeVersions []string func main() { logs.InitLogs() @@ -46,6 +43,7 @@ func main() { glog.Info(http.ListenAndServe(":6060", nil)) }() cfg = tests.ParseConfigOrDie() + upgradeVersions = cfg.GetUpgradeTidbVersionsOrDie() ns := os.Getenv("NAMESPACE") var err error @@ -67,281 +65,188 @@ func main() { func run() { cli, kubeCli := client.NewCliOrDie() - tidbVersion := 
cfg.GetTiDBVersionOrDie() - upgardeTiDBVersions := cfg.GetUpgradeTidbVersionsOrDie() - - operatorCfg := &tests.OperatorConfig{ - Namespace: "pingcap", - ReleaseName: "operator", - Image: cfg.OperatorImage, - Tag: cfg.OperatorTag, - SchedulerImage: "gcr.io/google-containers/hyperkube", - SchedulerFeatures: []string{ - "StableScheduling", - }, - LogLevel: "2", - WebhookServiceName: tests.WebhookServiceName, - WebhookSecretName: "webhook-secret", - WebhookConfigName: "webhook-config", - ImagePullPolicy: v1.PullAlways, - } - clusterName1 := "stability-cluster1" - clusterName2 := "stability-cluster2" - cluster1 := &tests.TidbClusterConfig{ - Namespace: clusterName1, - ClusterName: clusterName1, - OperatorTag: cfg.OperatorTag, - PDImage: fmt.Sprintf("pingcap/pd:%s", tidbVersion), - TiKVImage: fmt.Sprintf("pingcap/tikv:%s", tidbVersion), - TiDBImage: fmt.Sprintf("pingcap/tidb:%s", tidbVersion), - StorageClassName: "local-storage", - Password: "admin", - UserName: "root", - InitSecretName: fmt.Sprintf("%s-set-secret", clusterName1), - BackupSecretName: fmt.Sprintf("%s-backup-secret", clusterName1), - BackupName: "backup", - Resources: map[string]string{ - "pd.resources.limits.cpu": "1000m", - "pd.resources.limits.memory": "2Gi", - "pd.resources.requests.cpu": "200m", - "pd.resources.requests.memory": "1Gi", - "tikv.resources.limits.cpu": "8000m", - "tikv.resources.limits.memory": "16Gi", - "tikv.resources.requests.cpu": "1000m", - "tikv.resources.requests.memory": "2Gi", - "tidb.resources.limits.cpu": "8000m", - "tidb.resources.limits.memory": "8Gi", - "tidb.resources.requests.cpu": "500m", - "tidb.resources.requests.memory": "1Gi", - "monitor.persistent": "true", - "discovery.image": cfg.OperatorImage, - "tikv.defaultcfBlockCacheSize": "8GB", - "tikv.writecfBlockCacheSize": "2GB", - }, - Args: map[string]string{ - "binlog.drainer.workerCount": "1024", - "binlog.drainer.txnBatch": "512", - }, - Monitor: true, - BlockWriteConfig: cfg.BlockWriter, - PDMaxReplicas: 3, - TiKVGrpcConcurrency: 4, - TiDBTokenLimit: 1000, - PDLogLevel: "info", - EnableConfigMapRollout: true, - } - cluster1.SubValues = tests.GetAffinityConfigOrDie(cluster1.ClusterName, cluster1.Namespace) - - cluster2 := &tests.TidbClusterConfig{ - Namespace: clusterName2, - ClusterName: clusterName2, - OperatorTag: cfg.OperatorTag, - PDImage: fmt.Sprintf("pingcap/pd:%s", tidbVersion), - TiKVImage: fmt.Sprintf("pingcap/tikv:%s", tidbVersion), - TiDBImage: fmt.Sprintf("pingcap/tidb:%s", tidbVersion), - StorageClassName: "local-storage", - Password: "admin", - UserName: "root", - InitSecretName: fmt.Sprintf("%s-set-secret", clusterName2), - BackupSecretName: fmt.Sprintf("%s-backup-secret", clusterName2), - BackupName: "backup", - Resources: map[string]string{ - "pd.resources.limits.cpu": "1000m", - "pd.resources.limits.memory": "2Gi", - "pd.resources.requests.cpu": "200m", - "pd.resources.requests.memory": "1Gi", - "tikv.resources.limits.cpu": "8000m", - "tikv.resources.limits.memory": "8Gi", - "tikv.resources.requests.cpu": "1000m", - "tikv.resources.requests.memory": "2Gi", - "tidb.resources.limits.cpu": "8000m", - "tidb.resources.limits.memory": "8Gi", - "tidb.resources.requests.cpu": "500m", - "tidb.resources.requests.memory": "1Gi", - // TODO assert the the monitor's pvc exist and clean it when bootstrapping - "monitor.persistent": "true", - "discovery.image": cfg.OperatorImage, - }, - Args: map[string]string{}, - Monitor: true, - BlockWriteConfig: cfg.BlockWriter, - PDMaxReplicas: 3, - TiKVGrpcConcurrency: 4, - TiDBTokenLimit: 1000, - 
PDLogLevel: "info", - EnableConfigMapRollout: false, - } - cluster2.SubValues = tests.GetAffinityConfigOrDie(cluster2.ClusterName, cluster2.Namespace) + ocfg := newOperatorConfig() - // cluster backup and restore - clusterBackupFrom := cluster1 - clusterRestoreTo := &tests.TidbClusterConfig{} - copier.Copy(clusterRestoreTo, clusterBackupFrom) - clusterRestoreTo.ClusterName = "cluster-restore" - clusterRestoreTo.SubValues = tests.GetAffinityConfigOrDie(clusterRestoreTo.ClusterName, clusterRestoreTo.Namespace) + cluster1 := newTidbClusterConfig("ns1", "cluster1") + cluster2 := newTidbClusterConfig("ns2", "cluster2") + cluster3 := newTidbClusterConfig("ns1", "cluster3") - onePDCluster := &tests.TidbClusterConfig{} - copier.Copy(onePDCluster, cluster1) - onePDCluster.ClusterName = "pd-replicas-1" - onePDCluster.Namespace = "pd-replicas-1" - onePDCluster.Resources["pd.replicas"] = "1" + restoreCluster1 := newTidbClusterConfig("ns1", "restore1") + restoreCluster2 := newTidbClusterConfig("ns2", "restore2") - allClusters := []*tests.TidbClusterConfig{cluster1, cluster2, clusterRestoreTo} + onePDCluster1 := newTidbClusterConfig("ns1", "one-pd-cluster-1") + onePDCluster2 := newTidbClusterConfig("ns2", "one-pd-cluster-2") + onePDCluster1.Resources["pd.replicas"] = "1" + onePDCluster2.Resources["pd.replicas"] = "1" + + allClusters := []*tests.TidbClusterConfig{ + cluster1, + cluster2, + cluster3, + restoreCluster1, + restoreCluster2, + onePDCluster1, + onePDCluster2, + } + deployedClusers := make([]*tests.TidbClusterConfig, 0) fta := tests.NewFaultTriggerAction(cli, kubeCli, cfg) + oa := tests.NewOperatorActions(cli, kubeCli, tests.DefaultPollInterval, cfg, allClusters) + oa.LabelNodesOrDie() - fta.CheckAndRecoverEnvOrDie() - oa.CheckK8sAvailableOrDie(nil, nil) go wait.Forever(oa.EventWorker, 10*time.Second) - oa.LabelNodesOrDie() - - // clean and deploy operator - oa.CleanOperatorOrDie(operatorCfg) - oa.DeployOperatorOrDie(operatorCfg) + oa.CleanOperatorOrDie(ocfg) + oa.DeployOperatorOrDie(ocfg) - // clean all clusters for _, cluster := range allClusters { oa.CleanTidbClusterOrDie(cluster) } - oa.CleanTidbClusterOrDie(onePDCluster) - - // deploy and check cluster1, cluster2 - oa.DeployTidbClusterOrDie(cluster1) - oa.DeployTidbClusterOrDie(cluster2) - oa.DeployTidbClusterOrDie(onePDCluster) - oa.CheckTidbClusterStatusOrDie(cluster1) - oa.CheckTidbClusterStatusOrDie(cluster2) - oa.CheckTidbClusterStatusOrDie(onePDCluster) - - oa.CleanTidbClusterOrDie(onePDCluster) - - // check disaster tolerance - oa.CheckDisasterToleranceOrDie(cluster1) - oa.CheckDisasterToleranceOrDie(cluster2) - - go oa.BeginInsertDataToOrDie(cluster1) - go oa.BeginInsertDataToOrDie(cluster2) - defer oa.StopInsertDataTo(cluster1) - defer oa.StopInsertDataTo(cluster2) - - // scale out cluster1 and cluster2 - cluster1.ScaleTiDB(3).ScaleTiKV(5).ScalePD(5) - oa.ScaleTidbClusterOrDie(cluster1) - cluster2.ScaleTiDB(3).ScaleTiKV(5).ScalePD(5) - oa.ScaleTidbClusterOrDie(cluster2) - oa.CheckTidbClusterStatusOrDie(cluster1) - oa.CheckTidbClusterStatusOrDie(cluster2) - - // scale in cluster1 and cluster2 - cluster1.ScaleTiDB(2).ScaleTiKV(3).ScalePD(3) - oa.ScaleTidbClusterOrDie(cluster1) - cluster2.ScaleTiDB(2).ScaleTiKV(3).ScalePD(3) - oa.ScaleTidbClusterOrDie(cluster2) - oa.CheckTidbClusterStatusOrDie(cluster1) - oa.CheckTidbClusterStatusOrDie(cluster2) - - // before upgrade cluster, register webhook first - oa.RegisterWebHookAndServiceOrDie(context, operatorCfg) - - // upgrade cluster1 and cluster2 - firstUpgradeVersion := 
upgardeTiDBVersions[0] - assignedNodes1 := oa.GetTidbMemberAssignedNodesOrDie(cluster1) - assignedNodes2 := oa.GetTidbMemberAssignedNodesOrDie(cluster2) - cluster1.UpgradeAll(firstUpgradeVersion) - cluster2.UpgradeAll(firstUpgradeVersion) - oa.UpgradeTidbClusterOrDie(cluster1) - oa.UpgradeTidbClusterOrDie(cluster2) - - // check pause upgrade feature in cluster2 - oa.CheckManualPauseTiDBOrDie(cluster2) - - oa.CheckTidbClusterStatusOrDie(cluster1) - oa.CheckTidbClusterStatusOrDie(cluster2) - - oa.CheckTidbMemberAssignedNodesOrDie(cluster1, assignedNodes1) - oa.CheckTidbMemberAssignedNodesOrDie(cluster2, assignedNodes2) - - // after upgrade cluster, clean webhook - oa.CleanWebHookAndService(operatorCfg) - - // cluster1: bad configuration change case - cluster1.TiDBPreStartScript = strconv.Quote("exit 1") - oa.UpgradeTidbClusterOrDie(cluster1) - cluster1.TiKVPreStartScript = strconv.Quote("exit 1") - oa.UpgradeTidbClusterOrDie(cluster1) - cluster1.PDPreStartScript = strconv.Quote("exit 1") - oa.UpgradeTidbClusterOrDie(cluster1) - - time.Sleep(30 * time.Second) - oa.CheckTidbClustersAvailableOrDie([]*tests.TidbClusterConfig{cluster1}) - - // rollback cluster1 - cluster1.PDPreStartScript = strconv.Quote("") - cluster1.TiKVPreStartScript = strconv.Quote("") - cluster1.TiDBPreStartScript = strconv.Quote("") - oa.UpgradeTidbClusterOrDie(cluster1) - oa.CheckTidbClusterStatusOrDie(cluster1) - - // cluster2: enable and normal configuration change case - cluster2.EnableConfigMapRollout = true - oa.UpgradeTidbClusterOrDie(cluster2) - oa.CheckTidbClusterStatusOrDie(cluster2) - cluster2.UpdatePdMaxReplicas(cfg.PDMaxReplicas). - UpdateTiKVGrpcConcurrency(cfg.TiKVGrpcConcurrency). - UpdateTiDBTokenLimit(cfg.TiDBTokenLimit) - oa.UpgradeTidbClusterOrDie(cluster2) - oa.CheckTidbClusterStatusOrDie(cluster2) - - // after upgrade cluster, clean webhook - oa.CleanWebHookAndService(operatorCfg) - - // check data regions disaster tolerance - oa.CheckDataRegionDisasterToleranceOrDie(cluster1) - oa.CheckDataRegionDisasterToleranceOrDie(cluster2) - - // deploy and check cluster restore - oa.DeployTidbClusterOrDie(clusterRestoreTo) - oa.CheckTidbClusterStatusOrDie(clusterRestoreTo) - - // backup and restore - oa.BackupRestoreOrDie(clusterBackupFrom, clusterRestoreTo) - - oa.CleanOperatorOrDie(operatorCfg) - oa.CheckOperatorDownOrDie(allClusters) - oa.DeployOperatorOrDie(operatorCfg) - - // stop a node and failover automatically - physicalNode, node, faultTime := fta.StopNodeOrDie() - oa.EmitEvent(nil, fmt.Sprintf("StopNode: %s on %s", node, physicalNode)) - oa.CheckFailoverPendingOrDie(allClusters, node, &faultTime) - oa.CheckFailoverOrDie(allClusters, node) - time.Sleep(3 * time.Minute) - fta.StartNodeOrDie(physicalNode, node) - oa.EmitEvent(nil, fmt.Sprintf("StartNode: %s on %s", node, physicalNode)) - oa.CheckRecoverOrDie(allClusters) - for _, cluster := range allClusters { - oa.CheckTidbClusterStatusOrDie(cluster) - } - // truncate a sst file and check failover - oa.TruncateSSTFileThenCheckFailoverOrDie(cluster1, 5*time.Minute) - - // stop one etcd node and k8s/operator/tidbcluster is available - faultEtcd := tests.SelectNode(cfg.ETCDs) - fta.StopETCDOrDie(faultEtcd) - defer fta.StartETCDOrDie(faultEtcd) - // TODO make the pause interval as a argument - time.Sleep(3 * time.Minute) - oa.CheckOneEtcdDownOrDie(operatorCfg, allClusters, faultEtcd) - fta.StartETCDOrDie(faultEtcd) + caseFn := func(clusters []*tests.TidbClusterConfig, restoreCluster *tests.TidbClusterConfig, upgradeVersion string) { + // check env + 
fta.CheckAndRecoverEnvOrDie() + oa.CheckK8sAvailableOrDie(nil, nil) + + // deploy + for _, cluster := range clusters { + oa.DeployTidbClusterOrDie(cluster) + deployedClusers = append(deployedClusers, cluster) + } + for _, cluster := range clusters { + oa.CheckTidbClusterStatusOrDie(cluster) + oa.CheckDisasterToleranceOrDie(cluster) + go oa.BeginInsertDataToOrDie(cluster) + } + + // scale out + for _, cluster := range clusters { + cluster.ScaleTiDB(3).ScaleTiKV(5).ScalePD(5) + oa.ScaleTidbClusterOrDie(cluster) + } + for _, cluster := range clusters { + oa.CheckTidbClusterStatusOrDie(cluster) + oa.CheckDisasterToleranceOrDie(cluster) + } + + // scale in + for _, cluster := range clusters { + cluster.ScaleTiDB(2).ScaleTiKV(3).ScalePD(3) + oa.ScaleTidbClusterOrDie(cluster) + } + for _, cluster := range clusters { + oa.CheckTidbClusterStatusOrDie(cluster) + oa.CheckDisasterToleranceOrDie(cluster) + } + + // upgrade + oa.RegisterWebHookAndServiceOrDie(context, ocfg) + for idx, cluster := range clusters { + assignedNodes := oa.GetTidbMemberAssignedNodesOrDie(cluster) + cluster.UpgradeAll(upgradeVersion) + oa.UpgradeTidbClusterOrDie(cluster) + if idx == 0 { + oa.CheckManualPauseTiDBOrDie(cluster) + } + oa.CheckTidbClusterStatusOrDie(cluster) + oa.CheckTidbMemberAssignedNodesOrDie(cluster, assignedNodes) + } + + // configuration change + for _, cluster := range clusters { + cluster.EnableConfigMapRollout = true + + // bad conf + cluster.TiDBPreStartScript = strconv.Quote("exit 1") + cluster.TiKVPreStartScript = strconv.Quote("exit 1") + cluster.PDPreStartScript = strconv.Quote("exit 1") + oa.UpgradeTidbClusterOrDie(cluster) + time.Sleep(30 * time.Second) + oa.CheckTidbClustersAvailableOrDie([]*tests.TidbClusterConfig{cluster}) + // rollback conf + cluster.PDPreStartScript = strconv.Quote("") + cluster.TiKVPreStartScript = strconv.Quote("") + cluster.TiDBPreStartScript = strconv.Quote("") + oa.UpgradeTidbClusterOrDie(cluster) + oa.CheckTidbClusterStatusOrDie(cluster) + + cluster.UpdatePdMaxReplicas(cfg.PDMaxReplicas). + UpdateTiKVGrpcConcurrency(cfg.TiKVGrpcConcurrency). 
+ UpdateTiDBTokenLimit(cfg.TiDBTokenLimit) + oa.UpgradeTidbClusterOrDie(cluster) + oa.CheckTidbClusterStatusOrDie(cluster) + } + oa.CleanWebHookAndServiceOrDie(ocfg) + + for _, cluster := range clusters { + oa.CheckDataRegionDisasterToleranceOrDie(cluster) + } + + // backup and restore + oa.DeployTidbClusterOrDie(restoreCluster) + deployedClusers = append(deployedClusers, restoreCluster) + oa.CheckTidbClusterStatusOrDie(restoreCluster) + oa.BackupRestoreOrDie(clusters[0], restoreCluster) + + // delete operator + oa.CleanOperatorOrDie(ocfg) + oa.CheckOperatorDownOrDie(deployedClusers) + oa.DeployOperatorOrDie(ocfg) + + // stop node + physicalNode, node, faultTime := fta.StopNodeOrDie() + oa.EmitEvent(nil, fmt.Sprintf("StopNode: %s on %s", node, physicalNode)) + oa.CheckFailoverPendingOrDie(deployedClusers, node, &faultTime) + oa.CheckFailoverOrDie(deployedClusers, node) + time.Sleep(3 * time.Minute) + fta.StartNodeOrDie(physicalNode, node) + oa.EmitEvent(nil, fmt.Sprintf("StartNode: %s on %s", node, physicalNode)) + oa.CheckRecoverOrDie(deployedClusers) + for _, cluster := range deployedClusers { + oa.CheckTidbClusterStatusOrDie(cluster) + } + + // truncate tikv sst file + oa.TruncateSSTFileThenCheckFailoverOrDie(clusters[0], 5*time.Minute) + + // stop etcd + faultEtcd := tests.SelectNode(cfg.ETCDs) + fta.StopETCDOrDie(faultEtcd) + defer fta.StartETCDOrDie(faultEtcd) + time.Sleep(3 * time.Minute) + oa.CheckOneEtcdDownOrDie(ocfg, deployedClusers, faultEtcd) + fta.StartETCDOrDie(faultEtcd) + } - //clean temp dirs when stability success - err := cfg.CleanTempDirs() - if err != nil { - glog.Errorf("failed to clean temp dirs, this error can be ignored.") + // before operator upgrade + preUpgrade := []*tests.TidbClusterConfig{ + cluster1, + cluster2, + onePDCluster1, + } + caseFn(preUpgrade, restoreCluster1, upgradeVersions[0]) + + // after operator upgrade + if cfg.UpgradeOperatorImage != "" && cfg.UpgradeOperatorTag != "" { + ocfg.Image = cfg.UpgradeOperatorImage + ocfg.Tag = cfg.UpgradeOperatorTag + oa.UpgradeOperatorOrDie(ocfg) + time.Sleep(5 * time.Minute) + postUpgrade := []*tests.TidbClusterConfig{ + cluster3, + onePDCluster2, + cluster1, + cluster2, + onePDCluster1, + } + var v string + if len(upgradeVersions) == 2 { + v = upgradeVersions[1] + } + // caseFn(postUpgrade, restoreCluster2, tidbUpgradeVersion) + caseFn(postUpgrade, restoreCluster2, v) } successCount++ diff --git a/tests/cmd/stability/stability.go b/tests/cmd/stability/stability.go new file mode 100644 index 00000000000..27cf1f58938 --- /dev/null +++ b/tests/cmd/stability/stability.go @@ -0,0 +1,74 @@ +package main + +import ( + "fmt" + + "github.com/pingcap/tidb-operator/tests" + "k8s.io/api/core/v1" +) + +func newOperatorConfig() *tests.OperatorConfig { + return &tests.OperatorConfig{ + Namespace: "pingcap", + ReleaseName: "operator", + Image: cfg.OperatorImage, + Tag: cfg.OperatorTag, + SchedulerImage: "gcr.io/google-containers/hyperkube", + SchedulerFeatures: []string{ + "StableScheduling", + }, + LogLevel: "2", + WebhookServiceName: tests.WebhookServiceName, + WebhookSecretName: "webhook-secret", + WebhookConfigName: "webhook-config", + ImagePullPolicy: v1.PullAlways, + } +} + +func newTidbClusterConfig(ns, clusterName string) *tests.TidbClusterConfig { + tidbVersion := cfg.GetTiDBVersionOrDie() + + return &tests.TidbClusterConfig{ + Namespace: ns, + ClusterName: clusterName, + OperatorTag: cfg.OperatorTag, + PDImage: fmt.Sprintf("pingcap/pd:%s", tidbVersion), + TiKVImage: fmt.Sprintf("pingcap/tikv:%s", tidbVersion), + 
TiDBImage: fmt.Sprintf("pingcap/tidb:%s", tidbVersion), + StorageClassName: "local-storage", + UserName: "root", + Password: "admin", + InitSecretName: fmt.Sprintf("%s-set-secret", clusterName), + BackupSecretName: fmt.Sprintf("%s-backup-secret", clusterName), + BackupName: "backup", + Resources: map[string]string{ + "pd.resources.limits.cpu": "1000m", + "pd.resources.limits.memory": "2Gi", + "pd.resources.requests.cpu": "200m", + "pd.resources.requests.memory": "1Gi", + "tikv.resources.limits.cpu": "8000m", + "tikv.resources.limits.memory": "16Gi", + "tikv.resources.requests.cpu": "1000m", + "tikv.resources.requests.memory": "2Gi", + "tidb.resources.limits.cpu": "8000m", + "tidb.resources.limits.memory": "8Gi", + "tidb.resources.requests.cpu": "500m", + "tidb.resources.requests.memory": "1Gi", + "monitor.persistent": "true", + "discovery.image": cfg.OperatorImage, + "tikv.defaultcfBlockCacheSize": "8GB", + "tikv.writecfBlockCacheSize": "2GB", + }, + Args: map[string]string{ + "binlog.drainer.workerCount": "1024", + "binlog.drainer.txnBatch": "512", + }, + Monitor: true, + BlockWriteConfig: cfg.BlockWriter, + PDMaxReplicas: 3, + TiKVGrpcConcurrency: 4, + TiDBTokenLimit: 1000, + PDLogLevel: "info", + SubValues: tests.GetAffinityConfigOrDie(clusterName, ns), + } +} diff --git a/tests/config.go b/tests/config.go index 83cf9850671..272ef37c3b6 100644 --- a/tests/config.go +++ b/tests/config.go @@ -26,16 +26,18 @@ const ( type Config struct { configFile string - TidbVersions string `yaml:"tidb_versions" json:"tidb_versions"` - OperatorTag string `yaml:"operator_tag" json:"operator_tag"` - OperatorImage string `yaml:"operator_image" json:"operator_image"` - LogDir string `yaml:"log_dir" json:"log_dir"` - FaultTriggerPort int `yaml:"fault_trigger_port" json:"fault_trigger_port"` - Nodes []Nodes `yaml:"nodes" json:"nodes"` - ETCDs []Nodes `yaml:"etcds" json:"etcds"` - APIServers []Nodes `yaml:"apiservers" json:"apiservers"` - CertFile string - KeyFile string + TidbVersions string `yaml:"tidb_versions" json:"tidb_versions"` + OperatorTag string `yaml:"operator_tag" json:"operator_tag"` + OperatorImage string `yaml:"operator_image" json:"operator_image"` + UpgradeOperatorTag string `yaml:"upgrade_operator_tag" json:"upgrade_operator_tag"` + UpgradeOperatorImage string `yaml:"upgrade_operator_image" json:"upgrade_operator_image"` + LogDir string `yaml:"log_dir" json:"log_dir"` + FaultTriggerPort int `yaml:"fault_trigger_port" json:"fault_trigger_port"` + Nodes []Nodes `yaml:"nodes" json:"nodes"` + ETCDs []Nodes `yaml:"etcds" json:"etcds"` + APIServers []Nodes `yaml:"apiservers" json:"apiservers"` + CertFile string + KeyFile string PDMaxReplicas int `yaml:"pd_max_replicas" json:"pd_max_replicas"` TiKVGrpcConcurrency int `yaml:"tikv_grpc_concurrency" json:"tikv_grpc_concurrency"` @@ -81,6 +83,8 @@ func NewConfig() (*Config, error) { flag.StringVar(&cfg.TidbVersions, "tidb-versions", "v3.0.0-beta.1,v3.0.0-rc.1", "tidb versions") flag.StringVar(&cfg.OperatorTag, "operator-tag", "master", "operator tag used to choose charts") flag.StringVar(&cfg.OperatorImage, "operator-image", "pingcap/tidb-operator:latest", "operator image") + flag.StringVar(&cfg.UpgradeOperatorTag, "upgrade-operator-tag", "", "upgrade operator tag used to choose charts") + flag.StringVar(&cfg.UpgradeOperatorImage, "upgrade-operator-image", "", "upgrade operator image") flag.StringVar(&cfg.OperatorRepoDir, "operator-repo-dir", "/tidb-operator", "local directory to which tidb-operator cloned") flag.StringVar(&cfg.ChartDir, "chart-dir", 
"", "chart dir") flag.StringVar(&slack.WebhookURL, "slack-webhook-url", "", "slack webhook url") diff --git a/tests/failover.go b/tests/failover.go index 84fa3e66a5f..7921b74e57d 100644 --- a/tests/failover.go +++ b/tests/failover.go @@ -47,11 +47,13 @@ func (oa *operatorActions) TruncateSSTFileThenCheckFailover(info *TidbClusterCon // find an up store var store v1alpha1.TiKVStore + var podName string for _, v := range tc.Status.TiKV.Stores { if v.State != v1alpha1.TiKVStateUp { continue } store = v + podName = v.PodName break } if len(store.ID) == 0 { @@ -96,7 +98,7 @@ func (oa *operatorActions) TruncateSSTFileThenCheckFailover(info *TidbClusterCon tikvOps.SetPoll(DefaultPollInterval, maxStoreDownTime+tikvFailoverPeriod+failoverTimeout) - return tikvOps.PollTiDBCluster(info.Namespace, info.ClusterName, + err = tikvOps.PollTiDBCluster(info.Namespace, info.ClusterName, func(tc *v1alpha1.TidbCluster, err error) (bool, error) { _, ok := tc.Status.TiKV.FailureStores[store.ID] glog.Infof("cluster: [%s/%s] check if target store failed: %t", @@ -106,6 +108,30 @@ func (oa *operatorActions) TruncateSSTFileThenCheckFailover(info *TidbClusterCon } return true, nil }) + if err != nil { + glog.Errorf("failed to check truncate sst file: %v", err) + return err + } + + if err := wait.Poll(1*time.Minute, 30*time.Minute, func() (bool, error) { + if err := tikvOps.RecoverSSTFile(info.Namespace, podName); err != nil { + glog.Errorf("failed to recovery sst file %s/%s", info.Namespace, podName) + return false, nil + } + + return true, nil + }); err != nil { + return err + } + + glog.Infof("deleting pod: [%s/%s] again", info.Namespace, store.PodName) + return wait.Poll(10*time.Second, time.Minute, func() (bool, error) { + err = oa.kubeCli.CoreV1().Pods(info.Namespace).Delete(store.PodName, &metav1.DeleteOptions{}) + if err != nil { + return false, nil + } + return true, nil + }) } func (oa *operatorActions) TruncateSSTFileThenCheckFailoverOrDie(info *TidbClusterConfig, tikvFailoverPeriod time.Duration) { diff --git a/tests/pkg/ops/tikv.go b/tests/pkg/ops/tikv.go index 9e399449c3b..bb72613531f 100644 --- a/tests/pkg/ops/tikv.go +++ b/tests/pkg/ops/tikv.go @@ -15,6 +15,7 @@ package ops import ( "fmt" + "os/exec" "strconv" "strings" "time" @@ -121,3 +122,31 @@ func (ops *TiKVOps) TruncateSSTFile(opts TruncateOptions) error { return nil } + +func (ops *TiKVOps) RecoverSSTFile(ns, podName string) error { + annotateCmd := fmt.Sprintf("kubectl annotate pod %s -n %s runmode=debug", podName, ns) + glog.Info(annotateCmd) + _, err := exec.Command("/bin/sh", "-c", annotateCmd).CombinedOutput() + if err != nil { + return fmt.Errorf("failed to annotation pod: %s/%s", ns, podName) + } + + findCmd := fmt.Sprintf("kubectl exec -it -n %s %s -- find /var/lib/tikv/db -name '*.sst.save'", ns, podName) + glog.Info(findCmd) + findData, err := exec.Command("/bin/sh", "-c", findCmd).CombinedOutput() + if err != nil { + return fmt.Errorf("failed to find .save files: %s/%s", ns, podName) + } + + for _, saveFile := range strings.Split(string(findData), "\n") { + sstFile := strings.TrimSuffix(saveFile, ".save") + mvCmd := fmt.Sprintf("kubectl exec -it -n %s %s -- mv %s %s", ns, podName, saveFile, sstFile) + glog.Info(mvCmd) + _, err := exec.Command("/bin/sh", "-c", mvCmd).CombinedOutput() + if err != nil { + return fmt.Errorf("failed to recovery .sst files: %s/%s, %s, %s", ns, podName, sstFile, saveFile) + } + } + + return nil +}