Commit

Merge branch 'master' of github.com:pingcap/tiflow into sort-dir

lance6716 committed Dec 16, 2022
2 parents 938c269 + df521ec commit 5dce721
Showing 20 changed files with 248 additions and 151 deletions.
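
This merge brings two independent sets of changes from master into the sort-dir branch: the TiCDC kv client starts recycling resolved-ts events through a sync.Pool (cdc/kv/client.go, cdc/kv/region_worker.go), and the DM dump, load, relay, and sync units add a resumable_err label to their exit_with_error_count metrics, with the alertmanager rules updated to distinguish resumable from non-resumable errors.
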
23 changes: 17 additions & 6 deletions cdc/kv/client.go
@@ -408,6 +408,10 @@ type eventFeedSession struct {
changefeed model.ChangeFeedID
tableID model.TableID
tableName string

// resolvedTsPool stores resolved ts events only, because they all have the
// same size and are generated in a regular cycle.
resolvedTsPool sync.Pool
}

type rangeRequestTask struct {
@@ -447,6 +451,13 @@ func newEventFeedSession(
rangeChSizeGauge: clientChannelSize.WithLabelValues("range"),
streams: make(map[string]*eventFeedStream),
streamsCanceller: make(map[string]context.CancelFunc),
resolvedTsPool: sync.Pool{
New: func() any {
return &regionStatefulEvent{
resolvedTsEvent: &resolvedTsEvent{},
}
},
},

changefeed: changefeed,
tableID: tableID,
@@ -1326,12 +1337,12 @@ func (s *eventFeedSession) sendResolvedTs(
for i := 0; i < worker.concurrency; i++ {
// Allocate a buffer larger than the average length to reduce reallocations.
buffLen := len(resolvedTs.Regions) / worker.concurrency * 2
statefulEvents[i] = &regionStatefulEvent{
resolvedTsEvent: &resolvedTsEvent{
resolvedTs: resolvedTs.Ts,
regions: make([]*regionFeedState, 0, buffLen),
},
}
ev := s.resolvedTsPool.Get().(*regionStatefulEvent)
// must reset fields to prevent dirty data
ev.resolvedTsEvent.resolvedTs = resolvedTs.Ts
ev.resolvedTsEvent.regions = make([]*regionFeedState, 0, buffLen)
ev.finishedCallbackCh = nil
statefulEvents[i] = ev
}

for _, regionID := range resolvedTs.Regions {
6 changes: 6 additions & 0 deletions cdc/kv/region_worker.go
@@ -512,6 +512,12 @@ func (w *regionWorker) eventHandler(ctx context.Context) error {
}
}
}
for _, ev := range events {
// The resolved ts event has been consumed, so it is safe to put it back into the pool.
if ev.resolvedTsEvent != nil {
w.session.resolvedTsPool.Put(ev)
}
}
}
}

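Taken together, the client.go and region_worker.go hunks form a produce/consume lifecycle around the pool: sendResolvedTs gets an event from resolvedTsPool and resets its fields before use, and the region worker puts the event back once it has been consumed. Below is a minimal, self-contained sketch of that pattern; the type and field names are simplified stand-ins, not the actual tiflow definitions.

```go
package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for regionStatefulEvent and resolvedTsEvent.
type resolvedTsEvent struct {
	resolvedTs uint64
	regions    []int
}

type regionStatefulEvent struct {
	resolvedTsEvent    *resolvedTsEvent
	finishedCallbackCh chan struct{}
}

func main() {
	pool := sync.Pool{
		// New runs only when the pool is empty, so Get always yields a fully
		// allocated event with a non-nil inner resolvedTsEvent.
		New: func() any {
			return &regionStatefulEvent{resolvedTsEvent: &resolvedTsEvent{}}
		},
	}

	// Producer side (mirrors sendResolvedTs): take an event from the pool and
	// reset every field, because a pooled object may still hold data from a
	// previous use.
	ev := pool.Get().(*regionStatefulEvent)
	ev.resolvedTsEvent.resolvedTs = 42
	ev.resolvedTsEvent.regions = make([]int, 0, 8)
	ev.finishedCallbackCh = nil

	// ... hand the event to a worker; it is processed here ...
	fmt.Println("resolved ts:", ev.resolvedTsEvent.resolvedTs)

	// Consumer side (mirrors regionWorker.eventHandler): once the event has
	// been consumed, return it to the pool so the next resolved-ts batch can
	// reuse the allocation instead of creating a new one.
	pool.Put(ev)
}
```

Only the fixed-size event structs are reused; the regions slice is still allocated per batch on the Get side, which is why only resolved ts events go through the pool.
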
19 changes: 14 additions & 5 deletions dm/dumpling/dumpling.go
@@ -85,7 +85,7 @@ func (m *Dumpling) Init(ctx context.Context) error {
Subsystem: "dumpling",
Name: "exit_with_error_count",
Help: "counter for dumpling exit with error",
}, []string{"task", "source_id"},
}, []string{"task", "source_id", "resumable_err"},
)
m.dumpConfig.PromFactory = promutil.NewWrappingFactory(
m.cfg.MetricsFactory,
@@ -111,9 +111,15 @@ func (m *Dumpling) Init(ctx context.Context) error {
return nil
}

func (m *Dumpling) handleExitErrMetric(err *pb.ProcessError) {
resumable := fmt.Sprintf("%t", unit.IsResumableError(err))
m.metricProxies.dumplingExitWithErrorCounter.WithLabelValues(m.cfg.Name, m.cfg.SourceID, resumable).Inc()
}

// Process implements Unit.Process.
func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) {
m.metricProxies.dumplingExitWithErrorCounter.WithLabelValues(m.cfg.Name, m.cfg.SourceID).Add(0)
m.metricProxies.dumplingExitWithErrorCounter.WithLabelValues(m.cfg.Name, m.cfg.SourceID, "true").Add(0)
m.metricProxies.dumplingExitWithErrorCounter.WithLabelValues(m.cfg.Name, m.cfg.SourceID, "false").Add(0)

failpoint.Inject("dumpUnitProcessWithError", func(val failpoint.Value) {
m.logger.Info("dump unit runs with injected error", zap.String("failpoint", "dumpUnitProcessWithError"), zap.Reflect("error", val))
@@ -148,7 +154,9 @@ func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) {
err := storage.RemoveAll(ctx, m.cfg.Dir, nil)
if err != nil {
m.logger.Error("fail to remove output directory", zap.String("directory", m.cfg.Dir), log.ShortError(err))
errs = append(errs, unit.NewProcessError(terror.ErrDumpUnitRuntime.Delegate(err, "fail to remove output directory: "+m.cfg.Dir)))
processError := unit.NewProcessError(terror.ErrDumpUnitRuntime.Delegate(err, "fail to remove output directory: "+m.cfg.Dir))
m.handleExitErrMetric(processError)
errs = append(errs, processError)
pr <- pb.ProcessResult{
IsCanceled: false,
Errors: errs,
@@ -187,8 +195,9 @@ func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) {
if utils.IsContextCanceledError(err) {
m.logger.Info("filter out error caused by user cancel")
} else {
m.metricProxies.dumplingExitWithErrorCounter.WithLabelValues(m.cfg.Name, m.cfg.SourceID).Inc()
errs = append(errs, unit.NewProcessError(terror.ErrDumpUnitRuntime.Delegate(err, "")))
processError := unit.NewProcessError(terror.ErrDumpUnitRuntime.Delegate(err, ""))
m.handleExitErrMetric(processError)
errs = append(errs, processError)
}
}

2 changes: 1 addition & 1 deletion dm/dumpling/metrics.go
@@ -31,7 +31,7 @@ var defaultMetricProxies = &metricProxies{
Subsystem: "dumpling",
Name: "exit_with_error_count",
Help: "counter for dumpling exit with error",
}, []string{"task", "source_id"}),
}, []string{"task", "source_id", "resumable_err"}),
}

// RegisterMetrics registers metrics and saves the given registry for later use.
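The dumpling.go and dumpling/metrics.go changes follow a single pattern: the exit counter gains a resumable_err label, Process pre-registers both label values with Add(0) so the time series exist before any error occurs, and handleExitErrMetric derives the label value from unit.IsResumableError. A condensed, self-contained sketch of that pattern follows; the metric variable and the isResumable helper are illustrative stand-ins, not the real DM definitions.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// exitWithErrorCounter mirrors the dumpling unit's exit_with_error_count
// counter with the "resumable_err" label added in this commit.
var exitWithErrorCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "dm",
		Subsystem: "dumpling",
		Name:      "exit_with_error_count",
		Help:      "counter for dumpling exit with error",
	},
	[]string{"task", "source_id", "resumable_err"},
)

// isResumable stands in for unit.IsResumableError; the real check inspects
// the wrapped terror code, which is not shown here.
func isResumable(err error) bool { return false }

func main() {
	prometheus.MustRegister(exitWithErrorCounter)
	task, source := "task-1", "source-1"

	// Pre-register both label combinations with Add(0), as Process now does,
	// so alert expressions such as changes(...) and increase(...) see the
	// series from startup rather than only after the first failure.
	exitWithErrorCounter.WithLabelValues(task, source, "true").Add(0)
	exitWithErrorCounter.WithLabelValues(task, source, "false").Add(0)

	// On an exit with error, bump the series that matches the error's
	// resumability, as handleExitErrMetric does.
	err := fmt.Errorf("dump failed")
	exitWithErrorCounter.WithLabelValues(task, source, fmt.Sprintf("%t", isResumable(err))).Inc()
}
```
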
15 changes: 12 additions & 3 deletions dm/loader/lightning.go
@@ -15,6 +15,7 @@ package loader

import (
"context"
"fmt"
"path/filepath"
"strings"
"sync"
@@ -402,6 +403,11 @@ func (l *LightningLoader) restore(ctx context.Context) error {
return err
}

func (l *LightningLoader) handleExitErrMetric(err *pb.ProcessError) {
resumable := fmt.Sprintf("%t", unit.IsResumableError(err))
loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name, l.cfg.SourceID, resumable).Inc()
}

// Process implements Unit.Process.
func (l *LightningLoader) Process(ctx context.Context, pr chan pb.ProcessResult) {
l.logger.Info("lightning load start")
@@ -416,9 +422,10 @@ func (l *LightningLoader) Process(ctx context.Context, pr chan pb.ProcessResult)

binlog, gtid, err := getMydumpMetadata(ctx, l.cli, l.cfg, l.workerName)
if err != nil {
loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name, l.cfg.SourceID).Inc()
processError := unit.NewProcessError(err)
l.handleExitErrMetric(processError)
pr <- pb.ProcessResult{
Errors: []*pb.ProcessError{unit.NewProcessError(err)},
Errors: []*pb.ProcessError{processError},
}
return
}
@@ -431,7 +438,9 @@ func (l *LightningLoader) Process(ctx context.Context, pr chan pb.ProcessResult)

if err := l.restore(ctx); err != nil && !utils.IsContextCanceledError(err) {
l.logger.Error("process error", zap.Error(err))
errs = append(errs, unit.NewProcessError(err))
processError := unit.NewProcessError(err)
l.handleExitErrMetric(processError)
errs = append(errs, processError)
}
isCanceled := false
select {
19 changes: 14 additions & 5 deletions dm/loader/loader.go
@@ -18,6 +18,7 @@ import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
"os"
"path/filepath"
@@ -581,19 +582,26 @@ func (l *Loader) Init(ctx context.Context) (err error) {
return nil
}

func (l *Loader) handleExitErrMetric(err *pb.ProcessError) {
resumable := fmt.Sprintf("%t", unit.IsResumableError(err))
loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name, l.cfg.SourceID, resumable).Inc()
}

// Process implements Unit.Process.
func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) {
loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name, l.cfg.SourceID).Add(0)
loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name, l.cfg.SourceID, "true").Add(0)
loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name, l.cfg.SourceID, "false").Add(0)

newCtx, cancel := context.WithCancel(ctx)
defer cancel()

l.newFileJobQueue()
binlog, gtid, err := getMydumpMetadata(ctx, l.cli, l.cfg, l.workerName)
if err != nil {
loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name, l.cfg.SourceID).Inc()
processError := unit.NewProcessError(err)
l.handleExitErrMetric(processError)
pr <- pb.ProcessResult{
Errors: []*pb.ProcessError{unit.NewProcessError(err)},
Errors: []*pb.ProcessError{processError},
}
return
}
@@ -637,8 +645,9 @@ func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) {
if utils.IsContextCanceledError(err) {
l.logger.Info("filter out error caused by user cancel")
} else {
loaderExitWithErrorCounter.WithLabelValues(l.cfg.Name, l.cfg.SourceID).Inc()
errs = append(errs, unit.NewProcessError(err))
processError := unit.NewProcessError(err)
l.handleExitErrMetric(processError)
errs = append(errs, processError)
}
}

2 changes: 1 addition & 1 deletion dm/loader/metrics.go
@@ -95,7 +95,7 @@ var (
Subsystem: "loader",
Name: "exit_with_error_count",
Help: "counter for loader exits with error",
}, []string{"task", "source_id"})
}, []string{"task", "source_id", "resumable_err"})

remainingTimeGauge = f.NewGaugeVec(
prometheus.GaugeOpts{
16 changes: 8 additions & 8 deletions dm/metrics/alertmanager/dm_worker.rules.yml
@@ -24,11 +24,11 @@ groups:
summary: DM remain storage of relay log

- alert: DM_relay_process_exits_with_error
expr: changes(dm_relay_exit_with_error_count[1m]) > 0
expr: changes(dm_relay_exit_with_error_count{resumable_err="false"}[1m]) > 0 or on(instance, job) increase(dm_relay_exit_with_error_count{resumable_err="true"}[2m]) > 3
labels:
env: ENV_LABELS_ENV
level: critical
expr: changes(dm_relay_exit_with_error_count[1m]) > 0
expr: changes(dm_relay_exit_with_error_count{resumable_err="false"}[1m]) > 0 or on(instance, job) increase(dm_relay_exit_with_error_count{resumable_err="true"}[2m]) > 3
annotations:
description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}'
value: '{{ $value }}'
@@ -68,33 +68,33 @@ groups:
summary: DM fail to write relay log

- alert: DM_dump_process_exists_with_error
expr: changes(dm_mydumper_exit_with_error_count[1m]) > 0
expr: changes(dm_mydumper_exit_with_error_count{resumable_err="false"}[1m]) > 0 or on(source_id, task) increase(dm_mydumper_exit_with_error_count{resumable_err="true"}[2m]) > 3
labels:
env: ENV_LABELS_ENV
level: critical
expr: changes(dm_mydumper_exit_with_error_count[1m]) > 0
expr: changes(dm_mydumper_exit_with_error_count{resumable_err="false"}[1m]) > 0 or on(source_id, task) increase(dm_mydumper_exit_with_error_count{resumable_err="true"}[2m]) > 3
annotations:
description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, task: {{ $labels.task }}, values: {{ $value }}'
value: '{{ $value }}'
summary: DM dump process exists with error

- alert: DM_load_process_exists_with_error
expr: changes(dm_loader_exit_with_error_count[1m]) > 0
expr: changes(dm_loader_exit_with_error_count{resumable_err="false"}[1m]) > 0 or on(source_id, task) increase(dm_loader_exit_with_error_count{resumable_err="true"}[2m]) > 3
labels:
env: ENV_LABELS_ENV
level: critical
expr: changes(dm_loader_exit_with_error_count[1m]) > 0
expr: changes(dm_loader_exit_with_error_count{resumable_err="false"}[1m]) > 0 or on(source_id, task) increase(dm_loader_exit_with_error_count{resumable_err="true"}[2m]) > 3
annotations:
description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, task: {{ $labels.task }}, values: {{ $value }}'
value: '{{ $value }}'
summary: DM load process exists with error

- alert: DM_sync_process_exists_with_error
expr: changes(dm_syncer_exit_with_error_count[1m]) > 0
expr: changes(dm_syncer_exit_with_error_count{resumable_err="false"}[1m]) > 0 or on(source_id, task) increase(dm_syncer_exit_with_error_count{resumable_err="true"}[2m]) > 3
labels:
env: ENV_LABELS_ENV
level: critical
expr: changes(dm_syncer_exit_with_error_count[1m]) > 0
expr: changes(dm_syncer_exit_with_error_count{resumable_err="false"}[1m]) > 0 or on(source_id, task) increase(dm_syncer_exit_with_error_count{resumable_err="true"}[2m]) > 3
annotations:
description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, task: {{ $labels.task }}, values: {{ $value }}'
value: '{{ $value }}'
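With the resumable_err label in place, each of these alerts now has two firing conditions: it fires immediately when a non-resumable exit is recorded (changes(...{resumable_err="false"}[1m]) > 0), and it fires for resumable errors only when the counter increases more than 3 times within 2 minutes (increase(...{resumable_err="true"}[2m]) > 3), so errors that DM can automatically resume from no longer page on a single occurrence.
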
4 changes: 2 additions & 2 deletions dm/relay/metrics.go
@@ -126,13 +126,13 @@ var (
})

// should alert.
relayExitWithErrorCounter = prometheus.NewCounter(
relayExitWithErrorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Subsystem: "relay",
Name: "exit_with_error_count",
Help: "counter of relay unit exits with error",
})
}, []string{"resumable_err"})
)

// RegisterMetrics register metrics.
8 changes: 6 additions & 2 deletions dm/relay/relay.go
@@ -153,13 +153,17 @@ func (r *Relay) Init(ctx context.Context) (err error) {

// Process implements the dm.Unit interface.
func (r *Relay) Process(ctx context.Context) pb.ProcessResult {
relayExitWithErrorCounter.WithLabelValues("true").Add(0)
relayExitWithErrorCounter.WithLabelValues("false").Add(0)
errs := make([]*pb.ProcessError, 0, 1)
err := r.process(ctx)
if err != nil && errors.Cause(err) != replication.ErrSyncClosed {
relayExitWithErrorCounter.Inc()
r.logger.Error("process exit", zap.Error(err))
// TODO: add specified error type instead of pb.ErrorType_UnknownError
errs = append(errs, unit.NewProcessError(err))
processError := unit.NewProcessError(err)
resumable := fmt.Sprintf("%t", unit.IsResumableRelayError(processError))
relayExitWithErrorCounter.WithLabelValues(resumable).Inc()
errs = append(errs, processError)
}

isCanceled := false
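Note that relay classifies errors with unit.IsResumableRelayError rather than the unit.IsResumableError used by the dump and load units, and because relayExitWithErrorCounter previously had no labels, its new CounterVec carries only the resumable_err label.
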
42 changes: 22 additions & 20 deletions dm/syncer/metrics/metrics.go
@@ -31,24 +31,25 @@ const (

// Metrics groups syncer's metric variables.
type Metrics struct {
BinlogReadDurationHistogram prometheus.Observer
BinlogEventSizeHistogram prometheus.Observer
ConflictDetectDurationHistogram prometheus.Observer
IdealQPS prometheus.Gauge
BinlogMasterPosGauge prometheus.Gauge
BinlogSyncerPosGauge prometheus.Gauge
BinlogMasterFileGauge prometheus.Gauge
BinlogSyncerFileGauge prometheus.Gauge
BinlogEventRowHistogram prometheus.Observer
TxnHistogram prometheus.Observer
QueryHistogram prometheus.Observer
SyncerExitWithErrorCounter prometheus.Counter
ReplicationLagGauge prometheus.Gauge
ReplicationLagHistogram prometheus.Observer
RemainingTimeGauge prometheus.Gauge
ShardLockResolving prometheus.Gauge
FinishedTransactionTotal prometheus.Counter
FlushCheckPointsTimeInterval prometheus.Observer
BinlogReadDurationHistogram prometheus.Observer
BinlogEventSizeHistogram prometheus.Observer
ConflictDetectDurationHistogram prometheus.Observer
IdealQPS prometheus.Gauge
BinlogMasterPosGauge prometheus.Gauge
BinlogSyncerPosGauge prometheus.Gauge
BinlogMasterFileGauge prometheus.Gauge
BinlogSyncerFileGauge prometheus.Gauge
BinlogEventRowHistogram prometheus.Observer
TxnHistogram prometheus.Observer
QueryHistogram prometheus.Observer
ExitWithResumableErrorCounter prometheus.Counter
ExitWithNonResumableErrorCounter prometheus.Counter
ReplicationLagGauge prometheus.Gauge
ReplicationLagHistogram prometheus.Observer
RemainingTimeGauge prometheus.Gauge
ShardLockResolving prometheus.Gauge
FinishedTransactionTotal prometheus.Counter
FlushCheckPointsTimeInterval prometheus.Observer
}

// Proxies provides the ability to clean Metrics values when syncer is closed.
@@ -230,7 +231,7 @@ func (m *Proxies) Init(f promutil.Factory) {
Subsystem: "syncer",
Name: "exit_with_error_count",
Help: "counter for syncer exits with error",
}, []string{"task", "source_id"})
}, []string{"task", "source_id", "resumable_err"})
m.replicationLagGauge = f.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
@@ -308,7 +309,8 @@ func (m *Proxies) CacheForOneTask(taskName, workerName, sourceID string) *Proxie
ret.Metrics.BinlogEventRowHistogram = m.binlogEventRowHistogram.WithLabelValues(workerName, taskName, sourceID)
ret.Metrics.TxnHistogram = m.txnHistogram.WithLabelValues(taskName, workerName, sourceID)
ret.Metrics.QueryHistogram = m.queryHistogram.WithLabelValues(taskName, workerName, sourceID)
ret.Metrics.SyncerExitWithErrorCounter = m.syncerExitWithErrorCounter.WithLabelValues(taskName, sourceID)
ret.Metrics.ExitWithResumableErrorCounter = m.syncerExitWithErrorCounter.WithLabelValues(taskName, sourceID, "true")
ret.Metrics.ExitWithNonResumableErrorCounter = m.syncerExitWithErrorCounter.WithLabelValues(taskName, sourceID, "false")
ret.Metrics.ReplicationLagGauge = m.replicationLagGauge.WithLabelValues(taskName, sourceID, workerName)
ret.Metrics.ReplicationLagHistogram = m.replicationLagHistogram.WithLabelValues(taskName, sourceID, workerName)
ret.Metrics.RemainingTimeGauge = m.remainingTimeGauge.WithLabelValues(taskName, sourceID, workerName)
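Unlike the other units, the syncer resolves the label up front: CacheForOneTask pre-binds one child counter per resumable_err value, so error-handling code can increment the right series without formatting a label at error time. A hypothetical call-site sketch under that assumption (the struct and helper below are simplified stand-ins, not code from this diff):

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// metrics mirrors the relevant slice of the syncer's Metrics struct after
// this change: two pre-bound counters in place of the single
// SyncerExitWithErrorCounter field.
type metrics struct {
	ExitWithResumableErrorCounter    prometheus.Counter
	ExitWithNonResumableErrorCounter prometheus.Counter
}

// recordExit picks the pre-bound series; resumable stands in for the result
// of unit.IsResumableError at the syncer's error-handling call site, which is
// outside this diff.
func recordExit(m *metrics, resumable bool) {
	if resumable {
		m.ExitWithResumableErrorCounter.Inc()
	} else {
		m.ExitWithNonResumableErrorCounter.Inc()
	}
}

func main() {
	vec := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "dm",
			Subsystem: "syncer",
			Name:      "exit_with_error_count",
			Help:      "counter for syncer exits with error",
		},
		[]string{"task", "source_id", "resumable_err"},
	)
	// Pre-bind both label combinations, as CacheForOneTask does.
	m := &metrics{
		ExitWithResumableErrorCounter:    vec.WithLabelValues("task-1", "source-1", "true"),
		ExitWithNonResumableErrorCounter: vec.WithLabelValues("task-1", "source-1", "false"),
	}
	recordExit(m, false)
}
```
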
