Skip to content

Commit

Permalink
mysql (ticdc): Improve the performance of the mysql sink by refining …
Browse files Browse the repository at this point in the history
…the transaction event batching logic (#10466) (#11248)

close #11241
  • Loading branch information
ti-chi-bot authored Jun 13, 2024
1 parent c35c3c0 commit 8588082
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 48 deletions.
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/txn/txn_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func newSink(ctx context.Context,
for i, backend := range backends {
w := newWorker(ctx1, changefeedID, i, backend, len(backends))
txnCh := sink.alive.conflictDetector.GetOutChByCacheID(int64(i))
g.Go(func() error { return w.runLoop(txnCh) })
g.Go(func() error { return w.run(txnCh) })
sink.workers = append(sink.workers, w)
}

Expand Down
81 changes: 43 additions & 38 deletions cdc/sink/dmlsink/txn/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type worker struct {
metricConflictDetectDuration prometheus.Observer
metricQueueDuration prometheus.Observer
metricTxnWorkerFlushDuration prometheus.Observer
metricTxnWorkerBusyRatio prometheus.Counter
metricTxnWorkerTotalDuration prometheus.Observer
metricTxnWorkerHandledRows prometheus.Counter

// Fields only used in the background loop.
Expand All @@ -63,8 +63,8 @@ func newWorker(ctx context.Context, changefeedID model.ChangeFeedID,

metricConflictDetectDuration: txn.ConflictDetectDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricQueueDuration: txn.QueueDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnWorkerFlushDuration: txn.WorkerFlushDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnWorkerBusyRatio: txn.WorkerBusyRatio.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnWorkerFlushDuration: txn.WorkerFlushDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),
metricTxnWorkerTotalDuration: txn.WorkerTotalDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),
metricTxnWorkerHandledRows: txn.WorkerHandledRows.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),

flushInterval: backend.MaxFlushInterval(),
Expand All @@ -73,8 +73,8 @@ func newWorker(ctx context.Context, changefeedID model.ChangeFeedID,
}
}

