Skip to content

Commit

Permalink
fix processor exit unexpectedly when some pd node fail
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen authored and ti-chi-bot committed May 6, 2023
1 parent 3c6e704 commit 0d4c068
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 7 deletions.
20 changes: 13 additions & 7 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -403,11 +404,11 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
}
// Campaign to be the owner; this blocks until the capture has been elected.
if err := c.campaign(ctx); err != nil {
switch errors.Cause(err) {
case context.Canceled:

rootErr := errors.Cause(err)
if rootErr == context.Canceled {
return nil
case mvcc.ErrCompacted:
// the revision we requested is compacted, just retry
} else if rootErr == mvcc.ErrCompacted || isErrCompacted(rootErr) {
continue
}
log.Warn("campaign owner failed",
Expand Down Expand Up @@ -550,9 +551,10 @@ func (c *captureImpl) GetOwner() (owner.Owner, error) {

// campaign to be an owner.
func (c *captureImpl) campaign(ctx context.Context) error {
failpoint.Inject("capture-campaign-compacted-error", func() {
failpoint.Return(errors.Trace(mvcc.ErrCompacted))
})
//failpoint.Inject("capture-campaign-compacted-error", func() {
// failpoint.Return(errors.Trace(mvcc.ErrCompacted))
//})

// TODO: `Campaign` will get stuck when SIGSTOP is sent to the pd leader.
// For `Campaign`, when SIGSTOP is sent to the pd leader, cdc may call `cancel`
// (caused by the `processor routine` exiting). And inside `Campaign`, the routine
Expand Down Expand Up @@ -714,3 +716,7 @@ func (c *captureImpl) StatusProvider() owner.StatusProvider {
// IsReady reports the migrator's completion state: it returns whatever
// c.migrator.IsMigrateDone() returns.
func (c *captureImpl) IsReady() bool {
	migrated := c.migrator.IsMigrateDone()
	return migrated
}

// isErrCompacted reports whether err's message indicates that the requested
// etcd revision has been compacted.
//
// The check is purely textual: it returns true when err.Error() contains
// "required revision has been compacted". It is meant to be used in addition
// to an identity comparison against the sentinel error, for errors that carry
// the same message but are not the same value.
// NOTE(review): presumably this covers errors that crossed the gRPC boundary
// and no longer compare equal to mvcc.ErrCompacted; confirm.
//
// A nil err yields false.
func isErrCompacted(err error) bool {
	// Guard against nil: calling Error() on a nil error interface panics.
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "required revision has been compacted")
}
6 changes: 6 additions & 0 deletions cdc/capture/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ package capture

import (
"context"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"go.etcd.io/etcd/server/v3/mvcc"

"go.etcd.io/etcd/client/v3/concurrency"
)
Expand All @@ -37,6 +40,9 @@ func newElection(sess *concurrency.Session, key string) election {
}

// campaign delegates to the wrapped election's Campaign with the given
// context and key, and returns its error unchanged.
//
// When the "capture-campaign-compacted-error" failpoint is enabled, the
// function returns a traced mvcc.ErrCompacted before calling Campaign.
func (e *electionImpl) campaign(ctx context.Context, key string) error {
// Test hook: simulate a "revision compacted" failure from the election.
// Keep the marker in this exact form.
// NOTE(review): presumably the failpoint tooling rewrites it — confirm.
failpoint.Inject("capture-campaign-compacted-error", func() {
failpoint.Return(errors.Trace(mvcc.ErrCompacted))
})
return e.election.Campaign(ctx, key)
}

Expand Down
1 change: 1 addition & 0 deletions tests/integration_tests/availability/owner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ function test_owner_cleanup_stale_tasks() {
function test_owner_retryable_error() {
echo "run test case test_owner_retryable_error"
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/capture/capture-campaign-compacted-error=1*return(true)'

# start a capture server
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_owner_retryable_error.server1
# ensure the server becomes the owner
Expand Down

0 comments on commit 0d4c068

Please sign in to comment.