pkg: add a new retry to support the error isRetryable or not #1848

Merged Jun 25, 2021 · 26 commits

5 changes: 5 additions & 0 deletions errors.toml
@@ -601,6 +601,11 @@ error = '''
pulsar send message failed
'''

["CDC:ErrReachMaxTry"]
error = '''
reach maximum try: %d
'''

["CDC:ErrReactorFinished"]
error = '''
the reactor has done its job and should no longer be executed
3 changes: 3 additions & 0 deletions pkg/errors/errors.go
@@ -222,4 +222,7 @@ var (
// miscellaneous internal errors
ErrFlowControllerAborted = errors.Normalize("flow controller is aborted", errors.RFCCodeText("CDC:ErrFlowControllerAborted"))
ErrFlowControllerEventLargerThanQuota = errors.Normalize("event is larger than the total memory quota, size: %d, quota: %d", errors.RFCCodeText("CDC:ErrFlowControllerEventLargerThanQuota"))

// retry error
ErrReachMaxTry = errors.Normalize("reach maximum try: %d", errors.RFCCodeText("CDC:ErrReachMaxTry"))
)
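
For context on how a caller might consume the new error code, here is a minimal sketch. retry.Do and WithMaxTries are introduced later in this PR, alwaysFail is a hypothetical operation, and Equal is assumed to be pingcap/errors' identity check for normalized errors:

package main

import (
	"context"
	"errors"
	"fmt"

	cerror "github.com/pingcap/ticdc/pkg/errors"
	"github.com/pingcap/ticdc/pkg/retry"
)

func main() {
	// alwaysFail is a hypothetical operation that never succeeds
	alwaysFail := func() error { return errors.New("transient failure") }
	err := retry.Do(context.Background(), alwaysFail, retry.WithMaxTries(3))
	// once all tries are used up, Do returns ErrReachMaxTry wrapping the last error
	if cerror.ErrReachMaxTry.Equal(err) {
		fmt.Println("gave up after 3 tries:", err)
	}
}
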
91 changes: 91 additions & 0 deletions pkg/retry/options.go
@@ -0,0 +1,91 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"math"
)

const (
// defaultBackoffBaseInMs is the initial backoff duration, in milliseconds
defaultBackoffBaseInMs = 10.0
// defaultBackoffCapInMs is the maximum backoff duration, in milliseconds
defaultBackoffCapInMs = 100.0
defaultMaxTries = 3
)

// Option is a functional option that tweaks retryOptions
type Option func(*retryOptions)

// IsRetryable checks whether an error is safe or worth retrying, e.g. "context.Canceled" is better not retried
type IsRetryable func(error) bool

type retryOptions struct {
maxTries int64
backoffBaseInMs float64
backoffCapInMs float64
isRetryable IsRetryable
}

func newRetryOptions() *retryOptions {
return &retryOptions{
maxTries: defaultMaxTries,
backoffBaseInMs: defaultBackoffBaseInMs,
backoffCapInMs: defaultBackoffCapInMs,
isRetryable: func(err error) bool { return true },
}
}

// WithBackoffBaseDelay configures the initial delay; if delayInMs <= 0, "defaultBackoffBaseInMs" is used
func WithBackoffBaseDelay(delayInMs int64) Option {
return func(o *retryOptions) {
if delayInMs > 0 {
o.backoffBaseInMs = float64(delayInMs)
}
}
}

// WithBackoffMaxDelay configures the maximum delay; if delayInMs <= 0, "defaultBackoffCapInMs" is used
func WithBackoffMaxDelay(delayInMs int64) Option {
return func(o *retryOptions) {
if delayInMs > 0 {
o.backoffCapInMs = float64(delayInMs)

Contributor:
What happens when backoffCapInMs is larger than backoffBaseInMs?

Contributor Author:
I think what you mean is backoffCapInMs < backoffBaseInMs. In that case
temp := int64(math.Min(backoffCapInMs, backoffBaseInMs*math.Exp2(try)) / 2)
will always use backoffCapInMs in the calculation, much like after trying
~100 times, math.Min(backoffCapInMs, backoffBaseInMs*math.Exp2(try)) always
returns backoffCapInMs.

}
}
}

// WithMaxTries configures the maximum number of tries; if tries <= 0, "defaultMaxTries" is used
func WithMaxTries(tries int64) Option {
return func(o *retryOptions) {
if tries > 0 {
o.maxTries = tries
}
}
}

// WithInfiniteTries configures retrying forever (math.MaxInt64 times) until success or cancellation
func WithInfiniteTries() Option {
return func(o *retryOptions) {
o.maxTries = math.MaxInt64
}
}

// WithIsRetryableErr configures whether an error should be retried; if not set, every error is retried
func WithIsRetryableErr(f IsRetryable) Option {
return func(o *retryOptions) {
if f != nil {
o.isRetryable = f
}
}
}
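
To make the options API above concrete, here is a minimal usage sketch; the flaky closure is hypothetical, while Do (defined in pkg/retry/retry_with_opt.go below) and the With* options are the functions this PR adds:

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/pingcap/ticdc/pkg/retry"
)

func main() {
	attempt := 0
	// flaky is a hypothetical operation that succeeds on its third call
	flaky := func() error {
		attempt++
		if attempt < 3 {
			return fmt.Errorf("transient failure %d", attempt)
		}
		return nil
	}
	err := retry.Do(context.Background(), flaky,
		retry.WithMaxTries(5),          // give up after 5 calls in total
		retry.WithBackoffBaseDelay(10), // early sleeps start around 10ms
		retry.WithBackoffMaxDelay(100), // no single sleep exceeds ~100ms
		retry.WithIsRetryableErr(func(err error) bool {
			return !errors.Is(err, context.Canceled) // never retry cancellation
		}),
	)
	fmt.Println(err, attempt) // <nil> 3
}
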
2 changes: 1 addition & 1 deletion pkg/retry/retry.go
@@ -1,4 +1,4 @@
// Copyright 2020 PingCAP, Inc.
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
137 changes: 136 additions & 1 deletion pkg/retry/retry_test.go
@@ -1,4 +1,4 @@
// Copyright 2020 PingCAP, Inc.
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@ package retry

import (
"context"
"math"
"testing"
"time"

@@ -39,6 +40,8 @@ func (s *runSuite) TestShouldRetryAtMostSpecifiedTimes(c *check.C) {

err := Run(500*time.Millisecond, 3, f)
c.Assert(err, check.ErrorMatches, "test")
// 👇 I think tries = first call + maxRetries, so not weird 😎

// It's weird that backoff may retry one more time than maxTries.
// Because the steps in backoff.Retry is:
// 1. Call function
@@ -121,3 +124,135 @@ func (s *runSuite) TestInfiniteRetry(c *check.C) {
c.Assert(reportedElapsed, check.Greater, time.Second)
c.Assert(reportedElapsed, check.LessEqual, 3*time.Second)
}

func (s *runSuite) TestDoShouldRetryAtMostSpecifiedTimes(c *check.C) {
defer testleak.AfterTest(c)()
var callCount int
f := func() error {
callCount++
return errors.New("test")
}

err := Do(context.Background(), f, WithMaxTries(3))
c.Assert(errors.Cause(err), check.ErrorMatches, "test")
c.Assert(callCount, check.Equals, 3)
}

func (s *runSuite) TestDoShouldStopOnSuccess(c *check.C) {
defer testleak.AfterTest(c)()
var callCount int
f := func() error {
callCount++
if callCount == 2 {
return nil
}
return errors.New("test")
}

err := Do(context.Background(), f, WithMaxTries(3))
c.Assert(err, check.IsNil)
c.Assert(callCount, check.Equals, 2)
}

func (s *runSuite) TestIsRetryable(c *check.C) {
defer testleak.AfterTest(c)()
var callCount int
f := func() error {
callCount++
return errors.Annotate(context.Canceled, "test")
}

err := Do(context.Background(), f, WithMaxTries(3), WithIsRetryableErr(func(err error) bool {
switch errors.Cause(err) {
case context.Canceled:
return false
}
return true
}))

c.Assert(errors.Cause(err), check.Equals, context.Canceled)
c.Assert(callCount, check.Equals, 1)

callCount = 0
err = Do(context.Background(), f, WithMaxTries(3))

c.Assert(errors.Cause(err), check.Equals, context.Canceled)
c.Assert(callCount, check.Equals, 3)
}

func (s *runSuite) TestDoCancelInfiniteRetry(c *check.C) {
defer testleak.AfterTest(c)()
callCount := 0
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*20)
defer cancel()
f := func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
callCount++
return errors.New("test")
}

err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10))
c.Assert(errors.Cause(err), check.Equals, context.DeadlineExceeded)
c.Assert(callCount, check.GreaterEqual, 1, check.Commentf("tries: %d", callCount))
c.Assert(callCount, check.Less, math.MaxInt64)
}

func (s *runSuite) TestDoCancelAtBeginning(c *check.C) {
defer testleak.AfterTest(c)()
callCount := 0
ctx, cancel := context.WithCancel(context.Background())
cancel()
f := func() error {
callCount++
return errors.New("test")
}

err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10))
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
c.Assert(callCount, check.Equals, 0, check.Commentf("tries:%d", callCount))
}

func (s *runSuite) TestDoCornerCases(c *check.C) {
defer testleak.AfterTest(c)()
var callCount int
f := func() error {
callCount++
return errors.New("test")
}

err := Do(context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithBackoffMaxDelay(math.MaxInt64), WithMaxTries(2))
c.Assert(errors.Cause(err), check.ErrorMatches, "test")
c.Assert(callCount, check.Equals, 2)

callCount = 0
err = Do(context.Background(), f, WithBackoffBaseDelay(math.MaxInt64), WithBackoffMaxDelay(math.MinInt64), WithMaxTries(2))
c.Assert(errors.Cause(err), check.ErrorMatches, "test")
c.Assert(callCount, check.Equals, 2)

callCount = 0
err = Do(context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithBackoffMaxDelay(math.MinInt64), WithMaxTries(2))
c.Assert(errors.Cause(err), check.ErrorMatches, "test")
c.Assert(callCount, check.Equals, 2)

callCount = 0
err = Do(context.Background(), f, WithBackoffBaseDelay(math.MaxInt64), WithBackoffMaxDelay(math.MaxInt64), WithMaxTries(2))
c.Assert(errors.Cause(err), check.ErrorMatches, "test")
c.Assert(callCount, check.Equals, 2)

var i int64
for i = -10; i < 10; i++ {
callCount = 0
err = Do(context.Background(), f, WithBackoffBaseDelay(i), WithBackoffMaxDelay(i), WithMaxTries(i))
c.Assert(errors.Cause(err), check.ErrorMatches, "test")
c.Assert(err, check.ErrorMatches, ".*CDC:ErrReachMaxTry.*")
if i > 0 {
c.Assert(int64(callCount), check.Equals, i)
} else {
c.Assert(callCount, check.Equals, defaultMaxTries)
}
}
}
94 changes: 94 additions & 0 deletions pkg/retry/retry_with_opt.go
@@ -0,0 +1,94 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"context"
"math"
"math/rand"
"time"

"github.com/pingcap/errors"
cerror "github.com/pingcap/ticdc/pkg/errors"
)

// Operation is the action that needs to be retried
type Operation func() error

// Do executes the specified function at most maxTries times until it succeeds or the context is canceled
func Do(ctx context.Context, operation Operation, opts ...Option) error {
retryOption := setOptions(opts...)
return run(ctx, operation, retryOption)
}

func setOptions(opts ...Option) *retryOptions {
retryOption := newRetryOptions()
for _, opt := range opts {
opt(retryOption)
}
return retryOption
}

func run(ctx context.Context, op Operation, retryOption *retryOptions) error {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
default:
}

var t *time.Timer
try := 0
backOff := time.Duration(0)
for {
err := op()
if err == nil {
return nil
}

if !retryOption.isRetryable(err) {
return err
}

try++
if int64(try) >= retryOption.maxTries {
return cerror.ErrReachMaxTry.Wrap(err).GenWithStackByArgs(retryOption.maxTries)
}

backOff = getBackoffInMs(retryOption.backoffBaseInMs, retryOption.backoffCapInMs, float64(try))
if t == nil {
t = time.NewTimer(backOff)
defer t.Stop()
} else {
t.Reset(backOff)
}

select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-t.C:
}
}
}

// getBackoffInMs returns the duration to wait before the next try
// See https://www.awsarchitectureblog.com/2015/03/backoff.html
func getBackoffInMs(backoffBaseInMs, backoffCapInMs, try float64) time.Duration {
temp := int64(math.Min(backoffCapInMs, backoffBaseInMs*math.Exp2(try)) / 2)
if temp <= 0 {
temp = 1
}
sleep := temp + rand.Int63n(temp)
backOff := math.Min(backoffCapInMs, float64(rand.Int63n(sleep*3))+backoffBaseInMs)
return time.Duration(backOff) * time.Millisecond
}
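
To illustrate the backoff formula, and the cap-versus-base corner case raised in the review thread on options.go, here is a small self-contained sketch; it copies getBackoffInMs from above (with shortened parameter names) and prints a few samples:

package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// a copy of getBackoffInMs above, reproduced only for illustration
func getBackoffInMs(baseInMs, capInMs, try float64) time.Duration {
	temp := int64(math.Min(capInMs, baseInMs*math.Exp2(try)) / 2)
	if temp <= 0 {
		temp = 1
	}
	sleep := temp + rand.Int63n(temp)
	backOff := math.Min(capInMs, float64(rand.Int63n(sleep*3))+baseInMs)
	return time.Duration(backOff) * time.Millisecond
}

func main() {
	// with the defaults (base=10ms, cap=100ms) the random window widens with
	// each try, but no single sleep ever exceeds the cap
	for try := 1.0; try <= 5; try++ {
		fmt.Printf("try %.0f: %v\n", try, getBackoffInMs(10, 100, try))
	}
	// the reviewer's corner case, cap < base: math.Min clamps both terms,
	// so every sleep collapses to the cap (10ms here)
	fmt.Println("cap < base:", getBackoffInMs(100, 10, 1))
}

This matches the jittered-backoff idea from the linked AWS post: widen the candidate window exponentially per try, pick a random point inside it, and let the cap bound the worst case.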