diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel
index 7cf76fefe5932..f5522e40fe73a 100644
--- a/br/pkg/restore/BUILD.bazel
+++ b/br/pkg/restore/BUILD.bazel
@@ -31,6 +31,7 @@ go_library(
         "//br/pkg/conn/util",
         "//br/pkg/errors",
         "//br/pkg/glue",
+        "//br/pkg/lightning/common",
         "//br/pkg/logutil",
         "//br/pkg/metautil",
         "//br/pkg/pdutil",
diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go
index c50a330c658a6..565254e830ddf 100644
--- a/br/pkg/restore/split.go
+++ b/br/pkg/restore/split.go
@@ -18,6 +18,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
 	berrors "github.com/pingcap/tidb/br/pkg/errors"
+	"github.com/pingcap/tidb/br/pkg/lightning/common"
 	"github.com/pingcap/tidb/br/pkg/logutil"
 	"github.com/pingcap/tidb/br/pkg/restore/split"
 	"github.com/pingcap/tidb/br/pkg/rtree"
@@ -144,21 +145,15 @@ SplitRegions:
 	}
 	log.Info("start to wait for scattering regions",
 		zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
-	startTime = time.Now()
-	scatterCount := 0
-	for _, region := range scatterRegions {
-		rs.waitForScatterRegion(ctx, region)
-		if time.Since(startTime) > split.ScatterWaitUpperInterval {
-			break
-		}
-		scatterCount++
-	}
-	if scatterCount == len(scatterRegions) {
+
+	leftCnt := rs.WaitForScatterRegions(ctx, scatterRegions, split.ScatterWaitUpperInterval)
+	if leftCnt == 0 {
 		log.Info("waiting for scattering regions done",
 			zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
 	} else {
 		log.Warn("waiting for scattering regions timeout",
-			zap.Int("scatterCount", scatterCount),
+			zap.Int("NotScatterCount", leftCnt),
+			zap.Int("TotalScatterCount", len(scatterRegions)),
 			zap.Int("regions", len(scatterRegions)),
 			zap.Duration("take", time.Since(startTime)))
 	}
@@ -188,26 +183,48 @@ func (rs *RegionSplitter) hasHealthyRegion(ctx context.Context, regionID uint64)
 	return len(regionInfo.PendingPeers) == 0, nil
 }
 
-func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, error) {
+// isScatterRegionFinished checks the latest operator on the region and returns
+//
+//	(finished, needRescatter, error)
+//
+// If the latest operator is not `scatter-region`, or its status is SUCCESS, the
+// scatter-region operator is considered finished.
+//
+// If the latest operator is `scatter-region` and its status is TIMEOUT or CANCEL,
+// needRescatter is true and the caller needs to scatter this region again.
+func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, bool, error) {
 	resp, err := rs.client.GetOperator(ctx, regionID)
 	if err != nil {
-		return false, errors.Trace(err)
+		if common.IsRetryableError(err) {
+			// retry in the next cycle
+			return false, false, nil
+		}
+		return false, false, errors.Trace(err)
 	}
 	// Heartbeat may not be sent to PD
 	if respErr := resp.GetHeader().GetError(); respErr != nil {
 		if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND {
-			return true, nil
+			return true, false, nil
 		}
-		return false, errors.Annotatef(berrors.ErrPDInvalidResponse, "get operator error: %s", respErr.GetType())
+		return false, false, errors.Annotatef(berrors.ErrPDInvalidResponse, "get operator error: %s", respErr.GetType())
 	}
 	retryTimes := ctx.Value(retryTimes).(int)
 	if retryTimes > 3 {
 		log.Info("get operator", zap.Uint64("regionID", regionID), zap.Stringer("resp", resp))
 	}
 	// If the current operator of the region is not 'scatter-region', we could assume
-	// that 'scatter-operator' has finished or timeout
-	ok := string(resp.GetDesc()) != "scatter-region" || resp.GetStatus() != pdpb.OperatorStatus_RUNNING
-	return ok, nil
+	// that the scatter-region operator has finished
+	if string(resp.GetDesc()) != "scatter-region" {
+		return true, false, nil
+	}
+	switch resp.GetStatus() {
+	case pdpb.OperatorStatus_SUCCESS:
+		return true, false, nil
+	case pdpb.OperatorStatus_RUNNING:
+		return false, false, nil
+	default:
+		return false, true, nil
+	}
 }
 
 func (rs *RegionSplitter) waitForSplit(ctx context.Context, regionID uint64) {
@@ -233,26 +250,66 @@ type retryTimeKey struct{}
 
 var retryTimes = new(retryTimeKey)
 
-func (rs *RegionSplitter) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) {
-	interval := split.ScatterWaitInterval
-	regionID := regionInfo.Region.GetId()
-	for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ {
-		ctx1 := context.WithValue(ctx, retryTimes, i)
-		ok, err := rs.isScatterRegionFinished(ctx1, regionID)
-		if err != nil {
-			log.Warn("scatter region failed: do not have the region",
-				logutil.Region(regionInfo.Region))
-			return
+func mapRegionInfoSlice(regionInfos []*split.RegionInfo) map[uint64]*split.RegionInfo {
+	regionInfoMap := make(map[uint64]*split.RegionInfo)
+	for _, info := range regionInfos {
+		regionID := info.Region.GetId()
+		regionInfoMap[regionID] = info
+	}
+	return regionInfoMap
+}
+
+func (rs *RegionSplitter) WaitForScatterRegions(ctx context.Context, regionInfos []*split.RegionInfo, timeout time.Duration) int {
+	var (
+		startTime   = time.Now()
+		interval    = split.ScatterWaitInterval
+		leftRegions = mapRegionInfoSlice(regionInfos)
+		retryCnt    = 0
+
+		reScatterRegions = make([]*split.RegionInfo, 0, len(regionInfos))
+	)
+	for {
+		ctx1 := context.WithValue(ctx, retryTimes, retryCnt)
+		reScatterRegions = reScatterRegions[:0]
+		for regionID, regionInfo := range leftRegions {
+			ok, rescatter, err := rs.isScatterRegionFinished(ctx1, regionID)
+			if err != nil {
+				log.Warn("scatter region failed: do not have the region",
+					logutil.Region(regionInfo.Region), zap.Error(err))
+				delete(leftRegions, regionID)
+				continue
+			}
+			if ok {
+				delete(leftRegions, regionID)
+				continue
+			}
+			if rescatter {
+				reScatterRegions = append(reScatterRegions, regionInfo)
+			}
+			// the operator is still RUNNING, just wait and check it in the next loop
 		}
-		if ok {
+
+		if len(leftRegions) == 0 {
+			return 0
+		}
+
+		if len(reScatterRegions) > 0 {
+			rs.ScatterRegions(ctx1, reScatterRegions)
+		}
+
+		if time.Since(startTime) > timeout {
 			break
 		}
+
+		retryCnt += 1
 		interval = 2 * interval
 		if interval > split.ScatterMaxWaitInterval {
 			interval = split.ScatterMaxWaitInterval
 		}
 		time.Sleep(interval)
 	}
+
+	return len(leftRegions)
 }
 
 func (rs *RegionSplitter) splitAndScatterRegions(
@@ -780,16 +837,10 @@ func (helper *LogSplitHelper) Split(ctx context.Context) error {
 			}
 		}
 
-		startTime := time.Now()
 		regionSplitter := NewRegionSplitter(helper.client)
-		for _, region := range scatterRegions {
-			regionSplitter.waitForScatterRegion(ctx, region)
-			// It is too expensive to stop recovery and wait for a small number of regions
-			// to complete scatter, so the maximum waiting time is reduced to 1 minute.
-			if time.Since(startTime) > time.Minute {
-				break
-			}
-		}
+		// It is too expensive to stop recovery and wait for a small number of regions
+		// to complete scatter, so the maximum waiting time is reduced to 1 minute.
+		_ = regionSplitter.WaitForScatterRegions(ctx, scatterRegions, time.Minute)
 	}()
 
 	iter := helper.iterator()
diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go
index 3afb9ec21dc99..c48d056be2589 100644
--- a/br/pkg/restore/split_test.go
+++ b/br/pkg/restore/split_test.go
@@ -41,6 +41,7 @@ type TestClient struct {
 	regionsInfo      *pdtypes.RegionTree // For now it's only used in ScanRegions
 	nextRegionID     uint64
 	injectInScatter  func(*split.RegionInfo) error
+	injectInOperator func(uint64) (*pdpb.GetOperatorResponse, error)
 	supportBatchScatter bool
 
 	scattered map[uint64]bool
@@ -215,6 +216,9 @@ func (c *TestClient) ScatterRegion(ctx context.Context, regionInfo *split.Region
 }
 
 func (c *TestClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
+	if c.injectInOperator != nil {
+		return c.injectInOperator(regionID)
+	}
 	return &pdpb.GetOperatorResponse{
 		Header: new(pdpb.ResponseHeader),
 	}, nil
@@ -337,6 +341,114 @@ func TestSplitAndScatter(t *testing.T) {
 		client := initTestClient(false)
 		runTestSplitAndScatterWith(t, client)
 	})
+	t.Run("WaitScatter", func(t *testing.T) {
+		client := initTestClient(false)
+		client.InstallBatchScatterSupport()
+		runWaitScatter(t, client)
+	})
+}
+
+func TestWaitScatter(t *testing.T) {
+	client := initTestClient(false)
+	client.InstallBatchScatterSupport()
+	runWaitScatter(t, client)
+}
+
+// +------------+----------------------------
+// | region     | states
+// +------------+----------------------------
+// | [   , aay) | SUCCESS
+// +------------+----------------------------
+// | [aay, bba) | CANCEL, SUCCESS
+// +------------+----------------------------
+// | [bba, bbh) | RUNNING, TIMEOUT, SUCCESS
+// +------------+----------------------------
+// | [bbh, cca) |
+// +------------+----------------------------
+// | [cca,    ) | CANCEL, RUNNING, SUCCESS
+// +------------+----------------------------
+// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
+// states: the sequence of operator statuses returned for each region
+func runWaitScatter(t *testing.T, client *TestClient) {
+	// configuration
+	type Operatorstates struct {
+		index  int
+		status []pdpb.OperatorStatus
+	}
+	results := map[string]*Operatorstates{
+		"": {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_SUCCESS}},
+		string(codec.EncodeBytesExt([]byte{}, []byte("aay"), false)): {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_CANCEL, pdpb.OperatorStatus_SUCCESS}},
+		string(codec.EncodeBytesExt([]byte{}, []byte("bba"), false)): {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_RUNNING, pdpb.OperatorStatus_TIMEOUT, pdpb.OperatorStatus_SUCCESS}},
+		string(codec.EncodeBytesExt([]byte{}, []byte("bbh"), false)): {},
+		string(codec.EncodeBytesExt([]byte{}, []byte("cca"), false)): {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_CANCEL, pdpb.OperatorStatus_RUNNING, pdpb.OperatorStatus_SUCCESS}},
+	}
+	// after the test is done, `leftScatterCount` should be empty
+	leftScatterCount := map[string]int{
+		string(codec.EncodeBytesExt([]byte{}, []byte("aay"), false)): 1,
+		string(codec.EncodeBytesExt([]byte{}, []byte("bba"), false)): 1,
+		string(codec.EncodeBytesExt([]byte{}, []byte("cca"), false)): 1,
+	}
+	client.injectInScatter = func(ri *split.RegionInfo) error {
+		states, ok := results[string(ri.Region.StartKey)]
+		require.True(t, ok)
+		require.NotEqual(t, 0, len(states.status))
+		require.NotEqual(t, pdpb.OperatorStatus_SUCCESS, states.status[states.index])
+		states.index += 1
+		cnt, ok := leftScatterCount[string(ri.Region.StartKey)]
+		require.True(t, ok)
+		if cnt == 1 {
+			delete(leftScatterCount, string(ri.Region.StartKey))
+		} else {
+			leftScatterCount[string(ri.Region.StartKey)] = cnt - 1
+		}
+		return nil
+	}
+	regionsMap := client.GetAllRegions()
+	leftOperatorCount := map[string]int{
+		"": 1,
+		string(codec.EncodeBytesExt([]byte{}, []byte("aay"), false)): 2,
+		string(codec.EncodeBytesExt([]byte{}, []byte("bba"), false)): 3,
+		string(codec.EncodeBytesExt([]byte{}, []byte("bbh"), false)): 1,
+		string(codec.EncodeBytesExt([]byte{}, []byte("cca"), false)): 3,
+	}
+	client.injectInOperator = func(u uint64) (*pdpb.GetOperatorResponse, error) {
+		ri := regionsMap[u]
+		cnt, ok := leftOperatorCount[string(ri.Region.StartKey)]
+		require.True(t, ok)
+		if cnt == 1 {
+			delete(leftOperatorCount, string(ri.Region.StartKey))
+		} else {
+			leftOperatorCount[string(ri.Region.StartKey)] = cnt - 1
+		}
+		states, ok := results[string(ri.Region.StartKey)]
+		require.True(t, ok)
+		if len(states.status) == 0 {
+			return &pdpb.GetOperatorResponse{
+				Desc: []byte("other"),
+			}, nil
+		}
+		if states.status[states.index] == pdpb.OperatorStatus_RUNNING {
+			states.index += 1
+			return &pdpb.GetOperatorResponse{
+				Desc:   []byte("scatter-region"),
+				Status: states.status[states.index-1],
+			}, nil
+		}
+		return &pdpb.GetOperatorResponse{
+			Desc:   []byte("scatter-region"),
+			Status: states.status[states.index],
+		}, nil
+	}
+
+	// begin the test
+	ctx := context.Background()
+	regions := make([]*split.RegionInfo, 0, len(regionsMap))
+	for _, info := range regionsMap {
+		regions = append(regions, info)
+	}
+	regionSplitter := restore.NewRegionSplitter(client)
+	leftCnt := regionSplitter.WaitForScatterRegions(ctx, regions, 2000*time.Second)
+	require.Equal(t, 0, leftCnt)
 }
 
 func runTestSplitAndScatterWith(t *testing.T, client *TestClient) {
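
A minimal sketch of how the newly exported WaitForScatterRegions is meant to be driven by a caller, mirroring the call sites in this diff (scatter first, then wait with a bounded timeout). The helper name waitScatterSketch is hypothetical, and it assumes split.SplitClient is the client interface accepted by restore.NewRegionSplitter; nothing below is part of the change itself.

package example

import (
	"context"
	"time"

	"github.com/pingcap/log"
	"github.com/pingcap/tidb/br/pkg/restore"
	"github.com/pingcap/tidb/br/pkg/restore/split"
	"go.uber.org/zap"
)

// waitScatterSketch scatters the freshly split regions and waits up to
// `timeout` for the scatter operators to finish. WaitForScatterRegions
// returns the number of regions whose scatter was still unfinished when the
// wait ended (0 means all done); timed-out or cancelled operators are
// re-scattered internally while waiting.
//
// split.SplitClient is assumed to be the client interface that
// restore.NewRegionSplitter accepts.
func waitScatterSketch(ctx context.Context, client split.SplitClient, regions []*split.RegionInfo, timeout time.Duration) {
	rs := restore.NewRegionSplitter(client)
	rs.ScatterRegions(ctx, regions)
	if left := rs.WaitForScatterRegions(ctx, regions, timeout); left > 0 {
		log.Warn("some regions were not scattered before the timeout",
			zap.Int("notScattered", left), zap.Int("total", len(regions)))
	}
}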