From 76212ee47d9e8e37c25ab401673a9888d4a03bab Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Fri, 29 Dec 2023 14:04:28 +0800 Subject: [PATCH 1/5] Update metrics.go --- cdc/api/v2/model.go | 3 + cdc/api/v2/model_test.go | 1 + cdc/kv/client.go | 62 ++++-- cdc/kv/client_bench_test.go | 4 +- cdc/kv/client_test.go | 57 +++--- cdc/kv/metrics.go | 19 +- cdc/kv/region_worker.go | 182 ++++++++++-------- cdc/kv/shared_region_worker.go | 2 +- cdc/processor/processor.go | 3 +- cdc/processor/sourcemanager/manager.go | 24 ++- .../puller/dummy_puller_wrapper.go | 2 +- .../sourcemanager/puller/puller_wrapper.go | 3 + cdc/puller/ddl_puller.go | 2 +- cdc/puller/puller.go | 8 +- cdc/puller/puller_test.go | 3 +- pkg/config/config_test_data.go | 3 + pkg/config/replica_config.go | 4 +- 17 files changed, 234 insertions(+), 148 deletions(-) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 646ce926cf0..cf0a4003e4a 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -191,6 +191,7 @@ type ReplicaConfig struct { IgnoreIneligibleTable bool `json:"ignore_ineligible_table"` CheckGCSafePoint bool `json:"check_gc_safe_point"` EnableSyncPoint *bool `json:"enable_sync_point,omitempty"` + EnableTableMonitor *bool `json:"enable_table_monitor,omitempty"` BDRMode *bool `json:"bdr_mode,omitempty"` SyncPointInterval *JSONDuration `json:"sync_point_interval,omitempty" swaggertype:"string"` @@ -221,6 +222,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( res.ForceReplicate = c.ForceReplicate res.CheckGCSafePoint = c.CheckGCSafePoint res.EnableSyncPoint = c.EnableSyncPoint + res.EnableTableMonitor = c.EnableTableMonitor res.IgnoreIneligibleTable = c.IgnoreIneligibleTable res.SQLMode = c.SQLMode if c.SyncPointInterval != nil { @@ -532,6 +534,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { IgnoreIneligibleTable: cloned.IgnoreIneligibleTable, CheckGCSafePoint: cloned.CheckGCSafePoint, EnableSyncPoint: cloned.EnableSyncPoint, + EnableTableMonitor: cloned.EnableTableMonitor, BDRMode: cloned.BDRMode, SQLMode: cloned.SQLMode, } diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index 30115d4c714..a7e0ceb9d04 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -35,6 +35,7 @@ var defaultAPIConfig = &ReplicaConfig{ CheckGCSafePoint: true, BDRMode: util.AddressOf(false), EnableSyncPoint: util.AddressOf(false), + EnableTableMonitor: util.AddressOf(false), SyncPointInterval: &JSONDuration{10 * time.Minute}, SyncPointRetention: &JSONDuration{24 * time.Hour}, Filter: &FilterConfig{ diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 74b3c506d88..c462a0aace4 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -144,6 +144,7 @@ type CDCKVClient interface { ts uint64, lockResolver txnutil.LockResolver, eventCh chan<- model.RegionFeedEvent, + enableTableMonitor bool, ) error // RegionCount returns the number of captured regions. 
@@ -276,8 +277,9 @@ func (c *CDCClient) EventFeed( ctx context.Context, span tablepb.Span, ts uint64, lockResolver txnutil.LockResolver, eventCh chan<- model.RegionFeedEvent, + enableTableMonitor bool, ) error { - s := newEventFeedSession(c, span, lockResolver, ts, eventCh) + s := newEventFeedSession(c, span, lockResolver, ts, eventCh, enableTableMonitor) return s.eventFeed(ctx) } @@ -355,11 +357,10 @@ type eventFeedSession struct { rangeLock *regionlock.RegionRangeLock - // To identify metrics of different eventFeedSession - id string - regionChSizeGauge prometheus.Gauge - errChSizeGauge prometheus.Gauge - rangeChSizeGauge prometheus.Gauge + enableTableMonitor bool + regionChSizeGauge prometheus.Gauge + errChSizeGauge prometheus.Gauge + rangeChSizeGauge prometheus.Gauge streams map[string]*eventFeedStream streamsLock sync.RWMutex @@ -380,9 +381,9 @@ func newEventFeedSession( lockResolver txnutil.LockResolver, startTs uint64, eventCh chan<- model.RegionFeedEvent, + enableTableMonitor bool, ) *eventFeedSession { id := allocID() - idStr := strconv.FormatUint(id, 10) rangeLock := regionlock.NewRegionRangeLock( id, totalSpan.StartKey, totalSpan.EndKey, startTs, client.changefeed.Namespace+"."+client.changefeed.ID) @@ -393,16 +394,19 @@ func newEventFeedSession( tableID: client.tableID, tableName: client.tableName, - totalSpan: totalSpan, - eventCh: eventCh, - rangeLock: rangeLock, - lockResolver: lockResolver, - id: idStr, - regionChSizeGauge: clientChannelSize.WithLabelValues("region"), - errChSizeGauge: clientChannelSize.WithLabelValues("err"), - rangeChSizeGauge: clientChannelSize.WithLabelValues("range"), - streams: make(map[string]*eventFeedStream), - streamsCanceller: make(map[string]context.CancelFunc), + totalSpan: totalSpan, + eventCh: eventCh, + rangeLock: rangeLock, + lockResolver: lockResolver, + enableTableMonitor: enableTableMonitor, + regionChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "region"), + errChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "err"), + rangeChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "range"), + streams: make(map[string]*eventFeedStream), + streamsCanceller: make(map[string]context.CancelFunc), resolvedTsPool: sync.Pool{ New: func() any { return ®ionStatefulEvent{ @@ -1074,7 +1078,7 @@ func (s *eventFeedSession) receiveFromStream( eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - err := handleExit(worker.run()) + err := handleExit(worker.run(s.enableTableMonitor)) if err != nil { log.Error("region worker exited with error", zap.String("namespace", s.changefeed.Namespace), @@ -1089,10 +1093,32 @@ func (s *eventFeedSession) receiveFromStream( }) receiveEvents := func() error { + metricWorkerBusyRatio := workerBusyRatio.WithLabelValues( + s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), "event-receiver", addr) + metricsTicker := time.NewTicker(time.Second * 10) + defer metricsTicker.Stop() + var receiveTime time.Duration + startToWork := time.Now() + maxCommitTs := model.Ts(0) for { + startToReceive := time.Now() cevent, err := stream.Recv() + if s.enableTableMonitor { + receiveTime += time.Since(startToReceive) + select { + case <-metricsTicker.C: + now := time.Now() + // busyRatio indicates the actual working time (receive and decode grpc msg) of 
the worker. + busyRatio := int(receiveTime.Seconds() / now.Sub(startToWork).Seconds() * 1000) + metricWorkerBusyRatio.Add(float64(busyRatio)) + startToWork = now + receiveTime = 0 + default: + } + } + failpoint.Inject("kvClientRegionReentrantError", func(op failpoint.Value) { if op.(string) == "error" { _ = worker.sendEvents(ctx, []*regionStatefulEvent{nil}) diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go index 1cf0546b88e..d6721870784 100644 --- a/cdc/kv/client_bench_test.go +++ b/cdc/kv/client_bench_test.go @@ -205,7 +205,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) ( go func() { err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) if errors.Cause(err) != context.Canceled { b.Error(err) } @@ -299,7 +299,7 @@ func prepareBench(b *testing.B, regionNum int) ( go func() { err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("z")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) if errors.Cause(err) != context.Canceled { b.Error(err) } diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 9c6deedfde2..53bfd1ae026 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -335,7 +335,7 @@ func TestConnectOfflineTiKV(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 1, lockResolver, eventCh) + 1, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -435,7 +435,7 @@ func TestRecvLargeMessageSize(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 1, lockResolver, eventCh) + 1, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -535,7 +535,7 @@ func TestHandleError(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("d")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -695,7 +695,7 @@ func TestCompatibilityWithSameConn(t *testing.T) { defer wg2.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.True(t, cerror.ErrVersionIncompatible.Equal(err)) }() @@ -763,7 +763,7 @@ func TestClusterIDMismatch(t *testing.T) { defer wg2.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.True(t, cerror.ErrClusterIDMismatch.Equal(err)) }() @@ -830,7 +830,7 @@ func testHandleFeedEvent(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1291,7 +1291,7 @@ func TestStreamSendWithError(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")}, - 100, lockerResolver, eventCh) + 100, lockerResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1403,7 +1403,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: 
[]byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1537,7 +1537,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { defer close(eventCh) err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1746,7 +1746,7 @@ func TestIncompatibleTiKV(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1823,7 +1823,7 @@ func TestNoPendingRegionError(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1901,7 +1901,7 @@ func TestDropStaleRequest(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2015,7 +2015,7 @@ func TestResolveLock(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2121,7 +2121,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { defer clientWg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, errUnreachable, err) }() @@ -2248,7 +2248,7 @@ func testEventAfterFeedStop(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2435,7 +2435,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2653,7 +2653,7 @@ func TestResolveLockNoCandidate(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2749,7 +2749,7 @@ func TestFailRegionReentrant(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2832,7 +2832,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2900,7 +2900,7 @@ func testClientErrNoPendingRegion(t *testing.T) { defer wg.Done() 
err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2978,7 +2978,7 @@ func testKVClientForceReconnect(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3129,7 +3129,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("z")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3246,7 +3246,7 @@ func TestEvTimeUpdate(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3372,7 +3372,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3466,7 +3466,7 @@ func TestPrewriteNotMatchError(t *testing.T) { defer wg.Done() err = cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3541,8 +3541,9 @@ func createFakeEventFeedSession() *eventFeedSession { return newEventFeedSession( &CDCClient{config: config.GetDefaultServerConfig()}, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - nil, /*lockResolver*/ - 100, /*startTs*/ - nil, /*eventCh*/ + nil, /*lockResolver*/ + 100, /*startTs*/ + nil, /*eventCh*/ + false, /*enableTableMonitor*/ ) } diff --git a/cdc/kv/metrics.go b/cdc/kv/metrics.go index fb0d6d90a9b..1cd72a715e6 100644 --- a/cdc/kv/metrics.go +++ b/cdc/kv/metrics.go @@ -71,7 +71,7 @@ var ( Subsystem: "kvclient", Name: "channel_size", Help: "size of each channel in kv client", - }, []string{"channel"}) + }, []string{"namespace", "changefeed", "table", "type"}) clientRegionTokenSize = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", @@ -143,6 +143,21 @@ var ( }, // actions: wait, run. []string{"namespace", "changefeed"}) + + workerBusyRatio = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "kvclient", + Name: "region_worker_busy_ratio", + Help: "Busy ratio (X ms in 1s) for region worker.", + }, []string{"namespace", "changefeed", "table", "store", "type"}) + workerChannelSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "kvclient", + Name: "region_worker_channel_size", + Help: "size of each channel in region worker", + }, []string{"namespace", "changefeed", "table", "store", "type"}) ) // GetGlobalGrpcMetrics gets the global grpc metrics. @@ -167,6 +182,8 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(regionConnectDuration) registry.MustRegister(lockResolveDuration) registry.MustRegister(regionWorkerQueueDuration) + registry.MustRegister(workerBusyRatio) + registry.MustRegister(workerChannelSize) // Register client metrics to registry.
registry.MustRegister(grpcMetrics) diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 3f430878dbb..8c56117e3e0 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -18,6 +18,7 @@ import ( "encoding/hex" "reflect" "runtime" + "strconv" "sync" "sync/atomic" "time" @@ -76,6 +77,9 @@ type regionWorkerMetrics struct { metricSendEventCommittedCounter prometheus.Counter metricQueueDuration prometheus.Observer + + metricWorkerBusyRatio prometheus.Counter + metricWorkerChannelSize prometheus.Gauge } /* @@ -118,7 +122,7 @@ type regionWorker struct { pendingRegions *syncRegionFeedStateMap } -func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetrics { +func newRegionWorkerMetrics(changefeedID model.ChangeFeedID, tableID string, storeAddr string) *regionWorkerMetrics { metrics := ®ionWorkerMetrics{} metrics.metricReceivedEventSize = eventSize.WithLabelValues("received") metrics.metricDroppedEventSize = eventSize.WithLabelValues("dropped") @@ -144,6 +148,11 @@ func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetric metrics.metricQueueDuration = regionWorkerQueueDuration. WithLabelValues(changefeedID.Namespace, changefeedID.ID) + metrics.metricWorkerBusyRatio = workerBusyRatio.WithLabelValues( + changefeedID.Namespace, changefeedID.ID, tableID, storeAddr, "event-handler") + metrics.metricWorkerChannelSize = workerChannelSize.WithLabelValues( + changefeedID.Namespace, changefeedID.ID, tableID, storeAddr, "input") + return metrics } @@ -162,7 +171,7 @@ func newRegionWorker( rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), storeAddr: addr, concurrency: int(s.client.config.KVClient.WorkerConcurrent), - metrics: newRegionWorkerMetrics(changefeedID), + metrics: newRegionWorkerMetrics(changefeedID, strconv.FormatInt(s.tableID, 10), addr), inputPending: 0, pendingRegions: pendingRegions, @@ -438,25 +447,42 @@ func (w *regionWorker) onHandleExit(err error) { } } -func (w *regionWorker) eventHandler(ctx context.Context) error { - pollEvents := func() ([]*regionStatefulEvent, error) { - exitFn := func() error { - log.Info("region worker closed by error", - zap.String("namespace", w.session.client.changefeed.Namespace), - zap.String("changefeed", w.session.client.changefeed.ID), - zap.Int64("tableID", w.session.tableID), - zap.String("tableName", w.session.tableName)) - return cerror.ErrRegionWorkerExit.GenWithStackByArgs() - } +func (w *regionWorker) eventHandler(ctx context.Context, enableTableMonitor bool) error { + exitFn := func() error { + log.Info("region worker closed by error", + zap.String("namespace", w.session.client.changefeed.Namespace), + zap.String("changefeed", w.session.client.changefeed.ID), + zap.Int64("tableID", w.session.tableID), + zap.String("tableName", w.session.tableName)) + return cerror.ErrRegionWorkerExit.GenWithStackByArgs() + } + metricsTicker := time.NewTicker(time.Second * 10) + defer metricsTicker.Stop() + var processTime time.Duration + startToWork := time.Now() + + highWatermarkMet := false + for { select { case <-ctx.Done(): - return nil, errors.Trace(ctx.Err()) + return errors.Trace(ctx.Err()) case err := <-w.errorCh: - return nil, errors.Trace(err) + return errors.Trace(err) + case <-metricsTicker.C: + if enableTableMonitor { + w.metrics.metricWorkerChannelSize.Set(float64(len(w.inputCh))) + + now := time.Now() + // busyRatio indicates the actual working time of the worker. 
+ busyRatio := int(processTime.Seconds() / now.Sub(startToWork).Seconds() * 1000) + w.metrics.metricWorkerBusyRatio.Add(float64(busyRatio)) + startToWork = now + processTime = 0 + } case events, ok := <-w.inputCh: if !ok { - return nil, exitFn() + return exitFn() } if len(events) == 0 { log.Panic("regionWorker.inputCh doesn't accept empty slice") @@ -465,80 +491,74 @@ func (w *regionWorker) eventHandler(ctx context.Context) error { // event == nil means the region worker should exit and re-establish // all existing regions. if event == nil { - return nil, exitFn() + return exitFn() } } - return events, nil - } - } - - highWatermarkMet := false - for { - events, err := pollEvents() - if err != nil { - return err - } - regionEventsBatchSize.Observe(float64(len(events))) - inputPending := atomic.LoadInt32(&w.inputPending) - if highWatermarkMet { - highWatermarkMet = int(inputPending) >= regionWorkerLowWatermark - } else { - highWatermarkMet = int(inputPending) >= regionWorkerHighWatermark - } - atomic.AddInt32(&w.inputPending, -int32(len(events))) + regionEventsBatchSize.Observe(float64(len(events))) - if highWatermarkMet { - // All events in one batch can be hashed into one handle slot. - slot := w.inputCalcSlot(events[0].regionID) - eventsX := make([]interface{}, 0, len(events)) - for _, event := range events { - eventsX = append(eventsX, event) - } - err = w.handles[slot].AddEvents(ctx, eventsX) - if err != nil { - return err + start := time.Now() + inputPending := atomic.LoadInt32(&w.inputPending) + if highWatermarkMet { + highWatermarkMet = int(inputPending) >= regionWorkerLowWatermark + } else { + highWatermarkMet = int(inputPending) >= regionWorkerHighWatermark } - // Principle: events from the same region must be processed linearly. - // - // When buffered events exceed high watermark, we start to use worker - // pool to improve throughput, and we need a mechanism to quit worker - // pool when buffered events are less than low watermark, which means - // we should have a way to know whether events sent to the worker pool - // are all processed. - // Send a dummy event to each worker pool handler, after each of these - // events are processed, we can ensure all events sent to worker pool - // from this region worker are processed. - finishedCallbackCh := make(chan struct{}, 1) - err = w.handles[slot].AddEvent(ctx, ®ionStatefulEvent{finishedCallbackCh: finishedCallbackCh}) - if err != nil { - return err - } - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - case err = <-w.errorCh: - return err - case <-finishedCallbackCh: - } - } else { - // We measure whether the current worker is busy based on the input - // channel size. If the buffered event count is larger than the high - // watermark, we send events to worker pool to increase processing - // throughput. Otherwise, we process event in local region worker to - // ensure low processing latency. - for _, event := range events { - err = w.processEvent(ctx, event) + atomic.AddInt32(&w.inputPending, -int32(len(events))) + + if highWatermarkMet { + // All events in one batch can be hashed into one handle slot. + slot := w.inputCalcSlot(events[0].regionID) + eventsX := make([]interface{}, 0, len(events)) + for _, event := range events { + eventsX = append(eventsX, event) + } + err := w.handles[slot].AddEvents(ctx, eventsX) if err != nil { return err } + // Principle: events from the same region must be processed linearly. 
+ // + // When buffered events exceed high watermark, we start to use worker + // pool to improve throughput, and we need a mechanism to quit worker + // pool when buffered events are less than low watermark, which means + // we should have a way to know whether events sent to the worker pool + // are all processed. + // Send a dummy event to each worker pool handler, after each of these + // events are processed, we can ensure all events sent to worker pool + // from this region worker are processed. + finishedCallbackCh := make(chan struct{}, 1) + err = w.handles[slot].AddEvent(ctx, ®ionStatefulEvent{finishedCallbackCh: finishedCallbackCh}) + if err != nil { + return err + } + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case err = <-w.errorCh: + return err + case <-finishedCallbackCh: + } + } else { + // We measure whether the current worker is busy based on the input + // channel size. If the buffered event count is larger than the high + // watermark, we send events to worker pool to increase processing + // throughput. Otherwise, we process event in local region worker to + // ensure low processing latency. + for _, event := range events { + err := w.processEvent(ctx, event) + if err != nil { + return err + } + } } - } - for _, ev := range events { - // resolved ts event has been consumed, it is safe to put back. - if ev.resolvedTsEvent != nil { - w.session.resolvedTsPool.Put(ev) + for _, ev := range events { + // resolved ts event has been consumed, it is safe to put back. + if ev.resolvedTsEvent != nil { + w.session.resolvedTsPool.Put(ev) + } } + processTime += time.Since(start) } } } @@ -592,7 +612,7 @@ func (w *regionWorker) cancelStream(delay time.Duration) { } } -func (w *regionWorker) run() error { +func (w *regionWorker) run(enableTableMonitor bool) error { defer func() { for _, h := range w.handles { h.Unregister() @@ -617,7 +637,7 @@ func (w *regionWorker) run() error { return handleError(w.checkErrorReconnect(w.resolveLock(ctx))) }) wg.Go(func() error { - return handleError(w.eventHandler(ctx)) + return handleError(w.eventHandler(ctx, enableTableMonitor)) }) _ = handleError(w.collectWorkpoolError(ctx)) _ = wg.Wait() diff --git a/cdc/kv/shared_region_worker.go b/cdc/kv/shared_region_worker.go index 732b148f1ff..aee9aee65cf 100644 --- a/cdc/kv/shared_region_worker.go +++ b/cdc/kv/shared_region_worker.go @@ -74,7 +74,7 @@ func newSharedRegionWorker(c *SharedClient) *sharedRegionWorker { client: c, inputCh: make(chan statefulEvent, regionWorkerInputChanSize), statesManager: newRegionStateManager(-1), - metrics: newRegionWorkerMetrics(c.changefeed), + metrics: newRegionWorkerMetrics(c.changefeed, "shared", "shared"), } } diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index b7938985ff3..7e235b69241 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -626,7 +626,8 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) { p.sourceManager.r = sourcemanager.New( p.changefeedID, p.upstream, p.mg.r, - sortEngine, util.GetOrZero(p.latestInfo.Config.BDRMode)) + sortEngine, util.GetOrZero(p.latestInfo.Config.BDRMode), + util.GetOrZero(p.latestInfo.Config.EnableTableMonitor)) p.sourceManager.name = "SourceManager" p.sourceManager.changefeedID = p.changefeedID p.sourceManager.spawn(prcCtx) diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index 258694793f4..742cd8f9766 100644 --- a/cdc/processor/sourcemanager/manager.go +++ 
b/cdc/processor/sourcemanager/manager.go @@ -75,6 +75,7 @@ type SourceManager struct { // if `config.GetGlobalServerConfig().KVClient.EnableMultiplexing` is true `tablePullers` // will be used. Otherwise `multiplexingPuller` will be used instead. multiplexing bool + enableTableMonitor bool tablePullers tablePullers multiplexingPuller multiplexingPuller } @@ -86,9 +87,10 @@ func New( mg entry.MounterGroup, engine sorter.SortEngine, bdrMode bool, + enableTableMonitor bool, ) *SourceManager { multiplexing := config.GetGlobalServerConfig().KVClient.EnableMultiplexing - return newSourceManager(changefeedID, up, mg, engine, bdrMode, multiplexing, pullerwrapper.NewPullerWrapper) + return newSourceManager(changefeedID, up, mg, engine, bdrMode, multiplexing, pullerwrapper.NewPullerWrapper, enableTableMonitor) } // NewForTest creates a new source manager for testing. @@ -99,7 +101,7 @@ func NewForTest( engine sorter.SortEngine, bdrMode bool, ) *SourceManager { - return newSourceManager(changefeedID, up, mg, engine, bdrMode, false, pullerwrapper.NewPullerWrapperForTest) + return newSourceManager(changefeedID, up, mg, engine, bdrMode, false, pullerwrapper.NewPullerWrapperForTest, false) } func newSourceManager( @@ -110,15 +112,17 @@ func newSourceManager( bdrMode bool, multiplexing bool, pullerWrapperCreator pullerWrapperCreator, + enableTableMonitor bool, ) *SourceManager { mgr := &SourceManager{ - ready: make(chan struct{}), - changefeedID: changefeedID, - up: up, - mg: mg, - engine: engine, - bdrMode: bdrMode, - multiplexing: multiplexing, + ready: make(chan struct{}), + changefeedID: changefeedID, + up: up, + mg: mg, + engine: engine, + bdrMode: bdrMode, + multiplexing: multiplexing, + enableTableMonitor: enableTableMonitor, } if !multiplexing { mgr.tablePullers.errChan = make(chan error, 16) @@ -138,7 +142,7 @@ func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs mo } p := m.tablePullers.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode) - p.Start(m.tablePullers.ctx, m.up, m.engine, m.tablePullers.errChan) + p.Start(m.tablePullers.ctx, m.up, m.engine, m.tablePullers.errChan, m.enableTableMonitor) m.tablePullers.Store(span, p) } diff --git a/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go b/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go index 171370f7a43..187cb27f083 100644 --- a/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go +++ b/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go @@ -39,7 +39,7 @@ func NewPullerWrapperForTest( } func (d *dummyPullerWrapper) Start(ctx context.Context, up *upstream.Upstream, - eventSortEngine sorter.SortEngine, errCh chan<- error) { + eventSortEngine sorter.SortEngine, errCh chan<- error, enableTableMonitor bool) { } func (d *dummyPullerWrapper) GetStats() puller.Stats { diff --git a/cdc/processor/sourcemanager/puller/puller_wrapper.go b/cdc/processor/sourcemanager/puller/puller_wrapper.go index e253f43ffe9..b61845e2dc5 100644 --- a/cdc/processor/sourcemanager/puller/puller_wrapper.go +++ b/cdc/processor/sourcemanager/puller/puller_wrapper.go @@ -35,6 +35,7 @@ type Wrapper interface { up *upstream.Upstream, eventSortEngine sorter.SortEngine, errChan chan<- error, + enableTableMonitor bool, ) GetStats() puller.Stats Close() @@ -79,6 +80,7 @@ func (n *WrapperImpl) Start( up *upstream.Upstream, eventSortEngine sorter.SortEngine, errChan chan<- error, + enableTableMonitor bool, ) { ctx, n.cancel = context.WithCancel(ctx) errorHandler := func(err error) { @@ -108,6 +110,7 @@ func 
(n *WrapperImpl) Start( n.span.TableID, n.tableName, n.bdrMode, + enableTableMonitor, ) // Use errgroup to ensure all sub goroutines can exit without calling Close. diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index 8f62fb021ff..7296b9c392b 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -630,7 +630,7 @@ func NewDDLJobPuller( jobPuller.puller.Puller = New( ctx, pdCli, up.GrpcPool, regionCache, kvStorage, pdClock, checkpointTs, spans, cfg, changefeed, -1, memorysorter.DDLPullerTableName, - ddlPullerFilterLoop, + ddlPullerFilterLoop, false, ) } diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index c9793475b37..41a264fd707 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -78,6 +78,8 @@ type pullerImpl struct { lastForwardResolvedTs uint64 // startResolvedTs is the resolvedTs when puller is initialized startResolvedTs uint64 + + enableTableMonitor bool } // New create a new Puller fetch event start from checkpointTs and put into buf. @@ -94,6 +96,7 @@ func New(ctx context.Context, tableID model.TableID, tableName string, filterLoop bool, + enableTableMonitor bool, ) Puller { tikvStorage, ok := kvStorage.(tikv.Storage) if !ok { @@ -119,7 +122,8 @@ func New(ctx context.Context, tableName: tableName, cfg: cfg, - startResolvedTs: checkpointTs, + startResolvedTs: checkpointTs, + enableTableMonitor: enableTableMonitor, } return p } @@ -137,7 +141,7 @@ func (p *pullerImpl) Run(ctx context.Context) error { span := span g.Go(func() error { - return p.kvCli.EventFeed(ctx, span, checkpointTs, lockResolver, eventCh) + return p.kvCli.EventFeed(ctx, span, checkpointTs, lockResolver, eventCh, p.enableTableMonitor) }) } diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go index d1940d790a4..211265258eb 100644 --- a/cdc/puller/puller_test.go +++ b/cdc/puller/puller_test.go @@ -82,6 +82,7 @@ func (mc *mockCDCKVClient) EventFeed( ts uint64, lockResolver txnutil.LockResolver, eventCh chan<- model.RegionFeedEvent, + enableTableMonitor bool, ) error { for { select { @@ -135,7 +136,7 @@ func newPullerForTest( ctx, pdCli, grpcPool, regionCache, store, pdutil.NewClock4Test(), checkpointTs, spans, config.GetDefaultServerConfig(), model.DefaultChangeFeedID("changefeed-id-test"), 0, - "table-test", false) + "table-test", false, false) wg.Add(1) go func() { defer wg.Done() diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 9ac94bb16db..0d63802f2c1 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -21,6 +21,7 @@ const ( "ignore-ineligible-table":false, "check-gc-safe-point": true, "enable-sync-point": false, + "enable-table-monitor": false, "bdr-mode": false, "sync-point-interval": 600000000000, "sync-point-retention": 86400000000000, @@ -200,6 +201,7 @@ const ( "ignore-ineligible-table":false, "check-gc-safe-point": true, "enable-sync-point": false, + "enable-table-monitor": false, "bdr-mode": false, "sync-point-interval": 600000000000, "sync-point-retention": 86400000000000, @@ -363,6 +365,7 @@ const ( "ignore-ineligible-table":false, "check-gc-safe-point": true, "enable-sync-point": false, + "enable-table-monitor": false, "bdr-mode": false, "sync-point-interval": 600000000000, "sync-point-retention": 86400000000000, diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 98e8a1cb67f..4ed51891f24 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -52,6 +52,7 @@ var defaultReplicaConfig = &ReplicaConfig{ CaseSensitive: false, 
CheckGCSafePoint: true, EnableSyncPoint: util.AddressOf(false), + EnableTableMonitor: util.AddressOf(false), SyncPointInterval: util.AddressOf(10 * time.Minute), SyncPointRetention: util.AddressOf(24 * time.Hour), BDRMode: util.AddressOf(false), @@ -135,7 +136,8 @@ type replicaConfig struct { ForceReplicate bool `toml:"force-replicate" json:"force-replicate"` CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"` // EnableSyncPoint is only available when the downstream is a Database. - EnableSyncPoint *bool `toml:"enable-sync-point" json:"enable-sync-point,omitempty"` + EnableSyncPoint *bool `toml:"enable-sync-point" json:"enable-sync-point,omitempty"` + EnableTableMonitor *bool `toml:"enable-table-monitor" json:"enable-table-monitor"` // IgnoreIneligibleTable is used to store the user's config when creating a changefeed. // not used in the changefeed's lifecycle. IgnoreIneligibleTable bool `toml:"ignore-ineligible-table" json:"ignore-ineligible-table"` From 11504cf15f699f533956f2173c569a9a1b7bd175 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 2 Jan 2024 19:38:42 +0800 Subject: [PATCH 2/5] fix --- cdc/kv/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index c462a0aace4..cb69756a075 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -1094,7 +1094,7 @@ func (s *eventFeedSession) receiveFromStream( receiveEvents := func() error { metricWorkerBusyRatio := workerBusyRatio.WithLabelValues( - s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), "event-receiver", addr) + s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-receiver") metricsTicker := time.NewTicker(time.Second * 10) defer metricsTicker.Stop() var receiveTime time.Duration From db5ce0e64dca3f1eb9454630cacad6dccbeb7c81 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Wed, 3 Jan 2024 00:21:05 +0800 Subject: [PATCH 3/5] address comment --- cdc/kv/client.go | 24 +++++++++++++++++------- cdc/kv/metrics.go | 4 ++-- cdc/kv/region_worker.go | 6 +++--- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index cb69756a075..aa91d9faad9 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -1059,6 +1059,10 @@ func (s *eventFeedSession) receiveFromStream( metricSendEventBatchResolvedSize := batchResolvedEventSize. 
WithLabelValues(s.changefeed.Namespace, s.changefeed.ID) + metricReceiveBusyRatio := workerBusyRatio.WithLabelValues( + s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-receiver") + metricProcessBusyRatio := workerBusyRatio.WithLabelValues( + s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-processor") // always create a new region worker, because `receiveFromStream` is ensured // to call exactly once from outer code logic @@ -1093,11 +1097,10 @@ func (s *eventFeedSession) receiveFromStream( }) receiveEvents := func() error { - metricWorkerBusyRatio := workerBusyRatio.WithLabelValues( - s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-receiver") - metricsTicker := time.NewTicker(time.Second * 10) + metricsTicker := time.NewTicker(time.Second * 20) defer metricsTicker.Stop() var receiveTime time.Duration + var processTime time.Duration startToWork := time.Now() maxCommitTs := model.Ts(0) @@ -1110,11 +1113,16 @@ func (s *eventFeedSession) receiveFromStream( select { case <-metricsTicker.C: now := time.Now() - // busyRatio indicates the actual working time (receive and decode grpc msg) of the worker. - busyRatio := int(receiveTime.Seconds() / now.Sub(startToWork).Seconds() * 1000) - metricWorkerBusyRatio.Add(float64(busyRatio)) - startToWork = now + // busyRatio indicates the blocking time (receive and decode grpc msg) of the worker. + busyRatio := receiveTime.Seconds() / now.Sub(startToWork).Seconds() * 100 + metricReceiveBusyRatio.Set(busyRatio) receiveTime = 0 + // busyRatio indicates the working time (dispatch to region worker) of the worker. + busyRatio = processTime.Seconds() / now.Sub(startToWork).Seconds() * 100 + metricProcessBusyRatio.Set(busyRatio) + processTime = 0 + + startToWork = now default: } } @@ -1177,6 +1185,7 @@ func (s *eventFeedSession) receiveFromStream( return nil } + startToProcess := time.Now() size := cevent.Size() if size > warnRecvMsgSizeThreshold { regionCount := 0 @@ -1219,6 +1228,7 @@ func (s *eventFeedSession) receiveFromStream( tsStat.commitTs.Store(maxCommitTs) } } + processTime += time.Since(startToProcess) } } eg.Go(func() error { diff --git a/cdc/kv/metrics.go b/cdc/kv/metrics.go index 1cd72a715e6..b8d9067d136 100644 --- a/cdc/kv/metrics.go +++ b/cdc/kv/metrics.go @@ -144,8 +144,8 @@ var ( // actions: wait, run. []string{"namespace", "changefeed"}) - workerBusyRatio = prometheus.NewCounterVec( - prometheus.CounterOpts{ + workerBusyRatio = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ Namespace: "ticdc", Subsystem: "kvclient", Name: "region_worker_busy_ratio", diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 8c56117e3e0..efb341d745b 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -78,7 +78,7 @@ type regionWorkerMetrics struct { metricQueueDuration prometheus.Observer - metricWorkerBusyRatio prometheus.Counter + metricWorkerBusyRatio prometheus.Gauge metricWorkerChannelSize prometheus.Gauge } @@ -475,8 +475,8 @@ func (w *regionWorker) eventHandler(ctx context.Context, enableTableMonitor bool now := time.Now() // busyRatio indicates the actual working time of the worker. 
- busyRatio := int(processTime.Seconds() / now.Sub(startToWork).Seconds() * 1000) - w.metrics.metricWorkerBusyRatio.Add(float64(busyRatio)) + busyRatio := processTime.Seconds() / now.Sub(startToWork).Seconds() * 100 + w.metrics.metricWorkerBusyRatio.Set(busyRatio) startToWork = now processTime = 0 } From 0de2d9cc95fd22adf11929a25bbc36f1f9bd8760 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Wed, 3 Jan 2024 11:41:09 +0800 Subject: [PATCH 4/5] address comments --- cdc/kv/client.go | 12 +++++------- cdc/kv/region_worker.go | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index aa91d9faad9..899d919965e 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -75,6 +75,8 @@ const ( regionScheduleReload = false scanRegionsConcurrency = 1024 + + tableMonitorInterval = 2 * time.Second ) // time interval to force kv client to terminate gRPC stream and reconnect @@ -1097,8 +1099,6 @@ func (s *eventFeedSession) receiveFromStream( }) receiveEvents := func() error { - metricsTicker := time.NewTicker(time.Second * 20) - defer metricsTicker.Stop() var receiveTime time.Duration var processTime time.Duration startToWork := time.Now() @@ -1110,20 +1110,18 @@ func (s *eventFeedSession) receiveFromStream( if s.enableTableMonitor { receiveTime += time.Since(startToReceive) - select { - case <-metricsTicker.C: + if time.Since(startToWork) >= tableMonitorInterval { now := time.Now() - // busyRatio indicates the blocking time (receive and decode grpc msg) of the worker. + // Receive busyRatio indicates the blocking time (receive and decode grpc msg) of the worker. busyRatio := receiveTime.Seconds() / now.Sub(startToWork).Seconds() * 100 metricReceiveBusyRatio.Set(busyRatio) receiveTime = 0 - // busyRatio indicates the working time (dispatch to region worker) of the worker. + // Process busyRatio indicates the working time (dispatch to region worker) of the worker. busyRatio = processTime.Seconds() / now.Sub(startToWork).Seconds() * 100 metricProcessBusyRatio.Set(busyRatio) processTime = 0 startToWork = now - default: } } diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index efb341d745b..66919606187 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -457,7 +457,7 @@ func (w *regionWorker) eventHandler(ctx context.Context, enableTableMonitor bool return cerror.ErrRegionWorkerExit.GenWithStackByArgs() } - metricsTicker := time.NewTicker(time.Second * 10) + metricsTicker := time.NewTicker(tableMonitorInterval) defer metricsTicker.Stop() var processTime time.Duration startToWork := time.Now() From 704ad6ddedcd36b72a59d42ef1d8fe26b829903e Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Wed, 3 Jan 2024 12:08:47 +0800 Subject: [PATCH 5/5] fix ut --- cdc/model/changefeed_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index 2bf682ecdd4..16626280037 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -250,6 +250,7 @@ func TestVerifyAndComplete(t *testing.T) { CaseSensitive: false, CheckGCSafePoint: true, EnableSyncPoint: util.AddressOf(false), + EnableTableMonitor: util.AddressOf(false), SyncPointInterval: util.AddressOf(time.Minute * 10), SyncPointRetention: util.AddressOf(time.Hour * 24), BDRMode: util.AddressOf(false),
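Reviewer note: the table-monitor metrics added across this series all follow the same busy-ratio pattern — accumulate the time the loop actually spends receiving or processing, and once per tableMonitorInterval publish busyTime / elapsed * 100 to a gauge, then reset the window. Below is a minimal, self-contained Go sketch of that pattern for reference. It is illustrative only: runWorker, the "example" namespace, and the "worker" label are made-up names rather than TiCDC identifiers, and it assumes github.com/prometheus/client_golang is on the module path.

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// busyRatio mirrors ticdc_kvclient_region_worker_busy_ratio in spirit:
// a gauge holding "percent of wall-clock time spent working" per window.
var busyRatio = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "example",
		Subsystem: "worker",
		Name:      "busy_ratio",
		Help:      "Percentage of time the worker spent handling events in the last window.",
	}, []string{"worker"})

// runWorker drains tasks from input and, on every tick, publishes how busy
// the loop was during the window that just ended.
func runWorker(input <-chan func(), interval time.Duration, gauge prometheus.Gauge) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	var busy time.Duration
	windowStart := time.Now()

	for {
		select {
		case <-ticker.C:
			elapsed := time.Since(windowStart)
			// Busy ratio in percent over the last window, then reset the window.
			gauge.Set(busy.Seconds() / elapsed.Seconds() * 100)
			busy = 0
			windowStart = time.Now()
		case task, ok := <-input:
			if !ok {
				return
			}
			start := time.Now()
			task()
			busy += time.Since(start) // only time spent working counts as busy
		}
	}
}

func main() {
	prometheus.MustRegister(busyRatio)

	tasks := make(chan func())
	done := make(chan struct{})
	go func() {
		defer close(done)
		// The patched code uses tableMonitorInterval (2s); 500ms keeps this demo short.
		runWorker(tasks, 500*time.Millisecond, busyRatio.WithLabelValues("event-handler"))
	}()

	// Feed some fake work so the gauge has something to report.
	for i := 0; i < 10; i++ {
		tasks <- func() { time.Sleep(100 * time.Millisecond) }
	}
	close(tasks)
	<-done
}

Switching the metric from a Counter with Add to a Gauge with Set (patch 3) is what makes this per-window percentage directly readable on a dashboard without applying rate() to a counter.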