diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index a487fdac747..302137d3cbb 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "io" + "strings" "sync" "time" @@ -114,6 +115,7 @@ func (c *Capture) reset(ctx context.Context) error { if err != nil { return errors.Trace(err) } + log.Info("reset session successfully", zap.Any("session", sess)) c.captureMu.Lock() defer c.captureMu.Unlock() @@ -355,11 +357,12 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error { } // Campaign to be an owner, it blocks until it becomes the owner if err := c.campaign(ctx); err != nil { - switch errors.Cause(err) { - case context.Canceled: + rootErr := errors.Cause(err) + if rootErr == context.Canceled { return nil - case mvcc.ErrCompacted: - // the revision we requested is compacted, just retry + } else if rootErr == mvcc.ErrCompacted || isErrCompacted(rootErr) { + log.Warn("campaign owner failed due to etcd revision "+ + "has been compacted, retry later", zap.Error(err)) continue } log.Warn("campaign owner failed", zap.Error(err)) @@ -608,3 +611,7 @@ func (c *Capture) StatusProvider() owner.StatusProvider { } return owner.NewStatusProvider(c.owner) } + +func isErrCompacted(err error) bool { + return strings.Contains(err.Error(), "required revision has been compacted") +} diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 105ac263462..8908a67fb34 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -553,6 +553,9 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) { changefeedBarrierTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID) c.metricsChangefeedBarrierTsGauge = nil + if c.isRemoved { + changefeedStatusGauge.DeleteLabelValues(c.id.Namespace, c.id.ID) + } c.isReleased = true c.initialized = false } diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 9202af1bef6..25c5b9d2420 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -900,6 +900,11 @@ func (p *processor) Close() error { log.Info("processor closing ...", zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID)) + + // clean up metrics first to avoid some metrics are not cleaned up + // when error occurs during closing the processor + p.cleanupMetrics() + for _, tbl := range p.tables { tbl.Cancel() } @@ -941,6 +946,11 @@ func (p *processor) Close() error { } // mark tables share the same cdcContext with its original table, don't need to cancel failpoint.Inject("processorStopDelay", nil) + + return nil +} + +func (p *processor) cleanupMetrics() { resolvedTsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID) resolvedTsLagGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID) checkpointTsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID) @@ -952,7 +962,6 @@ func (p *processor) Close() error { pipeline.SorterBatchReadSize.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID) pipeline.SorterBatchReadDuration.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID) - return nil } // WriteDebugInfo write the debug info to Writer diff --git a/cdc/server.go b/cdc/server.go index 9d7320bd949..865aeffd266 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -46,6 +46,7 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/backoff" + "google.golang.org/grpc/keepalive" ) const ( @@ -124,12 +125,15 @@ func (s *Server) Run(ctx context.Context) error { logConfig := logutil.DefaultZapLoggerConfig logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel) + log.Info("create etcdCli", zap.Strings("endpoints", s.pdEndpoints)) + etcdCli, err := clientv3.New(clientv3.Config{ - Endpoints: s.pdEndpoints, - TLS: tlsConfig, - Context: ctx, - LogConfig: &logConfig, - DialTimeout: 5 * time.Second, + Endpoints: s.pdEndpoints, + TLS: tlsConfig, + Context: ctx, + LogConfig: &logConfig, + DialTimeout: 5 * time.Second, + AutoSyncInterval: 30 * time.Second, DialOptions: []grpc.DialOption{ grpcTLSOption, grpc.WithBlock(), @@ -142,6 +146,10 @@ func (s *Server) Run(ctx context.Context) error { }, MinConnectTimeout: 3 * time.Second, }), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 20 * time.Second, + }), }, }) if err != nil { @@ -240,6 +248,13 @@ func (s *Server) etcdHealthChecker(ctx context.Context) error { } cancel() } + + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + _, err = s.etcdClient.Client.Unwrap().MemberList(ctx) + cancel() + if err != nil { + log.Warn("etcd health check error, fail to list etcd members", zap.Error(err)) + } } } } diff --git a/dm/pkg/etcdutil/etcdutil.go b/dm/pkg/etcdutil/etcdutil.go index 6b23deb0a76..421a93549f8 100644 --- a/dm/pkg/etcdutil/etcdutil.go +++ b/dm/pkg/etcdutil/etcdutil.go @@ -61,9 +61,10 @@ var etcdDefaultTxnStrategy = retry.FiniteRetryStrategy{} // CreateClient creates an etcd client with some default config items. func CreateClient(endpoints []string, tlsCfg *tls.Config) (*clientv3.Client, error) { return clientv3.New(clientv3.Config{ - Endpoints: endpoints, - DialTimeout: DefaultDialTimeout, - TLS: tlsCfg, + Endpoints: endpoints, + DialTimeout: DefaultDialTimeout, + AutoSyncInterval: 30 * time.Second, + TLS: tlsCfg, }) } diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 4092444ccca..251e2e02ed6 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -66,7 +66,7 @@ var ( ) // set to var instead of const for mocking the value to speedup test -var maxTries uint64 = 8 +var maxTries uint64 = 12 // Client is a simple wrapper that adds retry to etcd RPC type Client struct { diff --git a/pkg/version/check.go b/pkg/version/check.go index ea24965942f..e4dafb599c4 100644 --- a/pkg/version/check.go +++ b/pkg/version/check.go @@ -20,6 +20,7 @@ import ( "io" "regexp" "strings" + "time" "github.com/coreos/go-semver/semver" "github.com/pingcap/errors" @@ -28,6 +29,7 @@ import ( "github.com/pingcap/tidb/br/pkg/version" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/httputil" + "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/security" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -70,6 +72,8 @@ func removeVAndHash(v string) string { return strings.TrimPrefix(v, "v") } +var checkClusterVersionRetryTimes = 10 + // CheckClusterVersion check TiKV and PD version. // need only one PD alive and match the cdc version. func CheckClusterVersion( @@ -85,7 +89,14 @@ func CheckClusterVersion( } for _, pdAddr := range pdAddrs { - err = CheckPDVersion(ctx, pdAddr, credential) + // check pd version with retry, if the pdAddr is a service or lb address + // the http client may connect to an unhealthy PD that returns 503 + err = retry.Do(ctx, func() error { + return CheckPDVersion(ctx, pdAddr, credential) + }, retry.WithBackoffBaseDelay(time.Millisecond.Milliseconds()*10), + retry.WithBackoffMaxDelay(time.Second.Milliseconds()), + retry.WithMaxTries(uint64(checkClusterVersionRetryTimes)), + retry.WithIsRetryableErr(cerror.IsRetryableError)) if err == nil { break } @@ -106,6 +117,8 @@ func CheckPDVersion(ctx context.Context, pdAddr string, credential *security.Cre return err } + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() resp, err := httpClient.Get(ctx, fmt.Sprintf("%s/pd/api/v1/version", pdAddr)) if err != nil { return cerror.ErrCheckClusterVersionFromPD.GenWithStackByArgs(err)