diff --git a/cdc/owner.go b/cdc/owner.go index 3b1fc2b9510..b052319384e 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -577,9 +577,10 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { if !o.gcSafepointLastUpdate.IsZero() { _, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, 0, 0) if err != nil { - return cerror.WrapError(cerror.ErrOwnerUpdateGCSafepoint, err) + log.Warn("failed to update service safe point", zap.Error(err)) + } else { + o.gcSafepointLastUpdate = time.Time{} } - o.gcSafepointLastUpdate = *new(time.Time) } return nil } @@ -609,10 +610,10 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { if time.Since(o.gcSafepointLastUpdate) > GCSafepointUpdateInterval { _, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, minCheckpointTs) if err != nil { - log.Info("failed to update service safe point", zap.Error(err)) - return cerror.WrapError(cerror.ErrOwnerUpdateGCSafepoint, err) + log.Warn("failed to update service safe point", zap.Error(err)) + } else { + o.gcSafepointLastUpdate = time.Now() } - o.gcSafepointLastUpdate = time.Now() } return nil } diff --git a/cdc/owner_test.go b/cdc/owner_test.go index 679e5cda256..9fae6b7143e 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -15,6 +15,7 @@ package cdc import ( "context" + "errors" "net/url" "time" @@ -34,6 +35,7 @@ import ( "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/store/mockstore" + pd "github.com/tikv/pd/client" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" "golang.org/x/sync/errgroup" @@ -74,6 +76,29 @@ func (s *ownerSuite) TearDownTest(c *check.C) { } } +type mockPDClient struct { + pd.Client + invokeCounter int +} + +func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + m.invokeCounter++ + return 0, errors.New("mock error") +} + +func (s *ownerSuite) TestOwnerFlushChangeFeedInfos(c *check.C) { + mockPDCli := &mockPDClient{} + mockOwner := Owner{ + pdClient: mockPDCli, + gcSafepointLastUpdate: time.Now(), + } + + // Owner should ignore UpdateServiceGCSafePoint error. + err := mockOwner.flushChangeFeedInfos(s.ctx) + c.Assert(err, check.IsNil) + c.Assert(mockPDCli.invokeCounter, check.Equals, 1) +} + /* type handlerForPrueDMLTest struct { mu sync.RWMutex diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 7e8e9fbd118..813396188e6 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -168,7 +168,6 @@ var ( ErrAPIInvalidParam = errors.Normalize("invalid api parameter", errors.RFCCodeText("CDC:ErrAPIInvalidParam")) ErrInternalServerError = errors.Normalize("internal server error", errors.RFCCodeText("CDC:ErrInternalServerError")) ErrOwnerSortDir = errors.Normalize("owner sort dir", errors.RFCCodeText("CDC:ErrOwnerSortDir")) - ErrOwnerUpdateGCSafepoint = errors.Normalize("owner update gc safepoint", errors.RFCCodeText("CDC:ErrOwnerUpdateGCSafepoint")) ErrOwnerChangefeedNotFound = errors.Normalize("changefeed %s not found in owner cache", errors.RFCCodeText("CDC:ErrOwnerChangefeedNotFound")) ErrChangefeedAbnormalState = errors.Normalize("changefeed in abnormal state: %s, replication status: %+v", errors.RFCCodeText("CDC:ErrChangefeedAbnormalState")) ErrInvalidAdminJobType = errors.Normalize("invalid admin job type: %d", errors.RFCCodeText("CDC:ErrInvalidAdminJobType"))