From 80bb448852c10ce3287bccf7148379cbefb0c608 Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Wed, 31 Jan 2024 15:43:54 +0800 Subject: [PATCH 1/6] This is an automated cherry-pick of #10559 Signed-off-by: ti-chi-bot --- cdc/entry/schema_test_helper.go | 128 +++++++++++++ cdc/kv/client.go | 312 +++++++++++++++++++++++++------- cdc/kv/client_test.go | 204 ++++++++++----------- cdc/kv/region_worker.go | 84 ++++++--- cdc/kv/region_worker_test.go | 12 +- 5 files changed, 553 insertions(+), 187 deletions(-) diff --git a/cdc/entry/schema_test_helper.go b/cdc/entry/schema_test_helper.go index e4f639e0917..7bbc03459f2 100644 --- a/cdc/entry/schema_test_helper.go +++ b/cdc/entry/schema_test_helper.go @@ -17,10 +17,15 @@ package entry import ( +<<<<<<< HEAD +======= + "context" +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) "encoding/json" "strings" "testing" +<<<<<<< HEAD ticonfig "github.com/pingcap/tidb/config" tiddl "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" @@ -30,6 +35,23 @@ import ( "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/testkit" +======= + ticonfig "github.com/pingcap/tidb/pkg/config" + tiddl "github.com/pingcap/tidb/pkg/ddl" + "github.com/pingcap/tidb/pkg/domain" + "github.com/pingcap/tidb/pkg/kv" + timeta "github.com/pingcap/tidb/pkg/meta" + timodel "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/session" + "github.com/pingcap/tidb/pkg/store/mockstore" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/filter" + "github.com/pingcap/tiflow/pkg/integrity" + "github.com/pingcap/tiflow/pkg/spanz" + "github.com/pingcap/tiflow/pkg/util" +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" ) @@ -130,6 +152,112 @@ func (s *SchemaTestHelper) DDL2Jobs(ddl string, jobCnt int) []*timodel.Job { return jobs } +<<<<<<< HEAD +======= +// DML2Event execute the dml and return the corresponding row changed event. +// caution: it does not support `delete` since the key value cannot be found +// after the query executed. +func (s *SchemaTestHelper) DML2Event(dml string, schema, table string) *model.RowChangedEvent { + s.tk.MustExec(dml) + + tableInfo, ok := s.schemaStorage.GetLastSnapshot().TableByName(schema, table) + require.True(s.t, ok) + + key, value := s.getLastKeyValue(tableInfo.ID) + ts := s.schemaStorage.GetLastSnapshot().CurrentTs() + rawKV := &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: key, + Value: value, + OldValue: nil, + StartTs: ts - 1, + CRTs: ts + 1, + } + polymorphicEvent := model.NewPolymorphicEvent(rawKV) + err := s.mounter.DecodeEvent(context.Background(), polymorphicEvent) + require.NoError(s.t, err) + return polymorphicEvent.Row +} + +func (s *SchemaTestHelper) getLastKeyValue(tableID int64) (key, value []byte) { + txn, err := s.storage.Begin() + require.NoError(s.t, err) + defer txn.Rollback() //nolint:errcheck + + start, end := spanz.GetTableRange(tableID) + iter, err := txn.Iter(start, end) + require.NoError(s.t, err) + defer iter.Close() + for iter.Valid() { + key = iter.Key() + value = iter.Value() + err = iter.Next() + require.NoError(s.t, err) + } + return key, value +} + +// DDL2Event executes the DDL and return the corresponding event. 
+func (s *SchemaTestHelper) DDL2Event(ddl string) *model.DDLEvent { + s.tk.MustExec(ddl) + jobs, err := tiddl.GetLastNHistoryDDLJobs(s.GetCurrentMeta(), 1) + require.NoError(s.t, err) + require.Len(s.t, jobs, 1) + // Set State from Synced to Done. + // Because jobs are put to history queue after TiDB alter its state from + // Done to Synced. + jobs[0].State = timodel.JobStateDone + res := jobs[0] + if res.Type == timodel.ActionRenameTables { + // the RawArgs field in job fetched from tidb snapshot meta is incorrent, + // so we manually construct `job.RawArgs` to do the workaround. + // we assume the old schema name is same as the new schema name here. + // for example, "ALTER TABLE RENAME test.t1 TO test.t1, test.t2 to test.t22", schema name is "test" + schema := strings.Split(strings.Split(strings.Split(res.Query, ",")[1], " ")[1], ".")[0] + tableNum := len(res.BinlogInfo.MultipleTableInfos) + oldSchemaIDs := make([]int64, tableNum) + for i := 0; i < tableNum; i++ { + oldSchemaIDs[i] = res.SchemaID + } + oldTableIDs := make([]int64, tableNum) + for i := 0; i < tableNum; i++ { + oldTableIDs[i] = res.BinlogInfo.MultipleTableInfos[i].ID + } + newTableNames := make([]timodel.CIStr, tableNum) + for i := 0; i < tableNum; i++ { + newTableNames[i] = res.BinlogInfo.MultipleTableInfos[i].Name + } + oldSchemaNames := make([]timodel.CIStr, tableNum) + for i := 0; i < tableNum; i++ { + oldSchemaNames[i] = timodel.NewCIStr(schema) + } + newSchemaIDs := oldSchemaIDs + + args := []interface{}{ + oldSchemaIDs, newSchemaIDs, + newTableNames, oldTableIDs, oldSchemaNames, + } + rawArgs, err := json.Marshal(args) + require.NoError(s.t, err) + res.RawArgs = rawArgs + } + + err = s.schemaStorage.HandleDDLJob(res) + require.NoError(s.t, err) + + ver, err := s.storage.CurrentVersion(oracle.GlobalTxnScope) + require.NoError(s.t, err) + s.schemaStorage.AdvanceResolvedTs(ver.Ver) + + ctx := context.Background() + + events, err := s.schemaStorage.BuildDDLEvents(ctx, res) + require.NoError(s.t, err) + + return events[0] +} + +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) // Storage returns the tikv storage func (s *SchemaTestHelper) Storage() kv.Storage { return s.storage diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 8d4fcce010a..64dc19bb6af 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -80,6 +80,10 @@ const ( // time interval to force kv client to terminate gRPC stream and reconnect var reconnectInterval = 60 * time.Minute +// streamAlterInterval is the interval to limit the frequency of creating/deleting streams. +// Make it a variable so that we can change it in unit test. +var streamAlterInterval = 1 * time.Second + type regionStatefulEvent struct { changeEvent *cdcpb.Event resolvedTsEvent *resolvedTsEvent @@ -133,6 +137,14 @@ func newRegionErrorInfo(info singleRegionInfo, err error) regionErrorInfo { type eventFeedStream struct { client cdcpb.ChangeData_EventFeedClient conn *sharedConn + // addr is the address of the TiKV store + addr string + // storeID is the ID of the TiKV store + storeID uint64 + // id is the stream ID, which is used to identify the stream. 
+ id uint64 + // cancel is used to cancel the gRPC stream + cancel context.CancelFunc } // CDCKVClient is an interface to receives kv changed logs from TiKV @@ -221,7 +233,12 @@ func NewCDCClient( return c } -func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream *eventFeedStream, newStreamErr error) { +func (c *CDCClient) newStream( + ctx context.Context, + cancel context.CancelFunc, + addr string, + storeID uint64, +) (stream *eventFeedStream, newStreamErr error) { streamFunc := func() (err error) { var conn *sharedConn defer func() { @@ -244,13 +261,25 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) return cerror.WrapError(cerror.ErrTiKVEventFeed, err) } stream = &eventFeedStream{ - client: streamClient, - conn: conn, + client: streamClient, + conn: conn, + addr: addr, + storeID: storeID, + id: allocateStreamID(), + cancel: cancel, } log.Debug("created stream to store", zap.String("namespace", c.changefeed.Namespace), zap.String("changefeed", c.changefeed.ID), +<<<<<<< HEAD zap.String("addr", addr)) +======= + zap.Int64("tableID", c.tableID), + zap.String("tableName", c.tableName), + zap.String("store", addr), + zap.Uint64("storeID", storeID), + zap.Uint64("streamID", stream.id)) +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) return nil } if c.config.Debug.EnableKVConnectBackOff { @@ -316,15 +345,21 @@ func (c *CDCClient) CommitTs() model.Ts { return ingressCommitTs } -var currentID uint64 = 0 +var currentRequestID uint64 = 0 -func allocID() uint64 { - return atomic.AddUint64(¤tID, 1) +func allocateRequestID() uint64 { + return atomic.AddUint64(¤tRequestID, 1) } // used in test only -func currentRequestID() uint64 { - return atomic.LoadUint64(¤tID) +func getCurrentRequestID() uint64 { + return atomic.LoadUint64(¤tRequestID) +} + +var currentStreamID uint64 = 0 + +func allocateStreamID() uint64 { + return atomic.AddUint64(¤tStreamID, 1) } type eventFeedSession struct { @@ -358,9 +393,15 @@ type eventFeedSession struct { errChSizeGauge prometheus.Gauge rangeChSizeGauge prometheus.Gauge - streams map[string]*eventFeedStream - streamsLock sync.RWMutex - streamsCanceller map[string]context.CancelFunc + // storeStreamsCache is used to cache the established gRPC streams to TiKV stores. + storeStreamsCache struct { + sync.RWMutex + m map[string]*eventFeedStream + // lastAlterTime is used to record last time a stream is created/deleted to/from the cache. + // It is used to avoid creation/deleting streams too frequently, which may cause + // huge overhead of incremental region scanning in TiKV. + lastAlterTime map[string]time.Time + } // use sync.Pool to store resolved ts event only, because resolved ts event // has the same size and generate cycle. 
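
A minimal, self-contained sketch of the throttling idea behind the storeStreamsCache fields introduced above, assuming nothing beyond the Go standard library: a per-store cache that records the last time an entry was added or deleted and sleeps until streamAlterInterval has elapsed before allowing the next change. The names throttledCache, add, remove and waitIfTooFrequent below are illustrative only and are not part of this patch.

package main

import (
	"fmt"
	"sync"
	"time"
)

// streamAlterInterval mirrors the knob introduced by this patch: the minimum
// gap between two create/delete operations for the same store address.
var streamAlterInterval = 1 * time.Second

// throttledCache is a hypothetical stand-in for storeStreamsCache: one entry
// per store address plus the timestamp of the last change to that address.
type throttledCache struct {
	sync.Mutex
	m             map[string]uint64
	lastAlterTime map[string]time.Time
}

func newThrottledCache() *throttledCache {
	return &throttledCache{
		m:             make(map[string]uint64),
		lastAlterTime: make(map[string]time.Time),
	}
}

// waitIfTooFrequent sleeps until streamAlterInterval has passed since the last
// change to addr. Callers hold the lock, as addStream/deleteStream do in the patch.
func (c *throttledCache) waitIfTooFrequent(addr string) {
	if last, ok := c.lastAlterTime[addr]; ok {
		if since := time.Since(last); since < streamAlterInterval {
			time.Sleep(streamAlterInterval - since)
		}
	}
}

func (c *throttledCache) add(addr string, streamID uint64) {
	c.Lock()
	defer c.Unlock()
	c.waitIfTooFrequent(addr)
	c.m[addr] = streamID
	c.lastAlterTime[addr] = time.Now()
}

func (c *throttledCache) remove(addr string, streamID uint64) {
	c.Lock()
	defer c.Unlock()
	// Ignore stale deletes: a newer stream has already replaced this one.
	if cur, ok := c.m[addr]; !ok || cur != streamID {
		return
	}
	c.waitIfTooFrequent(addr)
	delete(c.m, addr)
	c.lastAlterTime[addr] = time.Now()
}

func main() {
	c := newThrottledCache()
	start := time.Now()
	c.add("store-1:20160", 1)
	c.remove("store-1:20160", 1) // blocks ~1s: removed too soon after the add
	fmt.Printf("elapsed: %v\n", time.Since(start).Round(time.Second))
}

Because the sleep happens while the lock is held, a burst of reconnect attempts against the same store is serialized to at most one stream change per streamAlterInterval, which is the behaviour the addStream and deleteStream changes in this patch aim for.
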
@@ -378,18 +419,23 @@ func newEventFeedSession( startTs uint64, eventCh chan<- model.RegionFeedEvent, ) *eventFeedSession { +<<<<<<< HEAD id := allocID() idStr := strconv.FormatUint(id, 10) +======= + id := allocateRequestID() +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) rangeLock := regionlock.NewRegionRangeLock( id, totalSpan.StartKey, totalSpan.EndKey, startTs, client.changefeed.Namespace+"."+client.changefeed.ID) - return &eventFeedSession{ + res := &eventFeedSession{ client: client, startTs: startTs, changefeed: client.changefeed, tableID: client.tableID, tableName: client.tableName, +<<<<<<< HEAD totalSpan: totalSpan, eventCh: eventCh, rangeLock: rangeLock, @@ -400,6 +446,19 @@ func newEventFeedSession( rangeChSizeGauge: clientChannelSize.WithLabelValues("range"), streams: make(map[string]*eventFeedStream), streamsCanceller: make(map[string]context.CancelFunc), +======= + totalSpan: totalSpan, + eventCh: eventCh, + rangeLock: rangeLock, + lockResolver: lockResolver, + enableTableMonitor: enableTableMonitor, + regionChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "region"), + errChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "err"), + rangeChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "range"), +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) resolvedTsPool: sync.Pool{ New: func() any { return ®ionStatefulEvent{ @@ -408,6 +467,9 @@ func newEventFeedSession( }, }, } + res.storeStreamsCache.m = make(map[string]*eventFeedStream) + res.storeStreamsCache.lastAlterTime = make(map[string]time.Time) + return res } func (s *eventFeedSession) eventFeed(ctx context.Context) error { @@ -589,7 +651,7 @@ func (s *eventFeedSession) requestRegionToStore( // Stores pending regions info for each stream. After sending a new request, the region info wil be put to the map, // and it will be loaded by the receiver thread when it receives the first response from that region. We need this // to pass the region info to the receiver since the region info cannot be inferred from the response from TiKV. - storePendingRegions := make(map[string]*syncRegionFeedStateMap) + storePendingRegions := make(map[uint64]*syncRegionFeedStateMap) header := &cdcpb.Header{ ClusterId: s.client.clusterID, @@ -605,7 +667,7 @@ func (s *eventFeedSession) requestRegionToStore( return errors.Trace(ctx.Err()) case sri = <-s.regionRouter.Out(): } - requestID := allocID() + requestID := allocateRequestID() rpcCtx := sri.rpcCtx regionID := rpcCtx.Meta.GetId() @@ -637,10 +699,8 @@ func (s *eventFeedSession) requestRegionToStore( // regions map, the old map will be used in old `receiveFromStream` // and won't be deleted until that goroutine exits. 
pendingRegions := newSyncRegionFeedStateMap() - storePendingRegions[storeAddr] = pendingRegions streamCtx, streamCancel := context.WithCancel(ctx) - _ = streamCancel // to avoid possible context leak warning from govet - stream, err = s.client.newStream(streamCtx, storeAddr, storeID) + stream, err = s.client.newStream(streamCtx, streamCancel, storeAddr, storeID) if err != nil { // get stream failed, maybe the store is down permanently, we should try to relocate the active store log.Warn("get grpc stream client failed", @@ -664,22 +724,28 @@ func (s *eventFeedSession) requestRegionToStore( s.onRegionFail(ctx, errInfo) continue } - s.addStream(storeAddr, stream, streamCancel) + + storePendingRegions[stream.id] = pendingRegions + s.addStream(stream) log.Info("creating new stream to store to send request", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Uint64("regionID", regionID), zap.Uint64("requestID", requestID), zap.Uint64("storeID", storeID), +<<<<<<< HEAD zap.String("addr", storeAddr)) +======= + zap.String("store", storeAddr), + zap.Uint64("streamID", stream.id)) +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) g.Go(func() error { - defer s.deleteStream(storeAddr) - return s.receiveFromStream(ctx, storeAddr, storeID, stream.client, pendingRegions) + return s.receiveFromStream(ctx, stream, pendingRegions) }) } - pendingRegions, ok := storePendingRegions[storeAddr] + pendingRegions, ok := storePendingRegions[stream.id] if !ok { // Should never happen log.Panic("pending regions is not found for store", @@ -692,8 +758,12 @@ func (s *eventFeedSession) requestRegionToStore( state := newRegionFeedState(sri, requestID) pendingRegions.setByRequestID(requestID, state) +<<<<<<< HEAD log.Info("start new request", +======= + s.client.logRegionDetails("start new request", +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), @@ -702,9 +772,8 @@ func (s *eventFeedSession) requestRegionToStore( zap.String("addr", storeAddr)) err = stream.client.Send(req) - - // If Send error, the receiver should have received error too or will receive error soon. So we don't need - // to do extra work here. + // If Send returns an error, the stream.client.Recv (In s.receiveFromStream) + // would also receive an error. 
if err != nil { log.Warn("send request to stream failed", zap.String("namespace", s.changefeed.Namespace), @@ -713,6 +782,11 @@ func (s *eventFeedSession) requestRegionToStore( zap.String("tableName", s.tableName), zap.String("addr", storeAddr), zap.Uint64("storeID", storeID), +<<<<<<< HEAD +======= + zap.String("store", storeAddr), + zap.Uint64("streamID", stream.id), +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) zap.Uint64("regionID", regionID), zap.Uint64("requestID", requestID), zap.Error(err)) @@ -722,20 +796,24 @@ func (s *eventFeedSession) requestRegionToStore( zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), +<<<<<<< HEAD zap.String("addr", storeAddr), +======= + zap.Uint64("streamID", stream.id), +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) zap.Uint64("storeID", storeID), zap.Uint64("regionID", regionID), zap.Uint64("requestID", requestID), zap.Error(err)) } - // Delete the stream from the map so that the next time the store is accessed, the stream will be - // re-established. - s.deleteStream(storeAddr) + // Delete the stream from the cache so that when next time the store is accessed, + // the stream can be re-established. + s.deleteStream(stream) // Delete `pendingRegions` from `storePendingRegions` so that the next time a region of this store is // requested, it will create a new one. So if the `receiveFromStream` goroutine tries to stop all // pending regions, the new pending regions that are requested after reconnecting won't be stopped // incorrectly. - delete(storePendingRegions, storeAddr) + delete(storePendingRegions, stream.id) // Remove the region from pendingRegions. If it's already removed, it should be already retried by // `receiveFromStream`, so no need to retry here. @@ -743,7 +821,17 @@ func (s *eventFeedSession) requestRegionToStore( if !ok { continue } +<<<<<<< HEAD +======= + s.client.logRegionDetails("region send to store failed", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Any("regionId", sri.verID.GetID()), + zap.Stringer("span", &sri.span)) +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) errInfo := newRegionErrorInfo(sri, &sendRequestToStoreErr{}) s.onRegionFail(ctx, errInfo) } @@ -979,14 +1067,12 @@ func (s *eventFeedSession) getRPCContextForRegion(ctx context.Context, id tikv.R // routine exits to establish these regions. 
func (s *eventFeedSession) receiveFromStream( parentCtx context.Context, - addr string, - storeID uint64, - stream cdcpb.ChangeData_EventFeedClient, + stream *eventFeedStream, pendingRegions *syncRegionFeedStateMap, ) error { var tsStat *tableStoreStat s.client.tableStoreStats.Lock() - key := fmt.Sprintf("%d_%d", s.totalSpan.TableID, storeID) + key := fmt.Sprintf("%d_%d", s.totalSpan.TableID, stream.storeID) if tsStat = s.client.tableStoreStats.v[key]; tsStat == nil { tsStat = new(tableStoreStat) s.client.tableStoreStats.v[key] = tsStat @@ -1000,7 +1086,15 @@ func (s *eventFeedSession) receiveFromStream( log.Info("stream to store closed", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), +<<<<<<< HEAD zap.String("addr", addr), zap.Uint64("storeID", storeID)) +======= + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.String("store", stream.addr), + zap.Uint64("storeID", stream.storeID), + zap.Uint64("streamID", stream.id)) +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) failpoint.Inject("kvClientStreamCloseDelay", nil) @@ -1013,10 +1107,21 @@ func (s *eventFeedSession) receiveFromStream( metricSendEventBatchResolvedSize := batchResolvedEventSize. WithLabelValues(s.changefeed.Namespace, s.changefeed.ID) +<<<<<<< HEAD +======= + metricReceiveBusyRatio := workerBusyRatio.WithLabelValues( + s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), stream.addr, "event-receiver") + metricProcessBusyRatio := workerBusyRatio.WithLabelValues( + s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), stream.addr, "event-processor") +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) // always create a new region worker, because `receiveFromStream` is ensured // to call exactly once from outer code logic - worker := newRegionWorker(parentCtx, s.changefeed, s, addr, pendingRegions) + worker := newRegionWorker( + parentCtx, + stream, + s, + pendingRegions) defer worker.evictAllRegions() ctx, cancel := context.WithCancel(parentCtx) @@ -1034,10 +1139,22 @@ func (s *eventFeedSession) receiveFromStream( eg.Go(func() error { err := handleExit(worker.run()) if err != nil { +<<<<<<< HEAD log.Error("region worker exited with error", zap.Error(err), zap.Any("changefeed", s.changefeed), zap.Any("addr", addr), zap.Any("storeID", storeID)) +======= + log.Error("region worker exited with error", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.String("store", stream.addr), + zap.Uint64("storeID", stream.storeID), + zap.Uint64("streamID", stream.id), + zap.Error(err)) +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) } return err }) @@ -1045,7 +1162,12 @@ func (s *eventFeedSession) receiveFromStream( receiveEvents := func() error { maxCommitTs := model.Ts(0) for { +<<<<<<< HEAD cevent, err := stream.Recv() +======= + startToReceive := time.Now() + cevent, err := stream.client.Recv() +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) failpoint.Inject("kvClientRegionReentrantError", func(op failpoint.Value) { if op.(string) == "error" { @@ -1066,18 +1188,35 @@ func (s *eventFeedSession) receiveFromStream( "receive from stream canceled", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), +<<<<<<< HEAD 
zap.String("addr", addr), zap.Uint64("storeID", storeID), ) +======= + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.String("store", stream.addr), + zap.Uint64("storeID", stream.storeID), + zap.Uint64("streamID", stream.id)) +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) } else { log.Warn( "failed to receive from stream", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), +<<<<<<< HEAD zap.String("addr", addr), zap.Uint64("storeID", storeID), zap.Error(err), ) +======= + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.String("store", stream.addr), + zap.Uint64("storeID", stream.storeID), + zap.Uint64("streamID", stream.id), + zap.Error(err)) +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) // Note that pd need at lease 10s+ to tag a kv node as disconnect if kv node down // tikv raft need wait (raft-base-tick-interval * raft-election-timeout-ticks) 10s to start a new // election @@ -1088,13 +1227,9 @@ func (s *eventFeedSession) receiveFromStream( // needs time to recover, kv client doesn't need to retry frequently. // TODO: add a better retry backoff or rate limitter time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) - - // TODO: better to closes the send direction of the stream to notify - // the other side, but it is not safe to call CloseSend concurrently - // with SendMsg, in future refactor we should refine the recv loop - s.deleteStream(addr) - // send nil regionStatefulEvent to signal worker exit + // worker.sendEvents will return error if ctx is canceled + // In this case, we should return the error to the caller to cancel the whole job. err = worker.sendEvents(ctx, []*regionStatefulEvent{nil}) if err != nil { return err @@ -1296,37 +1431,88 @@ func (s *eventFeedSession) sendResolvedTs( return nil } -func (s *eventFeedSession) addStream(storeAddr string, stream *eventFeedStream, cancel context.CancelFunc) { - s.streamsLock.Lock() - defer s.streamsLock.Unlock() - s.streams[storeAddr] = stream - s.streamsCanceller[storeAddr] = cancel +func (s *eventFeedSession) addStream(stream *eventFeedStream) { + s.storeStreamsCache.Lock() + defer s.storeStreamsCache.Unlock() + if lastTime, ok := s.storeStreamsCache.lastAlterTime[stream.addr]; ok { + if time.Since(lastTime) < streamAlterInterval { + log.Warn( + "add a stream of a same store too frequently, wait a while and try again", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("storeID", stream.storeID), + zap.Duration("sinceLastTime", time.Since(lastTime))) + time.Sleep(streamAlterInterval - time.Since(lastTime)) + } + } + s.storeStreamsCache.m[stream.addr] = stream + s.storeStreamsCache.lastAlterTime[stream.addr] = time.Now() } -func (s *eventFeedSession) deleteStream(storeAddr string) { - s.streamsLock.Lock() - defer s.streamsLock.Unlock() - if stream, ok := s.streams[storeAddr]; ok { - s.client.grpcPool.ReleaseConn(stream.conn, storeAddr) - delete(s.streams, storeAddr) +// deleteStream deletes a stream from the session.streams. +// If the stream is not found, it will be ignored. 
+func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) { + s.storeStreamsCache.Lock() + defer s.storeStreamsCache.Unlock() + + streamInMap, ok := s.storeStreamsCache.m[streamToDelete.addr] + if !ok { + log.Warn("delete stream failed, stream not found, ignore it", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("streamID", streamToDelete.id), + zap.Uint64("streamIDInMap", streamInMap.id)) + return } - if cancel, ok := s.streamsCanceller[storeAddr]; ok { - cancel() - delete(s.streamsCanceller, storeAddr) + + if streamInMap.id != streamToDelete.id { + log.Warn("delete stream failed, stream id mismatch, ignore it", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("streamID", streamToDelete.id), + zap.Uint64("streamIDInMap", streamInMap.id)) + return } -} -func (s *eventFeedSession) getStream(storeAddr string) (stream *eventFeedStream, ok bool) { - s.streamsLock.RLock() - defer s.streamsLock.RUnlock() - stream, ok = s.streams[storeAddr] - return + if lastTime, ok := s.storeStreamsCache.lastAlterTime[streamToDelete.addr]; ok { + if time.Since(lastTime) < streamAlterInterval { + log.Warn( + "delete a stream of a same store too frequently, wait a while and try again", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("streamID", streamToDelete.id), + zap.Duration("sinceLastTime", time.Since(lastTime))) + time.Sleep(streamAlterInterval - time.Since(lastTime)) + } + } + + s.client.grpcPool.ReleaseConn(streamToDelete.conn, streamToDelete.addr) + delete(s.storeStreamsCache.m, streamToDelete.addr) + streamToDelete.cancel() + s.storeStreamsCache.lastAlterTime[streamToDelete.addr] = time.Now() + + log.Info("A stream to store has been removed", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.String("store", streamToDelete.addr), + zap.Uint64("storeID", streamToDelete.storeID), + zap.Uint64("streamID", streamToDelete.id)) } -func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.CancelFunc, ok bool) { - s.streamsLock.RLock() - defer s.streamsLock.RUnlock() - cancel, ok = s.streamsCanceller[storeAddr] +func (s *eventFeedSession) getStream(storeAddr string) (stream *eventFeedStream, ok bool) { + s.storeStreamsCache.RLock() + defer s.storeStreamsCache.RUnlock() + stream, ok = s.storeStreamsCache.m[storeAddr] return } diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 9cc31215b14..8ef2e59e7e7 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -55,6 +55,7 @@ func Test(t *testing.T) { conf := config.GetDefaultServerConfig() config.StoreGlobalServerConfig(conf) InitWorkerPool() + streamAlterInterval = 1 * time.Microsecond go func() { RunWorkerPool(context.Background()) //nolint:errcheck }() @@ -276,11 +277,11 @@ func newMockServiceSpecificAddr( // waitRequestID waits request ID larger than the given allocated ID func waitRequestID(t *testing.T, allocatedID uint64) { err := retry.Do(context.Background(), func() error { - if currentRequestID() > allocatedID { + if getCurrentRequestID() > allocatedID { return nil } - return 
errors.Errorf("request id %d is not larger than %d", currentRequestID(), allocatedID) - }, retry.WithBackoffBaseDelay(10), retry.WithMaxTries(20)) + return errors.Errorf("request id %d is not larger than %d", getCurrentRequestID(), allocatedID) + }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(100)) require.Nil(t, err) } @@ -311,7 +312,7 @@ func TestConnectOfflineTiKV(t *testing.T) { // {1,2} is the storeID, {4,5} is the peerID, means peer4 is in the store1 cluster.Bootstrap(3, []uint64{1, 2}, []uint64{4, 5}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -341,7 +342,7 @@ func TestConnectOfflineTiKV(t *testing.T) { Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ ResolvedTs: ts, }, @@ -354,7 +355,7 @@ func TestConnectOfflineTiKV(t *testing.T) { require.Equal(t, ts, event.Resolved.ResolvedTs) } - initialized := mockInitializedEvent(3 /* regionID */, currentRequestID()) + initialized := mockInitializedEvent(3 /* regionID */, getCurrentRequestID()) ch2 <- initialized cluster.ChangeLeader(3, 5) @@ -379,13 +380,11 @@ func TestConnectOfflineTiKV(t *testing.T) { require.FailNow(t, "reconnection not succeed in 1 second") } checkEvent(event, ver.Ver) - // check gRPC connection active counter is updated correctly bucket, ok := grpcPool.bucketConns[invalidStore] require.True(t, ok) empty := bucket.recycle() require.True(t, empty) - cancel() } @@ -413,7 +412,7 @@ func TestRecvLargeMessageSize(t *testing.T) { cluster.AddStore(2, addr) cluster.Bootstrap(3, []uint64{2}, []uint64{4}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -436,7 +435,7 @@ func TestRecvLargeMessageSize(t *testing.T) { // new session, new request waitRequestID(t, baseAllocatedID+1) - initialized := mockInitializedEvent(3 /* regionID */, currentRequestID()) + initialized := mockInitializedEvent(3 /* regionID */, getCurrentRequestID()) ch2 <- initialized var event model.RegionFeedEvent @@ -451,7 +450,7 @@ func TestRecvLargeMessageSize(t *testing.T) { largeMsg := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -513,7 +512,7 @@ func TestHandleError(t *testing.T) { cluster.SplitRaw(region3, region4, []byte("b"), []uint64{6, 7}, 6) cluster.SplitRaw(region4, region5, []byte("c"), []uint64{8, 9}, 9) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -540,7 +539,7 @@ func TestHandleError(t *testing.T) { notLeader := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{ NotLeader: &errorpb.NotLeader{ @@ -562,7 +561,7 @@ func TestHandleError(t 
*testing.T) { epochNotMatch := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{ EpochNotMatch: &errorpb.EpochNotMatch{}, @@ -576,7 +575,7 @@ func TestHandleError(t *testing.T) { regionNotFound := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{ RegionNotFound: &errorpb.RegionNotFound{}, @@ -590,7 +589,7 @@ func TestHandleError(t *testing.T) { unknownErr := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{}, }, @@ -615,7 +614,7 @@ consumePreResolvedTs: // new session, no leader request, epoch not match request, // region not found request, unknown error request, normal request waitRequestID(t, baseAllocatedID+5) - initialized := mockInitializedEvent(3 /* regionID */, currentRequestID()) + initialized := mockInitializedEvent(3 /* regionID */, getCurrentRequestID()) ch2 <- initialized makeEvent := func(ts uint64) *cdcpb.ChangeDataEvent { @@ -623,7 +622,7 @@ consumePreResolvedTs: Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ ResolvedTs: ts, }, @@ -672,7 +671,7 @@ func TestCompatibilityWithSameConn(t *testing.T) { cluster.AddStore(1, addr1) cluster.Bootstrap(3, []uint64{1}, []uint64{4}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -698,7 +697,7 @@ func TestCompatibilityWithSameConn(t *testing.T) { incompatibility := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{ Compatibility: &cdcpb.Compatibility{ @@ -739,7 +738,7 @@ func TestClusterIDMismatch(t *testing.T) { cluster.AddStore(1, addr) cluster.Bootstrap(3, []uint64{1}, []uint64{4}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -766,7 +765,7 @@ func TestClusterIDMismatch(t *testing.T) { clusterIDMismatchEvent := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{ ClusterIdMismatch: &cdcpb.ClusterIDMismatch{ @@ -808,7 +807,7 @@ func testHandleFeedEvent(t *testing.T) { cluster.AddStore(1, addr1) cluster.Bootstrap(3, []uint64{1}, []uint64{4}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -836,7 +835,7 @@ func testHandleFeedEvent(t *testing.T) { // simulate commit comes before prewrite { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: 
&cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -851,7 +850,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -868,7 +867,7 @@ func testHandleFeedEvent(t *testing.T) { // prewrite and commit in the normal sequence { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -883,7 +882,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -900,7 +899,7 @@ func testHandleFeedEvent(t *testing.T) { // commit event before initializtion without prewrite matched will be ignored { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -915,7 +914,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -931,7 +930,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -946,11 +945,11 @@ func testHandleFeedEvent(t *testing.T) { }, }, }} - initialized := mockInitializedEvent(3 /*regionID */, currentRequestID()) + initialized := mockInitializedEvent(3 /*regionID */, getCurrentRequestID()) eventsAfterInit := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -964,7 +963,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -979,7 +978,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -993,7 +992,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -1008,7 +1007,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -1024,7 +1023,7 @@ func testHandleFeedEvent(t *testing.T) { // simulate TiKV sends txn heartbeat, which is a prewrite event with empty value { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -1038,7 +1037,7 @@ func testHandleFeedEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: 
&cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -1055,7 +1054,7 @@ func testHandleFeedEvent(t *testing.T) { eventResolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 135}, }, }} @@ -1360,6 +1359,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { + cancel() close(ch1) server1.Stop() wg.Wait() @@ -1381,7 +1381,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError") }() - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -1403,14 +1403,14 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { // wait request id allocated with: new session, new request waitRequestID(t, baseAllocatedID+1) - initialized1 := mockInitializedEvent(regionID, currentRequestID()) + initialized1 := mockInitializedEvent(regionID, getCurrentRequestID()) ch1 <- initialized1 err = retry.Do(context.Background(), func() error { if len(ch1) == 0 { return nil } return errors.New("message is not sent") - }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(10)) + }, retry.WithBackoffBaseDelay(200), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(30)) require.Nil(t, err) @@ -1425,13 +1425,13 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { // wait request id allocated with: new session, new request*2 waitRequestID(t, baseAllocatedID+2) - initialized2 := mockInitializedEvent(regionID, currentRequestID()) + initialized2 := mockInitializedEvent(regionID, getCurrentRequestID()) ch1 <- initialized2 resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: regionID, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 120}, }, }} @@ -1498,6 +1498,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { + cancel() close(ch1) server1.Stop() wg.Wait() @@ -1514,7 +1515,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { cluster.AddStore(1, addr1) cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -1538,15 +1539,15 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { // wait request id allocated with: new session, new request waitRequestID(t, baseAllocatedID+1) err = retry.Do(context.Background(), func() error { - if atomic.LoadUint64(&requestID) == currentRequestID() { + if atomic.LoadUint64(&requestID) == getCurrentRequestID() { return nil } return errors.Errorf("request is not received, requestID: %d, expected: %d", - atomic.LoadUint64(&requestID), currentRequestID()) - }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(10)) + atomic.LoadUint64(&requestID), getCurrentRequestID()) + }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(30)) 
require.Nil(t, err) - initialized1 := mockInitializedEvent(regionID, currentRequestID()) + initialized1 := mockInitializedEvent(regionID, getCurrentRequestID()) ch1 <- initialized1 err = retry.Do(context.Background(), func() error { if len(ch1) == 0 { @@ -1560,7 +1561,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: regionID, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 120}, }, }} @@ -1592,15 +1593,15 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { // wait request id allocated with: new session, new request*2 waitRequestID(t, baseAllocatedID+2) err = retry.Do(context.Background(), func() error { - if atomic.LoadUint64(&requestID) == currentRequestID() { + if atomic.LoadUint64(&requestID) == getCurrentRequestID() { return nil } return errors.Errorf("request is not received, requestID: %d, expected: %d", - atomic.LoadUint64(&requestID), currentRequestID()) - }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(10)) + atomic.LoadUint64(&requestID), getCurrentRequestID()) + }, retry.WithBackoffBaseDelay(50), retry.WithMaxTries(30)) require.Nil(t, err) - initialized2 := mockInitializedEvent(regionID, currentRequestID()) + initialized2 := mockInitializedEvent(regionID, getCurrentRequestID()) ch1 <- initialized2 err = retry.Do(context.Background(), func() error { if len(ch1) == 0 { @@ -1614,7 +1615,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { resolved = &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: regionID, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 130}, }, }} @@ -1800,7 +1801,7 @@ func TestNoPendingRegionError(t *testing.T) { cluster.AddStore(1, addr1) cluster.Bootstrap(3, []uint64{1}, []uint64{4}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -1826,13 +1827,13 @@ func TestNoPendingRegionError(t *testing.T) { noPendingRegionEvent := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID() + 1, // an invalid request id + RequestId: getCurrentRequestID() + 1, // an invalid request id Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 100}, }, }} ch1 <- noPendingRegionEvent - initialized := mockInitializedEvent(3, currentRequestID()) + initialized := mockInitializedEvent(3, getCurrentRequestID()) ch1 <- initialized ev := <-eventCh require.NotNil(t, ev.Resolved) @@ -1841,7 +1842,7 @@ func TestNoPendingRegionError(t *testing.T) { resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 200}, }, }} @@ -1879,7 +1880,7 @@ func TestDropStaleRequest(t *testing.T) { cluster.AddStore(1, addr1) cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -1902,22 +1903,22 @@ func TestDropStaleRequest(t *testing.T) { // wait request id allocated with: new session, new 
request waitRequestID(t, baseAllocatedID+1) - initialized := mockInitializedEvent(regionID, currentRequestID()) + initialized := mockInitializedEvent(regionID, getCurrentRequestID()) eventsAfterInit := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: regionID, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 120}, }, // This event will be dropped { RegionId: regionID, - RequestId: currentRequestID() - 1, + RequestId: getCurrentRequestID() - 1, Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 125}, }, { RegionId: regionID, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 130}, }, }} @@ -1993,7 +1994,7 @@ func TestResolveLock(t *testing.T) { defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientResolveLockInterval") }() - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -2015,7 +2016,7 @@ func TestResolveLock(t *testing.T) { // wait request id allocated with: new session, new request waitRequestID(t, baseAllocatedID+1) - initialized := mockInitializedEvent(regionID, currentRequestID()) + initialized := mockInitializedEvent(regionID, getCurrentRequestID()) ch1 <- initialized physical, logical, err := pdClient.GetTS(ctx) require.Nil(t, err) @@ -2023,7 +2024,7 @@ func TestResolveLock(t *testing.T) { resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: regionID, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: tso}, }, }} @@ -2072,6 +2073,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { server1, addr1 := newMockService(ctx, t, srv1, wg) defer func() { + cancel() close(ch1) server1.Stop() wg.Wait() @@ -2099,7 +2101,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientErrUnreachable") }() - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -2124,7 +2126,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { waitRequestID(t, baseAllocatedID+1) for _, event := range events { for _, ev := range event.Events { - ev.RequestId = currentRequestID() + ev.RequestId = getCurrentRequestID() } ch1 <- event } @@ -2138,7 +2140,7 @@ func TestCommittedFallback(t *testing.T) { {Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -2163,7 +2165,7 @@ func TestDuplicateRequest(t *testing.T) { {Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{ DuplicateRequest: &cdcpb.DuplicateRequest{RegionId: 3}, @@ -2227,7 +2229,7 @@ func testEventAfterFeedStop(t *testing.T) { defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientSingleFeedProcessDelay") }() - baseAllocatedID := 
currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -2253,7 +2255,7 @@ func testEventAfterFeedStop(t *testing.T) { epochNotMatch := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{ EpochNotMatch: &errorpb.EpochNotMatch{}, @@ -2268,7 +2270,7 @@ func testEventAfterFeedStop(t *testing.T) { committed := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -2283,7 +2285,7 @@ func testEventAfterFeedStop(t *testing.T) { }, }, }} - initialized := mockInitializedEvent(regionID, currentRequestID()) + initialized := mockInitializedEvent(regionID, getCurrentRequestID()) resolved := &cdcpb.ChangeDataEvent{ ResolvedTs: &cdcpb.ResolvedTs{ Regions: []uint64{3}, @@ -2333,8 +2335,8 @@ func testEventAfterFeedStop(t *testing.T) { require.Nil(t, err) // wait request id allocated with: new session, 2 * new request - committedClone.Events[0].RequestId = currentRequestID() - initializedClone.Events[0].RequestId = currentRequestID() + committedClone.Events[0].RequestId = getCurrentRequestID() + initializedClone.Events[0].RequestId = getCurrentRequestID() ch2 <- committedClone ch2 <- initializedClone ch2 <- resolvedClone @@ -2414,7 +2416,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { cluster.AddStore(1, addr1) cluster.Bootstrap(3, []uint64{1}, []uint64{4}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -2441,7 +2443,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { // will be filtered out { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -2457,7 +2459,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -2472,12 +2474,12 @@ func TestOutOfRegionRangeEvent(t *testing.T) { }, }, }} - initialized := mockInitializedEvent(3 /*regionID */, currentRequestID()) + initialized := mockInitializedEvent(3 /*regionID */, getCurrentRequestID()) eventsAfterInit := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ // will be filtered out { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -2493,7 +2495,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { // will be filtered out { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -2508,7 +2510,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ 
Entries: []*cdcpb.Event_Row{{ @@ -2523,7 +2525,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { }, { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -2632,7 +2634,7 @@ func TestResolveLockNoCandidate(t *testing.T) { cluster.AddStore(storeID, addr1) cluster.Bootstrap(regionID, []uint64{storeID}, []uint64{peerID}, peerID) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -2654,7 +2656,7 @@ func TestResolveLockNoCandidate(t *testing.T) { // wait request id allocated with: new session, new request waitRequestID(t, baseAllocatedID+1) - initialized := mockInitializedEvent(regionID, currentRequestID()) + initialized := mockInitializedEvent(regionID, getCurrentRequestID()) ch1 <- initialized var wg2 sync.WaitGroup @@ -2668,7 +2670,7 @@ func TestResolveLockNoCandidate(t *testing.T) { resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: regionID, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: tso}, }, }} @@ -2728,7 +2730,7 @@ func TestFailRegionReentrant(t *testing.T) { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientRegionReentrantError") _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientRegionReentrantErrorDelay") }() - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -2753,7 +2755,7 @@ func TestFailRegionReentrant(t *testing.T) { unknownErr := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{}, }, @@ -2761,7 +2763,7 @@ func TestFailRegionReentrant(t *testing.T) { }} ch1 <- unknownErr // use a fake event to trigger one more stream.Recv - initialized := mockInitializedEvent(regionID, currentRequestID()) + initialized := mockInitializedEvent(regionID, getCurrentRequestID()) ch1 <- initialized // since re-establish new region request is delayed by `kvClientRegionReentrantErrorDelay` // there will be reentrant region failover, the kv client should not panic. 
@@ -2899,14 +2901,14 @@ func testClientErrNoPendingRegion(t *testing.T) { require.Equal(t, context.Canceled, errors.Cause(err)) }() - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() // wait the second region is scheduled time.Sleep(time.Millisecond * 500) waitRequestID(t, baseAllocatedID+1) - initialized := mockInitializedEvent(regionID3, currentRequestID()) + initialized := mockInitializedEvent(regionID3, getCurrentRequestID()) ch1 <- initialized waitRequestID(t, baseAllocatedID+2) - initialized = mockInitializedEvent(regionID4, currentRequestID()) + initialized = mockInitializedEvent(regionID4, getCurrentRequestID()) ch1 <- initialized // wait the kvClientPendingRegionDelay ends, and the second region is processed time.Sleep(time.Second * 2) @@ -2977,9 +2979,9 @@ func testKVClientForceReconnect(t *testing.T) { require.Equal(t, context.Canceled, errors.Cause(err)) }() - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() waitRequestID(t, baseAllocatedID+1) - initialized := mockInitializedEvent(regionID3, currentRequestID()) + initialized := mockInitializedEvent(regionID3, getCurrentRequestID()) ch1 <- initialized // Connection close for timeout @@ -3225,7 +3227,7 @@ func TestEvTimeUpdate(t *testing.T) { reconnectInterval = originalReconnectInterval }() - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -3253,7 +3255,7 @@ func TestEvTimeUpdate(t *testing.T) { events := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Entries_{ Entries: &cdcpb.Event_Entries{ Entries: []*cdcpb.Event_Row{{ @@ -3351,7 +3353,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) { cluster.AddStore(1, addr1) cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() changefeed := model.DefaultChangeFeedID("changefeed-test") lockResolver := txnutil.NewLockerResolver(kvStorage, changefeed, util.RoleTester) grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{}) @@ -3377,7 +3379,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) { epochNotMatch := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: 3, - RequestId: currentRequestID(), + RequestId: getCurrentRequestID(), Event: &cdcpb.Event_Error{ Error: &cdcpb.Error{ EpochNotMatch: &errorpb.EpochNotMatch{}, @@ -3454,7 +3456,7 @@ func TestPrewriteNotMatchError(t *testing.T) { ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) - baseAllocatedID := currentRequestID() + baseAllocatedID := getCurrentRequestID() wg.Add(1) go func() { diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index ddd1c9046fa..593f1751958 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -92,6 +92,7 @@ for event processing to increase throughput. 
type regionWorker struct { parentCtx context.Context session *eventFeedSession + stream *eventFeedStream inputCh chan []*regionStatefulEvent outputCh chan<- model.RegionFeedEvent @@ -108,11 +109,8 @@ type regionWorker struct { metrics *regionWorkerMetrics - storeAddr string - // how many pending input events - inputPending int32 - + inputPending int32 pendingRegions *syncRegionFeedStateMap } @@ -143,10 +141,13 @@ func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetric } func newRegionWorker( - ctx context.Context, changefeedID model.ChangeFeedID, s *eventFeedSession, addr string, + ctx context.Context, + stream *eventFeedStream, + s *eventFeedSession, pendingRegions *syncRegionFeedStateMap, ) *regionWorker { return ®ionWorker{ +<<<<<<< HEAD parentCtx: ctx, session: s, inputCh: make(chan []*regionStatefulEvent, regionWorkerInputChanSize), @@ -160,6 +161,20 @@ func newRegionWorker( metrics: newRegionWorkerMetrics(changefeedID), inputPending: 0, +======= + parentCtx: ctx, + session: s, + inputCh: make(chan []*regionStatefulEvent, regionWorkerInputChanSize), + outputCh: s.eventCh, + stream: stream, + errorCh: make(chan error, 1), + statesManager: newRegionStateManager(-1), + rtsManager: newRegionTsManager(), + rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), + concurrency: int(s.client.config.KVClient.WorkerConcurrent), + metrics: newRegionWorkerMetrics(s.changefeed, strconv.FormatInt(s.tableID, 10), stream.addr), + inputPending: 0, +>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) pendingRegions: pendingRegions, } } @@ -197,6 +212,15 @@ func (w *regionWorker) checkShouldExit() error { // If there is no region maintained by this region worker, exit it and // cancel the gRPC stream. if empty && w.pendingRegions.len() == 0 { + log.Info("A single region error happens before, "+ + "and there is no region maintained by this region worker, "+ + "exit it and cancel the gRPC stream", + zap.String("namespace", w.session.client.changefeed.Namespace), + zap.String("changefeed", w.session.client.changefeed.ID), + zap.String("storeAddr", w.stream.addr), + zap.Uint64("streamID", w.stream.id), + zap.Int64("tableID", w.session.tableID), + zap.String("tableName", w.session.tableName)) w.cancelStream(time.Duration(0)) return cerror.ErrRegionWorkerExit.GenWithStackByArgs() } @@ -238,6 +262,18 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState // `ErrPrewriteNotMatch` would cause duplicated request to the same region, // so cancel the original gRPC stream before restarts a new stream. 
if cerror.ErrPrewriteNotMatch.Equal(err) { + log.Info("meet ErrPrewriteNotMatch error, cancel the gRPC stream", + zap.String("namespace", w.session.client.changefeed.Namespace), + zap.String("changefeed", w.session.client.changefeed.ID), + zap.String("storeAddr", w.stream.addr), + zap.Uint64("streamID", w.stream.id), + zap.Int64("tableID", w.session.tableID), + zap.String("tableName", w.session.tableName), + zap.Uint64("regionID", regionID), + zap.Uint64("requestID", state.requestID), + zap.Stringer("span", &state.sri.span), + zap.Uint64("resolvedTs", state.sri.resolvedTs()), + zap.Error(err)) w.cancelStream(time.Second) } @@ -340,7 +376,10 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock", zap.String("namespace", w.session.client.changefeed.Namespace), zap.String("changefeed", w.session.client.changefeed.ID), - zap.String("addr", w.storeAddr), + zap.String("storeAddr", w.stream.addr), + zap.Uint64("streamID", w.stream.id), + zap.Int64("tableID", w.session.tableID), + zap.String("tableName", w.session.tableName), zap.Uint64("regionID", rts.regionID), zap.Stringer("span", &state.sri.span), zap.Duration("duration", sinceLastResolvedTs), @@ -563,6 +602,13 @@ func (w *regionWorker) collectWorkpoolError(ctx context.Context) error { func (w *regionWorker) checkErrorReconnect(err error) error { if errors.Cause(err) == errReconnect { + log.Info("kv client reconnect triggered, cancel the gRPC stream", + zap.String("namespace", w.session.client.changefeed.Namespace), + zap.String("changefeed", w.session.client.changefeed.ID), + zap.String("storeAddr", w.stream.addr), + zap.Uint64("streamID", w.stream.id), + zap.Int64("tableID", w.session.tableID), + zap.String("tableName", w.session.tableName)) w.cancelStream(time.Second) // if stream is already deleted, just ignore errReconnect return nil @@ -570,22 +616,18 @@ func (w *regionWorker) checkErrorReconnect(err error) error { return err } +// Note(dongmen): Please log the reason of calling this function in the caller. +// This will be helpful for troubleshooting. func (w *regionWorker) cancelStream(delay time.Duration) { - cancel, ok := w.session.getStreamCancel(w.storeAddr) - if ok { - // cancel the stream to trigger strem.Recv with context cancel error - // Note use context cancel is the only way to terminate a gRPC stream - cancel() - // Failover in stream.Recv has 0-100ms delay, the onRegionFail - // should be called after stream has been deleted. Add a delay here - // to avoid too frequent region rebuilt. - time.Sleep(delay) - } else { - log.Warn("gRPC stream cancel func not found", - zap.String("addr", w.storeAddr), - zap.String("namespace", w.session.client.changefeed.Namespace), - zap.String("changefeed", w.session.client.changefeed.ID)) - } + // cancel the stream to make strem.Recv returns a context cancel error + // This will make the receiveFromStream goroutine exit and the stream can + // be re-established by the caller. + // Note: use context cancel is the only way to terminate a gRPC stream. + w.stream.cancel() + // Failover in stream.Recv has 0-100ms delay, the onRegionFail + // should be called after stream has been deleted. Add a delay here + // to avoid too frequent region rebuilt. 
+ time.Sleep(delay) } func (w *regionWorker) run() error { diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go index c8041246a96..370b28f5380 100644 --- a/cdc/kv/region_worker_test.go +++ b/cdc/kv/region_worker_test.go @@ -159,7 +159,11 @@ func TestRegionWokerHandleEventEntryEventOutOfOrder(t *testing.T) { &tikv.RPCContext{}), 0) state.sri.lockedRange = ®ionlock.LockedRange{} state.start() - worker := newRegionWorker(ctx, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap()) + stream := &eventFeedStream{ + storeID: 1, + id: 2, + } + worker := newRegionWorker(ctx, stream, s, newSyncRegionFeedStateMap()) require.Equal(t, 2, cap(worker.outputCh)) // Receive prewrite2 with empty value. @@ -322,7 +326,11 @@ func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) { s1.sri.lockedRange = ®ionlock.LockedRange{} s1.sri.lockedRange.CheckpointTs.Store(9) s1.start() - w := newRegionWorker(ctx, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap()) + stream := &eventFeedStream{ + storeID: 1, + id: 2, + } + w := newRegionWorker(ctx, stream, s, newSyncRegionFeedStateMap()) err := w.handleResolvedTs(ctx, &resolvedTsEvent{ resolvedTs: 5, From ac400b8694811385bc50b14d6f227df99118ad84 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 26 Feb 2024 10:04:03 +0800 Subject: [PATCH 2/6] resolve conflict --- cdc/entry/schema_test_helper.go | 128 -------------------------------- cdc/kv/client.go | 86 +-------------------- cdc/kv/region_worker.go | 18 +---- 3 files changed, 5 insertions(+), 227 deletions(-) diff --git a/cdc/entry/schema_test_helper.go b/cdc/entry/schema_test_helper.go index 7bbc03459f2..e4f639e0917 100644 --- a/cdc/entry/schema_test_helper.go +++ b/cdc/entry/schema_test_helper.go @@ -17,15 +17,10 @@ package entry import ( -<<<<<<< HEAD -======= - "context" ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) "encoding/json" "strings" "testing" -<<<<<<< HEAD ticonfig "github.com/pingcap/tidb/config" tiddl "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" @@ -35,23 +30,6 @@ import ( "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/testkit" -======= - ticonfig "github.com/pingcap/tidb/pkg/config" - tiddl "github.com/pingcap/tidb/pkg/ddl" - "github.com/pingcap/tidb/pkg/domain" - "github.com/pingcap/tidb/pkg/kv" - timeta "github.com/pingcap/tidb/pkg/meta" - timodel "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/session" - "github.com/pingcap/tidb/pkg/store/mockstore" - "github.com/pingcap/tidb/pkg/testkit" - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/filter" - "github.com/pingcap/tiflow/pkg/integrity" - "github.com/pingcap/tiflow/pkg/spanz" - "github.com/pingcap/tiflow/pkg/util" ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" ) @@ -152,112 +130,6 @@ func (s *SchemaTestHelper) DDL2Jobs(ddl string, jobCnt int) []*timodel.Job { return jobs } -<<<<<<< HEAD -======= -// DML2Event execute the dml and return the corresponding row changed event. -// caution: it does not support `delete` since the key value cannot be found -// after the query executed. 
-func (s *SchemaTestHelper) DML2Event(dml string, schema, table string) *model.RowChangedEvent { - s.tk.MustExec(dml) - - tableInfo, ok := s.schemaStorage.GetLastSnapshot().TableByName(schema, table) - require.True(s.t, ok) - - key, value := s.getLastKeyValue(tableInfo.ID) - ts := s.schemaStorage.GetLastSnapshot().CurrentTs() - rawKV := &model.RawKVEntry{ - OpType: model.OpTypePut, - Key: key, - Value: value, - OldValue: nil, - StartTs: ts - 1, - CRTs: ts + 1, - } - polymorphicEvent := model.NewPolymorphicEvent(rawKV) - err := s.mounter.DecodeEvent(context.Background(), polymorphicEvent) - require.NoError(s.t, err) - return polymorphicEvent.Row -} - -func (s *SchemaTestHelper) getLastKeyValue(tableID int64) (key, value []byte) { - txn, err := s.storage.Begin() - require.NoError(s.t, err) - defer txn.Rollback() //nolint:errcheck - - start, end := spanz.GetTableRange(tableID) - iter, err := txn.Iter(start, end) - require.NoError(s.t, err) - defer iter.Close() - for iter.Valid() { - key = iter.Key() - value = iter.Value() - err = iter.Next() - require.NoError(s.t, err) - } - return key, value -} - -// DDL2Event executes the DDL and return the corresponding event. -func (s *SchemaTestHelper) DDL2Event(ddl string) *model.DDLEvent { - s.tk.MustExec(ddl) - jobs, err := tiddl.GetLastNHistoryDDLJobs(s.GetCurrentMeta(), 1) - require.NoError(s.t, err) - require.Len(s.t, jobs, 1) - // Set State from Synced to Done. - // Because jobs are put to history queue after TiDB alter its state from - // Done to Synced. - jobs[0].State = timodel.JobStateDone - res := jobs[0] - if res.Type == timodel.ActionRenameTables { - // the RawArgs field in job fetched from tidb snapshot meta is incorrent, - // so we manually construct `job.RawArgs` to do the workaround. - // we assume the old schema name is same as the new schema name here. 
- // for example, "ALTER TABLE RENAME test.t1 TO test.t1, test.t2 to test.t22", schema name is "test" - schema := strings.Split(strings.Split(strings.Split(res.Query, ",")[1], " ")[1], ".")[0] - tableNum := len(res.BinlogInfo.MultipleTableInfos) - oldSchemaIDs := make([]int64, tableNum) - for i := 0; i < tableNum; i++ { - oldSchemaIDs[i] = res.SchemaID - } - oldTableIDs := make([]int64, tableNum) - for i := 0; i < tableNum; i++ { - oldTableIDs[i] = res.BinlogInfo.MultipleTableInfos[i].ID - } - newTableNames := make([]timodel.CIStr, tableNum) - for i := 0; i < tableNum; i++ { - newTableNames[i] = res.BinlogInfo.MultipleTableInfos[i].Name - } - oldSchemaNames := make([]timodel.CIStr, tableNum) - for i := 0; i < tableNum; i++ { - oldSchemaNames[i] = timodel.NewCIStr(schema) - } - newSchemaIDs := oldSchemaIDs - - args := []interface{}{ - oldSchemaIDs, newSchemaIDs, - newTableNames, oldTableIDs, oldSchemaNames, - } - rawArgs, err := json.Marshal(args) - require.NoError(s.t, err) - res.RawArgs = rawArgs - } - - err = s.schemaStorage.HandleDDLJob(res) - require.NoError(s.t, err) - - ver, err := s.storage.CurrentVersion(oracle.GlobalTxnScope) - require.NoError(s.t, err) - s.schemaStorage.AdvanceResolvedTs(ver.Ver) - - ctx := context.Background() - - events, err := s.schemaStorage.BuildDDLEvents(ctx, res) - require.NoError(s.t, err) - - return events[0] -} - ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) // Storage returns the tikv storage func (s *SchemaTestHelper) Storage() kv.Storage { return s.storage diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 64dc19bb6af..2ebcf849edc 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -271,15 +271,11 @@ func (c *CDCClient) newStream( log.Debug("created stream to store", zap.String("namespace", c.changefeed.Namespace), zap.String("changefeed", c.changefeed.ID), -<<<<<<< HEAD - zap.String("addr", addr)) -======= zap.Int64("tableID", c.tableID), zap.String("tableName", c.tableName), zap.String("store", addr), zap.Uint64("storeID", storeID), zap.Uint64("streamID", stream.id)) ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) return nil } if c.config.Debug.EnableKVConnectBackOff { @@ -419,12 +415,9 @@ func newEventFeedSession( startTs uint64, eventCh chan<- model.RegionFeedEvent, ) *eventFeedSession { -<<<<<<< HEAD - id := allocID() - idStr := strconv.FormatUint(id, 10) -======= id := allocateRequestID() ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) + idStr := strconv.FormatUint(id, 10) + rangeLock := regionlock.NewRegionRangeLock( id, totalSpan.StartKey, totalSpan.EndKey, startTs, client.changefeed.Namespace+"."+client.changefeed.ID) @@ -435,7 +428,6 @@ func newEventFeedSession( tableID: client.tableID, tableName: client.tableName, -<<<<<<< HEAD totalSpan: totalSpan, eventCh: eventCh, rangeLock: rangeLock, @@ -444,21 +436,6 @@ func newEventFeedSession( regionChSizeGauge: clientChannelSize.WithLabelValues("region"), errChSizeGauge: clientChannelSize.WithLabelValues("err"), rangeChSizeGauge: clientChannelSize.WithLabelValues("range"), - streams: make(map[string]*eventFeedStream), - streamsCanceller: make(map[string]context.CancelFunc), -======= - totalSpan: totalSpan, - eventCh: eventCh, - rangeLock: rangeLock, - lockResolver: lockResolver, - enableTableMonitor: enableTableMonitor, - regionChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, - client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "region"), - 
errChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, - client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "err"), - rangeChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, - client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "range"), ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) resolvedTsPool: sync.Pool{ New: func() any { return ®ionStatefulEvent{ @@ -733,12 +710,8 @@ func (s *eventFeedSession) requestRegionToStore( zap.Uint64("regionID", regionID), zap.Uint64("requestID", requestID), zap.Uint64("storeID", storeID), -<<<<<<< HEAD - zap.String("addr", storeAddr)) -======= zap.String("store", storeAddr), zap.Uint64("streamID", stream.id)) ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) g.Go(func() error { return s.receiveFromStream(ctx, stream, pendingRegions) @@ -758,12 +731,8 @@ func (s *eventFeedSession) requestRegionToStore( state := newRegionFeedState(sri, requestID) pendingRegions.setByRequestID(requestID, state) -<<<<<<< HEAD log.Info("start new request", -======= - s.client.logRegionDetails("start new request", ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), @@ -782,11 +751,8 @@ func (s *eventFeedSession) requestRegionToStore( zap.String("tableName", s.tableName), zap.String("addr", storeAddr), zap.Uint64("storeID", storeID), -<<<<<<< HEAD -======= zap.String("store", storeAddr), zap.Uint64("streamID", stream.id), ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) zap.Uint64("regionID", regionID), zap.Uint64("requestID", requestID), zap.Error(err)) @@ -796,11 +762,7 @@ func (s *eventFeedSession) requestRegionToStore( zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), -<<<<<<< HEAD - zap.String("addr", storeAddr), -======= zap.Uint64("streamID", stream.id), ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) zap.Uint64("storeID", storeID), zap.Uint64("regionID", regionID), zap.Uint64("requestID", requestID), @@ -821,17 +783,13 @@ func (s *eventFeedSession) requestRegionToStore( if !ok { continue } -<<<<<<< HEAD - -======= - s.client.logRegionDetails("region send to store failed", + log.Debug("region send to store failed", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), zap.Any("regionId", sri.verID.GetID()), zap.Stringer("span", &sri.span)) ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) errInfo := newRegionErrorInfo(sri, &sendRequestToStoreErr{}) s.onRegionFail(ctx, errInfo) } @@ -1086,15 +1044,11 @@ func (s *eventFeedSession) receiveFromStream( log.Info("stream to store closed", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), -<<<<<<< HEAD - zap.String("addr", addr), zap.Uint64("storeID", storeID)) -======= zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), zap.String("store", stream.addr), zap.Uint64("storeID", stream.storeID), zap.Uint64("streamID", stream.id)) ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) failpoint.Inject("kvClientStreamCloseDelay", nil) @@ -1107,13 +1061,6 @@ func (s *eventFeedSession) 
receiveFromStream( metricSendEventBatchResolvedSize := batchResolvedEventSize. WithLabelValues(s.changefeed.Namespace, s.changefeed.ID) -<<<<<<< HEAD -======= - metricReceiveBusyRatio := workerBusyRatio.WithLabelValues( - s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), stream.addr, "event-receiver") - metricProcessBusyRatio := workerBusyRatio.WithLabelValues( - s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), stream.addr, "event-processor") ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) // always create a new region worker, because `receiveFromStream` is ensured // to call exactly once from outer code logic @@ -1139,12 +1086,6 @@ func (s *eventFeedSession) receiveFromStream( eg.Go(func() error { err := handleExit(worker.run()) if err != nil { -<<<<<<< HEAD - log.Error("region worker exited with error", zap.Error(err), - zap.Any("changefeed", s.changefeed), - zap.Any("addr", addr), - zap.Any("storeID", storeID)) -======= log.Error("region worker exited with error", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), @@ -1154,7 +1095,6 @@ func (s *eventFeedSession) receiveFromStream( zap.Uint64("storeID", stream.storeID), zap.Uint64("streamID", stream.id), zap.Error(err)) ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) } return err }) @@ -1162,12 +1102,7 @@ func (s *eventFeedSession) receiveFromStream( receiveEvents := func() error { maxCommitTs := model.Ts(0) for { -<<<<<<< HEAD - cevent, err := stream.Recv() -======= - startToReceive := time.Now() cevent, err := stream.client.Recv() ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) failpoint.Inject("kvClientRegionReentrantError", func(op failpoint.Value) { if op.(string) == "error" { @@ -1188,35 +1123,22 @@ func (s *eventFeedSession) receiveFromStream( "receive from stream canceled", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), -<<<<<<< HEAD - zap.String("addr", addr), - zap.Uint64("storeID", storeID), - ) -======= zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), zap.String("store", stream.addr), zap.Uint64("storeID", stream.storeID), zap.Uint64("streamID", stream.id)) ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) } else { log.Warn( "failed to receive from stream", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), -<<<<<<< HEAD - zap.String("addr", addr), - zap.Uint64("storeID", storeID), - zap.Error(err), - ) -======= zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), zap.String("store", stream.addr), zap.Uint64("storeID", stream.storeID), zap.Uint64("streamID", stream.id), zap.Error(err)) ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) // Note that pd need at lease 10s+ to tag a kv node as disconnect if kv node down // tikv raft need wait (raft-base-tick-interval * raft-election-timeout-ticks) 10s to start a new // election @@ -1261,7 +1183,7 @@ func (s *eventFeedSession) receiveFromStream( } } } - err = s.sendRegionChangeEvents(ctx, cevent.Events, worker, pendingRegions, addr) + err = s.sendRegionChangeEvents(ctx, cevent.Events, worker, pendingRegions, stream.addr) if err != nil { return err } diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 593f1751958..66ec757d6a1 100644 --- a/cdc/kv/region_worker.go +++ 
b/cdc/kv/region_worker.go @@ -147,21 +147,6 @@ func newRegionWorker( pendingRegions *syncRegionFeedStateMap, ) *regionWorker { return ®ionWorker{ -<<<<<<< HEAD - parentCtx: ctx, - session: s, - inputCh: make(chan []*regionStatefulEvent, regionWorkerInputChanSize), - outputCh: s.eventCh, - errorCh: make(chan error, 1), - statesManager: newRegionStateManager(-1), - rtsManager: newRegionTsManager(), - rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), - storeAddr: addr, - concurrency: s.client.config.KVClient.WorkerConcurrent, - metrics: newRegionWorkerMetrics(changefeedID), - inputPending: 0, - -======= parentCtx: ctx, session: s, inputCh: make(chan []*regionStatefulEvent, regionWorkerInputChanSize), @@ -172,9 +157,8 @@ func newRegionWorker( rtsManager: newRegionTsManager(), rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), concurrency: int(s.client.config.KVClient.WorkerConcurrent), - metrics: newRegionWorkerMetrics(s.changefeed, strconv.FormatInt(s.tableID, 10), stream.addr), + metrics: newRegionWorkerMetrics(s.changefeed), inputPending: 0, ->>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559)) pendingRegions: pendingRegions, } } From a8576c744fca6a4c914cb500bf62aa93a94993b5 Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Thu, 8 Feb 2024 13:46:15 +0800 Subject: [PATCH 3/6] resolve conflict --- cdc/kv/client.go | 302 ++++++++++++++++------------------- cdc/kv/client_test.go | 6 + cdc/kv/region_worker.go | 46 +++--- cdc/kv/region_worker_test.go | 6 +- 4 files changed, 169 insertions(+), 191 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 2ebcf849edc..57e3e6bff5f 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -137,6 +137,8 @@ func newRegionErrorInfo(info singleRegionInfo, err error) regionErrorInfo { type eventFeedStream struct { client cdcpb.ChangeData_EventFeedClient conn *sharedConn + // regions is used to store the state of the regions that are being processed by the stream. + regions *syncRegionFeedStateMap // addr is the address of the TiKV store addr string // storeID is the ID of the TiKV store @@ -144,7 +146,15 @@ type eventFeedStream struct { // id is the stream ID, which is used to identify the stream. 
id uint64 // cancel is used to cancel the gRPC stream - cancel context.CancelFunc + cancel context.CancelFunc + isCanceled atomic.Bool + createTime time.Time +} + +func (s *eventFeedStream) close() { + if s.isCanceled.CompareAndSwap(false, true) { + s.cancel() + } } // CDCKVClient is an interface to receives kv changed logs from TiKV @@ -235,59 +245,58 @@ func NewCDCClient( func (c *CDCClient) newStream( ctx context.Context, - cancel context.CancelFunc, addr string, storeID uint64, -) (stream *eventFeedStream, newStreamErr error) { - streamFunc := func() (err error) { - var conn *sharedConn - defer func() { - if err != nil && conn != nil { - c.grpcPool.ReleaseConn(conn, addr) - } - }() - conn, err = c.grpcPool.GetConn(addr) - if err != nil { - return errors.Trace(err) - } - err = version.CheckStoreVersion(ctx, c.pd, storeID) - if err != nil { - return errors.Trace(err) - } - client := cdcpb.NewChangeDataClient(conn.ClientConn) - var streamClient cdcpb.ChangeData_EventFeedClient - streamClient, err = client.EventFeed(ctx) - if err != nil { - return cerror.WrapError(cerror.ErrTiKVEventFeed, err) - } - stream = &eventFeedStream{ - client: streamClient, - conn: conn, - addr: addr, - storeID: storeID, - id: allocateStreamID(), - cancel: cancel, +) (stream *eventFeedStream, err error) { + var conn *sharedConn + defer func() { + if err != nil && conn != nil { + c.grpcPool.ReleaseConn(conn, addr) } - log.Debug("created stream to store", - zap.String("namespace", c.changefeed.Namespace), - zap.String("changefeed", c.changefeed.ID), - zap.Int64("tableID", c.tableID), - zap.String("tableName", c.tableName), - zap.String("store", addr), - zap.Uint64("storeID", storeID), - zap.Uint64("streamID", stream.id)) - return nil + }() + + conn, err = c.grpcPool.GetConn(addr) + if err != nil { + return nil, errors.Trace(err) } - if c.config.Debug.EnableKVConnectBackOff { - newStreamErr = retry.Do(ctx, streamFunc, - retry.WithBackoffBaseDelay(100), - retry.WithMaxTries(2), - retry.WithIsRetryableErr(cerror.IsRetryableError), - ) - return + + ctx, cancel := context.WithCancel(ctx) + err = version.CheckStoreVersion(ctx, c.pd, storeID) + if err != nil { + cancel() + return nil, errors.Trace(err) } - newStreamErr = streamFunc() - return + + client := cdcpb.NewChangeDataClient(conn.ClientConn) + var streamClient cdcpb.ChangeData_EventFeedClient + + streamClient, err = client.EventFeed(ctx) + if err != nil { + cancel() + return nil, cerror.WrapError(cerror.ErrTiKVEventFeed, err) + } + + stream = &eventFeedStream{ + client: streamClient, + conn: conn, + regions: newSyncRegionFeedStateMap(), + addr: addr, + storeID: storeID, + id: allocateStreamID(), + cancel: cancel, + isCanceled: atomic.Bool{}, + createTime: time.Now(), + } + + log.Info("created stream to store", + zap.String("namespace", c.changefeed.Namespace), + zap.String("changefeed", c.changefeed.ID), + zap.Int64("tableID", c.tableID), + zap.String("tableName", c.tableName), + zap.String("store", addr), + zap.Uint64("storeID", storeID), + zap.Uint64("streamID", stream.id)) + return stream, nil } // EventFeed divides a EventFeed request on range boundaries and establishes @@ -390,14 +399,9 @@ type eventFeedSession struct { rangeChSizeGauge prometheus.Gauge // storeStreamsCache is used to cache the established gRPC streams to TiKV stores. - storeStreamsCache struct { - sync.RWMutex - m map[string]*eventFeedStream - // lastAlterTime is used to record last time a stream is created/deleted to/from the cache. 
- // It is used to avoid creation/deleting streams too frequently, which may cause - // huge overhead of incremental region scanning in TiKV. - lastAlterTime map[string]time.Time - } + // Note: The cache is not thread-safe, so it should be accessed in the same goroutine. + // For now, it is only accessed in the `requestRegionToStore` goroutine. + storeStreamsCache map[string]*eventFeedStream // use sync.Pool to store resolved ts event only, because resolved ts event // has the same size and generate cycle. @@ -421,21 +425,24 @@ func newEventFeedSession( rangeLock := regionlock.NewRegionRangeLock( id, totalSpan.StartKey, totalSpan.EndKey, startTs, client.changefeed.Namespace+"."+client.changefeed.ID) - res := &eventFeedSession{ - client: client, - startTs: startTs, - changefeed: client.changefeed, - tableID: client.tableID, - tableName: client.tableName, - - totalSpan: totalSpan, - eventCh: eventCh, - rangeLock: rangeLock, - lockResolver: lockResolver, - id: idStr, - regionChSizeGauge: clientChannelSize.WithLabelValues("region"), - errChSizeGauge: clientChannelSize.WithLabelValues("err"), - rangeChSizeGauge: clientChannelSize.WithLabelValues("range"), + return &eventFeedSession{ + client: client, + startTs: startTs, + changefeed: client.changefeed, + tableID: client.tableID, + tableName: client.tableName, + storeStreamsCache: make(map[string]*eventFeedStream), + totalSpan: totalSpan, + eventCh: eventCh, + rangeLock: rangeLock, + lockResolver: lockResolver, + enableTableMonitor: enableTableMonitor, + regionChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "region"), + errChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "err"), + rangeChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, + client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "range"), resolvedTsPool: sync.Pool{ New: func() any { return ®ionStatefulEvent{ @@ -444,9 +451,6 @@ func newEventFeedSession( }, }, } - res.storeStreamsCache.m = make(map[string]*eventFeedStream) - res.storeStreamsCache.lastAlterTime = make(map[string]time.Time) - return res } func (s *eventFeedSession) eventFeed(ctx context.Context) error { @@ -625,11 +629,6 @@ func (s *eventFeedSession) requestRegionToStore( ctx context.Context, g *errgroup.Group, ) error { - // Stores pending regions info for each stream. After sending a new request, the region info wil be put to the map, - // and it will be loaded by the receiver thread when it receives the first response from that region. We need this - // to pass the region info to the receiver since the region info cannot be inferred from the response from TiKV. - storePendingRegions := make(map[uint64]*syncRegionFeedStateMap) - header := &cdcpb.Header{ ClusterId: s.client.clusterID, TicdcVersion: version.ReleaseSemver(), @@ -666,18 +665,18 @@ func (s *eventFeedSession) requestRegionToStore( // each TiKV store has an independent pendingRegions. storeAddr := rpcCtx.Addr storeID := rpcCtx.Peer.GetStoreId() + var ( stream *eventFeedStream err error ) - stream, ok := s.getStream(storeAddr) - if !ok { - // when a new stream is established, always create a new pending - // regions map, the old map will be used in old `receiveFromStream` - // and won't be deleted until that goroutine exits. 
- pendingRegions := newSyncRegionFeedStateMap() - streamCtx, streamCancel := context.WithCancel(ctx) - stream, err = s.client.newStream(streamCtx, streamCancel, storeAddr, storeID) + stream, ok := s.storeStreamsCache[storeAddr] + if !ok || stream.isCanceled.Load() { + if ok { + // If the stream is canceled, we need to delete it from the cache and close it. + s.deleteStream(stream) + } + stream, err = s.client.newStream(ctx, storeAddr, storeID) if err != nil { // get stream failed, maybe the store is down permanently, we should try to relocate the active store log.Warn("get grpc stream client failed", @@ -702,8 +701,8 @@ func (s *eventFeedSession) requestRegionToStore( continue } - storePendingRegions[stream.id] = pendingRegions s.addStream(stream) + log.Info("creating new stream to store to send request", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), @@ -712,26 +711,13 @@ func (s *eventFeedSession) requestRegionToStore( zap.Uint64("storeID", storeID), zap.String("store", storeAddr), zap.Uint64("streamID", stream.id)) - g.Go(func() error { - return s.receiveFromStream(ctx, stream, pendingRegions) + return s.receiveFromStream(ctx, stream) }) } - pendingRegions, ok := storePendingRegions[stream.id] - if !ok { - // Should never happen - log.Panic("pending regions is not found for store", - zap.String("namespace", s.changefeed.Namespace), - zap.String("changefeed", s.changefeed.ID), - zap.Int64("tableID", s.tableID), - zap.String("tableName", s.tableName), - zap.String("store", storeAddr)) - } - state := newRegionFeedState(sri, requestID) - pendingRegions.setByRequestID(requestID, state) - + stream.regions.setByRequestID(requestID, state) log.Info("start new request", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), @@ -741,8 +727,6 @@ func (s *eventFeedSession) requestRegionToStore( zap.String("addr", storeAddr)) err = stream.client.Send(req) - // If Send returns an error, the stream.client.Recv (In s.receiveFromStream) - // would also receive an error. if err != nil { log.Warn("send request to stream failed", zap.String("namespace", s.changefeed.Namespace), @@ -768,18 +752,13 @@ func (s *eventFeedSession) requestRegionToStore( zap.Uint64("requestID", requestID), zap.Error(err)) } - // Delete the stream from the cache so that when next time the store is accessed, - // the stream can be re-established. - s.deleteStream(stream) - // Delete `pendingRegions` from `storePendingRegions` so that the next time a region of this store is - // requested, it will create a new one. So if the `receiveFromStream` goroutine tries to stop all - // pending regions, the new pending regions that are requested after reconnecting won't be stopped - // incorrectly. - delete(storePendingRegions, stream.id) + // Delete the stream from the cache so that when next time a region of + // this store is requested, a new stream to this store will be created. + s.deleteStream(stream) // Remove the region from pendingRegions. If it's already removed, it should be already retried by // `receiveFromStream`, so no need to retry here. 
- _, ok := pendingRegions.takeByRequestID(requestID) + _, ok := stream.regions.takeByRequestID(requestID) if !ok { continue } @@ -1026,7 +1005,6 @@ func (s *eventFeedSession) getRPCContextForRegion(ctx context.Context, id tikv.R func (s *eventFeedSession) receiveFromStream( parentCtx context.Context, stream *eventFeedStream, - pendingRegions *syncRegionFeedStateMap, ) error { var tsStat *tableStoreStat s.client.tableStoreStats.Lock() @@ -1052,7 +1030,7 @@ func (s *eventFeedSession) receiveFromStream( failpoint.Inject("kvClientStreamCloseDelay", nil) - remainingRegions := pendingRegions.takeAll() + remainingRegions := stream.regions.takeAll() for _, state := range remainingRegions { errInfo := newRegionErrorInfo(state.sri, cerror.ErrPendingRegionCancel.FastGenByArgs()) s.onRegionFail(parentCtx, errInfo) @@ -1064,11 +1042,7 @@ func (s *eventFeedSession) receiveFromStream( // always create a new region worker, because `receiveFromStream` is ensured // to call exactly once from outer code logic - worker := newRegionWorker( - parentCtx, - stream, - s, - pendingRegions) + worker := newRegionWorker(parentCtx, stream, s) defer worker.evictAllRegions() ctx, cancel := context.WithCancel(parentCtx) @@ -1183,7 +1157,8 @@ func (s *eventFeedSession) receiveFromStream( } } } - err = s.sendRegionChangeEvents(ctx, cevent.Events, worker, pendingRegions, stream.addr) + + err = s.sendRegionChangeEvents(ctx, cevent.Events, worker, stream.regions) if err != nil { return err } @@ -1353,34 +1328,41 @@ func (s *eventFeedSession) sendResolvedTs( return nil } +// addStream adds a stream to the session.streams. +// Note: It must be called with deleteStream in a same goroutine. func (s *eventFeedSession) addStream(stream *eventFeedStream) { - s.storeStreamsCache.Lock() - defer s.storeStreamsCache.Unlock() - if lastTime, ok := s.storeStreamsCache.lastAlterTime[stream.addr]; ok { - if time.Since(lastTime) < streamAlterInterval { - log.Warn( - "add a stream of a same store too frequently, wait a while and try again", + oldStream, ok := s.storeStreamsCache[stream.addr] + if ok { + failpoint.Inject("kvClientAddDuplicatedStream", func() { + log.Error("A stream to a same store already exists, it shouldn't happen, please report a bug", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), - zap.Uint64("storeID", stream.storeID), - zap.Duration("sinceLastTime", time.Since(lastTime))) - time.Sleep(streamAlterInterval - time.Since(lastTime)) - } + zap.Uint64("oldStreamID", oldStream.id), + zap.Uint64("newStreamID", stream.id)) + }) + // There is no need to return an error here because even if it happens, + // it does not harm the data correctness, but may only cause some lag spikes. + // Log it to help us improve the code. + log.Error("A stream to a same store already exists, it shouldn't happen, please report a bug", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("oldStreamID", oldStream.id), + zap.Uint64("newStreamID", stream.id)) } - s.storeStreamsCache.m[stream.addr] = stream - s.storeStreamsCache.lastAlterTime[stream.addr] = time.Now() + s.storeStreamsCache[stream.addr] = stream } // deleteStream deletes a stream from the session.streams. -// If the stream is not found, it will be ignored. +// If the stream is not found, it takes no effect. 
func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) { - s.storeStreamsCache.Lock() - defer s.storeStreamsCache.Unlock() - - streamInMap, ok := s.storeStreamsCache.m[streamToDelete.addr] + streamInMap, ok := s.storeStreamsCache[streamToDelete.addr] if !ok { + // This should not happen, but it will be no harm if it happens. + // Log a warning message to help us diagnose the problem. log.Warn("delete stream failed, stream not found, ignore it", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), @@ -1390,8 +1372,9 @@ func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) { zap.Uint64("streamIDInMap", streamInMap.id)) return } - if streamInMap.id != streamToDelete.id { + // This should not happen, but it will be no harm if it happens. + // Log a warning message to help us diagnose the problem. log.Warn("delete stream failed, stream id mismatch, ignore it", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), @@ -1402,25 +1385,20 @@ func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) { return } - if lastTime, ok := s.storeStreamsCache.lastAlterTime[streamToDelete.addr]; ok { - if time.Since(lastTime) < streamAlterInterval { - log.Warn( - "delete a stream of a same store too frequently, wait a while and try again", - zap.String("namespace", s.changefeed.Namespace), - zap.String("changefeed", s.changefeed.ID), - zap.Int64("tableID", s.tableID), - zap.String("tableName", s.tableName), - zap.Uint64("streamID", streamToDelete.id), - zap.Duration("sinceLastTime", time.Since(lastTime))) - time.Sleep(streamAlterInterval - time.Since(lastTime)) - } + if time.Since(streamToDelete.createTime) < streamAlterInterval { + log.Warn("It's too soon to delete a stream, wait for a while", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("streamID", streamToDelete.id), + zap.Duration("sinceCreateDuration", time.Since(streamToDelete.createTime))) + time.Sleep(streamAlterInterval - time.Since(streamToDelete.createTime)) } s.client.grpcPool.ReleaseConn(streamToDelete.conn, streamToDelete.addr) - delete(s.storeStreamsCache.m, streamToDelete.addr) - streamToDelete.cancel() - s.storeStreamsCache.lastAlterTime[streamToDelete.addr] = time.Now() - + streamToDelete.close() + delete(s.storeStreamsCache, streamToDelete.addr) log.Info("A stream to store has been removed", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), @@ -1428,14 +1406,8 @@ func (s *eventFeedSession) deleteStream(streamToDelete *eventFeedStream) { zap.String("tableName", s.tableName), zap.String("store", streamToDelete.addr), zap.Uint64("storeID", streamToDelete.storeID), - zap.Uint64("streamID", streamToDelete.id)) -} - -func (s *eventFeedSession) getStream(storeAddr string) (stream *eventFeedStream, ok bool) { - s.storeStreamsCache.RLock() - defer s.storeStreamsCache.RUnlock() - stream, ok = s.storeStreamsCache.m[storeAddr] - return + zap.Uint64("streamID", streamToDelete.id), + zap.Duration("sinceCreateDuration", time.Since(streamToDelete.createTime))) } func (s *eventFeedSession) logSlowRegions(ctx context.Context) error { diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 8ef2e59e7e7..2504c1fb3b8 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -1376,6 +1376,12 @@ func testStreamRecvWithError(t *testing.T, 
failpointStr string) { cluster.AddStore(1, addr1) cluster.Bootstrap(regionID, []uint64{1}, []uint64{4}, 4) + err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientAddDuplicatedStream", failpointStr) + require.Nil(t, err) + defer func() { + _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/kv/kvClientAddDuplicatedStream") + }() + err = failpoint.Enable("github.com/pingcap/tiflow/cdc/kv/kvClientStreamRecvError", failpointStr) require.Nil(t, err) defer func() { diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 66ec757d6a1..76ece9a6dd0 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -18,6 +18,7 @@ import ( "encoding/hex" "reflect" "runtime" + "strconv" "sync" "sync/atomic" "time" @@ -109,9 +110,8 @@ type regionWorker struct { metrics *regionWorkerMetrics - // how many pending input events - inputPending int32 - pendingRegions *syncRegionFeedStateMap + // how many pending input events from the input channel + inputPendingEvents int32 } func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetrics { @@ -144,22 +144,20 @@ func newRegionWorker( ctx context.Context, stream *eventFeedStream, s *eventFeedSession, - pendingRegions *syncRegionFeedStateMap, ) *regionWorker { return ®ionWorker{ - parentCtx: ctx, - session: s, - inputCh: make(chan []*regionStatefulEvent, regionWorkerInputChanSize), - outputCh: s.eventCh, - stream: stream, - errorCh: make(chan error, 1), - statesManager: newRegionStateManager(-1), - rtsManager: newRegionTsManager(), - rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), - concurrency: int(s.client.config.KVClient.WorkerConcurrent), - metrics: newRegionWorkerMetrics(s.changefeed), - inputPending: 0, - pendingRegions: pendingRegions, + parentCtx: ctx, + session: s, + inputCh: make(chan []*regionStatefulEvent, regionWorkerInputChanSize), + outputCh: s.eventCh, + stream: stream, + errorCh: make(chan error, 1), + statesManager: newRegionStateManager(-1), + rtsManager: newRegionTsManager(), + rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), + concurrency: int(s.client.config.KVClient.WorkerConcurrent), + metrics: newRegionWorkerMetrics(s.changefeed, strconv.FormatInt(s.tableID, 10), stream.addr), + inputPendingEvents: 0, } } @@ -195,9 +193,9 @@ func (w *regionWorker) checkShouldExit() error { empty := w.checkRegionStateEmpty() // If there is no region maintained by this region worker, exit it and // cancel the gRPC stream. - if empty && w.pendingRegions.len() == 0 { + if empty && w.stream.regions.len() == 0 { log.Info("A single region error happens before, "+ - "and there is no region maintained by this region worker, "+ + "and there is no region maintained by the stream, "+ "exit it and cancel the gRPC stream", zap.String("namespace", w.session.client.changefeed.Namespace), zap.String("changefeed", w.session.client.changefeed.ID), @@ -499,13 +497,13 @@ func (w *regionWorker) eventHandler(ctx context.Context) error { } regionEventsBatchSize.Observe(float64(len(events))) - inputPending := atomic.LoadInt32(&w.inputPending) + inputPending := atomic.LoadInt32(&w.inputPendingEvents) if highWatermarkMet { highWatermarkMet = int(inputPending) >= regionWorkerLowWatermark } else { highWatermarkMet = int(inputPending) >= regionWorkerHighWatermark } - atomic.AddInt32(&w.inputPending, -int32(len(events))) + atomic.AddInt32(&w.inputPendingEvents, -int32(len(events))) if highWatermarkMet { // All events in one batch can be hashed into one handle slot. 
@@ -607,7 +605,7 @@ func (w *regionWorker) cancelStream(delay time.Duration) { // This will make the receiveFromStream goroutine exit and the stream can // be re-established by the caller. // Note: use context cancel is the only way to terminate a gRPC stream. - w.stream.cancel() + w.stream.close() // Failover in stream.Recv has 0-100ms delay, the onRegionFail // should be called after stream has been deleted. Add a delay here // to avoid too frequent region rebuilt. @@ -870,10 +868,10 @@ func (w *regionWorker) evictAllRegions() { // sendEvents puts events into inputCh and updates some internal states. // Callers must ensure that all items in events can be hashed into one handle slot. func (w *regionWorker) sendEvents(ctx context.Context, events []*regionStatefulEvent) error { - atomic.AddInt32(&w.inputPending, int32(len(events))) + atomic.AddInt32(&w.inputPendingEvents, int32(len(events))) select { case <-ctx.Done(): - atomic.AddInt32(&w.inputPending, -int32(len(events))) + atomic.AddInt32(&w.inputPendingEvents, -int32(len(events))) return ctx.Err() case w.inputCh <- events: return nil diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go index 370b28f5380..992ec6b382a 100644 --- a/cdc/kv/region_worker_test.go +++ b/cdc/kv/region_worker_test.go @@ -162,8 +162,9 @@ func TestRegionWokerHandleEventEntryEventOutOfOrder(t *testing.T) { stream := &eventFeedStream{ storeID: 1, id: 2, + regions: newSyncRegionFeedStateMap(), } - worker := newRegionWorker(ctx, stream, s, newSyncRegionFeedStateMap()) + worker := newRegionWorker(ctx, stream, s) require.Equal(t, 2, cap(worker.outputCh)) // Receive prewrite2 with empty value. @@ -329,8 +330,9 @@ func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) { stream := &eventFeedStream{ storeID: 1, id: 2, + regions: newSyncRegionFeedStateMap(), } - w := newRegionWorker(ctx, stream, s, newSyncRegionFeedStateMap()) + w := newRegionWorker(ctx, stream, s) err := w.handleResolvedTs(ctx, &resolvedTsEvent{ resolvedTs: 5, From 91cc40ad09fdf58f4a300c13166b8f73ea43d151 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 26 Feb 2024 10:28:42 +0800 Subject: [PATCH 4/6] resolve conflict 2 --- cdc/kv/client.go | 25 +++++++++++-------------- cdc/kv/region_worker.go | 3 +-- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 57e3e6bff5f..7b081d61c83 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -420,23 +420,20 @@ func newEventFeedSession( eventCh chan<- model.RegionFeedEvent, ) *eventFeedSession { id := allocateRequestID() - idStr := strconv.FormatUint(id, 10) - rangeLock := regionlock.NewRegionRangeLock( id, totalSpan.StartKey, totalSpan.EndKey, startTs, client.changefeed.Namespace+"."+client.changefeed.ID) return &eventFeedSession{ - client: client, - startTs: startTs, - changefeed: client.changefeed, - tableID: client.tableID, - tableName: client.tableName, - storeStreamsCache: make(map[string]*eventFeedStream), - totalSpan: totalSpan, - eventCh: eventCh, - rangeLock: rangeLock, - lockResolver: lockResolver, - enableTableMonitor: enableTableMonitor, + client: client, + startTs: startTs, + changefeed: client.changefeed, + tableID: client.tableID, + tableName: client.tableName, + storeStreamsCache: make(map[string]*eventFeedStream), + totalSpan: totalSpan, + eventCh: eventCh, + rangeLock: rangeLock, + lockResolver: lockResolver, regionChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, client.changefeed.ID, strconv.FormatInt(client.tableID, 10), 
"region"), errChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, @@ -1158,7 +1155,7 @@ func (s *eventFeedSession) receiveFromStream( } } - err = s.sendRegionChangeEvents(ctx, cevent.Events, worker, stream.regions) + err = s.sendRegionChangeEvents(ctx, cevent.Events, worker, stream.regions, stream.addr) if err != nil { return err } diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 76ece9a6dd0..3ab2e6df4ae 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -18,7 +18,6 @@ import ( "encoding/hex" "reflect" "runtime" - "strconv" "sync" "sync/atomic" "time" @@ -156,7 +155,7 @@ func newRegionWorker( rtsManager: newRegionTsManager(), rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), concurrency: int(s.client.config.KVClient.WorkerConcurrent), - metrics: newRegionWorkerMetrics(s.changefeed, strconv.FormatInt(s.tableID, 10), stream.addr), + metrics: newRegionWorkerMetrics(s.changefeed), inputPendingEvents: 0, } } From 88746eee38734ebb7eb008ee8ca6cc5231fb17fe Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 26 Feb 2024 11:16:44 +0800 Subject: [PATCH 5/6] fix error --- cdc/kv/client.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 7b081d61c83..6e951b2b6b2 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -18,7 +18,6 @@ import ( "fmt" "io" "math/rand" - "strconv" "strings" "sync" "sync/atomic" @@ -392,8 +391,6 @@ type eventFeedSession struct { rangeLock *regionlock.RegionRangeLock - // To identify metrics of different eventFeedSession - id string regionChSizeGauge prometheus.Gauge errChSizeGauge prometheus.Gauge rangeChSizeGauge prometheus.Gauge @@ -434,12 +431,9 @@ func newEventFeedSession( eventCh: eventCh, rangeLock: rangeLock, lockResolver: lockResolver, - regionChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, - client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "region"), - errChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, - client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "err"), - rangeChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace, - client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "range"), + regionChSizeGauge: clientChannelSize.WithLabelValues("region"), + errChSizeGauge: clientChannelSize.WithLabelValues("err"), + rangeChSizeGauge: clientChannelSize.WithLabelValues("range"), resolvedTsPool: sync.Pool{ New: func() any { return ®ionStatefulEvent{ From 0542167eb5eaf1d074efe4f5c84ced8cfe0cf556 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 26 Feb 2024 18:19:01 +0800 Subject: [PATCH 6/6] fix make check error --- cdc/kv/region_worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 3ab2e6df4ae..d296aa07660 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -154,7 +154,7 @@ func newRegionWorker( statesManager: newRegionStateManager(-1), rtsManager: newRegionTsManager(), rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), - concurrency: int(s.client.config.KVClient.WorkerConcurrent), + concurrency: s.client.config.KVClient.WorkerConcurrent, metrics: newRegionWorkerMetrics(s.changefeed), inputPendingEvents: 0, }