Skip to content

Commit dc1ebbe

Browse files
authored
lightning: check hasDupe and tableID when resolve duplicate rows (#40696) (#40792)
close #40657
1 parent c0ec81b commit dc1ebbe

File tree

10 files changed

+155
-8
lines changed

10 files changed

+155
-8
lines changed

br/pkg/lightning/backend/local/local.go

+20-2
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ import (
6868
"go.uber.org/atomic"
6969
"go.uber.org/multierr"
7070
"go.uber.org/zap"
71+
"golang.org/x/exp/slices"
7172
"golang.org/x/sync/errgroup"
7273
"golang.org/x/time/rate"
7374
"google.golang.org/grpc"
@@ -1476,13 +1477,18 @@ func (local *local) ResolveDuplicateRows(ctx context.Context, tbl table.Table, t
14761477
return err
14771478
}
14781479

1480+
tableIDs := physicalTableIDs(tbl.Meta())
1481+
keyInTable := func(key []byte) bool {
1482+
return slices.Contains(tableIDs, tablecodec.DecodeTableID(key))
1483+
}
1484+
14791485
errLimiter := rate.NewLimiter(1, 1)
14801486
pool := utils.NewWorkerPool(uint(local.dupeConcurrency), "resolve duplicate rows")
14811487
err = local.errorMgr.ResolveAllConflictKeys(
14821488
ctx, tableName, pool,
14831489
func(ctx context.Context, handleRows [][2][]byte) error {
14841490
for {
1485-
err := local.deleteDuplicateRows(ctx, logger, handleRows, decoder)
1491+
err := local.deleteDuplicateRows(ctx, logger, handleRows, decoder, keyInTable)
14861492
if err == nil {
14871493
return nil
14881494
}
@@ -1505,7 +1511,13 @@ func (local *local) ResolveDuplicateRows(ctx context.Context, tbl table.Table, t
15051511
return errors.Trace(err)
15061512
}
15071513

1508-
func (local *local) deleteDuplicateRows(ctx context.Context, logger *log.Task, handleRows [][2][]byte, decoder *kv.TableKVDecoder) (err error) {
1514+
func (local *local) deleteDuplicateRows(
1515+
ctx context.Context,
1516+
logger *log.Task,
1517+
handleRows [][2][]byte,
1518+
decoder *kv.TableKVDecoder,
1519+
keyInTable func(key []byte) bool,
1520+
) (err error) {
15091521
// Starts a Delete transaction.
15101522
txn, err := local.tikvCli.Begin()
15111523
if err != nil {
@@ -1530,6 +1542,12 @@ func (local *local) deleteDuplicateRows(ctx context.Context, logger *log.Task, h
15301542
// (if the number of duplicates is small this should fit entirely in memory)
15311543
// (Txn's MemBuf's bufferSizeLimit is currently infinity)
15321544
for _, handleRow := range handleRows {
1545+
// Skip the row key if it's not in the table.
1546+
// This can happen if the table has been recreated or truncated,
1547+
// and the duplicate key is from the old table.
1548+
if !keyInTable(handleRow[0]) {
1549+
continue
1550+
}
15331551
logger.Debug("[resolve-dupe] found row to resolve",
15341552
logutil.Key("handle", handleRow[0]),
15351553
logutil.Key("row", handleRow[1]))

br/pkg/lightning/restore/table_restore.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -775,12 +775,14 @@ func (tr *TableRestore) postProcess(
775775
if e != nil {
776776
tr.logger.Error("collect remote duplicate keys failed", log.ShortError(e))
777777
return false, e
778-
} else {
779-
hasDupe = hasDupe || hasRemoteDupe
780778
}
781-
if err = rc.backend.ResolveDuplicateRows(ctx, tr.encTable, tr.tableName, rc.cfg.TikvImporter.DuplicateResolution); err != nil {
782-
tr.logger.Error("resolve remote duplicate keys failed", log.ShortError(err))
783-
return false, err
779+
hasDupe = hasDupe || hasRemoteDupe
780+
781+
if hasDupe {
782+
if err = rc.backend.ResolveDuplicateRows(ctx, tr.encTable, tr.tableName, rc.cfg.TikvImporter.DuplicateResolution); err != nil {
783+
tr.logger.Error("resolve remote duplicate keys failed", log.ShortError(err))
784+
return false, err
785+
}
784786
}
785787
}
786788

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
#!/bin/bash
2+
#
3+
# Copyright 2022 PingCAP, Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
set -eux
18+
19+
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/EnableTestAPI=return"
20+
export GO_FAILPOINTS="${GO_FAILPOINTS};github.com/pingcap/tidb/br/pkg/lightning/backend/local/ReadyForImportEngine=sleep(10000)"
21+
22+
run_lightning --backend='local' &
23+
shpid="$!"
24+
pid=
25+
26+
ensure_lightning_is_started() {
27+
for _ in {0..60}; do
28+
pid=$(pstree -p "$shpid" | grep -Eo "tidb-lightning\.\([0-9]*\)" | grep -Eo "[0-9]*") || true
29+
[ -n "$pid" ] && break
30+
sleep 1
31+
done
32+
if [ -z "$pid" ]; then
33+
echo "lightning doesn't start successfully, please check the log" >&2
34+
exit 1
35+
fi
36+
echo "lightning is started, pid is $pid"
37+
}
38+
39+
ready_for_import_engine() {
40+
for _ in {0..60}; do
41+
grep -Fq "start import engine" "$TEST_DIR"/lightning.log && return
42+
sleep 1
43+
done
44+
echo "lightning doesn't start import engine, please check the log" >&2
45+
exit 1
46+
}
47+
48+
ensure_lightning_is_started
49+
ready_for_import_engine
50+
51+
run_curl "https://${PD_ADDR}/pd/api/v1/config/cluster-version"
52+
53+
length=$(run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq '[ .[] | select(.rule_type == "key-range" and .labels[0].key == "schedule") ] | length')
54+
if [ "$length" != "1" ]; then
55+
echo "region-label key-range schedule rules should be 1, but got $length" >&2
56+
exit 1
57+
fi
58+
59+
wait "$shpid"
60+
61+
length=$(run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq '[ .[] | select(.rule_type == "key-range" and .labels[0].key == "schedule") ] | length')
62+
if [ -n "$length" ] && [ "$length" -ne 0 ]; then
63+
echo "region-label key-range schedule rules should be 0, but got $length" >&2
64+
exit 1
65+
fi
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
[tikv-importer]
2+
backend = "local"
3+
duplicate-resolution = "remove"
4+
5+
[mydumper.csv]
6+
header = true
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
CREATE TABLE `t` (
2+
`id` int(11) NOT NULL,
3+
`name` varchar(255) DEFAULT NULL,
4+
PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */,
5+
UNIQUE KEY `uni_name` (`name`)
6+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
id,name
2+
1,"aaa01"
3+
2,"aaa02"
4+
3,"aaa03"
5+
4,"aaa04"
6+
5,"aaa04"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
CREATE TABLE `t` (
2+
`id` int(11) NOT NULL,
3+
`name` varchar(255) DEFAULT NULL,
4+
PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */,
5+
UNIQUE KEY `uni_name` (`name`)
6+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
id,name
2+
1,"aaa01"
3+
2,"aaa02"
4+
3,"aaa03"
5+
4,"aaa04"
6+
5,"aaa05"

br/tests/lightning_issue_40657/run.sh

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#!/bin/bash
2+
#
3+
# Copyright 2023 PingCAP, Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
set -eux
18+
19+
check_cluster_version 5 2 0 'duplicate detection' || exit 0
20+
21+
run_lightning -d "tests/$TEST_NAME/data1"
22+
run_sql 'admin check table test.t'
23+
run_sql 'select count(*) from test.t'
24+
check_contains 'count(*): 3'
25+
run_sql 'select count(*) from lightning_task_info.conflict_error_v1'
26+
check_contains 'count(*): 2'
27+
28+
run_sql 'truncate table test.t'
29+
run_lightning -d "tests/$TEST_NAME/data2"
30+
run_sql 'admin check table test.t'
31+
run_sql 'select count(*) from test.t'
32+
check_contains 'count(*): 5'

br/tests/lightning_reload_cert/run.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ shpid="$!"
2929
sleep 15
3030
ok=0
3131
for _ in {0..60}; do
32-
if grep -Fq "connection closed before server preface received" "$TEST_DIR"/lightning.log; then
32+
if grep -Fq "connection error" "$TEST_DIR"/lightning.log; then
3333
ok=1
3434
break
3535
fi

0 commit comments

Comments
 (0)