mysql-sink: add more performance related metrics #702

Merged: 7 commits, Jul 1, 2020
Changes from 6 commits
17 changes: 17 additions & 0 deletions cdc/sink/metrics.go
@@ -41,11 +41,28 @@ var (
Name: "execution_error",
Help: "total count of execution errors",
}, []string{"capture", "changefeed"})
conflictDetectDurationHis = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "conflict_detect_duration",
Help: "Bucketed histogram of conflict detect time (s) for single DML statement",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21),
}, []string{"capture", "changefeed"})
bucketSizeCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "bucket_size",
Help: "size of the DML bucket",
}, []string{"capture", "changefeed", "bucket"})
Comment on lines +52 to +58

Member:

Looks like a gauge is more suitable for size.

@amyangfei (Contributor, Author), Jun 29, 2020:

Because the sink worker flushes DMLs every 100ms, it is hard to track the real bucket size with a gauge, so a counter is used here instead.

)

// InitMetrics registers all metrics in this file
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(execBatchHistogram)
registry.MustRegister(execTxnHistogram)
registry.MustRegister(executionErrorCounter)
registry.MustRegister(conflictDetectDurationHis)
registry.MustRegister(bucketSizeCounter)
}
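
Since bucket_size is registered as a counter rather than a gauge (see the review thread above), per-bucket flush throughput can be recovered at query time with rate(), e.g. sum(rate(ticdc_sink_bucket_size[1m])) by (capture, changefeed, bucket). The sketch below is a minimal, self-contained illustration of that trade-off under the sink's 100ms flush cycle; it is not part of this PR, and the metric names in it are throwaway stand-ins.

```go
// Minimal sketch (not part of this PR): with the sink flushing every 100ms
// but Prometheus scraping far less often, a gauge exposes only the size of
// the most recent flush, while a counter accumulates every flush and lets
// rate() recover throughput.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	gauge := prometheus.NewGauge(prometheus.GaugeOpts{Name: "demo_bucket_size_gauge"})
	counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "demo_bucket_size_total"})

	// Simulate ~15s of work at one flush per 100ms, 20 txns per flush.
	for i := 0; i < 150; i++ {
		gauge.Set(20)   // overwritten on every flush; a scrape sees only the last batch
		counter.Add(20) // accumulates; rate() over the scrape window yields txns/s
	}

	fmt.Println("gauge at scrape time:  ", testutil.ToFloat64(gauge))   // 20
	fmt.Println("counter at scrape time:", testutil.ToFloat64(counter)) // 3000
}
```
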
101 changes: 61 additions & 40 deletions cdc/sink/mysql.go
@@ -45,6 +45,7 @@ import (
"github.com/pingcap/ticdc/pkg/util"
tddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/infoschema"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@@ -69,6 +70,10 @@ type mysqlSink struct {
unresolvedTxns map[model.TableName][]*model.Txn

statistics *Statistics

// metrics used by mysql sink only
metricConflictDetectDurationHis prometheus.Observer
metricBucketSizeCounters []prometheus.Counter
}

func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
@@ -345,7 +350,9 @@ func newMySQLSink(ctx context.Context, sinkURI *url.URL, dsn *dmysql.Config, fil
if err != nil {
return nil, errors.Trace(err)
}
params.workerCount = c
if c > 0 {
params.workerCount = c
}
}
s = sinkURI.Query().Get("max-txn-row")
if s != "" {
@@ -405,12 +412,22 @@ func newMySQLSink(ctx context.Context, sinkURI *url.URL, dsn *dmysql.Config, fil
db.SetMaxIdleConns(params.workerCount)
db.SetMaxOpenConns(params.workerCount)

metricConflictDetectDurationHis := conflictDetectDurationHis.WithLabelValues(
params.captureAddr, params.changefeedID)
metricBucketSizeCounters := make([]prometheus.Counter, params.workerCount)
for i := 0; i < params.workerCount; i++ {
metricBucketSizeCounters[i] = bucketSizeCounter.WithLabelValues(
params.captureAddr, params.changefeedID, strconv.Itoa(i))
}

sink := &mysqlSink{
db: db,
unresolvedTxns: make(map[model.TableName][]*model.Txn),
params: params,
filter: filter,
statistics: NewStatistics("mysql", opts),
db: db,
unresolvedTxns: make(map[model.TableName][]*model.Txn),
params: params,
filter: filter,
statistics: NewStatistics("mysql", opts),
metricConflictDetectDurationHis: metricConflictDetectDurationHis,
metricBucketSizeCounters: metricBucketSizeCounters,
}

if val, ok := opts[mark.OptCyclicConfig]; ok {
@@ -431,21 +448,12 @@ func newMySQLSink(ctx context.Context, sinkURI *url.URL, dsn *dmysql.Config, fil
}

func (s *mysqlSink) concurrentExec(ctx context.Context, txnsGroup map[model.TableName][]*model.Txn) error {
return concurrentExec(ctx, txnsGroup, s.params.workerCount, s.params.maxTxnRow, s.execDMLs)
}

func concurrentExec(
ctx context.Context, txnsGroup map[model.TableName][]*model.Txn, nWorkers, maxTxnRow int,
execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error,
) error {
if nWorkers == 0 {
nWorkers = defaultParams.workerCount
}
nWorkers := s.params.workerCount
workers := make([]*mysqlSinkWorker, nWorkers)
errg, ctx := errgroup.WithContext(ctx)
for i := 0; i < nWorkers; i++ {
i := i
workers[i] = newMySQLSinkWorker(maxTxnRow, i, execDMLs)
workers[i] = newMySQLSinkWorker(s.params.maxTxnRow, i, s.metricBucketSizeCounters[i], s.execDMLs)
errg.Go(func() error {
return workers[i].run(ctx)
})
@@ -457,21 +465,26 @@ func concurrentExec(
causality.add(txn.Keys, idx)
workers[idx].appendTxn(ctx, txn)
}
resolveConflict := func(txn *model.Txn) {
if conflict, idx := causality.detectConflict(txn.Keys); conflict {
if idx >= 0 {
sendFn(txn, idx)
return
}
for _, w := range workers {
w.waitAllTxnsExecuted()
}
causality.reset()
}
sendFn(txn, rowsChIdx)
rowsChIdx++
rowsChIdx = rowsChIdx % nWorkers
}
for _, txns := range txnsGroup {
for _, txn := range txns {
if conflict, idx := causality.detectConflict(txn.Keys); conflict {
if idx >= 0 {
sendFn(txn, idx)
continue
}
for _, w := range workers {
w.waitAllTxnsExecuted()
}
causality.reset()
}
sendFn(txn, rowsChIdx)
rowsChIdx++
rowsChIdx = rowsChIdx % nWorkers
startTime := time.Now()
resolveConflict(txn)
s.metricConflictDetectDurationHis.Observe(time.Since(startTime).Seconds())
}
}
for _, w := range workers {
@@ -481,19 +494,26 @@
}

type mysqlSinkWorker struct {
txnCh chan *model.Txn
txnWg sync.WaitGroup
maxTxnRow int
bucket int
execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error
txnCh chan *model.Txn
txnWg sync.WaitGroup
maxTxnRow int
bucket int
execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error
metricBucketSize prometheus.Counter
}

func newMySQLSinkWorker(maxTxnRow int, bucket int, execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error) *mysqlSinkWorker {
func newMySQLSinkWorker(
maxTxnRow int,
bucket int,
metricBucketSize prometheus.Counter,
execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error,
) *mysqlSinkWorker {
return &mysqlSinkWorker{
txnCh: make(chan *model.Txn, 1024),
maxTxnRow: maxTxnRow,
bucket: bucket,
execDMLs: execDMLs,
txnCh: make(chan *model.Txn, 1024),
maxTxnRow: maxTxnRow,
bucket: bucket,
metricBucketSize: metricBucketSize,
execDMLs: execDMLs,
}
}

@@ -546,6 +566,7 @@ func (w *mysqlSinkWorker) run(ctx context.Context) (err error) {
}
toExecRows = toExecRows[:0]
w.txnWg.Add(-1 * txnNum)
w.metricBucketSize.Add(float64(txnNum))
txnNum = 0
lastExecTime = time.Now()
return nil
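
The conflict_detect_duration histogram wraps the whole resolveConflict call, so when a transaction's keys conflict across workers it also measures the wait for every worker to drain (w.waitAllTxnsExecuted()) before causality.reset(), which is consistent with the wide bucket range: ExponentialBuckets(0.00005, 2, 21) spans 50µs up to roughly 52s (0.00005 × 2^20). The causality helper itself is not shown in this diff; the sketch below is only a rough approximation, under stated assumptions, of what its add/detectConflict/reset could look like. The key type ([]string) and the "-1 on cross-worker conflict" convention are guesses made for illustration, not code from the repository.

```go
// Rough sketch (an assumption, not code from this repository) of the causality
// helper that concurrentExec relies on. Each key remembers which worker last
// touched it; a transaction whose keys all belong to one worker can be routed
// to that worker, while keys spread across workers force the caller to drain
// all workers and reset the table.
package sink

type causality struct {
	relations map[string]int // key -> index of the worker that currently owns it
}

func newCausality() *causality {
	return &causality{relations: make(map[string]int)}
}

// add records that the given keys are now owned by the worker at idx.
func (c *causality) add(keys []string, idx int) {
	for _, key := range keys {
		c.relations[key] = idx
	}
}

// detectConflict reports whether any key is already owned. It returns the
// owner's index when all conflicting keys share a single owner, and -1 when
// the keys are split across workers, in which case the caller must wait for
// every worker to finish and then call reset.
func (c *causality) detectConflict(keys []string) (bool, int) {
	conflict, owner := false, -1
	for _, key := range keys {
		if idx, ok := c.relations[key]; ok {
			if conflict && owner != idx {
				return true, -1
			}
			conflict, owner = true, idx
		}
	}
	return conflict, owner
}

// reset drops all ownership information once every worker has drained.
func (c *causality) reset() {
	c.relations = make(map[string]int)
}
```
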
1 change: 1 addition & 0 deletions cdc/sink/mysql_test.go
@@ -438,6 +438,7 @@ func (s MySQLSinkSuite) TestMysqlSinkWorker(c *check.C) {
var outputRows [][]*model.RowChangedEvent
var outputReplicaIDs []uint64
w := newMySQLSinkWorker(tc.maxTxnRow, 1,
bucketSizeCounter.WithLabelValues("capture", "changefeed", "1"),
func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error {
outputRows = append(outputRows, events)
outputReplicaIDs = append(outputReplicaIDs, replicaID)