online recovery: fix online recovery timeout mechanism #6108

Merged 4 commits on Mar 8, 2023.
Changes from 2 commits
242 changes: 129 additions & 113 deletions server/cluster/unsafe_recovery_controller.go
@@ -161,17 +161,22 @@ func (u *unsafeRecoveryController) reset() {
func (u *unsafeRecoveryController) IsRunning() bool {
u.RLock()
defer u.RUnlock()
return u.isRunningLocked()
}

func (u *unsafeRecoveryController) isRunningLocked() bool {
return u.stage != idle && u.stage != finished && u.stage != failed
}
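The new isRunningLocked helper exists so that methods already holding the controller's mutex can reuse the stage check; sync.RWMutex is not reentrant, so calling the locking IsRunning from under the write lock would deadlock. A minimal sketch of the pattern, with illustrative names rather than PD's actual types:

```go
package main

import (
    "fmt"
    "sync"
)

type controller struct {
    sync.RWMutex
    stage int // 0 = idle, 1 = running, 2 = finished (illustrative)
}

// IsRunning is the public entry point: it acquires the read lock.
func (c *controller) IsRunning() bool {
    c.RLock()
    defer c.RUnlock()
    return c.isRunningLocked()
}

// isRunningLocked does no locking itself; the caller must already hold
// the mutex. sync.RWMutex is not reentrant, so a method holding the
// write lock would deadlock if it called IsRunning instead.
func (c *controller) isRunningLocked() bool {
    return c.stage == 1
}

func main() {
    c := &controller{stage: 1}
    fmt.Println(c.IsRunning()) // true
}
```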

// RemoveFailedStores removes failed stores from the cluster.
func (u *unsafeRecoveryController) RemoveFailedStores(failedStores map[uint64]struct{}, timeout uint64, autoDetect bool) error {
if u.IsRunning() {
return errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
}
u.Lock()
defer u.Unlock()

if u.isRunningLocked() {
return errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
}
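This reordering also closes a check-then-act window: previously the IsRunning check took and released the read lock before the write lock was acquired, so two concurrent RemoveFailedStores calls could both pass the check. A hedged sketch of the racy versus fixed shapes (svc, startRacy, and startSafe are invented for illustration):

```go
package main

import (
    "errors"
    "fmt"
    "sync"
)

var errAlreadyRunning = errors.New("recovery is already running")

type svc struct {
    mu      sync.Mutex
    running bool
}

// startRacy mirrors the old shape: the state check happens in one
// critical section and the transition in another, so two concurrent
// callers can both pass the check (a check-then-act race).
func (s *svc) startRacy() error {
    s.mu.Lock()
    running := s.running
    s.mu.Unlock()
    if running { // may be stale by the time we re-lock below
        return errAlreadyRunning
    }
    s.mu.Lock()
    defer s.mu.Unlock()
    s.running = true
    return nil
}

// startSafe mirrors the fixed shape: check and transition share one
// lock acquisition, so exactly one caller can win.
func (s *svc) startSafe() error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.running {
        return errAlreadyRunning
    }
    s.running = true
    return nil
}

func main() {
    s := &svc{}
    fmt.Println(s.startSafe()) // <nil>
    fmt.Println(s.startSafe()) // recovery is already running
}
```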

if !autoDetect {
if len(failedStores) == 0 {
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs("no store specified")
@@ -220,7 +225,9 @@ func (u *unsafeRecoveryController) Show() []StageOutput {
if u.stage == idle {
return []StageOutput{{Info: "No on-going recovery."}}
}
u.checkTimeout()
if err := u.checkTimeout(); err != nil {
u.HandleErr(err)
}
status := u.output
if u.stage != finished && u.stage != failed {
status = append(status, u.getReportStatus())
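Routing the timeout through HandleErr here means a status query can advance a stalled recovery instead of merely displaying it. A self-contained sketch of that read-path check (ctrl and its fields are illustrative, not PD's types):

```go
package main

import (
    "fmt"
    "time"
)

type ctrl struct {
    stage    string
    deadline time.Time
}

func (c *ctrl) checkTimeout() error {
    if time.Now().After(c.deadline) {
        return fmt.Errorf("exceeds timeout %v", c.deadline)
    }
    return nil
}

func (c *ctrl) handleErr(err error) {
    c.stage = "exit-force-leader" // simplified stand-in for HandleErr
}

// Show checks the deadline on the read path too, so a stalled recovery
// is advanced the moment someone asks for its status.
func (c *ctrl) Show() string {
    if c.stage == "idle" {
        return "No on-going recovery."
    }
    if err := c.checkTimeout(); err != nil {
        c.handleErr(err)
    }
    return "stage: " + c.stage
}

func main() {
    c := &ctrl{stage: "collect-report", deadline: time.Now().Add(-time.Minute)}
    fmt.Println(c.Show()) // stage: exit-force-leader
}
```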
@@ -255,17 +262,15 @@ func (u *unsafeRecoveryController) getReportStatus() StageOutput {
return status
}

func (u *unsafeRecoveryController) checkTimeout() bool {
func (u *unsafeRecoveryController) checkTimeout() error {
if u.stage == finished || u.stage == failed {
return false
return nil
}

if time.Now().After(u.timeout) {
ret := u.HandleErr(errors.Errorf("Exceeds timeout %v", u.timeout))
u.timeout = time.Now().Add(storeRequestInterval * 2)
return ret
return errors.Errorf("Exceeds timeout %v", u.timeout)
}
return false
return nil
}
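The signature change turns checkTimeout into a pure detector: it reports the deadline violation and leaves the reaction (HandleErr, stage changes, extending the deadline) to each caller. A minimal before/after sketch, with assumed names and a 2-second stand-in for storeRequestInterval * 2:

```go
package main

import (
    "fmt"
    "time"
)

type recovery struct {
    stage    string
    deadline time.Time
}

// Old shape (left side of the diff): the check both detects the timeout
// and reacts to it (the real code routed the error through HandleErr and
// extended the deadline in place).
func (r *recovery) checkTimeoutOld() bool {
    if time.Now().After(r.deadline) {
        r.stage = "exit-force-leader"                // hidden side effect
        r.deadline = time.Now().Add(2 * time.Second) // stand-in interval
        return true
    }
    return false
}

// New shape: the check only detects and reports; each caller decides how
// to react (Show feeds it to HandleErr, the heartbeat path returns it).
func (r *recovery) checkTimeout() error {
    if r.stage == "finished" || r.stage == "failed" {
        return nil
    }
    if time.Now().After(r.deadline) {
        return fmt.Errorf("exceeds timeout %v", r.deadline)
    }
    return nil
}

func main() {
    r := &recovery{stage: "running", deadline: time.Now().Add(-time.Second)}
    if err := r.checkTimeout(); err != nil {
        fmt.Println("detected:", err) // the caller owns the reaction
    }
}
```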

func (u *unsafeRecoveryController) HandleErr(err error) bool {
@@ -274,128 +279,139 @@ func (u *unsafeRecoveryController) HandleErr(err error) bool {
u.err = err
}
if u.stage == exitForceLeader {
// We already tried to exit force leader, and it still failed.
// We turn into failed stage directly. TiKV will step down force leader
// automatically after being force leader for a long time.
u.changeStage(failed)
return true
}
// When encountering an error for the first time, we will try to exit force
// leader before turning into failed stage, to avoid leaked force leaders
// blocking reads and writes.
u.storePlanExpires = make(map[uint64]time.Time)
u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
u.timeout = time.Now().Add(storeRequestInterval * 2)
// empty recovery plan would trigger exit force leader
u.changeStage(exitForceLeader)
return false
}
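HandleErr implements a two-phase escalation: the first error wipes pending plans, extends the deadline, and enters exitForceLeader so TiKV can drop force leaders gracefully; a further error while already in exitForceLeader goes straight to failed. A small sketch of that escalation (the stage names mirror the diff; everything else is illustrative):

```go
package main

import (
    "errors"
    "fmt"
    "time"
)

type stage int

const (
    running stage = iota
    exitForceLeader
    failed
)

type ctrl struct {
    stage    stage
    err      error
    deadline time.Time
}

// handleErr returns true once recovery is terminally failed.
func (c *ctrl) handleErr(err error) bool {
    if err != nil && c.err == nil {
        c.err = err // keep the first error for the final report
    }
    if c.stage == exitForceLeader {
        // Exiting force leader itself failed; give up for good.
        c.stage = failed
        return true
    }
    // First failure: extend the deadline and attempt a graceful
    // exit-force-leader round before declaring failure.
    c.deadline = time.Now().Add(2 * time.Second)
    c.stage = exitForceLeader
    return false
}

func main() {
    c := &ctrl{stage: running}
    fmt.Println(c.handleErr(errors.New("boom"))) // false: retry via exitForceLeader
    fmt.Println(c.handleErr(errors.New("boom"))) // true: terminally failed
}
```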

// HandleStoreHeartbeat handles the store heartbeat requests and checks whether the stores need to
// send detailed report back.
func (u *unsafeRecoveryController) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest, resp *pdpb.StoreHeartbeatResponse) {
if !u.IsRunning() {
// no recovery in progress, do nothing
return
}
u.Lock()
defer u.Unlock()

if u.checkTimeout() {
if !u.isRunningLocked() {
// no recovery in progress, do nothing
return
}

allCollected := u.collectReport(heartbeat)
done, err := func() (bool, error) {
if err := u.checkTimeout(); err != nil {
return false, err
}

if allCollected {
newestRegionTree, peersMap, buildErr := u.buildUpFromReports()
if buildErr != nil && u.HandleErr(buildErr) {
return
allCollected, err := u.collectReport(heartbeat)
if err != nil {
return false, err
}

// clean up previous plan
u.storePlanExpires = make(map[uint64]time.Time)
u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
if allCollected {
newestRegionTree, peersMap, err := u.buildUpFromReports()
if err != nil {
return false, err
}

var stage unsafeRecoveryStage
if u.err == nil {
stage = u.stage
} else {
stage = exitForceLeader
return u.generatePlan(newestRegionTree, peersMap)
}
reCheck := false
hasPlan := false
var err error
for {
switch stage {
case collectReport:
fallthrough
case tombstoneTiFlashLearner:
if hasPlan, err = u.generateTombstoneTiFlashLearnerPlan(newestRegionTree, peersMap); hasPlan && err == nil {
u.changeStage(tombstoneTiFlashLearner)
break
}
if err != nil {
break
}
fallthrough
case forceLeaderForCommitMerge:
if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, true); hasPlan && err == nil {
u.changeStage(forceLeaderForCommitMerge)
break
}
if err != nil {
break
}
fallthrough
case forceLeader:
if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, false); hasPlan && err == nil {
u.changeStage(forceLeader)
break
}
if err != nil {
break
}
fallthrough
case demoteFailedVoter:
if hasPlan = u.generateDemoteFailedVoterPlan(newestRegionTree, peersMap); hasPlan {
u.changeStage(demoteFailedVoter)
break
} else if !reCheck {
reCheck = true
stage = tombstoneTiFlashLearner
continue
}
fallthrough
case createEmptyRegion:
if hasPlan, err = u.generateCreateEmptyRegionPlan(newestRegionTree, peersMap); hasPlan && err == nil {
u.changeStage(createEmptyRegion)
break
}
if err != nil {
break
}
fallthrough
case exitForceLeader:
// no need to generate plan, empty recovery plan triggers exit force leader on TiKV side
if hasPlan = u.generateExitForceLeaderPlan(); hasPlan {
u.changeStage(exitForceLeader)
}
default:
panic("unreachable")
}
return false, nil
}()

if done || (err != nil && u.HandleErr(err)) {
return
}
u.dispatchPlan(heartbeat, resp)
}
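The rewritten handler funnels every fallible step through an immediately-invoked closure returning (done, err), so there is exactly one place that consults HandleErr and decides whether to still dispatch a plan. A sketch of the shape with stub step functions (step1, step2, plan, and handle are stand-ins, not PD APIs):

```go
package main

import (
    "errors"
    "fmt"
)

func step1() error         { return nil }
func step2() (bool, error) { return true, nil }
func plan() (bool, error)  { return false, errors.New("plan failed") }

// handle reports the error; returning false means "not terminal,
// keep going and let the empty plan trigger exit force leader".
func handle(err error) bool {
    fmt.Println("handling:", err)
    return false
}

func onHeartbeat() {
    // Funnel every failure path into a single (done, err) pair instead
    // of sprinkling error handling at each early return.
    done, err := func() (bool, error) {
        if err := step1(); err != nil { // e.g. the timeout check
            return false, err
        }
        collected, err := step2() // e.g. collecting store reports
        if err != nil {
            return false, err
        }
        if collected {
            return plan() // e.g. generating the next recovery plan
        }
        return false, nil
    }()

    if done || (err != nil && handle(err)) {
        return // finished, or terminally failed
    }
    fmt.Println("dispatch plan") // still runs after a non-terminal error
}

func main() { onHeartbeat() }
```

Note that a false from handle deliberately falls through to the dispatch call: in the PR, a first failure produces an empty plan that tells stores to exit force leader.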

func (u *unsafeRecoveryController) generatePlan(newestRegionTree *regionTree, peersMap map[uint64][]*regionItem) (bool, error) {
// clean up previous plan
u.storePlanExpires = make(map[uint64]time.Time)
u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)

stage := u.stage
reCheck := false
hasPlan := false
var err error
for {
switch stage {
case collectReport:
fallthrough
case tombstoneTiFlashLearner:
if hasPlan, err = u.generateTombstoneTiFlashLearnerPlan(newestRegionTree, peersMap); hasPlan && err == nil {
u.changeStage(tombstoneTiFlashLearner)
break
}
if err != nil {
if u.HandleErr(err) {
return
}
u.storePlanExpires = make(map[uint64]time.Time)
u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
// Clear the reports etc.
break
}
fallthrough
case forceLeaderForCommitMerge:
if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, true); hasPlan && err == nil {
u.changeStage(forceLeaderForCommitMerge)
break
}
if err != nil {
break
}
fallthrough
case forceLeader:
if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, false); hasPlan && err == nil {
u.changeStage(forceLeader)
break
}
if err != nil {
break
}
fallthrough
case demoteFailedVoter:
if hasPlan = u.generateDemoteFailedVoterPlan(newestRegionTree, peersMap); hasPlan {
u.changeStage(demoteFailedVoter)
break
} else if !reCheck {
reCheck = true
stage = tombstoneTiFlashLearner
continue
}
fallthrough
case createEmptyRegion:
if hasPlan, err = u.generateCreateEmptyRegionPlan(newestRegionTree, peersMap); hasPlan && err == nil {
u.changeStage(createEmptyRegion)
break
}
if err != nil {
break
}
fallthrough
case exitForceLeader:
if hasPlan = u.generateExitForceLeaderPlan(); hasPlan {
u.changeStage(exitForceLeader)
return
} else if !hasPlan {
if u.err != nil {
u.changeStage(failed)
} else {
u.changeStage(finished)
}
return
}
break
default:
panic("unreachable")
}
break
}

u.dispatchPlan(heartbeat, resp)
if err == nil && !hasPlan {
if u.err != nil {
u.changeStage(failed)
} else {
u.changeStage(finished)
}
return true, nil
}
return false, err
}
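generatePlan walks the recovery stages in a fixed order using fallthrough: the first generator that produces a plan determines the next stage, and demoteFailedVoter may loop back once through reCheck. A trimmed sketch of that control flow (stage names shortened and generators stubbed; this is an illustration, not PD's full state machine):

```go
package main

import "fmt"

type stage int

const (
    collect stage = iota
    tombstone
    forceLeader
    demote
    exit
)

func genTombstone() bool   { return false }
func genForceLeader() bool { return false }
func genDemote() bool      { return true }

// nextPlan walks the stages in order; the first generator that yields
// a plan decides the next stage. fallthrough keeps the walk linear,
// and reCheck allows one restart from the top after demote fails.
func nextPlan(cur stage) (stage, bool) {
    reCheck := false
    hasPlan := false
    st := cur
    for {
        switch st {
        case collect:
            fallthrough
        case tombstone:
            if hasPlan = genTombstone(); hasPlan {
                return tombstone, true
            }
            fallthrough
        case forceLeader:
            if hasPlan = genForceLeader(); hasPlan {
                return forceLeader, true
            }
            fallthrough
        case demote:
            if hasPlan = genDemote(); hasPlan {
                return demote, true
            } else if !reCheck {
                reCheck = true
                st = tombstone // one re-check pass from the top
                continue
            }
            fallthrough
        case exit:
            // no plan anywhere: an empty plan triggers exit force leader
            return exit, false
        }
    }
}

func main() {
    next, ok := nextPlan(collect)
    fmt.Println(next, ok) // 3 true (the demote stage yields a plan)
}
```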

// It dispatches recovery plan if any.
@@ -421,22 +437,21 @@ func (u *unsafeRecoveryController) dispatchPlan(heartbeat *pdpb.StoreHeartbeatRe
}

// It collects and checks if store reports have been fully collected.
func (u *unsafeRecoveryController) collectReport(heartbeat *pdpb.StoreHeartbeatRequest) bool {
func (u *unsafeRecoveryController) collectReport(heartbeat *pdpb.StoreHeartbeatRequest) (bool, error) {
storeID := heartbeat.Stats.StoreId
if _, isFailedStore := u.failedStores[storeID]; isFailedStore {
u.HandleErr(errors.Errorf("Receive heartbeat from failed store %d", storeID))
return false
return false, errors.Errorf("Receive heartbeat from failed store %d", storeID)
}

if heartbeat.StoreReport == nil {
return false
return false, nil
}

if heartbeat.StoreReport.GetStep() != u.step {
log.Info("unsafe recovery receives invalid store report",
log.Info("Unsafe recovery receives invalid store report",
zap.Uint64("store-id", storeID), zap.Uint64("expected-step", u.step), zap.Uint64("obtained-step", heartbeat.StoreReport.GetStep()))
// invalid store report, ignore
return false
return false, nil
}

if report, exists := u.storeReports[storeID]; exists {
Expand All @@ -445,11 +460,11 @@ func (u *unsafeRecoveryController) collectReport(heartbeat *pdpb.StoreHeartbeatR
if report == nil {
u.numStoresReported++
if u.numStoresReported == len(u.storeReports) {
return true
return true, nil
}
}
}
return false
return false, nil
}
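collectReport now distinguishes a fatal condition from a not-yet condition: a heartbeat from a store that was declared failed contradicts the recovery's premise and becomes an error, while a nil or wrong-step report just means "keep waiting". A hedged sketch with invented types:

```go
package main

import "fmt"

type report struct{ step uint64 }

type collector struct {
    step         uint64
    failedStores map[uint64]bool
    reports      map[uint64]*report // tracked stores; nil until reported
    numReported  int
}

// collect returns (allCollected, err). A heartbeat from a store that
// was declared failed invalidates the whole premise and is an error;
// a missing or stale report just means the caller should keep waiting.
func (c *collector) collect(storeID uint64, r *report) (bool, error) {
    if c.failedStores[storeID] {
        return false, fmt.Errorf("receive heartbeat from failed store %d", storeID)
    }
    if r == nil || r.step != c.step {
        return false, nil // nothing usable yet
    }
    if prev, tracked := c.reports[storeID]; tracked && prev == nil {
        c.reports[storeID] = r
        c.numReported++
    }
    return c.numReported == len(c.reports), nil
}

func main() {
    c := &collector{
        step:         1,
        failedStores: map[uint64]bool{9: true},
        reports:      map[uint64]*report{1: nil},
    }
    _, err := c.collect(9, &report{step: 1})
    fmt.Println(err) // receive heartbeat from failed store 9
    done, _ := c.collect(1, &report{step: 1})
    fmt.Println(done) // true
}
```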

// Gets the stage of the current unsafe recovery.
@@ -1204,6 +1219,7 @@ func (u *unsafeRecoveryController) generateExitForceLeaderPlan() bool {
for storeID, storeReport := range u.storeReports {
for _, peerReport := range storeReport.PeerReports {
if peerReport.IsForceLeader {
// empty recovery plan triggers exit force leader on TiKV side
_ = u.getRecoveryPlan(storeID)
hasPlan = true
break
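The added comment documents a subtle side effect: generateExitForceLeaderPlan only needs to touch each force-leader store, because getRecoveryPlan lazily creates an entry and the resulting empty plan is itself the exit-force-leader signal. A sketch of that get-or-create idiom (the types are illustrative):

```go
package main

import "fmt"

type plan struct{ steps []string }

type ctrl struct {
    plans map[uint64]*plan
}

// getRecoveryPlan returns the store's plan, creating an empty one on
// first access. Here the empty plan is meaningful: dispatching it
// tells the store to exit force leader.
func (c *ctrl) getRecoveryPlan(storeID uint64) *plan {
    if _, ok := c.plans[storeID]; !ok {
        c.plans[storeID] = &plan{}
    }
    return c.plans[storeID]
}

func main() {
    c := &ctrl{plans: map[uint64]*plan{}}
    _ = c.getRecoveryPlan(7)  // side effect: registers an empty plan for store 7
    fmt.Println(len(c.plans)) // 1
}
```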