Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BR: Fix stuck when many missing-peer regions in cluster (#28497) #28680

Merged
merged 4 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 56 additions & 14 deletions br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"context"
"encoding/hex"
"strconv"
"strings"
"time"

Expand All @@ -21,6 +22,7 @@ import (
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/tikv/pd/pkg/codec"
"go.uber.org/multierr"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -277,22 +279,62 @@ func (rs *RegionSplitter) splitAndScatterRegions(
return newRegions, nil
}

// ScatterRegions scatter the regions.
func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*RegionInfo) {
for _, region := range newRegions {
// Wait for a while until the regions successfully split.
rs.waitForSplit(ctx, region.Region.Id)
if err := utils.WithRetry(ctx,
func() error { return rs.client.ScatterRegion(ctx, region) },
// backoff about 6s, or we give up scattering this region.
&scatterBackoffer{
attempt: 7,
baseBackoff: 100 * time.Millisecond,
},
); err != nil {
log.Warn("scatter region failed, stop retry", logutil.Region(region.Region), zap.Error(err))
// ScatterRegionsWithBackoffer scatter the region with some backoffer.
// This function is for testing the retry mechanism.
// For a real cluster, directly use ScatterRegions would be fine.
func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRegions []*RegionInfo, backoffer utils.Backoffer) {
newRegionSet := make(map[uint64]*RegionInfo, len(newRegions))
for _, newRegion := range newRegions {
newRegionSet[newRegion.Region.Id] = newRegion
}

if err := utils.WithRetry(ctx, func() error {
log.Info("trying to scatter regions...", zap.Int("remain", len(newRegionSet)))
var errs error
for _, region := range newRegionSet {
// Wait for a while until the regions successfully split.
rs.waitForSplit(ctx, region.Region.Id)
err := rs.client.ScatterRegion(ctx, region)
if err == nil {
// it is safe accroding to the Go language spec.
delete(newRegionSet, region.Region.Id)
} else if !pdErrorCanRetry(err) {
log.Warn("scatter meet error cannot be retried, skipping",
logutil.ShortError(err),
logutil.Region(region.Region),
)
delete(newRegionSet, region.Region.Id)
}
errs = multierr.Append(errs, err)
}
return errs
}, backoffer); err != nil {
log.Warn("Some regions haven't been scattered because errors.",
zap.Int("count", len(newRegionSet)),
// if all region are failed to scatter, the short error might also be verbose...
logutil.ShortError(err),
logutil.AbbreviatedArray("failed-regions", newRegionSet, func(i interface{}) []string {
m := i.(map[uint64]*RegionInfo)
result := make([]string, len(m))
for id := range m {
result = append(result, strconv.Itoa(int(id)))
}
return result
}),
)
}

}

// ScatterRegions scatter the regions.
func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*RegionInfo) {
rs.ScatterRegionsWithBackoffer(
ctx, newRegions,
// backoff about 6s, or we give up scattering this region.
&exponentialBackoffer{
attempt: 7,
baseBackoff: 100 * time.Millisecond,
})
}

func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error {
Expand Down
36 changes: 16 additions & 20 deletions br/pkg/restore/split_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,12 +500,15 @@ func checkRegionEpoch(new, old *RegionInfo) bool {
new.Region.GetRegionEpoch().GetConfVer() == old.Region.GetRegionEpoch().GetConfVer()
}

type scatterBackoffer struct {
// exponentialBackoffer trivially retry any errors it meets.
// It's useful when the caller has handled the errors but
// only want to a more semantic backoff implementation.
type exponentialBackoffer struct {
attempt int
baseBackoff time.Duration
}

func (b *scatterBackoffer) exponentialBackoff() time.Duration {
func (b *exponentialBackoffer) exponentialBackoff() time.Duration {
bo := b.baseBackoff
b.attempt--
if b.attempt == 0 {
Expand All @@ -515,13 +518,7 @@ func (b *scatterBackoffer) exponentialBackoff() time.Duration {
return bo
}

func (b *scatterBackoffer) giveUp() time.Duration {
b.attempt = 0
return 0
}

// NextBackoff returns a duration to wait before retrying again
func (b *scatterBackoffer) NextBackoff(err error) time.Duration {
func pdErrorCanRetry(err error) bool {
// There are 3 type of reason that PD would reject a `scatter` request:
// (1) region %d has no leader
// (2) region %d is hot
Expand All @@ -531,20 +528,19 @@ func (b *scatterBackoffer) NextBackoff(err error) time.Duration {
// (1) and (3) might happen, and should be retried.
grpcErr := status.Convert(err)
if grpcErr == nil {
return b.giveUp()
}
if strings.Contains(grpcErr.Message(), "is not fully replicated") {
log.Info("scatter region failed, retring", logutil.ShortError(err), zap.Int("attempt-remain", b.attempt))
return b.exponentialBackoff()
return false
}
if strings.Contains(grpcErr.Message(), "has no leader") {
log.Info("scatter region failed, retring", logutil.ShortError(err), zap.Int("attempt-remain", b.attempt))
return b.exponentialBackoff()
}
return b.giveUp()
return strings.Contains(grpcErr.Message(), "is not fully replicated") ||
strings.Contains(grpcErr.Message(), "has no leader")
}

// NextBackoff returns a duration to wait before retrying again.
func (b *exponentialBackoffer) NextBackoff(error) time.Duration {
// trivially exponential back off, because we have handled the error at upper level.
return b.exponentialBackoff()
}

// Attempt returns the remain attempt times
func (b *scatterBackoffer) Attempt() int {
func (b *exponentialBackoffer) Attempt() int {
return b.attempt
}
135 changes: 106 additions & 29 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"bytes"
"context"
"sync"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
Expand All @@ -14,19 +16,22 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/util/codec"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule/placement"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type TestClient struct {
mu sync.RWMutex
stores map[uint64]*metapb.Store
regions map[uint64]*restore.RegionInfo
regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions
nextRegionID uint64
mu sync.RWMutex
stores map[uint64]*metapb.Store
regions map[uint64]*restore.RegionInfo
regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions
nextRegionID uint64
injectInScatter func(*restore.RegionInfo) error

scattered map[uint64]bool
}
Expand All @@ -41,11 +46,12 @@ func NewTestClient(
regionsInfo.SetRegion(core.NewRegionInfo(regionInfo.Region, regionInfo.Leader))
}
return &TestClient{
stores: stores,
regions: regions,
regionsInfo: regionsInfo,
nextRegionID: nextRegionID,
scattered: map[uint64]bool{},
stores: stores,
regions: regions,
regionsInfo: regionsInfo,
nextRegionID: nextRegionID,
scattered: map[uint64]bool{},
injectInScatter: func(*restore.RegionInfo) error { return nil },
}
}

Expand Down Expand Up @@ -164,12 +170,7 @@ func (c *TestClient) BatchSplitRegions(
}

func (c *TestClient) ScatterRegion(ctx context.Context, regionInfo *restore.RegionInfo) error {
if _, ok := c.scattered[regionInfo.Region.Id]; !ok {
c.scattered[regionInfo.Region.Id] = false
return status.Errorf(codes.Unknown, "region %d is not fully replicated", regionInfo.Region.Id)
}
c.scattered[regionInfo.Region.Id] = true
return nil
return c.injectInScatter(regionInfo)
}

func (c *TestClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
Expand Down Expand Up @@ -206,13 +207,73 @@ func (c *TestClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK
return nil
}

func (c *TestClient) checkScatter(check *C) {
regions := c.GetAllRegions()
for key := range regions {
if !c.scattered[key] {
check.Fatalf("region %d has not been scattered: %#v", key, regions[key])
type assertRetryLessThanBackoffer struct {
max int
already int
t *testing.T
}

func assertRetryLessThan(t *testing.T, times int) utils.Backoffer {
return &assertRetryLessThanBackoffer{
max: times,
already: 0,
t: t,
}
}

// NextBackoff returns a duration to wait before retrying again
func (b *assertRetryLessThanBackoffer) NextBackoff(err error) time.Duration {
b.already++
if b.already >= b.max {
b.t.Logf("retry more than %d time: test failed", b.max)
b.t.FailNow()
}
return 0
}

// Attempt returns the remain attempt times
func (b *assertRetryLessThanBackoffer) Attempt() int {
return b.max - b.already
}

func TestScatterFinishInTime(t *testing.T) {
t.Parallel()
client := initTestClient()
ranges := initRanges()
rewriteRules := initRewriteRules()
regionSplitter := restore.NewRegionSplitter(client)

ctx := context.Background()
err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {})
require.NoError(t, err)
regions := client.GetAllRegions()
if !validateRegions(regions) {
for _, region := range regions {
t.Logf("region: %v\n", region.Region)
}
t.Log("get wrong result")
t.Fail()
}

regionInfos := make([]*restore.RegionInfo, 0, len(regions))
for _, info := range regions {
regionInfos = append(regionInfos, info)
}
failed := map[uint64]int{}
client.injectInScatter = func(r *restore.RegionInfo) error {
failed[r.Region.Id]++
if failed[r.Region.Id] > 7 {
return nil
}
return status.Errorf(codes.Unknown, "region %d is not fully replicated", r.Region.Id)
}

// When using a exponential backoffer, if we try to backoff more than 40 times in 10 regions,
// it would cost time unacceptable.
regionSplitter.ScatterRegionsWithBackoffer(ctx,
regionInfos,
assertRetryLessThan(t, 40))

}

// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
Expand All @@ -221,31 +282,47 @@ func (c *TestClient) checkScatter(check *C) {
// expected regions after split:
// [, aay), [aay, bb), [bb, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xx), [xx, xxe), [xxe, xxz), [xxz, )
func (s *testRangeSuite) TestSplitAndScatter(c *C) {
func TestSplitAndScatter(t *testing.T) {
t.Parallel()
client := initTestClient()
ranges := initRanges()
rewriteRules := initRewriteRules()
regionSplitter := restore.NewRegionSplitter(client)

ctx := context.Background()
err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {})
if err != nil {
c.Assert(err, IsNil, Commentf("split regions failed: %v", err))
}
require.NoError(t, err)
regions := client.GetAllRegions()
if !validateRegions(regions) {
for _, region := range regions {
c.Logf("region: %v\n", region.Region)
t.Logf("region: %v\n", region.Region)
}
c.Log("get wrong result")
c.Fail()
t.Log("get wrong result")
t.Fail()
}
regionInfos := make([]*restore.RegionInfo, 0, len(regions))
for _, info := range regions {
regionInfos = append(regionInfos, info)
}
scattered := map[uint64]bool{}
const alwaysFailedRegionID = 1
client.injectInScatter = func(regionInfo *restore.RegionInfo) error {
if _, ok := scattered[regionInfo.Region.Id]; !ok || regionInfo.Region.Id == alwaysFailedRegionID {
scattered[regionInfo.Region.Id] = false
return status.Errorf(codes.Unknown, "region %d is not fully replicated", regionInfo.Region.Id)
}
scattered[regionInfo.Region.Id] = true
return nil
}
regionSplitter.ScatterRegions(ctx, regionInfos)
client.checkScatter(c)
for key := range regions {
if key == alwaysFailedRegionID {
require.Falsef(t, scattered[key], "always failed region %d was scattered successfully", key)
} else if !scattered[key] {
t.Fatalf("region %d has not been scattered: %#v", key, regions[key])
}
}

}

// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
Expand Down