Commit 003018d

pkg: add a new retry to support the error isRetryable or not (#1848) (#…

ti-chi-bot authored Jul 9, 2021
1 parent 9f014ba commit 003018d
Showing 6 changed files with 330 additions and 2 deletions.
5 changes: 5 additions & 0 deletions errors.toml
@@ -636,6 +636,11 @@ error = '''
pulsar send message failed
'''

["CDC:ErrReachMaxTry"]
error = '''
reach maximum try: %d
'''

["CDC:ErrReactorFinished"]
error = '''
the reactor has done its job and should no longer be executed
3 changes: 3 additions & 0 deletions pkg/errors/errors.go
@@ -230,4 +230,7 @@ var (
	// miscellaneous internal errors
	ErrFlowControllerAborted              = errors.Normalize("flow controller is aborted", errors.RFCCodeText("CDC:ErrFlowControllerAborted"))
	ErrFlowControllerEventLargerThanQuota = errors.Normalize("event is larger than the total memory quota, size: %d, quota: %d", errors.RFCCodeText("CDC:ErrFlowControllerEventLargerThanQuota"))

	// retry error
	ErrReachMaxTry = errors.Normalize("reach maximum try: %d", errors.RFCCodeText("CDC:ErrReachMaxTry"))
)
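
For callers outside this diff, the natural way to detect that a retry budget was exhausted is to match on the RFC code. A minimal sketch, assuming the Equal method that pingcap/errors exposes on normalized errors; the package and the helper name isExhausted are hypothetical:

package caller

import (
	cerror "github.com/pingcap/ticdc/pkg/errors"
)

// isExhausted reports whether err carries the CDC:ErrReachMaxTry code,
// meaning the retry budget was used up and %d above was filled with maxTries.
func isExhausted(err error) bool {
	return cerror.ErrReachMaxTry.Equal(err)
}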
91 changes: 91 additions & 0 deletions pkg/retry/options.go
@@ -0,0 +1,91 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
	"math"
)

const (
	// defaultBackoffBaseInMs is the initial backoff duration, in milliseconds
	defaultBackoffBaseInMs = 10.0
	// defaultBackoffCapInMs is the maximum backoff duration, in milliseconds
	defaultBackoffCapInMs = 100.0
	defaultMaxTries       = 3
)

// Option is a functional option for configuring retry behavior
type Option func(*retryOptions)

// IsRetryable checks whether an error is safe or worth retrying; for example, context.Canceled is usually better not retried
type IsRetryable func(error) bool

type retryOptions struct {
	maxTries        int64
	backoffBaseInMs float64
	backoffCapInMs  float64
	isRetryable     IsRetryable
}

func newRetryOptions() *retryOptions {
	return &retryOptions{
		maxTries:        defaultMaxTries,
		backoffBaseInMs: defaultBackoffBaseInMs,
		backoffCapInMs:  defaultBackoffCapInMs,
		isRetryable:     func(err error) bool { return true },
	}
}

// WithBackoffBaseDelay configures the initial delay; if delayInMs <= 0, defaultBackoffBaseInMs is used
func WithBackoffBaseDelay(delayInMs int64) Option {
	return func(o *retryOptions) {
		if delayInMs > 0 {
			o.backoffBaseInMs = float64(delayInMs)
		}
	}
}

// WithBackoffMaxDelay configures the maximum delay; if delayInMs <= 0, defaultBackoffCapInMs is used
func WithBackoffMaxDelay(delayInMs int64) Option {
	return func(o *retryOptions) {
		if delayInMs > 0 {
			o.backoffCapInMs = float64(delayInMs)
		}
	}
}

// WithMaxTries configures the maximum number of tries; if tries <= 0, defaultMaxTries is used
func WithMaxTries(tries int64) Option {
	return func(o *retryOptions) {
		if tries > 0 {
			o.maxTries = tries
		}
	}
}

// WithInfiniteTries configures the retry to run forever (math.MaxInt64 times) until it succeeds or gets canceled
func WithInfiniteTries() Option {
	return func(o *retryOptions) {
		o.maxTries = math.MaxInt64
	}
}

// WithIsRetryableErr configures whether a given error should be retried; if not set, every error is retried by default
func WithIsRetryableErr(f IsRetryable) Option {
	return func(o *retryOptions) {
		if f != nil {
			o.isRetryable = f
		}
	}
}
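
Taken together, these options configure the Do entry point added in pkg/retry/retry_with_opt.go below. A minimal usage sketch, not part of the diff: queryTiKV and the log.Fatal handling are hypothetical, while retry.Do and the options are the API this commit introduces:

package main

import (
	"context"
	"log"

	"github.com/pingcap/errors"
	"github.com/pingcap/ticdc/pkg/retry"
)

// queryTiKV stands in for any operation that can fail transiently.
func queryTiKV() error { return nil }

func main() {
	err := retry.Do(context.Background(), queryTiKV,
		retry.WithMaxTries(5),
		retry.WithBackoffBaseDelay(20), // first delay around 20ms
		retry.WithBackoffMaxDelay(500), // cap every delay at 500ms
		retry.WithIsRetryableErr(func(err error) bool {
			// don't waste tries on cancellation
			return errors.Cause(err) != context.Canceled
		}))
	if err != nil {
		// on exhaustion, err wraps the last failure in CDC:ErrReachMaxTry
		log.Fatal(err)
	}
}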
2 changes: 1 addition & 1 deletion pkg/retry/retry.go
@@ -1,4 +1,4 @@
// Copyright 2020 PingCAP, Inc.
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
137 changes: 136 additions & 1 deletion pkg/retry/retry_test.go
@@ -1,4 +1,4 @@
// Copyright 2020 PingCAP, Inc.
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@ package retry

import (
	"context"
	"math"
	"testing"
	"time"

@@ -39,6 +40,8 @@ func (s *runSuite) TestShouldRetryAtMostSpecifiedTimes(c *check.C) {

	err := Run(500*time.Millisecond, 3, f)
	c.Assert(err, check.ErrorMatches, "test")
	// It's weird that backoff may retry one more time than maxTries.
	// Because the steps in backoff.Retry are:
	// 1. Call function
@@ -121,3 +124,135 @@ func (s *runSuite) TestInfiniteRetry(c *check.C) {
	c.Assert(reportedElapsed, check.Greater, time.Second)
	c.Assert(reportedElapsed, check.LessEqual, 3*time.Second)
}

func (s *runSuite) TestDoShouldRetryAtMostSpecifiedTimes(c *check.C) {
	defer testleak.AfterTest(c)()
	var callCount int
	f := func() error {
		callCount++
		return errors.New("test")
	}

	err := Do(context.Background(), f, WithMaxTries(3))
	c.Assert(errors.Cause(err), check.ErrorMatches, "test")
	c.Assert(callCount, check.Equals, 3)
}

func (s *runSuite) TestDoShouldStopOnSuccess(c *check.C) {
	defer testleak.AfterTest(c)()
	var callCount int
	f := func() error {
		callCount++
		if callCount == 2 {
			return nil
		}
		return errors.New("test")
	}

	err := Do(context.Background(), f, WithMaxTries(3))
	c.Assert(err, check.IsNil)
	c.Assert(callCount, check.Equals, 2)
}

func (s *runSuite) TestIsRetryable(c *check.C) {
	defer testleak.AfterTest(c)()
	var callCount int
	f := func() error {
		callCount++
		return errors.Annotate(context.Canceled, "test")
	}

	err := Do(context.Background(), f, WithMaxTries(3), WithIsRetryableErr(func(err error) bool {
		switch errors.Cause(err) {
		case context.Canceled:
			return false
		}
		return true
	}))

	c.Assert(errors.Cause(err), check.Equals, context.Canceled)
	c.Assert(callCount, check.Equals, 1)

	callCount = 0
	err = Do(context.Background(), f, WithMaxTries(3))

	c.Assert(errors.Cause(err), check.Equals, context.Canceled)
	c.Assert(callCount, check.Equals, 3)
}

func (s *runSuite) TestDoCancelInfiniteRetry(c *check.C) {
	defer testleak.AfterTest(c)()
	callCount := 0
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*20)
	defer cancel()
	f := func() error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		callCount++
		return errors.New("test")
	}

	err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10))
	c.Assert(errors.Cause(err), check.Equals, context.DeadlineExceeded)
	c.Assert(callCount, check.GreaterEqual, 1, check.Commentf("tries: %d", callCount))
	c.Assert(callCount, check.Less, math.MaxInt64)
}

func (s *runSuite) TestDoCancelAtBeginning(c *check.C) {
	defer testleak.AfterTest(c)()
	callCount := 0
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	f := func() error {
		callCount++
		return errors.New("test")
	}

	err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10))
	c.Assert(errors.Cause(err), check.Equals, context.Canceled)
	c.Assert(callCount, check.Equals, 0, check.Commentf("tries:%d", callCount))
}

func (s *runSuite) TestDoCornerCases(c *check.C) {
	defer testleak.AfterTest(c)()
	var callCount int
	f := func() error {
		callCount++
		return errors.New("test")
	}

	err := Do(context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithBackoffMaxDelay(math.MaxInt64), WithMaxTries(2))
	c.Assert(errors.Cause(err), check.ErrorMatches, "test")
	c.Assert(callCount, check.Equals, 2)

	callCount = 0
	err = Do(context.Background(), f, WithBackoffBaseDelay(math.MaxInt64), WithBackoffMaxDelay(math.MinInt64), WithMaxTries(2))
	c.Assert(errors.Cause(err), check.ErrorMatches, "test")
	c.Assert(callCount, check.Equals, 2)

	callCount = 0
	err = Do(context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithBackoffMaxDelay(math.MinInt64), WithMaxTries(2))
	c.Assert(errors.Cause(err), check.ErrorMatches, "test")
	c.Assert(callCount, check.Equals, 2)

	callCount = 0
	err = Do(context.Background(), f, WithBackoffBaseDelay(math.MaxInt64), WithBackoffMaxDelay(math.MaxInt64), WithMaxTries(2))
	c.Assert(errors.Cause(err), check.ErrorMatches, "test")
	c.Assert(callCount, check.Equals, 2)

	var i int64
	for i = -10; i < 10; i++ {
		callCount = 0
		err = Do(context.Background(), f, WithBackoffBaseDelay(i), WithBackoffMaxDelay(i), WithMaxTries(i))
		c.Assert(errors.Cause(err), check.ErrorMatches, "test")
		c.Assert(err, check.ErrorMatches, ".*CDC:ErrReachMaxTry.*")
		if i > 0 {
			c.Assert(int64(callCount), check.Equals, i)
		} else {
			c.Assert(callCount, check.Equals, defaultMaxTries)
		}
	}
}
94 changes: 94 additions & 0 deletions pkg/retry/retry_with_opt.go
@@ -0,0 +1,94 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
	"context"
	"math"
	"math/rand"
	"time"

	"github.com/pingcap/errors"
	cerror "github.com/pingcap/ticdc/pkg/errors"
)

// Operation is the action that needs to be retried
type Operation func() error

// Do executes the specified function at most maxTries times until it succeeds or gets canceled
func Do(ctx context.Context, operation Operation, opts ...Option) error {
	retryOption := setOptions(opts...)
	return run(ctx, operation, retryOption)
}

func setOptions(opts ...Option) *retryOptions {
	retryOption := newRetryOptions()
	for _, opt := range opts {
		opt(retryOption)
	}
	return retryOption
}

func run(ctx context.Context, op Operation, retryOption *retryOptions) error {
	// bail out early if the context is already canceled
	select {
	case <-ctx.Done():
		return errors.Trace(ctx.Err())
	default:
	}

	var t *time.Timer
	try := 0
	backOff := time.Duration(0)
	for {
		err := op()
		if err == nil {
			return nil
		}

		if !retryOption.isRetryable(err) {
			return err
		}

		try++
		if int64(try) >= retryOption.maxTries {
			return cerror.ErrReachMaxTry.Wrap(err).GenWithStackByArgs(retryOption.maxTries)
		}

		backOff = getBackoffInMs(retryOption.backoffBaseInMs, retryOption.backoffCapInMs, float64(try))
		// reuse a single timer across iterations instead of allocating one per retry
		if t == nil {
			t = time.NewTimer(backOff)
			defer t.Stop()
		} else {
			t.Reset(backOff)
		}

		select {
		case <-ctx.Done():
			return errors.Trace(ctx.Err())
		case <-t.C:
		}
	}
}

// getBackoffInMs returns the duration to wait before the next try
// See https://www.awsarchitectureblog.com/2015/03/backoff.html
func getBackoffInMs(backoffBaseInMs, backoffCapInMs, try float64) time.Duration {
	temp := int64(math.Min(backoffCapInMs, backoffBaseInMs*math.Exp2(try)) / 2)
	if temp <= 0 {
		temp = 1
	}
	sleep := temp + rand.Int63n(temp)
	backOff := math.Min(backoffCapInMs, float64(rand.Int63n(sleep*3))+backoffBaseInMs)
	return time.Duration(backOff) * time.Millisecond
}
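
To make the jitter concrete with the package defaults (10ms base, 100ms cap) on the first retry: temp = min(100, 10*2^1)/2 = 10, sleep = 10 + rand[0, 10), and the result is min(100, rand[0, 3*sleep) + 10) milliseconds, i.e. somewhere in [10ms, 70ms); the randomness keeps concurrent retriers from synchronizing, per the AWS post linked above. Below is a self-contained sketch that samples the same formula, for illustration only; backoffInMs duplicates getBackoffInMs so the snippet compiles on its own:

package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// backoffInMs mirrors getBackoffInMs above, copied here only so this
// sample runs standalone.
func backoffInMs(baseInMs, capInMs, try float64) time.Duration {
	temp := int64(math.Min(capInMs, baseInMs*math.Exp2(try)) / 2)
	if temp <= 0 {
		temp = 1
	}
	sleep := temp + rand.Int63n(temp)
	backOff := math.Min(capInMs, float64(rand.Int63n(sleep*3))+baseInMs)
	return time.Duration(backOff) * time.Millisecond
}

func main() {
	// With the defaults (10ms base, 100ms cap) the delays grow and jitter,
	// but never exceed the cap.
	for try := 1.0; try <= 6; try++ {
		fmt.Printf("try %.0f: %v\n", try, backoffInMs(10, 100, try))
	}
}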
