Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

capture (ticdc): fix processor exit unexpectedly when some pd node fail #8884

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -403,11 +404,13 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
}
// Campaign to be the owner; it blocks until it has been elected.
if err := c.campaign(ctx); err != nil {
switch errors.Cause(err) {
case context.Canceled:

rootErr := errors.Cause(err)
if rootErr == context.Canceled {
return nil
case mvcc.ErrCompacted:
// the revision we requested is compacted, just retry
} else if rootErr == mvcc.ErrCompacted || isErrCompacted(rootErr) {
log.Warn("campaign owner failed due to etcd revision "+
"has been compacted, retry later", zap.Error(err))
continue
}
log.Warn("campaign owner failed",
Expand Down Expand Up @@ -550,9 +553,6 @@ func (c *captureImpl) GetOwner() (owner.Owner, error) {

// campaign to be an owner.
func (c *captureImpl) campaign(ctx context.Context) error {
failpoint.Inject("capture-campaign-compacted-error", func() {
failpoint.Return(errors.Trace(mvcc.ErrCompacted))
})
// TODO: `Campaign` will get stuck when SIGSTOP is sent to the pd leader.
// For `Campaign`, when SIGSTOP is sent to the pd leader, cdc may call `cancel`
// (caused by the `processor routine` exiting). And inside `Campaign`, the routine
Expand Down Expand Up @@ -714,3 +714,7 @@ func (c *captureImpl) StatusProvider() owner.StatusProvider {
func (c *captureImpl) IsReady() bool {
	// Readiness is exactly the migrator's "migration done" status; nothing
	// else is consulted here.
	migrated := c.migrator.IsMigrateDone()
	return migrated
}

// isErrCompacted reports whether err's message says that the required
// revision has been compacted.
//
// The check is done on the message text rather than on error identity, so it
// also matches errors that carry the same message but do not compare equal
// to the mvcc.ErrCompacted value.
//
// A nil err is reported as false instead of panicking on err.Error().
func isErrCompacted(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "required revision has been compacted")
}
6 changes: 6 additions & 0 deletions cdc/capture/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ package capture
import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/mvcc"
)

// election wraps the owner election methods.
Expand All @@ -37,6 +40,9 @@ func newElection(sess *concurrency.Session, key string) election {
}

// campaign forwards ctx and key to the wrapped election's Campaign and
// returns whatever error that call returns.
func (e *electionImpl) campaign(ctx context.Context, key string) error {
// Failpoint "capture-campaign-compacted-error": when it fires, return a traced
// mvcc.ErrCompacted instead of calling Campaign.
// NOTE(review): presumably this exists so tests can exercise the caller's
// handling of a compacted-revision error — confirm against the test suite.
failpoint.Inject("capture-campaign-compacted-error", func() {
failpoint.Return(errors.Trace(mvcc.ErrCompacted))
})
return e.election.Campaign(ctx, key)
}

Expand Down
1 change: 1 addition & 0 deletions tests/integration_tests/availability/owner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ function test_owner_cleanup_stale_tasks() {
function test_owner_retryable_error() {
echo "run test case test_owner_retryable_error"
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/capture/capture-campaign-compacted-error=1*return(true)'

# start a capture server
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_owner_retryable_error.server1
# ensure the server become the owner
Expand Down