diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index e8f0505d4e1..3b5ea08adcd 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -20,7 +20,6 @@ import ( "sync/atomic" "time" - "github.com/cenkalti/backoff" "github.com/pingcap/errors" "github.com/pingcap/log" timodel "github.com/pingcap/parser/model" @@ -719,31 +718,21 @@ func (s *schemaStorageImpl) GetSnapshot(ctx context.Context, ts uint64) (*schema // The infinite retry here is a temporary solution to the `ErrSchemaStorageUnresolved` caused by // DDL puller lagging too much. - err := retry.RunWithInfiniteRetry(10*time.Millisecond, func() error { - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - default: - } + startTime := time.Now() + err := retry.Do(ctx, func() error { var err error snap, err = s.getSnapshot(ts) - if cerror.ErrSchemaStorageUnresolved.NotEqual(err) { - return backoff.Permanent(err) - } - - return err - }, func(elapsed time.Duration) { - if elapsed >= 5*time.Minute { + if time.Since(startTime) >= 5*time.Minute && isRetryable(err) { log.Warn("GetSnapshot is taking too long, DDL puller stuck?", zap.Uint64("ts", ts)) } - }) + return err + }, retry.WithBackoffBaseDelay(10), retry.WithInfiniteTries(), retry.WithIsRetryableErr(isRetryable)) - switch e := err.(type) { - case *backoff.PermanentError: - return nil, e.Err - default: - return snap, err - } + return snap, err +} + +func isRetryable(err error) bool { + return cerror.IsRetryableError(err) && cerror.ErrSchemaStorageUnresolved.Equal(err) } // GetLastSnapshot returns the last snapshot diff --git a/cdc/kv/client.go b/cdc/kv/client.go index fed7005e0dc..95a1507df1b 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -55,7 +55,6 @@ import ( const ( dialTimeout = 10 * time.Second - maxRetry = 100 tikvRequestMaxBackoff = 20000 // Maximum total sleep time(in ms) // TODO find optimal values and test extensively before releasing // The old values cause the gRPC stream to stall for some unknown reason. 
@@ -447,7 +446,7 @@ func (c *CDCClient) getRegionLimiter(regionID uint64) *rate.Limiter { } func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream cdcpb.ChangeData_EventFeedClient, err error) { - err = retry.Run(50*time.Millisecond, 3, func() error { + err = retry.Do(ctx, func() error { conn, err := c.getConn(ctx, addr) if err != nil { log.Info("get connection to store failed, retry later", zap.String("addr", addr), zap.Error(err)) @@ -475,7 +474,7 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) } log.Debug("created stream to store", zap.String("addr", addr)) return nil - }) + }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(3), retry.WithIsRetryableErr(cerror.IsRetryableError)) return } @@ -1100,38 +1099,31 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( regions []*tikv.Region err error ) - retryErr := retry.Run(50*time.Millisecond, maxRetry, - func() error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - scanT0 := time.Now() - bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) - regions, err = s.regionCache.BatchLoadRegionsWithKeyRange(bo, nextSpan.Start, nextSpan.End, limit) - scanRegionsDuration.WithLabelValues(captureAddr).Observe(time.Since(scanT0).Seconds()) - if err != nil { - return cerror.WrapError(cerror.ErrPDBatchLoadRegions, err) - } - metas := make([]*metapb.Region, 0, len(regions)) - for _, region := range regions { - if region.GetMeta() == nil { - err = cerror.ErrMetaNotInRegion.GenWithStackByArgs() - log.Warn("batch load region", zap.Stringer("span", nextSpan), zap.Error(err)) - return err - } - metas = append(metas, region.GetMeta()) - } - if !regionspan.CheckRegionsLeftCover(metas, nextSpan) { - err = cerror.ErrRegionsNotCoverSpan.GenWithStackByArgs(nextSpan, metas) - log.Warn("ScanRegions", zap.Stringer("span", nextSpan), zap.Reflect("regions", metas), zap.Error(err)) + retryErr := retry.Do(ctx, func() error { + scanT0 := time.Now() + bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) + regions, err = s.regionCache.BatchLoadRegionsWithKeyRange(bo, nextSpan.Start, nextSpan.End, limit) + scanRegionsDuration.WithLabelValues(captureAddr).Observe(time.Since(scanT0).Seconds()) + if err != nil { + return cerror.WrapError(cerror.ErrPDBatchLoadRegions, err) + } + metas := make([]*metapb.Region, 0, len(regions)) + for _, region := range regions { + if region.GetMeta() == nil { + err = cerror.ErrMetaNotInRegion.GenWithStackByArgs() + log.Warn("batch load region", zap.Stringer("span", nextSpan), zap.Error(err)) return err } - log.Debug("ScanRegions", zap.Stringer("span", nextSpan), zap.Reflect("regions", metas)) - return nil - }) - + metas = append(metas, region.GetMeta()) + } + if !regionspan.CheckRegionsLeftCover(metas, nextSpan) { + err = cerror.ErrRegionsNotCoverSpan.GenWithStackByArgs(nextSpan, metas) + log.Warn("ScanRegions", zap.Stringer("span", nextSpan), zap.Reflect("regions", metas), zap.Error(err)) + return err + } + log.Debug("ScanRegions", zap.Stringer("span", nextSpan), zap.Reflect("regions", metas)) + return nil + }, retry.WithBackoffMaxDelay(50), retry.WithMaxTries(100), retry.WithIsRetryableErr(cerror.IsRetryableError)) if retryErr != nil { return retryErr } diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go index 5329df61b05..4c0a5c537d1 100644 --- a/cdc/kv/client_bench_test.go +++ b/cdc/kv/client_bench_test.go @@ -200,7 +200,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) ( }() // wait all regions requested 
from cdc kv client - err = retry.Run(time.Millisecond*500, 20, func() error { + err = retry.Do(context.Background(), func() error { count := 0 requestIDs.Range(func(_, _ interface{}) bool { count++ @@ -210,7 +210,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) ( return nil } return errors.Errorf("region number %d is not as expected %d", count, regionNum) - }) + }, retry.WithBackoffBaseDelay(500), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20)) if err != nil { b.Error(err) } @@ -288,7 +288,7 @@ func prepareBench(b *testing.B, regionNum int) ( }() // wait all regions requested from cdc kv client - err = retry.Run(time.Millisecond*500, 20, func() error { + err = retry.Do(context.Background(), func() error { count := 0 requestIDs.Range(func(_, _ interface{}) bool { count++ @@ -298,7 +298,7 @@ func prepareBench(b *testing.B, regionNum int) ( return nil } return errors.Errorf("region number %d is not as expected %d", count, regionNum) - }) + }, retry.WithBackoffBaseDelay(500), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20)) if err != nil { b.Error(err) } @@ -373,13 +373,12 @@ func benchmarkSingleWorkerResolvedTs(b *testing.B, clientV2 bool) { } } }) - - err := retry.Run(time.Millisecond*500, 20, func() error { + err := retry.Do(context.Background(), func() error { if len(mockSrvCh) == 0 { return nil } return errors.New("not all events are sent yet") - }) + }, retry.WithBackoffBaseDelay(500), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20)) if err != nil { b.Error(err) } @@ -494,14 +493,14 @@ func benchmarkMultipleStoreResolvedTs(b *testing.B, clientV2 bool) { } }) - err := retry.Run(time.Millisecond*500, 1000, func() error { + err := retry.Do(context.Background(), func() error { for _, input := range inputs { if len(input) != 0 { return errors.New("not all events are sent yet") } } return nil - }) + }, retry.WithBackoffBaseDelay(500), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(1000)) if err != nil { b.Error(err) } diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index c4df0fa45c8..71800be8d12 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -288,12 +288,13 @@ func newMockServiceSpecificAddr( // waitRequestID waits request ID larger than the given allocated ID func waitRequestID(c *check.C, allocatedID uint64) { - err := retry.Run(time.Millisecond*10, 20, func() error { + err := retry.Do(context.Background(), func() error { if currentRequestID() > allocatedID { return nil } return errors.Errorf("request id %d is not larger than %d", currentRequestID(), allocatedID) - }) + }, retry.WithBackoffBaseDelay(10), retry.WithMaxTries(20)) + c.Assert(err, check.IsNil) } @@ -1203,14 +1204,15 @@ func (s *etcdSuite) TestStreamSendWithError(c *check.C) { // The expected request ids are agnostic because the kv client could retry // for more than one time, so we wait until the newly started server receives // requests for both two regions. 
- err = retry.Run(time.Millisecond*200, 10, func() error { + err = retry.Do(context.Background(), func() error { _, ok1 := requestIds.Load(regionID3) _, ok2 := requestIds.Load(regionID4) if ok1 && ok2 { return nil } return errors.New("waiting for kv client requests received by server") - }) + }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10)) + c.Assert(err, check.IsNil) reqID1, _ := requestIds.Load(regionID3) reqID2, _ := requestIds.Load(regionID4) @@ -1285,12 +1287,13 @@ func (s *etcdSuite) testStreamRecvWithError(c *check.C, failpointStr string) { waitRequestID(c, baseAllocatedID+1) initialized1 := mockInitializedEvent(regionID, currentRequestID()) ch1 <- initialized1 - err = retry.Run(time.Millisecond*200, 10, func() error { + err = retry.Do(context.Background(), func() error { if len(ch1) == 0 { return nil } return errors.New("message is not sent") - }) + }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10)) + c.Assert(err, check.IsNil) // another stream will be established, so we notify and wait the first @@ -1410,22 +1413,24 @@ func (s *etcdSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) { // wait request id allocated with: new session, new request waitRequestID(c, baseAllocatedID+1) - err = retry.Run(time.Millisecond*50, 10, func() error { + err = retry.Do(context.Background(), func() error { if atomic.LoadUint64(&requestID) == currentRequestID() { return nil } return errors.Errorf("request is not received, requestID: %d, expected: %d", atomic.LoadUint64(&requestID), currentRequestID()) - }) + }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(10)) + c.Assert(err, check.IsNil) initialized1 := mockInitializedEvent(regionID, currentRequestID()) ch1 <- initialized1 - err = retry.Run(time.Millisecond*200, 10, func() error { + err = retry.Do(context.Background(), func() error { if len(ch1) == 0 { return nil } return errors.New("message is not sent") - }) + }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10)) + c.Assert(err, check.IsNil) resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ @@ -1436,12 +1441,13 @@ func (s *etcdSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) { }, }} ch1 <- resolved - err = retry.Run(time.Millisecond*200, 10, func() error { + err = retry.Do(context.Background(), func() error { if len(ch1) == 0 { return nil } return errors.New("message is not sent") - }) + }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10)) + c.Assert(err, check.IsNil) err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientStreamRecvError", "1*return(\"\")") c.Assert(err, check.IsNil) @@ -1461,22 +1467,24 @@ func (s *etcdSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) { // wait request id allocated with: new session, new request*2 waitRequestID(c, baseAllocatedID+2) - err = retry.Run(time.Millisecond*50, 10, func() error { + err = retry.Do(context.Background(), func() error { if atomic.LoadUint64(&requestID) == currentRequestID() { return nil } return errors.Errorf("request is not received, requestID: %d, expected: %d", atomic.LoadUint64(&requestID), currentRequestID()) - }) + }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(10)) + c.Assert(err, check.IsNil) initialized2 := mockInitializedEvent(regionID, currentRequestID()) ch1 <- initialized2 - err = retry.Run(time.Millisecond*200, 10, func() error { + err = retry.Do(context.Background(), func() error { if 
len(ch1) == 0 { return nil } return errors.New("message is not sent") - }) + }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10)) + c.Assert(err, check.IsNil) resolved = &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ @@ -1622,20 +1630,22 @@ func (s *etcdSuite) TestIncompatibleTiKV(c *check.C) { cdcClient.Close() //nolint:errcheck }() - err = retry.Run(time.Millisecond*500, 20, func() error { + err = retry.Do(context.Background(), func() error { if atomic.LoadInt32(&call) >= versionGenCallBoundary { return nil } return errors.Errorf("version generator is not updated in time, call time %d", atomic.LoadInt32(&call)) - }) + }, retry.WithBackoffBaseDelay(500), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20)) + c.Assert(err, check.IsNil) - err = retry.Run(time.Millisecond*200, 10, func() error { + err = retry.Do(context.Background(), func() error { _, ok := requestIds.Load(regionID) if ok { return nil } return errors.New("waiting for kv client requests received by server") - }) + }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10)) + c.Assert(err, check.IsNil) reqID, _ := requestIds.Load(regionID) initialized := mockInitializedEvent(regionID, reqID.(uint64)) @@ -2830,13 +2840,14 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) { // The second TiKV could start up slowly, which causes the kv client retries // to TiKV for more than one time, so we can't determine the correct requestID // here, we must use the real request ID received by TiKV server - err = retry.Run(time.Millisecond*300, 10, func() error { + err = retry.Do(context.Background(), func() error { _, ok := requestIds.Load(regionID3) if ok { return nil } return errors.New("waiting for kv client requests received by server") - }) + }, retry.WithBackoffBaseDelay(300), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10)) + c.Assert(err, check.IsNil) requestID, _ := requestIds.Load(regionID3) @@ -2949,7 +2960,7 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) { } // wait for all regions requested from cdc kv client - err = retry.Run(time.Millisecond*200, 20, func() error { + err = retry.Do(context.Background(), func() error { count := 0 requestIDs.Range(func(_, _ interface{}) bool { count++ @@ -2959,7 +2970,8 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) { return nil } return errors.Errorf("region number %d is not as expected %d", count, regionNum) - }) + }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20)) + c.Assert(err, check.IsNil) // send initialized event and a resolved ts event to each region diff --git a/cdc/kv/etcd.go b/cdc/kv/etcd.go index 580482382e0..aa3f7e5584e 100644 --- a/cdc/kv/etcd.go +++ b/cdc/kv/etcd.go @@ -60,6 +60,11 @@ const ( JobKeyPrefix = EtcdKeyBase + "/job" ) +const ( + putTaskStatusBackoffBaseDelayInMs = 100 + putTaskStatusMaxTries = 3 +) + // GetEtcdKeyChangeFeedList returns the prefix key of all changefeed config func GetEtcdKeyChangeFeedList() string { return fmt.Sprintf("%s/changefeed/info", EtcdKeyBase) @@ -620,12 +625,7 @@ func (c CDCEtcdClient) AtomicPutTaskStatus( ) (*model.TaskStatus, int64, error) { var status *model.TaskStatus var newModRevision int64 - err := retry.Run(100*time.Millisecond, 3, func() error { - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - default: - } + err := retry.Do(ctx, func() error { var modRevision int64 var err error modRevision, status, err = 
c.GetTaskStatus(ctx, changefeedID, captureID) @@ -669,7 +669,7 @@ func (c CDCEtcdClient) AtomicPutTaskStatus( } newModRevision = resp.Header.GetRevision() return nil - }) + }, retry.WithBackoffBaseDelay(putTaskStatusBackoffBaseDelayInMs), retry.WithMaxTries(putTaskStatusMaxTries), retry.WithIsRetryableErr(cerror.IsRetryableError)) if err != nil { return nil, newModRevision, errors.Trace(err) } diff --git a/cdc/owner/async_sink_test.go b/cdc/owner/async_sink_test.go index 5d20e3bef6c..7b8b1d02dbb 100644 --- a/cdc/owner/async_sink_test.go +++ b/cdc/owner/async_sink_test.go @@ -102,12 +102,12 @@ func (s *asyncSinkSuite) TestCheckpoint(c *check.C) { defer sink.Close() waitCheckpointGrowingUp := func(m *mockSink, targetTs model.Ts) error { - return retry.Run(100*time.Millisecond, 30, func() error { + return retry.Do(context.Background(), func() error { if targetTs != atomic.LoadUint64(&m.checkpointTs) { return errors.New("targetTs!=checkpointTs") } return nil - }) + }, retry.WithBackoffBaseDelay(100), retry.WithMaxTries(30)) } sink.EmitCheckpointTs(ctx, 1) c.Assert(waitCheckpointGrowingUp(mSink, 1), check.IsNil) diff --git a/cdc/owner/ddl_puller_test.go b/cdc/owner/ddl_puller_test.go index 371c0f5232a..4c692386e09 100644 --- a/cdc/owner/ddl_puller_test.go +++ b/cdc/owner/ddl_puller_test.go @@ -18,7 +18,6 @@ import ( "encoding/json" "sync" "sync/atomic" - "time" "github.com/pingcap/check" "github.com/pingcap/errors" @@ -220,12 +219,12 @@ func (s *ddlPullerSuite) TestPuller(c *check.C) { } func waitResolvedTsGrowing(c *check.C, p DDLPuller, targetTs model.Ts) { - err := retry.Run(20*time.Millisecond, 100, func() error { + err := retry.Do(context.Background(), func() error { resolvedTs, _ := p.FrontDDL() if resolvedTs < targetTs { return errors.New("resolvedTs < targetTs") } return nil - }) + }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(100)) c.Assert(err, check.IsNil) } diff --git a/cdc/processor.go b/cdc/processor.go index c5484aaff46..fef41e19012 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -65,6 +65,8 @@ const ( resolvedTsInterpolateInterval = 200 * time.Millisecond flushMemoryMetricsDuration = time.Second * 5 flowControlOutChSize = 128 + + maxTries = 3 ) type oldProcessor struct { @@ -324,7 +326,7 @@ func (p *oldProcessor) positionWorker(ctx context.Context) error { lastFlushTime := time.Now() retryFlushTaskStatusAndPosition := func() error { t0Update := time.Now() - err := retry.Run(500*time.Millisecond, 3, func() error { + err := retry.Do(ctx, func() error { inErr := p.flushTaskStatusAndPosition(ctx) if inErr != nil { if errors.Cause(inErr) != context.Canceled { @@ -337,11 +339,11 @@ func (p *oldProcessor) positionWorker(ctx context.Context) error { logError("update info failed", util.ZapFieldChangefeed(ctx), errField) } if p.isStopped() || cerror.ErrAdminStopProcessor.Equal(inErr) { - return backoff.Permanent(cerror.ErrAdminStopProcessor.FastGenByArgs()) + return cerror.ErrAdminStopProcessor.FastGenByArgs() } } return inErr - }) + }, retry.WithBackoffBaseDelay(500), retry.WithMaxTries(maxTries), retry.WithIsRetryableErr(isRetryable)) updateInfoDuration. WithLabelValues(p.captureInfo.AdvertiseAddr). 
Observe(time.Since(t0Update).Seconds()) @@ -433,6 +435,10 @@ func (p *oldProcessor) positionWorker(ctx context.Context) error { } } +func isRetryable(err error) bool { + return cerror.IsRetryableError(err) && cerror.ErrAdminStopProcessor.NotEqual(err) +} + func (p *oldProcessor) ddlPullWorker(ctx context.Context) error { ddlRawKVCh := puller.SortOutput(ctx, p.ddlPuller.Output()) var ddlRawKV *model.RawKVEntry @@ -769,13 +775,14 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo defer p.stateMu.Unlock() var tableName string - err := retry.Run(time.Millisecond*5, 3, func() error { + + err := retry.Do(ctx, func() error { if name, ok := p.schemaStorage.GetLastSnapshot().GetTableNameByID(tableID); ok { tableName = name.QuoteString() return nil } return errors.Errorf("failed to get table name, fallback to use table id: %d", tableID) - }) + }, retry.WithBackoffBaseDelay(5), retry.WithMaxTries(maxTries), retry.WithIsRetryableErr(cerror.IsRetryableError)) if err != nil { log.Warn("get table name for metric", util.ZapFieldChangefeed(ctx), zap.String("error", err.Error())) tableName = strconv.Itoa(int(tableID)) diff --git a/cdc/processor/pipeline/mounter_test.go b/cdc/processor/pipeline/mounter_test.go index db19dda8896..3ff2894735b 100644 --- a/cdc/processor/pipeline/mounter_test.go +++ b/cdc/processor/pipeline/mounter_test.go @@ -14,6 +14,7 @@ package pipeline import ( + "context" "errors" "sync" "sync/atomic" @@ -22,7 +23,7 @@ import ( "github.com/pingcap/check" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" - "github.com/pingcap/ticdc/pkg/context" + cdcContext "github.com/pingcap/ticdc/pkg/context" "github.com/pingcap/ticdc/pkg/pipeline" "github.com/pingcap/ticdc/pkg/retry" "github.com/pingcap/ticdc/pkg/util/testleak" @@ -89,10 +90,10 @@ func generateMockRawKV(ts uint64) *model.RawKVEntry { func (s *mounterNodeSuite) TestMounterNodeBasics(c *check.C) { defer testleak.AfterTest(c)() - ctx, cancel := context.WithCancel(context.NewBackendContext4Test(false)) + ctx, cancel := cdcContext.WithCancel(cdcContext.NewBackendContext4Test(false)) defer cancel() - ctx = context.WithErrorHandler(ctx, func(err error) error { + ctx = cdcContext.WithErrorHandler(ctx, func(err error) error { return nil }) runnersSize, outputChannelSize := 2, 64 @@ -109,9 +110,9 @@ func (s *mounterNodeSuite) TestMounterNodeBasics(c *check.C) { var sentCount int64 sendMsg := func(p *pipeline.Pipeline, msg *pipeline.Message) { - err := retry.Run(10*time.Millisecond, 100, func() error { + err := retry.Do(context.Background(), func() error { return p.SendToFirstNode(msg) - }) + }, retry.WithBackoffBaseDelay(10), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(100)) atomic.AddInt64(&sentCount, 1) c.Assert(err, check.IsNil) } diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 55bc83e1109..8a9421daa6c 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -50,6 +50,9 @@ const ( defaultMemBufferCapacity int64 = 10 * 1024 * 1024 * 1024 // 10G schemaStorageGCLag = time.Minute * 20 + + backoffBaseDelayInMs = 5 + maxTries = 3 ) type processor struct { @@ -689,17 +692,17 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode return nil }) var tableName *model.TableName - retry.Run(time.Millisecond*5, 3, func() error { //nolint:errcheck + retry.Do(ctx, func() error { //nolint:errcheck if name, ok := p.schemaStorage.GetLastSnapshot().GetTableNameByID(tableID); ok { tableName = &name return nil } return 
errors.Errorf("failed to get table name, fallback to use table id: %d", tableID) - }) + }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), retry.WithMaxTries(maxTries), retry.WithIsRetryableErr(cerror.IsRetryableError)) if p.changefeed.Info.Config.Cyclic.IsEnabled() { // Retry to find mark table ID var markTableID model.TableID - err := retry.Run(50*time.Millisecond, 20, func() error { + err := retry.Do(context.Background(), func() error { if tableName == nil { name, exist := p.schemaStorage.GetLastSnapshot().GetTableNameByID(tableID) if !exist { @@ -707,14 +710,14 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode } tableName = &name } - markTableSchameName, markTableTableName := mark.GetMarkTableName(tableName.Schema, tableName.Table) - tableInfo, exist := p.schemaStorage.GetLastSnapshot().GetTableByName(markTableSchameName, markTableTableName) + markTableSchemaName, markTableTableName := mark.GetMarkTableName(tableName.Schema, tableName.Table) + tableInfo, exist := p.schemaStorage.GetLastSnapshot().GetTableByName(markTableSchemaName, markTableTableName) if !exist { return cerror.ErrProcessorTableNotFound.GenWithStack("normal table(%s) and mark table not match", tableName.String()) } markTableID = tableInfo.ID return nil - }) + }, retry.WithBackoffMaxDelay(50), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20)) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go index 93740b1b145..fb794fbd454 100644 --- a/cdc/puller/puller_test.go +++ b/cdc/puller/puller_test.go @@ -18,12 +18,12 @@ import ( "context" "fmt" "sync" - "time" "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/ticdc/cdc/kv" "github.com/pingcap/ticdc/cdc/model" + cerrors "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/regionspan" "github.com/pingcap/ticdc/pkg/retry" "github.com/pingcap/ticdc/pkg/security" @@ -170,13 +170,14 @@ func (s *pullerSuite) TestPullerResolvedForward(c *check.C) { c.Assert(ev.OpType, check.Equals, model.OpTypeResolved) c.Assert(ev.CRTs, check.Equals, uint64(1000)) c.Assert(plr.IsInitialized(), check.IsTrue) - err := retry.Run(time.Millisecond*10, 10, func() error { + err := retry.Do(context.Background(), func() error { ts := plr.GetResolvedTs() if ts != uint64(1000) { return errors.Errorf("resolved ts %d of puller does not forward to 1000", ts) } return nil - }) + }, retry.WithBackoffBaseDelay(10), retry.WithMaxTries(10), retry.WithIsRetryableErr(cerrors.IsRetryableError)) + c.Assert(err, check.IsNil) store.Close() diff --git a/cdc/puller/sorter/sorter_test.go b/cdc/puller/sorter/sorter_test.go index 59469c27f61..1d69ac8330f 100644 --- a/cdc/puller/sorter/sorter_test.go +++ b/cdc/puller/sorter/sorter_test.go @@ -16,16 +16,13 @@ package sorter import ( "context" "math" + _ "net/http/pprof" "os" "path/filepath" "sync/atomic" "testing" "time" - "go.uber.org/zap/zapcore" - - _ "net/http/pprof" - "github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/log" @@ -34,6 +31,7 @@ import ( "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/util/testleak" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" ) @@ -81,7 +79,7 @@ func (s *sorterSuite) TestSorterBasic(c *check.C) { sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0") c.Assert(err, check.IsNil) - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancel := 
context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() err = testSorter(ctx, c, sorter, 10000, true) c.Assert(err, check.ErrorMatches, ".*context cancel.*") diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 2e1d131dde3..dcdb5c377d4 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -25,7 +25,6 @@ import ( "sync/atomic" "time" - "github.com/cenkalti/backoff" dmysql "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -66,6 +65,12 @@ const ( defaultSafeMode = true ) +const ( + backoffBaseDelayInMs = 500 + // in previous/backoff retry pkg, the DefaultMaxInterval = 60 * time.Second + backoffMaxDelayInMs = 60 * 1000 +) + // SyncpointTableName is the name of table where all syncpoint maps sit const syncpointTableName string = "syncpoint_v1" @@ -184,7 +189,7 @@ func (s *mysqlSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error ) return cerror.ErrDDLEventIgnored.GenWithStackByArgs() } - err := s.execDDLWithMaxRetries(ctx, ddl, defaultDDLMaxRetryTime) + err := s.execDDLWithMaxRetries(ctx, ddl) return errors.Trace(err) } @@ -193,22 +198,18 @@ func (s *mysqlSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTab return nil } -func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent, maxRetries uint64) error { - return retry.Run(500*time.Millisecond, maxRetries, - func() error { - err := s.execDDL(ctx, ddl) - if isIgnorableDDLError(err) { - log.Info("execute DDL failed, but error can be ignored", zap.String("query", ddl.Query), zap.Error(err)) - return nil - } - if errors.Cause(err) == context.Canceled { - return backoff.Permanent(err) - } - if err != nil { - log.Warn("execute DDL with error, retry later", zap.String("query", ddl.Query), zap.Error(err)) - } - return err - }) +func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error { + return retry.Do(ctx, func() error { + err := s.execDDL(ctx, ddl) + if isIgnorableDDLError(err) { + log.Info("execute DDL failed, but error can be ignored", zap.String("query", ddl.Query), zap.Error(err)) + return nil + } + if err != nil { + log.Warn("execute DDL with error, retry later", zap.String("query", ddl.Query), zap.Error(err)) + } + return err + }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), retry.WithBackoffMaxDelay(backoffMaxDelayInMs), retry.WithMaxTries(defaultDDLMaxRetryTime), retry.WithIsRetryableErr(cerror.IsRetryableError)) } func (s *mysqlSink) execDDL(ctx context.Context, ddl *model.DDLEvent) error { @@ -867,68 +868,86 @@ func (s *mysqlSink) Close() error { return cerror.WrapError(cerror.ErrMySQLConnectionError, err) } -func (s *mysqlSink) execDMLWithMaxRetries( - ctx context.Context, dmls *preparedDMLs, maxRetries uint64, bucket int, -) error { +func logDMLTxnErr(err error) error { + if isRetryableDMLError(err) { + log.Warn("execute DMLs with error, retry later", zap.Error(err)) + } + return err +} + +func isRetryableDMLError(err error) bool { + if !cerror.IsRetryableError(err) { + return false + } + + errCode, ok := getSQLErrCode(err) + if !ok { + return true + } + + switch errCode { + case mysql.ErrNoSuchTable, mysql.ErrBadDB: + return false + } + return true +} + +func (s *mysqlSink) execDMLWithMaxRetries(ctx context.Context, dmls *preparedDMLs, bucket int) error { if len(dmls.sqls) != len(dmls.values) { log.Panic("unexpected number of sqls and values", zap.Strings("sqls", dmls.sqls), zap.Any("values", dmls.values)) } - checkTxnErr := func(err error) error { - if errors.Cause(err) == 
context.Canceled { - return backoff.Permanent(err) - } - log.Warn("execute DMLs with error, retry later", zap.Error(err)) - return err - } - return retry.Run(500*time.Millisecond, maxRetries, - func() error { - failpoint.Inject("MySQLSinkTxnRandomError", func() { - failpoint.Return(checkTxnErr(errors.Trace(dmysql.ErrInvalidConn))) - }) - failpoint.Inject("MySQLSinkHangLongTime", func() { - time.Sleep(time.Hour) - }) - err := s.statistics.RecordBatchExecution(func() (int, error) { - tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return 0, checkTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err)) - } - for i, query := range dmls.sqls { - args := dmls.values[i] - log.Debug("exec row", zap.String("sql", query), zap.Any("args", args)) - if _, err := tx.ExecContext(ctx, query, args...); err != nil { - if rbErr := tx.Rollback(); rbErr != nil { - log.Warn("failed to rollback txn", zap.Error(err)) - } - return 0, checkTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err)) + + return retry.Do(ctx, func() error { + failpoint.Inject("MySQLSinkTxnRandomError", func() { + failpoint.Return(logDMLTxnErr(errors.Trace(dmysql.ErrInvalidConn))) + }) + failpoint.Inject("MySQLSinkHangLongTime", func() { + time.Sleep(time.Hour) + }) + + err := s.statistics.RecordBatchExecution(func() (int, error) { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return 0, logDMLTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err)) + } + + for i, query := range dmls.sqls { + args := dmls.values[i] + log.Debug("exec row", zap.String("sql", query), zap.Any("args", args)) + if _, err := tx.ExecContext(ctx, query, args...); err != nil { + if rbErr := tx.Rollback(); rbErr != nil { + log.Warn("failed to rollback txn", zap.Error(err)) } + return 0, logDMLTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err)) } - if len(dmls.markSQL) != 0 { - log.Debug("exec row", zap.String("sql", dmls.markSQL)) - if _, err := tx.ExecContext(ctx, dmls.markSQL); err != nil { - if rbErr := tx.Rollback(); rbErr != nil { - log.Warn("failed to rollback txn", zap.Error(err)) - } - return 0, checkTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err)) + } + + if len(dmls.markSQL) != 0 { + log.Debug("exec row", zap.String("sql", dmls.markSQL)) + if _, err := tx.ExecContext(ctx, dmls.markSQL); err != nil { + if rbErr := tx.Rollback(); rbErr != nil { + log.Warn("failed to rollback txn", zap.Error(err)) } + return 0, logDMLTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err)) } - if err = tx.Commit(); err != nil { - return 0, checkTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err)) - } - return dmls.rowCount, nil - }) - if err != nil { - return errors.Trace(err) } - log.Debug("Exec Rows succeeded", - zap.String("changefeed", s.params.changefeedID), - zap.Int("num of Rows", dmls.rowCount), - zap.Int("bucket", bucket)) - return nil - }, - ) + + if err = tx.Commit(); err != nil { + return 0, logDMLTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err)) + } + return dmls.rowCount, nil + }) + if err != nil { + return errors.Trace(err) + } + log.Debug("Exec Rows succeeded", + zap.String("changefeed", s.params.changefeedID), + zap.Int("num of Rows", dmls.rowCount), + zap.Int("bucket", bucket)) + return nil + }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), retry.WithBackoffMaxDelay(backoffMaxDelayInMs), retry.WithMaxTries(defaultDMLMaxRetryTime), retry.WithIsRetryableErr(isRetryableDMLError)) } type preparedDMLs struct { @@ -1041,7 +1060,7 @@ func (s *mysqlSink) execDMLs(ctx context.Context, rows []*model.RowChangedEvent, }) dmls := 
s.prepareDMLs(rows, replicaID, bucket) log.Debug("prepare DMLs", zap.Any("rows", rows), zap.Strings("sqls", dmls.sqls), zap.Any("values", dmls.values)) - if err := s.execDMLWithMaxRetries(ctx, dmls, defaultDMLMaxRetryTime, bucket); err != nil { + if err := s.execDMLWithMaxRetries(ctx, dmls, bucket); err != nil { log.Error("execute DMLs failed", zap.String("err", err.Error())) return errors.Trace(err) } diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index d20d04ddb28..0a90390f1f2 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -1175,31 +1175,33 @@ func (s MySQLSinkSuite) TestNewMySQLSinkExecDML(c *check.C) { err = sink.EmitRowChangedEvents(ctx, rows...) c.Assert(err, check.IsNil) - err = retry.Run(time.Millisecond*20, 10, func() error { + err = retry.Do(context.Background(), func() error { ts, err := sink.FlushRowChangedEvents(ctx, uint64(2)) c.Assert(err, check.IsNil) if ts < uint64(2) { return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 2) } return nil - }) + }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(10), retry.WithIsRetryableErr(cerror.IsRetryableError)) + c.Assert(err, check.IsNil) - err = retry.Run(time.Millisecond*20, 10, func() error { + err = retry.Do(context.Background(), func() error { ts, err := sink.FlushRowChangedEvents(ctx, uint64(4)) c.Assert(err, check.IsNil) if ts < uint64(4) { return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 4) } return nil - }) + }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(10), retry.WithIsRetryableErr(cerror.IsRetryableError)) + c.Assert(err, check.IsNil) err = sink.Close() c.Assert(err, check.IsNil) } -func (s MySQLSinkSuite) TestExecDMLRollback(c *check.C) { +func (s MySQLSinkSuite) TestExecDMLRollbackErrDatabaseNotExists(c *check.C) { defer testleak.AfterTest(c)() rows := []*model.RowChangedEvent{ @@ -1220,8 +1222,9 @@ func (s MySQLSinkSuite) TestExecDMLRollback(c *check.C) { errDatabaseNotExists := &dmysql.MySQLError{ Number: uint16(infoschema.ErrDatabaseNotExists.Code()), } + dbIndex := 0 - mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) { + mockGetDBConnErrDatabaseNotExists := func(ctx context.Context, dsnStr string) (*sql.DB, error) { defer func() { dbIndex++ }() @@ -1234,18 +1237,154 @@ func (s MySQLSinkSuite) TestExecDMLRollback(c *check.C) { // normal db db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) c.Assert(err, check.IsNil) - for i := 0; i < defaultDMLMaxRetryTime+1; i++ { + mock.ExpectBegin() + mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)"). + WithArgs(1, 2). 
+ WillReturnError(errDatabaseNotExists) + mock.ExpectRollback() + mock.ExpectClose() + return db, nil + } + backupGetDBConn := getDBConnImpl + getDBConnImpl = mockGetDBConnErrDatabaseNotExists + defer func() { + getDBConnImpl = backupGetDBConn + }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + changefeed := "test-changefeed" + sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1") + c.Assert(err, check.IsNil) + rc := config.GetDefaultReplicaConfig() + f, err := filter.NewFilter(rc) + c.Assert(err, check.IsNil) + sink, err := newMySQLSink(ctx, changefeed, sinkURI, f, rc, map[string]string{}) + c.Assert(err, check.IsNil) + + err = sink.(*mysqlSink).execDMLs(ctx, rows, 1 /* replicaID */, 1 /* bucket */) + c.Assert(errors.Cause(err), check.Equals, errDatabaseNotExists) + + err = sink.Close() + c.Assert(err, check.IsNil) +} + +func (s MySQLSinkSuite) TestExecDMLRollbackErrTableNotExists(c *check.C) { + defer testleak.AfterTest(c)() + + rows := []*model.RowChangedEvent{ + { + Table: &model.TableName{Schema: "s1", Table: "t1", TableID: 1}, + Columns: []*model.Column{ + {Name: "a", Type: mysql.TypeLong, Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Value: 1}, + }, + }, + { + Table: &model.TableName{Schema: "s1", Table: "t1", TableID: 1}, + Columns: []*model.Column{ + {Name: "a", Type: mysql.TypeLong, Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Value: 2}, + }, + }, + } + + errTableNotExists := &dmysql.MySQLError{ + Number: uint16(infoschema.ErrTableNotExists.Code()), + } + + dbIndex := 0 + mockGetDBConnErrDatabaseNotExists := func(ctx context.Context, dsnStr string) (*sql.DB, error) { + defer func() { + dbIndex++ + }() + if dbIndex == 0 { + // test db + db, err := mockTestDB() + c.Assert(err, check.IsNil) + return db, nil + } + // normal db + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + c.Assert(err, check.IsNil) + mock.ExpectBegin() + mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)"). + WithArgs(1, 2). 
+ WillReturnError(errTableNotExists) + mock.ExpectRollback() + mock.ExpectClose() + return db, nil + } + backupGetDBConn := getDBConnImpl + getDBConnImpl = mockGetDBConnErrDatabaseNotExists + defer func() { + getDBConnImpl = backupGetDBConn + }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + changefeed := "test-changefeed" + sinkURI, err := url.Parse("mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1") + c.Assert(err, check.IsNil) + rc := config.GetDefaultReplicaConfig() + f, err := filter.NewFilter(rc) + c.Assert(err, check.IsNil) + sink, err := newMySQLSink(ctx, changefeed, sinkURI, f, rc, map[string]string{}) + c.Assert(err, check.IsNil) + + err = sink.(*mysqlSink).execDMLs(ctx, rows, 1 /* replicaID */, 1 /* bucket */) + c.Assert(errors.Cause(err), check.Equals, errTableNotExists) + + err = sink.Close() + c.Assert(err, check.IsNil) +} + +func (s MySQLSinkSuite) TestExecDMLRollbackErrRetryable(c *check.C) { + defer testleak.AfterTest(c)() + + rows := []*model.RowChangedEvent{ + { + Table: &model.TableName{Schema: "s1", Table: "t1", TableID: 1}, + Columns: []*model.Column{ + {Name: "a", Type: mysql.TypeLong, Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Value: 1}, + }, + }, + { + Table: &model.TableName{Schema: "s1", Table: "t1", TableID: 1}, + Columns: []*model.Column{ + {Name: "a", Type: mysql.TypeLong, Flag: model.HandleKeyFlag | model.PrimaryKeyFlag, Value: 2}, + }, + }, + } + + errLockDeadlock := &dmysql.MySQLError{ + Number: mysql.ErrLockDeadlock, + } + + dbIndex := 0 + mockGetDBConnErrDatabaseNotExists := func(ctx context.Context, dsnStr string) (*sql.DB, error) { + defer func() { + dbIndex++ + }() + if dbIndex == 0 { + // test db + db, err := mockTestDB() + c.Assert(err, check.IsNil) + return db, nil + } + // normal db + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + c.Assert(err, check.IsNil) + for i := 0; i < defaultDMLMaxRetryTime; i++ { mock.ExpectBegin() mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)"). WithArgs(1, 2). 
- WillReturnError(errDatabaseNotExists) + WillReturnError(errLockDeadlock) mock.ExpectRollback() } mock.ExpectClose() return db, nil } backupGetDBConn := getDBConnImpl - getDBConnImpl = mockGetDBConn + getDBConnImpl = mockGetDBConnErrDatabaseNotExists defer func() { getDBConnImpl = backupGetDBConn }() @@ -1262,7 +1401,7 @@ func (s MySQLSinkSuite) TestExecDMLRollback(c *check.C) { c.Assert(err, check.IsNil) err = sink.(*mysqlSink).execDMLs(ctx, rows, 1 /* replicaID */, 1 /* bucket */) - c.Assert(errors.Cause(err), check.Equals, errDatabaseNotExists) + c.Assert(errors.Cause(err), check.Equals, errLockDeadlock) err = sink.Close() c.Assert(err, check.IsNil) diff --git a/integration/framework/docker_compose_op.go b/integration/framework/docker_compose_op.go index ef5ed9f3b3f..986b93a3c07 100644 --- a/integration/framework/docker_compose_op.go +++ b/integration/framework/docker_compose_op.go @@ -17,12 +17,13 @@ import ( "database/sql" "os" "os/exec" - "time" "github.com/pingcap/errors" "github.com/pingcap/log" + cerrors "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/retry" "go.uber.org/zap" + "golang.org/x/net/context" ) // DockerComposeOperator represent a docker compose @@ -53,7 +54,7 @@ func (d *DockerComposeOperator) Setup() { // WaitClusterStarted waits the cluster is started and ready func (d *DockerComposeOperator) WaitClusterStarted() { if d.HealthChecker != nil { - err := retry.Run(time.Second, 120, d.HealthChecker) + err := retry.Do(context.Background(), d.HealthChecker, retry.WithBackoffBaseDelay(1000), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(120), retry.WithIsRetryableErr(cerrors.IsRetryableError)) if err != nil { log.Fatal("Docker service health check failed after max retries", zap.Error(err)) } @@ -75,7 +76,7 @@ func (d *DockerComposeOperator) RestartComponents(names ...string) { } func waitTiDBStarted(dsn string) error { - return retry.Run(time.Second, 60, func() error { + return retry.Do(context.Background(), func() error { upstream, err := sql.Open("mysql", dsn) if err != nil { return errors.Trace(err) @@ -86,7 +87,7 @@ func waitTiDBStarted(dsn string) error { return errors.Trace(err) } return nil - }) + }, retry.WithBackoffBaseDelay(1000), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(60), retry.WithIsRetryableErr(cerrors.IsRetryableError)) } func runCmdHandleError(cmd *exec.Cmd) []byte { diff --git a/integration/framework/docker_env.go b/integration/framework/docker_env.go index 9a0f5ce6264..1e687912839 100644 --- a/integration/framework/docker_env.go +++ b/integration/framework/docker_env.go @@ -18,9 +18,9 @@ import ( "database/sql" "fmt" "os/exec" - "time" "github.com/pingcap/log" + cerrors "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/retry" "go.uber.org/zap" ) @@ -90,7 +90,7 @@ func (e *DockerEnv) RunTest(task Task) { Downstream: downstream, Env: e, WaitForReady: func() error { - return retry.Run(time.Second, 120, e.HealthChecker) + return retry.Do(context.Background(), e.HealthChecker, retry.WithBackoffBaseDelay(1000), retry.WithMaxTries(120), retry.WithIsRetryableErr(cerrors.IsRetryableError)) }, Ctx: context.Background(), } diff --git a/pkg/errors/helper.go b/pkg/errors/helper.go index 2d50f52602a..85cc1798864 100644 --- a/pkg/errors/helper.go +++ b/pkg/errors/helper.go @@ -14,6 +14,8 @@ package errors import ( + "context" + "github.com/pingcap/errors" ) @@ -59,3 +61,16 @@ func RFCCode(err error) (errors.RFCErrorCode, bool) { } return "", false } + +// IsRetryableError check the error is safe or worth to 
retry +func IsRetryableError(err error) bool { + if err == nil { + return false + } + + switch errors.Cause(err) { + case context.Canceled, context.DeadlineExceeded: + return false + } + return true +} diff --git a/pkg/errors/helper_test.go b/pkg/errors/helper_test.go index bfc72dc8e80..973692be561 100644 --- a/pkg/errors/helper_test.go +++ b/pkg/errors/helper_test.go @@ -14,6 +14,7 @@ package errors import ( + "context" "testing" "github.com/pingcap/check" @@ -53,3 +54,23 @@ func (s *helperSuite) TestWrapError(c *check.C) { } } } + +func (s *helperSuite) TestIsRetryableError(c *check.C) { + defer testleak.AfterTest(c)() + + tests := []struct { + name string + err error + want bool + }{ + {"nil error", nil, false}, + {"context Canceled err", context.Canceled, false}, + {"context DeadlineExceeded err", context.DeadlineExceeded, false}, + {"normal err", errors.New("test"), true}, + {"cdc reachMaxTry err", ErrReachMaxTry, true}, + } + for _, tt := range tests { + ret := IsRetryableError(tt.err) + c.Assert(ret, check.Equals, tt.want, check.Commentf("case:%s", tt.name)) + } +} diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 3b09ac33957..94a693da8d0 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -15,18 +15,16 @@ package etcd import ( "context" - "time" - - "github.com/cenkalti/backoff" - "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" - "google.golang.org/grpc/codes" "github.com/pingcap/errors" "github.com/pingcap/log" + cerrors "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/retry" "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.uber.org/zap" + "google.golang.org/grpc/codes" ) // etcd operation names @@ -39,6 +37,15 @@ const ( EtcdRevoke = "Revoke" ) +const ( + backoffBaseDelayInMs = 500 + // in previous/backoff retry pkg, the DefaultMaxInterval = 60 * time.Second + backoffMaxDelayInMs = 60 * 1000 +) + +// set to var instead of const for mocking the value to speedup test +var maxTries int64 = 8 + // Client is a simple wrapper that adds retry to etcd RPC type Client struct { cli *clientv3.Client @@ -61,17 +68,16 @@ func retryRPC(rpcName string, metric prometheus.Counter, etcdRPC func() error) e // Retry at least two election timeout to handle the case that two PDs restarted // (the first election maybe failed). // 16s = \sum_{n=0}^{6} 0.5*1.5^n - return retry.Run(500*time.Millisecond, 7+1, // +1 for the inital request. 
- func() error { - err := etcdRPC() - if err != nil && errors.Cause(err) != context.Canceled { - log.Warn("etcd RPC failed", zap.String("RPC", rpcName), zap.Error(err)) - } - if metric != nil { - metric.Inc() - } - return err - }) + return retry.Do(context.Background(), func() error { + err := etcdRPC() + if err != nil && errors.Cause(err) != context.Canceled { + log.Warn("etcd RPC failed", zap.String("RPC", rpcName), zap.Error(err)) + } + if metric != nil { + metric.Inc() + } + return err + }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), retry.WithBackoffMaxDelay(backoffMaxDelayInMs), retry.WithMaxTries(maxTries), retry.WithIsRetryableErr(isRetryableError(rpcName))) } // Put delegates request to clientv3.KV.Put @@ -121,17 +127,27 @@ func (c *Client) Grant(ctx context.Context, ttl int64) (resp *clientv3.LeaseGran return } +func isRetryableError(rpcName string) retry.IsRetryable { + return func(err error) bool { + if !cerrors.IsRetryableError(err) { + return false + } + if rpcName == EtcdRevoke { + if etcdErr, ok := err.(rpctypes.EtcdError); ok && etcdErr.Code() == codes.NotFound { + // it means the etcd lease is already expired or revoked + return false + } + } + + return true + } +} + // Revoke delegates request to clientv3.Lease.Revoke func (c *Client) Revoke(ctx context.Context, id clientv3.LeaseID) (resp *clientv3.LeaseRevokeResponse, err error) { err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error { var inErr error resp, inErr = c.cli.Revoke(ctx, id) - if inErr == nil { - return nil - } else if etcdErr := inErr.(rpctypes.EtcdError); etcdErr.Code() == codes.NotFound { - // it means the etcd lease is already expired or revoked - return backoff.Permanent(inErr) - } return inErr }) return diff --git a/pkg/etcd/client_test.go b/pkg/etcd/client_test.go index 6fc7d8899fe..88d248fc409 100644 --- a/pkg/etcd/client_test.go +++ b/pkg/etcd/client_test.go @@ -47,6 +47,10 @@ func (m *mockClient) Put(ctx context.Context, key, val string, opts ...clientv3. func (s *clientSuite) TestRetry(c *check.C) { defer testleak.AfterTest(c)() + originValue := maxTries + // to speed up the test + maxTries = 2 + cli := clientv3.NewCtxClient(context.TODO()) cli.KV = &mockClient{} retrycli := Wrap(cli, nil) @@ -54,10 +58,10 @@ func (s *clientSuite) TestRetry(c *check.C) { c.Assert(err, check.IsNil) c.Assert(get, check.NotNil) - // TODO: speed test, it take about 6.59s _, err = retrycli.Put(context.TODO(), "", "") c.Assert(err, check.NotNil) - c.Assert(err, check.ErrorMatches, "mock error") + c.Assert(errors.Cause(err), check.ErrorMatches, "mock error", check.Commentf("err:%v", err.Error())) + maxTries = originValue } func (s *etcdSuite) TestDelegateLease(c *check.C) { diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index 61161cce195..bd8bc41a34a 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" ) +// Run is Deprecated, try to use Do instead. // Run retries the specified function on error for at most maxRetries times. // It stops retry if the returned error is context.Canceled or context.DeadlineExceeded. func Run(initialInterval time.Duration, maxRetries uint64, f func() error) error { @@ -38,6 +39,7 @@ func Run(initialInterval time.Duration, maxRetries uint64, f func() error) error }, retryCfg) } +// RunWithInfiniteRetry is Deprecated, try to use Do with the WithInfiniteTries option instead. // RunWithInfiniteRetry retries the specified function indefinitely, until a backoff.PermanentError is encountered.
// notifyFunc will be called each time before sleeping with the total elapsed time. func RunWithInfiniteRetry(initialInterval time.Duration, f func() error, notifyFunc func(elapsed time.Duration)) error { diff --git a/pkg/workerpool/async_pool_impl.go b/pkg/workerpool/async_pool_impl.go index 81ad5b2067d..e2bf3ab6113 100644 --- a/pkg/workerpool/async_pool_impl.go +++ b/pkg/workerpool/async_pool_impl.go @@ -17,15 +17,18 @@ import ( "context" "sync" "sync/atomic" - "time" - "github.com/cenkalti/backoff" "github.com/pingcap/errors" cerrors "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/retry" "golang.org/x/sync/errgroup" ) +const ( + backoffBaseDelayInMs = 1 + maxTries = 25 +) + type defaultAsyncPoolImpl struct { workers []*asyncWorker nextWorkerID int32 @@ -50,18 +53,15 @@ func (p *defaultAsyncPoolImpl) Go(ctx context.Context, f func()) error { if p.doGo(ctx, f) == nil { return nil } - return errors.Trace(retry.Run(time.Millisecond*1, 25, func() error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - err := errors.Trace(p.doGo(ctx, f)) - if err != nil && cerrors.ErrAsyncPoolExited.NotEqual(errors.Cause(err)) { - return backoff.Permanent(err) - } - return err - })) + + err := retry.Do(ctx, func() error { + return errors.Trace(p.doGo(ctx, f)) + }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), retry.WithMaxTries(maxTries), retry.WithIsRetryableErr(isRetryable)) + return errors.Trace(err) +} + +func isRetryable(err error) bool { + return cerrors.IsRetryableError(err) && cerrors.ErrAsyncPoolExited.Equal(err) } func (p *defaultAsyncPoolImpl) doGo(ctx context.Context, f func()) error { diff --git a/tests/bank/case.go b/tests/bank/case.go index 8204aab743c..1b0930cbd09 100644 --- a/tests/bank/case.go +++ b/tests/bank/case.go @@ -325,7 +325,9 @@ func prepareImpl( batchInsertSQL := batchInsertSQLF(size, startIndex) start := time.Now() - err := retry.Run(100*time.Millisecond, 5, func() error { return insertF(batchInsertSQL) }) + err := retry.Do(context.Background(), func() error { + return insertF(batchInsertSQL) + }, retry.WithBackoffBaseDelay(100), retry.WithBackoffMaxDelay(60*100), retry.WithMaxTries(5)) if err != nil { log.Panic("exec batch insert failed", zap.String("query", batchInsertSQL), zap.Error(err)) } @@ -376,7 +378,8 @@ func mustExec(ctx context.Context, db *sql.DB, query string) { _, err := db.ExecContext(ctx, query) return err } - err := retry.Run(100*time.Millisecond, 5, execF) + + err := retry.Do(context.Background(), execF, retry.WithBackoffBaseDelay(100), retry.WithBackoffMaxDelay(60*100), retry.WithMaxTries(5)) if err != nil { log.Panic("exec failed", zap.String("query", query), zap.Error(err)) } diff --git a/tests/move_table/main.go b/tests/move_table/main.go index e05a88d32f5..0321660fe2e 100644 --- a/tests/move_table/main.go +++ b/tests/move_table/main.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/kv" + cerrors "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/retry" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/pkg/logutil" @@ -55,8 +56,7 @@ func main() { if err != nil { log.Fatal("failed to create cluster info", zap.Error(err)) } - - err = retry.Run(100*time.Millisecond, 20, func() error { + err = retry.Do(ctx, func() error { err := cluster.refreshInfo(ctx) if err != nil { log.Warn("error refreshing cluster info", zap.Error(err)) @@ -68,7 +68,7 @@ func main() { return errors.New("too few captures") } return nil - }) + }, 
retry.WithBackoffBaseDelay(100), retry.WithMaxTries(20), retry.WithIsRetryableErr(cerrors.IsRetryableError)) if err != nil { log.Fatal("Fail to get captures", zap.Error(err)) @@ -110,9 +110,9 @@ func main() { log.Info("all tables are moved", zap.String("sourceCapture", sourceCapture), zap.String("targetCapture", targetCapture)) for counter := 0; counter < 30; counter++ { - err := retry.Run(100*time.Millisecond, 5, func() error { + err := retry.Do(ctx, func() error { return cluster.refreshInfo(ctx) - }) + }, retry.WithBackoffBaseDelay(100), retry.WithMaxTries(5+1), retry.WithIsRetryableErr(cerrors.IsRetryableError)) if err != nil { log.Warn("error refreshing cluster info", zap.Error(err)) }
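Note: every retry.Run/RunWithInfiniteRetry call site above migrates to the option-based retry.Do API, whose backoff options take delays in milliseconds rather than time.Duration. The standalone sketch below (not part of the patch; fetchSomething and its error are hypothetical placeholders) illustrates the calling pattern using only the options that appear in this diff.

package main

import (
	"context"
	"errors"
	"log"

	cerror "github.com/pingcap/ticdc/pkg/errors"
	"github.com/pingcap/ticdc/pkg/retry"
)

// fetchSomething is a hypothetical operation standing in for the real work
// being retried (an etcd RPC, a DML transaction, a schema lookup, ...).
func fetchSomething(ctx context.Context) error {
	return errors.New("transient failure")
}

func main() {
	ctx := context.Background()
	// retry.Do replaces retry.Run(interval, maxRetries, fn): backoff delays
	// are given in milliseconds, WithMaxTries bounds the number of attempts,
	// and WithIsRetryableErr filters which errors are retried (context
	// cancellation is treated as non-retryable by cerror.IsRetryableError).
	err := retry.Do(ctx, func() error {
		return fetchSomething(ctx)
	},
		retry.WithBackoffBaseDelay(500),    // first backoff interval: 500ms
		retry.WithBackoffMaxDelay(60*1000), // cap the backoff at 60s
		retry.WithMaxTries(3),
		retry.WithIsRetryableErr(cerror.IsRetryableError),
	)
	if err != nil {
		log.Println("operation failed after retries:", err)
	}
}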