diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go
index 04ac7dce7a7fe..745454a372b18 100644
--- a/br/pkg/lightning/backend/local/engine.go
+++ b/br/pkg/lightning/backend/local/engine.go
@@ -32,6 +32,7 @@ import (
 	"github.com/google/btree"
 	"github.com/google/uuid"
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/br/pkg/lightning/backend"
 	"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
 	"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
@@ -1224,6 +1225,15 @@ func (w *Writer) flushKVs(ctx context.Context) error {
 	if err != nil {
 		return errors.Trace(err)
 	}
+
+	failpoint.Inject("orphanWriterGoRoutine", func() {
+		_ = common.KillMySelf()
+		// mimic hitting a context-canceled error in `addSST`
+		<-ctx.Done()
+		time.Sleep(5 * time.Second)
+		failpoint.Return(errors.Trace(ctx.Err()))
+	})
+
 	err = w.addSST(ctx, meta)
 	if err != nil {
 		return errors.Trace(err)
diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go
index ae83d41efd7f6..6ed4b40335b96 100644
--- a/br/pkg/lightning/lightning.go
+++ b/br/pkg/lightning/lightning.go
@@ -598,6 +598,11 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
 		o.logger.Error("restore failed", log.ShortError(err))
 		return errors.Trace(err)
 	}
+
+	failpoint.Inject("orphanWriterGoRoutine", func() {
+		// don't exit too quickly, so a panic in a leaked goroutine can surface
+		defer time.Sleep(time.Second * 10)
+	})
 	defer procedure.Close()
 
 	err = procedure.Run(ctx)
diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go
index 6b1372c0bca9e..a3562cb436a5c 100644
--- a/br/pkg/lightning/restore/table_restore.go
+++ b/br/pkg/lightning/restore/table_restore.go
@@ -501,6 +501,7 @@ func (tr *TableRestore) restoreEngine(
 	metrics, _ := metric.FromContext(ctx)
 
 	// Restore table data
+ChunkLoop:
 	for chunkIndex, chunk := range cp.Chunks {
 		if rc.status != nil && rc.status.backend == config.BackendTiDB {
 			rc.status.FinishedFileSize.Add(chunk.Chunk.Offset - chunk.Key.Offset)
@@ -524,9 +525,15 @@ func (tr *TableRestore) restoreEngine(
 		}
 		checkFlushLock.Unlock()
 
+		failpoint.Inject("orphanWriterGoRoutine", func() {
+			if chunkIndex > 0 {
+				<-pCtx.Done()
+			}
+		})
+
 		select {
 		case <-pCtx.Done():
-			return nil, pCtx.Err()
+			break ChunkLoop
 		default:
 		}
 
@@ -615,6 +622,11 @@ func (tr *TableRestore) restoreEngine(
 	}
 
 	wg.Wait()
+	select {
+	case <-pCtx.Done():
+		return nil, pCtx.Err()
+	default:
+	}
 
 	// Report some statistics into the log for debugging.
 	totalKVSize := uint64(0)
diff --git a/br/tests/lightning_checkpoint_chunks/run.sh b/br/tests/lightning_checkpoint_chunks/run.sh
index 35cabe0aadfc5..48d24ca405070 100755
--- a/br/tests/lightning_checkpoint_chunks/run.sh
+++ b/br/tests/lightning_checkpoint_chunks/run.sh
@@ -48,6 +48,11 @@ for i in $(seq "$CHUNK_COUNT"); do
     done
 done
 
+PKG="github.com/pingcap/tidb/br/pkg/lightning"
+export GO_FAILPOINTS="$PKG/backend/local/orphanWriterGoRoutine=return();$PKG/restore/orphanWriterGoRoutine=return();$PKG/orphanWriterGoRoutine=return()"
+# this run must not panic
+do_run_lightning config
+
 # Set the failpoint to kill the lightning instance as soon as
 # one file (after writing totally $ROW_COUNT rows) is imported.
 # If checkpoint does work, this should kill exactly $CHUNK_COUNT instances of lightnings.
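
Note: the heart of the fix is the shutdown sequence in table_restore.go. When the shared context is canceled, the dispatch loop now breaks out (via the ChunkLoop label) instead of returning immediately, so restoreEngine still reaches wg.Wait() and no in-flight writer goroutine is orphaned; the cancellation error is only surfaced after every worker has finished. The following is a minimal, self-contained sketch of that pattern under assumed names (restoreChunks and the int chunks are illustrative, not Lightning's actual code):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func restoreChunks(ctx context.Context, chunks []int) error {
	var wg sync.WaitGroup

ChunkLoop:
	for _, chunk := range chunks {
		select {
		case <-ctx.Done():
			// Stop dispatching, but do NOT return yet: workers started in
			// earlier iterations are still running and must be joined.
			break ChunkLoop
		default:
		}

		wg.Add(1)
		go func(c int) {
			defer wg.Done()
			time.Sleep(time.Duration(c) * time.Millisecond) // stand-in for writing a chunk
		}(chunk)
	}

	// Only after all workers are accounted for do we report the cancellation.
	wg.Wait()
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate an upstream failure
	fmt.Println(restoreChunks(ctx, []int{1, 2, 3})) // prints "context canceled"
}

Returning early from a function that owns a WaitGroup is exactly how the original code leaked writer goroutines; moving the error check after wg.Wait() is what the extra select in the @@ -615,6 +622,11 @@ hunk encodes.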
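
On activation: each entry in GO_FAILPOINTS is a semicolon-separated "path=term" pair, where the path is the package import path plus the failpoint name, matching the three failpoint.Inject sites this diff adds (backend/local, restore, and the lightning package itself). A unit test can toggle the same failpoint programmatically with the pingcap/failpoint Enable/Disable API instead of the environment variable; the sketch below shows that mechanism with an assumed test name and body (only the failpoint path and the "return()" term come from the diff):

package local_test

import (
	"testing"

	"github.com/pingcap/failpoint"
)

func TestNoOrphanWriter(t *testing.T) {
	fp := "github.com/pingcap/tidb/br/pkg/lightning/backend/local/orphanWriterGoRoutine"
	// Turn the failpoint on for this test only.
	if err := failpoint.Enable(fp, "return()"); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := failpoint.Disable(fp); err != nil {
			t.Fatal(err)
		}
	}()
	// ... drive flushKVs with a cancelable context and assert the process
	// neither panics nor leaks the writer goroutine ...
}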