diff --git a/br/pkg/lightning/restore/checksum.go b/br/pkg/lightning/restore/checksum.go index e3719f2f7e413..03bb2e26994e6 100644 --- a/br/pkg/lightning/restore/checksum.go +++ b/br/pkg/lightning/restore/checksum.go @@ -323,9 +323,34 @@ func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo func (e *tikvChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) { tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name) +<<<<<<< HEAD:br/pkg/lightning/restore/checksum.go physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx) if err != nil { return nil, errors.Annotate(err, "fetch tso from pd failed") +======= + var ( + physicalTS, logicalTS int64 + err error + retryTime int + ) + physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx) + for err != nil { + if !pd.IsLeaderChange(errors.Cause(err)) { + return nil, errors.Annotate(err, "fetch tso from pd failed") + } + retryTime++ + if retryTime%60 == 0 { + log.FromContext(ctx).Warn("fetch tso from pd failed and retrying", + zap.Int("retryTime", retryTime), + zap.Error(err)) + } + select { + case <-ctx.Done(): + err = ctx.Err() + case <-time.After(retryGetTSInterval): + physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx) + } +>>>>>>> c6b4e9935a3 (lightning: unwrap the error before call PD function (#44856)):br/pkg/lightning/backend/local/checksum.go } ts := oracle.ComposeTS(physicalTS, logicalTS) if err := e.manager.addOneJob(ctx, tbl, ts); err != nil { diff --git a/br/pkg/lightning/restore/checksum_test.go b/br/pkg/lightning/restore/checksum_test.go index d9961a34b43a5..e0fd85d5a4841 100644 --- a/br/pkg/lightning/restore/checksum_test.go +++ b/br/pkg/lightning/restore/checksum_test.go @@ -249,7 +249,14 @@ func (c *testPDClient) currentSafePoint() uint64 { } func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) { +<<<<<<< HEAD:br/pkg/lightning/restore/checksum_test.go physicalTS := time.Now().UnixNano() / 1e6 +======= + physicalTS := time.Now().UnixMilli() + if c.leaderChanging && physicalTS%2 == 0 { + return 0, 0, errors.WithStack(errs.ErrClientTSOStreamClosed) + } +>>>>>>> c6b4e9935a3 (lightning: unwrap the error before call PD function (#44856)):br/pkg/lightning/backend/local/checksum_test.go logicalTS := oracle.ExtractLogical(c.logicalTSCounter.Inc()) return physicalTS, logicalTS, nil }