Skip to content

Commit

Permalink
*(ticdc): split old update kv entry after restarting changefeed (#10919
Browse files Browse the repository at this point in the history
…) (#11030)

close #10918
  • Loading branch information
ti-chi-bot authored May 21, 2024
1 parent 288bb64 commit a0bd689
Show file tree
Hide file tree
Showing 28 changed files with 1,062 additions and 108 deletions.
5 changes: 5 additions & 0 deletions cdc/model/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ type RawKVEntry struct {
RegionID uint64 `msg:"region_id"`
}

// IsUpdate reports whether the entry is an update: a put operation that
// carries both a previous value (OldValue) and a new value (Value).
func (v *RawKVEntry) IsUpdate() bool {
	if v.OpType != OpTypePut {
		return false
	}
	hasOld := v.OldValue != nil
	hasNew := v.Value != nil
	return hasOld && hasNew
}

func (v *RawKVEntry) String() string {
// TODO: redact values.
return fmt.Sprintf(
Expand Down
23 changes: 11 additions & 12 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,18 +800,17 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error {

// shouldSplitUpdateEvent reports whether the update events of this transaction
// should be split into delete and insert events for the given sink scheme.
// It returns false for MySQL-compatible schemes and true for every other scheme.
//
// For the MySQL Sink, we don't split any update event.
// This may cause errors like "duplicate entry" when sinking to the downstream.
// This kind of error will cause the changefeed to restart,
// and then the related update rows will be split into insert and delete at the puller side.
//
// For the Kafka and Storage sink, always split a single unique key changed update event, since:
// 1. Avro and CSV do not output the previous column values for the update event, so it would
// cause the consumer to miss data if the unique key changed event is not split.
// 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split.
func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool {
	// The decision depends only on the sink scheme. The former
	// `len(t.Rows) < 2` guard followed by an unconditional `return true`
	// made this return unreachable and contradicted the comment above.
	return !sink.IsMySQLCompatibleScheme(sinkScheme)
}

// trySplitAndSortUpdateEvent try to split update events if unique key is updated
Expand Down Expand Up @@ -841,8 +840,8 @@ func trySplitAndSortUpdateEvent(

// This indicates that it is an update event. if the pk or uk is updated,
// we need to split it into two events (delete and insert).
if e.IsUpdate() && shouldSplitUpdateEvent(e) {
deleteEvent, insertEvent, err := splitUpdateEvent(e)
if e.IsUpdate() && ShouldSplitUpdateEvent(e) {
deleteEvent, insertEvent, err := SplitUpdateEvent(e)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -859,10 +858,10 @@ func trySplitAndSortUpdateEvent(
return rowChangedEvents, nil
}

// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// ShouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// whether the handle key column or unique key has been modified.
// If either is modified, we need to use SplitUpdateEvent to split the update event into a delete and an insert event.
func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
// nil event will never be split.
if updateEvent == nil {
return false
Expand All @@ -884,8 +883,8 @@ func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
return false
}

// splitUpdateEvent splits an update event into a delete and an insert event.
func splitUpdateEvent(
// SplitUpdateEvent splits an update event into a delete and an insert event.
func SplitUpdateEvent(
updateEvent *RowChangedEvent,
) (*RowChangedEvent, *RowChangedEvent, error) {
if updateEvent == nil {
Expand Down
9 changes: 8 additions & 1 deletion cdc/model/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func TestTrySplitAndSortUpdateEventEmpty(t *testing.T) {
require.Equal(t, 0, len(result))
}

func TestTrySplitAndSortUpdateEvent(t *testing.T) {
func TestTxnTrySplitAndSortUpdateEvent(t *testing.T) {
t.Parallel()

// Update handle key.
Expand Down Expand Up @@ -608,4 +608,11 @@ func TestTrySplitAndSortUpdateEventOne(t *testing.T) {
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)

txn2 := &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
require.NoError(t, err)
require.Len(t, txn2.Rows, 2)
}
21 changes: 20 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"net/url"
"strconv"
"sync"
"time"
Expand All @@ -40,6 +41,7 @@ import (
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -470,6 +472,18 @@ func isProcessorIgnorableError(err error) bool {
return false
}

// needPullerSafeModeAtStart returns true if the scheme of the given sink URI
// is MySQL compatible.
// pullerSafeMode means to split all update kv entries whose commitTs
// is older than the start time of this changefeed.
// An ErrSinkURIInvalid-wrapped error is returned when the URI cannot be parsed.
func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) {
	parsedURI, parseErr := url.Parse(sinkURIStr)
	if parseErr != nil {
		return false, cerror.WrapError(cerror.ErrSinkURIInvalid, parseErr)
	}
	return sink.IsMySQLCompatibleScheme(sink.GetScheme(parsedURI)), nil
}

// Tick implements the `orchestrator.State` interface
// the `info` parameter is sent by metadata store, the `info` must be the latest value snapshot.
// the `status` parameter is sent by metadata store, the `status` must be the latest value snapshot.
Expand Down Expand Up @@ -619,9 +633,14 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
return errors.Trace(err)
}

pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.latestInfo.SinkURI)
if err != nil {
return errors.Trace(err)
}
p.sourceManager.r = sourcemanager.New(
p.changefeedID, p.upstream, p.mg.r,
sortEngine, util.GetOrZero(p.latestInfo.Config.BDRMode))
sortEngine, util.GetOrZero(p.latestInfo.Config.BDRMode),
pullerSafeModeAtStart)
p.sourceManager.name = "SourceManager"
p.sourceManager.changefeedID = p.changefeedID
p.sourceManager.spawn(prcCtx)
Expand Down
5 changes: 5 additions & 0 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Duration("cost", time.Since(start)))

// For duplicate entry error, we fast fail to restart changefeed.
if cerror.IsDupEntryError(err) {
return errors.Trace(err)
}
}

// If the error is retryable, we should retry to re-establish the internal resources.
Expand Down
83 changes: 73 additions & 10 deletions cdc/processor/sourcemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/kv"
Expand All @@ -28,10 +29,14 @@ import (
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/puller"
"github.com/pingcap/tiflow/pkg/config"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/txnutil"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

Expand All @@ -43,6 +48,8 @@ type pullerWrapperCreator func(
tableName string,
startTs model.Ts,
bdrMode bool,
shouldSplitKVEntry pullerwrapper.ShouldSplitKVEntry,
splitUpdateKVEntry pullerwrapper.SplitUpdateKVEntry,
) pullerwrapper.Wrapper

type tablePullers struct {
Expand Down Expand Up @@ -71,10 +78,13 @@ type SourceManager struct {
engine engine.SortEngine
// Used to indicate whether the changefeed is in BDR mode.
bdrMode bool
// startTs is the timestamp when SourceManager starts.
startTs model.Ts

// if `config.GetGlobalServerConfig().KVClient.EnableMultiplexing` is true `multiplexingPuller`
// will be used. Otherwise `tablePullers` will be used instead.
multiplexing bool
safeModeAtStart bool
tablePullers tablePullers
multiplexingPuller multiplexingPuller
}
Expand All @@ -86,9 +96,10 @@ func New(
mg entry.MounterGroup,
engine engine.SortEngine,
bdrMode bool,
safeModeAtStart bool,
) *SourceManager {
multiplexing := config.GetGlobalServerConfig().KVClient.EnableMultiplexing
return newSourceManager(changefeedID, up, mg, engine, bdrMode, multiplexing, pullerwrapper.NewPullerWrapper)
return newSourceManager(changefeedID, up, mg, engine, bdrMode, multiplexing, safeModeAtStart, pullerwrapper.NewPullerWrapper)
}

// NewForTest creates a new source manager for testing.
Expand All @@ -99,7 +110,24 @@ func NewForTest(
engine engine.SortEngine,
bdrMode bool,
) *SourceManager {
return newSourceManager(changefeedID, up, mg, engine, bdrMode, false, pullerwrapper.NewPullerWrapperForTest)
return newSourceManager(changefeedID, up, mg, engine, bdrMode, false, false, pullerwrapper.NewPullerWrapperForTest)
}

// isOldUpdateKVEntry reports whether raw is a non-nil update entry whose CRTs
// is strictly less than thresholdTs.
func isOldUpdateKVEntry(raw *model.RawKVEntry, thresholdTs model.Ts) bool {
	if raw == nil || !raw.IsUpdate() {
		return false
	}
	return raw.CRTs < thresholdTs
}

// splitUpdateKVEntry splits raw into a delete entry and an insert entry.
// Both results are shallow copies of raw: the delete entry has Value cleared
// and the insert entry has OldValue cleared. The input itself is not modified.
// An error is returned when raw is nil.
func splitUpdateKVEntry(raw *model.RawKVEntry) (*model.RawKVEntry, *model.RawKVEntry, error) {
	if raw == nil {
		return nil, nil, errors.New("nil event cannot be split")
	}

	del, ins := new(model.RawKVEntry), new(model.RawKVEntry)
	*del, *ins = *raw, *raw

	// The delete half keeps only the old value; the insert half only the new one.
	del.Value = nil
	ins.OldValue = nil

	return del, ins, nil
}

func newSourceManager(
Expand All @@ -109,16 +137,18 @@ func newSourceManager(
engine engine.SortEngine,
bdrMode bool,
multiplexing bool,
safeModeAtStart bool,
pullerWrapperCreator pullerWrapperCreator,
) *SourceManager {
mgr := &SourceManager{
ready: make(chan struct{}),
changefeedID: changefeedID,
up: up,
mg: mg,
engine: engine,
bdrMode: bdrMode,
multiplexing: multiplexing,
ready: make(chan struct{}),
changefeedID: changefeedID,
up: up,
mg: mg,
engine: engine,
bdrMode: bdrMode,
multiplexing: multiplexing,
safeModeAtStart: safeModeAtStart,
}
if !multiplexing {
mgr.tablePullers.errChan = make(chan error, 16)
Expand All @@ -137,7 +167,10 @@ func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs mo
return
}

p := m.tablePullers.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode)
shouldSplitKVEntry := func(raw *model.RawKVEntry) bool {
return m.safeModeAtStart && isOldUpdateKVEntry(raw, m.startTs)
}
p := m.tablePullers.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode, shouldSplitKVEntry, splitUpdateKVEntry)
p.Start(m.tablePullers.ctx, m.up, m.engine, m.tablePullers.errChan)
m.tablePullers.Store(span, p)
}
Expand Down Expand Up @@ -198,6 +231,11 @@ func (m *SourceManager) GetTableSorterStats(span tablepb.Span) engine.TableStats

// Run implements util.Runnable.
func (m *SourceManager) Run(ctx context.Context, _ ...chan<- error) error {
startTs, err := getCurrentTs(ctx, m.up.PDClient)
if err != nil {
return err
}
m.startTs = startTs
if m.multiplexing {
serverConfig := config.GetGlobalServerConfig()
grpcPool := sharedconn.NewConnAndClientPool(m.up.SecurityConfig, kv.GetGlobalGrpcMetrics())
Expand All @@ -206,9 +244,14 @@ func (m *SourceManager) Run(ctx context.Context, _ ...chan<- error) error {
m.up.PDClient, grpcPool, m.up.RegionCache, m.up.PDClock,
txnutil.NewLockerResolver(m.up.KVStorage.(tikv.Storage), m.changefeedID),
)
shouldSplitKVEntry := func(raw *model.RawKVEntry) bool {
return m.safeModeAtStart && isOldUpdateKVEntry(raw, m.startTs)
}
m.multiplexingPuller.puller = pullerwrapper.NewMultiplexingPullerWrapper(
m.changefeedID, client, m.engine,
int(serverConfig.KVClient.FrontierConcurrent),
shouldSplitKVEntry,
splitUpdateKVEntry,
)

close(m.ready)
Expand Down Expand Up @@ -266,3 +309,23 @@ func (m *SourceManager) Close() {
func (m *SourceManager) Add(span tablepb.Span, events ...*model.PolymorphicEvent) {
m.engine.Add(span, events...)
}

// getCurrentTs asks PD for a timestamp through pdClient.GetTS and composes the
// physical and logical parts with oracle.ComposeTS.
// The request is retried via retry.Do with a backoff base delay of 100 (ms, per
// the constant name) and a total retry duration of 10 seconds, for errors that
// cerrors.IsRetryableError accepts. On failure it returns a zero Ts and the
// traced error.
func getCurrentTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) {
	const (
		backoffBaseDelayInMs = int64(100)
		totalRetryDuration   = 10 * time.Second
	)

	var currentTs model.Ts
	fetchTs := func() error {
		physical, logical, getErr := pdClient.GetTS(ctx)
		if getErr != nil {
			return errors.Trace(getErr)
		}
		currentTs = oracle.ComposeTS(physical, logical)
		return nil
	}

	if err := retry.Do(ctx, fetchTs,
		retry.WithBackoffBaseDelay(backoffBaseDelayInMs),
		retry.WithTotalRetryDuratoin(totalRetryDuration),
		retry.WithIsRetryableErr(cerrors.IsRetryableError),
	); err != nil {
		return model.Ts(0), errors.Trace(err)
	}
	return currentTs, nil
}
2 changes: 2 additions & 0 deletions cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func NewPullerWrapperForTest(
tableName string,
startTs model.Ts,
bdrMode bool,
shouldSplitKVEntry ShouldSplitKVEntry,
splitUpdateKVEntry SplitUpdateKVEntry,
) Wrapper {
return &dummyPullerWrapper{}
}
Expand Down
16 changes: 14 additions & 2 deletions cdc/processor/sourcemanager/puller/multiplexing_puller_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func NewMultiplexingPullerWrapper(
client *kv.SharedClient,
eventSortEngine engine.SortEngine,
frontiers int,
shouldSplitKVEntry ShouldSplitKVEntry,
splitUpdateKVEntry SplitUpdateKVEntry,
) *MultiplexingWrapper {
consume := func(ctx context.Context, raw *model.RawKVEntry, spans []tablepb.Span) error {
if len(spans) > 1 {
Expand All @@ -45,8 +47,18 @@ func NewMultiplexingPullerWrapper(
zap.String("changefeed", changefeed.ID))
}
if raw != nil {
pEvent := model.NewPolymorphicEvent(raw)
eventSortEngine.Add(spans[0], pEvent)
if shouldSplitKVEntry(raw) {
deleteKVEntry, insertKVEntry, err := splitUpdateKVEntry(raw)
if err != nil {
return err
}
deleteEvent := model.NewPolymorphicEvent(deleteKVEntry)
insertEvent := model.NewPolymorphicEvent(insertKVEntry)
eventSortEngine.Add(spans[0], deleteEvent, insertEvent)
} else {
pEvent := model.NewPolymorphicEvent(raw)
eventSortEngine.Add(spans[0], pEvent)
}
}
return nil
}
Expand Down
Loading

0 comments on commit a0bd689

Please sign in to comment.