Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drainer: fix failure to update BIT columns with downstream TiDB when message passes through Kafka #667

Merged
merged 14 commits into from
Jul 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions arbiter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,15 @@ func (s *Server) Run() error {
wg.Done()
}()

var syncErr error

wg.Add(1)
go func() {
syncBinlogs(s.kafkaReader.Messages(), s.load)
wg.Done()
defer wg.Done()
syncErr = syncBinlogs(s.kafkaReader.Messages(), s.load)
if syncErr != nil {
s.Close()
}
}()

err := s.load.Run()
Expand All @@ -197,6 +202,10 @@ func (s *Server) Run() error {
return errors.Trace(err)
}

if syncErr != nil {
return errors.Trace(syncErr)
}

if err = s.saveFinishTS(StatusNormal); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -262,16 +271,21 @@ func (s *Server) loadStatus() (int, error) {
return status, errors.Trace(err)
}

func syncBinlogs(source <-chan *reader.Message, ld loader.Loader) {
func syncBinlogs(source <-chan *reader.Message, ld loader.Loader) (err error) {
dest := ld.Input()
defer ld.Close()
for msg := range source {
log.Debug("recv msg from kafka reader", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset))
txn := loader.SlaveBinlogToTxn(msg.Binlog)
txn, err := loader.SlaveBinlogToTxn(msg.Binlog)
if err != nil {
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
log.Error("transfer binlog failed, program will stop handling data from loader", zap.Error(err))
return err
}
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
txn.Metadata = msg
dest <- txn

queueSizeGauge.WithLabelValues("kafka_reader").Set(float64(len(source)))
queueSizeGauge.WithLabelValues("loader_input").Set(float64(len(dest)))
}
ld.Close()
return nil
}
3 changes: 2 additions & 1 deletion arbiter/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,8 @@ func (s *syncBinlogsSuite) TestShouldSendBinlogToLoader(c *C) {
}()
ld := dummyLoader{input: dest}

syncBinlogs(source, &ld)
err := syncBinlogs(source, &ld)
c.Assert(err, IsNil)

