Skip to content
This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

rebuild mysql conn when retry failed chunks and support --transactional-consistency parameter #199

Merged
merged 7 commits into from
Nov 17, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 58 additions & 51 deletions v4/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,46 +23,47 @@ import (
)

const (
flagDatabase = "database"
flagTablesList = "tables-list"
flagHost = "host"
flagUser = "user"
flagPort = "port"
flagPassword = "password"
flagAllowCleartextPasswords = "allow-cleartext-passwords"
flagThreads = "threads"
flagFilesize = "filesize"
flagStatementSize = "statement-size"
flagOutput = "output"
flagLoglevel = "loglevel"
flagLogfile = "logfile"
flagLogfmt = "logfmt"
flagConsistency = "consistency"
flagSnapshot = "snapshot"
flagNoViews = "no-views"
flagStatusAddr = "status-addr"
flagRows = "rows"
flagWhere = "where"
flagEscapeBackslash = "escape-backslash"
flagFiletype = "filetype"
flagNoHeader = "no-header"
flagNoSchemas = "no-schemas"
flagNoData = "no-data"
flagCsvNullValue = "csv-null-value"
flagSql = "sql"
flagFilter = "filter"
flagCaseSensitive = "case-sensitive"
flagDumpEmptyDatabase = "dump-empty-database"
flagTidbMemQuotaQuery = "tidb-mem-quota-query"
flagCA = "ca"
flagCert = "cert"
flagKey = "key"
flagCsvSeparator = "csv-separator"
flagCsvDelimiter = "csv-delimiter"
flagOutputFilenameTemplate = "output-filename-template"
flagCompleteInsert = "complete-insert"
flagParams = "params"
flagReadTimeout = "read-timeout"
flagDatabase = "database"
flagTablesList = "tables-list"
flagHost = "host"
flagUser = "user"
flagPort = "port"
flagPassword = "password"
flagAllowCleartextPasswords = "allow-cleartext-passwords"
flagThreads = "threads"
flagFilesize = "filesize"
flagStatementSize = "statement-size"
flagOutput = "output"
flagLoglevel = "loglevel"
flagLogfile = "logfile"
flagLogfmt = "logfmt"
flagConsistency = "consistency"
flagSnapshot = "snapshot"
flagNoViews = "no-views"
flagStatusAddr = "status-addr"
flagRows = "rows"
flagWhere = "where"
flagEscapeBackslash = "escape-backslash"
flagFiletype = "filetype"
flagNoHeader = "no-header"
flagNoSchemas = "no-schemas"
flagNoData = "no-data"
flagCsvNullValue = "csv-null-value"
flagSql = "sql"
flagFilter = "filter"
flagCaseSensitive = "case-sensitive"
flagDumpEmptyDatabase = "dump-empty-database"
flagTidbMemQuotaQuery = "tidb-mem-quota-query"
flagCA = "ca"
flagCert = "cert"
flagKey = "key"
flagCsvSeparator = "csv-separator"
flagCsvDelimiter = "csv-delimiter"
flagOutputFilenameTemplate = "output-filename-template"
flagCompleteInsert = "complete-insert"
flagParams = "params"
flagReadTimeout = "read-timeout"
flagTransactionalConsistency = "transactional-consistency"

FlagHelp = "help"
)
Expand Down Expand Up @@ -108,15 +109,16 @@ type Config struct {
CsvDelimiter string
ReadTimeout time.Duration

TableFilter filter.Filter `json:"-"`
Rows uint64
Where string
FileType string
CompleteInsert bool
EscapeBackslash bool
DumpEmptyDatabase bool
OutputFileTemplate *template.Template `json:"-"`
SessionParams map[string]interface{}
TableFilter filter.Filter `json:"-"`
Rows uint64
Where string
FileType string
CompleteInsert bool
TransactionalConsistency bool
EscapeBackslash bool
DumpEmptyDatabase bool
OutputFileTemplate *template.Template `json:"-"`
SessionParams map[string]interface{}

PosAfterConnect bool

Expand All @@ -141,7 +143,7 @@ func DefaultConfig() *Config {
SortByPk: true,
Tables: nil,
Snapshot: "",
Consistency: "auto",
Consistency: consistencyTypeAuto,
NoViews: true,
Rows: UnspecifiedSize,
Where: "",
Expand Down Expand Up @@ -202,7 +204,7 @@ func (conf *Config) DefineFlags(flags *pflag.FlagSet) {
flags.String(flagLoglevel, "info", "Log level: {debug|info|warn|error|dpanic|panic|fatal}")
flags.StringP(flagLogfile, "L", "", "Log file `path`, leave empty to write to console")
flags.String(flagLogfmt, "text", "Log `format`: {text|json}")
flags.String(flagConsistency, "auto", "Consistency level during dumping: {auto|none|flush|lock|snapshot}")
flags.String(flagConsistency, consistencyTypeAuto, "Consistency level during dumping: {auto|none|flush|lock|snapshot}")
flags.String(flagSnapshot, "", "Snapshot position (uint64 from pd timestamp for TiDB). Valid only when consistency=snapshot")
flags.BoolP(flagNoViews, "W", true, "Do not dump views")
flags.String(flagStatusAddr, ":8281", "dumpling API server and pprof addr")
Expand Down Expand Up @@ -230,6 +232,7 @@ func (conf *Config) DefineFlags(flags *pflag.FlagSet) {
flags.Bool(FlagHelp, false, "Print help message and quit")
flags.Duration(flagReadTimeout, 15*time.Minute, "I/O read timeout for db connection.")
flags.MarkHidden(flagReadTimeout)
flags.Bool(flagTransactionalConsistency, true, "Only support transactional consistency")
}

// GetDSN generates DSN from Config
Expand Down Expand Up @@ -367,6 +370,10 @@ func (conf *Config) ParseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Trace(err)
}
conf.TransactionalConsistency, err = flags.GetBool(flagTransactionalConsistency)
if err != nil {
return errors.Trace(err)
}

if conf.Threads <= 0 {
return errors.Errorf("--threads is set to %d. It should be greater than 0", conf.Threads)
Expand Down
43 changes: 35 additions & 8 deletions v4/export/consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,37 @@ import (
"github.com/pingcap/errors"
)

const (
consistencyTypeAuto = "auto"
consistencyTypeFlush = "flush"
consistencyTypeLock = "lock"
consistencyTypeSnapshot = "snapshot"
consistencyTypeNone = "none"
)

func NewConsistencyController(ctx context.Context, conf *Config, session *sql.DB) (ConsistencyController, error) {
resolveAutoConsistency(conf)
conn, err := session.Conn(ctx)
if err != nil {
return nil, err
}
switch conf.Consistency {
case "flush":
case consistencyTypeFlush:
return &ConsistencyFlushTableWithReadLock{
serverType: conf.ServerInfo.ServerType,
conn: conn,
}, nil
case "lock":
case consistencyTypeLock:
return &ConsistencyLockDumpingTables{
conn: conn,
allTables: conf.Tables,
}, nil
case "snapshot":
case consistencyTypeSnapshot:
if conf.ServerInfo.ServerType != ServerTypeTiDB {
return nil, errors.New("snapshot consistency is not supported for this server")
}
return &ConsistencyNone{}, nil
case "none":
case consistencyTypeNone:
return &ConsistencyNone{}, nil
default:
return nil, errors.Errorf("invalid consistency option %s", conf.Consistency)
Expand All @@ -39,6 +47,7 @@ func NewConsistencyController(ctx context.Context, conf *Config, session *sql.DB
type ConsistencyController interface {
Setup(context.Context) error
TearDown(context.Context) error
PingContext(context.Context) error
}

type ConsistencyNone struct{}
Expand All @@ -51,6 +60,10 @@ func (c *ConsistencyNone) TearDown(_ context.Context) error {
return nil
}

func (c *ConsistencyNone) PingContext(_ context.Context) error {
return nil
}

type ConsistencyFlushTableWithReadLock struct {
serverType ServerType
conn *sql.Conn
Expand All @@ -74,6 +87,13 @@ func (c *ConsistencyFlushTableWithReadLock) TearDown(ctx context.Context) error
return UnlockTables(ctx, c.conn)
}

func (c *ConsistencyFlushTableWithReadLock) PingContext(ctx context.Context) error {
if c.conn == nil {
return errors.New("consistency connection has already been closed!")
}
return c.conn.PingContext(ctx)
}

type ConsistencyLockDumpingTables struct {
conn *sql.Conn
allTables DatabaseTables
Expand Down Expand Up @@ -102,18 +122,25 @@ func (c *ConsistencyLockDumpingTables) TearDown(ctx context.Context) error {
return UnlockTables(ctx, c.conn)
}

func (c *ConsistencyLockDumpingTables) PingContext(ctx context.Context) error {
if c.conn == nil {
return errors.New("consistency connection has already been closed!")
}
return c.conn.PingContext(ctx)
}

const snapshotFieldIndex = 1

func resolveAutoConsistency(conf *Config) {
if conf.Consistency != "auto" {
if conf.Consistency != consistencyTypeAuto {
return
}
switch conf.ServerInfo.ServerType {
case ServerTypeTiDB:
conf.Consistency = "snapshot"
conf.Consistency = consistencyTypeSnapshot
case ServerTypeMySQL, ServerTypeMariaDB:
conf.Consistency = "flush"
conf.Consistency = consistencyTypeFlush
default:
conf.Consistency = "none"
conf.Consistency = consistencyTypeNone
}
}
24 changes: 12 additions & 12 deletions v4/export/consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) {
conf := DefaultConfig()
resultOk := sqlmock.NewResult(0, 1)

conf.Consistency = "none"
conf.Consistency = consistencyTypeNone
ctrl, _ := NewConsistencyController(ctx, conf, db)
_, ok := ctrl.(*ConsistencyNone)
c.Assert(ok, IsTrue)
s.assertLifetimeErrNil(ctx, ctrl, c)

conf.Consistency = "flush"
conf.Consistency = consistencyTypeFlush
mock.ExpectExec("FLUSH TABLES WITH READ LOCK").WillReturnResult(resultOk)
mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk)
ctrl, _ = NewConsistencyController(ctx, conf, db)
Expand All @@ -50,14 +50,14 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) {
c.Fatalf(err.Error())
}

conf.Consistency = "snapshot"
conf.Consistency = consistencyTypeSnapshot
conf.ServerInfo.ServerType = ServerTypeTiDB
ctrl, _ = NewConsistencyController(ctx, conf, db)
_, ok = ctrl.(*ConsistencyNone)
c.Assert(ok, IsTrue)
s.assertLifetimeErrNil(ctx, ctrl, c)

conf.Consistency = "lock"
conf.Consistency = consistencyTypeLock
conf.Tables = NewDatabaseTables().
AppendTables("db1", "t1", "t2", "t3").
AppendViews("db2", "t4")
Expand All @@ -80,14 +80,14 @@ func (s *testConsistencySuite) TestResolveAutoConsistency(c *C) {
serverTp ServerType
resolvedConsistency string
}{
{ServerTypeTiDB, "snapshot"},
{ServerTypeMySQL, "flush"},
{ServerTypeMariaDB, "flush"},
{ServerTypeUnknown, "none"},
{ServerTypeTiDB, consistencyTypeSnapshot},
{ServerTypeMySQL, consistencyTypeFlush},
{ServerTypeMariaDB, consistencyTypeFlush},
{ServerTypeUnknown, consistencyTypeNone},
}

for _, x := range cases {
conf.Consistency = "auto"
conf.Consistency = consistencyTypeAuto
conf.ServerInfo.ServerType = x.serverTp
resolveAutoConsistency(conf)
cmt := Commentf("server type %s", x.serverTp.String())
Expand All @@ -109,20 +109,20 @@ func (s *testConsistencySuite) TestConsistencyControllerError(c *C) {
c.Assert(strings.Contains(err.Error(), "invalid consistency option"), IsTrue)

// snapshot consistency is only available in TiDB
conf.Consistency = "snapshot"
conf.Consistency = consistencyTypeSnapshot
conf.ServerInfo.ServerType = ServerTypeUnknown
_, err = NewConsistencyController(ctx, conf, db)
c.Assert(err, NotNil)

// flush consistency is unavailable in TiDB
conf.Consistency = "flush"
conf.Consistency = consistencyTypeFlush
conf.ServerInfo.ServerType = ServerTypeTiDB
ctrl, _ := NewConsistencyController(ctx, conf, db)
err = ctrl.Setup(ctx)
c.Assert(err, NotNil)

// lock table fail
conf.Consistency = "lock"
conf.Consistency = consistencyTypeLock
conf.Tables = NewDatabaseTables().AppendTables("db", "t")
mock.ExpectExec("LOCK TABLE").WillReturnError(errors.New(""))
ctrl, _ = NewConsistencyController(ctx, conf, db)
Expand Down
Loading