// Run a loop.
func (w *worker) runLoop(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error {
// Continuously get events from txnCh and call backend flush based on conditions.
func (w *worker) run(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error {
defer func() {
if err := w.backend.Close(); err != nil {
log.Info("Transaction dmlSink backend close fail",
Expand All @@ -87,14 +87,7 @@ func (w *worker) runLoop(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) erro
zap.String("changefeedID", w.changefeed),
zap.Int("workerID", w.ID))

ticker := time.NewTicker(w.flushInterval)
defer ticker.Stop()

needFlush := false
var flushTimeSlice, totalTimeSlice time.Duration
overseerTicker := time.NewTicker(time.Second)
defer overseerTicker.Stop()
startToWork := time.Now()
start := time.Now()
for {
select {
case <-w.ctx.Done():
Expand All @@ -103,27 +96,43 @@ func (w *worker) runLoop(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) erro
zap.Int("workerID", w.ID))
return nil
case txn := <-txnCh:
// Keep reading from txnCh.out until it has no more data or we reach a state that can be flushed.
// If txnCh.out runs out of data before we reach a flushable state,
// we wait for 10ms and then flush, to avoid flushing too often with only a small number of txns.
if txn.TxnEvent != nil {
needFlush = w.onEvent(txn.TxnEvent, txn.PostTxnExecuted)
}
case <-ticker.C:
needFlush = true
case now := <-overseerTicker.C:
totalTimeSlice = now.Sub(startToWork)
busyRatio := int(flushTimeSlice.Seconds() / totalTimeSlice.Seconds() * 1000)
w.metricTxnWorkerBusyRatio.Add(float64(busyRatio) / float64(w.workerCount))
startToWork = now
flushTimeSlice = 0
}
if needFlush {
if err := w.doFlush(&flushTimeSlice); err != nil {
log.Error("Transaction dmlSink worker exits unexpectly",
zap.String("changefeedID", w.changefeed),
zap.Int("workerID", w.ID),
zap.Error(err))
return err
needFlush := w.onEvent(txn.TxnEvent, txn.PostTxnExecuted)
if !needFlush {
delay := time.NewTimer(w.flushInterval)
for !needFlush {
select {
case txn := <-txnCh:
needFlush = w.onEvent(txn.TxnEvent, txn.PostTxnExecuted)
case <-delay.C:
needFlush = true
}
}
// Release resources promptly
if !delay.Stop() {
select {
case <-delay.C:
default:
}
}
}
// needFlush must be true here, so we can do flush.
if err := w.doFlush(); err != nil {
log.Error("Transaction dmlSink worker exits unexpectly",
zap.String("changefeedID", w.changefeed),
zap.Int("workerID", w.ID),
zap.Error(err))
return err
}
// We record the total time in order to calculate the worker busy ratio.
// It is recorded right after flushing so that the flush time and the
// total time are measured consistently.
w.metricTxnWorkerTotalDuration.Observe(time.Since(start).Seconds())
start = time.Now()
}
needFlush = false
}
}
}
Expand All @@ -150,16 +159,12 @@ func (w *worker) onEvent(txn *txnEvent, postTxnExecuted func()) bool {
}

// doFlush flushes the backend.
// It returns true only if it can no longer be flushed.
func (w *worker) doFlush(flushTimeSlice *time.Duration) error {
func (w *worker) doFlush() error {
if w.hasPending {
start := time.Now()
defer func() {
elapsed := time.Since(start)
*flushTimeSlice += elapsed
w.metricTxnWorkerFlushDuration.Observe(elapsed.Seconds())
w.metricTxnWorkerFlushDuration.Observe(time.Since(start).Seconds())
}()

if err := w.backend.Flush(w.ctx); err != nil {
log.Warn("Transaction dmlSink backend flush fail",
zap.String("changefeedID", w.changefeed),
Expand Down
15 changes: 8 additions & 7 deletions cdc/sink/metrics/txn/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,16 @@ var (
Name: "txn_worker_flush_duration",
Help: "Flush duration (s) for txn worker.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms~524s
}, []string{"namespace", "changefeed"})
}, []string{"namespace", "changefeed", "id"})

WorkerBusyRatio = prometheus.NewCounterVec(
prometheus.CounterOpts{
WorkerTotalDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "txn_worker_busy_ratio",
Help: "Busy ratio (X ms in 1s) for all workers.",
}, []string{"namespace", "changefeed"})
Name: "txn_worker_total_duration",
Help: "total duration (s) for txn worker.",
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 20), // 0.1ms~52s
}, []string{"namespace", "changefeed", "id"})

WorkerHandledRows = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down Expand Up @@ -94,7 +95,7 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(ConflictDetectDuration)
registry.MustRegister(QueueDuration)
registry.MustRegister(WorkerFlushDuration)
registry.MustRegister(WorkerBusyRatio)
registry.MustRegister(WorkerTotalDuration)
registry.MustRegister(WorkerHandledRows)
registry.MustRegister(SinkDMLBatchCommit)
registry.MustRegister(SinkDMLBatchCallback)
Expand Down
4 changes: 2 additions & 2 deletions metrics/grafana/ticdc.json
Original file line number Diff line number Diff line change
Expand Up @@ -6598,9 +6598,9 @@
"targets": [
{
"exemplar": true,
"expr": "sum(rate(ticdc_sink_txn_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",namespace=~\"$namespace\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])/10) by (namespace,changefeed,instance)",
"expr": "sum(rate(ticdc_sink_txn_worker_flush_duration_sum{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",namespace=~\"$namespace\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (namespace,changefeed,instance,id) /sum(rate(ticdc_sink_txn_worker_total_duration_sum{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",namespace=~\"$namespace\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (namespace,changefeed,instance,id) *100",
"interval": "",
"legendFormat": "{{namespace}}-{{changefeed}}-{{instance}}",
"legendFormat": "{{namespace}}-{{changefeed}}-{{instance}}-worker-{{id}}",
"queryType": "randomWalk",
"refId": "A"
}
Expand Down

0 comments on commit 8588082

Please sign in to comment.