Skip to content
This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

fix consistency bug #128

Merged
merged 4 commits into from
Aug 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ LDFLAGS += -X "github.com/pingcap/dumpling/v4/cli.GitHash=$(shell git rev-parse
LDFLAGS += -X "github.com/pingcap/dumpling/v4/cli.GitBranch=$(shell git rev-parse --abbrev-ref HEAD)"
LDFLAGS += -X "github.com/pingcap/dumpling/v4/cli.GoVersion=$(shell go version)"

FAILPOINT_ENABLE := $$(find $$PWD/ -type d | grep -vE "(\.git|tools)" | xargs bin/failpoint-ctl enable)
FAILPOINT_DISABLE := $$(find $$PWD/ -type d | grep -vE "(\.git|tools)" | xargs bin/failpoint-ctl disable)

GO = go
GOLDFLAGS = -ldflags '$(LDFLAGS)'
ifeq ("$(WITH_RACE)", "1")
Expand All @@ -17,8 +20,21 @@ build: bin/dumpling
bin/%: cmd/%/main.go $(wildcard v4/**/*.go)
$(GO) build $(GOLDFLAGS) -tags codes -o $@ $<

test:
$(GO) list ./... | xargs $(GO) test $(GOLDFLAGS) -coverprofile=coverage.txt -covermode=atomic
test: failpoint-enable
$(GO) list ./... | xargs $(GO) test $(GOLDFLAGS) -coverprofile=coverage.txt -covermode=atomic ||{ $(FAILPOINT_DISABLE); exit 1; }
@make failpoint-disable

integration_test: failpoint-enable bin/dumpling
@make failpoint-disable
./tests/run.sh ||{ $(FAILPOINT_DISABLE); exit 1; }

bin/failpoint-ctl: go.mod
$(GO) build -o $@ github.com/pingcap/failpoint/failpoint-ctl

failpoint-enable: bin/failpoint-ctl
# Converting gofail failpoints...
@$(FAILPOINT_ENABLE)

