diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index dee6810f1b146..27e14c5bc83a5 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -6,6 +6,7 @@ import ( "bytes" "context" "encoding/hex" + "strconv" "strings" "time" @@ -21,6 +22,7 @@ import ( "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/utils" "github.com/tikv/pd/pkg/codec" + "go.uber.org/multierr" "go.uber.org/zap" ) @@ -279,22 +281,62 @@ func (rs *RegionSplitter) splitAndScatterRegions( return newRegions, nil } -// ScatterRegions scatter the regions. -func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*RegionInfo) { - for _, region := range newRegions { - // Wait for a while until the regions successfully split. - rs.waitForSplit(ctx, region.Region.Id) - if err := utils.WithRetry(ctx, - func() error { return rs.client.ScatterRegion(ctx, region) }, - // backoff about 6s, or we give up scattering this region. - &scatterBackoffer{ - attempt: 7, - baseBackoff: 100 * time.Millisecond, - }, - ); err != nil { - log.Warn("scatter region failed, stop retry", logutil.Region(region.Region), zap.Error(err)) +// ScatterRegionsWithBackoffer scatter the region with some backoffer. +// This function is for testing the retry mechanism. +// For a real cluster, directly use ScatterRegions would be fine. +func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRegions []*RegionInfo, backoffer utils.Backoffer) { + newRegionSet := make(map[uint64]*RegionInfo, len(newRegions)) + for _, newRegion := range newRegions { + newRegionSet[newRegion.Region.Id] = newRegion + } + + if err := utils.WithRetry(ctx, func() error { + log.Info("trying to scatter regions...", zap.Int("remain", len(newRegionSet))) + var errs error + for _, region := range newRegionSet { + // Wait for a while until the regions successfully split. + rs.waitForSplit(ctx, region.Region.Id) + err := rs.client.ScatterRegion(ctx, region) + if err == nil { + // it is safe accroding to the Go language spec. + delete(newRegionSet, region.Region.Id) + } else if !pdErrorCanRetry(err) { + log.Warn("scatter meet error cannot be retried, skipping", + logutil.ShortError(err), + logutil.Region(region.Region), + ) + delete(newRegionSet, region.Region.Id) + } + errs = multierr.Append(errs, err) } + return errs + }, backoffer); err != nil { + log.Warn("Some regions haven't been scattered because errors.", + zap.Int("count", len(newRegionSet)), + // if all region are failed to scatter, the short error might also be verbose... + logutil.ShortError(err), + logutil.AbbreviatedArray("failed-regions", newRegionSet, func(i interface{}) []string { + m := i.(map[uint64]*RegionInfo) + result := make([]string, len(m)) + for id := range m { + result = append(result, strconv.Itoa(int(id))) + } + return result + }), + ) } + +} + +// ScatterRegions scatter the regions. +func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*RegionInfo) { + rs.ScatterRegionsWithBackoffer( + ctx, newRegions, + // backoff about 6s, or we give up scattering this region. + &exponentialBackoffer{ + attempt: 7, + baseBackoff: 100 * time.Millisecond, + }) } func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error { diff --git a/br/pkg/restore/split_client.go b/br/pkg/restore/split_client.go index 3e619a6cab750..e5105a56dc603 100755 --- a/br/pkg/restore/split_client.go +++ b/br/pkg/restore/split_client.go @@ -571,12 +571,15 @@ func checkRegionEpoch(new, old *RegionInfo) bool { new.Region.GetRegionEpoch().GetConfVer() == old.Region.GetRegionEpoch().GetConfVer() } -type scatterBackoffer struct { +// exponentialBackoffer trivially retry any errors it meets. +// It's useful when the caller has handled the errors but +// only want to a more semantic backoff implementation. +type exponentialBackoffer struct { attempt int baseBackoff time.Duration } -func (b *scatterBackoffer) exponentialBackoff() time.Duration { +func (b *exponentialBackoffer) exponentialBackoff() time.Duration { bo := b.baseBackoff b.attempt-- if b.attempt == 0 { @@ -586,13 +589,7 @@ func (b *scatterBackoffer) exponentialBackoff() time.Duration { return bo } -func (b *scatterBackoffer) giveUp() time.Duration { - b.attempt = 0 - return 0 -} - -// NextBackoff returns a duration to wait before retrying again -func (b *scatterBackoffer) NextBackoff(err error) time.Duration { +func pdErrorCanRetry(err error) bool { // There are 3 type of reason that PD would reject a `scatter` request: // (1) region %d has no leader // (2) region %d is hot @@ -602,20 +599,19 @@ func (b *scatterBackoffer) NextBackoff(err error) time.Duration { // (1) and (3) might happen, and should be retried. grpcErr := status.Convert(err) if grpcErr == nil { - return b.giveUp() - } - if strings.Contains(grpcErr.Message(), "is not fully replicated") { - log.Info("scatter region failed, retring", logutil.ShortError(err), zap.Int("attempt-remain", b.attempt)) - return b.exponentialBackoff() + return false } - if strings.Contains(grpcErr.Message(), "has no leader") { - log.Info("scatter region failed, retring", logutil.ShortError(err), zap.Int("attempt-remain", b.attempt)) - return b.exponentialBackoff() - } - return b.giveUp() + return strings.Contains(grpcErr.Message(), "is not fully replicated") || + strings.Contains(grpcErr.Message(), "has no leader") +} + +// NextBackoff returns a duration to wait before retrying again. +func (b *exponentialBackoffer) NextBackoff(error) time.Duration { + // trivially exponential back off, because we have handled the error at upper level. + return b.exponentialBackoff() } // Attempt returns the remain attempt times -func (b *scatterBackoffer) Attempt() int { +func (b *exponentialBackoffer) Attempt() int { return b.attempt } diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index 84d110c3f7600..7a39785af2cb9 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -6,6 +6,8 @@ import ( "bytes" "context" "sync" + "testing" + "time" . "github.com/pingcap/check" "github.com/pingcap/errors" @@ -14,7 +16,9 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/rtree" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/util/codec" + "github.com/stretchr/testify/require" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule/placement" "google.golang.org/grpc/codes" @@ -22,11 +26,12 @@ import ( ) type TestClient struct { - mu sync.RWMutex - stores map[uint64]*metapb.Store - regions map[uint64]*restore.RegionInfo - regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions - nextRegionID uint64 + mu sync.RWMutex + stores map[uint64]*metapb.Store + regions map[uint64]*restore.RegionInfo + regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions + nextRegionID uint64 + injectInScatter func(*restore.RegionInfo) error scattered map[uint64]bool } @@ -41,11 +46,12 @@ func NewTestClient( regionsInfo.SetRegion(core.NewRegionInfo(regionInfo.Region, regionInfo.Leader)) } return &TestClient{ - stores: stores, - regions: regions, - regionsInfo: regionsInfo, - nextRegionID: nextRegionID, - scattered: map[uint64]bool{}, + stores: stores, + regions: regions, + regionsInfo: regionsInfo, + nextRegionID: nextRegionID, + scattered: map[uint64]bool{}, + injectInScatter: func(*restore.RegionInfo) error { return nil }, } } @@ -164,12 +170,7 @@ func (c *TestClient) BatchSplitRegions( } func (c *TestClient) ScatterRegion(ctx context.Context, regionInfo *restore.RegionInfo) error { - if _, ok := c.scattered[regionInfo.Region.Id]; !ok { - c.scattered[regionInfo.Region.Id] = false - return status.Errorf(codes.Unknown, "region %d is not fully replicated", regionInfo.Region.Id) - } - c.scattered[regionInfo.Region.Id] = true - return nil + return c.injectInScatter(regionInfo) } func (c *TestClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) { @@ -206,13 +207,73 @@ func (c *TestClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK return nil } -func (c *TestClient) checkScatter(check *C) { - regions := c.GetAllRegions() - for key := range regions { - if !c.scattered[key] { - check.Fatalf("region %d has not been scattered: %#v", key, regions[key]) +type assertRetryLessThanBackoffer struct { + max int + already int + t *testing.T +} + +func assertRetryLessThan(t *testing.T, times int) utils.Backoffer { + return &assertRetryLessThanBackoffer{ + max: times, + already: 0, + t: t, + } +} + +// NextBackoff returns a duration to wait before retrying again +func (b *assertRetryLessThanBackoffer) NextBackoff(err error) time.Duration { + b.already++ + if b.already >= b.max { + b.t.Logf("retry more than %d time: test failed", b.max) + b.t.FailNow() + } + return 0 +} + +// Attempt returns the remain attempt times +func (b *assertRetryLessThanBackoffer) Attempt() int { + return b.max - b.already +} + +func TestScatterFinishInTime(t *testing.T) { + t.Parallel() + client := initTestClient() + ranges := initRanges() + rewriteRules := initRewriteRules() + regionSplitter := restore.NewRegionSplitter(client) + + ctx := context.Background() + err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {}) + require.NoError(t, err) + regions := client.GetAllRegions() + if !validateRegions(regions) { + for _, region := range regions { + t.Logf("region: %v\n", region.Region) + } + t.Log("get wrong result") + t.Fail() + } + + regionInfos := make([]*restore.RegionInfo, 0, len(regions)) + for _, info := range regions { + regionInfos = append(regionInfos, info) + } + failed := map[uint64]int{} + client.injectInScatter = func(r *restore.RegionInfo) error { + failed[r.Region.Id]++ + if failed[r.Region.Id] > 7 { + return nil } + return status.Errorf(codes.Unknown, "region %d is not fully replicated", r.Region.Id) } + + // When using a exponential backoffer, if we try to backoff more than 40 times in 10 regions, + // it would cost time unacceptable. + regionSplitter.ScatterRegionsWithBackoffer(ctx, + regionInfos, + assertRetryLessThan(t, 40)) + } // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) @@ -221,7 +282,8 @@ func (c *TestClient) checkScatter(check *C) { // expected regions after split: // [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj), // [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, ) -func (s *testRangeSuite) TestSplitAndScatter(c *C) { +func TestSplitAndScatter(t *testing.T) { + t.Parallel() client := initTestClient() ranges := initRanges() rewriteRules := initRewriteRules() @@ -229,23 +291,38 @@ func (s *testRangeSuite) TestSplitAndScatter(c *C) { ctx := context.Background() err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {}) - if err != nil { - c.Assert(err, IsNil, Commentf("split regions failed: %v", err)) - } + require.NoError(t, err) regions := client.GetAllRegions() if !validateRegions(regions) { for _, region := range regions { - c.Logf("region: %v\n", region.Region) + t.Logf("region: %v\n", region.Region) } - c.Log("get wrong result") - c.Fail() + t.Log("get wrong result") + t.Fail() } regionInfos := make([]*restore.RegionInfo, 0, len(regions)) for _, info := range regions { regionInfos = append(regionInfos, info) } + scattered := map[uint64]bool{} + const alwaysFailedRegionID = 1 + client.injectInScatter = func(regionInfo *restore.RegionInfo) error { + if _, ok := scattered[regionInfo.Region.Id]; !ok || regionInfo.Region.Id == alwaysFailedRegionID { + scattered[regionInfo.Region.Id] = false + return status.Errorf(codes.Unknown, "region %d is not fully replicated", regionInfo.Region.Id) + } + scattered[regionInfo.Region.Id] = true + return nil + } regionSplitter.ScatterRegions(ctx, regionInfos) - client.checkScatter(c) + for key := range regions { + if key == alwaysFailedRegionID { + require.Falsef(t, scattered[key], "always failed region %d was scattered successfully", key) + } else if !scattered[key] { + t.Fatalf("region %d has not been scattered: %#v", key, regions[key]) + } + } + } // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )