Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
fix region epoch error
Browse files Browse the repository at this point in the history
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
  • Loading branch information
5kbpers committed Feb 11, 2020
1 parent 8622d68 commit 3189274
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
1 change: 1 addition & 0 deletions pkg/restore/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

var (
errNotLeader = errors.NewNoStackError("not leader")
errEpochNotMatch = errors.NewNoStackError("epoch not match")
errKeyNotInRegion = errors.NewNoStackError("key not in region")
errResp = errors.NewNoStackError("response error")
errRewriteRuleNotFound = errors.NewNoStackError("rewrite rule not found")
Expand Down
27 changes: 20 additions & 7 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,11 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul
log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos)))
// Try to download and ingest the file in every region
for _, regionInfo := range regionInfos {
info := regionInfo
// Try to download file.
var downloadMeta *import_sstpb.SSTMeta
err1 = utils.WithRetry(importer.ctx, func() error {
var e error
downloadMeta, e = importer.downloadSST(info, file, rewriteRules)
downloadMeta, e = importer.downloadSST(regionInfo, file, rewriteRules)
return e
}, newDownloadSSTBackoffer())
if err1 != nil {
Expand All @@ -185,26 +184,31 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul
}
log.Error("download file failed",
zap.Stringer("file", file),
zap.Stringer("region", info.Region),
zap.Stringer("region", regionInfo.Region),
zap.Binary("startKey", startKey),
zap.Binary("endKey", endKey),
zap.Error(err1))
return err1
}
err1 = importer.ingestSST(downloadMeta, info)
err1 = importer.ingestSST(downloadMeta, regionInfo)
// If error is `NotLeader`, update the region info and retry
for err1 == errNotLeader {
info, err1 = importer.metaClient.GetRegion(ctx, info.Region.GetStartKey())
var newInfo *RegionInfo
newInfo, err1 = importer.metaClient.GetRegion(ctx, regionInfo.Region.GetStartKey())
if err1 != nil {
break
}
err1 = importer.ingestSST(downloadMeta, info)
if !checkRegionEpoch(newInfo, regionInfo) {
err1 = errEpochNotMatch
break
}
err1 = importer.ingestSST(downloadMeta, newInfo)
}
if err1 != nil {
log.Error("ingest file failed",
zap.Stringer("file", file),
zap.Stringer("range", downloadMeta.GetRange()),
zap.Stringer("region", info.Region),
zap.Stringer("region", regionInfo.Region),
zap.Error(err1))
return err1
}
Expand Down Expand Up @@ -308,6 +312,15 @@ func (importer *FileImporter) ingestSST(
return nil
}

func checkRegionEpoch(new, old *RegionInfo) bool {
if new.Region.GetId() == old.Region.GetId() &&
new.Region.GetRegionEpoch().GetVersion() == old.Region.GetRegionEpoch().GetVersion() &&
new.Region.GetRegionEpoch().GetConfVer() == old.Region.GetRegionEpoch().GetConfVer() {
return true
}
return false
}

func extractDownloadSSTError(e error) error {
err := errGrpc
switch {
Expand Down

0 comments on commit 3189274

Please sign in to comment.