puller(ticdc): fix wrong update splitting behavior after table scheduling (#11269) #11281

Merged
18 changes: 18 additions & 0 deletions cdc/model/kv.go
@@ -16,6 +16,7 @@
package model

import (
"errors"
"fmt"

"github.com/pingcap/tiflow/cdc/processor/tablepb"
@@ -110,3 +111,20 @@ func (v *RawKVEntry) String() string {
func (v *RawKVEntry) ApproximateDataSize() int64 {
return int64(len(v.Key) + len(v.Value) + len(v.OldValue))
}

// ShouldSplitKVEntry checks whether the raw kv entry should be split.
type ShouldSplitKVEntry func(raw *RawKVEntry) bool

// SplitUpdateKVEntry splits the raw kv entry into a delete entry and an insert entry.
func SplitUpdateKVEntry(raw *RawKVEntry) (*RawKVEntry, *RawKVEntry, error) {
if raw == nil {
return nil, nil, errors.New("nil event cannot be split")
}
deleteKVEntry := *raw
deleteKVEntry.Value = nil

insertKVEntry := *raw
insertKVEntry.OldValue = nil

return &deleteKVEntry, &insertKVEntry, nil
}
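
The two additions above are meant to be used as a pair in the puller path: a `ShouldSplitKVEntry` predicate decides whether an update entry has to be replayed, and `SplitUpdateKVEntry` turns it into a delete entry followed by an insert entry. Below is a minimal, self-contained sketch of that flow; the local types and the exact predicate (comparing the entry's commit ts against the table's current replicate ts) are illustrative assumptions, not the precise check used by the puller.

```go
package main

import (
	"errors"
	"fmt"
)

// Minimal stand-ins for model.Ts and model.RawKVEntry so the sketch compiles
// on its own; in the real code these come from cdc/model.
type Ts = uint64

type RawKVEntry struct {
	Key, Value, OldValue []byte
	CRTs                 Ts
}

// splitUpdateKVEntry mirrors model.SplitUpdateKVEntry from the diff above.
func splitUpdateKVEntry(raw *RawKVEntry) (*RawKVEntry, *RawKVEntry, error) {
	if raw == nil {
		return nil, nil, errors.New("nil event cannot be split")
	}
	deleteKVEntry := *raw
	deleteKVEntry.Value = nil

	insertKVEntry := *raw
	insertKVEntry.OldValue = nil

	return &deleteKVEntry, &insertKVEntry, nil
}

func main() {
	// The table sink's current replicate ts; updates at or below it may already
	// be in the downstream, so they are replayed as delete+insert (safe mode).
	getReplicaTs := func() Ts { return 400 }

	// An illustrative predicate in the spirit of ShouldSplitKVEntry: an update
	// carries both Value and OldValue, and only updates not newer than the
	// replicate ts need splitting. The real condition lives in the puller.
	shouldSplit := func(raw *RawKVEntry) bool {
		return len(raw.Value) > 0 && len(raw.OldValue) > 0 && raw.CRTs <= getReplicaTs()
	}

	update := &RawKVEntry{Key: []byte("k"), Value: []byte("new"), OldValue: []byte("old"), CRTs: 300}
	if shouldSplit(update) {
		del, ins, _ := splitUpdateKVEntry(update)
		fmt.Printf("delete: %q old=%q\ninsert: %q new=%q\n", del.Key, del.OldValue, ins.Key, ins.Value)
	}
}
```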
32 changes: 15 additions & 17 deletions cdc/processor/processor.go
@@ -192,13 +192,13 @@ func (p *processor) AddTableSpan(
zap.Bool("isPrepare", isPrepare))
}

p.sinkManager.r.AddTable(
table := p.sinkManager.r.AddTable(
span, startTs, p.changefeed.Info.TargetTs)
if p.redo.r.Enabled() {
p.redo.r.AddTable(span, startTs)
}
p.sourceManager.r.AddTable(span, p.getTableName(ctx, span.TableID), startTs)

p.sourceManager.r.AddTable(span, p.getTableName(ctx, span.TableID), startTs, table.GetReplicaTs)
return true, nil
}

@@ -447,18 +447,6 @@ func isProcessorIgnorableError(err error) bool {
return false
}

// needPullerSafeModeAtStart returns true if the scheme is mysql compatible.
// pullerSafeMode means to split all update kv entries whose commitTS
// is older then the start time of this changefeed.
func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// Tick implements the `orchestrator.State` interface
// the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
// The main logic of processor is in this function, including the calculation of many kinds of ts,
@@ -642,6 +630,16 @@ func (p *processor) createTaskPosition() (skipThisTick bool) {
return true
}

// isMysqlCompatibleBackend returns true if the sinkURIStr is mysql compatible.
func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick.
func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
if p.initialized {
@@ -698,20 +696,20 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
return errors.Trace(err)
}

pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.changefeed.Info.SinkURI)
isMysqlBackend, err := isMysqlCompatibleBackend(p.changefeed.Info.SinkURI)
if err != nil {
return errors.Trace(err)
}
p.sourceManager.r = sourcemanager.New(
p.changefeedID, p.upstream, p.mg.r,
sortEngine, p.changefeed.Info.Config.BDRMode,
pullerSafeModeAtStart)
isMysqlBackend)
p.sourceManager.name = "SourceManager"
p.sourceManager.spawn(stdCtx)

p.sinkManager.r = sinkmanager.New(
p.changefeedID, p.changefeed.Info, p.upstream,
p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r)
p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r, isMysqlBackend)
p.sinkManager.name = "SinkManager"
p.sinkManager.spawn(stdCtx)

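Note the shape of the fix in `AddTableSpan` and `lazyInitImpl`: instead of a changefeed-wide `pullerSafeModeAtStart` flag computed once at startup, the sink manager's `AddTable` now returns the table sink wrapper and the processor hands its `GetReplicaTs` method to the source manager. The puller therefore reads the table's current replicate ts at event time, which is exactly what changes when the table is rescheduled and its sink restarts. A small sketch of the closure pattern, using illustrative stand-in types rather than the real wrapper and source manager:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Stand-in for tableSinkWrapper: replicateTs is atomic and read via a getter.
type tableSink struct {
	replicateTs atomic.Uint64
}

func (t *tableSink) GetReplicaTs() uint64 { return t.replicateTs.Load() }

// Stand-in for the source-manager side: it keeps the getter, not a copy of
// the value, so each event it processes sees the current replicate ts.
type pullerTable struct {
	getReplicaTs func() uint64
}

func main() {
	table := &tableSink{}
	table.replicateTs.Store(100) // sink started, replicate ts fetched

	// Analogous to p.sourceManager.r.AddTable(span, name, startTs, table.GetReplicaTs).
	pt := &pullerTable{getReplicaTs: table.GetReplicaTs}
	fmt.Println(pt.getReplicaTs()) // 100

	// After the table is rescheduled and the sink restarts, the value is
	// refreshed and the puller observes it without being re-wired.
	table.replicateTs.Store(250)
	fmt.Println(pt.getReplicaTs()) // 250
}
```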
15 changes: 12 additions & 3 deletions cdc/processor/sinkmanager/manager.go
@@ -128,6 +128,9 @@ type SinkManager struct {
// wg is used to wait for all workers to exit.
wg sync.WaitGroup

// isMysqlBackend indicates whether the backend is MySQL compatible.
isMysqlBackend bool

// Metric for table sink.
metricsTableSinkTotalRows prometheus.Counter

@@ -142,6 +145,7 @@ func New(
schemaStorage entry.SchemaStorage,
redoDMLMgr redo.DMLManager,
sourceManager *sourcemanager.SourceManager,
isMysqlBackend bool,
) *SinkManager {
m := &SinkManager{
changefeedID: changefeedID,
@@ -155,7 +159,7 @@
sinkTaskChan: make(chan *sinkTask),
sinkWorkerAvailable: make(chan struct{}, 1),
sinkRetry: retry.NewInfiniteErrorRetry(),

isMysqlBackend: isMysqlBackend,
metricsTableSinkTotalRows: tablesinkmetrics.TotalRowsCountCounter.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),

@@ -302,6 +306,11 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
if cerror.IsDupEntryError(err) {
return errors.Trace(err)
}

if m.isMysqlBackend {
// For MySQL backend, we should restart the sink and let the owner handle the error.
return errors.Trace(err)
}
}

// If the error is retryable, we should retry to re-establish the internal resources.
@@ -836,7 +845,7 @@ func (m *SinkManager) UpdateBarrierTs(globalBarrierTs model.Ts, tableBarrier map
}

// AddTable adds a table(TableSink) to the sink manager.
func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs model.Ts) {
func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs model.Ts) *tableSinkWrapper {
sinkWrapper := newTableSinkWrapper(
m.changefeedID,
span,
@@ -864,7 +873,6 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span))
return
}
m.sinkMemQuota.AddTable(span)
m.redoMemQuota.AddTable(span)
@@ -874,6 +882,7 @@
zap.Stringer("span", &span),
zap.Uint64("startTs", startTs),
zap.Uint64("version", sinkWrapper.version))
return sinkWrapper
}

// StartTable sets the table(TableSink) state to replicating.
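Besides returning the wrapper from `AddTable`, `Run` now stops retrying internally when the backend is MySQL compatible and hands the error back to the owner. The following is a rough, illustrative sketch of that policy only; the real `Run` distinguishes several error classes (for example duplicate-entry errors) and re-establishes sink resources on retry.

```go
package main

import (
	"errors"
	"fmt"
)

// errRetryable marks errors the manager would normally absorb and retry.
var errRetryable = errors.New("retryable sink error")

// runSink is an illustrative loop in the spirit of SinkManager.Run: most
// backends re-establish internal resources on retryable errors, while MySQL
// compatible backends return the error so the owner can restart the sink.
func runSink(isMysqlBackend bool, attempt func() error) error {
	for {
		err := attempt()
		if err == nil {
			return nil
		}
		if isMysqlBackend {
			return err // let the owner handle it
		}
		if errors.Is(err, errRetryable) {
			continue // retry with re-established resources
		}
		return err
	}
}

func main() {
	calls := 0
	attempt := func() error {
		calls++
		if calls < 3 {
			return errRetryable
		}
		return nil
	}
	fmt.Println(runSink(false, attempt), "after", calls, "attempts") // retried to success
	calls = 0
	fmt.Println(runSink(true, attempt)) // returned to the owner on the first failure
}
```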
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/manager_test.go
@@ -129,7 +129,7 @@ func TestAddTable(t *testing.T) {
require.Equal(t, 0, manager.sinkProgressHeap.len(), "Not started table shout not in progress heap")
err := manager.StartTable(span, 1)
require.NoError(t, err)
require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs)
require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs.Load())

progress := manager.sinkProgressHeap.pop()
require.Equal(t, span, progress.span)
@@ -356,7 +356,7 @@ func TestSinkManagerRunWithErrors(t *testing.T) {

span := spanz.TableIDToComparableSpan(1)

source.AddTable(span, "test", 100)
source.AddTable(span, "test", 100, func() model.Ts { return 0 })
manager.AddTable(span, 100, math.MaxUint64)
manager.StartTable(span, 100)
source.Add(span, model.NewResolvedPolymorphicEvent(0, 101))
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/manager_test_helper.go
@@ -70,7 +70,7 @@ func CreateManagerWithMemEngine(
go func() { handleError(sourceManager.Run(ctx)) }()
sourceManager.WaitForReady(ctx)

sinkManager := New(changefeedID, changefeedInfo, up, schemaStorage, nil, sourceManager)
sinkManager := New(changefeedID, changefeedInfo, up, schemaStorage, nil, sourceManager, false)
go func() { handleError(sinkManager.Run(ctx)) }()
sinkManager.WaitForReady(ctx)

@@ -90,6 +90,6 @@
mg := &entry.MockMountGroup{}
schemaStorage := &entry.MockSchemaStorage{Resolved: math.MaxUint64}
sourceManager := sourcemanager.NewForTest(changefeedID, up, mg, sortEngine, false)
sinkManager := New(changefeedID, changefeedInfo, up, schemaStorage, redoMgr, sourceManager)
sinkManager := New(changefeedID, changefeedInfo, up, schemaStorage, redoMgr, sourceManager, false)
return sinkManager, sourceManager, sortEngine
}
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/redo_log_worker.go
@@ -141,7 +141,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
// NOTICE: The event can be filtered by the event filter.
if e.Row != nil {
// For all events, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
e.Row.ReplicatingTs = task.tableSink.replicateTs.Load()
x, size = handleRowChangedEvents(w.changefeedID, task.span, e)
advancer.appendEvents(x, size)
}
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker.go
@@ -237,7 +237,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
// NOTICE: The event can be filtered by the event filter.
if e.Row != nil {
// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
e.Row.ReplicatingTs = task.tableSink.GetReplicaTs()
x, size := handleRowChangedEvents(w.changefeedID, task.span, e)
advancer.appendEvents(x, size)
allEventSize += size
25 changes: 18 additions & 7 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -15,6 +15,7 @@ package sinkmanager

import (
"context"
"math"
"sort"
"sync"
"sync/atomic"
@@ -77,7 +78,7 @@ type tableSinkWrapper struct {
receivedSorterResolvedTs atomic.Uint64

// replicateTs is the ts that the table sink has started to replicate.
replicateTs model.Ts
replicateTs atomic.Uint64
genReplicateTs func(ctx context.Context) (model.Ts, error)

// lastCleanTime indicates the last time the table has been cleaned.
@@ -90,6 +91,11 @@
rangeEventCountsMu sync.Mutex
}

// GetReplicaTs returns the replicate ts of the table sink.
func (t *tableSinkWrapper) GetReplicaTs() model.Ts {
return t.replicateTs.Load()
}

type rangeEventCount struct {
// firstPos and lastPos are used to merge many rangeEventCount into one.
firstPos engine.Position
@@ -132,31 +138,34 @@ func newTableSinkWrapper(

res.receivedSorterResolvedTs.Store(startTs)
res.barrierTs.Store(startTs)
res.replicateTs.Store(math.MaxUint64)
return res
}

func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err error) {
if t.replicateTs != 0 {
if t.replicateTs.Load() != math.MaxUint64 {
log.Panic("The table sink has already started",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Stringer("span", &t.span),
zap.Uint64("startTs", startTs),
zap.Uint64("oldReplicateTs", t.replicateTs),
zap.Uint64("oldReplicateTs", t.replicateTs.Load()),
)
}

// FIXME(qupeng): it can be re-fetched later instead of fails.
if t.replicateTs, err = t.genReplicateTs(ctx); err != nil {
ts, err := t.genReplicateTs(ctx)
if err != nil {
return errors.Trace(err)
}
t.replicateTs.Store(ts)

log.Info("Sink is started",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Stringer("span", &t.span),
zap.Uint64("startTs", startTs),
zap.Uint64("replicateTs", t.replicateTs),
zap.Uint64("replicateTs", ts),
)

// This start ts maybe greater than the initial start ts of the table sink.
@@ -379,14 +388,16 @@ func (t *tableSinkWrapper) checkTableSinkHealth() (err error) {
// committed at downstream but we don't know. So we need to update `replicateTs`
// of the table so that we can re-send those events later.
func (t *tableSinkWrapper) restart(ctx context.Context) (err error) {
if t.replicateTs, err = t.genReplicateTs(ctx); err != nil {
ts, err := t.genReplicateTs(ctx)
if err != nil {
return errors.Trace(err)
}
t.replicateTs.Store(ts)
log.Info("Sink is restarted",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Stringer("span", &t.span),
zap.Uint64("replicateTs", t.replicateTs))
zap.Uint64("replicateTs", ts))
return nil
}

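In the wrapper itself, `replicateTs` becomes an `atomic.Uint64` and is initialized to `math.MaxUint64` instead of relying on the zero value, so "not started yet" has an explicit sentinel that can never collide with a real ts, and the guard in `start` checks against it. A compact, self-contained sketch of the sentinel pattern with illustrative names (the sketch returns an error where the real code uses `log.Panic`):

```go
package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

// notStarted is stored before the sink has fetched a real replicate ts;
// math.MaxUint64 can never be a valid ts handed out upstream.
const notStarted = math.MaxUint64

type wrapper struct {
	replicateTs atomic.Uint64
}

func newWrapper() *wrapper {
	w := &wrapper{}
	w.replicateTs.Store(notStarted)
	return w
}

// start refuses to run twice, mirroring the guard in the diff above.
func (w *wrapper) start(ts uint64) error {
	if w.replicateTs.Load() != notStarted {
		return fmt.Errorf("already started, replicateTs=%d", w.replicateTs.Load())
	}
	w.replicateTs.Store(ts)
	return nil
}

func main() {
	w := newWrapper()
	fmt.Println(w.start(123)) // <nil>
	fmt.Println(w.start(456)) // already started, replicateTs=123
}
```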