Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drainer: fix failure to update BIT columns with downstream TiDB when message passes through Kafka #667

Merged
merged 14 commits into from
Jul 13, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion arbiter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,10 @@ func syncBinlogs(source <-chan *reader.Message, ld loader.Loader) {
dest := ld.Input()
for msg := range source {
log.Debug("recv msg from kafka reader", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset))
txn := loader.SlaveBinlogToTxn(msg.Binlog)
txn, err := loader.SlaveBinlogToTxn(msg.Binlog)
if err != nil {
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
log.Error("transfer binlog failed", zap.Error(err))
}
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
txn.Metadata = msg
dest <- txn

Expand Down
46 changes: 31 additions & 15 deletions pkg/loader/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ package loader

import (
pb "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog"
types "github.com/pingcap/tidb/types"
july2993 marked this conversation as resolved.
Show resolved Hide resolved
)

// SlaveBinlogToTxn translate the Binlog format into Txn
func SlaveBinlogToTxn(binlog *pb.Binlog) (txn *Txn) {
func SlaveBinlogToTxn(binlog *pb.Binlog) (txn *Txn, err error) {
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
txn = new(Txn)
switch binlog.Type {
case pb.BinlogType_DDL:
Expand All @@ -36,44 +37,53 @@ func SlaveBinlogToTxn(binlog *pb.Binlog) (txn *Txn) {
dml.Tp = getDMLType(mut)

// setup values
dml.Values = getColVals(table, mut.Row.GetColumns())
dml.Values, err = getColVals(table, mut.Row.GetColumns())
if err != nil {
return txn, err
july2993 marked this conversation as resolved.
Show resolved Hide resolved
}

// setup old values
if dml.Tp == UpdateDMLType {
dml.OldValues = getColVals(table, mut.ChangeRow.GetColumns())
dml.OldValues, err = getColVals(table, mut.ChangeRow.GetColumns())
if err != nil {
return txn, err
july2993 marked this conversation as resolved.
Show resolved Hide resolved
}
}
txn.DMLs = append(txn.DMLs, dml)
}
}
}
return
return txn, nil
}

func getColVals(table *pb.Table, cols []*pb.Column) map[string]interface{} {
func getColVals(table *pb.Table, cols []*pb.Column) (map[string]interface{}, error) {
vals := make(map[string]interface{}, len(cols))
for i, col := range cols {
name := table.ColumnInfo[i].Name
arg := columnToArg(table.ColumnInfo[i].GetMysqlType(), col)
arg, err := columnToArg(table.ColumnInfo[i].GetMysqlType(), col)
if err != nil {
return vals, err
}
vals[name] = arg
}
return vals
return vals, nil
}

func columnToArg(mysqlType string, c *pb.Column) (arg interface{}) {
func columnToArg(mysqlType string, c *pb.Column) (arg interface{}, err error) {
if c.GetIsNull() {
return nil
return nil, nil
}

if c.Int64Value != nil {
return c.GetInt64Value()
return c.GetInt64Value(), nil
}

if c.Uint64Value != nil {
return c.GetUint64Value()
return c.GetUint64Value(), nil
}

if c.DoubleValue != nil {
return c.GetDoubleValue()
return c.GetDoubleValue(), nil
}

if c.BytesValue != nil {
Expand All @@ -82,12 +92,18 @@ func columnToArg(mysqlType string, c *pb.Column) (arg interface{}) {
// it work for tidb to use binary
if mysqlType == "json" {
var str string = string(c.GetBytesValue())
return str
return str, nil
}
// https://github.com/pingcap/tidb/issues/10988
// Binary literal is not passed correctly for TiDB in some cases, so encode BIT types as integers instead of byte strings. Since the longest value is BIT(64), it is safe to always convert as uint64
if mysqlType == "bit" {
val, err := types.BinaryLiteral(c.GetBytesValue()).ToInt(nil)
july2993 marked this conversation as resolved.
Show resolved Hide resolved
return val, err
}
return c.GetBytesValue()
return c.GetBytesValue(), nil
}

return c.GetStringValue()
return c.GetStringValue(), nil
}

func getDMLType(mut *pb.TableMutation) DMLType {
Expand Down
33 changes: 24 additions & 9 deletions pkg/loader/translate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func (s *slaveBinlogToTxnSuite) TestTranslateDDL(c *C) {
DdlQuery: []byte(sql),
},
}
txn := SlaveBinlogToTxn(&binlog)
txn, err := SlaveBinlogToTxn(&binlog)
c.Assert(err, IsNil)
c.Assert(txn.DDL.Database, Equals, db)
c.Assert(txn.DDL.Table, Equals, table)
c.Assert(txn.DDL.SQL, Equals, sql)
Expand Down Expand Up @@ -80,7 +81,8 @@ func (s *slaveBinlogToTxnSuite) TestTranslateDML(c *C) {
binlog := pb.Binlog{
DmlData: &dml,
}
txn := SlaveBinlogToTxn(&binlog)
txn, err := SlaveBinlogToTxn(&binlog)
c.Assert(err, IsNil)
c.Assert(txn.DMLs, HasLen, 2)
for _, dml := range txn.DMLs {
c.Assert(dml.Database, Equals, db)
Expand Down Expand Up @@ -115,32 +117,45 @@ var _ = Suite(&columnToArgSuite{})

func (s *columnToArgSuite) TestHandleMySQLJSON(c *C) {
colVal := `{"key": "value"}`
arg := columnToArg("json", &pb.Column{BytesValue: []byte(colVal)})
arg, err := columnToArg("json", &pb.Column{BytesValue: []byte(colVal)})
c.Assert(err, IsNil)
c.Assert(arg, Equals, colVal)
}

func (s *columnToArgSuite) TestGetCorrectArgs(c *C) {
isNull := true
col := &pb.Column{IsNull: &isNull}
c.Assert(columnToArg("", col), IsNil)
val, err := columnToArg("", col)
c.Assert(err, IsNil)
c.Assert(val, IsNil)

var i64 int64 = 666
col = &pb.Column{Int64Value: &i64}
c.Assert(columnToArg("", col), Equals, i64)
val, err = columnToArg("", col)
c.Assert(err, IsNil)
c.Assert(val, Equals, i64)

var u64 uint64 = 777
col = &pb.Column{Uint64Value: &u64}
c.Assert(columnToArg("", col), Equals, u64)
val, err = columnToArg("", col)
c.Assert(err, IsNil)
c.Assert(val, Equals, u64)

var d float64 = 3.14
col = &pb.Column{DoubleValue: &d}
c.Assert(columnToArg("", col), Equals, d)
val, err = columnToArg("", col)
c.Assert(err, IsNil)
c.Assert(val, Equals, d)

var b []byte = []byte{1, 2, 3}
col = &pb.Column{BytesValue: b}
c.Assert(columnToArg("", col), DeepEquals, b)
val, err = columnToArg("", col)
c.Assert(err, IsNil)
c.Assert(val, DeepEquals, b)

var ss string = "hello world"
col = &pb.Column{StringValue: &ss}
c.Assert(columnToArg("", col), DeepEquals, ss)
val, err = columnToArg("", col)
c.Assert(err, IsNil)
c.Assert(val, DeepEquals, ss)
}
29 changes: 25 additions & 4 deletions tests/dailytest/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,35 @@ var case2Clean = []string{`
}

var case3 = []string{`
CREATE TABLE a(id INT PRIMARY KEY, a1 INT);
CREATE TABLE binlog_case3(id INT PRIMARY KEY, a1 INT);
`,
`
INSERT INTO a(id, a1) VALUES(1,1),(2,1);
INSERT INTO binlog_case3(id, a1) VALUES(1,1),(2,1);
`,
`
ALTER TABLE a ADD UNIQUE INDEX aidx(a1);
ALTER TABLE binlog_case3 ADD UNIQUE INDEX aidx(a1);
`,
}

var case3Clean = []string{
`DROP TABLE a`,
`DROP TABLE binlog_case3`,
}

july2993 marked this conversation as resolved.
Show resolved Hide resolved
// Test issue: TOOL-1346
var case4 = []string{`
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
CREATE TABLE binlog_case4(a BIT(1) NOT NULL);
`,
`
INSERT INTO binlog_case4 VALUES (0x01);
`,
`
UPDATE binlog_case4 SET a = 0x00;
`,
}

var case4Clean = []string{`
DROP TABLE binlog_case4;
`,
}

type testRunner struct {
Expand Down Expand Up @@ -160,6 +177,10 @@ func RunCase(src *sql.DB, dst *sql.DB, schema string) {
})
tr.execSQLs(case3Clean)

// run case4
tr.execSQLs(case4)
tr.execSQLs(case4Clean)

tr.run(caseTblWithGeneratedCol)
tr.execSQLs([]string{"DROP TABLE gen_contacts;"})

Expand Down
7 changes: 6 additions & 1 deletion tests/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ func main() {
case msg := <-breader.Messages():
str := msg.Binlog.String()
log.S().Debugf("recv: %.2000s", str)
ld.Input() <- loader.SlaveBinlogToTxn(msg.Binlog)
txn, err := loader.SlaveBinlogToTxn(msg.Binlog)
if err != nil {
log.S().Error(errors.ErrorStack(err))
log.S().Fatal(err)
july2993 marked this conversation as resolved.
Show resolved Hide resolved
}
ld.Input() <- txn
case txn := <-ld.Successes():
log.S().Debug("succ: ", txn)
}
Expand Down
18 changes: 0 additions & 18 deletions tests/update_bit1/drainer.toml

This file was deleted.

28 changes: 0 additions & 28 deletions tests/update_bit1/run.sh

This file was deleted.