From ff4784e5dceaa1cc45fc8783e30a3e5033158b5e Mon Sep 17 00:00:00 2001 From: Luo Yangzhixin Date: Mon, 29 Apr 2024 20:02:58 +0800 Subject: [PATCH] lightning: performance optimization for post-import conflict detection (#52656) close pingcap/tidb#52306 --- .../tests/lightning_config_max_error/run.sh | 12 +- .../lightning_duplicate_detection/run.sh | 2 +- .../run.sh | 2 +- .../run.sh | 2 +- .../run.sh | 2 +- .../run.sh | 2 +- .../run.sh | 2 +- lightning/tests/lightning_issue_40657/run.sh | 2 +- lightning/tidb-lightning.toml | 28 +- pkg/lightning/backend/local/duplicate.go | 23 +- pkg/lightning/config/config.go | 2 +- pkg/lightning/errormanager/errormanager.go | 595 +++++++++++------- .../errormanager/errormanager_test.go | 92 +-- .../errormanager/resolveconflict_test.go | 232 ++++--- 14 files changed, 595 insertions(+), 403 deletions(-) diff --git a/lightning/tests/lightning_config_max_error/run.sh b/lightning/tests/lightning_config_max_error/run.sh index 171b0d014446b..b43f886a50197 100755 --- a/lightning/tests/lightning_config_max_error/run.sh +++ b/lightning/tests/lightning_config_max_error/run.sh @@ -28,7 +28,7 @@ duplicated_row_count=$(( ${total_row_count} - ${uniq_row_count} )) remaining_row_count=$(( ${uniq_row_count} + ${duplicated_row_count}/2 )) run_sql 'DROP TABLE IF EXISTS mytest.testtbl' -run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3' stderr_file="/tmp/${TEST_NAME}.stderr" @@ -46,7 +46,7 @@ EOF cat "${stderr_file}" grep -q "${err_msg}" "${stderr_file}" -run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v2' +run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v3' # Although conflict error number exceeds the max-error limit, # all the conflict errors are recorded, # because recording of conflict errors are executed batch by batch (batch size 1024), @@ -56,11 +56,11 @@ check_contains "COUNT(*): ${duplicated_row_count}" # import a second time run_sql 'DROP TABLE IF EXISTS mytest.testtbl' -run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3' run_lightning --backend local --config "${mydir}/normal_config.toml" -run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v2' +run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v3' check_contains "COUNT(*): ${duplicated_row_count}" # Check remaining records in the target table @@ -70,11 +70,11 @@ check_contains "COUNT(*): ${remaining_row_count}" # import a third time run_sql 'DROP TABLE IF EXISTS mytest.testtbl' -run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3' run_lightning --backend local --config "${mydir}/normal_config_old_style.toml" -run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v2' +run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v3' check_contains "COUNT(*): ${duplicated_row_count}" # Check remaining records in the target table diff --git a/lightning/tests/lightning_duplicate_detection/run.sh b/lightning/tests/lightning_duplicate_detection/run.sh index 04b9ead60f68a..ec9effd73a82d 100644 --- a/lightning/tests/lightning_duplicate_detection/run.sh +++ b/lightning/tests/lightning_duplicate_detection/run.sh @@ -53,7 +53,7 @@ verify_detected_rows() { done done mapfile -t expect_rows < <(for row in "${expect_rows[@]}"; do echo "$row"; done | sort | uniq) - mapfile -t actual_rows < <(run_sql 
"SELECT row_data FROM lightning_task_info.conflict_error_v2 WHERE table_name = \"\`dup_detect\`.\`${table}\`\"" | + mapfile -t actual_rows < <(run_sql "SELECT row_data FROM lightning_task_info.conflict_error_v3 WHERE table_name = \"\`dup_detect\`.\`${table}\`\"" | grep "row_data:" | sed 's/^.*(//' | sed 's/).*$//' | sed 's/"//g' | sed 's/, */,/g' | sort | uniq) equal=0 if [ "${#actual_rows[@]}" = "${#expect_rows[@]}" ]; then diff --git a/lightning/tests/lightning_duplicate_resolution_error/run.sh b/lightning/tests/lightning_duplicate_resolution_error/run.sh index d6ae96f6b6b72..164ae140cc10b 100644 --- a/lightning/tests/lightning_duplicate_resolution_error/run.sh +++ b/lightning/tests/lightning_duplicate_resolution_error/run.sh @@ -21,7 +21,7 @@ check_cluster_version 5 2 0 'duplicate detection' || exit 0 mydir=$(dirname "${BASH_SOURCE[0]}") run_sql 'DROP TABLE IF EXISTS dup_resolve.a' -run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3' ! run_lightning --backend local --config "${mydir}/config.toml" [ $? -eq 0 ] diff --git a/lightning/tests/lightning_duplicate_resolution_error_pk_multiple_files/run.sh b/lightning/tests/lightning_duplicate_resolution_error_pk_multiple_files/run.sh index 2eab9d2bbe77b..b207b55ef8cbb 100644 --- a/lightning/tests/lightning_duplicate_resolution_error_pk_multiple_files/run.sh +++ b/lightning/tests/lightning_duplicate_resolution_error_pk_multiple_files/run.sh @@ -21,7 +21,7 @@ check_cluster_version 5 2 0 'duplicate detection' || exit 0 mydir=$(dirname "${BASH_SOURCE[0]}") run_sql 'DROP TABLE IF EXISTS dup_resolve.a' -run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3' ! run_lightning --backend local --config "${mydir}/config.toml" [ $? -eq 0 ] diff --git a/lightning/tests/lightning_duplicate_resolution_error_uk_multiple_files/run.sh b/lightning/tests/lightning_duplicate_resolution_error_uk_multiple_files/run.sh index 73cb0c301c7c6..65a20892ce342 100644 --- a/lightning/tests/lightning_duplicate_resolution_error_uk_multiple_files/run.sh +++ b/lightning/tests/lightning_duplicate_resolution_error_uk_multiple_files/run.sh @@ -21,7 +21,7 @@ check_cluster_version 5 2 0 'duplicate detection' || exit 0 mydir=$(dirname "${BASH_SOURCE[0]}") run_sql 'DROP TABLE IF EXISTS dup_resolve.a' -run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3' ! run_lightning --backend local --config "${mydir}/config.toml" [ $? -eq 0 ] diff --git a/lightning/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/run.sh b/lightning/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/run.sh index ca9fe272272ee..ef72491c0a114 100644 --- a/lightning/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/run.sh +++ b/lightning/tests/lightning_duplicate_resolution_error_uk_multiple_files_multicol_index/run.sh @@ -21,7 +21,7 @@ check_cluster_version 5 2 0 'duplicate detection' || exit 0 mydir=$(dirname "${BASH_SOURCE[0]}") run_sql 'DROP TABLE IF EXISTS dup_resolve.a' -run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3' ! run_lightning --backend local --config "${mydir}/config.toml" [ $? 
-eq 0 ] diff --git a/lightning/tests/lightning_duplicate_resolution_merge/run.sh b/lightning/tests/lightning_duplicate_resolution_merge/run.sh index 56ed4118271e3..8d12c4dc9af5f 100644 --- a/lightning/tests/lightning_duplicate_resolution_merge/run.sh +++ b/lightning/tests/lightning_duplicate_resolution_merge/run.sh @@ -45,7 +45,7 @@ check_contains 'count(*): 10' run_sql 'select count(*) from lightning_task_info.conflict_records' check_contains 'count(*): 16' -run_sql 'select count(*) from lightning_task_info.conflict_error_v2' +run_sql 'select count(*) from lightning_task_info.conflict_error_v3' check_contains 'count(*): 4' run_sql 'select count(*) from lightning_task_info.conflict_view' diff --git a/lightning/tests/lightning_issue_40657/run.sh b/lightning/tests/lightning_issue_40657/run.sh index 4d2934a5ca24e..0f3d9ca5d15cb 100644 --- a/lightning/tests/lightning_issue_40657/run.sh +++ b/lightning/tests/lightning_issue_40657/run.sh @@ -24,7 +24,7 @@ run_lightning -d "$CUR/data1" run_sql 'admin check table test.t' run_sql 'select count(*) from test.t' check_contains 'count(*): 4' -run_sql 'select count(*) from lightning_task_info.conflict_error_v2' +run_sql 'select count(*) from lightning_task_info.conflict_error_v3' check_contains 'count(*): 2' run_sql 'truncate table test.t' diff --git a/lightning/tidb-lightning.toml b/lightning/tidb-lightning.toml index 5e720052f38ae..e7945b1c4eafb 100644 --- a/lightning/tidb-lightning.toml +++ b/lightning/tidb-lightning.toml @@ -89,26 +89,24 @@ driver = "file" # - origin. keep the checkpoints data unchanged. #keep-after-success = "remove" - [conflict] -# Starting from v7.3.0, a new version of strategy is introduced to handle conflicting data. The default value is "". Starting from v8.0.0, TiDB Lightning optimizes the conflict strategy for both physical and logical import modes (experimental). -# - "": In the physical import mode, TiDB Lightning does not detect or handle conflicting data. If the source file contains conflicting primary or unique key records, the subsequent step reports an error. In the logical import mode, TiDB Lightning converts the "" strategy to the "error" strategy for processing. -# - "error": When detecting conflicting primary or unique key records in the imported data, TiDB Lightning terminates the import and reports an error. -# - "replace": When encountering conflicting primary or unique key records, TiDB Lightning retains the latest data and overwrites the old data. -# The conflicting data are recorded in the `lightning_task_info.conflict_error_v2` table (recording conflicting data detected by post-import conflict detection in the physical import mode) -# and the `conflict_records` table (recording conflicting data detected by preprocess conflict detection in both logical and physical import modes) of the target TiDB cluster. -# If you turn on both preprocess and post-import conflict detection in physical import mode, the conflicting data can be checked in `lightning_task_info.conflict_view` view. +# Starting from v7.3.0, a new version of strategy is introduced to handle conflicting data. The default value is "". Starting from v8.0.0, TiDB Lightning optimizes the conflict strategy for both physical and logical import modes. +# - "": in the physical import mode, TiDB Lightning does not detect or handle conflicting data. If the source file contains conflicting primary or unique key records, the subsequent step reports an error. 
In the logical import mode, TiDB Lightning converts the "" strategy to the "error" strategy for processing.
+# - "error": when detecting conflicting primary or unique key records in the imported data, TiDB Lightning terminates the import and reports an error.
+# - "replace": when encountering conflicting primary or unique key records, TiDB Lightning retains the latest data and overwrites the old data.
+# The conflicting data are recorded in the `lightning_task_info.conflict_error_v3` table (recording conflicting data detected by post-import conflict detection in the physical import mode) and the `conflict_records` table (recording conflicting data detected by preprocess conflict detection in both logical and physical import modes) of the target TiDB cluster.
+# If you set `conflict.strategy = "replace"` in physical import mode, the conflicting data can be checked in the `lightning_task_info.conflict_view` view.
 # You can manually insert the correct records into the target table based on your application requirements. Note that the target TiKV must be v5.2.0 or later versions.
+# - "ignore": when encountering conflicting primary or unique key records, TiDB Lightning retains the old data and ignores the new data. This option can only be used in the logical import mode.
 strategy = ""
-# Controls whether to enable preprocess conflict detection, which check conflicts in the data before importing it to TiDB. In scenarios where the ratio of conflict records is greater than or equal to 1%, it is recommended to enable preprocess conflict detection for better performance in conflict detection.
-# In other scenarios, it is recommended to disable it. The default value is false, indicating that TiDB Lightning only checks conflicts after the import. If you set it to true, TiDB Lightning checks conflicts both before and after the import. This parameter is experimental, and it can be used only in the physical import mode.
+# Controls whether to enable preprocess conflict detection, which checks conflicts in the data before importing it to TiDB. The default value is false, indicating that TiDB Lightning only checks conflicts after the import. If you set it to true, TiDB Lightning checks conflicts both before and after the import. This parameter can be used only in the physical import mode. It is not recommended to set `precheck-conflict-before-import = true` for now.
 # precheck-conflict-before-import = false
-# Controls the maximum number of conflict errors that can be handled when strategy is "replace" or "ignore". You can set it only when strategy is "replace" or "ignore". The default value is 10000.
+# Controls the maximum number of conflict errors that can be handled when strategy is "replace" or "ignore". You can set it only when strategy is "replace" or "ignore". The default value is 10000. If you set it to a value larger than 10000, the import might suffer performance degradation or even fail.
 # threshold = 10000
-# Controls the maximum number of records in the `conflict_records` table. The default value is 10000. In the physical import mode, if the strategy is "replace", the conflict records that are overwritten are recorded.
-# In the logical import mode, if the strategy is "ignore", the conflict records that are ignored are recorded; if the strategy is "replace", the conflict records can not be recorded.
-# Starting from v8.1.0, max-record-rows will be assigned the value of threshold, regardless the user input. max-record-rows will be deprecated in the future.
+# Controls the maximum number of records in the `conflict_records` table. The default value is 10000.
+# Starting from v8.1.0, there is no need to configure `max-record-rows` manually, because TiDB Lightning automatically sets `max-record-rows` to the value of `threshold`, regardless of the user input. `max-record-rows` will be deprecated in a future release.
+# In the physical import mode, if the strategy is "replace", the conflict records that are overwritten are recorded.
+# In the logical import mode, if the strategy is "ignore", the conflict records that are ignored are recorded; if the strategy is "replace", the conflict records are not recorded.
 # max-record-rows = 10000

 [tikv-importer]
diff --git a/pkg/lightning/backend/local/duplicate.go b/pkg/lightning/backend/local/duplicate.go
index 2e93a25abe513..f099cbc7b3135 100644
--- a/pkg/lightning/backend/local/duplicate.go
+++ b/pkg/lightning/backend/local/duplicate.go
@@ -1138,8 +1138,8 @@ func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table
 			}
 			return value, nil
 		},
-		func(ctx context.Context, key []byte) error {
-			err := local.deleteDuplicateRow(ctx, logger, key)
+		func(ctx context.Context, keys [][]byte) error {
+			err := local.deleteDuplicateRows(ctx, logger, keys)
 			if err != nil {
 				logger.Warn("delete duplicate rows encounter error", log.ShortError(err))
 				return common.ErrResolveDuplicateRows.Wrap(errors.Trace(err)).GenWithStackByArgs(tableName)
@@ -1168,10 +1168,10 @@ func (local *DupeController) getLatestValue(
 	return value, nil
 }
 
-func (local *DupeController) deleteDuplicateRow(
+func (local *DupeController) deleteDuplicateRows(
 	ctx context.Context,
 	logger *log.Task,
-	key []byte,
+	keys [][]byte,
 ) (err error) {
 	// Starts a Delete transaction.
 	txn, err := local.tikvCli.Begin()
@@ -1188,10 +1188,15 @@
 		}
 	}()
 
-	logger.Debug("deleteDuplicateRow will delete key",
-		zap.String("category", "resolve-dupe"),
-		logutil.Key("key", key))
-	err = txn.Delete(key)
+	for _, key := range keys {
+		logger.Debug("deleteDuplicateRows will delete key",
+			zap.String("category", "resolve-dupe"),
+			logutil.Key("key", key))
+		if err := txn.Delete(key); err != nil {
+			return errors.Trace(err)
+		}
+	}
 
-	return errors.Trace(err)
+	logger.Debug("number of KV pairs deleted", zap.String("category", "resolve-dupe"), zap.Int("count", txn.Len()))
+	return nil
 }
diff --git a/pkg/lightning/config/config.go b/pkg/lightning/config/config.go
index 509541a552b7e..411c82dfef389 100644
--- a/pkg/lightning/config/config.go
+++ b/pkg/lightning/config/config.go
@@ -595,7 +595,7 @@ const (
 	// ReplaceOnDup indicates using REPLACE INTO to insert data for TiDB backend.
 	// ReplaceOnDup records all duplicate records, remove some rows with conflict
 	// and reserve other rows that can be kept and not cause conflict anymore for local backend.
-	// Users need to analyze the lightning_task_info.conflict_error_v2 table to check whether the reserved data
+	// Users need to analyze the lightning_task_info.conflict_error_v3 table to check whether the reserved data
 	// cater to their need and check whether they need to add back the correct rows.
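+	// For example, the reserved data can be inspected with an illustrative query such as
+	//   SELECT table_name, index_name, key_data, row_data FROM lightning_task_info.conflict_error_v3;
+	// (column names as defined in errormanager.go; adjust the filter to your task).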
ReplaceOnDup // IgnoreOnDup indicates using INSERT IGNORE INTO to insert data for TiDB backend. diff --git a/pkg/lightning/errormanager/errormanager.go b/pkg/lightning/errormanager/errormanager.go index bb86304dc2f49..d62d8792fa8eb 100644 --- a/pkg/lightning/errormanager/errormanager.go +++ b/pkg/lightning/errormanager/errormanager.go @@ -19,7 +19,9 @@ import ( "context" "database/sql" "fmt" + "math" "strings" + "sync" "github.com/jedib0t/go-pretty/v6/table" "github.com/jedib0t/go-pretty/v6/text" @@ -53,7 +55,7 @@ const ( syntaxErrorTableName = "syntax_error_v1" typeErrorTableName = "type_error_v1" // ConflictErrorTableName is the table name for duplicate detection. - ConflictErrorTableName = "conflict_error_v2" + ConflictErrorTableName = "conflict_error_v3" // DupRecordTableName is the table name to record duplicate data that displayed to user. DupRecordTableName = "conflict_records" // ConflictViewName is the view name for presenting the union information of ConflictErrorTable and DupRecordTable. @@ -95,10 +97,11 @@ const ( raw_value mediumblob NOT NULL COMMENT 'the value of the conflicted key', raw_handle mediumblob NOT NULL COMMENT 'the data handle derived from the conflicted key or value', raw_row mediumblob NOT NULL COMMENT 'the data retrieved from the handle', - is_data_kv tinyint(1) NOT NULL, + kv_type tinyint(1) NOT NULL COMMENT '0 for index kv, 1 for data kv, 2 for additionally inserted data kv', INDEX (task_id, table_name), INDEX (index_name), - INDEX (table_name, index_name) + INDEX (table_name, index_name), + INDEX (kv_type) ); ` @@ -119,10 +122,10 @@ const ( createConflictView = ` CREATE OR REPLACE VIEW %s.` + ConflictViewName + ` AS SELECT 0 AS is_precheck_conflict, task_id, create_time, table_name, index_name, key_data, row_data, - raw_key, raw_value, raw_handle, raw_row, is_data_kv, NULL AS path, NULL AS offset, NULL AS error, NULL AS row_id + raw_key, raw_value, raw_handle, raw_row, kv_type, NULL AS path, NULL AS offset, NULL AS error, NULL AS row_id FROM %s.` + ConflictErrorTableName + ` UNION ALL SELECT 1 AS is_precheck_conflict, task_id, create_time, table_name, NULL AS index_name, NULL AS key_data, - row_data, NULL AS raw_key, NULL AS raw_value, NULL AS raw_handle, NULL AS raw_row, NULL AS is_data_kv, path, + row_data, NULL AS raw_key, NULL AS raw_value, NULL AS raw_handle, NULL AS raw_row, NULL AS kv_type, path, offset, error, row_id FROM %s.` + DupRecordTableName + `; ` @@ -134,7 +137,7 @@ const ( insertIntoConflictErrorData = ` INSERT INTO %s.` + ConflictErrorTableName + ` - (task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row, is_data_kv) + (task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row, kv_type) VALUES ` @@ -142,29 +145,30 @@ const ( insertIntoConflictErrorIndex = ` INSERT INTO %s.` + ConflictErrorTableName + ` - (task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row, is_data_kv) + (task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row, kv_type) VALUES ` sqlValuesConflictErrorIndex = "(?,?,?,?,?,?,?,?,?,?)" selectIndexConflictKeysReplace = ` - SELECT raw_key, index_name, raw_value, raw_handle + SELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM %s.` + ConflictErrorTableName + ` - WHERE table_name = ? AND is_data_kv = 0 - ORDER BY raw_key; + WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? 
+ ORDER BY _tidb_rowid LIMIT ?; ` selectDataConflictKeysReplace = ` - SELECT raw_key, raw_value + SELECT _tidb_rowid, raw_key, raw_value FROM %s.` + ConflictErrorTableName + ` - WHERE table_name = ? AND is_data_kv = 1 - ORDER BY raw_key; + WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? + ORDER BY _tidb_rowid LIMIT ?; ` deleteNullDataRow = ` DELETE FROM %s.` + ConflictErrorTableName + ` - WHERE key_data = '' and row_data = ''; + WHERE kv_type = 2 + LIMIT ?; ` insertIntoDupRecord = ` @@ -484,197 +488,317 @@ func (em *ErrorManager) ReplaceConflictKeys( tableName string, pool *util.WorkerPool, fnGetLatest func(ctx context.Context, key []byte) ([]byte, error), - fnDeleteKey func(ctx context.Context, key []byte) error, + fnDeleteKeys func(ctx context.Context, key [][]byte) error, ) error { if em.db == nil { return nil } - sessionOpts := encode.SessionOptions{ - // TODO: need to find the correct value for SQLMode - SQLMode: mysql.ModeStrictAllTables, - } - encoder, err := kv.NewBaseKVEncoder(&encode.EncodingConfig{ - Table: tbl, - SessionOptions: sessionOpts, - Logger: em.logger, - }) - if err != nil { - return errors.Trace(err) - } - - g, gCtx := errgroup.WithContext(ctx) - exec := common.SQLWithRetry{ DB: em.db, Logger: em.logger, HideQueryLog: redact.NeedRedact(), } - pool.ApplyOnErrorGroup(g, func() error { - // TODO: provide a detailed document to explain the algorithm and link it here - // demo for "replace" algorithm: https://github.com/lyzx2001/tidb-conflict-replace - // check index KV - indexKvRows, err := em.db.QueryContext( - gCtx, common.SprintfWithIdentifiers(selectIndexConflictKeysReplace, em.schema), - tableName) - if err != nil { - return errors.Trace(err) - } - defer indexKvRows.Close() - for indexKvRows.Next() { - var rawKey, rawValue, rawHandle []byte - var indexName string - if err := indexKvRows.Scan(&rawKey, &indexName, &rawValue, &rawHandle); err != nil { - return errors.Trace(err) - } - em.logger.Debug("got raw_key, index_name, raw_value, raw_handle from table", - zap.Binary("raw_key", rawKey), - zap.String("index_name", indexName), - zap.Binary("raw_value", rawValue), - zap.Binary("raw_handle", rawHandle)) - - // get the latest value of rawKey from downstream TiDB - latestValue, err := fnGetLatest(gCtx, rawKey) - if tikverr.IsErrNotFound(err) { - continue + const rowLimit = 1000 + indexTaskCh := make(chan [2]int64) + indexTaskWg := &sync.WaitGroup{} + indexG, indexGCtx := errgroup.WithContext(ctx) + + go func() { + //nolint:staticcheck + //lint:ignore SA2000 + indexTaskWg.Add(1) + indexTaskCh <- [2]int64{0, math.MaxInt64} + indexTaskWg.Wait() + close(indexTaskCh) + }() + + // TODO: provide a detailed document to explain the algorithm and link it here + // demo for "replace" algorithm: https://github.com/lyzx2001/tidb-conflict-replace + // check index KV + for t := range indexTaskCh { + start, end := t[0], t[1] + pool.ApplyOnErrorGroup(indexG, func() error { + defer indexTaskWg.Done() + + sessionOpts := encode.SessionOptions{ + // TODO: need to find the correct value for SQLMode + SQLMode: mysql.ModeStrictAllTables, } + encoder, err := kv.NewBaseKVEncoder(&encode.EncodingConfig{ + Table: tbl, + SessionOptions: sessionOpts, + Logger: em.logger, + }) if err != nil { return errors.Trace(err) } - // if the latest value of rawKey equals to rawValue, that means this index KV is maintained in downstream TiDB - // if not, that means this index KV has been overwritten, and its corresponding data KV needs to be deleted - if bytes.Equal(rawValue, 
latestValue) { - continue - } + var handleKeys [][]byte + var insertRows [][2][]byte + for start < end { + indexKvRows, err := em.db.QueryContext( + indexGCtx, common.SprintfWithIdentifiers(selectIndexConflictKeysReplace, em.schema), + tableName, start, end, rowLimit) + if err != nil { + return errors.Trace(err) + } - // rawHandle is the row key of the data KV that needs to be deleted - // get the latest value of the row key of the data KV that needs to be deleted - overwritten, err := fnGetLatest(gCtx, rawHandle) - // if the latest value cannot be found, that means the data KV has been deleted - if tikverr.IsErrNotFound(err) { - continue - } - if err != nil { - return errors.Trace(err) - } + var lastRowID int64 + for indexKvRows.Next() { + var rawKey, rawValue, rawHandle []byte + var indexName string + if err := indexKvRows.Scan(&lastRowID, &rawKey, &indexName, &rawValue, &rawHandle); err != nil { + return errors.Trace(err) + } + em.logger.Debug("got raw_key, index_name, raw_value, raw_handle from table", + zap.Binary("raw_key", rawKey), + zap.String("index_name", indexName), + zap.Binary("raw_value", rawValue), + zap.Binary("raw_handle", rawHandle)) + + // get the latest value of rawKey from downstream TiDB + latestValue, err := fnGetLatest(indexGCtx, rawKey) + if tikverr.IsErrNotFound(err) { + continue + } + if err != nil { + return errors.Trace(err) + } - overwrittenHandle, err := tablecodec.DecodeRowKey(rawHandle) - if err != nil { - return errors.Trace(err) - } - decodedData, _, err := tables.DecodeRawRowData(encoder.SessionCtx, - tbl.Meta(), overwrittenHandle, tbl.Cols(), overwritten) - if err != nil { - return errors.Trace(err) - } - if !tbl.Meta().HasClusteredIndex() { - // for nonclustered PK, need to append handle to decodedData for AddRecord - decodedData = append(decodedData, types.NewIntDatum(overwrittenHandle.IntValue())) - } - _, err = encoder.Table.AddRecord(encoder.SessionCtx.GetTableCtx(), decodedData) - if err != nil { - return errors.Trace(err) - } + // if the latest value of rawKey equals to rawValue, that means this index KV is maintained in downstream TiDB + // if not, that means this index KV has been overwritten, and its corresponding data KV needs to be deleted + if bytes.Equal(rawValue, latestValue) { + continue + } - // find out all the KV pairs that are contained in the data KV - kvPairs := encoder.SessionCtx.TakeKvPairs() - - for _, kvPair := range kvPairs.Pairs { - em.logger.Debug("got encoded KV", - logutil.Key("key", kvPair.Key), - zap.Binary("value", kvPair.Val), - logutil.Key("rawKey", rawKey), - zap.Binary("rawValue", rawValue)) - - // If rawKey equals to KV pair's key and rawValue equals to KV pair's value, - // this latest data KV of the index KV needs to be deleted; - // if not, this latest data KV of the index KV was inserted by other rows, - // so it is unrelated to the index KV that needs to be deleted, we cannot delete it. - - // An example is: - // (pk, uk) - // (1, a) - // (1, b) - // (2, a) - - // (1, a) is overwritten by (2, a). We found a->1 is an overwritten index KV, - // and we are considering if its data KV with key "1" can be deleted. - // We got the latest value of key "1" which is (1, b), - // and encode it to get all KV pairs which is [1->b, b->1]. - // Only if there is a->1 we dare to delete data KV with key "1". 
- - if bytes.Equal(kvPair.Key, rawKey) && bytes.Equal(kvPair.Val, rawValue) { - if err := exec.Transact(ctx, "insert data conflict error record for conflict detection 'replace' mode", - func(c context.Context, txn *sql.Tx) error { - sb := &strings.Builder{} - _, err2 := common.FprintfWithIdentifiers(sb, insertIntoConflictErrorData, em.schema) - if err2 != nil { - return errors.Trace(err2) + // rawHandle is the row key of the data KV that needs to be deleted + // get the latest value of the row key of the data KV that needs to be deleted + overwritten, err := fnGetLatest(indexGCtx, rawHandle) + // if the latest value cannot be found, that means the data KV has been deleted + if tikverr.IsErrNotFound(err) { + continue + } + if err != nil { + return errors.Trace(err) + } + + overwrittenHandle, err := tablecodec.DecodeRowKey(rawHandle) + if err != nil { + return errors.Trace(err) + } + decodedData, _, err := tables.DecodeRawRowData(encoder.SessionCtx, + tbl.Meta(), overwrittenHandle, tbl.Cols(), overwritten) + if err != nil { + return errors.Trace(err) + } + if !tbl.Meta().HasClusteredIndex() { + // for nonclustered PK, need to append handle to decodedData for AddRecord + decodedData = append(decodedData, types.NewIntDatum(overwrittenHandle.IntValue())) + } + _, err = encoder.Table.AddRecord(encoder.SessionCtx.GetTableCtx(), decodedData) + if err != nil { + return errors.Trace(err) + } + + // find out all the KV pairs that are contained in the data KV + kvPairs := encoder.SessionCtx.TakeKvPairs() + + for _, kvPair := range kvPairs.Pairs { + em.logger.Debug("got encoded KV", + logutil.Key("key", kvPair.Key), + zap.Binary("value", kvPair.Val), + logutil.Key("rawKey", rawKey), + zap.Binary("rawValue", rawValue)) + + // If rawKey equals to KV pair's key and rawValue equals to KV pair's value, + // this latest data KV of the index KV needs to be deleted; + // if not, this latest data KV of the index KV was inserted by other rows, + // so it is unrelated to the index KV that needs to be deleted, we cannot delete it. + + // An example is: + // (pk, uk) + // (1, a) + // (1, b) + // (2, a) + + // (1, a) is overwritten by (2, a). We found a->1 is an overwritten index KV, + // and we are considering if its data KV with key "1" can be deleted. + // We got the latest value of key "1" which is (1, b), + // and encode it to get all KV pairs which is [1->b, b->1]. + // Only if there is a->1 we dare to delete data KV with key "1". 
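+						// To complete the example: the latest value of data KV "1" is (1, b),
+						// whose encoded pairs [1->b, b->1] do not contain a->1, so data KV "1"
+						// now belongs to the surviving row (1, b) and must not be deleted.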
+ + if bytes.Equal(kvPair.Key, rawKey) && bytes.Equal(kvPair.Val, rawValue) { + handleKeys = append(handleKeys, rawHandle) + var insertRow [2][]byte + insertRow[0] = rawHandle + insertRow[1] = overwritten + insertRows = append(insertRows, insertRow) + break + } + } + } + if err := indexKvRows.Err(); err != nil { + _ = indexKvRows.Close() + return errors.Trace(err) + } + if err := indexKvRows.Close(); err != nil { + return errors.Trace(err) + } + if len(handleKeys) == 0 { + break + } + if err := fnDeleteKeys(indexGCtx, handleKeys); err != nil { + return errors.Trace(err) + } + if err := exec.Transact(ctx, "insert data conflict record for conflict detection 'replace' mode", + func(c context.Context, txn *sql.Tx) error { + sb := &strings.Builder{} + _, err2 := common.FprintfWithIdentifiers(sb, insertIntoConflictErrorData, em.schema) + if err2 != nil { + return errors.Trace(err2) + } + var sqlArgs []any + for i, insertRow := range insertRows { + if i > 0 { + sb.WriteByte(',') } - var sqlArgs []any sb.WriteString(sqlValuesConflictErrorData) sqlArgs = append(sqlArgs, em.taskID, tableName, nil, nil, - rawHandle, - overwritten, - 1, + insertRow[0], + insertRow[1], + 2, ) - _, err := txn.ExecContext(c, sb.String(), sqlArgs...) - return errors.Trace(err) - }); err != nil { - return errors.Trace(err) - } - if err := fnDeleteKey(gCtx, rawHandle); err != nil { + } + _, err := txn.ExecContext(c, sb.String(), sqlArgs...) return errors.Trace(err) + }); err != nil { + return errors.Trace(err) + } + start = lastRowID + 1 + // If the remaining tasks cannot be processed at once, split the task + // into two subtasks and send one of them to the other idle worker if possible. + if end-start > rowLimit { + mid := start + (end-start)/2 + indexTaskWg.Add(1) + select { + case indexTaskCh <- [2]int64{mid, end}: + end = mid + default: + indexTaskWg.Done() } - break } + handleKeys = handleKeys[:0] } - } - if err := indexKvRows.Err(); err != nil { - return errors.Trace(err) - } - - // check data KV - dataKvRows, err := em.db.QueryContext( - gCtx, common.SprintfWithIdentifiers(selectDataConflictKeysReplace, em.schema), - tableName) - if err != nil { - return errors.Trace(err) - } - defer dataKvRows.Close() - - var previousRawKey, latestValue []byte - var mustKeepKvPairs *kv.Pairs + return nil + }) + } + if err := indexG.Wait(); err != nil { + return errors.Trace(err) + } - for dataKvRows.Next() { - var rawKey, rawValue []byte - if err := dataKvRows.Scan(&rawKey, &rawValue); err != nil { + dataTaskCh := make(chan [2]int64) + dataTaskWg := &sync.WaitGroup{} + dataG, dataGCtx := errgroup.WithContext(ctx) + + go func() { + //nolint:staticcheck + //lint:ignore SA2000 + dataTaskWg.Add(1) + dataTaskCh <- [2]int64{0, math.MaxInt64} + dataTaskWg.Wait() + close(dataTaskCh) + }() + + // check data KV + for t := range dataTaskCh { + start, end := t[0], t[1] + pool.ApplyOnErrorGroup(dataG, func() error { + defer dataTaskWg.Done() + + sessionOpts := encode.SessionOptions{ + // TODO: need to find the correct value for SQLMode + SQLMode: mysql.ModeStrictAllTables, + } + encoder, err := kv.NewBaseKVEncoder(&encode.EncodingConfig{ + Table: tbl, + SessionOptions: sessionOpts, + Logger: em.logger, + }) + if err != nil { return errors.Trace(err) } - em.logger.Debug("got group raw_key, raw_value from table", - logutil.Key("raw_key", rawKey), - zap.Binary("raw_value", rawValue)) - - if !bytes.Equal(rawKey, previousRawKey) { - previousRawKey = rawKey - // get the latest value of rawKey from downstream TiDB - latestValue, err = 
fnGetLatest(gCtx, rawKey) - if err != nil && !tikverr.IsErrNotFound(err) { + + var handleKeys [][]byte + for start < end { + dataKvRows, err := em.db.QueryContext( + dataGCtx, common.SprintfWithIdentifiers(selectDataConflictKeysReplace, em.schema), + tableName, start, end, rowLimit) + if err != nil { return errors.Trace(err) } - if latestValue != nil { + + var lastRowID int64 + var previousRawKey, latestValue []byte + var mustKeepKvPairs *kv.Pairs + + for dataKvRows.Next() { + var rawKey, rawValue []byte + if err := dataKvRows.Scan(&lastRowID, &rawKey, &rawValue); err != nil { + return errors.Trace(err) + } + em.logger.Debug("got group raw_key, raw_value from table", + logutil.Key("raw_key", rawKey), + zap.Binary("raw_value", rawValue)) + + if !bytes.Equal(rawKey, previousRawKey) { + previousRawKey = rawKey + // get the latest value of rawKey from downstream TiDB + latestValue, err = fnGetLatest(dataGCtx, rawKey) + if err != nil && !tikverr.IsErrNotFound(err) { + return errors.Trace(err) + } + if latestValue != nil { + handle, err := tablecodec.DecodeRowKey(rawKey) + if err != nil { + return errors.Trace(err) + } + decodedData, _, err := tables.DecodeRawRowData(encoder.SessionCtx, + tbl.Meta(), handle, tbl.Cols(), latestValue) + if err != nil { + return errors.Trace(err) + } + if !tbl.Meta().HasClusteredIndex() { + // for nonclustered PK, need to append handle to decodedData for AddRecord + decodedData = append(decodedData, types.NewIntDatum(handle.IntValue())) + } + _, err = encoder.Table.AddRecord(encoder.SessionCtx.GetTableCtx(), decodedData) + if err != nil { + return errors.Trace(err) + } + // calculate the new mustKeepKvPairs corresponding to the new rawKey + // find out all the KV pairs that are contained in the data KV + mustKeepKvPairs = encoder.SessionCtx.TakeKvPairs() + } + } + + // if the latest value of rawKey equals to rawValue, that means this data KV is maintained in downstream TiDB + // if not, that means this data KV has been deleted due to overwritten index KV + if bytes.Equal(rawValue, latestValue) { + continue + } + handle, err := tablecodec.DecodeRowKey(rawKey) if err != nil { return errors.Trace(err) } decodedData, _, err := tables.DecodeRawRowData(encoder.SessionCtx, - tbl.Meta(), handle, tbl.Cols(), latestValue) + tbl.Meta(), handle, tbl.Cols(), rawValue) if err != nil { return errors.Trace(err) } @@ -686,76 +810,78 @@ func (em *ErrorManager) ReplaceConflictKeys( if err != nil { return errors.Trace(err) } - // calculate the new mustKeepKvPairs corresponding to the new rawKey - // find out all the KV pairs that are contained in the data KV - mustKeepKvPairs = encoder.SessionCtx.TakeKvPairs() - } - } - - // if the latest value of rawKey equals to rawValue, that means this data KV is maintained in downstream TiDB - // if not, that means this data KV has been deleted due to overwritten index KV - if bytes.Equal(rawValue, latestValue) { - continue - } - handle, err := tablecodec.DecodeRowKey(rawKey) - if err != nil { - return errors.Trace(err) - } - decodedData, _, err := tables.DecodeRawRowData(encoder.SessionCtx, - tbl.Meta(), handle, tbl.Cols(), rawValue) - if err != nil { - return errors.Trace(err) - } - if !tbl.Meta().HasClusteredIndex() { - // for nonclustered PK, need to append handle to decodedData for AddRecord - decodedData = append(decodedData, types.NewIntDatum(handle.IntValue())) - } - _, err = encoder.Table.AddRecord(encoder.SessionCtx.GetTableCtx(), decodedData) - if err != nil { - return errors.Trace(err) - } + // find out all the KV pairs that are 
contained in the data KV + kvPairs := encoder.SessionCtx.TakeKvPairs() + for _, kvPair := range kvPairs.Pairs { + em.logger.Debug("got encoded KV", + logutil.Key("key", kvPair.Key), + zap.Binary("value", kvPair.Val)) + kvLatestValue, err := fnGetLatest(dataGCtx, kvPair.Key) + if tikverr.IsErrNotFound(err) { + continue + } + if err != nil { + return errors.Trace(err) + } + + // if the value of the KV pair is not equal to the latest value of the key of the KV pair + // that means the value of the KV pair has been overwritten, so it needs no extra operation + if !bytes.Equal(kvLatestValue, kvPair.Val) { + continue + } + + // if the KV pair is contained in mustKeepKvPairs, we cannot delete it + // if not, delete the KV pair + if mustKeepKvPairs != nil { + isContained := slices.ContainsFunc(mustKeepKvPairs.Pairs, func(mustKeepKvPair common.KvPair) bool { + return bytes.Equal(mustKeepKvPair.Key, kvPair.Key) && bytes.Equal(mustKeepKvPair.Val, kvPair.Val) + }) + if isContained { + continue + } + } - // find out all the KV pairs that are contained in the data KV - kvPairs := encoder.SessionCtx.TakeKvPairs() - for _, kvPair := range kvPairs.Pairs { - em.logger.Debug("got encoded KV", - logutil.Key("key", kvPair.Key), - zap.Binary("value", kvPair.Val)) - kvLatestValue, err := fnGetLatest(gCtx, kvPair.Key) - if tikverr.IsErrNotFound(err) { - continue + handleKeys = append(handleKeys, kvPair.Key) + } } - if err != nil { + if err := dataKvRows.Err(); err != nil { + _ = dataKvRows.Close() return errors.Trace(err) } - - // if the value of the KV pair is not equal to the latest value of the key of the KV pair - // that means the value of the KV pair has been overwritten, so it needs no extra operation - if !bytes.Equal(kvLatestValue, kvPair.Val) { - continue + if err := dataKvRows.Close(); err != nil { + return errors.Trace(err) } - - // if the KV pair is contained in mustKeepKvPairs, we cannot delete it - // if not, delete the KV pair - if mustKeepKvPairs != nil { - isContained := slices.ContainsFunc(mustKeepKvPairs.Pairs, func(mustKeepKvPair common.KvPair) bool { - return bytes.Equal(mustKeepKvPair.Key, kvPair.Key) && bytes.Equal(mustKeepKvPair.Val, kvPair.Val) - }) - if isContained { - continue - } + if len(handleKeys) == 0 { + break } - - if err := fnDeleteKey(gCtx, kvPair.Key); err != nil { + if err := fnDeleteKeys(dataGCtx, handleKeys); err != nil { return errors.Trace(err) } + start = lastRowID + 1 + // If the remaining tasks cannot be processed at once, split the task + // into two subtasks and send one of them to the other idle worker if possible. 
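+				// For example, with rowLimit = 1000, a worker that owns [0, 100000) and has
+				// just finished the batch ending at _tidb_rowid 1000 sets start to 1001,
+				// computes mid = 50500, and offers [50500, 100000) to an idle worker while
+				// keeping [1001, 50500); if the non-blocking send fails, it keeps the whole range.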
+ if end-start > rowLimit { + mid := start + (end-start)/2 + dataTaskWg.Add(1) + select { + case dataTaskCh <- [2]int64{mid, end}: + end = mid + default: + dataTaskWg.Done() + } + } + handleKeys = handleKeys[:0] } - } - if err := dataKvRows.Err(); err != nil { - return errors.Trace(err) - } + return nil + }) + } + if err := dataG.Wait(); err != nil { + return errors.Trace(err) + } + hasRow := true + for { // delete the additionally inserted rows for nonclustered PK if err := exec.Transact(ctx, "delete additionally inserted rows for conflict detection 'replace' mode", func(c context.Context, txn *sql.Tx) error { @@ -764,16 +890,27 @@ func (em *ErrorManager) ReplaceConflictKeys( if err2 != nil { return errors.Trace(err2) } - _, err := txn.ExecContext(c, sb.String()) - return errors.Trace(err) + result, err := txn.ExecContext(c, sb.String(), rowLimit) + if err != nil { + return errors.Trace(err) + } + affected, err := result.RowsAffected() + if err != nil { + return errors.Trace(err) + } + if affected == 0 { + hasRow = false + } + return nil }); err != nil { return errors.Trace(err) } + if !hasRow { + break + } + } - return nil - }) - - return errors.Trace(g.Wait()) + return nil } // RecordDuplicateCount reduce the counter of "duplicate entry" errors. diff --git a/pkg/lightning/errormanager/errormanager_test.go b/pkg/lightning/errormanager/errormanager_test.go index 2b79b4eb54531..0008ea32a037a 100644 --- a/pkg/lightning/errormanager/errormanager_test.go +++ b/pkg/lightning/errormanager/errormanager_test.go @@ -65,7 +65,7 @@ func TestInit(t *testing.T) { em.conflictV1Enabled = true mock.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_errors`;"). WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v2.*"). + mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v3.*"). WillReturnResult(sqlmock.NewResult(2, 1)) err = em.Init(ctx) require.NoError(t, err) @@ -77,7 +77,7 @@ func TestInit(t *testing.T) { WillReturnResult(sqlmock.NewResult(5, 1)) mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.type_error_v1.*"). WillReturnResult(sqlmock.NewResult(6, 1)) - mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v2.*"). + mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v3.*"). WillReturnResult(sqlmock.NewResult(7, 1)) mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_records.*"). WillReturnResult(sqlmock.NewResult(7, 1)) @@ -286,17 +286,21 @@ func TestReplaceConflictOneKey(t *testing.T) { mockDB.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_task_info`"). WillReturnResult(sqlmock.NewResult(1, 1)) - mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v2.*"). + mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v3.*"). WillReturnResult(sqlmock.NewResult(2, 1)) - mockDB.ExpectQuery("\\QSELECT raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v2 WHERE table_name = ? AND is_data_kv = 0 ORDER BY raw_key\\E"). - WillReturnRows(sqlmock.NewRows([]string{"raw_key", "index_name", "raw_value", "raw_handle"})) - mockDB.ExpectQuery("\\QSELECT raw_key, raw_value FROM `lightning_task_info`.conflict_error_v2 WHERE table_name = ? AND is_data_kv = 1 ORDER BY raw_key\\E"). - WillReturnRows(sqlmock.NewRows([]string{"raw_key", "raw_value"}). - AddRow(data1RowKey, data1RowValue). 
- AddRow(data1RowKey, data2RowValue)) + mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"})) + mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"}). + AddRow(1, data1RowKey, data1RowValue). + AddRow(2, data1RowKey, data2RowValue)) + for i := 0; i < 2; i++ { + mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"})) + } mockDB.ExpectBegin() - mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v2.*"). - WillReturnResult(driver.ResultNoRows) + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). + WillReturnResult(sqlmock.NewResult(0, 0)) mockDB.ExpectCommit() cfg := config.NewConfig() @@ -323,10 +327,12 @@ func TestReplaceConflictOneKey(t *testing.T) { return nil, fmt.Errorf("key %v is not expected", key) } }, - func(ctx context.Context, key []byte) error { - fnDeleteKeyCount.Add(1) - if !bytes.Equal(key, data1IndexKey) { - return fmt.Errorf("key %v is not expected", key) + func(ctx context.Context, keys [][]byte) error { + fnDeleteKeyCount.Add(int32(len(keys))) + for _, key := range keys { + if !bytes.Equal(key, data1IndexKey) { + return fmt.Errorf("key %v is not expected", key) + } } return nil }, @@ -477,31 +483,39 @@ func TestReplaceConflictOneUniqueKey(t *testing.T) { mockDB.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_task_info`"). WillReturnResult(sqlmock.NewResult(1, 1)) - mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v2.*"). + mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v3.*"). WillReturnResult(sqlmock.NewResult(2, 1)) - mockDB.ExpectQuery("\\QSELECT raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v2 WHERE table_name = ? AND is_data_kv = 0 ORDER BY raw_key\\E"). - WillReturnRows(sqlmock.NewRows([]string{"raw_key", "index_name", "raw_value", "raw_handle"}). - AddRow(data1IndexKey, "uni_b", data1IndexValue, data1RowKey). - AddRow(data1IndexKey, "uni_b", data2IndexValue, data2RowKey). - AddRow(data3IndexKey, "uni_b", data3IndexValue, data3RowKey). - AddRow(data3IndexKey, "uni_b", data4IndexValue, data4RowKey)) + mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"}). + AddRow(1, data1IndexKey, "uni_b", data1IndexValue, data1RowKey). + AddRow(2, data1IndexKey, "uni_b", data2IndexValue, data2RowKey). + AddRow(3, data3IndexKey, "uni_b", data3IndexValue, data3RowKey). 
+ AddRow(4, data3IndexKey, "uni_b", data4IndexValue, data4RowKey)) mockDB.ExpectBegin() - mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v2.*"). - WithArgs(0, "test", nil, nil, data2RowKey, data2RowValue, 1). + mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v3.*"). + WithArgs(0, "test", nil, nil, data2RowKey, data2RowValue, 2, + 0, "test", nil, nil, data4RowKey, data4RowValue, 2). WillReturnResult(driver.ResultNoRows) mockDB.ExpectCommit() + for i := 0; i < 2; i++ { + mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"})) + } + mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"}). + AddRow(1, data1RowKey, data1RowValue). + AddRow(2, data1RowKey, data3RowValue)) + for i := 0; i < 2; i++ { + mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"})) + } mockDB.ExpectBegin() - mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v2.*"). - WithArgs(0, "test", nil, nil, data4RowKey, data4RowValue, 1). - WillReturnResult(driver.ResultNoRows) + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). + WillReturnResult(sqlmock.NewResult(0, 2)) mockDB.ExpectCommit() - mockDB.ExpectQuery("\\QSELECT raw_key, raw_value FROM `lightning_task_info`.conflict_error_v2 WHERE table_name = ? AND is_data_kv = 1 ORDER BY raw_key\\E"). - WillReturnRows(sqlmock.NewRows([]string{"raw_key", "raw_value"}). - AddRow(data1RowKey, data1RowValue). - AddRow(data1RowKey, data3RowValue)) mockDB.ExpectBegin() - mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v2.*"). - WillReturnResult(driver.ResultNoRows) + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). 
+ WillReturnResult(sqlmock.NewResult(0, 0)) mockDB.ExpectCommit() cfg := config.NewConfig() @@ -534,10 +548,12 @@ func TestReplaceConflictOneUniqueKey(t *testing.T) { return nil, fmt.Errorf("key %v is not expected", key) } }, - func(ctx context.Context, key []byte) error { - fnDeleteKeyCount.Add(1) - if !bytes.Equal(key, data1IndexKey) && !bytes.Equal(key, data2RowKey) && !bytes.Equal(key, data4RowKey) { - return fmt.Errorf("key %v is not expected", key) + func(ctx context.Context, keys [][]byte) error { + fnDeleteKeyCount.Add(int32(len(keys))) + for _, key := range keys { + if !bytes.Equal(key, data1IndexKey) && !bytes.Equal(key, data2RowKey) && !bytes.Equal(key, data4RowKey) { + return fmt.Errorf("key %v is not expected", key) + } } return nil }, @@ -653,7 +669,7 @@ func TestErrorMgrErrorOutput(t *testing.T) { "|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`type_error_v1` \x1b[0m|\n" + "|\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m|\n" + "|\x1b[31m 3 \x1b[0m|\x1b[31m Charset Error \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m \x1b[0m|\n" + - "|\x1b[31m 4 \x1b[0m|\x1b[31m Unique Key Conflict \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`conflict_error_v2` \x1b[0m|\n" + + "|\x1b[31m 4 \x1b[0m|\x1b[31m Unique Key Conflict \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`conflict_error_v3` \x1b[0m|\n" + "+---+---------------------+-------------+----------------------------------+\n" require.Equal(t, expected, output) diff --git a/pkg/lightning/errormanager/resolveconflict_test.go b/pkg/lightning/errormanager/resolveconflict_test.go index 4d09217f3f3b0..11020f005f0e9 100644 --- a/pkg/lightning/errormanager/resolveconflict_test.go +++ b/pkg/lightning/errormanager/resolveconflict_test.go @@ -168,31 +168,39 @@ func TestReplaceConflictMultipleKeysNonclusteredPk(t *testing.T) { mockDB.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_task_info`"). WillReturnResult(sqlmock.NewResult(1, 1)) - mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v2.*"). + mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v3.*"). WillReturnResult(sqlmock.NewResult(2, 1)) - mockDB.ExpectQuery("\\QSELECT raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v2 WHERE table_name = ? AND is_data_kv = 0 ORDER BY raw_key\\E"). - WillReturnRows(sqlmock.NewRows([]string{"raw_key", "index_name", "raw_value", "raw_handle"}). - AddRow(data2RowKey, "PRIMARY", data2RowValue, data1RowKey). - AddRow(data2RowKey, "PRIMARY", data3NonclusteredValue, data2NonclusteredKey). - AddRow(data6RowKey, "PRIMARY", data6RowValue, data5RowKey). - AddRow(data6RowKey, "PRIMARY", data7NonclusteredValue, data6NonclusteredKey)) + mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"}). + AddRow(1, data2RowKey, "PRIMARY", data2RowValue, data1RowKey). + AddRow(2, data2RowKey, "PRIMARY", data3NonclusteredValue, data2NonclusteredKey). + AddRow(3, data6RowKey, "PRIMARY", data6RowValue, data5RowKey). 
+ AddRow(4, data6RowKey, "PRIMARY", data7NonclusteredValue, data6NonclusteredKey)) mockDB.ExpectBegin() - mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v2.*"). - WithArgs(0, "a", nil, nil, data2NonclusteredKey, data2NonclusteredValue, 1). + mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v3.*"). + WithArgs(0, "a", nil, nil, data2NonclusteredKey, data2NonclusteredValue, 2, + 0, "a", nil, nil, data6NonclusteredKey, data6NonclusteredValue, 2). WillReturnResult(driver.ResultNoRows) mockDB.ExpectCommit() + for i := 0; i < 2; i++ { + mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"})) + } + mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"}). + AddRow(1, data2NonclusteredKey, data2NonclusteredValue). + AddRow(2, data6NonclusteredKey, data6NonclusteredValue)) + for i := 0; i < 2; i++ { + mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"})) + } mockDB.ExpectBegin() - mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v2.*"). - WithArgs(0, "a", nil, nil, data6NonclusteredKey, data6NonclusteredValue, 1). - WillReturnResult(driver.ResultNoRows) + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). + WillReturnResult(sqlmock.NewResult(0, 2)) mockDB.ExpectCommit() - mockDB.ExpectQuery("\\QSELECT raw_key, raw_value FROM `lightning_task_info`.conflict_error_v2 WHERE table_name = ? AND is_data_kv = 1 ORDER BY raw_key\\E"). - WillReturnRows(sqlmock.NewRows([]string{"raw_key", "raw_value"}). - AddRow(data2NonclusteredKey, data2NonclusteredValue). - AddRow(data6NonclusteredKey, data6NonclusteredValue)) mockDB.ExpectBegin() - mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v2.*"). - WillReturnResult(driver.ResultNoRows) + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). 
+ WillReturnResult(sqlmock.NewResult(0, 0)) mockDB.ExpectCommit() cfg := config.NewConfig() @@ -237,10 +245,12 @@ func TestReplaceConflictMultipleKeysNonclusteredPk(t *testing.T) { return nil, fmt.Errorf("key %v is not expected", key) } }, - func(ctx context.Context, key []byte) error { - fnDeleteKeyCount.Add(1) - if !bytes.Equal(key, data2NonclusteredKey) && !bytes.Equal(key, data6NonclusteredKey) && !bytes.Equal(key, data2IndexKey) && !bytes.Equal(key, data3RowKey) && !bytes.Equal(key, data6IndexKey) && !bytes.Equal(key, data7RowKey) { - return fmt.Errorf("key %v is not expected", key) + func(ctx context.Context, keys [][]byte) error { + fnDeleteKeyCount.Add(int32(len(keys))) + for _, key := range keys { + if !bytes.Equal(key, data2NonclusteredKey) && !bytes.Equal(key, data6NonclusteredKey) && !bytes.Equal(key, data2IndexKey) && !bytes.Equal(key, data3RowKey) && !bytes.Equal(key, data6IndexKey) && !bytes.Equal(key, data7RowKey) { + return fmt.Errorf("key %v is not expected", key) + } } return nil }, @@ -342,23 +352,35 @@ func TestReplaceConflictOneKeyNonclusteredPk(t *testing.T) { mockDB.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_task_info`"). WillReturnResult(sqlmock.NewResult(1, 1)) - mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v2.*"). + mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v3.*"). WillReturnResult(sqlmock.NewResult(2, 1)) - mockDB.ExpectQuery("\\QSELECT raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v2 WHERE table_name = ? AND is_data_kv = 0 ORDER BY raw_key\\E"). - WillReturnRows(sqlmock.NewRows([]string{"raw_key", "index_name", "raw_value", "raw_handle"}). - AddRow(data3IndexKey, "PRIMARY", data3IndexValue, data3RowKey). - AddRow(data3IndexKey, "PRIMARY", data4IndexValue, data4RowKey)) + mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"}). + AddRow(1, data3IndexKey, "PRIMARY", data3IndexValue, data3RowKey). + AddRow(2, data3IndexKey, "PRIMARY", data4IndexValue, data4RowKey)) mockDB.ExpectBegin() - mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v2.*"). - WithArgs(0, "a", nil, nil, data4RowKey, data4RowValue, 1). + mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v3.*"). + WithArgs(0, "a", nil, nil, data4RowKey, data4RowValue, 2). WillReturnResult(driver.ResultNoRows) mockDB.ExpectCommit() - mockDB.ExpectQuery("\\QSELECT raw_key, raw_value FROM `lightning_task_info`.conflict_error_v2 WHERE table_name = ? AND is_data_kv = 1 ORDER BY raw_key\\E"). - WillReturnRows(sqlmock.NewRows([]string{"raw_key", "raw_value"}). - AddRow(data4RowKey, data4RowValue)) + for i := 0; i < 2; i++ { + mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"})) + } + mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? 
@@ -342,23 +352,35 @@ func TestReplaceConflictOneKeyNonclusteredPk(t *testing.T) {
 	mockDB.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_task_info`").
 		WillReturnResult(sqlmock.NewResult(1, 1))
-	mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v2.*").
+	mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v3.*").
 		WillReturnResult(sqlmock.NewResult(2, 1))
-	mockDB.ExpectQuery("\\QSELECT raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v2 WHERE table_name = ? AND is_data_kv = 0 ORDER BY raw_key\\E").
-		WillReturnRows(sqlmock.NewRows([]string{"raw_key", "index_name", "raw_value", "raw_handle"}).
-			AddRow(data3IndexKey, "PRIMARY", data3IndexValue, data3RowKey).
-			AddRow(data3IndexKey, "PRIMARY", data4IndexValue, data4RowKey))
+	mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E").
+		WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"}).
+			AddRow(1, data3IndexKey, "PRIMARY", data3IndexValue, data3RowKey).
+			AddRow(2, data3IndexKey, "PRIMARY", data4IndexValue, data4RowKey))
 	mockDB.ExpectBegin()
-	mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v2.*").
-		WithArgs(0, "a", nil, nil, data4RowKey, data4RowValue, 1).
+	mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v3.*").
+		WithArgs(0, "a", nil, nil, data4RowKey, data4RowValue, 2).
 		WillReturnResult(driver.ResultNoRows)
 	mockDB.ExpectCommit()
-	mockDB.ExpectQuery("\\QSELECT raw_key, raw_value FROM `lightning_task_info`.conflict_error_v2 WHERE table_name = ? AND is_data_kv = 1 ORDER BY raw_key\\E").
-		WillReturnRows(sqlmock.NewRows([]string{"raw_key", "raw_value"}).
-			AddRow(data4RowKey, data4RowValue))
+	for i := 0; i < 2; i++ {
+		mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E").
+			WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"}))
+	}
+	mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E").
+		WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"}).
+			AddRow(1, data4RowKey, data4RowValue))
+	for i := 0; i < 2; i++ {
+		mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E").
+			WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"}))
+	}
+	mockDB.ExpectBegin()
+	mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*").
+		WillReturnResult(sqlmock.NewResult(0, 1))
+	mockDB.ExpectCommit()
 	mockDB.ExpectBegin()
-	mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v2.*").
-		WillReturnResult(driver.ResultNoRows)
+	mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*").
+		WillReturnResult(sqlmock.NewResult(0, 0))
 	mockDB.ExpectCommit()
 
 	cfg := config.NewConfig()
@@ -390,10 +412,12 @@ func TestReplaceConflictOneKeyNonclusteredPk(t *testing.T) {
 				return nil, fmt.Errorf("key %v is not expected", key)
 			}
 		},
-		func(ctx context.Context, key []byte) error {
-			fnDeleteKeyCount.Add(1)
-			if !bytes.Equal(key, data4RowKey) && !bytes.Equal(key, data4NonclusteredKey) {
-				return fmt.Errorf("key %v is not expected", key)
+		func(ctx context.Context, keys [][]byte) error {
+			fnDeleteKeyCount.Add(int32(len(keys)))
+			for _, key := range keys {
+				if !bytes.Equal(key, data4RowKey) && !bytes.Equal(key, data4NonclusteredKey) {
+					return fmt.Errorf("key %v is not expected", key)
+				}
 			}
 			return nil
 		},
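
The loop-wrapped ExpectQuery calls above model the new access pattern: instead of one `ORDER BY raw_key` pass over the whole table, the resolver scans conflict_error_v3 in bounded `_tidb_rowid` windows and stops once a window comes back empty. A rough sketch of such a windowed scan, assuming a plain database/sql handle; the statement text follows the mocked queries, everything else is an illustrative assumption:

    // Sketch only: windowed scan terminated by an empty page, matching
    // the paired empty-result expectations in the tests above.
    package main

    import (
    	"context"
    	"database/sql"
    )

    const selectIndexConflicts = "SELECT _tidb_rowid, raw_key, raw_value" +
    	" FROM `lightning_task_info`.conflict_error_v3" +
    	" WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ?" +
    	" ORDER BY _tidb_rowid LIMIT ?"

    func scanConflicts(ctx context.Context, db *sql.DB, tbl string, start, end, batch int64) error {
    	for lo := start; lo < end; {
    		rows, err := db.QueryContext(ctx, selectIndexConflicts, tbl, lo, end, batch)
    		if err != nil {
    			return err
    		}
    		var n, last int64
    		for rows.Next() {
    			var rowID int64
    			var rawKey, rawValue []byte
    			if err := rows.Scan(&rowID, &rawKey, &rawValue); err != nil {
    				rows.Close()
    				return err
    			}
    			// ... hand (rawKey, rawValue) to the conflict resolver here ...
    			n, last = n+1, rowID
    		}
    		rows.Close()
    		if err := rows.Err(); err != nil {
    			return err
    		}
    		if n == 0 {
    			return nil // an empty window means the range is drained
    		}
    		lo = last + 1
    	}
    	return nil
    }
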
@@ -509,39 +533,43 @@ func TestReplaceConflictOneUniqueKeyNonclusteredPk(t *testing.T) {
 	mockDB.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_task_info`").
 		WillReturnResult(sqlmock.NewResult(1, 1))
-	mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v2.*").
+	mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v3.*").
 		WillReturnResult(sqlmock.NewResult(2, 1))
-	mockDB.ExpectQuery("\\QSELECT raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v2 WHERE table_name = ? AND is_data_kv = 0 ORDER BY raw_key\\E").
-		WillReturnRows(sqlmock.NewRows([]string{"raw_key", "index_name", "raw_value", "raw_handle"}).
-			AddRow(data4NonclusteredKey, "uni_b", data4NonclusteredValue, data4RowKey).
-			AddRow(data4NonclusteredKey, "uni_b", data5NonclusteredValue, data5RowKey).
-			AddRow(data1NonclusteredKey, "uni_b", data1NonclusteredValue, data1RowKey).
-			AddRow(data1NonclusteredKey, "uni_b", data2NonclusteredValue, data2RowKey).
-			AddRow(data3IndexKey, "PRIMARY", data3IndexValue, data3RowKey).
-			AddRow(data3IndexKey, "PRIMARY", data4NonclusteredValue, data4RowKey))
-	mockDB.ExpectBegin()
-	mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v2.*").
-		WithArgs(0, "a", nil, nil, data5RowKey, data5RowValue, 1).
-		WillReturnResult(driver.ResultNoRows)
-	mockDB.ExpectCommit()
+	mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E").
+		WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"}).
+			AddRow(1, data4NonclusteredKey, "uni_b", data4NonclusteredValue, data4RowKey).
+			AddRow(2, data4NonclusteredKey, "uni_b", data5NonclusteredValue, data5RowKey).
+			AddRow(3, data1NonclusteredKey, "uni_b", data1NonclusteredValue, data1RowKey).
+			AddRow(4, data1NonclusteredKey, "uni_b", data2NonclusteredValue, data2RowKey).
+			AddRow(5, data3IndexKey, "PRIMARY", data3IndexValue, data3RowKey).
+			AddRow(6, data3IndexKey, "PRIMARY", data4NonclusteredValue, data4RowKey))
 	mockDB.ExpectBegin()
-	mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v2.*").
-		WithArgs(0, "a", nil, nil, data2RowKey, data2RowValue, 1).
+	mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v3.*").
+		WithArgs(0, "a", nil, nil, data5RowKey, data5RowValue, 2,
+			0, "a", nil, nil, data2RowKey, data2RowValue, 2,
+			0, "a", nil, nil, data4RowKey, data4RowValue, 2).
 		WillReturnResult(driver.ResultNoRows)
 	mockDB.ExpectCommit()
+	for i := 0; i < 2; i++ {
+		mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E").
+			WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"}))
+	}
+	mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E").
+		WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"}).
+			AddRow(1, data5RowKey, data5RowValue).
+			AddRow(2, data2RowKey, data2RowValue).
+			AddRow(3, data4RowKey, data4RowValue))
+	for i := 0; i < 2; i++ {
+		mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E").
+			WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"}))
+	}
 	mockDB.ExpectBegin()
-	mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v2.*").
-		WithArgs(0, "a", nil, nil, data4RowKey, data4RowValue, 1).
-		WillReturnResult(driver.ResultNoRows)
+	mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*").
+		WillReturnResult(sqlmock.NewResult(0, 3))
 	mockDB.ExpectCommit()
-	mockDB.ExpectQuery("\\QSELECT raw_key, raw_value FROM `lightning_task_info`.conflict_error_v2 WHERE table_name = ? AND is_data_kv = 1 ORDER BY raw_key\\E").
-		WillReturnRows(sqlmock.NewRows([]string{"raw_key", "raw_value"}).
-			AddRow(data5RowKey, data5RowValue).
-			AddRow(data2RowKey, data2RowValue).
-			AddRow(data4RowKey, data4RowValue))
 	mockDB.ExpectBegin()
-	mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v2.*").
-		WillReturnResult(driver.ResultNoRows)
+	mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*").
+		WillReturnResult(sqlmock.NewResult(0, 0))
 	mockDB.ExpectCommit()
 
 	cfg := config.NewConfig()
@@ -589,10 +617,12 @@ func TestReplaceConflictOneUniqueKeyNonclusteredPk(t *testing.T) {
 				return nil, fmt.Errorf("key %x is not expected", key)
 			}
 		},
-		func(ctx context.Context, key []byte) error {
-			fnDeleteKeyCount.Add(1)
-			if !bytes.Equal(key, data5RowKey) && !bytes.Equal(key, data2RowKey) && !bytes.Equal(key, data4RowKey) && !bytes.Equal(key, data2IndexKey) && !bytes.Equal(key, data4NonclusteredKey) && !bytes.Equal(key, data5IndexKey) {
-				return fmt.Errorf("key %v is not expected", key)
+		func(ctx context.Context, keys [][]byte) error {
+			fnDeleteKeyCount.Add(int32(len(keys)))
+			for _, key := range keys {
+				if !bytes.Equal(key, data5RowKey) && !bytes.Equal(key, data2RowKey) && !bytes.Equal(key, data4RowKey) && !bytes.Equal(key, data2IndexKey) && !bytes.Equal(key, data4NonclusteredKey) && !bytes.Equal(key, data5IndexKey) {
+					return fmt.Errorf("key %v is not expected", key)
+				}
 			}
 			return nil
 		},
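
Where the v2 flow issued one INSERT per conflicting row (three separate transactions in the removed lines), the v3 flow binds every row of a batch into a single statement: each WithArgs group above carries seven bound values per row. A sketch of how such a multi-row statement could be assembled; the column list here is an assumption for illustration, not the exact v3 schema:

    // Sketch only: one INSERT with seven placeholders per conflict row,
    // as the batched WithArgs expectations imply. Column names assumed.
    package main

    import "strings"

    func buildConflictInsert(rowCount int) string {
    	var sb strings.Builder
    	sb.WriteString("INSERT INTO `lightning_task_info`.conflict_error_v3" +
    		" (task_id, table_name, path, offset, raw_key, raw_value, kv_type) VALUES ")
    	for i := 0; i < rowCount; i++ {
    		if i > 0 {
    			sb.WriteString(", ")
    		}
    		sb.WriteString("(?, ?, ?, ?, ?, ?, ?)")
    	}
    	return sb.String()
    }
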
+ WithArgs(0, "a", nil, nil, data5RowKey, data5RowValue, 2, + 0, "a", nil, nil, data2RowKey, data2RowValue, 2, + 0, "a", nil, nil, data4RowKey, data4RowValue, 2). WillReturnResult(driver.ResultNoRows) mockDB.ExpectCommit() + for i := 0; i < 2; i++ { + mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"})) + } + mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"}). + AddRow(1, data5RowKey, data5RowValue). + AddRow(2, data2RowKey, data2RowValue). + AddRow(3, data4RowKey, data4RowValue)) + for i := 0; i < 2; i++ { + mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E"). + WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"})) + } mockDB.ExpectBegin() - mockDB.ExpectExec("INSERT INTO `lightning_task_info`\\.conflict_error_v2.*"). - WithArgs(0, "a", nil, nil, data4RowKey, data4RowValue, 1). - WillReturnResult(driver.ResultNoRows) + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). + WillReturnResult(sqlmock.NewResult(0, 3)) mockDB.ExpectCommit() - mockDB.ExpectQuery("\\QSELECT raw_key, raw_value FROM `lightning_task_info`.conflict_error_v2 WHERE table_name = ? AND is_data_kv = 1 ORDER BY raw_key\\E"). - WillReturnRows(sqlmock.NewRows([]string{"raw_key", "raw_value"}). - AddRow(data5RowKey, data5RowValue). - AddRow(data2RowKey, data2RowValue). - AddRow(data4RowKey, data4RowValue)) mockDB.ExpectBegin() - mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v2.*"). - WillReturnResult(driver.ResultNoRows) + mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*"). + WillReturnResult(sqlmock.NewResult(0, 0)) mockDB.ExpectCommit() cfg := config.NewConfig() @@ -792,10 +826,12 @@ func TestReplaceConflictOneUniqueKeyNonclusteredVarcharPk(t *testing.T) { return nil, fmt.Errorf("key %x is not expected", key) } }, - func(ctx context.Context, key []byte) error { - fnDeleteKeyCount.Add(1) - if !bytes.Equal(key, data5RowKey) && !bytes.Equal(key, data2RowKey) && !bytes.Equal(key, data4RowKey) && !bytes.Equal(key, data2IndexKey) && !bytes.Equal(key, data4NonclusteredKey) && !bytes.Equal(key, data5IndexKey) { - return fmt.Errorf("key %v is not expected", key) + func(ctx context.Context, keys [][]byte) error { + fnDeleteKeyCount.Add(int32(len(keys))) + for _, key := range keys { + if !bytes.Equal(key, data5RowKey) && !bytes.Equal(key, data2RowKey) && !bytes.Equal(key, data4RowKey) && !bytes.Equal(key, data2IndexKey) && !bytes.Equal(key, data4NonclusteredKey) && !bytes.Equal(key, data5IndexKey) { + return fmt.Errorf("key %v is not expected", key) + } } return nil },