From d181a9b2c0b23b777ba75daee90fbb3d1cc44fc8 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 12 Nov 2020 14:31:55 +0800 Subject: [PATCH 1/6] support rebuild mysql connection to retry failed chunks --- v4/export/dump.go | 55 +++++++++++++++++++++++++++----------- v4/export/dump_test.go | 9 +++++-- v4/export/metadata.go | 40 +++++++++++++++++---------- v4/export/metadata_test.go | 39 ++++++++++++++------------- v4/export/retry.go | 7 ++++- v4/export/writer_util.go | 1 + 6 files changed, 100 insertions(+), 51 deletions(-) diff --git a/v4/export/dump.go b/v4/export/dump.go index 1d9eb73c..70706dd4 100755 --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -129,7 +129,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { defer newPool.Close() } - m := newGlobalMetadata(conf.ExternalStorage) + m := newGlobalMetadata(conf.ExternalStorage, snapshot) // write metadata even if dump failed defer m.writeGlobalMetaData(ctx) @@ -141,7 +141,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { return errors.Trace(err) } m.recordStartTime(time.Now()) - err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, false, snapshot) + err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, false) if err != nil { log.Info("get global metadata failed", zap.Error(err)) } @@ -170,7 +170,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { return errors.Trace(err) } m.recordStartTime(time.Now()) - err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, false, snapshot) + err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, false) if err != nil { log.Info("get global metadata failed", zap.Error(err)) } @@ -186,7 +186,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { if conf.PosAfterConnect { conn := connectPool.getConn() // record again, to provide a location to exit safe mode for DM - err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, true, snapshot) + err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, true) if err != nil { log.Info("get global metadata (after connection pool established) failed", zap.Error(err)) } @@ -223,7 +223,20 @@ func Dump(pCtx context.Context, conf *Config) (err error) { } if conf.Sql == "" { - if err = dumpDatabases(ctx, conf, connectPool, writer); err != nil { + if err = dumpDatabases(ctx, conf, connectPool, writer, func(conn *sql.Conn) (*sql.Conn, error) { + conn.Close() + conn, err = createConnWithConsistency(ctx, pool) + if err != nil { + return nil, err + } + if conf.PosAfterConnect { + err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, true) + if err != nil { + return nil, err + } + } + return conn, nil + }); err != nil { return err } } else { @@ -236,7 +249,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { return nil } -func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsPool, writer Writer) error { +func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsPool, writer Writer, rebuildConnFunc func(*sql.Conn) (*sql.Conn, error)) error { allTables := conf.Tables g, ctx := errgroup.WithContext(pCtx) for dbName, tables := range allTables { @@ -264,19 +277,31 @@ func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsP for _, tableIR := range tableDataIRArray { tableIR := tableIR g.Go(func() error { - retryTime := 1 - return utils.WithRetry(ctx, func() error { - log.Debug("trying to dump table chunk", zap.Int("retryTime", retryTime), zap.String("db", tableIR.DatabaseName()), - zap.String("table", tableIR.TableName()), zap.Int("chunkIndex", tableIR.ChunkIndex())) - conn := connectPool.getConn() - defer connectPool.releaseConn(conn) + conn := connectPool.getConn() + defer func() { + connectPool.releaseConn(conn) + }() + retryTime := 0 + var lastErr error + return utils.WithRetry(ctx, func() (err error) { + defer func() { + lastErr = err + }() retryTime += 1 - err := tableIR.Start(ctx, conn) + log.Debug("trying to dump table chunk", zap.Int("retryTime", retryTime), zap.String("db", tableIR.DatabaseName()), + zap.String("table", tableIR.TableName()), zap.Int("chunkIndex", tableIR.ChunkIndex()), zap.NamedError("lastError", lastErr)) + if retryTime > 1 { + conn, err = rebuildConnFunc(conn) + if err != nil { + return + } + } + err = tableIR.Start(ctx, conn) if err != nil { - return err + return } return writer.WriteTableData(ctx, tableIR) - }, newDumpChunkBackoffer()) + }, newDumpChunkBackoffer(conf.Consistency == "none" || conf.Consistency == "snapshot")) }) } } diff --git a/v4/export/dump_test.go b/v4/export/dump_test.go index 8f9e542e..b3c05686 100644 --- a/v4/export/dump_test.go +++ b/v4/export/dump_test.go @@ -83,7 +83,9 @@ func (s *testDumpSuite) TestDumpDatabase(c *C) { mockWriter := newMockWriter() connectPool := newMockConnectPool(c, db) - err = dumpDatabases(context.Background(), mockConfig, connectPool, mockWriter) + err = dumpDatabases(context.Background(), mockConfig, connectPool, mockWriter, func(conn *sql.Conn) (*sql.Conn, error) { + return conn, nil + }) c.Assert(err, IsNil) c.Assert(len(mockWriter.databaseMeta), Equals, 1) @@ -202,6 +204,7 @@ func (s *testDumpSuite) TestDumpDatabaseWithRetry(c *C) { mockConfig.SortByPk = false mockConfig.Databases = []string{"test"} mockConfig.Tables = NewDatabaseTables().AppendTables("test", "t") + mockConfig.Consistency = "none" db, mock, err := sqlmock.New() c.Assert(err, IsNil) @@ -221,7 +224,9 @@ func (s *testDumpSuite) TestDumpDatabaseWithRetry(c *C) { mockWriter := newMockWriter() connectPool := newMockConnectPool(c, db) - err = dumpDatabases(context.Background(), mockConfig, connectPool, mockWriter) + err = dumpDatabases(context.Background(), mockConfig, connectPool, mockWriter, func(conn *sql.Conn) (*sql.Conn, error) { + return conn, nil + }) c.Assert(err, IsNil) c.Assert(len(mockWriter.databaseMeta), Equals, 1) diff --git a/v4/export/metadata.go b/v4/export/metadata.go index d21fdb8b..d8ce9769 100644 --- a/v4/export/metadata.go +++ b/v4/export/metadata.go @@ -15,7 +15,9 @@ import ( ) type globalMetadata struct { - buffer bytes.Buffer + buffer bytes.Buffer + afterConnBuffer bytes.Buffer + snapshot string storage storage.ExternalStorage } @@ -31,10 +33,11 @@ const ( mariadbShowMasterStatusFieldNum = 4 ) -func newGlobalMetadata(s storage.ExternalStorage) *globalMetadata { +func newGlobalMetadata(s storage.ExternalStorage, snapshot string) *globalMetadata { return &globalMetadata{ - storage: s, - buffer: bytes.Buffer{}, + storage: s, + buffer: bytes.Buffer{}, + snapshot: snapshot, } } @@ -47,16 +50,25 @@ func (m *globalMetadata) recordStartTime(t time.Time) { } func (m *globalMetadata) recordFinishTime(t time.Time) { + m.buffer.Write(m.afterConnBuffer.Bytes()) m.buffer.WriteString("Finished dump at: " + t.Format(metadataTimeLayout) + "\n") } -func (m *globalMetadata) recordGlobalMetaData(db *sql.Conn, serverType ServerType, afterConn bool, snapshot string) error { +func (m *globalMetadata) recordGlobalMetaData(db *sql.Conn, serverType ServerType, afterConn bool) error { + if afterConn { + m.afterConnBuffer.Reset() + return recordGlobalMetaData(db, &m.afterConnBuffer, serverType, afterConn, m.snapshot) + } + return recordGlobalMetaData(db, &m.buffer, serverType, afterConn, m.snapshot) +} + +func recordGlobalMetaData(db *sql.Conn, buffer *bytes.Buffer, serverType ServerType, afterConn bool, snapshot string) error { // get master status info - m.buffer.WriteString("SHOW MASTER STATUS:") + buffer.WriteString("SHOW MASTER STATUS:") if afterConn { - m.buffer.WriteString(" /* AFTER CONNECTION POOL ESTABLISHED */") + buffer.WriteString(" /* AFTER CONNECTION POOL ESTABLISHED */") } - m.buffer.WriteString("\n") + buffer.WriteString("\n") switch serverType { // For MySQL: // mysql 5.6+ @@ -92,7 +104,7 @@ func (m *globalMetadata) recordGlobalMetaData(db *sql.Conn, serverType ServerTyp gtidSet := getValidStr(str, gtidSetFieldIndex) if logFile != "" { - fmt.Fprintf(&m.buffer, "\tLog: %s\n\tPos: %s\n\tGTID:%s\n", logFile, pos, gtidSet) + fmt.Fprintf(buffer, "\tLog: %s\n\tPos: %s\n\tGTID:%s\n", logFile, pos, gtidSet) } // For MariaDB: // SHOW MASTER STATUS; @@ -122,12 +134,12 @@ func (m *globalMetadata) recordGlobalMetaData(db *sql.Conn, serverType ServerTyp } if logFile != "" { - fmt.Fprintf(&m.buffer, "\tLog: %s\n\tPos: %s\n\tGTID:%s\n", logFile, pos, gtidSet) + fmt.Fprintf(buffer, "\tLog: %s\n\tPos: %s\n\tGTID:%s\n", logFile, pos, gtidSet) } default: return errors.New("unsupported serverType" + serverType.String() + "for recordGlobalMetaData") } - m.buffer.WriteString("\n") + buffer.WriteString("\n") if serverType == ServerTypeTiDB { return nil } @@ -184,11 +196,11 @@ func (m *globalMetadata) recordGlobalMetaData(db *sql.Conn, serverType ServerTyp } } if len(host) > 0 { - m.buffer.WriteString("SHOW SLAVE STATUS:\n") + buffer.WriteString("SHOW SLAVE STATUS:\n") if isms { - m.buffer.WriteString("\tConnection name: " + connName + "\n") + buffer.WriteString("\tConnection name: " + connName + "\n") } - fmt.Fprintf(&m.buffer, "\tHost: %s\n\tLog: %s\n\tPos: %s\n\tGTID:%s\n\n", host, logFile, pos, gtidSet) + fmt.Fprintf(buffer, "\tHost: %s\n\tLog: %s\n\tPos: %s\n\tGTID:%s\n\n", host, logFile, pos, gtidSet) } return nil }) diff --git a/v4/export/metadata_test.go b/v4/export/metadata_test.go index 564a3448..0345f9f3 100644 --- a/v4/export/metadata_test.go +++ b/v4/export/metadata_test.go @@ -30,8 +30,8 @@ func (s *testMetaDataSuite) TestMysqlMetaData(c *C) { mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows( sqlmock.NewRows([]string{"exec_master_log_pos", "relay_master_log_file", "master_host", "Executed_Gtid_Set", "Seconds_Behind_Master"})) - m := newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false, ""), IsNil) + m := newGlobalMetadata(s.createStorage(c), "") + c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil) c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+ "\tLog: ON.000001\n"+ @@ -68,9 +68,10 @@ func (s *testMetaDataSuite) TestMetaDataAfterConn(c *C) { sqlmock.NewRows([]string{"exec_master_log_pos", "relay_master_log_file", "master_host", "Executed_Gtid_Set", "Seconds_Behind_Master"})) mock.ExpectQuery("SHOW MASTER STATUS").WillReturnRows(rows2) - m := newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false, ""), IsNil) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, true, ""), IsNil) + m := newGlobalMetadata(s.createStorage(c), "") + c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil) + c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, true), IsNil) + m.buffer.Write(m.afterConnBuffer.Bytes()) c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+ "\tLog: ON.000001\n"+ @@ -101,8 +102,8 @@ func (s *testMetaDataSuite) TestMysqlWithFollowersMetaData(c *C) { mock.ExpectQuery("SELECT @@default_master_connection").WillReturnError(fmt.Errorf("mock error")) mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows(followerRows) - m := newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false, ""), IsNil) + m := newGlobalMetadata(s.createStorage(c), "") + c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil) c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+ "\tLog: ON.000001\n"+ @@ -132,8 +133,8 @@ func (s *testMetaDataSuite) TestMysqlWithNullFollowersMetaData(c *C) { mock.ExpectQuery("SELECT @@default_master_connection").WillReturnError(fmt.Errorf("mock error")) mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows(sqlmock.NewRows([]string{"SQL_Remaining_Delay"}).AddRow(nil)) - m := newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false, ""), IsNil) + m := newGlobalMetadata(s.createStorage(c), "") + c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil) c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+ "\tLog: ON.000001\n"+ @@ -159,8 +160,8 @@ func (s *testMetaDataSuite) TestMariaDBMetaData(c *C) { AddRow(gtidSet) mock.ExpectQuery("SELECT @@global.gtid_binlog_pos").WillReturnRows(rows) mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows(rows) - m := newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeMariaDB, false, ""), IsNil) + m := newGlobalMetadata(s.createStorage(c), "") + c.Assert(m.recordGlobalMetaData(conn, ServerTypeMariaDB, false), IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) } @@ -186,8 +187,8 @@ func (s *testMetaDataSuite) TestMariaDBWithFollowersMetaData(c *C) { AddRow("connection_1")) mock.ExpectQuery("SHOW ALL SLAVES STATUS").WillReturnRows(followerRows) - m := newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false, ""), IsNil) + m := newGlobalMetadata(s.createStorage(c), "") + c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil) c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+ "\tLog: ON.000001\n"+ @@ -224,8 +225,8 @@ func (s *testMetaDataSuite) TestEarlierMysqlMetaData(c *C) { mock.ExpectQuery("SHOW SLAVE STATUS").WillReturnRows( sqlmock.NewRows([]string{"exec_master_log_pos", "relay_master_log_file", "master_host", "Executed_Gtid_Set", "Seconds_Behind_Master"})) - m := newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false, ""), IsNil) + m := newGlobalMetadata(s.createStorage(c), "") + c.Assert(m.recordGlobalMetaData(conn, ServerTypeMySQL, false), IsNil) c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+ "\tLog: mysql-bin.000001\n"+ @@ -247,8 +248,8 @@ func (s *testMetaDataSuite) TestTiDBSnapshotMetaData(c *C) { AddRow(logFile, pos, "", "") mock.ExpectQuery("SHOW MASTER STATUS").WillReturnRows(rows) - m := newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeTiDB, false, ""), IsNil) + m := newGlobalMetadata(s.createStorage(c), "") + c.Assert(m.recordGlobalMetaData(conn, ServerTypeTiDB, false), IsNil) c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+ "\tLog: tidb-binlog\n"+ "\tPos: 420633329401856001\n"+ @@ -258,8 +259,8 @@ func (s *testMetaDataSuite) TestTiDBSnapshotMetaData(c *C) { rows = sqlmock.NewRows([]string{"File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB"}). AddRow(logFile, pos, "", "") mock.ExpectQuery("SHOW MASTER STATUS").WillReturnRows(rows) - m = newGlobalMetadata(s.createStorage(c)) - c.Assert(m.recordGlobalMetaData(conn, ServerTypeTiDB, false, snapshot), IsNil) + m = newGlobalMetadata(s.createStorage(c), snapshot) + c.Assert(m.recordGlobalMetaData(conn, ServerTypeTiDB, false), IsNil) c.Assert(m.buffer.String(), Equals, "SHOW MASTER STATUS:\n"+ "\tLog: tidb-binlog\n"+ "\tPos: 420633273211289601\n"+ diff --git a/v4/export/retry.go b/v4/export/retry.go index 894d7042..33c4e631 100644 --- a/v4/export/retry.go +++ b/v4/export/retry.go @@ -14,7 +14,12 @@ const ( dumpChunkMaxWaitInterval = 200 * time.Millisecond ) -func newDumpChunkBackoffer() *dumpChunkBackoffer { +func newDumpChunkBackoffer(canRetry bool) *dumpChunkBackoffer { + if !canRetry { + return &dumpChunkBackoffer{ + attempt: 1, + } + } return &dumpChunkBackoffer{ attempt: dumpChunkRetryTime, delayTime: dumpChunkWaitInterval, diff --git a/v4/export/writer_util.go b/v4/export/writer_util.go index 348d5023..957d67c9 100644 --- a/v4/export/writer_util.go +++ b/v4/export/writer_util.go @@ -313,6 +313,7 @@ func WriteInsertInCsv(pCtx context.Context, tblIR TableDataIR, w storage.Writer, log.Debug("dumping table", zap.String("table", tblIR.TableName()), + zap.Int("chunkIndex", tblIR.ChunkIndex()), zap.Int("record counts", counter)) if bf.Len() > 0 { wp.input <- bf From 9102cd79163ab23984554b1cdde8e42d3d729301 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 12 Nov 2020 15:39:04 +0800 Subject: [PATCH 2/6] refine consistency variables --- v4/export/config.go | 4 ++-- v4/export/consistency.go | 24 ++++++++++++++++-------- v4/export/consistency_test.go | 24 ++++++++++++------------ v4/export/dump.go | 12 ++++++------ v4/export/dump_test.go | 2 +- 5 files changed, 37 insertions(+), 29 deletions(-) diff --git a/v4/export/config.go b/v4/export/config.go index bfa9bdf5..98fd4e1f 100644 --- a/v4/export/config.go +++ b/v4/export/config.go @@ -138,7 +138,7 @@ func DefaultConfig() *Config { SortByPk: true, Tables: nil, Snapshot: "", - Consistency: "auto", + Consistency: consistencyTypeAuto, NoViews: true, Rows: UnspecifiedSize, Where: "", @@ -196,7 +196,7 @@ func (conf *Config) DefineFlags(flags *pflag.FlagSet) { flags.String(flagLoglevel, "info", "Log level: {debug|info|warn|error|dpanic|panic|fatal}") flags.StringP(flagLogfile, "L", "", "Log file `path`, leave empty to write to console") flags.String(flagLogfmt, "text", "Log `format`: {text|json}") - flags.String(flagConsistency, "auto", "Consistency level during dumping: {auto|none|flush|lock|snapshot}") + flags.String(flagConsistency, consistencyTypeAuto, "Consistency level during dumping: {auto|none|flush|lock|snapshot}") flags.String(flagSnapshot, "", "Snapshot position (uint64 from pd timestamp for TiDB). Valid only when consistency=snapshot") flags.BoolP(flagNoViews, "W", true, "Do not dump views") flags.String(flagStatusAddr, ":8281", "dumpling API server and pprof addr") diff --git a/v4/export/consistency.go b/v4/export/consistency.go index 9f948c28..06cb1892 100644 --- a/v4/export/consistency.go +++ b/v4/export/consistency.go @@ -7,6 +7,14 @@ import ( "github.com/pingcap/errors" ) +const ( + consistencyTypeAuto = "auto" + consistencyTypeFlush = "flush" + consistencyTypeLock = "lock" + consistencyTypeSnapshot = "snapshot" + consistencyTypeNone = "none" +) + func NewConsistencyController(ctx context.Context, conf *Config, session *sql.DB) (ConsistencyController, error) { resolveAutoConsistency(conf) conn, err := session.Conn(ctx) @@ -14,22 +22,22 @@ func NewConsistencyController(ctx context.Context, conf *Config, session *sql.DB return nil, err } switch conf.Consistency { - case "flush": + case consistencyTypeFlush: return &ConsistencyFlushTableWithReadLock{ serverType: conf.ServerInfo.ServerType, conn: conn, }, nil - case "lock": + case consistencyTypeLock: return &ConsistencyLockDumpingTables{ conn: conn, allTables: conf.Tables, }, nil - case "snapshot": + case consistencyTypeSnapshot: if conf.ServerInfo.ServerType != ServerTypeTiDB { return nil, errors.New("snapshot consistency is not supported for this server") } return &ConsistencyNone{}, nil - case "none": + case consistencyTypeNone: return &ConsistencyNone{}, nil default: return nil, errors.Errorf("invalid consistency option %s", conf.Consistency) @@ -105,15 +113,15 @@ func (c *ConsistencyLockDumpingTables) TearDown(ctx context.Context) error { const snapshotFieldIndex = 1 func resolveAutoConsistency(conf *Config) { - if conf.Consistency != "auto" { + if conf.Consistency != consistencyTypeAuto { return } switch conf.ServerInfo.ServerType { case ServerTypeTiDB: - conf.Consistency = "snapshot" + conf.Consistency = consistencyTypeSnapshot case ServerTypeMySQL, ServerTypeMariaDB: - conf.Consistency = "flush" + conf.Consistency = consistencyTypeFlush default: - conf.Consistency = "none" + conf.Consistency = consistencyTypeNone } } diff --git a/v4/export/consistency_test.go b/v4/export/consistency_test.go index 71999e67..63eff2e1 100644 --- a/v4/export/consistency_test.go +++ b/v4/export/consistency_test.go @@ -33,13 +33,13 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) { conf := DefaultConfig() resultOk := sqlmock.NewResult(0, 1) - conf.Consistency = "none" + conf.Consistency = consistencyTypeNone ctrl, _ := NewConsistencyController(ctx, conf, db) _, ok := ctrl.(*ConsistencyNone) c.Assert(ok, IsTrue) s.assertLifetimeErrNil(ctx, ctrl, c) - conf.Consistency = "flush" + conf.Consistency = consistencyTypeFlush mock.ExpectExec("FLUSH TABLES WITH READ LOCK").WillReturnResult(resultOk) mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk) ctrl, _ = NewConsistencyController(ctx, conf, db) @@ -50,14 +50,14 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) { c.Fatalf(err.Error()) } - conf.Consistency = "snapshot" + conf.Consistency = consistencyTypeSnapshot conf.ServerInfo.ServerType = ServerTypeTiDB ctrl, _ = NewConsistencyController(ctx, conf, db) _, ok = ctrl.(*ConsistencyNone) c.Assert(ok, IsTrue) s.assertLifetimeErrNil(ctx, ctrl, c) - conf.Consistency = "lock" + conf.Consistency = consistencyTypeLock conf.Tables = NewDatabaseTables(). AppendTables("db1", "t1", "t2", "t3"). AppendViews("db2", "t4") @@ -80,14 +80,14 @@ func (s *testConsistencySuite) TestResolveAutoConsistency(c *C) { serverTp ServerType resolvedConsistency string }{ - {ServerTypeTiDB, "snapshot"}, - {ServerTypeMySQL, "flush"}, - {ServerTypeMariaDB, "flush"}, - {ServerTypeUnknown, "none"}, + {ServerTypeTiDB, consistencyTypeSnapshot}, + {ServerTypeMySQL, consistencyTypeFlush}, + {ServerTypeMariaDB, consistencyTypeFlush}, + {ServerTypeUnknown, consistencyTypeNone}, } for _, x := range cases { - conf.Consistency = "auto" + conf.Consistency = consistencyTypeAuto conf.ServerInfo.ServerType = x.serverTp resolveAutoConsistency(conf) cmt := Commentf("server type %s", x.serverTp.String()) @@ -109,20 +109,20 @@ func (s *testConsistencySuite) TestConsistencyControllerError(c *C) { c.Assert(strings.Contains(err.Error(), "invalid consistency option"), IsTrue) // snapshot consistency is only available in TiDB - conf.Consistency = "snapshot" + conf.Consistency = consistencyTypeSnapshot conf.ServerInfo.ServerType = ServerTypeUnknown _, err = NewConsistencyController(ctx, conf, db) c.Assert(err, NotNil) // flush consistency is unavailable in TiDB - conf.Consistency = "flush" + conf.Consistency = consistencyTypeFlush conf.ServerInfo.ServerType = ServerTypeTiDB ctrl, _ := NewConsistencyController(ctx, conf, db) err = ctrl.Setup(ctx) c.Assert(err, NotNil) // lock table fail - conf.Consistency = "lock" + conf.Consistency = consistencyTypeLock conf.Tables = NewDatabaseTables().AppendTables("db", "t") mock.ExpectExec("LOCK TABLE").WillReturnError(errors.New("")) ctrl, _ = NewConsistencyController(ctx, conf, db) diff --git a/v4/export/dump.go b/v4/export/dump.go index 70706dd4..8bd9e988 100755 --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -76,7 +76,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { } snapshot := conf.Snapshot - if snapshot == "" && (doPdGC || conf.Consistency == "snapshot") { + if snapshot == "" && (doPdGC || conf.Consistency == consistencyTypeLock) { conn, err := pool.Conn(ctx) if err != nil { conn.Close() @@ -97,7 +97,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { if err != nil { return err } - if conf.Consistency == "snapshot" { + if conf.Consistency == consistencyTypeLock { hasTiKV, err := CheckTiDBWithTiKV(pool) if err != nil { return err @@ -135,7 +135,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { // for consistency lock, we should lock tables at first to get the tables we want to lock & dump // for consistency lock, record meta pos before lock tables because other tables may still be modified while locking tables - if conf.Consistency == "lock" { + if conf.Consistency == consistencyTypeLock { conn, err := createConnWithConsistency(ctx, pool) if err != nil { return errors.Trace(err) @@ -164,7 +164,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { // for other consistencies, we should get table list after consistency is set up and GlobalMetaData is cached // for other consistencies, record snapshot after whole tables are locked. The recorded meta info is exactly the locked snapshot. - if conf.Consistency != "lock" { + if conf.Consistency != consistencyTypeLock { conn, err := pool.Conn(ctx) if err != nil { return errors.Trace(err) @@ -193,7 +193,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { connectPool.releaseConn(conn) } - if conf.Consistency != "lock" { + if conf.Consistency != consistencyTypeLock { conn := connectPool.getConn() if err = prepareTableListToDump(conf, conn); err != nil { connectPool.releaseConn(conn) @@ -301,7 +301,7 @@ func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsP return } return writer.WriteTableData(ctx, tableIR) - }, newDumpChunkBackoffer(conf.Consistency == "none" || conf.Consistency == "snapshot")) + }, newDumpChunkBackoffer(conf.Consistency == consistencyTypeNone || conf.Consistency == consistencyTypeLock)) }) } } diff --git a/v4/export/dump_test.go b/v4/export/dump_test.go index b3c05686..52a231a7 100644 --- a/v4/export/dump_test.go +++ b/v4/export/dump_test.go @@ -204,7 +204,7 @@ func (s *testDumpSuite) TestDumpDatabaseWithRetry(c *C) { mockConfig.SortByPk = false mockConfig.Databases = []string{"test"} mockConfig.Tables = NewDatabaseTables().AppendTables("test", "t") - mockConfig.Consistency = "none" + mockConfig.Consistency = consistencyTypeNone db, mock, err := sqlmock.New() c.Assert(err, IsNil) From 54e389f110fc539c0fdbd8112b7bcb7ab20d7e6e Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 12 Nov 2020 16:05:00 +0800 Subject: [PATCH 3/6] add --trx-consistency-only and support rebuilding mysql conn to retry --- v4/export/config.go | 10 +++++++++- v4/export/consistency.go | 19 +++++++++++++++++++ v4/export/dump.go | 36 ++++++++++++++++++++++++++++++------ 3 files changed, 58 insertions(+), 7 deletions(-) diff --git a/v4/export/config.go b/v4/export/config.go index 98fd4e1f..9b50b237 100644 --- a/v4/export/config.go +++ b/v4/export/config.go @@ -62,7 +62,9 @@ const ( flagOutputFilenameTemplate = "output-filename-template" flagCompleteInsert = "complete-insert" flagParams = "params" - FlagHelp = "help" + flagTrxConsistencyOnly = "trx-consistency-only" + + FlagHelp = "help" ) type Config struct { @@ -110,6 +112,7 @@ type Config struct { Where string FileType string CompleteInsert bool + TrxConsistencyOnly bool EscapeBackslash bool DumpEmptyDatabase bool OutputFileTemplate *template.Template `json:"-"` @@ -222,6 +225,7 @@ func (conf *Config) DefineFlags(flags *pflag.FlagSet) { flags.Bool(flagCompleteInsert, false, "Use complete INSERT statements that include column names") flags.StringToString(flagParams, nil, `Extra session variables used while dumping, accepted format: --params "character_set_client=latin1,character_set_connection=latin1"`) flags.Bool(FlagHelp, false, "Print help message and quit") + flags.Bool(flagTrxConsistencyOnly, true, "Only support transactional consistency") } // GetDSN generates DSN from Config @@ -355,6 +359,10 @@ func (conf *Config) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } + conf.TrxConsistencyOnly, err = flags.GetBool(flagTrxConsistencyOnly) + if err != nil { + return errors.Trace(err) + } if conf.Threads <= 0 { return errors.Errorf("--threads is set to %d. It should be greater than 0", conf.Threads) diff --git a/v4/export/consistency.go b/v4/export/consistency.go index 06cb1892..163fb454 100644 --- a/v4/export/consistency.go +++ b/v4/export/consistency.go @@ -47,6 +47,7 @@ func NewConsistencyController(ctx context.Context, conf *Config, session *sql.DB type ConsistencyController interface { Setup(context.Context) error TearDown(context.Context) error + PingContext(context.Context) error } type ConsistencyNone struct{} @@ -59,6 +60,10 @@ func (c *ConsistencyNone) TearDown(_ context.Context) error { return nil } +func (c *ConsistencyNone) PingContext(_ context.Context) error { + return nil +} + type ConsistencyFlushTableWithReadLock struct { serverType ServerType conn *sql.Conn @@ -82,6 +87,13 @@ func (c *ConsistencyFlushTableWithReadLock) TearDown(ctx context.Context) error return UnlockTables(ctx, c.conn) } +func (c *ConsistencyFlushTableWithReadLock) PingContext(ctx context.Context) error { + if c.conn == nil { + return errors.New("connection has already closed!") + } + return c.conn.PingContext(ctx) +} + type ConsistencyLockDumpingTables struct { conn *sql.Conn allTables DatabaseTables @@ -110,6 +122,13 @@ func (c *ConsistencyLockDumpingTables) TearDown(ctx context.Context) error { return UnlockTables(ctx, c.conn) } +func (c *ConsistencyLockDumpingTables) PingContext(ctx context.Context) error { + if c.conn == nil { + return errors.New("connection has already closed!") + } + return c.conn.PingContext(ctx) +} + const snapshotFieldIndex = 1 func resolveAutoConsistency(conf *Config) { diff --git a/v4/export/dump.go b/v4/export/dump.go index 8bd9e988..01b8c561 100755 --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -202,8 +202,13 @@ func Dump(pCtx context.Context, conf *Config) (err error) { connectPool.releaseConn(conn) } - if err = conCtrl.TearDown(ctx); err != nil { - return err + if conf.TrxConsistencyOnly { + if conf.Consistency == consistencyTypeFlush || conf.Consistency == consistencyTypeLock { + log.Info("All the dumping transactions have started. Start to unlock tables") + } + if err = conCtrl.TearDown(ctx); err != nil { + return err + } } failpoint.Inject("ConsistencyCheck", nil) @@ -224,15 +229,23 @@ func Dump(pCtx context.Context, conf *Config) (err error) { if conf.Sql == "" { if err = dumpDatabases(ctx, conf, connectPool, writer, func(conn *sql.Conn) (*sql.Conn, error) { + // make sure that the lock connection is still alive + err := conCtrl.PingContext(ctx) + if err != nil { + return conn, err + } + // give up the last broken connection conn.Close() - conn, err = createConnWithConsistency(ctx, pool) + newConn, err := createConnWithConsistency(ctx, pool) if err != nil { - return nil, err + return conn, err } + conn = newConn + // renew the master status after connection. dm can't close safe-mode until current pos if conf.PosAfterConnect { err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, true) if err != nil { - return nil, err + return conn, err } } return conn, nil @@ -301,7 +314,7 @@ func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsP return } return writer.WriteTableData(ctx, tableIR) - }, newDumpChunkBackoffer(conf.Consistency == consistencyTypeNone || conf.Consistency == consistencyTypeLock)) + }, newDumpChunkBackoffer(canRebuildConn(conf.Consistency, conf.TrxConsistencyOnly))) }) } } @@ -445,3 +458,14 @@ func updateServiceSafePoint(ctx context.Context, pdClient pd.Client, ttl int64, } } } + +func canRebuildConn(consistency string, trxConsistencyOnly bool) bool { + switch consistency { + case consistencyTypeLock, consistencyTypeFlush: + return !trxConsistencyOnly + case consistencyTypeSnapshot, consistencyTypeNone: + return true + default: + return false + } +} From 278a1496ceea55a792af12e5240913c6492b2150 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 13 Nov 2020 13:15:52 +0800 Subject: [PATCH 4/6] fix error message --- v4/export/consistency.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/v4/export/consistency.go b/v4/export/consistency.go index 163fb454..e9357b3a 100644 --- a/v4/export/consistency.go +++ b/v4/export/consistency.go @@ -89,7 +89,7 @@ func (c *ConsistencyFlushTableWithReadLock) TearDown(ctx context.Context) error func (c *ConsistencyFlushTableWithReadLock) PingContext(ctx context.Context) error { if c.conn == nil { - return errors.New("connection has already closed!") + return errors.New("consistency connection has already been closed!") } return c.conn.PingContext(ctx) } @@ -124,7 +124,7 @@ func (c *ConsistencyLockDumpingTables) TearDown(ctx context.Context) error { func (c *ConsistencyLockDumpingTables) PingContext(ctx context.Context) error { if c.conn == nil { - return errors.New("connection has already closed!") + return errors.New("consistency connection has already been closed!") } return c.conn.PingContext(ctx) } From 311ad54beaa439c2583fbf61c9155a0f67a79522 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 16 Nov 2020 13:29:42 +0800 Subject: [PATCH 5/6] address comment --- v4/export/config.go | 106 ++++++++++++++++++++++---------------------- v4/export/dump.go | 4 +- 2 files changed, 55 insertions(+), 55 deletions(-) diff --git a/v4/export/config.go b/v4/export/config.go index 2b852037..3e845a44 100644 --- a/v4/export/config.go +++ b/v4/export/config.go @@ -23,47 +23,47 @@ import ( ) const ( - flagDatabase = "database" - flagTablesList = "tables-list" - flagHost = "host" - flagUser = "user" - flagPort = "port" - flagPassword = "password" - flagAllowCleartextPasswords = "allow-cleartext-passwords" - flagThreads = "threads" - flagFilesize = "filesize" - flagStatementSize = "statement-size" - flagOutput = "output" - flagLoglevel = "loglevel" - flagLogfile = "logfile" - flagLogfmt = "logfmt" - flagConsistency = "consistency" - flagSnapshot = "snapshot" - flagNoViews = "no-views" - flagStatusAddr = "status-addr" - flagRows = "rows" - flagWhere = "where" - flagEscapeBackslash = "escape-backslash" - flagFiletype = "filetype" - flagNoHeader = "no-header" - flagNoSchemas = "no-schemas" - flagNoData = "no-data" - flagCsvNullValue = "csv-null-value" - flagSql = "sql" - flagFilter = "filter" - flagCaseSensitive = "case-sensitive" - flagDumpEmptyDatabase = "dump-empty-database" - flagTidbMemQuotaQuery = "tidb-mem-quota-query" - flagCA = "ca" - flagCert = "cert" - flagKey = "key" - flagCsvSeparator = "csv-separator" - flagCsvDelimiter = "csv-delimiter" - flagOutputFilenameTemplate = "output-filename-template" - flagCompleteInsert = "complete-insert" - flagParams = "params" - flagReadTimeout = "read-timeout" - flagTrxConsistencyOnly = "trx-consistency-only" + flagDatabase = "database" + flagTablesList = "tables-list" + flagHost = "host" + flagUser = "user" + flagPort = "port" + flagPassword = "password" + flagAllowCleartextPasswords = "allow-cleartext-passwords" + flagThreads = "threads" + flagFilesize = "filesize" + flagStatementSize = "statement-size" + flagOutput = "output" + flagLoglevel = "loglevel" + flagLogfile = "logfile" + flagLogfmt = "logfmt" + flagConsistency = "consistency" + flagSnapshot = "snapshot" + flagNoViews = "no-views" + flagStatusAddr = "status-addr" + flagRows = "rows" + flagWhere = "where" + flagEscapeBackslash = "escape-backslash" + flagFiletype = "filetype" + flagNoHeader = "no-header" + flagNoSchemas = "no-schemas" + flagNoData = "no-data" + flagCsvNullValue = "csv-null-value" + flagSql = "sql" + flagFilter = "filter" + flagCaseSensitive = "case-sensitive" + flagDumpEmptyDatabase = "dump-empty-database" + flagTidbMemQuotaQuery = "tidb-mem-quota-query" + flagCA = "ca" + flagCert = "cert" + flagKey = "key" + flagCsvSeparator = "csv-separator" + flagCsvDelimiter = "csv-delimiter" + flagOutputFilenameTemplate = "output-filename-template" + flagCompleteInsert = "complete-insert" + flagParams = "params" + flagReadTimeout = "read-timeout" + flagTransactionalConsistency = "transactional-consistency" FlagHelp = "help" ) @@ -109,16 +109,16 @@ type Config struct { CsvDelimiter string ReadTimeout time.Duration - TableFilter filter.Filter `json:"-"` - Rows uint64 - Where string - FileType string - CompleteInsert bool - TrxConsistencyOnly bool - EscapeBackslash bool - DumpEmptyDatabase bool - OutputFileTemplate *template.Template `json:"-"` - SessionParams map[string]interface{} + TableFilter filter.Filter `json:"-"` + Rows uint64 + Where string + FileType string + CompleteInsert bool + TransactionalConsistency bool + EscapeBackslash bool + DumpEmptyDatabase bool + OutputFileTemplate *template.Template `json:"-"` + SessionParams map[string]interface{} PosAfterConnect bool @@ -232,7 +232,7 @@ func (conf *Config) DefineFlags(flags *pflag.FlagSet) { flags.Bool(FlagHelp, false, "Print help message and quit") flags.Duration(flagReadTimeout, 15*time.Minute, "I/O read timeout for db connection.") flags.MarkHidden(flagReadTimeout) - flags.Bool(flagTrxConsistencyOnly, true, "Only support transactional consistency") + flags.Bool(flagTransactionalConsistency, true, "Only support transactional consistency") } // GetDSN generates DSN from Config @@ -370,7 +370,7 @@ func (conf *Config) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } - conf.TrxConsistencyOnly, err = flags.GetBool(flagTrxConsistencyOnly) + conf.TransactionalConsistency, err = flags.GetBool(flagTransactionalConsistency) if err != nil { return errors.Trace(err) } diff --git a/v4/export/dump.go b/v4/export/dump.go index c4caad84..5edab3be 100755 --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -201,7 +201,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { } } - if conf.TrxConsistencyOnly { + if conf.TransactionalConsistency { if conf.Consistency == consistencyTypeFlush || conf.Consistency == consistencyTypeLock { log.Info("All the dumping transactions have started. Start to unlock tables") } @@ -309,7 +309,7 @@ func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsP return } return writer.WriteTableData(ctx, tableIR) - }, newDumpChunkBackoffer(canRebuildConn(conf.Consistency, conf.TrxConsistencyOnly))) + }, newDumpChunkBackoffer(canRebuildConn(conf.Consistency, conf.TransactionalConsistency))) }) } } From 91dcef296b33b9220e28c6c0deea33138e529798 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 16 Nov 2020 16:21:05 +0800 Subject: [PATCH 6/6] refine code --- v4/export/dump.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/v4/export/dump.go b/v4/export/dump.go index 5edab3be..3bd6f50c 100755 --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -76,7 +76,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { } snapshot := conf.Snapshot - if snapshot == "" && (doPdGC || conf.Consistency == consistencyTypeLock) { + if snapshot == "" && (doPdGC || conf.Consistency == consistencyTypeSnapshot) { conn, err := pool.Conn(ctx) if err != nil { conn.Close() @@ -97,7 +97,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { if err != nil { return err } - if conf.Consistency == consistencyTypeLock { + if conf.Consistency == consistencyTypeSnapshot { hasTiKV, err := CheckTiDBWithTiKV(pool) if err != nil { return err @@ -240,7 +240,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) { return conn, err } conn = newConn - // renew the master status after connection. dm can't close safe-mode until current pos + // renew the master status after connection. dm can't close safe-mode until dm reaches current pos if conf.PosAfterConnect { err = m.recordGlobalMetaData(conn, conf.ServerInfo.ServerType, true) if err != nil {