
Commit 39b4cf0

crazycs520 authored and winkyao committed

executor/split: return split result when do split region and r… (#11484)

1 parent a93a97a, commit 39b4cf0

9 files changed, +143 −63 lines

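Summary of the change, as reflected in the diffs below: `SPLIT TABLE ...` and `SPLIT TABLE ... INDEX ...` statements no longer fail with a timeout error. Instead they return a one-row result set with the number of regions that were actually split and the ratio of those regions that finished scattering before the timeout. To bound the wait, `kv.SplitableStore.WaitScatterRegionFinish` gains a back-off parameter, and the mocktikv store is updated so its split response carries the newly created region.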

ddl/split_region.go (+1 −1)

@@ -121,7 +121,7 @@ func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter

 func waitScatterRegionFinish(store kv.SplitableStore, regionIDs ...uint64) {
 	for _, regionID := range regionIDs {
-		err := store.WaitScatterRegionFinish(regionID)
+		err := store.WaitScatterRegionFinish(regionID, 0)
 		if err != nil {
 			logutil.Logger(context.Background()).Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
 		}
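The DDL split path keeps its old behaviour here: passing `0` as the new back-off argument makes the store fall back to its default wait time (see the `store/tikv/split_region.go` change at the end of this commit).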

executor/builder.go (+3 −2)

@@ -1265,8 +1265,9 @@ func (b *executorBuilder) buildUnionAll(v *plannercore.PhysicalUnionAll) Executo
 }

 func (b *executorBuilder) buildSplitRegion(v *plannercore.SplitRegion) Executor {
-	base := newBaseExecutor(b.ctx, nil, v.ExplainID())
-	base.initCap = chunk.ZeroCapacity
+	base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
+	base.initCap = 1
+	base.maxChunkSize = 1
 	if v.IndexInfo != nil {
 		return &SplitIndexRegionExec{
 			baseExecutor: base,
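The split executors now have an output schema (set in `planner/core/planbuilder.go` below) and emit exactly one result row, so the chunk is built with `initCap` and `maxChunkSize` of 1 instead of `chunk.ZeroCapacity`.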

executor/executor_test.go (+6 −13)

@@ -1976,16 +1976,9 @@ func (s *testSuite) TestSplitRegionTimeout(c *C) {
 	tk.MustExec("create table t(a varchar(100),b int, index idx1(b,a))")
 	tk.MustExec(`split table t index idx1 by (10000,"abcd"),(10000000);`)
 	tk.MustExec(`set @@tidb_wait_split_region_timeout=1`)
-	_, err := tk.Exec(`split table t between (0) and (10000) regions 10`)
-	c.Assert(err, NotNil)
-	c.Assert(err.Error(), Equals, "split region timeout(1s)")
+	// result 0 0 means split 0 region and 0 region finish scatter regions before timeout.
+	tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0"))
 	c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout"), IsNil)
-
-	c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout", `return(true)`), IsNil)
-	_, err = tk.Exec(`split table t between (0) and (10000) regions 10`)
-	c.Assert(err, NotNil)
-	c.Assert(err.Error(), Equals, "wait split region scatter timeout(1s)")
-	c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout"), IsNil)
 }

 func (s *testSuite) TestRow(c *C) {

@@ -3952,7 +3945,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {

 	// Test show table regions.
 	tk.MustExec(`split table t_regions1 by (0)`)
-	tk.MustExec(`split table t_regions between (-10000) and (10000) regions 4;`)
+	tk.MustQuery(`split table t_regions between (-10000) and (10000) regions 4;`).Check(testkit.Rows("3 1"))
 	re := tk.MustQuery("show table t_regions regions")
 	rows := re.Rows()
 	// Table t_regions should have 4 regions now.

@@ -3967,7 +3960,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {
 	c.Assert(rows[3][1], Equals, fmt.Sprintf("t_%d_r_5000", tbl.Meta().ID))

 	// Test show table index regions.
-	tk.MustExec(`split table t_regions index idx between (-1000) and (1000) regions 4;`)
+	tk.MustQuery(`split table t_regions index idx between (-1000) and (1000) regions 4;`).Check(testkit.Rows("4 1"))
 	re = tk.MustQuery("show table t_regions index idx regions")
 	rows = re.Rows()
 	// The index `idx` of table t_regions should have 4 regions now.

@@ -3997,7 +3990,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {

 	// Test show table regions.
 	tk.MustExec(`set @@session.tidb_wait_split_region_finish=1;`)
-	tk.MustExec(`split table t_regions between (0) and (10000) regions 4;`)
+	tk.MustQuery(`split table t_regions by (2500),(5000),(7500);`).Check(testkit.Rows("3 1"))
 	re = tk.MustQuery("show table t_regions regions")
 	rows = re.Rows()
 	// Table t_regions should have 4 regions now.

@@ -4010,7 +4003,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {
 	c.Assert(rows[3][1], Equals, fmt.Sprintf("t_%d_r_7500", tbl.Meta().ID))

 	// Test show table index regions.
-	tk.MustExec(`split table t_regions index idx between (0) and (1000) regions 4;`)
+	tk.MustQuery(`split table t_regions index idx by (250),(500),(750);`).Check(testkit.Rows("4 1"))
 	re = tk.MustQuery("show table t_regions index idx regions")
 	rows = re.Rows()
 	// The index `idx` of table t_regions should have 4 regions now.
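In these expected rows the first column is the number of regions actually split and the second is the scatter-finish ratio: "0 0" under the mockSplitRegionTimeout failpoint means nothing was split before the timeout, while "3 1" and "4 1" mean every newly split region finished scattering.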

executor/split.go (file mode changed 100644 → 100755, +112 −41)

@@ -28,6 +28,7 @@ import (
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/store/tikv"
 	"github.com/pingcap/tidb/table/tables"
 	"github.com/pingcap/tidb/tablecodec"

@@ -48,10 +49,37 @@ type SplitIndexRegionExec struct {
 	upper      []types.Datum
 	num        int
 	valueLists [][]types.Datum
+
+	done bool
+	splitRegionResult
+}
+
+type splitRegionResult struct {
+	splitRegions     int
+	finishScatterNum int
+}
+
+// Open implements the Executor Open interface.
+func (e *SplitIndexRegionExec) Open(ctx context.Context) error {
+	return e.splitIndexRegion(ctx)
 }

 // Next implements the Executor Next interface.
-func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
+func (e *SplitIndexRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	chk.Reset()
+	if e.done {
+		return nil
+	}
+	appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
+	e.done = true
+	return nil
+}
+
+// checkScatterRegionFinishBackOff is the back off time that used to check if a region has finished scattering before split region timeout.
+const checkScatterRegionFinishBackOff = 50
+
+// splitIndexRegion is used to split index regions.
+func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error {
 	store := e.ctx.GetStore()
 	s, ok := store.(kv.SplitableStore)
 	if !ok {

@@ -62,10 +90,15 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
 		return err
 	}

+	start := time.Now()
 	ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
 	defer cancel()
 	regionIDs := make([]uint64, 0, len(splitIdxKeys))
 	for _, idxKey := range splitIdxKeys {
+		if isCtxDone(ctxWithTimeout) {
+			break
+		}
+
 		regionID, err := s.SplitRegion(idxKey, true)
 		if err != nil {
 			logutil.Logger(context.Background()).Warn("split table index region failed",

@@ -74,28 +107,17 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
 				zap.Error(err))
 			continue
 		}
+		if regionID == 0 {
+			continue
+		}
 		regionIDs = append(regionIDs, regionID)

-		if isCtxDone(ctxWithTimeout) {
-			return errors.Errorf("wait split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
-		}
 	}
+	e.splitRegions = len(regionIDs)
 	if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
 		return nil
 	}
-	for _, regionID := range regionIDs {
-		err := s.WaitScatterRegionFinish(regionID)
-		if err != nil {
-			logutil.Logger(context.Background()).Warn("wait scatter region failed",
-				zap.Uint64("regionID", regionID),
-				zap.String("table", e.tableInfo.Name.L),
-				zap.String("index", e.indexInfo.Name.L),
-				zap.Error(err))
-		}
-		if isCtxDone(ctxWithTimeout) {
-			return errors.Errorf("wait split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
-		}
-	}
+	e.finishScatterNum = waitScatterRegionFinish(ctxWithTimeout, e.ctx, start, s, regionIDs, e.tableInfo.Name.L, e.indexInfo.Name.L)
 	return nil
 }

@@ -225,16 +247,35 @@ type SplitTableRegionExec struct {
 	upper      types.Datum
 	num        int
 	valueLists [][]types.Datum
+
+	done bool
+	splitRegionResult
+}
+
+// Open implements the Executor Open interface.
+func (e *SplitTableRegionExec) Open(ctx context.Context) error {
+	return e.splitTableRegion(ctx)
 }

 // Next implements the Executor Next interface.
-func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
+func (e *SplitTableRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	chk.Reset()
+	if e.done {
+		return nil
+	}
+	appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
+	e.done = true
+	return nil
+}
+
+func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error {
 	store := e.ctx.GetStore()
 	s, ok := store.(kv.SplitableStore)
 	if !ok {
 		return nil
 	}

+	start := time.Now()
 	ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
 	defer cancel()

@@ -244,48 +285,78 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
 	}
 	regionIDs := make([]uint64, 0, len(splitKeys))
 	for _, key := range splitKeys {
+		failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) {
+			if val.(bool) {
+				time.Sleep(time.Second*1 + time.Millisecond*10)
+			}
+		})
+		if isCtxDone(ctxWithTimeout) {
+			break
+		}
 		regionID, err := s.SplitRegion(key, true)
 		if err != nil {
 			logutil.Logger(context.Background()).Warn("split table region failed",
 				zap.String("table", e.tableInfo.Name.L),
 				zap.Error(err))
 			continue
 		}
+		if regionID == 0 {
+			continue
+		}
 		regionIDs = append(regionIDs, regionID)

-		failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) {
-			if val.(bool) {
-				time.Sleep(time.Second * 1)
-			}
-		})
-
-		if isCtxDone(ctxWithTimeout) {
-			return errors.Errorf("split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
-		}
 	}
+	e.splitRegions = len(regionIDs)
 	if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
 		return nil
 	}
+
+	e.finishScatterNum = waitScatterRegionFinish(ctxWithTimeout, e.ctx, start, s, regionIDs, e.tableInfo.Name.L, "")
+	return nil
+}
+
+func waitScatterRegionFinish(ctxWithTimeout context.Context, sctx sessionctx.Context, startTime time.Time, store kv.SplitableStore, regionIDs []uint64, tableName, indexName string) int {
+	remainMillisecond := 0
+	finishScatterNum := 0
 	for _, regionID := range regionIDs {
-		err := s.WaitScatterRegionFinish(regionID)
-		if err != nil {
-			logutil.Logger(context.Background()).Warn("wait scatter region failed",
-				zap.Uint64("regionID", regionID),
-				zap.String("table", e.tableInfo.Name.L),
-				zap.Error(err))
+		if isCtxDone(ctxWithTimeout) {
+			// Do not break here for checking remain regions scatter finished with a very short backoff time.
+			// Consider this situation - Regions 1, 2, and 3 are to be split.
+			// Region 1 times out before scattering finishes, while Region 2 and Region 3 have finished scattering.
+			// In this case, we should return 2 Regions, instead of 0, have finished scattering.
+			remainMillisecond = checkScatterRegionFinishBackOff
+		} else {
+			remainMillisecond = int((sctx.GetSessionVars().GetSplitRegionTimeout().Seconds() - time.Since(startTime).Seconds()) * 1000)
 		}

-		failpoint.Inject("mockScatterRegionTimeout", func(val failpoint.Value) {
-			if val.(bool) {
-				time.Sleep(time.Second * 1)
+		err := store.WaitScatterRegionFinish(regionID, remainMillisecond)
+		if err == nil {
+			finishScatterNum++
+		} else {
+			if len(indexName) == 0 {
+				logutil.Logger(context.Background()).Warn("wait scatter region failed",
+					zap.Uint64("regionID", regionID),
+					zap.String("table", tableName),
+					zap.Error(err))
+			} else {
+				logutil.Logger(context.Background()).Warn("wait scatter region failed",
+					zap.Uint64("regionID", regionID),
+					zap.String("table", tableName),
+					zap.String("index", indexName),
+					zap.Error(err))
			}
-		})
-
-		if isCtxDone(ctxWithTimeout) {
-			return errors.Errorf("wait split region scatter timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
 		}
 	}
-	return nil
+	return finishScatterNum
+}
+
+func appendSplitRegionResultToChunk(chk *chunk.Chunk, totalRegions, finishScatterNum int) {
+	chk.AppendInt64(0, int64(totalRegions))
+	if finishScatterNum > 0 && totalRegions > 0 {
+		chk.AppendFloat64(1, float64(finishScatterNum)/float64(totalRegions))
+	} else {
+		chk.AppendFloat64(1, float64(0))
+	}
 }

 func isCtxDone(ctx context.Context) bool {
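The core of the change is the shared waitScatterRegionFinish helper: while the split-region timeout has not expired, each region is waited on with whatever time remains; once the context is done, the remaining regions are still probed with a short 50 ms back-off so regions that already finished scattering are counted rather than discarded. Below is a minimal, self-contained sketch of that waiting strategy; the probeScatter callback is a hypothetical stand-in for store.WaitScatterRegionFinish, not TiDB code.

package main

import (
	"context"
	"fmt"
	"time"
)

// shortProbeBackOffMs mirrors checkScatterRegionFinishBackOff in executor/split.go.
const shortProbeBackOffMs = 50

// waitScatterSketch counts how many regions finished scattering. Before the
// timeout it waits with the remaining time budget; after the timeout it only
// probes with a short back-off, so already-finished regions are still counted.
func waitScatterSketch(ctx context.Context, timeout time.Duration, start time.Time,
	regionIDs []uint64, probeScatter func(regionID uint64, backOffMs int) error) int {
	finished := 0
	for _, id := range regionIDs {
		backOffMs := shortProbeBackOffMs
		if ctx.Err() == nil {
			// Still within the split-region timeout: wait with the remaining time.
			backOffMs = int((timeout.Seconds() - time.Since(start).Seconds()) * 1000)
		}
		if err := probeScatter(id, backOffMs); err == nil {
			finished++
		}
	}
	return finished
}

func main() {
	start := time.Now()
	timeout := 2 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// Pretend regions 1 and 3 already finished scattering, region 2 did not.
	done := map[uint64]bool{1: true, 3: true}
	probe := func(id uint64, backOffMs int) error {
		if done[id] {
			return nil
		}
		return fmt.Errorf("region %d still scattering after %dms", id, backOffMs)
	}
	fmt.Println(waitScatterSketch(ctx, timeout, start, []uint64{1, 2, 3}, probe)) // prints 2
}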

kv/kv.go (+1 −1)

@@ -299,6 +299,6 @@ type Iterator interface {
 // SplitableStore is the kv store which supports split regions.
 type SplitableStore interface {
 	SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error)
-	WaitScatterRegionFinish(regionID uint64) error
+	WaitScatterRegionFinish(regionID uint64, backOff int) error
 	CheckRegionInScattering(regionID uint64) (bool, error)
 }
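Every SplitableStore implementation must now accept the extra back-off argument (in milliseconds), with non-positive values meaning "use the store's default". A self-contained illustration of that convention follows; the reduced local interface, mockStore, and the default value are made up for illustration, not TiDB code.

package main

import "fmt"

// splitableStore is a reduced, local stand-in for kv.SplitableStore showing
// only the changed method.
type splitableStore interface {
	WaitScatterRegionFinish(regionID uint64, backOff int) error
}

// mockStore demonstrates the convention: a non-positive backOff falls back to
// the store's own default wait time (the value here is arbitrary).
type mockStore struct{ defaultBackOffMs int }

func (s mockStore) WaitScatterRegionFinish(regionID uint64, backOff int) error {
	if backOff <= 0 {
		backOff = s.defaultBackOffMs
	}
	fmt.Printf("wait for region %d to finish scattering, up to %dms\n", regionID, backOff)
	return nil
}

func main() {
	var s splitableStore = mockStore{defaultBackOffMs: 60000}
	_ = s.WaitScatterRegionFinish(7, 0)  // DDL path: keep the default
	_ = s.WaitScatterRegionFinish(7, 50) // executor path after timeout: short probe
}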

planner/core/planbuilder.go (+9)

@@ -999,6 +999,13 @@ func buildTableRegionsSchema() *expression.Schema {
 	return schema
 }

+func buildSplitRegionsSchema() *expression.Schema {
+	schema := expression.NewSchema(make([]*expression.Column, 0, 2)...)
+	schema.Append(buildColumn("", "TOTAL_SPLIT_REGION", mysql.TypeLonglong, 4))
+	schema.Append(buildColumn("", "SCATTER_FINISH_RATIO", mysql.TypeDouble, 8))
+	return schema
+}
+
 func buildShowDDLJobQueriesFields() *expression.Schema {
 	schema := expression.NewSchema(make([]*expression.Column, 0, 1)...)
 	schema.Append(buildColumn("", "QUERY", mysql.TypeVarchar, 256))

@@ -1687,6 +1694,7 @@ func (b *PlanBuilder) buildSplitIndexRegion(node *ast.SplitRegionStmt) (Plan, er
 		TableInfo: tblInfo,
 		IndexInfo: indexInfo,
 	}
+	p.SetSchema(buildSplitRegionsSchema())
 	// Split index regions by user specified value lists.
 	if len(node.SplitOpt.ValueLists) > 0 {
 		indexValues := make([][]types.Datum, 0, len(node.SplitOpt.ValueLists))

@@ -1801,6 +1809,7 @@ func (b *PlanBuilder) buildSplitTableRegion(node *ast.SplitRegionStmt) (Plan, er
 	p := &SplitRegion{
 		TableInfo: tblInfo,
 	}
+	p.SetSchema(buildSplitRegionsSchema())
 	if len(node.SplitOpt.ValueLists) > 0 {
 		values := make([][]types.Datum, 0, len(node.SplitOpt.ValueLists))
 		for i, valuesItem := range node.SplitOpt.ValueLists {
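With this schema, a split statement now produces a result set with two columns, TOTAL_SPLIT_REGION (bigint) and SCATTER_FINISH_RATIO (double); the "3 1" and "4 1" rows checked in the tests above are instances of these two columns.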

store/mockstore/mocktikv/cluster.go (+2 −1)

@@ -319,12 +319,13 @@ func (c *Cluster) Split(regionID, newRegionID uint64, key []byte, peerIDs []uint
 }

 // SplitRaw splits a Region at the key (not encoded) and creates new Region.
-func (c *Cluster) SplitRaw(regionID, newRegionID uint64, rawKey []byte, peerIDs []uint64, leaderPeerID uint64) {
+func (c *Cluster) SplitRaw(regionID, newRegionID uint64, rawKey []byte, peerIDs []uint64, leaderPeerID uint64) *Region {
 	c.Lock()
 	defer c.Unlock()

 	newRegion := c.regions[regionID].split(newRegionID, rawKey, peerIDs, leaderPeerID)
 	c.regions[newRegionID] = newRegion
+	return newRegion
 }

 // Merge merges 2 regions, their key ranges should be adjacent.
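SplitRaw now returns the newly created region so that the mock RPC handler in the next file can include it in its split response.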

store/mockstore/mocktikv/rpc.go (+2 −2)

@@ -597,8 +597,8 @@ func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb
 		return &kvrpcpb.SplitRegionResponse{}
 	}
 	newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers))
-	h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0])
-	return &kvrpcpb.SplitRegionResponse{}
+	newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0])
+	return &kvrpcpb.SplitRegionResponse{Left: newRegion.Meta}
 }

 // RPCClient sends kv RPC calls to mock cluster. RPCClient mocks the behavior of
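Returning the new region's metadata in Left appears to be what lets the mocked store report a non-zero region ID back to the caller; the executor skips entries where SplitRegion returns region ID 0 (the `if regionID == 0 { continue }` guard added above).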

store/tikv/split_region.go (+7 −2)

@@ -104,10 +104,15 @@ func (s *tikvStore) scatterRegion(regionID uint64) error {
 }

 // WaitScatterRegionFinish implements SplitableStore interface.
-func (s *tikvStore) WaitScatterRegionFinish(regionID uint64) error {
+// backOff is the back off time of the wait scatter region.(Milliseconds)
+// if backOff <= 0, the default wait scatter back off time will be used.
+func (s *tikvStore) WaitScatterRegionFinish(regionID uint64, backOff int) error {
 	logutil.Logger(context.Background()).Info("wait scatter region",
 		zap.Uint64("regionID", regionID))
-	bo := NewBackoffer(context.Background(), waitScatterRegionFinishBackoff)
+	if backOff <= 0 {
+		backOff = waitScatterRegionFinishBackoff
+	}
+	bo := NewBackoffer(context.Background(), backOff)
 	logFreq := 0
 	for {
 		resp, err := s.pdClient.GetOperator(context.Background(), regionID)
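The back-off value is handed to the Backoffer as its millisecond budget, so the remainMillisecond computed in executor/split.go directly caps how long this loop keeps polling PD's GetOperator for a given region, while a non-positive value preserves the previous default.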
