br: ebs volume snapshot backup and restore with flashback solution #38700

Merged: 23 commits, merged Nov 3, 2022
Changes shown below are from 3 of the 23 commits

Commits (23)
e15ed35
feat: move ebs resolve data solution to flashback
fengou1 Oct 27, 2022
b5a140e
refactor: add concurrency and progress for flashback solution
fengou1 Oct 28, 2022
e44e112
Merge branch 'master' into ebs_with_flashback
fengou1 Oct 28, 2022
4e64ba0
Merge branch 'master' into ebs_with_flashback
fengou1 Oct 28, 2022
ebb79bf
feat: adapts flashback solution with the entire cluster range
fengou1 Nov 1, 2022
97f4b7c
refactor: add ddl flashback concurrency
fengou1 Nov 1, 2022
7ca4452
Merge branch 'master' into ebs_with_flashback
fengou1 Nov 1, 2022
6db3708
refactor: remove dependency from flashbackToVersion function
fengou1 Nov 2, 2022
f0047ca
fix: make check_dev happy
fengou1 Nov 2, 2022
3df6aeb
refactor: add progress for flashback, rename resolvedTs to resolveTS …
fengou1 Nov 2, 2022
8a4747b
fix: remove empty lines to get build ci pass
fengou1 Nov 2, 2022
e501d50
Merge branch 'master' into ebs_with_flashback
fengou1 Nov 2, 2022
90adf23
Merge branch 'master' into ebs_with_flashback
ti-chi-bot Nov 3, 2022
0f045e5
Merge branch 'master' into ebs_with_flashback
3pointer Nov 3, 2022
900d1c8
Merge branch 'master' into ebs_with_flashback
ti-chi-bot Nov 3, 2022
5c0b444
Merge branch 'master' into ebs_with_flashback
ti-chi-bot Nov 3, 2022
7b686e1
Merge branch 'master' into ebs_with_flashback
ti-chi-bot Nov 3, 2022
30d904d
Merge branch 'master' into ebs_with_flashback
ti-chi-bot Nov 3, 2022
007a5f4
Merge branch 'master' into ebs_with_flashback
ti-chi-bot Nov 3, 2022
fb74ee1
Merge branch 'master' into ebs_with_flashback
ti-chi-bot Nov 3, 2022
e531b69
fix: update build bazel to get check_dev up
fengou1 Nov 3, 2022
00347be
Merge branch 'master' into ebs_with_flashback
ti-chi-bot Nov 3, 2022
f5d1fde
Merge branch 'master' into ebs_with_flashback
ti-chi-bot Nov 3, 2022
116 changes: 78 additions & 38 deletions br/pkg/restore/data.go
@@ -13,7 +13,10 @@ import (
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/util/mathutil"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
@@ -26,8 +29,8 @@ import (
// 3. send the recovery plan to tikv and wait for it to be applied; in WaitApply, every assigned region leader checks that its apply log has reached the last log
// 4. ensure all regions have applied up to the last log
// 5. send the resolvedTs to tikv for deleting data.
func RecoverData(ctx context.Context, resolvedTs uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress) (int, error) {
var recovery = NewRecovery(allStores, mgr, progress)
func RecoverData(ctx context.Context, resolvedTs uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTs uint64, concurrency uint32) (int, error) {
var recovery = NewRecovery(allStores, mgr, progress, concurrency)
if err := recovery.ReadRegionMeta(ctx); err != nil {
return 0, errors.Trace(err)
}
@@ -51,7 +54,13 @@ func RecoverData(ctx context.Context, resolvedTs uint64, allStores []*metapb.Sto
return totalRegions, errors.Trace(err)
}

if err := recovery.ResolveData(ctx, resolvedTs); err != nil {
keyRanges := recovery.getRegionKeyRanges()

if err := recovery.PrepareFlashbackToVersion(ctx, keyRanges); err != nil {
return totalRegions, errors.Trace(err)
}

if err := recovery.FlashbackToVersion(ctx, keyRanges, resolvedTs, restoreTs); err != nil {
return totalRegions, errors.Trace(err)
}

@@ -76,9 +85,10 @@ type Recovery struct {
MaxAllocID uint64
mgr *conn.Mgr
progress glue.Progress
concurrency uint32
}

func NewRecovery(allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress) Recovery {
func NewRecovery(allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, concurrency uint32) Recovery {
totalStores := len(allStores)
var StoreMetas = make([]StoreMeta, totalStores)
var regionRecovers = make(map[uint64][]*recovpb.RecoverRegionRequest, totalStores)
@@ -88,15 +98,15 @@ func NewRecovery(allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progres
RecoveryPlan: regionRecovers,
MaxAllocID: 0,
mgr: mgr,
progress: progress}
progress: progress,
concurrency: concurrency}
}

func (recovery *Recovery) newRecoveryClient(ctx context.Context, storeAddr string) (recovpb.RecoverDataClient, *grpc.ClientConn, error) {
// Connect to the Recovery service on the given TiKV node.
bfConf := backoff.DefaultConfig
bfConf.MaxDelay = gRPCBackOffMaxDelay
// TODO: the connection parameters may need some adjustment
//keepaliveConf keepalive.ClientParameters

conn, err := utils.GRPCConn(ctx, storeAddr, recovery.mgr.GetTLSConfig(),
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(recovery.mgr.GetKeepalive()),
@@ -190,8 +200,6 @@ func (recovery *Recovery) ReadRegionMeta(ctx context.Context) error {
return eg.Wait()
}

// TODO: map may be more suitable for this function

func (recovery *Recovery) GetTotalRegions() int {
// Group region peer info by region id.
var regions = make(map[uint64]struct{}, 0)
@@ -292,50 +300,82 @@ func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {
return eg.Wait()
}

// ResolveData a worker pool to all tikv for execute delete all data whose has ts > resolvedTs
func (recovery *Recovery) ResolveData(ctx context.Context, resolvedTs uint64) (err error) {
// getRegionKeyRanges returns the key range of every recovered region, deduplicated by region id
func (recovery *Recovery) getRegionKeyRanges() []tikvstore.KeyRange {

var keyRanges []tikvstore.KeyRange

var regions = make(map[uint64]struct{}, 0)
for _, v := range recovery.StoreMetas {
for _, m := range v.RegionMetas {
// record each region only once and collect its key range
if _, ok := regions[m.RegionId]; !ok {
regions[m.RegionId] = struct{}{}
keyRanges = append(keyRanges, tikvstore.KeyRange{
StartKey: m.StartKey,
EndKey: m.EndKey,
})
}
}
}

return keyRanges
}

// PrepareFlashbackToVersion prepares the regions for data flashback: it stops the region service and puts the regions into the flashback state
func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, keyRanges []tikvstore.KeyRange) (err error) {
eg, ectx := errgroup.WithContext(ctx)
totalStores := len(recovery.allStores)
workers := utils.NewWorkerPool(uint(mathutil.Min(totalStores, common.MaxStoreConcurrency)), "resolve data from tikv")
// since we do not know how many regions we will recover, use the ratio of regions per tikv store to drive the progress
incRatio := len(keyRanges) / len(recovery.allStores)
workers := utils.NewWorkerPool(uint(recovery.concurrency), "prepare the region")

// TODO: what if resolving the data takes a long time? It looks like we need some handling here; at least a retry may be necessary
// TODO: what if the network is disturbed? A retry mechanism may be needed here
for _, store := range recovery.allStores {
for i, r := range keyRanges {
if err := ectx.Err(); err != nil {
break
}
storeAddr := getStoreAddress(recovery.allStores, store.Id)
storeId := store.Id
workers.ApplyOnErrorGroup(eg, func() error {
recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr)
_, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), r)
if err != nil {
return errors.Trace(err)
}
defer conn.Close()
log.Info("resolve data to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeId))
req := &recovpb.ResolveKvDataRequest{ResolvedTs: resolvedTs}
stream, err := recoveryClient.ResolveKvData(ectx, req)
log.Debug("prepare region done", zap.ByteString("region_start_key", r.StartKey), zap.ByteString("region_end_key", r.EndKey))
return nil
})

if i%incRatio == 0 {
recovery.progress.Inc()
}
}
log.Info("prepare region flashback complete")
// Wait for all regions to finish the flashback preparation
return eg.Wait()
}

// FlashbackToVersion flashes back the region data to resolvedTs
func (recovery *Recovery) FlashbackToVersion(ctx context.Context, keyRanges []tikvstore.KeyRange, resolvedTs uint64, commitTs uint64) (err error) {
eg, ectx := errgroup.WithContext(ctx)
incRatio := len(keyRanges) / len(recovery.allStores)
workers := utils.NewWorkerPool(uint(recovery.concurrency), "flashback the region")

for i, r := range keyRanges {
if err := ectx.Err(); err != nil {
break
}
workers.ApplyOnErrorGroup(eg, func() error {
_, err := ddl.SendFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolvedTs, commitTs-1, commitTs, r)
if err != nil {
log.Error("send the resolve kv data failed", zap.Uint64("store id", storeId))
return errors.Trace(err)
}
// for a TiKV, received the stream
for {
var resp *recovpb.ResolveKvDataResponse
if resp, err = stream.Recv(); err == nil {
log.Info("current delete key", zap.Uint64("resolved key num", resp.ResolvedKeyCount), zap.Uint64("store id", resp.StoreId))
} else if err == io.EOF {
break
} else {
return errors.Trace(err)
}
}
recovery.progress.Inc()
log.Info("resolve kv data done", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeId))
log.Debug("region flashback done", zap.ByteString("region_start_key", r.StartKey), zap.ByteString("region_end_key", r.EndKey))
return nil
})

if i%incRatio == 0 {
recovery.progress.Inc()
}
}
// Wait for all TiKV instances force leader and wait apply to last log.
log.Info("region flashback complete")
// Wait for all regions to finish the flashback
return eg.Wait()
}

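Taken together, the data.go changes above replace the old per-store ResolveKvData stream with a flashback pass over region key ranges. The following is a minimal sketch of that flow, not the PR's code itself: it only chains the Recovery methods shown in this diff, and the helper function and package placement are assumptions.

package restore // sketch only: reuses the Recovery type and methods from this diff

import (
	"context"

	"github.com/pingcap/errors"
)

// runFlashbackRecovery is a hypothetical helper that shows the order of the
// flashback-based recovery steps: collect one key range per region, put those
// regions into the flashback state, then rewrite them back to resolvedTs and
// commit the flashback at restoreTs.
func runFlashbackRecovery(ctx context.Context, recovery *Recovery, resolvedTs, restoreTs uint64) error {
	keyRanges := recovery.getRegionKeyRanges()

	if err := recovery.PrepareFlashbackToVersion(ctx, keyRanges); err != nil {
		return errors.Trace(err)
	}

	return errors.Trace(recovery.FlashbackToVersion(ctx, keyRanges, resolvedTs, restoreTs))
}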
2 changes: 1 addition & 1 deletion br/pkg/task/backup_ebs.go
@@ -186,7 +186,7 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error {
// Step.2 starts call ebs snapshot api to back up volume data.
// NOTE: we should start snapshot in specify order.

progress := g.StartProgress(ctx, "backup", int64(storeCount), !cfg.LogProgress)
progress := g.StartProgress(ctx, "backup", int64(storeCount)*100, !cfg.LogProgress)
go progressFileWriterRoutine(ctx, progress, int64(storeCount)*100, cfg.ProgressFile)

ec2Session, err := aws.NewEC2Session(cfg.CloudAPIConcurrency)
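The one-line change above makes the in-memory progress total use the same unit as progressFileWriterRoutine, namely 100 units per store. A tiny self-contained illustration of that accounting (the numbers are examples only, not from the PR):

package progressdemo

import "fmt"

// Each store contributes 100 progress units, so one Inc() advances that
// store's share of the overall progress by 1%.
func Example() {
	storeCount := 3
	totalUnits := int64(storeCount) * 100 // matches both StartProgress and progressFileWriterRoutine
	done := int64(150)                    // say 1.5 stores' worth of work has finished

	fmt.Printf("progress: %.1f%%\n", float64(done)/float64(totalUnits)*100) // progress: 50.0%
}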
2 changes: 1 addition & 1 deletion br/pkg/task/restore_data.go
@@ -138,7 +138,7 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto
// restore tikv data from a snapshot volume
var totalRegions int

totalRegions, err = restore.RecoverData(ctx, resolveTs, allStores, mgr, progress)
totalRegions, err = restore.RecoverData(ctx, resolveTs, allStores, mgr, progress, restoreTS, cfg.Concurrency)
Contributor:

It seems restoreTs > resolveTs. Is it OK to stop GC at restoreTs?

Contributor Author:

Keeping the safe point at restoreTs is safe, since GC never runs before the TiDB node starts up. The code may need refactoring to remove the safepoint keeper service, but I would like to do that after thorough testing.

if err != nil {
return errors.Trace(err)
}
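To make the safe-point discussion above concrete, here is a minimal sketch of pinning a service-level GC safe point at restoreTS through the PD client. It is not the code BR uses; the wrapper function, service ID, and TTL are assumptions.

package gcsketch

import (
	"context"

	pd "github.com/tikv/pd/client"
)

// keepSafePointAtRestoreTS registers a service-level GC safe point at
// restoreTS so that nothing newer than restoreTS is garbage collected while
// the volume-snapshot restore is running.
func keepSafePointAtRestoreTS(ctx context.Context, pdCli pd.Client, restoreTS uint64) error {
	const serviceID = "br-ebs-restore" // assumed service ID
	const ttl = int64(600)             // assumed TTL of 10 minutes, in seconds
	_, err := pdCli.UpdateServiceGCSafePoint(ctx, serviceID, ttl, restoreTS)
	return err
}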
10 changes: 6 additions & 4 deletions ddl/cluster.go
@@ -257,7 +257,8 @@ func GetFlashbackKeyRanges(sess sessionctx.Context) ([]kv.KeyRange, error) {
return keyRanges, nil
}

func sendPrepareFlashbackToVersionRPC(
// SendPrepareFlashbackToVersionRPC is exported so that BR can call it for AWS EBS snapshot backup and restore
func SendPrepareFlashbackToVersionRPC(
ctx context.Context,
s tikv.Storage,
r tikvstore.KeyRange,
@@ -324,7 +325,8 @@ func sendPrepareFlashbackToVersionRPC(
return taskStat, nil
}

func sendFlashbackToVersionRPC(
// SendFlashbackToVersionRPC is also exported so that BR can call it for AWS EBS snapshot backup and restore
func SendFlashbackToVersionRPC(
ctx context.Context,
s tikv.Storage,
version uint64,
@@ -524,7 +526,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
for _, r := range keyRanges {
if err = flashbackToVersion(d.ctx, d,
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := sendPrepareFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), r)
stats, err := SendPrepareFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), r)
totalRegions.Add(uint64(stats.CompletedRegions))
return stats, err
}, r.StartKey, r.EndKey); err != nil {
@@ -560,7 +562,7 @@
if err = flashbackToVersion(d.ctx, d,
func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
// Use commitTS - 1 as startTS to make sure it is less than commitTS.
stats, err := sendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, commitTS-1, commitTS, r)
stats, err := SendFlashbackToVersionRPC(ctx, d.store.(tikv.Storage), flashbackTS, commitTS-1, commitTS, r)
completedRegions.Add(uint64(stats.CompletedRegions))
logutil.BgLogger().Info("[ddl] flashback cluster stats",
zap.Uint64("complete regions", completedRegions.Load()),
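Finally, a minimal sketch of how BR-side code might drive the two newly exported helpers for a single key range, mirroring the call sites in this diff; the wrapper function itself is hypothetical.

package flashbacksketch

import (
	"context"

	"github.com/pingcap/tidb/ddl"
	tikvstore "github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/tikv"
)

// flashbackRange first puts the regions covering r into the flashback state,
// then rewrites r back to resolvedTS. commitTS-1 is used as startTS so that
// startTS is always less than commitTS, as in onFlashbackCluster above.
func flashbackRange(ctx context.Context, s tikv.Storage, r tikvstore.KeyRange, resolvedTS, commitTS uint64) error {
	if _, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, s, r); err != nil {
		return err
	}
	_, err := ddl.SendFlashbackToVersionRPC(ctx, s, resolvedTS, commitTS-1, commitTS, r)
	return err
}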