From 05d616c2d3c19a2d8c49f967e648cf88037d449b Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 25 Nov 2022 16:39:58 +0800 Subject: [PATCH] lightning: recover status from checkpoint after index engine closed (#39365) --- br/pkg/lightning/restore/table_restore.go | 17 +++++++++++++++++ br/tests/lightning_checkpoint/run.sh | 3 +++ 2 files changed, 20 insertions(+) diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index ad975bc2f6bad..67c7ae5e0bdb4 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -235,10 +235,12 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp // data-engines that need to be restore or import. Otherwise, all data-engines should // be finished already. + handleDataEngineThisRun := false idxEngineCfg := &backend.EngineConfig{ TableInfo: tr.tableInfo, } if indexEngineCp.Status < checkpoints.CheckpointStatusClosed { + handleDataEngineThisRun = true indexWorker := rc.indexWorkers.Apply() defer rc.indexWorkers.Recycle(indexWorker) @@ -370,11 +372,26 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp return errors.Trace(restoreErr) } + // if data engine is handled in previous run and we continue importing from checkpoint + if !handleDataEngineThisRun { + for _, engine := range cp.Engines { + for _, chunk := range engine.Chunks { + rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset) + } + } + } + if cp.Status < checkpoints.CheckpointStatusIndexImported { var err error if indexEngineCp.Status < checkpoints.CheckpointStatusImported { err = tr.importKV(ctx, closedIndexEngine, rc, indexEngineID) failpoint.Inject("FailBeforeIndexEngineImported", func() { + finished := rc.status.FinishedFileSize.Load() + total := rc.status.TotalFileSize.Load() + tr.logger.Warn("print lightning status", + zap.Int64("finished", finished), + zap.Int64("total", total), + zap.Bool("equal", finished == total)) panic("forcing failure due to FailBeforeIndexEngineImported") }) } diff --git a/br/tests/lightning_checkpoint/run.sh b/br/tests/lightning_checkpoint/run.sh index 86551fd6246eb..5263dd90f1acf 100755 --- a/br/tests/lightning_checkpoint/run.sh +++ b/br/tests/lightning_checkpoint/run.sh @@ -79,6 +79,9 @@ for i in $(seq "$TABLE_COUNT"); do done set -e +# at the failure of last table, all data engines are imported so finished == total +grep "print lightning status" "$TEST_DIR/lightning.log" | grep -q "equal=true" + export GO_FAILPOINTS="$SLOWDOWN_FAILPOINTS" # After everything is done, there should be no longer new calls to ImportEngine # (and thus `kill_lightning_after_one_import` will spare this final check)