diff --git a/cdc/api/validator_test.go b/cdc/api/validator_test.go index 967e7b11cdb..8c52b6a7410 100644 --- a/cdc/api/validator_test.go +++ b/cdc/api/validator_test.go @@ -39,6 +39,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) { // test no change error changefeedConfig = model.ChangefeedConfig{SinkURI: "blackhole://"} oldInfo.SinkURI = "blackhole://" + oldInfo.Config.Sink.TxnAtomicity = "table" newInfo, err = verifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo) require.NotNil(t, err) require.Regexp(t, ".*changefeed config is the same with the old one.*", err) diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go index 38f38bc2cc4..24f6e29f813 100644 --- a/cdc/model/mounter.go +++ b/cdc/model/mounter.go @@ -13,13 +13,18 @@ package model +import ( + "math" + + "github.com/pingcap/log" + "go.uber.org/zap" +) + // PolymorphicEvent describes an event can be in multiple states. type PolymorphicEvent struct { - StartTs uint64 - // Commit or resolved TS - CRTs uint64 - // Identify whether the resolved event is in batch mode. - Mode ResolvedMode + StartTs uint64 + CRTs uint64 + Resolved *ResolvedTs RawKV *RawKVEntry Row *RowChangedEvent @@ -66,11 +71,6 @@ func (e *PolymorphicEvent) IsResolved() bool { return e.RawKV.OpType == OpTypeResolved } -// IsBatchResolved returns true if the event is batch resolved event. -func (e *PolymorphicEvent) IsBatchResolved() bool { - return e.IsResolved() && e.Mode == BatchResolvedMode -} - // ComparePolymorphicEvents compares two events by CRTs, Resolved, StartTs, Delete/Put order. // It returns true if and only if i should precede j. func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool { @@ -108,16 +108,48 @@ const ( // ResolvedTs is the resolved timestamp of sink module. type ResolvedTs struct { - Ts uint64 - Mode ResolvedMode + Mode ResolvedMode + Ts uint64 + BatchID uint64 } -// NewResolvedTs creates a new ResolvedTs. +// NewResolvedTs creates a normal ResolvedTs. func NewResolvedTs(t uint64) ResolvedTs { - return ResolvedTs{Ts: t, Mode: NormalResolvedMode} + return ResolvedTs{Ts: t, Mode: NormalResolvedMode, BatchID: math.MaxUint64} +} + +// IsBatchMode returns true if the resolved ts is BatchResolvedMode. +func (r ResolvedTs) IsBatchMode() bool { + return r.Mode == BatchResolvedMode +} + +// ResolvedMark returns a timestamp `ts` based on the r.mode, which marks that all events +// whose commitTs is less than or equal to `ts` are sent to Sink. +func (r ResolvedTs) ResolvedMark() uint64 { + switch r.Mode { + case NormalResolvedMode: + // with NormalResolvedMode, cdc guarantees all events whose commitTs is + // less than or equal to `resolved.Ts` are sent to Sink. + return r.Ts + case BatchResolvedMode: + // with BatchResolvedMode, cdc guarantees all events whose commitTs is + // less than `resolved.Ts` are sent to Sink. + return r.Ts - 1 + default: + log.Error("unknown resolved mode", zap.Any("resolved", r)) + return 0 + } +} + +// EqualOrGreater judge whether the resolved ts is equal or greater than the given ts. +func (r ResolvedTs) EqualOrGreater(r1 ResolvedTs) bool { + if r.Ts == r1.Ts { + return r.BatchID >= r1.BatchID + } + return r.Ts > r1.Ts } -// NewResolvedTsWithMode creates a ResolvedTs with a given batch type. -func NewResolvedTsWithMode(t uint64, m ResolvedMode) ResolvedTs { - return ResolvedTs{Ts: t, Mode: m} +// Less judge whether the resolved ts is less than the given ts. 
+func (r ResolvedTs) Less(r1 ResolvedTs) bool { + return !r.EqualOrGreater(r1) } diff --git a/cdc/model/mounter_test.go b/cdc/model/mounter_test.go index 3e7ec14f152..46d067d2be6 100644 --- a/cdc/model/mounter_test.go +++ b/cdc/model/mounter_test.go @@ -14,6 +14,7 @@ package model import ( + "math/rand" "testing" "github.com/stretchr/testify/require" @@ -44,3 +45,41 @@ func TestPolymorphicEvent(t *testing.T) { require.Equal(t, resolved.CRTs, polyEvent.CRTs) require.Equal(t, uint64(0), polyEvent.StartTs) } + +func TestResolvedTs(t *testing.T) { + t.Parallel() + + invalidResolvedTs := ResolvedTs{Mode: -1, Ts: 1} + require.Equal(t, uint64(0), invalidResolvedTs.ResolvedMark()) + + ts := rand.Uint64()%10 + 1 + batchID := rand.Uint64()%10 + 1 + normalResolvedTs := NewResolvedTs(ts) + batchResolvedTs1 := ResolvedTs{Mode: BatchResolvedMode, Ts: ts, BatchID: batchID} + require.True(t, normalResolvedTs.EqualOrGreater(batchResolvedTs1)) + require.False(t, batchResolvedTs1.EqualOrGreater(normalResolvedTs)) + require.False(t, normalResolvedTs.Less(batchResolvedTs1)) + require.True(t, batchResolvedTs1.Less(normalResolvedTs)) + + batchResolvedTs2 := ResolvedTs{Mode: BatchResolvedMode, Ts: ts, BatchID: batchID + 1} + require.True(t, normalResolvedTs.EqualOrGreater(batchResolvedTs2)) + require.True(t, batchResolvedTs2.EqualOrGreater(batchResolvedTs1)) + require.True(t, batchResolvedTs2.Less(normalResolvedTs)) + require.True(t, batchResolvedTs1.Less(batchResolvedTs2)) + + largerTs := ts + rand.Uint64()%10 + 1 + largerResolvedTs := NewResolvedTs(largerTs) + require.True(t, largerResolvedTs.EqualOrGreater(normalResolvedTs)) + largerBatchResolvedTs := ResolvedTs{ + Mode: BatchResolvedMode, + Ts: largerTs, + BatchID: batchID, + } + require.True(t, largerBatchResolvedTs.EqualOrGreater(normalResolvedTs), + "largerBatchResolvedTs:%+v\nnormalResolvedTs:%+v", largerBatchResolvedTs, normalResolvedTs) + + smallerResolvedTs := NewResolvedTs(0) + require.True(t, normalResolvedTs.EqualOrGreater(smallerResolvedTs)) + smallerBatchResolvedTs := ResolvedTs{Mode: BatchResolvedMode, Ts: 0, BatchID: batchID} + require.True(t, batchResolvedTs1.EqualOrGreater(smallerBatchResolvedTs)) +} diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index 37ecd559762..b89e682e6f7 100755 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -15,6 +15,7 @@ package pipeline import ( "context" + "fmt" "sync/atomic" "time" @@ -73,7 +74,7 @@ type sinkNode struct { // atomic oprations for model.ResolvedTs resolvedTs atomic.Value - checkpointTs model.Ts + checkpointTs atomic.Value targetTs model.Ts barrierTs model.Ts @@ -81,27 +82,44 @@ type sinkNode struct { replicaConfig *config.ReplicaConfig isTableActorMode bool + splitTxn bool } -func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode { +func newSinkNode( + tableID model.TableID, + sink sink.Sink, + startTs model.Ts, + targetTs model.Ts, + flowController tableFlowController, + splitTxn bool, +) *sinkNode { sn := &sinkNode{ - tableID: tableID, - sink: sink, - status: TableStatusInitializing, - targetTs: targetTs, - checkpointTs: startTs, - barrierTs: startTs, + tableID: tableID, + sink: sink, + status: TableStatusInitializing, + targetTs: targetTs, + barrierTs: startTs, flowController: flowController, + splitTxn: splitTxn, } sn.resolvedTs.Store(model.NewResolvedTs(startTs)) + sn.checkpointTs.Store(model.NewResolvedTs(startTs)) return sn } -func (n *sinkNode) 
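A minimal usage sketch of the reworked model.ResolvedTs shown above (not part of the patch); it assumes the github.com/pingcap/tiflow/cdc/model package at a revision that already contains the BatchID field and BatchResolvedMode:

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/cdc/model"
)

func main() {
	// A normal resolved ts guarantees every event with commitTs <= 100 is flushed.
	normal := model.NewResolvedTs(100)

	// A batch resolved ts only guarantees events with commitTs < 100, because the
	// transaction at commitTs == 100 may still be only partially flushed.
	batch := model.ResolvedTs{Mode: model.BatchResolvedMode, Ts: 100, BatchID: 3}

	fmt.Println(normal.ResolvedMark())        // 100
	fmt.Println(batch.ResolvedMark())         // 99
	fmt.Println(normal.EqualOrGreater(batch)) // true: NewResolvedTs uses BatchID = MaxUint64
	fmt.Println(batch.Less(normal))           // true
}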
ResolvedTs() model.ResolvedTs { return n.resolvedTs.Load().(model.ResolvedTs) } -func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) } -func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) } -func (n *sinkNode) Status() TableStatus { return n.status.Load() } +func (n *sinkNode) ResolvedTs() model.Ts { return n.getResolvedTs().ResolvedMark() } +func (n *sinkNode) CheckpointTs() model.Ts { return n.getCheckpointTs().ResolvedMark() } +func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) } +func (n *sinkNode) Status() TableStatus { return n.status.Load() } + +func (n *sinkNode) getResolvedTs() model.ResolvedTs { + return n.resolvedTs.Load().(model.ResolvedTs) +} + +func (n *sinkNode) getCheckpointTs() model.ResolvedTs { + return n.checkpointTs.Load().(model.ResolvedTs) +} func (n *sinkNode) Init(ctx pipeline.NodeContext) error { n.replicaConfig = ctx.ChangefeedVars().Info.Config @@ -137,22 +155,22 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er n.status.Store(TableStatusStopped) return } - if atomic.LoadUint64(&n.checkpointTs) >= n.targetTs { + if n.CheckpointTs() >= n.targetTs { err = n.stop(ctx) } }() currentBarrierTs := atomic.LoadUint64(&n.barrierTs) - currentCheckpointTs := atomic.LoadUint64(&n.checkpointTs) + currentCheckpointTs := n.getCheckpointTs() if resolved.Ts > currentBarrierTs { - resolved.Ts = currentBarrierTs + resolved = model.NewResolvedTs(currentBarrierTs) } if resolved.Ts > n.targetTs { - resolved.Ts = n.targetTs + resolved = model.NewResolvedTs(n.targetTs) } - if resolved.Ts <= currentCheckpointTs { + if currentCheckpointTs.EqualOrGreater(resolved) { return nil } - checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved) + checkpoint, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved) if err != nil { return errors.Trace(err) } @@ -160,16 +178,16 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er // we must call flowController.Release immediately after we call // FlushRowChangedEvents to prevent deadlock cause by checkpointTs // fall back - n.flowController.Release(checkpointTs) + n.flowController.Release(checkpoint) // the checkpointTs may fall back in some situation such as: // 1. This table is newly added to the processor // 2. 
There is one table in the processor that has a smaller // checkpointTs than this one - if checkpointTs <= currentCheckpointTs { + if currentCheckpointTs.EqualOrGreater(checkpoint) { return nil } - atomic.StoreUint64(&n.checkpointTs, checkpointTs) + n.checkpointTs.Store(checkpoint) return nil } @@ -293,6 +311,10 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo switch msg.Tp { case pmessage.MessageTypePolymorphicEvent: event := msg.PolymorphicEvent + if err := n.verifySplitTxn(event); err != nil { + return false, errors.Trace(err) + } + if event.IsResolved() { if n.status.Load() == TableStatusInitializing { n.status.Store(TableStatusRunning) @@ -301,7 +323,13 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo failpoint.Return(false, errors.New("processor sync resolved injected error")) }) - resolved := model.NewResolvedTsWithMode(event.CRTs, event.Mode) + var resolved model.ResolvedTs + if event.Resolved != nil { + resolved = *(event.Resolved) + } else { + resolved = model.NewResolvedTs(event.CRTs) + } + if err := n.flushSink(ctx, resolved); err != nil { return false, errors.Trace(err) } @@ -312,7 +340,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo return false, errors.Trace(err) } case pmessage.MessageTypeTick: - if err := n.flushSink(ctx, n.ResolvedTs()); err != nil { + if err := n.flushSink(ctx, n.getResolvedTs()); err != nil { return false, errors.Trace(err) } case pmessage.MessageTypeCommand: @@ -331,7 +359,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error { atomic.StoreUint64(&n.barrierTs, ts) - if err := n.flushSink(ctx, n.ResolvedTs()); err != nil { + if err := n.flushSink(ctx, n.getResolvedTs()); err != nil { return errors.Trace(err) } return nil @@ -346,3 +374,25 @@ func (n *sinkNode) releaseResource(ctx context.Context) error { n.flowController.Abort() return n.sink.Close(ctx) } + +// Verify that TxnAtomicity compatibility with BatchResolved event and RowChangedEvent +// with `SplitTxn==true`. +func (n *sinkNode) verifySplitTxn(e *model.PolymorphicEvent) error { + if n.splitTxn { + return nil + } + + // Fail-fast check, this situation should never happen normally when split transactions + // are not supported. 
+ if e.Resolved != nil && e.Resolved.IsBatchMode() { + msg := fmt.Sprintf("batch mode resolved ts is not supported "+ + "when sink.splitTxn is %+v", n.splitTxn) + return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg) + } + + if e.Row != nil && e.Row.SplitTxn { + msg := fmt.Sprintf("should not split txn when sink.splitTxn is %+v", n.splitTxn) + return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg) + } + return nil +} diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 0c8f5ced58d..9304f53e892 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -43,12 +43,12 @@ type mockFlowController struct{} func (c *mockFlowController) Consume( msg *model.PolymorphicEvent, size uint64, - blockCallBack func(bool) error, + blockCallBack func(uint64) error, ) error { return nil } -func (c *mockFlowController) Release(resolvedTs uint64) { +func (c *mockFlowController) Release(resolved model.ResolvedTs) { } func (c *mockFlowController) Abort() { @@ -78,12 +78,12 @@ func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error func (s *mockSink) FlushRowChangedEvents( ctx context.Context, _ model.TableID, resolved model.ResolvedTs, -) (uint64, error) { +) (model.ResolvedTs, error) { s.received = append(s.received, struct { resolvedTs model.Ts row *model.RowChangedEvent }{resolvedTs: resolved.Ts}) - return resolved.Ts, nil + return resolved, nil } func (s *mockSink) EmitCheckpointTs(_ context.Context, _ uint64, _ []model.TableName) error { @@ -135,7 +135,7 @@ func TestStatus(t *testing.T) { }) // test stop at targetTs - node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) + node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) require.Equal(t, TableStatusInitializing, node.Status()) @@ -175,7 +175,7 @@ func TestStatus(t *testing.T) { require.Equal(t, uint64(10), node.CheckpointTs()) // test the stop at ts command - node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) + node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) require.Equal(t, TableStatusInitializing, node.Status()) @@ -206,7 +206,7 @@ func TestStatus(t *testing.T) { require.Equal(t, uint64(2), node.CheckpointTs()) // test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts - node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) + node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) require.Equal(t, TableStatusInitializing, node.Status()) @@ -249,7 +249,7 @@ func TestStopStatus(t *testing.T) { }) closeCh := make(chan interface{}, 1) - node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}) + node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}, false) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) require.Equal(t, TableStatusInitializing, node.Status()) @@ -287,7 +287,7 @@ func TestManyTs(t *testing.T) { }, }) sink := &mockSink{} - node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false) require.Nil(t, 
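A condensed restatement of the verifySplitTxn guard above as a standalone sketch; the helper name is ours and hypothetical, but it only relies on fields that appear in this diff:

package sketch

import "github.com/pingcap/tiflow/cdc/model"

// violatesTxnAtomicity mirrors sinkNode.verifySplitTxn: when splitTxn is
// disabled, neither batch-mode resolved events nor rows carrying SplitTxn
// may reach the sink node.
func violatesTxnAtomicity(e *model.PolymorphicEvent, splitTxn bool) bool {
	if splitTxn {
		return false
	}
	batchResolved := e.Resolved != nil && e.Resolved.IsBatchMode()
	splitRow := e.Row != nil && e.Row.SplitTxn
	return batchResolved || splitRow
}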
node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) require.Equal(t, TableStatusInitializing, node.Status()) @@ -422,7 +422,7 @@ func TestManyTs(t *testing.T) { {resolvedTs: 1}, }) sink.Reset() - require.Equal(t, model.NewResolvedTs(uint64(2)), node.ResolvedTs()) + require.Equal(t, model.NewResolvedTs(uint64(2)), node.getResolvedTs()) require.Equal(t, uint64(1), node.CheckpointTs()) require.Nil(t, node.Receive( @@ -435,7 +435,7 @@ func TestManyTs(t *testing.T) { {resolvedTs: 2}, }) sink.Reset() - require.Equal(t, model.NewResolvedTs(uint64(2)), node.ResolvedTs()) + require.Equal(t, model.NewResolvedTs(uint64(2)), node.getResolvedTs()) require.Equal(t, uint64(2), node.CheckpointTs()) } @@ -449,7 +449,7 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) { }, }) sink := &mockSink{} - node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) // empty row, no Columns and PreColumns. @@ -471,7 +471,7 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) { }, }) sink := &mockSink{} - node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) // nil row. @@ -529,7 +529,7 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) { }, }) sink := &mockSink{} - node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false) require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) // nil row. @@ -636,7 +636,7 @@ type flushFlowController struct { releaseCounter int } -func (c *flushFlowController) Release(resolvedTs uint64) { +func (c *flushFlowController) Release(resolved model.ResolvedTs) { c.releaseCounter++ } @@ -650,11 +650,11 @@ var fallBackResolvedTs = uint64(10) func (s *flushSink) FlushRowChangedEvents( ctx context.Context, _ model.TableID, resolved model.ResolvedTs, -) (uint64, error) { +) (model.ResolvedTs, error) { if resolved.Ts == fallBackResolvedTs { - return 0, nil + return model.NewResolvedTs(0), nil } - return resolved.Ts, nil + return resolved, nil } // TestFlushSinkReleaseFlowController tests sinkNode.flushSink method will always @@ -674,17 +674,17 @@ func TestFlushSinkReleaseFlowController(t *testing.T) { flowController := &flushFlowController{} sink := &flushSink{} // sNode is a sinkNode - sNode := newSinkNode(1, sink, 0, 10, flowController) + sNode := newSinkNode(1, sink, 0, 10, flowController, false) require.Nil(t, sNode.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) sNode.barrierTs = 10 err := sNode.flushSink(context.Background(), model.NewResolvedTs(uint64(8))) require.Nil(t, err) - require.Equal(t, uint64(8), sNode.checkpointTs) + require.Equal(t, uint64(8), sNode.CheckpointTs()) require.Equal(t, 1, flowController.releaseCounter) // resolvedTs will fall back in this call err = sNode.flushSink(context.Background(), model.NewResolvedTs(uint64(10))) require.Nil(t, err) - require.Equal(t, uint64(8), sNode.checkpointTs) + require.Equal(t, uint64(8), sNode.CheckpointTs()) require.Equal(t, 2, flowController.releaseCounter) } diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index f9cb796bcdb..37e2bf41dce 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -161,6 +161,20 
@@ func (n *sorterNode) start( metricsTicker := time.NewTicker(flushMemoryMetricsDuration) defer metricsTicker.Stop() + resolvedTsInterpolateFunc := func(commitTs uint64) { + // checks the condition: cur_event_commit_ts > prev_event_commit_ts > last_resolved_ts + // If this is true, it implies that (1) the last transaction has finished, and we are + // processing the first event in a new transaction, (2) a resolved-ts is safe to be + // sent, but it has not yet. This means that we can interpolate prev_event_commit_ts + // as a resolved-ts, improving the frequency at which the sink flushes. + if lastCRTs > lastSentResolvedTs && commitTs > lastCRTs { + lastSentResolvedTs = lastCRTs + lastSendResolvedTsTime = time.Now() + } + msg := model.NewResolvedPolymorphicEvent(0, lastSentResolvedTs) + ctx.SendToNextNode(pmessage.PolymorphicEventMessage(msg)) + } + for { // We must call `sorter.Output` before receiving resolved events. // Skip calling `sorter.Output` and caching output channel may fail @@ -188,34 +202,33 @@ func (n *sorterNode) start( commitTs := msg.CRTs // We interpolate a resolved-ts if none has been sent for some time. if time.Since(lastSendResolvedTsTime) > resolvedTsInterpolateInterval { - // checks the condition: cur_event_commit_ts > prev_event_commit_ts > last_resolved_ts - // If this is true, it implies that (1) the last transaction has finished, and we are processing - // the first event in a new transaction, (2) a resolved-ts is safe to be sent, but it has not yet. - // This means that we can interpolate prev_event_commit_ts as a resolved-ts, improving the frequency - // at which the sink flushes. - if lastCRTs > lastSentResolvedTs && commitTs > lastCRTs { - lastSentResolvedTs = lastCRTs - lastSendResolvedTsTime = time.Now() - msg := model.NewResolvedPolymorphicEvent(0, lastCRTs) - ctx.SendToNextNode(pmessage.PolymorphicEventMessage(msg)) - } + resolvedTsInterpolateFunc(commitTs) } // We calculate memory consumption by RowChangedEvent size. // It's much larger than RawKVEntry. size := uint64(msg.Row.ApproximateBytes()) - // NOTE we allow the quota to be exceeded if blocking means interrupting a transaction. - // Otherwise the pipeline would deadlock. - err = n.flowController.Consume(msg, size, func(batch bool) error { - if batch { - log.Panic("cdc does not support the batch resolve mechanism at this time") - } else if lastCRTs > lastSentResolvedTs { + // NOTE when redo log enabled, we allow the quota to be exceeded if blocking + // means interrupting a transaction. Otherwise the pipeline would deadlock. + err = n.flowController.Consume(msg, size, func(batchID uint64) error { + if commitTs > lastCRTs { // If we are blocking, we send a Resolved Event here to elicit a sink-flush. // Not sending a Resolved Event here will very likely deadlock the pipeline. 
- lastSentResolvedTs = lastCRTs - lastSendResolvedTsTime = time.Now() + resolvedTsInterpolateFunc(commitTs) + } else if commitTs == lastCRTs { + // send batch resolve event msg := model.NewResolvedPolymorphicEvent(0, lastCRTs) + msg.Resolved = &model.ResolvedTs{ + Ts: commitTs, + Mode: model.BatchResolvedMode, + BatchID: batchID, + } ctx.SendToNextNode(pmessage.PolymorphicEventMessage(msg)) + } else { + log.Panic("flow control blocked, report a bug", + zap.Uint64("commitTs", commitTs), + zap.Uint64("lastCommitTs", lastCRTs), + zap.Uint64("lastSentResolvedTs", lastSentResolvedTs)) } return nil }) @@ -229,7 +242,7 @@ func (n *sorterNode) start( } return nil } - lastCRTs = commitTs + lastCRTs = msg.CRTs } else { // handle OpTypeResolved if msg.CRTs < lastSentResolvedTs { diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index 0c540f0a0a9..5e104f5a053 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -79,8 +79,12 @@ type tablePipelineImpl struct { // TODO find a better name or avoid using an interface // We use an interface here for ease in unit testing. type tableFlowController interface { - Consume(msg *model.PolymorphicEvent, size uint64, blockCallBack func(batch bool) error) error - Release(resolvedTs uint64) + Consume( + msg *model.PolymorphicEvent, + size uint64, + blockCallBack func(batchID uint64) error, + ) error + Release(resolved model.ResolvedTs) Abort() GetConsumption() uint64 } @@ -92,7 +96,7 @@ func (t *tablePipelineImpl) ResolvedTs() model.Ts { // another replication barrier for consistent replication instead of reusing // the global resolved-ts. if redo.IsConsistentEnabled(t.replConfig.Consistent.Level) { - return t.sinkNode.ResolvedTs().Ts + return t.sinkNode.ResolvedTs() } return t.sorterNode.ResolvedTs() } @@ -180,6 +184,7 @@ func NewTablePipeline(ctx cdcContext.Context, sink sink.Sink, targetTs model.Ts, upstream *upstream.Upstream, + redoLogEnabled bool, ) TablePipeline { ctx, cancel := cdcContext.WithCancel(ctx) changefeed := ctx.ChangefeedVars().ID @@ -199,7 +204,10 @@ func NewTablePipeline(ctx cdcContext.Context, zap.String("tableName", tableName), zap.Int64("tableID", tableID), zap.Uint64("quota", perTableMemoryQuota)) - flowController := flowcontrol.NewTableFlowController(perTableMemoryQuota) + splitTxn := replConfig.Sink.TxnAtomicity.ShouldSplitTxn() + + flowController := flowcontrol.NewTableFlowController(perTableMemoryQuota, + redoLogEnabled, splitTxn) config := ctx.ChangefeedVars().Info.Config cyclicEnabled := config.Cyclic != nil && config.Cyclic.IsEnabled() runnerSize := defaultRunnersSize @@ -210,7 +218,7 @@ func NewTablePipeline(ctx cdcContext.Context, p := pipeline.NewPipeline(ctx, 500*time.Millisecond, runnerSize, defaultOutputChannelSize) sorterNode := newSorterNode(tableName, tableID, replicaInfo.StartTs, flowController, mounter, replConfig) - sinkNode := newSinkNode(tableID, sink, replicaInfo.StartTs, targetTs, flowController) + sinkNode := newSinkNode(tableID, sink, replicaInfo.StartTs, targetTs, flowController, splitTxn) p.AppendNode(ctx, "puller", newPullerNode(tableID, replicaInfo, tableName, changefeed, upstream)) diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index 1944199f655..dcf58694556 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -57,7 +57,8 @@ type tableActor struct { // backend mounter mounter entry.Mounter // backend tableSink - tableSink sink.Sink + tableSink sink.Sink + 
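A sketch of the batch-resolved event built in the blocked-callback branch above; the helper name is hypothetical, the construction mirrors the sorter code in this hunk:

package sketch

import "github.com/pingcap/tiflow/cdc/model"

// newBatchResolvedEvent mirrors the blocked-callback path: when flow control
// blocks in the middle of the transaction at commitTs, the sorter emits a
// batch resolved ts carrying the current batchID, so the sink can flush the
// rows buffered so far without claiming the whole transaction is complete.
func newBatchResolvedEvent(commitTs, batchID uint64) *model.PolymorphicEvent {
	msg := model.NewResolvedPolymorphicEvent(0, commitTs)
	msg.Resolved = &model.ResolvedTs{
		Ts:      commitTs,
		Mode:    model.BatchResolvedMode,
		BatchID: batchID,
	}
	return msg
}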
redoManager redo.LogManager pullerNode *pullerNode sortNode *sorterNode @@ -103,6 +104,7 @@ func NewTableActor(cdcCtx cdcContext.Context, tableName string, replicaInfo *model.TableReplicaInfo, sink sink.Sink, + redoManager redo.LogManager, targetTs model.Ts, ) (TablePipeline, error) { config := cdcCtx.ChangefeedVars().Info.Config @@ -134,6 +136,7 @@ func NewTableActor(cdcCtx cdcContext.Context, replicaInfo: replicaInfo, replicaConfig: config, tableSink: sink, + redoManager: redoManager, targetTs: targetTs, started: false, @@ -279,7 +282,10 @@ func (t *tableActor) start(sdtTableContext context.Context) error { zap.String("tableName", t.tableName), zap.Uint64("quota", t.memoryQuota)) - flowController := flowcontrol.NewTableFlowController(t.memoryQuota) + splitTxn := t.replicaConfig.Sink.TxnAtomicity.ShouldSplitTxn() + + flowController := flowcontrol.NewTableFlowController(t.memoryQuota, + t.redoManager.Enabled(), splitTxn) sorterNode := newSorterNode(t.tableName, t.tableID, t.replicaInfo.StartTs, flowController, t.mounter, t.replicaConfig, @@ -318,7 +324,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error { actorSinkNode := newSinkNode(t.tableID, t.tableSink, t.replicaInfo.StartTs, - t.targetTs, flowController) + t.targetTs, flowController, splitTxn) actorSinkNode.initWithReplicaConfig(true, t.replicaConfig) t.sinkNode = actorSinkNode @@ -431,7 +437,7 @@ func (t *tableActor) ResolvedTs() model.Ts { // another replication barrier for consistent replication instead of reusing // the global resolved-ts. if redo.IsConsistentEnabled(t.replicaConfig.Consistent.Level) { - return t.sinkNode.ResolvedTs().Ts + return t.sinkNode.ResolvedTs() } return t.sortNode.ResolvedTs() } diff --git a/cdc/processor/pipeline/table_actor_test.go b/cdc/processor/pipeline/table_actor_test.go index 0f80a4e9b6f..3981798bf92 100644 --- a/cdc/processor/pipeline/table_actor_test.go +++ b/cdc/processor/pipeline/table_actor_test.go @@ -16,7 +16,6 @@ package pipeline import ( "context" "sync" - "sync/atomic" "testing" "time" @@ -50,7 +49,7 @@ func TestAsyncStopFailed(t *testing.T) { router: tableActorRouter, cancel: func() {}, reportErr: func(err error) {}, - sinkNode: newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}), + sinkNode: newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}, false), } require.True(t, tbl.AsyncStop(1)) @@ -86,7 +85,7 @@ func TestTableActorInterface(t *testing.T) { require.Equal(t, TableStatusStopped, tbl.Status()) require.Equal(t, uint64(1), tbl.Workload().Workload) - atomic.StoreUint64(&sink.checkpointTs, 3) + sink.checkpointTs.Store(model.NewResolvedTs(3)) require.Equal(t, model.Ts(3), tbl.CheckpointTs()) require.Equal(t, model.Ts(5), tbl.ResolvedTs()) @@ -189,10 +188,10 @@ func TestPollTickMessage(t *testing.T) { status: TableStatusInitializing, sink: &mockSink{}, flowController: &mockFlowController{}, - checkpointTs: 10, targetTs: 11, } sn.resolvedTs.Store(model.NewResolvedTs(10)) + sn.checkpointTs.Store(model.NewResolvedTs(10)) tbl := tableActor{ sinkNode: sn, @@ -239,11 +238,11 @@ func TestPollStopMessage(t *testing.T) { func TestPollBarrierTsMessage(t *testing.T) { sn := &sinkNode{ - targetTs: 10, - checkpointTs: 5, - barrierTs: 8, + targetTs: 10, + barrierTs: 8, } sn.resolvedTs.Store(model.NewResolvedTs(5)) + sn.checkpointTs.Store(model.NewResolvedTs(5)) tbl := tableActor{ sinkNode: sn, @@ -359,7 +358,7 @@ func TestNewTableActor(t *testing.T) { &model.TableReplicaInfo{ StartTs: 0, MarkTableID: 1, - }, &mockSink{}, 10) + }, &mockSink{}, redo.NewDisabledManager(), 10) 
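A sketch of the wiring shown in NewTablePipeline and tableActor.start above: TxnAtomicity on the sink config decides whether transactions may be split, and that flag plus the redo-log state feeds the per-table flow controller. The config import path is an assumption; the calls themselves appear in this diff:

package sketch

import (
	"github.com/pingcap/tiflow/cdc/sink/flowcontrol"
	"github.com/pingcap/tiflow/pkg/config" // assumed path of the replica config package
)

// buildFlowController derives splitTxn from the sink config and constructs
// the per-table flow controller with the new three-argument signature.
func buildFlowController(quota uint64, redoLogEnabled bool) (*flowcontrol.TableFlowController, bool) {
	replConfig := config.GetDefaultReplicaConfig()
	splitTxn := replConfig.Sink.TxnAtomicity.ShouldSplitTxn()
	return flowcontrol.NewTableFlowController(quota, redoLogEnabled, splitTxn), splitTxn
}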
require.NotNil(t, tbl) require.Nil(t, err) require.NotPanics(t, func() { @@ -375,7 +374,7 @@ func TestNewTableActor(t *testing.T) { &model.TableReplicaInfo{ StartTs: 0, MarkTableID: 1, - }, &mockSink{}, 10) + }, &mockSink{}, redo.NewDisabledManager(), 10) require.Nil(t, tbl) require.NotNil(t, err) @@ -415,6 +414,8 @@ func TestTableActorStart(t *testing.T) { StartTs: 0, MarkTableID: 1, }, + redoManager: redo.NewDisabledManager(), + replicaConfig: config.GetDefaultReplicaConfig(), } require.Nil(t, tbl.start(ctx)) require.Equal(t, 1, len(tbl.nodes)) @@ -428,10 +429,12 @@ func TestTableActorStart(t *testing.T) { Config: config.GetDefaultReplicaConfig(), }, }, + redoManager: redo.NewDisabledManager(), replicaInfo: &model.TableReplicaInfo{ StartTs: 0, MarkTableID: 1, }, + replicaConfig: config.GetDefaultReplicaConfig(), } tbl.cyclicEnabled = true require.Nil(t, tbl.start(ctx)) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 5d3ab1768d0..108a9c76f9b 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -813,6 +813,7 @@ func (p *processor) createTablePipelineImpl( tableName, replicaInfo, s, + p.redoManager, p.changefeed.Info.GetTargetTs()) if err != nil { return nil, errors.Trace(err) @@ -827,6 +828,7 @@ func (p *processor) createTablePipelineImpl( s, p.changefeed.Info.GetTargetTs(), p.upStream, + p.redoManager.Enabled(), ) } diff --git a/cdc/sink/black_hole.go b/cdc/sink/black_hole.go index e00394daee4..51481ca8d27 100644 --- a/cdc/sink/black_hole.go +++ b/cdc/sink/black_hole.go @@ -55,7 +55,7 @@ func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model func (b *blackHoleSink) FlushRowChangedEvents( ctx context.Context, _ model.TableID, resolved model.ResolvedTs, -) (uint64, error) { +) (model.ResolvedTs, error) { log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolved.Ts)) err := b.statistics.RecordBatchExecution(func() (int, error) { // TODO: add some random replication latency @@ -65,7 +65,7 @@ func (b *blackHoleSink) FlushRowChangedEvents( return int(batchSize), nil }) b.statistics.PrintStatus(ctx) - return resolved.Ts, err + return resolved, err } func (b *blackHoleSink) EmitCheckpointTs(ctx context.Context, ts uint64, tables []model.TableName) error { diff --git a/cdc/sink/flowcontrol/flow_control.go b/cdc/sink/flowcontrol/flow_control.go index 1e97ed6390e..0f1e0ce5050 100644 --- a/cdc/sink/flowcontrol/flow_control.go +++ b/cdc/sink/flowcontrol/flow_control.go @@ -25,37 +25,56 @@ import ( ) const ( - maxRowsPerTxn = 1024 - maxSizePerTxn = 1024 * 1024 /* 1MB */ - batchSize = 100 + defaultRowsPerTxn = 1024 + defaultSizePerTxn = 1024 * 1024 /* 1MB */ + defaultBatchSize = 100 ) // TableFlowController provides a convenient interface to control the memory consumption of a per table event stream type TableFlowController struct { - memoryQuota *tableMemoryQuota + memoryQuota *tableMemoryQuota + lastCommitTs uint64 queueMu struct { sync.Mutex queue deque.Deque } + + redoLogEnabled bool + splitTxn bool + // batchGroupCount is the number of txnSizeEntries with same commitTs, which could be: // 1. Different txns with same commitTs but different startTs // 2. 
TxnSizeEntry split from the same txns which exceeds max rows or max size - batchGroupCount uint + batchGroupCount uint64 + batchID uint64 - lastCommitTs uint64 + batchSize uint64 + maxRowsPerTxn uint64 + maxSizePerTxn uint64 } type txnSizeEntry struct { // txn id startTs uint64 commitTs uint64 + size uint64 rowCount uint64 + batchID uint64 } // NewTableFlowController creates a new TableFlowController -func NewTableFlowController(quota uint64) *TableFlowController { +func NewTableFlowController(quota uint64, redoLogEnabled bool, splitTxn bool) *TableFlowController { + log.Info("create table flow controller", + zap.Uint64("quota", quota), + zap.Bool("redoLogEnabled", redoLogEnabled), + zap.Bool("splitTxn", splitTxn)) + maxSizePerTxn := uint64(defaultSizePerTxn) + if maxSizePerTxn > quota { + maxSizePerTxn = quota + } + return &TableFlowController{ memoryQuota: newTableMemoryQuota(quota), queueMu: struct { @@ -64,6 +83,11 @@ func NewTableFlowController(quota uint64) *TableFlowController { }{ queue: deque.NewDeque(), }, + redoLogEnabled: redoLogEnabled, + splitTxn: splitTxn, + batchSize: defaultBatchSize, + maxRowsPerTxn: defaultRowsPerTxn, + maxSizePerTxn: maxSizePerTxn, } } @@ -72,10 +96,25 @@ func NewTableFlowController(quota uint64) *TableFlowController { func (c *TableFlowController) Consume( msg *model.PolymorphicEvent, size uint64, - callBack func(batch bool) error, + callBack func(batchID uint64) error, ) error { commitTs := msg.CRTs lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) + blockingCallBack := func() (err error) { + if commitTs > lastCommitTs || c.splitTxn { + // Call `callback` in two condition: + // 1. commitTs > lastCommitTs, handle new txn and send a normal resolved ts + // 2. commitTs == lastCommitTs && splitTxn = true, split the same txn and + // send a batch resolved ts + err = callBack(c.batchID) + } + + if commitTs == lastCommitTs { + c.batchID++ + c.resetBatch(lastCommitTs, commitTs) + } + return err + } if commitTs < lastCommitTs { log.Panic("commitTs regressed, report a bug", @@ -83,34 +122,32 @@ func (c *TableFlowController) Consume( zap.Uint64("lastCommitTs", c.lastCommitTs)) } - if commitTs > lastCommitTs { - err := c.memoryQuota.consumeWithBlocking(size, callBack) - if err != nil { + if commitTs == lastCommitTs && (c.redoLogEnabled || !c.splitTxn) { + // Here `commitTs == lastCommitTs` means we are not crossing transaction + // boundaries, `c.redoLogEnabled || !c.splitTxn` means batch resolved mode + // are not supported, hence we should use `forceConsume` to avoid deadlock. + if err := c.memoryQuota.forceConsume(size); err != nil { return errors.Trace(err) } } else { - // Here commitTs == lastCommitTs, which means that we are not crossing - // a transaction boundary. In this situation, we use `forceConsume` because - // blocking the event stream mid-transaction is highly likely to cause - // a deadlock. - // TODO fix this in the future, after we figure out how to elegantly support large txns. - err := c.memoryQuota.forceConsume(size) - if err != nil { + if err := c.memoryQuota.consumeWithBlocking(size, blockingCallBack); err != nil { return errors.Trace(err) } } - c.enqueueSingleMsg(msg, size) + c.enqueueSingleMsg(msg, size, blockingCallBack) return nil } -// Release is called when all events committed before resolvedTs has been freed from memory. -func (c *TableFlowController) Release(resolvedTs uint64) { +// Release releases the memory quota based on the given resolved timestamp. 
+func (c *TableFlowController) Release(resolved model.ResolvedTs) { var nBytesToRelease uint64 c.queueMu.Lock() for c.queueMu.queue.Len() > 0 { - if peeked := c.queueMu.queue.Front().(*txnSizeEntry); peeked.commitTs <= resolvedTs { + peeked := c.queueMu.queue.Front().(*txnSizeEntry) + if peeked.commitTs < resolved.Ts || + (peeked.commitTs == resolved.Ts && peeked.batchID <= resolved.BatchID) { nBytesToRelease += peeked.size c.queueMu.queue.PopFront() } else { @@ -123,59 +160,70 @@ func (c *TableFlowController) Release(resolvedTs uint64) { } // Note that msgs received by enqueueSingleMsg must be sorted by commitTs_startTs order. -func (c *TableFlowController) enqueueSingleMsg(msg *model.PolymorphicEvent, size uint64) { +func (c *TableFlowController) enqueueSingleMsg( + msg *model.PolymorphicEvent, size uint64, callback func() error, +) { commitTs := msg.CRTs lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) c.queueMu.Lock() defer c.queueMu.Unlock() - var e deque.Elem + e := c.queueMu.queue.Back() // 1. Processing a new txn with different commitTs. - if e = c.queueMu.queue.Back(); e == nil || lastCommitTs < commitTs { + if e == nil || lastCommitTs < commitTs { atomic.StoreUint64(&c.lastCommitTs, commitTs) - c.queueMu.queue.PushBack(&txnSizeEntry{ - startTs: msg.StartTs, - commitTs: commitTs, - size: size, - rowCount: 1, - }) - c.batchGroupCount = 1 - msg.Row.SplitTxn = true + c.resetBatch(lastCommitTs, commitTs) + c.addEntry(msg, size) return } // Processing txns with the same commitTs. txnEntry := e.(*txnSizeEntry) if txnEntry.commitTs != lastCommitTs { - log.Panic("got wrong commitTs from deque, report a bug", + log.Panic("got wrong commitTs from deque in flow control, report a bug", zap.Uint64("lastCommitTs", c.lastCommitTs), zap.Uint64("commitTsInDeque", txnEntry.commitTs)) } // 2. Append row to current txn entry. - if txnEntry.startTs == msg.Row.StartTs && - txnEntry.rowCount < maxRowsPerTxn && txnEntry.size < maxSizePerTxn { + if txnEntry.batchID == c.batchID && txnEntry.startTs == msg.Row.StartTs && + txnEntry.rowCount < c.maxRowsPerTxn && txnEntry.size < c.maxSizePerTxn { txnEntry.size += size txnEntry.rowCount++ return } // 3. Split the txn or handle a new txn with the same commitTs. + if c.batchGroupCount >= c.batchSize { + _ = callback() + } + c.addEntry(msg, size) +} + +// addEntry should be called only if c.queueMu is locked. +func (c *TableFlowController) addEntry(msg *model.PolymorphicEvent, size uint64) { + c.batchGroupCount++ c.queueMu.queue.PushBack(&txnSizeEntry{ startTs: msg.StartTs, - commitTs: commitTs, + commitTs: msg.CRTs, size: size, rowCount: 1, + batchID: c.batchID, }) - c.batchGroupCount++ - msg.Row.SplitTxn = true + if c.splitTxn { + msg.Row.SplitTxn = true + } +} - if c.batchGroupCount >= batchSize { - c.batchGroupCount = 0 - // TODO(CharlesCheung): add batch resolve mechanism to mitigate oom problem - log.Debug("emit batch resolve event throw callback") +// resetBatch reset batchID and batchGroupCount if handling a new txn, Otherwise, +// just reset batchGroupCount. +func (c *TableFlowController) resetBatch(lastCommitTs, commitTs uint64) { + if lastCommitTs < commitTs { + // At least one batch for each txn. 
+ c.batchID = 1 } + c.batchGroupCount = 0 } // Abort interrupts any ongoing Consume call diff --git a/cdc/sink/flowcontrol/flow_control_test.go b/cdc/sink/flowcontrol/flow_control_test.go index 6836299e4a4..aedd9d92445 100644 --- a/cdc/sink/flowcontrol/flow_control_test.go +++ b/cdc/sink/flowcontrol/flow_control_test.go @@ -21,12 +21,18 @@ import ( "testing" "time" + "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/stretchr/testify/require" + "go.uber.org/zap" "golang.org/x/sync/errgroup" ) -func dummyCallBack(_ bool) error { +func dummyCallBack() error { + return nil +} + +func dummyCallBackWithBatch(batchID uint64) error { return nil } @@ -35,7 +41,7 @@ type mockCallBacker struct { injectedErr error } -func (c *mockCallBacker) cb(_ bool) error { +func (c *mockCallBacker) cb(_ uint64) error { c.timesCalled += 1 return c.injectedErr } @@ -163,11 +169,11 @@ func TestMemoryQuotaReleaseZero(t *testing.T) { } type mockedEvent struct { - resolvedTs uint64 - size uint64 + resolved model.ResolvedTs + size uint64 } -func TestFlowControlBasic(t *testing.T) { +func TestFlowControlWithForceConsume(t *testing.T) { t.Parallel() var consumedBytes uint64 @@ -175,7 +181,7 @@ func TestFlowControlBasic(t *testing.T) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 1024) - flowController := NewTableFlowController(2048) + flowController := NewTableFlowController(2048, true, true) errg.Go(func() error { lastCommitTs := uint64(1) @@ -222,14 +228,14 @@ func TestFlowControlBasic(t *testing.T) { case <-ctx.Done(): return ctx.Err() case eventCh <- &mockedEvent{ - resolvedTs: resolvedTs, + resolved: model.NewResolvedTs(resolvedTs), }: } resolvedTs = mockedRow.commitTs updatedResolvedTs = true } err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), - mockedRow.size, dummyCallBack) + mockedRow.size, dummyCallBackWithBatch) require.Nil(t, err) select { case <-ctx.Done(): @@ -248,7 +254,7 @@ func TestFlowControlBasic(t *testing.T) { case <-ctx.Done(): return ctx.Err() case eventCh <- &mockedEvent{ - resolvedTs: resolvedTs, + resolved: model.NewResolvedTs(resolvedTs), }: } @@ -271,7 +277,250 @@ func TestFlowControlBasic(t *testing.T) { if event.size != 0 { atomic.AddUint64(&consumedBytes, -event.size) } else { - flowController.Release(event.resolvedTs) + flowController.Release(event.resolved) + } + } + + return nil + }) + + require.Nil(t, errg.Wait()) + require.Equal(t, uint64(0), atomic.LoadUint64(&consumedBytes)) + require.Equal(t, uint64(0), flowController.GetConsumption()) +} + +func TestFlowControlWithBatchAndForceConsume(t *testing.T) { + t.Parallel() + + var consumedBytes uint64 + ctx, cancel := context.WithTimeout(context.TODO(), time.Hour*10) + defer cancel() + errg, ctx := errgroup.WithContext(ctx) + mockedRowsCh := make(chan *txnSizeEntry, 1024) + flowController := NewTableFlowController(512, true, true) + maxBatch := uint64(3) + + // simulate a big txn + errg.Go(func() error { + lastCommitTs := uint64(1) + for i := uint64(0); i <= maxBatch*defaultRowsPerTxn*defaultBatchSize; i++ { + size := uint64(128 + rand.Int()%64) + select { + case <-ctx.Done(): + return ctx.Err() + case mockedRowsCh <- &txnSizeEntry{ + commitTs: lastCommitTs, + size: size, + }: + } + } + + close(mockedRowsCh) + return nil + }) + + eventCh := make(chan *mockedEvent, 1024) + errg.Go(func() error { + defer close(eventCh) + lastCRTs := uint64(0) + maxBatchID := uint64(0) + for { + var mockedRow *txnSizeEntry + select { + case <-ctx.Done(): + 
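A sketch of the Consume/Release contract exercised by the flow-control tests around this hunk (not part of the patch); releasing inside the callback is a simplification for brevity, the real pipeline releases after the sink flushes the batch resolved ts:

package sketch

import (
	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/cdc/sink/flowcontrol"
)

// consumeAndRelease shows the shape of the API: Consume may invoke the
// callback with a batchID when the quota runs out mid-transaction, and each
// batch must eventually be released with a matching batch-mode ResolvedTs.
func consumeAndRelease(fc *flowcontrol.TableFlowController, ev *model.PolymorphicEvent, size uint64) error {
	commitTs := ev.CRTs
	return fc.Consume(ev, size, func(batchID uint64) error {
		// Release the rows enqueued so far for this commitTs and batchID,
		// which frees quota and lets Consume proceed.
		fc.Release(model.ResolvedTs{
			Mode:    model.BatchResolvedMode,
			Ts:      commitTs,
			BatchID: batchID,
		})
		return nil
	})
}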
return ctx.Err() + case mockedRow = <-mockedRowsCh: + } + + if mockedRow == nil { + break + } + + atomic.AddUint64(&consumedBytes, mockedRow.size) + err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), + mockedRow.size, func(batchID uint64) error { + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolved: model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: lastCRTs, + BatchID: batchID, + }, + }: + } + log.Debug("", zap.Any("batchID", batchID)) + maxBatchID = batchID + return nil + }) + require.Nil(t, err) + lastCRTs = mockedRow.commitTs + + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + size: mockedRow.size, + }: + } + } + require.Less(t, uint64(0), flowController.GetConsumption()) + require.LessOrEqual(t, maxBatch, maxBatchID) + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolved: model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: lastCRTs, + BatchID: maxBatchID + 1, + }, + }: + } + time.Sleep(time.Millisecond * 500) + require.Equal(t, uint64(0), flowController.GetConsumption()) + return nil + }) + + errg.Go(func() error { + for { + var event *mockedEvent + select { + case <-ctx.Done(): + return ctx.Err() + case event = <-eventCh: + } + + if event == nil { + break + } + + if event.size != 0 { + atomic.AddUint64(&consumedBytes, -event.size) + } else { + flowController.Release(event.resolved) + } + } + + return nil + }) + + require.Nil(t, errg.Wait()) + require.Equal(t, uint64(0), atomic.LoadUint64(&consumedBytes)) +} + +func TestFlowControlWithoutForceConsume(t *testing.T) { + t.Parallel() + + var consumedBytes uint64 + ctx, cancel := context.WithTimeout(context.TODO(), time.Hour*10) + defer cancel() + errg, ctx := errgroup.WithContext(ctx) + mockedRowsCh := make(chan *txnSizeEntry, 1024) + flowController := NewTableFlowController(512, false, true) + maxBatch := uint64(3) + + // simulate a big txn + errg.Go(func() error { + lastCommitTs := uint64(1) + for i := uint64(0); i < maxBatch*defaultRowsPerTxn*defaultBatchSize; i++ { + size := uint64(128 + rand.Int()%64) + select { + case <-ctx.Done(): + return ctx.Err() + case mockedRowsCh <- &txnSizeEntry{ + commitTs: lastCommitTs, + size: size, + }: + } + } + + close(mockedRowsCh) + return nil + }) + + eventCh := make(chan *mockedEvent, 1024) + errg.Go(func() error { + defer close(eventCh) + lastCRTs := uint64(0) + maxBatchID := uint64(0) + for { + var mockedRow *txnSizeEntry + select { + case <-ctx.Done(): + return ctx.Err() + case mockedRow = <-mockedRowsCh: + } + + if mockedRow == nil { + break + } + + atomic.AddUint64(&consumedBytes, mockedRow.size) + err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), + mockedRow.size, func(batchID uint64) error { + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolved: model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: lastCRTs, + BatchID: batchID, + }, + }: + } + log.Debug("", zap.Any("batchID", batchID)) + maxBatchID = batchID + return nil + }) + require.Nil(t, err) + lastCRTs = mockedRow.commitTs + + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + size: mockedRow.size, + }: + } + } + require.Less(t, uint64(0), flowController.GetConsumption()) + require.LessOrEqual(t, maxBatch, maxBatchID) + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolved: model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: 
lastCRTs, + BatchID: maxBatchID + 1, + }, + }: + } + time.Sleep(time.Millisecond * 500) + require.Equal(t, uint64(0), flowController.GetConsumption()) + return nil + }) + + errg.Go(func() error { + for { + var event *mockedEvent + select { + case <-ctx.Done(): + return ctx.Err() + case event = <-eventCh: + } + + if event == nil { + break + } + + if event.size != 0 { + atomic.AddUint64(&consumedBytes, -event.size) + } else { + flowController.Release(event.resolved) } } @@ -286,7 +535,7 @@ func TestFlowControlAbort(t *testing.T) { t.Parallel() callBacker := &mockCallBacker{} - controller := NewTableFlowController(1024) + controller := NewTableFlowController(1024, false, false) var wg sync.WaitGroup wg.Add(1) go func() { @@ -317,7 +566,7 @@ func TestFlowControlCallBack(t *testing.T) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 1024) - flowController := NewTableFlowController(512) + flowController := NewTableFlowController(512, false, false) errg.Go(func() error { lastCommitTs := uint64(1) @@ -358,12 +607,12 @@ func TestFlowControlCallBack(t *testing.T) { atomic.AddUint64(&consumedBytes, mockedRow.size) err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), - mockedRow.size, func(bool) error { + mockedRow.size, func(uint64) error { select { case <-ctx.Done(): return ctx.Err() case eventCh <- &mockedEvent{ - resolvedTs: lastCRTs, + resolved: model.NewResolvedTs(lastCRTs), }: } return nil @@ -383,7 +632,7 @@ func TestFlowControlCallBack(t *testing.T) { case <-ctx.Done(): return ctx.Err() case eventCh <- &mockedEvent{ - resolvedTs: lastCRTs, + resolved: model.NewResolvedTs(lastCRTs), }: } @@ -406,7 +655,7 @@ func TestFlowControlCallBack(t *testing.T) { if event.size != 0 { atomic.AddUint64(&consumedBytes, -event.size) } else { - flowController.Release(event.resolvedTs) + flowController.Release(event.resolved) } } @@ -421,7 +670,7 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { t.Parallel() var wg sync.WaitGroup - controller := NewTableFlowController(512) + controller := NewTableFlowController(512, false, false) wg.Add(1) ctx, cancel := context.WithCancel(context.TODO()) @@ -429,7 +678,7 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { go func() { defer wg.Done() - err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 511, func(bool) error { + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 511, func(uint64) error { t.Error("unreachable") return nil }) @@ -442,11 +691,11 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { <-time.After(time.Second * 1) // makes sure that this test case is valid require.Equal(t, int32(1), atomic.LoadInt32(&isBlocked)) - controller.Release(1) + controller.Release(model.NewResolvedTs(1)) cancel() }() - err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 511, func(bool) error { + err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 511, func(uint64) error { atomic.StoreInt32(&isBlocked, 1) <-ctx.Done() atomic.StoreInt32(&isBlocked, 0) @@ -463,7 +712,7 @@ func TestFlowControlCallBackError(t *testing.T) { t.Parallel() var wg sync.WaitGroup - controller := NewTableFlowController(512) + controller := NewTableFlowController(512, false, false) wg.Add(1) ctx, cancel := context.WithCancel(context.TODO()) @@ -471,12 +720,12 @@ func TestFlowControlCallBackError(t *testing.T) { go func() { defer wg.Done() - err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 511, func(bool) error { + err := 
controller.Consume(model.NewEmptyPolymorphicEvent(1), 511, func(uint64) error { t.Error("unreachable") return nil }) require.Nil(t, err) - err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 511, func(bool) error { + err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 511, func(uint64) error { <-ctx.Done() return ctx.Err() }) @@ -492,8 +741,8 @@ func TestFlowControlCallBackError(t *testing.T) { func TestFlowControlConsumeLargerThanQuota(t *testing.T) { t.Parallel() - controller := NewTableFlowController(1024) - err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 2048, func(bool) error { + controller := NewTableFlowController(1024, false, false) + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 2048, func(uint64) error { t.Error("unreachable") return nil }) @@ -505,7 +754,7 @@ func BenchmarkTableFlowController(B *testing.B) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 102400) - flowController := NewTableFlowController(20 * 1024 * 1024) // 20M + flowController := NewTableFlowController(20*1024*1024, false, false) // 20M errg.Go(func() error { lastCommitTs := uint64(1) @@ -549,13 +798,13 @@ func BenchmarkTableFlowController(B *testing.B) { case <-ctx.Done(): return ctx.Err() case eventCh <- &mockedEvent{ - resolvedTs: resolvedTs, + resolved: model.NewResolvedTs(resolvedTs), }: } resolvedTs = mockedRow.commitTs } err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), - mockedRow.size, dummyCallBack) + mockedRow.size, dummyCallBackWithBatch) if err != nil { B.Fatal(err) } @@ -571,7 +820,7 @@ func BenchmarkTableFlowController(B *testing.B) { case <-ctx.Done(): return ctx.Err() case eventCh <- &mockedEvent{ - resolvedTs: resolvedTs, + resolved: model.NewResolvedTs(resolvedTs), }: } @@ -592,7 +841,7 @@ func BenchmarkTableFlowController(B *testing.B) { } if event.size == 0 { - flowController.Release(event.resolvedTs) + flowController.Release(event.resolved) } } diff --git a/cdc/sink/flowcontrol/table_memory_quota.go b/cdc/sink/flowcontrol/table_memory_quota.go index c563ba4f333..7ca15e7857f 100644 --- a/cdc/sink/flowcontrol/table_memory_quota.go +++ b/cdc/sink/flowcontrol/table_memory_quota.go @@ -54,9 +54,7 @@ func newTableMemoryQuota(quota uint64) *tableMemoryQuota { // block until enough memory has been freed up by release. // blockCallBack will be called if the function will block. // Should be used with care to prevent deadlock. -func (c *tableMemoryQuota) consumeWithBlocking( - nBytes uint64, blockCallBack func(bool) error, -) error { +func (c *tableMemoryQuota) consumeWithBlocking(nBytes uint64, blockCallBack func() error) error { if nBytes >= c.quota { return cerrors.ErrFlowControllerEventLargerThanQuota.GenWithStackByArgs(nBytes, c.quota) } @@ -64,7 +62,7 @@ func (c *tableMemoryQuota) consumeWithBlocking( c.consumed.Lock() if c.consumed.bytes+nBytes >= c.quota { c.consumed.Unlock() - err := blockCallBack(false) + err := blockCallBack() if err != nil { return errors.Trace(err) } diff --git a/cdc/sink/mq/mq.go b/cdc/sink/mq/mq.go index fec9b47865e..ade36762bb1 100644 --- a/cdc/sink/mq/mq.go +++ b/cdc/sink/mq/mq.go @@ -125,10 +125,10 @@ func (k *mqSink) AddTable(tableID model.TableID) error { // otherwise when the table is dispatched back again, // it may read the old values. // See: https://github.com/pingcap/tiflow/issues/4464#issuecomment-1085385382. 
- if checkpointTs, loaded := k.tableCheckpointTsMap.LoadAndDelete(tableID); loaded { + if checkpoint, loaded := k.tableCheckpointTsMap.LoadAndDelete(tableID); loaded { log.Info("clean up table checkpoint ts in MQ sink", zap.Int64("tableID", tableID), - zap.Uint64("checkpointTs", checkpointTs.(uint64))) + zap.Uint64("checkpointTs", checkpoint.(model.ResolvedTs).Ts)) } return nil @@ -174,25 +174,21 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha // FlushRowChangedEvents is thread-safe. func (k *mqSink) FlushRowChangedEvents( ctx context.Context, tableID model.TableID, resolved model.ResolvedTs, -) (uint64, error) { - var checkpointTs uint64 - v, ok := k.tableCheckpointTsMap.Load(tableID) - if ok { - checkpointTs = v.(uint64) - } - if resolved.Ts <= checkpointTs { - return checkpointTs, nil +) (model.ResolvedTs, error) { + checkpoint := k.getTableCheckpointTs(tableID) + if checkpoint.EqualOrGreater(resolved) { + return checkpoint, nil } select { case <-ctx.Done(): - return 0, ctx.Err() + return model.NewResolvedTs(0), ctx.Err() case k.resolvedBuffer.In() <- resolvedTsEvent{ tableID: tableID, - resolved: model.NewResolvedTs(resolved.Ts), + resolved: resolved, }: } k.statistics.PrintStatus(ctx) - return checkpointTs, nil + return checkpoint, nil } // bgFlushTs flush resolvedTs to workers and flush the mqProducer @@ -217,7 +213,7 @@ func (k *mqSink) bgFlushTs(ctx context.Context) error { // Since CDC does not guarantee exactly once semantic, it won't cause any problem // here even if the table was moved or removed. // ref: https://github.com/pingcap/tiflow/pull/4356#discussion_r787405134 - k.tableCheckpointTsMap.Store(msg.tableID, resolved.Ts) + k.tableCheckpointTsMap.Store(msg.tableID, resolved) } } } @@ -360,6 +356,14 @@ func (k *mqSink) RemoveTable(cxt context.Context, tableID model.TableID) error { return nil } +func (k *mqSink) getTableCheckpointTs(tableID model.TableID) model.ResolvedTs { + v, ok := k.tableCheckpointTsMap.Load(tableID) + if ok { + return v.(model.ResolvedTs) + } + return model.NewResolvedTs(0) +} + func (k *mqSink) run(ctx context.Context) error { wg, ctx := errgroup.WithContext(ctx) wg.Go(func() error { @@ -400,10 +404,6 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } - if err := replicaConfig.ApplyProtocol(sinkURI).Validate(); err != nil { - return nil, errors.Trace(err) - } - saramaConfig, err := kafka.NewSaramaConfig(ctx, baseConfig) if err != nil { return nil, errors.Trace(err) @@ -493,10 +493,6 @@ func NewPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, if s != "" { replicaConfig.Sink.Protocol = s } - err := replicaConfig.Validate() - if err != nil { - return nil, err - } var protocol config.Protocol if err := protocol.FromString(replicaConfig.Sink.Protocol); err != nil { diff --git a/cdc/sink/mq/mq_test.go b/cdc/sink/mq/mq_test.go index 76211767f89..6e2cc8c85d9 100644 --- a/cdc/sink/mq/mq_test.go +++ b/cdc/sink/mq/mq_test.go @@ -56,7 +56,7 @@ func waitCheckpointTs(t *testing.T, s *mqSink, tableID int64, target uint64) uin var checkpointTs uint64 err := retry.Do(context.Background(), func() error { if v, ok := s.tableCheckpointTsMap.Load(tableID); ok { - checkpointTs = v.(uint64) + checkpointTs = v.(model.ResolvedTs).Ts } if checkpointTs >= target { return nil @@ -91,6 +91,7 @@ func TestKafkaSink(t *testing.T) { kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient }() + require.Nil(t, 
replicaConfig.ValidateAndAdjust(sinkURI)) sink, err := NewKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh) require.Nil(t, err) @@ -119,9 +120,9 @@ func TestKafkaSink(t *testing.T) { checkpointTs := waitCheckpointTs(t, sink, tableID, uint64(120)) require.Equal(t, uint64(120), checkpointTs) // flush older resolved ts - checkpointTs, err = sink.FlushRowChangedEvents(ctx, tableID, model.NewResolvedTs(uint64(110))) + checkpoint, err := sink.FlushRowChangedEvents(ctx, tableID, model.NewResolvedTs(uint64(110))) require.Nil(t, err) - require.Equal(t, uint64(120), checkpointTs) + require.Equal(t, uint64(120), checkpoint.Ts) // mock kafka broker processes 1 checkpoint ts event err = sink.EmitCheckpointTs(ctx, uint64(120), []model.TableName{{ @@ -187,6 +188,7 @@ func TestKafkaSinkFilter(t *testing.T) { kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient }() + require.NoError(t, replicaConfig.ValidateAndAdjust(sinkURI)) sink, err := NewKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh) require.Nil(t, err) @@ -278,6 +280,7 @@ func TestFlushRowChangedEvents(t *testing.T) { kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient }() + require.Nil(t, replicaConfig.ValidateAndAdjust(sinkURI)) sink, err := NewKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh) require.Nil(t, err) diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go index 4f4cbeac7f8..19d1e41bb99 100644 --- a/cdc/sink/mysql/mysql.go +++ b/cdc/sink/mysql/mysql.go @@ -230,20 +230,20 @@ func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Row // Concurrency Note: FlushRowChangedEvents is thread-safe. func (s *mysqlSink) FlushRowChangedEvents( ctx context.Context, tableID model.TableID, resolved model.ResolvedTs, -) (uint64, error) { +) (model.ResolvedTs, error) { if err := s.error.Load(); err != nil { - return 0, err + return model.NewResolvedTs(0), err } v, ok := s.getTableResolvedTs(tableID) - if !ok || v.Ts < resolved.Ts { + if !ok || v.Less(resolved) { s.tableMaxResolvedTs.Store(tableID, resolved) } // check and throw error select { case <-ctx.Done(): - return 0, ctx.Err() + return model.NewResolvedTs(0), ctx.Err() case s.resolvedCh <- struct{}{}: // Notify `flushRowChangedEvents` to asynchronously write data. 
default: @@ -291,8 +291,8 @@ outer: continue outer } } - for tableID, resolvedTs := range checkpointTsMap { - s.tableCheckpointTs.Store(tableID, resolvedTs) + for tableID, resolved := range checkpointTsMap { + s.tableCheckpointTs.Store(tableID, resolved) } } } @@ -531,10 +531,10 @@ func (s *mysqlSink) cleanTableResource(tableID model.TableID) { zap.Int64("tableID", tableID), zap.Uint64("resolvedTs", resolved.(model.ResolvedTs).Ts)) } - if checkpointTs, loaded := s.tableCheckpointTs.LoadAndDelete(tableID); loaded { + if checkpoint, loaded := s.tableCheckpointTs.LoadAndDelete(tableID); loaded { log.Info("clean up table checkpoint ts in MySQL sink", zap.Int64("tableID", tableID), - zap.Uint64("checkpointTs", checkpointTs.(uint64))) + zap.Uint64("checkpointTs", checkpoint.(model.ResolvedTs).Ts)) } // try to remove table txn cache s.txnCache.RemoveTableTxn(tableID) @@ -559,11 +559,11 @@ func (s *mysqlSink) RemoveTable(ctx context.Context, tableID model.TableID) erro return errors.Trace(ctx.Err()) case <-ticker.C: maxResolved, ok := s.getTableResolvedTs(tableID) - log.Warn("Barrier doesn't return in time, may be stuck", + log.Warn("RemoveTable doesn't return in time, may be stuck", zap.Int64("tableID", tableID), zap.Bool("hasResolvedTs", ok), zap.Any("resolvedTs", maxResolved.Ts), - zap.Uint64("checkpointTs", s.getTableCheckpointTs(tableID))) + zap.Uint64("checkpointTs", s.getTableCheckpointTs(tableID).Ts)) default: if err := s.error.Load(); err != nil { return err @@ -573,15 +573,15 @@ func (s *mysqlSink) RemoveTable(ctx context.Context, tableID model.TableID) erro log.Info("No table resolvedTs is found", zap.Int64("tableID", tableID)) return nil } - tableCkpt := s.getTableCheckpointTs(tableID) - if tableCkpt >= maxResolved.Ts { + checkpoint := s.getTableCheckpointTs(tableID) + if checkpoint.EqualOrGreater(maxResolved) { return nil } - checkpointTs, err := s.FlushRowChangedEvents(ctx, tableID, maxResolved) + checkpoint, err := s.FlushRowChangedEvents(ctx, tableID, maxResolved) if err != nil { return err } - if checkpointTs >= maxResolved.Ts { + if checkpoint.Ts >= maxResolved.Ts { return nil } // short sleep to avoid cpu spin @@ -590,12 +590,12 @@ func (s *mysqlSink) RemoveTable(ctx context.Context, tableID model.TableID) erro } } -func (s *mysqlSink) getTableCheckpointTs(tableID model.TableID) uint64 { +func (s *mysqlSink) getTableCheckpointTs(tableID model.TableID) model.ResolvedTs { v, ok := s.tableCheckpointTs.Load(tableID) if ok { - return v.(uint64) + return v.(model.ResolvedTs) } - return uint64(0) + return model.NewResolvedTs(0) } func (s *mysqlSink) getTableResolvedTs(tableID model.TableID) (model.ResolvedTs, bool) { diff --git a/cdc/sink/mysql/mysql_test.go b/cdc/sink/mysql/mysql_test.go index e09718e9fed..199f524270e 100644 --- a/cdc/sink/mysql/mysql_test.go +++ b/cdc/sink/mysql/mysql_test.go @@ -1251,8 +1251,8 @@ func TestNewMySQLSinkExecDML(t *testing.T) { err = retry.Do(context.Background(), func() error { ts, err := sink.FlushRowChangedEvents(ctx, 1, model.NewResolvedTs(uint64(2))) require.Nil(t, err) - if ts < uint64(2) { - return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 2) + if ts.Ts < uint64(2) { + return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts.Ts, 2) } return nil }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(10), retry.WithIsRetryableErr(cerror.IsRetryableError)) @@ -1262,8 +1262,8 @@ func TestNewMySQLSinkExecDML(t *testing.T) { err = retry.Do(context.Background(), func() error { ts, err := sink.FlushRowChangedEvents(ctx, 2, 
model.NewResolvedTs(uint64(4))) require.Nil(t, err) - if ts < uint64(4) { - return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 4) + if ts.Ts < uint64(4) { + return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts.Ts, 4) } return nil }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(10), retry.WithIsRetryableErr(cerror.IsRetryableError)) @@ -1790,7 +1790,7 @@ func TestMySQLSinkFlushResolvedTs(t *testing.T) { require.Nil(t, err) checkpoint, err := sink.FlushRowChangedEvents(ctx, model.TableID(1), model.NewResolvedTs(1)) require.Nil(t, err) - require.True(t, checkpoint <= 1) + require.True(t, checkpoint.Ts <= 1) rows := []*model.RowChangedEvent{ { Table: &model.TableName{Schema: "s1", Table: "t1", TableID: 1}, @@ -1808,9 +1808,9 @@ func TestMySQLSinkFlushResolvedTs(t *testing.T) { err = sink.EmitRowChangedEvents(ctx, rows...) require.Nil(t, err) checkpoint, err = sink.FlushRowChangedEvents(ctx, model.TableID(1), model.NewResolvedTs(6)) - require.True(t, checkpoint <= 6) + require.True(t, checkpoint.Ts <= 6) require.Nil(t, err) - require.True(t, sink.getTableCheckpointTs(model.TableID(1)) <= 6) + require.True(t, sink.getTableCheckpointTs(model.TableID(1)).Ts <= 6) rows = []*model.RowChangedEvent{ { Table: &model.TableName{Schema: "s1", Table: "t2", TableID: 2}, @@ -1828,9 +1828,9 @@ func TestMySQLSinkFlushResolvedTs(t *testing.T) { err = sink.EmitRowChangedEvents(ctx, rows...) require.Nil(t, err) checkpoint, err = sink.FlushRowChangedEvents(ctx, model.TableID(2), model.NewResolvedTs(5)) - require.True(t, checkpoint <= 5) + require.True(t, checkpoint.Ts <= 5) require.Nil(t, err) - require.True(t, sink.getTableCheckpointTs(model.TableID(2)) <= 5) + require.True(t, sink.getTableCheckpointTs(model.TableID(2)).Ts <= 5) _ = sink.Close(ctx) _, err = sink.FlushRowChangedEvents(ctx, model.TableID(2), model.NewResolvedTs(6)) require.Nil(t, err) @@ -1906,7 +1906,7 @@ func TestCleanTableResource(t *testing.T) { require.Nil(t, s.EmitRowChangedEvents(ctx, &model.RowChangedEvent{ Table: &model.TableName{TableID: tblID, Schema: "test", Table: "t1"}, })) - s.tableCheckpointTs.Store(tblID, uint64(1)) + s.tableCheckpointTs.Store(tblID, model.NewResolvedTs(uint64(1))) s.tableMaxResolvedTs.Store(tblID, model.NewResolvedTs(uint64(2))) _, ok := s.txnCache.unresolvedTxns[tblID] require.True(t, ok) @@ -2094,7 +2094,7 @@ func TestMySQLSinkExecDMLError(t *testing.T) { if err != nil { break } - require.Less(t, ts, uint64(2)) + require.Less(t, ts.ResolvedMark(), uint64(2)) i++ } diff --git a/cdc/sink/mysql/simple_mysql_tester.go b/cdc/sink/mysql/simple_mysql_tester.go index ed68b1f12b5..9abad52a26e 100644 --- a/cdc/sink/mysql/simple_mysql_tester.go +++ b/cdc/sink/mysql/simple_mysql_tester.go @@ -179,7 +179,7 @@ func (s *simpleMySQLSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) // TiCDC guarantees that all of Event which of commitTs less than or equal to `resolvedTs` are sent to Sink through `EmitRowChangedEvents` func (s *simpleMySQLSink) FlushRowChangedEvents( ctx context.Context, _ model.TableID, resolved model.ResolvedTs, -) (uint64, error) { +) (model.ResolvedTs, error) { s.rowsBufferLock.Lock() defer s.rowsBufferLock.Unlock() newBuffer := make([]*model.RowChangedEvent, 0, len(s.rowsBuffer)) @@ -187,14 +187,14 @@ func (s *simpleMySQLSink) FlushRowChangedEvents( if row.CommitTs <= resolved.Ts { err := s.executeRowChangedEvents(ctx, row) if err != nil { - return 0, err + return model.NewResolvedTs(0), err } } else { newBuffer = append(newBuffer, row) } } s.rowsBuffer = 
newBuffer - return resolved.Ts, nil + return resolved, nil } // EmitCheckpointTs sends CheckpointTs to Sink diff --git a/cdc/sink/mysql/txn_cache.go b/cdc/sink/mysql/txn_cache.go index 440e8bd96c0..f7cb0dd9470 100644 --- a/cdc/sink/mysql/txn_cache.go +++ b/cdc/sink/mysql/txn_cache.go @@ -112,7 +112,7 @@ func (c *unresolvedTxnCache) Append(filter *filter.Filter, rows ...*model.RowCha // The returned map contains many txns grouped by tableID. for each table, the each commitTs of txn in txns slice is strictly increasing func (c *unresolvedTxnCache) Resolved( resolvedTsMap *sync.Map, -) (map[model.TableID]uint64, map[model.TableID][]*model.SingleTableTxn) { +) (map[model.TableID]model.ResolvedTs, map[model.TableID][]*model.SingleTableTxn) { c.unresolvedTxnsMu.Lock() defer c.unresolvedTxnsMu.Unlock() @@ -121,7 +121,7 @@ func (c *unresolvedTxnCache) Resolved( func splitResolvedTxn( resolvedTsMap *sync.Map, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs, -) (checkpointTsMap map[model.TableID]uint64, +) (checkpointTsMap map[model.TableID]model.ResolvedTs, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn, ) { var ( @@ -131,21 +131,21 @@ func splitResolvedTxn( resolvedTxnsWithTheSameCommitTs []*txnsWithTheSameCommitTs ) - checkpointTsMap = make(map[model.TableID]uint64, len(unresolvedTxns)) + checkpointTsMap = make(map[model.TableID]model.ResolvedTs, len(unresolvedTxns)) resolvedTsMap.Range(func(k, v any) bool { tableID := k.(model.TableID) resolved := v.(model.ResolvedTs) - checkpointTsMap[tableID] = resolved.Ts + checkpointTsMap[tableID] = resolved return true }) resolvedRowsMap = make(map[model.TableID][]*model.SingleTableTxn, len(unresolvedTxns)) - for tableID, resolvedTs := range checkpointTsMap { + for tableID, resolved := range checkpointTsMap { if txns, ok = unresolvedTxns[tableID]; !ok { continue } i := sort.Search(len(txns), func(i int) bool { - return txns[i].commitTs > resolvedTs + return txns[i].commitTs > resolved.Ts }) if i != 0 { if i == len(txns) { diff --git a/cdc/sink/mysql/txn_cache_test.go b/cdc/sink/mysql/txn_cache_test.go index d55bd25cf6c..f56d2a63785 100644 --- a/cdc/sink/mysql/txn_cache_test.go +++ b/cdc/sink/mysql/txn_cache_test.go @@ -266,7 +266,9 @@ func TestSplitResolvedTxn(test *testing.T) { for _, t := range tc { cache.Append(nil, t.input...) resolvedTsMap := sync.Map{} + expectedCheckpointTsMap := make(map[model.TableID]model.ResolvedTs) for tableID, ts := range t.resolvedTsMap { + expectedCheckpointTsMap[tableID] = model.NewResolvedTs(ts) resolvedTsMap.Store(tableID, model.NewResolvedTs(ts)) } checkpointTsMap, resolvedTxn := cache.Resolved(&resolvedTsMap) @@ -280,7 +282,7 @@ func TestSplitResolvedTxn(test *testing.T) { resolvedTxn[tableID] = txns } require.Equal(test, t.expected, resolvedTxn, cmp.Diff(resolvedTxn, t.expected)) - require.Equal(test, t.resolvedTsMap, checkpointTsMap) + require.Equal(test, expectedCheckpointTsMap, checkpointTsMap) } } } diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index 7477e3fb6dd..3bc353fc2b0 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -62,7 +62,7 @@ type Sink interface { // FlushRowChangedEvents is thread-safe. FlushRowChangedEvents( ctx context.Context, tableID model.TableID, resolved model.ResolvedTs, - ) (uint64, error) + ) (model.ResolvedTs, error) // EmitCheckpointTs sends CheckpointTs to Sink. 
// TiCDC guarantees that all Events **in the cluster** which of commitTs @@ -158,6 +158,9 @@ func New( if err != nil { return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err) } + if err := config.ValidateAndAdjust(sinkURI); err != nil { + return nil, err + } if newSink, ok := sinkIniterMap[strings.ToLower(sinkURI.Scheme)]; ok { return newSink(ctx, changefeedID, sinkURI, filter, config, opts, errCh) } diff --git a/cdc/sink/table_sink.go b/cdc/sink/table_sink.go index ee3bdc2a548..a3307fc6abf 100644 --- a/cdc/sink/table_sink.go +++ b/cdc/sink/table_sink.go @@ -73,7 +73,7 @@ func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error // redo log watermarkTs. func (t *tableSink) FlushRowChangedEvents( ctx context.Context, tableID model.TableID, resolved model.ResolvedTs, -) (uint64, error) { +) (model.ResolvedTs, error) { resolvedTs := resolved.Ts if tableID != t.tableID { log.Panic("inconsistent table sink", @@ -90,42 +90,35 @@ func (t *tableSink) FlushRowChangedEvents( err := t.backendSink.EmitRowChangedEvents(ctx, resolvedRows...) if err != nil { - return 0, errors.Trace(err) + return model.NewResolvedTs(0), errors.Trace(err) } return t.flushResolvedTs(ctx, resolved) } func (t *tableSink) flushResolvedTs( ctx context.Context, resolved model.ResolvedTs, -) (uint64, error) { - redoTs, err := t.flushRedoLogs(ctx, resolved.Ts) - if err != nil { - return 0, errors.Trace(err) - } - if redoTs < resolved.Ts { - resolved.Ts = redoTs +) (model.ResolvedTs, error) { + if t.redoManager.Enabled() { + if resolved.IsBatchMode() { + return model.NewResolvedTs(0), nil + } + err := t.redoManager.FlushLog(ctx, t.tableID, resolved.Ts) + if err != nil { + return model.NewResolvedTs(0), errors.Trace(err) + } + redoTs := t.redoManager.GetMinResolvedTs() + if redoTs < resolved.Ts { + resolved.Ts = redoTs + } } checkpointTs, err := t.backendSink.FlushRowChangedEvents(ctx, t.tableID, resolved) if err != nil { - return 0, errors.Trace(err) + return model.NewResolvedTs(0), errors.Trace(err) } return checkpointTs, nil } -// flushRedoLogs flush redo logs and returns redo log resolved ts which means -// all events before the ts have been persisted to redo log storage. 
-func (t *tableSink) flushRedoLogs(ctx context.Context, resolvedTs uint64) (uint64, error) { - if t.redoManager.Enabled() { - err := t.redoManager.FlushLog(ctx, t.tableID, resolvedTs) - if err != nil { - return 0, err - } - return t.redoManager.GetMinResolvedTs(), nil - } - return resolvedTs, nil -} - func (t *tableSink) EmitCheckpointTs(_ context.Context, _ uint64, _ []model.TableName) error { // the table sink doesn't receive the checkpoint event return nil diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index e33f54e5906..9baaa681592 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -784,18 +784,18 @@ func syncFlushRowChangedEvents(ctx context.Context, sink *partitionSink, resolve } // tables are flushed var ( - err error - checkpointTs uint64 + err error + checkpoint model.ResolvedTs ) flushedResolvedTs := true sink.tablesMap.Range(func(key, value interface{}) bool { tableID := key.(int64) - checkpointTs, err = sink.FlushRowChangedEvents(ctx, + checkpoint, err = sink.FlushRowChangedEvents(ctx, tableID, model.NewResolvedTs(resolvedTs)) if err != nil { return false } - if checkpointTs < resolvedTs { + if checkpoint.Ts < resolvedTs { flushedResolvedTs = false } return true diff --git a/dm/tests/_utils/wait_process_exit b/dm/tests/_utils/wait_process_exit index 967c1c78466..dd92d0905df 100755 --- a/dm/tests/_utils/wait_process_exit +++ b/dm/tests/_utils/wait_process_exit @@ -16,9 +16,5 @@ while [ $WAIT_COUNT -lt 120 ]; do ((WAIT_COUNT++)) done -<<<<<<< HEAD echo "process $process didn't exit after 120 seconds, current processlist: $(pgrep $process)" -======= -echo "process $process didn't exit after 120 seconds, current processlist: $(ps aux | grep $process | grep -v 'grep')" ->>>>>>> 65e67fc4b (test(dm): fix unstable tests (#5865)) exit 1 diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index ec636b08088..ea89580a87a 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -761,6 +761,9 @@ var doc = `{ }, "schema-registry": { "type": "string" + }, + "transaction-atomicity": { + "type": "string" } } }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 687cb281589..e46768bed87 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -742,6 +742,9 @@ }, "schema-registry": { "type": "string" + }, + "transaction-atomicity": { + "type": "string" } } }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 0df46b926ba..a611b3df76f 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -41,6 +41,8 @@ definitions: type: string schema-registry: type: string + transaction-atomicity: + type: string type: object model.Capture: properties: diff --git a/pkg/cmd/cli/cli_changefeed_create.go b/pkg/cmd/cli/cli_changefeed_create.go index cefaa40b7d3..cf80b2bc01a 100644 --- a/pkg/cmd/cli/cli_changefeed_create.go +++ b/pkg/cmd/cli/cli_changefeed_create.go @@ -287,11 +287,15 @@ func (o *createChangefeedOptions) validate(ctx context.Context, cmd *cobra.Comma return errors.New("Creating changefeed without a sink-uri") } - err := o.cfg.Validate() + paredSinkURI, err := url.Parse(o.commonChangefeedOptions.sinkURI) if err != nil { return err } + if err = o.cfg.ValidateAndAdjust(paredSinkURI); err != nil { + return err + } + if err := o.validateStartTs(ctx); err != nil { return err } diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index 61ae730d1cb..e4df5c725d8 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -189,7 +189,7 @@ 
func TestAndWriteExampleReplicaTOML(t *testing.T) { require.Equal(t, &config.MounterConfig{ WorkerNum: 16, }, cfg.Mounter) - err = cfg.Validate() + err = cfg.ValidateAndAdjust(nil) require.Nil(t, err) require.Equal(t, &config.SinkConfig{ DispatchRules: []*config.DispatchRule{ diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index e3e6e838e7c..9dc3aed4713 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -168,7 +168,8 @@ const ( ] } ], - "schema-registry": "" + "schema-registry": "", + "transaction-atomicity": "" }, "cyclic-replication": { "enable": false, diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 2952f216b8a..f15434156c8 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -123,10 +123,10 @@ func (c *replicaConfig) fillFromV1(v1 *outdated.ReplicaConfigV1) { } } -// Validate verifies that each parameter is valid. -func (c *ReplicaConfig) Validate() error { +// ValidateAndAdjust verifies and adjusts the replica configuration. +func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { if c.Sink != nil { - err := c.Sink.validate(c.EnableOldValue) + err := c.Sink.validateAndAdjust(sinkURI, c.EnableOldValue) if err != nil { return err } @@ -134,15 +134,6 @@ func (c *ReplicaConfig) Validate() error { return nil } -// ApplyProtocol sinkURI to fill the `ReplicaConfig` -func (c *ReplicaConfig) ApplyProtocol(sinkURI *url.URL) *ReplicaConfig { - params := sinkURI.Query() - if s := params.Get(ProtocolKey); s != "" { - c.Sink.Protocol = s - } - return c -} - // GetDefaultReplicaConfig returns the default replica config. func GetDefaultReplicaConfig() *ReplicaConfig { return defaultReplicaConfig.Clone() diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index 84f08bb1714..4bd98d7adfe 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -81,27 +81,28 @@ func TestReplicaConfigOutDated(t *testing.T) { {Matcher: []string{"a.c"}, DispatcherRule: "r2"}, {Matcher: []string{"a.d"}, DispatcherRule: "r2"}, } + conf.Sink.TxnAtomicity = unknowTxnAtomicity require.Equal(t, conf, conf2) } func TestReplicaConfigValidate(t *testing.T) { t.Parallel() conf := GetDefaultReplicaConfig() - require.Nil(t, conf.Validate()) + require.Nil(t, conf.ValidateAndAdjust(nil)) // Incorrect sink configuration. conf = GetDefaultReplicaConfig() conf.Sink.Protocol = "canal" conf.EnableOldValue = false require.Regexp(t, ".*canal protocol requires old value to be enabled.*", - conf.Validate()) + conf.ValidateAndAdjust(nil)) conf = GetDefaultReplicaConfig() conf.Sink.DispatchRules = []*DispatchRule{ {Matcher: []string{"a.b"}, DispatcherRule: "d1", PartitionRule: "r1"}, } require.Regexp(t, ".*dispatcher and partition cannot be configured both.*", - conf.Validate()) + conf.ValidateAndAdjust(nil)) // Correct sink configuration. 
conf = GetDefaultReplicaConfig() @@ -110,7 +111,7 @@ func TestReplicaConfigValidate(t *testing.T) { {Matcher: []string{"a.c"}, PartitionRule: "p1"}, {Matcher: []string{"a.d"}}, } - err := conf.Validate() + err := conf.ValidateAndAdjust(nil) require.Nil(t, err) rules := conf.Sink.DispatchRules require.Equal(t, "d1", rules[0].PartitionRule) diff --git a/pkg/config/sink.go b/pkg/config/sink.go index b05c9be39e7..30964527e30 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -15,6 +15,7 @@ package config import ( "fmt" + "net/url" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -22,9 +23,36 @@ import ( "go.uber.org/zap" ) -// DefaultMaxMessageBytes sets the default value for max-message-bytes +// DefaultMaxMessageBytes sets the default value for max-message-bytes. const DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M +// AtomicityLevel represents the atomicity level of a changefeed. +type AtomicityLevel string + +const ( + // unknowTxnAtomicity is the default atomicity level, which is invalid and will + // be set to a valid value when initializing sink in processor. + unknowTxnAtomicity AtomicityLevel = "" + + // noneTxnAtomicity means atomicity of transactions is not guaranteed + noneTxnAtomicity AtomicityLevel = "none" + + // tableTxnAtomicity means atomicity of single table transactions is guaranteed. + tableTxnAtomicity AtomicityLevel = "table" + + // globalTxnAtomicity means atomicity of cross table transactions is guaranteed, which + // is currently not supported by TiCDC. + // globalTxnAtomicity AtomicityLevel = "global" + + defaultMqTxnAtomicity AtomicityLevel = noneTxnAtomicity + defaultMysqlTxnAtomicity AtomicityLevel = tableTxnAtomicity +) + +// ShouldSplitTxn returns whether the sink should split txn. +func (l AtomicityLevel) ShouldSplitTxn() bool { + return l == noneTxnAtomicity +} + // ForceEnableOldValueProtocols specifies which protocols need to be forced to enable old value. var ForceEnableOldValueProtocols = []string{ ProtocolCanal.String(), @@ -38,9 +66,10 @@ type SinkConfig struct { Protocol string `toml:"protocol" json:"protocol"` ColumnSelectors []*ColumnSelector `toml:"column-selectors" json:"column-selectors"` SchemaRegistry string `toml:"schema-registry" json:"schema-registry"` + TxnAtomicity AtomicityLevel `toml:"transaction-atomicity" json:"transaction-atomicity"` } -// DispatchRule represents partition rule for a table +// DispatchRule represents partition rule for a table. type DispatchRule struct { Matcher []string `toml:"matcher" json:"matcher"` // Deprecated, please use PartitionRule. @@ -57,7 +86,11 @@ type ColumnSelector struct { Columns []string `toml:"columns" json:"columns"` } -func (s *SinkConfig) validate(enableOldValue bool) error { +func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL, enableOldValue bool) error { + if err := s.applyParameter(sinkURI); err != nil { + return err + } + if !enableOldValue { for _, protocolStr := range ForceEnableOldValueProtocols { if protocolStr == s.Protocol { @@ -86,3 +119,62 @@ func (s *SinkConfig) validate(enableOldValue bool) error { return nil } + +// applyParameter fill the `ReplicaConfig` and `TxnAtomicity` by sinkURI. +func (s *SinkConfig) applyParameter(sinkURI *url.URL) error { + if sinkURI == nil { + return nil + } + params := sinkURI.Query() + + txnAtomicity := params.Get("transaction-atomicity") + switch AtomicityLevel(txnAtomicity) { + case unknowTxnAtomicity: + // Set default value according to scheme. 
+ if isMqScheme(sinkURI.Scheme) { + s.TxnAtomicity = defaultMqTxnAtomicity + } else { + s.TxnAtomicity = defaultMysqlTxnAtomicity + } + case noneTxnAtomicity: + s.TxnAtomicity = noneTxnAtomicity + case tableTxnAtomicity: + // MqSink only support `noneTxnAtomicity`. + if isMqScheme(sinkURI.Scheme) { + log.Warn("The configuration of transaction-atomicity is incompatible with scheme", + zap.Any("txnAtomicity", s.TxnAtomicity), + zap.String("scheme", sinkURI.Scheme), + zap.String("protocol", s.Protocol)) + s.TxnAtomicity = defaultMqTxnAtomicity + } else { + s.TxnAtomicity = tableTxnAtomicity + } + default: + errMsg := fmt.Sprintf("%s level atomicity is not supported by %s scheme", + txnAtomicity, sinkURI.Scheme) + return cerror.ErrSinkURIInvalid.GenWithStackByArgs(errMsg) + } + + s.Protocol = params.Get(ProtocolKey) + // validate that protocol is compatible with the scheme + if isMqScheme(sinkURI.Scheme) { + var protocol Protocol + err := protocol.FromString(s.Protocol) + if err != nil { + return err + } + } else if s.Protocol != "" { + return cerror.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("protocol cannot "+ + "be configured when using %s scheme", sinkURI.Scheme)) + } + + log.Info("succeed to parse parameter from sink uri", + zap.String("protocol", s.Protocol), + zap.String("txnAtomicity", string(s.TxnAtomicity))) + return nil +} + +func isMqScheme(scheme string) bool { + return scheme == "kafka" || scheme == "kafka+ssl" || + scheme == "pulsar" || scheme == "pulsar+ssl" +} diff --git a/pkg/config/sink_test.go b/pkg/config/sink_test.go index c34fcd832bf..94378a25095 100644 --- a/pkg/config/sink_test.go +++ b/pkg/config/sink_test.go @@ -14,12 +14,13 @@ package config import ( + "net/url" "testing" "github.com/stretchr/testify/require" ) -func TestValidate(t *testing.T) { +func TestValidateOldValue(t *testing.T) { t.Parallel() testCases := []struct { protocol string @@ -73,9 +74,80 @@ func TestValidate(t *testing.T) { Protocol: tc.protocol, } if tc.expectedErr == "" { - require.Nil(t, cfg.validate(tc.enableOldValue)) + require.Nil(t, cfg.validateAndAdjust(nil, tc.enableOldValue)) } else { - require.Regexp(t, tc.expectedErr, cfg.validate(tc.enableOldValue)) + require.Regexp(t, tc.expectedErr, cfg.validateAndAdjust(nil, tc.enableOldValue)) + } + } +} + +func TestValidateApplyParameter(t *testing.T) { + t.Parallel() + testCases := []struct { + sinkURI string + expectedErr string + expectedLevel AtomicityLevel + }{ + { + sinkURI: "mysql://normal:123456@127.0.0.1:3306", + expectedErr: "", + expectedLevel: tableTxnAtomicity, + }, + { + sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=table", + expectedErr: "", + expectedLevel: tableTxnAtomicity, + }, + { + sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=none", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=global", + expectedErr: "global level atomicity is not supported by.*", + }, + { + sinkURI: "tidb://normal:123456@127.0.0.1:3306?protocol=canal", + expectedErr: ".*protocol cannot be configured when using tidb scheme.*", + }, + { + sinkURI: "blackhole://normal:123456@127.0.0.1:3306?transaction-atomicity=none", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "kafka://127.0.0.1:9092?transaction-atomicity=none" + + "&protocol=open-protocol", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "pulsar://127.0.0.1:9092?transaction-atomicity=table" + + 
"&protocol=open-protocol", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "kafka://127.0.0.1:9092?protocol=default", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "kafka://127.0.0.1:9092?transaction-atomicity=table", + expectedErr: ".*unknown .* protocol for Message Queue sink.*", + }, + } + + for _, tc := range testCases { + cfg := SinkConfig{} + parsedSinkURI, err := url.Parse(tc.sinkURI) + require.Nil(t, err) + if tc.expectedErr == "" { + require.Nil(t, cfg.validateAndAdjust(parsedSinkURI, true)) + require.Equal(t, tc.expectedLevel, cfg.TxnAtomicity) + } else { + require.Regexp(t, tc.expectedErr, cfg.validateAndAdjust(parsedSinkURI, true)) } } } diff --git a/tests/integration_tests/big_txn/conf/diff_config.toml b/tests/integration_tests/big_txn/conf/diff_config.toml new file mode 100644 index 00000000000..367c21817c8 --- /dev/null +++ b/tests/integration_tests/big_txn/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/big_txn/sync_diff/output" + + source-instances = ["tidb"] + + target-instance = "mysql" + + target-check-tables = ["big_txn.*"] + +[data-sources] +[data-sources.tidb] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.mysql] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/big_txn/conf/workload b/tests/integration_tests/big_txn/conf/workload new file mode 100644 index 00000000000..d15d4a81bfd --- /dev/null +++ b/tests/integration_tests/big_txn/conf/workload @@ -0,0 +1,14 @@ +threadcount=1 +recordcount=5000 +operationcount=0 +workload=core +fieldcount=100 + +readallfields=true + +readproportion=0 +updateproportion=0 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform diff --git a/tests/integration_tests/big_txn/run.sh b/tests/integration_tests/big_txn/run.sh new file mode 100755 index 00000000000..ca969dcb0d7 --- /dev/null +++ b/tests/integration_tests/big_txn/run.sh @@ -0,0 +1,57 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +CDC_COUNT=3 +DB_COUNT=4 + +function run() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + TOPIC_NAME="ticdc-big-txn-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + *) SINK_URI="mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=none" ;; + esac + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + + run_sql "CREATE DATABASE big_txn;" + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=big_txn + + if [ "$SINK_TYPE" == "kafka" ]; then + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + fi + + check_table_exists "big_txn.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + run_sql "CREATE TABLE big_txn.USERTABLE1 LIKE 
big_txn.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "INSERT INTO big_txn.USERTABLE1 SELECT * FROM big_txn.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + sleep 60 + check_table_exists "big_txn.USERTABLE1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + run_sql "CREATE TABLE big_txn.finish_mark_1 (a int primary key);" + sleep 120 + check_table_exists "big_txn.finish_mark_1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60 + + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
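
The combined effect of the new `transaction-atomicity` sink-URI parameter (pkg/config/sink.go above) is easiest to see end to end. The sketch below is not part of the patch: the module import path and the main wrapper are assumptions, while GetDefaultReplicaConfig, ValidateAndAdjust, Sink.TxnAtomicity and ShouldSplitTxn are the names introduced or changed in this diff.

package main

import (
	"fmt"
	"net/url"

	// Assumed module path for the pkg/config package touched above.
	"github.com/pingcap/tiflow/pkg/config"
)

func main() {
	// A MySQL sink URI that opts out of per-table transaction atomicity.
	sinkURI, err := url.Parse("mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=none")
	if err != nil {
		panic(err)
	}

	conf := config.GetDefaultReplicaConfig()
	// ValidateAndAdjust now both validates the sink section and fills
	// Sink.TxnAtomicity from the URI: "table" is the default for MySQL-like
	// schemes, "none" for MQ schemes, and an unsupported level is rejected.
	if err := conf.ValidateAndAdjust(sinkURI); err != nil {
		panic(err)
	}

	fmt.Println(conf.Sink.TxnAtomicity)                  // none
	fmt.Println(conf.Sink.TxnAtomicity.ShouldSplitTxn()) // true: transactions may be split
}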
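
Callers of the Sink interface also change shape: FlushRowChangedEvents now returns a model.ResolvedTs rather than a bare uint64 checkpoint. A hypothetical helper, sketched under the assumption of the usual cdc/model and cdc/sink import paths, shows the retry pattern the MySQL sink's RemoveTable follows after this change: keep flushing until the returned checkpoint covers the requested resolved ts.

package example

import (
	"context"
	"time"

	// Assumed import paths matching the cdc/model and cdc/sink packages in this diff.
	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/cdc/sink"
)

// waitTableFlushed flushes a single table until the checkpoint reported by the
// sink is equal to or greater than the given resolved ts.
func waitTableFlushed(
	ctx context.Context, s sink.Sink, tableID model.TableID, resolved model.ResolvedTs,
) error {
	for {
		checkpoint, err := s.FlushRowChangedEvents(ctx, tableID, resolved)
		if err != nil {
			return err
		}
		if checkpoint.EqualOrGreater(resolved) {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(10 * time.Millisecond):
			// Short sleep to avoid spinning, mirroring mysqlSink.RemoveTable.
		}
	}
}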