Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix backup data compare logic #454

Merged
merged 4 commits into from
May 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 59 additions & 29 deletions tests/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ type OperatorActions interface {
RegisterWebHookAndServiceOrDie(info *OperatorConfig)
CleanWebHookAndService(info *OperatorConfig) error
StartValidatingAdmissionWebhookServerOrDie(info *OperatorConfig)
BackupRestore(from, to *TidbClusterConfig) error
BackupRestoreOrDie(from, to *TidbClusterConfig)
}

type operatorActions struct {
Expand Down Expand Up @@ -1537,24 +1539,15 @@ func (oa *operatorActions) CheckRestore(from *TidbClusterConfig, to *TidbCluster
return false, nil
}

fromCount, err := from.QueryCount()
b, err := to.DataIsTheSameAs(from)
if err != nil {
glog.Errorf("cluster [%s] count err ", from.ClusterName)
glog.Error(err)
return false, nil
}

toCount, err := to.QueryCount()
if err != nil {
glog.Errorf("cluster [%s] count err ", to.ClusterName)
return false, nil
}

if fromCount != toCount {
glog.Errorf("cluster [%s] count %d cluster [%s] count %d is not equal ",
from.ClusterName, fromCount, to.ClusterName, toCount)
return false, nil
if b {
return true, nil
}
return true, nil
return false, nil
}

err := wait.Poll(oa.pollInterval, BackupAndRestorePollTimeOut, fn)
Expand All @@ -1576,29 +1569,65 @@ func (oa *operatorActions) ForceDeploy(info *TidbClusterConfig) error {
return nil
}

func (info *TidbClusterConfig) QueryCount() (int, error) {
tableName := "test"
db, err := sql.Open("mysql", getDSN(info.Namespace, info.ClusterName, "record", info.Password))
func (info *TidbClusterConfig) DataIsTheSameAs(otherInfo *TidbClusterConfig) (bool, error) {
tableNum := otherInfo.BlockWriteConfig.TableNum

infoDb, err := sql.Open("mysql", getDSN(info.Namespace, info.ClusterName, "test", info.Password))
if err != nil {
return 0, err
return false, err
}
defer db.Close()

rows, err := db.Query(fmt.Sprintf("SELECT count(*) FROM %s", tableName))
defer infoDb.Close()
otherInfoDb, err := sql.Open("mysql", getDSN(otherInfo.Namespace, otherInfo.ClusterName, "test", otherInfo.Password))
if err != nil {
glog.Infof("cluster:[%s], error: %v", info.ClusterName, err)
return 0, err
return false, err
}
defer otherInfoDb.Close()

getCntFn := func(db *sql.DB, tableName string) (int, error) {
var cnt int
rows, err := db.Query(fmt.Sprintf("SELECT count(*) FROM %s", tableName))
if err != nil {
return cnt, fmt.Errorf("failed to select count(*) from %s, %v", tableName, err)
}
for rows.Next() {
err := rows.Scan(&cnt)
if err != nil {
return cnt, fmt.Errorf("failed to scan count from %s, %v", tableName, err)
}
return cnt, nil
}
return cnt, fmt.Errorf("can not find count of table %s", tableName)
}

for rows.Next() {
var count int
err := rows.Scan(&count)
for i := 0; i < tableNum; i++ {
var tableName string
if i == 0 {
tableName = "block_writer"
} else {
tableName = fmt.Sprintf("block_writer%d", i)
}

cnt, err := getCntFn(infoDb, tableName)
if err != nil {
glog.Infof("cluster:[%s], error :%v", info.ClusterName, err)
return false, err
}
return count, nil
otherCnt, err := getCntFn(otherInfoDb, tableName)
if err != nil {
return false, err
}

if cnt != otherCnt {
err := fmt.Errorf("cluster %s/%s's table %s count(*) = %d and cluster %s/%s's table %s count(*) = %d",
info.Namespace, info.ClusterName, tableName, cnt,
otherInfo.Namespace, otherInfo.ClusterName, tableName, otherCnt)
return false, err
}
glog.Infof("cluster %s/%s's table %s count(*) = %d and cluster %s/%s's table %s count(*) = %d",
info.Namespace, info.ClusterName, tableName, cnt,
otherInfo.Namespace, otherInfo.ClusterName, tableName, otherCnt)
}
return 0, fmt.Errorf("can not find count of ")

return true, nil
}

func (oa *operatorActions) CreateSecret(info *TidbClusterConfig) error {
Expand Down Expand Up @@ -1875,6 +1904,7 @@ func (oa *operatorActions) DeployIncrementalBackup(from *TidbClusterConfig, to *
"binlog.drainer.mysql.user": "root",
"binlog.drainer.mysql.password": to.Password,
"binlog.drainer.mysql.port": "4000",
"binlog.drainer.ignoreSchemas": "\"INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql\"",
}

setString := from.TidbClusterHelmSetString(sets)
Expand Down
99 changes: 99 additions & 0 deletions tests/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package tests

import (
"time"

"github.com/golang/glog"
"github.com/pingcap/tidb-operator/tests/slack"
"k8s.io/apimachinery/pkg/util/wait"
)

func (oa *operatorActions) BackupRestore(from, to *TidbClusterConfig) error {
oa.StopInsertDataTo(from)

err := oa.DeployAdHocBackup(from)
if err != nil {
glog.Errorf("cluster:[%s] deploy happen error: %v", from.ClusterName, err)
return err
}

err = oa.CheckAdHocBackup(from)
if err != nil {
glog.Errorf("cluster:[%s] deploy happen error: %v", from.ClusterName, err)
return err
}

err = oa.CheckTidbClusterStatus(to)
if err != nil {
glog.Errorf("cluster:[%s] deploy faild error: %v", to.ClusterName, err)
return err
}

err = oa.Restore(from, to)
if err != nil {
glog.Errorf("from cluster:[%s] to cluster [%s] restore happen error: %v",
from.ClusterName, to.ClusterName, err)
return err
}

err = oa.CheckRestore(from, to)
if err != nil {
glog.Errorf("from cluster:[%s] to cluster [%s] restore failed error: %v",
from.ClusterName, to.ClusterName, err)
return err
}

go oa.BeginInsertDataToOrDie(from)
err = oa.DeployScheduledBackup(from)
if err != nil {
glog.Errorf("cluster:[%s] scheduler happen error: %v", from.ClusterName, err)
return err
}

err = oa.CheckScheduledBackup(from)
if err != nil {
glog.Errorf("cluster:[%s] scheduler failed error: %v", from.ClusterName, err)
return err
}

err = oa.DeployIncrementalBackup(from, to)
if err != nil {
return err
}

err = oa.CheckIncrementalBackup(from)
if err != nil {
return err
}

glog.Infof("waiting 1 minutes for binlog to work")
time.Sleep(1 * time.Minute)

glog.Infof("cluster[%s] begin insert data", from.ClusterName)
go oa.BeginInsertDataTo(from)

time.Sleep(5 * time.Minute)

glog.Infof("cluster[%s] stop insert data", from.ClusterName)
oa.StopInsertDataTo(from)

fn := func() (bool, error) {
b, err := to.DataIsTheSameAs(from)
if err != nil {
glog.Error(err)
return false, nil
}
if b {
return true, nil
}
return false, nil
}

return wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn)
}

func (oa *operatorActions) BackupRestoreOrDie(from, to *TidbClusterConfig) {
if err := oa.BackupRestore(from, to); err != nil {
slack.NotifyAndPanic(err)
}
}
1 change: 0 additions & 1 deletion tests/backup/backup.go

This file was deleted.

148 changes: 0 additions & 148 deletions tests/backup/backupcase.go

This file was deleted.

7 changes: 1 addition & 6 deletions tests/cmd/e2e/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"k8s.io/apiserver/pkg/util/logs"

"github.com/pingcap/tidb-operator/tests"
"github.com/pingcap/tidb-operator/tests/backup"
"github.com/pingcap/tidb-operator/tests/pkg/client"
)

Expand Down Expand Up @@ -245,11 +244,7 @@ func main() {
glog.Fatal(err)
}

backupCase := backup.NewBackupCase(oa, backupClusterInfo, restoreClusterInfo)

if err := backupCase.Run(); err != nil {
glog.Fatal(err)
}
oa.BackupRestoreOrDie(backupClusterInfo, restoreClusterInfo)

//clean temp dirs when e2e success
err = conf.CleanTempDirs()
Expand Down
Loading