Skip to content

Commit

Permalink
Merge branch 'release-5.2-82a75542e83f' of github.com:ti-srebot/tidb …
Browse files Browse the repository at this point in the history
…into release-5.2-82a75542e83f
  • Loading branch information
guo-shaoge committed Apr 14, 2022
2 parents b40fbdf + b4ccc4c commit 73efc62
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 33 deletions.
16 changes: 8 additions & 8 deletions br/pkg/lightning/restore/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,8 @@ func newTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanCon
}
}

func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Annotate(err, "fetch tso from pd failed")
}
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, oracle.ComposeTS(physicalTS, logicalTS)).
func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo, ts uint64) (*RemoteChecksum, error) {
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, ts).
SetConcurrency(e.distSQLScanConcurrency).
Build()
if err != nil {
Expand Down Expand Up @@ -326,12 +322,16 @@ func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo

func (e *tikvChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name)
err := e.manager.addOneJob(ctx, tbl, oracle.ComposeTS(time.Now().Unix()*1000, 0))
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Annotate(err, "fetch tso from pd failed")
}
ts := oracle.ComposeTS(physicalTS, logicalTS)
if err := e.manager.addOneJob(ctx, tbl, ts); err != nil {
return nil, errors.Trace(err)
}

return e.checksumDB(ctx, tableInfo)
return e.checksumDB(ctx, tableInfo, ts)
}

type tableChecksumTS struct {
Expand Down
58 changes: 37 additions & 21 deletions br/pkg/lightning/restore/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/DATA-DOG/go-sqlmock"
Expand All @@ -24,6 +23,7 @@ import (
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
)

var _ = Suite(&checksumSuite{})
Expand Down Expand Up @@ -168,13 +168,21 @@ func (s *checksumSuite) TestDoChecksumWithTikv(c *C) {
tableInfo, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), 999)
c.Assert(err, IsNil)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for i := 0; i <= maxErrorRetryCount; i++ {
kvClient.maxErrCount = i
kvClient.curErrCount = 0
var checksumTS uint64
kvClient.onSendReq = func(req *kv.Request) {
checksumTS = req.StartTs
}
checksumExec := &tikvChecksumManager{manager: newGCTTLManager(pdClient), client: kvClient}
startTS := oracle.ComposeTS(time.Now().Unix()*1000, 0)
ctx := context.WithValue(context.Background(), &checksumManagerKey, checksumExec)
_, err = DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo})
physicalTS, logicalTS, err := pdClient.GetTS(ctx)
c.Check(err, IsNil)
subCtx := context.WithValue(ctx, &checksumManagerKey, checksumExec)
_, err = DoChecksum(subCtx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo})
// with max error retry < maxErrorRetryCount, the checksum can succeed
if i >= maxErrorRetryCount {
c.Assert(err, ErrorMatches, "tikv timeout")
Expand All @@ -186,8 +194,10 @@ func (s *checksumSuite) TestDoChecksumWithTikv(c *C) {
// after checksum, safepoint should be smaller than start ts
ts := pdClient.currentSafePoint()
// 1ms for the schedule deviation
c.Assert(ts <= startTS+1, IsTrue)
c.Assert(atomic.LoadUint32(&checksumExec.manager.started) > 0, IsTrue)
startTS := oracle.ComposeTS(physicalTS+1, logicalTS)
c.Check(ts, LessEqual, startTS+1)
c.Check(checksumTS, GreaterEqual, ts)
c.Check(checksumExec.manager.started, Not(Equals), 0)
}
}

