diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go
index 5fa6f361a4e88..59d36c3d53a56 100644
--- a/ddl/ddl_api.go
+++ b/ddl/ddl_api.go
@@ -1613,9 +1613,9 @@ func (d *ddl) preSplitAndScatter(ctx sessionctx.Context, tbInfo *model.TableInfo
 	}
 	pi := tbInfo.GetPartitionInfo()
 	if pi != nil {
-		preSplit = func() { splitPartitionTableRegion(sp, pi, scatterRegion) }
+		preSplit = func() { splitPartitionTableRegion(ctx, sp, pi, scatterRegion) }
 	} else {
-		preSplit = func() { splitTableRegion(sp, tbInfo, scatterRegion) }
+		preSplit = func() { splitTableRegion(ctx, sp, tbInfo, scatterRegion) }
 	}
 	if scatterRegion {
 		preSplit()
diff --git a/ddl/split_region.go b/ddl/split_region.go
index 9b88fd250988d..466aa3c834763 100644
--- a/ddl/split_region.go
+++ b/ddl/split_region.go
@@ -18,34 +18,39 @@ import (
 
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/util/logutil"
 	"go.uber.org/zap"
 )
 
-func splitPartitionTableRegion(store kv.SplittableStore, pi *model.PartitionInfo, scatter bool) {
+func splitPartitionTableRegion(ctx sessionctx.Context, store kv.SplittableStore, pi *model.PartitionInfo, scatter bool) {
 	// Max partition count is 4096, should we sample and just choose some of the partition to split?
 	regionIDs := make([]uint64, 0, len(pi.Definitions))
+	ctxWithTimeout, cancel := context.WithTimeout(context.Background(), ctx.GetSessionVars().GetSplitRegionTimeout())
+	defer cancel()
 	for _, def := range pi.Definitions {
-		regionIDs = append(regionIDs, splitRecordRegion(store, def.ID, scatter))
+		regionIDs = append(regionIDs, splitRecordRegion(ctxWithTimeout, store, def.ID, scatter))
 	}
 	if scatter {
-		waitScatterRegionFinish(store, regionIDs...)
+		waitScatterRegionFinish(ctxWithTimeout, store, regionIDs...)
 	}
 }
 
-func splitTableRegion(store kv.SplittableStore, tbInfo *model.TableInfo, scatter bool) {
+func splitTableRegion(ctx sessionctx.Context, store kv.SplittableStore, tbInfo *model.TableInfo, scatter bool) {
+	ctxWithTimeout, cancel := context.WithTimeout(context.Background(), ctx.GetSessionVars().GetSplitRegionTimeout())
+	defer cancel()
 	if tbInfo.ShardRowIDBits > 0 && tbInfo.PreSplitRegions > 0 {
-		splitPreSplitedTable(store, tbInfo, scatter)
+		splitPreSplitedTable(ctxWithTimeout, store, tbInfo, scatter)
 	} else {
-		regionID := splitRecordRegion(store, tbInfo.ID, scatter)
+		regionID := splitRecordRegion(ctxWithTimeout, store, tbInfo.ID, scatter)
 		if scatter {
-			waitScatterRegionFinish(store, regionID)
+			waitScatterRegionFinish(ctxWithTimeout, store, regionID)
 		}
 	}
 }
 
-func splitPreSplitedTable(store kv.SplittableStore, tbInfo *model.TableInfo, scatter bool) {
+func splitPreSplitedTable(ctx context.Context, store kv.SplittableStore, tbInfo *model.TableInfo, scatter bool) {
 	// Example:
 	// ShardRowIDBits = 4
 	// PreSplitRegions = 2
@@ -80,20 +85,20 @@ func splitPreSplitedTable(store kv.SplittableStore, tbInfo *model.TableInfo, sca
 		splitTableKeys = append(splitTableKeys, key)
 	}
 	var err error
-	regionIDs, err := store.SplitRegions(context.Background(), splitTableKeys, scatter)
+	regionIDs, err := store.SplitRegions(ctx, splitTableKeys, scatter)
 	if err != nil {
 		logutil.BgLogger().Warn("[ddl] pre split some table regions failed",
 			zap.Stringer("table", tbInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err))
 	}
 	regionIDs = append(regionIDs, splitIndexRegion(store, tbInfo, scatter)...)
 	if scatter {
-		waitScatterRegionFinish(store, regionIDs...)
+		waitScatterRegionFinish(ctx, store, regionIDs...)
 	}
 }
 
-func splitRecordRegion(store kv.SplittableStore, tableID int64, scatter bool) uint64 {
+func splitRecordRegion(ctx context.Context, store kv.SplittableStore, tableID int64, scatter bool) uint64 {
 	tableStartKey := tablecodec.GenTablePrefix(tableID)
-	regionIDs, err := store.SplitRegions(context.Background(), [][]byte{tableStartKey}, scatter)
+	regionIDs, err := store.SplitRegions(ctx, [][]byte{tableStartKey}, scatter)
 	if err != nil {
 		// It will be automatically split by TiKV later.
 		logutil.BgLogger().Warn("[ddl] split table region failed", zap.Error(err))
@@ -118,11 +123,12 @@ func splitIndexRegion(store kv.SplittableStore, tblInfo *model.TableInfo, scatte
 	return regionIDs
 }
 
-func waitScatterRegionFinish(store kv.SplittableStore, regionIDs ...uint64) {
+func waitScatterRegionFinish(ctx context.Context, store kv.SplittableStore, regionIDs ...uint64) {
 	for _, regionID := range regionIDs {
-		err := store.WaitScatterRegionFinish(regionID, 0)
+		err := store.WaitScatterRegionFinish(ctx, regionID, 0)
 		if err != nil {
 			logutil.BgLogger().Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
+			return
 		}
 	}
 }
diff --git a/executor/executor_test.go b/executor/executor_test.go
index b3138df460a4d..3e92928a74eb6 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -2239,6 +2239,16 @@ func (s *testSuite7) TestSplitRegionTimeout(c *C) {
 	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout", `return(true)`), IsNil)
 	tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("10 1"))
 	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout"), IsNil)
+
+	// Test pre-split with timeout.
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("set @@global.tidb_scatter_region=1;")
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout", `return(true)`), IsNil)
+	atomic.StoreUint32(&ddl.EnableSplitTableRegion, 1)
+	start := time.Now()
+	tk.MustExec("create table t (a int, b int) partition by hash(a) partitions 5;")
+	c.Assert(time.Since(start).Seconds(), Less, 10.0)
+	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout"), IsNil)
 }
 
 func (s *testSuiteP2) TestRow(c *C) {
@@ -5632,7 +5642,7 @@ func (s *testSuite1) TestInsertIntoGivenPartitionSet(c *C) {
 	tk.MustExec("use test;")
 	tk.MustExec("drop table if exists t1")
 	tk.MustExec(`create table t1(
-	a int(11) DEFAULT NULL, 
+	a int(11) DEFAULT NULL,
 	b varchar(10) DEFAULT NULL,
 	UNIQUE KEY idx_a (a)) PARTITION BY RANGE (a)
 	(PARTITION p0 VALUES LESS THAN (10) ENGINE = InnoDB,
diff --git a/executor/split.go b/executor/split.go
index 817425b6342f3..a560d3cb0d372 100755
--- a/executor/split.go
+++ b/executor/split.go
@@ -96,7 +96,7 @@ func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error {
 	start := time.Now()
 	ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
 	defer cancel()
-	regionIDs, err := s.SplitRegions(context.Background(), e.splitIdxKeys, true)
+	regionIDs, err := s.SplitRegions(ctxWithTimeout, e.splitIdxKeys, true)
 	if err != nil {
 		logutil.BgLogger().Warn("split table index region failed",
 			zap.String("table", e.tableInfo.Name.L),
@@ -398,7 +398,7 @@ func waitScatterRegionFinish(ctxWithTimeout context.Context, sctx sessionctx.Con
 			remainMillisecond = int((sctx.GetSessionVars().GetSplitRegionTimeout().Seconds() - time.Since(startTime).Seconds()) * 1000)
 		}
 
-		err := store.WaitScatterRegionFinish(regionID, remainMillisecond)
+		err := store.WaitScatterRegionFinish(ctxWithTimeout, regionID, remainMillisecond)
 		if err == nil {
 			finishScatterNum++
 		} else {
diff --git a/kv/kv.go b/kv/kv.go
index 1aa94b54a1c12..4e6f2239db0ed 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -419,7 +419,7 @@ type Iterator interface {
 // SplittableStore is the kv store which supports split regions.
 type SplittableStore interface {
 	SplitRegions(ctx context.Context, splitKey [][]byte, scatter bool) (regionID []uint64, err error)
-	WaitScatterRegionFinish(regionID uint64, backOff int) error
+	WaitScatterRegionFinish(ctx context.Context, regionID uint64, backOff int) error
 	CheckRegionInScattering(regionID uint64) (bool, error)
 }
diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go
index 309d331b5be3f..17cf625608ec8 100644
--- a/store/tikv/2pc.go
+++ b/store/tikv/2pc.go
@@ -495,7 +495,7 @@ func preSplitAndScatterIn2PC(ctx context.Context, store *tikvStore, group groupe
 	}
 	for _, regionID := range regionIDs {
-		err := store.WaitScatterRegionFinish(regionID, 0)
+		err := store.WaitScatterRegionFinish(ctx, regionID, 0)
 		if err != nil {
 			logutil.BgLogger().Warn("2PC wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
 		}
 	}
diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go
index 65594c0e6950b..f9f367d460606 100644
--- a/store/tikv/split_region.go
+++ b/store/tikv/split_region.go
@@ -169,7 +169,7 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo
 	}
 
 	for i, r := range spResp.Regions {
-		if err = s.scatterRegion(r.Id); err == nil {
+		if err = s.scatterRegion(bo.ctx, r.Id); err == nil {
 			logutil.BgLogger().Info("batch split regions, scatter region complete",
 				zap.Uint64("batch region ID", batch.regionID.id),
 				zap.Stringer("at", kv.Key(batch.keys[i])),
@@ -207,18 +207,19 @@ func (s *tikvStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatte
 	return regionIDs, errors.Trace(err)
 }
 
-func (s *tikvStore) scatterRegion(regionID uint64) error {
-	failpoint.Inject("MockScatterRegionTimeout", func(val failpoint.Value) {
-		if val.(bool) {
-			failpoint.Return(ErrPDServerTimeout)
-		}
-	})
-
+func (s *tikvStore) scatterRegion(ctx context.Context, regionID uint64) error {
 	logutil.BgLogger().Info("start scatter region",
 		zap.Uint64("regionID", regionID))
-	bo := NewBackoffer(context.Background(), scatterRegionBackoff)
+	bo := NewBackoffer(ctx, scatterRegionBackoff)
 	for {
-		err := s.pdClient.ScatterRegion(context.Background(), regionID)
+		err := s.pdClient.ScatterRegion(ctx, regionID)
+
+		failpoint.Inject("MockScatterRegionTimeout", func(val failpoint.Value) {
+			if val.(bool) {
+				err = ErrPDServerTimeout
+			}
+		})
+
 		if err == nil {
 			break
 		}
@@ -235,17 +236,17 @@ func (s *tikvStore) scatterRegion(regionID uint64) error {
 // WaitScatterRegionFinish implements SplittableStore interface.
 // backOff is the back off time of the wait scatter region.(Milliseconds)
 // if backOff <= 0, the default wait scatter back off time will be used.
-func (s *tikvStore) WaitScatterRegionFinish(regionID uint64, backOff int) error {
+func (s *tikvStore) WaitScatterRegionFinish(ctx context.Context, regionID uint64, backOff int) error {
 	if backOff <= 0 {
 		backOff = waitScatterRegionFinishBackoff
 	}
 	logutil.BgLogger().Info("wait scatter region",
 		zap.Uint64("regionID", regionID), zap.Int("backoff(ms)", backOff))
 
-	bo := NewBackoffer(context.Background(), backOff)
+	bo := NewBackoffer(ctx, backOff)
 	logFreq := 0
 	for {
-		resp, err := s.pdClient.GetOperator(context.Background(), regionID)
+		resp, err := s.pdClient.GetOperator(ctx, regionID)
 		if err == nil && resp != nil {
 			if !bytes.Equal(resp.Desc, []byte("scatter-region")) || resp.Status != pdpb.OperatorStatus_RUNNING {
 				logutil.BgLogger().Info("wait scatter region finished",
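
The patch threads a caller-supplied context through SplitRegions, scatterRegion, and WaitScatterRegionFinish, so pre-split during DDL is bounded by the session's split-region timeout (GetSplitRegionTimeout) instead of blocking on context.Background(). The sketch below is not part of the patch; the helper name splitAndScatterWithTimeout is illustrative only. It shows the calling pattern against the updated kv.SplittableStore interface:

package example // illustrative sketch, not part of the patch

import (
	"context"

	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/sessionctx"
)

// splitAndScatterWithTimeout is a hypothetical helper: derive a context bounded
// by the session's split-region timeout and pass it to both the split call and
// the scatter wait, mirroring what splitTableRegion now does.
func splitAndScatterWithTimeout(sctx sessionctx.Context, store kv.SplittableStore, keys [][]byte) error {
	ctx, cancel := context.WithTimeout(context.Background(), sctx.GetSessionVars().GetSplitRegionTimeout())
	defer cancel()

	// SplitRegions receives the bounded context, so a slow PD cannot stall the
	// caller past the configured timeout.
	regionIDs, err := store.SplitRegions(ctx, keys, true)
	if err != nil {
		return err
	}
	// WaitScatterRegionFinish takes the same context; backOff = 0 keeps the
	// store's default scatter-wait backoff, as at the DDL and 2PC call sites.
	for _, id := range regionIDs {
		if err := store.WaitScatterRegionFinish(ctx, id, 0); err != nil {
			return err
		}
	}
	return nil
}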