From ee42f751f708b3cf7f40d190b7a4c00b98860250 Mon Sep 17 00:00:00 2001 From: WizardXiao Date: Tue, 7 Dec 2021 17:03:47 +0800 Subject: [PATCH 01/13] commit-message: add columns collation by show character set --- dm/syncer/ddl.go | 29 +++++++++++++++++++++++++++++ dm/syncer/ddl_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/dm/syncer/ddl.go b/dm/syncer/ddl.go index a55ef451c67..0ab628c15a7 100644 --- a/dm/syncer/ddl.go +++ b/dm/syncer/ddl.go @@ -206,6 +206,7 @@ func adjustCollation(tctx *tcontext.Context, ddlInfo *ddlInfo, statusVars []byte for _, tableOption := range createStmt.Options { // already have 'Collation' if tableOption.Tp == ast.TableOptionCollate { + adjustColumnsCollation(tctx, createStmt, charsetAndDefaultCollationMap) return } if tableOption.Tp == ast.TableOptionCharset { @@ -214,16 +215,19 @@ func adjustCollation(tctx *tcontext.Context, ddlInfo *ddlInfo, statusVars []byte } if justCharset == "" { tctx.L().Warn("detect create table risk which use implicit charset and collation", zap.String("originSQL", ddlInfo.originDDL)) + adjustColumnsCollation(tctx, createStmt, charsetAndDefaultCollationMap) return } // just has charset, can add collation by charset and default collation map collation, ok := charsetAndDefaultCollationMap[strings.ToLower(justCharset)] if !ok { tctx.L().Warn("not found charset default collation.", zap.String("originSQL", ddlInfo.originDDL), zap.String("charset", strings.ToLower(justCharset))) + adjustColumnsCollation(tctx, createStmt, charsetAndDefaultCollationMap) return } tctx.L().Info("detect create table risk which use explicit charset and implicit collation, we will add collation by SHOW CHARACTER SET", zap.String("originSQL", ddlInfo.originDDL), zap.String("collation", collation)) createStmt.Options = append(createStmt.Options, &ast.TableOption{Tp: ast.TableOptionCollate, StrValue: collation}) + adjustColumnsCollation(tctx, createStmt, charsetAndDefaultCollationMap) case *ast.CreateDatabaseStmt: var justCharset, collation string @@ -257,6 +261,31 @@ func adjustCollation(tctx *tcontext.Context, ddlInfo *ddlInfo, statusVars []byte } } +// adjustColumnsCollation adds column's collation. +func adjustColumnsCollation(tctx *tcontext.Context, createStmt *ast.CreateTableStmt, charsetAndDefaultCollationMap map[string]string) { + for _, col := range createStmt.Cols { + for _, options := range col.Options { + // already have 'Collation' + if options.Tp == ast.ColumnOptionCollate { + continue + } + } + fieldType := col.Tp + if fieldType.Collate != "" { + continue + } + if fieldType.Charset != "" { + // just have charset + collation, ok := charsetAndDefaultCollationMap[strings.ToLower(fieldType.Charset)] + if !ok { + tctx.L().Warn("not found charset default collation for column.", zap.String("table", createStmt.Table.Name.String()), zap.String("column", col.Name.String()), zap.String("charset", strings.ToLower(fieldType.Charset))) + continue + } + fieldType.Collate = collation + } + } +} + type ddlInfo struct { originDDL string routedDDL string diff --git a/dm/syncer/ddl_test.go b/dm/syncer/ddl_test.go index 1901c3e24f8..f56fb96a48e 100644 --- a/dm/syncer/ddl_test.go +++ b/dm/syncer/ddl_test.go @@ -649,6 +649,18 @@ func (s *testDDLSuite) TestAdjustCollation(c *C) { "create database `test` CHARACTER SET=utf8mb4", "create database `test` COLLATE=utf8mb4_general_ci", "create database if not exists `test`", + "create table `test`.`t1` (id int, name varchar(20) CHARACTER SET utf8mb4, work varchar(20))", + "create table `test`.`t1` (id int, name varchar(20), work varchar(20))", + "create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20))", + "create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20) CHARACTER SET utf8mb4)", + "create table `test`.`t1` (id int, name varchar(20) CHARACTER SET utf8mb4, work varchar(20)) COLLATE=utf8mb4_general_ci ", + "create table `test`.`t1` (id int, name varchar(20), work varchar(20)) COLLATE=utf8mb4_general_ci", + "create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20)) COLLATE=utf8mb4_general_ci", + "create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20) CHARACTER SET utf8mb4) COLLATE=utf8mb4_general_ci", + "create table `test`.`t1` (id int, name varchar(20) CHARACTER SET utf8mb4, work varchar(20)) CHARSET=utf8mb4 ", + "create table `test`.`t1` (id int, name varchar(20), work varchar(20)) CHARSET=utf8mb4", + "create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20)) CHARSET=utf8mb4", + "create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20) CHARACTER SET utf8mb4) CHARSET=utf8mb4", } expectedSQLs := []string{ @@ -660,6 +672,18 @@ func (s *testDDLSuite) TestAdjustCollation(c *C) { "CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci", "CREATE DATABASE `test` COLLATE = utf8mb4_general_ci", "CREATE DATABASE IF NOT EXISTS `test` COLLATE = utf8mb4_bin", + "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci,`work` VARCHAR(20))", + "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20),`work` VARCHAR(20))", + "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20))", + "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci)", + "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT COLLATE = UTF8MB4_GENERAL_CI", + "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20),`work` VARCHAR(20)) DEFAULT COLLATE = UTF8MB4_GENERAL_CI", + "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT COLLATE = UTF8MB4_GENERAL_CI", + "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci) DEFAULT COLLATE = UTF8MB4_GENERAL_CI", + "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI", + "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20),`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI", + "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI", + "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI", } tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestAdjustTableCollation"))) From c809ba0f45c2fe182a725a93ed73dec771c04c9b Mon Sep 17 00:00:00 2001 From: WizardXiao Date: Fri, 10 Dec 2021 19:50:51 +0800 Subject: [PATCH 02/13] commit-message: add integration test --- dm/tests/others_integration_1.txt | 1 + dm/tests/sync_collation/conf/dm-master.toml | 4 + .../conf/dm-task-increment.yaml | 105 ++++++++++ dm/tests/sync_collation/conf/dm-task.yaml | 79 ++++++++ dm/tests/sync_collation/conf/dm-worker1.toml | 2 + dm/tests/sync_collation/conf/dm-worker2.toml | 2 + dm/tests/sync_collation/conf/source1.yaml | 13 ++ dm/tests/sync_collation/conf/source2.yaml | 13 ++ dm/tests/sync_collation/data/clean_data.sql | 4 + .../sync_collation/data/db1.increment.sql | 13 ++ .../sync_collation/data/db1.increment_err.sql | 3 + dm/tests/sync_collation/data/db1.prepare.sql | 7 + .../sync_collation/data/db1.prepare_err.sql | 8 + .../sync_collation/data/db2.increment.sql | 13 ++ .../sync_collation/data/db2.increment_err.sql | 3 + dm/tests/sync_collation/data/db2.prepare.sql | 7 + .../sync_collation/data/db2.prepare_err.sql | 8 + .../data/tidb.checktable.prepare.sql | 3 + .../data/tidb.checktable.prepare2.sql | 3 + dm/tests/sync_collation/run.sh | 183 ++++++++++++++++++ 20 files changed, 474 insertions(+) create mode 100644 dm/tests/sync_collation/conf/dm-master.toml create mode 100644 dm/tests/sync_collation/conf/dm-task-increment.yaml create mode 100644 dm/tests/sync_collation/conf/dm-task.yaml create mode 100644 dm/tests/sync_collation/conf/dm-worker1.toml create mode 100644 dm/tests/sync_collation/conf/dm-worker2.toml create mode 100644 dm/tests/sync_collation/conf/source1.yaml create mode 100644 dm/tests/sync_collation/conf/source2.yaml create mode 100644 dm/tests/sync_collation/data/clean_data.sql create mode 100644 dm/tests/sync_collation/data/db1.increment.sql create mode 100644 dm/tests/sync_collation/data/db1.increment_err.sql create mode 100644 dm/tests/sync_collation/data/db1.prepare.sql create mode 100644 dm/tests/sync_collation/data/db1.prepare_err.sql create mode 100644 dm/tests/sync_collation/data/db2.increment.sql create mode 100644 dm/tests/sync_collation/data/db2.increment_err.sql create mode 100644 dm/tests/sync_collation/data/db2.prepare.sql create mode 100644 dm/tests/sync_collation/data/db2.prepare_err.sql create mode 100644 dm/tests/sync_collation/data/tidb.checktable.prepare.sql create mode 100644 dm/tests/sync_collation/data/tidb.checktable.prepare2.sql create mode 100755 dm/tests/sync_collation/run.sh diff --git a/dm/tests/others_integration_1.txt b/dm/tests/others_integration_1.txt index 06230c842a0..2b538f41477 100644 --- a/dm/tests/others_integration_1.txt +++ b/dm/tests/others_integration_1.txt @@ -10,3 +10,4 @@ checkpoint_transaction lightning_mode downstream_diff_index slow_relay_writer +sync_collation diff --git a/dm/tests/sync_collation/conf/dm-master.toml b/dm/tests/sync_collation/conf/dm-master.toml new file mode 100644 index 00000000000..7cecf59ad86 --- /dev/null +++ b/dm/tests/sync_collation/conf/dm-master.toml @@ -0,0 +1,4 @@ +# Master Configuration. +master-addr = ":8261" +advertise-addr = "127.0.0.1:8261" +auto-compaction-retention = "3s" diff --git a/dm/tests/sync_collation/conf/dm-task-increment.yaml b/dm/tests/sync_collation/conf/dm-task-increment.yaml new file mode 100644 index 00000000000..16f76e63143 --- /dev/null +++ b/dm/tests/sync_collation/conf/dm-task-increment.yaml @@ -0,0 +1,105 @@ +--- +name: task-name-placeholder +task-mode: incremental +is-sharding: false +meta-schema: "dm_meta" +# enable-heartbeat: true +heartbeat-update-interval: 1 +heartbeat-report-interval: 1 +clean-dump-file: false + +target-database: + host: "127.0.0.1" + port: 4000 + user: "root" + password: "" + +mysql-instances: + - source-id: "mysql-replica-01" + meta: + binlog-name: binlog-name-placeholder-1 + binlog-pos: binlog-pos-placeholder-1 + binlog-gtid: binlog-gtid-placeholder-1 + block-allow-list: "instance" + route-rules: [ "collation-table-rules-1",collation-table-rules-1-2,"collation-schema-rules-1","collation-table-rules-increment-1","collation-schema-rules-increment-1"] + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + + - source-id: "mysql-replica-02" + meta: + binlog-name: binlog-name-placeholder-2 + binlog-pos: binlog-pos-placeholder-2 + binlog-gtid: binlog-gtid-placeholder-2 + block-allow-list: "instance" + route-rules: [ "collation-table-rules-2",collation-table-rules-2-2,"collation-schema-rules-2","collation-table-rules-increment-2","collation-schema-rules-increment-2"] + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + +block-allow-list: + instance: + do-dbs: ["sync_collation*"] + +routes: + collation-table-rules-1: + schema-pattern: "sync_collation" + target-schema: "sync_collation" + table-pattern: "t1" + target-table: "t" + collation-table-rules-1-2: + schema-pattern: "sync_collation" + target-schema: "sync_collation" + table-pattern: "t2" + target-table: "t2" + collation-schema-rules-1: + schema-pattern: "sync_collation" + target-schema: "sync_collation" + collation-table-rules-2: + schema-pattern: "sync_collation" + target-schema: "sync_collation2" + table-pattern: "t1" + target-table: "t" + collation-table-rules-2-2: + schema-pattern: "sync_collation" + target-schema: "sync_collation2" + table-pattern: "t2" + target-table: "t2" + collation-schema-rules-2: + schema-pattern: "sync_collation" + target-schema: "sync_collation2" + + collation-table-rules-increment-1: + schema-pattern: "sync_collation_increment" + target-schema: "sync_collation_increment" + table-pattern: "t1" + target-table: "t" + collation-schema-rules-increment-1: + schema-pattern: "sync_collation_increment" + target-schema: "sync_collation_increment" + collation-table-rules-increment-2: + schema-pattern: "sync_collation_increment" + target-schema: "sync_collation_increment2" + table-pattern: "t1" + target-table: "t" + collation-schema-rules-increment-2: + schema-pattern: "sync_collation_increment" + target-schema: "sync_collation_increment2" + + +mydumpers: + global: + threads: 4 + chunk-filesize: 64 + skip-tz-utc: true + extra-args: "" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 diff --git a/dm/tests/sync_collation/conf/dm-task.yaml b/dm/tests/sync_collation/conf/dm-task.yaml new file mode 100644 index 00000000000..a1f9c2b3a99 --- /dev/null +++ b/dm/tests/sync_collation/conf/dm-task.yaml @@ -0,0 +1,79 @@ +--- +name: task-name-placeholder +task-mode: full +is-sharding: false +meta-schema: "dm_meta" +# enable-heartbeat: true +heartbeat-update-interval: 1 +heartbeat-report-interval: 1 +clean-dump-file: false + +target-database: + host: "127.0.0.1" + port: 4000 + user: "root" + password: "" + +mysql-instances: + - source-id: "mysql-replica-01" + block-allow-list: "instance" + route-rules: [ "collation-table-rules-1","collation-table-rules-1-2","collation-schema-rules-1" ] + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + + - source-id: "mysql-replica-02" + block-allow-list: "instance" + route-rules: [ "collation-table-rules-2","collation-table-rules-2-2","collation-schema-rules-2" ] + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + +block-allow-list: + instance: + do-dbs: ["sync_collation"] + +routes: + collation-table-rules-1: + schema-pattern: "sync_collation*" + target-schema: "sync_collation" + table-pattern: "t1" + target-table: "t" + collation-table-rules-1-2: + schema-pattern: "sync_collation*" + target-schema: "sync_collation" + table-pattern: "t2" + target-table: "t2" + collation-schema-rules-1: + schema-pattern: "sync_collation*" + target-schema: "sync_collation" + collation-table-rules-2: + schema-pattern: "sync_collation*" + target-schema: "sync_collation2" + table-pattern: "t1" + target-table: "t" + collation-table-rules-2-2: + schema-pattern: "sync_collation*" + target-schema: "sync_collation2" + table-pattern: "t2" + target-table: "t2" + collation-schema-rules-2: + schema-pattern: "sync_collation*" + target-schema: "sync_collation2" + +mydumpers: + global: + threads: 4 + chunk-filesize: 64 + skip-tz-utc: true + extra-args: "" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 diff --git a/dm/tests/sync_collation/conf/dm-worker1.toml b/dm/tests/sync_collation/conf/dm-worker1.toml new file mode 100644 index 00000000000..7a72ea72bf8 --- /dev/null +++ b/dm/tests/sync_collation/conf/dm-worker1.toml @@ -0,0 +1,2 @@ +name = "worker1" +join = "127.0.0.1:8261" diff --git a/dm/tests/sync_collation/conf/dm-worker2.toml b/dm/tests/sync_collation/conf/dm-worker2.toml new file mode 100644 index 00000000000..010e21c73eb --- /dev/null +++ b/dm/tests/sync_collation/conf/dm-worker2.toml @@ -0,0 +1,2 @@ +name = "worker2" +join = "127.0.0.1:8261" diff --git a/dm/tests/sync_collation/conf/source1.yaml b/dm/tests/sync_collation/conf/source1.yaml new file mode 100644 index 00000000000..679a2f4db7c --- /dev/null +++ b/dm/tests/sync_collation/conf/source1.yaml @@ -0,0 +1,13 @@ +source-id: mysql-replica-01 +flavor: 'mysql' +enable-gtid: true +relay-binlog-name: '' +relay-binlog-gtid: '' +enable-relay: false +from: + host: 127.0.0.1 + user: root + password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= + port: 3306 +checker: + check-enable: false diff --git a/dm/tests/sync_collation/conf/source2.yaml b/dm/tests/sync_collation/conf/source2.yaml new file mode 100644 index 00000000000..1d1d87b9d0c --- /dev/null +++ b/dm/tests/sync_collation/conf/source2.yaml @@ -0,0 +1,13 @@ +source-id: mysql-replica-02 +flavor: 'mysql' +enable-gtid: true +relay-binlog-name: '' +relay-binlog-gtid: '' +enable-relay: false +from: + host: 127.0.0.1 + user: root + password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= + port: 3307 +checker: + check-enable: false diff --git a/dm/tests/sync_collation/data/clean_data.sql b/dm/tests/sync_collation/data/clean_data.sql new file mode 100644 index 00000000000..646db5fb6b7 --- /dev/null +++ b/dm/tests/sync_collation/data/clean_data.sql @@ -0,0 +1,4 @@ +drop database if exists `sync_collation`; +drop database if exists `sync_collation2`; +drop database if exists `sync_collation_increment`; +drop database if exists `sync_collation_increment2`; \ No newline at end of file diff --git a/dm/tests/sync_collation/data/db1.increment.sql b/dm/tests/sync_collation/data/db1.increment.sql new file mode 100644 index 00000000000..dfe8ac3ecdb --- /dev/null +++ b/dm/tests/sync_collation/data/db1.increment.sql @@ -0,0 +1,13 @@ +drop database if exists `sync_collation`; +create database `sync_collation` character set utf8; +use `sync_collation`; +create table t1 (id int, name varchar(20), primary key(`id`)) character set utf8; +insert into t1 (id, name) values (1, 'Aa'), (2, 'aA'); +create table t2 (id int, name varchar(20) character set utf8, primary key(`id`)) character set latin1 collate latin1_bin; +insert into t2 (id, name) values (1, 'Aa'), (2, 'aA'); +set collation_server = utf8_general_ci; +drop database if exists `sync_collation_increment`; +create database `sync_collation_increment`; +use `sync_collation_increment`; +create table t1 (id int, name varchar(20), primary key(`id`)); +insert into t1 (id, name) values (1, 'Aa'), (2, 'aA'); \ No newline at end of file diff --git a/dm/tests/sync_collation/data/db1.increment_err.sql b/dm/tests/sync_collation/data/db1.increment_err.sql new file mode 100644 index 00000000000..15e3878432b --- /dev/null +++ b/dm/tests/sync_collation/data/db1.increment_err.sql @@ -0,0 +1,3 @@ +set collation_server = latin1_swedish_ci; +drop database if exists `sync_collation`; +create database `sync_collation`; \ No newline at end of file diff --git a/dm/tests/sync_collation/data/db1.prepare.sql b/dm/tests/sync_collation/data/db1.prepare.sql new file mode 100644 index 00000000000..4ce27c05f52 --- /dev/null +++ b/dm/tests/sync_collation/data/db1.prepare.sql @@ -0,0 +1,7 @@ +drop database if exists `sync_collation`; +create database `sync_collation` character set utf8; +use `sync_collation`; +create table t1 (id int, name varchar(20), primary key(`id`)) character set utf8; +insert into t1 (id, name) values (1, 'Aa'), (2, 'aA'); +create table t2 (id int, name varchar(20) character set utf8, primary key(`id`)) character set latin1 collate latin1_bin; +insert into t2 (id, name) values (1, 'Aa'), (2, 'aA'); diff --git a/dm/tests/sync_collation/data/db1.prepare_err.sql b/dm/tests/sync_collation/data/db1.prepare_err.sql new file mode 100644 index 00000000000..616a34abfaa --- /dev/null +++ b/dm/tests/sync_collation/data/db1.prepare_err.sql @@ -0,0 +1,8 @@ +set collation_server = latin1_swedish_ci; +drop database if exists `sync_collation`; +create database `sync_collation`; +use `sync_collation`; +create table t1 (id int, name varchar(20), primary key(`id`)); +insert into t1 (id, name) values (1, 'Aa'), (2, 'aA'); +create table t2 (id int, name varchar(20) character set utf8, primary key(`id`)); +insert into t2 (id, name) values (1, 'Aa'), (2, 'aA'); diff --git a/dm/tests/sync_collation/data/db2.increment.sql b/dm/tests/sync_collation/data/db2.increment.sql new file mode 100644 index 00000000000..eaf03128f93 --- /dev/null +++ b/dm/tests/sync_collation/data/db2.increment.sql @@ -0,0 +1,13 @@ +drop database if exists `sync_collation`; +create database `sync_collation` character set utf8; +use `sync_collation`; +create table t1 (id int, name varchar(20), primary key(`id`)) character set utf8; +insert into t1 (id, name) values (1, 'Aa'), (2, 'aA'); +create table t2 (id int, name varchar(20) character set utf8, primary key(`id`)) character set latin1 collate latin1_bin; +insert into t2 (id, name) values (1, 'Aa'), (2, 'aA'); +set collation_server = utf8_general_ci; +drop database if exists `sync_collation_increment`; +create database `sync_collation_increment`; +use `sync_collation_increment`; +create table t1 (id int, name varchar(20), primary key(`id`)); +insert into t1 (id, name) values (1, 'Aa'), (2, 'aA'); diff --git a/dm/tests/sync_collation/data/db2.increment_err.sql b/dm/tests/sync_collation/data/db2.increment_err.sql new file mode 100644 index 00000000000..9f1ff3ccdcf --- /dev/null +++ b/dm/tests/sync_collation/data/db2.increment_err.sql @@ -0,0 +1,3 @@ +set collation_server = utf8mb4_0900_ai_ci; +drop database if exists `sync_collation`; +create database `sync_collation`; diff --git a/dm/tests/sync_collation/data/db2.prepare.sql b/dm/tests/sync_collation/data/db2.prepare.sql new file mode 100644 index 00000000000..4ce27c05f52 --- /dev/null +++ b/dm/tests/sync_collation/data/db2.prepare.sql @@ -0,0 +1,7 @@ +drop database if exists `sync_collation`; +create database `sync_collation` character set utf8; +use `sync_collation`; +create table t1 (id int, name varchar(20), primary key(`id`)) character set utf8; +insert into t1 (id, name) values (1, 'Aa'), (2, 'aA'); +create table t2 (id int, name varchar(20) character set utf8, primary key(`id`)) character set latin1 collate latin1_bin; +insert into t2 (id, name) values (1, 'Aa'), (2, 'aA'); diff --git a/dm/tests/sync_collation/data/db2.prepare_err.sql b/dm/tests/sync_collation/data/db2.prepare_err.sql new file mode 100644 index 00000000000..0679bd912f2 --- /dev/null +++ b/dm/tests/sync_collation/data/db2.prepare_err.sql @@ -0,0 +1,8 @@ +set collation_server = utf8mb4_0900_ai_ci; +drop database if exists `sync_collation`; +create database `sync_collation`; +use `sync_collation`; +create table t1 (id int, name varchar(20), primary key(`id`)); +insert into t1 (id, name) values (1, 'Aa'), (2, 'aA'); +create table t2 (id int, name varchar(20) character set utf8, primary key(`id`)); +insert into t2 (id, name) values (1, 'Aa'), (2, 'aA'); diff --git a/dm/tests/sync_collation/data/tidb.checktable.prepare.sql b/dm/tests/sync_collation/data/tidb.checktable.prepare.sql new file mode 100644 index 00000000000..a5147aae882 --- /dev/null +++ b/dm/tests/sync_collation/data/tidb.checktable.prepare.sql @@ -0,0 +1,3 @@ +use `sync_collation`; +create table t_check (id int, name varchar(20), primary key (`id`)); +insert into t_check (id, name) values (1, 'Aa'), (2, 'aA'); diff --git a/dm/tests/sync_collation/data/tidb.checktable.prepare2.sql b/dm/tests/sync_collation/data/tidb.checktable.prepare2.sql new file mode 100644 index 00000000000..947a85fd9be --- /dev/null +++ b/dm/tests/sync_collation/data/tidb.checktable.prepare2.sql @@ -0,0 +1,3 @@ +use `sync_collation2`; +create table t_check (id int, name varchar(20), primary key (`id`)); +insert into t_check (id, name) values (1, 'Aa'), (2, 'aA'); diff --git a/dm/tests/sync_collation/run.sh b/dm/tests/sync_collation/run.sh new file mode 100755 index 00000000000..2b92517fb3f --- /dev/null +++ b/dm/tests/sync_collation/run.sh @@ -0,0 +1,183 @@ +#!/bin/bash + +set -eu + +cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $cur/../_utils/test_prepare +WORK_DIR=$TEST_DIR/$TEST_NAME +db="sync_collation" +db_increment="sync_collation_increment" +db2="sync_collation2" +db_increment2="sync_collation_increment2" +tb="t" +tb2="t2" +tb_check="t_check" + +function run() { + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + run_sql_file $cur/data/clean_data.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD + + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + # operate mysql config to worker + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 + + echo "start task in full mode" + cat $cur/conf/dm-task.yaml >$WORK_DIR/dm-task.yaml + TASK_NAME="full_test" + sed -i "s/task-name-placeholder/${TASK_NAME}/g" $WORK_DIR/dm-task.yaml + dmctl_start_task $WORK_DIR/dm-task.yaml + + # check table + run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where name = 'aa';" "count(1): 2" + run_sql_tidb_with_retry "select count(1) from ${db2}.${tb} where name = 'aa';" "count(1): 2" + + # check column + # run_sql_tidb_with_retry "select count(1) from ${db}.${tb2} where name = 'aa';" "count(1): 2" + # run_sql_tidb_with_retry "select count(1) from ${db2}.${tb2} where name = 'aa';" "count(1): 2" + + # check database by create table + run_sql_file $cur/data/tidb.checktable.prepare.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD + run_sql_tidb_with_retry "select count(1) from ${db}.${tb_check} where name = 'aa';" "count(1): 2" + run_sql_file $cur/data/tidb.checktable.prepare2.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD + run_sql_tidb_with_retry "select count(1) from ${db2}.${tb_check} where name = 'aa';" "count(1): 2" + + echo "stop task ${TASK_NAME}" + dmctl_stop_task $TASK_NAME $MASTER_PORT + + # $worker1_run_source_1 > 0 means source1 is operated to worker1 + worker1_run_source_1=$(sed "s/$SOURCE_ID1/$SOURCE_ID1\n/g" $WORK_DIR/worker1/log/dm-worker.log | grep -c "$SOURCE_ID1") || true + if [ $worker1_run_source_1 -gt 0 ]; then + name1=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + pos1=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + gtid1=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') + name2=$(grep "Log: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + pos2=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + gtid2=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') + else + name2=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + pos2=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + gtid2=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') + name1=$(grep "Log: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + pos1=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + gtid1=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') + fi + + run_sql_file $cur/data/clean_data.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD + + echo "start task in incremental mode" + TASK_NAME="incremental_test" + cat $cur/conf/dm-task-increment.yaml >$WORK_DIR/dm-task.yaml + sed -i "s/task-name-placeholder/${TASK_NAME}/g" $WORK_DIR/dm-task.yaml + sed -i "s/binlog-name-placeholder-1/$name1/g" $WORK_DIR/dm-task.yaml + sed -i "s/binlog-pos-placeholder-1/$pos1/g" $WORK_DIR/dm-task.yaml + sed -i "s/binlog-gtid-placeholder-1/$gtid1/g" $WORK_DIR/dm-task.yaml + + sed -i "s/binlog-name-placeholder-2/$name2/g" $WORK_DIR/dm-task.yaml + sed -i "s/binlog-pos-placeholder-2/$pos2/g" $WORK_DIR/dm-task.yaml + sed -i "s/binlog-gtid-placeholder-2/$gtid2/g" $WORK_DIR/dm-task.yaml + dmctl_start_task $WORK_DIR/dm-task.yaml + + run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + + # check table + run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where name = 'aa';" "count(1): 2" + run_sql_tidb_with_retry "select count(1) from ${db2}.${tb} where name = 'aa';" "count(1): 2" + + # check column + run_sql_tidb_with_retry "select count(1) from ${db}.${tb2} where name = 'aa';" "count(1): 2" + run_sql_tidb_with_retry "select count(1) from ${db2}.${tb2} where name = 'aa';" "count(1): 2" + + # check database by create table + run_sql_file $cur/data/tidb.checktable.prepare.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD + run_sql_tidb_with_retry "select count(1) from ${db}.${tb_check} where name = 'aa';" "count(1): 2" + run_sql_file $cur/data/tidb.checktable.prepare2.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD + run_sql_tidb_with_retry "select count(1) from ${db2}.${tb_check} where name = 'aa';" "count(1): 2" + + # check set server collation + run_sql_tidb_with_retry "select count(1) from ${db_increment}.${tb} where name = 'aa';" "count(1): 2" + run_sql_tidb_with_retry "select count(1) from ${db_increment2}.${tb} where name = 'aa';" "count(1): 2" + + dmctl_stop_task $TASK_NAME $MASTER_PORT + + echo "start task in full mode and test not support" + run_sql_file $cur/data/db1.prepare_err.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.prepare_err.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + run_sql_file $cur/data/clean_data.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD + + cat $cur/conf/dm-task.yaml >$WORK_DIR/dm-task.yaml + TASK_NAME="full_err" + sed -i "s/task-name-placeholder/${TASK_NAME}/g" $WORK_DIR/dm-task.yaml + + echo "start task and will failed" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task ${WORK_DIR}/dm-task.yaml" \ + "\"result\": false" 2 \ + "Error 1273: Unsupported collation when new collation is enabled: 'latin1_swedish_ci'" 1 \ + "Error 1273: Unsupported collation when new collation is enabled: 'utf8mb4_0900_ai_ci'" 1 + + dmctl_stop_task $TASK_NAME $MASTER_PORT + + # $worker1_run_source_1 > 0 means source1 is operated to worker1 + worker1_run_source_1=$(sed "s/$SOURCE_ID1/$SOURCE_ID1\n/g" $WORK_DIR/worker1/log/dm-worker.log | grep -c "$SOURCE_ID1") || true + if [ $worker1_run_source_1 -gt 0 ]; then + name1=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + pos1=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + gtid1=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') + name2=$(grep "Log: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + pos2=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + gtid2=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') + else + name2=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + pos2=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + gtid2=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') + name1=$(grep "Log: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + pos1=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + gtid1=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') + fi + + echo "start task in incremantal mode and test not support" + + run_sql_file $cur/data/clean_data.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD + + TASK_NAME="incremental_err" + cat $cur/conf/dm-task-increment.yaml >$WORK_DIR/dm-task.yaml + sed -i "s/task-name-placeholder/${TASK_NAME}/g" $WORK_DIR/dm-task.yaml + sed -i "s/binlog-name-placeholder-1/$name1/g" $WORK_DIR/dm-task.yaml + sed -i "s/binlog-pos-placeholder-1/$pos1/g" $WORK_DIR/dm-task.yaml + sed -i "s/binlog-gtid-placeholder-1/$gtid1/g" $WORK_DIR/dm-task.yaml + + sed -i "s/binlog-name-placeholder-2/$name2/g" $WORK_DIR/dm-task.yaml + sed -i "s/binlog-pos-placeholder-2/$pos2/g" $WORK_DIR/dm-task.yaml + sed -i "s/binlog-gtid-placeholder-2/$gtid2/g" $WORK_DIR/dm-task.yaml + dmctl_start_task $WORK_DIR/dm-task.yaml + + run_sql_file $cur/data/db1.increment_err.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.increment_err.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + + echo "task sync and will failed" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status ${TASK_NAME}" \ + "Error 1273: Unsupported collation when new collation is enabled: 'latin1_swedish_ci'" 1 \ + "Error 1273: Unsupported collation when new collation is enabled: 'utf8mb4_0900_ai_ci'" 1 + + dmctl_stop_task $TASK_NAME $MASTER_PORT +} + +cleanup_data $TEST_NAME +# also cleanup dm processes in case of last run failed +cleanup_process $* +run $* +cleanup_process $* + +echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>" From a2560721f12daeb89fbd3538b4761e8348c845ee Mon Sep 17 00:00:00 2001 From: WizardXiao Date: Mon, 13 Dec 2021 09:18:44 +0800 Subject: [PATCH 03/13] commit-message: fix collation id which is greater than 255 --- dm/pkg/binlog/event/util.go | 11 ++++--- dm/pkg/utils/db.go | 52 ++++++++++++++++++--------------- dm/syncer/dbconn/upstream_db.go | 6 ++-- dm/syncer/ddl.go | 10 +++---- dm/syncer/ddl_test.go | 4 ++- dm/syncer/syncer.go | 3 +- 6 files changed, 49 insertions(+), 37 deletions(-) diff --git a/dm/pkg/binlog/event/util.go b/dm/pkg/binlog/event/util.go index dd660e285fd..3d902ea861f 100644 --- a/dm/pkg/binlog/event/util.go +++ b/dm/pkg/binlog/event/util.go @@ -356,7 +356,7 @@ func GetParserForStatusVars(statusVars []byte) (*parser.Parser, error) { } // GetServerCollationByStatusVars gets server collation by binlog statusVars. -func GetServerCollationByStatusVars(statusVars []byte) (string, error) { +func GetServerCollationByStatusVars(statusVars []byte, idAndCollationMap map[int]string) (string, error) { vars, err := statusVarsToKV(statusVars) b, ok := vars[QCharsetCode] @@ -365,12 +365,15 @@ func GetServerCollationByStatusVars(statusVars []byte) (string, error) { // only happen when this is a dummy event generated by DM err = fmt.Errorf("Q_CHARSET_CODE not found in status_vars %v", statusVars) } - // mysql default 'latin1_swedish_ci' + // mysql 5.7.22 default 'latin1_swedish_ci' return "latin1_swedish_ci", err } + // QCharsetCode 2-byte character_set_client + 2-byte collation_connection + 2-byte collation_server - // collation is less than 255 and we use the first byte. - return mysql.Collations[b[4]], err + r := bytes.NewReader(b[4:]) + var v uint16 + _ = binary.Read(r, binary.LittleEndian, &v) + return idAndCollationMap[int(v)], err } // if returned error is `io.EOF`, it means UnexpectedEOF because we handled expected `io.EOF` as success diff --git a/dm/pkg/utils/db.go b/dm/pkg/utils/db.go index fa9d6337d5d..87563cbd081 100644 --- a/dm/pkg/utils/db.go +++ b/dm/pkg/utils/db.go @@ -391,49 +391,55 @@ func GetServerUnixTS(ctx context.Context, db *sql.DB) (int64, error) { return ts, err } -// GetCharsetAndDefaultCollation gets charset and default collation map. -func GetCharsetAndDefaultCollation(ctx context.Context, db *sql.DB) (map[string]string, error) { +// GetCharsetAndCollationInfo gets charset and collation info. +func GetCharsetAndCollationInfo(ctx context.Context, db *sql.DB) (map[string]string, map[int]string, error) { charsetAndDefaultCollation := make(map[string]string) + idAndCollationMap := make(map[int]string) + conn, err := db.Conn(ctx) if err != nil { - return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) + return nil, nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) } defer conn.Close() // Show an example. /* - mysql> SHOW CHARACTER SET; - +----------+---------------------------------+---------------------+--------+ - | Charset | Description | Default collation | Maxlen | - +----------+---------------------------------+---------------------+--------+ - | armscii8 | ARMSCII-8 Armenian | armscii8_general_ci | 1 | - | ascii | US ASCII | ascii_general_ci | 1 | - | big5 | Big5 Traditional Chinese | big5_chinese_ci | 2 | - | binary | Binary pseudo charset | binary | 1 | - | cp1250 | Windows Central European | cp1250_general_ci | 1 | - | cp1251 | Windows Cyrillic | cp1251_general_ci | 1 | - +----------+---------------------------------+---------------------+--------+ + mysql> SELECT COLLATION_NAME,CHARACTER_SET_NAME,ID,IS_DEFAULT from INFORMATION_SCHEMA.COLLATIONS; + +----------------------------+--------------------+-----+------------+ + | COLLATION_NAME | CHARACTER_SET_NAME | ID | IS_DEFAULT | + +----------------------------+--------------------+-----+------------+ + | armscii8_general_ci | armscii8 | 32 | Yes | + | armscii8_bin | armscii8 | 64 | | + | ascii_general_ci | ascii | 11 | Yes | + | ascii_bin | ascii | 65 | | + | big5_chinese_ci | big5 | 1 | Yes | + | big5_bin | big5 | 84 | | + | binary | binary | 63 | Yes | + +----------------------------+--------------------+-----+------------+ */ - rows, err := conn.QueryContext(ctx, "SHOW CHARACTER SET") + rows, err := conn.QueryContext(ctx, "SELECT COLLATION_NAME,CHARACTER_SET_NAME,ID,IS_DEFAULT from INFORMATION_SCHEMA.COLLATIONS") if err != nil { - return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) + return nil, nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) } defer rows.Close() for rows.Next() { - var charset, description, collation string - var maxlen int - if scanErr := rows.Scan(&charset, &description, &collation, &maxlen); scanErr != nil { - return nil, terror.DBErrorAdapt(scanErr, terror.ErrDBDriverError) + var collation, charset, isDefault string + var id int + if scanErr := rows.Scan(&collation, &charset, &id, &isDefault); scanErr != nil { + return nil, nil, terror.DBErrorAdapt(scanErr, terror.ErrDBDriverError) + } + idAndCollationMap[id] = strings.ToLower(collation) + if strings.ToLower(isDefault) == "yes" { + charsetAndDefaultCollation[strings.ToLower(charset)] = collation } - charsetAndDefaultCollation[strings.ToLower(charset)] = collation } if err = rows.Close(); err != nil { - return nil, terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError) + return nil, nil, terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError) } - return charsetAndDefaultCollation, err + return charsetAndDefaultCollation, idAndCollationMap, err } // GetSchemaList gets db schema list with `SHOW DATABASES`. diff --git a/dm/syncer/dbconn/upstream_db.go b/dm/syncer/dbconn/upstream_db.go index ccbba8d6c4c..7889e4ad973 100644 --- a/dm/syncer/dbconn/upstream_db.go +++ b/dm/syncer/dbconn/upstream_db.go @@ -71,9 +71,9 @@ func (conn *UpStreamConn) GetServerUnixTS(ctx context.Context) (int64, error) { return utils.GetServerUnixTS(ctx, conn.BaseDB.DB) } -// GetCharsetAndDefaultCollation returns charset and default collation map. -func (conn *UpStreamConn) GetCharsetAndDefaultCollation(ctx context.Context) (map[string]string, error) { - return utils.GetCharsetAndDefaultCollation(ctx, conn.BaseDB.DB) +// GetCharsetAndDefaultCollation returns charset and collation info. +func (conn *UpStreamConn) GetCharsetAndCollationInfo(ctx context.Context) (map[string]string, map[int]string, error) { + return utils.GetCharsetAndCollationInfo(ctx, conn.BaseDB.DB) } // GetParser returns the parser with correct flag for upstream. diff --git a/dm/syncer/ddl.go b/dm/syncer/ddl.go index 0ab628c15a7..5eb8b2cc22c 100644 --- a/dm/syncer/ddl.go +++ b/dm/syncer/ddl.go @@ -127,7 +127,7 @@ func (s *Syncer) genDDLInfo(qec *queryEventContext, sql string) (*ddlInfo, error targetTables: targetTables, } - adjustCollation(s.tctx, ddlInfo, qec.eventStatusVars, s.charsetAndDefaultCollation) + adjustCollation(s.tctx, ddlInfo, qec.eventStatusVars, s.charsetAndDefaultCollation, s.idAndCollationMap) routedDDL, err := parserpkg.RenameDDLTable(ddlInfo.originStmt, ddlInfo.targetTables) ddlInfo.routedDDL = routedDDL return ddlInfo, err @@ -196,7 +196,7 @@ func (s *Syncer) clearOnlineDDL(tctx *tcontext.Context, targetTable *filter.Tabl } // adjustCollation adds collation for create database and check create table. -func adjustCollation(tctx *tcontext.Context, ddlInfo *ddlInfo, statusVars []byte, charsetAndDefaultCollationMap map[string]string) { +func adjustCollation(tctx *tcontext.Context, ddlInfo *ddlInfo, statusVars []byte, charsetAndDefaultCollationMap map[string]string, idAndCollationMap map[int]string) { switch createStmt := ddlInfo.originStmt.(type) { case *ast.CreateTableStmt: if createStmt.ReferTable != nil { @@ -225,7 +225,7 @@ func adjustCollation(tctx *tcontext.Context, ddlInfo *ddlInfo, statusVars []byte adjustColumnsCollation(tctx, createStmt, charsetAndDefaultCollationMap) return } - tctx.L().Info("detect create table risk which use explicit charset and implicit collation, we will add collation by SHOW CHARACTER SET", zap.String("originSQL", ddlInfo.originDDL), zap.String("collation", collation)) + tctx.L().Info("detect create table risk which use explicit charset and implicit collation, we will add collation by INFORMATION_SCHEMA.COLLATIONS", zap.String("originSQL", ddlInfo.originDDL), zap.String("collation", collation)) createStmt.Options = append(createStmt.Options, &ast.TableOption{Tp: ast.TableOptionCollate, StrValue: collation}) adjustColumnsCollation(tctx, createStmt, charsetAndDefaultCollationMap) @@ -249,11 +249,11 @@ func adjustCollation(tctx *tcontext.Context, ddlInfo *ddlInfo, statusVars []byte tctx.L().Warn("not found charset default collation.", zap.String("originSQL", ddlInfo.originDDL), zap.String("charset", strings.ToLower(justCharset))) return } - tctx.L().Info("detect create database risk which use explicit charset and implicit collation, we will add collation by SHOW CHARACTER SET", zap.String("originSQL", ddlInfo.originDDL), zap.String("collation", collation)) + tctx.L().Info("detect create database risk which use explicit charset and implicit collation, we will add collation by INFORMATION_SCHEMA.COLLATIONS", zap.String("originSQL", ddlInfo.originDDL), zap.String("collation", collation)) } else { // has no charset and collation // add collation by server collation from binlog statusVars - collation, _ = event.GetServerCollationByStatusVars(statusVars) + collation, _ = event.GetServerCollationByStatusVars(statusVars, idAndCollationMap) // add collation tctx.L().Info("detect create database risk which use implicit charset and collation, we will add collation by binlog status_vars", zap.String("originSQL", ddlInfo.originDDL), zap.String("collation", collation)) } diff --git a/dm/syncer/ddl_test.go b/dm/syncer/ddl_test.go index f56fb96a48e..7c2622298d5 100644 --- a/dm/syncer/ddl_test.go +++ b/dm/syncer/ddl_test.go @@ -234,6 +234,7 @@ func (s *testDDLSuite) TestResolveDDLSQL(c *C) { tctx: tctx, } statusVars := []byte{4, 0, 0, 0, 0, 46, 0} + syncer.idAndCollationMap = map[int]string{46: "utf8mb4_bin"} for i, sql := range sqls { qec := &queryEventContext{ eventContext: ec, @@ -696,6 +697,7 @@ func (s *testDDLSuite) TestAdjustCollation(c *C) { } statusVars := []byte{4, 0, 0, 0, 0, 46, 0} charsetAndDefaultCollationMap := map[string]string{"utf8mb4": "utf8mb4_general_ci"} + idAndCollationMap := map[int]string{46: "utf8mb4_bin"} for i, sql := range sqls { ddlInfo := &ddlInfo{ originDDL: sql, @@ -707,7 +709,7 @@ func (s *testDDLSuite) TestAdjustCollation(c *C) { c.Assert(err, IsNil) c.Assert(stmt, NotNil) ddlInfo.originStmt = stmt - adjustCollation(tctx, ddlInfo, statusVars, charsetAndDefaultCollationMap) + adjustCollation(tctx, ddlInfo, statusVars, charsetAndDefaultCollationMap, idAndCollationMap) routedDDL, err := parserpkg.RenameDDLTable(ddlInfo.originStmt, ddlInfo.targetTables) c.Assert(err, IsNil) c.Assert(routedDDL, Equals, expectedSQLs[i]) diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index ca99b8465dd..43930e3bc1b 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -228,6 +228,7 @@ type Syncer struct { relay relay.Process charsetAndDefaultCollation map[string]string + idAndCollationMap map[int]string } // NewSyncer creates a new Syncer. @@ -327,7 +328,7 @@ func (s *Syncer) Init(ctx context.Context) (err error) { return terror.ErrSchemaTrackerInit.Delegate(err) } - s.charsetAndDefaultCollation, err = s.fromDB.GetCharsetAndDefaultCollation(ctx) + s.charsetAndDefaultCollation, s.idAndCollationMap, err = s.fromDB.GetCharsetAndCollationInfo(ctx) if err != nil { return err } From 8324c3b666dd9176830e2fc89b8d7b81916a0029 Mon Sep 17 00:00:00 2001 From: WizardXiao Date: Mon, 13 Dec 2021 10:02:21 +0800 Subject: [PATCH 04/13] commit-message: fix integration --- dm/tests/sync_collation/run.sh | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/dm/tests/sync_collation/run.sh b/dm/tests/sync_collation/run.sh index 2b92517fb3f..f622e8717b9 100755 --- a/dm/tests/sync_collation/run.sh +++ b/dm/tests/sync_collation/run.sh @@ -75,6 +75,9 @@ function run() { run_sql_file $cur/data/clean_data.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD echo "start task in incremental mode" + run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + TASK_NAME="incremental_test" cat $cur/conf/dm-task-increment.yaml >$WORK_DIR/dm-task.yaml sed -i "s/task-name-placeholder/${TASK_NAME}/g" $WORK_DIR/dm-task.yaml @@ -86,10 +89,7 @@ function run() { sed -i "s/binlog-pos-placeholder-2/$pos2/g" $WORK_DIR/dm-task.yaml sed -i "s/binlog-gtid-placeholder-2/$gtid2/g" $WORK_DIR/dm-task.yaml dmctl_start_task $WORK_DIR/dm-task.yaml - - run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 - run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 - + # check table run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where name = 'aa';" "count(1): 2" run_sql_tidb_with_retry "select count(1) from ${db2}.${tb} where name = 'aa';" "count(1): 2" @@ -149,7 +149,9 @@ function run() { echo "start task in incremantal mode and test not support" run_sql_file $cur/data/clean_data.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD - + run_sql_file $cur/data/db1.increment_err.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.increment_err.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + TASK_NAME="incremental_err" cat $cur/conf/dm-task-increment.yaml >$WORK_DIR/dm-task.yaml sed -i "s/task-name-placeholder/${TASK_NAME}/g" $WORK_DIR/dm-task.yaml @@ -162,9 +164,6 @@ function run() { sed -i "s/binlog-gtid-placeholder-2/$gtid2/g" $WORK_DIR/dm-task.yaml dmctl_start_task $WORK_DIR/dm-task.yaml - run_sql_file $cur/data/db1.increment_err.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 - run_sql_file $cur/data/db2.increment_err.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 - echo "task sync and will failed" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status ${TASK_NAME}" \ From 6c3306ca5c1bd07b7720c60e583f89226b334f36 Mon Sep 17 00:00:00 2001 From: WizardXiao Date: Mon, 13 Dec 2021 11:55:33 +0800 Subject: [PATCH 05/13] commit-message: update import tidb version --- dm/syncer/ddl_test.go | 13 +++++++++---- dm/tests/sync_collation/run.sh | 25 +++++++++++++------------ go.mod | 7 +++---- go.sum | 13 ++++++------- 4 files changed, 31 insertions(+), 27 deletions(-) diff --git a/dm/syncer/ddl_test.go b/dm/syncer/ddl_test.go index 7c2622298d5..54f484afe39 100644 --- a/dm/syncer/ddl_test.go +++ b/dm/syncer/ddl_test.go @@ -642,6 +642,8 @@ func (s *testDDLSuite) TestClearOnlineDDL(c *C) { func (s *testDDLSuite) TestAdjustCollation(c *C) { // duplicate with pkg/parser sqls := []string{ + "create database if not exists `test`", + "create database if not exists `test`", "create table `test`.`t1` (id int) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci", "create table `test`.`t1` (id int) CHARSET=utf8mb4", "create table `test`.`t1` (id int) COLLATE=utf8mb4_general_ci", @@ -649,7 +651,6 @@ func (s *testDDLSuite) TestAdjustCollation(c *C) { "create database `test` CHARACTER SET=utf8mb4 COLLATE=utf8mb4_general_ci", "create database `test` CHARACTER SET=utf8mb4", "create database `test` COLLATE=utf8mb4_general_ci", - "create database if not exists `test`", "create table `test`.`t1` (id int, name varchar(20) CHARACTER SET utf8mb4, work varchar(20))", "create table `test`.`t1` (id int, name varchar(20), work varchar(20))", "create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20))", @@ -665,6 +666,8 @@ func (s *testDDLSuite) TestAdjustCollation(c *C) { } expectedSQLs := []string{ + "CREATE DATABASE IF NOT EXISTS `test` COLLATE = utf8mb4_vi_0900_ai_ci", + "CREATE DATABASE IF NOT EXISTS `test` COLLATE = utf8mb4_bin", "CREATE TABLE `test`.`t` (`id` INT) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI", "CREATE TABLE `test`.`t` (`id` INT) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI", "CREATE TABLE `test`.`t` (`id` INT) DEFAULT COLLATE = UTF8MB4_GENERAL_CI", @@ -672,7 +675,6 @@ func (s *testDDLSuite) TestAdjustCollation(c *C) { "CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci", "CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci", "CREATE DATABASE `test` COLLATE = utf8mb4_general_ci", - "CREATE DATABASE IF NOT EXISTS `test` COLLATE = utf8mb4_bin", "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci,`work` VARCHAR(20))", "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20),`work` VARCHAR(20))", "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20))", @@ -695,10 +697,13 @@ func (s *testDDLSuite) TestAdjustCollation(c *C) { Schema: "test", Name: "t", } - statusVars := []byte{4, 0, 0, 0, 0, 46, 0} + statusVars := []byte{4, 0, 0, 0, 0, 21, 1} charsetAndDefaultCollationMap := map[string]string{"utf8mb4": "utf8mb4_general_ci"} - idAndCollationMap := map[int]string{46: "utf8mb4_bin"} + idAndCollationMap := map[int]string{46: "utf8mb4_bin", 277: "utf8mb4_vi_0900_ai_ci"} for i, sql := range sqls { + if i == 1 { + statusVars = []byte{4, 0, 0, 0, 0, 46, 0} + } ddlInfo := &ddlInfo{ originDDL: sql, routedDDL: sql, diff --git a/dm/tests/sync_collation/run.sh b/dm/tests/sync_collation/run.sh index f622e8717b9..909ec44bcad 100755 --- a/dm/tests/sync_collation/run.sh +++ b/dm/tests/sync_collation/run.sh @@ -38,12 +38,12 @@ function run() { dmctl_start_task $WORK_DIR/dm-task.yaml # check table - run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where name = 'aa';" "count(1): 2" + run_sql_tidb_with_retry_times "select count(1) from ${db}.${tb} where name = 'aa';" "count(1): 2" 20 run_sql_tidb_with_retry "select count(1) from ${db2}.${tb} where name = 'aa';" "count(1): 2" # check column - # run_sql_tidb_with_retry "select count(1) from ${db}.${tb2} where name = 'aa';" "count(1): 2" - # run_sql_tidb_with_retry "select count(1) from ${db2}.${tb2} where name = 'aa';" "count(1): 2" + run_sql_tidb_with_retry "select count(1) from ${db}.${tb2} where name = 'aa';" "count(1): 2" + run_sql_tidb_with_retry "select count(1) from ${db2}.${tb2} where name = 'aa';" "count(1): 2" # check database by create table run_sql_file $cur/data/tidb.checktable.prepare.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD @@ -75,8 +75,6 @@ function run() { run_sql_file $cur/data/clean_data.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD echo "start task in incremental mode" - run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 - run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 TASK_NAME="incremental_test" cat $cur/conf/dm-task-increment.yaml >$WORK_DIR/dm-task.yaml @@ -89,9 +87,12 @@ function run() { sed -i "s/binlog-pos-placeholder-2/$pos2/g" $WORK_DIR/dm-task.yaml sed -i "s/binlog-gtid-placeholder-2/$gtid2/g" $WORK_DIR/dm-task.yaml dmctl_start_task $WORK_DIR/dm-task.yaml - + + run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + # check table - run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where name = 'aa';" "count(1): 2" + run_sql_tidb_with_retry_times "select count(1) from ${db}.${tb} where name = 'aa';" "count(1): 2" 20 run_sql_tidb_with_retry "select count(1) from ${db2}.${tb} where name = 'aa';" "count(1): 2" # check column @@ -111,9 +112,9 @@ function run() { dmctl_stop_task $TASK_NAME $MASTER_PORT echo "start task in full mode and test not support" + run_sql_file $cur/data/clean_data.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD run_sql_file $cur/data/db1.prepare_err.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql_file $cur/data/db2.prepare_err.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 - run_sql_file $cur/data/clean_data.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD cat $cur/conf/dm-task.yaml >$WORK_DIR/dm-task.yaml TASK_NAME="full_err" @@ -147,11 +148,8 @@ function run() { fi echo "start task in incremantal mode and test not support" - run_sql_file $cur/data/clean_data.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD - run_sql_file $cur/data/db1.increment_err.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 - run_sql_file $cur/data/db2.increment_err.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 - + TASK_NAME="incremental_err" cat $cur/conf/dm-task-increment.yaml >$WORK_DIR/dm-task.yaml sed -i "s/task-name-placeholder/${TASK_NAME}/g" $WORK_DIR/dm-task.yaml @@ -164,6 +162,9 @@ function run() { sed -i "s/binlog-gtid-placeholder-2/$gtid2/g" $WORK_DIR/dm-task.yaml dmctl_start_task $WORK_DIR/dm-task.yaml + run_sql_file $cur/data/db1.increment_err.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.increment_err.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + echo "task sync and will failed" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status ${TASK_NAME}" \ diff --git a/go.mod b/go.mod index 8c01ad2dbf4..47a668e87e2 100644 --- a/go.mod +++ b/go.mod @@ -56,10 +56,9 @@ require ( github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd github.com/pingcap/kvproto v0.0.0-20211202065422-a412f7a319c3 github.com/pingcap/log v0.0.0-20210906054005-afc726e70354 - github.com/pingcap/tidb v1.1.0-beta.0.20211204151954-e3e2e8d946f4 + github.com/pingcap/tidb v1.1.0-beta.0.20211209055157-9f744cdf8266 github.com/pingcap/tidb-tools v5.2.3-0.20211105044302-2dabb6641a6e+incompatible - github.com/pingcap/tidb/parser v0.0.0-20211204151954-e3e2e8d946f4 - github.com/pingcap/tipb v0.0.0-20211201080053-bd104bb270ba // indirect + github.com/pingcap/tidb/parser v0.0.0-20211209055157-9f744cdf8266 github.com/prometheus/client_golang v1.7.1 github.com/prometheus/client_model v0.2.0 github.com/r3labs/diff v1.1.0 @@ -75,7 +74,7 @@ require ( github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 github.com/tidwall/gjson v1.9.1 github.com/tidwall/sjson v1.2.2 - github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211201083510-3a7675742ee5 + github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211206072923-c0e876615440 github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee github.com/tinylib/msgp v1.1.0 github.com/uber-go/atomic v1.4.0 diff --git a/go.sum b/go.sum index b99ed82df82..c9aed6bf381 100644 --- a/go.sum +++ b/go.sum @@ -833,8 +833,8 @@ github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041 github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5 h1:7rvAtZe/ZUzOKzgriNPQoBNvleJXBk4z7L3Z47+tS98= github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5/go.mod h1:XsOaV712rUk63aOEKYP9PhXTIE3FMNHmC2r1wX5wElY= github.com/pingcap/tidb v1.1.0-beta.0.20211023132847-efa94595c071/go.mod h1:Ci7ABF58a4jn6YtaHi7655jP409edqC2JxWWFRqOubg= -github.com/pingcap/tidb v1.1.0-beta.0.20211204151954-e3e2e8d946f4 h1:axJl34ZMFxOGkobA7+O+fk2uIXU255pcgFu5HPKeh2I= -github.com/pingcap/tidb v1.1.0-beta.0.20211204151954-e3e2e8d946f4/go.mod h1:XBf5r8z1gxV9gQlaiikJ9B9xea4BKdcoUKx/QOZoq8Q= +github.com/pingcap/tidb v1.1.0-beta.0.20211209055157-9f744cdf8266 h1:tfSd06ZEBlJephWmWGtuZRInvETtGA5Tby3QcDsZ6VA= +github.com/pingcap/tidb v1.1.0-beta.0.20211209055157-9f744cdf8266/go.mod h1:R2E0ZlYqy7GLnRUmW5uqbi+jM3pRhtseEPqBuQZrmmA= github.com/pingcap/tidb-dashboard v0.0.0-20210312062513-eef5d6404638/go.mod h1:OzFN8H0EDMMqeulPhPMw2i2JaiZWOKFQ7zdRPhENNgo= github.com/pingcap/tidb-dashboard v0.0.0-20210716172320-2226872e3296/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ= github.com/pingcap/tidb-dashboard v0.0.0-20211008050453-a25c25809529/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ= @@ -845,10 +845,9 @@ github.com/pingcap/tidb-tools v5.2.3-0.20211105044302-2dabb6641a6e+incompatible github.com/pingcap/tidb-tools v5.2.3-0.20211105044302-2dabb6641a6e+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e/go.mod h1:e1MGCA9Sg3T8jid8PKAEq5eYVuMMCq4n8gJ+Kqp4Plg= github.com/pingcap/tidb/parser v0.0.0-20211023132847-efa94595c071/go.mod h1:e1MGCA9Sg3T8jid8PKAEq5eYVuMMCq4n8gJ+Kqp4Plg= -github.com/pingcap/tidb/parser v0.0.0-20211204151954-e3e2e8d946f4 h1:/6Wp++NoMq1qvAEBImbReARkN0ND1889QgtEijO3Gtw= -github.com/pingcap/tidb/parser v0.0.0-20211204151954-e3e2e8d946f4/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI= +github.com/pingcap/tidb/parser v0.0.0-20211209055157-9f744cdf8266 h1:6FuBP6k1AXX4pLiwBLkEqBb26ncwZmxUqoJEpc+WXGM= +github.com/pingcap/tidb/parser v0.0.0-20211209055157-9f744cdf8266/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI= github.com/pingcap/tipb v0.0.0-20211008080435-3fd327dfce0e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= -github.com/pingcap/tipb v0.0.0-20211116093845-e9b045a0bdf8/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pingcap/tipb v0.0.0-20211201080053-bd104bb270ba h1:Tt5W/maVBUbG+wxg2nfc88Cqj/HiWYb0TJQ2Rfi0UOQ= github.com/pingcap/tipb v0.0.0-20211201080053-bd104bb270ba/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -1015,8 +1014,8 @@ github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhso github.com/tidwall/sjson v1.2.2 h1:H1Llj/C9G+BoUN2DsybLHjWvr9dx4Uazavf0sXQ+rOs= github.com/tidwall/sjson v1.2.2/go.mod h1:jmW2RZpbKuExPFUHeFSBMiovT9ZyOziEHDRkbsdp0B0= github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211011083157-49c8dd23f1f0/go.mod h1:00plYwQsQ5kBUmafHO+JkjznGgFaBokMZl82TZIbsQk= -github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211201083510-3a7675742ee5 h1:c0zWUeB8aog6TceRyFEjA2fYUJka8RNGBZWU5l7XF9g= -github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211201083510-3a7675742ee5/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8= +github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211206072923-c0e876615440 h1:XHRkMms0v6uxUZqErwZbmAs7baVVyNcOC1oOSz+BGgc= +github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211206072923-c0e876615440/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8= github.com/tikv/pd v1.1.0-beta.0.20210323121136-78679e5e209d/go.mod h1:Jw9KG11C/23Rr7DW4XWQ7H5xOgGZo6DFL1OKAF4+Igw= github.com/tikv/pd v1.1.0-beta.0.20210818082359-acba1da0018d/go.mod h1:rammPjeZgpvfrQRPkijcx8tlxF1XM5+m6kRXrkDzCAA= github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae/go.mod h1:varH0IE0jJ9E9WN2Ei/N6pajMlPkcXdDEf7f5mmsUVQ= From e3dcb0e4af93a293633c600216d1b1273d8998ba Mon Sep 17 00:00:00 2001 From: WizardXiao Date: Mon, 13 Dec 2021 16:13:23 +0800 Subject: [PATCH 06/13] commit-message: fix integration test sync_collation --- dm/tests/_utils/run_sql | 2 +- dm/tests/sync_collation/run.sh | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dm/tests/_utils/run_sql b/dm/tests/_utils/run_sql index ed5609dc414..71e018db29f 100755 --- a/dm/tests/_utils/run_sql +++ b/dm/tests/_utils/run_sql @@ -12,4 +12,4 @@ if [[ "$2" = $TIDB_PORT ]]; then fi echo "[$(date)] Executing SQL: $1" >"$TEST_DIR/sql_res.$TEST_NAME.txt" -mysql -u$user -h127.0.0.1 -P$2 -p$3 --default-character-set utf8 -E -e "$1" >>"$TEST_DIR/sql_res.$TEST_NAME.txt" +mysql -u$user -h127.0.0.1 -P$2 -p$3 --default-character-set utf8 -E -e "$1" >>"$TEST_DIR/sql_res.$TEST_NAME.txt" 2>&1 diff --git a/dm/tests/sync_collation/run.sh b/dm/tests/sync_collation/run.sh index 909ec44bcad..458ad5b70a0 100755 --- a/dm/tests/sync_collation/run.sh +++ b/dm/tests/sync_collation/run.sh @@ -38,7 +38,7 @@ function run() { dmctl_start_task $WORK_DIR/dm-task.yaml # check table - run_sql_tidb_with_retry_times "select count(1) from ${db}.${tb} where name = 'aa';" "count(1): 2" 20 + run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where name = 'aa';" "count(1): 2" run_sql_tidb_with_retry "select count(1) from ${db2}.${tb} where name = 'aa';" "count(1): 2" # check column @@ -92,7 +92,7 @@ function run() { run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 # check table - run_sql_tidb_with_retry_times "select count(1) from ${db}.${tb} where name = 'aa';" "count(1): 2" 20 + run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where name = 'aa';" "count(1): 2" run_sql_tidb_with_retry "select count(1) from ${db2}.${tb} where name = 'aa';" "count(1): 2" # check column From 6b74e3726b271bb292c0e698b1336e5f66b8f2e5 Mon Sep 17 00:00:00 2001 From: WizardXiao Date: Mon, 13 Dec 2021 17:17:53 +0800 Subject: [PATCH 07/13] commit-message: integration sync_collation add wait task --- dm/tests/_utils/run_sql | 2 +- dm/tests/sync_collation/run.sh | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/dm/tests/_utils/run_sql b/dm/tests/_utils/run_sql index 71e018db29f..ed5609dc414 100755 --- a/dm/tests/_utils/run_sql +++ b/dm/tests/_utils/run_sql @@ -12,4 +12,4 @@ if [[ "$2" = $TIDB_PORT ]]; then fi echo "[$(date)] Executing SQL: $1" >"$TEST_DIR/sql_res.$TEST_NAME.txt" -mysql -u$user -h127.0.0.1 -P$2 -p$3 --default-character-set utf8 -E -e "$1" >>"$TEST_DIR/sql_res.$TEST_NAME.txt" 2>&1 +mysql -u$user -h127.0.0.1 -P$2 -p$3 --default-character-set utf8 -E -e "$1" >>"$TEST_DIR/sql_res.$TEST_NAME.txt" diff --git a/dm/tests/sync_collation/run.sh b/dm/tests/sync_collation/run.sh index 458ad5b70a0..3b1d2663fd5 100755 --- a/dm/tests/sync_collation/run.sh +++ b/dm/tests/sync_collation/run.sh @@ -37,6 +37,10 @@ function run() { sed -i "s/task-name-placeholder/${TASK_NAME}/g" $WORK_DIR/dm-task.yaml dmctl_start_task $WORK_DIR/dm-task.yaml + # wait + run_sql_tidb_with_retry " select count(1) from information_schema.tables where TABLE_SCHEMA='${db}' and TABLE_NAME = '${tb}';" "count(1): 1" + run_sql_tidb_with_retry " select count(1) from information_schema.tables where TABLE_SCHEMA='${db2}' and TABLE_NAME = '${tb}';" "count(1): 1" + # check table run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where name = 'aa';" "count(1): 2" run_sql_tidb_with_retry "select count(1) from ${db2}.${tb} where name = 'aa';" "count(1): 2" @@ -91,6 +95,10 @@ function run() { run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + # wait + run_sql_tidb_with_retry " select count(1) from information_schema.tables where TABLE_SCHEMA='${db}' and TABLE_NAME = '${tb}';" "count(1): 1" + run_sql_tidb_with_retry " select count(1) from information_schema.tables where TABLE_SCHEMA='${db2}' and TABLE_NAME = '${tb}';" "count(1): 1" + # check table run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where name = 'aa';" "count(1): 2" run_sql_tidb_with_retry "select count(1) from ${db2}.${tb} where name = 'aa';" "count(1): 2" From b78078ddfd09a267a5a80b74169af6fe2d2e7294 Mon Sep 17 00:00:00 2001 From: WizardXiao Date: Mon, 13 Dec 2021 17:19:04 +0800 Subject: [PATCH 08/13] commit-message: integration sync_collation add wait task --- dm/tests/sync_collation/run.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dm/tests/sync_collation/run.sh b/dm/tests/sync_collation/run.sh index 3b1d2663fd5..a736105bc2c 100755 --- a/dm/tests/sync_collation/run.sh +++ b/dm/tests/sync_collation/run.sh @@ -37,7 +37,7 @@ function run() { sed -i "s/task-name-placeholder/${TASK_NAME}/g" $WORK_DIR/dm-task.yaml dmctl_start_task $WORK_DIR/dm-task.yaml - # wait + # wait run_sql_tidb_with_retry " select count(1) from information_schema.tables where TABLE_SCHEMA='${db}' and TABLE_NAME = '${tb}';" "count(1): 1" run_sql_tidb_with_retry " select count(1) from information_schema.tables where TABLE_SCHEMA='${db2}' and TABLE_NAME = '${tb}';" "count(1): 1" @@ -95,7 +95,7 @@ function run() { run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 - # wait + # wait run_sql_tidb_with_retry " select count(1) from information_schema.tables where TABLE_SCHEMA='${db}' and TABLE_NAME = '${tb}';" "count(1): 1" run_sql_tidb_with_retry " select count(1) from information_schema.tables where TABLE_SCHEMA='${db2}' and TABLE_NAME = '${tb}';" "count(1): 1" From 9fe5dd2448fd5b6c9c3417597aea4becf1c5f648 Mon Sep 17 00:00:00 2001 From: WizardXiao Date: Wed, 15 Dec 2021 17:05:20 +0800 Subject: [PATCH 09/13] commit-message: fix comment bugs with out integration test --- dm/pkg/utils/db.go | 51 ------------------- dm/syncer/dbconn/upstream_db.go | 45 ++++++++++++++++- dm/syncer/ddl.go | 17 ++++--- dm/syncer/ddl_test.go | 89 ++++++++++++++++++++++++++------- dm/syncer/syncer.go | 9 ++-- 5 files changed, 131 insertions(+), 80 deletions(-) diff --git a/dm/pkg/utils/db.go b/dm/pkg/utils/db.go index 87563cbd081..06bc1c6da97 100644 --- a/dm/pkg/utils/db.go +++ b/dm/pkg/utils/db.go @@ -391,57 +391,6 @@ func GetServerUnixTS(ctx context.Context, db *sql.DB) (int64, error) { return ts, err } -// GetCharsetAndCollationInfo gets charset and collation info. -func GetCharsetAndCollationInfo(ctx context.Context, db *sql.DB) (map[string]string, map[int]string, error) { - charsetAndDefaultCollation := make(map[string]string) - idAndCollationMap := make(map[int]string) - - conn, err := db.Conn(ctx) - if err != nil { - return nil, nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) - } - defer conn.Close() - - // Show an example. - /* - mysql> SELECT COLLATION_NAME,CHARACTER_SET_NAME,ID,IS_DEFAULT from INFORMATION_SCHEMA.COLLATIONS; - +----------------------------+--------------------+-----+------------+ - | COLLATION_NAME | CHARACTER_SET_NAME | ID | IS_DEFAULT | - +----------------------------+--------------------+-----+------------+ - | armscii8_general_ci | armscii8 | 32 | Yes | - | armscii8_bin | armscii8 | 64 | | - | ascii_general_ci | ascii | 11 | Yes | - | ascii_bin | ascii | 65 | | - | big5_chinese_ci | big5 | 1 | Yes | - | big5_bin | big5 | 84 | | - | binary | binary | 63 | Yes | - +----------------------------+--------------------+-----+------------+ - */ - - rows, err := conn.QueryContext(ctx, "SELECT COLLATION_NAME,CHARACTER_SET_NAME,ID,IS_DEFAULT from INFORMATION_SCHEMA.COLLATIONS") - if err != nil { - return nil, nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) - } - - defer rows.Close() - for rows.Next() { - var collation, charset, isDefault string - var id int - if scanErr := rows.Scan(&collation, &charset, &id, &isDefault); scanErr != nil { - return nil, nil, terror.DBErrorAdapt(scanErr, terror.ErrDBDriverError) - } - idAndCollationMap[id] = strings.ToLower(collation) - if strings.ToLower(isDefault) == "yes" { - charsetAndDefaultCollation[strings.ToLower(charset)] = collation - } - } - - if err = rows.Close(); err != nil { - return nil, nil, terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError) - } - return charsetAndDefaultCollation, idAndCollationMap, err -} - // GetSchemaList gets db schema list with `SHOW DATABASES`. func GetSchemaList(ctx context.Context, db *sql.DB) ([]string, error) { schemaList := []string{} diff --git a/dm/syncer/dbconn/upstream_db.go b/dm/syncer/dbconn/upstream_db.go index 7889e4ad973..8577605d542 100644 --- a/dm/syncer/dbconn/upstream_db.go +++ b/dm/syncer/dbconn/upstream_db.go @@ -15,6 +15,7 @@ package dbconn import ( "context" + "strings" "github.com/go-mysql-org/go-mysql/mysql" "github.com/pingcap/failpoint" @@ -72,8 +73,48 @@ func (conn *UpStreamConn) GetServerUnixTS(ctx context.Context) (int64, error) { } // GetCharsetAndDefaultCollation returns charset and collation info. -func (conn *UpStreamConn) GetCharsetAndCollationInfo(ctx context.Context) (map[string]string, map[int]string, error) { - return utils.GetCharsetAndCollationInfo(ctx, conn.BaseDB.DB) +func GetCharsetAndCollationInfo(tctx *tcontext.Context, conn *DBConn) (map[string]string, map[int]string, error) { + charsetAndDefaultCollation := make(map[string]string) + idAndCollationMap := make(map[int]string) + + // Show an example. + /* + mysql> SELECT COLLATION_NAME,CHARACTER_SET_NAME,ID,IS_DEFAULT from INFORMATION_SCHEMA.COLLATIONS; + +----------------------------+--------------------+-----+------------+ + | COLLATION_NAME | CHARACTER_SET_NAME | ID | IS_DEFAULT | + +----------------------------+--------------------+-----+------------+ + | armscii8_general_ci | armscii8 | 32 | Yes | + | armscii8_bin | armscii8 | 64 | | + | ascii_general_ci | ascii | 11 | Yes | + | ascii_bin | ascii | 65 | | + | big5_chinese_ci | big5 | 1 | Yes | + | big5_bin | big5 | 84 | | + | binary | binary | 63 | Yes | + +----------------------------+--------------------+-----+------------+ + */ + + rows, err := conn.QuerySQL(tctx, "SELECT COLLATION_NAME,CHARACTER_SET_NAME,ID,IS_DEFAULT from INFORMATION_SCHEMA.COLLATIONS") + if err != nil { + return nil, nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) + } + + defer rows.Close() + for rows.Next() { + var collation, charset, isDefault string + var id int + if scanErr := rows.Scan(&collation, &charset, &id, &isDefault); scanErr != nil { + return nil, nil, terror.DBErrorAdapt(scanErr, terror.ErrDBDriverError) + } + idAndCollationMap[id] = strings.ToLower(collation) + if strings.ToLower(isDefault) == "yes" { + charsetAndDefaultCollation[strings.ToLower(charset)] = collation + } + } + + if err = rows.Close(); err != nil { + return nil, nil, terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError) + } + return charsetAndDefaultCollation, idAndCollationMap, err } // GetParser returns the parser with correct flag for upstream. diff --git a/dm/syncer/ddl.go b/dm/syncer/ddl.go index 5eb8b2cc22c..6f1c63991e3 100644 --- a/dm/syncer/ddl.go +++ b/dm/syncer/ddl.go @@ -202,11 +202,11 @@ func adjustCollation(tctx *tcontext.Context, ddlInfo *ddlInfo, statusVars []byte if createStmt.ReferTable != nil { return } + adjustColumnsCollation(tctx, createStmt, charsetAndDefaultCollationMap) var justCharset string for _, tableOption := range createStmt.Options { // already have 'Collation' if tableOption.Tp == ast.TableOptionCollate { - adjustColumnsCollation(tctx, createStmt, charsetAndDefaultCollationMap) return } if tableOption.Tp == ast.TableOptionCharset { @@ -215,23 +215,21 @@ func adjustCollation(tctx *tcontext.Context, ddlInfo *ddlInfo, statusVars []byte } if justCharset == "" { tctx.L().Warn("detect create table risk which use implicit charset and collation", zap.String("originSQL", ddlInfo.originDDL)) - adjustColumnsCollation(tctx, createStmt, charsetAndDefaultCollationMap) return } // just has charset, can add collation by charset and default collation map collation, ok := charsetAndDefaultCollationMap[strings.ToLower(justCharset)] if !ok { tctx.L().Warn("not found charset default collation.", zap.String("originSQL", ddlInfo.originDDL), zap.String("charset", strings.ToLower(justCharset))) - adjustColumnsCollation(tctx, createStmt, charsetAndDefaultCollationMap) return } tctx.L().Info("detect create table risk which use explicit charset and implicit collation, we will add collation by INFORMATION_SCHEMA.COLLATIONS", zap.String("originSQL", ddlInfo.originDDL), zap.String("collation", collation)) createStmt.Options = append(createStmt.Options, &ast.TableOption{Tp: ast.TableOptionCollate, StrValue: collation}) - adjustColumnsCollation(tctx, createStmt, charsetAndDefaultCollationMap) case *ast.CreateDatabaseStmt: var justCharset, collation string var ok bool + var err error for _, createOption := range createStmt.Options { // already have 'Collation' if createOption.Tp == ast.DatabaseOptionCollate { @@ -253,7 +251,13 @@ func adjustCollation(tctx *tcontext.Context, ddlInfo *ddlInfo, statusVars []byte } else { // has no charset and collation // add collation by server collation from binlog statusVars - collation, _ = event.GetServerCollationByStatusVars(statusVars, idAndCollationMap) + collation, err = event.GetServerCollationByStatusVars(statusVars, idAndCollationMap) + if err != nil { + tctx.L().Error("can not get charset server collation from binlog statusVars.", zap.Error(err), zap.String("originSQL", ddlInfo.originDDL)) + if collation == "" { + return + } + } // add collation tctx.L().Info("detect create database risk which use implicit charset and collation, we will add collation by binlog status_vars", zap.String("originSQL", ddlInfo.originDDL), zap.String("collation", collation)) } @@ -271,6 +275,7 @@ func adjustColumnsCollation(tctx *tcontext.Context, createStmt *ast.CreateTableS } } fieldType := col.Tp + // already have 'Collation' if fieldType.Collate != "" { continue } @@ -281,7 +286,7 @@ func adjustColumnsCollation(tctx *tcontext.Context, createStmt *ast.CreateTableS tctx.L().Warn("not found charset default collation for column.", zap.String("table", createStmt.Table.Name.String()), zap.String("column", col.Name.String()), zap.String("charset", strings.ToLower(fieldType.Charset))) continue } - fieldType.Collate = collation + col.Options = append(col.Options, &ast.ColumnOption{Tp: ast.ColumnOptionCollate, StrValue: collation}) } } } diff --git a/dm/syncer/ddl_test.go b/dm/syncer/ddl_test.go index 54f484afe39..f6b7792cc50 100644 --- a/dm/syncer/ddl_test.go +++ b/dm/syncer/ddl_test.go @@ -639,51 +639,107 @@ func (s *testDDLSuite) TestClearOnlineDDL(c *C) { c.Assert(mock.toFinish, HasLen, 0) } -func (s *testDDLSuite) TestAdjustCollation(c *C) { - // duplicate with pkg/parser +func (s *testDDLSuite) TestAdjustDatabaseCollation(c *C) { + statusVarsArray := [][]byte{ + { + 4, 0, 0, 0, 0, 46, 0, + }, + { + 4, 0, 0, 0, 0, 21, 1, + }, + } + sqls := []string{ "create database if not exists `test`", - "create database if not exists `test`", + "create database `test` CHARACTER SET=utf8mb4 COLLATE=utf8mb4_general_ci", + "create database `test` CHARACTER SET=utf8mb4", + "create database `test` COLLATE=utf8mb4_general_ci", + } + + expectedSQLs := [][]string{ + { + "CREATE DATABASE IF NOT EXISTS `test` COLLATE = utf8mb4_bin", + "CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci", + "CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci", + "CREATE DATABASE `test` COLLATE = utf8mb4_general_ci", + }, + { + "CREATE DATABASE IF NOT EXISTS `test` COLLATE = utf8mb4_vi_0900_ai_ci", + "CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci", + "CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci", + "CREATE DATABASE `test` COLLATE = utf8mb4_general_ci", + }, + } + + tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestAdjustTableCollation"))) + syncer := NewSyncer(&config.SubTaskConfig{}, nil, nil) + syncer.tctx = tctx + p := parser.New() + tab := &filter.Table{ + Schema: "test", + Name: "t", + } + + charsetAndDefaultCollationMap := map[string]string{"utf8mb4": "utf8mb4_general_ci"} + idAndCollationMap := map[int]string{46: "utf8mb4_bin", 277: "utf8mb4_vi_0900_ai_ci"} + for i, statusVars := range statusVarsArray { + for j, sql := range sqls { + ddlInfo := &ddlInfo{ + originDDL: sql, + routedDDL: sql, + sourceTables: []*filter.Table{tab}, + targetTables: []*filter.Table{tab}, + } + stmt, err := p.ParseOneStmt(sql, "", "") + c.Assert(err, IsNil) + c.Assert(stmt, NotNil) + ddlInfo.originStmt = stmt + adjustCollation(tctx, ddlInfo, statusVars, charsetAndDefaultCollationMap, idAndCollationMap) + routedDDL, err := parserpkg.RenameDDLTable(ddlInfo.originStmt, ddlInfo.targetTables) + c.Assert(err, IsNil) + c.Assert(routedDDL, Equals, expectedSQLs[i][j]) + } + } +} + +func (s *testDDLSuite) TestAdjustCollation(c *C) { + sqls := []string{ "create table `test`.`t1` (id int) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci", "create table `test`.`t1` (id int) CHARSET=utf8mb4", "create table `test`.`t1` (id int) COLLATE=utf8mb4_general_ci", "create table `test`.`t1` (id int)", - "create database `test` CHARACTER SET=utf8mb4 COLLATE=utf8mb4_general_ci", - "create database `test` CHARACTER SET=utf8mb4", - "create database `test` COLLATE=utf8mb4_general_ci", "create table `test`.`t1` (id int, name varchar(20) CHARACTER SET utf8mb4, work varchar(20))", "create table `test`.`t1` (id int, name varchar(20), work varchar(20))", "create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20))", "create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20) CHARACTER SET utf8mb4)", - "create table `test`.`t1` (id int, name varchar(20) CHARACTER SET utf8mb4, work varchar(20)) COLLATE=utf8mb4_general_ci ", + "create table `test`.`t1` (id int, name varchar(20) CHARACTER SET utf8mb4, work varchar(20)) COLLATE=utf8mb4_general_ci", + "create table `test`.`t1` (id int, name varchar(20) CHARACTER SET utf8mb4, work varchar(20)) COLLATE=latin1_swedish_ci", "create table `test`.`t1` (id int, name varchar(20), work varchar(20)) COLLATE=utf8mb4_general_ci", "create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20)) COLLATE=utf8mb4_general_ci", "create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20) CHARACTER SET utf8mb4) COLLATE=utf8mb4_general_ci", "create table `test`.`t1` (id int, name varchar(20) CHARACTER SET utf8mb4, work varchar(20)) CHARSET=utf8mb4 ", + "create table `test`.`t1` (id int, name varchar(20) CHARACTER SET latin1, work varchar(20)) CHARSET=utf8mb4 ", "create table `test`.`t1` (id int, name varchar(20), work varchar(20)) CHARSET=utf8mb4", "create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20)) CHARSET=utf8mb4", "create table `test`.`t1` (id int, name varchar(20) COLLATE utf8mb4_general_ci, work varchar(20) CHARACTER SET utf8mb4) CHARSET=utf8mb4", } expectedSQLs := []string{ - "CREATE DATABASE IF NOT EXISTS `test` COLLATE = utf8mb4_vi_0900_ai_ci", - "CREATE DATABASE IF NOT EXISTS `test` COLLATE = utf8mb4_bin", "CREATE TABLE `test`.`t` (`id` INT) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI", "CREATE TABLE `test`.`t` (`id` INT) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI", "CREATE TABLE `test`.`t` (`id` INT) DEFAULT COLLATE = UTF8MB4_GENERAL_CI", "CREATE TABLE `test`.`t` (`id` INT)", - "CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci", - "CREATE DATABASE `test` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci", - "CREATE DATABASE `test` COLLATE = utf8mb4_general_ci", "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci,`work` VARCHAR(20))", "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20),`work` VARCHAR(20))", "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20))", "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci)", "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT COLLATE = UTF8MB4_GENERAL_CI", + "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT COLLATE = LATIN1_SWEDISH_CI", "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20),`work` VARCHAR(20)) DEFAULT COLLATE = UTF8MB4_GENERAL_CI", "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT COLLATE = UTF8MB4_GENERAL_CI", "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci) DEFAULT COLLATE = UTF8MB4_GENERAL_CI", "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI", + "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) CHARACTER SET LATIN1 COLLATE latin1_swedish_ci,`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI", "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20),`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI", "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20)) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI", "CREATE TABLE `test`.`t` (`id` INT,`name` VARCHAR(20) COLLATE utf8mb4_general_ci,`work` VARCHAR(20) CHARACTER SET UTF8MB4 COLLATE utf8mb4_general_ci) DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_GENERAL_CI", @@ -697,13 +753,10 @@ func (s *testDDLSuite) TestAdjustCollation(c *C) { Schema: "test", Name: "t", } - statusVars := []byte{4, 0, 0, 0, 0, 21, 1} - charsetAndDefaultCollationMap := map[string]string{"utf8mb4": "utf8mb4_general_ci"} - idAndCollationMap := map[int]string{46: "utf8mb4_bin", 277: "utf8mb4_vi_0900_ai_ci"} + statusVars := []byte{4, 0, 0, 0, 0, 46, 0} + charsetAndDefaultCollationMap := map[string]string{"utf8mb4": "utf8mb4_general_ci", "latin1": "latin1_swedish_ci"} + idAndCollationMap := map[int]string{46: "utf8mb4_bin"} for i, sql := range sqls { - if i == 1 { - statusVars = []byte{4, 0, 0, 0, 0, 46, 0} - } ddlInfo := &ddlInfo{ originDDL: sql, routedDDL: sql, diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 43930e3bc1b..2d39d929f83 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -140,7 +140,8 @@ type Syncer struct { schemaTracker *schema.Tracker - fromDB *dbconn.UpStreamConn + fromDB *dbconn.UpStreamConn + fromConn *dbconn.DBConn toDB *conn.BaseDB toDBConns []*dbconn.DBConn @@ -328,7 +329,7 @@ func (s *Syncer) Init(ctx context.Context) (err error) { return terror.ErrSchemaTrackerInit.Delegate(err) } - s.charsetAndDefaultCollation, s.idAndCollationMap, err = s.fromDB.GetCharsetAndCollationInfo(ctx) + s.charsetAndDefaultCollation, s.idAndCollationMap, err = dbconn.GetCharsetAndCollationInfo(tctx, s.fromConn) if err != nil { return err } @@ -2935,10 +2936,12 @@ func (s *Syncer) createDBs(ctx context.Context) error { var err error dbCfg := s.cfg.From dbCfg.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(maxDMLConnectionTimeout) - s.fromDB, err = dbconn.NewUpStreamConn(&dbCfg) + fromDB, fromConns, err := dbconn.CreateConns(s.tctx, s.cfg, &dbCfg, 1) if err != nil { return err } + s.fromDB = &dbconn.UpStreamConn{BaseDB: fromDB} + s.fromConn = fromConns[0] conn, err := s.fromDB.BaseDB.GetBaseConn(ctx) if err != nil { return err From 965617403cab0386272737133047399eb7387b3a Mon Sep 17 00:00:00 2001 From: WizardXiao Date: Thu, 16 Dec 2021 15:07:33 +0800 Subject: [PATCH 10/13] commit-message: --- .../conf/dm-task-increment.yaml | 105 ------------- dm/tests/sync_collation/conf/dm-task.yaml | 36 +---- dm/tests/sync_collation/data/clean_data.sql | 4 +- .../sync_collation/data/db1.increment.sql | 12 +- .../sync_collation/data/db1.increment_err.sql | 4 +- .../sync_collation/data/db1.prepare_err.sql | 4 - .../sync_collation/data/db2.increment.sql | 12 +- .../sync_collation/data/db2.increment_err.sql | 4 +- dm/tests/sync_collation/data/db2.prepare.sql | 6 +- .../sync_collation/data/db2.prepare_err.sql | 10 +- .../tidb.checktable.increment.prepare.sql | 6 + .../data/tidb.checktable.prepare.sql | 3 + .../data/tidb.checktable.prepare2.sql | 3 - dm/tests/sync_collation/run.sh | 139 +++++------------- 14 files changed, 76 insertions(+), 272 deletions(-) delete mode 100644 dm/tests/sync_collation/conf/dm-task-increment.yaml create mode 100644 dm/tests/sync_collation/data/tidb.checktable.increment.prepare.sql delete mode 100644 dm/tests/sync_collation/data/tidb.checktable.prepare2.sql diff --git a/dm/tests/sync_collation/conf/dm-task-increment.yaml b/dm/tests/sync_collation/conf/dm-task-increment.yaml deleted file mode 100644 index 16f76e63143..00000000000 --- a/dm/tests/sync_collation/conf/dm-task-increment.yaml +++ /dev/null @@ -1,105 +0,0 @@ ---- -name: task-name-placeholder -task-mode: incremental -is-sharding: false -meta-schema: "dm_meta" -# enable-heartbeat: true -heartbeat-update-interval: 1 -heartbeat-report-interval: 1 -clean-dump-file: false - -target-database: - host: "127.0.0.1" - port: 4000 - user: "root" - password: "" - -mysql-instances: - - source-id: "mysql-replica-01" - meta: - binlog-name: binlog-name-placeholder-1 - binlog-pos: binlog-pos-placeholder-1 - binlog-gtid: binlog-gtid-placeholder-1 - block-allow-list: "instance" - route-rules: [ "collation-table-rules-1",collation-table-rules-1-2,"collation-schema-rules-1","collation-table-rules-increment-1","collation-schema-rules-increment-1"] - mydumper-config-name: "global" - loader-config-name: "global" - syncer-config-name: "global" - - - source-id: "mysql-replica-02" - meta: - binlog-name: binlog-name-placeholder-2 - binlog-pos: binlog-pos-placeholder-2 - binlog-gtid: binlog-gtid-placeholder-2 - block-allow-list: "instance" - route-rules: [ "collation-table-rules-2",collation-table-rules-2-2,"collation-schema-rules-2","collation-table-rules-increment-2","collation-schema-rules-increment-2"] - mydumper-config-name: "global" - loader-config-name: "global" - syncer-config-name: "global" - -block-allow-list: - instance: - do-dbs: ["sync_collation*"] - -routes: - collation-table-rules-1: - schema-pattern: "sync_collation" - target-schema: "sync_collation" - table-pattern: "t1" - target-table: "t" - collation-table-rules-1-2: - schema-pattern: "sync_collation" - target-schema: "sync_collation" - table-pattern: "t2" - target-table: "t2" - collation-schema-rules-1: - schema-pattern: "sync_collation" - target-schema: "sync_collation" - collation-table-rules-2: - schema-pattern: "sync_collation" - target-schema: "sync_collation2" - table-pattern: "t1" - target-table: "t" - collation-table-rules-2-2: - schema-pattern: "sync_collation" - target-schema: "sync_collation2" - table-pattern: "t2" - target-table: "t2" - collation-schema-rules-2: - schema-pattern: "sync_collation" - target-schema: "sync_collation2" - - collation-table-rules-increment-1: - schema-pattern: "sync_collation_increment" - target-schema: "sync_collation_increment" - table-pattern: "t1" - target-table: "t" - collation-schema-rules-increment-1: - schema-pattern: "sync_collation_increment" - target-schema: "sync_collation_increment" - collation-table-rules-increment-2: - schema-pattern: "sync_collation_increment" - target-schema: "sync_collation_increment2" - table-pattern: "t1" - target-table: "t" - collation-schema-rules-increment-2: - schema-pattern: "sync_collation_increment" - target-schema: "sync_collation_increment2" - - -mydumpers: - global: - threads: 4 - chunk-filesize: 64 - skip-tz-utc: true - extra-args: "" - -loaders: - global: - pool-size: 16 - dir: "./dumped_data" - -syncers: - global: - worker-count: 16 - batch: 100 diff --git a/dm/tests/sync_collation/conf/dm-task.yaml b/dm/tests/sync_collation/conf/dm-task.yaml index a1f9c2b3a99..9cb231f72b3 100644 --- a/dm/tests/sync_collation/conf/dm-task.yaml +++ b/dm/tests/sync_collation/conf/dm-task.yaml @@ -1,6 +1,6 @@ --- -name: task-name-placeholder -task-mode: full +name: sync_collation +task-mode: all is-sharding: false meta-schema: "dm_meta" # enable-heartbeat: true @@ -17,49 +17,19 @@ target-database: mysql-instances: - source-id: "mysql-replica-01" block-allow-list: "instance" - route-rules: [ "collation-table-rules-1","collation-table-rules-1-2","collation-schema-rules-1" ] mydumper-config-name: "global" loader-config-name: "global" syncer-config-name: "global" - source-id: "mysql-replica-02" block-allow-list: "instance" - route-rules: [ "collation-table-rules-2","collation-table-rules-2-2","collation-schema-rules-2" ] mydumper-config-name: "global" loader-config-name: "global" syncer-config-name: "global" block-allow-list: instance: - do-dbs: ["sync_collation"] - -routes: - collation-table-rules-1: - schema-pattern: "sync_collation*" - target-schema: "sync_collation" - table-pattern: "t1" - target-table: "t" - collation-table-rules-1-2: - schema-pattern: "sync_collation*" - target-schema: "sync_collation" - table-pattern: "t2" - target-table: "t2" - collation-schema-rules-1: - schema-pattern: "sync_collation*" - target-schema: "sync_collation" - collation-table-rules-2: - schema-pattern: "sync_collation*" - target-schema: "sync_collation2" - table-pattern: "t1" - target-table: "t" - collation-table-rules-2-2: - schema-pattern: "sync_collation*" - target-schema: "sync_collation2" - table-pattern: "t2" - target-table: "t2" - collation-schema-rules-2: - schema-pattern: "sync_collation*" - target-schema: "sync_collation2" + do-dbs: ["sync_collation*"] mydumpers: global: diff --git a/dm/tests/sync_collation/data/clean_data.sql b/dm/tests/sync_collation/data/clean_data.sql index 646db5fb6b7..2dcf97a00d3 100644 --- a/dm/tests/sync_collation/data/clean_data.sql +++ b/dm/tests/sync_collation/data/clean_data.sql @@ -1,4 +1,6 @@ drop database if exists `sync_collation`; drop database if exists `sync_collation2`; drop database if exists `sync_collation_increment`; -drop database if exists `sync_collation_increment2`; \ No newline at end of file +drop database if exists `sync_collation_increment2`; +drop database if exists `sync_collation_server`; +drop database if exists `sync_collation_server2`; diff --git a/dm/tests/sync_collation/data/db1.increment.sql b/dm/tests/sync_collation/data/db1.increment.sql index dfe8ac3ecdb..dffe9f0ea94 100644 --- a/dm/tests/sync_collation/data/db1.increment.sql +++ b/dm/tests/sync_collation/data/db1.increment.sql @@ -1,13 +1,13 @@ -drop database if exists `sync_collation`; -create database `sync_collation` character set utf8; -use `sync_collation`; +drop database if exists `sync_collation_increment`; +create database `sync_collation_increment` character set utf8; +use `sync_collation_increment`; create table t1 (id int, name varchar(20), primary key(`id`)) character set utf8; insert into t1 (id, name) values (1, 'Aa'), (2, 'aA'); create table t2 (id int, name varchar(20) character set utf8, primary key(`id`)) character set latin1 collate latin1_bin; insert into t2 (id, name) values (1, 'Aa'), (2, 'aA'); set collation_server = utf8_general_ci; -drop database if exists `sync_collation_increment`; -create database `sync_collation_increment`; -use `sync_collation_increment`; +drop database if exists `sync_collation_server`; +create database `sync_collation_server`; +use `sync_collation_server`; create table t1 (id int, name varchar(20), primary key(`id`)); insert into t1 (id, name) values (1, 'Aa'), (2, 'aA'); \ No newline at end of file diff --git a/dm/tests/sync_collation/data/db1.increment_err.sql b/dm/tests/sync_collation/data/db1.increment_err.sql index 15e3878432b..b02217031ba 100644 --- a/dm/tests/sync_collation/data/db1.increment_err.sql +++ b/dm/tests/sync_collation/data/db1.increment_err.sql @@ -1,3 +1,3 @@ set collation_server = latin1_swedish_ci; -drop database if exists `sync_collation`; -create database `sync_collation`; \ No newline at end of file +drop database if exists `sync_collation_increment`; +create database `sync_collation_increment`; \ No newline at end of file diff --git a/dm/tests/sync_collation/data/db1.prepare_err.sql b/dm/tests/sync_collation/data/db1.prepare_err.sql index 616a34abfaa..cddcc949396 100644 --- a/dm/tests/sync_collation/data/db1.prepare_err.sql +++ b/dm/tests/sync_collation/data/db1.prepare_err.sql @@ -2,7 +2,3 @@ set collation_server = latin1_swedish_ci; drop database if exists `sync_collation`; create database `sync_collation`; use `sync_collation`; -create table t1 (id int, name varchar(20), primary key(`id`)); -insert into t1 (id, name) values (1, 'Aa'), (2, 'aA'); -create table t2 (id int, name varchar(20) character set utf8, primary key(`id`)); -insert into t2 (id, name) values (1, 'Aa'), (2, 'aA'); diff --git a/dm/tests/sync_collation/data/db2.increment.sql b/dm/tests/sync_collation/data/db2.increment.sql index eaf03128f93..d31ff16947c 100644 --- a/dm/tests/sync_collation/data/db2.increment.sql +++ b/dm/tests/sync_collation/data/db2.increment.sql @@ -1,13 +1,13 @@ -drop database if exists `sync_collation`; -create database `sync_collation` character set utf8; -use `sync_collation`; +drop database if exists `sync_collation_increment2`; +create database `sync_collation_increment2` character set utf8; +use `sync_collation_increment2`; create table t1 (id int, name varchar(20), primary key(`id`)) character set utf8; insert into t1 (id, name) values (1, 'Aa'), (2, 'aA'); create table t2 (id int, name varchar(20) character set utf8, primary key(`id`)) character set latin1 collate latin1_bin; insert into t2 (id, name) values (1, 'Aa'), (2, 'aA'); set collation_server = utf8_general_ci; -drop database if exists `sync_collation_increment`; -create database `sync_collation_increment`; -use `sync_collation_increment`; +drop database if exists `sync_collation_server2`; +create database `sync_collation_server2`; +use `sync_collation_server2`; create table t1 (id int, name varchar(20), primary key(`id`)); insert into t1 (id, name) values (1, 'Aa'), (2, 'aA'); diff --git a/dm/tests/sync_collation/data/db2.increment_err.sql b/dm/tests/sync_collation/data/db2.increment_err.sql index 9f1ff3ccdcf..fbd44e57747 100644 --- a/dm/tests/sync_collation/data/db2.increment_err.sql +++ b/dm/tests/sync_collation/data/db2.increment_err.sql @@ -1,3 +1,3 @@ set collation_server = utf8mb4_0900_ai_ci; -drop database if exists `sync_collation`; -create database `sync_collation`; +drop database if exists `sync_collation_increment2`; +create database `sync_collation_increment2`; diff --git a/dm/tests/sync_collation/data/db2.prepare.sql b/dm/tests/sync_collation/data/db2.prepare.sql index 4ce27c05f52..e67b60de547 100644 --- a/dm/tests/sync_collation/data/db2.prepare.sql +++ b/dm/tests/sync_collation/data/db2.prepare.sql @@ -1,6 +1,6 @@ -drop database if exists `sync_collation`; -create database `sync_collation` character set utf8; -use `sync_collation`; +drop database if exists `sync_collation2`; +create database `sync_collation2` character set utf8; +use `sync_collation2`; create table t1 (id int, name varchar(20), primary key(`id`)) character set utf8; insert into t1 (id, name) values (1, 'Aa'), (2, 'aA'); create table t2 (id int, name varchar(20) character set utf8, primary key(`id`)) character set latin1 collate latin1_bin; diff --git a/dm/tests/sync_collation/data/db2.prepare_err.sql b/dm/tests/sync_collation/data/db2.prepare_err.sql index 0679bd912f2..0de2d569389 100644 --- a/dm/tests/sync_collation/data/db2.prepare_err.sql +++ b/dm/tests/sync_collation/data/db2.prepare_err.sql @@ -1,8 +1,4 @@ set collation_server = utf8mb4_0900_ai_ci; -drop database if exists `sync_collation`; -create database `sync_collation`; -use `sync_collation`; -create table t1 (id int, name varchar(20), primary key(`id`)); -insert into t1 (id, name) values (1, 'Aa'), (2, 'aA'); -create table t2 (id int, name varchar(20) character set utf8, primary key(`id`)); -insert into t2 (id, name) values (1, 'Aa'), (2, 'aA'); +drop database if exists `sync_collation2`; +create database `sync_collation2`; +use `sync_collation2`; diff --git a/dm/tests/sync_collation/data/tidb.checktable.increment.prepare.sql b/dm/tests/sync_collation/data/tidb.checktable.increment.prepare.sql new file mode 100644 index 00000000000..fde2a87a791 --- /dev/null +++ b/dm/tests/sync_collation/data/tidb.checktable.increment.prepare.sql @@ -0,0 +1,6 @@ +use `sync_collation_increment`; +create table t_check (id int, name varchar(20), primary key (`id`)); +insert into t_check (id, name) values (1, 'Aa'), (2, 'aA'); +use `sync_collation_increment2`; +create table t_check (id int, name varchar(20), primary key (`id`)); +insert into t_check (id, name) values (1, 'Aa'), (2, 'aA'); \ No newline at end of file diff --git a/dm/tests/sync_collation/data/tidb.checktable.prepare.sql b/dm/tests/sync_collation/data/tidb.checktable.prepare.sql index a5147aae882..ef1fd468038 100644 --- a/dm/tests/sync_collation/data/tidb.checktable.prepare.sql +++ b/dm/tests/sync_collation/data/tidb.checktable.prepare.sql @@ -1,3 +1,6 @@ use `sync_collation`; create table t_check (id int, name varchar(20), primary key (`id`)); insert into t_check (id, name) values (1, 'Aa'), (2, 'aA'); +use `sync_collation2`; +create table t_check (id int, name varchar(20), primary key (`id`)); +insert into t_check (id, name) values (1, 'Aa'), (2, 'aA'); diff --git a/dm/tests/sync_collation/data/tidb.checktable.prepare2.sql b/dm/tests/sync_collation/data/tidb.checktable.prepare2.sql deleted file mode 100644 index 947a85fd9be..00000000000 --- a/dm/tests/sync_collation/data/tidb.checktable.prepare2.sql +++ /dev/null @@ -1,3 +0,0 @@ -use `sync_collation2`; -create table t_check (id int, name varchar(20), primary key (`id`)); -insert into t_check (id, name) values (1, 'Aa'), (2, 'aA'); diff --git a/dm/tests/sync_collation/run.sh b/dm/tests/sync_collation/run.sh index a736105bc2c..993ded1f5df 100755 --- a/dm/tests/sync_collation/run.sh +++ b/dm/tests/sync_collation/run.sh @@ -5,19 +5,18 @@ set -eu cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) source $cur/../_utils/test_prepare WORK_DIR=$TEST_DIR/$TEST_NAME +TASK_NAME="sync_collation" db="sync_collation" db_increment="sync_collation_increment" +db_server_collation="sync_collation_server" db2="sync_collation2" db_increment2="sync_collation_increment2" -tb="t" +db_server_collation2="sync_collation_server2" +tb="t1" tb2="t2" tb_check="t_check" function run() { - run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 - run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 - run_sql_file $cur/data/clean_data.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD - run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml @@ -31,12 +30,18 @@ function run() { dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 - echo "start task in full mode" + echo "prepare data" + run_sql_file $cur/data/clean_data.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/clean_data.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + run_sql_file $cur/data/clean_data.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + + echo "start task" cat $cur/conf/dm-task.yaml >$WORK_DIR/dm-task.yaml - TASK_NAME="full_test" - sed -i "s/task-name-placeholder/${TASK_NAME}/g" $WORK_DIR/dm-task.yaml - dmctl_start_task $WORK_DIR/dm-task.yaml + dmctl_start_task $WORK_DIR/dm-task.yaml "--remove-meta" + echo "check full phase" # wait run_sql_tidb_with_retry " select count(1) from information_schema.tables where TABLE_SCHEMA='${db}' and TABLE_NAME = '${tb}';" "count(1): 1" run_sql_tidb_with_retry " select count(1) from information_schema.tables where TABLE_SCHEMA='${db2}' and TABLE_NAME = '${tb}';" "count(1): 1" @@ -52,134 +57,68 @@ function run() { # check database by create table run_sql_file $cur/data/tidb.checktable.prepare.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD run_sql_tidb_with_retry "select count(1) from ${db}.${tb_check} where name = 'aa';" "count(1): 2" - run_sql_file $cur/data/tidb.checktable.prepare2.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD run_sql_tidb_with_retry "select count(1) from ${db2}.${tb_check} where name = 'aa';" "count(1): 2" - echo "stop task ${TASK_NAME}" - dmctl_stop_task $TASK_NAME $MASTER_PORT - - # $worker1_run_source_1 > 0 means source1 is operated to worker1 - worker1_run_source_1=$(sed "s/$SOURCE_ID1/$SOURCE_ID1\n/g" $WORK_DIR/worker1/log/dm-worker.log | grep -c "$SOURCE_ID1") || true - if [ $worker1_run_source_1 -gt 0 ]; then - name1=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - pos1=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - gtid1=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') - name2=$(grep "Log: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - pos2=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - gtid2=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') - else - name2=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - pos2=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - gtid2=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') - name1=$(grep "Log: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - pos1=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - gtid1=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') - fi - - run_sql_file $cur/data/clean_data.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD - - echo "start task in incremental mode" - - TASK_NAME="incremental_test" - cat $cur/conf/dm-task-increment.yaml >$WORK_DIR/dm-task.yaml - sed -i "s/task-name-placeholder/${TASK_NAME}/g" $WORK_DIR/dm-task.yaml - sed -i "s/binlog-name-placeholder-1/$name1/g" $WORK_DIR/dm-task.yaml - sed -i "s/binlog-pos-placeholder-1/$pos1/g" $WORK_DIR/dm-task.yaml - sed -i "s/binlog-gtid-placeholder-1/$gtid1/g" $WORK_DIR/dm-task.yaml - - sed -i "s/binlog-name-placeholder-2/$name2/g" $WORK_DIR/dm-task.yaml - sed -i "s/binlog-pos-placeholder-2/$pos2/g" $WORK_DIR/dm-task.yaml - sed -i "s/binlog-gtid-placeholder-2/$gtid2/g" $WORK_DIR/dm-task.yaml - dmctl_start_task $WORK_DIR/dm-task.yaml - + echo "prepare incremental data" run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + echo "check incremental phase" # wait - run_sql_tidb_with_retry " select count(1) from information_schema.tables where TABLE_SCHEMA='${db}' and TABLE_NAME = '${tb}';" "count(1): 1" - run_sql_tidb_with_retry " select count(1) from information_schema.tables where TABLE_SCHEMA='${db2}' and TABLE_NAME = '${tb}';" "count(1): 1" + run_sql_tidb_with_retry " select count(1) from information_schema.tables where TABLE_SCHEMA='${db_increment}' and TABLE_NAME = '${tb}';" "count(1): 1" + run_sql_tidb_with_retry " select count(1) from information_schema.tables where TABLE_SCHEMA='${db_increment2}' and TABLE_NAME = '${tb}';" "count(1): 1" # check table - run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where name = 'aa';" "count(1): 2" - run_sql_tidb_with_retry "select count(1) from ${db2}.${tb} where name = 'aa';" "count(1): 2" + run_sql_tidb_with_retry "select count(1) from ${db_increment}.${tb} where name = 'aa';" "count(1): 2" + run_sql_tidb_with_retry "select count(1) from ${db_increment2}.${tb} where name = 'aa';" "count(1): 2" # check column - run_sql_tidb_with_retry "select count(1) from ${db}.${tb2} where name = 'aa';" "count(1): 2" - run_sql_tidb_with_retry "select count(1) from ${db2}.${tb2} where name = 'aa';" "count(1): 2" + run_sql_tidb_with_retry "select count(1) from ${db_increment}.${tb2} where name = 'aa';" "count(1): 2" + run_sql_tidb_with_retry "select count(1) from ${db_increment2}.${tb2} where name = 'aa';" "count(1): 2" # check database by create table - run_sql_file $cur/data/tidb.checktable.prepare.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD - run_sql_tidb_with_retry "select count(1) from ${db}.${tb_check} where name = 'aa';" "count(1): 2" - run_sql_file $cur/data/tidb.checktable.prepare2.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD - run_sql_tidb_with_retry "select count(1) from ${db2}.${tb_check} where name = 'aa';" "count(1): 2" + run_sql_file $cur/data/tidb.checktable.increment.prepare.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD + run_sql_tidb_with_retry "select count(1) from ${db_increment}.${tb_check} where name = 'aa';" "count(1): 2" + run_sql_tidb_with_retry "select count(1) from ${db_increment2}.${tb_check} where name = 'aa';" "count(1): 2" # check set server collation - run_sql_tidb_with_retry "select count(1) from ${db_increment}.${tb} where name = 'aa';" "count(1): 2" - run_sql_tidb_with_retry "select count(1) from ${db_increment2}.${tb} where name = 'aa';" "count(1): 2" + run_sql_tidb_with_retry "select count(1) from ${db_server_collation}.${tb} where name = 'aa';" "count(1): 2" + run_sql_tidb_with_retry "select count(1) from ${db_server_collation2}.${tb} where name = 'aa';" "count(1): 2" dmctl_stop_task $TASK_NAME $MASTER_PORT - echo "start task in full mode and test not support" + echo "prepare data for full phase error test" + run_sql_file $cur/data/clean_data.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/clean_data.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 run_sql_file $cur/data/clean_data.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD run_sql_file $cur/data/db1.prepare_err.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql_file $cur/data/db2.prepare_err.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 - cat $cur/conf/dm-task.yaml >$WORK_DIR/dm-task.yaml - TASK_NAME="full_err" - sed -i "s/task-name-placeholder/${TASK_NAME}/g" $WORK_DIR/dm-task.yaml + dmctl_start_task $WORK_DIR/dm-task.yaml "--remove-meta" - echo "start task and will failed" + echo "check full phase error" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-task ${WORK_DIR}/dm-task.yaml" \ - "\"result\": false" 2 \ + "query-status ${TASK_NAME}" \ "Error 1273: Unsupported collation when new collation is enabled: 'latin1_swedish_ci'" 1 \ "Error 1273: Unsupported collation when new collation is enabled: 'utf8mb4_0900_ai_ci'" 1 - dmctl_stop_task $TASK_NAME $MASTER_PORT + dmctl_stop_task $TASK_NAME $MASTER_PORT - # $worker1_run_source_1 > 0 means source1 is operated to worker1 - worker1_run_source_1=$(sed "s/$SOURCE_ID1/$SOURCE_ID1\n/g" $WORK_DIR/worker1/log/dm-worker.log | grep -c "$SOURCE_ID1") || true - if [ $worker1_run_source_1 -gt 0 ]; then - name1=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - pos1=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - gtid1=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') - name2=$(grep "Log: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - pos2=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - gtid2=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') - else - name2=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - pos2=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - gtid2=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') - name1=$(grep "Log: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - pos1=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - gtid1=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') - fi - - echo "start task in incremantal mode and test not support" + echo "prepare data for incremental phase error test" + run_sql_file $cur/data/clean_data.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/clean_data.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 run_sql_file $cur/data/clean_data.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD - TASK_NAME="incremental_err" - cat $cur/conf/dm-task-increment.yaml >$WORK_DIR/dm-task.yaml - sed -i "s/task-name-placeholder/${TASK_NAME}/g" $WORK_DIR/dm-task.yaml - sed -i "s/binlog-name-placeholder-1/$name1/g" $WORK_DIR/dm-task.yaml - sed -i "s/binlog-pos-placeholder-1/$pos1/g" $WORK_DIR/dm-task.yaml - sed -i "s/binlog-gtid-placeholder-1/$gtid1/g" $WORK_DIR/dm-task.yaml - - sed -i "s/binlog-name-placeholder-2/$name2/g" $WORK_DIR/dm-task.yaml - sed -i "s/binlog-pos-placeholder-2/$pos2/g" $WORK_DIR/dm-task.yaml - sed -i "s/binlog-gtid-placeholder-2/$gtid2/g" $WORK_DIR/dm-task.yaml - dmctl_start_task $WORK_DIR/dm-task.yaml + dmctl_start_task $WORK_DIR/dm-task.yaml "--remove-meta" run_sql_file $cur/data/db1.increment_err.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql_file $cur/data/db2.increment_err.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 - echo "task sync and will failed" + echo "check incremental phase error" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status ${TASK_NAME}" \ "Error 1273: Unsupported collation when new collation is enabled: 'latin1_swedish_ci'" 1 \ "Error 1273: Unsupported collation when new collation is enabled: 'utf8mb4_0900_ai_ci'" 1 - - dmctl_stop_task $TASK_NAME $MASTER_PORT } cleanup_data $TEST_NAME From aacddb309fa9fb6f2ee40a588388673873b5cec4 Mon Sep 17 00:00:00 2001 From: WizardXiao Date: Thu, 16 Dec 2021 15:10:15 +0800 Subject: [PATCH 11/13] commit-message: Simplify integration test --- dm/tests/sync_collation/run.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dm/tests/sync_collation/run.sh b/dm/tests/sync_collation/run.sh index 993ded1f5df..5eadd952fe6 100755 --- a/dm/tests/sync_collation/run.sh +++ b/dm/tests/sync_collation/run.sh @@ -36,7 +36,7 @@ function run() { run_sql_file $cur/data/clean_data.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 - + echo "start task" cat $cur/conf/dm-task.yaml >$WORK_DIR/dm-task.yaml dmctl_start_task $WORK_DIR/dm-task.yaml "--remove-meta" @@ -102,9 +102,9 @@ function run() { "Error 1273: Unsupported collation when new collation is enabled: 'latin1_swedish_ci'" 1 \ "Error 1273: Unsupported collation when new collation is enabled: 'utf8mb4_0900_ai_ci'" 1 - dmctl_stop_task $TASK_NAME $MASTER_PORT + dmctl_stop_task $TASK_NAME $MASTER_PORT - echo "prepare data for incremental phase error test" + echo "prepare data for incremental phase error test" run_sql_file $cur/data/clean_data.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql_file $cur/data/clean_data.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 run_sql_file $cur/data/clean_data.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD From ae93b4831eb3e2318e58e1814bfe0cef162c59d5 Mon Sep 17 00:00:00 2001 From: WizardXiao Date: Thu, 16 Dec 2021 18:04:47 +0800 Subject: [PATCH 12/13] commit-message: fix collation check level --- dm/syncer/ddl.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dm/syncer/ddl.go b/dm/syncer/ddl.go index 6f1c63991e3..ddcf83f7cd4 100644 --- a/dm/syncer/ddl.go +++ b/dm/syncer/ddl.go @@ -254,9 +254,10 @@ func adjustCollation(tctx *tcontext.Context, ddlInfo *ddlInfo, statusVars []byte collation, err = event.GetServerCollationByStatusVars(statusVars, idAndCollationMap) if err != nil { tctx.L().Error("can not get charset server collation from binlog statusVars.", zap.Error(err), zap.String("originSQL", ddlInfo.originDDL)) - if collation == "" { - return - } + } + if collation == "" { + tctx.L().Error("get server collation from binlog statusVars is nil.", zap.Error(err), zap.String("originSQL", ddlInfo.originDDL)) + return } // add collation tctx.L().Info("detect create database risk which use implicit charset and collation, we will add collation by binlog status_vars", zap.String("originSQL", ddlInfo.originDDL), zap.String("collation", collation)) From 94b89494c2508626a3297ad464be38dd936b97b2 Mon Sep 17 00:00:00 2001 From: WizardXiao Date: Fri, 17 Dec 2021 09:55:05 +0800 Subject: [PATCH 13/13] commit-message: delete heartbeat in task config --- dm/tests/sync_collation/conf/dm-task.yaml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dm/tests/sync_collation/conf/dm-task.yaml b/dm/tests/sync_collation/conf/dm-task.yaml index 9cb231f72b3..cfdedf6b30c 100644 --- a/dm/tests/sync_collation/conf/dm-task.yaml +++ b/dm/tests/sync_collation/conf/dm-task.yaml @@ -3,10 +3,6 @@ name: sync_collation task-mode: all is-sharding: false meta-schema: "dm_meta" -# enable-heartbeat: true -heartbeat-update-interval: 1 -heartbeat-report-interval: 1 -clean-dump-file: false target-database: host: "127.0.0.1"