integration_test: bin/dumpling
./tests/run.sh
failpoint-disable: bin/failpoint-ctl
# Restoring gofail failpoints...
@$(FAILPOINT_DISABLE)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ require (
github.com/go-sql-driver/mysql v1.5.0
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
github.com/pingcap/pd/v4 v4.0.0
github.com/pingcap/tidb-tools v4.0.0-rc.2.0.20200521050818-6dd445d83fe0+incompatible
github.com/pkg/errors v0.9.1
github.com/soheilhy/cmux v0.1.4
github.com/spf13/pflag v1.0.3
github.com/stretchr/testify v1.5.1 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/zap v1.14.0
golang.org/x/mod v0.3.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,12 @@ github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12/go.mod h1:PYMCGwN0JH
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9 h1:KH4f4Si9XK6/IW50HtoaiLIFHGkapOM6w83za47UYik=
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011 h1:58naV4XMEqm0hl9LcYo6cZoGBGiLtefMQMF/vo3XLgQ=
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMtVcOkjUcuQKh+YrluSo7+7YMCQSzy30=
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 h1:NpW1OuYrIl+IQrSsVbtyHpHpazmSCHy+ysrOixY0xY4=
Expand Down Expand Up @@ -270,6 +273,7 @@ github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 h1:tB9NOR21++IjLyVx3/PCPhWMwqGNCMQEH96A6dMZ/gc=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc=
Expand Down
60 changes: 60 additions & 0 deletions tests/consistency/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/bin/sh

set -eu
cur=$(cd `dirname $0`; pwd)

DB_NAME="mysql_consistency"
TABLE_NAME="t"

# drop database on mysql
run_sql "drop database if exists \`$DB_NAME\`;"

# build data on mysql
run_sql "create database $DB_NAME;"
run_sql "create table $DB_NAME.$TABLE_NAME (a int(255));"

# insert 100 records
run_sql "insert into $DB_NAME.$TABLE_NAME values $(seq -s, 100 | sed 's/,*$//g' | sed "s/[0-9]*/('1')/g");"

# dumping with consistency flush
export DUMPLING_TEST_DATABASE=$DB_NAME
export GO_FAILPOINTS="github.com/pingcap/dumpling/v4/export/ConsistencyCheck=1*sleep(5000)"
run_dumpling &
# wait dumpling process to start to sleep
sleep 2

# record metadata info
metadata=`run_sql "show master status;"`
metaLog=`echo $metadata | awk -F 'File:' '{print $2}' | awk '{print $1}'`
metaPos=`echo $metadata | awk -F 'Position:' '{print $2}' | awk '{print $1}'`
metaGTID=`echo $metadata | awk -F 'Executed_Gtid_Set:' '{print $2}' | awk '{print $1}'`
# insert 100 more records, test whether dumpling will dump these data out
run_sql "insert into $DB_NAME.$TABLE_NAME values $(seq -s, 100 | sed 's/,*$//g' | sed "s/[0-9]*/('1')/g");"

wait

# check data record count
cnt=`grep -o "(1)" ${DUMPLING_OUTPUT_DIR}/${DB_NAME}.${TABLE_NAME}.0.sql|wc -l`
echo "1st records count is ${cnt}"
[ $cnt = 100 ]

# check metadata
echo "metaLog: $metaLog"
echo "metaPos: $metaPos"
echo "metaGTID: $metaGTID"
if [ $metaLog != "" ]; then
[ `grep -o "Log: $metaLog" ${DUMPLING_OUTPUT_DIR}/metadata|wc -l` ]
fi
if [ $metaPos != "" ]; then
[ `grep -o "Pos: $metaPos" ${DUMPLING_OUTPUT_DIR}/metadata|wc -l` ]
fi
if [ $metaGTID != "" ]; then
[ `grep -o "GTID: $metaGTID" ${DUMPLING_OUTPUT_DIR}/metadata|wc -l` ]
fi

# test dumpling normally
export GO_FAILPOINTS=""
run_dumpling
cnt=`grep -o "(1)" ${DUMPLING_OUTPUT_DIR}/${DB_NAME}.${TABLE_NAME}.0.sql|wc -l`
echo "2nd records count is ${cnt}"
[ $cnt = 200 ]
49 changes: 24 additions & 25 deletions v4/export/consistency.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
package export

import (
"context"
"database/sql"
"errors"
"fmt"
)

func NewConsistencyController(conf *Config, session *sql.DB) (ConsistencyController, error) {
func NewConsistencyController(ctx context.Context, conf *Config, session *sql.DB) (ConsistencyController, error) {
resolveAutoConsistency(conf)
conn, err := session.Conn(ctx)
if err != nil {
return nil, err
}
switch conf.Consistency {
case "flush":
return &ConsistencyFlushTableWithReadLock{
serverType: conf.ServerInfo.ServerType,
db: session,
conn: conn,
}, nil
case "lock":
return &ConsistencyLockDumpingTables{
db: session,
conn: conn,
allTables: conf.Tables,
}, nil
case "snapshot":
Expand All @@ -32,49 +37,46 @@ func NewConsistencyController(conf *Config, session *sql.DB) (ConsistencyControl
}

type ConsistencyController interface {
Setup() error
TearDown() error
Setup(context.Context) error
TearDown(context.Context) error
}

type ConsistencyNone struct{}

func (c *ConsistencyNone) Setup() error {
func (c *ConsistencyNone) Setup(_ context.Context) error {
return nil
}

func (c *ConsistencyNone) TearDown() error {
func (c *ConsistencyNone) TearDown(_ context.Context) error {
return nil
}

type ConsistencyFlushTableWithReadLock struct {
serverType ServerType
db *sql.DB
conn *sql.Conn
}

func (c *ConsistencyFlushTableWithReadLock) Setup() error {
func (c *ConsistencyFlushTableWithReadLock) Setup(ctx context.Context) error {
if c.serverType == ServerTypeTiDB {
return withStack(errors.New("'flush table with read lock' cannot be used to ensure the consistency in TiDB"))
}
return FlushTableWithReadLock(c.db)
return FlushTableWithReadLock(ctx, c.conn)
}

func (c *ConsistencyFlushTableWithReadLock) TearDown() error {
err := c.db.Ping()
if err != nil {
return withStack(errors.New("ConsistencyFlushTableWithReadLock lost database connection"))
}
return UnlockTables(c.db)
func (c *ConsistencyFlushTableWithReadLock) TearDown(ctx context.Context) error {
defer c.conn.Close()
return UnlockTables(ctx, c.conn)
}

type ConsistencyLockDumpingTables struct {
db *sql.DB
conn *sql.Conn
allTables DatabaseTables
}

func (c *ConsistencyLockDumpingTables) Setup() error {
func (c *ConsistencyLockDumpingTables) Setup(ctx context.Context) error {
for dbName, tables := range c.allTables {
for _, table := range tables {
err := LockTables(c.db, dbName, table.Name)
err := LockTables(ctx, c.conn, dbName, table.Name)
if err != nil {
return err
}
Expand All @@ -83,12 +85,9 @@ func (c *ConsistencyLockDumpingTables) Setup() error {
return nil
}

func (c *ConsistencyLockDumpingTables) TearDown() error {
err := c.db.Ping()
if err != nil {
return withStack(errors.New("ConsistencyLockDumpingTables lost database connection"))
}
return UnlockTables(c.db)
func (c *ConsistencyLockDumpingTables) TearDown(ctx context.Context) error {
defer c.conn.Close()
return UnlockTables(ctx, c.conn)
}

const showMasterStatusFieldNum = 5
Expand Down
39 changes: 22 additions & 17 deletions v4/export/consistency_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package export

import (
"context"
"errors"
"strings"

Expand All @@ -18,41 +19,43 @@ func (s *testConsistencySuite) assertNil(err error, c *C) {
}
}

func (s *testConsistencySuite) assertLifetimeErrNil(ctrl ConsistencyController, c *C) {
s.assertNil(ctrl.Setup(), c)
s.assertNil(ctrl.TearDown(), c)
func (s *testConsistencySuite) assertLifetimeErrNil(ctx context.Context, ctrl ConsistencyController, c *C) {
s.assertNil(ctrl.Setup(ctx), c)
s.assertNil(ctrl.TearDown(ctx), c)
}

func (s *testConsistencySuite) TestConsistencyController(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conf := DefaultConfig()
resultOk := sqlmock.NewResult(0, 1)

conf.Consistency = "none"
ctrl, _ := NewConsistencyController(conf, db)
ctrl, _ := NewConsistencyController(ctx, conf, db)
_, ok := ctrl.(*ConsistencyNone)
c.Assert(ok, IsTrue)
s.assertLifetimeErrNil(ctrl, c)
s.assertLifetimeErrNil(ctx, ctrl, c)

conf.Consistency = "flush"
mock.ExpectExec("FLUSH TABLES WITH READ LOCK").WillReturnResult(resultOk)
mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk)
ctrl, _ = NewConsistencyController(conf, db)
ctrl, _ = NewConsistencyController(ctx, conf, db)
_, ok = ctrl.(*ConsistencyFlushTableWithReadLock)
c.Assert(ok, IsTrue)
s.assertLifetimeErrNil(ctrl, c)
s.assertLifetimeErrNil(ctx, ctrl, c)
if err = mock.ExpectationsWereMet(); err != nil {
c.Fatalf(err.Error())
}

conf.Consistency = "snapshot"
conf.ServerInfo.ServerType = ServerTypeTiDB
ctrl, _ = NewConsistencyController(conf, db)
ctrl, _ = NewConsistencyController(ctx, conf, db)
_, ok = ctrl.(*ConsistencyNone)
c.Assert(ok, IsTrue)
s.assertLifetimeErrNil(ctrl, c)
s.assertLifetimeErrNil(ctx, ctrl, c)

conf.Consistency = "lock"
conf.Tables = NewDatabaseTables().
Expand All @@ -62,10 +65,10 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) {
mock.ExpectExec("LOCK TABLES").WillReturnResult(resultOk)
}
mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk)
ctrl, _ = NewConsistencyController(conf, db)
ctrl, _ = NewConsistencyController(ctx, conf, db)
_, ok = ctrl.(*ConsistencyLockDumpingTables)
c.Assert(ok, IsTrue)
s.assertLifetimeErrNil(ctrl, c)
s.assertLifetimeErrNil(ctx, ctrl, c)
if err = mock.ExpectationsWereMet(); err != nil {
c.Fatalf(err.Error())
}
Expand Down Expand Up @@ -96,31 +99,33 @@ func (s *testConsistencySuite) TestConsistencyControllerError(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conf := DefaultConfig()

conf.Consistency = "invalid_str"
_, err = NewConsistencyController(conf, db)
_, err = NewConsistencyController(ctx, conf, db)
c.Assert(err, NotNil)
c.Assert(strings.Contains(err.Error(), "invalid consistency option"), IsTrue)

// snapshot consistency is only available in TiDB
conf.Consistency = "snapshot"
conf.ServerInfo.ServerType = ServerTypeUnknown
_, err = NewConsistencyController(conf, db)
_, err = NewConsistencyController(ctx, conf, db)
c.Assert(err, NotNil)

// flush consistency is unavailable in TiDB
conf.Consistency = "flush"
conf.ServerInfo.ServerType = ServerTypeTiDB
ctrl, _ := NewConsistencyController(conf, db)
err = ctrl.Setup()
ctrl, _ := NewConsistencyController(ctx, conf, db)
err = ctrl.Setup(ctx)
c.Assert(err, NotNil)

// lock table fail
conf.Consistency = "lock"
conf.Tables = NewDatabaseTables().AppendTables("db", "t")
mock.ExpectExec("LOCK TABLE").WillReturnError(errors.New(""))
ctrl, _ = NewConsistencyController(conf, db)
err = ctrl.Setup()
ctrl, _ = NewConsistencyController(ctx, conf, db)
err = ctrl.Setup(ctx)
c.Assert(err, NotNil)
}
Loading