diff --git a/br/pkg/backup/BUILD.bazel b/br/pkg/backup/BUILD.bazel
index c8cad292f4607..65ff4288987a1 100644
--- a/br/pkg/backup/BUILD.bazel
+++ b/br/pkg/backup/BUILD.bazel
@@ -12,6 +12,7 @@ go_library(
     importpath = "github.com/pingcap/tidb/br/pkg/backup",
     visibility = ["//visibility:public"],
     deps = [
+        "//br/pkg/checkpoint",
        "//br/pkg/checksum",
        "//br/pkg/conn",
        "//br/pkg/conn/util",
diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go
index 865e7fa2f3078..0241789e65103 100644
--- a/br/pkg/backup/client.go
+++ b/br/pkg/backup/client.go
@@ -3,7 +3,9 @@
 package backup
 
 import (
+	"bytes"
 	"context"
+	"encoding/base64"
 	"encoding/hex"
 	"encoding/json"
 	"fmt"
@@ -21,6 +23,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
+	"github.com/pingcap/tidb/br/pkg/checkpoint"
 	"github.com/pingcap/tidb/br/pkg/conn"
 	connutil "github.com/pingcap/tidb/br/pkg/conn/util"
 	berrors "github.com/pingcap/tidb/br/pkg/errors"
@@ -90,6 +93,10 @@ type Client struct {
 	backend    *backuppb.StorageBackend
 	apiVersion kvrpcpb.APIVersion
 
+	cipher           *backuppb.CipherInfo
+	checkpointMeta   *checkpoint.CheckpointMetadata
+	checkpointRunner *checkpoint.CheckpointRunner
+
 	gcTTL int64
 }
 
@@ -101,9 +108,18 @@ func NewBackupClient(ctx context.Context, mgr ClientMgr) *Client {
 	return &Client{
 		clusterID: clusterID,
 		mgr:       mgr,
+
+		cipher:           nil,
+		checkpointMeta:   nil,
+		checkpointRunner: nil,
 	}
 }
 
+// SetCipher sets the cipher used by the checkpoint to encrypt the SST files' metadata
+func (bc *Client) SetCipher(cipher *backuppb.CipherInfo) {
+	bc.cipher = cipher
+}
+
 // GetTS gets a new timestamp from PD.
 func (bc *Client) GetCurerntTS(ctx context.Context) (uint64, error) {
 	p, l, err := bc.mgr.GetPDClient().GetTS(ctx)
@@ -120,6 +136,11 @@ func (bc *Client) GetTS(ctx context.Context, duration time.Duration, ts uint64)
 		backupTS uint64
 		err      error
 	)
+
+	if bc.checkpointMeta != nil {
+		log.Info("reuse checkpoint BackupTS", zap.Uint64("backup-ts", bc.checkpointMeta.BackupTS))
+		return bc.checkpointMeta.BackupTS, nil
+	}
 	if ts > 0 {
 		backupTS = ts
 	} else {
@@ -160,6 +181,15 @@ func (bc *Client) SetLockFile(ctx context.Context) error {
 			"This file exists to remind other backup jobs won't use this path"))
 }
 
+// GetSafePointID gets the gc-safepoint's service-id from the checkpoint, or generates a new one
+func (bc *Client) GetSafePointID() string {
+	if bc.checkpointMeta != nil {
+		log.Info("reuse the checkpoint gc-safepoint service id", zap.String("service-id", bc.checkpointMeta.GCServiceId))
+		return bc.checkpointMeta.GCServiceId
+	}
+	return utils.MakeSafePointID()
+}
+
 // SetGCTTL set gcTTL for client.
 func (bc *Client) SetGCTTL(ttl int64) {
 	if ttl <= 0 {
@@ -204,13 +234,145 @@ func (bc *Client) SetStorageAndCheckNotInUse(
 			"there may be some backup files in the path already, "+
 			"please specify a correct backup directory!", bc.storage.URI()+"/"+metautil.MetaFile)
 	}
-	err = CheckBackupStorageIsLocked(ctx, bc.storage)
+	// use checkpoint mode if checkpoint meta exists
+	exist, err = bc.storage.FileExists(ctx, checkpoint.CheckpointMetaPath)
 	if err != nil {
-		return err
+		return errors.Annotatef(err, "error occurred when checking %s file", checkpoint.CheckpointMetaPath)
+	}
+
+	// if there is no checkpoint meta, then checkpoint mode is not used,
+	// or this is the first execution
+	if exist {
+		// load the config's hash to keep the config unchanged.
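+		// a checkpoint meta already exists, which means a previous backup was
+		// interrupted here; resume it instead of treating the directory as dirty.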
+ log.Info("load the checkpoint meta, so the existence of lockfile is allowed.") + bc.checkpointMeta, err = checkpoint.LoadCheckpointMetadata(ctx, bc.storage) + if err != nil { + return errors.Annotatef(err, "error occurred when loading %s file", checkpoint.CheckpointMetaPath) + } + } else { + err = CheckBackupStorageIsLocked(ctx, bc.storage) + if err != nil { + return err + } } + + return nil +} + +// CheckCheckpoint check whether the configs are the same +func (bc *Client) CheckCheckpoint(hash []byte) error { + if bc.checkpointMeta != nil && !bytes.Equal(bc.checkpointMeta.ConfigHash, hash) { + return errors.Annotatef(berrors.ErrInvalidArgument, "failed to backup to %v, "+ + "because the checkpoint mode is used, "+ + "but the hashs of the configs are not the same. Please check the config", + bc.storage.URI(), + ) + } + + // first execution or not using checkpoint mode yet + // or using the same config can pass the check return nil } +func (bc *Client) GetCheckpointRunner() *checkpoint.CheckpointRunner { + return bc.checkpointRunner +} + +// StartCheckpointMeta will +// 1. saves the initial status into the external storage; +// 2. load the checkpoint data from external storage +// 3. start checkpoint runner +func (bc *Client) StartCheckpointRunner( + ctx context.Context, + cfgHash []byte, + backupTS uint64, + ranges []rtree.Range, + safePointID string, + progressCallBack func(ProgressUnit), +) (err error) { + if bc.checkpointMeta == nil { + bc.checkpointMeta = &checkpoint.CheckpointMetadata{ + GCServiceId: safePointID, + ConfigHash: cfgHash, + BackupTS: backupTS, + Ranges: ranges, + } + + // sync the checkpoint meta to the external storage at first + if err := checkpoint.SaveCheckpointMetadata(ctx, bc.storage, bc.checkpointMeta); err != nil { + return errors.Trace(err) + } + } else { + // otherwise, the checkpoint meta is loaded from the external storage, + // no need to save it again + // besides, there are exist checkpoint data need to be loaded before start checkpoint runner + bc.checkpointMeta.CheckpointDataMap, err = bc.loadCheckpointRanges(ctx, progressCallBack) + if err != nil { + return errors.Trace(err) + } + } + + bc.checkpointRunner = checkpoint.StartCheckpointRunner(ctx, bc.storage, bc.cipher) + return nil +} + +func (bc *Client) WaitForFinishCheckpoint() { + if bc.checkpointRunner != nil { + bc.checkpointRunner.WaitForFinish() + } +} + +// GetProgressRange loads the checkpoint(finished) sub-ranges of the current range, and calculate its incompleted sub-ranges. +func (bc *Client) GetProgressRange(r rtree.Range) (*rtree.ProgressRange, error) { + // use groupKey to distinguish different ranges + groupKey := base64.URLEncoding.EncodeToString(r.StartKey) + if bc.checkpointMeta != nil && len(bc.checkpointMeta.CheckpointDataMap) > 0 { + rangeTree, exists := bc.checkpointMeta.CheckpointDataMap[groupKey] + if exists { + incomplete := rangeTree.GetIncompleteRange(r.StartKey, r.EndKey) + delete(bc.checkpointMeta.CheckpointDataMap, groupKey) + return &rtree.ProgressRange{ + Res: rangeTree, + Incomplete: incomplete, + Origin: r, + GroupKey: groupKey, + }, nil + } + } + + // the origin range are not recorded in checkpoint + // return the default progress range + return &rtree.ProgressRange{ + Res: rtree.NewRangeTree(), + Incomplete: []rtree.Range{ + r, + }, + Origin: r, + GroupKey: groupKey, + }, nil +} + +// LoadCheckpointRange loads the checkpoint(finished) sub-ranges of the current range, and calculate its incompleted sub-ranges. 
+func (bc *Client) loadCheckpointRanges(ctx context.Context, progressCallBack func(ProgressUnit)) (map[string]rtree.RangeTree, error) {
+	rangeDataMap := make(map[string]rtree.RangeTree)
+
+	pastDureTime, err := checkpoint.WalkCheckpointFile(ctx, bc.storage, bc.cipher, func(groupKey string, rg *rtree.Range) {
+		rangeTree, exists := rangeDataMap[groupKey]
+		if !exists {
+			rangeTree = rtree.NewRangeTree()
+			rangeDataMap[groupKey] = rangeTree
+		}
+		rangeTree.Put(rg.StartKey, rg.EndKey, rg.Files)
+		progressCallBack(RegionUnit)
+	})
+
+	// the summary's start time should be adjusted to `pastDureTime` earlier
+	log.Info("past cost time", zap.Duration("cost", pastDureTime))
+	summary.AdjustStartTimeToEarlierTime(pastDureTime)
+
+	return rangeDataMap, errors.Trace(err)
+}
+
 // SetStorage sets ExternalStorage for client.
 func (bc *Client) SetStorage(
 	ctx context.Context,
@@ -239,6 +401,22 @@ func (bc *Client) SetApiVersion(v kvrpcpb.APIVersion) {
 	bc.apiVersion = v
 }
 
+// BuildBackupRangeAndSchema calls the package-level BuildBackupRangeAndSchema;
+// if the checkpoint mode is used, it returns the ranges from the checkpoint meta instead
+func (bc *Client) BuildBackupRangeAndSchema(
+	storage kv.Storage,
+	tableFilter filter.Filter,
+	backupTS uint64,
+	isFullBackup bool,
+) ([]rtree.Range, *Schemas, []*backuppb.PlacementPolicy, error) {
+	if bc.checkpointMeta == nil {
+		return BuildBackupRangeAndSchema(storage, tableFilter, backupTS, isFullBackup, true)
+	}
+	_, schemas, policies, err := BuildBackupRangeAndSchema(storage, tableFilter, backupTS, isFullBackup, false)
+	schemas.SetCheckpointChecksum(bc.checkpointMeta.CheckpointChecksum)
+	return bc.checkpointMeta.Ranges, schemas, policies, errors.Trace(err)
+}
+
 // CheckBackupStorageIsLocked checks whether backups is locked.
 // which means we found other backup progress already write
 // some data files into the same backup directory or cloud prefix.
@@ -252,7 +430,7 @@ func CheckBackupStorageIsLocked(ctx context.Context, s storage.ExternalStorage)
 			// should return error to break the walkDir when found lock file and other .sst files.
 			if strings.HasSuffix(path, ".sst") {
 				return errors.Annotatef(berrors.ErrInvalidArgument, "backup lock file and sst file exist in %v, "+
-					"there are some backup files in the path already, "+
+					"there are some backup files in the path already, but there is no checkpoint metadata, "+
 					"please specify a correct backup directory!", s.URI()+"/"+metautil.LockFile)
 			}
 			return nil
@@ -319,6 +497,7 @@ func BuildBackupRangeAndSchema(
 	tableFilter filter.Filter,
 	backupTS uint64,
 	isFullBackup bool,
+	buildRange bool,
 ) ([]rtree.Range, *Schemas, []*backuppb.PlacementPolicy, error) {
 	snapshot := storage.GetSnapshot(kv.NewVersion(backupTS))
 	m := meta.NewSnapshotMeta(snapshot)
@@ -435,15 +614,17 @@ func BuildBackupRangeAndSchema(
 
 			backupSchemas.AddSchema(dbInfo, tableInfo)
 
-			tableRanges, err := BuildTableRanges(tableInfo)
-			if err != nil {
-				return nil, nil, nil, errors.Trace(err)
-			}
-			for _, r := range tableRanges {
-				ranges = append(ranges, rtree.Range{
-					StartKey: r.StartKey,
-					EndKey:   r.EndKey,
-				})
+			if buildRange {
+				tableRanges, err := BuildTableRanges(tableInfo)
+				if err != nil {
+					return nil, nil, nil, errors.Trace(err)
+				}
+				for _, r := range tableRanges {
+					ranges = append(ranges, rtree.Range{
+						StartKey: r.StartKey,
+						EndKey:   r.EndKey,
+					})
+				}
 			}
 		}
 	}
@@ -604,10 +785,13 @@ func (bc *Client) BackupRanges(
 		id := id
 		req := request
 		req.StartKey, req.EndKey = r.StartKey, r.EndKey
-
+		pr, err := bc.GetProgressRange(r)
+		if err != nil {
+			return errors.Trace(err)
+		}
 		workerPool.ApplyOnErrorGroup(eg, func() error {
 			elctx := logutil.ContextWithField(ectx, logutil.RedactAny("range-sn", id))
-			err := bc.BackupRange(elctx, req, metaWriter, progressCallBack)
+			err := bc.BackupRange(elctx, req, pr, metaWriter, progressCallBack)
 			if err != nil {
 				// The error due to context cancel, stack trace is meaningless, the stack shall be suspended (also clear)
 				if errors.Cause(err) == context.Canceled {
@@ -618,6 +802,7 @@ func (bc *Client) BackupRanges(
 			return nil
 		})
 	}
+
 	return eg.Wait()
 }
 
@@ -625,7 +810,8 @@ func (bc *Client) BackupRanges(
 // Returns an array of files backed up.
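+// Progress of the range is tracked in the given ProgressRange: completed
+// sub-ranges are accumulated into progressRange.Res.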
 func (bc *Client) BackupRange(
 	ctx context.Context,
-	req backuppb.BackupRequest,
+	request backuppb.BackupRequest,
+	progressRange *rtree.ProgressRange,
 	metaWriter *metautil.MetaWriter,
 	progressCallBack func(ProgressUnit),
 ) (err error) {
@@ -633,17 +819,17 @@ func (bc *Client) BackupRange(
 	defer func() {
 		elapsed := time.Since(start)
 		logutil.CL(ctx).Info("backup range completed",
-			logutil.Key("startKey", req.StartKey), logutil.Key("endKey", req.EndKey),
+			logutil.Key("startKey", progressRange.Origin.StartKey), logutil.Key("endKey", progressRange.Origin.EndKey),
 			zap.Duration("take", elapsed))
-		key := "range start:" + hex.EncodeToString(req.StartKey) + " end:" + hex.EncodeToString(req.EndKey)
+		key := "range start:" + hex.EncodeToString(progressRange.Origin.StartKey) + " end:" + hex.EncodeToString(progressRange.Origin.EndKey)
 		if err != nil {
 			summary.CollectFailureUnit(key, err)
 		}
 	}()
 	logutil.CL(ctx).Info("backup range started",
-		logutil.Key("startKey", req.StartKey), logutil.Key("endKey", req.EndKey),
-		zap.Uint64("rateLimit", req.RateLimit),
-		zap.Uint32("concurrency", req.Concurrency))
+		logutil.Key("startKey", progressRange.Origin.StartKey), logutil.Key("endKey", progressRange.Origin.EndKey),
+		zap.Uint64("rateLimit", request.RateLimit),
+		zap.Uint32("concurrency", request.Concurrency))
 
 	var allStores []*metapb.Store
 	allStores, err = conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), connutil.SkipTiFlash)
@@ -652,35 +838,57 @@ func (bc *Client) BackupRange(
 	}
 
 	logutil.CL(ctx).Info("backup push down started")
-	push := newPushDown(bc.mgr, len(allStores))
-	results, err := push.pushBackup(ctx, req, allStores, progressCallBack)
-	if err != nil {
-		return errors.Trace(err)
+	// either the `incomplete` is the origin range itself,
+	// or the `incomplete` is the sub-ranges of the origin range split by the checkpoint
+	if len(progressRange.Incomplete) > 0 {
+		// don't make the origin request dirty,
+		// since fineGrainedBackup needs to use it.
+		req := request
+		if len(progressRange.Incomplete) > 1 {
+			subRanges := make([]*kvrpcpb.KeyRange, 0, len(progressRange.Incomplete))
+			for _, r := range progressRange.Incomplete {
+				subRanges = append(subRanges, &kvrpcpb.KeyRange{
+					StartKey: r.StartKey,
+					EndKey:   r.EndKey,
+				})
+			}
+			req.SubRanges = subRanges
+		} else {
+			// compatible with older versions of TiKV
+			req.StartKey = progressRange.Incomplete[0].StartKey
+			req.EndKey = progressRange.Incomplete[0].EndKey
+		}
+
+		push := newPushDown(bc.mgr, len(allStores))
+		err = push.pushBackup(ctx, req, progressRange, allStores, bc.checkpointRunner, progressCallBack)
+		if err != nil {
+			return errors.Trace(err)
+		}
 	}
-	logutil.CL(ctx).Info("backup push down completed", zap.Int("small-range-count", results.Len()))
+	logutil.CL(ctx).Info("backup push down completed", zap.Int("small-range-count", progressRange.Res.Len()))
 
 	// Find and backup remaining ranges.
 	// TODO: test fine grained backup.
-	if err := bc.fineGrainedBackup(ctx, req, results, progressCallBack); err != nil {
+	if err := bc.fineGrainedBackup(ctx, request, progressRange, progressCallBack); err != nil {
 		return errors.Trace(err)
 	}
 
 	// update progress of range unit
 	progressCallBack(RangeUnit)
 
-	if req.IsRawKv {
+	if request.IsRawKv {
 		logutil.CL(ctx).Info("raw ranges backed up",
-			logutil.Key("startKey", req.StartKey),
-			logutil.Key("endKey", req.EndKey),
-			zap.String("cf", req.Cf))
+			logutil.Key("startKey", progressRange.Origin.StartKey),
+			logutil.Key("endKey", progressRange.Origin.EndKey),
+			zap.String("cf", request.Cf))
 	} else {
 		logutil.CL(ctx).Info("transactional range backup completed",
-			zap.Reflect("StartTS", req.StartVersion),
-			zap.Reflect("EndTS", req.EndVersion))
+			zap.Reflect("StartTS", request.StartVersion),
+			zap.Reflect("EndTS", request.EndVersion))
 	}
 
 	var ascendErr error
-	results.Ascend(func(i btree.Item) bool {
+	progressRange.Res.Ascend(func(i btree.Item) bool {
 		r := i.(*rtree.Range)
 		for _, f := range r.Files {
 			summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs)
@@ -699,7 +907,7 @@ func (bc *Client) BackupRange(
 	}
 
 	// Check if there are duplicated files.
-	checkDupFiles(&results)
+	checkDupFiles(&progressRange.Res)
 
 	return nil
 }
@@ -732,7 +940,7 @@ func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool
 func (bc *Client) fineGrainedBackup(
 	ctx context.Context,
 	req backuppb.BackupRequest,
-	rangeTree rtree.RangeTree,
+	pr *rtree.ProgressRange,
 	progressCallBack func(ProgressUnit),
 ) error {
 	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
@@ -759,7 +967,7 @@ func (bc *Client) fineGrainedBackup(
 	bo := tikv.NewBackoffer(ctx, backupFineGrainedMaxBackoff)
 	for {
 		// Step1, check whether there is any incomplete range
-		incomplete := rangeTree.GetIncompleteRange(req.StartKey, req.EndKey)
+		incomplete := pr.Res.GetIncompleteRange(req.StartKey, req.EndKey)
 		if len(incomplete) == 0 {
 			return nil
 		}
@@ -827,7 +1035,18 @@ func (bc *Client) fineGrainedBackup(
 					logutil.Key("fine-grained-range-start", resp.StartKey),
 					logutil.Key("fine-grained-range-end", resp.EndKey),
 				)
-				rangeTree.Put(resp.StartKey, resp.EndKey, resp.Files)
+				if bc.checkpointRunner != nil {
+					if err := bc.checkpointRunner.Append(
+						ctx,
+						pr.GroupKey,
+						resp.StartKey,
+						resp.EndKey,
+						resp.Files,
+					); err != nil {
+						return errors.Annotate(err, "failed to flush checkpoint when fineGrainedBackup")
+					}
+				}
+				pr.Res.Put(resp.StartKey, resp.EndKey, resp.Files)
 				apiVersion := resp.ApiVersion
 				bc.SetApiVersion(apiVersion)
 
diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go
index 45c2b9acca01c..2ffffe690ffe5 100644
--- a/br/pkg/backup/push.go
+++ b/br/pkg/backup/push.go
@@ -13,6 +13,7 @@ import (
 	backuppb "github.com/pingcap/kvproto/pkg/brpb"
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/tidb/br/pkg/checkpoint"
 	berrors "github.com/pingcap/tidb/br/pkg/errors"
 	"github.com/pingcap/tidb/br/pkg/logutil"
 	"github.com/pingcap/tidb/br/pkg/redact"
@@ -54,9 +55,11 @@ func newPushDown(mgr ClientMgr, capacity int) *pushDown {
 func (push *pushDown) pushBackup(
 	ctx context.Context,
 	req backuppb.BackupRequest,
+	pr *rtree.ProgressRange,
 	stores []*metapb.Store,
+	checkpointRunner *checkpoint.CheckpointRunner,
 	progressCallBack func(ProgressUnit),
-) (rtree.RangeTree, error) {
+) error {
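+	// completed ranges accumulate in pr.Res; when a checkpoint runner is
+	// passed in, each completed range is appended to the checkpoint first.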
 	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
 		span1 := span.Tracer().StartSpan("pushDown.pushBackup",
 			opentracing.ChildOf(span.Context()))
 		defer span1.Finish()
@@ -64,10 +67,9 @@ func (push *pushDown) pushBackup(
 	}
 
 	// Push down backup tasks to all tikv instances.
-	res := rtree.NewRangeTree()
 	failpoint.Inject("noop-backup", func(_ failpoint.Value) {
 		logutil.CL(ctx).Warn("skipping normal backup, jump to fine-grained backup, meow :3", logutil.Key("start-key", req.StartKey), logutil.Key("end-key", req.EndKey))
-		failpoint.Return(res, nil)
+		failpoint.Return(nil)
 	})
 
 	wg := new(sync.WaitGroup)
@@ -84,7 +86,7 @@ func (push *pushDown) pushBackup(
 			// BR should be able to backup even some of stores disconnected.
 			// The regions managed by this store can be retried at fine-grained backup then.
 			logutil.CL(lctx).Warn("fail to connect store, skipping", zap.Error(err))
-			return res, nil
+			return nil
 		}
 		wg.Add(1)
 		go func() {
@@ -125,7 +127,7 @@ func (push *pushDown) pushBackup(
 			store := respAndStore.GetStore()
 			if !ok {
 				// Finished.
-				return res, nil
+				return nil
 			}
 			failpoint.Inject("backup-timeout-error", func(val failpoint.Value) {
 				msg := val.(string)
@@ -165,7 +167,19 @@ func (push *pushDown) pushBackup(
 			})
 			if resp.GetError() == nil {
 				// None error means range has been backuped successfully.
-				res.Put(
+				if checkpointRunner != nil {
+					if err := checkpointRunner.Append(
+						ctx,
+						pr.GroupKey,
+						resp.StartKey,
+						resp.EndKey,
+						resp.Files,
+					); err != nil {
+						// the error can only come from the flush operation
+						return errors.Annotate(err, "failed to flush checkpoint")
+					}
+				}
+				pr.Res.Put(
 					resp.GetStartKey(), resp.GetEndKey(), resp.GetFiles())
 
 				// Update progress
@@ -181,7 +195,7 @@ func (push *pushDown) pushBackup(
 				case *backuppb.Error_ClusterIdError:
 					logutil.CL(ctx).Error("backup occur cluster ID error", zap.Reflect("error", v))
-					return res, errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v", errPb)
+					return errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v", errPb)
 				default:
 					if utils.MessageIsRetryableStorageError(errPb.GetMsg()) {
 						logutil.CL(ctx).Warn("backup occur storage error", zap.String("error", errPb.GetMsg()))
@@ -204,7 +218,7 @@ func (push *pushDown) pushBackup(
 					if len(errMsg) <= 0 {
 						errMsg = errPb.Msg
 					}
-					return res, errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s %s",
+					return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s %s",
 						store.GetId(),
 						redact.String(store.GetAddress()),
 						req.StorageBackend.String(),
@@ -214,10 +228,10 @@ func (push *pushDown) pushBackup(
 			}
 		case err := <-push.errCh:
 			if !berrors.Is(err, berrors.ErrFailedToConnect) {
-				return res, errors.Annotatef(err, "failed to backup range [%s, %s)", redact.Key(req.StartKey), redact.Key(req.EndKey))
+				return errors.Annotatef(err, "failed to backup range [%s, %s)", redact.Key(req.StartKey), redact.Key(req.EndKey))
 			}
 			logutil.CL(ctx).Warn("skipping disconnected stores", logutil.ShortError(err))
-			return res, nil
+			return nil
 		}
 	}
 }
diff --git a/br/pkg/backup/schema.go b/br/pkg/backup/schema.go
index 066043c224064..bb0cf7f884189 100644
--- a/br/pkg/backup/schema.go
+++ b/br/pkg/backup/schema.go
@@ -12,6 +12,7 @@ import (
 	"github.com/pingcap/errors"
 	backuppb "github.com/pingcap/kvproto/pkg/brpb"
 	"github.com/pingcap/log"
+	"github.com/pingcap/tidb/br/pkg/checkpoint"
 	"github.com/pingcap/tidb/br/pkg/checksum"
 	"github.com/pingcap/tidb/br/pkg/glue"
 	"github.com/pingcap/tidb/br/pkg/logutil"
@@ -44,14 +45,22 @@ type schemaInfo struct {
 type Schemas struct {
 	// name -> schema
 	schemas map[string]*schemaInfo
+
+	// checkpoint: table id -> checksum
+	checkpointChecksum map[int64]*checkpoint.ChecksumItem
 }
 
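+// NewBackupSchemas creates an empty Schemas; its checkpoint checksum map stays
+// nil until SetCheckpointChecksum is called with the checksums recorded by a previous run.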
 func NewBackupSchemas() *Schemas {
 	return &Schemas{
-		schemas: make(map[string]*schemaInfo),
+		schemas:            make(map[string]*schemaInfo),
+		checkpointChecksum: nil,
 	}
 }
 
+func (ss *Schemas) SetCheckpointChecksum(checkpointChecksum map[int64]*checkpoint.ChecksumItem) {
+	ss.checkpointChecksum = checkpointChecksum
+}
+
 func (ss *Schemas) AddSchema(
 	dbInfo *model.DBInfo, tableInfo *model.TableInfo,
 ) {
@@ -73,6 +82,7 @@ func (ss *Schemas) AddSchema(
 func (ss *Schemas) BackupSchemas(
 	ctx context.Context,
 	metaWriter *metautil.MetaWriter,
+	checkpointRunner *checkpoint.CheckpointRunner,
 	store kv.Storage,
 	statsHandle *handle.Handle,
 	backupTS uint64,
@@ -100,6 +110,11 @@ func (ss *Schemas) BackupSchemas(
 			schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
 		}
 
+		var checksum *checkpoint.ChecksumItem
+		var exists bool = false
+		if ss.checkpointChecksum != nil {
+			checksum, exists = ss.checkpointChecksum[schema.tableInfo.ID]
+		}
 		workerPool.ApplyOnErrorGroup(errg, func() error {
 			if schema.tableInfo != nil {
 				logger := log.With(
@@ -109,16 +124,38 @@ func (ss *Schemas) BackupSchemas(
 
 				if !skipChecksum {
 					logger.Info("Calculate table checksum start")
-					start := time.Now()
-					err := schema.calculateChecksum(ectx, store.GetClient(), backupTS, copConcurrency)
-					if err != nil {
-						return errors.Trace(err)
+					if exists && checksum != nil {
+						schema.crc64xor = checksum.Crc64xor
+						schema.totalKvs = checksum.TotalKvs
+						schema.totalBytes = checksum.TotalBytes
+						logger.Info("Calculate table checksum completed (from checkpoint)",
+							zap.Uint64("Crc64Xor", schema.crc64xor),
+							zap.Uint64("TotalKvs", schema.totalKvs),
+							zap.Uint64("TotalBytes", schema.totalBytes))
+					} else {
+						start := time.Now()
+						err := schema.calculateChecksum(ectx, store.GetClient(), backupTS, copConcurrency)
+						if err != nil {
+							return errors.Trace(err)
+						}
+						calculateCost := time.Since(start)
+						var flushCost time.Duration
+						if checkpointRunner != nil {
+							// if the checkpoint runner is running and the checksum is not from the checkpoint,
+							// then flush the checksum through the checkpoint runner
+							startFlush := time.Now()
+							if err = checkpointRunner.FlushChecksum(ctx, schema.tableInfo.ID, schema.crc64xor, schema.totalKvs, schema.totalBytes, calculateCost.Seconds()); err != nil {
+								return errors.Trace(err)
+							}
+							flushCost = time.Since(startFlush)
+						}
+						logger.Info("Calculate table checksum completed",
+							zap.Uint64("Crc64Xor", schema.crc64xor),
+							zap.Uint64("TotalKvs", schema.totalKvs),
+							zap.Uint64("TotalBytes", schema.totalBytes),
+							zap.Duration("calculate-take", calculateCost),
+							zap.Duration("flush-take", flushCost))
 					}
-					logger.Info("Calculate table checksum completed",
-						zap.Uint64("Crc64Xor", schema.crc64xor),
-						zap.Uint64("TotalKvs", schema.totalKvs),
-						zap.Uint64("TotalBytes", schema.totalBytes),
-						zap.Duration("take", time.Since(start)))
 				}
 				if statsHandle != nil {
 					if err := schema.dumpStatsToJSON(statsHandle); err != nil {
diff --git a/br/pkg/backup/schema_test.go b/br/pkg/backup/schema_test.go
index bed9d834d2e10..08d560bf03c25 100644
--- a/br/pkg/backup/schema_test.go
+++ b/br/pkg/backup/schema_test.go
@@ -108,7 +108,7 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
 	testFilter, err := filter.Parse([]string{"test.t1"})
 	require.NoError(t, err)
 	_, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(
-		m.Storage, testFilter, math.MaxUint64, false)
+		m.Storage, testFilter, math.MaxUint64, false, true)
 	require.NoError(t, err)
 	require.NotNil(t, backupSchemas)
 
@@ -116,7 +116,7 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
 	fooFilter, err := filter.Parse([]string{"foo.t1"})
 	require.NoError(t, err)
 	_, backupSchemas, _, err = backup.BuildBackupRangeAndSchema(
-		m.Storage, fooFilter, math.MaxUint64, false)
+		m.Storage, fooFilter, math.MaxUint64, false, true)
 	require.NoError(t, err)
 	require.Nil(t, backupSchemas)
 
@@ -125,7 +125,7 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
 	noFilter, err := filter.Parse([]string{"*.*", "!mysql.*"})
 	require.NoError(t, err)
 	_, backupSchemas, _, err = backup.BuildBackupRangeAndSchema(
-		m.Storage, noFilter, math.MaxUint64, false)
+		m.Storage, noFilter, math.MaxUint64, false, true)
 	require.NoError(t, err)
 	require.NotNil(t, backupSchemas)
 
@@ -137,7 +137,7 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
 
 	var policies []*backuppb.PlacementPolicy
 	_, backupSchemas, policies, err = backup.BuildBackupRangeAndSchema(
-		m.Storage, testFilter, math.MaxUint64, false)
+		m.Storage, testFilter, math.MaxUint64, false, true)
 	require.NoError(t, err)
 	require.Equal(t, 1, backupSchemas.Len())
 	// we expect no policies collected, because it's not full backup.
@@ -151,7 +151,7 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
 	metaWriter := metautil.NewMetaWriter(es, metautil.MetaFileSize, false, "", &cipher)
 	ctx := context.Background()
 	err = backupSchemas.BackupSchemas(
-		ctx, metaWriter, m.Storage, nil, math.MaxUint64, 1, variable.DefChecksumTableConcurrency, skipChecksum, updateCh)
+		ctx, metaWriter, nil, m.Storage, nil, math.MaxUint64, 1, variable.DefChecksumTableConcurrency, skipChecksum, updateCh)
 	require.Equal(t, int64(1), updateCh.get())
 	require.NoError(t, err)
 	err = metaWriter.FlushBackupMeta(ctx)
@@ -170,7 +170,7 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
 	tk.MustExec("insert into t2 values (11);")
 
 	_, backupSchemas, policies, err = backup.BuildBackupRangeAndSchema(
-		m.Storage, noFilter, math.MaxUint64, true)
+		m.Storage, noFilter, math.MaxUint64, true, true)
 	require.NoError(t, err)
 	require.Equal(t, 2, backupSchemas.Len())
 	// we expect the policy fivereplicas collected in full backup.
@@ -180,7 +180,7 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
 	es2 := GetRandomStorage(t)
 	metaWriter2 := metautil.NewMetaWriter(es2, metautil.MetaFileSize, false, "", &cipher)
 	err = backupSchemas.BackupSchemas(
-		ctx, metaWriter2, m.Storage, nil, math.MaxUint64, 2, variable.DefChecksumTableConcurrency, skipChecksum, updateCh)
+		ctx, metaWriter2, nil, m.Storage, nil, math.MaxUint64, 2, variable.DefChecksumTableConcurrency, skipChecksum, updateCh)
 	require.Equal(t, int64(2), updateCh.get())
 	require.NoError(t, err)
 	err = metaWriter2.FlushBackupMeta(ctx)
@@ -219,7 +219,7 @@ func TestBuildBackupRangeAndSchemaWithBrokenStats(t *testing.T) {
 	f, err := filter.Parse([]string{"test.t3"})
 	require.NoError(t, err)
 
-	_, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false)
+	_, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false, true)
 	require.NoError(t, err)
 	require.Equal(t, 1, backupSchemas.Len())
 
@@ -234,7 +234,7 @@ func TestBuildBackupRangeAndSchemaWithBrokenStats(t *testing.T) {
 	metaWriter := metautil.NewMetaWriter(es, metautil.MetaFileSize, false, "", &cipher)
 	ctx := context.Background()
 	err = backupSchemas.BackupSchemas(
-		ctx, metaWriter, m.Storage, nil, math.MaxUint64, 1, variable.DefChecksumTableConcurrency, skipChecksum, updateCh)
+		ctx, metaWriter, nil, m.Storage, nil, math.MaxUint64, 1, variable.DefChecksumTableConcurrency, skipChecksum, updateCh)
 	require.NoError(t, err)
 	err = metaWriter.FlushBackupMeta(ctx)
 	require.NoError(t, err)
@@ -253,7 +253,7 @@ func TestBuildBackupRangeAndSchemaWithBrokenStats(t *testing.T) {
 	// recover the statistics.
 	tk.MustExec("analyze table t3;")
 
-	_, backupSchemas, _, err = backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false)
+	_, backupSchemas, _, err = backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false, true)
 	require.NoError(t, err)
 	require.Equal(t, 1, backupSchemas.Len())
 
@@ -262,7 +262,7 @@ func TestBuildBackupRangeAndSchemaWithBrokenStats(t *testing.T) {
 	es2 := GetRandomStorage(t)
 	metaWriter2 := metautil.NewMetaWriter(es2, metautil.MetaFileSize, false, "", &cipher)
 	err = backupSchemas.BackupSchemas(
-		ctx, metaWriter2, m.Storage, statsHandle, math.MaxUint64, 1, variable.DefChecksumTableConcurrency, skipChecksum, updateCh)
+		ctx, metaWriter2, nil, m.Storage, statsHandle, math.MaxUint64, 1, variable.DefChecksumTableConcurrency, skipChecksum, updateCh)
 	require.NoError(t, err)
 	err = metaWriter2.FlushBackupMeta(ctx)
 	require.NoError(t, err)
@@ -294,7 +294,7 @@ func TestBackupSchemasForSystemTable(t *testing.T) {
 	f, err := filter.Parse([]string{"mysql.systable*"})
 	require.NoError(t, err)
 
-	_, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false)
+	_, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false, true)
 	require.NoError(t, err)
 	require.Equal(t, systemTablesCount, backupSchemas.Len())
 
@@ -305,7 +305,7 @@ func TestBackupSchemasForSystemTable(t *testing.T) {
 	updateCh := new(simpleProgress)
 
 	metaWriter2 := metautil.NewMetaWriter(es2, metautil.MetaFileSize, false, "", &cipher)
-	err = backupSchemas.BackupSchemas(ctx, metaWriter2, m.Storage, nil,
+	err = backupSchemas.BackupSchemas(ctx, metaWriter2, nil, m.Storage, nil,
 		math.MaxUint64, 1, variable.DefChecksumTableConcurrency, true, updateCh)
 	require.NoError(t, err)
 	err = metaWriter2.FlushBackupMeta(ctx)
diff --git a/br/pkg/checkpoint/BUILD.bazel b/br/pkg/checkpoint/BUILD.bazel
new file mode 100644
index 0000000000000..baae284d545fb
--- /dev/null
+++ b/br/pkg/checkpoint/BUILD.bazel
@@ -0,0 +1,32 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "checkpoint",
+    srcs = ["checkpoint.go"],
+    importpath = "github.com/pingcap/tidb/br/pkg/checkpoint",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//br/pkg/metautil",
+        "//br/pkg/rtree",
+        "//br/pkg/storage",
+        "//br/pkg/summary",
+        "//br/pkg/utils",
+        "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_kvproto//pkg/brpb",
+        "@com_github_pingcap_log//:log",
+        "@org_uber_go_zap//:zap",
+    ],
+)
+
+go_test(
+    name = "checkpoint_test",
+    srcs = ["checkpoint_test.go"],
+    deps = [
+        ":checkpoint",
+        "//br/pkg/rtree",
+        "//br/pkg/storage",
+        "@com_github_pingcap_kvproto//pkg/brpb",
+        "@com_github_pingcap_kvproto//pkg/encryptionpb",
+        "@com_github_stretchr_testify//require",
+    ],
+)
diff --git a/br/pkg/checkpoint/checkpoint.go b/br/pkg/checkpoint/checkpoint.go
new file mode 100644
index 0000000000000..a1634779009b8
--- /dev/null
+++ b/br/pkg/checkpoint/checkpoint.go
@@ -0,0 +1,580 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package checkpoint
+
+import (
+	"bytes"
+	"context"
+	"crypto/sha256"
+	"encoding/base64"
+	"encoding/json"
+	"fmt"
+	"math/rand"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/pingcap/errors"
+	backuppb "github.com/pingcap/kvproto/pkg/brpb"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tidb/br/pkg/metautil"
+	"github.com/pingcap/tidb/br/pkg/rtree"
+	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/pingcap/tidb/br/pkg/summary"
+	"github.com/pingcap/tidb/br/pkg/utils"
+	"go.uber.org/zap"
+)
+
+const (
+	CheckpointMetaPath = "checkpoint.meta"
+	CheckpointDir      = "/checkpoints"
+
+	CheckpointDataDir     = CheckpointDir + "/data"
+	CheckpointChecksumDir = CheckpointDir + "/checksum"
+)
+
+const MaxChecksumTotalCost float64 = 60.0
+
+const tickDuration = 30 * time.Second
+
+type CheckpointMessage struct {
+	// start-key of the origin range
+	GroupKey string
+
+	Group *rtree.Range
+}
+
+// A checkpoint range file is organized like this:
+//
+//	CheckpointData
+//	+----------------+   RangeGroupData                 RangeGroups
+//	| DureTime       |   +--------------------------+ encrypted  +-------------+
+//	| RangeGroupData-+-> | RangeGroupsEncriptedData-+----------> | GroupKey    |
+//	| RangeGroupData |   | Checksum                 |            | Range       |
+//	| ...            |   | CipherIv                 |            | ...         |
+//	| RangeGroupData |   | Size                     |            | Range       |
+//	+----------------+   +--------------------------+            +-------------+
+
+type RangeGroups struct {
+	GroupKey string         `json:"group-key"`
+	Groups   []*rtree.Range `json:"groups"`
+}
+
+type RangeGroupData struct {
+	RangeGroupsEncriptedData []byte
+	Checksum                 []byte
+	CipherIv                 []byte
+
+	Size int
+}
+
+type CheckpointData struct {
+	DureTime        time.Duration     `json:"dure-time"`
+	RangeGroupMetas []*RangeGroupData `json:"range-group-metas"`
+}
+
+// A checkpoint checksum file is organized like this:
+//
+//	ChecksumInfo       ChecksumItems       ChecksumItem
+//	+-------------+    +--------------+    +--------------+
+//	| Content---+----> | ChecksumItem-+--> | TableID      |
+//	| Checksum   |     | ChecksumItem |    | Crc64xor     |
+//	+-------------+    | ...          |    | TotalKvs     |
+//	                   | ChecksumItem |    | TotalBytes   |
+//	                   +--------------+    +--------------+
+
+type ChecksumItem struct {
+	TableID    int64  `json:"table-id"`
+	Crc64xor   uint64 `json:"crc64-xor"`
+	TotalKvs   uint64 `json:"total-kvs"`
+	TotalBytes uint64 `json:"total-bytes"`
+}
+
+type ChecksumItems struct {
+	Items []*ChecksumItem `json:"checksum-items"`
+}
+
+type ChecksumInfo struct {
+	Content  []byte `json:"content"`
+	Checksum []byte `json:"checksum"`
+}
+
+type ChecksumRunner struct {
+	sync.Mutex
+
+	checksumItems ChecksumItems
+
+	// when the total time cost is larger than the threshold,
+	// begin to flush the checksum
+	totalCost float64
+
+	err        error
+	wg         sync.WaitGroup
+	workerPool utils.WorkerPool
+}
+
+func NewChecksumRunner() *ChecksumRunner {
+	return &ChecksumRunner{
+		workerPool: *utils.NewWorkerPool(4, "checksum flush worker"),
+	}
+}
+
+// FlushChecksum saves the checksum in memory temporarily,
+// and flushes it to the external storage once the checksum calculation has taken enough time
+func (cr *ChecksumRunner) FlushChecksum(
+	ctx context.Context,
+	s storage.ExternalStorage,
+	tableID int64,
+	crc64xor uint64,
+	totalKvs uint64,
+	totalBytes uint64,
+	timeCost float64,
+) error {
+	checksumItem := &ChecksumItem{
+		TableID:    tableID,
+		Crc64xor:   crc64xor,
+		TotalKvs:   totalKvs,
+		TotalBytes: totalBytes,
+	}
+	var toBeFlushedChecksumItems *ChecksumItems = nil
+	cr.Lock()
+	if cr.err != nil {
+		err := cr.err
+		cr.Unlock()
+		return err
+	}
+	if cr.checksumItems.Items == nil {
+		// reset the checksumInfo
+		cr.totalCost = 0
+		cr.checksumItems.Items = make([]*ChecksumItem, 0)
+	}
+	cr.totalCost += timeCost
+	cr.checksumItems.Items = append(cr.checksumItems.Items, checksumItem)
+	if cr.totalCost > MaxChecksumTotalCost {
+		toBeFlushedChecksumItems = &ChecksumItems{
+			Items: cr.checksumItems.Items,
+		}
+		cr.checksumItems.Items = nil
+	}
+	cr.Unlock()
+
+	// now the lock is free
+	if toBeFlushedChecksumItems == nil {
+		return nil
+	}
+
+	// create a goroutine to flush the checksumInfo to the external storage
+	cr.wg.Add(1)
+	cr.workerPool.Apply(func() {
+		defer cr.wg.Done()
+		recordErr := func(err error) {
+			cr.Lock()
+			cr.err = err
+			cr.Unlock()
+		}
+
+		content, err := json.Marshal(toBeFlushedChecksumItems)
+		if err != nil {
+			recordErr(err)
+			return
+		}
+
+		checksum := sha256.Sum256(content)
+		checksumInfo := &ChecksumInfo{
+			Content:  content,
+			Checksum: checksum[:],
+		}
+
+		data, err := json.Marshal(checksumInfo)
+		if err != nil {
+			recordErr(err)
+			return
+		}
+
+		fname := fmt.Sprintf("%s/t%d_and__", CheckpointChecksumDir, tableID)
+		err = s.WriteFile(ctx, fname, data)
+		if err != nil {
+			recordErr(err)
+			return
+		}
+	})
+	return nil
+}
+
+type CheckpointRunner struct {
+	meta map[string]*RangeGroups
+
+	checksumRunner *ChecksumRunner
+
+	storage storage.ExternalStorage
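+	// the cipher used to encrypt the checkpoint range data before it is persisted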
+	cipher  *backuppb.CipherInfo
+
+	appendCh chan *CheckpointMessage
+	metaCh   chan map[string]*RangeGroups
+	errCh    chan error
+
+	wg sync.WaitGroup
+}
+
+// StartCheckpointRunnerForTest is only for test
+func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, tick time.Duration) *CheckpointRunner {
+	runner := &CheckpointRunner{
+		meta: make(map[string]*RangeGroups),
+
+		checksumRunner: NewChecksumRunner(),
+
+		storage: storage,
+		cipher:  cipher,
+
+		appendCh: make(chan *CheckpointMessage),
+		metaCh:   make(chan map[string]*RangeGroups),
+		errCh:    make(chan error),
+	}
+
+	runner.startCheckpointLoop(ctx, tick)
+	return runner
+}
+
+func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo) *CheckpointRunner {
+	runner := &CheckpointRunner{
+		meta: make(map[string]*RangeGroups),
+
+		checksumRunner: NewChecksumRunner(),
+
+		storage: storage,
+		cipher:  cipher,
+
+		appendCh: make(chan *CheckpointMessage),
+		metaCh:   make(chan map[string]*RangeGroups),
+		errCh:    make(chan error),
+	}
+
+	runner.startCheckpointLoop(ctx, tickDuration)
+	return runner
+}
+
+func (r *CheckpointRunner) FlushChecksum(ctx context.Context, tableID int64, crc64xor uint64, totalKvs uint64, totalBytes uint64, timeCost float64) error {
+	return r.checksumRunner.FlushChecksum(ctx, r.storage, tableID, crc64xor, totalKvs, totalBytes, timeCost)
+}
+
+func (r *CheckpointRunner) Append(
+	ctx context.Context,
+	groupKey string,
+	startKey []byte,
+	endKey []byte,
+	files []*backuppb.File,
+) error {
+	select {
+	case <-ctx.Done():
+		return nil
+	case err := <-r.errCh:
+		return err
+	case r.appendCh <- &CheckpointMessage{
+		GroupKey: groupKey,
+		Group: &rtree.Range{
+			StartKey: startKey,
+			EndKey:   endKey,
+			Files:    files,
+		},
+	}:
+		return nil
+	}
+}
+
+// Note: WaitForFinish must not run in parallel with `Append`
+func (r *CheckpointRunner) WaitForFinish() {
+	// no more appends are accepted
+	close(r.appendCh)
+	// wait for the range flusher to exit
+	r.wg.Wait()
+	// wait for the checksum flusher to exit
+	r.checksumRunner.wg.Wait()
+}
+
+// flushMeta sends the meta to the flush goroutine and resets the CheckpointRunner's meta
+func (r *CheckpointRunner) flushMeta(ctx context.Context, errCh chan error) error {
+	meta := r.meta
+	r.meta = make(map[string]*RangeGroups)
+	// do flush
+	select {
+	case <-ctx.Done():
+	case err := <-errCh:
+		return err
+	case r.metaCh <- meta:
+	}
+	return nil
+}
+
+// startCheckpointRunner starts a goroutine to flush the meta, which is sent from the checkpoint loop, to the external storage
+func (r *CheckpointRunner) startCheckpointRunner(ctx context.Context, wg *sync.WaitGroup) chan error {
+	errCh := make(chan error, 1)
+	wg.Add(1)
+	flushWorker := func(ctx context.Context, errCh chan error) {
+		defer wg.Done()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case meta, ok := <-r.metaCh:
+				if !ok {
+					log.Info("stop checkpoint flush worker")
+					return
+				}
+				if err := r.doFlush(ctx, meta); err != nil {
+					errCh <- err
+					return
+				}
+			}
+		}
+	}
+
+	go flushWorker(ctx, errCh)
+	return errCh
+}
+
+func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration time.Duration) {
+	r.wg.Add(1)
+	checkpointLoop := func(ctx context.Context) {
+		defer r.wg.Done()
+		cctx, cancel := context.WithCancel(ctx)
+		defer cancel()
+		var wg sync.WaitGroup
+		errCh := r.startCheckpointRunner(cctx, &wg)
+		ticker := time.NewTicker(tickDuration)
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				if err := r.flushMeta(ctx, errCh); err != nil {
+					r.errCh <- err
+					return
+				}
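+			// a closed appendCh means WaitForFinish was called: flush the
+			// remaining meta, then drain the flush worker before exiting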
+			case msg, ok := <-r.appendCh:
+				if !ok {
+					log.Info("stop checkpoint runner")
+					if err := r.flushMeta(ctx, errCh); err != nil {
+						r.errCh <- err
+					}
+					// close the channel to the flush worker
+					// and wait for it to consume all the metas
+					close(r.metaCh)
+					wg.Wait()
+					return
+				}
+				groups, exist := r.meta[msg.GroupKey]
+				if !exist {
+					groups = &RangeGroups{
+						GroupKey: msg.GroupKey,
+						Groups:   make([]*rtree.Range, 0),
+					}
+					r.meta[msg.GroupKey] = groups
+				}
+				groups.Groups = append(groups.Groups, msg.Group)
+			case err := <-errCh:
+				// pass the flush worker's error back
+				r.errCh <- err
+				return
+			}
+		}
+	}
+
+	go checkpointLoop(ctx)
+}
+
+// doFlush flushes the meta to the external storage
+func (r *CheckpointRunner) doFlush(ctx context.Context, meta map[string]*RangeGroups) error {
+	if len(meta) == 0 {
+		return nil
+	}
+
+	checkpointData := &CheckpointData{
+		DureTime:        summary.NowDureTime(),
+		RangeGroupMetas: make([]*RangeGroupData, 0, len(meta)),
+	}
+
+	var fname []byte = nil
+
+	for _, group := range meta {
+		if len(group.Groups) == 0 {
+			continue
+		}
+
+		// use the first item's group-key and sub-range-key as the filename
+		if len(fname) == 0 {
+			fname = append(append([]byte(group.GroupKey), '.', '.'), group.Groups[0].StartKey...)
+		}
+
+		// Flush the metaFile to storage
+		content, err := json.Marshal(group)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		encryptBuff, iv, err := metautil.Encrypt(content, r.cipher)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		checksum := sha256.Sum256(content)
+
+		checkpointData.RangeGroupMetas = append(checkpointData.RangeGroupMetas, &RangeGroupData{
+			RangeGroupsEncriptedData: encryptBuff,
+			Checksum:                 checksum[:],
+			Size:                     len(content),
+			CipherIv:                 iv,
+		})
+	}
+
+	if len(checkpointData.RangeGroupMetas) > 0 {
+		data, err := json.Marshal(checkpointData)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		checksum := sha256.Sum256(fname)
+		checksumEncoded := base64.URLEncoding.EncodeToString(checksum[:])
+		path := fmt.Sprintf("%s/%s_%d.cpt", CheckpointDataDir, checksumEncoded, rand.Uint64())
+		if err := r.storage.WriteFile(ctx, path, data); err != nil {
+			return errors.Trace(err)
+		}
+	}
+	return nil
+}
+
+// WalkCheckpointFile walks the checkpoint range files, retrieves the metadata of the backed-up ranges,
+// and returns the total time cost of the past executions
+func WalkCheckpointFile(ctx context.Context, s storage.ExternalStorage, cipher *backuppb.CipherInfo, fn func(groupKey string, rg *rtree.Range)) (time.Duration, error) {
+	// records the total time cost in the past executions
+	var pastDureTime time.Duration = 0
+	err := s.WalkDir(ctx, &storage.WalkOption{SubDir: CheckpointDataDir}, func(path string, size int64) error {
+		if strings.HasSuffix(path, ".cpt") {
+			content, err := s.ReadFile(ctx, path)
+			if err != nil {
+				return errors.Trace(err)
+			}
+
+			checkpointData := &CheckpointData{}
+			if err = json.Unmarshal(content, checkpointData); err != nil {
+				return errors.Trace(err)
+			}
+
+			if checkpointData.DureTime > pastDureTime {
+				pastDureTime = checkpointData.DureTime
+			}
+			for _, meta := range checkpointData.RangeGroupMetas {
+				decryptContent, err := metautil.Decrypt(meta.RangeGroupsEncriptedData, cipher, meta.CipherIv)
+				if err != nil {
+					return errors.Trace(err)
+				}
+
+				checksum := sha256.Sum256(decryptContent)
+				if !bytes.Equal(meta.Checksum, checksum[:]) {
+					log.Error("checkpoint checksum info's checksum mismatch, skip it",
+						zap.ByteString("expect", meta.Checksum),
+						zap.ByteString("got", checksum[:]),
+					)
+					continue
+				}
+
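+				// the decrypted content is a JSON-encoded RangeGroups;
+				// replay every sub-range of the group through fn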
+				group := &RangeGroups{}
+				if err = json.Unmarshal(decryptContent, group); err != nil {
+					return errors.Trace(err)
+				}
+
+				for _, g := range group.Groups {
+					fn(group.GroupKey, g)
+				}
+			}
+		}
+		return nil
+	})
+
+	return pastDureTime, errors.Trace(err)
+}
+
+type CheckpointMetadata struct {
+	GCServiceId string        `json:"gc-service-id"`
+	ConfigHash  []byte        `json:"config-hash"`
+	BackupTS    uint64        `json:"backup-ts"`
+	Ranges      []rtree.Range `json:"ranges"`
+
+	CheckpointChecksum map[int64]*ChecksumItem    `json:"-"`
+	CheckpointDataMap  map[string]rtree.RangeTree `json:"-"`
+}
+
+// LoadCheckpointMetadata loads the checkpoint metadata from the external storage
+func LoadCheckpointMetadata(ctx context.Context, s storage.ExternalStorage) (*CheckpointMetadata, error) {
+	data, err := s.ReadFile(ctx, CheckpointMetaPath)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	m := &CheckpointMetadata{}
+	err = json.Unmarshal(data, m)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	m.CheckpointChecksum, err = loadCheckpointChecksum(ctx, s)
+	return m, errors.Trace(err)
+}
+
+// loadCheckpointChecksum walks the checkpoint checksum files and retrieves the checksum information of the calculated tables
+func loadCheckpointChecksum(ctx context.Context, s storage.ExternalStorage) (map[int64]*ChecksumItem, error) {
+	checkpointChecksum := make(map[int64]*ChecksumItem)
+
+	err := s.WalkDir(ctx, &storage.WalkOption{SubDir: CheckpointChecksumDir}, func(path string, size int64) error {
+		data, err := s.ReadFile(ctx, path)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		info := &ChecksumInfo{}
+		err = json.Unmarshal(data, info)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		checksum := sha256.Sum256(info.Content)
+		if !bytes.Equal(info.Checksum, checksum[:]) {
+			log.Error("checkpoint checksum info's checksum mismatch, skip it",
+				zap.ByteString("expect", info.Checksum),
+				zap.ByteString("got", checksum[:]),
+			)
+			return nil
+		}
+
+		items := &ChecksumItems{}
+		err = json.Unmarshal(info.Content, items)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		for _, c := range items.Items {
+			checkpointChecksum[c.TableID] = c
+		}
+		return nil
+	})
+	return checkpointChecksum, errors.Trace(err)
+}
+
+// SaveCheckpointMetadata saves the checkpoint metadata into the external storage
+func SaveCheckpointMetadata(ctx context.Context, s storage.ExternalStorage, meta *CheckpointMetadata) error {
+	data, err := json.Marshal(meta)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	err = s.WriteFile(ctx, CheckpointMetaPath, data)
+	return errors.Trace(err)
+}
diff --git a/br/pkg/checkpoint/checkpoint_test.go b/br/pkg/checkpoint/checkpoint_test.go
new file mode 100644
index 0000000000000..f52b1c6ad2b02
--- /dev/null
+++ b/br/pkg/checkpoint/checkpoint_test.go
@@ -0,0 +1,175 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package checkpoint_test
+
+import (
+	"context"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	backuppb "github.com/pingcap/kvproto/pkg/brpb"
+	"github.com/pingcap/kvproto/pkg/encryptionpb"
+	"github.com/pingcap/tidb/br/pkg/checkpoint"
+	"github.com/pingcap/tidb/br/pkg/rtree"
+	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCheckpointMeta(t *testing.T) {
+	ctx := context.Background()
+	base := t.TempDir()
+	s, err := storage.NewLocalStorage(base)
+	require.NoError(t, err)
+
+	checkpointMeta := &checkpoint.CheckpointMetadata{
+		ConfigHash: []byte("123456"),
+		BackupTS:   123456,
+	}
+
+	err = checkpoint.SaveCheckpointMetadata(ctx, s, checkpointMeta)
+	require.NoError(t, err)
+
+	checkpointMeta2, err := checkpoint.LoadCheckpointMetadata(ctx, s)
+	require.NoError(t, err)
+	require.Equal(t, checkpointMeta.ConfigHash, checkpointMeta2.ConfigHash)
+	require.Equal(t, checkpointMeta.BackupTS, checkpointMeta2.BackupTS)
+}
+
+func TestCheckpointRunner(t *testing.T) {
+	ctx := context.Background()
+	base := t.TempDir()
+	s, err := storage.NewLocalStorage(base)
+	require.NoError(t, err)
+	os.MkdirAll(base+checkpoint.CheckpointDataDir, 0o755)
+	os.MkdirAll(base+checkpoint.CheckpointChecksumDir, 0o755)
+
+	cipher := &backuppb.CipherInfo{
+		CipherType: encryptionpb.EncryptionMethod_AES256_CTR,
+		CipherKey:  []byte("01234567890123456789012345678901"),
+	}
+	checkpointRunner := checkpoint.StartCheckpointRunnerForTest(ctx, s, cipher, 5*time.Second)
+
+	data := map[string]struct {
+		StartKey string
+		EndKey   string
+		Name     string
+		Name2    string
+	}{
+		"a": {
+			StartKey: "a",
+			EndKey:   "b",
+			Name:     "c",
+			Name2:    "d",
+		},
+		"A": {
+			StartKey: "A",
+			EndKey:   "B",
+			Name:     "C",
+			Name2:    "D",
+		},
+		"1": {
+			StartKey: "1",
+			EndKey:   "2",
+			Name:     "3",
+			Name2:    "4",
+		},
+	}
+
+	data2 := map[string]struct {
+		StartKey string
+		EndKey   string
+		Name     string
+		Name2    string
+	}{
+		"+": {
+			StartKey: "+",
+			EndKey:   "-",
+			Name:     "*",
+			Name2:    "/",
+		},
+	}
+
+	for _, d := range data {
+		err = checkpointRunner.Append(ctx, "a", []byte(d.StartKey), []byte(d.EndKey), []*backuppb.File{
+			{Name: d.Name},
+			{Name: d.Name2},
+		})
+		require.NoError(t, err)
+	}
+
+	checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1, checkpoint.MaxChecksumTotalCost-20.0)
+	checkpointRunner.FlushChecksum(ctx, 2, 2, 2, 2, 40.0)
+	// now the checksum is flushed, because the total time cost is larger than `MaxChecksumTotalCost`
+	checkpointRunner.FlushChecksum(ctx, 3, 3, 3, 3, checkpoint.MaxChecksumTotalCost-20.0)
+	time.Sleep(6 * time.Second)
+	// the checksum has not been flushed, even after 6 seconds,
+	// because the total time cost is less than `MaxChecksumTotalCost`
+	checkpointRunner.FlushChecksum(ctx, 4, 4, 4, 4, 40.0)
+
+	for _, d := range data2 {
+		err = checkpointRunner.Append(ctx, "+", []byte(d.StartKey), []byte(d.EndKey), []*backuppb.File{
+			{Name: d.Name},
+			{Name: d.Name2},
+		})
+		require.NoError(t, err)
+	}
+
+	checkpointRunner.WaitForFinish()
+
+	checker := func(groupKey string, resp *rtree.Range) {
+		require.NotNil(t, resp)
+		d, ok := data[string(resp.StartKey)]
+		if !ok {
+			d, ok = data2[string(resp.StartKey)]
+			require.True(t, ok)
+		}
+		require.Equal(t, d.StartKey, string(resp.StartKey))
+		require.Equal(t, d.EndKey, string(resp.EndKey))
+		require.Equal(t, d.Name, resp.Files[0].Name)
+		require.Equal(t, d.Name2, resp.Files[1].Name)
+	}
+
+	_, err = checkpoint.WalkCheckpointFile(ctx, s, cipher, checker)
+	require.NoError(t, err)
+
+	checkpointMeta := &checkpoint.CheckpointMetadata{
+		ConfigHash: []byte("123456"),
+		BackupTS:   123456,
+	}
+
+	err = checkpoint.SaveCheckpointMetadata(ctx, s, checkpointMeta)
+	require.NoError(t, err)
+	meta, err := checkpoint.LoadCheckpointMetadata(ctx, s)
+	require.NoError(t, err)
+
+	var i int64
+	for i = 1; i <= 4; i++ {
+		require.Equal(t, meta.CheckpointChecksum[i].Crc64xor, uint64(i))
+	}
+
+	// only 2 checksum files exist; they are t2_and__ and t4_and__
+	count := 0
+	err = s.WalkDir(ctx, &storage.WalkOption{SubDir: checkpoint.CheckpointChecksumDir}, func(s string, i int64) error {
+		count += 1
+		if !strings.Contains(s, "t2") {
+			require.True(t, strings.Contains(s, "t4"))
+		}
+		return nil
+	})
+	require.NoError(t, err)
+	require.Equal(t, count, 2)
+}
diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go
index e5bd63815b60a..49b9b7bb7f58e 100644
--- a/br/pkg/restore/client.go
+++ b/br/pkg/restore/client.go
@@ -2583,7 +2583,7 @@ func (rc *Client) SaveSchemas(
 	schemas := TidyOldSchemas(sr)
 	schemasConcurrency := uint(mathutil.Min(64, schemas.Len()))
-	err := schemas.BackupSchemas(ctx, metaWriter, nil, nil, rc.restoreTS, schemasConcurrency, 0, true, nil)
+	err := schemas.BackupSchemas(ctx, metaWriter, nil, nil, nil, rc.restoreTS, schemasConcurrency, 0, true, nil)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/br/pkg/rtree/rtree.go b/br/pkg/rtree/rtree.go
index 9f12b22daca75..f17ebf38df510 100644
--- a/br/pkg/rtree/rtree.go
+++ b/br/pkg/rtree/rtree.go
@@ -217,3 +217,10 @@ func (rangeTree *RangeTree) GetIncompleteRange(
 	}
 	return incomplete
 }
+
+type ProgressRange struct {
+	Res        RangeTree
+	Incomplete []Range
+	Origin     Range
+	GroupKey   string
+}
diff --git a/br/pkg/storage/local.go b/br/pkg/storage/local.go
index 68dc760cc1c9a..2d6e0571faa93 100644
--- a/br/pkg/storage/local.go
+++ b/br/pkg/storage/local.go
@@ -10,6 +10,8 @@ import (
 	"strings"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"go.uber.org/zap"
 )
 
 const (
@@ -38,7 +40,21 @@ func (l *LocalStorage) WriteFile(_ context.Context, name string, data []byte) er
 	// to an empty file if write is not finished.
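+	// the write may fail when the parent directory (e.g. a checkpoint
+	// sub-directory) has not been created yet; mkdir it and retry once below.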
 	tmpPath := filepath.Join(l.base, name) + ".tmp"
 	if err := os.WriteFile(tmpPath, data, localFilePerm); err != nil {
-		return errors.Trace(err)
+		path := filepath.Dir(tmpPath)
+		log.Info("failed to write file, try to mkdir the path", zap.String("path", path))
+		exists, existErr := pathExists(path)
+		if existErr != nil {
+			return errors.Annotatef(err, "after failed to write file, failed to check whether the path exists: %v", existErr)
+		}
+		if exists {
+			return errors.Trace(err)
+		}
+		if mkdirErr := mkdirAll(path); mkdirErr != nil {
+			return errors.Annotatef(err, "after failed to write file, failed to mkdir: %v", mkdirErr)
+		}
+		if err := os.WriteFile(tmpPath, data, localFilePerm); err != nil {
+			return errors.Trace(err)
+		}
 	}
 	if err := os.Rename(tmpPath, filepath.Join(l.base, name)); err != nil {
 		return errors.Trace(err)
diff --git a/br/pkg/storage/local_test.go b/br/pkg/storage/local_test.go
index 82e7435ae29be..db1ba424b9d6b 100644
--- a/br/pkg/storage/local_test.go
+++ b/br/pkg/storage/local_test.go
@@ -9,6 +9,7 @@ import (
 	"runtime"
 	"testing"
 
+	"github.com/pingcap/errors"
 	"github.com/stretchr/testify/require"
 )
 
@@ -99,4 +100,30 @@ func TestWalkDirWithSoftLinkFile(t *testing.T) {
 	})
 	require.NoError(t, err)
 	require.Equal(t, 1, i)
+
+	// test a file that does not exist
+	exists, err := store.FileExists(context.TODO(), "/123/456")
+	require.NoError(t, err)
+	require.False(t, exists)
+
+	// test walking a nonexistent directory
+	err = store.WalkDir(context.TODO(), &WalkOption{SubDir: "123/456"}, func(path string, size int64) error {
+		return errors.New("find file")
+	})
+	require.NoError(t, err)
+	// write a file to a nonexistent directory
+	err = store.WriteFile(context.TODO(), "/123/456/789.txt", []byte(data))
+	require.NoError(t, err)
+	exists, err = store.FileExists(context.TODO(), "/123/456")
+	require.NoError(t, err)
+	require.True(t, exists)
+
+	// test walking an existent directory
+	err = store.WalkDir(context.TODO(), &WalkOption{SubDir: "123/456"}, func(path string, size int64) error {
+		if path == "123/456/789.txt" {
+			return nil
+		}
+		return errors.Errorf("find other file: %s", path)
+	})
+	require.NoError(t, err)
 }
diff --git a/br/pkg/summary/collector.go b/br/pkg/summary/collector.go
index 705c26df3e4ac..1a16fb6dc9cfc 100644
--- a/br/pkg/summary/collector.go
+++ b/br/pkg/summary/collector.go
@@ -46,6 +46,10 @@ type LogCollector interface {
 
 	SetSuccessStatus(success bool)
 
+	NowDureTime() time.Duration
+
+	AdjustStartTimeToEarlierTime(t time.Duration)
+
 	Summary(name string)
 
 	Log(msg string, fields ...zap.Field)
@@ -163,6 +167,18 @@ func logKeyFor(key string) string {
 	return strings.ReplaceAll(key, " ", "-")
}
 
+func (tc *logCollector) NowDureTime() time.Duration {
+	tc.mu.Lock()
+	defer tc.mu.Unlock()
+	return time.Since(tc.startTime)
+}
+
+func (tc *logCollector) AdjustStartTimeToEarlierTime(t time.Duration) {
+	tc.mu.Lock()
+	defer tc.mu.Unlock()
+	tc.startTime = tc.startTime.Add(-t)
+}
+
 func (tc *logCollector) Summary(name string) {
 	tc.mu.Lock()
 	defer func() {
diff --git a/br/pkg/summary/summary.go b/br/pkg/summary/summary.go
index 7ae488785760e..45c8fbbc55997 100644
--- a/br/pkg/summary/summary.go
+++ b/br/pkg/summary/summary.go
@@ -43,6 +43,15 @@ func SetSuccessStatus(success bool) {
 	collector.SetSuccessStatus(success)
 }
 
+// NowDureTime returns the duration between the start time and the current time
+func NowDureTime() time.Duration {
+	return collector.NowDureTime()
+}
+
+func AdjustStartTimeToEarlierTime(t time.Duration) {
+	collector.AdjustStartTimeToEarlierTime(t)
+}
+
 // Summary outputs summary log.
 func Summary(name string) {
 	collector.Summary(name)
diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go
index dba72170649de..3402640c703b2 100644
--- a/br/pkg/task/backup.go
+++ b/br/pkg/task/backup.go
@@ -4,6 +4,8 @@ package task
 
 import (
 	"context"
+	"crypto/sha256"
+	"encoding/json"
 	"fmt"
 	"os"
 	"strconv"
@@ -26,6 +28,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/br/pkg/summary"
 	"github.com/pingcap/tidb/br/pkg/utils"
+	"github.com/pingcap/tidb/br/pkg/version"
 	"github.com/pingcap/tidb/parser/mysql"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/statistics/handle"
@@ -45,11 +48,13 @@ const (
 	flagRemoveSchedulers = "remove-schedulers"
 	flagIgnoreStats      = "ignore-stats"
 	flagUseBackupMetaV2  = "use-backupmeta-v2"
+	flagUseCheckpoint    = "use-checkpoint"
 
 	flagGCTTL = "gcttl"
 
 	defaultBackupConcurrency = 4
 	maxBackupConcurrency     = 256
+	checkpointDefaultGCTTL   = 72 * 60 // 72 minutes
 )
 
 const (
@@ -77,6 +82,7 @@ type BackupConfig struct {
 	RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
 	IgnoreStats      bool `json:"ignore-stats" toml:"ignore-stats"`
 	UseBackupMetaV2  bool `json:"use-backupmeta-v2"`
+	UseCheckpoint    bool `json:"use-checkpoint" toml:"use-checkpoint"`
 	CompressionConfig
 
 	// for ebs-based backup
@@ -126,6 +132,9 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
 	// but will generate v1 meta due to this flag is false. the behaviour is as same as v4.0.15, v4.0.16.
 	// finally v4.0.17 will set this flag to true, and generate v2 meta.
 	_ = flags.MarkHidden(flagUseBackupMetaV2)
+
+	flags.Bool(flagUseCheckpoint, true, "use checkpoint mode")
+	_ = flags.MarkHidden(flagUseCheckpoint)
 }
 
 // ParseFromFlags parses the backup-related flags from the flag set.
@@ -150,10 +159,21 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
 	if err != nil {
 		return errors.Trace(err)
 	}
+	useCheckpoint, err := flags.GetBool(flagUseCheckpoint)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	cfg.UseCheckpoint = useCheckpoint
 	gcTTL, err := flags.GetInt64(flagGCTTL)
 	if err != nil {
 		return errors.Trace(err)
 	}
+	// if the checkpoint mode is used and gcTTL is the default value,
+	// update gcTTL to the checkpoint's default GC TTL
+	if cfg.UseCheckpoint && gcTTL == utils.DefaultBRGCSafePointTTL {
+		gcTTL = checkpointDefaultGCTTL
+		log.Info("use checkpoint's default GC TTL", zap.Int64("GC TTL", gcTTL))
+	}
 	cfg.GCTTL = gcTTL
 
 	compressionCfg, err := parseCompressionFlags(flags)
@@ -269,6 +289,23 @@ func (cfg *BackupConfig) Adjust() {
 	}
 }
 
+// Hash calculates a rough hash of the config for the checkpoint checker
+func (cfg *BackupConfig) Hash() ([]byte, error) {
+	config := &BackupConfig{
+		LastBackupTS:  cfg.LastBackupTS,
+		IgnoreStats:   cfg.IgnoreStats,
+		UseCheckpoint: cfg.UseCheckpoint,
+		Config:        cfg.Config,
+	}
+	data, err := json.Marshal(config)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	hash := sha256.Sum256(data)
+
+	return hash[:], nil
+}
+
 func isFullBackup(cmdName string) bool {
 	return cmdName == FullBackupCmd
 }
@@ -301,6 +338,13 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
 		return errors.Trace(err)
 	}
 	defer mgr.Close()
+	// after the version check, check whether the cluster supports the checkpoint mode
+	if cfg.UseCheckpoint {
+		if err = version.CheckCheckpointSupport(); err != nil {
+			log.Warn("unable to use checkpoint mode, fall back to normal mode", zap.Error(err))
+			cfg.UseCheckpoint = false
+		}
+	}
 	var statsHandle *handle.Handle
 	if !skipStats {
 		statsHandle = mgr.GetDomain().StatsHandle()
@@ -321,6 +364,10 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
cmdName string, cfg *BackupConfig } client := backup.NewBackupClient(ctx, mgr) + + // set cipher only for checkpoint + client.SetCipher(&cfg.CipherInfo) + opts := storage.ExternalStorageOptions{ NoCredentials: cfg.NoCreds, SendCredentials: cfg.SendCreds, @@ -329,6 +376,16 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig if err = client.SetStorageAndCheckNotInUse(ctx, u, &opts); err != nil { return errors.Trace(err) } + // if checkpoint mode is unused at this time but there is checkpoint meta, + // CheckCheckpoint will stop backing up + cfgHash, err := cfg.Hash() + if err != nil { + return errors.Trace(err) + } + err = client.CheckCheckpoint(cfgHash) + if err != nil { + return errors.Trace(err) + } err = client.SetLockFile(ctx) if err != nil { return errors.Trace(err) @@ -340,10 +397,11 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig return errors.Trace(err) } g.Record("BackupTS", backupTS) + safePointID := client.GetSafePointID() sp := utils.BRServiceSafePoint{ BackupTS: backupTS, TTL: client.GetGCTTL(), - ID: utils.MakeSafePointID(), + ID: safePointID, } // use lastBackupTS as safePoint if exists @@ -353,7 +411,26 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig } log.Info("current backup safePoint job", zap.Object("safePoint", sp)) - err = utils.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp) + cctx, gcSafePointKeeperCancel := context.WithCancel(ctx) + gcSafePointKeeperRemovable := false + defer func() { + // don't reset the gc-safe-point if checkpoint mode is used and backup is not finished + if cfg.UseCheckpoint && !gcSafePointKeeperRemovable { + return + } + log.Info("start to remove gc-safepoint keeper") + // close the gc safe point keeper at first + gcSafePointKeeperCancel() + // set the ttl to 0 to remove the gc-safe-point + sp.TTL = 0 + if err := utils.UpdateServiceSafePoint(ctx, mgr.GetPDClient(), sp); err != nil { + log.Warn("failed to update service safe point, backup may fail if gc triggered", + zap.Error(err), + ) + } + log.Info("finish removing gc-safepoint keeper") + }() + err = utils.StartServiceSafePointKeeper(cctx, mgr.GetPDClient(), sp) if err != nil { return errors.Trace(err) } @@ -392,7 +469,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig return errors.Trace(err) } - ranges, schemas, policies, err := backup.BuildBackupRangeAndSchema(mgr.GetStorage(), cfg.TableFilter, backupTS, isFullBackup(cmdName)) + ranges, schemas, policies, err := client.BuildBackupRangeAndSchema(mgr.GetStorage(), cfg.TableFilter, backupTS, isFullBackup(cmdName)) if err != nil { return errors.Trace(err) } @@ -500,6 +577,18 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig }) } } + + if cfg.UseCheckpoint { + if err = client.StartCheckpointRunner(ctx, cfgHash, backupTS, ranges, safePointID, progressCallBack); err != nil { + return errors.Trace(err) + } + defer func() { + if !gcSafePointKeeperRemovable { + log.Info("wait for flush checkpoint...") + client.WaitForFinishCheckpoint() + } + }() + } metawriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile) err = client.BackupRanges(ctx, ranges, req, uint(cfg.Concurrency), metawriter, progressCallBack) if err != nil { @@ -529,7 +618,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig schemasConcurrency := uint(mathutil.Min(backup.DefaultSchemaConcurrency, schemas.Len())) err = schemas.BackupSchemas( - ctx, metawriter, mgr.GetStorage(), 
statsHandle, backupTS, schemasConcurrency, cfg.ChecksumConcurrency, skipChecksum, updateCh) + ctx, metawriter, client.GetCheckpointRunner(), mgr.GetStorage(), statsHandle, backupTS, schemasConcurrency, cfg.ChecksumConcurrency, skipChecksum, updateCh) if err != nil { return errors.Trace(err) } @@ -538,6 +627,9 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig if err != nil { return errors.Trace(err) } + // Since backupmeta is flushed on the external storage, + // we can remove the gc safepoint keeper + gcSafePointKeeperRemovable = true // Checksum has finished, close checksum progress. updateCh.Close() diff --git a/br/pkg/task/backup_raw.go b/br/pkg/task/backup_raw.go index 4d1d35d37f140..2b46347327501 100644 --- a/br/pkg/task/backup_raw.go +++ b/br/pkg/task/backup_raw.go @@ -213,9 +213,18 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf CompressionLevel: cfg.CompressionLevel, CipherInfo: &cfg.CipherInfo, } + rg := rtree.Range{ + StartKey: backupRange.StartKey, + EndKey: backupRange.EndKey, + } + progressRange := &rtree.ProgressRange{ + Res: rtree.NewRangeTree(), + Incomplete: []rtree.Range{rg}, + Origin: rg, + } metaWriter := metautil.NewMetaWriter(client.GetStorage(), metautil.MetaFileSize, false, metautil.MetaFile, &cfg.CipherInfo) metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile) - err = client.BackupRange(ctx, req, metaWriter, progressCallBack) + err = client.BackupRange(ctx, req, progressRange, metaWriter, progressCallBack) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 8006793ffdb89..fdcc728a9ce5f 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -424,7 +424,7 @@ func (s *streamMgr) backupFullSchemas(ctx context.Context, g glue.Glue) error { } schemasConcurrency := uint(mathutil.Min(backup.DefaultSchemaConcurrency, schemas.Len())) - err = schemas.BackupSchemas(ctx, metaWriter, s.mgr.GetStorage(), nil, + err = schemas.BackupSchemas(ctx, metaWriter, nil, s.mgr.GetStorage(), nil, s.cfg.StartTS, schemasConcurrency, 0, true, nil) if err != nil { return errors.Trace(err) diff --git a/br/pkg/version/version.go b/br/pkg/version/version.go index 2af5cc3491535..14a0201ad4325 100644 --- a/br/pkg/version/version.go +++ b/br/pkg/version/version.go @@ -33,7 +33,8 @@ var ( versionHash = regexp.MustCompile("-[0-9]+-g[0-9a-f]{7,}") - pitrSupportBatchKVFiles bool = true + checkpointSupportError error = nil + pitrSupportBatchKVFiles bool = true ) // NextMajorVersion returns the next major version. 
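The deferred block in RunBackup above is the crux of resumability: it keeps the service safepoint alive whenever checkpoint mode is on and gcSafePointKeeperRemovable is still false, so a retried run can reuse the same snapshot; once backupmeta is flushed the flag flips and the defer cancels the keeper and zeroes the TTL. A minimal standalone sketch of that control flow, with keepServiceSafePoint and runBackup as hypothetical stand-ins for utils.StartServiceSafePointKeeper and RunBackup:

package main

import (
	"context"
	"fmt"
	"time"
)

// keepServiceSafePoint is a hypothetical stand-in for
// utils.StartServiceSafePointKeeper: it renews a PD service safepoint
// lease until its context is cancelled.
func keepServiceSafePoint(ctx context.Context) {
	tick := time.NewTicker(50 * time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return // keeper stopped; the safepoint will expire or be removed
		case <-tick.C:
			// renew the lease here (PD call elided)
		}
	}
}

func runBackup(useCheckpoint bool) error {
	cctx, cancelKeeper := context.WithCancel(context.Background())
	removable := false // true once backupmeta is safely flushed
	defer func() {
		// Mirror of the defer above: keep the safepoint alive for a future
		// checkpoint-resumed run unless the backup really finished.
		if useCheckpoint && !removable {
			return
		}
		cancelKeeper()
		fmt.Println("safepoint removed (TTL set to 0)")
	}()
	go keepServiceSafePoint(cctx)

	// ... back up ranges, run checksums, flush backupmeta ...
	removable = true
	return nil
}

func main() { _ = runBackup(true) }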
@@ -204,6 +205,14 @@ func CheckVersionForBR(s *metapb.Store, tikvVersion *semver.Version) error { } } + // reset the checkpoint support error + checkpointSupportError = nil + if tikvVersion.Major < 6 || (tikvVersion.Major == 6 && tikvVersion.Minor < 5) { + // checkpoint mode is only supported since v6.5.0 + checkpointSupportError = errors.Annotatef(berrors.ErrVersionMismatch, "TiKV node %s version %s is too low to use checkpoint, please update tikv's version to at least v6.5.0", + s.Address, tikvVersion) + } + // don't warn if we are the master build, which always have the version v4.0.0-beta.2-* if build.GitBranch != "master" && tikvVersion.Compare(*BRVersion) > 0 { log.Warn(fmt.Sprintf("BR version is outdated, please consider use version %s of BR", tikvVersion)) @@ -311,6 +320,10 @@ func FetchVersion(ctx context.Context, db utils.QueryExecutor) (string, error) { return versionInfo, nil } +func CheckCheckpointSupport() error { + return checkpointSupportError +} + func CheckPITRSupportBatchKVFiles() bool { return pitrSupportBatchKVFiles } diff --git a/br/pkg/version/version_test.go b/br/pkg/version/version_test.go index 8871e0ae9f04e..96c8d9a306ef7 100644 --- a/br/pkg/version/version_test.go +++ b/br/pkg/version/version_test.go @@ -228,6 +228,29 @@ func TestCheckClusterVersion(t *testing.T) { } err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBR) require.NoError(t, err) + require.Error(t, CheckCheckpointSupport()) + } + + { + build.ReleaseVersion = "v6.0.0-rc.2" + mock.getAllStores = func() []*metapb.Store { + // TiKV v6.0.0-rc.1 with BR v6.0.0-rc.2 is ok + return []*metapb.Store{{Version: "v6.0.0-rc.1"}} + } + err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBR) + require.NoError(t, err) + require.Error(t, CheckCheckpointSupport()) + } + + { + build.ReleaseVersion = "v6.5.0-rc.2" + mock.getAllStores = func() []*metapb.Store { + // TiKV v6.5.0-rc.1 with BR v6.5.0-rc.2 is ok + return []*metapb.Store{{Version: "v6.5.0-rc.1"}} + } + err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBR) + require.NoError(t, err) + require.NoError(t, CheckCheckpointSupport()) } { diff --git a/executor/adapter.go b/executor/adapter.go index aaa7aa8c4b8c9..a7942f2638a02 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -198,6 +198,7 @@ type TelemetryInfo struct { UseFlashbackToCluster bool PartitionTelemetry *PartitionTelemetryInfo AccountLockTelemetry *AccountLockTelemetryInfo + UseIndexMerge bool } // PartitionTelemetryInfo records table partition telemetry information during execution.
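CheckCheckpointSupport is just a read of a package-level latch that CheckVersionForBR fills in while the cluster version check walks the stores. A compact sketch of the pattern with simplified types (checkStore and the error text are illustrative, not the real helpers); note that since the latch is reset on every call, the verdict reflects the most recently checked store:

package main

import "fmt"

// Package-level latch mirroring checkpointSupportError: the per-store version
// check records a verdict once; the backup path later reads it without
// another round trip to the stores.
var checkpointSupportErr error

// checkStore is a hypothetical stand-in for the gate in CheckVersionForBR.
func checkStore(addr string, major, minor int64) {
	checkpointSupportErr = nil // reset before re-evaluating, as above
	if major < 6 || (major == 6 && minor < 5) {
		checkpointSupportErr = fmt.Errorf(
			"TiKV node %s is below v6.5.0, checkpoint unsupported", addr)
	}
}

func checkCheckpointSupport() error { return checkpointSupportErr }

func main() {
	checkStore("10.0.0.1:20160", 6, 4)
	fmt.Println(checkCheckpointSupport()) // non-nil: fall back to normal mode
	checkStore("10.0.0.1:20160", 6, 5)
	fmt.Println(checkCheckpointSupport()) // <nil>: checkpoint allowed
}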
diff --git a/executor/builder.go b/executor/builder.go index 8391964095877..e3c4e5533b68f 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3977,6 +3977,9 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd } func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) Executor { + if b.Ti != nil { + b.Ti.UseIndexMerge = true + } ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) if err := b.validCanReadTemporaryOrCacheTable(ts.Table); err != nil { b.err = err diff --git a/metrics/telemetry.go b/metrics/telemetry.go index 7460ca5ceb04c..591823f9952d9 100644 --- a/metrics/telemetry.go +++ b/metrics/telemetry.go @@ -155,6 +155,13 @@ var ( Name: "flashback_cluster_usage", Help: "Counter of usage of flashback cluster", }) + TelemetryIndexMergeUsage = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "telemetry", + Name: "index_merge_usage", + Help: "Counter of usage of index merge", + }) TelemetryCompactPartitionCnt = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "tidb", @@ -396,3 +403,22 @@ func GetDDLUsageCounter() DDLUsageCounter { FlashbackClusterUsed: readCounter(TelemetryFlashbackClusterCnt), } } + +// IndexMergeUsageCounter records the usages of IndexMerge feature. +type IndexMergeUsageCounter struct { + IndexMergeUsed int64 `json:"index_merge_used"` +} + +// Sub returns the difference of two counters. +func (i IndexMergeUsageCounter) Sub(rhs IndexMergeUsageCounter) IndexMergeUsageCounter { + return IndexMergeUsageCounter{ + IndexMergeUsed: i.IndexMergeUsed - rhs.IndexMergeUsed, + } +} + +// GetIndexMergeCounter gets the IndexMerge usage counter. +func GetIndexMergeCounter() IndexMergeUsageCounter { + return IndexMergeUsageCounter{ + IndexMergeUsed: readCounter(TelemetryIndexMergeUsage), + } +} diff --git a/session/session.go b/session/session.go index 437c663999b83..9104c4186bf74 100644 --- a/session/session.go +++ b/session/session.go @@ -150,6 +150,8 @@ var ( telemetryLockUserUsage = metrics.TelemetryAccountLockCnt.WithLabelValues("lockUser") telemetryUnlockUserUsage = metrics.TelemetryAccountLockCnt.WithLabelValues("unlockUser") telemetryCreateOrAlterUserUsage = metrics.TelemetryAccountLockCnt.WithLabelValues("createOrAlterUser") + + telemetryIndexMerge = metrics.TelemetryIndexMergeUsage ) // Session context, it is consistent with the lifecycle of a client connection. 
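TelemetryIndexMergeUsage is an ordinary in-process prometheus counter; GetIndexMergeCounter snapshots it via the package's readCounter helper, which serializes the metric into a dto.Metric. A small self-contained sketch of that snapshot-and-diff trick (the counter name here is a demo stand-in; the real metric is tidb_telemetry_index_merge_usage):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// readCounter snapshots the current value of an in-process counter by
// serializing it into a dto.Metric, the same trick TiDB's telemetry uses.
func readCounter(c prometheus.Counter) int64 {
	var m dto.Metric
	if err := c.Write(&m); err != nil {
		return 0
	}
	return int64(m.Counter.GetValue())
}

func main() {
	idxMerge := prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "tidb", Subsystem: "telemetry", Name: "index_merge_usage_demo",
	})
	before := readCounter(idxMerge) // baseline taken at the last report
	idxMerge.Inc()
	idxMerge.Inc()
	fmt.Println("used since last report:", readCounter(idxMerge)-before) // 2
}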
@@ -3588,6 +3590,10 @@ func (s *session) updateTelemetryMetric(es *executor.ExecStmt) { telemetryCTEUsageNotCTE.Inc() } + if ti.UseIndexMerge { + telemetryIndexMerge.Inc() + } + if ti.UseMultiSchemaChange { telemetryMultiSchemaChangeUsage.Inc() } diff --git a/telemetry/data_feature_usage.go b/telemetry/data_feature_usage.go index 3766945536d9c..6dec8edcb023a 100644 --- a/telemetry/data_feature_usage.go +++ b/telemetry/data_feature_usage.go @@ -58,6 +58,7 @@ type featureUsage struct { DDLUsageCounter *m.DDLUsageCounter `json:"DDLUsageCounter"` EnableGlobalMemoryControl bool `json:"enableGlobalMemoryControl"` AutoIDNoCache bool `json:"autoIDNoCache"` + IndexMergeUsageCounter *m.IndexMergeUsageCounter `json:"indexMergeUsageCounter"` } type placementPolicyUsage struct { @@ -108,6 +109,8 @@ func getFeatureUsage(ctx context.Context, sctx sessionctx.Context) (*featureUsag usage.EnableGlobalMemoryControl = getGlobalMemoryControl() + usage.IndexMergeUsageCounter = getIndexMergeUsageInfo() + return &usage, nil } @@ -244,6 +247,7 @@ var initialTablePartitionCounter m.TablePartitionUsageCounter var initialSavepointStmtCounter int64 var initialLazyPessimisticUniqueCheckSetCount int64 var initialDDLUsageCounter m.DDLUsageCounter +var initialIndexMergeCounter m.IndexMergeUsageCounter // getTxnUsageInfo gets the usage info of transaction related features. It's exported for tests. func getTxnUsageInfo(ctx sessionctx.Context) *TxnUsage { @@ -402,3 +406,13 @@ func getDDLUsageInfo(ctx sessionctx.Context) *m.DDLUsageCounter { func getGlobalMemoryControl() bool { return memory.ServerMemoryLimit.Load() > 0 } + +func postReportIndexMergeUsage() { + initialIndexMergeCounter = m.GetIndexMergeCounter() +} + +func getIndexMergeUsageInfo() *m.IndexMergeUsageCounter { + curr := m.GetIndexMergeCounter() + diff := curr.Sub(initialIndexMergeCounter) + return &diff +} diff --git a/telemetry/data_feature_usage_test.go b/telemetry/data_feature_usage_test.go index ebe93d6dbaa52..369073009c0a4 100644 --- a/telemetry/data_feature_usage_test.go +++ b/telemetry/data_feature_usage_test.go @@ -554,3 +554,32 @@ func TestGlobalMemoryControl(t *testing.T) { require.NoError(t, err) require.False(t, usage.EnableGlobalMemoryControl) } + +func TestIndexMergeUsage(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("create table t1(c1 int, c2 int, index idx1(c1), index idx2(c2))") + res := tk.MustQuery("explain select /*+ use_index_merge(t1, idx1, idx2) */ * from t1 where c1 = 1 and c2 = 1").Rows() + require.Contains(t, res[0][0], "IndexMerge") + + usage, err := telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, usage.IndexMergeUsageCounter.IndexMergeUsed, int64(0)) + + tk.MustExec("select /*+ use_index_merge(t1, idx1, idx2) */ * from t1 where c1 = 1 and c2 = 1") + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, int64(1), usage.IndexMergeUsageCounter.IndexMergeUsed) + + tk.MustExec("select /*+ use_index_merge(t1, idx1, idx2) */ * from t1 where c1 = 1 or c2 = 1") + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, int64(2), usage.IndexMergeUsageCounter.IndexMergeUsed) + + tk.MustExec("select /*+ no_index_merge() */ * from t1 where c1 = 1 or c2 = 1") + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, int64(2), usage.IndexMergeUsageCounter.IndexMergeUsed) +}
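One subtlety in data_feature_usage.go: the reported IndexMergeUsageCounter is a per-report-window delta, not a lifetime total. getIndexMergeUsageInfo subtracts the baseline snapshot held in initialIndexMergeCounter, and postReportIndexMergeUsage refreshes that baseline (its call site is not shown in this hunk; presumably it runs after each successful report, like the other postReport hooks). A minimal sketch of the windowing with simplified, hypothetical types:

package main

import "fmt"

// usageCounter mimics m.IndexMergeUsageCounter: a monotonically growing
// total plus a subtraction to compute windowed deltas.
type usageCounter struct{ used int64 }

func (u usageCounter) sub(rhs usageCounter) usageCounter {
	return usageCounter{used: u.used - rhs.used}
}

var (
	live    usageCounter // incremented by the executor on each IndexMerge
	initial usageCounter // baseline, re-snapshotted after every report
)

func getUsageInfo() usageCounter { return live.sub(initial) }
func postReport()                { initial = live }

func main() {
	live.used = 5
	fmt.Println(getUsageInfo().used) // 5: everything since startup
	postReport()
	live.used = 7
	fmt.Println(getUsageInfo().used) // 2: only usage since the last report
}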