Skip to content

Commit

Permalink
pd, changefeed (ticdc): fix pd related issues (#8884, #8813, #9106, #…
Browse files Browse the repository at this point in the history
…9174) (#8901)

ref #8868, close #8877
  • Loading branch information
ti-chi-bot authored Jun 13, 2023
1 parent a912663 commit 1ea99d5
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 15 deletions.
15 changes: 11 additions & 4 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -114,6 +115,7 @@ func (c *Capture) reset(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
log.Info("reset session successfully", zap.Any("session", sess))

c.captureMu.Lock()
defer c.captureMu.Unlock()
Expand Down Expand Up @@ -355,11 +357,12 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
}
// Campaign to be an owner, it blocks until it becomes the owner
if err := c.campaign(ctx); err != nil {
switch errors.Cause(err) {
case context.Canceled:
rootErr := errors.Cause(err)
if rootErr == context.Canceled {
return nil
case mvcc.ErrCompacted:
// the revision we requested is compacted, just retry
} else if rootErr == mvcc.ErrCompacted || isErrCompacted(rootErr) {
log.Warn("campaign owner failed due to etcd revision "+
"has been compacted, retry later", zap.Error(err))
continue
}
log.Warn("campaign owner failed", zap.Error(err))
Expand Down Expand Up @@ -608,3 +611,7 @@ func (c *Capture) StatusProvider() owner.StatusProvider {
}
return owner.NewStatusProvider(c.owner)
}

func isErrCompacted(err error) bool {
return strings.Contains(err.Error(), "required revision has been compacted")
}
3 changes: 3 additions & 0 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,9 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
changefeedBarrierTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedBarrierTsGauge = nil

if c.isRemoved {
changefeedStatusGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
}
c.isReleased = true
c.initialized = false
}
Expand Down
11 changes: 10 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,11 @@ func (p *processor) Close() error {
log.Info("processor closing ...",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID))

// clean up metrics first to avoid some metrics are not cleaned up
// when error occurs during closing the processor
p.cleanupMetrics()

for _, tbl := range p.tables {
tbl.Cancel()
}
Expand Down Expand Up @@ -941,6 +946,11 @@ func (p *processor) Close() error {
}
// mark tables share the same cdcContext with its original table, don't need to cancel
failpoint.Inject("processorStopDelay", nil)

return nil
}

func (p *processor) cleanupMetrics() {
resolvedTsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
resolvedTsLagGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
checkpointTsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
Expand All @@ -952,7 +962,6 @@ func (p *processor) Close() error {

pipeline.SorterBatchReadSize.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
pipeline.SorterBatchReadDuration.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
return nil
}

// WriteDebugInfo write the debug info to Writer
Expand Down
25 changes: 20 additions & 5 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"
)

const (
Expand Down Expand Up @@ -124,12 +125,15 @@ func (s *Server) Run(ctx context.Context) error {
logConfig := logutil.DefaultZapLoggerConfig
logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel)

log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints))

etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: s.pdEndpoints,
TLS: tlsConfig,
Context: ctx,
LogConfig: &logConfig,
DialTimeout: 5 * time.Second,
Endpoints: s.pdEndpoints,
TLS: tlsConfig,
Context: ctx,
LogConfig: &logConfig,
DialTimeout: 5 * time.Second,
AutoSyncInterval: 30 * time.Second,
DialOptions: []grpc.DialOption{
grpcTLSOption,
grpc.WithBlock(),
Expand All @@ -142,6 +146,10 @@ func (s *Server) Run(ctx context.Context) error {
},
MinConnectTimeout: 3 * time.Second,
}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 20 * time.Second,
}),
},
})
if err != nil {
Expand Down Expand Up @@ -240,6 +248,13 @@ func (s *Server) etcdHealthChecker(ctx context.Context) error {
}
cancel()
}

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
_, err = s.etcdClient.Client.Unwrap().MemberList(ctx)
cancel()
if err != nil {
log.Warn("etcd health check error, fail to list etcd members", zap.Error(err))
}
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions dm/pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ var etcdDefaultTxnStrategy = retry.FiniteRetryStrategy{}
// CreateClient creates an etcd client with some default config items.
func CreateClient(endpoints []string, tlsCfg *tls.Config) (*clientv3.Client, error) {
return clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: DefaultDialTimeout,
TLS: tlsCfg,
Endpoints: endpoints,
DialTimeout: DefaultDialTimeout,
AutoSyncInterval: 30 * time.Second,
TLS: tlsCfg,
})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ var (
)

// set to var instead of const for mocking the value to speedup test
var maxTries uint64 = 8
var maxTries uint64 = 12

// Client is a simple wrapper that adds retry to etcd RPC
type Client struct {
Expand Down
15 changes: 14 additions & 1 deletion pkg/version/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io"
"regexp"
"strings"
"time"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/errors"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/pingcap/tidb/br/pkg/version"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/httputil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/security"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -70,6 +72,8 @@ func removeVAndHash(v string) string {
return strings.TrimPrefix(v, "v")
}

var checkClusterVersionRetryTimes = 10

// CheckClusterVersion check TiKV and PD version.
// need only one PD alive and match the cdc version.
func CheckClusterVersion(
Expand All @@ -85,7 +89,14 @@ func CheckClusterVersion(
}

for _, pdAddr := range pdAddrs {
err = CheckPDVersion(ctx, pdAddr, credential)
// check pd version with retry, if the pdAddr is a service or lb address
// the http client may connect to an unhealthy PD that returns 503
err = retry.Do(ctx, func() error {
return CheckPDVersion(ctx, pdAddr, credential)
}, retry.WithBackoffBaseDelay(time.Millisecond.Milliseconds()*10),
retry.WithBackoffMaxDelay(time.Second.Milliseconds()),
retry.WithMaxTries(uint64(checkClusterVersionRetryTimes)),
retry.WithIsRetryableErr(cerror.IsRetryableError))
if err == nil {
break
}
Expand All @@ -106,6 +117,8 @@ func CheckPDVersion(ctx context.Context, pdAddr string, credential *security.Cre
return err
}

ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
resp, err := httpClient.Get(ctx, fmt.Sprintf("%s/pd/api/v1/version", pdAddr))
if err != nil {
return cerror.ErrCheckClusterVersionFromPD.GenWithStackByArgs(err)
Expand Down

0 comments on commit 1ea99d5

Please sign in to comment.