From c3759e5ee9970d74f446f6f77a0a647dedc4d70f Mon Sep 17 00:00:00 2001
From: Neil Shen
Date: Fri, 17 Dec 2021 20:50:36 +0800
Subject: [PATCH] This is an automated cherry-pick of #3899

Signed-off-by: ti-chi-bot
---
 cdc/sink/buffer_sink.go      | 214 +++++++++++++++++++++++++++++++++++
 cdc/sink/buffer_sink_test.go | 152 +++++++++++++++++++++++++
 cdc/sink/manager.go          |  59 +++++++++-
 cdc/sink/metrics.go          |   8 --
 cdc/sink/sink.go             |  41 ++++++-
 cdc/sink/table_sink.go       | 109 ++++++++++++++++++
 6 files changed, 566 insertions(+), 17 deletions(-)
 create mode 100644 cdc/sink/buffer_sink.go
 create mode 100644 cdc/sink/buffer_sink_test.go
 create mode 100644 cdc/sink/table_sink.go

diff --git a/cdc/sink/buffer_sink.go b/cdc/sink/buffer_sink.go
new file mode 100644
index 00000000000..2a0e2a09be6
--- /dev/null
+++ b/cdc/sink/buffer_sink.go
@@ -0,0 +1,214 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sink
+
+import (
+	"context"
+	"sort"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/pingcap/ticdc/cdc/model"
+	"github.com/pingcap/ticdc/pkg/util"
+	"github.com/prometheus/client_golang/prometheus"
+	"go.uber.org/zap"
+)
+
+const maxFlushBatchSize = 512
+
+// bufferSink buffers emitted events and checkpoints, and flushes them asynchronously.
+// Note that it is a thread-safe Sink implementation.
+type bufferSink struct { + Sink + changeFeedCheckpointTs uint64 + tableCheckpointTsMap sync.Map + buffer map[model.TableID][]*model.RowChangedEvent + bufferMu sync.Mutex + flushTsChan chan flushMsg + drawbackChan chan drawbackMsg +} + +var _ Sink = (*bufferSink)(nil) + +func newBufferSink( + backendSink Sink, checkpointTs model.Ts, drawbackChan chan drawbackMsg, +) *bufferSink { + sink := &bufferSink{ + Sink: backendSink, + // buffer shares the same flow control with table sink + buffer: make(map[model.TableID][]*model.RowChangedEvent), + changeFeedCheckpointTs: checkpointTs, + flushTsChan: make(chan flushMsg, maxFlushBatchSize), + drawbackChan: drawbackChan, + } + return sink +} + +type runState struct { + batch [maxFlushBatchSize]flushMsg + + metricFlushDuration prometheus.Observer + metricEmitRowDuration prometheus.Observer + metricTotalRows prometheus.Counter +} + +func (b *bufferSink) run(ctx context.Context, errCh chan error) { + changefeedID := util.ChangefeedIDFromCtx(ctx) + advertiseAddr := util.CaptureAddrFromCtx(ctx) + state := runState{ + metricFlushDuration: flushRowChangedDuration.WithLabelValues(advertiseAddr, changefeedID, "Flush"), + metricEmitRowDuration: flushRowChangedDuration.WithLabelValues(advertiseAddr, changefeedID, "EmitRow"), + metricTotalRows: bufferSinkTotalRowsCountCounter.WithLabelValues(advertiseAddr, changefeedID), + } + defer func() { + flushRowChangedDuration.DeleteLabelValues(advertiseAddr, changefeedID, "Flush") + flushRowChangedDuration.DeleteLabelValues(advertiseAddr, changefeedID, "EmitRow") + bufferSinkTotalRowsCountCounter.DeleteLabelValues(advertiseAddr, changefeedID) + }() + + for { + keepRun, err := b.runOnce(ctx, &state) + if err != nil && errors.Cause(err) != context.Canceled { + errCh <- err + return + } + if !keepRun { + return + } + } +} + +func (b *bufferSink) runOnce(ctx context.Context, state *runState) (bool, error) { + batchSize, batch := 0, state.batch + push := func(event flushMsg) { + batch[batchSize] = event + batchSize++ + } + select { + case <-ctx.Done(): + return false, ctx.Err() + case drawback := <-b.drawbackChan: + b.bufferMu.Lock() + delete(b.buffer, drawback.tableID) + b.bufferMu.Unlock() + close(drawback.callback) + case event := <-b.flushTsChan: + push(event) + RecvBatch: + for batchSize < maxFlushBatchSize { + select { + case event := <-b.flushTsChan: + push(event) + default: + break RecvBatch + } + } + } + + b.bufferMu.Lock() + startEmit := time.Now() + // find all rows before resolvedTs and emit to backend sink + for i := 0; i < batchSize; i++ { + tableID, resolvedTs := batch[i].tableID, batch[i].resolvedTs + rows := b.buffer[tableID] + i := sort.Search(len(rows), func(i int) bool { + return rows[i].CommitTs > resolvedTs + }) + if i == 0 { + continue + } + state.metricTotalRows.Add(float64(i)) + + err := b.Sink.EmitRowChangedEvents(ctx, rows[:i]...) + if err != nil { + b.bufferMu.Unlock() + return false, errors.Trace(err) + } + + // put remaining rows back to buffer + // append to a new, fixed slice to avoid lazy GC + b.buffer[tableID] = append(make([]*model.RowChangedEvent, 0, len(rows[i:])), rows[i:]...) 
+	}
+	b.bufferMu.Unlock()
+	state.metricEmitRowDuration.Observe(time.Since(startEmit).Seconds())
+
+	startFlush := time.Now()
+	for i := 0; i < batchSize; i++ {
+		tableID, resolvedTs := batch[i].tableID, batch[i].resolvedTs
+		checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, tableID, resolvedTs)
+		if err != nil {
+			return false, errors.Trace(err)
+		}
+		b.tableCheckpointTsMap.Store(tableID, checkpointTs)
+	}
+	now := time.Now()
+	state.metricFlushDuration.Observe(now.Sub(startFlush).Seconds())
+	if now.Sub(startEmit) > time.Second {
+		log.Warn("flush row changed events too slow",
+			zap.Int("batchSize", batchSize),
+			zap.Duration("duration", now.Sub(startEmit)),
+			util.ZapFieldChangefeed(ctx))
+	}
+
+	return true, nil
+}
+
+func (b *bufferSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	default:
+		if len(rows) == 0 {
+			return nil
+		}
+		tableID := rows[0].Table.TableID
+		b.bufferMu.Lock()
+		b.buffer[tableID] = append(b.buffer[tableID], rows...)
+		b.bufferMu.Unlock()
+	}
+	return nil
+}
+
+func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
+	select {
+	case <-ctx.Done():
+		return b.getTableCheckpointTs(tableID), ctx.Err()
+	case b.flushTsChan <- flushMsg{
+		tableID:    tableID,
+		resolvedTs: resolvedTs,
+	}:
+	}
+	return b.getTableCheckpointTs(tableID), nil
+}
+
+type flushMsg struct {
+	tableID    model.TableID
+	resolvedTs uint64
+}
+
+func (b *bufferSink) getTableCheckpointTs(tableID model.TableID) uint64 {
+	checkPoints, ok := b.tableCheckpointTsMap.Load(tableID)
+	if ok {
+		return checkPoints.(uint64)
+	}
+	return atomic.LoadUint64(&b.changeFeedCheckpointTs)
+}
+
+// UpdateChangeFeedCheckpointTs updates the changeFeedCheckpointTs on every processor tick.
+func (b *bufferSink) UpdateChangeFeedCheckpointTs(checkpointTs uint64) {
+	atomic.StoreUint64(&b.changeFeedCheckpointTs, checkpointTs)
+}
diff --git a/cdc/sink/buffer_sink_test.go b/cdc/sink/buffer_sink_test.go
new file mode 100644
index 00000000000..b790fac5204
--- /dev/null
+++ b/cdc/sink/buffer_sink_test.go
@@ -0,0 +1,152 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package sink + +import ( + "context" + "fmt" + "math" + "testing" + "time" + + "github.com/pingcap/ticdc/cdc/model" + "github.com/stretchr/testify/require" +) + +func TestTableIsNotFlushed(t *testing.T) { + t.Parallel() + + b := bufferSink{changeFeedCheckpointTs: 1} + require.Equal(t, uint64(1), b.getTableCheckpointTs(2)) + b.UpdateChangeFeedCheckpointTs(3) + require.Equal(t, uint64(3), b.getTableCheckpointTs(2)) +} + +func TestFlushTable(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + b := newBufferSink(newBlackHoleSink(ctx, make(map[string]string)), 5, make(chan drawbackMsg)) + go b.run(ctx, make(chan error)) + + require.Equal(t, uint64(5), b.getTableCheckpointTs(2)) + require.Nil(t, b.EmitRowChangedEvents(ctx)) + tbl1 := &model.TableName{TableID: 1} + tbl2 := &model.TableName{TableID: 2} + tbl3 := &model.TableName{TableID: 3} + tbl4 := &model.TableName{TableID: 4} + require.Nil(t, b.EmitRowChangedEvents(ctx, []*model.RowChangedEvent{ + {CommitTs: 6, Table: tbl1}, + {CommitTs: 6, Table: tbl2}, + {CommitTs: 6, Table: tbl3}, + {CommitTs: 6, Table: tbl4}, + {CommitTs: 10, Table: tbl1}, + {CommitTs: 10, Table: tbl2}, + {CommitTs: 10, Table: tbl3}, + {CommitTs: 10, Table: tbl4}, + }...)) + checkpoint, err := b.FlushRowChangedEvents(ctx, 1, 7) + require.True(t, checkpoint <= 7) + require.Nil(t, err) + checkpoint, err = b.FlushRowChangedEvents(ctx, 2, 6) + require.True(t, checkpoint <= 6) + require.Nil(t, err) + checkpoint, err = b.FlushRowChangedEvents(ctx, 3, 8) + require.True(t, checkpoint <= 8) + require.Nil(t, err) + time.Sleep(200 * time.Millisecond) + require.Equal(t, uint64(7), b.getTableCheckpointTs(1)) + require.Equal(t, uint64(6), b.getTableCheckpointTs(2)) + require.Equal(t, uint64(8), b.getTableCheckpointTs(3)) + require.Equal(t, uint64(5), b.getTableCheckpointTs(4)) + b.UpdateChangeFeedCheckpointTs(6) + require.Equal(t, uint64(7), b.getTableCheckpointTs(1)) + require.Equal(t, uint64(6), b.getTableCheckpointTs(2)) + require.Equal(t, uint64(8), b.getTableCheckpointTs(3)) + require.Equal(t, uint64(6), b.getTableCheckpointTs(4)) +} + +func TestFlushFailed(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.TODO()) + b := newBufferSink(newBlackHoleSink(ctx, make(map[string]string)), 5, make(chan drawbackMsg)) + go b.run(ctx, make(chan error)) + + checkpoint, err := b.FlushRowChangedEvents(ctx, 3, 8) + require.True(t, checkpoint <= 8) + require.Nil(t, err) + time.Sleep(200 * time.Millisecond) + require.Equal(t, uint64(8), b.getTableCheckpointTs(3)) + cancel() + checkpoint, _ = b.FlushRowChangedEvents(ctx, 3, 18) + require.Equal(t, uint64(8), checkpoint) + checkpoint, _ = b.FlushRowChangedEvents(ctx, 1, 18) + require.Equal(t, uint64(5), checkpoint) + time.Sleep(200 * time.Millisecond) + require.Equal(t, uint64(8), b.getTableCheckpointTs(3)) + require.Equal(t, uint64(5), b.getTableCheckpointTs(1)) +} + +type benchSink struct { + Sink +} + +func (b *benchSink) EmitRowChangedEvents( + ctx context.Context, rows ...*model.RowChangedEvent, +) error { + return nil +} + +func (b *benchSink) FlushRowChangedEvents( + ctx context.Context, tableID model.TableID, resolvedTs uint64, +) (uint64, error) { + return 0, nil +} + +func BenchmarkRun(b *testing.B) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + state := runState{ + metricFlushDuration: flushRowChangedDuration.WithLabelValues(b.Name(), b.Name(), "Flush"), + metricEmitRowDuration: flushRowChangedDuration.WithLabelValues(b.Name(), 
b.Name(), "EmitRow"), + metricTotalRows: bufferSinkTotalRowsCountCounter.WithLabelValues(b.Name(), b.Name()), + } + + for exp := 0; exp < 9; exp++ { + count := int(math.Pow(4, float64(exp))) + s := newBufferSink(&benchSink{}, 5, make(chan drawbackMsg)) + s.flushTsChan = make(chan flushMsg, count) + for i := 0; i < count; i++ { + s.buffer[int64(i)] = []*model.RowChangedEvent{{CommitTs: 5}} + } + b.ResetTimer() + + b.Run(fmt.Sprintf("%d table(s)", count), func(b *testing.B) { + for i := 0; i < b.N; i++ { + for j := 0; j < count; j++ { + s.flushTsChan <- flushMsg{tableID: int64(0)} + } + for len(s.flushTsChan) != 0 { + keepRun, err := s.runOnce(ctx, &state) + if err != nil || !keepRun { + b.Fatal(keepRun, err) + } + } + } + }) + } +} diff --git a/cdc/sink/manager.go b/cdc/sink/manager.go index ae80bc7df9e..8fe34f4f22b 100644 --- a/cdc/sink/manager.go +++ b/cdc/sink/manager.go @@ -15,11 +15,13 @@ package sink import ( "context" +<<<<<<< HEAD "math" "sort" +======= +>>>>>>> b5a52cec7 (sink(ticdc): optimize buffer sink flush from O(N^2) to O(N) (#3899)) "sync" "sync/atomic" - "time" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -29,6 +31,7 @@ import ( "go.uber.org/zap" ) +<<<<<<< HEAD const ( defaultMetricInterval = time.Second * 15 ) @@ -39,9 +42,17 @@ type Manager struct { checkpointTs model.Ts tableSinks map[model.TableID]*tableSink tableSinksMu sync.Mutex - - flushMu sync.Mutex - flushing int64 +======= +// Manager manages table sinks, maintains the relationship between table sinks +// and backendSink. +// Manager is thread-safe. +type Manager struct { + bufSink *bufferSink + tableCheckpointTsMap sync.Map + tableSinks map[model.TableID]*tableSink + tableSinksMu sync.Mutex + changeFeedCheckpointTs uint64 +>>>>>>> b5a52cec7 (sink(ticdc): optimize buffer sink flush from O(N^2) to O(N) (#3899)) drawbackChan chan drawbackMsg @@ -56,9 +67,16 @@ func NewManager( captureAddr string, changefeedID model.ChangeFeedID, ) *Manager { drawbackChan := make(chan drawbackMsg, 16) + bufSink := newBufferSink(backendSink, checkpointTs, drawbackChan) + go bufSink.run(ctx, errCh) return &Manager{ +<<<<<<< HEAD backendSink: newBufferSink(ctx, backendSink, errCh, checkpointTs, drawbackChan), checkpointTs: checkpointTs, +======= + bufSink: bufSink, + changeFeedCheckpointTs: checkpointTs, +>>>>>>> b5a52cec7 (sink(ticdc): optimize buffer sink flush from O(N^2) to O(N) (#3899)) tableSinks: make(map[model.TableID]*tableSink), drawbackChan: drawbackChan, captureAddr: captureAddr, @@ -75,10 +93,17 @@ func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts) log.Panic("the table sink already exists", zap.Uint64("tableID", uint64(tableID))) } sink := &tableSink{ +<<<<<<< HEAD tableID: tableID, manager: m, buffer: make([]*model.RowChangedEvent, 0, 128), emittedTs: checkpointTs, +======= + tableID: tableID, + manager: m, + buffer: make([]*model.RowChangedEvent, 0, 128), + redoManager: redoManager, +>>>>>>> b5a52cec7 (sink(ticdc): optimize buffer sink flush from O(N^2) to O(N) (#3899)) } m.tableSinks[tableID] = sink return sink @@ -86,6 +111,7 @@ func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts) // Close closes the Sink manager and backend Sink, this method can be reentrantly called func (m *Manager) Close(ctx context.Context) error { +<<<<<<< HEAD tableSinkTotalRowsCountCounter.DeleteLabelValues(m.captureAddr, m.changefeedID) return m.backendSink.Close(ctx) } @@ -102,10 +128,18 @@ func (m *Manager) getMinEmittedTs() model.Ts { if minTs > emittedTs { minTs = 
emittedTs } +======= + m.tableSinksMu.Lock() + defer m.tableSinksMu.Unlock() + tableSinkTotalRowsCountCounter.DeleteLabelValues(m.captureAddr, m.changefeedID) + if m.bufSink != nil { + return m.bufSink.Close(ctx) +>>>>>>> b5a52cec7 (sink(ticdc): optimize buffer sink flush from O(N^2) to O(N) (#3899)) } - return minTs + return nil } +<<<<<<< HEAD func (m *Manager) flushBackendSink(ctx context.Context) (model.Ts, error) { // NOTICE: Because all table sinks will try to flush backend sink, // which will cause a lot of lock contention and blocking in high concurrency cases. @@ -123,6 +157,10 @@ func (m *Manager) flushBackendSink(ctx context.Context) (model.Ts, error) { }() minEmittedTs := m.getMinEmittedTs() checkpointTs, err := m.backendSink.FlushRowChangedEvents(ctx, minEmittedTs) +======= +func (m *Manager) flushBackendSink(ctx context.Context, tableID model.TableID, resolvedTs uint64) (model.Ts, error) { + checkpointTs, err := m.bufSink.FlushRowChangedEvents(ctx, tableID, resolvedTs) +>>>>>>> b5a52cec7 (sink(ticdc): optimize buffer sink flush from O(N^2) to O(N) (#3899)) if err != nil { return m.getCheckpointTs(), errors.Trace(err) } @@ -145,7 +183,11 @@ func (m *Manager) destroyTableSink(ctx context.Context, tableID model.TableID) e return ctx.Err() case <-callback: } +<<<<<<< HEAD return m.backendSink.Barrier(ctx) +======= + return m.bufSink.Barrier(ctx, tableID) +>>>>>>> b5a52cec7 (sink(ticdc): optimize buffer sink flush from O(N^2) to O(N) (#3899)) } func (m *Manager) getCheckpointTs() uint64 { @@ -171,6 +213,7 @@ func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Row return nil } +<<<<<<< HEAD func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { // the table sink doesn't receive the DDL event return nil @@ -210,6 +253,12 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64 ckpt, err := t.manager.flushBackendSink(ctx) if err != nil { return ckpt, err +======= +func (m *Manager) UpdateChangeFeedCheckpointTs(checkpointTs uint64) { + atomic.StoreUint64(&m.changeFeedCheckpointTs, checkpointTs) + if m.bufSink != nil { + m.bufSink.UpdateChangeFeedCheckpointTs(checkpointTs) +>>>>>>> b5a52cec7 (sink(ticdc): optimize buffer sink flush from O(N^2) to O(N) (#3899)) } logAbnormalCheckpoint(ckpt) return ckpt, err diff --git a/cdc/sink/metrics.go b/cdc/sink/metrics.go index 7d7380f6ad1..6d728fa0957 100644 --- a/cdc/sink/metrics.go +++ b/cdc/sink/metrics.go @@ -86,13 +86,6 @@ var ( Help: "Bucketed histogram of processing time (s) of flushing events in processor", Buckets: prometheus.ExponentialBuckets(0.002 /* 2ms */, 2, 20), }, []string{"capture", "changefeed", "type"}) - bufferChanSizeGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "ticdc", - Subsystem: "sink", - Name: "buffer_chan_size", - Help: "size of row changed event buffer channel in sink manager", - }, []string{"capture", "changefeed"}) tableSinkTotalRowsCountCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -122,7 +115,6 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(totalRowsCountGauge) registry.MustRegister(totalFlushedRowsCountGauge) registry.MustRegister(flushRowChangedDuration) - registry.MustRegister(bufferChanSizeGauge) registry.MustRegister(tableSinkTotalRowsCountCounter) registry.MustRegister(bufferSinkTotalRowsCountCounter) } diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index 12a381d0f9f..5df24df56d9 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -37,26 +37,59 @@ type 
Sink interface {
 	// EmitRowChangedEvents sends Row Changed Event to Sink
 	// EmitRowChangedEvents may write rows to downstream directly;
+	//
+	// EmitRowChangedEvents is thread-safe.
+	// FIXME: some sink implementations are not thread-safe yet, but they should be.
 	EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error
 
 	// EmitDDLEvent sends DDL Event to Sink
 	// EmitDDLEvent should execute DDL to downstream synchronously
+	//
+	// EmitDDLEvent is thread-safe.
+	// FIXME: some sink implementations are not thread-safe yet, but they should be.
 	EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
 
+<<<<<<< HEAD
 	// FlushRowChangedEvents flushes each row which of commitTs less than or equal to `resolvedTs` into downstream.
 	// TiCDC guarantees that all of Event which of commitTs less than or equal to `resolvedTs` are sent to Sink through `EmitRowChangedEvents`
 	FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error)
-
-	// EmitCheckpointTs sends CheckpointTs to Sink
-	// TiCDC guarantees that all Events **in the cluster** which of commitTs less than or equal `checkpointTs` are sent to downstream successfully.
+=======
+	// FlushRowChangedEvents flushes each row whose commitTs is less than or
+	// equal to `resolvedTs` into downstream.
+	// TiCDC guarantees that all the Events whose commitTs is less than or
+	// equal to `resolvedTs` are sent to Sink through `EmitRowChangedEvents`
+	//
+	// FlushRowChangedEvents is thread-safe.
+	// FIXME: some sink implementations are not thread-safe yet, but they should be.
+	FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error)
+>>>>>>> b5a52cec7 (sink(ticdc): optimize buffer sink flush from O(N^2) to O(N) (#3899))
+
+	// EmitCheckpointTs sends CheckpointTs to Sink.
+	// TiCDC guarantees that all Events **in the cluster** whose commitTs is
+	// less than or equal to `checkpointTs` are sent to downstream successfully.
+	//
+	// EmitCheckpointTs is thread-safe.
+	// FIXME: some sink implementations are not thread-safe yet, but they should be.
 	EmitCheckpointTs(ctx context.Context, ts uint64) error
 
-	// Close closes the Sink
+	// Close closes the Sink.
+	//
+	// Close is thread-safe and idempotent.
 	Close(ctx context.Context) error
 
+<<<<<<< HEAD
 	// Barrier is a synchronous function to wait all events to be flushed in underlying sink
 	// Note once Barrier is called, the resolved ts won't be pushed until the Barrier call returns.
 	Barrier(ctx context.Context) error
+=======
+	// Barrier is a synchronous function that waits for all events to be
+	// flushed in the underlying sink.
+	// Note once Barrier is called, the resolved ts won't be pushed until
+	// the Barrier call returns.
+	//
+	// Barrier is thread-safe.
+	Barrier(ctx context.Context, tableID model.TableID) error
+>>>>>>> b5a52cec7 (sink(ticdc): optimize buffer sink flush from O(N^2) to O(N) (#3899))
 }
 
 var sinkIniterMap = make(map[string]sinkInitFunc)
diff --git a/cdc/sink/table_sink.go b/cdc/sink/table_sink.go
new file mode 100644
index 00000000000..331d75950bc
--- /dev/null
+++ b/cdc/sink/table_sink.go
@@ -0,0 +1,109 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sink
+
+import (
+	"context"
+	"sort"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/pingcap/ticdc/cdc/model"
+	"github.com/pingcap/ticdc/cdc/redo"
+	"go.uber.org/zap"
+)
+
+type tableSink struct {
+	tableID     model.TableID
+	manager     *Manager
+	buffer      []*model.RowChangedEvent
+	redoManager redo.LogManager
+}
+
+func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
+	t.buffer = append(t.buffer, rows...)
+	t.manager.metricsTableSinkTotalRows.Add(float64(len(rows)))
+	if t.redoManager.Enabled() {
+		return t.redoManager.EmitRowChangedEvents(ctx, t.tableID, rows...)
+	}
+	return nil
+}
+
+func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
+	// the table sink doesn't receive the DDL event
+	return nil
+}
+
+// FlushRowChangedEvents flushes sorted rows to the sink manager. Note that the
+// resolvedTs must be no greater than the global resolvedTs, the table barrierTs
+// and the table redo log watermarkTs.
+func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
+	if tableID != t.tableID {
+		log.Panic("inconsistent table sink",
+			zap.Int64("tableID", tableID), zap.Int64("sinkTableID", t.tableID))
+	}
+	i := sort.Search(len(t.buffer), func(i int) bool {
+		return t.buffer[i].CommitTs > resolvedTs
+	})
+	if i == 0 {
+		return t.flushResolvedTs(ctx, resolvedTs)
+	}
+	resolvedRows := t.buffer[:i]
+	t.buffer = append(make([]*model.RowChangedEvent, 0, len(t.buffer[i:])), t.buffer[i:]...)
+
+	err := t.manager.bufSink.EmitRowChangedEvents(ctx, resolvedRows...)
+	if err != nil {
+		return t.manager.getCheckpointTs(tableID), errors.Trace(err)
+	}
+	return t.flushResolvedTs(ctx, resolvedTs)
+}
+
+func (t *tableSink) flushResolvedTs(ctx context.Context, resolvedTs uint64) (uint64, error) {
+	redoTs, err := t.flushRedoLogs(ctx, resolvedTs)
+	if err != nil {
+		return t.manager.getCheckpointTs(t.tableID), err
+	}
+	if redoTs < resolvedTs {
+		resolvedTs = redoTs
+	}
+	return t.manager.flushBackendSink(ctx, t.tableID, resolvedTs)
+}
+
+// flushRedoLogs flushes redo logs and returns the redo log resolved ts, which
+// means all events before that ts have been persisted to the redo log storage.
+func (t *tableSink) flushRedoLogs(ctx context.Context, resolvedTs uint64) (uint64, error) {
+	if t.redoManager.Enabled() {
+		err := t.redoManager.FlushLog(ctx, t.tableID, resolvedTs)
+		if err != nil {
+			return 0, err
+		}
+		return t.redoManager.GetMinResolvedTs(), nil
+	}
+	return resolvedTs, nil
+}
+
+func (t *tableSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
+	// the table sink doesn't receive the checkpoint event
+	return nil
+}
+
+// Note that once Close is called, no more events can be written to this table sink.
+func (t *tableSink) Close(ctx context.Context) error {
+	return t.manager.destroyTableSink(ctx, t.tableID)
+}
+
+// Barrier is not used in the table sink.
+func (t *tableSink) Barrier(ctx context.Context, tableID model.TableID) error {
+	return nil
+}
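
The core of the O(N^2)-to-O(N) change above is that a flush now only touches the buffer of the table being flushed: each table's rows are kept sorted by CommitTs, and a single sort.Search splits off the resolved prefix, both in tableSink.FlushRowChangedEvents and in bufferSink.runOnce. The standalone sketch below is not part of the patch; the row type and the splitResolved helper are invented names for illustration, but the split mirrors the code in the diff.

package main

import (
	"fmt"
	"sort"
)

// row stands in for model.RowChangedEvent; only CommitTs matters here.
type row struct{ CommitTs uint64 }

// splitResolved returns the rows with CommitTs <= resolvedTs plus the remaining
// buffer. buf must already be sorted by CommitTs, which sort.Search relies on.
func splitResolved(buf []*row, resolvedTs uint64) (resolved, rest []*row) {
	i := sort.Search(len(buf), func(i int) bool {
		return buf[i].CommitTs > resolvedTs
	})
	// Copy the tail into a fresh slice so the emitted prefix does not keep the
	// old backing array alive (the same idea as the "avoid lazy GC" comment above).
	rest = append(make([]*row, 0, len(buf[i:])), buf[i:]...)
	return buf[:i], rest
}

func main() {
	buf := []*row{{CommitTs: 6}, {CommitTs: 7}, {CommitTs: 10}}
	resolved, rest := splitResolved(buf, 7)
	fmt.Println(len(resolved), len(rest)) // 2 1
}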
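The other half of the change is how flush requests reach the backend: bufferSink.FlushRowChangedEvents only enqueues a flushMsg, and runOnce blocks for one message, then drains whatever else is already queued, up to maxFlushBatchSize, before emitting and flushing. Below is a minimal sketch of that coalescing pattern, again with invented names (msg, drainBatch) and without the drawbackChan and context handling of the real loop.

package main

import "fmt"

type msg struct {
	tableID    int64
	resolvedTs uint64
}

const maxBatch = 4 // the patch uses maxFlushBatchSize = 512

// drainBatch blocks for the first request, then opportunistically takes any
// requests already buffered in ch, up to maxBatch, without blocking again.
func drainBatch(ch <-chan msg) []msg {
	batch := make([]msg, 0, maxBatch)
	batch = append(batch, <-ch) // wait for the first request
	for len(batch) < maxBatch {
		select {
		case m := <-ch:
			batch = append(batch, m)
		default: // nothing else queued right now
			return batch
		}
	}
	return batch
}

func main() {
	ch := make(chan msg, 8)
	for i := 0; i < 6; i++ {
		ch <- msg{tableID: int64(i), resolvedTs: uint64(10 + i)}
	}
	fmt.Println(len(drainBatch(ch)), len(drainBatch(ch))) // 4 2
}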