From 0472ed1e6e84dc0428d458f3b6ec42f45b3ee0b3 Mon Sep 17 00:00:00 2001 From: luancheng Date: Tue, 31 Mar 2020 19:49:12 +0800 Subject: [PATCH 1/5] move wait rejectstores into import files --- pkg/restore/client.go | 5 +-- pkg/restore/import.go | 28 +++++++++++++++- pkg/restore/split.go | 70 --------------------------------------- pkg/restore/split_test.go | 2 +- pkg/restore/util.go | 67 +++++++++++++++++++++++++++++++------ pkg/task/restore.go | 12 ++++++- 6 files changed, 99 insertions(+), 85 deletions(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 76dbf5066..15755d868 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -454,6 +454,7 @@ func (rc *Client) setSpeedLimit() error { func (rc *Client) RestoreFiles( files []*backup.File, rewriteRules *RewriteRules, + rejectStoreMap map[uint64]bool, updateCh glue.Progress, ) (err error) { start := time.Now() @@ -486,7 +487,7 @@ func (rc *Client) RestoreFiles( select { case <-rc.ctx.Done(): errCh <- rc.ctx.Err() - case errCh <- rc.fileImporter.Import(fileReplica, rewriteRules): + case errCh <- rc.fileImporter.Import(fileReplica, rejectStoreMap, rewriteRules): updateCh.Inc() } }) @@ -537,7 +538,7 @@ func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.Fil select { case <-rc.ctx.Done(): errCh <- rc.ctx.Err() - case errCh <- rc.fileImporter.Import(fileReplica, emptyRules): + case errCh <- rc.fileImporter.Import(fileReplica, nil, emptyRules): updateCh.Inc() } }) diff --git a/pkg/restore/import.go b/pkg/restore/import.go index 405af050c..b2299c42d 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -175,7 +175,10 @@ func (importer *FileImporter) SetRawRange(startKey, endKey []byte) error { // Import tries to import a file. // All rules must contain encoded keys. -func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRules) error { +func (importer *FileImporter) Import(file *backup.File, + rejectStoreMap map[uint64]bool, + rewriteRules *RewriteRules, +) error { log.Debug("import file", zap.Stringer("file", file)) // Rewrite the start key and end key of file to scan regions var startKey, endKey []byte @@ -193,6 +196,12 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul zap.Stringer("file", file), zap.Binary("startKey", startKey), zap.Binary("endKey", endKey)) + + needReject := false + if len(rejectStoreMap) > 0 { + needReject = true + } + err = utils.WithRetry(importer.ctx, func() error { ctx, cancel := context.WithTimeout(importer.ctx, importScanRegionTime) defer cancel() @@ -202,6 +211,23 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul if errScanRegion != nil { return errors.Trace(errScanRegion) } + + if needReject { + // TODO remove when TiFlash support restore + startTime := time.Now() + log.Info("start to wait for removing rejected stores", zap.Reflect("rejectStores", rejectStoreMap)) + for _, region := range regionInfos { + if !waitForRemoveRejectStores(ctx, importer.metaClient, region, rejectStoreMap) { + log.Error("waiting for removing rejected stores failed", + zap.Stringer("region", region.Region)) + return errors.New("waiting for removing rejected stores failed") + } + } + log.Info("waiting for removing rejected stores done", + zap.Int("regions", len(regionInfos)), zap.Duration("take", time.Since(startTime))) + needReject = false + } + log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos))) // Try to download and ingest the file in every region for _, regionInfo := range regionInfos { diff --git a/pkg/restore/split.go b/pkg/restore/split.go index 03153097a..4138d0012 100644 --- a/pkg/restore/split.go +++ b/pkg/restore/split.go @@ -63,7 +63,6 @@ func (rs *RegionSplitter) Split( ctx context.Context, ranges []rtree.Range, rewriteRules *RewriteRules, - rejectStores map[uint64]bool, onSplit OnSplitFunc, ) error { if len(ranges) == 0 { @@ -95,14 +94,12 @@ func (rs *RegionSplitter) Split( } interval := SplitRetryInterval scatterRegions := make([]*RegionInfo, 0) - allRegions := make([]*RegionInfo, 0) SplitRegions: for i := 0; i < SplitRetryTimes; i++ { regions, errScan := paginateScanRegion(ctx, rs.client, minKey, maxKey, scanRegionPaginationLimit) if errScan != nil { return errors.Trace(errScan) } - allRegions = append(allRegions, regions...) if len(regions) == 0 { log.Warn("cannot scan any region") return nil @@ -145,19 +142,6 @@ SplitRegions: if errSplit != nil { return errors.Trace(errSplit) } - if len(rejectStores) > 0 { - startTime = time.Now() - log.Info("start to wait for removing rejected stores", zap.Reflect("rejectStores", rejectStores)) - for _, region := range allRegions { - if !rs.waitForRemoveRejectStores(ctx, region, rejectStores) { - log.Error("waiting for removing rejected stores failed", - zap.Stringer("region", region.Region)) - return errors.New("waiting for removing rejected stores failed") - } - } - log.Info("waiting for removing rejected stores done", - zap.Int("regions", len(allRegions)), zap.Duration("take", time.Since(startTime))) - } log.Info("start to wait for scattering regions", zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime))) startTime = time.Now() @@ -211,30 +195,6 @@ func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID return ok, nil } -func (rs *RegionSplitter) hasRejectStorePeer( - ctx context.Context, - regionID uint64, - rejectStores map[uint64]bool, -) (bool, error) { - regionInfo, err := rs.client.GetRegionByID(ctx, regionID) - if err != nil { - return false, err - } - if regionInfo == nil { - return false, nil - } - for _, peer := range regionInfo.Region.GetPeers() { - if rejectStores[peer.GetStoreId()] { - return true, nil - } - } - retryTimes := ctx.Value(retryTimes).(int) - if retryTimes > 10 { - log.Warn("get region info", zap.Stringer("region", regionInfo.Region)) - } - return false, nil -} - func (rs *RegionSplitter) waitForSplit(ctx context.Context, regionID uint64) { interval := SplitCheckInterval for i := 0; i < SplitCheckMaxRetryTimes; i++ { @@ -280,36 +240,6 @@ func (rs *RegionSplitter) waitForScatterRegion(ctx context.Context, regionInfo * } } -func (rs *RegionSplitter) waitForRemoveRejectStores( - ctx context.Context, - regionInfo *RegionInfo, - rejectStores map[uint64]bool, -) bool { - interval := RejectStoreCheckInterval - regionID := regionInfo.Region.GetId() - for i := 0; i < RejectStoreCheckRetryTimes; i++ { - ctx1 := context.WithValue(ctx, retryTimes, i) - ok, err := rs.hasRejectStorePeer(ctx1, regionID, rejectStores) - if err != nil { - log.Warn("wait for rejecting store failed", - zap.Stringer("region", regionInfo.Region), - zap.Error(err)) - return false - } - // Do not have any peer in the rejected store, return true - if !ok { - return true - } - interval = 2 * interval - if interval > RejectStoreMaxCheckInterval { - interval = RejectStoreMaxCheckInterval - } - time.Sleep(interval) - } - - return false -} - func (rs *RegionSplitter) splitAndScatterRegions( ctx context.Context, regionInfo *RegionInfo, keys [][]byte, ) ([]*RegionInfo, error) { diff --git a/pkg/restore/split_test.go b/pkg/restore/split_test.go index 06dab1cf1..b21cbf781 100644 --- a/pkg/restore/split_test.go +++ b/pkg/restore/split_test.go @@ -193,7 +193,7 @@ func (s *testRestoreUtilSuite) TestSplit(c *C) { regionSplitter := NewRegionSplitter(client) ctx := context.Background() - err := regionSplitter.Split(ctx, ranges, rewriteRules, map[uint64]bool{}, func(key [][]byte) {}) + err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {}) if err != nil { c.Assert(err, IsNil, Commentf("split regions failed: %v", err)) } diff --git a/pkg/restore/util.go b/pkg/restore/util.go index 698de6aec..d322c9de0 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -21,7 +21,6 @@ import ( "github.com/pingcap/tidb/util/codec" "go.uber.org/zap" - "github.com/pingcap/br/pkg/conn" "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/rtree" "github.com/pingcap/br/pkg/summary" @@ -332,16 +331,8 @@ func SplitRanges( summary.CollectDuration("split region", elapsed) }() splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig())) - tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly) - if err != nil { - return errors.Trace(err) - } - storeMap := make(map[uint64]bool) - for _, store := range tiflashStores { - storeMap[store.GetId()] = true - } - return splitter.Split(ctx, ranges, rewriteRules, storeMap, func(keys [][]byte) { + return splitter.Split(ctx, ranges, rewriteRules, func(keys [][]byte) { for range keys { updateCh.Inc() } @@ -416,3 +407,59 @@ func paginateScanRegion( } return regions, nil } + +func hasRejectStorePeer( + ctx context.Context, + client SplitClient, + regionID uint64, + rejectStores map[uint64]bool, +) (bool, error) { + regionInfo, err := client.GetRegionByID(ctx, regionID) + if err != nil { + return false, err + } + if regionInfo == nil { + return false, nil + } + for _, peer := range regionInfo.Region.GetPeers() { + if rejectStores[peer.GetStoreId()] { + return true, nil + } + } + retryTimes := ctx.Value(retryTimes).(int) + if retryTimes > 10 { + log.Warn("get region info", zap.Stringer("region", regionInfo.Region)) + } + return false, nil +} + +func waitForRemoveRejectStores( + ctx context.Context, + client SplitClient, + regionInfo *RegionInfo, + rejectStores map[uint64]bool, +) bool { + interval := RejectStoreCheckInterval + regionID := regionInfo.Region.GetId() + for i := 0; i < RejectStoreCheckRetryTimes; i++ { + ctx1 := context.WithValue(ctx, retryTimes, i) + ok, err := hasRejectStorePeer(ctx1, client, regionID, rejectStores) + if err != nil { + log.Warn("wait for rejecting store failed", + zap.Stringer("region", regionInfo.Region), + zap.Error(err)) + return false + } + // Do not have any peer in the rejected store, return true + if !ok { + return true + } + interval = 2 * interval + if interval > RejectStoreMaxCheckInterval { + interval = RejectStoreMaxCheckInterval + } + time.Sleep(interval) + } + + return false +} diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 336758dd7..05fdd3891 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -222,6 +222,16 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf if batchSize > maxRestoreBatchSizeLimit { batchSize = maxRestoreBatchSizeLimit // 256 } + + tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly) + if err != nil { + return errors.Trace(err) + } + rejectStoreMap := make(map[uint64]bool) + for _, store := range tiflashStores { + rejectStoreMap[store.GetId()] = true + } + for { if len(ranges) == 0 { break @@ -246,7 +256,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } // After split, we can restore backup files. - err = client.RestoreFiles(fileBatch, rewriteRules, updateCh) + err = client.RestoreFiles(fileBatch, rewriteRules, rejectStoreMap, updateCh) if err != nil { break } From 4f269c04de4dc2422dbdf57d84ee5ca954eb3c91 Mon Sep 17 00:00:00 2001 From: luancheng Date: Wed, 1 Apr 2020 17:19:55 +0800 Subject: [PATCH 2/5] restore: use new table id to search placementRules --- pkg/restore/client.go | 10 ++++++---- pkg/restore/client_test.go | 4 ++++ pkg/task/restore.go | 3 ++- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 15755d868..5c87e2397 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -347,15 +347,17 @@ func (rc *Client) CreateTables( // RemoveTiFlashReplica removes all the tiflash replicas of a table // TODO: remove this after tiflash supports restore -func (rc *Client) RemoveTiFlashReplica(tables []*utils.Table, placementRules []placement.Rule) error { +func (rc *Client) RemoveTiFlashReplica(tables []*utils.Table, newTables []*model.TableInfo, placementRules []placement.Rule) error { schemas := make([]*backup.Schema, 0, len(tables)) var updateReplica bool - for _, table := range tables { - if rule := utils.SearchPlacementRule(table.Info.ID, placementRules, placement.Learner); rule != nil { + // must use new table id to search placement rules + // here newTables and tables must have same order + for i, table := range tables { + if rule := utils.SearchPlacementRule(newTables[i].ID, placementRules, placement.Learner); rule != nil { table.TiFlashReplicas = rule.Count updateReplica = true } - tableData, err := json.Marshal(table.Info) + tableData, err := json.Marshal(newTables[i]) if err != nil { return errors.Trace(err) } diff --git a/pkg/restore/client_test.go b/pkg/restore/client_test.go index 3f8cb71f8..13b5caa0a 100644 --- a/pkg/restore/client_test.go +++ b/pkg/restore/client_test.go @@ -72,6 +72,10 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) { } rules, newTables, err := client.CreateTables(s.mock.Domain, tables, 0) c.Assert(err, IsNil) + // make sure tables and newTables have same order + for i, t := range tables { + c.Assert(newTables[i].Name, Equals, t.Info.Name) + } for _, nt := range newTables { c.Assert(nt.Name.String(), Matches, "test[0-3]") } diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 05fdd3891..2486ad319 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -174,7 +174,8 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf if err != nil { return err } - err = client.RemoveTiFlashReplica(tables, placementRules) + + err = client.RemoveTiFlashReplica(tables, newTables, placementRules) if err != nil { return err } From bf0b4959be204a66bfef7d50cf2b89ef2507fc57 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 1 Apr 2020 17:55:51 +0800 Subject: [PATCH 3/5] Update pkg/restore/import.go Co-Authored-By: Neil Shen --- pkg/restore/import.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/restore/import.go b/pkg/restore/import.go index b2299c42d..0150bdead 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -175,7 +175,8 @@ func (importer *FileImporter) SetRawRange(startKey, endKey []byte) error { // Import tries to import a file. // All rules must contain encoded keys. -func (importer *FileImporter) Import(file *backup.File, +func (importer *FileImporter) Import( + file *backup.File, rejectStoreMap map[uint64]bool, rewriteRules *RewriteRules, ) error { From 6ffdf949ab812cd7534c0383baf1e10e58458ef2 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 1 Apr 2020 17:56:08 +0800 Subject: [PATCH 4/5] Update pkg/restore/import.go Co-Authored-By: kennytm --- pkg/restore/import.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/restore/import.go b/pkg/restore/import.go index 0150bdead..ee5cef6ca 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -198,10 +198,7 @@ func (importer *FileImporter) Import( zap.Binary("startKey", startKey), zap.Binary("endKey", endKey)) - needReject := false - if len(rejectStoreMap) > 0 { - needReject = true - } + needReject := len(rejectStoreMap) > 0 err = utils.WithRetry(importer.ctx, func() error { ctx, cancel := context.WithTimeout(importer.ctx, importScanRegionTime) From a25d62bc35597f8130d67ed591d634be0beb1574 Mon Sep 17 00:00:00 2001 From: luancheng Date: Wed, 1 Apr 2020 17:59:19 +0800 Subject: [PATCH 5/5] fix ci --- pkg/restore/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 5c87e2397..fde382fb0 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -347,7 +347,8 @@ func (rc *Client) CreateTables( // RemoveTiFlashReplica removes all the tiflash replicas of a table // TODO: remove this after tiflash supports restore -func (rc *Client) RemoveTiFlashReplica(tables []*utils.Table, newTables []*model.TableInfo, placementRules []placement.Rule) error { +func (rc *Client) RemoveTiFlashReplica( + tables []*utils.Table, newTables []*model.TableInfo, placementRules []placement.Rule) error { schemas := make([]*backup.Schema, 0, len(tables)) var updateReplica bool // must use new table id to search placement rules