Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038) #6140

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cdc/api/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
// test no change error
changefeedConfig = model.ChangefeedConfig{SinkURI: "blackhole://"}
oldInfo.SinkURI = "blackhole://"
oldInfo.Config.Sink.TxnAtomicity = "table"
newInfo, err = verifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo)
require.NotNil(t, err)
require.Regexp(t, ".*changefeed config is the same with the old one.*", err)
Expand Down
66 changes: 49 additions & 17 deletions cdc/model/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@

package model

import (
"math"

"github.com/pingcap/log"
"go.uber.org/zap"
)

// PolymorphicEvent describes an event can be in multiple states.
type PolymorphicEvent struct {
StartTs uint64
// Commit or resolved TS
CRTs uint64
// Identify whether the resolved event is in batch mode.
Mode ResolvedMode
StartTs uint64
CRTs uint64
Resolved *ResolvedTs

RawKV *RawKVEntry
Row *RowChangedEvent
Expand Down Expand Up @@ -66,11 +71,6 @@ func (e *PolymorphicEvent) IsResolved() bool {
return e.RawKV.OpType == OpTypeResolved
}

// IsBatchResolved returns true if the event is batch resolved event.
func (e *PolymorphicEvent) IsBatchResolved() bool {
return e.IsResolved() && e.Mode == BatchResolvedMode
}

// ComparePolymorphicEvents compares two events by CRTs, Resolved, StartTs, Delete/Put order.
// It returns true if and only if i should precede j.
func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool {
Expand Down Expand Up @@ -108,16 +108,48 @@ const (

// ResolvedTs is the resolved timestamp of sink module.
type ResolvedTs struct {
Ts uint64
Mode ResolvedMode
Mode ResolvedMode
Ts uint64
BatchID uint64
}

// NewResolvedTs creates a new ResolvedTs.
// NewResolvedTs creates a normal ResolvedTs.
func NewResolvedTs(t uint64) ResolvedTs {
return ResolvedTs{Ts: t, Mode: NormalResolvedMode}
return ResolvedTs{Ts: t, Mode: NormalResolvedMode, BatchID: math.MaxUint64}
}

// IsBatchMode returns true if the resolved ts is BatchResolvedMode.
func (r ResolvedTs) IsBatchMode() bool {
return r.Mode == BatchResolvedMode
}

// ResolvedMark returns a timestamp `ts` based on the r.mode, which marks that all events
// whose commitTs is less than or equal to `ts` are sent to Sink.
func (r ResolvedTs) ResolvedMark() uint64 {
switch r.Mode {
case NormalResolvedMode:
// with NormalResolvedMode, cdc guarantees all events whose commitTs is
// less than or equal to `resolved.Ts` are sent to Sink.
return r.Ts
case BatchResolvedMode:
// with BatchResolvedMode, cdc guarantees all events whose commitTs is
// less than `resolved.Ts` are sent to Sink.
return r.Ts - 1
default:
log.Error("unknown resolved mode", zap.Any("resolved", r))
return 0
}
}

// EqualOrGreater judge whether the resolved ts is equal or greater than the given ts.
func (r ResolvedTs) EqualOrGreater(r1 ResolvedTs) bool {
if r.Ts == r1.Ts {
return r.BatchID >= r1.BatchID
}
return r.Ts > r1.Ts
}

// NewResolvedTsWithMode creates a ResolvedTs with a given batch type.
func NewResolvedTsWithMode(t uint64, m ResolvedMode) ResolvedTs {
return ResolvedTs{Ts: t, Mode: m}
// Less judge whether the resolved ts is less than the given ts.
func (r ResolvedTs) Less(r1 ResolvedTs) bool {
return !r.EqualOrGreater(r1)
}
39 changes: 39 additions & 0 deletions cdc/model/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package model

import (
"math/rand"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -44,3 +45,41 @@ func TestPolymorphicEvent(t *testing.T) {
require.Equal(t, resolved.CRTs, polyEvent.CRTs)
require.Equal(t, uint64(0), polyEvent.StartTs)
}

func TestResolvedTs(t *testing.T) {
t.Parallel()

invalidResolvedTs := ResolvedTs{Mode: -1, Ts: 1}
require.Equal(t, uint64(0), invalidResolvedTs.ResolvedMark())

ts := rand.Uint64()%10 + 1
batchID := rand.Uint64()%10 + 1
normalResolvedTs := NewResolvedTs(ts)
batchResolvedTs1 := ResolvedTs{Mode: BatchResolvedMode, Ts: ts, BatchID: batchID}
require.True(t, normalResolvedTs.EqualOrGreater(batchResolvedTs1))
require.False(t, batchResolvedTs1.EqualOrGreater(normalResolvedTs))
require.False(t, normalResolvedTs.Less(batchResolvedTs1))
require.True(t, batchResolvedTs1.Less(normalResolvedTs))

batchResolvedTs2 := ResolvedTs{Mode: BatchResolvedMode, Ts: ts, BatchID: batchID + 1}
require.True(t, normalResolvedTs.EqualOrGreater(batchResolvedTs2))
require.True(t, batchResolvedTs2.EqualOrGreater(batchResolvedTs1))
require.True(t, batchResolvedTs2.Less(normalResolvedTs))
require.True(t, batchResolvedTs1.Less(batchResolvedTs2))

largerTs := ts + rand.Uint64()%10 + 1
largerResolvedTs := NewResolvedTs(largerTs)
require.True(t, largerResolvedTs.EqualOrGreater(normalResolvedTs))
largerBatchResolvedTs := ResolvedTs{
Mode: BatchResolvedMode,
Ts: largerTs,
BatchID: batchID,
}
require.True(t, largerBatchResolvedTs.EqualOrGreater(normalResolvedTs),
"largerBatchResolvedTs:%+v\nnormalResolvedTs:%+v", largerBatchResolvedTs, normalResolvedTs)

smallerResolvedTs := NewResolvedTs(0)
require.True(t, normalResolvedTs.EqualOrGreater(smallerResolvedTs))
smallerBatchResolvedTs := ResolvedTs{Mode: BatchResolvedMode, Ts: 0, BatchID: batchID}
require.True(t, batchResolvedTs1.EqualOrGreater(smallerBatchResolvedTs))
}
98 changes: 74 additions & 24 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package pipeline

import (
"context"
"fmt"
"sync/atomic"
"time"

Expand Down Expand Up @@ -73,35 +74,52 @@ type sinkNode struct {

// atomic oprations for model.ResolvedTs
resolvedTs atomic.Value
checkpointTs model.Ts
checkpointTs atomic.Value
targetTs model.Ts
barrierTs model.Ts

flowController tableFlowController

replicaConfig *config.ReplicaConfig
isTableActorMode bool
splitTxn bool
}

func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
func newSinkNode(
tableID model.TableID,
sink sink.Sink,
startTs model.Ts,
targetTs model.Ts,
flowController tableFlowController,
splitTxn bool,
) *sinkNode {
sn := &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
checkpointTs: startTs,
barrierTs: startTs,
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
barrierTs: startTs,

flowController: flowController,
splitTxn: splitTxn,
}
sn.resolvedTs.Store(model.NewResolvedTs(startTs))
sn.checkpointTs.Store(model.NewResolvedTs(startTs))
return sn
}

func (n *sinkNode) ResolvedTs() model.ResolvedTs { return n.resolvedTs.Load().(model.ResolvedTs) }
func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) }
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
func (n *sinkNode) Status() TableStatus { return n.status.Load() }
func (n *sinkNode) ResolvedTs() model.Ts { return n.getResolvedTs().ResolvedMark() }
func (n *sinkNode) CheckpointTs() model.Ts { return n.getCheckpointTs().ResolvedMark() }
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
func (n *sinkNode) Status() TableStatus { return n.status.Load() }

func (n *sinkNode) getResolvedTs() model.ResolvedTs {
return n.resolvedTs.Load().(model.ResolvedTs)
}

func (n *sinkNode) getCheckpointTs() model.ResolvedTs {
return n.checkpointTs.Load().(model.ResolvedTs)
}

func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
n.replicaConfig = ctx.ChangefeedVars().Info.Config
Expand Down Expand Up @@ -137,39 +155,39 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er
n.status.Store(TableStatusStopped)
return
}
if atomic.LoadUint64(&n.checkpointTs) >= n.targetTs {
if n.CheckpointTs() >= n.targetTs {
err = n.stop(ctx)
}
}()
currentBarrierTs := atomic.LoadUint64(&n.barrierTs)
currentCheckpointTs := atomic.LoadUint64(&n.checkpointTs)
currentCheckpointTs := n.getCheckpointTs()
if resolved.Ts > currentBarrierTs {
resolved.Ts = currentBarrierTs
resolved = model.NewResolvedTs(currentBarrierTs)
}
if resolved.Ts > n.targetTs {
resolved.Ts = n.targetTs
resolved = model.NewResolvedTs(n.targetTs)
}
if resolved.Ts <= currentCheckpointTs {
if currentCheckpointTs.EqualOrGreater(resolved) {
return nil
}
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved)
checkpoint, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved)
if err != nil {
return errors.Trace(err)
}

// we must call flowController.Release immediately after we call
// FlushRowChangedEvents to prevent deadlock cause by checkpointTs
// fall back
n.flowController.Release(checkpointTs)
n.flowController.Release(checkpoint)

// the checkpointTs may fall back in some situation such as:
// 1. This table is newly added to the processor
// 2. There is one table in the processor that has a smaller
// checkpointTs than this one
if checkpointTs <= currentCheckpointTs {
if currentCheckpointTs.EqualOrGreater(checkpoint) {
return nil
}
atomic.StoreUint64(&n.checkpointTs, checkpointTs)
n.checkpointTs.Store(checkpoint)

return nil
}
Expand Down Expand Up @@ -293,6 +311,10 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
switch msg.Tp {
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
if err := n.verifySplitTxn(event); err != nil {
return false, errors.Trace(err)
}

if event.IsResolved() {
if n.status.Load() == TableStatusInitializing {
n.status.Store(TableStatusRunning)
Expand All @@ -301,7 +323,13 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
failpoint.Return(false, errors.New("processor sync resolved injected error"))
})

resolved := model.NewResolvedTsWithMode(event.CRTs, event.Mode)
var resolved model.ResolvedTs
if event.Resolved != nil {
resolved = *(event.Resolved)
} else {
resolved = model.NewResolvedTs(event.CRTs)
}

if err := n.flushSink(ctx, resolved); err != nil {
return false, errors.Trace(err)
}
Expand All @@ -312,7 +340,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
return false, errors.Trace(err)
}
case pmessage.MessageTypeTick:
if err := n.flushSink(ctx, n.ResolvedTs()); err != nil {
if err := n.flushSink(ctx, n.getResolvedTs()); err != nil {
return false, errors.Trace(err)
}
case pmessage.MessageTypeCommand:
Expand All @@ -331,7 +359,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo

func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error {
atomic.StoreUint64(&n.barrierTs, ts)
if err := n.flushSink(ctx, n.ResolvedTs()); err != nil {
if err := n.flushSink(ctx, n.getResolvedTs()); err != nil {
return errors.Trace(err)
}
return nil
Expand All @@ -346,3 +374,25 @@ func (n *sinkNode) releaseResource(ctx context.Context) error {
n.flowController.Abort()
return n.sink.Close(ctx)
}

// Verify that TxnAtomicity compatibility with BatchResolved event and RowChangedEvent
// with `SplitTxn==true`.
func (n *sinkNode) verifySplitTxn(e *model.PolymorphicEvent) error {
if n.splitTxn {
return nil
}

// Fail-fast check, this situation should never happen normally when split transactions
// are not supported.
if e.Resolved != nil && e.Resolved.IsBatchMode() {
msg := fmt.Sprintf("batch mode resolved ts is not supported "+
"when sink.splitTxn is %+v", n.splitTxn)
return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg)
}

if e.Row != nil && e.Row.SplitTxn {
msg := fmt.Sprintf("should not split txn when sink.splitTxn is %+v", n.splitTxn)
return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg)
}
return nil
}
Loading