diff --git a/br/pkg/aws/ebs.go b/br/pkg/aws/ebs.go
index ddea6b358f556..cf5425e03be0d 100644
--- a/br/pkg/aws/ebs.go
+++ b/br/pkg/aws/ebs.go
@@ -281,6 +281,152 @@ func (e *EC2Session) DeleteSnapshots(snapIDMap map[string]string) {
 	log.Info("delete snapshot end", zap.Int("need-to-del", len(snapIDMap)), zap.Int32("deleted", deletedCnt.Load()))
 }
 
+// EnableDataFSR enables FSR for data volume snapshots
+func (e *EC2Session) EnableDataFSR(meta *config.EBSBasedBRMeta, targetAZ string) (map[string][]*string, error) {
+	snapshotsIDsMap := fetchTargetSnapshots(meta, targetAZ)
+
+	if len(snapshotsIDsMap) == 0 {
+		return snapshotsIDsMap, errors.Errorf("empty backup meta")
+	}
+
+	eg, _ := errgroup.WithContext(context.Background())
+
+	for availableZone := range snapshotsIDsMap {
+		targetAZ := availableZone
+		eg.Go(func() error {
+			log.Info("enable fsr for snapshots", zap.String("available zone", targetAZ))
+			resp, err := e.ec2.EnableFastSnapshotRestores(&ec2.EnableFastSnapshotRestoresInput{
+				AvailabilityZones: []*string{&targetAZ},
+				SourceSnapshotIds: snapshotsIDsMap[targetAZ],
+			})
+
+			if err != nil {
+				return errors.Trace(err)
+			}
+
+			if len(resp.Unsuccessful) > 0 {
+				log.Warn("not all snapshots enabled FSR")
+				return errors.Errorf("Some snapshot fails to enable FSR for available zone %s, such as %s, error code is %v", targetAZ, *resp.Unsuccessful[0].SnapshotId, resp.Unsuccessful[0].FastSnapshotRestoreStateErrors)
+			}
+
+			return e.waitDataFSREnabled(snapshotsIDsMap[targetAZ], targetAZ)
+		})
+	}
+	return snapshotsIDsMap, eg.Wait()
+}
+
+// waitDataFSREnabled waits until FSR is enabled for all data volume snapshots
+func (e *EC2Session) waitDataFSREnabled(snapShotIDs []*string, targetAZ string) error {
+	// Create a map to store the strings as keys
+	pendingSnapshots := make(map[string]struct{})
+
+	// Populate the map with the strings from the array
+	for _, str := range snapShotIDs {
+		pendingSnapshots[*str] = struct{}{}
+	}
+
+	log.Info("starts check fsr pending snapshots", zap.Any("snapshots", pendingSnapshots), zap.String("available zone", targetAZ))
+	for {
+		if len(pendingSnapshots) == 0 {
+			log.Info("all snapshots fsr enablement is finished", zap.String("available zone", targetAZ))
+			return nil
+		}
+
+		// check pending snapshots every 1 minute
+		time.Sleep(1 * time.Minute)
+		log.Info("check snapshots not fsr enabled", zap.Int("count", len(pendingSnapshots)))
+		input := &ec2.DescribeFastSnapshotRestoresInput{
+			Filters: []*ec2.Filter{
+				{
+					Name:   aws.String("state"),
+					Values: []*string{aws.String("disabled"), aws.String("disabling"), aws.String("enabling"), aws.String("optimizing")},
+				},
+				{
+					Name:   aws.String("availability-zone"),
+					Values: []*string{aws.String(targetAZ)},
+				},
+			},
+		}
+
+		result, err := e.ec2.DescribeFastSnapshotRestores(input)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		uncompletedSnapshots := make(map[string]struct{})
+		for _, fastRestore := range result.FastSnapshotRestores {
+			_, found := pendingSnapshots[*fastRestore.SnapshotId]
+			if found {
+				// Detect some conflict states
+				if strings.EqualFold(*fastRestore.State, "disabled") || strings.EqualFold(*fastRestore.State, "disabling") {
+					log.Error("detect conflict status", zap.String("snapshot", *fastRestore.SnapshotId), zap.String("status", *fastRestore.State))
+					return errors.Errorf("status of snapshot %s is %s ", *fastRestore.SnapshotId, *fastRestore.State)
+				}
+				uncompletedSnapshots[*fastRestore.SnapshotId] = struct{}{}
+			}
+		}
+		pendingSnapshots = uncompletedSnapshots
+	}
+}
+
+// DisableDataFSR disables FSR for data volume snapshots
+func (e *EC2Session) DisableDataFSR(snapshotsIDsMap map[string][]*string) error {
+	if len(snapshotsIDsMap) == 0 {
+		return nil
+	}
+
+	eg, _ := errgroup.WithContext(context.Background())
+
+	for availableZone := range snapshotsIDsMap {
+		targetAZ := availableZone
+		eg.Go(func() error {
+			resp, err := e.ec2.DisableFastSnapshotRestores(&ec2.DisableFastSnapshotRestoresInput{
+				AvailabilityZones: []*string{&targetAZ},
+				SourceSnapshotIds: snapshotsIDsMap[targetAZ],
+			})
+
+			if err != nil {
+				return errors.Trace(err)
+			}
+
+			if len(resp.Unsuccessful) > 0 {
+				log.Warn("not all snapshots disabled FSR", zap.String("available zone", targetAZ))
+				return errors.Errorf("Some snapshot fails to disable FSR for available zone %s, such as %s, error code is %v", targetAZ, *resp.Unsuccessful[0].SnapshotId, resp.Unsuccessful[0].FastSnapshotRestoreStateErrors)
+			}
+
+			log.Info("Disable FSR issued", zap.String("available zone", targetAZ))
+
+			return nil
+		})
+	}
+	return eg.Wait()
+}
+
+func fetchTargetSnapshots(meta *config.EBSBasedBRMeta, specifiedAZ string) map[string][]*string {
+	var sourceSnapshotIDs = make(map[string][]*string)
+
+	if len(meta.TiKVComponent.Stores) == 0 {
+		return sourceSnapshotIDs
+	}
+
+	for i := range meta.TiKVComponent.Stores {
+		store := meta.TiKVComponent.Stores[i]
+		for j := range store.Volumes {
+			oldVol := store.Volumes[j]
+			// Handle data volume snapshots only
+			if strings.Compare(oldVol.Type, "storage.data-dir") == 0 {
+				if specifiedAZ != "" {
+					sourceSnapshotIDs[specifiedAZ] = append(sourceSnapshotIDs[specifiedAZ], &oldVol.SnapshotID)
+				} else {
+					sourceSnapshotIDs[oldVol.VolumeAZ] = append(sourceSnapshotIDs[oldVol.VolumeAZ], &oldVol.SnapshotID)
+				}
+			}
+		}
+	}
+
+	return sourceSnapshotIDs
+}
+
 // CreateVolumes create volumes from snapshots
 // if err happens in the middle, return half-done result
 // returned map: store id -> old volume id -> new volume id
