diff --git a/pkg/restore/backoff.go b/pkg/restore/backoff.go index d614e0129..bd6699817 100644 --- a/pkg/restore/backoff.go +++ b/pkg/restore/backoff.go @@ -12,6 +12,7 @@ import ( var ( errNotLeader = errors.NewNoStackError("not leader") + errEpochNotMatch = errors.NewNoStackError("epoch not match") errKeyNotInRegion = errors.NewNoStackError("key not in region") errResp = errors.NewNoStackError("response error") errRewriteRuleNotFound = errors.NewNoStackError("rewrite rule not found") diff --git a/pkg/restore/import.go b/pkg/restore/import.go index f036f62da..f905d0315 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -170,12 +170,11 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos))) // Try to download and ingest the file in every region for _, regionInfo := range regionInfos { - info := regionInfo // Try to download file. var downloadMeta *import_sstpb.SSTMeta err1 = utils.WithRetry(importer.ctx, func() error { var e error - downloadMeta, e = importer.downloadSST(info, file, rewriteRules) + downloadMeta, e = importer.downloadSST(regionInfo, file, rewriteRules) return e }, newDownloadSSTBackoffer()) if err1 != nil { @@ -185,26 +184,31 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul } log.Error("download file failed", zap.Stringer("file", file), - zap.Stringer("region", info.Region), + zap.Stringer("region", regionInfo.Region), zap.Binary("startKey", startKey), zap.Binary("endKey", endKey), zap.Error(err1)) return err1 } - err1 = importer.ingestSST(downloadMeta, info) + err1 = importer.ingestSST(downloadMeta, regionInfo) // If error is `NotLeader`, update the region info and retry for err1 == errNotLeader { - info, err1 = importer.metaClient.GetRegion(ctx, info.Region.GetStartKey()) + var newInfo *RegionInfo + newInfo, err1 = importer.metaClient.GetRegion(ctx, regionInfo.Region.GetStartKey()) if err1 != nil { break } - err1 = importer.ingestSST(downloadMeta, info) + if !checkRegionEpoch(newInfo, regionInfo) { + err1 = errEpochNotMatch + break + } + err1 = importer.ingestSST(downloadMeta, newInfo) } if err1 != nil { log.Error("ingest file failed", zap.Stringer("file", file), zap.Stringer("range", downloadMeta.GetRange()), - zap.Stringer("region", info.Region), + zap.Stringer("region", regionInfo.Region), zap.Error(err1)) return err1 } @@ -308,6 +312,15 @@ func (importer *FileImporter) ingestSST( return nil } +func checkRegionEpoch(new, old *RegionInfo) bool { + if new.Region.GetId() == old.Region.GetId() && + new.Region.GetRegionEpoch().GetVersion() == old.Region.GetRegionEpoch().GetVersion() && + new.Region.GetRegionEpoch().GetConfVer() == old.Region.GetRegionEpoch().GetConfVer() { + return true + } + return false +} + func extractDownloadSSTError(e error) error { err := errGrpc switch {