Skip to content

Commit

Permalink
syncer(dm): fix default collation with upstream in create table's co…
Browse files Browse the repository at this point in the history
…lumns (pingcap#3769)
  • Loading branch information
WizardXiao authored and zhaoxinyu committed Dec 29, 2021
1 parent 7265b98 commit e2dc007
Show file tree
Hide file tree
Showing 27 changed files with 486 additions and 82 deletions.
11 changes: 7 additions & 4 deletions dm/pkg/binlog/event/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func GetParserForStatusVars(statusVars []byte) (*parser.Parser, error) {
}

// GetServerCollationByStatusVars gets server collation by binlog statusVars.
func GetServerCollationByStatusVars(statusVars []byte) (string, error) {
func GetServerCollationByStatusVars(statusVars []byte, idAndCollationMap map[int]string) (string, error) {
vars, err := statusVarsToKV(statusVars)
b, ok := vars[QCharsetCode]

Expand All @@ -365,12 +365,15 @@ func GetServerCollationByStatusVars(statusVars []byte) (string, error) {
// only happen when this is a dummy event generated by DM
err = fmt.Errorf("Q_CHARSET_CODE not found in status_vars %v", statusVars)
}
// mysql default 'latin1_swedish_ci'
// mysql 5.7.22 default 'latin1_swedish_ci'
return "latin1_swedish_ci", err
}

// QCharsetCode 2-byte character_set_client + 2-byte collation_connection + 2-byte collation_server
// collation is less than 255 and we use the first byte.
return mysql.Collations[b[4]], err
r := bytes.NewReader(b[4:])
var v uint16
_ = binary.Read(r, binary.LittleEndian, &v)
return idAndCollationMap[int(v)], err
}

// if returned error is `io.EOF`, it means UnexpectedEOF because we handled expected `io.EOF` as success
Expand Down
45 changes: 0 additions & 45 deletions dm/pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,51 +391,6 @@ func GetServerUnixTS(ctx context.Context, db *sql.DB) (int64, error) {
return ts, err
}

// GetCharsetAndDefaultCollation gets charset and default collation map.
//
// It runs `SHOW CHARACTER SET` on a dedicated connection taken from db and
// returns a map whose keys are lower-cased charset names and whose values are
// the default collation names exactly as reported by the server.
// Any driver error (acquiring the connection, querying, scanning, iterating or
// closing the result set) is adapted with terror.ErrDBDriverError and returned
// together with a nil map.
func GetCharsetAndDefaultCollation(ctx context.Context, db *sql.DB) (map[string]string, error) {
	charsetAndDefaultCollation := make(map[string]string)
	conn, err := db.Conn(ctx)
	if err != nil {
		return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
	}
	defer conn.Close()

	// Show an example.
	/*
		mysql> SHOW CHARACTER SET;
		+----------+---------------------------------+---------------------+--------+
		| Charset  | Description                     | Default collation   | Maxlen |
		+----------+---------------------------------+---------------------+--------+
		| armscii8 | ARMSCII-8 Armenian              | armscii8_general_ci |      1 |
		| ascii    | US ASCII                        | ascii_general_ci    |      1 |
		| big5     | Big5 Traditional Chinese        | big5_chinese_ci     |      2 |
		| binary   | Binary pseudo charset           | binary              |      1 |
		| cp1250   | Windows Central European        | cp1250_general_ci   |      1 |
		| cp1251   | Windows Cyrillic                | cp1251_general_ci   |      1 |
		+----------+---------------------------------+---------------------+--------+
	*/

	rows, err := conn.QueryContext(ctx, "SHOW CHARACTER SET")
	if err != nil {
		return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
	}
	// Safety net for the early returns below; Close is idempotent.
	defer rows.Close()

	for rows.Next() {
		var charset, description, collation string
		var maxlen int
		if scanErr := rows.Scan(&charset, &description, &collation, &maxlen); scanErr != nil {
			return nil, terror.DBErrorAdapt(scanErr, terror.ErrDBDriverError)
		}
		charsetAndDefaultCollation[strings.ToLower(charset)] = collation
	}

	// rows.Next returning false may mean "no more rows" or "iteration failed";
	// rows.Err is the only way to tell them apart, so it must be checked
	// before the (possibly partial) map is handed back as a success.
	if err = rows.Err(); err != nil {
		return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
	}
	// Report the close error itself rather than rows.Err(), which can be nil
	// here and would hide the failure.
	if err = rows.Close(); err != nil {
		return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
	}
	return charsetAndDefaultCollation, nil
}

// GetSchemaList gets db schema list with `SHOW DATABASES`.
func GetSchemaList(ctx context.Context, db *sql.DB) ([]string, error) {
schemaList := []string{}
Expand Down
47 changes: 44 additions & 3 deletions dm/syncer/dbconn/upstream_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package dbconn

import (
"context"
"strings"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -71,9 +72,49 @@ func (conn *UpStreamConn) GetServerUnixTS(ctx context.Context) (int64, error) {
return utils.GetServerUnixTS(ctx, conn.BaseDB.DB)
}

// GetCharsetAndDefaultCollation returns charset and default collation map.
func (conn *UpStreamConn) GetCharsetAndDefaultCollation(ctx context.Context) (map[string]string, error) {
return utils.GetCharsetAndDefaultCollation(ctx, conn.BaseDB.DB)
// GetCharsetAndCollationInfo returns charset and collation info.
//
// It queries INFORMATION_SCHEMA.COLLATIONS through conn and builds two maps:
//   - the first maps a lower-cased charset name to its default collation (the
//     rows whose IS_DEFAULT column equals "yes", compared case-insensitively);
//     the collation is stored exactly as reported by the server;
//   - the second maps a collation ID to its lower-cased collation name.
//
// Any error from querying, scanning, iterating or closing the result set is
// adapted with terror.ErrDBDriverError and returned together with nil maps.
func GetCharsetAndCollationInfo(tctx *tcontext.Context, conn *DBConn) (map[string]string, map[int]string, error) {
	charsetAndDefaultCollation := make(map[string]string)
	idAndCollationMap := make(map[int]string)

	// Show an example.
	/*
		mysql> SELECT COLLATION_NAME,CHARACTER_SET_NAME,ID,IS_DEFAULT from INFORMATION_SCHEMA.COLLATIONS;
		+----------------------------+--------------------+-----+------------+
		| COLLATION_NAME             | CHARACTER_SET_NAME | ID  | IS_DEFAULT |
		+----------------------------+--------------------+-----+------------+
		| armscii8_general_ci        | armscii8           |  32 | Yes        |
		| armscii8_bin               | armscii8           |  64 |            |
		| ascii_general_ci           | ascii              |  11 | Yes        |
		| ascii_bin                  | ascii              |  65 |            |
		| big5_chinese_ci            | big5               |   1 | Yes        |
		| big5_bin                   | big5               |  84 |            |
		| binary                     | binary             |  63 | Yes        |
		+----------------------------+--------------------+-----+------------+
	*/

	rows, err := conn.QuerySQL(tctx, "SELECT COLLATION_NAME,CHARACTER_SET_NAME,ID,IS_DEFAULT from INFORMATION_SCHEMA.COLLATIONS")
	if err != nil {
		return nil, nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
	}
	// Safety net for the early returns below; the explicit Close further down
	// is what reports a close failure.
	defer rows.Close()

	for rows.Next() {
		var collation, charset, isDefault string
		var id int
		if scanErr := rows.Scan(&collation, &charset, &id, &isDefault); scanErr != nil {
			return nil, nil, terror.DBErrorAdapt(scanErr, terror.ErrDBDriverError)
		}
		idAndCollationMap[id] = strings.ToLower(collation)
		if strings.ToLower(isDefault) == "yes" {
			charsetAndDefaultCollation[strings.ToLower(charset)] = collation
		}
	}

	// A false rows.Next may be caused by an iteration error; without this
	// check a truncated result set would be returned as if it were complete.
	if err = rows.Err(); err != nil {
		return nil, nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
	}
	// Adapt the close error itself instead of rows.Err(), which can be nil
	// at this point and would mask the failure.
	if err = rows.Close(); err != nil {
		return nil, nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
	}
	return charsetAndDefaultCollation, idAndCollationMap, nil
}

// GetParser returns the parser with correct flag for upstream.
Expand Down
45 changes: 40 additions & 5 deletions dm/syncer/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s *Syncer) genDDLInfo(qec *queryEventContext, sql string) (*ddlInfo, error
targetTables: targetTables,
}

adjustCollation(s.tctx, ddlInfo, qec.eventStatusVars, s.charsetAndDefaultCollation)
adjustCollation(s.tctx, ddlInfo, qec.eventStatusVars, s.charsetAndDefaultCollation, s.idAndCollationMap)
routedDDL, err := parserpkg.RenameDDLTable(ddlInfo.originStmt, ddlInfo.targetTables)
ddlInfo.routedDDL = routedDDL
return ddlInfo, err
Expand Down Expand Up @@ -196,12 +196,13 @@ func (s *Syncer) clearOnlineDDL(tctx *tcontext.Context, targetTable *filter.Tabl
}

// adjustCollation adds collation for create database and check create table.
func adjustCollation(tctx *tcontext.Context, ddlInfo *ddlInfo, statusVars []byte, charsetAndDefaultCollationMap map[string]string) {
func adjustCollation(tctx *tcontext.Context, ddlInfo *ddlInfo, statusVars []byte, charsetAndDefaultCollationMap map[string]string, idAndCollationMap map[int]string) {
switch createStmt := ddlInfo.originStmt.(type) {
case *ast.CreateTableStmt:
if createStmt.ReferTable != nil {
return
}
adjustColumnsCollation(tctx, createStmt, charsetAndDefaultCollationMap)
var justCharset string
for _, tableOption := range createStmt.Options {
// already have 'Collation'
Expand All @@ -222,12 +223,13 @@ func adjustCollation(tctx *tcontext.Context, ddlInfo *ddlInfo, statusVars []byte
tctx.L().Warn("not found charset default collation.", zap.String("originSQL", ddlInfo.originDDL), zap.String("charset", strings.ToLower(justCharset)))
return
}
tctx.L().Info("detect create table risk which use explicit charset and implicit collation, we will add collation by SHOW CHARACTER SET", zap.String("originSQL", ddlInfo.originDDL), zap.String("collation", collation))
tctx.L().Info("detect create table risk which use explicit charset and implicit collation, we will add collation by INFORMATION_SCHEMA.COLLATIONS", zap.String("originSQL", ddlInfo.originDDL), zap.String("collation", collation))
createStmt.Options = append(createStmt.Options, &ast.TableOption{Tp: ast.TableOptionCollate, StrValue: collation})

case *ast.CreateDatabaseStmt:
var justCharset, collation string
var ok bool
var err error
for _, createOption := range createStmt.Options {
// already have 'Collation'
if createOption.Tp == ast.DatabaseOptionCollate {
Expand All @@ -245,18 +247,51 @@ func adjustCollation(tctx *tcontext.Context, ddlInfo *ddlInfo, statusVars []byte
tctx.L().Warn("not found charset default collation.", zap.String("originSQL", ddlInfo.originDDL), zap.String("charset", strings.ToLower(justCharset)))
return
}
tctx.L().Info("detect create database risk which use explicit charset and implicit collation, we will add collation by SHOW CHARACTER SET", zap.String("originSQL", ddlInfo.originDDL), zap.String("collation", collation))
tctx.L().Info("detect create database risk which use explicit charset and implicit collation, we will add collation by INFORMATION_SCHEMA.COLLATIONS", zap.String("originSQL", ddlInfo.originDDL), zap.String("collation", collation))
} else {
// has no charset and collation
// add collation by server collation from binlog statusVars
collation, _ = event.GetServerCollationByStatusVars(statusVars)
collation, err = event.GetServerCollationByStatusVars(statusVars, idAndCollationMap)
if err != nil {
tctx.L().Error("can not get charset server collation from binlog statusVars.", zap.Error(err), zap.String("originSQL", ddlInfo.originDDL))
}
if collation == "" {
tctx.L().Error("get server collation from binlog statusVars is nil.", zap.Error(err), zap.String("originSQL", ddlInfo.originDDL))
return
}
// add collation
tctx.L().Info("detect create database risk which use implicit charset and collation, we will add collation by binlog status_vars", zap.String("originSQL", ddlInfo.originDDL), zap.String("collation", collation))
}
createStmt.Options = append(createStmt.Options, &ast.DatabaseOption{Tp: ast.DatabaseOptionCollate, Value: collation})
}
}

// adjustColumnsCollation adds column's collation.
//
// For every column of createStmt that declares a charset but no collation, it
// appends a COLLATE column option holding that charset's default collation,
// looked up (by lower-cased charset name) in charsetAndDefaultCollationMap.
// Columns are left untouched when they already carry a collation, either as a
// ColumnOptionCollate option or in the field type, or when they declare no
// charset at all. A charset missing from the map is logged as a warning and
// the column is skipped.
func adjustColumnsCollation(tctx *tcontext.Context, createStmt *ast.CreateTableStmt, charsetAndDefaultCollationMap map[string]string) {
ColumnLoop:
	for _, col := range createStmt.Cols {
		for _, option := range col.Options {
			// already have 'Collation'
			if option.Tp == ast.ColumnOptionCollate {
				// The label is required: a bare `continue` would only advance
				// this inner loop and the column would still be processed,
				// possibly gaining a second, conflicting COLLATE option.
				continue ColumnLoop
			}
		}

		fieldType := col.Tp
		// already have 'Collation'
		if fieldType.Collate != "" {
			continue
		}
		// neither charset nor collation: nothing to pin down for this column
		if fieldType.Charset == "" {
			continue
		}

		// just have charset
		charset := strings.ToLower(fieldType.Charset)
		collation, ok := charsetAndDefaultCollationMap[charset]
		if !ok {
			tctx.L().Warn("not found charset default collation for column.", zap.String("table", createStmt.Table.Name.String()), zap.String("column", col.Name.String()), zap.String("charset", charset))
			continue
		}
		col.Options = append(col.Options, &ast.ColumnOption{Tp: ast.ColumnOptionCollate, StrValue: collation})
	}
}

type ddlInfo struct {
originDDL string
routedDDL string
Expand Down
106 changes: 95 additions & 11 deletions dm/syncer/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func (s *testDDLSuite) TestResolveDDLSQL(c *C) {
tctx: tctx,
}
statusVars := []byte{4, 0, 0, 0, 0, 46, 0}
syncer.idAndCollationMap = map[int]string{46: "utf8mb4_bin"}
for i, sql := range sqls {
qec := &queryEventContext{
eventContext: ec,
Expand Down Expand Up @@ -638,28 +639,110 @@ func (s *testDDLSuite) TestClearOnlineDDL(c *C) {
c.Assert(mock.toFinish, HasLen, 0)
}

// TestAdjustDatabaseCollation checks how adjustCollation rewrites CREATE
// DATABASE statements for two different binlog status_vars payloads.
// Statements that already carry a COLLATE option must be left alone,
// statements with only a charset must get that charset's default collation,
// and statements with neither must get the collation whose ID is looked up in
// idAndCollationMap.
func (s *testDDLSuite) TestAdjustDatabaseCollation(c *C) {
	// NOTE(review): presumably the leading 4 is the Q_CHARSET_CODE key followed
	// by three 2-byte little-endian values, the last being collation_server —
	// confirm against statusVarsToKV.
	statusVarsArray := [][]byte{
		{
			// last two bytes: 46 + 0<<8 = 46
			4, 0, 0, 0, 0, 46, 0,
		},
		{
			// last two bytes: 21 + 1<<8 = 277, an ID that does not fit in one byte
			4, 0, 0, 0, 0, 21, 1,
		},
	}

	sqls := []string{
		"create database if not exists `test`",
		"create database `test` CHARACTER SET=utf8mb4 COLLATE=utf8mb4_general_ci",
		"create database `test` CHARACTER SET=utf8mb4",
		"create database `test` COLLATE=utf8mb4_general_ci",
	}

	// expectedSQLs[i][j] is the expected DDL for statusVarsArray[i] and sqls[j].
	expectedSQLs := [][]string{
		{
			"CREATE DATABASE IF NOT EXISTS `test` COLLATE = utf8mb4_bin",
			"CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci",
			"CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci",
			"CREATE DATABASE `test` COLLATE = utf8mb4_general_ci",
		},
		{
			"CREATE DATABASE IF NOT EXISTS `test` COLLATE = utf8mb4_vi_0900_ai_ci",
			"CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci",
			"CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci",
			"CREATE DATABASE `test` COLLATE = utf8mb4_general_ci",
		},
	}

	// Label the logger with this test's own name so log lines are attributed
	// to the right test.
	tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestAdjustDatabaseCollation")))
	syncer := NewSyncer(&config.SubTaskConfig{}, nil, nil)
	syncer.tctx = tctx
	p := parser.New()
	tab := &filter.Table{
		Schema: "test",
		Name:   "t",
	}

	charsetAndDefaultCollationMap := map[string]string{"utf8mb4": "utf8mb4_general_ci"}
	idAndCollationMap := map[int]string{46: "utf8mb4_bin", 277: "utf8mb4_vi_0900_ai_ci"}
	for i, statusVars := range statusVarsArray {
		for j, sql := range sqls {
			// Named `info` so the local does not shadow the ddlInfo type.
			info := &ddlInfo{
				originDDL:    sql,
				routedDDL:    sql,
				sourceTables: []*filter.Table{tab},
				targetTables: []*filter.Table{tab},
			}
			stmt, err := p.ParseOneStmt(sql, "", "")
			c.Assert(err, IsNil)
			c.Assert(stmt, NotNil)
			info.originStmt = stmt
			adjustCollation(tctx, info, statusVars, charsetAndDefaultCollationMap, idAndCollationMap)
			routedDDL, err := parserpkg.RenameDDLTable(info.originStmt, info.targetTables)
			c.Assert(err, IsNil)
			c.Assert(routedDDL, Equals, expectedSQLs[i][j])
		}
	}
}

func (s *testDDLSuite) TestAdjustCollation(c *C) {
// duplicate with pkg/parser
sqls := []string{
"create table `test`.`t1` (id int) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci",
"create table `test`.`t1` (id int) CHARSET=utf8mb4",
"create table `test`.`t1` (id int) COLLATE=utf8mb4_general_ci",
"create table `test`.`t1` (id int)",
"create database `test` CHARACTER SET=utf8mb4 COLLATE=utf8mb4_general_ci",
"create database `test` CHARACTER SET=utf8mb4",
"create database `test` COLLATE=utf8mb4_general_ci",
"create database if not exists `test`",
"create table `test`.`t1` (id int, name varchar(20) CHARACTER SET utf8mb4, work varchar(20))",
"create table `test`.`t1` (id int, name varchar(20), work varchar(20))",
"create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20))",
"create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20) CHARACTER SET utf8mb4)",
"create table `test`.`t1` (id int, name varchar(20) CHARACTER SET utf8mb4, work varchar(20)) COLLATE=utf8mb4_general_ci",
"create table `test`.`t1` (id int, name varchar(20) CHARACTER SET utf8mb4, work varchar(20)) COLLATE=latin1_swedish_ci",
"create table `test`.`t1` (id int, name varchar(20), work varchar(20)) COLLATE=utf8mb4_general_ci",
"create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20)) COLLATE=utf8mb4_general_ci",
"create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20) CHARACTER SET utf8mb4) COLLATE=utf8mb4_general_ci",
"create table `test`.`t1` (id int, name varchar(20) CHARACTER SET utf8mb4, work varchar(20)) CHARSET=utf8mb4 ",
"create table `test`.`t1` (id int, name varchar(20) CHARACTER SET latin1, work varchar(20)) CHARSET=utf8mb4 ",
"create table `test`.`t1` (id int, name varchar(20), work varchar(20)) CHARSET=utf8mb4",
"create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20)) CHARSET=utf8mb4",
"create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20) CHARACTER SET utf8mb4) CHARSET=utf8mb4",
}

expectedSQLs := []string{
"CREATE TABLE `test`.`t` (`id` INT) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t` (`id` INT) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t` (`id` INT) DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t` (`id` INT)",
"CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci",
"CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci",
"CREATE DATABASE `test` COLLATE = utf8mb4_general_ci",
"CREATE DATABASE IF NOT EXISTS `test` COLLATE = utf8mb4_bin",
"CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci,`work` VARCHAR(20))",
"CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20),`work` VARCHAR(20))",
"CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20))",
"CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci)",
"CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT COLLATE = LATIN1_SWEDISH_CI",
"CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20),`work` VARCHAR(20)) DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci) DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) CHARACTER SET LATIN1 COLLATE latin1_swedish_ci,`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20),`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
"CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI",
}

tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestAdjustTableCollation")))
Expand All @@ -671,7 +754,8 @@ func (s *testDDLSuite) TestAdjustCollation(c *C) {
Name: "t",
}
statusVars := []byte{4, 0, 0, 0, 0, 46, 0}
charsetAndDefaultCollationMap := map[string]string{"utf8mb4": "utf8mb4_general_ci"}
charsetAndDefaultCollationMap := map[string]string{"utf8mb4": "utf8mb4_general_ci", "latin1": "latin1_swedish_ci"}
idAndCollationMap := map[int]string{46: "utf8mb4_bin"}
for i, sql := range sqls {
ddlInfo := &ddlInfo{
originDDL: sql,
Expand All @@ -683,7 +767,7 @@ func (s *testDDLSuite) TestAdjustCollation(c *C) {
c.Assert(err, IsNil)
c.Assert(stmt, NotNil)
ddlInfo.originStmt = stmt
adjustCollation(tctx, ddlInfo, statusVars, charsetAndDefaultCollationMap)
adjustCollation(tctx, ddlInfo, statusVars, charsetAndDefaultCollationMap, idAndCollationMap)
routedDDL, err := parserpkg.RenameDDLTable(ddlInfo.originStmt, ddlInfo.targetTables)
c.Assert(err, IsNil)
c.Assert(routedDDL, Equals, expectedSQLs[i])
Expand Down
Loading

0 comments on commit e2dc007

Please sign in to comment.