
Commit

added retry for recovery
Signed-off-by: hillium <yujuncen@pingcap.com>
YuJuncen committed Aug 15, 2023
1 parent 66033d5 commit ea5ce0f
Showing 4 changed files with 119 additions and 14 deletions.
100 changes: 90 additions & 10 deletions br/pkg/restore/data.go
@@ -27,6 +27,75 @@ import (
"google.golang.org/grpc/backoff"
)

type RecoveryStage int

const (
StageUnknown RecoveryStage = iota
StageCollectingMeta
StageMakingRecoveryPlan
StageResetPDAllocateID
StageRecovering
StageFlashback
)

func (s RecoveryStage) String() string {
switch s {
case StageCollectingMeta:
return "collecting meta"
case StageMakingRecoveryPlan:
return "making recovery plan"
case StageResetPDAllocateID:
return "resetting PD allocate ID"
case StageRecovering:
return "recovering"
case StageFlashback:
return "flashback"
default:
return "unknown"

}
}

type recoveryError struct {
error
atStage RecoveryStage
}

func FailedAt(err error) RecoveryStage {
if rerr, ok := err.(recoveryError); ok {
return rerr.atStage
}
return StageUnknown

}
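
The block above tags every failure with the stage it happened at, so the retry policy can later ask FailedAt where an attempt broke down. A minimal, self-contained sketch of the same pattern, using hypothetical stand-in names (stage, stagedError, failedAt) rather than the package's own types:

```go
package main

import (
	"errors"
	"fmt"
)

// stage and stagedError are simplified stand-ins for RecoveryStage and
// recoveryError above; they only illustrate the tagging idea.
type stage int

const (
	stageUnknown stage = iota
	stageRecovering
	stageFlashback
)

type stagedError struct {
	error         // the underlying failure
	atStage stage // where the recovery attempt broke down
}

// failedAt mirrors FailedAt: a plain type assertion recovers the stage,
// and anything that was never tagged falls back to stageUnknown.
func failedAt(err error) stage {
	if serr, ok := err.(stagedError); ok {
		return serr.atStage
	}
	return stageUnknown
}

func main() {
	tagged := stagedError{error: errors.New("leader missing"), atStage: stageRecovering}
	fmt.Println(failedAt(tagged))              // 1 (stageRecovering)
	fmt.Println(failedAt(errors.New("plain"))) // 0 (stageUnknown)
}
```

Because the stage travels inside the error value itself, the code that decides whether to retry needs no extra plumbing between doRecoveryData and the backoffer.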

type recoveryBackoffer struct {
state utils.RetryState
}

func newRecoveryBackoffer() *recoveryBackoffer {
return &recoveryBackoffer{
state: utils.InitialRetryState(16, 0, 0),
}

}

func (bo *recoveryBackoffer) NextBackoff(err error) time.Duration {
s := FailedAt(err)
switch s {
case StageCollectingMeta, StageMakingRecoveryPlan, StageResetPDAllocateID, StageRecovering:
log.Info("Recovery data retrying.", zap.Error(err), zap.Stringer("stage", s))
return bo.state.ExponentialBackoff()
case StageFlashback:
log.Info("Giving up retry for flashback stage.", zap.Error(err), zap.Stringer("stage", s))
bo.state.GiveUp()
return 0

}
log.Warn("unknown stage of backing off.", zap.Int("val", int(s)))
return bo.state.ExponentialBackoff()

}

func (bo *recoveryBackoffer) Attempt() int {
return bo.state.Attempt()

}
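
The backoffer turns that stage into a retry decision: anything up to and including region recovery is retried with exponential backoff, while a failure during flashback gives up on the spot. A simplified, self-contained sketch of the policy; the attempt counter and delay doubling below are assumptions standing in for utils.RetryState, not its real implementation:

```go
package main

import (
	"fmt"
	"time"
)

// sketchBackoffer is a hypothetical, stripped-down counterpart of
// recoveryBackoffer; it is not the br implementation.
type sketchBackoffer struct {
	attemptsLeft int
	delay        time.Duration
}

// nextBackoff mirrors NextBackoff above: failures before the flashback stage
// back off exponentially, while a flashback failure zeroes the remaining
// attempts (the analogue of calling RetryState.GiveUp) and returns no delay.
func (b *sketchBackoffer) nextBackoff(failedDuringFlashback bool) time.Duration {
	if failedDuringFlashback {
		b.attemptsLeft = 0
		return 0
	}
	d := b.delay
	b.delay *= 2
	b.attemptsLeft--
	return d
}

func (b *sketchBackoffer) attempt() int { return b.attemptsLeft }

func main() {
	bo := &sketchBackoffer{attemptsLeft: 16, delay: time.Second}
	fmt.Println(bo.nextBackoff(false), bo.attempt()) // 1s 15
	fmt.Println(bo.nextBackoff(false), bo.attempt()) // 2s 14
	fmt.Println(bo.nextBackoff(true), bo.attempt())  // 0s 0
}
```

In the diff, the real state is utils.InitialRetryState(16, 0, 0), so the recovery sequence gets at most 16 attempts before RecoverData gives up.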

// RecoverData recovers the TiKV cluster
// 1. read all meta data from TiKVs
// 2. make the recovery plan, then recover the max allocated ID first
@@ -35,39 +104,50 @@ import (
// 5. prepare the flashback
// 6. flashback to resolveTS
func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) {
return utils.WithRetryV2(ctx, newRecoveryBackoffer(), func(ctx context.Context) (int, error) {
return doRecoveryData(ctx, resolveTS, allStores, mgr, progress, restoreTS, concurrency)
})

}

func doRecoveryData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()

var recovery = NewRecovery(allStores, mgr, progress, concurrency)
if err := recovery.ReadRegionMeta(ctx); err != nil {
- return 0, errors.Trace(err)
+ return 0, recoveryError{error: err, atStage: StageCollectingMeta}

}

totalRegions := recovery.GetTotalRegions()

- if err := recovery.MakeRecoveryPlan(); err != nil {
- return totalRegions, errors.Trace(err)
+ err := recovery.MakeRecoveryPlan()
+ if err != nil {
+ return totalRegions, recoveryError{error: err, atStage: StageMakingRecoveryPlan}

}

log.Info("recover the alloc id to pd", zap.Uint64("max alloc id", recovery.MaxAllocID))
if err := recovery.mgr.RecoverBaseAllocID(ctx, recovery.MaxAllocID); err != nil {
- return totalRegions, errors.Trace(err)
+ return totalRegions, recoveryError{error: err, atStage: StageResetPDAllocateID}

}

// Once TiKV shuts down and then reboots, it may be left with no leader because of the recovery mode.
// This watcher will retrigger `RecoverRegions` for those stores.
recovery.SpawnTiKVShutDownWatchers(ctx)
if err := recovery.RecoverRegions(ctx); err != nil {
- return totalRegions, errors.Trace(err)
+ return totalRegions, recoveryError{error: err, atStage: StageRecovering}

}

if err := recovery.WaitApply(ctx); err != nil {
- return totalRegions, errors.Trace(err)
+ return totalRegions, recoveryError{error: err, atStage: StageRecovering}

}

- if err := recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1); err != nil {
- return totalRegions, errors.Trace(err)
+ err = recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1)
+ if err != nil {
+ return totalRegions, recoveryError{error: err, atStage: StageFlashback}

}

if err := recovery.FlashbackToVersion(ctx, resolveTS, restoreTS); err != nil {
- return totalRegions, errors.Trace(err)
+ return totalRegions, recoveryError{error: err, atStage: StageFlashback}

}

return totalRegions, nil
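
Taken together, RecoverData now re-runs the whole doRecoveryData sequence when an early stage fails, and stops as soon as the error is tagged with StageFlashback. A hedged, self-contained simulation of that behaviour; recoverOnce and the loop below are invented for illustration, whereas the real loop lives in utils.WithRetryV2:

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for RecoveryStage and recoveryError.
type stage int

const (
	stageUnknown stage = iota
	stageCollectingMeta
	stageRecovering
	stageFlashback
)

type stagedError struct {
	error
	atStage stage
}

// recoverOnce stands in for doRecoveryData: the first attempt fails while
// recovering regions, the second succeeds.
func recoverOnce(attempt int) error {
	if attempt == 0 {
		return stagedError{error: errors.New("tikv rebooted"), atStage: stageRecovering}
	}
	return nil
}

func main() {
	const maxAttempts = 16
	for i := 0; i < maxAttempts; i++ {
		err := recoverOnce(i)
		if err == nil {
			fmt.Printf("recovered after %d attempt(s)\n", i+1)
			return
		}
		var serr stagedError
		if errors.As(err, &serr) && serr.atStage == stageFlashback {
			fmt.Println("failed during flashback, giving up:", err)
			return
		}
		fmt.Println("retrying the whole recovery attempt:", err)
	}
}
```

In the simulation, a transient failure while recovering regions costs one extra attempt; flipping the stage to stageFlashback would end the loop immediately, which matches the GiveUp branch in the backoffer above.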
2 changes: 2 additions & 0 deletions br/pkg/task/restore_data.go
@@ -146,6 +146,8 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto
// restore tikv data from a snapshot volume
var totalRegions int

// Roughly handle the case that some TiKVs are rebooted while making the plan.
// Generally,

totalRegions, err = restore.RecoverData(ctx, resolveTS, allStores, mgr, progress, restoreTS, cfg.Concurrency)
if err != nil {
return errors.Trace(err)
4 changes: 4 additions & 0 deletions br/pkg/utils/backoff.go
@@ -72,6 +72,10 @@ func (rs *RetryState) ExponentialBackoff() time.Duration {
return backoff
}

func (rs *RetryState) GiveUp() {
rs.retryTimes = rs.maxRetry

}
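
The new GiveUp method lets a backoffer burn through the remaining retry budget in one step. A simplified, hypothetical mirror of the counters involved, showing why a loop guarded by Attempt() > 0 stops afterwards (the real RetryState keeps more fields than this):

```go
package main

import "fmt"

// retryState is a stripped-down, hypothetical mirror of utils.RetryState.
type retryState struct {
	maxRetry   int
	retryTimes int
}

// attempt reports how many attempts remain, like RetryState.Attempt.
func (rs *retryState) attempt() int { return rs.maxRetry - rs.retryTimes }

// giveUp exhausts the remaining attempts, like the GiveUp added above.
func (rs *retryState) giveUp() { rs.retryTimes = rs.maxRetry }

func main() {
	rs := &retryState{maxRetry: 16}
	fmt.Println(rs.attempt()) // 16
	rs.giveUp()
	fmt.Println(rs.attempt()) // 0, so a retry loop guarded by attempt() > 0 exits
}
```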

// InitialRetryState makes the initial state for retrying.
func InitialRetryState(maxRetryTimes int, initialBackoff, maxBackoff time.Duration) RetryState {
return RetryState{
27 changes: 23 additions & 4 deletions br/pkg/utils/retry.go
@@ -34,6 +34,8 @@ var retryableServerError = []string{
// RetryableFunc presents a retryable operation.
type RetryableFunc func() error

type RetryableFuncV2[T any] func(context.Context) (T, error)

// Backoffer implements a backoff policy for retrying operations.
type Backoffer interface {
// NextBackoff returns a duration to wait before retrying again
@@ -51,20 +53,37 @@ func WithRetry(
retryableFunc RetryableFunc,
backoffer Backoffer,
) error {
_, err := WithRetryV2[struct{}](ctx, backoffer, func(ctx context.Context) (struct{}, error) {
innerErr := retryableFunc()
return struct{}{}, innerErr
})
return err
}

// WithRetryV2 retries a given operation with a backoff policy.
//
// Returns the returned value once `retryableFunc` succeeds. Otherwise, returns a
// multierr containing all errors encountered.
// Compared with `WithRetry`, this function reorders the arguments and supports returning a value from the retried function.
func WithRetryV2[T any](
ctx context.Context,
backoffer Backoffer,
fn RetryableFuncV2[T],
) (T, error) {
var allErrors error
for backoffer.Attempt() > 0 {
- err := retryableFunc()
+ res, err := fn(ctx)
if err == nil {
- return nil
+ return res, nil
}
allErrors = multierr.Append(allErrors, err)
select {
case <-ctx.Done():
- return allErrors // nolint:wrapcheck
+ return *new(T), allErrors
case <-time.After(backoffer.NextBackoff(err)):
}
}
- return allErrors // nolint:wrapcheck
+ return *new(T), allErrors // nolint:wrapcheck
}
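
A hedged usage sketch of the new generic helper. It assumes the import path github.com/pingcap/tidb/br/pkg/utils and that Backoffer is satisfied by NextBackoff(error) time.Duration plus Attempt() int; constBackoffer and the flaky operation are invented for illustration:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/pingcap/tidb/br/pkg/utils"
)

// constBackoffer is a toy Backoffer: a fixed delay and a fixed attempt budget.
type constBackoffer struct {
	left  int
	delay time.Duration
}

func (b *constBackoffer) NextBackoff(err error) time.Duration {
	b.left--
	return b.delay
}

func (b *constBackoffer) Attempt() int { return b.left }

func main() {
	calls := 0
	// The operation fails twice, then returns a value; WithRetryV2 hands that
	// value back to the caller instead of forcing it out through a closure.
	n, err := utils.WithRetryV2[int](context.Background(),
		&constBackoffer{left: 5, delay: 10 * time.Millisecond},
		func(ctx context.Context) (int, error) {
			calls++
			if calls < 3 {
				return 0, errors.New("transient failure")
			}
			return 42, nil
		})
	fmt.Println(n, err, calls) // 42 <nil> 3
}
```

WithRetry itself is now just WithRetryV2[struct{}] over the old closure, as the rewritten body above shows.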

// MessageIsRetryableStorageError checks whether the message returning from TiKV is retryable ExternalStorageError.
