diff --git a/errors.toml b/errors.toml
index 222c9f4c2d2..592dc6fd6fb 100755
--- a/errors.toml
+++ b/errors.toml
@@ -636,6 +636,11 @@ error = '''
 pulsar send message failed
 '''
 
+["CDC:ErrReachMaxTry"]
+error = '''
+reach maximum try: %d
+'''
+
 ["CDC:ErrReactorFinished"]
 error = '''
 the reactor has done its job and should no longer be executed
diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go
index 5a4858eba8d..8d33a056a5a 100644
--- a/pkg/errors/errors.go
+++ b/pkg/errors/errors.go
@@ -230,4 +230,7 @@ var (
 	// miscellaneous internal errors
 	ErrFlowControllerAborted              = errors.Normalize("flow controller is aborted", errors.RFCCodeText("CDC:ErrFlowControllerAborted"))
 	ErrFlowControllerEventLargerThanQuota = errors.Normalize("event is larger than the total memory quota, size: %d, quota: %d", errors.RFCCodeText("CDC:ErrFlowControllerEventLargerThanQuota"))
+
+	// retry error
+	ErrReachMaxTry = errors.Normalize("reach maximum try: %d", errors.RFCCodeText("CDC:ErrReachMaxTry"))
 )
diff --git a/pkg/retry/options.go b/pkg/retry/options.go
new file mode 100644
index 00000000000..724195a3f0c
--- /dev/null
+++ b/pkg/retry/options.go
@@ -0,0 +1,91 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package retry
+
+import (
+	"math"
+)
+
+const (
+	// defaultBackoffBaseInMs is the initial backoff duration, in milliseconds
+	defaultBackoffBaseInMs = 10.0
+	// defaultBackoffCapInMs is the maximum backoff duration, in milliseconds
+	defaultBackoffCapInMs = 100.0
+	defaultMaxTries       = 3
+)
+
+// Option is a functional option used to configure the retry behavior
+type Option func(*retryOptions)
+
+// IsRetryable checks whether an error is safe or worth retrying, e.g. "context.Canceled" is better not retried
+type IsRetryable func(error) bool
+
+type retryOptions struct {
+	maxTries        int64
+	backoffBaseInMs float64
+	backoffCapInMs  float64
+	isRetryable     IsRetryable
+}
+
+func newRetryOptions() *retryOptions {
+	return &retryOptions{
+		maxTries:        defaultMaxTries,
+		backoffBaseInMs: defaultBackoffBaseInMs,
+		backoffCapInMs:  defaultBackoffCapInMs,
+		isRetryable:     func(err error) bool { return true },
+	}
+}
+
+// WithBackoffBaseDelay configures the initial delay; if delayInMs <= 0, "defaultBackoffBaseInMs" will be used
+func WithBackoffBaseDelay(delayInMs int64) Option {
+	return func(o *retryOptions) {
+		if delayInMs > 0 {
+			o.backoffBaseInMs = float64(delayInMs)
+		}
+	}
+}
+
+// WithBackoffMaxDelay configures the maximum delay; if delayInMs <= 0, "defaultBackoffCapInMs" will be used
+func WithBackoffMaxDelay(delayInMs int64) Option {
+	return func(o *retryOptions) {
+		if delayInMs > 0 {
+			o.backoffCapInMs = float64(delayInMs)
+		}
+	}
+}
+
+// WithMaxTries configures the maximum number of tries; if tries <= 0, "defaultMaxTries" will be used
+func WithMaxTries(tries int64) Option {
+	return func(o *retryOptions) {
+		if tries > 0 {
+			o.maxTries = tries
+		}
+	}
+}
+
+// WithInfiniteTries configures the operation to be retried forever (math.MaxInt64 times) until it succeeds or is canceled
+func WithInfiniteTries() Option {
+	return func(o *retryOptions) {
+		o.maxTries = math.MaxInt64
+	}
+}
+
+// WithIsRetryableErr configures which errors should be retried; if not set, all errors are retried by default
+func WithIsRetryableErr(f IsRetryable) Option {
+	return func(o *retryOptions) {
+		if f != nil {
+			o.isRetryable = f
+		}
+	}
+}
diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go
index 1783a8ab182..61161cce195 100644
--- a/pkg/retry/retry.go
+++ b/pkg/retry/retry.go
@@ -1,4 +1,4 @@
-// Copyright 2020 PingCAP, Inc.
+// Copyright 2021 PingCAP, Inc.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
diff --git a/pkg/retry/retry_test.go b/pkg/retry/retry_test.go
index 54b3691be9a..96a8e72cd3e 100644
--- a/pkg/retry/retry_test.go
+++ b/pkg/retry/retry_test.go
@@ -1,4 +1,4 @@
-// Copyright 2020 PingCAP, Inc.
+// Copyright 2021 PingCAP, Inc.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@ package retry
 
 import (
 	"context"
+	"math"
 	"testing"
 	"time"
 
@@ -39,6 +40,8 @@ func (s *runSuite) TestShouldRetryAtMostSpecifiedTimes(c *check.C) {
 	err := Run(500*time.Millisecond, 3, f)
 	c.Assert(err, check.ErrorMatches, "test")
 
+	// Note: tries = the first call + maxRetries, so retrying one more time than maxTries is expected rather than weird.
+
 	// It's weird that backoff may retry one more time than maxTries.
 	// Because the steps in backoff.Retry is:
 	// 1. Call function
@@ -121,3 +124,135 @@ func (s *runSuite) TestInfiniteRetry(c *check.C) {
 	c.Assert(reportedElapsed, check.Greater, time.Second)
 	c.Assert(reportedElapsed, check.LessEqual, 3*time.Second)
 }
+
+func (s *runSuite) TestDoShouldRetryAtMostSpecifiedTimes(c *check.C) {
+	defer testleak.AfterTest(c)()
+	var callCount int
+	f := func() error {
+		callCount++
+		return errors.New("test")
+	}
+
+	err := Do(context.Background(), f, WithMaxTries(3))
+	c.Assert(errors.Cause(err), check.ErrorMatches, "test")
+	c.Assert(callCount, check.Equals, 3)
+}
+
+func (s *runSuite) TestDoShouldStopOnSuccess(c *check.C) {
+	defer testleak.AfterTest(c)()
+	var callCount int
+	f := func() error {
+		callCount++
+		if callCount == 2 {
+			return nil
+		}
+		return errors.New("test")
+	}
+
+	err := Do(context.Background(), f, WithMaxTries(3))
+	c.Assert(err, check.IsNil)
+	c.Assert(callCount, check.Equals, 2)
+}
+
+func (s *runSuite) TestIsRetryable(c *check.C) {
+	defer testleak.AfterTest(c)()
+	var callCount int
+	f := func() error {
+		callCount++
+		return errors.Annotate(context.Canceled, "test")
+	}
+
+	err := Do(context.Background(), f, WithMaxTries(3), WithIsRetryableErr(func(err error) bool {
+		switch errors.Cause(err) {
+		case context.Canceled:
+			return false
+		}
+		return true
+	}))
+
+	c.Assert(errors.Cause(err), check.Equals, context.Canceled)
+	c.Assert(callCount, check.Equals, 1)
+
+	callCount = 0
+	err = Do(context.Background(), f, WithMaxTries(3))
+
+	c.Assert(errors.Cause(err), check.Equals, context.Canceled)
+	c.Assert(callCount, check.Equals, 3)
+}
+
+func (s *runSuite) TestDoCancelInfiniteRetry(c *check.C) {
+	defer testleak.AfterTest(c)()
+	callCount := 0
+	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*20)
+	defer cancel()
+	f := func() error {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+		callCount++
+		return errors.New("test")
+	}
+
+	err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10))
+	c.Assert(errors.Cause(err), check.Equals, context.DeadlineExceeded)
+	c.Assert(callCount, check.GreaterEqual, 1, check.Commentf("tries: %d", callCount))
+	c.Assert(callCount, check.Less, math.MaxInt64)
+}
+
+func (s *runSuite) TestDoCancelAtBeginning(c *check.C) {
+	defer testleak.AfterTest(c)()
+	callCount := 0
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+	f := func() error {
+		callCount++
+		return errors.New("test")
+	}
+
+	err := Do(ctx, f, WithInfiniteTries(), WithBackoffBaseDelay(2), WithBackoffMaxDelay(10))
+	c.Assert(errors.Cause(err), check.Equals, context.Canceled)
+	c.Assert(callCount, check.Equals, 0, check.Commentf("tries:%d", callCount))
+}
+
+func (s *runSuite) TestDoCornerCases(c *check.C) {
+	defer testleak.AfterTest(c)()
+	var callCount int
+	f := func() error {
+		callCount++
+		return errors.New("test")
+	}
+
+	err := Do(context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithBackoffMaxDelay(math.MaxInt64), WithMaxTries(2))
+	c.Assert(errors.Cause(err), check.ErrorMatches, "test")
+	c.Assert(callCount, check.Equals, 2)
+
+	callCount = 0
+	err = Do(context.Background(), f, WithBackoffBaseDelay(math.MaxInt64), WithBackoffMaxDelay(math.MinInt64), WithMaxTries(2))
+	c.Assert(errors.Cause(err), check.ErrorMatches, "test")
+	c.Assert(callCount, check.Equals, 2)
+
+	callCount = 0
+	err = Do(context.Background(), f, WithBackoffBaseDelay(math.MinInt64), WithBackoffMaxDelay(math.MinInt64), WithMaxTries(2))
+	c.Assert(errors.Cause(err), check.ErrorMatches, "test")
+	c.Assert(callCount, check.Equals, 2)
+
+	callCount = 0
+	err = Do(context.Background(), f, WithBackoffBaseDelay(math.MaxInt64), WithBackoffMaxDelay(math.MaxInt64), WithMaxTries(2))
+	c.Assert(errors.Cause(err), check.ErrorMatches, "test")
+	c.Assert(callCount, check.Equals, 2)
+
+	var i int64
+	for i = -10; i < 10; i++ {
+		callCount = 0
+		err = Do(context.Background(), f, WithBackoffBaseDelay(i), WithBackoffMaxDelay(i), WithMaxTries(i))
+		c.Assert(errors.Cause(err), check.ErrorMatches, "test")
+		c.Assert(err, check.ErrorMatches, ".*CDC:ErrReachMaxTry.*")
+		if i > 0 {
+			c.Assert(int64(callCount), check.Equals, i)
+		} else {
+			c.Assert(callCount, check.Equals, defaultMaxTries)
+		}
+	}
+}
diff --git a/pkg/retry/retry_with_opt.go b/pkg/retry/retry_with_opt.go
new file mode 100644
index 00000000000..b4af380b582
--- /dev/null
+++ b/pkg/retry/retry_with_opt.go
@@ -0,0 +1,94 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package retry
+
+import (
+	"context"
+	"math"
+	"math/rand"
+	"time"
+
+	"github.com/pingcap/errors"
+	cerror "github.com/pingcap/ticdc/pkg/errors"
+)
+
+// Operation is the action that needs to be retried
+type Operation func() error
+
+// Do executes the specified function at most maxTries times until it succeeds or is canceled
+func Do(ctx context.Context, operation Operation, opts ...Option) error {
+	retryOption := setOptions(opts...)
+	return run(ctx, operation, retryOption)
+}
+
+func setOptions(opts ...Option) *retryOptions {
+	retryOption := newRetryOptions()
+	for _, opt := range opts {
+		opt(retryOption)
+	}
+	return retryOption
+}
+
+func run(ctx context.Context, op Operation, retryOption *retryOptions) error {
+	select {
+	case <-ctx.Done():
+		return errors.Trace(ctx.Err())
+	default:
+	}
+
+	var t *time.Timer
+	try := 0
+	backOff := time.Duration(0)
+	for {
+		err := op()
+		if err == nil {
+			return nil
+		}
+
+		if !retryOption.isRetryable(err) {
+			return err
+		}
+
+		try++
+		if int64(try) >= retryOption.maxTries {
+			return cerror.ErrReachMaxTry.Wrap(err).GenWithStackByArgs(retryOption.maxTries)
+		}
+
+		backOff = getBackoffInMs(retryOption.backoffBaseInMs, retryOption.backoffCapInMs, float64(try))
+		if t == nil {
+			t = time.NewTimer(backOff)
+			defer t.Stop()
+		} else {
+			t.Reset(backOff)
+		}
+
+		select {
+		case <-ctx.Done():
+			return errors.Trace(ctx.Err())
+		case <-t.C:
+		}
+	}
+}
+
+// getBackoffInMs returns the duration to wait before the next try
+// See https://www.awsarchitectureblog.com/2015/03/backoff.html
+func getBackoffInMs(backoffBaseInMs, backoffCapInMs, try float64) time.Duration {
+	temp := int64(math.Min(backoffCapInMs, backoffBaseInMs*math.Exp2(try)) / 2)
+	if temp <= 0 {
+		temp = 1
+	}
+	sleep := temp + rand.Int63n(temp)
+	backOff := math.Min(backoffCapInMs, float64(rand.Int63n(sleep*3))+backoffBaseInMs)
+	return time.Duration(backOff) * time.Millisecond
+}
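
To get a rough sense of the delays that getBackoffInMs produces with the defaults (base 10 ms, cap 100 ms), the numbers below are worked out from the formula above rather than quoted from the patch: on the first retry temp = min(100, 10*2)/2 = 10, sleep falls in [10, 19], and the final delay min(100, rand(3*sleep) + 10) lands roughly between 10 ms and 66 ms; from the second retry onward the jittered delay can land anywhere between the 10 ms base and the 100 ms cap. This follows the jittered exponential backoff described in the AWS architecture blog post referenced in the code comment, which keeps concurrent retries from synchronizing their attempts.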
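
A minimal usage sketch of the new Do API follows; the fetchWithRetry helper, the query callback, and the option values are illustrative assumptions rather than code from this change:

// Hypothetical caller code showing how retry.Do and its options are combined.
package example

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/ticdc/pkg/retry"
)

// fetchWithRetry retries the given query with capped, jittered backoff.
func fetchWithRetry(ctx context.Context, query func(context.Context) error) error {
	return retry.Do(ctx, func() error {
		// the operation being retried; query is a placeholder for real work
		return query(ctx)
	},
		retry.WithMaxTries(5),          // give up after 5 calls, returning ErrReachMaxTry wrapping the last error
		retry.WithBackoffBaseDelay(20), // start backing off at roughly 20 ms
		retry.WithBackoffMaxDelay(500), // cap each backoff at 500 ms
		retry.WithIsRetryableErr(func(err error) bool {
			// stop immediately once the surrounding context has been canceled
			return errors.Cause(err) != context.Canceled
		}),
	)
}

Non-retryable errors short-circuit the loop and are returned as-is, while exhausting the tries returns ErrReachMaxTry wrapping the last error, so callers can distinguish the two cases with errors.Cause.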