diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel index 2de1eb9fbe308..85f0836e28d9f 100644 --- a/br/pkg/restore/BUILD.bazel +++ b/br/pkg/restore/BUILD.bazel @@ -30,6 +30,7 @@ go_library( "//br/pkg/conn/util", "//br/pkg/errors", "//br/pkg/glue", + "//br/pkg/lightning/common", "//br/pkg/logutil", "//br/pkg/metautil", "//br/pkg/pdutil", diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index a707d0f086ce9..316e218bb9069 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -15,6 +15,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/rtree" @@ -138,21 +139,15 @@ SplitRegions: } log.Info("start to wait for scattering regions", zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime))) - startTime = time.Now() - scatterCount := 0 - for _, region := range scatterRegions { - rs.waitForScatterRegion(ctx, region) - if time.Since(startTime) > split.ScatterWaitUpperInterval { - break - } - scatterCount++ - } - if scatterCount == len(scatterRegions) { + + leftCnt := rs.WaitForScatterRegions(ctx, scatterRegions, split.ScatterWaitUpperInterval) + if leftCnt == 0 { log.Info("waiting for scattering regions done", zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime))) } else { log.Warn("waiting for scattering regions timeout", - zap.Int("scatterCount", scatterCount), + zap.Int("NotScatterCount", leftCnt), + zap.Int("TotalScatterCount", len(scatterRegions)), zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime))) } @@ -182,26 +177,48 @@ func (rs *RegionSplitter) hasHealthyRegion(ctx context.Context, regionID uint64) return len(regionInfo.PendingPeers) == 0, nil } -func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, error) { +// isScatterRegionFinished checks the region's latest operator and returns the following status: +// +// return (finished, needRescatter, error) +// +// if the latest operator is not `scatter-region`, or its status is SUCCESS, it is likely that the +// scatter-region operator has finished. +// +// if the latest operator is `scatter-region` and its status is TIMEOUT or CANCEL, needRescatter +// is true and the caller needs to scatter this region again.
+func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, bool, error) { resp, err := rs.client.GetOperator(ctx, regionID) if err != nil { - return false, errors.Trace(err) + if common.IsRetryableError(err) { + // retry in the next cycle + return false, false, nil + } + return false, false, errors.Trace(err) } // Heartbeat may not be sent to PD if respErr := resp.GetHeader().GetError(); respErr != nil { if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND { - return true, nil + return true, false, nil } - return false, errors.Annotatef(berrors.ErrPDInvalidResponse, "get operator error: %s", respErr.GetType()) + return false, false, errors.Annotatef(berrors.ErrPDInvalidResponse, "get operator error: %s", respErr.GetType()) } retryTimes := ctx.Value(retryTimes).(int) if retryTimes > 3 { log.Info("get operator", zap.Uint64("regionID", regionID), zap.Stringer("resp", resp)) } // If the current operator of the region is not 'scatter-region', we could assume - // that 'scatter-operator' has finished or timeout - ok := string(resp.GetDesc()) != "scatter-region" || resp.GetStatus() != pdpb.OperatorStatus_RUNNING - return ok, nil + // that 'scatter-operator' has finished + if string(resp.GetDesc()) != "scatter-region" { + return true, false, nil + } + switch resp.GetStatus() { + case pdpb.OperatorStatus_SUCCESS: + return true, false, nil + case pdpb.OperatorStatus_RUNNING: + return false, false, nil + default: + return false, true, nil + } } func (rs *RegionSplitter) waitForSplit(ctx context.Context, regionID uint64) { @@ -227,26 +244,66 @@ type retryTimeKey struct{} var retryTimes = new(retryTimeKey) -func (rs *RegionSplitter) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) { - interval := split.ScatterWaitInterval - regionID := regionInfo.Region.GetId() - for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ { - ctx1 := context.WithValue(ctx, retryTimes, i) - ok, err := rs.isScatterRegionFinished(ctx1, regionID) - if err != nil { - log.Warn("scatter region failed: do not have the region", - logutil.Region(regionInfo.Region)) - return +func mapRegionInfoSlice(regionInfos []*split.RegionInfo) map[uint64]*split.RegionInfo { + regionInfoMap := make(map[uint64]*split.RegionInfo) + for _, info := range regionInfos { + regionID := info.Region.GetId() + regionInfoMap[regionID] = info + } + return regionInfoMap +} + +func (rs *RegionSplitter) WaitForScatterRegions(ctx context.Context, regionInfos []*split.RegionInfo, timeout time.Duration) int { + var ( + startTime = time.Now() + interval = split.ScatterWaitInterval + leftRegions = mapRegionInfoSlice(regionInfos) + retryCnt = 0 + + reScatterRegions = make([]*split.RegionInfo, 0, len(regionInfos)) + ) + for { + ctx1 := context.WithValue(ctx, retryTimes, retryCnt) + reScatterRegions = reScatterRegions[:0] + for regionID, regionInfo := range leftRegions { + ok, rescatter, err := rs.isScatterRegionFinished(ctx1, regionID) + if err != nil { + log.Warn("scatter region failed: do not have the region", + logutil.Region(regionInfo.Region), zap.Error(err)) + delete(leftRegions, regionID) + continue + } + if ok { + delete(leftRegions, regionID) + continue + } + if rescatter { + reScatterRegions = append(reScatterRegions, regionInfo) + } + // RUNNING_STATUS, just wait and check it in the next loop } - if ok { + + if len(leftRegions) == 0 { + return 0 + } + + if len(reScatterRegions) > 0 { + rs.ScatterRegions(ctx1, reScatterRegions) + } + + if time.Since(startTime) > timeout { break } + + retryCnt += 1 
interval = 2 * interval if interval > split.ScatterMaxWaitInterval { interval = split.ScatterMaxWaitInterval } time.Sleep(interval) } + + return len(leftRegions) } func (rs *RegionSplitter) splitAndScatterRegions( @@ -428,3 +485,423 @@ func replacePrefix(s []byte, rewriteRules *RewriteRules) ([]byte, *sst.RewriteRu return s, nil } +<<<<<<< HEAD +======= + +type rewriteSplitter struct { + rewriteKey []byte + tableID int64 + rule *RewriteRules + splitter *split.SplitHelper +} + +type splitHelperIterator struct { + tableSplitters []*rewriteSplitter +} + +func (iter *splitHelperIterator) Traverse(fn func(v split.Valued, endKey []byte, rule *RewriteRules) bool) { + for _, entry := range iter.tableSplitters { + endKey := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(entry.tableID+1)) + rule := entry.rule + entry.splitter.Traverse(func(v split.Valued) bool { + return fn(v, endKey, rule) + }) + } +} + +func NewSplitHelperIteratorForTest(helper *split.SplitHelper, tableID int64, rule *RewriteRules) *splitHelperIterator { + return &splitHelperIterator{ + tableSplitters: []*rewriteSplitter{ + { + tableID: tableID, + rule: rule, + splitter: helper, + }, + }, + } +} + +type LogSplitHelper struct { + tableSplitter map[int64]*split.SplitHelper + rules map[int64]*RewriteRules + client split.SplitClient + pool *utils.WorkerPool + eg *errgroup.Group + regionsCh chan []*split.RegionInfo + + splitThreSholdSize uint64 + splitThreSholdKeys int64 +} + +func NewLogSplitHelper(rules map[int64]*RewriteRules, client split.SplitClient, splitSize uint64, splitKeys int64) *LogSplitHelper { + return &LogSplitHelper{ + tableSplitter: make(map[int64]*split.SplitHelper), + rules: rules, + client: client, + pool: utils.NewWorkerPool(128, "split region"), + eg: nil, + + splitThreSholdSize: splitSize, + splitThreSholdKeys: splitKeys, + } +} + +func (helper *LogSplitHelper) iterator() *splitHelperIterator { + tableSplitters := make([]*rewriteSplitter, 0, len(helper.tableSplitter)) + for tableID, splitter := range helper.tableSplitter { + delete(helper.tableSplitter, tableID) + rewriteRule, exists := helper.rules[tableID] + if !exists { + log.Info("skip splitting due to no table id matched", zap.Int64("tableID", tableID)) + continue + } + newTableID := GetRewriteTableID(tableID, rewriteRule) + if newTableID == 0 { + log.Warn("failed to get the rewrite table id", zap.Int64("tableID", tableID)) + continue + } + tableSplitters = append(tableSplitters, &rewriteSplitter{ + rewriteKey: codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(newTableID)), + tableID: newTableID, + rule: rewriteRule, + splitter: splitter, + }) + } + sort.Slice(tableSplitters, func(i, j int) bool { + return bytes.Compare(tableSplitters[i].rewriteKey, tableSplitters[j].rewriteKey) < 0 + }) + return &splitHelperIterator{ + tableSplitters: tableSplitters, + } +} + +const splitFileThreshold = 1024 * 1024 // 1 MB + +func (helper *LogSplitHelper) skipFile(file *backuppb.DataFileInfo) bool { + _, exist := helper.rules[file.TableId] + return file.Length < splitFileThreshold || file.IsMeta || !exist +} + +func (helper *LogSplitHelper) Merge(file *backuppb.DataFileInfo) { + if helper.skipFile(file) { + return + } + splitHelper, exist := helper.tableSplitter[file.TableId] + if !exist { + splitHelper = split.NewSplitHelper() + helper.tableSplitter[file.TableId] = splitHelper + } + + splitHelper.Merge(split.Valued{ + Key: split.Span{ + StartKey: file.StartKey, + EndKey: file.EndKey, + }, + Value: split.Value{ + Size: file.Length, + Number: 
file.NumberOfEntries, + }, + }) +} + +type splitFunc = func(context.Context, *RegionSplitter, uint64, int64, *split.RegionInfo, []split.Valued) error + +func (helper *LogSplitHelper) splitRegionByPoints( + ctx context.Context, + regionSplitter *RegionSplitter, + initialLength uint64, + initialNumber int64, + region *split.RegionInfo, + valueds []split.Valued, +) error { + var ( + splitPoints [][]byte = make([][]byte, 0) + lastKey []byte = region.Region.StartKey + length uint64 = initialLength + number int64 = initialNumber + ) + for _, v := range valueds { + // decode will discard ts behind the key, which results in the same key for consecutive ranges + if !bytes.Equal(lastKey, v.GetStartKey()) && (v.Value.Size+length > helper.splitThreSholdSize || v.Value.Number+number > helper.splitThreSholdKeys) { + _, rawKey, _ := codec.DecodeBytes(v.GetStartKey(), nil) + splitPoints = append(splitPoints, rawKey) + length = 0 + number = 0 + } + lastKey = v.GetStartKey() + length += v.Value.Size + number += v.Value.Number + } + + if len(splitPoints) == 0 { + return nil + } + + helper.pool.ApplyOnErrorGroup(helper.eg, func() error { + newRegions, errSplit := regionSplitter.splitAndScatterRegions(ctx, region, splitPoints) + if errSplit != nil { + log.Warn("failed to split the scaned region", zap.Error(errSplit)) + _, startKey, _ := codec.DecodeBytes(region.Region.StartKey, nil) + ranges := make([]rtree.Range, 0, len(splitPoints)) + for _, point := range splitPoints { + ranges = append(ranges, rtree.Range{StartKey: startKey, EndKey: point}) + startKey = point + } + + return regionSplitter.Split(ctx, ranges, nil, false, func([][]byte) {}) + } + select { + case <-ctx.Done(): + return nil + case helper.regionsCh <- newRegions: + } + log.Info("split the region", zap.Uint64("region-id", region.Region.Id), zap.Int("split-point-number", len(splitPoints))) + return nil + }) + return nil +} + +// GetRewriteTableID gets rewrite table id by the rewrite rule and original table id +func GetRewriteTableID(tableID int64, rewriteRules *RewriteRules) int64 { + tableKey := tablecodec.GenTableRecordPrefix(tableID) + rule := matchOldPrefix(tableKey, rewriteRules) + if rule == nil { + return 0 + } + + return tablecodec.DecodeTableID(rule.GetNewKeyPrefix()) +} + +// SplitPoint selects ranges overlapped with each region, and calls `splitF` to split the region +func SplitPoint( + ctx context.Context, + iter *splitHelperIterator, + client split.SplitClient, + splitF splitFunc, +) (err error) { + // common status + var ( + regionSplitter *RegionSplitter = NewRegionSplitter(client) + ) + // region traverse status + var ( + // the region buffer of each scan + regions []*split.RegionInfo = nil + regionIndex int = 0 + ) + // region split status + var ( + // range span +----------------+------+---+-------------+ + // region span +------------------------------------+ + // +initial length+ +end valued+ + // regionValueds is the ranges array overlapped with `regionInfo` + regionValueds []split.Valued = nil + // regionInfo is the region to be split + regionInfo *split.RegionInfo = nil + // intialLength is the length of the part of the first range overlapped with the region + initialLength uint64 = 0 + initialNumber int64 = 0 + ) + // range status + var ( + // regionOverCount is the number of regions overlapped with the range + regionOverCount uint64 = 0 + ) + + iter.Traverse(func(v split.Valued, endKey []byte, rule *RewriteRules) bool { + if v.Value.Number == 0 || v.Value.Size == 0 { + return true + } + var ( + vStartKey []byte + vEndKey 
[]byte + ) + // use `vStartKey` and `vEndKey` to compare with region's key + vStartKey, vEndKey, err = GetRewriteEncodedKeys(v, rule) + if err != nil { + return false + } + // traverse to the first region overlapped with the range + for ; regionIndex < len(regions); regionIndex++ { + if bytes.Compare(vStartKey, regions[regionIndex].Region.EndKey) < 0 { + break + } + } + // cannot find any regions overlapped with the range + // need to scan regions again + if regionIndex == len(regions) { + regions = nil + } + regionOverCount = 0 + for { + if regionIndex >= len(regions) { + var startKey []byte + if len(regions) > 0 { + // has traversed over the region buffer, should scan from the last region's end-key of the region buffer + startKey = regions[len(regions)-1].Region.EndKey + } else { + // scan from the range's start-key + startKey = vStartKey + } + // scan at most 64 regions into the region buffer + regions, err = split.ScanRegionsWithRetry(ctx, client, startKey, endKey, 64) + if err != nil { + return false + } + regionIndex = 0 + } + + region := regions[regionIndex] + // this region must be overlapped with the range + regionOverCount++ + // the region is the last one overlapped with the range, + // should split the last recorded region, + // and then record this region as the region to be split + if bytes.Compare(vEndKey, region.Region.EndKey) < 0 { + endLength := v.Value.Size / regionOverCount + endNumber := v.Value.Number / int64(regionOverCount) + if len(regionValueds) > 0 && regionInfo != region { + // add a part of the range as the end part + if bytes.Compare(vStartKey, regionInfo.Region.EndKey) < 0 { + regionValueds = append(regionValueds, split.NewValued(vStartKey, regionInfo.Region.EndKey, split.Value{Size: endLength, Number: endNumber})) + } + // try to split the region + err = splitF(ctx, regionSplitter, initialLength, initialNumber, regionInfo, regionValueds) + if err != nil { + return false + } + regionValueds = make([]split.Valued, 0) + } + if regionOverCount == 1 { + // the region completely contains the range + regionValueds = append(regionValueds, split.Valued{ + Key: split.Span{ + StartKey: vStartKey, + EndKey: vEndKey, + }, + Value: v.Value, + }) + } else { + // the region is overlapped with the last part of the range + initialLength = endLength + initialNumber = endNumber + } + regionInfo = region + // try the next range + return true + } + + // try the next region + regionIndex++ + } + }) + + if err != nil { + return errors.Trace(err) + } + if len(regionValueds) > 0 { + // try to split the region + err = splitF(ctx, regionSplitter, initialLength, initialNumber, regionInfo, regionValueds) + if err != nil { + return errors.Trace(err) + } + } + + return nil +} + +func (helper *LogSplitHelper) Split(ctx context.Context) error { + var ectx context.Context + var wg sync.WaitGroup + helper.eg, ectx = errgroup.WithContext(ctx) + helper.regionsCh = make(chan []*split.RegionInfo, 1024) + wg.Add(1) + go func() { + defer wg.Done() + scatterRegions := make([]*split.RegionInfo, 0) + receiveNewRegions: + for { + select { + case <-ectx.Done(): + return + case newRegions, ok := <-helper.regionsCh: + if !ok { + break receiveNewRegions + } + + scatterRegions = append(scatterRegions, newRegions...) + } + } + + regionSplitter := NewRegionSplitter(helper.client) + // It is too expensive to stop recovery and wait for a small number of regions + // to complete scatter, so the maximum waiting time is reduced to 1 minute. 
+ _ = regionSplitter.WaitForScatterRegions(ctx, scatterRegions, time.Minute) + }() + + iter := helper.iterator() + if err := SplitPoint(ectx, iter, helper.client, helper.splitRegionByPoints); err != nil { + return errors.Trace(err) + } + + // wait for completion of splitting regions + if err := helper.eg.Wait(); err != nil { + return errors.Trace(err) + } + + // wait for completion of scattering regions + close(helper.regionsCh) + wg.Wait() + + return nil +} + +type LogFilesIterWithSplitHelper struct { + iter LogIter + helper *LogSplitHelper + buffer []*LogDataFileInfo + next int +} + +const SplitFilesBufferSize = 4096 + +func NewLogFilesIterWithSplitHelper(iter LogIter, rules map[int64]*RewriteRules, client split.SplitClient, splitSize uint64, splitKeys int64) LogIter { + return &LogFilesIterWithSplitHelper{ + iter: iter, + helper: NewLogSplitHelper(rules, client, splitSize, splitKeys), + buffer: nil, + next: 0, + } +} + +func (splitIter *LogFilesIterWithSplitHelper) TryNext(ctx context.Context) iter.IterResult[*LogDataFileInfo] { + if splitIter.next >= len(splitIter.buffer) { + splitIter.buffer = make([]*LogDataFileInfo, 0, SplitFilesBufferSize) + for r := splitIter.iter.TryNext(ctx); !r.Finished; r = splitIter.iter.TryNext(ctx) { + if r.Err != nil { + return r + } + f := r.Item + splitIter.helper.Merge(f.DataFileInfo) + splitIter.buffer = append(splitIter.buffer, f) + if len(splitIter.buffer) >= SplitFilesBufferSize { + break + } + } + splitIter.next = 0 + if len(splitIter.buffer) == 0 { + return iter.Done[*LogDataFileInfo]() + } + log.Info("start to split the regions") + startTime := time.Now() + if err := splitIter.helper.Split(ctx); err != nil { + return iter.Throw[*LogDataFileInfo](errors.Trace(err)) + } + log.Info("end to split the regions", zap.Duration("takes", time.Since(startTime))) + } + + res := iter.Emit(splitIter.buffer[splitIter.next]) + splitIter.next += 1 + return res +} +>>>>>>> 0abd1329d6a (br: retry to scatter the regions if status is timeout or cancel (#46471)) diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index b726a5ec78729..df444f3570f58 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -37,6 +37,7 @@ type TestClient struct { regionsInfo *pdtypes.RegionTree // For now it's only used in ScanRegions nextRegionID uint64 injectInScatter func(*split.RegionInfo) error + injectInOperator func(uint64) (*pdpb.GetOperatorResponse, error) supportBatchScatter bool scattered map[uint64]bool @@ -211,6 +212,9 @@ func (c *TestClient) ScatterRegion(ctx context.Context, regionInfo *split.Region } func (c *TestClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) { + if c.injectInOperator != nil { + return c.injectInOperator(regionID) + } return &pdpb.GetOperatorResponse{ Header: new(pdpb.ResponseHeader), }, nil @@ -333,6 +337,114 @@ func TestSplitAndScatter(t *testing.T) { client := initTestClient(false) runTestSplitAndScatterWith(t, client) }) + t.Run("WaitScatter", func(t *testing.T) { + client := initTestClient(false) + client.InstallBatchScatterSupport() + runWaitScatter(t, client) + }) +} + +func TestXXX(t *testing.T) { + client := initTestClient(false) + client.InstallBatchScatterSupport() + runWaitScatter(t, client) +} + +// +------------+---------------------------- +// | region | states +// +------------+---------------------------- +// | [ , aay) | SUCCESS +// +------------+---------------------------- +// | [aay, bba) | CANCEL, SUCCESS +// 
+------------+---------------------------- +// | [bba, bbh) | RUNNING, TIMEOUT, SUCCESS +// +------------+---------------------------- +// | [bbh, cca) | +// +------------+---------------------------- +// | [cca, ) | CANCEL, RUNNING, SUCCESS +// +------------+---------------------------- +// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) +// states: +func runWaitScatter(t *testing.T, client *TestClient) { + // configuration + type Operatorstates struct { + index int + status []pdpb.OperatorStatus + } + results := map[string]*Operatorstates{ + "": {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_SUCCESS}}, + string(codec.EncodeBytesExt([]byte{}, []byte("aay"), false)): {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_CANCEL, pdpb.OperatorStatus_SUCCESS}}, + string(codec.EncodeBytesExt([]byte{}, []byte("bba"), false)): {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_RUNNING, pdpb.OperatorStatus_TIMEOUT, pdpb.OperatorStatus_SUCCESS}}, + string(codec.EncodeBytesExt([]byte{}, []byte("bbh"), false)): {}, + string(codec.EncodeBytesExt([]byte{}, []byte("cca"), false)): {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_CANCEL, pdpb.OperatorStatus_RUNNING, pdpb.OperatorStatus_SUCCESS}}, + } + // after test done, the `leftScatterCount` should be empty + leftScatterCount := map[string]int{ + string(codec.EncodeBytesExt([]byte{}, []byte("aay"), false)): 1, + string(codec.EncodeBytesExt([]byte{}, []byte("bba"), false)): 1, + string(codec.EncodeBytesExt([]byte{}, []byte("cca"), false)): 1, + } + client.injectInScatter = func(ri *split.RegionInfo) error { + states, ok := results[string(ri.Region.StartKey)] + require.True(t, ok) + require.NotEqual(t, 0, len(states.status)) + require.NotEqual(t, pdpb.OperatorStatus_SUCCESS, states.status[states.index]) + states.index += 1 + cnt, ok := leftScatterCount[string(ri.Region.StartKey)] + require.True(t, ok) + if cnt == 1 { + delete(leftScatterCount, string(ri.Region.StartKey)) + } else { + leftScatterCount[string(ri.Region.StartKey)] = cnt - 1 + } + return nil + } + regionsMap := client.GetAllRegions() + leftOperatorCount := map[string]int{ + "": 1, + string(codec.EncodeBytesExt([]byte{}, []byte("aay"), false)): 2, + string(codec.EncodeBytesExt([]byte{}, []byte("bba"), false)): 3, + string(codec.EncodeBytesExt([]byte{}, []byte("bbh"), false)): 1, + string(codec.EncodeBytesExt([]byte{}, []byte("cca"), false)): 3, + } + client.injectInOperator = func(u uint64) (*pdpb.GetOperatorResponse, error) { + ri := regionsMap[u] + cnt, ok := leftOperatorCount[string(ri.Region.StartKey)] + require.True(t, ok) + if cnt == 1 { + delete(leftOperatorCount, string(ri.Region.StartKey)) + } else { + leftOperatorCount[string(ri.Region.StartKey)] = cnt - 1 + } + states, ok := results[string(ri.Region.StartKey)] + require.True(t, ok) + if len(states.status) == 0 { + return &pdpb.GetOperatorResponse{ + Desc: []byte("other"), + }, nil + } + if states.status[states.index] == pdpb.OperatorStatus_RUNNING { + states.index += 1 + return &pdpb.GetOperatorResponse{ + Desc: []byte("scatter-region"), + Status: states.status[states.index-1], + }, nil + } + return &pdpb.GetOperatorResponse{ + Desc: []byte("scatter-region"), + Status: states.status[states.index], + }, nil + } + + // begin to test + ctx := context.Background() + regions := make([]*split.RegionInfo, 0, len(regionsMap)) + for _, info := range regionsMap { + regions = append(regions, info) + } + regionSplitter := restore.NewRegionSplitter(client) + leftCnt := regionSplitter.WaitForScatterRegions(ctx, regions, 
2000*time.Second) + require.Equal(t, 0, leftCnt) } func runTestSplitAndScatterWith(t *testing.T, client *TestClient) {
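Note on the behavior change above: isScatterRegionFinished now reports three states (finished, still running, needs re-scatter) instead of treating every non-RUNNING status as finished, which is what lets WaitForScatterRegions re-issue ScatterRegions for TIMEOUT/CANCEL operators. A minimal standalone sketch of that decision table, using only the pdpb types already imported in the diff; the helper name classifyScatterOperator is hypothetical and not part of the patch:

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/pdpb"
)

// classifyScatterOperator mirrors the patched check on a GetOperator response:
// an operator other than "scatter-region" counts as finished, SUCCESS is
// finished, RUNNING means keep waiting, and any other terminal status
// (e.g. TIMEOUT or CANCEL) asks the caller to scatter the region again.
func classifyScatterOperator(resp *pdpb.GetOperatorResponse) (finished, needRescatter bool) {
	if string(resp.GetDesc()) != "scatter-region" {
		return true, false
	}
	switch resp.GetStatus() {
	case pdpb.OperatorStatus_SUCCESS:
		return true, false
	case pdpb.OperatorStatus_RUNNING:
		return false, false
	default:
		return false, true
	}
}

func main() {
	resp := &pdpb.GetOperatorResponse{Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_TIMEOUT}
	fmt.Println(classifyScatterOperator(resp)) // false true: re-scatter this region
}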
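WaitForScatterRegions drives that check in a loop: each round it polls every still-pending region, drops the finished ones, re-scatters the TIMEOUT/CANCEL ones, and then sleeps with a doubling interval capped at split.ScatterMaxWaitInterval until the caller-supplied timeout elapses, finally returning how many regions are still not scattered. A rough sketch of that polling cadence with illustrative durations (the real code uses the split package constants, not the values below):

package main

import (
	"fmt"
	"time"
)

// pollSchedule lists the sleep intervals a double-and-cap backoff would use
// before the overall timeout is exceeded.
func pollSchedule(initial, maxInterval, timeout time.Duration) []time.Duration {
	var (
		schedule []time.Duration
		elapsed  time.Duration
		interval = initial
	)
	for elapsed <= timeout {
		schedule = append(schedule, interval)
		elapsed += interval
		interval *= 2
		if interval > maxInterval {
			interval = maxInterval
		}
	}
	return schedule
}

func main() {
	// Illustrative only: 50ms initial interval, 1s cap, 5s overall timeout.
	fmt.Println(pollSchedule(50*time.Millisecond, time.Second, 5*time.Second))
}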
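On the log-restore side, splitRegionByPoints accumulates the size and key count of the ranges that fall inside one region and emits a split key whenever adding the next range would push either total past splitThreSholdSize or splitThreSholdKeys. A simplified, self-contained sketch of that accumulate-then-cut rule; the span type and pickSplitPoints helper below are stand-ins for illustration, not br types:

package main

import "fmt"

// span is a stand-in for a backup range: its start key plus the data size and
// key count it contributes.
type span struct {
	startKey     string
	size, number int64
}

// pickSplitPoints returns the start keys at which a region should be cut so
// that each piece stays under both thresholds, mirroring splitRegionByPoints.
func pickSplitPoints(spans []span, sizeThreshold, keysThreshold int64) []string {
	var (
		points       []string
		size, number int64
		lastKey      string
	)
	for _, s := range spans {
		// consecutive ranges may share a start key once the timestamp is
		// stripped, so never cut between two spans with the same key
		if lastKey != s.startKey && (size+s.size > sizeThreshold || number+s.number > keysThreshold) {
			points = append(points, s.startKey)
			size, number = 0, 0
		}
		lastKey = s.startKey
		size += s.size
		number += s.number
	}
	return points
}

func main() {
	spans := []span{{"a", 40, 4}, {"b", 40, 4}, {"c", 40, 4}, {"d", 10, 1}}
	fmt.Println(pickSplitPoints(spans, 100, 10)) // [c]
}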