-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
br: retry to scatter the regions if status is timeout or cancel #46471
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
"github.com/pingcap/kvproto/pkg/pdpb" | ||
"github.com/pingcap/log" | ||
berrors "github.com/pingcap/tidb/br/pkg/errors" | ||
"github.com/pingcap/tidb/br/pkg/lightning/common" | ||
"github.com/pingcap/tidb/br/pkg/logutil" | ||
"github.com/pingcap/tidb/br/pkg/restore/split" | ||
"github.com/pingcap/tidb/br/pkg/rtree" | ||
|
@@ -144,21 +145,15 @@ | |
} | ||
log.Info("start to wait for scattering regions", | ||
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime))) | ||
startTime = time.Now() | ||
scatterCount := 0 | ||
for _, region := range scatterRegions { | ||
rs.waitForScatterRegion(ctx, region) | ||
if time.Since(startTime) > split.ScatterWaitUpperInterval { | ||
break | ||
} | ||
scatterCount++ | ||
} | ||
if scatterCount == len(scatterRegions) { | ||
|
||
leftCnt := rs.WaitForScatterRegions(ctx, scatterRegions, split.ScatterWaitUpperInterval) | ||
if leftCnt == 0 { | ||
log.Info("waiting for scattering regions done", | ||
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime))) | ||
} else { | ||
log.Warn("waiting for scattering regions timeout", | ||
zap.Int("scatterCount", scatterCount), | ||
zap.Int("NotScatterCount", leftCnt), | ||
zap.Int("TotalScatterCount", len(scatterRegions)), | ||
zap.Int("regions", len(scatterRegions)), | ||
zap.Duration("take", time.Since(startTime))) | ||
} | ||
|
@@ -188,26 +183,48 @@ | |
return len(regionInfo.PendingPeers) == 0, nil | ||
} | ||
|
||
func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, error) { | ||
// isScatterRegionFinished check the latest successful operator and return the follow status: | ||
// | ||
// return (finished, needRescatter, error) | ||
// | ||
// if the latest operator is not `scatter-operator`, or its status is SUCCESS, it's likely that the | ||
// scatter region operator is finished. | ||
// | ||
// if the latest operator is `scatter-operator` and its status is TIMEOUT or CANCEL, the needRescatter | ||
// is true and the function caller needs to scatter this region again. | ||
func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, bool, error) { | ||
resp, err := rs.client.GetOperator(ctx, regionID) | ||
if err != nil { | ||
return false, errors.Trace(err) | ||
if common.IsRetryableError(err) { | ||
// retry in the next cycle | ||
return false, false, nil | ||
} | ||
return false, false, errors.Trace(err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not retry here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
} | ||
// Heartbeat may not be sent to PD | ||
if respErr := resp.GetHeader().GetError(); respErr != nil { | ||
if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND { | ||
return true, nil | ||
return true, false, nil | ||
} | ||
return false, errors.Annotatef(berrors.ErrPDInvalidResponse, "get operator error: %s", respErr.GetType()) | ||
return false, false, errors.Annotatef(berrors.ErrPDInvalidResponse, "get operator error: %s", respErr.GetType()) | ||
} | ||
retryTimes := ctx.Value(retryTimes).(int) | ||
if retryTimes > 3 { | ||
log.Info("get operator", zap.Uint64("regionID", regionID), zap.Stringer("resp", resp)) | ||
} | ||
// If the current operator of the region is not 'scatter-region', we could assume | ||
// that 'scatter-operator' has finished or timeout | ||
ok := string(resp.GetDesc()) != "scatter-region" || resp.GetStatus() != pdpb.OperatorStatus_RUNNING | ||
return ok, nil | ||
// that 'scatter-operator' has finished | ||
if string(resp.GetDesc()) != "scatter-region" { | ||
return true, false, nil | ||
} | ||
switch resp.GetStatus() { | ||
case pdpb.OperatorStatus_SUCCESS: | ||
return true, false, nil | ||
case pdpb.OperatorStatus_RUNNING: | ||
return false, false, nil | ||
default: | ||
return false, true, nil | ||
} | ||
} | ||
|
||
func (rs *RegionSplitter) waitForSplit(ctx context.Context, regionID uint64) { | ||
|
@@ -233,26 +250,66 @@ | |
|
||
var retryTimes = new(retryTimeKey) | ||
|
||
func (rs *RegionSplitter) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) { | ||
interval := split.ScatterWaitInterval | ||
regionID := regionInfo.Region.GetId() | ||
for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ { | ||
ctx1 := context.WithValue(ctx, retryTimes, i) | ||
ok, err := rs.isScatterRegionFinished(ctx1, regionID) | ||
if err != nil { | ||
log.Warn("scatter region failed: do not have the region", | ||
logutil.Region(regionInfo.Region)) | ||
return | ||
func mapRegionInfoSlice(regionInfos []*split.RegionInfo) map[uint64]*split.RegionInfo { | ||
regionInfoMap := make(map[uint64]*split.RegionInfo) | ||
for _, info := range regionInfos { | ||
regionID := info.Region.GetId() | ||
regionInfoMap[regionID] = info | ||
} | ||
return regionInfoMap | ||
} | ||
|
||
func (rs *RegionSplitter) WaitForScatterRegions(ctx context.Context, regionInfos []*split.RegionInfo, timeout time.Duration) int { | ||
var ( | ||
startTime = time.Now() | ||
interval = split.ScatterWaitInterval | ||
leftRegions = mapRegionInfoSlice(regionInfos) | ||
retryCnt = 0 | ||
|
||
reScatterRegions = make([]*split.RegionInfo, 0, len(regionInfos)) | ||
) | ||
for { | ||
ctx1 := context.WithValue(ctx, retryTimes, retryCnt) | ||
reScatterRegions = reScatterRegions[:0] | ||
for regionID, regionInfo := range leftRegions { | ||
ok, rescatter, err := rs.isScatterRegionFinished(ctx1, regionID) | ||
if err != nil { | ||
log.Warn("scatter region failed: do not have the region", | ||
logutil.Region(regionInfo.Region), zap.Error(err)) | ||
delete(leftRegions, regionID) | ||
continue | ||
} | ||
if ok { | ||
delete(leftRegions, regionID) | ||
continue | ||
} | ||
if rescatter { | ||
reScatterRegions = append(reScatterRegions, regionInfo) | ||
} | ||
// RUNNING_STATUS, just wait and check it in the next loop | ||
} | ||
if ok { | ||
|
||
if len(leftRegions) == 0 { | ||
return 0 | ||
} | ||
|
||
if len(reScatterRegions) > 0 { | ||
rs.ScatterRegions(ctx1, reScatterRegions) | ||
} | ||
|
||
if time.Since(startTime) > timeout { | ||
break | ||
} | ||
|
||
retryCnt += 1 | ||
interval = 2 * interval | ||
if interval > split.ScatterMaxWaitInterval { | ||
interval = split.ScatterMaxWaitInterval | ||
} | ||
time.Sleep(interval) | ||
} | ||
|
||
return len(leftRegions) | ||
} | ||
|
||
func (rs *RegionSplitter) splitAndScatterRegions( | ||
|
@@ -780,16 +837,10 @@ | |
} | ||
} | ||
|
||
startTime := time.Now() | ||
regionSplitter := NewRegionSplitter(helper.client) | ||
for _, region := range scatterRegions { | ||
regionSplitter.waitForScatterRegion(ctx, region) | ||
// It is too expensive to stop recovery and wait for a small number of regions | ||
// to complete scatter, so the maximum waiting time is reduced to 1 minute. | ||
if time.Since(startTime) > time.Minute { | ||
break | ||
} | ||
} | ||
// It is too expensive to stop recovery and wait for a small number of regions | ||
// to complete scatter, so the maximum waiting time is reduced to 1 minute. | ||
_ = regionSplitter.WaitForScatterRegions(ctx, scatterRegions, time.Minute) | ||
}() | ||
|
||
iter := helper.iterator() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ type TestClient struct { | |
regionsInfo *pdtypes.RegionTree // For now it's only used in ScanRegions | ||
nextRegionID uint64 | ||
injectInScatter func(*split.RegionInfo) error | ||
injectInOperator func(uint64) (*pdpb.GetOperatorResponse, error) | ||
supportBatchScatter bool | ||
|
||
scattered map[uint64]bool | ||
|
@@ -215,6 +216,9 @@ func (c *TestClient) ScatterRegion(ctx context.Context, regionInfo *split.Region | |
} | ||
|
||
func (c *TestClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) { | ||
if c.injectInOperator != nil { | ||
return c.injectInOperator(regionID) | ||
} | ||
return &pdpb.GetOperatorResponse{ | ||
Header: new(pdpb.ResponseHeader), | ||
}, nil | ||
|
@@ -337,6 +341,114 @@ func TestSplitAndScatter(t *testing.T) { | |
client := initTestClient(false) | ||
runTestSplitAndScatterWith(t, client) | ||
}) | ||
t.Run("WaitScatter", func(t *testing.T) { | ||
client := initTestClient(false) | ||
client.InstallBatchScatterSupport() | ||
runWaitScatter(t, client) | ||
}) | ||
} | ||
|
||
func TestXXX(t *testing.T) { | ||
client := initTestClient(false) | ||
client.InstallBatchScatterSupport() | ||
runWaitScatter(t, client) | ||
} | ||
Comment on lines
+351
to
+355
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove this or give it a better name? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is just a test to test the unit test, which I forget to remove... |
||
|
||
// +------------+---------------------------- | ||
// | region | states | ||
// +------------+---------------------------- | ||
// | [ , aay) | SUCCESS | ||
// +------------+---------------------------- | ||
// | [aay, bba) | CANCEL, SUCCESS | ||
// +------------+---------------------------- | ||
// | [bba, bbh) | RUNNING, TIMEOUT, SUCCESS | ||
// +------------+---------------------------- | ||
// | [bbh, cca) | <NOT_SCATTER_OPEARTOR> | ||
// +------------+---------------------------- | ||
// | [cca, ) | CANCEL, RUNNING, SUCCESS | ||
// +------------+---------------------------- | ||
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) | ||
// states: | ||
func runWaitScatter(t *testing.T, client *TestClient) { | ||
// configuration | ||
type Operatorstates struct { | ||
index int | ||
status []pdpb.OperatorStatus | ||
} | ||
results := map[string]*Operatorstates{ | ||
"": {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_SUCCESS}}, | ||
string(codec.EncodeBytesExt([]byte{}, []byte("aay"), false)): {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_CANCEL, pdpb.OperatorStatus_SUCCESS}}, | ||
string(codec.EncodeBytesExt([]byte{}, []byte("bba"), false)): {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_RUNNING, pdpb.OperatorStatus_TIMEOUT, pdpb.OperatorStatus_SUCCESS}}, | ||
string(codec.EncodeBytesExt([]byte{}, []byte("bbh"), false)): {}, | ||
string(codec.EncodeBytesExt([]byte{}, []byte("cca"), false)): {status: []pdpb.OperatorStatus{pdpb.OperatorStatus_CANCEL, pdpb.OperatorStatus_RUNNING, pdpb.OperatorStatus_SUCCESS}}, | ||
} | ||
// after test done, the `leftScatterCount` should be empty | ||
leftScatterCount := map[string]int{ | ||
string(codec.EncodeBytesExt([]byte{}, []byte("aay"), false)): 1, | ||
string(codec.EncodeBytesExt([]byte{}, []byte("bba"), false)): 1, | ||
string(codec.EncodeBytesExt([]byte{}, []byte("cca"), false)): 1, | ||
} | ||
client.injectInScatter = func(ri *split.RegionInfo) error { | ||
states, ok := results[string(ri.Region.StartKey)] | ||
require.True(t, ok) | ||
require.NotEqual(t, 0, len(states.status)) | ||
require.NotEqual(t, pdpb.OperatorStatus_SUCCESS, states.status[states.index]) | ||
states.index += 1 | ||
cnt, ok := leftScatterCount[string(ri.Region.StartKey)] | ||
require.True(t, ok) | ||
if cnt == 1 { | ||
delete(leftScatterCount, string(ri.Region.StartKey)) | ||
} else { | ||
leftScatterCount[string(ri.Region.StartKey)] = cnt - 1 | ||
} | ||
return nil | ||
} | ||
regionsMap := client.GetAllRegions() | ||
leftOperatorCount := map[string]int{ | ||
"": 1, | ||
string(codec.EncodeBytesExt([]byte{}, []byte("aay"), false)): 2, | ||
string(codec.EncodeBytesExt([]byte{}, []byte("bba"), false)): 3, | ||
string(codec.EncodeBytesExt([]byte{}, []byte("bbh"), false)): 1, | ||
string(codec.EncodeBytesExt([]byte{}, []byte("cca"), false)): 3, | ||
} | ||
client.injectInOperator = func(u uint64) (*pdpb.GetOperatorResponse, error) { | ||
ri := regionsMap[u] | ||
cnt, ok := leftOperatorCount[string(ri.Region.StartKey)] | ||
require.True(t, ok) | ||
if cnt == 1 { | ||
delete(leftOperatorCount, string(ri.Region.StartKey)) | ||
} else { | ||
leftOperatorCount[string(ri.Region.StartKey)] = cnt - 1 | ||
} | ||
states, ok := results[string(ri.Region.StartKey)] | ||
require.True(t, ok) | ||
if len(states.status) == 0 { | ||
return &pdpb.GetOperatorResponse{ | ||
Desc: []byte("other"), | ||
}, nil | ||
} | ||
if states.status[states.index] == pdpb.OperatorStatus_RUNNING { | ||
states.index += 1 | ||
return &pdpb.GetOperatorResponse{ | ||
Desc: []byte("scatter-region"), | ||
Status: states.status[states.index-1], | ||
}, nil | ||
} | ||
return &pdpb.GetOperatorResponse{ | ||
Desc: []byte("scatter-region"), | ||
Status: states.status[states.index], | ||
}, nil | ||
} | ||
|
||
// begin to test | ||
ctx := context.Background() | ||
regions := make([]*split.RegionInfo, 0, len(regionsMap)) | ||
for _, info := range regionsMap { | ||
regions = append(regions, info) | ||
} | ||
regionSplitter := restore.NewRegionSplitter(client) | ||
leftCnt := regionSplitter.WaitForScatterRegions(ctx, regions, 2000*time.Second) | ||
require.Equal(t, leftCnt, 0) | ||
} | ||
|
||
func runTestSplitAndScatterWith(t *testing.T, client *TestClient) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the latest operator is not scatter-operator
Will it become a mistake when the scatter-operator cancelled and another operator succeed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes. But it is ignored usually.