diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index fe4fde0c377..ce182790d0e 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -133,6 +133,27 @@ func newRegionErrorInfo(info singleRegionInfo, err error) regionErrorInfo {
 type eventFeedStream struct {
 	client cdcpb.ChangeData_EventFeedClient
 	conn *sharedConn
+<<<<<<< HEAD
+=======
+	// regions is used to store the state of the regions that are being processed by the stream.
+	regions *syncRegionFeedStateMap
+	// addr is the address of the TiKV store
+	addr string
+	// storeID is the ID of the TiKV store
+	storeID uint64
+	// id is the stream ID, which is used to identify the stream.
+	id uint64
+	// cancel is used to cancel the gRPC stream
+	cancel context.CancelFunc
+	isCanceled atomic.Bool
+	createTime time.Time
+}
+
+func (s *eventFeedStream) close() {
+	if s.isCanceled.CompareAndSwap(false, true) {
+		s.cancel()
+	}
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 }
 
 // CDCKVClient is an interface to receives kv changed logs from TiKV
@@ -221,6 +242,7 @@ func NewCDCClient(
 	return c
 }
 
+<<<<<<< HEAD
 func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream *eventFeedStream, newStreamErr error) {
 	streamFunc := func() (err error) {
 		var conn *sharedConn
@@ -252,17 +274,62 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
 			zap.String("changefeed", c.changefeed.ID),
 			zap.String("addr", addr))
 		return nil
+=======
+func (c *CDCClient) newStream(
+	ctx context.Context,
+	addr string,
+	storeID uint64,
+) (stream *eventFeedStream, err error) {
+	var conn *sharedConn
+	defer func() {
+		if err != nil && conn != nil {
+			c.grpcPool.ReleaseConn(conn, addr)
+		}
+	}()
+
+	conn, err = c.grpcPool.GetConn(addr)
+	if err != nil {
+		return nil, errors.Trace(err)
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 	}
-	if c.config.Debug.EnableKVConnectBackOff {
-		newStreamErr = retry.Do(ctx, streamFunc,
-			retry.WithBackoffBaseDelay(100),
-			retry.WithMaxTries(2),
-			retry.WithIsRetryableErr(cerror.IsRetryableError),
-		)
-		return
+
+	ctx, cancel := context.WithCancel(ctx)
+	err = version.CheckStoreVersion(ctx, c.pd, storeID)
+	if err != nil {
+		cancel()
+		return nil, errors.Trace(err)
 	}
-	newStreamErr = streamFunc()
-	return
+
+	client := cdcpb.NewChangeDataClient(conn.ClientConn)
+	var streamClient cdcpb.ChangeData_EventFeedClient
+
+	streamClient, err = client.EventFeed(ctx)
+	if err != nil {
+		cancel()
+		return nil, cerror.WrapError(cerror.ErrTiKVEventFeed, err)
+	}
+
+	stream = &eventFeedStream{
+		client: streamClient,
+		conn: conn,
+		regions: newSyncRegionFeedStateMap(),
+		addr: addr,
+		storeID: storeID,
+		id: allocateStreamID(),
+		cancel: cancel,
+		isCanceled: atomic.Bool{},
+		createTime: time.Now(),
+	}
+
+	log.Info("created stream to store",
+		zap.String("namespace", c.changefeed.Namespace),
+		zap.String("changefeed", c.changefeed.ID),
+		zap.Int64("tableID", c.tableID),
+		zap.String("tableName", c.tableName),
+		zap.String("store", addr),
+		zap.Uint64("storeID", storeID),
+		zap.Uint64("streamID", stream.id))
+	return stream, nil
 }
 
 // EventFeed divides a EventFeed request on range boundaries and establishes
@@ -358,9 +425,16 @@ type eventFeedSession struct {
 	errChSizeGauge prometheus.Gauge
 	rangeChSizeGauge prometheus.Gauge
 
+<<<<<<< HEAD
 	streams map[string]*eventFeedStream
 	streamsLock sync.RWMutex
 	streamsCanceller map[string]context.CancelFunc
+=======
+	// storeStreamsCache is used to cache the established gRPC streams to TiKV stores.
+	// Note: The cache is not thread-safe, so it should be accessed in the same goroutine.
+	// For now, it is only accessed in the `requestRegionToStore` goroutine.
+	storeStreamsCache map[string]*eventFeedStream
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 
 	// use sync.Pool to store resolved ts event only, because resolved ts event
 	// has the same size and generate cycle.
@@ -384,6 +458,7 @@ func newEventFeedSession(
 		id, totalSpan.StartKey, totalSpan.EndKey, startTs,
 		client.changefeed.Namespace+"."+client.changefeed.ID)
 	return &eventFeedSession{
+<<<<<<< HEAD
 		client: client,
 		startTs: startTs,
 		changefeed: client.changefeed,
@@ -400,6 +475,25 @@ func newEventFeedSession(
 		rangeChSizeGauge: clientChannelSize.WithLabelValues("range"),
 		streams: make(map[string]*eventFeedStream),
 		streamsCanceller: make(map[string]context.CancelFunc),
+=======
+		client: client,
+		startTs: startTs,
+		changefeed: client.changefeed,
+		tableID: client.tableID,
+		tableName: client.tableName,
+		storeStreamsCache: make(map[string]*eventFeedStream),
+		totalSpan: totalSpan,
+		eventCh: eventCh,
+		rangeLock: rangeLock,
+		lockResolver: lockResolver,
+		enableTableMonitor: enableTableMonitor,
+		regionChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
+			client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "region"),
+		errChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
+			client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "err"),
+		rangeChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
+			client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "range"),
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 		resolvedTsPool: sync.Pool{
 			New: func() any {
 				return &regionStatefulEvent{
@@ -586,11 +680,14 @@ func (s *eventFeedSession) requestRegionToStore(
 	ctx context.Context,
 	g *errgroup.Group,
 ) error {
+<<<<<<< HEAD
 	// Stores pending regions info for each stream. After sending a new request, the region info wil be put to the map,
 	// and it will be loaded by the receiver thread when it receives the first response from that region. We need this
 	// to pass the region info to the receiver since the region info cannot be inferred from the response from TiKV.
 	storePendingRegions := make(map[string]*syncRegionFeedStateMap)
+=======
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 	header := &cdcpb.Header{
 		ClusterId: s.client.clusterID,
 		TicdcVersion: version.ReleaseSemver(),
 	}
@@ -627,10 +724,12 @@ func (s *eventFeedSession) requestRegionToStore(
 		// each TiKV store has an independent pendingRegions.
 		storeAddr := rpcCtx.Addr
 		storeID := rpcCtx.Peer.GetStoreId()
+
 		var (
 			stream *eventFeedStream
 			err error
 		)
+<<<<<<< HEAD
 		stream, ok := s.getStream(storeAddr)
 		if !ok {
 			// when a new stream is established, always create a new pending
 			streamCtx, streamCancel := context.WithCancel(ctx)
 			_ = streamCancel // to avoid possible context leak warning from govet
 			stream, err = s.client.newStream(streamCtx, storeAddr, storeID)
+=======
+		stream, ok := s.storeStreamsCache[storeAddr]
+		if !ok || stream.isCanceled.Load() {
+			if ok {
+				// If the stream is canceled, we need to delete it from the cache and close it.
+				s.deleteStream(stream)
+			}
+			stream, err = s.client.newStream(ctx, storeAddr, storeID)
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 			if err != nil {
 				// get stream failed, maybe the store is down permanently, we should try to relocate the active store
 				log.Warn("get grpc stream client failed",
@@ -664,13 +772,20 @@ func (s *eventFeedSession) requestRegionToStore(
 				s.onRegionFail(ctx, errInfo)
 				continue
 			}
+<<<<<<< HEAD
 			s.addStream(storeAddr, stream, streamCancel)
+=======
+
+			s.addStream(stream)
+
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 			log.Info("creating new stream to store to send request",
 				zap.String("namespace", s.changefeed.Namespace),
 				zap.String("changefeed", s.changefeed.ID),
 				zap.Uint64("regionID", regionID),
 				zap.Uint64("requestID", requestID),
 				zap.Uint64("storeID", storeID),
+<<<<<<< HEAD
 				zap.String("addr", storeAddr))
 
 			g.Go(func() error {
@@ -694,6 +809,18 @@ func (s *eventFeedSession) requestRegionToStore(
 		pendingRegions.setByRequestID(requestID, state)
 
 		log.Info("start new request",
+=======
+				zap.String("store", storeAddr),
+				zap.Uint64("streamID", stream.id))
+			g.Go(func() error {
+				return s.receiveFromStream(ctx, stream)
+			})
+		}
+
+		state := newRegionFeedState(sri, requestID)
+		stream.regions.setByRequestID(requestID, state)
+		s.client.logRegionDetails("start new request",
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 			zap.String("namespace", s.changefeed.Namespace),
 			zap.String("changefeed", s.changefeed.ID),
 			zap.Int64("tableID", s.tableID),
@@ -702,9 +829,12 @@ func (s *eventFeedSession) requestRegionToStore(
 			zap.String("addr", storeAddr))
 
 		err = stream.client.Send(req)
+<<<<<<< HEAD
 		// If Send error, the receiver should have received error too or will receive error soon. So we don't need
 		// to do extra work here.
+=======
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 		if err != nil {
 			log.Warn("send request to stream failed",
 				zap.String("namespace", s.changefeed.Namespace),
@@ -728,6 +858,7 @@ func (s *eventFeedSession) requestRegionToStore(
 				zap.Uint64("requestID", requestID),
 				zap.Error(err))
 			}
+<<<<<<< HEAD
 			// Delete the stream from the map so that the next time the store is accessed, the stream will be
 			// re-established.
 			s.deleteStream(storeAddr)
@@ -736,10 +867,15 @@ func (s *eventFeedSession) requestRegionToStore(
 			// pending regions, the new pending regions that are requested after reconnecting won't be stopped
 			// incorrectly.
 			delete(storePendingRegions, storeAddr)
+=======
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
+			// Delete the stream from the cache so that when next time a region of
+			// this store is requested, a new stream to this store will be created.
+			s.deleteStream(stream)
 			// Remove the region from pendingRegions. If it's already removed, it should be already retried by
 			// `receiveFromStream`, so no need to retry here.
-			_, ok := pendingRegions.takeByRequestID(requestID)
+			_, ok := stream.regions.takeByRequestID(requestID)
 			if !ok {
 				continue
 			}
@@ -979,10 +1115,14 @@ func (s *eventFeedSession) getRPCContextForRegion(ctx context.Context, id tikv.R
 // routine exits to establish these regions.
 func (s *eventFeedSession) receiveFromStream(
 	parentCtx context.Context,
+<<<<<<< HEAD
 	addr string,
 	storeID uint64,
 	stream cdcpb.ChangeData_EventFeedClient,
 	pendingRegions *syncRegionFeedStateMap,
+=======
+	stream *eventFeedStream,
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 ) error {
 	var tsStat *tableStoreStat
 	s.client.tableStoreStats.Lock()
@@ -1004,7 +1144,7 @@ func (s *eventFeedSession) receiveFromStream(
 		failpoint.Inject("kvClientStreamCloseDelay", nil)
 
-		remainingRegions := pendingRegions.takeAll()
+		remainingRegions := stream.regions.takeAll()
 		for _, state := range remainingRegions {
 			errInfo := newRegionErrorInfo(state.sri, cerror.ErrPendingRegionCancel.FastGenByArgs())
 			s.onRegionFail(parentCtx, errInfo)
@@ -1016,7 +1156,11 @@ func (s *eventFeedSession) receiveFromStream(
 	// always create a new region worker, because `receiveFromStream` is ensured
 	// to call exactly once from outer code logic
+<<<<<<< HEAD
 	worker := newRegionWorker(parentCtx, s.changefeed, s, addr, pendingRegions)
+=======
+	worker := newRegionWorker(parentCtx, stream, s)
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 	defer worker.evictAllRegions()
 
 	ctx, cancel := context.WithCancel(parentCtx)
@@ -1126,7 +1270,12 @@ func (s *eventFeedSession) receiveFromStream(
 					}
 				}
 			}
+<<<<<<< HEAD
 			err = s.sendRegionChangeEvents(ctx, cevent.Events, worker, pendingRegions, addr)
+=======
+
+			err = s.sendRegionChangeEvents(ctx, cevent.Events, worker, stream.regions)
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 			if err != nil {
 				return err
 			}
@@ -1296,6 +1445,7 @@ func (s *eventFeedSession) sendResolvedTs(
 	return nil
 }
 
+<<<<<<< HEAD
 func (s *eventFeedSession) addStream(storeAddr string, stream *eventFeedStream, cancel context.CancelFunc) {
 	s.streamsLock.Lock()
 	defer s.streamsLock.Unlock()
@@ -1328,6 +1478,88 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can
 	defer s.streamsLock.RUnlock()
 	cancel, ok = s.streamsCanceller[storeAddr]
 	return
+=======
+// addStream adds a stream to the session.streams.
+// Note: It must be called with deleteStream in a same goroutine.
+func (s *eventFeedSession) addStream(stream *eventFeedStream) {
+	oldStream, ok := s.storeStreamsCache[stream.addr]
+	if ok {
+		failpoint.Inject("kvClientAddDuplicatedStream", func() {
+			log.Error("A stream to a same store already exists, it shouldn't happen, please report a bug",
+				zap.String("namespace", s.changefeed.Namespace),
+				zap.String("changefeed", s.changefeed.ID),
+				zap.Int64("tableID", s.tableID),
+				zap.String("tableName", s.tableName),
+				zap.Uint64("oldStreamID", oldStream.id),
+				zap.Uint64("newStreamID", stream.id))
+		})
+		// There is no need to return an error here because even if it happens,
+		// it does not harm the data correctness, but may only cause some lag spikes.
+		// Log it to help us improve the code.
+		log.Error("A stream to a same store already exists, it shouldn't happen, please report a bug",
+			zap.String("namespace", s.changefeed.Namespace),
+			zap.String("changefeed", s.changefeed.ID),
+			zap.Int64("tableID", s.tableID),
+			zap.String("tableName", s.tableName),
+			zap.Uint64("oldStreamID", oldStream.id),
+			zap.Uint64("newStreamID", stream.id))
+	}
+	s.storeStreamsCache[stream.addr] = stream
+}
+
+// deleteStream deletes a stream from the session.streams.
+// If the stream is not found, it takes no effect.
+func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) {
+	streamInMap, ok := s.storeStreamsCache[streamToDelete.addr]
+	if !ok {
+		// This should not happen, but it will be no harm if it happens.
+		// Log a warning message to help us diagnose the problem.
+		log.Warn("delete stream failed, stream not found, ignore it",
+			zap.String("namespace", s.changefeed.Namespace),
+			zap.String("changefeed", s.changefeed.ID),
+			zap.Int64("tableID", s.tableID),
+			zap.String("tableName", s.tableName),
+			zap.Uint64("streamID", streamToDelete.id),
+			zap.Uint64("streamIDInMap", streamInMap.id))
+		return
+	}
+	if streamInMap.id != streamToDelete.id {
+		// This should not happen, but it will be no harm if it happens.
+		// Log a warning message to help us diagnose the problem.
+		log.Warn("delete stream failed, stream id mismatch, ignore it",
+			zap.String("namespace", s.changefeed.Namespace),
+			zap.String("changefeed", s.changefeed.ID),
+			zap.Int64("tableID", s.tableID),
+			zap.String("tableName", s.tableName),
+			zap.Uint64("streamID", streamToDelete.id),
+			zap.Uint64("streamIDInMap", streamInMap.id))
+		return
+	}
+
+	if time.Since(streamToDelete.createTime) < streamAlterInterval {
+		log.Warn("It's too soon to delete a stream, wait for a while",
+			zap.String("namespace", s.changefeed.Namespace),
+			zap.String("changefeed", s.changefeed.ID),
+			zap.Int64("tableID", s.tableID),
+			zap.String("tableName", s.tableName),
+			zap.Uint64("streamID", streamToDelete.id),
+			zap.Duration("sinceCreateDuration", time.Since(streamToDelete.createTime)))
+		time.Sleep(streamAlterInterval - time.Since(streamToDelete.createTime))
+	}
+
+	s.client.grpcPool.ReleaseConn(streamToDelete.conn, streamToDelete.addr)
+	streamToDelete.close()
+	delete(s.storeStreamsCache, streamToDelete.addr)
+	log.Info("A stream to store has been removed",
+		zap.String("namespace", s.changefeed.Namespace),
+		zap.String("changefeed", s.changefeed.ID),
+		zap.Int64("tableID", s.tableID),
+		zap.String("tableName", s.tableName),
+		zap.String("store", streamToDelete.addr),
+		zap.Uint64("storeID", streamToDelete.storeID),
+		zap.Uint64("streamID", streamToDelete.id),
+		zap.Duration("sinceCreateDuration", time.Since(streamToDelete.createTime)))
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 }
 
 func (s *eventFeedSession) logSlowRegions(ctx context.Context) error {
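The incoming newStream above acquires a pooled gRPC connection first and then relies on a single deferred cleanup that only runs when construction fails part way through (store version check, the EventFeed call). A compact, self-contained sketch of that "release on error" shape, using a toy pool rather than the real grpcPool API:

// Sketch of the acquire-then-deferred-release-on-error pattern from newStream.
// All types here are illustrative stand-ins, not the real tiflow definitions.
package main

import (
	"errors"
	"fmt"
)

type conn struct{ addr string }

type pool struct{ released []string }

func (p *pool) Get(addr string) (*conn, error) { return &conn{addr: addr}, nil }
func (p *pool) Release(c *conn)                { p.released = append(p.released, c.addr) }

// build mirrors the structure of newStream: every early return after the
// connection is acquired is covered by the single deferred release.
func build(p *pool, addr string, failStep int) (s string, err error) {
	var c *conn
	defer func() {
		if err != nil && c != nil {
			p.Release(c) // give the connection back only on failure
		}
	}()

	c, err = p.Get(addr)
	if err != nil {
		return "", err
	}
	if failStep == 1 {
		return "", errors.New("store version check failed")
	}
	if failStep == 2 {
		return "", errors.New("EventFeed call failed")
	}
	return "stream@" + addr, nil
}

func main() {
	p := &pool{}
	_, err := build(p, "tikv-1:20160", 2)
	fmt.Println(err, p.released) // EventFeed call failed [tikv-1:20160]

	s, _ := build(p, "tikv-1:20160", 0)
	fmt.Println(s, len(p.released)) // stream@tikv-1:20160 1
}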
+ log.Warn("delete stream failed, stream not found, ignore it", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("streamID", streamToDelete.id), + zap.Uint64("streamIDInMap", streamInMap.id)) + return + } + if streamInMap.id != streamToDelete.id { + // This should not happen, but it will be no harm if it happens. + // Log a warning message to help us diagnose the problem. + log.Warn("delete stream failed, stream id mismatch, ignore it", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("streamID", streamToDelete.id), + zap.Uint64("streamIDInMap", streamInMap.id)) + return + } + + if time.Since(streamToDelete.createTime) < streamAlterInterval { + log.Warn("It's too soon to delete a stream, wait for a while", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("streamID", streamToDelete.id), + zap.Duration("sinceCreateDuration", time.Since(streamToDelete.createTime))) + time.Sleep(streamAlterInterval - time.Since(streamToDelete.createTime)) + } + + s.client.grpcPool.ReleaseConn(streamToDelete.conn, streamToDelete.addr) + streamToDelete.close() + delete(s.storeStreamsCache, streamToDelete.addr) + log.Info("A stream to store has been removed", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.String("store", streamToDelete.addr), + zap.Uint64("storeID", streamToDelete.storeID), + zap.Uint64("streamID", streamToDelete.id), + zap.Duration("sinceCreateDuration", time.Since(streamToDelete.createTime))) +>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585)) } func (s *eventFeedSession) logSlowRegions(ctx context.Context) error { diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index c2288e3705e..03efcba6395 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -1382,6 +1382,12 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { cluster.AddStore(1, addr1) cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4) + err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientAddDuplicatedStream", failpointStr) + require.Nil(t, err) + defer func() { + _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientAddDuplicatedStream") + }() + err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError", failpointStr) require.Nil(t, err) defer func() { diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 3934f33caa4..1c849ca488c 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -109,12 +109,17 @@ type regionWorker struct { metrics *regionWorkerMetrics +<<<<<<< HEAD storeAddr string // how many pending input events inputPending int32 pendingRegions *syncRegionFeedStateMap +======= + // how many pending input events from the input channel + inputPendingEvents int32 +>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. 
diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go
index 3934f33caa4..1c849ca488c 100644
--- a/cdc/kv/region_worker.go
+++ b/cdc/kv/region_worker.go
@@ -109,12 +109,17 @@ type regionWorker struct {
 	metrics *regionWorkerMetrics
 
+<<<<<<< HEAD
 	storeAddr string
 
 	// how many pending input events
 	inputPending int32
 
 	pendingRegions *syncRegionFeedStateMap
+=======
+	// how many pending input events from the input channel
+	inputPendingEvents int32
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 }
 
 func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetrics {
@@ -147,6 +152,7 @@ func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetric
 }
 
 func newRegionWorker(
+<<<<<<< HEAD
 	ctx context.Context, changefeedID model.ChangeFeedID, s *eventFeedSession,
 	addr string, pendingRegions *syncRegionFeedStateMap,
 ) *regionWorker {
@@ -165,6 +171,25 @@ func newRegionWorker(
 		inputPending: 0,
 
 		pendingRegions: pendingRegions,
+=======
+	ctx context.Context,
+	stream *eventFeedStream,
+	s *eventFeedSession,
+) *regionWorker {
+	return &regionWorker{
+		parentCtx: ctx,
+		session: s,
+		inputCh: make(chan []*regionStatefulEvent, regionWorkerInputChanSize),
+		outputCh: s.eventCh,
+		stream: stream,
+		errorCh: make(chan error, 1),
+		statesManager: newRegionStateManager(-1),
+		rtsManager: newRegionTsManager(),
+		rtsUpdateCh: make(chan *rtsUpdateEvent, 1024),
+		concurrency: int(s.client.config.KVClient.WorkerConcurrent),
+		metrics: newRegionWorkerMetrics(s.changefeed, strconv.FormatInt(s.tableID, 10), stream.addr),
+		inputPendingEvents: 0,
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 	}
 }
 
@@ -200,7 +225,20 @@ func (w *regionWorker) checkShouldExit() error {
 	empty := w.checkRegionStateEmpty()
 	// If there is no region maintained by this region worker, exit it and
 	// cancel the gRPC stream.
+<<<<<<< HEAD
 	if empty && w.pendingRegions.len() == 0 {
+=======
+	if empty && w.stream.regions.len() == 0 {
+		log.Info("A single region error happens before, "+
+			"and there is no region maintained by the stream, "+
+			"exit it and cancel the gRPC stream",
+			zap.String("namespace", w.session.client.changefeed.Namespace),
+			zap.String("changefeed", w.session.client.changefeed.ID),
+			zap.String("storeAddr", w.stream.addr),
+			zap.Uint64("streamID", w.stream.id),
+			zap.Int64("tableID", w.session.tableID),
+			zap.String("tableName", w.session.tableName))
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 		w.cancelStream(time.Duration(0))
 		return cerror.ErrRegionWorkerExit.GenWithStackByArgs()
 	}
@@ -471,6 +509,7 @@ func (w *regionWorker) eventHandler(ctx context.Context) error {
 			}
 
 			regionEventsBatchSize.Observe(float64(len(events)))
+<<<<<<< HEAD
 			inputPending := atomic.LoadInt32(&w.inputPending)
 			if highWatermarkMet {
 				highWatermarkMet = int(inputPending) >= regionWorkerLowWatermark
@@ -520,6 +559,25 @@ func (w *regionWorker) eventHandler(ctx context.Context) error {
 			// ensure low processing latency.
 			for _, event := range events {
 				err = w.processEvent(ctx, event)
+=======
+			start := time.Now()
+			inputPending := atomic.LoadInt32(&w.inputPendingEvents)
+			if highWatermarkMet {
+				highWatermarkMet = int(inputPending) >= regionWorkerLowWatermark
+			} else {
+				highWatermarkMet = int(inputPending) >= regionWorkerHighWatermark
+			}
+			atomic.AddInt32(&w.inputPendingEvents, -int32(len(events)))
+
+			if highWatermarkMet {
+				// All events in one batch can be hashed into one handle slot.
+				slot := w.inputCalcSlot(events[0].regionID)
+				eventsX := make([]interface{}, 0, len(events))
+				for _, event := range events {
+					eventsX = append(eventsX, event)
+				}
+				err := w.handles[slot].AddEvents(ctx, eventsX)
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 				if err != nil {
 					return err
 				}
@@ -566,6 +624,7 @@ func (w *regionWorker) checkErrorReconnect(err error) error {
 }
 
 func (w *regionWorker) cancelStream(delay time.Duration) {
+<<<<<<< HEAD
 	cancel, ok := w.session.getStreamCancel(w.storeAddr)
 	if ok {
 		// cancel the stream to trigger strem.Recv with context cancel error
@@ -581,6 +640,17 @@ func (w *regionWorker) checkErrorReconnect(err error) error {
 			zap.String("namespace", w.session.client.changefeed.Namespace),
 			zap.String("changefeed", w.session.client.changefeed.ID))
 	}
+=======
+	// cancel the stream to make strem.Recv returns a context cancel error
+	// This will make the receiveFromStream goroutine exit and the stream can
+	// be re-established by the caller.
+	// Note: use context cancel is the only way to terminate a gRPC stream.
+	w.stream.close()
+	// Failover in stream.Recv has 0-100ms delay, the onRegionFail
+	// should be called after stream has been deleted. Add a delay here
+	// to avoid too frequent region rebuilt.
+	time.Sleep(delay)
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 }
 
 func (w *regionWorker) run() error {
@@ -839,10 +909,10 @@ func (w *regionWorker) evictAllRegions() {
 // sendEvents puts events into inputCh and updates some internal states.
 // Callers must ensure that all items in events can be hashed into one handle slot.
 func (w *regionWorker) sendEvents(ctx context.Context, events []*regionStatefulEvent) error {
-	atomic.AddInt32(&w.inputPending, int32(len(events)))
+	atomic.AddInt32(&w.inputPendingEvents, int32(len(events)))
 	select {
 	case <-ctx.Done():
-		atomic.AddInt32(&w.inputPending, -int32(len(events)))
+		atomic.AddInt32(&w.inputPendingEvents, -int32(len(events)))
 		return ctx.Err()
 	case w.inputCh <- events:
 		return nil
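The eventHandler changes above track a counter of pending input events and apply a high/low watermark with hysteresis: once the backlog crosses the high watermark the worker switches to batched handling, and it only switches back after the backlog drains below the low watermark. A small sketch of that decision logic with made-up thresholds (the real constants live in the region worker package):

// Sketch of the watermark hysteresis on pending input events.
package main

import (
	"fmt"
	"sync/atomic"
)

const (
	lowWatermark  = 2
	highWatermark = 8
)

type worker struct {
	inputPendingEvents int32
	highWatermarkMet   bool
}

func (w *worker) enqueue(n int) { atomic.AddInt32(&w.inputPendingEvents, int32(n)) }

// take consumes a batch of n events and reports whether batched mode is active.
func (w *worker) take(n int) bool {
	pending := atomic.LoadInt32(&w.inputPendingEvents)
	if w.highWatermarkMet {
		// Stay in batched mode until the backlog falls below the low watermark.
		w.highWatermarkMet = int(pending) >= lowWatermark
	} else {
		// Enter batched mode only when the backlog reaches the high watermark.
		w.highWatermarkMet = int(pending) >= highWatermark
	}
	atomic.AddInt32(&w.inputPendingEvents, -int32(n))
	return w.highWatermarkMet
}

func main() {
	w := &worker{}
	w.enqueue(10)
	fmt.Println(w.take(5)) // true: backlog hit the high watermark
	fmt.Println(w.take(4)) // true: still above the low watermark
	fmt.Println(w.take(1)) // false: drained below the low watermark
}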
diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go
index c8041246a96..20f069f6de3 100644
--- a/cdc/kv/region_worker_test.go
+++ b/cdc/kv/region_worker_test.go
@@ -159,7 +159,16 @@ func TestRegionWokerHandleEventEntryEventOutOfOrder(t *testing.T) {
 		&tikv.RPCContext{}), 0)
 	state.sri.lockedRange = &regionlock.LockedRange{}
 	state.start()
+<<<<<<< HEAD
 	worker := newRegionWorker(ctx, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap())
+=======
+	stream := &eventFeedStream{
+		storeID: 1,
+		id:      2,
+		regions: newSyncRegionFeedStateMap(),
+	}
+	worker := newRegionWorker(ctx, stream, s)
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 	require.Equal(t, 2, cap(worker.outputCh))
 
 	// Receive prewrite2 with empty value.
@@ -322,7 +331,16 @@ func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) {
 	s1.sri.lockedRange = &regionlock.LockedRange{}
 	s1.sri.lockedRange.CheckpointTs.Store(9)
 	s1.start()
+<<<<<<< HEAD
 	w := newRegionWorker(ctx, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap())
+=======
+	stream := &eventFeedStream{
+		storeID: 1,
+		id:      2,
+		regions: newSyncRegionFeedStateMap(),
+	}
+	w := newRegionWorker(ctx, stream, s)
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 
 	err := w.handleResolvedTs(ctx, &resolvedTsEvent{
 		resolvedTs: 5,
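Both updated tests above build a bare stream value with only the fields the worker actually touches (an ID and an empty region-state map) instead of dialing TiKV. The same fixture idea in a self-contained form, using stand-in types rather than the real kv package:

// Sketch of testing a worker against a minimal fake stream fixture.
package main

import "fmt"

type regionStateMap struct{ states map[uint64]string }

func newRegionStateMap() *regionStateMap { return &regionStateMap{states: map[uint64]string{}} }
func (m *regionStateMap) len() int       { return len(m.states) }

type stream struct {
	storeID uint64
	id      uint64
	regions *regionStateMap
}

type worker struct{ stream *stream }

func newWorker(s *stream) *worker { return &worker{stream: s} }

// shouldExit mirrors the "no regions left on this stream" check.
func (w *worker) shouldExit() bool { return w.stream.regions.len() == 0 }

func main() {
	// Minimal fixture: enough state for the code path under test, nothing more.
	s := &stream{storeID: 1, id: 2, regions: newRegionStateMap()}
	w := newWorker(s)
	fmt.Println(w.shouldExit()) // true: no regions registered on the fixture
}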
diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go
index 063c0e334dc..6c0cb4ea84f 100644
--- a/pkg/config/config_test_data.go
+++ b/pkg/config/config_test_data.go
@@ -170,7 +170,19 @@ const (
 			"check-balance-interval": 60000000000,
 			"add-table-batch-size": 50
 		},
+<<<<<<< HEAD
 		"enable-kv-connect-backoff": false,
+=======
+		"cdc-v2": {
+			"enable": false,
+			"meta-store": {
+				"uri": "",
+				"ssl-ca": "",
+				"ssl-cert": "",
+				"ssl-key": ""
+			}
+		},
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 		"puller": {
 			"enable-resolved-ts-stuck-detection": false,
 			"resolved-ts-stuck-interval": 300000000000
diff --git a/pkg/config/debug.go b/pkg/config/debug.go
index 59528b886a1..41390dee1fe 100644
--- a/pkg/config/debug.go
+++ b/pkg/config/debug.go
@@ -26,8 +26,13 @@ type DebugConfig struct {
 	// Scheduler is the configuration of the two-phase scheduler.
 	Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"`
 
+<<<<<<< HEAD
 	// EnableKVConnectBackOff enables the backoff for kv connect.
 	EnableKVConnectBackOff bool `toml:"enable-kv-connect-backoff" json:"enable-kv-connect-backoff"`
+=======
+	// CDCV2 enables ticdc version 2 implementation with new metastore
+	CDCV2 *CDCV2 `toml:"cdc-v2" json:"cdc-v2"`
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 
 	// Puller is the configuration of the puller.
 	Puller *PullerConfig `toml:"puller" json:"puller"`
diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go
index c77b6e17ad5..feb5d2f6d9e 100644
--- a/pkg/config/server_config.go
+++ b/pkg/config/server_config.go
@@ -132,8 +132,13 @@ var defaultServerConfig = &ServerConfig{
 	},
 	Messages: defaultMessageConfig.Clone(),
 
+<<<<<<< HEAD
 	Scheduler:              NewDefaultSchedulerConfig(),
 	EnableKVConnectBackOff: false,
+=======
+	Scheduler: NewDefaultSchedulerConfig(),
+	CDCV2:     &CDCV2{Enable: false},
+>>>>>>> a609ffc488 (kv (ticdc): Improve the codebase of kvClient. (#10585))
 	Puller: &PullerConfig{
 		EnableResolvedTsStuckDetection: false,
 		ResolvedTsStuckInterval:        TomlDuration(5 * time.Minute),
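The pkg/config hunks above show the shape of the incoming `cdc-v2` debug section: a nested struct with toml/json tags whose default is simply Enable=false. A standalone sketch of that mapping (the struct names mirror the diff, but this is not the real tiflow package and omits any fields not visible here):

// Sketch of the cdc-v2 debug section as tagged Go structs.
package main

import (
	"encoding/json"
	"fmt"
)

type MetaStore struct {
	URI     string `toml:"uri" json:"uri"`
	SSLCA   string `toml:"ssl-ca" json:"ssl-ca"`
	SSLCert string `toml:"ssl-cert" json:"ssl-cert"`
	SSLKey  string `toml:"ssl-key" json:"ssl-key"`
}

type CDCV2 struct {
	Enable    bool      `toml:"enable" json:"enable"`
	MetaStore MetaStore `toml:"meta-store" json:"meta-store"`
}

type DebugConfig struct {
	CDCV2 *CDCV2 `toml:"cdc-v2" json:"cdc-v2"`
}

func main() {
	// Default mirrors the server_config.go hunk: the feature is off.
	cfg := DebugConfig{CDCV2: &CDCV2{Enable: false}}
	out, _ := json.MarshalIndent(cfg, "", "  ")
	fmt.Println(string(out)) // same shape as the "cdc-v2" block in config_test_data.go
}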