Expand Down Expand Up @@ -217,51 +227,53 @@ func (s *checksumSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {

type safePointTTL struct {
safePoint uint64
// ttl is the last timestamp this safe point is valid
ttl int64
expiredAt int64
}

type testPDClient struct {
sync.Mutex
pd.Client
count int32
gcSafePoint []safePointTTL
count atomic.Int32
gcSafePoint []safePointTTL
logicalTSCounter atomic.Uint64
}

func (c *testPDClient) currentSafePoint() uint64 {
ts := time.Now().Unix()
c.Lock()
defer c.Unlock()
for _, s := range c.gcSafePoint {
if s.ttl > ts {
if s.expiredAt > ts {
return s.safePoint
}
}
return 0
}

func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) {
return time.Now().Unix(), 0, nil
physicalTS := time.Now().UnixNano() / 1e6
logicalTS := oracle.ExtractLogical(c.logicalTSCounter.Inc())
return physicalTS, logicalTS, nil
}

func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
if !strings.HasPrefix(serviceID, "lightning") {
panic("service ID must start with 'lightning'")
}
atomic.AddInt32(&c.count, 1)
c.count.Inc()
c.Lock()
idx := sort.Search(len(c.gcSafePoint), func(i int) bool {
return c.gcSafePoint[i].safePoint >= safePoint
})
sp := c.gcSafePoint
ttlEnd := time.Now().Unix() + ttl
spTTL := safePointTTL{safePoint: safePoint, ttl: ttlEnd}
spTTL := safePointTTL{safePoint: safePoint, expiredAt: ttlEnd}
switch {
case idx >= len(sp):
c.gcSafePoint = append(c.gcSafePoint, spTTL)
case sp[idx].safePoint == safePoint:
if ttlEnd > sp[idx].ttl {
sp[idx].ttl = ttlEnd
if ttlEnd > sp[idx].expiredAt {
sp[idx].expiredAt = ttlEnd
}
default:
c.gcSafePoint = append(append(sp[:idx], spTTL), sp[idx:]...)
Expand Down Expand Up @@ -289,15 +301,15 @@ func (s *checksumSuite) TestGcTTLManagerSingle(c *C) {
time.Sleep(2*time.Second + 10*time.Millisecond)

// after 2 seconds, UpdateServiceGCSafePoint must have been called at least 5 times
val := atomic.LoadInt32(&pdClient.count)
val := pdClient.count.Load()
c.Assert(val, GreaterEqual, int32(5))

// after removing the job, no jobs remain, so the gc ttl no longer needs to be updated
manager.removeOneJob("test")
time.Sleep(10 * time.Millisecond)
val = atomic.LoadInt32(&pdClient.count)
val = pdClient.count.Load()
time.Sleep(1*time.Second + 10*time.Millisecond)
c.Assert(atomic.LoadInt32(&pdClient.count), Equals, val)
c.Assert(pdClient.count.Load(), Equals, val)
}

func (s *checksumSuite) TestGcTTLManagerMulti(c *C) {
Expand Down Expand Up @@ -387,15 +399,19 @@ func (r *mockResultSubset) RespTime() time.Duration {

type mockChecksumKVClient struct {
kv.Client
checksum tipb.ChecksumResponse
respDur time.Duration
checksum tipb.ChecksumResponse
respDur time.Duration
onSendReq func(req *kv.Request)
// number of errors to return before returning success
maxErrCount int
curErrCount int
}

// a mock client for checksum request
func (c *mockChecksumKVClient) Send(ctx context.Context, req *kv.Request, vars interface{}, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) kv.Response {
if c.onSendReq != nil {
c.onSendReq(req)
}
if c.curErrCount < c.maxErrCount {
c.curErrCount++
return &mockErrorResponse{err: "tikv timeout"}
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/lightning/restore/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,15 @@ func LoadSchemaInfo(
if err != nil {
return nil, errors.Trace(err)
}
// Table names are case-sensitive in mydump.MDTableMeta.
// We should always use the original tbl.Name in checkpoints.
tableInfo := &checkpoints.TidbTableInfo{
ID: tblInfo.ID,
DB: schema.Name,
Name: tableName,
Name: tbl.Name,
Core: tblInfo,
}
dbInfo.Tables[tableName] = tableInfo
dbInfo.Tables[tbl.Name] = tableInfo
}

result[schema.Name] = dbInfo
Expand Down
15 changes: 13 additions & 2 deletions br/pkg/lightning/restore/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ func (s *tidbSuite) TestLoadSchemaInfo(c *C) {
"CREATE TABLE `t1` (`a` INT PRIMARY KEY);"+
"CREATE TABLE `t2` (`b` VARCHAR(20), `c` BOOL, KEY (`b`, `c`));"+
// an extra table that does not exist in dbMetas
"CREATE TABLE `t3` (`d` VARCHAR(20), `e` BOOL);",
"CREATE TABLE `t3` (`d` VARCHAR(20), `e` BOOL);"+
"CREATE TABLE `T4` (`f` BIGINT PRIMARY KEY);",
"", "")
c.Assert(err, IsNil)
tableInfos := make([]*model.TableInfo, 0, len(nodes))
Expand All @@ -350,6 +351,10 @@ func (s *tidbSuite) TestLoadSchemaInfo(c *C) {
DB: "db",
Name: "t2",
},
{
DB: "db",
Name: "t4",
},
},
},
}
Expand All @@ -375,13 +380,19 @@ func (s *tidbSuite) TestLoadSchemaInfo(c *C) {
Name: "t2",
Core: tableInfos[1],
},
"t4": {
ID: 103,
DB: "db",
Name: "t4",
Core: tableInfos[3],
},
},
},
})

tableCntAfter := metric.ReadCounter(metric.TableCounter.WithLabelValues(metric.TableStatePending, metric.TableResultSuccess))

c.Assert(tableCntAfter-tableCntBefore, Equals, 2.0)
c.Assert(tableCntAfter-tableCntBefore, Equals, 3.0)
}

func (s *tidbSuite) TestLoadSchemaInfoMissing(c *C) {
Expand Down

0 comments on commit 73efc62

Please sign in to comment.