Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snap_restore: added retry for recovery #46094

Merged
merged 3 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 90 additions & 10 deletions br/pkg/restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,75 @@
"google.golang.org/grpc/backoff"
)

// RecoveryStage identifies a stage of the data recovery procedure.
// It is recorded on errors so that the failing stage can be looked up later with FailedAt.
type RecoveryStage int

// Stages of the data recovery procedure, numbered from zero via iota.
const (
// StageUnknown is the zero value; FailedAt returns it for errors that carry no stage.
StageUnknown RecoveryStage = iota
// StageCollectingMeta is reported when reading the region meta fails.
StageCollectingMeta
// StageMakingRecoveryPlan is reported when making the recovery plan fails.
StageMakingRecoveryPlan
// StageResetPDAllocateID is reported when recovering the base alloc ID to PD fails.
StageResetPDAllocateID
// StageRecovering is reported when recovering regions or waiting for apply fails.
StageRecovering
// StageFlashback is reported when preparing or executing the flashback fails.
StageFlashback
)

// String returns a human-readable name for the stage.
// Any value without a dedicated case, including StageUnknown, yields "unknown".
func (s RecoveryStage) String() string {
switch s {
case StageCollectingMeta:
return "collecting meta"
case StageMakingRecoveryPlan:
return "making recovery plan"
case StageResetPDAllocateID:
return "resetting PD allocate ID"
case StageRecovering:
return "recovering"
case StageFlashback:
return "flashback"
default:
return "unknown"

Check warning on line 54 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L41-L54

Added lines #L41 - L54 were not covered by tests
}
}

// recoveryError decorates an error with the recovery stage at which it occurred.
//
// The embedded error supplies the Error method. Unwrap exposes the embedded
// error so that errors.Is and errors.As can still match the underlying cause
// instead of stopping at this wrapper.
type recoveryError struct {
	error
	atStage RecoveryStage
}

// Unwrap returns the error that was recorded when the stage failed.
func (e recoveryError) Unwrap() error {
	return e.error
}

// FailedAt reports the recovery stage recorded in err.
// Only an err whose dynamic type is exactly recoveryError carries a stage: the check is a direct
// type assertion, so errors that wrap a recoveryError are not inspected. Anything else yields
// StageUnknown.
func FailedAt(err error) RecoveryStage {
if rerr, ok := err.(recoveryError); ok {
return rerr.atStage
}
return StageUnknown

Check warning on line 67 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L63-L67

Added lines #L63 - L67 were not covered by tests
}

// recoveryBackoffer is the backoff policy handed to the retry helper by RecoverData.
// All of its retry bookkeeping is kept in a utils.RetryState.
type recoveryBackoffer struct {
state utils.RetryState
}

// newRecoveryBackoffer creates a recoveryBackoffer whose state is built by
// utils.InitialRetryState with maxRetryTimes = 16 and both initialBackoff and maxBackoff set to zero.
// NOTE(review): zero backoff durations presumably mean retries are issued without any wait —
// confirm against RetryState.ExponentialBackoff that this is intended.
func newRecoveryBackoffer() *recoveryBackoffer {
return &recoveryBackoffer{
state: utils.InitialRetryState(16, 0, 0),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the parameter 0 ok?

}

Check warning on line 77 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L74-L77

Added lines #L74 - L77 were not covered by tests
}

// NextBackoff decides how long to wait before the next attempt, based on the stage at which err failed
// (as reported by FailedAt).
//
//   - Collecting meta, making the recovery plan, resetting the PD allocate ID, and recovering:
//     the failure is logged and the wait comes from the state's ExponentialBackoff.
//   - Flashback: the failure is logged, GiveUp is called on the state, and 0 is returned.
//   - Any other stage: a warning is logged and the wait again comes from ExponentialBackoff.
func (bo *recoveryBackoffer) NextBackoff(err error) time.Duration {
s := FailedAt(err)
switch s {
case StageCollectingMeta, StageMakingRecoveryPlan, StageResetPDAllocateID, StageRecovering:
log.Info("Recovery data retrying.", zap.Error(err), zap.Stringer("stage", s))
return bo.state.ExponentialBackoff()
case StageFlashback:
log.Info("Giving up retry for flashback stage.", zap.Error(err), zap.Stringer("stage", s))
bo.state.GiveUp()
return 0

Check warning on line 89 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L80-L89

Added lines #L80 - L89 were not covered by tests
}
log.Warn("unknown stage of backing off.", zap.Int("val", int(s)))
return bo.state.ExponentialBackoff()

Check warning on line 92 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L91-L92

Added lines #L91 - L92 were not covered by tests
}

// Attempt forwards to the Attempt method of the underlying retry state and returns its result.
func (bo *recoveryBackoffer) Attempt() int {
return bo.state.Attempt()

Check warning on line 96 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L95-L96

Added lines #L95 - L96 were not covered by tests
}

// RecoverData recovers the TiKV cluster.
// 1. read all meta data from TiKVs
// 2. make the recovery plan, then recover the max allocate ID first
Expand All @@ -35,39 +104,50 @@
// 5. prepare the flashback
// 6. flashback to resolveTS
//
// The whole procedure is implemented by doRecoveryData; RecoverData runs it through
// utils.WithRetryV2 with a fresh recoveryBackoffer, and returns whatever that call returns.
func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) {
return utils.WithRetryV2(ctx, newRecoveryBackoffer(), func(ctx context.Context) (int, error) {
return doRecoveryData(ctx, resolveTS, allStores, mgr, progress, restoreTS, concurrency)
})

Check warning on line 109 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L107-L109

Added lines #L107 - L109 were not covered by tests
}