@@ -377,7 +523,7 @@ func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress
 	for len(pendingVolumes) > 0 {
 		// check every 5 seconds
 		time.Sleep(5 * time.Second)
-		log.Info("check pending snapshots", zap.Int("count", len(pendingVolumes)))
+		log.Info("check pending volumes", zap.Int("count", len(pendingVolumes)))
 		resp, err := e.ec2.DescribeVolumes(&ec2.DescribeVolumesInput{
 			VolumeIds: pendingVolumes,
 		})
diff --git a/br/pkg/config/ebs.go b/br/pkg/config/ebs.go
index deedb2d384403..5731738c14d2a 100644
--- a/br/pkg/config/ebs.go
+++ b/br/pkg/config/ebs.go
@@ -100,6 +100,14 @@ func (c *EBSBasedBRMeta) GetStoreCount() uint64 {
 	return uint64(len(c.TiKVComponent.Stores))
 }
 
+func (c *EBSBasedBRMeta) GetTiKVVolumeCount() uint64 {
+	if c.TiKVComponent == nil || len(c.TiKVComponent.Stores) == 0 {
+		return 0
+	}
+	// Assume TiKV nodes are symmetric
+	return uint64(len(c.TiKVComponent.Stores[0].Volumes))
+}
+
 func (c *EBSBasedBRMeta) String() string {
 	cfg, err := json.Marshal(c)
 	if err != nil {
diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go
index 41bdcd2a96d49..26b96d4414318 100644
--- a/br/pkg/task/common.go
+++ b/br/pkg/task/common.go
@@ -80,6 +80,7 @@ const (
 	flagDryRun = "dry-run"
 	// TODO used for local test, should be removed later
 	flagSkipAWS             = "skip-aws"
+	flagUseFSR              = "use-fsr"
 	flagCloudAPIConcurrency = "cloud-api-concurrency"
 	flagWithSysTable        = "with-sys-table"
 	flagOperatorPausedGCAndSchedulers = "operator-paused-gc-and-scheduler"
diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go
index 9125d47d470fe..7a1a043e7c682 100644
--- a/br/pkg/task/restore.go
+++ b/br/pkg/task/restore.go
@@ -130,6 +130,7 @@ func DefineRestoreCommonFlags(flags *pflag.FlagSet) {
 		"batch size for ddl to create a batch of tables once.")
 	flags.Bool(flagWithSysTable, false, "whether restore system privilege tables on default setting")
 	flags.StringArrayP(FlagResetSysUsers, "", []string{"cloud_admin", "root"}, "whether reset these users after restoration")
+	flags.Bool(flagUseFSR, false, "whether enable FSR for AWS snapshots")
 	_ = flags.MarkHidden(FlagResetSysUsers)
 	_ = flags.MarkHidden(FlagMergeRegionSizeBytes)
 	_ = flags.MarkHidden(FlagMergeRegionKeyCount)
@@ -204,6 +205,7 @@ type RestoreConfig struct {
 	VolumeThroughput int64  `json:"volume-throughput" toml:"volume-throughput"`
 	ProgressFile     string `json:"progress-file" toml:"progress-file"`
 	TargetAZ         string `json:"target-az" toml:"target-az"`
+	UseFSR           bool   `json:"use-fsr" toml:"use-fsr"`
 }
 
 // DefineRestoreFlags defines common flags for the restore tidb command.
@@ -362,6 +364,11 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
 		return errors.Trace(err)
 	}
 
+	cfg.UseFSR, err = flags.GetBool(flagUseFSR)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
 	// iops: gp3 [3,000-16,000]; io1/io2 [100-32,000]
 	// throughput: gp3 [125, 1000]; io1/io2 cannot set throughput
 	// io1 and io2 volumes support up to 64,000 IOPS only on Instances built on the Nitro System.
diff --git a/br/pkg/task/restore_data.go b/br/pkg/task/restore_data.go
index 74663ea28d39d..48a5a0f17c9ee 100644
--- a/br/pkg/task/restore_data.go
+++ b/br/pkg/task/restore_data.go
@@ -159,23 +159,17 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto
 	//TODO: restore volume type into origin type
 	//ModifyVolume(*ec2.ModifyVolumeInput) (*ec2.ModifyVolumeOutput, error) by backupmeta
-	// this is used for cloud restoration
+
 	err = client.Init(g, mgr.GetStorage())
 	if err != nil {
 		return errors.Trace(err)
 	}
 	defer client.Close()
-	log.Info("start to clear system user for cloud")
-	err = client.ClearSystemUsers(ctx, cfg.ResetSysUsers)
-
-	if err != nil {
-		return errors.Trace(err)
-	}
-
-	// since we cannot reset tiflash automaticlly. so we should start it manually
 	if err = client.ResetTiFlashReplicas(ctx, g, mgr.GetStorage()); err != nil {
 		return errors.Trace(err)
 	}
+
 	progress.Close()
 	summary.CollectDuration("restore duration", time.Since(startAll))
 	summary.SetSuccessStatus(true)
diff --git a/br/pkg/task/restore_ebs_meta.go b/br/pkg/task/restore_ebs_meta.go
index 53286505b5b9c..7dbad5960cb17 100644
--- a/br/pkg/task/restore_ebs_meta.go
+++ b/br/pkg/task/restore_ebs_meta.go
@@ -175,10 +175,10 @@ func (h *restoreEBSMetaHelper) restore() error {
 		return errors.Trace(err)
 	}
 
-	storeCount := h.metaInfo.GetStoreCount()
-	progress := h.g.StartProgress(ctx, h.cmdName, int64(storeCount), !h.cfg.LogProgress)
+	volumeCount := h.metaInfo.GetStoreCount() * h.metaInfo.GetTiKVVolumeCount()
+	progress := h.g.StartProgress(ctx, h.cmdName, int64(volumeCount), !h.cfg.LogProgress)
 	defer progress.Close()
-	go progressFileWriterRoutine(ctx, progress, int64(storeCount), h.cfg.ProgressFile)
+	go progressFileWriterRoutine(ctx, progress, int64(volumeCount), h.cfg.ProgressFile)
 
 	resolvedTs = h.metaInfo.ClusterInfo.ResolvedTS
 	if totalSize, err = h.doRestore(ctx, progress); err != nil {
@@ -226,6 +226,8 @@ func (h *restoreEBSMetaHelper) restoreVolumes(progress glue.Progress) (map[strin
 		volumeIDMap = make(map[string]string)
 		err         error
 		totalSize   int64
+		// a map whose key is available zone, and value is the snapshot id array
+		snapshotsIDsMap = make(map[string][]*string)
 	)
 	ec2Session, err = aws.NewEC2Session(h.cfg.CloudAPIConcurrency, h.cfg.S3.Region)
 	if err != nil {
@@ -236,7 +238,21 @@ func (h *restoreEBSMetaHelper) restoreVolumes(progress glue.Progress) (map[strin
 			log.Error("failed to create all volumes, cleaning up created volume")
 			ec2Session.DeleteVolumes(volumeIDMap)
 		}
+
+		if h.cfg.UseFSR {
+			err = ec2Session.DisableDataFSR(snapshotsIDsMap)
+			log.Error("disable fsr failed", zap.Error(err))
+		}
 	}()
+
+	// Turn on FSR for TiKV data snapshots
+	if h.cfg.UseFSR {
+		snapshotsIDsMap, err = ec2Session.EnableDataFSR(h.metaInfo, h.cfg.TargetAZ)
+		if err != nil {
+			return nil, 0, errors.Trace(err)
+		}
+	}
+
 	volumeIDMap, err = ec2Session.CreateVolumes(h.metaInfo, string(h.cfg.VolumeType), h.cfg.VolumeIOPS, h.cfg.VolumeThroughput, h.cfg.TargetAZ)
 	if err != nil {