Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backoff: deep copy the needed fields for backoffer type #453

Merged
merged 1 commit into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,7 +1372,6 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
// Retry it when tikv disk full happens.
if diskFull := regionErr.GetDiskFull(); diskFull != nil {
if err = bo.Backoff(retry.BoTiKVDiskFull, errors.Errorf("tikv disk full: %v ctx: %v", diskFull.String(), ctx.String())); err != nil {
retry.BoTiKVDiskFull.SetErrors(errors.Errorf("tikv disk full: %v", diskFull.String()))
return false, nil
}
return true, nil
Expand Down
55 changes: 39 additions & 16 deletions internal/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,14 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
errMsg += "\n" + err.Error()
}
}
errMsg += fmt.Sprintf("\nlongest sleep type: %s, time: %dms", longestSleepCfg.String(), longestSleepTime)
returnedErr := err
if longestSleepCfg != nil {
errMsg += fmt.Sprintf("\nlongest sleep type: %s, time: %dms", longestSleepCfg.String(), longestSleepTime)
returnedErr = longestSleepCfg.err
}
logutil.BgLogger().Warn(errMsg)
// Use the backoff type that contributes most to the timeout to generate a MySQL error.
return errors.WithStack(longestSleepCfg.err)
return errors.WithStack(returnedErr)
}
b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano)))
b.configs = append(b.configs, cfg)
Expand Down Expand Up @@ -219,32 +223,51 @@ func (b *Backoffer) String() string {
return fmt.Sprintf(" backoff(%dms %v)", b.totalSleep, b.configs)
}

// copyMapWithoutRecursive is only used to deep copy map fields in the Backoffer type.
func copyMapWithoutRecursive(srcMap map[string]int) map[string]int {
result := map[string]int{}
for k, v := range srcMap {
result[k] = v
}
return result
}

// Clone creates a new Backoffer which keeps current Backoffer's sleep time and errors, and shares
// current Backoffer's context.
// Some fields like `configs` and `vars` are concurrently used by all the backoffers in different threads,
// try not to modify the referenced content directly.
func (b *Backoffer) Clone() *Backoffer {
return &Backoffer{
ctx: b.ctx,
maxSleep: b.maxSleep,
totalSleep: b.totalSleep,
excludedSleep: b.excludedSleep,
errors: b.errors,
vars: b.vars,
parent: b.parent,
ctx: b.ctx,
maxSleep: b.maxSleep,
totalSleep: b.totalSleep,
excludedSleep: b.excludedSleep,
vars: b.vars,
errors: append([]error{}, b.errors...),
configs: append([]*Config{}, b.configs...),
backoffSleepMS: copyMapWithoutRecursive(b.backoffSleepMS),
backoffTimes: copyMapWithoutRecursive(b.backoffTimes),
parent: b.parent,
}
}

// Fork creates a new Backoffer which keeps current Backoffer's sleep time and errors, and holds
// a child context of current Backoffer's context.
// Some fields like `configs` and `vars` are concurrently used by all the backoffers in different threads,
// try not to modify the referenced content directly.
func (b *Backoffer) Fork() (*Backoffer, context.CancelFunc) {
ctx, cancel := context.WithCancel(b.ctx)
return &Backoffer{
ctx: ctx,
maxSleep: b.maxSleep,
totalSleep: b.totalSleep,
excludedSleep: b.excludedSleep,
errors: b.errors,
vars: b.vars,
parent: b,
ctx: ctx,
maxSleep: b.maxSleep,
totalSleep: b.totalSleep,
excludedSleep: b.excludedSleep,
errors: append([]error{}, b.errors...),
configs: append([]*Config{}, b.configs...),
backoffSleepMS: copyMapWithoutRecursive(b.backoffSleepMS),
backoffTimes: copyMapWithoutRecursive(b.backoffTimes),
vars: b.vars,
parent: b,
}, cancel
}

Expand Down
17 changes: 17 additions & 0 deletions internal/retry/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,20 @@ func TestBackoffErrorType(t *testing.T) {
err = b.Backoff(BoTxnNotFound, errors.New("tikv rpc"))
assert.ErrorIs(t, err, BoMaxDataNotReady.err)
}

func TestBackoffDeepCopy(t *testing.T) {
var err error
b := NewBackofferWithVars(context.TODO(), 200, nil)
// 700 ms sleep in total and the backoffer will return an error next time.
for i := 0; i < 3; i++ {
err = b.Backoff(BoMaxDataNotReady, errors.New("data not ready"))
assert.Nil(t, err)
}
bForked, cancel := b.Fork()
defer cancel()
bCloned := b.Clone()
for _, b := range []*Backoffer{bForked, bCloned} {
err = b.Backoff(BoTiKVRPC, errors.New("tikv rpc"))
assert.ErrorIs(t, err, BoMaxDataNotReady.err)
}
}