From 21623656ec4b10a1bdf8a0c9889408928e675113 Mon Sep 17 00:00:00 2001 From: leoppro Date: Thu, 9 Apr 2020 23:45:17 +0800 Subject: [PATCH 01/18] close tests --- tests/cdc/cdc.go | 2 +- tests/dailytest/case.go | 153 +++++++++++++++++------------------ tests/dailytest/dailytest.go | 76 ++++++++--------- 3 files changed, 115 insertions(+), 116 deletions(-) diff --git a/tests/cdc/cdc.go b/tests/cdc/cdc.go index 78b39f9127b..bf9ae825aa8 100644 --- a/tests/cdc/cdc.go +++ b/tests/cdc/cdc.go @@ -66,6 +66,6 @@ func main() { } }() - dailytest.RunMultiSource(sourceDBs, targetDB, cfg.SourceDBCfg.Name) + //dailytest.RunMultiSource(sourceDBs, targetDB, cfg.SourceDBCfg.Name) dailytest.Run(sourceDB, targetDB, cfg.SourceDBCfg.Name, cfg.WorkerCount, cfg.JobCount, cfg.Batch) } diff --git a/tests/dailytest/case.go b/tests/dailytest/case.go index 378a50af981..455eb343223 100644 --- a/tests/dailytest/case.go +++ b/tests/dailytest/case.go @@ -20,7 +20,6 @@ import ( "math/rand" "strings" "sync" - "time" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -211,95 +210,95 @@ func (tr *testRunner) execSQLs(sqls []string) { // RunCase run some simple test case func RunCase(src *sql.DB, dst *sql.DB, schema string) { tr := &testRunner{src: src, dst: dst, schema: schema} - ineligibleTable(tr, src, dst) - runPKorUKcases(tr) - + //ineligibleTable(tr, src, dst) + //runPKorUKcases(tr) + // tr.run(caseUpdateWhileAddingCol) tr.execSQLs([]string{"DROP TABLE growing_cols;"}) - tr.execSQLs(caseMultiDataType) - tr.execSQLs(caseMultiDataTypeClean) - - tr.execSQLs(caseUKWithNoPK) - tr.execSQLs(caseUKWithNoPKClean) - - tr.execSQLs(caseAlterDatabase) - tr.execSQLs(caseAlterDatabaseClean) + //tr.execSQLs(caseMultiDataType) + //tr.execSQLs(caseMultiDataTypeClean) + // + //tr.execSQLs(caseUKWithNoPK) + //tr.execSQLs(caseUKWithNoPKClean) + // + //tr.execSQLs(caseAlterDatabase) + //tr.execSQLs(caseAlterDatabaseClean) // run casePKAddDuplicateUK - tr.run(func(src *sql.DB) { - err := execSQLs(src, casePKAddDuplicateUK) - // the add unique index will failed by duplicate entry - if err != nil && !strings.Contains(err.Error(), "Duplicate") { - log.S().Fatal(err) - } - }) - tr.execSQLs(casePKAddDuplicateUKClean) + //tr.run(func(src *sql.DB) { + // err := execSQLs(src, casePKAddDuplicateUK) + // // the add unique index will failed by duplicate entry + // if err != nil && !strings.Contains(err.Error(), "Duplicate") { + // log.S().Fatal(err) + // } + //}) + //tr.execSQLs(casePKAddDuplicateUKClean) tr.run(caseUpdateWhileDroppingCol) tr.execSQLs([]string{"DROP TABLE many_cols;"}) - tr.execSQLs(caseInsertBit) - tr.execSQLs(caseInsertBitClean) - - // run caseRecoverAndInsert - tr.execSQLs(caseRecoverAndInsert) - tr.execSQLs(caseRecoverAndInsertClean) - - tr.run(caseTblWithGeneratedCol) - tr.execSQLs([]string{"DROP TABLE gen_contacts;"}) - tr.run(caseCreateView) - tr.execSQLs([]string{"DROP TABLE base_for_view;"}) - tr.execSQLs([]string{"DROP VIEW view_user_sum;"}) + //tr.execSQLs(caseInsertBit) + //tr.execSQLs(caseInsertBitClean) + // + //// run caseRecoverAndInsert + //tr.execSQLs(caseRecoverAndInsert) + //tr.execSQLs(caseRecoverAndInsertClean) + // + //tr.run(caseTblWithGeneratedCol) + //tr.execSQLs([]string{"DROP TABLE gen_contacts;"}) + //tr.run(caseCreateView) + //tr.execSQLs([]string{"DROP TABLE base_for_view;"}) + //tr.execSQLs([]string{"DROP VIEW view_user_sum;"}) // random op on have both pk and uk table - var start time.Time - tr.run(func(src *sql.DB) { - start = time.Now() - - err := updatePKUK(src, 1000) - if err != nil { - log.S().Fatal(errors.ErrorStack(err)) - } - }) - - tr.execSQLs([]string{"DROP TABLE pkuk"}) - log.S().Info("sync updatePKUK take: ", time.Since(start)) + //var start time.Time + //tr.run(func(src *sql.DB) { + // start = time.Now() + // + // err := updatePKUK(src, 1000) + // if err != nil { + // log.S().Fatal(errors.ErrorStack(err)) + // } + //}) + // + //tr.execSQLs([]string{"DROP TABLE pkuk"}) + //log.S().Info("sync updatePKUK take: ", time.Since(start)) // swap unique index value - tr.run(func(src *sql.DB) { - mustExec(src, "create table uindex(id int primary key, a1 int unique)") - - mustExec(src, "insert into uindex(id, a1) values(1, 10), (2, 20)") - - tx, err := src.Begin() - if err != nil { - log.S().Fatal(err) - } - - _, err = tx.Exec("update uindex set a1 = 30 where id = 1") - if err != nil { - log.S().Fatal(err) - } - - _, err = tx.Exec("update uindex set a1 = 10 where id = 2") - if err != nil { - log.S().Fatal(err) - } - - _, err = tx.Exec("update uindex set a1 = 20 where id = 1") - if err != nil { - log.S().Fatal(err) - } - - err = tx.Commit() - if err != nil { - log.S().Fatal(err) - } - }) - tr.run(func(src *sql.DB) { - mustExec(src, "drop table uindex") - }) + //tr.run(func(src *sql.DB) { + // mustExec(src, "create table uindex(id int primary key, a1 int unique)") + // + // mustExec(src, "insert into uindex(id, a1) values(1, 10), (2, 20)") + // + // tx, err := src.Begin() + // if err != nil { + // log.S().Fatal(err) + // } + // + // _, err = tx.Exec("update uindex set a1 = 30 where id = 1") + // if err != nil { + // log.S().Fatal(err) + // } + // + // _, err = tx.Exec("update uindex set a1 = 10 where id = 2") + // if err != nil { + // log.S().Fatal(err) + // } + // + // _, err = tx.Exec("update uindex set a1 = 20 where id = 1") + // if err != nil { + // log.S().Fatal(err) + // } + // + // err = tx.Commit() + // if err != nil { + // log.S().Fatal(err) + // } + //}) + //tr.run(func(src *sql.DB) { + // mustExec(src, "drop table uindex") + //}) // test big cdc msg // TODO: fix me diff --git a/tests/dailytest/dailytest.go b/tests/dailytest/dailytest.go index 51c56e3345f..3b0d7c6b41c 100644 --- a/tests/dailytest/dailytest.go +++ b/tests/dailytest/dailytest.go @@ -26,48 +26,48 @@ func RunMultiSource(srcs []*sql.DB, targetDB *sql.DB, schema string) { // Run runs the daily test func Run(sourceDB *sql.DB, targetDB *sql.DB, schema string, workerCount int, jobCount int, batch int) { - - TableSQLs := []string{` - create table ptest( - a int primary key, - b double NOT NULL DEFAULT 2.0, - c varchar(10) NOT NULL, - d time unique - ); - `, - `create table itest( - a int, - b double NOT NULL DEFAULT 2.0, - c varchar(10) NOT NULL, - d time unique, - PRIMARY KEY(a, b) - ); - `, - `create table ntest( - a int, - b double NOT NULL DEFAULT 2.0, - c varchar(10) NOT NULL, - d time unique not null - ); - `} + // + //TableSQLs := []string{` + // create table ptest( + // a int primary key, + // b double NOT NULL DEFAULT 2.0, + // c varchar(10) NOT NULL, + // d time unique + // ); + // `, + // `create table itest( + // a int, + // b double NOT NULL DEFAULT 2.0, + // c varchar(10) NOT NULL, + // d time unique, + // PRIMARY KEY(a, b) + // ); + // `, + // `create table ntest( + // a int, + // b double NOT NULL DEFAULT 2.0, + // c varchar(10) NOT NULL, + // d time unique not null + // ); + // `} // run the simple test case RunCase(sourceDB, targetDB, schema) - RunTest(sourceDB, targetDB, schema, func(src *sql.DB) { - // generate insert/update/delete sqls and execute - RunDailyTest(sourceDB, TableSQLs, workerCount, jobCount, batch) - }) - - RunTest(sourceDB, targetDB, schema, func(src *sql.DB) { - // truncate test data - TruncateTestTable(sourceDB, TableSQLs) - }) - - RunTest(sourceDB, targetDB, schema, func(src *sql.DB) { - // drop test table - DropTestTable(sourceDB, TableSQLs) - }) + //RunTest(sourceDB, targetDB, schema, func(src *sql.DB) { + // // generate insert/update/delete sqls and execute + // RunDailyTest(sourceDB, TableSQLs, workerCount, jobCount, batch) + //}) + // + //RunTest(sourceDB, targetDB, schema, func(src *sql.DB) { + // // truncate test data + // TruncateTestTable(sourceDB, TableSQLs) + //}) + // + //RunTest(sourceDB, targetDB, schema, func(src *sql.DB) { + // // drop test table + // DropTestTable(sourceDB, TableSQLs) + //}) log.S().Info("test pass!!!") From 6699e4269941f89a943994551a25d89741caa3e2 Mon Sep 17 00:00:00 2001 From: leoppro Date: Thu, 9 Apr 2020 23:51:29 +0800 Subject: [PATCH 02/18] close tests --- tests/dailytest/case.go | 20 ++++++++++++++++---- tests/dailytest/dailytest.go | 1 + 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/tests/dailytest/case.go b/tests/dailytest/case.go index 455eb343223..32470b86f1b 100644 --- a/tests/dailytest/case.go +++ b/tests/dailytest/case.go @@ -212,9 +212,9 @@ func RunCase(src *sql.DB, dst *sql.DB, schema string) { tr := &testRunner{src: src, dst: dst, schema: schema} //ineligibleTable(tr, src, dst) //runPKorUKcases(tr) - // - tr.run(caseUpdateWhileAddingCol) - tr.execSQLs([]string{"DROP TABLE growing_cols;"}) + //// + //tr.run(caseUpdateWhileAddingCol) + //tr.execSQLs([]string{"DROP TABLE growing_cols;"}) //tr.execSQLs(caseMultiDataType) //tr.execSQLs(caseMultiDataTypeClean) @@ -235,6 +235,18 @@ func RunCase(src *sql.DB, dst *sql.DB, schema string) { //}) //tr.execSQLs(casePKAddDuplicateUKClean) + tr.run(caseUpdateWhileDroppingCol) + tr.execSQLs([]string{"DROP TABLE many_cols;"}) + tr.run(caseUpdateWhileDroppingCol) + tr.execSQLs([]string{"DROP TABLE many_cols;"}) + tr.run(caseUpdateWhileDroppingCol) + tr.execSQLs([]string{"DROP TABLE many_cols;"}) + tr.run(caseUpdateWhileDroppingCol) + tr.execSQLs([]string{"DROP TABLE many_cols;"}) + tr.run(caseUpdateWhileDroppingCol) + tr.execSQLs([]string{"DROP TABLE many_cols;"}) + tr.run(caseUpdateWhileDroppingCol) + tr.execSQLs([]string{"DROP TABLE many_cols;"}) tr.run(caseUpdateWhileDroppingCol) tr.execSQLs([]string{"DROP TABLE many_cols;"}) @@ -418,7 +430,7 @@ CREATE TABLE growing_cols ( } func caseUpdateWhileDroppingCol(db *sql.DB) { - const nCols = 10 + const nCols = 2 var builder strings.Builder for i := 0; i < nCols; i++ { if i != 0 { diff --git a/tests/dailytest/dailytest.go b/tests/dailytest/dailytest.go index 3b0d7c6b41c..b95ed1ea5d4 100644 --- a/tests/dailytest/dailytest.go +++ b/tests/dailytest/dailytest.go @@ -52,6 +52,7 @@ func Run(sourceDB *sql.DB, targetDB *sql.DB, schema string, workerCount int, job // `} // run the simple test case + log.S().Info("start test!!!") RunCase(sourceDB, targetDB, schema) //RunTest(sourceDB, targetDB, schema, func(src *sql.DB) { From 506de8bbee9c2eeac36836eb9a3d5754377da241 Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 10 Apr 2020 09:16:47 +0800 Subject: [PATCH 03/18] close tests --- tests/ddl_sequence/conf/diff_config.toml | 27 +++++++++++++ tests/ddl_sequence/data/prepare.sql | 48 ++++++++++++++++++++++++ tests/ddl_sequence/run.sh | 43 +++++++++++++++++++++ 3 files changed, 118 insertions(+) create mode 100644 tests/ddl_sequence/conf/diff_config.toml create mode 100644 tests/ddl_sequence/data/prepare.sql create mode 100644 tests/ddl_sequence/run.sh diff --git a/tests/ddl_sequence/conf/diff_config.toml b/tests/ddl_sequence/conf/diff_config.toml new file mode 100644 index 00000000000..390a9f3128c --- /dev/null +++ b/tests/ddl_sequence/conf/diff_config.toml @@ -0,0 +1,27 @@ +# diff Configuration. + +log-level = "info" +chunk-size = 10 +check-thread-count = 4 +sample-percent = 100 +use-rowid = false +use-checksum = true +fix-sql-file = "fix.sql" + +# tables need to check. +[[check-tables]] + schema = "ddl_sequence" + tables = ["~.*"] + +[[source-db]] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + instance-id = "source-1" + +[target-db] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/ddl_sequence/data/prepare.sql b/tests/ddl_sequence/data/prepare.sql new file mode 100644 index 00000000000..d5adcf84e23 --- /dev/null +++ b/tests/ddl_sequence/data/prepare.sql @@ -0,0 +1,48 @@ +drop database if exists `ddl_sequence`; +create database `ddl_sequence`; +use `ddl_sequence`; + +CREATE TABLE many_cols1 ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0, + col0 INT NOT NULL, +) +ALTER TABLE many_cols1 DROP COLUMN col0; +INSERT INTO many_cols1 (val) VALUES (1); +DROP TABLE many_cols1; + +CREATE TABLE many_cols1 ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0, + col0 INT NOT NULL, +) +ALTER TABLE many_cols1 DROP COLUMN col0; +INSERT INTO many_cols1 (val) VALUES (1); +DROP TABLE many_cols1; + +CREATE TABLE many_cols1 ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0, + col0 INT NOT NULL, +) +ALTER TABLE many_cols1 DROP COLUMN col0; +INSERT INTO many_cols1 (val) VALUES (1); +DROP TABLE many_cols1; + +CREATE TABLE many_cols1 ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0, + col0 INT NOT NULL, +) +ALTER TABLE many_cols1 DROP COLUMN col0; +INSERT INTO many_cols1 (val) VALUES (1); +DROP TABLE many_cols1; + +CREATE TABLE many_cols1 ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0, + col0 INT NOT NULL, +) +ALTER TABLE many_cols1 DROP COLUMN col0; +INSERT INTO many_cols1 (val) VALUES (1); +DROP TABLE many_cols1; \ No newline at end of file diff --git a/tests/ddl_sequence/run.sh b/tests/ddl_sequence/run.sh new file mode 100644 index 00000000000..880099d8599 --- /dev/null +++ b/tests/ddl_sequence/run.sh @@ -0,0 +1,43 @@ +#!/bin/bash + +set -e + +CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster $WORK_DIR + + cd $WORK_DIR + + # record tso before we create tables to skip the system table DDLs + start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST:$UP_PD_PORT) + + run_cdc_server $WORK_DIR $CDC_BINARY + + TOPIC_NAME="ticdc-row-format-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";; + mysql) ;& + *) SINK_URI="mysql://root@127.0.0.1:3306/";; + esac + cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + if [ "$SINK_TYPE" == "kafka" ]; then + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4" + fi + run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + # sync_diff can't check non-exist table, so we check expected tables are created in downstream first + check_table_exists row_format.multi_data_type ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" From ab3fc71b332ef5030c590be629d1821c360fbd52 Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 10 Apr 2020 09:18:36 +0800 Subject: [PATCH 04/18] update case --- tests/ddl_sequence/data/prepare.sql | 4 +++- tests/ddl_sequence/run.sh | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/ddl_sequence/data/prepare.sql b/tests/ddl_sequence/data/prepare.sql index d5adcf84e23..617d6f4f645 100644 --- a/tests/ddl_sequence/data/prepare.sql +++ b/tests/ddl_sequence/data/prepare.sql @@ -45,4 +45,6 @@ CREATE TABLE many_cols1 ( ) ALTER TABLE many_cols1 DROP COLUMN col0; INSERT INTO many_cols1 (val) VALUES (1); -DROP TABLE many_cols1; \ No newline at end of file +DROP TABLE many_cols1; + +CREATE TABLE finish_mark(a int primary key) \ No newline at end of file diff --git a/tests/ddl_sequence/run.sh b/tests/ddl_sequence/run.sh index 880099d8599..be66a63f128 100644 --- a/tests/ddl_sequence/run.sh +++ b/tests/ddl_sequence/run.sh @@ -32,7 +32,7 @@ function run() { fi run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} # sync_diff can't check non-exist table, so we check expected tables are created in downstream first - check_table_exists row_format.multi_data_type ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists ddl_sequence.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml cleanup_process $CDC_BINARY From 6531d9c48693fdb2404800ddf65d032407111357 Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 10 Apr 2020 09:20:36 +0800 Subject: [PATCH 05/18] update case --- tests/ddl_sequence/data/prepare.sql | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/ddl_sequence/data/prepare.sql b/tests/ddl_sequence/data/prepare.sql index 617d6f4f645..c70b67b11d5 100644 --- a/tests/ddl_sequence/data/prepare.sql +++ b/tests/ddl_sequence/data/prepare.sql @@ -5,7 +5,7 @@ use `ddl_sequence`; CREATE TABLE many_cols1 ( id INT AUTO_INCREMENT PRIMARY KEY, val INT DEFAULT 0, - col0 INT NOT NULL, + col0 INT NOT NULL ) ALTER TABLE many_cols1 DROP COLUMN col0; INSERT INTO many_cols1 (val) VALUES (1); @@ -14,7 +14,7 @@ DROP TABLE many_cols1; CREATE TABLE many_cols1 ( id INT AUTO_INCREMENT PRIMARY KEY, val INT DEFAULT 0, - col0 INT NOT NULL, + col0 INT NOT NULL ) ALTER TABLE many_cols1 DROP COLUMN col0; INSERT INTO many_cols1 (val) VALUES (1); @@ -23,7 +23,7 @@ DROP TABLE many_cols1; CREATE TABLE many_cols1 ( id INT AUTO_INCREMENT PRIMARY KEY, val INT DEFAULT 0, - col0 INT NOT NULL, + col0 INT NOT NULL ) ALTER TABLE many_cols1 DROP COLUMN col0; INSERT INTO many_cols1 (val) VALUES (1); @@ -32,7 +32,7 @@ DROP TABLE many_cols1; CREATE TABLE many_cols1 ( id INT AUTO_INCREMENT PRIMARY KEY, val INT DEFAULT 0, - col0 INT NOT NULL, + col0 INT NOT NULL ) ALTER TABLE many_cols1 DROP COLUMN col0; INSERT INTO many_cols1 (val) VALUES (1); @@ -41,7 +41,7 @@ DROP TABLE many_cols1; CREATE TABLE many_cols1 ( id INT AUTO_INCREMENT PRIMARY KEY, val INT DEFAULT 0, - col0 INT NOT NULL, + col0 INT NOT NULL ) ALTER TABLE many_cols1 DROP COLUMN col0; INSERT INTO many_cols1 (val) VALUES (1); From 13dcc252451920e7457045228b312ae10f449c3a Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 10 Apr 2020 09:22:13 +0800 Subject: [PATCH 06/18] update case --- tests/ddl_sequence/data/prepare.sql | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/ddl_sequence/data/prepare.sql b/tests/ddl_sequence/data/prepare.sql index c70b67b11d5..b0133435fb4 100644 --- a/tests/ddl_sequence/data/prepare.sql +++ b/tests/ddl_sequence/data/prepare.sql @@ -6,7 +6,7 @@ CREATE TABLE many_cols1 ( id INT AUTO_INCREMENT PRIMARY KEY, val INT DEFAULT 0, col0 INT NOT NULL -) +); ALTER TABLE many_cols1 DROP COLUMN col0; INSERT INTO many_cols1 (val) VALUES (1); DROP TABLE many_cols1; @@ -15,7 +15,7 @@ CREATE TABLE many_cols1 ( id INT AUTO_INCREMENT PRIMARY KEY, val INT DEFAULT 0, col0 INT NOT NULL -) +); ALTER TABLE many_cols1 DROP COLUMN col0; INSERT INTO many_cols1 (val) VALUES (1); DROP TABLE many_cols1; @@ -24,7 +24,7 @@ CREATE TABLE many_cols1 ( id INT AUTO_INCREMENT PRIMARY KEY, val INT DEFAULT 0, col0 INT NOT NULL -) +); ALTER TABLE many_cols1 DROP COLUMN col0; INSERT INTO many_cols1 (val) VALUES (1); DROP TABLE many_cols1; @@ -33,7 +33,7 @@ CREATE TABLE many_cols1 ( id INT AUTO_INCREMENT PRIMARY KEY, val INT DEFAULT 0, col0 INT NOT NULL -) +); ALTER TABLE many_cols1 DROP COLUMN col0; INSERT INTO many_cols1 (val) VALUES (1); DROP TABLE many_cols1; @@ -42,7 +42,7 @@ CREATE TABLE many_cols1 ( id INT AUTO_INCREMENT PRIMARY KEY, val INT DEFAULT 0, col0 INT NOT NULL -) +); ALTER TABLE many_cols1 DROP COLUMN col0; INSERT INTO many_cols1 (val) VALUES (1); DROP TABLE many_cols1; From 5c0f4632c4bcb392ef6380769cbcb293f4fc29ae Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 10 Apr 2020 09:24:10 +0800 Subject: [PATCH 07/18] update case --- tests/ddl_sequence/data/prepare.sql | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/tests/ddl_sequence/data/prepare.sql b/tests/ddl_sequence/data/prepare.sql index b0133435fb4..fbff85bacb5 100644 --- a/tests/ddl_sequence/data/prepare.sql +++ b/tests/ddl_sequence/data/prepare.sql @@ -9,42 +9,37 @@ CREATE TABLE many_cols1 ( ); ALTER TABLE many_cols1 DROP COLUMN col0; INSERT INTO many_cols1 (val) VALUES (1); -DROP TABLE many_cols1; -CREATE TABLE many_cols1 ( +CREATE TABLE many_cols2 ( id INT AUTO_INCREMENT PRIMARY KEY, val INT DEFAULT 0, col0 INT NOT NULL ); -ALTER TABLE many_cols1 DROP COLUMN col0; -INSERT INTO many_cols1 (val) VALUES (1); -DROP TABLE many_cols1; +ALTER TABLE many_cols2 DROP COLUMN col0; +INSERT INTO many_cols2 (val) VALUES (1); -CREATE TABLE many_cols1 ( +CREATE TABLE many_cols3 ( id INT AUTO_INCREMENT PRIMARY KEY, val INT DEFAULT 0, col0 INT NOT NULL ); -ALTER TABLE many_cols1 DROP COLUMN col0; -INSERT INTO many_cols1 (val) VALUES (1); -DROP TABLE many_cols1; +ALTER TABLE many_cols3 DROP COLUMN col0; +INSERT INTO many_cols3 (val) VALUES (1); -CREATE TABLE many_cols1 ( +CREATE TABLE many_cols4 ( id INT AUTO_INCREMENT PRIMARY KEY, val INT DEFAULT 0, col0 INT NOT NULL ); -ALTER TABLE many_cols1 DROP COLUMN col0; -INSERT INTO many_cols1 (val) VALUES (1); -DROP TABLE many_cols1; +ALTER TABLE many_cols4 DROP COLUMN col0; +INSERT INTO many_cols4 (val) VALUES (1); -CREATE TABLE many_cols1 ( +CREATE TABLE many_cols5 ( id INT AUTO_INCREMENT PRIMARY KEY, val INT DEFAULT 0, col0 INT NOT NULL ); -ALTER TABLE many_cols1 DROP COLUMN col0; -INSERT INTO many_cols1 (val) VALUES (1); -DROP TABLE many_cols1; +ALTER TABLE many_cols5 DROP COLUMN col0; +INSERT INTO many_cols5 (val) VALUES (1); CREATE TABLE finish_mark(a int primary key) \ No newline at end of file From 0c3db29bc1936949578ed324edb292d153fc76c7 Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 10 Apr 2020 11:24:59 +0800 Subject: [PATCH 08/18] update case --- cdc/changefeed.go | 32 ++++++++++++++++++++++---------- cdc/owner.go | 21 ++++++++++++--------- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/cdc/changefeed.go b/cdc/changefeed.go index f0a53463e94..ae376c8bc63 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -83,9 +83,12 @@ type changeFeed struct { schemas map[uint64]tableIDMap tables map[uint64]entry.TableName orphanTables map[uint64]model.ProcessTableInfo - waitingConfirmTables map[uint64]string - toCleanTables map[uint64]struct{} - infoWriter *storage.OwnerTaskStatusEtcdWriter + waitingConfirmTables map[uint64]struct { + captureID string + startTs uint64 + } + toCleanTables map[uint64]struct{} + infoWriter *storage.OwnerTaskStatusEtcdWriter } // String implements fmt.Stringer interface. @@ -261,8 +264,8 @@ func (c *changeFeed) banlanceOrphanTables(ctx context.Context, captures map[stri return nil } - for tableID, captureID := range c.waitingConfirmTables { - lockStatus, err := c.infoWriter.CheckLock(ctx, c.id, captureID) + for tableID, waitingConfirm := range c.waitingConfirmTables { + lockStatus, err := c.infoWriter.CheckLock(ctx, c.id, waitingConfirm.captureID) if err != nil { return errors.Trace(err) } @@ -270,7 +273,7 @@ func (c *changeFeed) banlanceOrphanTables(ctx context.Context, captures map[stri case model.TableNoLock: delete(c.waitingConfirmTables, tableID) case model.TablePLock: - log.Debug("waiting the c-lock", zap.Uint64("tableID", tableID), zap.String("captureID", captureID)) + log.Debug("waiting the c-lock", zap.Uint64("tableID", tableID), zap.Reflect("waitingConfirm", waitingConfirm)) case model.TablePLockCommited: delete(c.waitingConfirmTables, tableID) } @@ -308,7 +311,10 @@ func (c *changeFeed) banlanceOrphanTables(ctx context.Context, captures map[stri zap.Uint64("start ts", orphan.StartTs), zap.String("capture", captureID)) delete(c.orphanTables, tableID) - c.waitingConfirmTables[tableID] = captureID + c.waitingConfirmTables[tableID] = struct { + captureID string + startTs uint64 + }{captureID: captureID, startTs: orphan.StartTs} default: c.restoreTableInfos(infoClone, captureID) log.Error("fail to put sub changefeed info", zap.Error(err)) @@ -468,9 +474,6 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error { if len(c.orphanTables) > 0 { return nil } - if len(c.waitingConfirmTables) > 0 { - return nil - } minResolvedTs := c.targetTs minCheckpointTs := c.targetTs @@ -490,6 +493,15 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error { minCheckpointTs = position.CheckPointTs } } + for _, waitingConfirm := range c.waitingConfirmTables { + if minResolvedTs > waitingConfirm.startTs { + minResolvedTs = waitingConfirm.startTs + } + + if minCheckpointTs > waitingConfirm.startTs { + minCheckpointTs = waitingConfirm.startTs + } + } } // if minResolvedTs is greater than ddlResolvedTs, diff --git a/cdc/owner.go b/cdc/owner.go index c19a3668ee4..3ea603f4204 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -209,15 +209,18 @@ func (o *Owner) newChangeFeed( }() cf := &changeFeed{ - info: info, - id: id, - ddlHandler: ddlHandler, - schema: schemaStorage, - schemas: schemas, - tables: tables, - orphanTables: orphanTables, - waitingConfirmTables: make(map[uint64]string), - toCleanTables: make(map[uint64]struct{}), + info: info, + id: id, + ddlHandler: ddlHandler, + schema: schemaStorage, + schemas: schemas, + tables: tables, + orphanTables: orphanTables, + waitingConfirmTables: make(map[uint64]struct { + captureID string + startTs uint64 + }), + toCleanTables: make(map[uint64]struct{}), status: &model.ChangeFeedStatus{ ResolvedTs: 0, CheckpointTs: checkpointTs, From 1ebed37ce63c173a80e8c149d4588521888c6a8a Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 10 Apr 2020 11:29:37 +0800 Subject: [PATCH 09/18] update case --- cdc/changefeed.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/changefeed.go b/cdc/changefeed.go index ae376c8bc63..b9a24913633 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -535,12 +535,12 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error { var tsUpdated bool - if minResolvedTs > c.status.ResolvedTs { + if minResolvedTs != c.status.ResolvedTs { c.status.ResolvedTs = minResolvedTs tsUpdated = true } - if minCheckpointTs > c.status.CheckpointTs { + if minCheckpointTs != c.status.CheckpointTs { c.status.CheckpointTs = minCheckpointTs err := c.sink.EmitCheckpointEvent(ctx, minCheckpointTs) if err != nil { From 2e47cac3e78df332742498d191e625664704c164 Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 10 Apr 2020 11:44:54 +0800 Subject: [PATCH 10/18] update case --- cdc/changefeed.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cdc/changefeed.go b/cdc/changefeed.go index b9a24913633..6836ef03d19 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -533,15 +533,16 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error { minCheckpointTs = minResolvedTs } + c.status.ResolvedTs = minResolvedTs + c.status.CheckpointTs = minCheckpointTs + var tsUpdated bool - if minResolvedTs != c.status.ResolvedTs { - c.status.ResolvedTs = minResolvedTs + if minResolvedTs > c.status.ResolvedTs { tsUpdated = true } - if minCheckpointTs != c.status.CheckpointTs { - c.status.CheckpointTs = minCheckpointTs + if minCheckpointTs > c.status.CheckpointTs { err := c.sink.EmitCheckpointEvent(ctx, minCheckpointTs) if err != nil { return errors.Trace(err) From e5f0af7595e3bdb78c80ecab3a4d30488c82509c Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 10 Apr 2020 12:09:18 +0800 Subject: [PATCH 11/18] update case --- cdc/changefeed.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cdc/changefeed.go b/cdc/changefeed.go index 6836ef03d19..508d2707bbc 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -532,6 +532,10 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error { if minCheckpointTs > minResolvedTs { minCheckpointTs = minResolvedTs } + if minResolvedTs < c.status.ResolvedTs || minCheckpointTs < c.status.CheckpointTs { + log.Error("ts fall back", zap.Uint64("minResolvedTs", minResolvedTs), zap.Uint64("c.status.ResolvedTs", c.status.ResolvedTs), + zap.Uint64("minCheckpointTs", minCheckpointTs), zap.Uint64("c.status.CheckpointTs", c.status.CheckpointTs)) + } c.status.ResolvedTs = minResolvedTs c.status.CheckpointTs = minCheckpointTs From e74c7c5245f3e08d769e570e3086e9ef721c646a Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 10 Apr 2020 19:01:01 +0800 Subject: [PATCH 12/18] use ctx --- cdc/processor.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cdc/processor.go b/cdc/processor.go index 92fee6984b6..073fe9db1c6 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -477,6 +477,7 @@ func (p *processor) handleTables(ctx context.Context, oldInfo, newInfo *model.Ta Ts: newInfo.TablePLock.Ts, CheckpointTs: checkpointTs, } + log.Info("add c lock", zap.Reflect("c lock", newInfo.TableCLock)) } } @@ -600,7 +601,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, startTs uint64) defer p.tablesMu.Unlock() ctx = util.PutTableIDInCtx(ctx, tableID) - log.Debug("Add table", zap.Int64("tableID", tableID)) + log.Info("Add table", zap.Int64("tableID", tableID), zap.Uint64("startTs", startTs)) if _, ok := p.tables[tableID]; ok { log.Warn("Ignore existing table", zap.Int64("ID", tableID)) } @@ -678,6 +679,9 @@ func (p *processor) addTable(ctx context.Context, tableID int64, startTs uint64) if p.position.CheckPointTs > startTs { p.position.CheckPointTs = startTs } + if p.position.ResolvedTs > startTs { + p.position.ResolvedTs = startTs + } syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Inc() p.collectMetrics(ctx, tableID) } From cec8df5de33601df8389e37618554d7be956451e Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 10 Apr 2020 19:02:23 +0800 Subject: [PATCH 13/18] use ctx --- cdc/processor.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cdc/processor.go b/cdc/processor.go index 073fe9db1c6..e452e948d53 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -377,7 +377,10 @@ func (p *processor) updateInfo(ctx context.Context) error { } p.handleTables(ctx, oldStatus, p.status, p.position.CheckPointTs) syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Set(float64(len(p.status.TableInfos))) - + err = updatePosition() + if err != nil { + return errors.Trace(err) + } err = retry.Run(500*time.Millisecond, 5, func() error { err = p.tsRWriter.WriteInfoIntoStorage(ctx) switch errors.Cause(err) { @@ -393,7 +396,7 @@ func (p *processor) updateInfo(ctx context.Context) error { if err != nil { return errors.Trace(err) } - return updatePosition() + return nil } func diffProcessTableInfos(oldInfo, newInfo []*model.ProcessTableInfo) (removed, added []*model.ProcessTableInfo) { From 6df3db935b6e6766eccf75082d47dff91ba71004 Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 10 Apr 2020 20:24:17 +0800 Subject: [PATCH 14/18] update --- cdc/changefeed.go | 4 - cdc/processor.go | 3 +- .../jenkins_ci/integration_test_common.groovy | 2 +- tests/cdc/cdc.go | 2 +- tests/dailytest/case.go | 173 ++++++++---------- tests/dailytest/dailytest.go | 77 ++++---- tests/ddl_sequence/run.sh | 2 +- 7 files changed, 123 insertions(+), 140 deletions(-) diff --git a/cdc/changefeed.go b/cdc/changefeed.go index 508d2707bbc..6836ef03d19 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -532,10 +532,6 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error { if minCheckpointTs > minResolvedTs { minCheckpointTs = minResolvedTs } - if minResolvedTs < c.status.ResolvedTs || minCheckpointTs < c.status.CheckpointTs { - log.Error("ts fall back", zap.Uint64("minResolvedTs", minResolvedTs), zap.Uint64("c.status.ResolvedTs", c.status.ResolvedTs), - zap.Uint64("minCheckpointTs", minCheckpointTs), zap.Uint64("c.status.CheckpointTs", c.status.CheckpointTs)) - } c.status.ResolvedTs = minResolvedTs c.status.CheckpointTs = minCheckpointTs diff --git a/cdc/processor.go b/cdc/processor.go index e452e948d53..600462d2b46 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -480,7 +480,6 @@ func (p *processor) handleTables(ctx context.Context, oldInfo, newInfo *model.Ta Ts: newInfo.TablePLock.Ts, CheckpointTs: checkpointTs, } - log.Info("add c lock", zap.Reflect("c lock", newInfo.TableCLock)) } } @@ -604,7 +603,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, startTs uint64) defer p.tablesMu.Unlock() ctx = util.PutTableIDInCtx(ctx, tableID) - log.Info("Add table", zap.Int64("tableID", tableID), zap.Uint64("startTs", startTs)) + log.Debug("Add table", zap.Int64("tableID", tableID), zap.Uint64("startTs", startTs)) if _, ok := p.tables[tableID]; ok { log.Warn("Ignore existing table", zap.Int64("ID", tableID)) } diff --git a/scripts/jenkins_ci/integration_test_common.groovy b/scripts/jenkins_ci/integration_test_common.groovy index e0bda0ca5b4..a5597d368a8 100644 --- a/scripts/jenkins_ci/integration_test_common.groovy +++ b/scripts/jenkins_ci/integration_test_common.groovy @@ -1,4 +1,4 @@ -test_case_names = ["simple", "cdc", "multi_capture", "split_region", "row_format", "tiflash", "availability"] +test_case_names = ["simple", "cdc", "multi_capture", "split_region", "row_format", "tiflash", "availability", "ddl_sequence"] def prepare_binaries() { stage('Prepare Binaries') { diff --git a/tests/cdc/cdc.go b/tests/cdc/cdc.go index bf9ae825aa8..78b39f9127b 100644 --- a/tests/cdc/cdc.go +++ b/tests/cdc/cdc.go @@ -66,6 +66,6 @@ func main() { } }() - //dailytest.RunMultiSource(sourceDBs, targetDB, cfg.SourceDBCfg.Name) + dailytest.RunMultiSource(sourceDBs, targetDB, cfg.SourceDBCfg.Name) dailytest.Run(sourceDB, targetDB, cfg.SourceDBCfg.Name, cfg.WorkerCount, cfg.JobCount, cfg.Batch) } diff --git a/tests/dailytest/case.go b/tests/dailytest/case.go index 32470b86f1b..378a50af981 100644 --- a/tests/dailytest/case.go +++ b/tests/dailytest/case.go @@ -20,6 +20,7 @@ import ( "math/rand" "strings" "sync" + "time" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -210,107 +211,95 @@ func (tr *testRunner) execSQLs(sqls []string) { // RunCase run some simple test case func RunCase(src *sql.DB, dst *sql.DB, schema string) { tr := &testRunner{src: src, dst: dst, schema: schema} - //ineligibleTable(tr, src, dst) - //runPKorUKcases(tr) - //// - //tr.run(caseUpdateWhileAddingCol) - //tr.execSQLs([]string{"DROP TABLE growing_cols;"}) - - //tr.execSQLs(caseMultiDataType) - //tr.execSQLs(caseMultiDataTypeClean) - // - //tr.execSQLs(caseUKWithNoPK) - //tr.execSQLs(caseUKWithNoPKClean) - // - //tr.execSQLs(caseAlterDatabase) - //tr.execSQLs(caseAlterDatabaseClean) + ineligibleTable(tr, src, dst) + runPKorUKcases(tr) + + tr.run(caseUpdateWhileAddingCol) + tr.execSQLs([]string{"DROP TABLE growing_cols;"}) + + tr.execSQLs(caseMultiDataType) + tr.execSQLs(caseMultiDataTypeClean) + + tr.execSQLs(caseUKWithNoPK) + tr.execSQLs(caseUKWithNoPKClean) + + tr.execSQLs(caseAlterDatabase) + tr.execSQLs(caseAlterDatabaseClean) // run casePKAddDuplicateUK - //tr.run(func(src *sql.DB) { - // err := execSQLs(src, casePKAddDuplicateUK) - // // the add unique index will failed by duplicate entry - // if err != nil && !strings.Contains(err.Error(), "Duplicate") { - // log.S().Fatal(err) - // } - //}) - //tr.execSQLs(casePKAddDuplicateUKClean) + tr.run(func(src *sql.DB) { + err := execSQLs(src, casePKAddDuplicateUK) + // the add unique index will failed by duplicate entry + if err != nil && !strings.Contains(err.Error(), "Duplicate") { + log.S().Fatal(err) + } + }) + tr.execSQLs(casePKAddDuplicateUKClean) tr.run(caseUpdateWhileDroppingCol) tr.execSQLs([]string{"DROP TABLE many_cols;"}) - tr.run(caseUpdateWhileDroppingCol) - tr.execSQLs([]string{"DROP TABLE many_cols;"}) - tr.run(caseUpdateWhileDroppingCol) - tr.execSQLs([]string{"DROP TABLE many_cols;"}) - tr.run(caseUpdateWhileDroppingCol) - tr.execSQLs([]string{"DROP TABLE many_cols;"}) - tr.run(caseUpdateWhileDroppingCol) - tr.execSQLs([]string{"DROP TABLE many_cols;"}) - tr.run(caseUpdateWhileDroppingCol) - tr.execSQLs([]string{"DROP TABLE many_cols;"}) - tr.run(caseUpdateWhileDroppingCol) - tr.execSQLs([]string{"DROP TABLE many_cols;"}) - //tr.execSQLs(caseInsertBit) - //tr.execSQLs(caseInsertBitClean) - // - //// run caseRecoverAndInsert - //tr.execSQLs(caseRecoverAndInsert) - //tr.execSQLs(caseRecoverAndInsertClean) - // - //tr.run(caseTblWithGeneratedCol) - //tr.execSQLs([]string{"DROP TABLE gen_contacts;"}) - //tr.run(caseCreateView) - //tr.execSQLs([]string{"DROP TABLE base_for_view;"}) - //tr.execSQLs([]string{"DROP VIEW view_user_sum;"}) + tr.execSQLs(caseInsertBit) + tr.execSQLs(caseInsertBitClean) + + // run caseRecoverAndInsert + tr.execSQLs(caseRecoverAndInsert) + tr.execSQLs(caseRecoverAndInsertClean) + + tr.run(caseTblWithGeneratedCol) + tr.execSQLs([]string{"DROP TABLE gen_contacts;"}) + tr.run(caseCreateView) + tr.execSQLs([]string{"DROP TABLE base_for_view;"}) + tr.execSQLs([]string{"DROP VIEW view_user_sum;"}) // random op on have both pk and uk table - //var start time.Time - //tr.run(func(src *sql.DB) { - // start = time.Now() - // - // err := updatePKUK(src, 1000) - // if err != nil { - // log.S().Fatal(errors.ErrorStack(err)) - // } - //}) - // - //tr.execSQLs([]string{"DROP TABLE pkuk"}) - //log.S().Info("sync updatePKUK take: ", time.Since(start)) + var start time.Time + tr.run(func(src *sql.DB) { + start = time.Now() + + err := updatePKUK(src, 1000) + if err != nil { + log.S().Fatal(errors.ErrorStack(err)) + } + }) + + tr.execSQLs([]string{"DROP TABLE pkuk"}) + log.S().Info("sync updatePKUK take: ", time.Since(start)) // swap unique index value - //tr.run(func(src *sql.DB) { - // mustExec(src, "create table uindex(id int primary key, a1 int unique)") - // - // mustExec(src, "insert into uindex(id, a1) values(1, 10), (2, 20)") - // - // tx, err := src.Begin() - // if err != nil { - // log.S().Fatal(err) - // } - // - // _, err = tx.Exec("update uindex set a1 = 30 where id = 1") - // if err != nil { - // log.S().Fatal(err) - // } - // - // _, err = tx.Exec("update uindex set a1 = 10 where id = 2") - // if err != nil { - // log.S().Fatal(err) - // } - // - // _, err = tx.Exec("update uindex set a1 = 20 where id = 1") - // if err != nil { - // log.S().Fatal(err) - // } - // - // err = tx.Commit() - // if err != nil { - // log.S().Fatal(err) - // } - //}) - //tr.run(func(src *sql.DB) { - // mustExec(src, "drop table uindex") - //}) + tr.run(func(src *sql.DB) { + mustExec(src, "create table uindex(id int primary key, a1 int unique)") + + mustExec(src, "insert into uindex(id, a1) values(1, 10), (2, 20)") + + tx, err := src.Begin() + if err != nil { + log.S().Fatal(err) + } + + _, err = tx.Exec("update uindex set a1 = 30 where id = 1") + if err != nil { + log.S().Fatal(err) + } + + _, err = tx.Exec("update uindex set a1 = 10 where id = 2") + if err != nil { + log.S().Fatal(err) + } + + _, err = tx.Exec("update uindex set a1 = 20 where id = 1") + if err != nil { + log.S().Fatal(err) + } + + err = tx.Commit() + if err != nil { + log.S().Fatal(err) + } + }) + tr.run(func(src *sql.DB) { + mustExec(src, "drop table uindex") + }) // test big cdc msg // TODO: fix me @@ -430,7 +419,7 @@ CREATE TABLE growing_cols ( } func caseUpdateWhileDroppingCol(db *sql.DB) { - const nCols = 2 + const nCols = 10 var builder strings.Builder for i := 0; i < nCols; i++ { if i != 0 { diff --git a/tests/dailytest/dailytest.go b/tests/dailytest/dailytest.go index b95ed1ea5d4..51c56e3345f 100644 --- a/tests/dailytest/dailytest.go +++ b/tests/dailytest/dailytest.go @@ -26,49 +26,48 @@ func RunMultiSource(srcs []*sql.DB, targetDB *sql.DB, schema string) { // Run runs the daily test func Run(sourceDB *sql.DB, targetDB *sql.DB, schema string, workerCount int, jobCount int, batch int) { - // - //TableSQLs := []string{` - // create table ptest( - // a int primary key, - // b double NOT NULL DEFAULT 2.0, - // c varchar(10) NOT NULL, - // d time unique - // ); - // `, - // `create table itest( - // a int, - // b double NOT NULL DEFAULT 2.0, - // c varchar(10) NOT NULL, - // d time unique, - // PRIMARY KEY(a, b) - // ); - // `, - // `create table ntest( - // a int, - // b double NOT NULL DEFAULT 2.0, - // c varchar(10) NOT NULL, - // d time unique not null - // ); - // `} + + TableSQLs := []string{` + create table ptest( + a int primary key, + b double NOT NULL DEFAULT 2.0, + c varchar(10) NOT NULL, + d time unique + ); + `, + `create table itest( + a int, + b double NOT NULL DEFAULT 2.0, + c varchar(10) NOT NULL, + d time unique, + PRIMARY KEY(a, b) + ); + `, + `create table ntest( + a int, + b double NOT NULL DEFAULT 2.0, + c varchar(10) NOT NULL, + d time unique not null + ); + `} // run the simple test case - log.S().Info("start test!!!") RunCase(sourceDB, targetDB, schema) - //RunTest(sourceDB, targetDB, schema, func(src *sql.DB) { - // // generate insert/update/delete sqls and execute - // RunDailyTest(sourceDB, TableSQLs, workerCount, jobCount, batch) - //}) - // - //RunTest(sourceDB, targetDB, schema, func(src *sql.DB) { - // // truncate test data - // TruncateTestTable(sourceDB, TableSQLs) - //}) - // - //RunTest(sourceDB, targetDB, schema, func(src *sql.DB) { - // // drop test table - // DropTestTable(sourceDB, TableSQLs) - //}) + RunTest(sourceDB, targetDB, schema, func(src *sql.DB) { + // generate insert/update/delete sqls and execute + RunDailyTest(sourceDB, TableSQLs, workerCount, jobCount, batch) + }) + + RunTest(sourceDB, targetDB, schema, func(src *sql.DB) { + // truncate test data + TruncateTestTable(sourceDB, TableSQLs) + }) + + RunTest(sourceDB, targetDB, schema, func(src *sql.DB) { + // drop test table + DropTestTable(sourceDB, TableSQLs) + }) log.S().Info("test pass!!!") diff --git a/tests/ddl_sequence/run.sh b/tests/ddl_sequence/run.sh index be66a63f128..6cd2ddc90a1 100644 --- a/tests/ddl_sequence/run.sh +++ b/tests/ddl_sequence/run.sh @@ -20,7 +20,7 @@ function run() { run_cdc_server $WORK_DIR $CDC_BINARY - TOPIC_NAME="ticdc-row-format-test-$RANDOM" + TOPIC_NAME="ticdc-ddl-sequence-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";; mysql) ;& From 164ebc944b5008e0dff6af1b0840ded7e0cf975e Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 10 Apr 2020 20:54:23 +0800 Subject: [PATCH 15/18] update --- cdc/changefeed.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cdc/changefeed.go b/cdc/changefeed.go index 508d2707bbc..6a21efb4afc 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -537,16 +537,15 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error { zap.Uint64("minCheckpointTs", minCheckpointTs), zap.Uint64("c.status.CheckpointTs", c.status.CheckpointTs)) } - c.status.ResolvedTs = minResolvedTs - c.status.CheckpointTs = minCheckpointTs - var tsUpdated bool if minResolvedTs > c.status.ResolvedTs { + c.status.ResolvedTs = minResolvedTs tsUpdated = true } if minCheckpointTs > c.status.CheckpointTs { + c.status.CheckpointTs = minCheckpointTs err := c.sink.EmitCheckpointEvent(ctx, minCheckpointTs) if err != nil { return errors.Trace(err) From 5ae5f046d122b5532255363183d237838618e462 Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 10 Apr 2020 20:59:32 +0800 Subject: [PATCH 16/18] update --- cdc/changefeed.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/changefeed.go b/cdc/changefeed.go index 6a21efb4afc..9b07ef530d8 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -539,12 +539,12 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error { var tsUpdated bool - if minResolvedTs > c.status.ResolvedTs { + if minResolvedTs != c.status.ResolvedTs { c.status.ResolvedTs = minResolvedTs tsUpdated = true } - if minCheckpointTs > c.status.CheckpointTs { + if minCheckpointTs != c.status.CheckpointTs { c.status.CheckpointTs = minCheckpointTs err := c.sink.EmitCheckpointEvent(ctx, minCheckpointTs) if err != nil { From 876153cca583a59599ea70eb37cef60aba4b0b7c Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 10 Apr 2020 21:09:05 +0800 Subject: [PATCH 17/18] update --- cdc/changefeed.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/changefeed.go b/cdc/changefeed.go index 9b07ef530d8..6a21efb4afc 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -539,12 +539,12 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error { var tsUpdated bool - if minResolvedTs != c.status.ResolvedTs { + if minResolvedTs > c.status.ResolvedTs { c.status.ResolvedTs = minResolvedTs tsUpdated = true } - if minCheckpointTs != c.status.CheckpointTs { + if minCheckpointTs > c.status.CheckpointTs { c.status.CheckpointTs = minCheckpointTs err := c.sink.EmitCheckpointEvent(ctx, minCheckpointTs) if err != nil { From 5533a4d7e2b028d97d5ca583ae4fd0b5b77718ce Mon Sep 17 00:00:00 2001 From: leoppro Date: Fri, 10 Apr 2020 21:33:26 +0800 Subject: [PATCH 18/18] update --- cdc/changefeed.go | 31 ++++++++----------------------- cdc/owner.go | 21 +++++++++------------ 2 files changed, 17 insertions(+), 35 deletions(-) diff --git a/cdc/changefeed.go b/cdc/changefeed.go index 6a21efb4afc..245d71ab66f 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -83,12 +83,9 @@ type changeFeed struct { schemas map[uint64]tableIDMap tables map[uint64]entry.TableName orphanTables map[uint64]model.ProcessTableInfo - waitingConfirmTables map[uint64]struct { - captureID string - startTs uint64 - } - toCleanTables map[uint64]struct{} - infoWriter *storage.OwnerTaskStatusEtcdWriter + waitingConfirmTables map[uint64]string + toCleanTables map[uint64]struct{} + infoWriter *storage.OwnerTaskStatusEtcdWriter } // String implements fmt.Stringer interface. @@ -264,8 +261,8 @@ func (c *changeFeed) banlanceOrphanTables(ctx context.Context, captures map[stri return nil } - for tableID, waitingConfirm := range c.waitingConfirmTables { - lockStatus, err := c.infoWriter.CheckLock(ctx, c.id, waitingConfirm.captureID) + for tableID, captureID := range c.waitingConfirmTables { + lockStatus, err := c.infoWriter.CheckLock(ctx, c.id, captureID) if err != nil { return errors.Trace(err) } @@ -273,7 +270,7 @@ func (c *changeFeed) banlanceOrphanTables(ctx context.Context, captures map[stri case model.TableNoLock: delete(c.waitingConfirmTables, tableID) case model.TablePLock: - log.Debug("waiting the c-lock", zap.Uint64("tableID", tableID), zap.Reflect("waitingConfirm", waitingConfirm)) + log.Debug("waiting the c-lock", zap.Uint64("tableID", tableID), zap.String("captureID", captureID)) case model.TablePLockCommited: delete(c.waitingConfirmTables, tableID) } @@ -311,10 +308,7 @@ func (c *changeFeed) banlanceOrphanTables(ctx context.Context, captures map[stri zap.Uint64("start ts", orphan.StartTs), zap.String("capture", captureID)) delete(c.orphanTables, tableID) - c.waitingConfirmTables[tableID] = struct { - captureID string - startTs uint64 - }{captureID: captureID, startTs: orphan.StartTs} + c.waitingConfirmTables[tableID] = captureID default: c.restoreTableInfos(infoClone, captureID) log.Error("fail to put sub changefeed info", zap.Error(err)) @@ -471,7 +465,7 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error { } // ProcessorInfos don't contains the whole set table id now. - if len(c.orphanTables) > 0 { + if len(c.orphanTables) > 0 || len(c.waitingConfirmTables) > 0 { return nil } @@ -493,15 +487,6 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error { minCheckpointTs = position.CheckPointTs } } - for _, waitingConfirm := range c.waitingConfirmTables { - if minResolvedTs > waitingConfirm.startTs { - minResolvedTs = waitingConfirm.startTs - } - - if minCheckpointTs > waitingConfirm.startTs { - minCheckpointTs = waitingConfirm.startTs - } - } } // if minResolvedTs is greater than ddlResolvedTs, diff --git a/cdc/owner.go b/cdc/owner.go index 3ea603f4204..c19a3668ee4 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -209,18 +209,15 @@ func (o *Owner) newChangeFeed( }() cf := &changeFeed{ - info: info, - id: id, - ddlHandler: ddlHandler, - schema: schemaStorage, - schemas: schemas, - tables: tables, - orphanTables: orphanTables, - waitingConfirmTables: make(map[uint64]struct { - captureID string - startTs uint64 - }), - toCleanTables: make(map[uint64]struct{}), + info: info, + id: id, + ddlHandler: ddlHandler, + schema: schemaStorage, + schemas: schemas, + tables: tables, + orphanTables: orphanTables, + waitingConfirmTables: make(map[uint64]string), + toCleanTables: make(map[uint64]struct{}), status: &model.ChangeFeedStatus{ ResolvedTs: 0, CheckpointTs: checkpointTs,