From 465037ff7ee7f0070c61b6deca6762d53765abea Mon Sep 17 00:00:00 2001
From: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
Date: Sun, 28 Jun 2020 10:38:24 +0800
Subject: [PATCH] update black/white list to block/allow list && update
 master/slave to primary/secondary (#984)

---
 arbiter/server.go                |  2 +-
 drainer/checkpoint/checkpoint.go |  2 +-
 drainer/checkpoint/file.go       |  2 +-
 drainer/checkpoint/mysql.go      |  8 ++++----
 drainer/checkpoint/mysql_test.go | 10 +++++-----
 drainer/relay.go                 |  4 ++--
 drainer/relay/reader.go          |  6 +++---
 drainer/relay/reader_test.go     |  2 +-
 drainer/relay/relayer.go         |  2 +-
 drainer/sync/kafka.go            |  4 ++--
 drainer/syncer.go                |  4 ++--
 drainer/translator/kafka.go      | 14 +++++++-------
 drainer/translator/kafka_test.go | 22 +++++++++++-----------
 pkg/filter/filter.go             | 14 +++++++-------
 pkg/loader/README.md             |  2 +-
 pkg/loader/load.go               |  2 +-
 pkg/loader/translate.go          |  4 ++--
 pkg/loader/translate_test.go     | 14 +++++++-------
 pkg/sql/sql.go                   |  2 +-
 tests/kafka/kafka.go             |  2 +-
 20 files changed, 61 insertions(+), 61 deletions(-)

diff --git a/arbiter/server.go b/arbiter/server.go
index 8abbcffa6..ada7d4366 100644
--- a/arbiter/server.go
+++ b/arbiter/server.go
@@ -297,7 +297,7 @@ func syncBinlogs(ctx context.Context, source <-chan *reader.Message, ld loader.L
 		}
 		receivedTs = msg.Binlog.CommitTs
 
-		txn, err := loader.SlaveBinlogToTxn(msg.Binlog)
+		txn, err := loader.SecondaryBinlogToTxn(msg.Binlog)
 		if err != nil {
 			log.Error("transfer binlog failed, program will stop handling data from loader", zap.Error(err))
 			return err
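
The renamed loader.SecondaryBinlogToTxn is the entry point for every consumer touched by this patch. A minimal consumption sketch, not part of the patch itself (the Input() channel and the Txn.Metadata hand-off are assumptions drawn from how syncBinlogs uses the loader package):

    // drain converts secondary-format binlogs and feeds them to the loader.
    func drain(source <-chan *reader.Message, ld loader.Loader) error {
    	for msg := range source {
    		txn, err := loader.SecondaryBinlogToTxn(msg.Binlog)
    		if err != nil {
    			return err
    		}
    		txn.Metadata = msg // assumed: lets success callbacks recover the message
    		ld.Input() <- txn
    	}
    	return nil
    }
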
diff --git a/drainer/checkpoint/checkpoint.go b/drainer/checkpoint/checkpoint.go
index 761f6a884..c33dd2f48 100644
--- a/drainer/checkpoint/checkpoint.go
+++ b/drainer/checkpoint/checkpoint.go
@@ -31,7 +31,7 @@ type CheckPoint interface {
 	Load() error
 
 	// Save saves checkpoint information.
-	Save(commitTS int64, slaveTS int64, consistent bool) error
+	Save(commitTS int64, secondaryTS int64, consistent bool) error
 
 	// TS gets checkpoint commit timestamp.
 	TS() int64
diff --git a/drainer/checkpoint/file.go b/drainer/checkpoint/file.go
index 3af305e15..133eca8a5 100644
--- a/drainer/checkpoint/file.go
+++ b/drainer/checkpoint/file.go
@@ -82,7 +82,7 @@ func (sp *FileCheckPoint) Load() error {
 }
 
 // Save implements CheckPoint.Save interface
-func (sp *FileCheckPoint) Save(ts, slaveTS int64, consistent bool) error {
+func (sp *FileCheckPoint) Save(ts, secondaryTS int64, consistent bool) error {
 	sp.Lock()
 	defer sp.Unlock()
diff --git a/drainer/checkpoint/mysql.go b/drainer/checkpoint/mysql.go
index d4b6c9d46..1b1ea20ec 100644
--- a/drainer/checkpoint/mysql.go
+++ b/drainer/checkpoint/mysql.go
@@ -122,7 +122,7 @@ func (sp *MysqlCheckPoint) Load() error {
 }
 
 // Save implements checkpoint.Save interface
-func (sp *MysqlCheckPoint) Save(ts, slaveTS int64, consistent bool) error {
+func (sp *MysqlCheckPoint) Save(ts, secondaryTS int64, consistent bool) error {
 	sp.Lock()
 	defer sp.Unlock()
 
@@ -133,9 +133,9 @@ func (sp *MysqlCheckPoint) Save(ts, slaveTS int64, consistent bool) error {
 	sp.CommitTS = ts
 	sp.ConsistentSaved = consistent
 
-	if slaveTS > 0 {
-		sp.TsMap["master-ts"] = ts
-		sp.TsMap["slave-ts"] = slaveTS
+	if secondaryTS > 0 {
+		sp.TsMap["primary-ts"] = ts
+		sp.TsMap["secondary-ts"] = secondaryTS
 	}
 
 	b, err := json.Marshal(sp)
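
Beyond identifiers, this hunk renames the keys persisted in the checkpoint JSON. A usage sketch with the same illustrative values as the tests below (cp is any CheckPoint implementation; construction elided):

    // Record the upstream commit TS together with the TS observed on the
    // secondary cluster; both land in ts-map under the new key names.
    if err := cp.Save(65536 /*commitTS*/, 3333 /*secondaryTS*/, false /*consistent*/); err != nil {
    	log.Fatal("save checkpoint failed", zap.Error(err))
    }
    // Persisted payload afterwards:
    //   {"commitTS": 65536, "consistent": false,
    //    "ts-map": {"primary-ts": 65536, "secondary-ts": 3333}}

Note that checkpoints written before this rename still carry "master-ts"/"slave-ts" on disk, so code that reads ts-map may need to accept both sets of keys during an upgrade.
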
diff --git a/drainer/checkpoint/mysql_test.go b/drainer/checkpoint/mysql_test.go
index 237886670..5c6488cdb 100644
--- a/drainer/checkpoint/mysql_test.go
+++ b/drainer/checkpoint/mysql_test.go
@@ -65,8 +65,8 @@ func (s *saveSuite) TestShouldUpdateTsMap(c *C) {
 	}
 	err = cp.Save(65536, 3333, false)
 	c.Assert(err, IsNil)
-	c.Assert(cp.TsMap["master-ts"], Equals, int64(65536))
-	c.Assert(cp.TsMap["slave-ts"], Equals, int64(3333))
+	c.Assert(cp.TsMap["primary-ts"], Equals, int64(65536))
+	c.Assert(cp.TsMap["secondary-ts"], Equals, int64(3333))
 }
 
 type loadSuite struct{}
@@ -83,15 +83,15 @@ func (s *loadSuite) TestShouldLoadFromDB(c *C) {
 		TsMap: make(map[string]int64),
 	}
 	rows := sqlmock.NewRows([]string{"checkPoint"}).
-		AddRow(`{"commitTS": 1024, "consistent": true, "ts-map": {"master-ts": 2000, "slave-ts": 1999}}`)
+		AddRow(`{"commitTS": 1024, "consistent": true, "ts-map": {"primary-ts": 2000, "secondary-ts": 1999}}`)
 	mock.ExpectQuery("select checkPoint from db.tbl.*").WillReturnRows(rows)
 	err = cp.Load()
 	c.Assert(err, IsNil)
 	c.Assert(cp.CommitTS, Equals, int64(1024))
 	c.Assert(cp.ConsistentSaved, Equals, true)
-	c.Assert(cp.TsMap["master-ts"], Equals, int64(2000))
-	c.Assert(cp.TsMap["slave-ts"], Equals, int64(1999))
+	c.Assert(cp.TsMap["primary-ts"], Equals, int64(2000))
+	c.Assert(cp.TsMap["secondary-ts"], Equals, int64(1999))
 }
 
 func (s *loadSuite) TestShouldUseInitialCommitTs(c *C) {
diff --git a/drainer/relay.go b/drainer/relay.go
index b33452bd7..92cb862be 100644
--- a/drainer/relay.go
+++ b/drainer/relay.go
@@ -112,7 +112,7 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint)
 			continue
 		}
 
-		txn, err := loader.SlaveBinlogToTxn(sbinlog)
+		txn, err := loader.SecondaryBinlogToTxn(sbinlog)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -149,7 +149,7 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint)
 		return errors.Trace(readerErr)
 	}
 
-	err := cp.Save(lastSuccessTS, 0 /* slaveTS */, true /*consistent*/)
+	err := cp.Save(lastSuccessTS, 0 /* secondaryTS */, true /*consistent*/)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/drainer/relay/reader.go b/drainer/relay/reader.go
index 64752d913..a63d14c06 100644
--- a/drainer/relay/reader.go
+++ b/drainer/relay/reader.go
@@ -79,8 +79,8 @@ func (r *reader) Run() context.CancelFunc {
 				break
 			}
 
-			slaveBinlog := new(obinlog.Binlog)
-			if err = slaveBinlog.Unmarshal(blg.Payload); err != nil {
+			secondaryBinlog := new(obinlog.Binlog)
+			if err = secondaryBinlog.Unmarshal(blg.Payload); err != nil {
 				break
 			}
 
@@ -88,7 +88,7 @@ func (r *reader) Run() context.CancelFunc {
 			case <-ctx.Done():
 				err = ctx.Err()
 				log.Warn("Producing transaction is interrupted")
-			case r.binlogs <- slaveBinlog:
+			case r.binlogs <- secondaryBinlog:
 			}
 		}
 		// If binlogger is not done, notify it to stop.
diff --git a/drainer/relay/reader_test.go b/drainer/relay/reader_test.go
index a3a7f6e47..0ed4d1dd5 100644
--- a/drainer/relay/reader_test.go
+++ b/drainer/relay/reader_test.go
@@ -90,7 +90,7 @@ func (r *testReaderSuite) readBinlogAndCheck(c *C, dir string, expectedNumber in
 	number := 0
 	for txn := range relayReader.Binlogs() {
 		number++
-		loaderTxn, err := loader.SlaveBinlogToTxn(txn)
+		loaderTxn, err := loader.SecondaryBinlogToTxn(txn)
 		c.Assert(err, IsNil)
 		lastTxn = loaderTxn
 	}
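
The relay reader delivers the same secondary-format binlogs over a channel. A consumption sketch (mirrors feedByRelayLog and readBinlogAndCheck above; the apply hand-off is hypothetical):

    // drainRelay reads relay-log binlogs until the channel closes.
    func drainRelay(r relay.Reader, apply func(*loader.Txn)) error {
    	cancel := r.Run()
    	defer cancel()
    	for sbinlog := range r.Binlogs() {
    		txn, err := loader.SecondaryBinlogToTxn(sbinlog)
    		if err != nil {
    			return errors.Trace(err)
    		}
    		apply(txn)
    	}
    	return nil
    }
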
 func (r *relayer) WriteBinlog(schema string, table string, tiBinlog *tb.Binlog, pv *tb.PrewriteValue) (tb.Pos, error) {
 	pos := tb.Pos{}
-	binlog, err := translator.TiBinlogToSlaveBinlog(r.tableInfoGetter, schema, table, tiBinlog, pv)
+	binlog, err := translator.TiBinlogToSecondaryBinlog(r.tableInfoGetter, schema, table, tiBinlog, pv)
 	if err != nil {
 		return pos, errors.Trace(err)
 	}
diff --git a/drainer/sync/kafka.go b/drainer/sync/kafka.go
index 093bb0436..0fb323028 100644
--- a/drainer/sync/kafka.go
+++ b/drainer/sync/kafka.go
@@ -121,12 +121,12 @@ func (p *KafkaSyncer) SetSafeMode(mode bool) bool {
 
 // Sync implements Syncer interface
 func (p *KafkaSyncer) Sync(item *Item) error {
-	slaveBinlog, err := translator.TiBinlogToSlaveBinlog(p.tableInfoGetter, item.Schema, item.Table, item.Binlog, item.PrewriteValue)
+	secondaryBinlog, err := translator.TiBinlogToSecondaryBinlog(p.tableInfoGetter, item.Schema, item.Table, item.Binlog, item.PrewriteValue)
 	if err != nil {
 		return errors.Trace(err)
 	}
 
-	err = p.saveBinlog(slaveBinlog, item)
+	err = p.saveBinlog(secondaryBinlog, item)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/drainer/syncer.go b/drainer/syncer.go
index 3d2f5061b..035d3ac2c 100644
--- a/drainer/syncer.go
+++ b/drainer/syncer.go
@@ -249,13 +249,13 @@ func (s *Syncer) handleSuccess(fakeBinlog chan *pb.Binlog, lastTS *int64) {
 	log.Info("handleSuccess quit")
 }
 
-func (s *Syncer) savePoint(ts, slaveTS int64) {
+func (s *Syncer) savePoint(ts, secondaryTS int64) {
 	if ts < s.cp.TS() {
 		log.Error("save ts is less than checkpoint ts", zap.Int64("save ts", ts), zap.Int64("checkpoint ts", s.cp.TS()))
 	}
 
 	log.Info("write save point", zap.Int64("ts", ts))
-	err := s.cp.Save(ts, slaveTS, false)
+	err := s.cp.Save(ts, secondaryTS, false)
 	if err != nil {
 		log.Fatal("save checkpoint failed", zap.Int64("ts", ts), zap.Error(err))
 	}
diff --git a/drainer/translator/kafka.go b/drainer/translator/kafka.go
index 1d39dd63d..f92d09e96 100644
--- a/drainer/translator/kafka.go
+++ b/drainer/translator/kafka.go
@@ -32,8 +32,8 @@ import (
 	"go.uber.org/zap"
 )
 
-// TiBinlogToSlaveBinlog translates the format to slave binlog
-func TiBinlogToSlaveBinlog(
+// TiBinlogToSecondaryBinlog translates a TiDB binlog into the secondary binlog format
+func TiBinlogToSecondaryBinlog(
 	infoGetter TableInfoGetter,
 	schema string,
 	table string,
@@ -41,7 +41,7 @@ func TiBinlogToSlaveBinlog(
 	pv *pb.PrewriteValue,
 ) (*obinlog.Binlog, error) {
 	if tiBinlog.DdlJobId > 0 { // DDL
-		slaveBinlog := &obinlog.Binlog{
+		secondaryBinlog := &obinlog.Binlog{
 			Type:     obinlog.BinlogType_DDL,
 			CommitTs: tiBinlog.GetCommitTs(),
 			DdlData: &obinlog.DDLData{
@@ -50,10 +50,10 @@ func TiBinlogToSlaveBinlog(
 				DdlQuery:   tiBinlog.GetDdlQuery(),
 			},
 		}
-		return slaveBinlog, nil
+		return secondaryBinlog, nil
 	}
 
-	slaveBinlog := &obinlog.Binlog{
+	secondaryBinlog := &obinlog.Binlog{
 		Type:     obinlog.BinlogType_DML,
 		CommitTs: tiBinlog.GetCommitTs(),
 		DmlData:  new(obinlog.DMLData),
@@ -73,7 +73,7 @@ func TiBinlogToSlaveBinlog(
 		iter := newSequenceIterator(&mut)
 		table := genTable(schema, info)
-		slaveBinlog.DmlData.Tables = append(slaveBinlog.DmlData.Tables, table)
+		secondaryBinlog.DmlData.Tables = append(secondaryBinlog.DmlData.Tables, table)
 
 		for {
 			tableMutation, err := nextRow(schema, info, isTblDroppingCol, iter)
@@ -86,7 +86,7 @@ func TiBinlogToSlaveBinlog(
 			table.Mutations = append(table.Mutations, tableMutation)
 		}
 	}
-	return slaveBinlog, nil
+	return secondaryBinlog, nil
 }
 
 func genTable(schema string, tableInfo *model.TableInfo) (table *obinlog.Table) {
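
To make the translator's contract concrete, a hedged usage sketch (arguments as in KafkaSyncer.Sync above; branch bodies elided):

    func toSecondary(g translator.TableInfoGetter, schema, table string, tiBinlog *pb.Binlog, pv *pb.PrewriteValue) error {
    	secondaryBinlog, err := translator.TiBinlogToSecondaryBinlog(g, schema, table, tiBinlog, pv)
    	if err != nil {
    		return errors.Trace(err)
    	}
    	switch secondaryBinlog.Type {
    	case obinlog.BinlogType_DDL:
    		// DdlData carries the schema name, table name, and raw DDL query.
    	case obinlog.BinlogType_DML:
    		// DmlData.Tables holds one Table per mutated table,
    		// each with its ordered Mutations.
    	}
    	return nil
    }
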
diff --git a/drainer/translator/kafka_test.go b/drainer/translator/kafka_test.go
index c25496d73..fe0e8ce55 100644
--- a/drainer/translator/kafka_test.go
+++ b/drainer/translator/kafka_test.go
@@ -33,10 +33,10 @@ var _ = check.Suite(&testKafkaSuite{})
 func (t *testKafkaSuite) TestDDL(c *check.C) {
 	t.SetDDL()
 
-	slaveBinog, err := TiBinlogToSlaveBinlog(t, t.Schema, t.Table, t.TiBinlog, nil)
+	secondaryBinlog, err := TiBinlogToSecondaryBinlog(t, t.Schema, t.Table, t.TiBinlog, nil)
 	c.Assert(err, check.IsNil)
 
-	c.Assert(slaveBinog, check.DeepEquals, &obinlog.Binlog{
+	c.Assert(secondaryBinlog, check.DeepEquals, &obinlog.Binlog{
 		Type:     obinlog.BinlogType_DDL,
 		CommitTs: t.TiBinlog.GetCommitTs(),
 		DdlData: &obinlog.DDLData{
@@ -48,13 +48,13 @@ func (t *testKafkaSuite) TestDDL(c *check.C) {
 }
 
 func (t *testKafkaSuite) testDML(c *check.C, tp obinlog.MutationType) {
-	slaveBinog, err := TiBinlogToSlaveBinlog(t, t.Schema, t.Table, t.TiBinlog, t.PV)
+	secondaryBinlog, err := TiBinlogToSecondaryBinlog(t, t.Schema, t.Table, t.TiBinlog, t.PV)
 	c.Assert(err, check.IsNil)
 
-	c.Assert(slaveBinog.GetCommitTs(), check.Equals, t.TiBinlog.GetCommitTs())
-	c.Assert(slaveBinog.Type, check.Equals, obinlog.BinlogType_DML)
+	c.Assert(secondaryBinlog.GetCommitTs(), check.Equals, t.TiBinlog.GetCommitTs())
+	c.Assert(secondaryBinlog.Type, check.Equals, obinlog.BinlogType_DML)
 
-	table := slaveBinog.DmlData.Tables[0]
+	table := secondaryBinlog.DmlData.Tables[0]
 	tableMut := table.Mutations[0]
 
 	c.Assert(tableMut.GetType(), check.Equals, tp)
@@ -67,13 +67,13 @@ func (t *testKafkaSuite) testDML(c *check.C, tp obinlog.MutationType) {
 func (t *testKafkaSuite) TestAllDML(c *check.C) {
 	t.SetAllDML(c)
 
-	slaveBinog, err := TiBinlogToSlaveBinlog(t, t.Schema, t.Table, t.TiBinlog, t.PV)
+	secondaryBinlog, err := TiBinlogToSecondaryBinlog(t, t.Schema, t.Table, t.TiBinlog, t.PV)
 	c.Assert(err, check.IsNil)
 
-	c.Assert(slaveBinog.Type, check.Equals, obinlog.BinlogType_DML)
-	c.Assert(slaveBinog.GetCommitTs(), check.Equals, t.TiBinlog.GetCommitTs())
+	c.Assert(secondaryBinlog.Type, check.Equals, obinlog.BinlogType_DML)
+	c.Assert(secondaryBinlog.GetCommitTs(), check.Equals, t.TiBinlog.GetCommitTs())
 
-	table := slaveBinog.DmlData.Tables[0]
+	table := secondaryBinlog.DmlData.Tables[0]
 	insertMut := table.Mutations[0]
 	updateMut := table.Mutations[1]
@@ -137,7 +137,7 @@ func checkColumn(c *check.C, info *obinlog.ColumnInfo, col *obinlog.Column, datu
 	datumV := fmt.Sprintf("%v", datum.GetValue())
 	if info.GetMysqlType() == "enum" {
-		// we set uint64 as the index for slave proto but not the name
+		// the secondary proto stores the enum's uint64 index, not its name
 		datumV = fmt.Sprintf("%v", datum.GetInt64())
 	}
diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go
index df112ce82..42a4e1cfd 100644
--- a/pkg/filter/filter.go
+++ b/pkg/filter/filter.go
@@ -77,14 +77,14 @@ func (s *Filter) genRegexMap() {
 	}
 }
 
-// whiteFilter whitelist filtering
-func (s *Filter) whiteFilter(stbs []TableName) []TableName {
+// allowFilter allowlist filtering
+func (s *Filter) allowFilter(stbs []TableName) []TableName {
 	var tbs []TableName
 	if len(s.doTables) == 0 && len(s.doDBs) == 0 {
 		return stbs
 	}
 	for _, tb := range stbs {
-		// if the white list contains "schema_s.table_t" and "schema_s",
+		// if the allow list contains "schema_s.table_t" and "schema_s",
 		// all tables in that schema_s will pass the Filter.
 		if s.matchTable(s.doTables, tb) {
 			tbs = append(tbs, tb)
@@ -96,8 +96,8 @@ func (s *Filter) whiteFilter(stbs []TableName) []TableName {
 	return tbs
 }
 
-// blackFilter return TableName which is not in the blacklist
-func (s *Filter) blackFilter(stbs []TableName) []TableName {
+// blockFilter returns the TableNames that are not in the blocklist
+func (s *Filter) blockFilter(stbs []TableName) []TableName {
 	var tbs []TableName
 	if len(s.ignoreTables) == 0 && len(s.ignoreDBs) == 0 {
 		return stbs
@@ -119,8 +119,8 @@ func (s *Filter) SkipSchemaAndTable(schema string, table string) bool {
 	tbs := []TableName{{Schema: strings.ToLower(schema), Table: strings.ToLower(table)}}
 
-	tbs = s.whiteFilter(tbs)
-	tbs = s.blackFilter(tbs)
+	tbs = s.allowFilter(tbs)
+	tbs = s.blockFilter(tbs)
 	return len(tbs) == 0
 }
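
With both passes renamed, the externally visible behavior is unchanged: a table survives only if the allowlist admits it and the blocklist does not match it. A usage sketch (NewFilter's exact signature is an assumption; the semantics mirror SkipSchemaAndTable above):

    f := filter.NewFilter(ignoreDBs, ignoreTables, doDBs, doTables)
    if f.SkipSchemaAndTable("schema_s", "table_t") {
    	// filtered out: rejected by the allowlist or matched by the blocklist
    }
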
diff --git a/pkg/loader/README.md b/pkg/loader/README.md
index 8f2cb5e79..978e8dec2 100644
--- a/pkg/loader/README.md
+++ b/pkg/loader/README.md
@@ -7,7 +7,7 @@ A package to load data into MySQL in real-time, aimed to be used by *reparo*, *d
 ### Getting started
 - Example is available via [example_loader_test.go](./example_loader_test.go)
 
-  You need to write a translator to use *Loader* like *SlaveBinlogToTxn* in [translate.go](./translate.go) to translate upstream data format (e.g. binlog) into `Txn` objects.
+  You need to write a translator to use *Loader*, like *SecondaryBinlogToTxn* in [translate.go](./translate.go), to translate the upstream data format (e.g. binlog) into `Txn` objects.
 
 ## Overview
diff --git a/pkg/loader/load.go b/pkg/loader/load.go
index 16e6eb92d..1c204dabd 100644
--- a/pkg/loader/load.go
+++ b/pkg/loader/load.go
@@ -814,7 +814,7 @@ func getAppliedTS(db *gosql.DB) int64 {
 		errCode, ok := pkgsql.GetSQLErrCode(err)
 		// if tidb doesn't support `show master status`, it will return a 1105 ErrUnknown error
 		if !ok || int(errCode) != tmysql.ErrUnknown {
-			log.Warn("get ts from slave cluster failed", zap.Error(err))
+			log.Warn("get ts from secondary cluster failed", zap.Error(err))
 		}
 		return 0
 	}
diff --git a/pkg/loader/translate.go b/pkg/loader/translate.go
index 8ed28fd1d..ed394f715 100644
--- a/pkg/loader/translate.go
+++ b/pkg/loader/translate.go
@@ -18,8 +18,8 @@ import (
 	"github.com/pingcap/tidb/types"
 )
 
-// SlaveBinlogToTxn translate the Binlog format into Txn
-func SlaveBinlogToTxn(binlog *pb.Binlog) (*Txn, error) {
+// SecondaryBinlogToTxn translates the Binlog format into Txn
+func SecondaryBinlogToTxn(binlog *pb.Binlog) (*Txn, error) {
 	txn := new(Txn)
 	var err error
 	switch binlog.Type {
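
A round-trip sketch for the renamed translator, with illustrative values (proto2-style setters such as proto.String are assumed here; TestTranslateDDL below exercises the same path):

    binlog := &pb.Binlog{
    	Type: pb.BinlogType_DDL,
    	DdlData: &pb.DDLData{
    		SchemaName: proto.String("test"),
    		TableName:  proto.String("hello"),
    		DdlQuery:   []byte("CREATE TABLE hello (id INT PRIMARY KEY);"),
    	},
    }
    txn, err := SecondaryBinlogToTxn(binlog)
    // on success: txn.DDL.Database == "test", txn.DDL.Table == "hello",
    // and txn.DDL.SQL holds the query text
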
"github.com/pingcap/check" ) -type slaveBinlogToTxnSuite struct{} +type secondaryBinlogToTxnSuite struct{} -var _ = Suite(&slaveBinlogToTxnSuite{}) +var _ = Suite(&secondaryBinlogToTxnSuite{}) -func (s *slaveBinlogToTxnSuite) TestTranslateDDL(c *C) { +func (s *secondaryBinlogToTxnSuite) TestTranslateDDL(c *C) { db, table := "test", "hello" sql := "CREATE TABLE hello (id INT AUTO_INCREMENT) PRIMARY KEY(id);" binlog := pb.Binlog{ @@ -34,14 +34,14 @@ func (s *slaveBinlogToTxnSuite) TestTranslateDDL(c *C) { DdlQuery: []byte(sql), }, } - txn, err := SlaveBinlogToTxn(&binlog) + txn, err := SecondaryBinlogToTxn(&binlog) c.Assert(err, IsNil) c.Assert(txn.DDL.Database, Equals, db) c.Assert(txn.DDL.Table, Equals, table) c.Assert(txn.DDL.SQL, Equals, sql) } -func (s *slaveBinlogToTxnSuite) TestTranslateDML(c *C) { +func (s *secondaryBinlogToTxnSuite) TestTranslateDML(c *C) { db, table := "test", "hello" var oldVal, newVal int64 = 41, 42 dml := pb.DMLData{ @@ -81,7 +81,7 @@ func (s *slaveBinlogToTxnSuite) TestTranslateDML(c *C) { binlog := pb.Binlog{ DmlData: &dml, } - txn, err := SlaveBinlogToTxn(&binlog) + txn, err := SecondaryBinlogToTxn(&binlog) c.Assert(err, IsNil) c.Assert(txn.DMLs, HasLen, 2) for _, dml := range txn.DMLs { @@ -99,7 +99,7 @@ func (s *slaveBinlogToTxnSuite) TestTranslateDML(c *C) { c.Assert(insert.Values["uid"], Equals, newVal) } -func (s *slaveBinlogToTxnSuite) TestGetDMLType(c *C) { +func (s *secondaryBinlogToTxnSuite) TestGetDMLType(c *C) { mut := pb.TableMutation{} mut.Type = pb.MutationType(404).Enum() c.Assert(getDMLType(&mut), Equals, UnknownDMLType) diff --git a/pkg/sql/sql.go b/pkg/sql/sql.go index d1a9c7eed..b5f2c6edc 100644 --- a/pkg/sql/sql.go +++ b/pkg/sql/sql.go @@ -191,7 +191,7 @@ func GetTidbPosition(db *sql.DB) (int64, error) { defer rows.Close() if !rows.Next() { - return 0, errors.New("get slave cluster's ts failed") + return 0, errors.New("get secondary cluster's ts failed") } fields, err := ScanRow(rows) diff --git a/tests/kafka/kafka.go b/tests/kafka/kafka.go index 0b6904745..1ee0a15bf 100644 --- a/tests/kafka/kafka.go +++ b/tests/kafka/kafka.go @@ -87,7 +87,7 @@ func main() { for msg := range breader.Messages() { str := msg.Binlog.String() log.S().Debugf("recv: %.2000s", str) - txn, err := loader.SlaveBinlogToTxn(msg.Binlog) + txn, err := loader.SecondaryBinlogToTxn(msg.Binlog) if err != nil { log.S().Fatal(err) }