BR: Fix stuck when many missing-peer regions in cluster #28497

Merged
merged 8 commits on Oct 9, 2021
Changes from 3 commits
68 changes: 54 additions & 14 deletions br/pkg/restore/split.go
@@ -21,7 +21,9 @@ import (
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/tikv/pd/pkg/codec"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// Constants for split retry machinery.
@@ -279,22 +281,60 @@ func (rs *RegionSplitter) splitAndScatterRegions(
return newRegions, nil
}

// ScatterRegions scatter the regions.
func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*RegionInfo) {
for _, region := range newRegions {
// Wait for a while until the regions successfully split.
rs.waitForSplit(ctx, region.Region.Id)
if err := utils.WithRetry(ctx,
func() error { return rs.client.ScatterRegion(ctx, region) },
// backoff about 6s, or we give up scattering this region.
&scatterBackoffer{
attempt: 7,
baseBackoff: 100 * time.Millisecond,
},
); err != nil {
log.Warn("scatter region failed, stop retry", logutil.Region(region.Region), zap.Error(err))
// ScatterRegionsWithBackoffer scatters the regions with the given backoffer.
// This function is mainly for testing the retry mechanism.
// For a real cluster, directly using ScatterRegions would be fine.
func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRegions []*RegionInfo, backoffer utils.Backoffer) {
newRegionSet := make(map[uint64]*RegionInfo, len(newRegions))
for _, newRegion := range newRegions {
newRegionSet[newRegion.Region.Id] = newRegion
}

if err := utils.WithRetry(ctx, func() error {
log.Info("trying to scatter regions...", zap.Int("remain", len(newRegionSet)))
var errs error
for _, region := range newRegionSet {
// Wait for a while until the regions successfully split.
rs.waitForSplit(ctx, region.Region.Id)
err := rs.client.ScatterRegion(ctx, region)
if err == nil {
// deleting a map entry during range is safe according to the Go language spec.
delete(newRegionSet, region.Region.Id)
} else if !pdErrorCanRetry(err) {
log.Warn("scatter meet error cannot be retried, skipping",
logutil.ShortError(err),
logutil.Region(region.Region),
)
delete(newRegionSet, region.Region.Id)
}
errs = multierr.Append(errs, err)
}
return errs
}, backoffer); err != nil {
log.Warn("Some regions haven't been scattered because errors. Use DEBUG level too see full region info",
zap.Int("count", len(newRegionSet)),
// if all regions failed to scatter, the short error might also be verbose...
logutil.ShortError(err),
)
// DebugLevel is the smallest level (== -1) at the time of writing this comment.
if log.GetLevel() <= zapcore.DebugLevel {
for _, r := range newRegionSet {
log.Debug("failed region", logutil.Region(r.Region))
}
}
}

}
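The delete-while-ranging pattern above leans on a Go language guarantee: a map entry removed during a range loop is simply not produced later in that iteration. A minimal standalone sketch (illustrative only, using a hypothetical map, not code from this PR):

// mapDeleteDuringRange demonstrates that delete() inside a range loop is
// well defined: removed entries are not revisited and no panic occurs.
func mapDeleteDuringRange() {
	m := map[uint64]string{1: "a", 2: "b", 3: "c"}
	for k := range m {
		delete(m, k) // safe per the Go spec
	}
}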

// ScatterRegions scatters the regions.
func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*RegionInfo) {
rs.ScatterRegionsWithBackoffer(
ctx, newRegions,
// backoff about 6s, or we give up scattering these regions.
&exponentialBackoffer{
attempt: 7,
baseBackoff: 100 * time.Millisecond,
})
}
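A rough sanity check of the "about 6s" figure, assuming the folded exponentialBackoff body doubles the delay on each attempt (which is consistent with the old scatterBackoffer's behavior): with 7 attempts and a 100ms base, the waits before giving up sum to roughly 6.3s. Sketch only, not part of the diff:

// totalScatterBackoff sums the assumed doubling waits: 100ms + 200ms + ... + 3.2s.
func totalScatterBackoff() time.Duration {
	base := 100 * time.Millisecond
	total := time.Duration(0)
	for i := 0; i < 6; i++ { // the final attempt gives up without another wait
		total += base << i
	}
	return total // ≈ 6.3s
}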

func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error {
33 changes: 17 additions & 16 deletions br/pkg/restore/split_client.go
@@ -571,12 +571,15 @@ func checkRegionEpoch(new, old *RegionInfo) bool {
new.Region.GetRegionEpoch().GetConfVer() == old.Region.GetRegionEpoch().GetConfVer()
}

type scatterBackoffer struct {
// exponentialBackoffer trivially retries any error it meets.
// It's useful when the caller has already handled the errors but
// only wants a more semantic backoff implementation.
type exponentialBackoffer struct {
attempt int
baseBackoff time.Duration
}

func (b *scatterBackoffer) exponentialBackoff() time.Duration {
func (b *exponentialBackoffer) exponentialBackoff() time.Duration {
bo := b.baseBackoff
b.attempt--
if b.attempt == 0 {
@@ -586,13 +589,7 @@ func (b *scatterBackoffer) exponentialBackoff() time.Duration {
return bo
}

func (b *scatterBackoffer) giveUp() time.Duration {
b.attempt = 0
return 0
}

// NextBackoff returns a duration to wait before retrying again
func (b *scatterBackoffer) NextBackoff(err error) time.Duration {
func pdErrorCanRetry(err error) bool {
// There are 3 type of reason that PD would reject a `scatter` request:
// (1) region %d has no leader
// (2) region %d is hot
@@ -602,20 +599,24 @@ func (b *scatterBackoffer) NextBackoff(err error) time.Duration {
// (1) and (3) might happen, and should be retried.
grpcErr := status.Convert(err)
if grpcErr == nil {
return b.giveUp()
return false
}
if strings.Contains(grpcErr.Message(), "is not fully replicated") {
log.Info("scatter region failed, retring", logutil.ShortError(err), zap.Int("attempt-remain", b.attempt))
return b.exponentialBackoff()
return true
}
if strings.Contains(grpcErr.Message(), "has no leader") {
log.Info("scatter region failed, retring", logutil.ShortError(err), zap.Int("attempt-remain", b.attempt))
return b.exponentialBackoff()
return true
}
return b.giveUp()
return false
}

// NextBackoff returns a duration to wait before retrying again.
func (b *exponentialBackoffer) NextBackoff(error) time.Duration {
// trivial exponential backoff, because we have handled the error at the upper level.
return b.exponentialBackoff()
}

// Attempt returns the remain attempt times
func (b *scatterBackoffer) Attempt() int {
func (b *exponentialBackoffer) Attempt() int {
return b.attempt
}
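Judging from this diff, NextBackoff and Attempt are the whole contract that utils.Backoffer requires of exponentialBackoffer and of the test's assertRetryNotThanBackoffer. A minimal sketch of a custom implementation and how it could plug into the new entry point (the name constantBackoffer and the call site are illustrative, not part of the PR):

// constantBackoffer waits a fixed delay between retries until its budget runs out.
type constantBackoffer struct {
	remaining int
	delay     time.Duration
}

func (b *constantBackoffer) NextBackoff(err error) time.Duration {
	b.remaining--
	return b.delay
}

func (b *constantBackoffer) Attempt() int {
	return b.remaining
}

// usage, e.g. in a test:
// rs.ScatterRegionsWithBackoffer(ctx, newRegions, &constantBackoffer{remaining: 3, delay: time.Second})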
119 changes: 99 additions & 20 deletions br/pkg/restore/split_test.go
@@ -6,6 +6,8 @@ import (
"bytes"
"context"
"sync"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
@@ -14,19 +16,22 @@
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/util/codec"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule/placement"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type TestClient struct {
mu sync.RWMutex
stores map[uint64]*metapb.Store
regions map[uint64]*restore.RegionInfo
regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions
nextRegionID uint64
mu sync.RWMutex
stores map[uint64]*metapb.Store
regions map[uint64]*restore.RegionInfo
regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions
nextRegionID uint64
injectInScatter func(*restore.RegionInfo) error

scattered map[uint64]bool
}
@@ -41,11 +46,12 @@ func NewTestClient(
regionsInfo.SetRegion(core.NewRegionInfo(regionInfo.Region, regionInfo.Leader))
}
return &TestClient{
stores: stores,
regions: regions,
regionsInfo: regionsInfo,
nextRegionID: nextRegionID,
scattered: map[uint64]bool{},
stores: stores,
regions: regions,
regionsInfo: regionsInfo,
nextRegionID: nextRegionID,
scattered: map[uint64]bool{},
injectInScatter: func(*restore.RegionInfo) error { return nil },
}
}

@@ -164,12 +170,7 @@ func (c *TestClient) BatchSplitRegions(
}

func (c *TestClient) ScatterRegion(ctx context.Context, regionInfo *restore.RegionInfo) error {
if _, ok := c.scattered[regionInfo.Region.Id]; !ok {
c.scattered[regionInfo.Region.Id] = false
return status.Errorf(codes.Unknown, "region %d is not fully replicated", regionInfo.Region.Id)
}
c.scattered[regionInfo.Region.Id] = true
return nil
return c.injectInScatter(regionInfo)
}

func (c *TestClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
@@ -206,15 +207,84 @@ func (c *TestClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK
return nil
}

func (c *TestClient) checkScatter(check *C) {
regions := c.GetAllRegions()
func checkScatter(check *C, regions map[uint64]*restore.RegionInfo, scattered map[uint64]bool) {
for key := range regions {
if !c.scattered[key] {
if !scattered[key] {
check.Fatalf("region %d has not been scattered: %#v", key, regions[key])
}
}
}

type assertRetryNotThanBackoffer struct {
max int
already int
t *testing.T
}

func assertRetryNotThan(t *testing.T, times int) utils.Backoffer {
return &assertRetryNotThanBackoffer{
max: times,
already: 0,
t: t,
}
}

// NextBackoff returns a duration to wait before retrying again
func (b *assertRetryNotThanBackoffer) NextBackoff(err error) time.Duration {
b.already++
if b.already >= b.max {
b.t.Logf("retry more than %d time: test failed", b.max)
b.t.FailNow()
}
return 0
}

// Attempt returns the remaining attempt count
func (b *assertRetryNotThanBackoffer) Attempt() int {
return b.max - b.already
}

func TestScatterFinishInTime(t *testing.T) {
client := initTestClient()
ranges := initRanges()
rewriteRules := initRewriteRules()
regionSplitter := restore.NewRegionSplitter(client)

ctx := context.Background()
err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {})
if err != nil {
require.Nil(t, err)
}
regions := client.GetAllRegions()
if !validateRegions(regions) {
for _, region := range regions {
t.Logf("region: %v\n", region.Region)
}
t.Log("get wrong result")
t.Fail()
}

regionInfos := make([]*restore.RegionInfo, 0, len(regions))
for _, info := range regions {
regionInfos = append(regionInfos, info)
}
failed := map[uint64]int{}
client.injectInScatter = func(r *restore.RegionInfo) error {
failed[r.Region.Id] = failed[r.Region.Id] + 1
if failed[r.Region.Id] > 7 {
return nil
}
return status.Errorf(codes.Unknown, "region %d is not fully replicated", r.Region.Id)
}

// When using an exponential backoffer, backing off more than 40 times across 10 regions
// would cost an unacceptable amount of time.
regionSplitter.ScatterRegionsWithBackoffer(ctx,
regionInfos,
assertRetryNotThan(t, 40))

}
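A back-of-the-envelope reading of why 40 is a meaningful bound (my interpretation of the test, not text from the PR): injectInScatter fails each of the 10 regions 7 times before letting it succeed. With the old per-region retry loop that would mean on the order of 10 * 7 = 70 backoffs, while the new set-based loop in ScatterRegionsWithBackoffer retries the whole remaining set per round, so only about 7 backoffs are needed before every region succeeds on its 8th call.

// retryBudget contrasts the two strategies for 10 regions failing 7 times each.
func retryBudget() (oldWorstCase, newWorstCase int) {
	regions, failuresPerRegion := 10, 7
	oldWorstCase = regions * failuresPerRegion // ≈ 70 backoffs, one backoffer per region
	newWorstCase = failuresPerRegion           // ≈ 7 backoffs, one per failed round over the set
	return
}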

// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
// range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj)
// rewrite rules: aa -> xx, cc -> bb
@@ -244,8 +314,17 @@ func (s *testRangeSuite) TestSplitAndScatter(c *C) {
for _, info := range regions {
regionInfos = append(regionInfos, info)
}
scattered := map[uint64]bool{}
client.injectInScatter = func(regionInfo *restore.RegionInfo) error {
if _, ok := scattered[regionInfo.Region.Id]; !ok {
scattered[regionInfo.Region.Id] = false
return status.Errorf(codes.Unknown, "region %d is not fully replicated", regionInfo.Region.Id)
}
scattered[regionInfo.Region.Id] = true
return nil
}
regionSplitter.ScatterRegions(ctx, regionInfos)
client.checkScatter(c)
checkScatter(c, client.GetAllRegions(), scattered)
}

// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )