From d32aac8c9222b9d72be0f0b03ad919b14b29832f Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Thu, 14 Apr 2022 12:38:36 +0800 Subject: [PATCH 1/2] lightning: fix panic when table name in source file and target cluster is different (#31808) (#32366) close pingcap/tidb#31771 --- br/pkg/lightning/restore/tidb.go | 6 ++++-- br/pkg/lightning/restore/tidb_test.go | 15 +++++++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/br/pkg/lightning/restore/tidb.go b/br/pkg/lightning/restore/tidb.go index 346aa37ce57c1..8e6d0f4531b7c 100644 --- a/br/pkg/lightning/restore/tidb.go +++ b/br/pkg/lightning/restore/tidb.go @@ -264,13 +264,15 @@ func LoadSchemaInfo( if err != nil { return nil, errors.Trace(err) } + // Table names are case-sensitive in mydump.MDTableMeta. + // We should always use the original tbl.Name in checkpoints. tableInfo := &checkpoints.TidbTableInfo{ ID: tblInfo.ID, DB: schema.Name, - Name: tableName, + Name: tbl.Name, Core: tblInfo, } - dbInfo.Tables[tableName] = tableInfo + dbInfo.Tables[tbl.Name] = tableInfo } result[schema.Name] = dbInfo diff --git a/br/pkg/lightning/restore/tidb_test.go b/br/pkg/lightning/restore/tidb_test.go index 7dbe473bcd668..c822dcd807f29 100644 --- a/br/pkg/lightning/restore/tidb_test.go +++ b/br/pkg/lightning/restore/tidb_test.go @@ -325,7 +325,8 @@ func (s *tidbSuite) TestLoadSchemaInfo(c *C) { "CREATE TABLE `t1` (`a` INT PRIMARY KEY);"+ "CREATE TABLE `t2` (`b` VARCHAR(20), `c` BOOL, KEY (`b`, `c`));"+ // an extra table that not exists in dbMetas - "CREATE TABLE `t3` (`d` VARCHAR(20), `e` BOOL);", + "CREATE TABLE `t3` (`d` VARCHAR(20), `e` BOOL);"+ + "CREATE TABLE `T4` (`f` BIGINT PRIMARY KEY);", "", "") c.Assert(err, IsNil) tableInfos := make([]*model.TableInfo, 0, len(nodes)) @@ -350,6 +351,10 @@ func (s *tidbSuite) TestLoadSchemaInfo(c *C) { DB: "db", Name: "t2", }, + { + DB: "db", + Name: "t4", + }, }, }, } @@ -375,13 +380,19 @@ func (s *tidbSuite) TestLoadSchemaInfo(c *C) { Name: "t2", Core: tableInfos[1], }, + "t4": { + ID: 103, + DB: "db", + Name: "t4", + Core: tableInfos[3], + }, }, }, }) tableCntAfter := metric.ReadCounter(metric.TableCounter.WithLabelValues(metric.TableStatePending, metric.TableResultSuccess)) - c.Assert(tableCntAfter-tableCntBefore, Equals, 2.0) + c.Assert(tableCntAfter-tableCntBefore, Equals, 3.0) } func (s *tidbSuite) TestLoadSchemaInfoMissing(c *C) { From d4ea430b9167c72aef47c1bc17d77ebf923170b5 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Thu, 14 Apr 2022 13:56:35 +0800 Subject: [PATCH 2/2] lightning: use pd timestamp to update gc safepoint (#32734) (#32797) close pingcap/tidb#32733 --- br/pkg/lightning/restore/checksum.go | 16 +++---- br/pkg/lightning/restore/checksum_test.go | 58 +++++++++++++++-------- 2 files changed, 45 insertions(+), 29 deletions(-) diff --git a/br/pkg/lightning/restore/checksum.go b/br/pkg/lightning/restore/checksum.go index 01a548cfe65c9..2ff1bb3bd6fe9 100644 --- a/br/pkg/lightning/restore/checksum.go +++ b/br/pkg/lightning/restore/checksum.go @@ -278,12 +278,8 @@ func newTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanCon } } -func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) { - physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx) - if err != nil { - return nil, errors.Annotate(err, "fetch tso from pd failed") - } - executor, err := checksum.NewExecutorBuilder(tableInfo.Core, 
oracle.ComposeTS(physicalTS, logicalTS)). +func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo, ts uint64) (*RemoteChecksum, error) { + executor, err := checksum.NewExecutorBuilder(tableInfo.Core, ts). SetConcurrency(e.distSQLScanConcurrency). Build() if err != nil { @@ -326,12 +322,16 @@ func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo func (e *tikvChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) { tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name) - err := e.manager.addOneJob(ctx, tbl, oracle.ComposeTS(time.Now().Unix()*1000, 0)) + physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx) if err != nil { + return nil, errors.Annotate(err, "fetch tso from pd failed") + } + ts := oracle.ComposeTS(physicalTS, logicalTS) + if err := e.manager.addOneJob(ctx, tbl, ts); err != nil { return nil, errors.Trace(err) } - return e.checksumDB(ctx, tableInfo) + return e.checksumDB(ctx, tableInfo, ts) } type tableChecksumTS struct { diff --git a/br/pkg/lightning/restore/checksum_test.go b/br/pkg/lightning/restore/checksum_test.go index a760f9b30a612..f0273cd56e212 100644 --- a/br/pkg/lightning/restore/checksum_test.go +++ b/br/pkg/lightning/restore/checksum_test.go @@ -7,7 +7,6 @@ import ( "sort" "strings" "sync" - "sync/atomic" "time" "github.com/DATA-DOG/go-sqlmock" @@ -24,6 +23,7 @@ import ( "github.com/pingcap/tipb/go-tipb" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" + "go.uber.org/atomic" ) var _ = Suite(&checksumSuite{}) @@ -168,13 +168,21 @@ func (s *checksumSuite) TestDoChecksumWithTikv(c *C) { tableInfo, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), 999) c.Assert(err, IsNil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for i := 0; i <= maxErrorRetryCount; i++ { kvClient.maxErrCount = i kvClient.curErrCount = 0 + var checksumTS uint64 + kvClient.onSendReq = func(req *kv.Request) { + checksumTS = req.StartTs + } checksumExec := &tikvChecksumManager{manager: newGCTTLManager(pdClient), client: kvClient} - startTS := oracle.ComposeTS(time.Now().Unix()*1000, 0) - ctx := context.WithValue(context.Background(), &checksumManagerKey, checksumExec) - _, err = DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo}) + physicalTS, logicalTS, err := pdClient.GetTS(ctx) + c.Check(err, IsNil) + subCtx := context.WithValue(ctx, &checksumManagerKey, checksumExec) + _, err = DoChecksum(subCtx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo}) // with max error retry < maxErrorRetryCount, the checksum can success if i >= maxErrorRetryCount { c.Assert(err, ErrorMatches, "tikv timeout") @@ -186,8 +194,10 @@ func (s *checksumSuite) TestDoChecksumWithTikv(c *C) { // after checksum, safepint should be small than start ts ts := pdClient.currentSafePoint() // 1ms for the schedule deviation - c.Assert(ts <= startTS+1, IsTrue) - c.Assert(atomic.LoadUint32(&checksumExec.manager.started) > 0, IsTrue) + startTS := oracle.ComposeTS(physicalTS+1, logicalTS) + c.Check(ts, LessEqual, startTS+1) + c.Check(checksumTS, GreaterEqual, ts) + c.Check(checksumExec.manager.started, Not(Equals), 0) } } @@ -217,15 +227,15 @@ func (s *checksumSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) { type safePointTTL struct { safePoint uint64 - // ttl is the last timestamp this safe point is valid - ttl int64 + expiredAt int64 } type testPDClient struct { sync.Mutex pd.Client - count int32 - gcSafePoint 
[]safePointTTL + count atomic.Int32 + gcSafePoint []safePointTTL + logicalTSCounter atomic.Uint64 } func (c *testPDClient) currentSafePoint() uint64 { @@ -233,7 +243,7 @@ func (c *testPDClient) currentSafePoint() uint64 { c.Lock() defer c.Unlock() for _, s := range c.gcSafePoint { - if s.ttl > ts { + if s.expiredAt > ts { return s.safePoint } } @@ -241,27 +251,29 @@ func (c *testPDClient) currentSafePoint() uint64 { } func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) { - return time.Now().Unix(), 0, nil + physicalTS := time.Now().UnixNano() / 1e6 + logicalTS := oracle.ExtractLogical(c.logicalTSCounter.Inc()) + return physicalTS, logicalTS, nil } func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { if !strings.HasPrefix(serviceID, "lightning") { panic("service ID must start with 'lightning'") } - atomic.AddInt32(&c.count, 1) + c.count.Inc() c.Lock() idx := sort.Search(len(c.gcSafePoint), func(i int) bool { return c.gcSafePoint[i].safePoint >= safePoint }) sp := c.gcSafePoint ttlEnd := time.Now().Unix() + ttl - spTTL := safePointTTL{safePoint: safePoint, ttl: ttlEnd} + spTTL := safePointTTL{safePoint: safePoint, expiredAt: ttlEnd} switch { case idx >= len(sp): c.gcSafePoint = append(c.gcSafePoint, spTTL) case sp[idx].safePoint == safePoint: - if ttlEnd > sp[idx].ttl { - sp[idx].ttl = ttlEnd + if ttlEnd > sp[idx].expiredAt { + sp[idx].expiredAt = ttlEnd } default: c.gcSafePoint = append(append(sp[:idx], spTTL), sp[idx:]...) @@ -289,15 +301,15 @@ func (s *checksumSuite) TestGcTTLManagerSingle(c *C) { time.Sleep(2*time.Second + 10*time.Millisecond) // after 2 seconds, must at least update 5 times - val := atomic.LoadInt32(&pdClient.count) + val := pdClient.count.Load() c.Assert(val, GreaterEqual, int32(5)) // after remove the job, there are no job remain, gc ttl needn't to be updated manager.removeOneJob("test") time.Sleep(10 * time.Millisecond) - val = atomic.LoadInt32(&pdClient.count) + val = pdClient.count.Load() time.Sleep(1*time.Second + 10*time.Millisecond) - c.Assert(atomic.LoadInt32(&pdClient.count), Equals, val) + c.Assert(pdClient.count.Load(), Equals, val) } func (s *checksumSuite) TestGcTTLManagerMulti(c *C) { @@ -387,8 +399,9 @@ func (r *mockResultSubset) RespTime() time.Duration { type mockChecksumKVClient struct { kv.Client - checksum tipb.ChecksumResponse - respDur time.Duration + checksum tipb.ChecksumResponse + respDur time.Duration + onSendReq func(req *kv.Request) // return error count before return success maxErrCount int curErrCount int @@ -396,6 +409,9 @@ type mockChecksumKVClient struct { // a mock client for checksum request func (c *mockChecksumKVClient) Send(ctx context.Context, req *kv.Request, vars interface{}, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) kv.Response { + if c.onSendReq != nil { + c.onSendReq(req) + } if c.curErrCount < c.maxErrCount { c.curErrCount++ return &mockErrorResponse{err: "tikv timeout"}
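
For illustration, a minimal sketch of the lookup issue behind the first patch, under the assumption spelled out in the new comment in tidb.go: table names coming from the source files via mydump.MDTableMeta keep their original case, so the checkpoint map should be keyed by the source-side tbl.Name rather than by the `tableName` value the old code used (computed outside this hunk). The struct and helper below are hypothetical, trimmed-down stand-ins, not the real checkpoints.TidbTableInfo or LoadSchemaInfo.

package schemasketch

// tableInfo is a hypothetical stand-in for checkpoints.TidbTableInfo,
// reduced to the fields relevant to the name-case issue.
type tableInfo struct {
	DB   string
	Name string
}

// buildTables keys the map by the case-sensitive table name taken from the
// source files, mirroring the behaviour after the patch. If the key came
// from a differently-cased name on the target cluster instead (e.g. `T4`
// vs `t4`), a later lookup with the source name would miss, return nil,
// and lead to the kind of panic the fix removes.
func buildTables(db string, sourceNames []string) map[string]*tableInfo {
	tables := make(map[string]*tableInfo, len(sourceNames))
	for _, name := range sourceNames {
		tables[name] = &tableInfo{DB: db, Name: name}
	}
	return tables
}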
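
For the second patch, a minimal sketch of the intended timestamp flow, assuming an already-constructed pd.Client: take one TSO from PD, register it as the Lightning service's GC safe point, and reuse the same value as the checksum start timestamp instead of composing one from the local clock. checksumStartTS and registerSafePoint are hypothetical helpers standing in for the new ts handling in Checksum()/checksumDB and for gcTTLManager.addOneJob; GetTS, ComposeTS, and UpdateServiceGCSafePoint are the real PD/client-go APIs that appear in the diff.

package checksumsketch

import (
	"context"
	"time"

	"github.com/tikv/client-go/v2/oracle"
	pd "github.com/tikv/pd/client"
)

// checksumStartTS fetches a TSO from PD and composes it into a single
// uint64 timestamp. Using PD's clock instead of time.Now() keeps the
// checksum read TS and the GC safe point on the same time source.
func checksumStartTS(ctx context.Context, cli pd.Client) (uint64, error) {
	physical, logical, err := cli.GetTS(ctx)
	if err != nil {
		return 0, err
	}
	return oracle.ComposeTS(physical, logical), nil
}

// registerSafePoint pins the service GC safe point at the checksum TS so
// the MVCC versions the checksum reads are not collected while it runs.
// The 5-minute TTL is an arbitrary example value, not the one Lightning
// actually uses.
func registerSafePoint(ctx context.Context, cli pd.Client, serviceID string, ts uint64) error {
	ttl := int64(5 * time.Minute / time.Second)
	_, err := cli.UpdateServiceGCSafePoint(ctx, serviceID, ttl, ts)
	return err
}

In the patched Checksum(), the same composed timestamp is first handed to addOneJob and then to checksumDB, so the safe point and the checksum executor are built from a single PD-allocated TS rather than two clocks that can drift apart.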