Skip to content

Commit

Permalink
lightning: performance optimization for post-import conflict detection (
Browse files Browse the repository at this point in the history
#52656)

close #52306
  • Loading branch information
lyzx2001 authored Apr 29, 2024
1 parent 20454c4 commit ff4784e
Show file tree
Hide file tree
Showing 14 changed files with 595 additions and 403 deletions.
12 changes: 6 additions & 6 deletions lightning/tests/lightning_config_max_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ duplicated_row_count=$(( ${total_row_count} - ${uniq_row_count} ))
remaining_row_count=$(( ${uniq_row_count} + ${duplicated_row_count}/2 ))

run_sql 'DROP TABLE IF EXISTS mytest.testtbl'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3'

stderr_file="/tmp/${TEST_NAME}.stderr"

Expand All @@ -46,7 +46,7 @@ EOF
cat "${stderr_file}"
grep -q "${err_msg}" "${stderr_file}"

run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v2'
run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v3'
# Although conflict error number exceeds the max-error limit,
# all the conflict errors are recorded,
# because recording of conflict errors is executed batch by batch (batch size 1024),
Expand All @@ -56,11 +56,11 @@ check_contains "COUNT(*): ${duplicated_row_count}"
# import a second time

run_sql 'DROP TABLE IF EXISTS mytest.testtbl'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3'

run_lightning --backend local --config "${mydir}/normal_config.toml"

run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v2'
run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v3'
check_contains "COUNT(*): ${duplicated_row_count}"

# Check remaining records in the target table
Expand All @@ -70,11 +70,11 @@ check_contains "COUNT(*): ${remaining_row_count}"
# import a third time

run_sql 'DROP TABLE IF EXISTS mytest.testtbl'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3'

run_lightning --backend local --config "${mydir}/normal_config_old_style.toml"

run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v2'
run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v3'
check_contains "COUNT(*): ${duplicated_row_count}"

# Check remaining records in the target table
Expand Down
2 changes: 1 addition & 1 deletion lightning/tests/lightning_duplicate_detection/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ verify_detected_rows() {
done
done
mapfile -t expect_rows < <(for row in "${expect_rows[@]}"; do echo "$row"; done | sort | uniq)
mapfile -t actual_rows < <(run_sql "SELECT row_data FROM lightning_task_info.conflict_error_v2 WHERE table_name = \"\`dup_detect\`.\`${table}\`\"" |
mapfile -t actual_rows < <(run_sql "SELECT row_data FROM lightning_task_info.conflict_error_v3 WHERE table_name = \"\`dup_detect\`.\`${table}\`\"" |
grep "row_data:" | sed 's/^.*(//' | sed 's/).*$//' | sed 's/"//g' | sed 's/, */,/g' | sort | uniq)
equal=0
if [ "${#actual_rows[@]}" = "${#expect_rows[@]}" ]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ check_cluster_version 5 2 0 'duplicate detection' || exit 0
mydir=$(dirname "${BASH_SOURCE[0]}")

run_sql 'DROP TABLE IF EXISTS dup_resolve.a'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3'

! run_lightning --backend local --config "${mydir}/config.toml"
[ $? -eq 0 ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ check_cluster_version 5 2 0 'duplicate detection' || exit 0
mydir=$(dirname "${BASH_SOURCE[0]}")

run_sql 'DROP TABLE IF EXISTS dup_resolve.a'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3'

! run_lightning --backend local --config "${mydir}/config.toml"
[ $? -eq 0 ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ check_cluster_version 5 2 0 'duplicate detection' || exit 0
mydir=$(dirname "${BASH_SOURCE[0]}")

run_sql 'DROP TABLE IF EXISTS dup_resolve.a'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3'

! run_lightning --backend local --config "${mydir}/config.toml"
[ $? -eq 0 ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ check_cluster_version 5 2 0 'duplicate detection' || exit 0
mydir=$(dirname "${BASH_SOURCE[0]}")

run_sql 'DROP TABLE IF EXISTS dup_resolve.a'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2'
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v3'

! run_lightning --backend local --config "${mydir}/config.toml"
[ $? -eq 0 ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ check_contains 'count(*): 10'
run_sql 'select count(*) from lightning_task_info.conflict_records'
check_contains 'count(*): 16'

run_sql 'select count(*) from lightning_task_info.conflict_error_v2'
run_sql 'select count(*) from lightning_task_info.conflict_error_v3'
check_contains 'count(*): 4'

run_sql 'select count(*) from lightning_task_info.conflict_view'
Expand Down
2 changes: 1 addition & 1 deletion lightning/tests/lightning_issue_40657/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ run_lightning -d "$CUR/data1"
run_sql 'admin check table test.t'
run_sql 'select count(*) from test.t'
check_contains 'count(*): 4'
run_sql 'select count(*) from lightning_task_info.conflict_error_v2'
run_sql 'select count(*) from lightning_task_info.conflict_error_v3'
check_contains 'count(*): 2'

run_sql 'truncate table test.t'
Expand Down
28 changes: 13 additions & 15 deletions lightning/tidb-lightning.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,26 +89,24 @@ driver = "file"
# - origin. keep the checkpoints data unchanged.
#keep-after-success = "remove"


[conflict]
# Starting from v7.3.0, a new version of strategy is introduced to handle conflicting data. The default value is "". Starting from v8.0.0, TiDB Lightning optimizes the conflict strategy for both physical and logical import modes (experimental).
# - "": In the physical import mode, TiDB Lightning does not detect or handle conflicting data. If the source file contains conflicting primary or unique key records, the subsequent step reports an error. In the logical import mode, TiDB Lightning converts the "" strategy to the "error" strategy for processing.
# - "error": When detecting conflicting primary or unique key records in the imported data, TiDB Lightning terminates the import and reports an error.
# - "replace": When encountering conflicting primary or unique key records, TiDB Lightning retains the latest data and overwrites the old data.
# The conflicting data are recorded in the `lightning_task_info.conflict_error_v2` table (recording conflicting data detected by post-import conflict detection in the physical import mode)
# and the `conflict_records` table (recording conflicting data detected by preprocess conflict detection in both logical and physical import modes) of the target TiDB cluster.
# If you turn on both preprocess and post-import conflict detection in physical import mode, the conflicting data can be checked in `lightning_task_info.conflict_view` view.
# Starting from v7.3.0, a new version of strategy is introduced to handle conflicting data. The default value is "". Starting from v8.0.0, TiDB Lightning optimizes the conflict strategy for both physical and logical import modes.
# - "": in the physical import mode, TiDB Lightning does not detect or handle conflicting data. If the source file contains conflicting primary or unique key records, the subsequent step reports an error. In the logical import mode, TiDB Lightning converts the "" strategy to the "error" strategy for processing.
# - "error": when detecting conflicting primary or unique key records in the imported data, TiDB Lightning terminates the import and reports an error.
# - "replace": when encountering conflicting primary or unique key records, TiDB Lightning retains the latest data and overwrites the old data.
# The conflicting data are recorded in the `lightning_task_info.conflict_error_v3` table (recording conflicting data detected by post-import conflict detection in the physical import mode) and the `conflict_records` table (recording conflicting data detected by preprocess conflict detection in both logical and physical import modes) of the target TiDB cluster.
# If you set `conflict.strategy = "replace"` in physical import mode, the conflicting data can be checked in the `lightning_task_info.conflict_view` view.
# You can manually insert the correct records into the target table based on your application requirements. Note that the target TiKV must be v5.2.0 or later versions.
# - "ignore": When encountering conflicting primary or unique key records, TiDB Lightning retains the old data and ignores the new data. This option can only be used in the logical import mode.
# - "ignore": when encountering conflicting primary or unique key records, TiDB Lightning retains the old data and ignores the new data. This option can only be used in the logical import mode.
strategy = ""
# Controls whether to enable preprocess conflict detection, which check conflicts in the data before importing it to TiDB. In scenarios where the ratio of conflict records is greater than or equal to 1%, it is recommended to enable preprocess conflict detection for better performance in conflict detection.
# In other scenarios, it is recommended to disable it. The default value is false, indicating that TiDB Lightning only checks conflicts after the import. If you set it to true, TiDB Lightning checks conflicts both before and after the import. This parameter is experimental, and it can be used only in the physical import mode.
# Controls whether to enable preprocess conflict detection, which checks conflicts in data before importing it to TiDB. The default value is false, indicating that TiDB Lightning only checks conflicts after the import. If you set it to true, TiDB Lightning checks conflicts both before and after the import. This parameter can be used only in the physical import mode. It is not recommended to set `precheck-conflict-before-import = true` for now.
# precheck-conflict-before-import = false
# Controls the maximum number of conflict errors that can be handled when strategy is "replace" or "ignore". You can set it only when strategy is "replace" or "ignore". The default value is 10000.
# Controls the maximum number of conflict errors that can be handled when strategy is "replace" or "ignore". You can set it only when strategy is "replace" or "ignore". The default value is 10000. If you set the value larger than 10000, it is possible that the import will have performance degradation or fail due to potential errors.
# threshold = 10000
# Controls the maximum number of records in the `conflict_records` table. The default value is 10000. In the physical import mode, if the strategy is "replace", the conflict records that are overwritten are recorded.
# In the logical import mode, if the strategy is "ignore", the conflict records that are ignored are recorded; if the strategy is "replace", the conflict records can not be recorded.
# Starting from v8.1.0, max-record-rows will be assigned the value of threshold, regardless the user input. max-record-rows will be deprecated in the future.
# Controls the maximum number of records in the `conflict_records` table. The default value is 10000.
# Starting from v8.1.0, there is no need to configure `max-record-rows` manually, because TiDB Lightning automatically assigns the value of `max-record-rows` with the value of `threshold`, regardless of the user input. `max-record-rows` will be deprecated in a future release.
# In the physical import mode, if the strategy is "replace", the conflict records that are overwritten are recorded.
# In the logical import mode, if the strategy is "ignore", the conflict records that are ignored are recorded; if the strategy is "replace", the conflict records are not recorded.
# max-record-rows = 10000

[tikv-importer]
Expand Down
23 changes: 14 additions & 9 deletions pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,8 +1138,8 @@ func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table
}
return value, nil
},
func(ctx context.Context, key []byte) error {
err := local.deleteDuplicateRow(ctx, logger, key)
func(ctx context.Context, keys [][]byte) error {
err := local.deleteDuplicateRows(ctx, logger, keys)
if err != nil {
logger.Warn("delete duplicate rows encounter error", log.ShortError(err))
return common.ErrResolveDuplicateRows.Wrap(errors.Trace(err)).GenWithStackByArgs(tableName)
Expand Down Expand Up @@ -1168,10 +1168,10 @@ func (local *DupeController) getLatestValue(
return value, nil
}

func (local *DupeController) deleteDuplicateRow(
func (local *DupeController) deleteDuplicateRows(
ctx context.Context,
logger *log.Task,
key []byte,
keys [][]byte,
) (err error) {
// Starts a Delete transaction.
txn, err := local.tikvCli.Begin()
Expand All @@ -1188,10 +1188,15 @@ func (local *DupeController) deleteDuplicateRow(
}
}()

logger.Debug("deleteDuplicateRow will delete key",
zap.String("category", "resolve-dupe"),
logutil.Key("key", key))
err = txn.Delete(key)
for _, key := range keys {
logger.Debug("deleteDuplicateRows will delete key",
zap.String("category", "resolve-dupe"),
logutil.Key("key", key))
if err := txn.Delete(key); err != nil {
return errors.Trace(err)
}
}

return errors.Trace(err)
logger.Debug("number of KV pairs deleted", zap.String("category", "resolve-dupe"), zap.Int("count", txn.Len()))
return nil
}
2 changes: 1 addition & 1 deletion pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ const (
// ReplaceOnDup indicates using REPLACE INTO to insert data for TiDB backend.
// ReplaceOnDup records all duplicate records, removes some rows with conflict
// and reserves other rows that can be kept and not cause conflict anymore for local backend.
// Users need to analyze the lightning_task_info.conflict_error_v2 table to check whether the reserved data
// Users need to analyze the lightning_task_info.conflict_error_v3 table to check whether the reserved data
// cater to their need and check whether they need to add back the correct rows.
ReplaceOnDup
// IgnoreOnDup indicates using INSERT IGNORE INTO to insert data for TiDB backend.
Expand Down
Loading

0 comments on commit ff4784e

Please sign in to comment.