diff --git a/cdc/sink/metrics.go b/cdc/sink/metrics.go index 36784efe283..10b5c56af7b 100644 --- a/cdc/sink/metrics.go +++ b/cdc/sink/metrics.go @@ -41,6 +41,21 @@ var ( Name: "execution_error", Help: "total count of execution errors", }, []string{"capture", "changefeed"}) + conflictDetectDurationHis = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "sink", + Name: "conflict_detect_duration", + Help: "Bucketed histogram of conflict detect time (s) for single DML statement", + Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21), + }, []string{"capture", "changefeed"}) + bucketSizeCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "sink", + Name: "bucket_size", + Help: "size of the DML bucket", + }, []string{"capture", "changefeed", "bucket"}) ) // InitMetrics registers all metrics in this file @@ -48,4 +63,6 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(execBatchHistogram) registry.MustRegister(execTxnHistogram) registry.MustRegister(executionErrorCounter) + registry.MustRegister(conflictDetectDurationHis) + registry.MustRegister(bucketSizeCounter) } diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 66603a58d37..4a008ca1a4f 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -45,6 +45,7 @@ import ( "github.com/pingcap/ticdc/pkg/util" tddl "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/infoschema" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -69,6 +70,10 @@ type mysqlSink struct { unresolvedTxns map[model.TableName][]*model.Txn statistics *Statistics + + // metrics used by mysql sink only + metricConflictDetectDurationHis prometheus.Observer + metricBucketSizeCounters []prometheus.Counter } func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { @@ -345,7 +350,9 @@ func newMySQLSink(ctx context.Context, sinkURI *url.URL, dsn *dmysql.Config, fil if err != nil { return nil, errors.Trace(err) } - params.workerCount = c + if c > 0 { + params.workerCount = c + } } s = sinkURI.Query().Get("max-txn-row") if s != "" { @@ -405,12 +412,22 @@ func newMySQLSink(ctx context.Context, sinkURI *url.URL, dsn *dmysql.Config, fil db.SetMaxIdleConns(params.workerCount) db.SetMaxOpenConns(params.workerCount) + metricConflictDetectDurationHis := conflictDetectDurationHis.WithLabelValues( + params.captureAddr, params.changefeedID) + metricBucketSizeCounters := make([]prometheus.Counter, params.workerCount) + for i := 0; i < params.workerCount; i++ { + metricBucketSizeCounters[i] = bucketSizeCounter.WithLabelValues( + params.captureAddr, params.changefeedID, strconv.Itoa(i)) + } + sink := &mysqlSink{ - db: db, - unresolvedTxns: make(map[model.TableName][]*model.Txn), - params: params, - filter: filter, - statistics: NewStatistics("mysql", opts), + db: db, + unresolvedTxns: make(map[model.TableName][]*model.Txn), + params: params, + filter: filter, + statistics: NewStatistics("mysql", opts), + metricConflictDetectDurationHis: metricConflictDetectDurationHis, + metricBucketSizeCounters: metricBucketSizeCounters, } if val, ok := opts[mark.OptCyclicConfig]; ok { @@ -431,21 +448,12 @@ func newMySQLSink(ctx context.Context, sinkURI *url.URL, dsn *dmysql.Config, fil } func (s *mysqlSink) concurrentExec(ctx context.Context, txnsGroup map[model.TableName][]*model.Txn) error { - return concurrentExec(ctx, txnsGroup, s.params.workerCount, s.params.maxTxnRow, s.execDMLs) -} - -func concurrentExec( - ctx context.Context, txnsGroup map[model.TableName][]*model.Txn, nWorkers, maxTxnRow int, - execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error, -) error { - if nWorkers == 0 { - nWorkers = defaultParams.workerCount - } + nWorkers := s.params.workerCount workers := make([]*mysqlSinkWorker, nWorkers) errg, ctx := errgroup.WithContext(ctx) for i := 0; i < nWorkers; i++ { i := i - workers[i] = newMySQLSinkWorker(maxTxnRow, i, execDMLs) + workers[i] = newMySQLSinkWorker(s.params.maxTxnRow, i, s.metricBucketSizeCounters[i], s.execDMLs) errg.Go(func() error { return workers[i].run(ctx) }) @@ -457,21 +465,26 @@ func concurrentExec( causality.add(txn.Keys, idx) workers[idx].appendTxn(ctx, txn) } + resolveConflict := func(txn *model.Txn) { + if conflict, idx := causality.detectConflict(txn.Keys); conflict { + if idx >= 0 { + sendFn(txn, idx) + return + } + for _, w := range workers { + w.waitAllTxnsExecuted() + } + causality.reset() + } + sendFn(txn, rowsChIdx) + rowsChIdx++ + rowsChIdx = rowsChIdx % nWorkers + } for _, txns := range txnsGroup { for _, txn := range txns { - if conflict, idx := causality.detectConflict(txn.Keys); conflict { - if idx >= 0 { - sendFn(txn, idx) - continue - } - for _, w := range workers { - w.waitAllTxnsExecuted() - } - causality.reset() - } - sendFn(txn, rowsChIdx) - rowsChIdx++ - rowsChIdx = rowsChIdx % nWorkers + startTime := time.Now() + resolveConflict(txn) + s.metricConflictDetectDurationHis.Observe(time.Since(startTime).Seconds()) } } for _, w := range workers { @@ -481,19 +494,26 @@ func concurrentExec( } type mysqlSinkWorker struct { - txnCh chan *model.Txn - txnWg sync.WaitGroup - maxTxnRow int - bucket int - execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error + txnCh chan *model.Txn + txnWg sync.WaitGroup + maxTxnRow int + bucket int + execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error + metricBucketSize prometheus.Counter } -func newMySQLSinkWorker(maxTxnRow int, bucket int, execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error) *mysqlSinkWorker { +func newMySQLSinkWorker( + maxTxnRow int, + bucket int, + metricBucketSize prometheus.Counter, + execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error, +) *mysqlSinkWorker { return &mysqlSinkWorker{ - txnCh: make(chan *model.Txn, 1024), - maxTxnRow: maxTxnRow, - bucket: bucket, - execDMLs: execDMLs, + txnCh: make(chan *model.Txn, 1024), + maxTxnRow: maxTxnRow, + bucket: bucket, + metricBucketSize: metricBucketSize, + execDMLs: execDMLs, } } @@ -546,6 +566,7 @@ func (w *mysqlSinkWorker) run(ctx context.Context) (err error) { } toExecRows = toExecRows[:0] w.txnWg.Add(-1 * txnNum) + w.metricBucketSize.Add(float64(txnNum)) txnNum = 0 lastExecTime = time.Now() return nil diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index 3721509e138..ec28f582f9e 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -438,6 +438,7 @@ func (s MySQLSinkSuite) TestMysqlSinkWorker(c *check.C) { var outputRows [][]*model.RowChangedEvent var outputReplicaIDs []uint64 w := newMySQLSinkWorker(tc.maxTxnRow, 1, + bucketSizeCounter.WithLabelValues("capture", "changefeed", "1"), func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error { outputRows = append(outputRows, events) outputReplicaIDs = append(outputReplicaIDs, replicaID) diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index 78e83bb0f93..21f6dd3f3e2 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -3558,6 +3558,208 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "Bucketed histogram of conflict detect time (s) for single DML statement", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 59 + }, + "hiddenSeries": false, + "id": 83, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.95,sum(rate(ticdc_sink_conflict_detect_duration_bucket{changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (le,instance))", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{instance}}-95", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.99,sum(rate(ticdc_sink_conflict_detect_duration_bucket{changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (le,instance))", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{instance}}-99", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "sink conflict detect duration", + "tooltip": { + "shared": true, + "sort": 2, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:241", + "format": "s", + "label": null, + "logBase": 2, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:242", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "size of the DML bucket", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 66 + }, + "hiddenSeries": false, + "id": 84, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "sort": "current", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(ticdc_sink_bucket_size{changefeed=~\"$changefeed\",capture=~\"$capture\"}[1m])) by (capture,bucket)", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{capture}}-{{bucket}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "mysql sink bucket_size", + "tooltip": { + "shared": true, + "sort": 2, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:860", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:861", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "title": "Changefeed", @@ -4581,4 +4783,4 @@ "title": "Test-Cluster-CDC", "uid": "YiGL8hBZa", "version": 6 -} \ No newline at end of file +}