From 78d6d11b0d6d5bb12c4ad05a50cbe0eaa844b100 Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Wed, 3 Jan 2024 14:39:32 +0800 Subject: [PATCH] This is an automated cherry-pick of #10389 Signed-off-by: ti-chi-bot --- cdc/api/v2/model.go | 15 ++ cdc/api/v2/model_test.go | 6 + cdc/kv/client.go | 70 ++++-- cdc/kv/client_bench_test.go | 4 +- cdc/kv/client_test.go | 57 ++--- cdc/kv/metrics.go | 61 ++++- cdc/kv/region_worker.go | 189 ++++++++------ cdc/kv/shared_region_worker.go | 233 ++++++++++++++++++ cdc/model/changefeed_test.go | 12 + cdc/processor/processor.go | 5 + cdc/processor/sourcemanager/manager.go | 52 ++++ .../puller/dummy_puller_wrapper.go | 4 + .../sourcemanager/puller/puller_wrapper.go | 6 + cdc/puller/ddl_puller.go | 38 +++ cdc/puller/puller.go | 11 +- cdc/puller/puller_test.go | 1 + pkg/config/config_test_data.go | 7 + pkg/config/replica_config.go | 18 ++ 18 files changed, 668 insertions(+), 121 deletions(-) create mode 100644 cdc/kv/shared_region_worker.go diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index bc5157da987..5fe18841bee 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -181,8 +181,14 @@ type ReplicaConfig struct { ForceReplicate bool `json:"force_replicate"` IgnoreIneligibleTable bool `json:"ignore_ineligible_table"` CheckGCSafePoint bool `json:"check_gc_safe_point"` +<<<<<<< HEAD EnableSyncPoint bool `json:"enable_sync_point"` BDRMode bool `json:"bdr_mode"` +======= + EnableSyncPoint *bool `json:"enable_sync_point,omitempty"` + EnableTableMonitor *bool `json:"enable_table_monitor,omitempty"` + BDRMode *bool `json:"bdr_mode,omitempty"` +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) SyncPointInterval *JSONDuration `json:"sync_point_interval" swaggertype:"string"` SyncPointRetention *JSONDuration `json:"sync_point_retention" swaggertype:"string"` @@ -212,6 +218,11 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( res.ForceReplicate = c.ForceReplicate res.CheckGCSafePoint = c.CheckGCSafePoint res.EnableSyncPoint = c.EnableSyncPoint +<<<<<<< HEAD +======= + res.EnableTableMonitor = c.EnableTableMonitor + res.IgnoreIneligibleTable = c.IgnoreIneligibleTable +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) res.SQLMode = c.SQLMode if c.SyncPointInterval != nil { res.SyncPointInterval = c.SyncPointInterval.duration @@ -459,8 +470,12 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { IgnoreIneligibleTable: false, CheckGCSafePoint: cloned.CheckGCSafePoint, EnableSyncPoint: cloned.EnableSyncPoint, +<<<<<<< HEAD SyncPointInterval: &JSONDuration{cloned.SyncPointInterval}, SyncPointRetention: &JSONDuration{cloned.SyncPointRetention}, +======= + EnableTableMonitor: cloned.EnableTableMonitor, +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) BDRMode: cloned.BDRMode, SQLMode: cloned.SQLMode, } diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index ed08876ab40..13c64bc7337 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -34,7 +34,13 @@ var defaultAPIConfig = &ReplicaConfig{ CaseSensitive: false, EnableOldValue: true, CheckGCSafePoint: true, +<<<<<<< HEAD EnableSyncPoint: false, +======= + BDRMode: util.AddressOf(false), + EnableSyncPoint: util.AddressOf(false), + EnableTableMonitor: util.AddressOf(false), +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) SyncPointInterval: &JSONDuration{10 * time.Minute}, SyncPointRetention: &JSONDuration{24 * time.Hour}, Filter: &FilterConfig{ diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 8d4fcce010a..7005884216f 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -75,6 +75,8 @@ const ( regionScheduleReload = false scanRegionsConcurrency = 1024 + + tableMonitorInterval = 2 * time.Second ) // time interval to force kv client to terminate gRPC stream and reconnect @@ -143,6 +145,7 @@ type CDCKVClient interface { ts uint64, lockResolver txnutil.LockResolver, eventCh chan<- model.RegionFeedEvent, + enableTableMonitor bool, ) error // RegionCount returns the number of captured regions. @@ -273,8 +276,9 @@ func (c *CDCClient) EventFeed( ctx context.Context, span tablepb.Span, ts uint64, lockResolver txnutil.LockResolver, eventCh chan<- model.RegionFeedEvent, + enableTableMonitor bool, ) error { - s := newEventFeedSession(c, span, lockResolver, ts, eventCh) + s := newEventFeedSession(c, span, lockResolver, ts, eventCh, enableTableMonitor) return s.eventFeed(ctx) } @@ -352,11 +356,10 @@ type eventFeedSession struct { rangeLock *regionlock.RegionRangeLock - // To identify metrics of different eventFeedSession - id string - regionChSizeGauge prometheus.Gauge - errChSizeGauge prometheus.Gauge - rangeChSizeGauge prometheus.Gauge + enableTableMonitor bool + regionChSizeGauge prometheus.Gauge + errChSizeGauge prometheus.Gauge + rangeChSizeGauge prometheus.Gauge streams map[string]*eventFeedStream streamsLock sync.RWMutex @@ -377,9 +380,9 @@ func newEventFeedSession( lockResolver txnutil.LockResolver, startTs uint64, eventCh chan<- model.RegionFeedEvent, + enableTableMonitor bool, ) *eventFeedSession { id := allocID() - idStr := strconv.FormatUint(id, 10) rangeLock := regionlock.NewRegionRangeLock( id, totalSpan.StartKey, totalSpan.EndKey, startTs, client.changefeed.Namespace+"."+client.changefeed.ID) @@ -390,16 +393,19 @@ func newEventFeedSession( tableID: client.tableID, tableName: client.tableName, - totalSpan: totalSpan, - eventCh: eventCh, - rangeLock: rangeLock, - lockResolver: lockResolver, - id: idStr, - regionChSizeGauge: clientChannelSize.WithLabelValues("region"), - errChSizeGauge: clientChannelSize.WithLabelValues("err"), - rangeChSizeGauge: clientChannelSize.WithLabelValues("range"), - streams: make(map[string]*eventFeedStream), - streamsCanceller: make(map[string]context.CancelFunc), + totalSpan: totalSpan, + eventCh: eventCh, + rangeLock: rangeLock, + lockResolver: lockResolver, + enableTableMonitor: enableTableMonitor, + regionChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "region"), + errChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "err"), + rangeChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "range"), + streams: make(map[string]*eventFeedStream), + streamsCanceller: make(map[string]context.CancelFunc), resolvedTsPool: sync.Pool{ New: func() any { return ®ionStatefulEvent{ @@ -1013,6 +1019,10 @@ func (s *eventFeedSession) receiveFromStream( metricSendEventBatchResolvedSize := batchResolvedEventSize. WithLabelValues(s.changefeed.Namespace, s.changefeed.ID) + metricReceiveBusyRatio := workerBusyRatio.WithLabelValues( + s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-receiver") + metricProcessBusyRatio := workerBusyRatio.WithLabelValues( + s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-processor") // always create a new region worker, because `receiveFromStream` is ensured // to call exactly once from outer code logic @@ -1032,7 +1042,7 @@ func (s *eventFeedSession) receiveFromStream( eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - err := handleExit(worker.run()) + err := handleExit(worker.run(s.enableTableMonitor)) if err != nil { log.Error("region worker exited with error", zap.Error(err), zap.Any("changefeed", s.changefeed), @@ -1043,10 +1053,32 @@ func (s *eventFeedSession) receiveFromStream( }) receiveEvents := func() error { + var receiveTime time.Duration + var processTime time.Duration + startToWork := time.Now() + maxCommitTs := model.Ts(0) for { + startToReceive := time.Now() cevent, err := stream.Recv() + if s.enableTableMonitor { + receiveTime += time.Since(startToReceive) + if time.Since(startToWork) >= tableMonitorInterval { + now := time.Now() + // Receive busyRatio indicates the blocking time (receive and decode grpc msg) of the worker. + busyRatio := receiveTime.Seconds() / now.Sub(startToWork).Seconds() * 100 + metricReceiveBusyRatio.Set(busyRatio) + receiveTime = 0 + // Process busyRatio indicates the working time (dispatch to region worker) of the worker. + busyRatio = processTime.Seconds() / now.Sub(startToWork).Seconds() * 100 + metricProcessBusyRatio.Set(busyRatio) + processTime = 0 + + startToWork = now + } + } + failpoint.Inject("kvClientRegionReentrantError", func(op failpoint.Value) { if op.(string) == "error" { _ = worker.sendEvents(ctx, []*regionStatefulEvent{nil}) @@ -1105,6 +1137,7 @@ func (s *eventFeedSession) receiveFromStream( return nil } + startToProcess := time.Now() size := cevent.Size() if size > warnRecvMsgSizeThreshold { regionCount := 0 @@ -1149,6 +1182,7 @@ func (s *eventFeedSession) receiveFromStream( tsStat.commitTs.Store(maxCommitTs) } } + processTime += time.Since(startToProcess) } } eg.Go(func() error { diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go index fd813903df2..91558628909 100644 --- a/cdc/kv/client_bench_test.go +++ b/cdc/kv/client_bench_test.go @@ -202,7 +202,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) ( go func() { err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) if errors.Cause(err) != context.Canceled { b.Error(err) } @@ -296,7 +296,7 @@ func prepareBench(b *testing.B, regionNum int) ( go func() { err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("z")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) if errors.Cause(err) != context.Canceled { b.Error(err) } diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 9cc31215b14..fbfab631847 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -329,7 +329,7 @@ func TestConnectOfflineTiKV(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 1, lockResolver, eventCh) + 1, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -429,7 +429,7 @@ func TestRecvLargeMessageSize(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 1, lockResolver, eventCh) + 1, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -529,7 +529,7 @@ func TestHandleError(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("d")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -689,7 +689,7 @@ func TestCompatibilityWithSameConn(t *testing.T) { defer wg2.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.True(t, cerror.ErrVersionIncompatible.Equal(err)) }() @@ -757,7 +757,7 @@ func TestClusterIDMismatch(t *testing.T) { defer wg2.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.True(t, cerror.ErrClusterIDMismatch.Equal(err)) }() @@ -824,7 +824,7 @@ func testHandleFeedEvent(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1285,7 +1285,7 @@ func TestStreamSendWithError(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")}, - 100, lockerResolver, eventCh) + 100, lockerResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1397,7 +1397,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1531,7 +1531,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { defer close(eventCh) err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1740,7 +1740,7 @@ func TestIncompatibleTiKV(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1817,7 +1817,7 @@ func TestNoPendingRegionError(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -1895,7 +1895,7 @@ func TestDropStaleRequest(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2009,7 +2009,7 @@ func TestResolveLock(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2116,7 +2116,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { defer clientWg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, errUnreachable, err) }() @@ -2243,7 +2243,7 @@ func testEventAfterFeedStop(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2430,7 +2430,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2648,7 +2648,7 @@ func TestResolveLockNoCandidate(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2744,7 +2744,7 @@ func TestFailRegionReentrant(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2827,7 +2827,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2895,7 +2895,7 @@ func testClientErrNoPendingRegion(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -2973,7 +2973,7 @@ func testKVClientForceReconnect(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3124,7 +3124,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("z")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3241,7 +3241,7 @@ func TestEvTimeUpdate(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3367,7 +3367,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) { defer wg.Done() err := cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3461,7 +3461,7 @@ func TestPrewriteNotMatchError(t *testing.T) { defer wg.Done() err = cdcClient.EventFeed(ctx, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")}, - 100, lockResolver, eventCh) + 100, lockResolver, eventCh, false) require.Equal(t, context.Canceled, errors.Cause(err)) }() @@ -3536,8 +3536,9 @@ func createFakeEventFeedSession() *eventFeedSession { return newEventFeedSession( &CDCClient{config: config.GetDefaultServerConfig()}, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, - nil, /*lockResolver*/ - 100, /*startTs*/ - nil, /*eventCh*/ + nil, /*lockResolver*/ + 100, /*startTs*/ + nil, /*eventCh*/ + false, /*diableTableMonitor*/ ) } diff --git a/cdc/kv/metrics.go b/cdc/kv/metrics.go index 4f063ed470d..25c00d6f833 100644 --- a/cdc/kv/metrics.go +++ b/cdc/kv/metrics.go @@ -71,7 +71,7 @@ var ( Subsystem: "kvclient", Name: "channel_size", Help: "size of each channel in kv client", - }, []string{"channel"}) + }, []string{"namespace", "changefeed", "table", "type"}) clientRegionTokenSize = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", @@ -110,6 +110,57 @@ var ( Help: "region events batch size", Buckets: prometheus.ExponentialBuckets(1, 2, 20), }) +<<<<<<< HEAD +======= + + regionConnectDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "kvclient", + Name: "region_connect_duration", + Help: "time of locating a region in ms", + Buckets: prometheus.ExponentialBuckets(1, 2, 20), + }, + // actions: lock, locate, connect. + []string{"namespace", "changefeed", "action"}) + + lockResolveDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "kvclient", + Name: "lock_resolve_duration", + Help: "time of lock resolve in ms", + Buckets: prometheus.ExponentialBuckets(1, 2, 20), + }, + // actions: wait, run. + []string{"namespace", "changefeed", "action"}) + + regionWorkerQueueDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "kvclient", + Name: "region_worker_queue_duration", + Help: "time of queue in region worker", + Buckets: prometheus.ExponentialBuckets(1, 2, 20), + }, + // actions: wait, run. + []string{"namespace", "changefeed"}) + + workerBusyRatio = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "kvclient", + Name: "region_worker_busy_ratio", + Help: "Busy ratio (X ms in 1s) for region worker.", + }, []string{"namespace", "changefeed", "table", "store", "type"}) + workerChannelSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "kvclient", + Name: "region_worker_channel_size", + Help: "size of each channel in region worker", + }, []string{"namespace", "changefeed", "table", "store", "type"}) +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) ) // InitMetrics registers all metrics in the kv package @@ -126,6 +177,14 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(batchResolvedEventSize) registry.MustRegister(grpcPoolStreamGauge) registry.MustRegister(regionEventsBatchSize) +<<<<<<< HEAD +======= + registry.MustRegister(regionConnectDuration) + registry.MustRegister(lockResolveDuration) + registry.MustRegister(regionWorkerQueueDuration) + registry.MustRegister(workerBusyRatio) + registry.MustRegister(workerChannelSize) +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) // Register client metrics to registry. registry.MustRegister(grpcMetrics) diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index ddd1c9046fa..f6e27a702d7 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -18,6 +18,7 @@ import ( "encoding/hex" "reflect" "runtime" + "strconv" "sync" "sync/atomic" "time" @@ -74,6 +75,14 @@ type regionWorkerMetrics struct { metricSendEventResolvedCounter prometheus.Counter metricSendEventCommitCounter prometheus.Counter metricSendEventCommittedCounter prometheus.Counter +<<<<<<< HEAD +======= + + metricQueueDuration prometheus.Observer + + metricWorkerBusyRatio prometheus.Gauge + metricWorkerChannelSize prometheus.Gauge +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) } /* @@ -116,7 +125,7 @@ type regionWorker struct { pendingRegions *syncRegionFeedStateMap } -func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetrics { +func newRegionWorkerMetrics(changefeedID model.ChangeFeedID, tableID string, storeAddr string) *regionWorkerMetrics { metrics := ®ionWorkerMetrics{} metrics.metricReceivedEventSize = eventSize.WithLabelValues("received") metrics.metricDroppedEventSize = eventSize.WithLabelValues("dropped") @@ -139,6 +148,17 @@ func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetric metrics.metricSendEventCommittedCounter = sendEventCounter. WithLabelValues("committed", changefeedID.Namespace, changefeedID.ID) +<<<<<<< HEAD +======= + metrics.metricQueueDuration = regionWorkerQueueDuration. + WithLabelValues(changefeedID.Namespace, changefeedID.ID) + + metrics.metricWorkerBusyRatio = workerBusyRatio.WithLabelValues( + changefeedID.Namespace, changefeedID.ID, tableID, storeAddr, "event-handler") + metrics.metricWorkerChannelSize = workerChannelSize.WithLabelValues( + changefeedID.Namespace, changefeedID.ID, tableID, storeAddr, "input") + +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) return metrics } @@ -156,8 +176,13 @@ func newRegionWorker( rtsManager: newRegionTsManager(), rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), storeAddr: addr, +<<<<<<< HEAD concurrency: s.client.config.KVClient.WorkerConcurrent, metrics: newRegionWorkerMetrics(changefeedID), +======= + concurrency: int(s.client.config.KVClient.WorkerConcurrent), + metrics: newRegionWorkerMetrics(changefeedID, strconv.FormatInt(s.tableID, 10), addr), +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) inputPending: 0, pendingRegions: pendingRegions, @@ -436,6 +461,7 @@ func (w *regionWorker) onHandleExit(err error) { } } +<<<<<<< HEAD func (w *regionWorker) eventHandler(ctx context.Context) error { pollEvents := func() ([]*regionStatefulEvent, error) { exitFn := func() error { @@ -444,15 +470,44 @@ func (w *regionWorker) eventHandler(ctx context.Context) error { zap.String("changefeed", w.session.client.changefeed.ID)) return cerror.ErrRegionWorkerExit.GenWithStackByArgs() } +======= +func (w *regionWorker) eventHandler(ctx context.Context, enableTableMonitor bool) error { + exitFn := func() error { + log.Info("region worker closed by error", + zap.String("namespace", w.session.client.changefeed.Namespace), + zap.String("changefeed", w.session.client.changefeed.ID), + zap.Int64("tableID", w.session.tableID), + zap.String("tableName", w.session.tableName)) + return cerror.ErrRegionWorkerExit.GenWithStackByArgs() + } +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) + + metricsTicker := time.NewTicker(tableMonitorInterval) + defer metricsTicker.Stop() + var processTime time.Duration + startToWork := time.Now() + highWatermarkMet := false + for { select { case <-ctx.Done(): - return nil, errors.Trace(ctx.Err()) + return errors.Trace(ctx.Err()) case err := <-w.errorCh: - return nil, errors.Trace(err) + return errors.Trace(err) + case <-metricsTicker.C: + if enableTableMonitor { + w.metrics.metricWorkerChannelSize.Set(float64(len(w.inputCh))) + + now := time.Now() + // busyRatio indicates the actual working time of the worker. + busyRatio := processTime.Seconds() / now.Sub(startToWork).Seconds() * 100 + w.metrics.metricWorkerBusyRatio.Set(busyRatio) + startToWork = now + processTime = 0 + } case events, ok := <-w.inputCh: if !ok { - return nil, exitFn() + return exitFn() } if len(events) == 0 { log.Panic("regionWorker.inputCh doesn't accept empty slice") @@ -461,80 +516,74 @@ func (w *regionWorker) eventHandler(ctx context.Context) error { // event == nil means the region worker should exit and re-establish // all existing regions. if event == nil { - return nil, exitFn() + return exitFn() } } - return events, nil - } - } - - highWatermarkMet := false - for { - events, err := pollEvents() - if err != nil { - return err - } - regionEventsBatchSize.Observe(float64(len(events))) - inputPending := atomic.LoadInt32(&w.inputPending) - if highWatermarkMet { - highWatermarkMet = int(inputPending) >= regionWorkerLowWatermark - } else { - highWatermarkMet = int(inputPending) >= regionWorkerHighWatermark - } - atomic.AddInt32(&w.inputPending, -int32(len(events))) + regionEventsBatchSize.Observe(float64(len(events))) - if highWatermarkMet { - // All events in one batch can be hashed into one handle slot. - slot := w.inputCalcSlot(events[0].regionID) - eventsX := make([]interface{}, 0, len(events)) - for _, event := range events { - eventsX = append(eventsX, event) - } - err = w.handles[slot].AddEvents(ctx, eventsX) - if err != nil { - return err + start := time.Now() + inputPending := atomic.LoadInt32(&w.inputPending) + if highWatermarkMet { + highWatermarkMet = int(inputPending) >= regionWorkerLowWatermark + } else { + highWatermarkMet = int(inputPending) >= regionWorkerHighWatermark } - // Principle: events from the same region must be processed linearly. - // - // When buffered events exceed high watermark, we start to use worker - // pool to improve throughput, and we need a mechanism to quit worker - // pool when buffered events are less than low watermark, which means - // we should have a way to know whether events sent to the worker pool - // are all processed. - // Send a dummy event to each worker pool handler, after each of these - // events are processed, we can ensure all events sent to worker pool - // from this region worker are processed. - finishedCallbackCh := make(chan struct{}, 1) - err = w.handles[slot].AddEvent(ctx, ®ionStatefulEvent{finishedCallbackCh: finishedCallbackCh}) - if err != nil { - return err - } - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - case err = <-w.errorCh: - return err - case <-finishedCallbackCh: - } - } else { - // We measure whether the current worker is busy based on the input - // channel size. If the buffered event count is larger than the high - // watermark, we send events to worker pool to increase processing - // throughput. Otherwise, we process event in local region worker to - // ensure low processing latency. - for _, event := range events { - err = w.processEvent(ctx, event) + atomic.AddInt32(&w.inputPending, -int32(len(events))) + + if highWatermarkMet { + // All events in one batch can be hashed into one handle slot. + slot := w.inputCalcSlot(events[0].regionID) + eventsX := make([]interface{}, 0, len(events)) + for _, event := range events { + eventsX = append(eventsX, event) + } + err := w.handles[slot].AddEvents(ctx, eventsX) if err != nil { return err } + // Principle: events from the same region must be processed linearly. + // + // When buffered events exceed high watermark, we start to use worker + // pool to improve throughput, and we need a mechanism to quit worker + // pool when buffered events are less than low watermark, which means + // we should have a way to know whether events sent to the worker pool + // are all processed. + // Send a dummy event to each worker pool handler, after each of these + // events are processed, we can ensure all events sent to worker pool + // from this region worker are processed. + finishedCallbackCh := make(chan struct{}, 1) + err = w.handles[slot].AddEvent(ctx, ®ionStatefulEvent{finishedCallbackCh: finishedCallbackCh}) + if err != nil { + return err + } + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case err = <-w.errorCh: + return err + case <-finishedCallbackCh: + } + } else { + // We measure whether the current worker is busy based on the input + // channel size. If the buffered event count is larger than the high + // watermark, we send events to worker pool to increase processing + // throughput. Otherwise, we process event in local region worker to + // ensure low processing latency. + for _, event := range events { + err := w.processEvent(ctx, event) + if err != nil { + return err + } + } } - } - for _, ev := range events { - // resolved ts event has been consumed, it is safe to put back. - if ev.resolvedTsEvent != nil { - w.session.resolvedTsPool.Put(ev) + for _, ev := range events { + // resolved ts event has been consumed, it is safe to put back. + if ev.resolvedTsEvent != nil { + w.session.resolvedTsPool.Put(ev) + } } + processTime += time.Since(start) } } } @@ -588,7 +637,7 @@ func (w *regionWorker) cancelStream(delay time.Duration) { } } -func (w *regionWorker) run() error { +func (w *regionWorker) run(enableTableMonitor bool) error { defer func() { for _, h := range w.handles { h.Unregister() @@ -613,7 +662,7 @@ func (w *regionWorker) run() error { return handleError(w.checkErrorReconnect(w.resolveLock(ctx))) }) wg.Go(func() error { - return handleError(w.eventHandler(ctx)) + return handleError(w.eventHandler(ctx, enableTableMonitor)) }) _ = handleError(w.collectWorkpoolError(ctx)) _ = wg.Wait() diff --git a/cdc/kv/shared_region_worker.go b/cdc/kv/shared_region_worker.go new file mode 100644 index 00000000000..aee9aee65cf --- /dev/null +++ b/cdc/kv/shared_region_worker.go @@ -0,0 +1,233 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import ( + "context" + "time" + + "github.com/pingcap/kvproto/pkg/cdcpb" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "go.uber.org/zap" +) + +// NOTE: +// 1. all contents come from one same TiKV store stream; +// 2. eventItem and resolvedTs shouldn't appear simultaneously; +type statefulEvent struct { + eventItem eventItem + resolvedTsBatch resolvedTsBatch + stream *requestedStream + start time.Time +} + +type eventItem struct { + // All items come from one same region. + item *cdcpb.Event + state *regionFeedState +} + +type resolvedTsBatch struct { + ts uint64 + regions []*regionFeedState +} + +func newEventItem(item *cdcpb.Event, state *regionFeedState, stream *requestedStream) statefulEvent { + return statefulEvent{ + eventItem: eventItem{item, state}, + stream: stream, + start: time.Now(), + } +} + +func newResolvedTsBatch(ts uint64, stream *requestedStream) statefulEvent { + return statefulEvent{ + resolvedTsBatch: resolvedTsBatch{ts: ts}, + stream: stream, + start: time.Now(), + } +} + +type sharedRegionWorker struct { + changefeed model.ChangeFeedID + client *SharedClient + statesManager *regionStateManager + inputCh chan statefulEvent + metrics *regionWorkerMetrics +} + +func newSharedRegionWorker(c *SharedClient) *sharedRegionWorker { + return &sharedRegionWorker{ + changefeed: c.changefeed, + client: c, + inputCh: make(chan statefulEvent, regionWorkerInputChanSize), + statesManager: newRegionStateManager(-1), + metrics: newRegionWorkerMetrics(c.changefeed, "shared", "shared"), + } +} + +func (w *sharedRegionWorker) sendEvent(ctx context.Context, event statefulEvent) error { + select { + case <-ctx.Done(): + return ctx.Err() + case w.inputCh <- event: + return nil + } +} + +func (w *sharedRegionWorker) run(ctx context.Context) error { + for { + var event statefulEvent + select { + case <-ctx.Done(): + return ctx.Err() + case event = <-w.inputCh: + } + + w.metrics.metricQueueDuration.Observe(float64(time.Since(event.start).Milliseconds())) + w.processEvent(ctx, event) + } +} + +func (w *sharedRegionWorker) handleSingleRegionError(state *regionFeedState, stream *requestedStream) { + stepsToRemoved := state.markRemoved() + err := state.takeError() + if err != nil { + log.Info("region worker get a region error", + zap.String("namespace", w.changefeed.Namespace), + zap.String("changefeed", w.changefeed.ID), + zap.Uint64("streamID", stream.streamID), + zap.Any("subscriptionID", state.getRegionID()), + zap.Uint64("regionID", state.sri.verID.GetID()), + zap.Bool("reschedule", stepsToRemoved), + zap.Error(err)) + } + if stepsToRemoved { + stream.takeState(SubscriptionID(state.requestID), state.getRegionID()) + w.client.onRegionFail(newRegionErrorInfo(state.getRegionInfo(), err)) + } +} + +func (w *sharedRegionWorker) processEvent(ctx context.Context, event statefulEvent) { + if event.eventItem.state != nil { + state := event.eventItem.state + if state.isStale() { + w.handleSingleRegionError(state, event.stream) + return + } + w.metrics.metricReceivedEventSize.Observe(float64(event.eventItem.item.Event.Size())) + switch x := event.eventItem.item.Event.(type) { + case *cdcpb.Event_Entries_: + if err := w.handleEventEntry(ctx, x, state); err != nil { + state.markStopped(err) + w.handleSingleRegionError(state, event.stream) + return + } + case *cdcpb.Event_ResolvedTs: + w.handleResolvedTs(ctx, resolvedTsBatch{ + ts: x.ResolvedTs, + regions: []*regionFeedState{state}, + }) + case *cdcpb.Event_Error: + state.markStopped(&eventError{err: x.Error}) + w.handleSingleRegionError(state, event.stream) + return + case *cdcpb.Event_Admin_: + } + } else if len(event.resolvedTsBatch.regions) > 0 { + w.handleResolvedTs(ctx, event.resolvedTsBatch) + } +} + +// NOTE: context.Canceled won't be treated as an error. +func (w *sharedRegionWorker) handleEventEntry(ctx context.Context, x *cdcpb.Event_Entries_, state *regionFeedState) error { + startTs := state.sri.requestedTable.startTs + emit := func(assembled model.RegionFeedEvent) bool { + x := state.sri.requestedTable.associateSubscriptionID(assembled) + select { + case state.sri.requestedTable.eventCh <- x: + return true + case <-ctx.Done(): + return false + } + } + tableID := state.sri.requestedTable.span.TableID + log.Debug("region worker get an Event", + zap.String("namespace", w.changefeed.Namespace), + zap.String("changefeed", w.changefeed.ID), + zap.Any("subscriptionID", state.sri.requestedTable.subscriptionID), + zap.Int64("tableID", tableID), + zap.Int("rows", len(x.Entries.GetEntries()))) + return handleEventEntry(x, startTs, state, w.metrics, emit, w.changefeed, tableID) +} + +func (w *sharedRegionWorker) handleResolvedTs(ctx context.Context, batch resolvedTsBatch) { + resolvedSpans := make(map[SubscriptionID]*struct { + spans []model.RegionComparableSpan + requestedTable *requestedTable + }) + + for _, state := range batch.regions { + if state.isStale() || !state.isInitialized() { + continue + } + + spansAndChan := resolvedSpans[state.sri.requestedTable.subscriptionID] + if spansAndChan == nil { + spansAndChan = &struct { + spans []model.RegionComparableSpan + requestedTable *requestedTable + }{requestedTable: state.sri.requestedTable} + resolvedSpans[state.sri.requestedTable.subscriptionID] = spansAndChan + } + + regionID := state.getRegionID() + lastResolvedTs := state.getLastResolvedTs() + if batch.ts < lastResolvedTs { + log.Debug("The resolvedTs is fallen back in kvclient", + zap.String("namespace", w.changefeed.Namespace), + zap.String("changefeed", w.changefeed.ID), + zap.Uint64("regionID", regionID), + zap.Uint64("resolvedTs", batch.ts), + zap.Uint64("lastResolvedTs", lastResolvedTs)) + continue + } + state.updateResolvedTs(batch.ts) + + span := model.RegionComparableSpan{Span: state.sri.span, Region: regionID} + span.Span.TableID = state.sri.requestedTable.span.TableID + spansAndChan.spans = append(spansAndChan.spans, span) + } + + for subscriptionID, spansAndChan := range resolvedSpans { + log.Debug("region worker get a ResolvedTs", + zap.String("namespace", w.changefeed.Namespace), + zap.String("changefeed", w.changefeed.ID), + zap.Any("subscriptionID", subscriptionID), + zap.Uint64("ResolvedTs", batch.ts), + zap.Int("spanCount", len(spansAndChan.spans))) + if len(spansAndChan.spans) > 0 { + revent := model.RegionFeedEvent{Resolved: &model.ResolvedSpans{ + Spans: spansAndChan.spans, ResolvedTs: batch.ts, + }} + x := spansAndChan.requestedTable.associateSubscriptionID(revent) + select { + case spansAndChan.requestedTable.eventCh <- x: + w.metrics.metricSendEventResolvedCounter.Add(float64(len(resolvedSpans))) + case <-ctx.Done(): + } + } + } +} diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index 62756672cf2..ff1b5f6b127 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -145,12 +145,24 @@ func TestVerifyAndComplete(t *testing.T) { SinkURI: "blackhole://", StartTs: 417257993615179777, Config: &config.ReplicaConfig{ +<<<<<<< HEAD MemoryQuota: 1073741824, CaseSensitive: false, EnableOldValue: true, CheckGCSafePoint: true, SyncPointInterval: time.Minute * 10, SyncPointRetention: time.Hour * 24, +======= + MemoryQuota: 1073741824, + CaseSensitive: false, + CheckGCSafePoint: true, + EnableSyncPoint: util.AddressOf(false), + EnableTableMonitor: util.AddressOf(false), + SyncPointInterval: util.AddressOf(time.Minute * 10), + SyncPointRetention: util.AddressOf(time.Hour * 24), + BDRMode: util.AddressOf(false), + IgnoreIneligibleTable: false, +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) }, } diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index e9202dc321e..be318df4fa3 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -685,7 +685,12 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) { p.sourceManager.r = sourcemanager.New( p.changefeedID, p.upstream, p.mg.r, +<<<<<<< HEAD sortEngine, p.changefeed.Info.Config.BDRMode) +======= + sortEngine, util.GetOrZero(p.latestInfo.Config.BDRMode), + util.GetOrZero(p.latestInfo.Config.EnableTableMonitor)) +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) p.sourceManager.name = "SourceManager" p.sourceManager.spawn(stdCtx) diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index 1ef797dda19..41cc2d17593 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -53,6 +53,7 @@ type SourceManager struct { // Used to indicate whether the changefeed is in BDR mode. bdrMode bool +<<<<<<< HEAD // pullerWrapperCreator is used to create a puller wrapper. // Only used for testing. pullerWrapperCreator func(changefeed model.ChangeFeedID, @@ -61,6 +62,14 @@ type SourceManager struct { startTs model.Ts, bdrMode bool, ) pullerwrapper.Wrapper +======= + // if `config.GetGlobalServerConfig().KVClient.EnableMultiplexing` is true `tablePullers` + // will be used. Otherwise `multiplexingPuller` will be used instead. + multiplexing bool + enableTableMonitor bool + tablePullers tablePullers + multiplexingPuller multiplexingPuller +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) } // New creates a new source manager. @@ -70,7 +79,9 @@ func New( mg entry.MounterGroup, engine engine.SortEngine, bdrMode bool, + enableTableMonitor bool, ) *SourceManager { +<<<<<<< HEAD return &SourceManager{ ready: make(chan struct{}), changefeedID: changefeedID, @@ -81,6 +92,10 @@ func New( bdrMode: bdrMode, pullerWrapperCreator: pullerwrapper.NewPullerWrapper, } +======= + multiplexing := config.GetGlobalServerConfig().KVClient.EnableMultiplexing + return newSourceManager(changefeedID, up, mg, engine, bdrMode, multiplexing, pullerwrapper.NewPullerWrapper, enableTableMonitor) +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) } // NewForTest creates a new source manager for testing. @@ -91,6 +106,7 @@ func NewForTest( engine engine.SortEngine, bdrMode bool, ) *SourceManager { +<<<<<<< HEAD return &SourceManager{ ready: make(chan struct{}), changefeedID: changefeedID, @@ -100,6 +116,30 @@ func NewForTest( errChan: make(chan error, 16), bdrMode: bdrMode, pullerWrapperCreator: pullerwrapper.NewPullerWrapperForTest, +======= + return newSourceManager(changefeedID, up, mg, engine, bdrMode, false, pullerwrapper.NewPullerWrapperForTest, false) +} + +func newSourceManager( + changefeedID model.ChangeFeedID, + up *upstream.Upstream, + mg entry.MounterGroup, + engine sorter.SortEngine, + bdrMode bool, + multiplexing bool, + pullerWrapperCreator pullerWrapperCreator, + enableTableMonitor bool, +) *SourceManager { + mgr := &SourceManager{ + ready: make(chan struct{}), + changefeedID: changefeedID, + up: up, + mg: mg, + engine: engine, + bdrMode: bdrMode, + multiplexing: multiplexing, + enableTableMonitor: enableTableMonitor, +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) } } @@ -107,9 +147,21 @@ func NewForTest( func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs model.Ts) { // Add table to the engine first, so that the engine can receive the events from the puller. m.engine.AddTable(span, startTs) +<<<<<<< HEAD p := m.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode) p.Start(m.ctx, m.up, m.engine, m.errChan) m.pullers.Store(span, p) +======= + + if m.multiplexing { + m.multiplexingPuller.puller.Subscribe([]tablepb.Span{span}, startTs, tableName) + return + } + + p := m.tablePullers.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode) + p.Start(m.tablePullers.ctx, m.up, m.engine, m.tablePullers.errChan, m.enableTableMonitor) + m.tablePullers.Store(span, p) +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) } // RemoveTable removes a table from the source manager. Stop puller and unregister table from the engine. diff --git a/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go b/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go index 8607184a9a3..86c3a766941 100644 --- a/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go +++ b/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go @@ -39,7 +39,11 @@ func NewPullerWrapperForTest( } func (d *dummyPullerWrapper) Start(ctx context.Context, up *upstream.Upstream, +<<<<<<< HEAD eventSortEngine engine.SortEngine, errCh chan<- error) { +======= + eventSortEngine sorter.SortEngine, errCh chan<- error, enableTableMonitor bool) { +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) } func (d *dummyPullerWrapper) GetStats() puller.Stats { diff --git a/cdc/processor/sourcemanager/puller/puller_wrapper.go b/cdc/processor/sourcemanager/puller/puller_wrapper.go index 8861dacccac..a334e96ef63 100644 --- a/cdc/processor/sourcemanager/puller/puller_wrapper.go +++ b/cdc/processor/sourcemanager/puller/puller_wrapper.go @@ -35,6 +35,7 @@ type Wrapper interface { up *upstream.Upstream, eventSortEngine engine.SortEngine, errChan chan<- error, + enableTableMonitor bool, ) GetStats() puller.Stats Close() @@ -79,6 +80,7 @@ func (n *WrapperImpl) Start( up *upstream.Upstream, eventSortEngine engine.SortEngine, errChan chan<- error, + enableTableMonitor bool, ) { ctx, n.cancel = context.WithCancel(ctx) errorHandler := func(err error) { @@ -108,7 +110,11 @@ func (n *WrapperImpl) Start( n.span.TableID, n.tableName, n.bdrMode, +<<<<<<< HEAD false, +======= + enableTableMonitor, +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) ) // Use errgroup to ensure all sub goroutines can exit without calling Close. diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index 97db61d71ee..27486132679 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -541,6 +541,7 @@ func NewDDLJobPuller( return &ddlJobPullerImpl{ changefeedID: changefeed, filter: filter, +<<<<<<< HEAD schemaStorage: schemaStorage, puller: New( ctx, @@ -560,6 +561,43 @@ func NewDDLJobPuller( kvStorage: kvStorage, outputCh: make(chan *model.DDLJobEntry, defaultPullerOutputChanSize), }, nil +======= + outputCh: make(chan *model.DDLJobEntry, defaultPullerOutputChanSize), + } + if jobPuller.multiplexing { + mp := &jobPuller.multiplexingPuller + + rawDDLCh := make(chan *model.RawKVEntry, defaultPullerOutputChanSize) + mp.sortedDDLCh = memorysorter.SortOutput(ctx, changefeed, rawDDLCh) + grpcPool := sharedconn.NewConnAndClientPool(up.SecurityConfig, kv.GetGlobalGrpcMetrics()) + + client := kv.NewSharedClient( + changefeed, cfg, ddlPullerFilterLoop, + pdCli, grpcPool, regionCache, pdClock, + txnutil.NewLockerResolver(kvStorage.(tikv.Storage), changefeed), + ) + consume := func(ctx context.Context, raw *model.RawKVEntry, _ []tablepb.Span) error { + select { + case <-ctx.Done(): + return ctx.Err() + case rawDDLCh <- raw: + return nil + } + } + slots, hasher := 1, func(tablepb.Span, int) int { return 0 } + mp.MultiplexingPuller = NewMultiplexingPuller(changefeed, client, consume, slots, hasher, 1) + + mp.Subscribe(spans, checkpointTs, memorysorter.DDLPullerTableName) + } else { + jobPuller.puller.Puller = New( + ctx, pdCli, up.GrpcPool, regionCache, kvStorage, pdClock, + checkpointTs, spans, cfg, changefeed, -1, memorysorter.DDLPullerTableName, + ddlPullerFilterLoop, false, + ) + } + + return jobPuller +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) } // DDLPuller is the interface for DDL Puller, used by owner only. diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index 3208dca8284..f1ab31bb964 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -79,6 +79,8 @@ type pullerImpl struct { lastForwardResolvedTs uint64 // startResolvedTs is the resolvedTs when puller is initialized startResolvedTs uint64 + + enableTableMonitor bool } // New create a new Puller fetch event start from checkpointTs and put into buf. @@ -95,7 +97,11 @@ func New(ctx context.Context, tableID model.TableID, tableName string, filterLoop bool, +<<<<<<< HEAD isDDLPuller bool, +======= + enableTableMonitor bool, +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) ) Puller { tikvStorage, ok := kvStorage.(tikv.Storage) if !ok { @@ -121,7 +127,8 @@ func New(ctx context.Context, tableName: tableName, cfg: cfg, - startResolvedTs: checkpointTs, + startResolvedTs: checkpointTs, + enableTableMonitor: enableTableMonitor, } return p } @@ -139,7 +146,7 @@ func (p *pullerImpl) Run(ctx context.Context) error { span := span g.Go(func() error { - return p.kvCli.EventFeed(ctx, span, checkpointTs, lockResolver, eventCh) + return p.kvCli.EventFeed(ctx, span, checkpointTs, lockResolver, eventCh, p.enableTableMonitor) }) } diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go index 4291ce30675..2aaf851bd82 100644 --- a/cdc/puller/puller_test.go +++ b/cdc/puller/puller_test.go @@ -81,6 +81,7 @@ func (mc *mockCDCKVClient) EventFeed( ts uint64, lockResolver txnutil.LockResolver, eventCh chan<- model.RegionFeedEvent, + enableTableMonitor bool, ) error { for { select { diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 846d523ac00..5ac7222fb05 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -21,6 +21,11 @@ const ( "force-replicate": true, "check-gc-safe-point": true, "enable-sync-point": false, +<<<<<<< HEAD +======= + "enable-table-monitor": false, + "bdr-mode": false, +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) "sync-point-interval": 600000000000, "sync-point-retention": 86400000000000, "filter": { @@ -176,6 +181,7 @@ const ( "force-replicate": true, "check-gc-safe-point": true, "enable-sync-point": false, + "enable-table-monitor": false, "bdr-mode": false, "sync-point-interval": 600000000000, "sync-point-retention": 86400000000000, @@ -319,6 +325,7 @@ const ( "force-replicate": true, "check-gc-safe-point": true, "enable-sync-point": false, + "enable-table-monitor": false, "bdr-mode": false, "sync-point-interval": 600000000000, "sync-point-retention": 86400000000000, diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 8be46c978d8..0f8f26efda5 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -51,9 +51,17 @@ var defaultReplicaConfig = &ReplicaConfig{ CaseSensitive: false, EnableOldValue: true, CheckGCSafePoint: true, +<<<<<<< HEAD EnableSyncPoint: false, SyncPointInterval: time.Minute * 10, SyncPointRetention: time.Hour * 24, +======= + EnableSyncPoint: util.AddressOf(false), + EnableTableMonitor: util.AddressOf(false), + SyncPointInterval: util.AddressOf(10 * time.Minute), + SyncPointRetention: util.AddressOf(24 * time.Hour), + BDRMode: util.AddressOf(false), +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) Filter: &FilterConfig{ Rules: []string{"*.*"}, }, @@ -129,7 +137,17 @@ type replicaConfig struct { EnableOldValue bool `toml:"enable-old-value" json:"enable-old-value"` ForceReplicate bool `toml:"force-replicate" json:"force-replicate"` CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"` +<<<<<<< HEAD EnableSyncPoint bool `toml:"enable-sync-point" json:"enable-sync-point"` +======= + // EnableSyncPoint is only available when the downstream is a Database. + EnableSyncPoint *bool `toml:"enable-sync-point" json:"enable-sync-point,omitempty"` + EnableTableMonitor *bool `toml:"enable-table-monitor" json:"enable-table-monitor"` + // IgnoreIneligibleTable is used to store the user's config when creating a changefeed. + // not used in the changefeed's lifecycle. + IgnoreIneligibleTable bool `toml:"ignore-ineligible-table" json:"ignore-ineligible-table"` + +>>>>>>> 4c31fda8b3 (kvclient(ticdc): add worker busy monitor (#10389)) // BDR(Bidirectional Replication) is a feature that allows users to // replicate data of same tables from TiDB-1 to TiDB-2 and vice versa. // This feature is only available for TiDB.