diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index 8ff1afeddc594..34a1117b55fc9 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -1142,14 +1142,16 @@ func (local *Backend) generateAndSendJob(
 	for _, jobRange := range jobRanges {
 		r := jobRange
 		eg.Go(func() error {
-			select {
-			case <-egCtx.Done():
+			if egCtx.Err() != nil {
 				return nil
-			default:
 			}
 
+			failpoint.Inject("beforeGenerateJob", nil)
 			jobs, err := local.generateJobForRange(egCtx, engine, r, regionSplitSize, regionSplitKeys)
 			if err != nil {
+				if common.IsContextCanceledError(err) {
+					return nil
+				}
 				return err
 			}
 			for _, job := range jobs {
@@ -1186,6 +1188,9 @@ func (local *Backend) generateJobForRange(
 	regionSplitSize, regionSplitKeys int64,
 ) ([]*regionJob, error) {
 	failpoint.Inject("fakeRegionJobs", func() {
+		if ctx.Err() != nil {
+			failpoint.Return(nil, ctx.Err())
+		}
 		key := [2]string{string(keyRange.start), string(keyRange.end)}
 		injected := fakeRegionJobs[key]
 		// overwrite the stage to regionScanned, because some time same keyRange
@@ -1562,6 +1567,7 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges
 	jobWg.Wait()
 	workerCancel()
 	firstErr.Set(workGroup.Wait())
+	firstErr.Set(ctx.Err())
 	return firstErr.Get()
 }
 
diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go
index ed98115371f43..902dd906fa040 100644
--- a/br/pkg/lightning/backend/local/local_test.go
+++ b/br/pkg/lightning/backend/local/local_test.go
@@ -2028,3 +2028,60 @@ func TestRegionJobResetRetryCounter(t *testing.T) {
 		}
 	}
 }
+
+func TestCtxCancelIsIgnored(t *testing.T) {
+	backup := maxRetryBackoffSecond
+	maxRetryBackoffSecond = 1
+	t.Cleanup(func() {
+		maxRetryBackoffSecond = backup
+	})
+
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs", "return()")
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/beforeGenerateJob", "sleep(1000)")
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace", "return()")
+	t.Cleanup(func() {
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter")
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs")
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/beforeGenerateJob")
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace")
+	})
+
+	initRanges := []Range{
+		{start: []byte{'c'}, end: []byte{'d'}},
+		{start: []byte{'d'}, end: []byte{'e'}},
+	}
+	fakeRegionJobs = map[[2]string]struct {
+		jobs []*regionJob
+		err  error
+	}{
+		{"c", "d"}: {
+			jobs: []*regionJob{
+				{
+					keyRange: Range{start: []byte{'c'}, end: []byte{'d'}},
+					engine:   &Engine{},
+					injected: getSuccessInjectedBehaviour(),
+				},
+			},
+		},
+		{"d", "e"}: {
+			jobs: []*regionJob{
+				{
+					keyRange: Range{start: []byte{'d'}, end: []byte{'e'}},
+					engine:   &Engine{},
+					injected: getSuccessInjectedBehaviour(),
+				},
+			},
+		},
+	}
+
+	ctx := context.Background()
+	l := &Backend{
+		BackendConfig: BackendConfig{
+			WorkerConcurrency: 1,
+		},
+	}
+	e := &Engine{}
+	err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
+	require.ErrorContains(t, err, "the remaining storage capacity of TiKV")
+}
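
For context, below is a minimal, self-contained sketch (not part of the patch) of the cancellation pattern the diff applies in `generateAndSendJob`: each worker does a non-blocking `egCtx.Err()` check instead of a `select` on `egCtx.Done()`, and context-cancellation errors returned by downstream calls are swallowed, so `Wait` surfaces the error that actually triggered the cancellation (here, a simulated disk-space error). The sketch uses `golang.org/x/sync/errgroup` directly and `errors.Is(err, context.Canceled)` as a stand-in for `common.IsContextCanceledError`; `doWork` is a hypothetical placeholder for `generateJobForRange`.

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// doWork stands in for a downstream call such as generateJobForRange: it
// blocks until the group context is canceled and then reports the cancellation.
func doWork(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	eg, egCtx := errgroup.WithContext(context.Background())

	// One worker fails with the "real" error, which cancels egCtx.
	eg.Go(func() error {
		return errors.New("the remaining storage capacity of TiKV is insufficient")
	})

	// The other workers follow the patched pattern: a non-blocking check of
	// egCtx.Err() up front, and context-cancellation errors coming back from
	// downstream calls are treated as non-fatal.
	for i := 0; i < 4; i++ {
		eg.Go(func() error {
			if egCtx.Err() != nil {
				return nil // group already canceled, nothing useful to report
			}
			if err := doWork(egCtx); err != nil {
				if errors.Is(err, context.Canceled) {
					return nil // cancellation is a symptom, not the root cause
				}
				return err
			}
			return nil
		})
	}

	// Wait returns the first non-nil error, which remains the disk-space
	// error rather than "context canceled".
	fmt.Println(eg.Wait())
}
```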