Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Move waiting for rejected stores into the import file #222

Merged
merged 6 commits into from
Apr 1, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ func (rc *Client) setSpeedLimit() error {
func (rc *Client) RestoreFiles(
files []*backup.File,
rewriteRules *RewriteRules,
rejectStoreMap map[uint64]bool,
updateCh glue.Progress,
) (err error) {
start := time.Now()
Expand Down Expand Up @@ -486,7 +487,7 @@ func (rc *Client) RestoreFiles(
select {
case <-rc.ctx.Done():
errCh <- rc.ctx.Err()
case errCh <- rc.fileImporter.Import(fileReplica, rewriteRules):
case errCh <- rc.fileImporter.Import(fileReplica, rejectStoreMap, rewriteRules):
updateCh.Inc()
}
})
Expand Down Expand Up @@ -537,7 +538,7 @@ func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.Fil
select {
case <-rc.ctx.Done():
errCh <- rc.ctx.Err()
case errCh <- rc.fileImporter.Import(fileReplica, emptyRules):
case errCh <- rc.fileImporter.Import(fileReplica, nil, emptyRules):
updateCh.Inc()
}
})
Expand Down
28 changes: 27 additions & 1 deletion pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,10 @@ func (importer *FileImporter) SetRawRange(startKey, endKey []byte) error {

// Import tries to import a file.
// All rules must contain encoded keys.
func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRules) error {
func (importer *FileImporter) Import(file *backup.File,
rejectStoreMap map[uint64]bool,
rewriteRules *RewriteRules,
) error {
3pointer marked this conversation as resolved.
Show resolved Hide resolved
log.Debug("import file", zap.Stringer("file", file))
// Rewrite the start key and end key of file to scan regions
var startKey, endKey []byte
Expand All @@ -193,6 +196,12 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul
zap.Stringer("file", file),
zap.Binary("startKey", startKey),
zap.Binary("endKey", endKey))

needReject := false
if len(rejectStoreMap) > 0 {
needReject = true
}
3pointer marked this conversation as resolved.
Show resolved Hide resolved

err = utils.WithRetry(importer.ctx, func() error {
ctx, cancel := context.WithTimeout(importer.ctx, importScanRegionTime)
defer cancel()
Expand All @@ -202,6 +211,23 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul
if errScanRegion != nil {
return errors.Trace(errScanRegion)
}

if needReject {
// TODO remove when TiFlash support restore
startTime := time.Now()
log.Info("start to wait for removing rejected stores", zap.Reflect("rejectStores", rejectStoreMap))
for _, region := range regionInfos {
if !waitForRemoveRejectStores(ctx, importer.metaClient, region, rejectStoreMap) {
log.Error("waiting for removing rejected stores failed",
zap.Stringer("region", region.Region))
return errors.New("waiting for removing rejected stores failed")
}
}
log.Info("waiting for removing rejected stores done",
zap.Int("regions", len(regionInfos)), zap.Duration("take", time.Since(startTime)))
needReject = false
}

log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos)))
// Try to download and ingest the file in every region
for _, regionInfo := range regionInfos {
Expand Down
70 changes: 0 additions & 70 deletions pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func (rs *RegionSplitter) Split(
ctx context.Context,
ranges []rtree.Range,
rewriteRules *RewriteRules,
rejectStores map[uint64]bool,
onSplit OnSplitFunc,
) error {
if len(ranges) == 0 {
Expand Down Expand Up @@ -95,14 +94,12 @@ func (rs *RegionSplitter) Split(
}
interval := SplitRetryInterval
scatterRegions := make([]*RegionInfo, 0)
allRegions := make([]*RegionInfo, 0)
SplitRegions:
for i := 0; i < SplitRetryTimes; i++ {
regions, errScan := paginateScanRegion(ctx, rs.client, minKey, maxKey, scanRegionPaginationLimit)
if errScan != nil {
return errors.Trace(errScan)
}
allRegions = append(allRegions, regions...)
if len(regions) == 0 {
log.Warn("cannot scan any region")
return nil
Expand Down Expand Up @@ -145,19 +142,6 @@ SplitRegions:
if errSplit != nil {
return errors.Trace(errSplit)
}
if len(rejectStores) > 0 {
startTime = time.Now()
log.Info("start to wait for removing rejected stores", zap.Reflect("rejectStores", rejectStores))
for _, region := range allRegions {
if !rs.waitForRemoveRejectStores(ctx, region, rejectStores) {
log.Error("waiting for removing rejected stores failed",
zap.Stringer("region", region.Region))
return errors.New("waiting for removing rejected stores failed")
}
}
log.Info("waiting for removing rejected stores done",
zap.Int("regions", len(allRegions)), zap.Duration("take", time.Since(startTime)))
}
log.Info("start to wait for scattering regions",
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
startTime = time.Now()
Expand Down Expand Up @@ -211,30 +195,6 @@ func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID
return ok, nil
}

// hasRejectStorePeer reports whether the region identified by regionID
// currently has a peer located on any store listed in rejectStores.
// It returns (false, nil) when the region cannot be found.
//
// NOTE(review): ctx.Value(retryTimes).(int) is an unchecked type
// assertion — it panics if the caller did not store an int under the
// retryTimes context key; waitForRemoveRejectStores always sets it, but
// any other caller must do the same.
func (rs *RegionSplitter) hasRejectStorePeer(
	ctx context.Context,
	regionID uint64,
	rejectStores map[uint64]bool,
) (bool, error) {
	regionInfo, err := rs.client.GetRegionByID(ctx, regionID)
	if err != nil {
		return false, err
	}
	// A nil region means it no longer exists; treat it as having no
	// peer on a rejected store.
	if regionInfo == nil {
		return false, nil
	}
	for _, peer := range regionInfo.Region.GetPeers() {
		if rejectStores[peer.GetStoreId()] {
			return true, nil
		}
	}
	// After more than 10 retries, log the region to help diagnose why
	// the rejected-store peer has not been removed yet.
	retryTimes := ctx.Value(retryTimes).(int)
	if retryTimes > 10 {
		log.Warn("get region info", zap.Stringer("region", regionInfo.Region))
	}
	return false, nil
}

func (rs *RegionSplitter) waitForSplit(ctx context.Context, regionID uint64) {
interval := SplitCheckInterval
for i := 0; i < SplitCheckMaxRetryTimes; i++ {
Expand Down Expand Up @@ -280,36 +240,6 @@ func (rs *RegionSplitter) waitForScatterRegion(ctx context.Context, regionInfo *
}
}

// waitForRemoveRejectStores polls the given region until none of its
// peers live on a store listed in rejectStores, retrying up to
// RejectStoreCheckRetryTimes with exponential backoff capped at
// RejectStoreMaxCheckInterval. It returns true once no rejected-store
// peer remains, and false on error or when retries are exhausted.
func (rs *RegionSplitter) waitForRemoveRejectStores(
	ctx context.Context,
	regionInfo *RegionInfo,
	rejectStores map[uint64]bool,
) bool {
	interval := RejectStoreCheckInterval
	regionID := regionInfo.Region.GetId()
	for i := 0; i < RejectStoreCheckRetryTimes; i++ {
		// Attach the attempt count so hasRejectStorePeer can log after
		// many retries.
		ctx1 := context.WithValue(ctx, retryTimes, i)
		ok, err := rs.hasRejectStorePeer(ctx1, regionID, rejectStores)
		if err != nil {
			log.Warn("wait for rejecting store failed",
				zap.Stringer("region", regionInfo.Region),
				zap.Error(err))
			return false
		}
		// Do not have any peer in the rejected store, return true
		if !ok {
			return true
		}
		// Exponential backoff, capped at RejectStoreMaxCheckInterval.
		interval = 2 * interval
		if interval > RejectStoreMaxCheckInterval {
			interval = RejectStoreMaxCheckInterval
		}
		time.Sleep(interval)
	}

	return false
}

func (rs *RegionSplitter) splitAndScatterRegions(
ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) ([]*RegionInfo, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (s *testRestoreUtilSuite) TestSplit(c *C) {
regionSplitter := NewRegionSplitter(client)

ctx := context.Background()
err := regionSplitter.Split(ctx, ranges, rewriteRules, map[uint64]bool{}, func(key [][]byte) {})
err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {})
if err != nil {
c.Assert(err, IsNil, Commentf("split regions failed: %v", err))
}
Expand Down
67 changes: 57 additions & 10 deletions pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/tidb/util/codec"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/summary"
Expand Down Expand Up @@ -332,16 +331,8 @@ func SplitRanges(
summary.CollectDuration("split region", elapsed)
}()
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig()))
tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly)
if err != nil {
return errors.Trace(err)
}
storeMap := make(map[uint64]bool)
for _, store := range tiflashStores {
storeMap[store.GetId()] = true
}

return splitter.Split(ctx, ranges, rewriteRules, storeMap, func(keys [][]byte) {
return splitter.Split(ctx, ranges, rewriteRules, func(keys [][]byte) {
for range keys {
updateCh.Inc()
}
Expand Down Expand Up @@ -416,3 +407,59 @@ func paginateScanRegion(
}
return regions, nil
}

// hasRejectStorePeer reports whether the region identified by regionID
// currently keeps a peer on any store listed in rejectStores.
//
// It returns (false, nil) when the region cannot be found (e.g. already
// merged away). The retry counter optionally stored in ctx under the
// retryTimes key is only used to emit a diagnostic log after more than
// 10 attempts.
func hasRejectStorePeer(
	ctx context.Context,
	client SplitClient,
	regionID uint64,
	rejectStores map[uint64]bool,
) (bool, error) {
	regionInfo, err := client.GetRegionByID(ctx, regionID)
	if err != nil {
		return false, err
	}
	if regionInfo == nil {
		// The region no longer exists, so it has no rejected peer.
		return false, nil
	}
	for _, peer := range regionInfo.Region.GetPeers() {
		if rejectStores[peer.GetStoreId()] {
			return true, nil
		}
	}
	// Checked assertion: the original `ctx.Value(retryTimes).(int)`
	// panics when the caller did not attach the retry counter. With the
	// comma-ok form a missing counter simply skips the diagnostic log.
	if retry, ok := ctx.Value(retryTimes).(int); ok && retry > 10 {
		log.Warn("get region info", zap.Stringer("region", regionInfo.Region))
	}
	return false, nil
}

// waitForRemoveRejectStores blocks until the given region has no peer
// on any store listed in rejectStores, or until the retry budget is
// exhausted. Each failed check doubles the wait interval, capped at
// RejectStoreMaxCheckInterval. It returns true when all rejected-store
// peers are gone, false on error or timeout.
func waitForRemoveRejectStores(
	ctx context.Context,
	client SplitClient,
	regionInfo *RegionInfo,
	rejectStores map[uint64]bool,
) bool {
	var (
		backoff = RejectStoreCheckInterval
		id      = regionInfo.Region.GetId()
	)
	for attempt := 0; attempt < RejectStoreCheckRetryTimes; attempt++ {
		// Record the attempt number so hasRejectStorePeer can log a
		// warning once the retry count grows large.
		attemptCtx := context.WithValue(ctx, retryTimes, attempt)
		found, err := hasRejectStorePeer(attemptCtx, client, id, rejectStores)
		if err != nil {
			log.Warn("wait for rejecting store failed",
				zap.Stringer("region", regionInfo.Region),
				zap.Error(err))
			return false
		}
		if !found {
			// No peer remains on a rejected store.
			return true
		}
		// Exponential backoff before the next poll.
		backoff *= 2
		if backoff > RejectStoreMaxCheckInterval {
			backoff = RejectStoreMaxCheckInterval
		}
		time.Sleep(backoff)
	}
	return false
}
12 changes: 11 additions & 1 deletion pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,16 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if batchSize > maxRestoreBatchSizeLimit {
batchSize = maxRestoreBatchSizeLimit // 256
}

tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly)
if err != nil {
return errors.Trace(err)
}
rejectStoreMap := make(map[uint64]bool)
for _, store := range tiflashStores {
rejectStoreMap[store.GetId()] = true
}

for {
if len(ranges) == 0 {
break
Expand All @@ -246,7 +256,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
}

// After split, we can restore backup files.
err = client.RestoreFiles(fileBatch, rewriteRules, updateCh)
err = client.RestoreFiles(fileBatch, rewriteRules, rejectStoreMap, updateCh)
if err != nil {
break
}
Expand Down