From 6c98959f20c96fbbedc651842db138d5f80d6d93 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Wed, 30 Sep 2020 16:22:01 +0800 Subject: [PATCH 1/3] owner, errors: ignore update gc safepoint error Signed-off-by: Neil Shen --- cdc/owner.go | 11 ++++++----- cdc/owner_test.go | 22 ++++++++++++++++++++++ pkg/errors/errors.go | 1 - 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index 3b1fc2b9510..b052319384e 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -577,9 +577,10 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { if !o.gcSafepointLastUpdate.IsZero() { _, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, 0, 0) if err != nil { - return cerror.WrapError(cerror.ErrOwnerUpdateGCSafepoint, err) + log.Warn("failed to update service safe point", zap.Error(err)) + } else { + o.gcSafepointLastUpdate = time.Time{} } - o.gcSafepointLastUpdate = *new(time.Time) } return nil } @@ -609,10 +610,10 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { if time.Since(o.gcSafepointLastUpdate) > GCSafepointUpdateInterval { _, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, minCheckpointTs) if err != nil { - log.Info("failed to update service safe point", zap.Error(err)) - return cerror.WrapError(cerror.ErrOwnerUpdateGCSafepoint, err) + log.Warn("failed to update service safe point", zap.Error(err)) + } else { + o.gcSafepointLastUpdate = time.Now() } - o.gcSafepointLastUpdate = time.Now() } return nil } diff --git a/cdc/owner_test.go b/cdc/owner_test.go index 679e5cda256..11ec372dcee 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -15,6 +15,7 @@ package cdc import ( "context" + "errors" "net/url" "time" @@ -34,6 +35,7 @@ import ( "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/store/mockstore" + pd "github.com/tikv/pd/client" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" "golang.org/x/sync/errgroup" @@ -74,6 +76,26 @@ func (s *ownerSuite) TearDownTest(c *check.C) { } } +type mockPDClient struct { + pd.Client + err error +} + +func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + return 0, errors.New("mock error") +} + +func (s *ownerSuite) TestOwnerFlushChangeFeedInfos(c *check.C) { + mockOwner := Owner{ + pdClient: &mockPDClient{}, + gcSafepointLastUpdate: time.Now(), + } + + // Owner should ignore UpdateServiceGCSafePoint error. + err := mockOwner.flushChangeFeedInfos(s.ctx) + c.Assert(err, check.IsNil) +} + /* type handlerForPrueDMLTest struct { mu sync.RWMutex diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 7e8e9fbd118..813396188e6 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -168,7 +168,6 @@ var ( ErrAPIInvalidParam = errors.Normalize("invalid api parameter", errors.RFCCodeText("CDC:ErrAPIInvalidParam")) ErrInternalServerError = errors.Normalize("internal server error", errors.RFCCodeText("CDC:ErrInternalServerError")) ErrOwnerSortDir = errors.Normalize("owner sort dir", errors.RFCCodeText("CDC:ErrOwnerSortDir")) - ErrOwnerUpdateGCSafepoint = errors.Normalize("owner update gc safepoint", errors.RFCCodeText("CDC:ErrOwnerUpdateGCSafepoint")) ErrOwnerChangefeedNotFound = errors.Normalize("changefeed %s not found in owner cache", errors.RFCCodeText("CDC:ErrOwnerChangefeedNotFound")) ErrChangefeedAbnormalState = errors.Normalize("changefeed in abnormal state: %s, replication status: %+v", errors.RFCCodeText("CDC:ErrChangefeedAbnormalState")) ErrInvalidAdminJobType = errors.Normalize("invalid admin job type: %d", errors.RFCCodeText("CDC:ErrInvalidAdminJobType")) From cac520c6840140fdfc10ee8c077712417f8cdc65 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Wed, 30 Sep 2020 16:37:55 +0800 Subject: [PATCH 2/3] fix lint Signed-off-by: Neil Shen --- cdc/owner_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cdc/owner_test.go b/cdc/owner_test.go index 11ec372dcee..c368251ebc7 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -78,7 +78,6 @@ func (s *ownerSuite) TearDownTest(c *check.C) { type mockPDClient struct { pd.Client - err error } func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { From 208de550f4b01e7a2d556060a4483a8dbfbe68ab Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Fri, 9 Oct 2020 17:06:31 +0800 Subject: [PATCH 3/3] address comment Signed-off-by: Neil Shen --- cdc/owner_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cdc/owner_test.go b/cdc/owner_test.go index c368251ebc7..9fae6b7143e 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -78,21 +78,25 @@ func (s *ownerSuite) TearDownTest(c *check.C) { type mockPDClient struct { pd.Client + invokeCounter int } func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + m.invokeCounter++ return 0, errors.New("mock error") } func (s *ownerSuite) TestOwnerFlushChangeFeedInfos(c *check.C) { + mockPDCli := &mockPDClient{} mockOwner := Owner{ - pdClient: &mockPDClient{}, + pdClient: mockPDCli, gcSafepointLastUpdate: time.Now(), } // Owner should ignore UpdateServiceGCSafePoint error. err := mockOwner.flushChangeFeedInfos(s.ctx) c.Assert(err, check.IsNil) + c.Assert(mockPDCli.invokeCounter, check.Equals, 1) } /*