From 0d4c06805946beac9853299b12c76b1a80a25e1e Mon Sep 17 00:00:00 2001
From: dongmen <414110582@qq.com>
Date: Fri, 5 May 2023 16:28:12 +0800
Subject: [PATCH] fix processor exit unexpectedly when some pd node fail

---
Review note: the hunks below were revised after this commit was created, so
the commit id in the "From" line and the post-image blob ids in the "index"
lines describe the pre-review revision.

 cdc/capture/capture.go                        | 21 +++++++++++++------
 cdc/capture/election.go                       |  7 +++++++
 tests/integration_tests/availability/owner.sh |  1 +
 3 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go
index 43437b8c97b..245a1fce405 100644
--- a/cdc/capture/capture.go
+++ b/cdc/capture/capture.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"strings"
 	"sync"
 	"time"
 
@@ -403,11 +404,12 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
 		}
 		// Campaign to be the owner, it blocks until it been elected.
 		if err := c.campaign(ctx); err != nil {
-			switch errors.Cause(err) {
-			case context.Canceled:
+			rootErr := errors.Cause(err)
+			switch {
+			case rootErr == context.Canceled:
 				return nil
-			case mvcc.ErrCompacted:
+			case rootErr == mvcc.ErrCompacted || isErrCompacted(rootErr):
 				// the revision we requested is compacted, just retry
 				continue
 			}
 			log.Warn("campaign owner failed",
@@ -550,9 +552,6 @@ func (c *captureImpl) GetOwner() (owner.Owner, error) {
 
 // campaign to be an owner.
 func (c *captureImpl) campaign(ctx context.Context) error {
-	failpoint.Inject("capture-campaign-compacted-error", func() {
-		failpoint.Return(errors.Trace(mvcc.ErrCompacted))
-	})
 	// TODO: `Campaign` will get stuck when send SIGSTOP to pd leader.
 	// For `Campaign`, when send SIGSTOP to pd leader, cdc maybe call `cancel`
 	// (cause by `processor routine` exit). And inside `Campaign`, the routine
@@ -714,3 +713,13 @@ func (c *captureImpl) StatusProvider() owner.StatusProvider {
 func (c *captureImpl) IsReady() bool {
 	return c.migrator.IsMigrateDone()
 }
+
+// isErrCompacted reports whether err's message contains etcd's
+// "required revision has been compacted" text. campaignOwner uses it in
+// addition to comparing against mvcc.ErrCompacted by identity.
+func isErrCompacted(err error) bool {
+	if err == nil {
+		return false
+	}
+	return strings.Contains(err.Error(), "required revision has been compacted")
+}
diff --git a/cdc/capture/election.go b/cdc/capture/election.go
index 139c849cdba..0dee3aea5f2 100644
--- a/cdc/capture/election.go
+++ b/cdc/capture/election.go
@@ -15,6 +15,9 @@ package capture
 
 import (
 	"context"
 
+	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"go.etcd.io/etcd/client/v3/concurrency"
+	"go.etcd.io/etcd/server/v3/mvcc"
 )
@@ -37,6 +40,10 @@ func newElection(sess *concurrency.Session, key string) election {
 }
 
 func (e *electionImpl) campaign(ctx context.Context, key string) error {
+	// Test hook: when enabled, fail the campaign with mvcc.ErrCompacted.
+	failpoint.Inject("capture-campaign-compacted-error", func() {
+		failpoint.Return(errors.Trace(mvcc.ErrCompacted))
+	})
 	return e.election.Campaign(ctx, key)
 }
 
diff --git a/tests/integration_tests/availability/owner.sh b/tests/integration_tests/availability/owner.sh
index f81ac4f136c..b78c0ecd78e 100755
--- a/tests/integration_tests/availability/owner.sh
+++ b/tests/integration_tests/availability/owner.sh
@@ -137,6 +137,7 @@ function test_owner_cleanup_stale_tasks() {
 function test_owner_retryable_error() {
 	echo "run test case test_owner_retryable_error"
 	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/capture/capture-campaign-compacted-error=1*return(true)'
+
 	# start a capture server
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_owner_retryable_error.server1
 	# ensure the server become the owner