c.Assert(len(dest), Equals, 2)
for _, m := range msgs {
Expand Down
49 changes: 33 additions & 16 deletions pkg/loader/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ package loader

import (
pb "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog"
"github.com/pingcap/tidb/types"
)

// SlaveBinlogToTxn translate the Binlog format into Txn
func SlaveBinlogToTxn(binlog *pb.Binlog) (txn *Txn) {
txn = new(Txn)
func SlaveBinlogToTxn(binlog *pb.Binlog) (*Txn, error) {
txn := new(Txn)
var err error
switch binlog.Type {
case pb.BinlogType_DDL:
data := binlog.DdlData
Expand All @@ -36,44 +38,53 @@ func SlaveBinlogToTxn(binlog *pb.Binlog) (txn *Txn) {
dml.Tp = getDMLType(mut)

// setup values
dml.Values = getColVals(table, mut.Row.GetColumns())
dml.Values, err = getColVals(table, mut.Row.GetColumns())
if err != nil {
return nil, err
}

// setup old values
if dml.Tp == UpdateDMLType {
dml.OldValues = getColVals(table, mut.ChangeRow.GetColumns())
dml.OldValues, err = getColVals(table, mut.ChangeRow.GetColumns())
if err != nil {
return nil, err
}
}
txn.DMLs = append(txn.DMLs, dml)
}
}
}
return
return txn, nil
}

func getColVals(table *pb.Table, cols []*pb.Column) map[string]interface{} {
func getColVals(table *pb.Table, cols []*pb.Column) (map[string]interface{}, error) {
vals := make(map[string]interface{}, len(cols))
for i, col := range cols {
name := table.ColumnInfo[i].Name
arg := columnToArg(table.ColumnInfo[i].GetMysqlType(), col)
arg, err := columnToArg(table.ColumnInfo[i].GetMysqlType(), col)
if err != nil {
return vals, err
}
vals[name] = arg
}
return vals
return vals, nil
}

func columnToArg(mysqlType string, c *pb.Column) (arg interface{}) {
func columnToArg(mysqlType string, c *pb.Column) (arg interface{}, err error) {
if c.GetIsNull() {
return nil
return nil, nil
}

if c.Int64Value != nil {
return c.GetInt64Value()
return c.GetInt64Value(), nil
}

if c.Uint64Value != nil {
return c.GetUint64Value()
return c.GetUint64Value(), nil
}

if c.DoubleValue != nil {
return c.GetDoubleValue()
return c.GetDoubleValue(), nil
}

if c.BytesValue != nil {
Expand All @@ -82,12 +93,18 @@ func columnToArg(mysqlType string, c *pb.Column) (arg interface{}) {
// it works for TiDB to use binary
if mysqlType == "json" {
var str string = string(c.GetBytesValue())
return str
return str, nil
}
// https://github.com/pingcap/tidb/issues/10988
// Binary literal is not passed correctly for TiDB in some cases, so encode BIT types as integers instead of byte strings. Since the longest value is BIT(64), it is safe to always convert as uint64
if mysqlType == "bit" {
val, err := types.BinaryLiteral(c.GetBytesValue()).ToInt(nil)
july2993 marked this conversation as resolved.
Show resolved Hide resolved
return val, err
}
return c.GetBytesValue()
return c.GetBytesValue(), nil
}

return c.GetStringValue()
return c.GetStringValue(), nil
}

func getDMLType(mut *pb.TableMutation) DMLType {
Expand Down
33 changes: 24 additions & 9 deletions pkg/loader/translate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func (s *slaveBinlogToTxnSuite) TestTranslateDDL(c *C) {
DdlQuery: []byte(sql),
},
}
txn := SlaveBinlogToTxn(&binlog)
txn, err := SlaveBinlogToTxn(&binlog)
c.Assert(err, IsNil)
c.Assert(txn.DDL.Database, Equals, db)
c.Assert(txn.DDL.Table, Equals, table)
c.Assert(txn.DDL.SQL, Equals, sql)
Expand Down Expand Up @@ -80,7 +81,8 @@ func (s *slaveBinlogToTxnSuite) TestTranslateDML(c *C) {
binlog := pb.Binlog{
DmlData: &dml,
}
txn := SlaveBinlogToTxn(&binlog)
txn, err := SlaveBinlogToTxn(&binlog)
c.Assert(err, IsNil)
c.Assert(txn.DMLs, HasLen, 2)
for _, dml := range txn.DMLs {
c.Assert(dml.Database, Equals, db)
Expand Down Expand Up @@ -115,32 +117,45 @@ var _ = Suite(&columnToArgSuite{})

func (s *columnToArgSuite) TestHandleMySQLJSON(c *C) {
colVal := `{"key": "value"}`
arg := columnToArg("json", &pb.Column{BytesValue: []byte(colVal)})
arg, err := columnToArg("json", &pb.Column{BytesValue: []byte(colVal)})
c.Assert(err, IsNil)
c.Assert(arg, Equals, colVal)
}

func (s *columnToArgSuite) TestGetCorrectArgs(c *C) {
isNull := true
col := &pb.Column{IsNull: &isNull}
c.Assert(columnToArg("", col), IsNil)
val, err := columnToArg("", col)
c.Assert(err, IsNil)
c.Assert(val, IsNil)

var i64 int64 = 666
col = &pb.Column{Int64Value: &i64}
c.Assert(columnToArg("", col), Equals, i64)
val, err = columnToArg("", col)
c.Assert(err, IsNil)
c.Assert(val, Equals, i64)

var u64 uint64 = 777
col = &pb.Column{Uint64Value: &u64}
c.Assert(columnToArg("", col), Equals, u64)
val, err = columnToArg("", col)
c.Assert(err, IsNil)
c.Assert(val, Equals, u64)

var d float64 = 3.14
col = &pb.Column{DoubleValue: &d}
c.Assert(columnToArg("", col), Equals, d)
val, err = columnToArg("", col)
c.Assert(err, IsNil)
c.Assert(val, Equals, d)

var b []byte = []byte{1, 2, 3}
col = &pb.Column{BytesValue: b}
c.Assert(columnToArg("", col), DeepEquals, b)
val, err = columnToArg("", col)
c.Assert(err, IsNil)
c.Assert(val, DeepEquals, b)

var ss string = "hello world"
col = &pb.Column{StringValue: &ss}
c.Assert(columnToArg("", col), DeepEquals, ss)
val, err = columnToArg("", col)
c.Assert(err, IsNil)
c.Assert(val, DeepEquals, ss)
}
83 changes: 52 additions & 31 deletions tests/dailytest/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (

// test different data type of mysql
// mysql will change boolean to tinybit(1)
var case1 = []string{`
CREATE TABLE binlog_case1 (
var caseMultiDataType = []string{`
CREATE TABLE binlog_multi_data_type (
id INT AUTO_INCREMENT,
t_boolean BOOLEAN,
t_bigint BIGINT,
Expand All @@ -51,7 +51,7 @@ CREATE TABLE binlog_case1 (
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
`,
`
INSERT INTO binlog_case1(t_boolean, t_bigint, t_double, t_decimal, t_bit
INSERT INTO binlog_multi_data_type(t_boolean, t_bigint, t_double, t_decimal, t_bit
,t_date, t_datetime, t_timestamp, t_time, t_year
,t_char, t_varchar, t_blob, t_text, t_enum
,t_set, t_json) VALUES
Expand All @@ -61,63 +61,80 @@ INSERT INTO binlog_case1(t_boolean, t_bigint, t_double, t_decimal, t_bit
,'a,b', NULL);
`,
`
INSERT INTO binlog_case1(t_boolean) VALUES(TRUE);
INSERT INTO binlog_multi_data_type(t_boolean) VALUES(TRUE);
`,
`
INSERT INTO binlog_case1(t_boolean) VALUES(FALSE);
INSERT INTO binlog_multi_data_type(t_boolean) VALUES(FALSE);
`,
// minimum value of bigint
`
INSERT INTO binlog_case1(t_bigint) VALUES(-9223372036854775808);
INSERT INTO binlog_multi_data_type(t_bigint) VALUES(-9223372036854775808);
`,
// maximum value of bigint
`
INSERT INTO binlog_case1(t_bigint) VALUES(9223372036854775807);
INSERT INTO binlog_multi_data_type(t_bigint) VALUES(9223372036854775807);
`,
`
INSERT INTO binlog_case1(t_json) VALUES('{"key1": "value1", "key2": "value2"}');
INSERT INTO binlog_multi_data_type(t_json) VALUES('{"key1": "value1", "key2": "value2"}');
`,
}

var case1Clean = []string{`
DROP TABLE binlog_case1`,
var caseMultiDataTypeClean = []string{`
DROP TABLE binlog_multi_data_type`,
}

// https://internal.pingcap.net/jira/browse/TOOL-714
var case2 = []string{`
CREATE TABLE binlog_case2 (id INT, a1 INT, a3 INT, UNIQUE KEY dex1(a1, a3));
var caseUKWithNoPK = []string{`
CREATE TABLE binlog_uk_with_no_pk (id INT, a1 INT, a3 INT, UNIQUE KEY dex1(a1, a3));
`,
`
INSERT INTO binlog_case2(id, a1, a3) VALUES(1, 1, NULL);
INSERT INTO binlog_uk_with_no_pk(id, a1, a3) VALUES(1, 1, NULL);
`,
`
INSERT INTO binlog_case2(id, a1, a3) VALUES(2, 1, NULL);
INSERT INTO binlog_uk_with_no_pk(id, a1, a3) VALUES(2, 1, NULL);
`,
`
UPDATE binlog_case2 SET id = 10 WHERE id = 1;
UPDATE binlog_uk_with_no_pk SET id = 10 WHERE id = 1;
`,
`
UPDATE binlog_case2 SET id = 100 WHERE id = 10;
UPDATE binlog_uk_with_no_pk SET id = 100 WHERE id = 10;
`,
}

var case2Clean = []string{`
DROP TABLE binlog_case2`,
var caseUKWithNoPKClean = []string{`
DROP TABLE binlog_uk_with_no_pk`,
}

var case3 = []string{`
CREATE TABLE a(id INT PRIMARY KEY, a1 INT);
var casePKAddDuplicateUK = []string{`
CREATE TABLE binlog_pk_add_duplicate_uk(id INT PRIMARY KEY, a1 INT);
`,
`
INSERT INTO a(id, a1) VALUES(1,1),(2,1);
INSERT INTO binlog_pk_add_duplicate_uk(id, a1) VALUES(1,1),(2,1);
`,
`
ALTER TABLE a ADD UNIQUE INDEX aidx(a1);
ALTER TABLE binlog_pk_add_duplicate_uk ADD UNIQUE INDEX aidx(a1);
`,
}

var case3Clean = []string{
`DROP TABLE a`,
var casePKAddDuplicateUKClean = []string{
`DROP TABLE binlog_pk_add_duplicate_uk`,
}

july2993 marked this conversation as resolved.
Show resolved Hide resolved
// Test issue: TOOL-1346
var caseInsertBit = []string{`
CREATE TABLE binlog_insert_bit(a BIT(1) NOT NULL);
`,
`
INSERT INTO binlog_insert_bit VALUES (0x01);
`,
`
UPDATE binlog_insert_bit SET a = 0x00;
`,
}

var caseInsertBitClean = []string{`
DROP TABLE binlog_insert_bit;
`,
}

type testRunner struct {
Expand Down Expand Up @@ -145,20 +162,24 @@ func RunCase(src *sql.DB, dst *sql.DB, schema string) {

runPKcases(tr)

tr.execSQLs(case1)
tr.execSQLs(case1Clean)
tr.execSQLs(caseMultiDataType)
tr.execSQLs(caseMultiDataTypeClean)

tr.execSQLs(case2)
tr.execSQLs(case2Clean)
tr.execSQLs(caseUKWithNoPK)
tr.execSQLs(caseUKWithNoPKClean)

// run case3
// run casePKAddDuplicateUK
tr.run(func(src *sql.DB) {
err := execSQLs(src, case3)
err := execSQLs(src, casePKAddDuplicateUK)
if err != nil && !strings.Contains(err.Error(), "Duplicate for key") {
log.S().Fatal(err)
}
})
tr.execSQLs(case3Clean)
tr.execSQLs(casePKAddDuplicateUKClean)

// run caseInsertBit
tr.execSQLs(caseInsertBit)
tr.execSQLs(caseInsertBitClean)

tr.run(caseTblWithGeneratedCol)
tr.execSQLs([]string{"DROP TABLE gen_contacts;"})
Expand Down
Loading