From 35603f6e66f6ebefa0703ea6c166053e3d0295b7 Mon Sep 17 00:00:00 2001 From: freemindLi <1376964097@qq.com> Date: Mon, 17 Feb 2020 19:09:33 +0800 Subject: [PATCH 1/7] 1.Optimize the performance of bidirectional synchronous update table of _drainer_repl_mark --- drainer/loopbacksync/loopbacksync.go | 2 + pkg/loader/executor.go | 76 +++++++++++++++++++++++++--- pkg/loader/load.go | 17 ++++++- pkg/loader/model.go | 2 +- pkg/loader/model_test.go | 7 ++- 5 files changed, 91 insertions(+), 13 deletions(-) diff --git a/drainer/loopbacksync/loopbacksync.go b/drainer/loopbacksync/loopbacksync.go index 9960bb180..d0e03e8e6 100644 --- a/drainer/loopbacksync/loopbacksync.go +++ b/drainer/loopbacksync/loopbacksync.go @@ -16,6 +16,8 @@ package loopbacksync const ( //MarkTableName mark table name MarkTableName = "retl._drainer_repl_mark" + //ID syncer worker thread id + ID = "id" //ChannelID channel id ChannelID = "channel_id" //Val val diff --git a/pkg/loader/executor.go b/pkg/loader/executor.go index 4058bfece..e9e73f39e 100644 --- a/pkg/loader/executor.go +++ b/pkg/loader/executor.go @@ -18,6 +18,7 @@ import ( gosql "database/sql" "fmt" "strings" + "sync/atomic" "time" "github.com/pingcap/tidb-binlog/drainer/loopbacksync" @@ -33,10 +34,13 @@ import ( ) var defaultBatchSize = 128 +var defaultWorkerCount = 16 +var index int64 type executor struct { db *gosql.DB batchSize int + workerCount int info *loopbacksync.LoopBackSync queryHistogramVec *prometheus.HistogramVec refreshTableInfo func(schema string, table string) (info *tableInfo, err error) @@ -44,8 +48,9 @@ type executor struct { func newExecutor(db *gosql.DB) *executor { exe := &executor{ - db: db, - batchSize: defaultBatchSize, + db: db, + batchSize: defaultBatchSize, + workerCount: defaultWorkerCount, } return exe @@ -65,6 +70,10 @@ func (e *executor) setSyncInfo(info *loopbacksync.LoopBackSync) { e.info = info } +func (e *executor) setWorkerCount(workerCount int) { + e.workerCount = workerCount +} + func (e *executor) withQueryHistogramVec(queryHistogramVec *prometheus.HistogramVec) *executor { e.queryHistogramVec = queryHistogramVec return e @@ -116,19 +125,72 @@ func (tx *tx) commit() error { } func (e *executor) updateMark(channel string, tx *tx) error { + if e.info == nil { + return nil + } + index = e.getIndex(index) + var args []interface{} + sql := fmt.Sprintf("update %s set %s=%s+1 where %s=? and %s=? limit 1;", loopbacksync.MarkTableName, loopbacksync.Val, loopbacksync.Val, loopbacksync.ID, loopbacksync.ChannelID) + args = append(args, index, e.info.ChannelID) + _, err1 := tx.autoRollbackExec(sql, args...) + if err1 != nil { + return errors.Trace(err1) + } + return nil +} + +func (e *executor) initMarkTable() error { if e.info == nil { return nil } status := 1 - columns := fmt.Sprintf("(%s,%s,%s) VALUES(?,?,?)", loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) + channel := "" + var builder strings.Builder + holder := "(?,?,?,?)" + columns := fmt.Sprintf("(%s,%s,%s,%s) ", loopbacksync.ID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) + builder.WriteString("REPLACE INTO " + loopbacksync.MarkTableName + columns + " VALUES ") + for i := 0; i < e.workerCount; i++ { + if i > 0 { + builder.WriteByte(',') + } + builder.WriteString(holder) + } var args []interface{} - sql := fmt.Sprintf("INSERT INTO %s%s on duplicate key update %s=%s+1;", loopbacksync.MarkTableName, columns, loopbacksync.Val, loopbacksync.Val) - args = append(args, e.info.ChannelID, status, channel) - _, err := tx.autoRollbackExec(sql, args...) + for id := 0; id < e.workerCount; id++ { + args = append(args, id, e.info.ChannelID, status, channel) + } + tx, err := e.begin() if err != nil { return errors.Trace(err) } - return nil + _, err1 := tx.autoRollbackExec(builder.String(), args...) + if err1 != nil { + return errors.Trace(err1) + } + err2 := tx.commit() + return errors.Trace(err2) +} + +func (e *executor) cleanChannelInfo() error { + if e.info == nil { + return nil + } + tx, err := e.begin() + if err != nil { + return errors.Trace(err) + } + var args []interface{} + sql := fmt.Sprintf("delete from %s where %s=? ", loopbacksync.MarkTableName, loopbacksync.ChannelID) + args = append(args, e.info.ChannelID) + _, err1 := tx.autoRollbackExec(sql, args...) + if err1 != nil { + return errors.Trace(err1) + } + err2 := tx.commit() + return errors.Trace(err2) +} +func (e *executor) getIndex(index int64) int64 { + return atomic.AddInt64(&index, 1) % ((int64)(e.workerCount)) } // return a wrap of sql.Tx diff --git a/pkg/loader/load.go b/pkg/loader/load.go index a8493b2e9..c2f158b85 100644 --- a/pkg/loader/load.go +++ b/pkg/loader/load.go @@ -505,16 +505,30 @@ func (s *loaderImpl) createMarkTable() error { return nil } +func (s *loaderImpl) initMarkTable() error { + if err := s.createMarkTable(); err != nil { + return errors.Trace(err) + } + executor := s.getExecutor() + return executor.initMarkTable() +} + +func (s *loaderImpl) cleanChannelInfo() { + executor := s.getExecutor() + _ = executor.cleanChannelInfo() +} + // Run will quit when meet any error, or all the txn are drained func (s *loaderImpl) Run() error { if s.loopBackSyncInfo != nil && s.loopBackSyncInfo.LoopbackControl { - if err := s.createMarkTable(); err != nil { + if err := s.initMarkTable(); err != nil { return errors.Trace(err) } } txnManager := newTxnManager(1024, s.input) defer func() { log.Info("Run()... in Loader quit") + s.cleanChannelInfo() close(s.successTxn) txnManager.Close() }() @@ -624,6 +638,7 @@ func (s *loaderImpl) getExecutor() *executor { e = e.withRefreshTableInfo(s.refreshTableInfo) } e.setSyncInfo(s.loopBackSyncInfo) + e.setWorkerCount(s.workerCount) if s.metrics != nil && s.metrics.QueryHistogramVec != nil { e = e.withQueryHistogramVec(s.metrics.QueryHistogramVec) } diff --git a/pkg/loader/model.go b/pkg/loader/model.go index 05e05de06..9001ee457 100644 --- a/pkg/loader/model.go +++ b/pkg/loader/model.go @@ -192,7 +192,7 @@ func (dml *DML) updateSQL() (sql string, args []interface{}) { } func createMarkTableDDL() string { - sql := fmt.Sprintf("CREATE TABLE If Not Exists %s ( %s bigint primary key, %s bigint DEFAULT 0, %s varchar(64));", loopbacksync.MarkTableName, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) + sql := fmt.Sprintf("CREATE TABLE If Not Exists %s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", loopbacksync.MarkTableName, loopbacksync.ID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo, loopbacksync.ID, loopbacksync.ChannelID) return sql } diff --git a/pkg/loader/model_test.go b/pkg/loader/model_test.go index 8a83e5007..e4caf939c 100644 --- a/pkg/loader/model_test.go +++ b/pkg/loader/model_test.go @@ -242,11 +242,10 @@ func (s *SQLSuite) TestUpdateMarkSQL(c *check.C) { db, mock, err := sqlmock.New() c.Assert(err, check.IsNil) defer db.Close() - columns := fmt.Sprintf("(%s,%s,%s) VALUES(?,?,?)", loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) - sql := fmt.Sprintf("INSERT INTO %s%s on duplicate key update %s=%s+1;", loopbacksync.MarkTableName, columns, loopbacksync.Val, loopbacksync.Val) + sql := fmt.Sprintf("update %s set %s=%s+1 where %s=? and %s=? limit 1;", loopbacksync.MarkTableName, loopbacksync.Val, loopbacksync.Val, loopbacksync.ID, loopbacksync.ChannelID) mock.ExpectBegin() mock.ExpectExec(regexp.QuoteMeta(sql)). - WithArgs(100, 1, "").WillReturnResult(sqlmock.NewResult(1, 1)) + WithArgs(1, 100).WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() e := newExecutor(db) tx, err := e.begin() @@ -261,6 +260,6 @@ func (s *SQLSuite) TestUpdateMarkSQL(c *check.C) { } func (s *SQLSuite) TestCreateMarkTable(c *check.C) { sql := createMarkTableDDL() - sql1 := fmt.Sprintf("CREATE TABLE If Not Exists %s ( %s bigint primary key, %s bigint DEFAULT 0, %s varchar(64));", loopbacksync.MarkTableName, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) + sql1 := fmt.Sprintf("CREATE TABLE If Not Exists %s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", loopbacksync.MarkTableName, loopbacksync.ID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo, loopbacksync.ID, loopbacksync.ChannelID) c.Assert(sql, check.Equals, sql1) } From 656b43011a0c12d10f12f5d99b0fb0e08374e4c2 Mon Sep 17 00:00:00 2001 From: freemindLi <1376964097@qq.com> Date: Tue, 18 Feb 2020 10:27:50 +0800 Subject: [PATCH 2/7] 1.fix code format --- drainer/loopbacksync/loopbacksync.go | 4 ++-- pkg/loader/executor.go | 17 ++++++++++------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/drainer/loopbacksync/loopbacksync.go b/drainer/loopbacksync/loopbacksync.go index d0e03e8e6..8388ece3d 100644 --- a/drainer/loopbacksync/loopbacksync.go +++ b/drainer/loopbacksync/loopbacksync.go @@ -16,8 +16,8 @@ package loopbacksync const ( //MarkTableName mark table name MarkTableName = "retl._drainer_repl_mark" - //ID syncer worker thread id - ID = "id" + //ID syncer worker coroutine id + ID = "coroutineID" //ChannelID channel id ChannelID = "channel_id" //Val val diff --git a/pkg/loader/executor.go b/pkg/loader/executor.go index e9e73f39e..9ccafb127 100644 --- a/pkg/loader/executor.go +++ b/pkg/loader/executor.go @@ -33,9 +33,11 @@ import ( "golang.org/x/sync/errgroup" ) -var defaultBatchSize = 128 -var defaultWorkerCount = 16 -var index int64 +var ( + defaultBatchSize = 128 + defaultWorkerCount = 16 + index int64 +) type executor struct { db *gosql.DB @@ -128,10 +130,10 @@ func (e *executor) updateMark(channel string, tx *tx) error { if e.info == nil { return nil } - index = e.getIndex(index) + v := e.addIndex() var args []interface{} sql := fmt.Sprintf("update %s set %s=%s+1 where %s=? and %s=? limit 1;", loopbacksync.MarkTableName, loopbacksync.Val, loopbacksync.Val, loopbacksync.ID, loopbacksync.ChannelID) - args = append(args, index, e.info.ChannelID) + args = append(args, v, e.info.ChannelID) _, err1 := tx.autoRollbackExec(sql, args...) if err1 != nil { return errors.Trace(err1) @@ -189,8 +191,9 @@ func (e *executor) cleanChannelInfo() error { err2 := tx.commit() return errors.Trace(err2) } -func (e *executor) getIndex(index int64) int64 { - return atomic.AddInt64(&index, 1) % ((int64)(e.workerCount)) +func (e *executor) addIndex() int64 { + atomic.StoreInt64(&index, atomic.AddInt64(&index, 1)%((int64)(e.workerCount))) + return atomic.LoadInt64(&index) } // return a wrap of sql.Tx From 65ff6f2fbd5ff6a48776a660b288deba90be3ec7 Mon Sep 17 00:00:00 2001 From: freemindLi <1376964097@qq.com> Date: Tue, 18 Feb 2020 11:00:27 +0800 Subject: [PATCH 3/7] 1.fix code format --- drainer/loopbacksync/loopbacksync.go | 2 +- pkg/loader/executor.go | 4 ++-- pkg/loader/model.go | 2 +- pkg/loader/model_test.go | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/drainer/loopbacksync/loopbacksync.go b/drainer/loopbacksync/loopbacksync.go index 8388ece3d..f3c5d2f22 100644 --- a/drainer/loopbacksync/loopbacksync.go +++ b/drainer/loopbacksync/loopbacksync.go @@ -17,7 +17,7 @@ const ( //MarkTableName mark table name MarkTableName = "retl._drainer_repl_mark" //ID syncer worker coroutine id - ID = "coroutineID" + CoroutineID = "coroutine_id" //ChannelID channel id ChannelID = "channel_id" //Val val diff --git a/pkg/loader/executor.go b/pkg/loader/executor.go index 9ccafb127..61bfe3b46 100644 --- a/pkg/loader/executor.go +++ b/pkg/loader/executor.go @@ -132,7 +132,7 @@ func (e *executor) updateMark(channel string, tx *tx) error { } v := e.addIndex() var args []interface{} - sql := fmt.Sprintf("update %s set %s=%s+1 where %s=? and %s=? limit 1;", loopbacksync.MarkTableName, loopbacksync.Val, loopbacksync.Val, loopbacksync.ID, loopbacksync.ChannelID) + sql := fmt.Sprintf("update %s set %s=%s+1 where %s=? and %s=? limit 1;", loopbacksync.MarkTableName, loopbacksync.Val, loopbacksync.Val, loopbacksync.CoroutineID, loopbacksync.ChannelID) args = append(args, v, e.info.ChannelID) _, err1 := tx.autoRollbackExec(sql, args...) if err1 != nil { @@ -149,7 +149,7 @@ func (e *executor) initMarkTable() error { channel := "" var builder strings.Builder holder := "(?,?,?,?)" - columns := fmt.Sprintf("(%s,%s,%s,%s) ", loopbacksync.ID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) + columns := fmt.Sprintf("(%s,%s,%s,%s) ", loopbacksync.CoroutineID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) builder.WriteString("REPLACE INTO " + loopbacksync.MarkTableName + columns + " VALUES ") for i := 0; i < e.workerCount; i++ { if i > 0 { diff --git a/pkg/loader/model.go b/pkg/loader/model.go index 9001ee457..3725a70a8 100644 --- a/pkg/loader/model.go +++ b/pkg/loader/model.go @@ -192,7 +192,7 @@ func (dml *DML) updateSQL() (sql string, args []interface{}) { } func createMarkTableDDL() string { - sql := fmt.Sprintf("CREATE TABLE If Not Exists %s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", loopbacksync.MarkTableName, loopbacksync.ID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo, loopbacksync.ID, loopbacksync.ChannelID) + sql := fmt.Sprintf("CREATE TABLE If Not Exists %s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", loopbacksync.MarkTableName, loopbacksync.CoroutineID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo, loopbacksync.CoroutineID, loopbacksync.ChannelID) return sql } diff --git a/pkg/loader/model_test.go b/pkg/loader/model_test.go index e4caf939c..2c3b2cb48 100644 --- a/pkg/loader/model_test.go +++ b/pkg/loader/model_test.go @@ -242,7 +242,7 @@ func (s *SQLSuite) TestUpdateMarkSQL(c *check.C) { db, mock, err := sqlmock.New() c.Assert(err, check.IsNil) defer db.Close() - sql := fmt.Sprintf("update %s set %s=%s+1 where %s=? and %s=? limit 1;", loopbacksync.MarkTableName, loopbacksync.Val, loopbacksync.Val, loopbacksync.ID, loopbacksync.ChannelID) + sql := fmt.Sprintf("update %s set %s=%s+1 where %s=? and %s=? limit 1;", loopbacksync.MarkTableName, loopbacksync.Val, loopbacksync.Val, loopbacksync.CoroutineID, loopbacksync.ChannelID) mock.ExpectBegin() mock.ExpectExec(regexp.QuoteMeta(sql)). WithArgs(1, 100).WillReturnResult(sqlmock.NewResult(1, 1)) @@ -260,6 +260,6 @@ func (s *SQLSuite) TestUpdateMarkSQL(c *check.C) { } func (s *SQLSuite) TestCreateMarkTable(c *check.C) { sql := createMarkTableDDL() - sql1 := fmt.Sprintf("CREATE TABLE If Not Exists %s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", loopbacksync.MarkTableName, loopbacksync.ID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo, loopbacksync.ID, loopbacksync.ChannelID) + sql1 := fmt.Sprintf("CREATE TABLE If Not Exists %s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", loopbacksync.MarkTableName, loopbacksync.CoroutineID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo, loopbacksync.CoroutineID, loopbacksync.ChannelID) c.Assert(sql, check.Equals, sql1) } From bac5ff67619d5c77285127d9a79ca254c84b21b8 Mon Sep 17 00:00:00 2001 From: freemindLi <1376964097@qq.com> Date: Tue, 18 Feb 2020 11:05:17 +0800 Subject: [PATCH 4/7] 1.fix code format --- drainer/loopbacksync/loopbacksync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drainer/loopbacksync/loopbacksync.go b/drainer/loopbacksync/loopbacksync.go index f3c5d2f22..b18e0025c 100644 --- a/drainer/loopbacksync/loopbacksync.go +++ b/drainer/loopbacksync/loopbacksync.go @@ -16,7 +16,7 @@ package loopbacksync const ( //MarkTableName mark table name MarkTableName = "retl._drainer_repl_mark" - //ID syncer worker coroutine id + //CoroutineID syncer worker coroutine id CoroutineID = "coroutine_id" //ChannelID channel id ChannelID = "channel_id" From bd92b4b5a95d7d7b64ffa53741393d8e9339aebf Mon Sep 17 00:00:00 2001 From: freemindLi <1376964097@qq.com> Date: Thu, 20 Feb 2020 11:28:41 +0800 Subject: [PATCH 5/7] 1.fix code format --- drainer/loopbacksync/loopbacksync.go | 4 ++-- pkg/loader/executor.go | 10 ++++------ pkg/loader/model.go | 2 +- pkg/loader/model_test.go | 4 ++-- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/drainer/loopbacksync/loopbacksync.go b/drainer/loopbacksync/loopbacksync.go index b18e0025c..01b29d7f9 100644 --- a/drainer/loopbacksync/loopbacksync.go +++ b/drainer/loopbacksync/loopbacksync.go @@ -16,8 +16,8 @@ package loopbacksync const ( //MarkTableName mark table name MarkTableName = "retl._drainer_repl_mark" - //CoroutineID syncer worker coroutine id - CoroutineID = "coroutine_id" + //ID syncer worker coroutine id + ID = "id" //ChannelID channel id ChannelID = "channel_id" //Val val diff --git a/pkg/loader/executor.go b/pkg/loader/executor.go index 61bfe3b46..4ec52dce8 100644 --- a/pkg/loader/executor.go +++ b/pkg/loader/executor.go @@ -130,10 +130,9 @@ func (e *executor) updateMark(channel string, tx *tx) error { if e.info == nil { return nil } - v := e.addIndex() var args []interface{} - sql := fmt.Sprintf("update %s set %s=%s+1 where %s=? and %s=? limit 1;", loopbacksync.MarkTableName, loopbacksync.Val, loopbacksync.Val, loopbacksync.CoroutineID, loopbacksync.ChannelID) - args = append(args, v, e.info.ChannelID) + sql := fmt.Sprintf("update %s set %s=%s+1 where %s=? and %s=? limit 1;", loopbacksync.MarkTableName, loopbacksync.Val, loopbacksync.Val, loopbacksync.ID, loopbacksync.ChannelID) + args = append(args, e.addIndex(), e.info.ChannelID) _, err1 := tx.autoRollbackExec(sql, args...) if err1 != nil { return errors.Trace(err1) @@ -149,7 +148,7 @@ func (e *executor) initMarkTable() error { channel := "" var builder strings.Builder holder := "(?,?,?,?)" - columns := fmt.Sprintf("(%s,%s,%s,%s) ", loopbacksync.CoroutineID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) + columns := fmt.Sprintf("(%s,%s,%s,%s) ", loopbacksync.ID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) builder.WriteString("REPLACE INTO " + loopbacksync.MarkTableName + columns + " VALUES ") for i := 0; i < e.workerCount; i++ { if i > 0 { @@ -192,8 +191,7 @@ func (e *executor) cleanChannelInfo() error { return errors.Trace(err2) } func (e *executor) addIndex() int64 { - atomic.StoreInt64(&index, atomic.AddInt64(&index, 1)%((int64)(e.workerCount))) - return atomic.LoadInt64(&index) + return atomic.AddInt64(&index, 1) % ((int64)(e.workerCount)) } // return a wrap of sql.Tx diff --git a/pkg/loader/model.go b/pkg/loader/model.go index 3725a70a8..9001ee457 100644 --- a/pkg/loader/model.go +++ b/pkg/loader/model.go @@ -192,7 +192,7 @@ func (dml *DML) updateSQL() (sql string, args []interface{}) { } func createMarkTableDDL() string { - sql := fmt.Sprintf("CREATE TABLE If Not Exists %s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", loopbacksync.MarkTableName, loopbacksync.CoroutineID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo, loopbacksync.CoroutineID, loopbacksync.ChannelID) + sql := fmt.Sprintf("CREATE TABLE If Not Exists %s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", loopbacksync.MarkTableName, loopbacksync.ID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo, loopbacksync.ID, loopbacksync.ChannelID) return sql } diff --git a/pkg/loader/model_test.go b/pkg/loader/model_test.go index 2c3b2cb48..e4caf939c 100644 --- a/pkg/loader/model_test.go +++ b/pkg/loader/model_test.go @@ -242,7 +242,7 @@ func (s *SQLSuite) TestUpdateMarkSQL(c *check.C) { db, mock, err := sqlmock.New() c.Assert(err, check.IsNil) defer db.Close() - sql := fmt.Sprintf("update %s set %s=%s+1 where %s=? and %s=? limit 1;", loopbacksync.MarkTableName, loopbacksync.Val, loopbacksync.Val, loopbacksync.CoroutineID, loopbacksync.ChannelID) + sql := fmt.Sprintf("update %s set %s=%s+1 where %s=? and %s=? limit 1;", loopbacksync.MarkTableName, loopbacksync.Val, loopbacksync.Val, loopbacksync.ID, loopbacksync.ChannelID) mock.ExpectBegin() mock.ExpectExec(regexp.QuoteMeta(sql)). WithArgs(1, 100).WillReturnResult(sqlmock.NewResult(1, 1)) @@ -260,6 +260,6 @@ func (s *SQLSuite) TestUpdateMarkSQL(c *check.C) { } func (s *SQLSuite) TestCreateMarkTable(c *check.C) { sql := createMarkTableDDL() - sql1 := fmt.Sprintf("CREATE TABLE If Not Exists %s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", loopbacksync.MarkTableName, loopbacksync.CoroutineID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo, loopbacksync.CoroutineID, loopbacksync.ChannelID) + sql1 := fmt.Sprintf("CREATE TABLE If Not Exists %s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", loopbacksync.MarkTableName, loopbacksync.ID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo, loopbacksync.ID, loopbacksync.ChannelID) c.Assert(sql, check.Equals, sql1) } From 6cfc72375008f40ce5fad458dce566ceadb758df Mon Sep 17 00:00:00 2001 From: freemindLi <1376964097@qq.com> Date: Mon, 24 Feb 2020 15:43:21 +0800 Subject: [PATCH 6/7] 1.modify initMarkTable logic --- pkg/loader/executor.go | 32 -------------------------------- pkg/loader/load.go | 38 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 36 insertions(+), 34 deletions(-) diff --git a/pkg/loader/executor.go b/pkg/loader/executor.go index 4ec52dce8..6ca4a1fe5 100644 --- a/pkg/loader/executor.go +++ b/pkg/loader/executor.go @@ -140,38 +140,6 @@ func (e *executor) updateMark(channel string, tx *tx) error { return nil } -func (e *executor) initMarkTable() error { - if e.info == nil { - return nil - } - status := 1 - channel := "" - var builder strings.Builder - holder := "(?,?,?,?)" - columns := fmt.Sprintf("(%s,%s,%s,%s) ", loopbacksync.ID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) - builder.WriteString("REPLACE INTO " + loopbacksync.MarkTableName + columns + " VALUES ") - for i := 0; i < e.workerCount; i++ { - if i > 0 { - builder.WriteByte(',') - } - builder.WriteString(holder) - } - var args []interface{} - for id := 0; id < e.workerCount; id++ { - args = append(args, id, e.info.ChannelID, status, channel) - } - tx, err := e.begin() - if err != nil { - return errors.Trace(err) - } - _, err1 := tx.autoRollbackExec(builder.String(), args...) - if err1 != nil { - return errors.Trace(err1) - } - err2 := tx.commit() - return errors.Trace(err2) -} - func (e *executor) cleanChannelInfo() error { if e.info == nil { return nil diff --git a/pkg/loader/load.go b/pkg/loader/load.go index c2f158b85..33275fe24 100644 --- a/pkg/loader/load.go +++ b/pkg/loader/load.go @@ -509,8 +509,42 @@ func (s *loaderImpl) initMarkTable() error { if err := s.createMarkTable(); err != nil { return errors.Trace(err) } - executor := s.getExecutor() - return executor.initMarkTable() + return s.initMarkTableData() +} +func (s *loaderImpl) initMarkTableData() error { + tx, err := s.db.Begin() + if err != nil { + return err + } + status := 1 + channel := "" + var builder strings.Builder + holder := "(?,?,?,?)" + columns := fmt.Sprintf("(%s,%s,%s,%s) ", loopbacksync.ID, loopbacksync.ChannelID, loopbacksync.Val, loopbacksync.ChannelInfo) + builder.WriteString("REPLACE INTO " + loopbacksync.MarkTableName + columns + " VALUES ") + for i := 0; i < s.workerCount; i++ { + if i > 0 { + builder.WriteByte(',') + } + builder.WriteString(holder) + } + var args []interface{} + for id := 0; id < s.workerCount; id++ { + args = append(args, id, s.loopBackSyncInfo.ChannelID, status, channel) + } + query := builder.String() + _, err = tx.Exec(query, args...) + if err != nil { + log.Error("Exec fail, will rollback", zap.String("query", query), zap.Reflect("args", args), zap.Error(err)) + if rbErr := tx.Rollback(); rbErr != nil { + log.Error("Auto rollback", zap.Error(rbErr)) + } + return err + } + if err = tx.Commit(); err != nil { + return err + } + return nil } func (s *loaderImpl) cleanChannelInfo() { From b67c0107c11cb70b811da019411659c494c02ad4 Mon Sep 17 00:00:00 2001 From: freemindLi <1376964097@qq.com> Date: Mon, 24 Feb 2020 16:04:30 +0800 Subject: [PATCH 7/7] 1.modify initMarkTable logic --- pkg/loader/load.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/loader/load.go b/pkg/loader/load.go index 33275fe24..8329f808f 100644 --- a/pkg/loader/load.go +++ b/pkg/loader/load.go @@ -514,7 +514,7 @@ func (s *loaderImpl) initMarkTable() error { func (s *loaderImpl) initMarkTableData() error { tx, err := s.db.Begin() if err != nil { - return err + return errors.Trace(err) } status := 1 channel := "" @@ -533,16 +533,15 @@ func (s *loaderImpl) initMarkTableData() error { args = append(args, id, s.loopBackSyncInfo.ChannelID, status, channel) } query := builder.String() - _, err = tx.Exec(query, args...) - if err != nil { + if _, err = tx.Exec(query, args...); err != nil { log.Error("Exec fail, will rollback", zap.String("query", query), zap.Reflect("args", args), zap.Error(err)) if rbErr := tx.Rollback(); rbErr != nil { log.Error("Auto rollback", zap.Error(rbErr)) } - return err + return errors.Trace(err) } if err = tx.Commit(); err != nil { - return err + return errors.Trace(err) } return nil }