From bf8d474070097f1e6d81ccc233dfc237ce0da5ae Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com>
Date: Tue, 16 Jan 2024 17:55:45 +0800
Subject: [PATCH] restore_data: remove wait apply phase (#50316)

close pingcap/tidb#50312, close pingcap/tidb#50315
---
 br/pkg/restore/data.go | 42 +-----------------------------------------
 1 file changed, 1 insertion(+), 41 deletions(-)

diff --git a/br/pkg/restore/data.go b/br/pkg/restore/data.go
index 48f015d80a85e..535ed7e4484f0 100644
--- a/br/pkg/restore/data.go
+++ b/br/pkg/restore/data.go
@@ -139,10 +139,6 @@ func doRecoveryData(ctx context.Context, resolveTS uint64, allStores []*metapb.S
 		return totalRegions, recoveryError{error: err, atStage: StageRecovering}
 	}
 
-	if err := recovery.WaitApply(ctx); err != nil {
-		return totalRegions, recoveryError{error: err, atStage: StageRecovering}
-	}
-
 	if err := recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1); err != nil {
 		return totalRegions, recoveryError{error: err, atStage: StageFlashback}
 	}
@@ -227,7 +223,7 @@ func (recovery *Recovery) ReadRegionMeta(ctx context.Context) error {
 	totalStores := len(recovery.allStores)
 	workers := utils.NewWorkerPool(uint(min(totalStores, common.MaxStoreConcurrency)), "Collect Region Meta") // TODO: int overflow?
 
-	// TODO: optimize the ErroGroup when TiKV is panic
+	// TODO: optimize the ErrorGroup when TiKV is panic
 	metaChan := make(chan StoreMeta, 1024)
 	defer close(metaChan)
 
@@ -398,42 +394,6 @@ func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) {
 	go mainLoop()
 }
 
-// WaitApply send wait apply to all tikv ensure all region peer apply log into the last
-func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {
-	eg, ectx := errgroup.WithContext(ctx)
-	totalStores := len(recovery.allStores)
-	workers := utils.NewWorkerPool(uint(min(totalStores, common.MaxStoreConcurrency)), "wait apply")
-
-	for _, store := range recovery.allStores {
-		if err := ectx.Err(); err != nil {
-			break
-		}
-		storeAddr := getStoreAddress(recovery.allStores, store.Id)
-		storeId := store.Id
-
-		workers.ApplyOnErrorGroup(eg, func() error {
-			recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			defer conn.Close()
-			log.Info("send wait apply to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeId))
-			req := &recovpb.WaitApplyRequest{StoreId: storeId}
-			_, err = recoveryClient.WaitApply(ectx, req)
-			if err != nil {
-				log.Error("wait apply failed", zap.Uint64("store id", storeId))
-				return errors.Trace(err)
-			}
-
-			recovery.progress.Inc()
-			log.Info("wait apply execution success", zap.Uint64("store id", storeId))
-			return nil
-		})
-	}
-	// Wait for all TiKV instances force leader and wait apply to last log.
-	return eg.Wait()
-}
-
 // prepare the region for flashback the data, the purpose is to stop region service, put region in flashback state
 func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) {
 	retryState := utils.InitialRetryState(utils.FlashbackRetryTime, utils.FlashbackWaitInterval, utils.FlashbackMaxWaitInterval)