From b52a42743e8fb50463d3f4e0bfcdaa8b402ad3f9 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 1 Nov 2023 21:47:06 +0800 Subject: [PATCH] local backend: fix worker err overriden by job generation err (#48185) close pingcap/tidb#47992 --- br/pkg/lightning/backend/local/engine.go | 3 ++ br/pkg/lightning/backend/local/local.go | 40 ++++++++++--------- br/pkg/lightning/backend/local/local_test.go | 37 +++++++++++++++++ .../backend/local/localhelper_test.go | 5 +++ 4 files changed, 67 insertions(+), 18 deletions(-) diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index 468416a39be1c..3085041356ca6 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -1001,6 +1001,9 @@ func (e *Engine) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []by LowerBound: lowerBound, UpperBound: upperBound, } + failpoint.Inject("mockGetFirstAndLastKey", func() { + failpoint.Return(lowerBound, upperBound, nil) + }) iter := e.newKVIter(context.Background(), opt) //nolint: errcheck diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 98abc7ecf02b4..3040955010a8a 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1674,26 +1674,30 @@ func (local *Backend) doImport(ctx context.Context, engine common.Engine, region failpoint.Label("afterStartWorker") - err := local.prepareAndSendJob( - workerCtx, - engine, - regionRanges, - regionSplitSize, - regionSplitKeys, - jobToWorkerCh, - &jobWg, - ) - if err != nil { - firstErr.Set(err) + workGroup.Go(func() error { + err := local.prepareAndSendJob( + workerCtx, + engine, + regionRanges, + regionSplitSize, + regionSplitKeys, + jobToWorkerCh, + &jobWg, + ) + if err != nil { + return err + } + + jobWg.Wait() workerCancel() - _ = workGroup.Wait() - return firstErr.Get() + return nil + }) + if err := workGroup.Wait(); err != nil { + if !common.IsContextCanceledError(err) { + log.FromContext(ctx).Error("do import meets error", zap.Error(err)) + } + firstErr.Set(err) } - - jobWg.Wait() - workerCancel() - firstErr.Set(workGroup.Wait()) - firstErr.Set(ctx.Err()) return firstErr.Get() } diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index c097c5e519b06..4d5a3770e4188 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -2151,6 +2151,43 @@ func TestCtxCancelIsIgnored(t *testing.T) { require.ErrorContains(t, err, "the remaining storage capacity of TiKV") } +func TestWorkerFailedWhenGeneratingJobs(t *testing.T) { + backup := maxRetryBackoffSecond + maxRetryBackoffSecond = 1 + t.Cleanup(func() { + maxRetryBackoffSecond = backup + }) + + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()") + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob", "return()") + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey", "return()") + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace", "return()") + t.Cleanup(func() { + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter") + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob") + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey") + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace") + }) + + initRanges := []common.Range{ + {Start: []byte{'c'}, End: []byte{'d'}}, + } + + ctx := context.Background() + l := &Backend{ + BackendConfig: BackendConfig{ + WorkerConcurrency: 1, + }, + splitCli: initTestSplitClient( + [][]byte{{1}, {11}}, + panicSplitRegionClient{}, + ), + } + e := &Engine{} + err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) + require.ErrorContains(t, err, "the remaining storage capacity of TiKV") +} + func TestExternalEngine(t *testing.T) { _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()") _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipStartWorker", "return()") diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index b31482236cf0a..a2bd5a6a8c619 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -252,6 +252,11 @@ func (c *testSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pd func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) { c.mu.Lock() defer c.mu.Unlock() + + if err := ctx.Err(); err != nil { + return nil, err + } + if c.hook != nil { key, endKey, limit = c.hook.BeforeScanRegions(ctx, key, endKey, limit) }