From 3adc027880755dffd2df9672350c689601555296 Mon Sep 17 00:00:00 2001 From: weekface Date: Mon, 6 May 2019 20:01:18 +0800 Subject: [PATCH] Fix backup data compare logic (#454) * fix data compare logic * add test to binlog backup * move backup to tests --- tests/actions.go | 88 ++++++++++++++------- tests/backup.go | 99 ++++++++++++++++++++++++ tests/backup/backup.go | 1 - tests/backup/backupcase.go | 148 ------------------------------------ tests/cmd/e2e/main.go | 7 +- tests/cmd/stability/main.go | 5 +- 6 files changed, 161 insertions(+), 187 deletions(-) create mode 100644 tests/backup.go delete mode 100644 tests/backup/backup.go delete mode 100644 tests/backup/backupcase.go diff --git a/tests/actions.go b/tests/actions.go index caf320daf6..cff71e241d 100644 --- a/tests/actions.go +++ b/tests/actions.go @@ -143,6 +143,8 @@ type OperatorActions interface { RegisterWebHookAndServiceOrDie(info *OperatorConfig) CleanWebHookAndService(info *OperatorConfig) error StartValidatingAdmissionWebhookServerOrDie(info *OperatorConfig) + BackupRestore(from, to *TidbClusterConfig) error + BackupRestoreOrDie(from, to *TidbClusterConfig) } type operatorActions struct { @@ -1537,24 +1539,15 @@ func (oa *operatorActions) CheckRestore(from *TidbClusterConfig, to *TidbCluster return false, nil } - fromCount, err := from.QueryCount() + b, err := to.DataIsTheSameAs(from) if err != nil { - glog.Errorf("cluster [%s] count err ", from.ClusterName) + glog.Error(err) return false, nil } - - toCount, err := to.QueryCount() - if err != nil { - glog.Errorf("cluster [%s] count err ", to.ClusterName) - return false, nil - } - - if fromCount != toCount { - glog.Errorf("cluster [%s] count %d cluster [%s] count %d is not equal ", - from.ClusterName, fromCount, to.ClusterName, toCount) - return false, nil + if b { + return true, nil } - return true, nil + return false, nil } err := wait.Poll(oa.pollInterval, BackupAndRestorePollTimeOut, fn) @@ -1576,29 +1569,65 @@ func (oa *operatorActions) 
ForceDeploy(info *TidbClusterConfig) error { return nil } -func (info *TidbClusterConfig) QueryCount() (int, error) { - tableName := "test" - db, err := sql.Open("mysql", getDSN(info.Namespace, info.ClusterName, "record", info.Password)) +func (info *TidbClusterConfig) DataIsTheSameAs(otherInfo *TidbClusterConfig) (bool, error) { + tableNum := otherInfo.BlockWriteConfig.TableNum + + infoDb, err := sql.Open("mysql", getDSN(info.Namespace, info.ClusterName, "test", info.Password)) if err != nil { - return 0, err + return false, err } - defer db.Close() - - rows, err := db.Query(fmt.Sprintf("SELECT count(*) FROM %s", tableName)) + defer infoDb.Close() + otherInfoDb, err := sql.Open("mysql", getDSN(otherInfo.Namespace, otherInfo.ClusterName, "test", otherInfo.Password)) if err != nil { - glog.Infof("cluster:[%s], error: %v", info.ClusterName, err) - return 0, err + return false, err + } + defer otherInfoDb.Close() + + getCntFn := func(db *sql.DB, tableName string) (int, error) { + var cnt int + rows, err := db.Query(fmt.Sprintf("SELECT count(*) FROM %s", tableName)) + if err != nil { + return cnt, fmt.Errorf("failed to select count(*) from %s, %v", tableName, err) + } + for rows.Next() { + err := rows.Scan(&cnt) + if err != nil { + return cnt, fmt.Errorf("failed to scan count from %s, %v", tableName, err) + } + return cnt, nil + } + return cnt, fmt.Errorf("can not find count of table %s", tableName) } - for rows.Next() { - var count int - err := rows.Scan(&count) + for i := 0; i < tableNum; i++ { + var tableName string + if i == 0 { + tableName = "block_writer" + } else { + tableName = fmt.Sprintf("block_writer%d", i) + } + + cnt, err := getCntFn(infoDb, tableName) if err != nil { - glog.Infof("cluster:[%s], error :%v", info.ClusterName, err) + return false, err } - return count, nil + otherCnt, err := getCntFn(otherInfoDb, tableName) + if err != nil { + return false, err + } + + if cnt != otherCnt { + err := fmt.Errorf("cluster %s/%s's table %s count(*) = %d and 
cluster %s/%s's table %s count(*) = %d", + info.Namespace, info.ClusterName, tableName, cnt, + otherInfo.Namespace, otherInfo.ClusterName, tableName, otherCnt) + return false, err + } + glog.Infof("cluster %s/%s's table %s count(*) = %d and cluster %s/%s's table %s count(*) = %d", + info.Namespace, info.ClusterName, tableName, cnt, + otherInfo.Namespace, otherInfo.ClusterName, tableName, otherCnt) } - return 0, fmt.Errorf("can not find count of ") + + return true, nil } func (oa *operatorActions) CreateSecret(info *TidbClusterConfig) error { @@ -1875,6 +1904,7 @@ func (oa *operatorActions) DeployIncrementalBackup(from *TidbClusterConfig, to * "binlog.drainer.mysql.user": "root", "binlog.drainer.mysql.password": to.Password, "binlog.drainer.mysql.port": "4000", + "binlog.drainer.ignoreSchemas": "\"INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql\"", } setString := from.TidbClusterHelmSetString(sets) diff --git a/tests/backup.go b/tests/backup.go new file mode 100644 index 0000000000..0d2188d5a8 --- /dev/null +++ b/tests/backup.go @@ -0,0 +1,99 @@ +package tests + +import ( + "time" + + "github.com/golang/glog" + "github.com/pingcap/tidb-operator/tests/slack" + "k8s.io/apimachinery/pkg/util/wait" +) + +func (oa *operatorActions) BackupRestore(from, to *TidbClusterConfig) error { + oa.StopInsertDataTo(from) + + err := oa.DeployAdHocBackup(from) + if err != nil { + glog.Errorf("cluster:[%s] deploy happen error: %v", from.ClusterName, err) + return err + } + + err = oa.CheckAdHocBackup(from) + if err != nil { + glog.Errorf("cluster:[%s] deploy happen error: %v", from.ClusterName, err) + return err + } + + err = oa.CheckTidbClusterStatus(to) + if err != nil { + glog.Errorf("cluster:[%s] deploy faild error: %v", to.ClusterName, err) + return err + } + + err = oa.Restore(from, to) + if err != nil { + glog.Errorf("from cluster:[%s] to cluster [%s] restore happen error: %v", + from.ClusterName, to.ClusterName, err) + return err + } + + err = oa.CheckRestore(from, to) + if err != 
nil { + glog.Errorf("from cluster:[%s] to cluster [%s] restore failed error: %v", + from.ClusterName, to.ClusterName, err) + return err + } + + go oa.BeginInsertDataToOrDie(from) + err = oa.DeployScheduledBackup(from) + if err != nil { + glog.Errorf("cluster:[%s] scheduler happen error: %v", from.ClusterName, err) + return err + } + + err = oa.CheckScheduledBackup(from) + if err != nil { + glog.Errorf("cluster:[%s] scheduler failed error: %v", from.ClusterName, err) + return err + } + + err = oa.DeployIncrementalBackup(from, to) + if err != nil { + return err + } + + err = oa.CheckIncrementalBackup(from) + if err != nil { + return err + } + + glog.Infof("waiting 1 minutes for binlog to work") + time.Sleep(1 * time.Minute) + + glog.Infof("cluster[%s] begin insert data", from.ClusterName) + go oa.BeginInsertDataTo(from) + + time.Sleep(5 * time.Minute) + + glog.Infof("cluster[%s] stop insert data", from.ClusterName) + oa.StopInsertDataTo(from) + + fn := func() (bool, error) { + b, err := to.DataIsTheSameAs(from) + if err != nil { + glog.Error(err) + return false, nil + } + if b { + return true, nil + } + return false, nil + } + + return wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn) +} + +func (oa *operatorActions) BackupRestoreOrDie(from, to *TidbClusterConfig) { + if err := oa.BackupRestore(from, to); err != nil { + slack.NotifyAndPanic(err) + } +} diff --git a/tests/backup/backup.go b/tests/backup/backup.go deleted file mode 100644 index 0d44d47816..0000000000 --- a/tests/backup/backup.go +++ /dev/null @@ -1 +0,0 @@ -package backup diff --git a/tests/backup/backupcase.go b/tests/backup/backupcase.go deleted file mode 100644 index 7d7d36fe13..0000000000 --- a/tests/backup/backupcase.go +++ /dev/null @@ -1,148 +0,0 @@ -// Copyright 2018 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package backup - -import ( - "fmt" - "time" - - "github.com/pingcap/tidb-operator/tests/slack" - - "github.com/golang/glog" - "github.com/pingcap/tidb-operator/tests" - "k8s.io/apimachinery/pkg/util/wait" -) - -type BackupCase struct { - operator tests.OperatorActions - srcCluster *tests.TidbClusterConfig - desCluster *tests.TidbClusterConfig -} - -func NewBackupCase(operator tests.OperatorActions, srcCluster *tests.TidbClusterConfig, desCluster *tests.TidbClusterConfig) *BackupCase { - return &BackupCase{ - operator: operator, - srcCluster: srcCluster, - desCluster: desCluster, - } -} - -func (bc *BackupCase) Run() error { - - // pause write pressure during backup - bc.operator.StopInsertDataTo(bc.srcCluster) - defer func() { - go func() { - if err := bc.operator.BeginInsertDataTo(bc.srcCluster); err != nil { - glog.Errorf("cluster:[%s] begin insert data failed,error: %v", bc.srcCluster.ClusterName, err) - } - }() - }() - - err := bc.operator.DeployAdHocBackup(bc.srcCluster) - if err != nil { - glog.Errorf("cluster:[%s] deploy happen error: %v", bc.srcCluster.ClusterName, err) - return err - } - - err = bc.operator.CheckAdHocBackup(bc.srcCluster) - if err != nil { - glog.Errorf("cluster:[%s] deploy happen error: %v", bc.srcCluster.ClusterName, err) - return err - } - - err = bc.operator.CheckTidbClusterStatus(bc.desCluster) - if err != nil { - glog.Errorf("cluster:[%s] deploy faild error: %v", bc.desCluster.ClusterName, err) - return err - } - - err = bc.operator.Restore(bc.srcCluster, bc.desCluster) - if err != nil { - glog.Errorf("from cluster:[%s] to cluster [%s] restore happen error: %v", bc.srcCluster.ClusterName, 
bc.desCluster.ClusterName, err) - return err - } - - err = bc.operator.CheckRestore(bc.srcCluster, bc.desCluster) - if err != nil { - glog.Errorf("from cluster:[%s] to cluster [%s] restore failed error: %v", bc.srcCluster.ClusterName, bc.desCluster.ClusterName, err) - return err - } - - err = bc.operator.DeployScheduledBackup(bc.srcCluster) - if err != nil { - glog.Errorf("cluster:[%s] scheduler happen error: %v", bc.srcCluster.ClusterName, err) - return err - } - - err = bc.operator.CheckScheduledBackup(bc.srcCluster) - if err != nil { - glog.Errorf("cluster:[%s] scheduler failed error: %v", bc.srcCluster.ClusterName, err) - return err - } - - err = bc.operator.DeployIncrementalBackup(bc.srcCluster, bc.desCluster) - if err != nil { - return err - } - - err = bc.operator.CheckIncrementalBackup(bc.srcCluster) - if err != nil { - return err - } - - glog.Infof("waiting 1 minutes for binlog to work") - time.Sleep(1 * time.Minute) - - glog.Infof("cluster[%s] begin insert data", bc.srcCluster.ClusterName) - go bc.operator.BeginInsertDataTo(bc.srcCluster) - - time.Sleep(30 * time.Second) - - glog.Infof("cluster[%s] stop insert data", bc.srcCluster.ClusterName) - bc.operator.StopInsertDataTo(bc.srcCluster) - - return bc.EnsureBackupDataIsCorrect() -} - -func (bc *BackupCase) RunOrDie() { - if err := bc.Run(); err != nil { - slack.NotifyAndPanic(err) - } -} - -func (bc *BackupCase) EnsureBackupDataIsCorrect() error { - fn := func() (bool, error) { - srcCount, err := bc.srcCluster.QueryCount() - if err != nil { - glog.Infof("failed to query count from src cluster: %s/%s", - bc.srcCluster.Namespace, bc.srcCluster.ClusterName) - return false, nil - } - desCount, err := bc.desCluster.QueryCount() - if err != nil { - glog.Infof("failed to query count from dest cluster: %s/%s", - bc.desCluster.Namespace, bc.desCluster.ClusterName) - return false, nil - } - - if srcCount != desCount { - return false, fmt.Errorf("cluster:[%s] the src cluster data[%d] is not equals des cluster 
data[%d]", bc.srcCluster.FullName(), srcCount, desCount) - } - - return true, nil - } - - return wait.Poll(tests.DefaultPollInterval, tests.DefaultPollTimeout, fn) -} diff --git a/tests/cmd/e2e/main.go b/tests/cmd/e2e/main.go index 8afd114992..edbad817e4 100644 --- a/tests/cmd/e2e/main.go +++ b/tests/cmd/e2e/main.go @@ -23,7 +23,6 @@ import ( "k8s.io/apiserver/pkg/util/logs" "github.com/pingcap/tidb-operator/tests" - "github.com/pingcap/tidb-operator/tests/backup" "github.com/pingcap/tidb-operator/tests/pkg/client" ) @@ -245,11 +244,7 @@ func main() { glog.Fatal(err) } - backupCase := backup.NewBackupCase(oa, backupClusterInfo, restoreClusterInfo) - - if err := backupCase.Run(); err != nil { - glog.Fatal(err) - } + oa.BackupRestoreOrDie(backupClusterInfo, restoreClusterInfo) //clean temp dirs when e2e success err = conf.CleanTempDirs() diff --git a/tests/cmd/stability/main.go b/tests/cmd/stability/main.go index 57d02811d2..220c765342 100644 --- a/tests/cmd/stability/main.go +++ b/tests/cmd/stability/main.go @@ -24,7 +24,6 @@ import ( "github.com/golang/glog" "github.com/jinzhu/copier" "github.com/pingcap/tidb-operator/tests" - "github.com/pingcap/tidb-operator/tests/backup" "github.com/pingcap/tidb-operator/tests/pkg/client" "k8s.io/apiserver/pkg/util/logs" @@ -208,8 +207,8 @@ func main() { oa.DeployTidbClusterOrDie(clusterRestoreTo) oa.CheckTidbClusterStatusOrDie(clusterRestoreTo) - // restore - backup.NewBackupCase(oa, clusterBackupFrom, clusterRestoreTo).RunOrDie() + // backup and restore + oa.BackupRestoreOrDie(clusterBackupFrom, clusterRestoreTo) // stop a node and failover automatically physicalNode, node, faultTime := fta.StopNodeOrDie()