func doRecoveryData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()

Check warning on line 116 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L112-L116

Added lines #L112 - L116 were not covered by tests
var recovery = NewRecovery(allStores, mgr, progress, concurrency)
if err := recovery.ReadRegionMeta(ctx); err != nil {
return 0, errors.Trace(err)
return 0, recoveryError{error: err, atStage: StageCollectingMeta}

Check warning on line 119 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L119

Added line #L119 was not covered by tests
}

totalRegions := recovery.GetTotalRegions()

if err := recovery.MakeRecoveryPlan(); err != nil {
return totalRegions, errors.Trace(err)
err := recovery.MakeRecoveryPlan()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep the format as before?, such as

if err := xxx; err != nil {

if err != nil {
return totalRegions, recoveryError{error: err, atStage: StageMakingRecoveryPlan}

Check warning on line 126 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L124-L126

Added lines #L124 - L126 were not covered by tests
}

log.Info("recover the alloc id to pd", zap.Uint64("max alloc id", recovery.MaxAllocID))
if err := recovery.mgr.RecoverBaseAllocID(ctx, recovery.MaxAllocID); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageResetPDAllocateID}

Check warning on line 131 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L131

Added line #L131 was not covered by tests
}

// Once a TiKV shuts down and then reboots, it may be left with no leader because of the recovery mode.
// This watcher will retrigger `RecoverRegions` for those stores.
recovery.SpawnTiKVShutDownWatchers(ctx)
if err := recovery.RecoverRegions(ctx); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageRecovering}

Check warning on line 138 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L138

Added line #L138 was not covered by tests
}

if err := recovery.WaitApply(ctx); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageRecovering}

Check warning on line 142 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L142

Added line #L142 was not covered by tests
}

if err := recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1); err != nil {
return totalRegions, errors.Trace(err)
err = recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

if err != nil {
return totalRegions, recoveryError{error: err, atStage: StageFlashback}

Check warning on line 146 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L144-L146

Added lines #L144 - L146 were not covered by tests
}

if err := recovery.FlashbackToVersion(ctx, resolveTS, restoreTS); err != nil {
return totalRegions, errors.Trace(err)
return totalRegions, recoveryError{error: err, atStage: StageFlashback}

Check warning on line 150 in br/pkg/restore/data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/data.go#L150

Added line #L150 was not covered by tests
}

return totalRegions, nil
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/task/restore_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@
// restore tikv data from a snapshot volume
var totalRegions int

// Roughly handle the case that some TiKVs are rebooted during making plan.
// Generally,

Check warning on line 150 in br/pkg/task/restore_data.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/task/restore_data.go#L149-L150

Added lines #L149 - L150 were not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not entire annotation?

totalRegions, err = restore.RecoverData(ctx, resolveTS, allStores, mgr, progress, restoreTS, cfg.Concurrency)
if err != nil {
return errors.Trace(err)
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@
return backoff
}

// GiveUp sets the retry counter (retryTimes) to the maximum retry count (maxRetry).
// NOTE(review): presumably this leaves the state with no remaining attempts, so a retry loop
// driven by it stops — confirm against RetryState.Attempt.
func (rs *RetryState) GiveUp() {
rs.retryTimes = rs.maxRetry

Check warning on line 76 in br/pkg/utils/backoff.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/utils/backoff.go#L75-L76

Added lines #L75 - L76 were not covered by tests
}

// InitialRetryState make the initial state for retrying.
func InitialRetryState(maxRetryTimes int, initialBackoff, maxBackoff time.Duration) RetryState {
return RetryState{
Expand Down
27 changes: 23 additions & 4 deletions br/pkg/utils/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ var retryableServerError = []string{
// RetryableFunc represents a retryable operation that takes no arguments and reports only an error.
type RetryableFunc func() error

// RetryableFuncV2 represents a retryable operation that receives a context and returns a value
// of type T along with an error.
type RetryableFuncV2[T any] func(context.Context) (T, error)

// Backoffer implements a backoff policy for retrying operations.
type Backoffer interface {
// NextBackoff returns a duration to wait before retrying again
Expand All @@ -51,20 +53,37 @@ func WithRetry(
retryableFunc RetryableFunc,
backoffer Backoffer,
) error {
_, err := WithRetryV2[struct{}](ctx, backoffer, func(ctx context.Context) (struct{}, error) {
innerErr := retryableFunc()
return struct{}{}, innerErr
})
return err
}

// WithRetryV2 retries a given operation with a backoff policy.
//
// It keeps calling `fn` while `backoffer.Attempt()` is positive and returns the value produced by
// `fn` as soon as one call succeeds. Otherwise, it returns the zero value of T together with a
// multierr containing all the errors encountered.
// Compared with `WithRetry`, this function reorders the arguments and supports returning a value.
func WithRetryV2[T any](
ctx context.Context,
backoffer Backoffer,
fn RetryableFuncV2[T],
) (T, error) {
var allErrors error
for backoffer.Attempt() > 0 {
err := retryableFunc()
res, err := fn(ctx)
if err == nil {
return nil
return res, nil
}
allErrors = multierr.Append(allErrors, err)
// Wait for the interval chosen by the backoffer, unless ctx is done first.
select {
case <-ctx.Done():
return allErrors // nolint:wrapcheck
return *new(T), allErrors
case <-time.After(backoffer.NextBackoff(err)):
}
}
return allErrors // nolint:wrapcheck
return *new(T), allErrors // nolint:wrapcheck
}

// MessageIsRetryableStorageError checks whether the message returning from TiKV is retryable ExternalStorageError.
Expand Down
Loading