Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release2.1] reparo: add unit test && add safe mode #662

Merged
merged 7 commits into from
Jul 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/gorilla/websocket v1.4.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20180820150422-93bf4626fba7 // indirect
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d
github.com/kr/pretty v0.1.0 // indirect
github.com/montanaflynn/stats v0.0.0-20180911141734-db72e6cae808 // indirect
github.com/ngaut/log v0.0.0-20160810023011-cec23d3e10b0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d h1:cVtBfNW5XTHiKQe7jDaDBSh/EVM4XLPutLAGboIXuM0=
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d/go.mod h1:P2viExyCEfeWGU259JnaQ34Inuec4R38JCyBx2edgD0=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down
15 changes: 14 additions & 1 deletion reparo/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package reparo

import (
"encoding/json"
"flag"
"fmt"
"os"
Expand All @@ -23,7 +24,7 @@ const (

// Config is the main configuration for the retore tool.
type Config struct {
*flag.FlagSet
*flag.FlagSet `toml:"-" json:"-"`
Dir string `toml:"data-dir" json:"data-dir"`
StartDatetime string `toml:"start-datetime" json:"start-datetime"`
StopDatetime string `toml:"stop-datetime" json:"stop-datetime"`
Expand All @@ -43,6 +44,8 @@ type Config struct {
LogRotate string `toml:"log-rotate" json:"log-rotate"`
LogLevel string `toml:"log-level" json:"log-level"`

SafeMode bool `toml:"safe-mode" json:"safe-mode"`

configFile string
printVersion bool
}
Expand All @@ -67,9 +70,19 @@ func NewConfig() *Config {
fs.StringVar(&c.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal")
fs.StringVar(&c.configFile, "config", "", "[REQUIRED] path to configuration file")
fs.BoolVar(&c.printVersion, "V", false, "print reparo version info")
fs.BoolVar(&c.SafeMode, "safe-mode", false, "enable safe mode to support reentrant")
return c
}

func (c *Config) String() string {
cfgBytes, err := json.Marshal(c)
if err != nil {
log.Errorf("marshal config failed %v", err)
}

return string(cfgBytes)
}

// Parse parses keys/values from command line flags and toml configuration file.
func (c *Config) Parse(args []string) (err error) {
// Parse first to get config file
Expand Down
2 changes: 1 addition & 1 deletion reparo/reparo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Reparo struct {
func New(cfg *Config) (*Reparo, error) {
log.Infof("cfg %+v", cfg)

syncer, err := syncer.New(cfg.DestType, cfg.DestDB)
syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.SafeMode)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
22 changes: 22 additions & 0 deletions reparo/syncer/memory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package syncer

import (
"github.com/pingcap/check"
)

type testMemorySuite struct{}

var _ = check.Suite(&testMemorySuite{})

func (s *testMemorySuite) TestMemorySyncer(c *check.C) {
syncer, err := newMemSyncer()
c.Assert(err, check.IsNil)

syncTest(c, Syncer(syncer))

binlog := syncer.GetBinlogs()
c.Assert(binlog, check.HasLen, 2)

err = syncer.Close()
c.Assert(err, check.IsNil)
}
20 changes: 16 additions & 4 deletions reparo/syncer/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,31 @@ type mysqlSyncer struct {
loaderErr error
}

var _ Syncer = &mysqlSyncer{}
var (
_ Syncer = &mysqlSyncer{}
defaultWorkerCount = 16
defaultBatchSize = 20
)

// should be only used for unit test to create mock db
var createDB = loader.CreateDB

func newMysqlSyncer(cfg *DBConfig) (*mysqlSyncer, error) {
db, err := loader.CreateDB(cfg.User, cfg.Password, cfg.Host, cfg.Port)
func newMysqlSyncer(cfg *DBConfig, safemode bool) (*mysqlSyncer, error) {
db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port)
if err != nil {
return nil, errors.Trace(err)
}

loader, err := loader.NewLoader(db, loader.WorkerCount(16), loader.BatchSize(20))
return newMysqlSyncerFromSQLDB(db, safemode)
}

func newMysqlSyncerFromSQLDB(db *sql.DB, safemode bool) (*mysqlSyncer, error) {
loader, err := loader.NewLoader(db, loader.WorkerCount(defaultWorkerCount), loader.BatchSize(defaultBatchSize))
if err != nil {
return nil, errors.Annotate(err, "new loader failed")
}

loader.SetSafeMode(safemode)
syncer := &mysqlSyncer{db: db, loader: loader}
syncer.runLoader()

Expand Down
100 changes: 100 additions & 0 deletions reparo/syncer/mysql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package syncer

import (
"database/sql"
"time"

sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/check"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
)

type testMysqlSuite struct{}

var _ = check.Suite(&testMysqlSuite{})

func (s *testMysqlSuite) TestMysqlSyncer(c *check.C) {
s.testMysqlSyncer(c, true)
s.testMysqlSyncer(c, false)
}

func (s *testMysqlSuite) testMysqlSyncer(c *check.C, safemode bool) {
var (
mock sqlmock.Sqlmock
)
originWorkerCount := defaultWorkerCount
defaultWorkerCount = 1
defer func() {
defaultWorkerCount = originWorkerCount
}()

oldCreateDB := createDB
createDB = func(string, string, string, int) (db *sql.DB, err error) {
db, mock, err = sqlmock.New()
return
}
defer func() {
createDB = oldCreateDB
}()

syncer, err := newMysqlSyncer(&DBConfig{}, safemode)
c.Assert(err, check.IsNil)

mock.ExpectBegin()
mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectCommit()

mock.ExpectQuery("show columns from `test`.`t1`").WillReturnRows(sqlmock.NewRows([]string{"Field", "Type", "Null", "Key", "Default", "Extra"}).AddRow("a", "int", "YES", "", "NULL", "").AddRow("b", "varchar(24)", "YES", "", "NULL", "").AddRow("c", "varchar(24)", "YES", "", "NULL", ""))

rows := sqlmock.NewRows([]string{"Table", "Non_unique", "Key_name", "Seq_in_index", "Column_name", "Collation", "Cardinality", "Sub_part", "Packed", "Null", "Index_type", "Comment", "Index_comment"})
mock.ExpectQuery("show index from `test`.`t1`").WillReturnRows(rows)

mock.ExpectBegin()
insertPattern := "INSERT INTO"
if safemode {
insertPattern = "REPLACE INTO"
}
mock.ExpectExec(insertPattern).WithArgs(1, "test", nil).WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("DELETE FROM").WithArgs(1, "test").WillReturnResult(sqlmock.NewResult(0, 1))
if safemode {
mock.ExpectExec("DELETE FROM").WithArgs().WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(insertPattern).WithArgs(nil, nil, "abc").WillReturnResult(sqlmock.NewResult(0, 1))
} else {
mock.ExpectExec("UPDATE").WithArgs("abc", "test").WillReturnResult(sqlmock.NewResult(0, 1))
}
mock.ExpectCommit()

syncTest(c, Syncer(syncer))

err = syncer.Close()
c.Assert(err, check.IsNil)
}

func syncTest(c *check.C, syncer Syncer) {
ddlBinlog := &pb.Binlog{
Tp: pb.BinlogType_DDL,
DdlQuery: []byte("create database test;"),
}
dmlBinlog := &pb.Binlog{
Tp: pb.BinlogType_DML,
DmlData: &pb.DMLData{
Events: generateDMLEvents(c),
},
}

binlogs := make([]*pb.Binlog, 0, 2)
err := syncer.Sync(ddlBinlog, func(binlog *pb.Binlog) {
c.Log(binlog)
binlogs = append(binlogs, binlog)
})
c.Assert(err, check.IsNil)

err = syncer.Sync(dmlBinlog, func(binlog *pb.Binlog) {
c.Log(binlog)
binlogs = append(binlogs, binlog)
})
c.Assert(err, check.IsNil)

time.Sleep(100 * time.Millisecond)
c.Assert(binlogs, check.HasLen, 2)
}
2 changes: 1 addition & 1 deletion reparo/syncer/print.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,6 @@ func printInsertOrDeleteEvent(row [][]byte) {
}

tp := col.Tp[0]
fmt.Printf("%s(%s): %s \n", col.Name, col.MysqlType, formatValueToString(val, tp))
fmt.Printf("%s(%s): %s\n", col.Name, col.MysqlType, formatValueToString(val, tp))
}
}
110 changes: 110 additions & 0 deletions reparo/syncer/print_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package syncer

import (
"strings"

capturer "github.com/kami-zh/go-capturer"
"github.com/pingcap/check"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
)

type testPrintSuite struct{}

var _ = check.Suite(&testPrintSuite{})

func (s *testPrintSuite) TestPrintSyncer(c *check.C) {
syncer, err := newPrintSyncer()
c.Assert(err, check.IsNil)

out := capturer.CaptureStdout(func() {
syncTest(c, Syncer(syncer))
})

c.Assert(out, check.Equals,
"DDL query: create database test;\n"+
"schema: test; table: t1; type: Insert\n"+
"a(int): 1\n"+
"b(varchar): test\n"+
"schema: test; table: t1; type: Delete\n"+
"a(int): 1\n"+
"b(varchar): test\n"+
"schema: test; table: t1; type: Update\n"+
"c(varchar): test => abc\n")

err = syncer.Close()
c.Assert(err, check.IsNil)
}

func (s *testPrintSuite) TestPrintEventHeader(c *check.C) {
schema := "test"
table := "t1"
event := &pb.Event{
Tp: pb.EventType_Insert,
SchemaName: &schema,
TableName: &table,
}

out := capturer.CaptureStdout(func() {
printEventHeader(event)
})
lines := strings.Split(strings.TrimSpace(out), "\n")
c.Assert(lines, check.HasLen, 1)
c.Assert(lines[0], check.Matches, ".*schema: test; table: t1; type: Insert.*")
}

func (s *testPrintSuite) TestPrintDDL(c *check.C) {
ddlBinlog := &pb.Binlog{
Tp: pb.BinlogType_DDL,
DdlQuery: []byte("create database test;"),
}

out := capturer.CaptureStdout(func() {
printDDL(ddlBinlog)
})
lines := strings.Split(strings.TrimSpace(out), "\n")
c.Assert(lines, check.HasLen, 1)
c.Assert(lines[0], check.Matches, ".*DDL query: create database test;.*")
}

func (s *testPrintSuite) TestPrintRow(c *check.C) {
cols := generateColumns(c)

insertEvent := &pb.Event{
Tp: pb.EventType_Insert,
Row: [][]byte{cols[0], cols[1]},
}

out := capturer.CaptureStdout(func() {
printEvent(insertEvent)
})
lines := strings.Split(strings.TrimSpace(out), "\n")
c.Assert(lines, check.HasLen, 3)
c.Assert(lines[0], check.Equals, "schema: ; table: ; type: Insert")
c.Assert(lines[1], check.Equals, "a(int): 1")
c.Assert(lines[2], check.Equals, "b(varchar): test")

deleteEvent := &pb.Event{
Tp: pb.EventType_Delete,
Row: [][]byte{cols[0], cols[1]},
}
out = capturer.CaptureStdout(func() {
printEvent(deleteEvent)
})
lines = strings.Split(strings.TrimSpace(out), "\n")
c.Assert(lines, check.HasLen, 3)
c.Assert(lines[0], check.Equals, "schema: ; table: ; type: Delete")
c.Assert(lines[1], check.Equals, "a(int): 1")
c.Assert(lines[2], check.Equals, "b(varchar): test")

updateEvent := &pb.Event{
Tp: pb.EventType_Update,
Row: [][]byte{cols[2]},
}
out = capturer.CaptureStdout(func() {
printEvent(updateEvent)
})
lines = strings.Split(strings.TrimSpace(out), "\n")
c.Assert(lines, check.HasLen, 2)
c.Assert(lines[0], check.Equals, "schema: ; table: ; type: Update")
c.Assert(lines[1], check.Equals, "c(varchar): test => abc")
}
4 changes: 2 additions & 2 deletions reparo/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ type Syncer interface {
}

// New creates a new executor based on the name.
func New(name string, cfg *DBConfig) (Syncer, error) {
func New(name string, cfg *DBConfig, safemode bool) (Syncer, error) {
switch name {
case "mysql":
return newMysqlSyncer(cfg)
return newMysqlSyncer(cfg, safemode)
case "print":
return newPrintSyncer()
case "memory":
Expand Down
Loading