
Commit

This is an automated cherry-pick of pingcap#10389
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
CharlesCheung96 authored and ti-chi-bot committed Jan 3, 2024
1 parent 600ad4a commit d321d73
Showing 18 changed files with 249 additions and 137 deletions.
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
@@ -182,6 +182,7 @@ type ReplicaConfig struct {
IgnoreIneligibleTable bool `json:"ignore_ineligible_table"`
CheckGCSafePoint bool `json:"check_gc_safe_point"`
EnableSyncPoint *bool `json:"enable_sync_point,omitempty"`
EnableTableMonitor *bool `json:"enable_table_monitor,omitempty"`
BDRMode *bool `json:"bdr_mode,omitempty"`

SyncPointInterval *JSONDuration `json:"sync_point_interval,omitempty" swaggertype:"string"`
@@ -211,6 +212,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
res.ForceReplicate = c.ForceReplicate
res.CheckGCSafePoint = c.CheckGCSafePoint
res.EnableSyncPoint = c.EnableSyncPoint
res.EnableTableMonitor = c.EnableTableMonitor
res.IgnoreIneligibleTable = c.IgnoreIneligibleTable
res.SQLMode = c.SQLMode
if c.SyncPointInterval != nil {
@@ -503,6 +505,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
IgnoreIneligibleTable: cloned.IgnoreIneligibleTable,
CheckGCSafePoint: cloned.CheckGCSafePoint,
EnableSyncPoint: cloned.EnableSyncPoint,
EnableTableMonitor: cloned.EnableTableMonitor,
BDRMode: cloned.BDRMode,
SQLMode: cloned.SQLMode,
}
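The new field follows the same optional-pointer pattern as EnableSyncPoint and BDRMode, so it only appears in API payloads when a caller sets it. A minimal, self-contained sketch of how the json tag surfaces on the wire; the pared-down struct and addressOf helper below are stand-ins for the real v2 ReplicaConfig and util.AddressOf, not part of this commit:

package main

import (
	"encoding/json"
	"fmt"
)

// addressOf mirrors util.AddressOf used in the tests: optional v2 API fields are pointers.
func addressOf[T any](v T) *T { return &v }

// replicaConfig is a pared-down stand-in for the v2 ReplicaConfig; only the field
// added by this commit is shown, with the same json tag.
type replicaConfig struct {
	EnableTableMonitor *bool `json:"enable_table_monitor,omitempty"`
}

func main() {
	body, _ := json.Marshal(replicaConfig{EnableTableMonitor: addressOf(true)})
	fmt.Println(string(body)) // {"enable_table_monitor":true}
}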
1 change: 1 addition & 0 deletions cdc/api/v2/model_test.go
@@ -35,6 +35,7 @@ var defaultAPIConfig = &ReplicaConfig{
CheckGCSafePoint: true,
BDRMode: util.AddressOf(false),
EnableSyncPoint: util.AddressOf(false),
EnableTableMonitor: util.AddressOf(false),
SyncPointInterval: &JSONDuration{10 * time.Minute},
SyncPointRetention: &JSONDuration{24 * time.Hour},
Filter: &FilterConfig{
70 changes: 52 additions & 18 deletions cdc/kv/client.go
@@ -75,6 +75,8 @@ const (
regionScheduleReload = false

scanRegionsConcurrency = 1024

tableMonitorInterval = 2 * time.Second
)

// time interval to force kv client to terminate gRPC stream and reconnect
@@ -143,6 +145,7 @@ type CDCKVClient interface {
ts uint64,
lockResolver txnutil.LockResolver,
eventCh chan<- model.RegionFeedEvent,
enableTableMonitor bool,
) error

// RegionCount returns the number of captured regions.
@@ -273,8 +276,9 @@ func (c *CDCClient) EventFeed(
ctx context.Context, span tablepb.Span, ts uint64,
lockResolver txnutil.LockResolver,
eventCh chan<- model.RegionFeedEvent,
enableTableMonitor bool,
) error {
s := newEventFeedSession(c, span, lockResolver, ts, eventCh)
s := newEventFeedSession(c, span, lockResolver, ts, eventCh, enableTableMonitor)
return s.eventFeed(ctx)
}

@@ -352,11 +356,10 @@ type eventFeedSession struct {

rangeLock *regionlock.RegionRangeLock

// To identify metrics of different eventFeedSession
id string
regionChSizeGauge prometheus.Gauge
errChSizeGauge prometheus.Gauge
rangeChSizeGauge prometheus.Gauge
enableTableMonitor bool
regionChSizeGauge prometheus.Gauge
errChSizeGauge prometheus.Gauge
rangeChSizeGauge prometheus.Gauge

streams map[string]*eventFeedStream
streamsLock sync.RWMutex
@@ -377,9 +380,9 @@ func newEventFeedSession(
lockResolver txnutil.LockResolver,
startTs uint64,
eventCh chan<- model.RegionFeedEvent,
enableTableMonitor bool,
) *eventFeedSession {
id := allocID()
idStr := strconv.FormatUint(id, 10)
rangeLock := regionlock.NewRegionRangeLock(
id, totalSpan.StartKey, totalSpan.EndKey, startTs,
client.changefeed.Namespace+"."+client.changefeed.ID)
@@ -390,16 +393,19 @@
tableID: client.tableID,
tableName: client.tableName,

totalSpan: totalSpan,
eventCh: eventCh,
rangeLock: rangeLock,
lockResolver: lockResolver,
id: idStr,
regionChSizeGauge: clientChannelSize.WithLabelValues("region"),
errChSizeGauge: clientChannelSize.WithLabelValues("err"),
rangeChSizeGauge: clientChannelSize.WithLabelValues("range"),
streams: make(map[string]*eventFeedStream),
streamsCanceller: make(map[string]context.CancelFunc),
totalSpan: totalSpan,
eventCh: eventCh,
rangeLock: rangeLock,
lockResolver: lockResolver,
enableTableMonitor: enableTableMonitor,
regionChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "region"),
errChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "err"),
rangeChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "range"),
streams: make(map[string]*eventFeedStream),
streamsCanceller: make(map[string]context.CancelFunc),
resolvedTsPool: sync.Pool{
New: func() any {
return &regionStatefulEvent{
@@ -1013,6 +1019,10 @@ func (s *eventFeedSession) receiveFromStream(

metricSendEventBatchResolvedSize := batchResolvedEventSize.
WithLabelValues(s.changefeed.Namespace, s.changefeed.ID)
metricReceiveBusyRatio := workerBusyRatio.WithLabelValues(
s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-receiver")
metricProcessBusyRatio := workerBusyRatio.WithLabelValues(
s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), addr, "event-processor")

// always create a new region worker, because `receiveFromStream` is ensured
// to call exactly once from outer code logic
@@ -1032,7 +1042,7 @@

eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
err := handleExit(worker.run())
err := handleExit(worker.run(s.enableTableMonitor))
if err != nil {
log.Error("region worker exited with error", zap.Error(err),
zap.Any("changefeed", s.changefeed),
@@ -1043,10 +1053,32 @@
})

receiveEvents := func() error {
var receiveTime time.Duration
var processTime time.Duration
startToWork := time.Now()

maxCommitTs := model.Ts(0)
for {
startToReceive := time.Now()
cevent, err := stream.Recv()

if s.enableTableMonitor {
receiveTime += time.Since(startToReceive)
if time.Since(startToWork) >= tableMonitorInterval {
now := time.Now()
// Receive busyRatio indicates the blocking time (receive and decode grpc msg) of the worker.
busyRatio := receiveTime.Seconds() / now.Sub(startToWork).Seconds() * 100
metricReceiveBusyRatio.Set(busyRatio)
receiveTime = 0
// Process busyRatio indicates the working time (dispatch to region worker) of the worker.
busyRatio = processTime.Seconds() / now.Sub(startToWork).Seconds() * 100
metricProcessBusyRatio.Set(busyRatio)
processTime = 0

startToWork = now
}
}

failpoint.Inject("kvClientRegionReentrantError", func(op failpoint.Value) {
if op.(string) == "error" {
_ = worker.sendEvents(ctx, []*regionStatefulEvent{nil})
@@ -1105,6 +1137,7 @@ func (s *eventFeedSession) receiveFromStream(
return nil
}

startToProcess := time.Now()
size := cevent.Size()
if size > warnRecvMsgSizeThreshold {
regionCount := 0
@@ -1149,6 +1182,7 @@
tsStat.commitTs.Store(maxCommitTs)
}
}
processTime += time.Since(startToProcess)
}
}
eg.Go(func() error {
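For context on the numbers above: when enableTableMonitor is on, the receive loop samples itself roughly every tableMonitorInterval (2 seconds) and reports two workerBusyRatio gauges per store address, "event-receiver" (share of the window spent blocked in stream.Recv and decoding) and "event-processor" (share spent dispatching events to the region worker). A small, self-contained sketch of the ratio calculation, assuming a hypothetical busyRatio helper (the real code above inlines the arithmetic):

package main

import (
	"fmt"
	"time"
)

// busyRatio returns the percentage of the elapsed monitoring window spent busy,
// matching the receiveTime/processTime calculation in receiveFromStream.
func busyRatio(busy, window time.Duration) float64 {
	return busy.Seconds() / window.Seconds() * 100
}

func main() {
	window := 2 * time.Second // tableMonitorInterval
	// Example: 1.5s blocked in stream.Recv() and 0.3s dispatching to the region
	// worker during one window yields 75% and 15% respectively.
	fmt.Printf("event-receiver busy ratio: %.0f%%\n", busyRatio(1500*time.Millisecond, window))
	fmt.Printf("event-processor busy ratio: %.0f%%\n", busyRatio(300*time.Millisecond, window))
}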
4 changes: 2 additions & 2 deletions cdc/kv/client_bench_test.go
@@ -205,7 +205,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
go func() {
err := cdcClient.EventFeed(ctx,
tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")},
100, lockResolver, eventCh)
100, lockResolver, eventCh, false)
if errors.Cause(err) != context.Canceled {
b.Error(err)
}
@@ -299,7 +299,7 @@ func prepareBench(b *testing.B, regionNum int) (
go func() {
err := cdcClient.EventFeed(ctx,
tablepb.Span{StartKey: []byte("a"), EndKey: []byte("z")},
100, lockResolver, eventCh)
100, lockResolver, eventCh, false)
if errors.Cause(err) != context.Canceled {
b.Error(err)
}