BR: add lock with ttl for checkpoint backup (#40563) #40685

Merged
8 changes: 4 additions & 4 deletions br/pkg/backup/client.go
@@ -312,13 +312,13 @@ func (bc *Client) StartCheckpointRunner(
}
}

bc.checkpointRunner = checkpoint.StartCheckpointRunner(ctx, bc.storage, bc.cipher)
return nil
bc.checkpointRunner, err = checkpoint.StartCheckpointRunner(ctx, bc.storage, bc.cipher, bc.mgr.GetPDClient())
return errors.Trace(err)
}

func (bc *Client) WaitForFinishCheckpoint() {
func (bc *Client) WaitForFinishCheckpoint(ctx context.Context) {
if bc.checkpointRunner != nil {
bc.checkpointRunner.WaitForFinish()
bc.checkpointRunner.WaitForFinish(ctx)
}
}

2 changes: 2 additions & 0 deletions br/pkg/checkpoint/BUILD.bazel
@@ -15,6 +15,7 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//oracle",
"@org_uber_go_zap//:zap",
],
)
@@ -29,5 +30,6 @@ go_test(
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/encryptionpb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
],
)
208 changes: 189 additions & 19 deletions br/pkg/checkpoint/checkpoint.go
@@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

@@ -44,11 +45,16 @@ const (

CheckpointDataDir = CheckpointDir + "/data"
CheckpointChecksumDir = CheckpointDir + "/checksum"
CheckpointLockPath = CheckpointDir + "/checkpoint.lock"
)

const MaxChecksumTotalCost float64 = 60.0

const tickDuration = 30 * time.Second
const tickDurationForFlush = 30 * time.Second

const tickDurationForLock = 4 * time.Minute

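// the lock's time-to-live is deliberately longer than tickDurationForLock (4m),
// so a live BR renews the lock before it can expire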
const lockTimeToLive = 5 * time.Minute

type CheckpointMessage struct {
// start-key of the origin range
Expand Down Expand Up @@ -132,6 +138,12 @@ func NewChecksumRunner() *ChecksumRunner {
}
}

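// RecordError stores the error under the runner's mutex so it can be surfaced later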
func (cr *ChecksumRunner) RecordError(err error) {
cr.Lock()
cr.err = err
cr.Unlock()
}

// FlushChecksum saves the checksum in memory temporarily
// and flushes it to the external storage if the checksum takes too much time
func (cr *ChecksumRunner) FlushChecksum(
@@ -180,15 +192,10 @@ func (cr *ChecksumRunner) FlushChecksum(
cr.wg.Add(1)
cr.workerPool.Apply(func() {
defer cr.wg.Done()
recordErr := func(err error) {
cr.Lock()
cr.err = err
cr.Unlock()
}

content, err := json.Marshal(toBeFlushedChecksumItems)
if err != nil {
recordErr(err)
cr.RecordError(err)
return
}

@@ -200,70 +207,90 @@

data, err := json.Marshal(checksumInfo)
if err != nil {
recordErr(err)
cr.RecordError(err)
return
}

fname := fmt.Sprintf("%s/t%d_and__", CheckpointChecksumDir, tableID)
err = s.WriteFile(ctx, fname, data)
if err != nil {
recordErr(err)
cr.RecordError(err)
return
}
})
return nil
}

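// GlobalTimer provides the current physical and logical timestamp, e.g. backed by the PD client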
type GlobalTimer interface {
GetTS(context.Context) (int64, int64, error)
}

type CheckpointRunner struct {
lockId uint64

meta map[string]*RangeGroups

checksumRunner *ChecksumRunner

storage storage.ExternalStorage
cipher *backuppb.CipherInfo
timer GlobalTimer

appendCh chan *CheckpointMessage
metaCh chan map[string]*RangeGroups
lockCh chan struct{}
errCh chan error

wg sync.WaitGroup
}

// only for test
func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, tick time.Duration) *CheckpointRunner {
func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, tick time.Duration, timer GlobalTimer) (*CheckpointRunner, error) {
runner := &CheckpointRunner{
meta: make(map[string]*RangeGroups),

checksumRunner: NewChecksumRunner(),

storage: storage,
cipher: cipher,
timer: timer,

appendCh: make(chan *CheckpointMessage),
metaCh: make(chan map[string]*RangeGroups),
lockCh: make(chan struct{}),
errCh: make(chan error, 1),
}

runner.startCheckpointLoop(ctx, tick)
return runner
err := runner.initialLock(ctx)
if err != nil {
return nil, errors.Annotate(err, "Failed to initialize checkpoint lock.")
}
runner.startCheckpointLoop(ctx, tick, tick)
return runner, nil
}

func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo) *CheckpointRunner {
func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, timer GlobalTimer) (*CheckpointRunner, error) {
runner := &CheckpointRunner{
meta: make(map[string]*RangeGroups),

checksumRunner: NewChecksumRunner(),

storage: storage,
cipher: cipher,
timer: timer,

appendCh: make(chan *CheckpointMessage),
metaCh: make(chan map[string]*RangeGroups),
lockCh: make(chan struct{}),
errCh: make(chan error, 1),
}

runner.startCheckpointLoop(ctx, tickDuration)
return runner
err := runner.initialLock(ctx)
if err != nil {
return nil, errors.Trace(err)
}
runner.startCheckpointLoop(ctx, tickDurationForFlush, tickDurationForLock)
return runner, nil
}

func (r *CheckpointRunner) FlushChecksum(ctx context.Context, tableID int64, crc64xor uint64, totalKvs uint64, totalBytes uint64, timeCost float64) error {
@@ -295,13 +322,18 @@ func (r *CheckpointRunner) Append(
}

// Note: cannot be called in parallel with the `Append` function
func (r *CheckpointRunner) WaitForFinish() {
func (r *CheckpointRunner) WaitForFinish(ctx context.Context) {
// can not append anymore
close(r.appendCh)
// wait the range flusher exit
r.wg.Wait()
// wait the checksum flusher exit
r.checksumRunner.wg.Wait()
// remove the checkpoint lock
err := r.storage.DeleteFile(ctx, CheckpointLockPath)
if err != nil {
log.Warn("failed to remove the checkpoint lock", zap.Error(err))
}
}

// Send the meta to the flush goroutine, and reset the CheckpointRunner's meta
@@ -318,6 +350,16 @@ func (r *CheckpointRunner) flushMeta(ctx context.Context, errCh chan error) erro
return nil
}

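// setLock asks the flush goroutine to renew the lock file, or returns an error it has already reported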
func (r *CheckpointRunner) setLock(ctx context.Context, errCh chan error) error {
select {
case <-ctx.Done():
case err := <-errCh:
return err
case r.lockCh <- struct{}{}:
}
return nil
}

// start a goroutine to flush the meta, which is sent from `checkpoint looper`, to the external storage
func (r *CheckpointRunner) startCheckpointRunner(ctx context.Context, wg *sync.WaitGroup) chan error {
errCh := make(chan error, 1)
@@ -337,6 +379,15 @@ func (r *CheckpointRunner) startCheckpointRunner(ctx context.Context, wg *sync.W
errCh <- err
return
}
case _, ok := <-r.lockCh:
if !ok {
log.Info("stop checkpoint flush worker")
return
}
if err := r.updateLock(ctx); err != nil {
errCh <- errors.Annotate(err, "Failed to update checkpoint lock.")
return
}
}
}
}
@@ -351,22 +402,31 @@ func (r *CheckpointRunner) sendError(err error) {
default:
log.Error("errCh is blocked", logutil.ShortError(err))
}
r.checksumRunner.RecordError(err)
}

func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration time.Duration) {
func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDurationForFlush, tickDurationForLock time.Duration) {
r.wg.Add(1)
checkpointLoop := func(ctx context.Context) {
defer r.wg.Done()
cctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
errCh := r.startCheckpointRunner(cctx, &wg)
ticker := time.NewTicker(tickDuration)
flushTicker := time.NewTicker(tickDurationForFlush)
defer flushTicker.Stop()
lockTicker := time.NewTicker(tickDurationForLock)
defer lockTicker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
case <-lockTicker.C:
if err := r.setLock(ctx, errCh); err != nil {
r.sendError(err)
return
}
case <-flushTicker.C:
if err := r.flushMeta(ctx, errCh); err != nil {
r.sendError(err)
return
@@ -380,6 +440,7 @@ func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration
// close the channels to the flush worker
// and wait for it to consume all the metas
close(r.metaCh)
close(r.lockCh)
wg.Wait()
return
}
@@ -463,6 +524,115 @@ func (r *CheckpointRunner) doFlush(ctx context.Context, meta map[string]*RangeGr
return nil
}

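// CheckpointLock is the JSON content of the checkpoint lock file: the owner's lock id and its expiration time in milliseconds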
type CheckpointLock struct {
LockId uint64 `json:"lock-id"`
ExpireAt int64 `json:"expire-at"`
}

// get ts with retry
func (r *CheckpointRunner) getTS(ctx context.Context) (int64, int64, error) {
var (
p int64 = 0
l int64 = 0
retry int = 0
)
errRetry := utils.WithRetry(ctx, func() error {
var err error
p, l, err = r.timer.GetTS(ctx)
if err != nil {
retry++
log.Info("failed to get ts", zap.Int("retry", retry), zap.Error(err))
return err
}

return nil
}, utils.NewPDReqBackoffer())

return p, l, errors.Trace(errRetry)
}

// flush the lock to the external storage
func (r *CheckpointRunner) flushLock(ctx context.Context, p int64) error {
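// p is the physical part of the TSO in milliseconds, so ExpireAt is a millisecond timestamp as well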
lock := &CheckpointLock{
LockId: r.lockId,
ExpireAt: p + lockTimeToLive.Milliseconds(),
}
log.Info("start to flush the checkpoint lock", zap.Int64("lock-at", p), zap.Int64("expire-at", lock.ExpireAt))
data, err := json.Marshal(lock)
if err != nil {
return errors.Trace(err)
}

err = r.storage.WriteFile(ctx, CheckpointLockPath, data)
return errors.Trace(err)
}

// check whether this lock belongs to this BR
func (r *CheckpointRunner) checkLockFile(ctx context.Context, now int64) error {
data, err := r.storage.ReadFile(ctx, CheckpointLockPath)
if err != nil {
return errors.Trace(err)
}
lock := &CheckpointLock{}
err = json.Unmarshal(data, lock)
if err != nil {
return errors.Trace(err)
}
if lock.ExpireAt <= now {
if lock.LockId > r.lockId {
return errors.Errorf("There are another BR(%d) running after but setting lock before this one(%d). "+
"Please check whether the BR is running. If not, you can retry.", lock.LockId, r.lockId)
}
if lock.LockId == r.lockId {
log.Warn("The lock has expired.", zap.Int64("expire-at(ms)", lock.ExpireAt), zap.Int64("now(ms)", now))
}
} else if lock.LockId != r.lockId {
return errors.Errorf("The existing lock will expire in %d seconds. "+
"There may be another BR(%d) running. If not, you can wait for the lock to expire, or delete the file `%s%s` manually.",
(lock.ExpireAt-now)/1000, lock.LockId, strings.TrimRight(r.storage.URI(), "/"), CheckpointLockPath)
}

return nil
}

// generate a new lock and flush the lock to the external storage
func (r *CheckpointRunner) updateLock(ctx context.Context) error {
p, _, err := r.getTS(ctx)
if err != nil {
return errors.Trace(err)
}
if err = r.checkLockFile(ctx, p); err != nil {
return errors.Trace(err)
}
return errors.Trace(r.flushLock(ctx, p))
}

// Attempt to initialize the lock. The backup needs to stop when there is an unexpired lock.
func (r *CheckpointRunner) initialLock(ctx context.Context) error {
p, l, err := r.getTS(ctx)
if err != nil {
return errors.Trace(err)
}
r.lockId = oracle.ComposeTS(p, l)
exist, err := r.storage.FileExists(ctx, CheckpointLockPath)
if err != nil {
return errors.Trace(err)
}
if exist {
if err := r.checkLockFile(ctx, p); err != nil {
return errors.Trace(err)
}
}
if err = r.flushLock(ctx, p); err != nil {
return errors.Trace(err)
}

// wait for 3 seconds to check whether the lock file is overwritten by another BR
time.Sleep(3 * time.Second)
err = r.checkLockFile(ctx, p)
return errors.Trace(err)
}

// walk through the checkpoint range files and retrieve the metadata of the backed-up ranges
// and return the total time cost in the past executions
func WalkCheckpointFile(ctx context.Context, s storage.ExternalStorage, cipher *backuppb.CipherInfo, fn func(groupKey string, rg *rtree.Range)) (time.Duration, error) {
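For reference, a minimal sketch (not part of this PR; the names are hypothetical) of a stub GlobalTimer that a unit test could pass to StartCheckpointRunnerForTest in place of a real PD client, assuming the usual context and time imports:

type fakeTimer struct {
	physical, logical int64
}

// GetTS returns a fixed physical/logical timestamp pair, mimicking PD's TSO.
func (t fakeTimer) GetTS(_ context.Context) (int64, int64, error) {
	return t.physical, t.logical, nil
}

// usage: runner, err := StartCheckpointRunnerForTest(ctx, s, cipher, time.Second, fakeTimer{physical: time.Now().UnixMilli()})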