Skip to content

Commit

Permalink
restore_data: remove wait apply phase (#50316)
Browse files Browse the repository at this point in the history
close #50312, close #50315
  • Loading branch information
YuJuncen authored Jan 16, 2024
1 parent 264d11b commit bf8d474
Showing 1 changed file with 1 addition and 41 deletions.
42 changes: 1 addition & 41 deletions br/pkg/restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,6 @@ func doRecoveryData(ctx context.Context, resolveTS uint64, allStores []*metapb.S
return totalRegions, recoveryError{error: err, atStage: StageRecovering}
}

if err := recovery.WaitApply(ctx); err != nil {
return totalRegions, recoveryError{error: err, atStage: StageRecovering}
}

if err := recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1); err != nil {
return totalRegions, recoveryError{error: err, atStage: StageFlashback}
}
Expand Down Expand Up @@ -227,7 +223,7 @@ func (recovery *Recovery) ReadRegionMeta(ctx context.Context) error {
totalStores := len(recovery.allStores)
workers := utils.NewWorkerPool(uint(min(totalStores, common.MaxStoreConcurrency)), "Collect Region Meta") // TODO: int overflow?

// TODO: optimize the ErroGroup when TiKV is panic
// TODO: optimize the ErrorGroup when TiKV is panic
metaChan := make(chan StoreMeta, 1024)
defer close(metaChan)

Expand Down Expand Up @@ -398,42 +394,6 @@ func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) {
go mainLoop()
}

// WaitApply send wait apply to all tikv ensure all region peer apply log into the last
func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {
eg, ectx := errgroup.WithContext(ctx)
totalStores := len(recovery.allStores)
workers := utils.NewWorkerPool(uint(min(totalStores, common.MaxStoreConcurrency)), "wait apply")

for _, store := range recovery.allStores {
if err := ectx.Err(); err != nil {
break
}
storeAddr := getStoreAddress(recovery.allStores, store.Id)
storeId := store.Id

workers.ApplyOnErrorGroup(eg, func() error {
recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr)
if err != nil {
return errors.Trace(err)
}
defer conn.Close()
log.Info("send wait apply to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeId))
req := &recovpb.WaitApplyRequest{StoreId: storeId}
_, err = recoveryClient.WaitApply(ectx, req)
if err != nil {
log.Error("wait apply failed", zap.Uint64("store id", storeId))
return errors.Trace(err)
}

recovery.progress.Inc()
log.Info("wait apply execution success", zap.Uint64("store id", storeId))
return nil
})
}
// Wait for all TiKV instances force leader and wait apply to last log.
return eg.Wait()
}

// prepare the region for flashback the data, the purpose is to stop region service, put region in flashback state
func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) {
retryState := utils.InitialRetryState(utils.FlashbackRetryTime, utils.FlashbackWaitInterval, utils.FlashbackMaxWaitInterval)
Expand Down

0 comments on commit bf8d474

Please sign in to comment.