diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 0564fb0d1ee..3d23b8079dc 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -182,6 +182,7 @@ type ReplicaConfig struct { IgnoreIneligibleTable bool `json:"ignore_ineligible_table"` CheckGCSafePoint bool `json:"check_gc_safe_point"` EnableSyncPoint *bool `json:"enable_sync_point,omitempty"` + EnableTableMonitor *bool `json:"enable_table_monitor,omitempty"` BDRMode *bool `json:"bdr_mode,omitempty"` SyncPointInterval *JSONDuration `json:"sync_point_interval,omitempty" swaggertype:"string"` @@ -211,6 +212,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( res.ForceReplicate = c.ForceReplicate res.CheckGCSafePoint = c.CheckGCSafePoint res.EnableSyncPoint = c.EnableSyncPoint + res.EnableTableMonitor = c.EnableTableMonitor res.IgnoreIneligibleTable = c.IgnoreIneligibleTable res.SQLMode = c.SQLMode if c.SyncPointInterval != nil { @@ -503,6 +505,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { IgnoreIneligibleTable: cloned.IgnoreIneligibleTable, CheckGCSafePoint: cloned.CheckGCSafePoint, EnableSyncPoint: cloned.EnableSyncPoint, + EnableTableMonitor: cloned.EnableTableMonitor, BDRMode: cloned.BDRMode, SQLMode: cloned.SQLMode, } diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index c6b999c551e..33dc2a1a623 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -35,6 +35,7 @@ var defaultAPIConfig = &ReplicaConfig{ CheckGCSafePoint: true, BDRMode: util.AddressOf(false), EnableSyncPoint: util.AddressOf(false), + EnableTableMonitor: util.AddressOf(false), SyncPointInterval: &JSONDuration{10 * time.Minute}, SyncPointRetention: &JSONDuration{24 * time.Hour}, Filter: &FilterConfig{ diff --git a/cdc/kv/client.go b/cdc/kv/client.go index fe4fde0c377..32fc3989e27 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -75,6 +75,8 @@ const ( regionScheduleReload = false scanRegionsConcurrency = 1024 + + tableMonitorInterval = 2 * time.Second ) // time interval to force kv client to terminate gRPC stream and reconnect @@ -143,6 +145,7 @@ type CDCKVClient interface { ts uint64, lockResolver txnutil.LockResolver, eventCh chan<- model.RegionFeedEvent, + enableTableMonitor bool, ) error // RegionCount returns the number of captured regions. 
@@ -273,8 +276,9 @@ func (c *CDCClient) EventFeed( ctx context.Context, span tablepb.Span, ts uint64, lockResolver txnutil.LockResolver, eventCh chan<- model.RegionFeedEvent, + enableTableMonitor bool, ) error { - s := newEventFeedSession(c, span, lockResolver, ts, eventCh) + s := newEventFeedSession(c, span, lockResolver, ts, eventCh, enableTableMonitor) return s.eventFeed(ctx) } @@ -352,11 +356,10 @@ type eventFeedSession struct { rangeLock *regionlock.RegionRangeLock - // To identify metrics of different eventFeedSession - id string - regionChSizeGauge prometheus.Gauge - errChSizeGauge prometheus.Gauge - rangeChSizeGauge prometheus.Gauge + enableTableMonitor bool + regionChSizeGauge prometheus.Gauge + errChSizeGauge prometheus.Gauge + rangeChSizeGauge prometheus.Gauge streams map[string]*eventFeedStream streamsLock sync.RWMutex @@ -377,9 +380,9 @@ func newEventFeedSession( lockResolver txnutil.LockResolver, startTs uint64, eventCh chan<- model.RegionFeedEvent, + enableTableMonitor bool, ) *eventFeedSession { id := allocID() - idStr := strconv.FormatUint(id, 10) rangeLock := regionlock.NewRegionRangeLock( id, totalSpan.StartKey, totalSpan.EndKey, startTs, client.changefeed.Namespace+"."+client.changefeed.ID) @@ -390,16 +393,19 @@ func newEventFeedSession( tableID: client.tableID, tableName: client.tableName, - totalSpan: totalSpan, - eventCh: eventCh, - rangeLock: rangeLock, - lockResolver: lockResolver, - id: idStr, - regionChSizeGauge: clientChannelSize.WithLabelValues("region"), - errChSizeGauge: clientChannelSize.WithLabelValues("err"), - rangeChSizeGauge: clientChannelSize.WithLabelValues("range"), - streams: make(map[string]*eventFeedStream), - streamsCanceller: make(map[string]context.CancelFunc), + totalSpan: totalSpan, + eventCh: eventCh, + rangeLock: rangeLock, + lockResolver: lockResolver, + enableTableMonitor: enableTableMonitor, + regionChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "region"), + errChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "err"), + rangeChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "range"), + streams: make(map[string]*eventFeedStream), + streamsCanceller: make(map[string]context.CancelFunc), resolvedTsPool: sync.Pool{ New: func() any { return ®ionStatefulEvent{ @@ -1013,6 +1019,10 @@ func (s *eventFeedSession) receiveFromStream( metricSendEventBatchResolvedSize := batchResolvedEventSize. 
WithLabelValues(s.changefeed.Namespace, s.changefeed.ID) + metricReceiveBusyRatio := workerBusyRatio.WithLabelValues( + s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-receiver") + metricProcessBusyRatio := workerBusyRatio.WithLabelValues( + s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-processor") // always create a new region worker, because `receiveFromStream` is ensured // to call exactly once from outer code logic @@ -1032,7 +1042,7 @@ func (s *eventFeedSession) receiveFromStream( eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - err := handleExit(worker.run()) + err := handleExit(worker.run(s.enableTableMonitor)) if err != nil { log.Error("region worker exited with error", zap.Error(err), zap.Any("changefeed", s.changefeed), @@ -1043,10 +1053,32 @@ func (s *eventFeedSession) receiveFromStream( }) receiveEvents := func() error { + var receiveTime time.Duration + var processTime time.Duration + startToWork := time.Now() + maxCommitTs := model.Ts(0) for { + startToReceive := time.Now() cevent, err := stream.Recv() + if s.enableTableMonitor { + receiveTime += time.Since(startToReceive) + if time.Since(startToWork) >= tableMonitorInterval { + now := time.Now() + // Receive busyRatio indicates the blocking time (receive and decode grpc msg) of the worker. + busyRatio := receiveTime.Seconds() / now.Sub(startToWork).Seconds() * 100 + metricReceiveBusyRatio.Set(busyRatio) + receiveTime = 0 + // Process busyRatio indicates the working time (dispatch to region worker) of the worker. + busyRatio = processTime.Seconds() / now.Sub(startToWork).Seconds() * 100 + metricProcessBusyRatio.Set(busyRatio) + processTime = 0 + + startToWork = now + } + } + failpoint.Inject("kvClientRegionReentrantError", func(op failpoint.Value) { if op.(string) == "error" { _ = worker.sendEvents(ctx, []*regionStatefulEvent{nil}) @@ -1105,6 +1137,7 @@ func (s *eventFeedSession) receiveFromStream( return nil } + startToProcess := time.Now() size := cevent.Size() if size > warnRecvMsgSizeThreshold { regionCount := 0 @@ -1149,6 +1182,7 @@ func (s *eventFeedSession) receiveFromStream( tsStat.commitTs.Store(maxCommitTs) } } + processTime += time.Since(startToProcess) } } eg.Go(func() error { diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go index 668677a4f96..3efea93a631 100644 --- a/cdc/kv/client_bench_test.go +++ b/cdc/kv/client_bench_test.go @@ -205,7 +205,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) ( go func() { err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) if errors.Cause(err) != context.Canceled { b.Error(err) } @@ -299,7 +299,7 @@ func prepareBench(b *testing.B, regionNum int) ( go func() { err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("z")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) if errors.Cause(err) != context.Canceled { b.Error(err) } diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index c2288e3705e..fd49202bd24 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -335,7 +335,7 @@ func TestConnectOfflineTiKV(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 1, lockResolver, eventCh) + 1, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -435,7 +435,7 @@ func 
TestRecvLargeMessageSize(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 1, lockResolver, eventCh) + 1, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -535,7 +535,7 @@ func TestHandleError(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("d")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -695,7 +695,7 @@ func TestCompatibilityWithSameConn(t *testing.T) { defer wg2.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.True(t, cerror.ErrVersionIncompatible.Equal(err)) }() @@ -763,7 +763,7 @@ func TestClusterIDMismatch(t *testing.T) { defer wg2.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.True(t, cerror.ErrClusterIDMismatch.Equal(err)) }() @@ -830,7 +830,7 @@ func testHandleFeedEvent(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1291,7 +1291,7 @@ func TestStreamSendWithError(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")}, - 100, lockerResolver, eventCh) + 100, lockerResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1403,7 +1403,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1537,7 +1537,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { defer close(eventCh) err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1746,7 +1746,7 @@ func TestIncompatibleTiKV(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1823,7 +1823,7 @@ func TestNoPendingRegionError(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1901,7 +1901,7 @@ func TestDropStaleRequest(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2015,7 +2015,7 @@ func TestResolveLock(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, 
errors.Cause(err)) }() @@ -2121,7 +2121,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { defer clientWg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, errUnreachable, err) }() @@ -2248,7 +2248,7 @@ func testEventAfterFeedStop(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2435,7 +2435,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2653,7 +2653,7 @@ func TestResolveLockNoCandidate(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2749,7 +2749,7 @@ func TestFailRegionReentrant(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2832,7 +2832,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2900,7 +2900,7 @@ func testClientErrNoPendingRegion(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2978,7 +2978,7 @@ func testKVClientForceReconnect(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3129,7 +3129,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("z")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3246,7 +3246,7 @@ func TestEvTimeUpdate(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3372,7 +3372,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3466,7 +3466,7 @@ func TestPrewriteNotMatchError(t *testing.T) { defer wg.Done() err = cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")}, - 100, lockResolver, 
eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3541,8 +3541,9 @@ func createFakeEventFeedSession() *eventFeedSession { return newEventFeedSession( &CDCClient{config: config.GetDefaultServerConfig()}, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - nil, /*lockResolver*/ - 100, /*startTs*/ - nil, /*eventCh*/ + nil, /*lockResolver*/ + 100, /*startTs*/ + nil, /*eventCh*/ + false, /*enableTableMonitor*/ ) } diff --git a/cdc/kv/metrics.go b/cdc/kv/metrics.go index fb0d6d90a9b..b8d9067d136 100644 --- a/cdc/kv/metrics.go +++ b/cdc/kv/metrics.go @@ -71,7 +71,7 @@ var ( Subsystem: "kvclient", Name: "channel_size", Help: "size of each channel in kv client", - }, []string{"channel"}) + }, []string{"namespace", "changefeed", "table", "type"}) clientRegionTokenSize = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", @@ -143,6 +143,21 @@ var ( }, // actions: wait, run. []string{"namespace", "changefeed"}) + + workerBusyRatio = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "kvclient", + Name: "region_worker_busy_ratio", + Help: "Busy ratio (X ms in 1s) for region worker.", + }, []string{"namespace", "changefeed", "table", "store", "type"}) + workerChannelSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "kvclient", + Name: "region_worker_channel_size", + Help: "size of each channel in region worker", + }, []string{"namespace", "changefeed", "table", "store", "type"}) ) // GetGlobalGrpcMetrics gets the global grpc metrics. @@ -167,6 +182,8 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(regionConnectDuration) registry.MustRegister(lockResolveDuration) registry.MustRegister(regionWorkerQueueDuration) + registry.MustRegister(workerBusyRatio) + registry.MustRegister(workerChannelSize) // Register client metrics to registry. registry.MustRegister(grpcMetrics) diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 3934f33caa4..963b58fcd5a 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -18,6 +18,7 @@ import ( "encoding/hex" "reflect" "runtime" + "strconv" "sync" "sync/atomic" "time" @@ -75,6 +76,9 @@ type regionWorkerMetrics struct { metricSendEventCommittedCounter prometheus.Counter metricQueueDuration prometheus.Observer + + metricWorkerBusyRatio prometheus.Gauge + metricWorkerChannelSize prometheus.Gauge } /* @@ -117,7 +121,7 @@ type regionWorker struct { pendingRegions *syncRegionFeedStateMap } -func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetrics { +func newRegionWorkerMetrics(changefeedID model.ChangeFeedID, tableID string, storeAddr string) *regionWorkerMetrics { metrics := &regionWorkerMetrics{} metrics.metricReceivedEventSize = eventSize.WithLabelValues("received") metrics.metricDroppedEventSize = eventSize.WithLabelValues("dropped") @@ -143,6 +147,11 @@ func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetric metrics.metricQueueDuration = regionWorkerQueueDuration.
WithLabelValues(changefeedID.Namespace, changefeedID.ID) + metrics.metricWorkerBusyRatio = workerBusyRatio.WithLabelValues( + changefeedID.Namespace, changefeedID.ID, tableID, storeAddr, "event-handler") + metrics.metricWorkerChannelSize = workerChannelSize.WithLabelValues( + changefeedID.Namespace, changefeedID.ID, tableID, storeAddr, "input") + return metrics } @@ -161,7 +170,7 @@ func newRegionWorker( rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), storeAddr: addr, concurrency: int(s.client.config.KVClient.WorkerConcurrent), - metrics: newRegionWorkerMetrics(changefeedID), + metrics: newRegionWorkerMetrics(changefeedID, strconv.FormatInt(s.tableID, 10), addr), inputPending: 0, pendingRegions: pendingRegions, @@ -431,6 +440,7 @@ func (w *regionWorker) onHandleExit(err error) { } } +<<<<<<< HEAD func (w *regionWorker) eventHandler(ctx context.Context) error { pollEvents := func() ([]*regionStatefulEvent, error) { exitFn := func() error { @@ -439,15 +449,44 @@ func (w *regionWorker) eventHandler(ctx context.Context) error { zap.String("changefeed", w.session.client.changefeed.ID)) return cerror.ErrRegionWorkerExit.GenWithStackByArgs() } +======= +func (w *regionWorker) eventHandler(ctx context.Context, enableTableMonitor bool) error { + exitFn := func() error { + log.Info("region worker closed by error", + zap.String("namespace", w.session.client.changefeed.Namespace), + zap.String("changefeed", w.session.client.changefeed.ID), + zap.Int64("tableID", w.session.tableID), + zap.String("tableName", w.session.tableName)) + return cerror.ErrRegionWorkerExit.GenWithStackByArgs() + } +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) + metricsTicker := time.NewTicker(tableMonitorInterval) + defer metricsTicker.Stop() + var processTime time.Duration + startToWork := time.Now() + + highWatermarkMet := false + for { select { case <-ctx.Done(): - return nil, errors.Trace(ctx.Err()) + return errors.Trace(ctx.Err()) case err := <-w.errorCh: - return nil, errors.Trace(err) + return errors.Trace(err) + case <-metricsTicker.C: + if enableTableMonitor { + w.metrics.metricWorkerChannelSize.Set(float64(len(w.inputCh))) + + now := time.Now() + // busyRatio indicates the actual working time of the worker. + busyRatio := processTime.Seconds() / now.Sub(startToWork).Seconds() * 100 + w.metrics.metricWorkerBusyRatio.Set(busyRatio) + startToWork = now + processTime = 0 + } case events, ok := <-w.inputCh: if !ok { - return nil, exitFn() + return exitFn() } if len(events) == 0 { log.Panic("regionWorker.inputCh doesn't accept empty slice") @@ -456,80 +495,74 @@ func (w *regionWorker) eventHandler(ctx context.Context) error { // event == nil means the region worker should exit and re-establish // all existing regions. if event == nil { - return nil, exitFn() + return exitFn() } } - return events, nil - } - } - highWatermarkMet := false - for { - events, err := pollEvents() - if err != nil { - return err - } - regionEventsBatchSize.Observe(float64(len(events))) - - inputPending := atomic.LoadInt32(&w.inputPending) - if highWatermarkMet { - highWatermarkMet = int(inputPending) >= regionWorkerLowWatermark - } else { - highWatermarkMet = int(inputPending) >= regionWorkerHighWatermark - } - atomic.AddInt32(&w.inputPending, -int32(len(events))) + regionEventsBatchSize.Observe(float64(len(events))) - if highWatermarkMet { - // All events in one batch can be hashed into one handle slot. 
- slot := w.inputCalcSlot(events[0].regionID) - eventsX := make([]interface{}, 0, len(events)) - for _, event := range events { - eventsX = append(eventsX, event) - } - err = w.handles[slot].AddEvents(ctx, eventsX) - if err != nil { - return err - } - // Principle: events from the same region must be processed linearly. - // - // When buffered events exceed high watermark, we start to use worker - // pool to improve throughput, and we need a mechanism to quit worker - // pool when buffered events are less than low watermark, which means - // we should have a way to know whether events sent to the worker pool - // are all processed. - // Send a dummy event to each worker pool handler, after each of these - // events are processed, we can ensure all events sent to worker pool - // from this region worker are processed. - finishedCallbackCh := make(chan struct{}, 1) - err = w.handles[slot].AddEvent(ctx, ®ionStatefulEvent{finishedCallbackCh: finishedCallbackCh}) - if err != nil { - return err - } - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - case err = <-w.errorCh: - return err - case <-finishedCallbackCh: + start := time.Now() + inputPending := atomic.LoadInt32(&w.inputPending) + if highWatermarkMet { + highWatermarkMet = int(inputPending) >= regionWorkerLowWatermark + } else { + highWatermarkMet = int(inputPending) >= regionWorkerHighWatermark } - } else { - // We measure whether the current worker is busy based on the input - // channel size. If the buffered event count is larger than the high - // watermark, we send events to worker pool to increase processing - // throughput. Otherwise, we process event in local region worker to - // ensure low processing latency. - for _, event := range events { - err = w.processEvent(ctx, event) + atomic.AddInt32(&w.inputPending, -int32(len(events))) + + if highWatermarkMet { + // All events in one batch can be hashed into one handle slot. + slot := w.inputCalcSlot(events[0].regionID) + eventsX := make([]interface{}, 0, len(events)) + for _, event := range events { + eventsX = append(eventsX, event) + } + err := w.handles[slot].AddEvents(ctx, eventsX) if err != nil { return err } + // Principle: events from the same region must be processed linearly. + // + // When buffered events exceed high watermark, we start to use worker + // pool to improve throughput, and we need a mechanism to quit worker + // pool when buffered events are less than low watermark, which means + // we should have a way to know whether events sent to the worker pool + // are all processed. + // Send a dummy event to each worker pool handler, after each of these + // events are processed, we can ensure all events sent to worker pool + // from this region worker are processed. + finishedCallbackCh := make(chan struct{}, 1) + err = w.handles[slot].AddEvent(ctx, ®ionStatefulEvent{finishedCallbackCh: finishedCallbackCh}) + if err != nil { + return err + } + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case err = <-w.errorCh: + return err + case <-finishedCallbackCh: + } + } else { + // We measure whether the current worker is busy based on the input + // channel size. If the buffered event count is larger than the high + // watermark, we send events to worker pool to increase processing + // throughput. Otherwise, we process event in local region worker to + // ensure low processing latency. 
+ for _, event := range events { + err := w.processEvent(ctx, event) + if err != nil { + return err + } + } } - } - for _, ev := range events { - // resolved ts event has been consumed, it is safe to put back. - if ev.resolvedTsEvent != nil { - w.session.resolvedTsPool.Put(ev) + for _, ev := range events { + // resolved ts event has been consumed, it is safe to put back. + if ev.resolvedTsEvent != nil { + w.session.resolvedTsPool.Put(ev) + } } + processTime += time.Since(start) } } } @@ -583,7 +616,7 @@ func (w *regionWorker) cancelStream(delay time.Duration) { } } -func (w *regionWorker) run() error { +func (w *regionWorker) run(enableTableMonitor bool) error { defer func() { for _, h := range w.handles { h.Unregister() @@ -608,7 +641,7 @@ func (w *regionWorker) run() error { return handleError(w.checkErrorReconnect(w.resolveLock(ctx))) }) wg.Go(func() error { - return handleError(w.eventHandler(ctx)) + return handleError(w.eventHandler(ctx, enableTableMonitor)) }) _ = handleError(w.collectWorkpoolError(ctx)) _ = wg.Wait() diff --git a/cdc/kv/shared_region_worker.go b/cdc/kv/shared_region_worker.go index 83b7805712d..d8104fb5248 100644 --- a/cdc/kv/shared_region_worker.go +++ b/cdc/kv/shared_region_worker.go @@ -74,7 +74,7 @@ func newSharedRegionWorker(c *SharedClient) *sharedRegionWorker { client: c, inputCh: make(chan statefulEvent, regionWorkerInputChanSize), statesManager: newRegionStateManager(-1), - metrics: newRegionWorkerMetrics(c.changefeed), + metrics: newRegionWorkerMetrics(c.changefeed, "shared", "shared"), } } diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index 3e699852bfa..d5bc970608b 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -250,6 +250,7 @@ func TestVerifyAndComplete(t *testing.T) { CaseSensitive: false, CheckGCSafePoint: true, EnableSyncPoint: util.AddressOf(false), + EnableTableMonitor: util.AddressOf(false), SyncPointInterval: util.AddressOf(time.Minute * 10), SyncPointRetention: util.AddressOf(time.Hour * 24), BDRMode: util.AddressOf(false), diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 58205b11008..e6d347d4056 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -621,7 +621,8 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) { p.sourceManager.r = sourcemanager.New( p.changefeedID, p.upstream, p.mg.r, - sortEngine, util.GetOrZero(p.latestInfo.Config.BDRMode)) + sortEngine, util.GetOrZero(p.latestInfo.Config.BDRMode), + util.GetOrZero(p.latestInfo.Config.EnableTableMonitor)) p.sourceManager.name = "SourceManager" p.sourceManager.changefeedID = p.changefeedID p.sourceManager.spawn(prcCtx) diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index 3035076993f..0ce4c687222 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -75,6 +75,7 @@ type SourceManager struct { // if `config.GetGlobalServerConfig().KVClient.EnableMultiplexing` is true `tablePullers` // will be used. Otherwise `multiplexingPuller` will be used instead. 
multiplexing bool + enableTableMonitor bool tablePullers tablePullers multiplexingPuller multiplexingPuller } @@ -86,9 +87,10 @@ func New( mg entry.MounterGroup, engine engine.SortEngine, bdrMode bool, + enableTableMonitor bool, ) *SourceManager { multiplexing := config.GetGlobalServerConfig().KVClient.EnableMultiplexing - return newSourceManager(changefeedID, up, mg, engine, bdrMode, multiplexing, pullerwrapper.NewPullerWrapper) + return newSourceManager(changefeedID, up, mg, engine, bdrMode, multiplexing, pullerwrapper.NewPullerWrapper, enableTableMonitor) } // NewForTest creates a new source manager for testing. @@ -99,7 +101,7 @@ func NewForTest( engine engine.SortEngine, bdrMode bool, ) *SourceManager { - return newSourceManager(changefeedID, up, mg, engine, bdrMode, false, pullerwrapper.NewPullerWrapperForTest) + return newSourceManager(changefeedID, up, mg, engine, bdrMode, false, pullerwrapper.NewPullerWrapperForTest, false) } func newSourceManager( @@ -110,15 +112,17 @@ func newSourceManager( bdrMode bool, multiplexing bool, pullerWrapperCreator pullerWrapperCreator, + enableTableMonitor bool, ) *SourceManager { mgr := &SourceManager{ - ready: make(chan struct{}), - changefeedID: changefeedID, - up: up, - mg: mg, - engine: engine, - bdrMode: bdrMode, - multiplexing: multiplexing, + ready: make(chan struct{}), + changefeedID: changefeedID, + up: up, + mg: mg, + engine: engine, + bdrMode: bdrMode, + multiplexing: multiplexing, + enableTableMonitor: enableTableMonitor, } if !multiplexing { mgr.tablePullers.errChan = make(chan error, 16) @@ -138,7 +142,7 @@ func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs mo } p := m.tablePullers.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode) - p.Start(m.tablePullers.ctx, m.up, m.engine, m.tablePullers.errChan) + p.Start(m.tablePullers.ctx, m.up, m.engine, m.tablePullers.errChan, m.enableTableMonitor) m.tablePullers.Store(span, p) } diff --git a/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go b/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go index 8607184a9a3..86c3a766941 100644 --- a/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go +++ b/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go @@ -39,7 +39,7 @@ func NewPullerWrapperForTest( } func (d *dummyPullerWrapper) Start(ctx context.Context, up *upstream.Upstream, - eventSortEngine engine.SortEngine, errCh chan<- error) { + eventSortEngine engine.SortEngine, errCh chan<- error, enableTableMonitor bool) { } func (d *dummyPullerWrapper) GetStats() puller.Stats { diff --git a/cdc/processor/sourcemanager/puller/puller_wrapper.go b/cdc/processor/sourcemanager/puller/puller_wrapper.go index 06749fab252..2df0834ed80 100644 --- a/cdc/processor/sourcemanager/puller/puller_wrapper.go +++ b/cdc/processor/sourcemanager/puller/puller_wrapper.go @@ -35,6 +35,7 @@ type Wrapper interface { up *upstream.Upstream, eventSortEngine engine.SortEngine, errChan chan<- error, + enableTableMonitor bool, ) GetStats() puller.Stats Close() @@ -79,6 +80,7 @@ func (n *WrapperImpl) Start( up *upstream.Upstream, eventSortEngine engine.SortEngine, errChan chan<- error, + enableTableMonitor bool, ) { ctx, n.cancel = context.WithCancel(ctx) errorHandler := func(err error) { @@ -108,6 +110,7 @@ func (n *WrapperImpl) Start( n.span.TableID, n.tableName, n.bdrMode, + enableTableMonitor, ) // Use errgroup to ensure all sub goroutines can exit without
calling Close. diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index 041ebc5dcd4..2e5896f0a48 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -620,7 +620,7 @@ func NewDDLJobPuller( jobPuller.puller.Puller = New( ctx, pdCli, up.GrpcPool, regionCache, kvStorage, pdClock, checkpointTs, spans, cfg, changefeed, -1, memorysorter.DDLPullerTableName, - ddlPullerFilterLoop, + ddlPullerFilterLoop, false, ) } diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index dda87b9e704..d27739d6055 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -78,6 +78,8 @@ type pullerImpl struct { lastForwardResolvedTs uint64 // startResolvedTs is the resolvedTs when puller is initialized startResolvedTs uint64 + + enableTableMonitor bool } // New create a new Puller fetch event start from checkpointTs and put into buf. @@ -94,6 +96,7 @@ func New(ctx context.Context, tableID model.TableID, tableName string, filterLoop bool, + enableTableMonitor bool, ) Puller { tikvStorage, ok := kvStorage.(tikv.Storage) if !ok { @@ -119,7 +122,8 @@ func New(ctx context.Context, tableName: tableName, cfg: cfg, - startResolvedTs: checkpointTs, + startResolvedTs: checkpointTs, + enableTableMonitor: enableTableMonitor, } return p } @@ -137,7 +141,7 @@ func (p *pullerImpl) Run(ctx context.Context) error { span := span g.Go(func() error { - return p.kvCli.EventFeed(ctx, span, checkpointTs, lockResolver, eventCh) + return p.kvCli.EventFeed(ctx, span, checkpointTs, lockResolver, eventCh, p.enableTableMonitor) }) } diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go index 26dfc555325..2aaf851bd82 100644 --- a/cdc/puller/puller_test.go +++ b/cdc/puller/puller_test.go @@ -81,6 +81,7 @@ func (mc *mockCDCKVClient) EventFeed( ts uint64, lockResolver txnutil.LockResolver, eventCh chan<- model.RegionFeedEvent, + enableTableMonitor bool, ) error { for { select { @@ -134,7 +135,7 @@ func newPullerForTest( ctx, pdCli, grpcPool, regionCache, store, pdutil.NewClock4Test(), checkpointTs, spans, config.GetDefaultServerConfig(), model.DefaultChangeFeedID("changefeed-id-test"), 0, - "table-test", false) + "table-test", false, false) wg.Add(1) go func() { defer wg.Done() diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index d68f7fe329c..c0c4f3fafd4 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -21,6 +21,7 @@ const ( "ignore-ineligible-table":false, "check-gc-safe-point": true, "enable-sync-point": false, + "enable-table-monitor": false, "bdr-mode": false, "sync-point-interval": 600000000000, "sync-point-retention": 86400000000000, @@ -180,6 +181,7 @@ const ( "ignore-ineligible-table":false, "check-gc-safe-point": true, "enable-sync-point": false, + "enable-table-monitor": false, "bdr-mode": false, "sync-point-interval": 600000000000, "sync-point-retention": 86400000000000, @@ -332,6 +334,7 @@ const ( "ignore-ineligible-table":false, "check-gc-safe-point": true, "enable-sync-point": false, + "enable-table-monitor": false, "bdr-mode": false, "sync-point-interval": 600000000000, "sync-point-retention": 86400000000000, diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 333a1e9f942..74df397cfb1 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -52,6 +52,7 @@ var defaultReplicaConfig = &ReplicaConfig{ CaseSensitive: false, CheckGCSafePoint: true, EnableSyncPoint: util.AddressOf(false), + EnableTableMonitor: util.AddressOf(false), SyncPointInterval: util.AddressOf(10 * 
time.Minute), SyncPointRetention: util.AddressOf(24 * time.Hour), BDRMode: util.AddressOf(false), @@ -128,7 +129,8 @@ type replicaConfig struct { ForceReplicate bool `toml:"force-replicate" json:"force-replicate"` CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"` // EnableSyncPoint is only available when the downstream is a Database. - EnableSyncPoint *bool `toml:"enable-sync-point" json:"enable-sync-point,omitempty"` + EnableSyncPoint *bool `toml:"enable-sync-point" json:"enable-sync-point,omitempty"` + EnableTableMonitor *bool `toml:"enable-table-monitor" json:"enable-table-monitor"` // IgnoreIneligibleTable is used to store the user's config when creating a changefeed. // not used in the changefeed's lifecycle. IgnoreIneligibleTable bool `toml:"ignore-ineligible-table" json:"ignore-ineligible-table"`
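Note on the new monitoring path: the stream receiver in cdc/kv/client.go and the region worker in cdc/kv/region_worker.go both follow the same sampling pattern, and the new gauges (ticdc_kvclient_region_worker_busy_ratio, ticdc_kvclient_region_worker_channel_size) are labeled by namespace, changefeed, table, store and type. The sketch below is illustrative only, not TiCDC code; runWithBusyMonitor and its arguments are made-up names. The idea is: accumulate the time spent handling event batches, and once per tableMonitorInterval publish busy/elapsed*100 to the gauge and reset the window.

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Gauge with the same label set the diff gives the new region-worker metrics.
var workerBusyRatio = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "ticdc",
		Subsystem: "kvclient",
		Name:      "region_worker_busy_ratio",
		Help:      "Busy ratio (X ms in 1s) for region worker.",
	}, []string{"namespace", "changefeed", "table", "store", "type"})

// runWithBusyMonitor consumes batches from events and, once per interval,
// reports the percentage of wall-clock time spent inside handle().
func runWithBusyMonitor(
	busyGauge prometheus.Gauge,
	interval time.Duration,
	events <-chan []int, // stand-in for []*regionStatefulEvent
	handle func([]int),
) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	var busy time.Duration
	windowStart := time.Now()
	for {
		select {
		case <-ticker.C:
			// Publish the busy ratio for the finished window, then reset it.
			// The diff also samples len(w.inputCh) into the channel-size gauge here.
			elapsed := time.Since(windowStart)
			busyGauge.Set(busy.Seconds() / elapsed.Seconds() * 100)
			busy, windowStart = 0, time.Now()
		case batch, ok := <-events:
			if !ok {
				return
			}
			start := time.Now()
			handle(batch)
			busy += time.Since(start)
		}
	}
}

func main() {
	prometheus.MustRegister(workerBusyRatio)
	gauge := workerBusyRatio.WithLabelValues(
		"default", "cf-1", "42", "tikv-1:20160", "event-handler")

	events := make(chan []int, 16)
	done := make(chan struct{})
	go func() {
		defer close(done)
		runWithBusyMonitor(gauge, 2*time.Second, events, func(batch []int) {
			time.Sleep(time.Duration(len(batch)) * time.Millisecond) // pretend to work
		})
	}()
	events <- []int{1, 2, 3}
	close(events)
	<-done
}
```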
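For context on how the switch is exposed: the new field defaults to false and travels as a pointer from the replica config (enable_table_monitor JSON key, enable-table-monitor TOML key) through the processor, SourceManager, and puller wrapper into CDCKVClient.EventFeed. A minimal sketch of the serialization side, using a trimmed stand-in struct rather than the real ReplicaConfig:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed stand-in for the v2 ReplicaConfig; only the two flags shown in the
// diff above are reproduced here.
type replicaConfigSketch struct {
	EnableSyncPoint    *bool `json:"enable_sync_point,omitempty"`
	EnableTableMonitor *bool `json:"enable_table_monitor,omitempty"`
}

// addressOf mimics pkg/util.AddressOf used throughout the diff.
func addressOf[T any](v T) *T { return &v }

func main() {
	cfg := replicaConfigSketch{
		EnableSyncPoint:    addressOf(false),
		EnableTableMonitor: addressOf(true), // opt in to the per-table worker monitor
	}
	out, _ := json.Marshal(cfg)
	fmt.Println(string(out))
	// {"enable_sync_point":false,"enable_table_monitor":true}
}
```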