Skip to content

Commit

Permalink
br: retry to scatter the regions if status is timeout or cancel (#46471)
Browse files Browse the repository at this point in the history
close #47236
  • Loading branch information
Leavrth authored Nov 7, 2023
1 parent 249a5fe commit 0abd132
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 38 deletions.
1 change: 1 addition & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//br/pkg/conn/util",
"//br/pkg/errors",
"//br/pkg/glue",
"//br/pkg/lightning/common",
"//br/pkg/logutil",
"//br/pkg/metautil",
"//br/pkg/pdutil",
Expand Down
127 changes: 89 additions & 38 deletions br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/rtree"
Expand Down Expand Up @@ -144,21 +145,15 @@ SplitRegions:
}
log.Info("start to wait for scattering regions",
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
startTime = time.Now()
scatterCount := 0
for _, region := range scatterRegions {
rs.waitForScatterRegion(ctx, region)
if time.Since(startTime) > split.ScatterWaitUpperInterval {
break
}
scatterCount++
}
if scatterCount == len(scatterRegions) {

leftCnt := rs.WaitForScatterRegions(ctx, scatterRegions, split.ScatterWaitUpperInterval)
if leftCnt == 0 {
log.Info("waiting for scattering regions done",
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
} else {
log.Warn("waiting for scattering regions timeout",
zap.Int("scatterCount", scatterCount),
zap.Int("NotScatterCount", leftCnt),
zap.Int("TotalScatterCount", len(scatterRegions)),
zap.Int("regions", len(scatterRegions)),
zap.Duration("take", time.Since(startTime)))
}
Expand Down Expand Up @@ -188,26 +183,48 @@ func (rs *RegionSplitter) hasHealthyRegion(ctx context.Context, regionID uint64)
return len(regionInfo.PendingPeers) == 0, nil
}

func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, error) {
// isScatterRegionFinished check the latest successful operator and return the follow status:
//
// return (finished, needRescatter, error)
//
// if the latest operator is not `scatter-operator`, or its status is SUCCESS, it's likely that the
// scatter region operator is finished.
//
// if the latest operator is `scatter-operator` and its status is TIMEOUT or CANCEL, the needRescatter
// is true and the function caller needs to scatter this region again.
func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, bool, error) {
resp, err := rs.client.GetOperator(ctx, regionID)
if err != nil {
return false, errors.Trace(err)
if common.IsRetryableError(err) {
// retry in the next cycle
return false, false, nil
}
return false, false, errors.Trace(err)
}
// Heartbeat may not be sent to PD
if respErr := resp.GetHeader().GetError(); respErr != nil {
if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND {
return true, nil
return true, false, nil
}
return false, errors.Annotatef(berrors.ErrPDInvalidResponse, "get operator error: %s", respErr.GetType())
return false, false, errors.Annotatef(berrors.ErrPDInvalidResponse, "get operator error: %s", respErr.GetType())
}
retryTimes := ctx.Value(retryTimes).(int)
if retryTimes > 3 {
log.Info("get operator", zap.Uint64("regionID", regionID), zap.Stringer("resp", resp))
}
// If the current operator of the region is not 'scatter-region', we could assume
// that 'scatter-operator' has finished or timeout
ok := string(resp.GetDesc()) != "scatter-region" || resp.GetStatus() != pdpb.OperatorStatus_RUNNING
return ok, nil
// that 'scatter-operator' has finished
if string(resp.GetDesc()) != "scatter-region" {
return true, false, nil
}
switch resp.GetStatus() {
case pdpb.OperatorStatus_SUCCESS:
return true, false, nil
case pdpb.OperatorStatus_RUNNING:
return false, false, nil
default:
return false, true, nil
}
}

func (rs *RegionSplitter) waitForSplit(ctx context.Context, regionID uint64) {
Expand All @@ -233,26 +250,66 @@ type retryTimeKey struct{}

var retryTimes = new(retryTimeKey)

func (rs *RegionSplitter) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) {
interval := split.ScatterWaitInterval
regionID := regionInfo.Region.GetId()
for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ {
ctx1 := context.WithValue(ctx, retryTimes, i)
ok, err := rs.isScatterRegionFinished(ctx1, regionID)
if err != nil {
log.Warn("scatter region failed: do not have the region",
logutil.Region(regionInfo.Region))
return
func mapRegionInfoSlice(regionInfos []*split.RegionInfo) map[uint64]*split.RegionInfo {
regionInfoMap := make(map[uint64]*split.RegionInfo)
for _, info := range regionInfos {
regionID := info.Region.GetId()
regionInfoMap[regionID] = info
}
return regionInfoMap
}

func (rs *RegionSplitter) WaitForScatterRegions(ctx context.Context, regionInfos []*split.RegionInfo, timeout time.Duration) int {
var (
startTime = time.Now()
interval = split.ScatterWaitInterval
leftRegions = mapRegionInfoSlice(regionInfos)
retryCnt = 0

reScatterRegions = make([]*split.RegionInfo, 0, len(regionInfos))
)
for {
ctx1 := context.WithValue(ctx, retryTimes, retryCnt)
reScatterRegions = reScatterRegions[:0]
for regionID, regionInfo := range leftRegions {
ok, rescatter, err := rs.isScatterRegionFinished(ctx1, regionID)
if err != nil {
log.Warn("scatter region failed: do not have the region",
logutil.Region(regionInfo.Region), zap.Error(err))
delete(leftRegions, regionID)
continue
}
if ok {
delete(leftRegions, regionID)
continue
}
if rescatter {
reScatterRegions = append(reScatterRegions, regionInfo)
}
// RUNNING_STATUS, just wait and check it in the next loop
}
if ok {

if len(leftRegions) == 0 {
return 0
}

if len(reScatterRegions) > 0 {
rs.ScatterRegions(ctx1, reScatterRegions)
}

if time.Since(startTime) > timeout {
break
}

retryCnt += 1
interval = 2 * interval
if interval > split.ScatterMaxWaitInterval {
interval = split.ScatterMaxWaitInterval
}
time.Sleep(interval)
}

return len(leftRegions)
}

func (rs *RegionSplitter) splitAndScatterRegions(
Expand Down Expand Up @@ -780,16 +837,10 @@ func (helper *LogSplitHelper) Split(ctx context.Context) error {
}
}

startTime := time.Now()
regionSplitter := NewRegionSplitter(helper.client)
for _, region := range scatterRegions {
regionSplitter.waitForScatterRegion(ctx, region)
// It is too expensive to stop recovery and wait for a small number of regions
// to complete scatter, so the maximum waiting time is reduced to 1 minute.
if time.Since(startTime) > time.Minute {
break
}
}
// It is too expensive to stop recovery and wait for a small number of regions
// to complete scatter, so the maximum waiting time is reduced to 1 minute.
_ = regionSplitter.WaitForScatterRegions(ctx, scatterRegions, time.Minute)
}()

iter := helper.iterator()
Expand Down
112 changes: 112 additions & 0 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type TestClient struct {
regionsInfo *pdtypes.RegionTree // For now it's only used in ScanRegions
nextRegionID uint64
injectInScatter func(*split.RegionInfo) error
injectInOperator func(uint64) (*pdpb.GetOperatorResponse, error)
supportBatchScatter bool

scattered map[uint64]bool
Expand Down Expand Up @@ -215,6 +216,9 @@ func (c *TestClient) ScatterRegion(ctx context.Context, regionInfo *split.Region
}

func (c *TestClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
if c.injectInOperator != nil {
return c.injectInOperator(regionID)
}
return &pdpb.GetOperatorResponse{
Header: new(pdpb.ResponseHeader),
}, nil
Expand Down Expand Up @@ -337,6 +341,114 @@ func TestSplitAndScatter(t *testing.T) {
client := initTestClient(false)
runTestSplitAndScatterWith(t, client)
})
t.Run("WaitScatter", func(t *testing.T) {
client := initTestClient(false)
client.InstallBatchScatterSupport()
runWaitScatter(t, client)
})
}

func TestXXX(t *testing.T) {
client := initTestClient(false)
client.InstallBatchScatterSupport()
runWaitScatter(t, client)
}

// +------------+----------------------------
// | region | states
// +------------+----------------------------
// | [ , aay) | SUCCESS
// +------------+----------------------------
// | [aay, bba) | CANCEL, SUCCESS
// +------------+----------------------------
// | [bba, bbh) | RUNNING, TIMEOUT, SUCCESS
// +------------+----------------------------
// | [bbh, cca) | <NOT_SCATTER_OPEARTOR>
// +------------+----------------------------
// | [cca, ) | CANCEL, RUNNING, SUCCESS
// +------------+----------------------------
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
// states:
func runWaitScatter(t *testing.T, client *TestClient) {
// configuration
type Operatorstates struct {
index int
status []pdpb.OperatorStatus
}
results := map[string]*Operatorstates{
"": {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_SUCCESS}},
string(codec.EncodeBytesExt([]byte{}, []byte("aay"), false)): {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_CANCEL, pdpb.OperatorStatus_SUCCESS}},
string(codec.EncodeBytesExt([]byte{}, []byte("bba"), false)): {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_RUNNING, pdpb.OperatorStatus_TIMEOUT, pdpb.OperatorStatus_SUCCESS}},
string(codec.EncodeBytesExt([]byte{}, []byte("bbh"), false)): {},
string(codec.EncodeBytesExt([]byte{}, []byte("cca"), false)): {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_CANCEL, pdpb.OperatorStatus_RUNNING, pdpb.OperatorStatus_SUCCESS}},
}
// after test done, the `leftScatterCount` should be empty
leftScatterCount := map[string]int{
string(codec.EncodeBytesExt([]byte{}, []byte("aay"), false)): 1,
string(codec.EncodeBytesExt([]byte{}, []byte("bba"), false)): 1,
string(codec.EncodeBytesExt([]byte{}, []byte("cca"), false)): 1,
}
client.injectInScatter = func(ri *split.RegionInfo) error {
states, ok := results[string(ri.Region.StartKey)]
require.True(t, ok)
require.NotEqual(t, 0, len(states.status))
require.NotEqual(t, pdpb.OperatorStatus_SUCCESS, states.status[states.index])
states.index += 1
cnt, ok := leftScatterCount[string(ri.Region.StartKey)]
require.True(t, ok)
if cnt == 1 {
delete(leftScatterCount, string(ri.Region.StartKey))
} else {
leftScatterCount[string(ri.Region.StartKey)] = cnt - 1
}
return nil
}
regionsMap := client.GetAllRegions()
leftOperatorCount := map[string]int{
"": 1,
string(codec.EncodeBytesExt([]byte{}, []byte("aay"), false)): 2,
string(codec.EncodeBytesExt([]byte{}, []byte("bba"), false)): 3,
string(codec.EncodeBytesExt([]byte{}, []byte("bbh"), false)): 1,
string(codec.EncodeBytesExt([]byte{}, []byte("cca"), false)): 3,
}
client.injectInOperator = func(u uint64) (*pdpb.GetOperatorResponse, error) {
ri := regionsMap[u]
cnt, ok := leftOperatorCount[string(ri.Region.StartKey)]
require.True(t, ok)
if cnt == 1 {
delete(leftOperatorCount, string(ri.Region.StartKey))
} else {
leftOperatorCount[string(ri.Region.StartKey)] = cnt - 1
}
states, ok := results[string(ri.Region.StartKey)]
require.True(t, ok)
if len(states.status) == 0 {
return &pdpb.GetOperatorResponse{
Desc: []byte("other"),
}, nil
}
if states.status[states.index] == pdpb.OperatorStatus_RUNNING {
states.index += 1
return &pdpb.GetOperatorResponse{
Desc: []byte("scatter-region"),
Status: states.status[states.index-1],
}, nil
}
return &pdpb.GetOperatorResponse{
Desc: []byte("scatter-region"),
Status: states.status[states.index],
}, nil
}

// begin to test
ctx := context.Background()
regions := make([]*split.RegionInfo, 0, len(regionsMap))
for _, info := range regionsMap {
regions = append(regions, info)
}
regionSplitter := restore.NewRegionSplitter(client)
leftCnt := regionSplitter.WaitForScatterRegions(ctx, regions, 2000*time.Second)
require.Equal(t, leftCnt, 0)
}

func runTestSplitAndScatterWith(t *testing.T, client *TestClient) {
Expand Down

0 comments on commit 0abd132

Please sign in to comment.