Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TiCDC supports a snapshot-map in replication #932

Merged
merged 38 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
4e8689e
create dev branch
Colins110 Aug 12, 2020
88318c3
add create sync point logic
Colins110 Aug 18, 2020
e4cadbd
add sync table logic
Colins110 Aug 20, 2020
f26eead
add proxy for docker
Colins110 Sep 1, 2020
3fd1b3d
[9.1] fix the logic
Colins110 Sep 1, 2020
97a1615
record the sync point base on timer(the ddl and table migration sync…
Colins110 Sep 3, 2020
94ec1d4
add ddl syncpoint record
Colins110 Sep 5, 2020
6abad54
a liitle change for createSyncpointSink()
Colins110 Sep 7, 2020
c13ccc3
9.9 fix for syncpointsinl logic
Colins110 Sep 8, 2020
abd124b
add integration test for sync point record
Colins110 Sep 11, 2020
091927b
9.13 add sync-point and sync-interval flag for 'cdc cli changefeed cr…
Colins110 Sep 13, 2020
0a83609
add some flags in 'cdc cli changefeed'
Colins110 Sep 13, 2020
1da2a73
fix the changefeed resume logic
Colins110 Sep 14, 2020
8f25e2f
Merge branch 'master' into dev
Colins110 Sep 15, 2020
b1dad1d
merge master and fix some err
Colins110 Sep 15, 2020
7e2ca72
fix some err about sql for ci-lint
Colins110 Sep 15, 2020
22fca2a
fix code fmt
Colins110 Sep 15, 2020
85d1482
fix the test script to run all integration test by one command
Colins110 Sep 16, 2020
2563a39
fix the nil pointer err in owner.go
Colins110 Sep 17, 2020
17de492
fix the logic of deal with NewSyncpointSinklink's err in owner.go and…
Colins110 Sep 17, 2020
8a16cbd
remove unused var defaultSyncInterval
Colins110 Sep 17, 2020
fb8e6cc
fix sink's scheme identification in mysql.go
Colins110 Sep 18, 2020
a90718e
fix data race for c.updateResolvedTs in changefeed.go
Colins110 Sep 18, 2020
aeb7eb5
fix some log's info in changefeed.go
Colins110 Sep 18, 2020
872c029
change 'syncPoint' -> 'syncPointEnabled' and 'syncInterval' -> 'syncP…
Colins110 Sep 18, 2020
09f5043
WrapError about mysql in changefeed
Colins110 Sep 23, 2020
a15dcec
fix syncpointMutex bug in changefeed.go
Colins110 Sep 30, 2020
7cfa8fe
fix err log in changefeed.go
Colins110 Sep 30, 2020
4264afa
fix syncpoint bug with ddl(wait to exec) in changefeed.go
Colins110 Sep 30, 2020
97099c6
Merge branch 'master' into dev
Colins110 Sep 30, 2020
91bfda1
use raw sql.db to record syncpoint
Colins110 Oct 5, 2020
3c846af
Diversified ts-map storage implementation
Colins110 Oct 5, 2020
4753199
fix a liitle code about format
Colins110 Oct 9, 2020
f6f3993
change some info in log(owner.go)
Colins110 Oct 9, 2020
6ab63c7
set syncpoint's tablename and schema name to be constants
Colins110 Oct 10, 2020
416baee
change name 'syncpointSink' to 'syncpointLink'
Colins110 Oct 10, 2020
5045d09
change name 'syncpointLink' to 'syncpointStore'
Colins110 Oct 10, 2020
3383eab
Merge branch 'master' into dev
ti-srebot Oct 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 99 additions & 9 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"math"
"sync"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -78,14 +79,20 @@ type changeFeed struct {
info *model.ChangeFeedInfo
status *model.ChangeFeedStatus

schema *entry.SingleSchemaSnapshot
ddlState model.ChangeFeedDDLState
targetTs uint64
taskStatus model.ProcessorsInfos
taskPositions map[model.CaptureID]*model.TaskPosition
filter *filter.Filter
sink sink.Sink
scheduler scheduler.Scheduler
schema *entry.SingleSchemaSnapshot
ddlState model.ChangeFeedDDLState
targetTs uint64
ddlTs uint64
syncpointMutex sync.Mutex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe atomic operations are more readable?

updateResolvedTs bool
startTimer chan bool
syncpointSink sink.SyncpointSink
syncCancel context.CancelFunc
taskStatus model.ProcessorsInfos
taskPositions map[model.CaptureID]*model.TaskPosition
filter *filter.Filter
sink sink.Sink
scheduler scheduler.Scheduler

cyclicEnabled bool

Expand Down Expand Up @@ -685,6 +692,43 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C
return nil
}

// handleSyncPoint records a syncpoint to the downstream when one is reached,
// if the syncpoint feature is enabled for this changefeed. Sink errors are
// logged but not propagated, so it currently always returns nil.
func (c *changeFeed) handleSyncPoint(ctx context.Context) error {
	// sync-point feature is on for this changefeed
	if c.info.SyncPointEnabled {
		c.syncpointMutex.Lock()
		defer c.syncpointMutex.Unlock()
		// ResolvedTs == CheckpointTs means a syncpoint was reached;
		// !c.updateResolvedTs means the syncpoint was set by the ticker;
		// c.ddlTs == 0 means no DDL is waiting to execute and we can sink the
		// syncpoint record safely (c.ddlTs != 0 means some DDL still has to be
		// sunk to the downstream, so this syncpoint would be fake).
		// NOTE(review): the condition below does not actually check c.ddlTs
		// even though the note above mentions it — confirm this is intended.
		if c.status.ResolvedTs == c.status.CheckpointTs && !c.updateResolvedTs {
			log.Info("sync point reached by ticker", zap.Uint64("ResolvedTs", c.status.ResolvedTs), zap.Uint64("CheckpointTs", c.status.CheckpointTs), zap.Bool("updateResolvedTs", c.updateResolvedTs), zap.Uint64("ddlResolvedTs", c.ddlResolvedTs), zap.Uint64("ddlTs", c.ddlTs), zap.Uint64("ddlExecutedTs", c.ddlExecutedTs))
			// Re-enable ResolvedTs advancement; the ticker had frozen it so
			// that ResolvedTs and CheckpointTs could converge.
			c.updateResolvedTs = true
			err := c.syncpointSink.SinkSyncpoint(ctx, c.id, c.status.CheckpointTs)
			if err != nil {
				// Best-effort: a failed syncpoint write does not stop the changefeed.
				log.Error("syncpoint sink fail", zap.Uint64("ResolvedTs", c.status.ResolvedTs), zap.Uint64("CheckpointTs", c.status.CheckpointTs), zap.Error(err))
			}
		}

		// A changefeed whose ResolvedTs has not advanced yet must keep
		// updating ResolvedTs, otherwise it would stall before the first tick.
		if c.status.ResolvedTs == 0 {
			c.updateResolvedTs = true
		}

		// ResolvedTs == CheckpointTs means a syncpoint was reached;
		// ResolvedTs == ddlTs means the syncpoint was set by a DDL;
		// ddlTs <= ddlExecutedTs means that DDL has already been executed.
		if c.status.ResolvedTs == c.status.CheckpointTs && c.status.ResolvedTs == c.ddlTs && c.ddlTs <= c.ddlExecutedTs {
			log.Info("sync point reached by ddl", zap.Uint64("ResolvedTs", c.status.ResolvedTs), zap.Uint64("CheckpointTs", c.status.CheckpointTs), zap.Bool("updateResolvedTs", c.updateResolvedTs), zap.Uint64("ddlResolvedTs", c.ddlResolvedTs), zap.Uint64("ddlTs", c.ddlTs), zap.Uint64("ddlExecutedTs", c.ddlExecutedTs))
			err := c.syncpointSink.SinkSyncpoint(ctx, c.id, c.status.CheckpointTs)
			if err != nil {
				log.Error("syncpoint sink fail", zap.Uint64("ResolvedTs", c.status.ResolvedTs), zap.Uint64("CheckpointTs", c.status.CheckpointTs), zap.Error(err))
			}
			// Reset so the same DDL-driven syncpoint is not recorded twice.
			c.ddlTs = 0
		}
	}
	return nil
}

// calcResolvedTs update every changefeed's resolve ts and checkpoint ts.
func (c *changeFeed) calcResolvedTs(ctx context.Context) error {
if c.ddlState != model.ChangeFeedSyncDML && c.ddlState != model.ChangeFeedWaitToExecDDL {
Expand Down Expand Up @@ -796,6 +840,7 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error {
if len(c.ddlJobHistory) > 0 && minResolvedTs >= c.ddlJobHistory[0].BinlogInfo.FinishedTS {
minResolvedTs = c.ddlJobHistory[0].BinlogInfo.FinishedTS
c.ddlState = model.ChangeFeedWaitToExecDDL
c.ddlTs = minResolvedTs
}

// if downstream sink is the MQ sink, the MQ sink do not promise that checkpoint is less than globalResolvedTs
Expand All @@ -806,7 +851,15 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error {

var tsUpdated bool

if minResolvedTs > c.status.ResolvedTs {
//syncpoint on
if c.info.SyncPointEnabled {
c.syncpointMutex.Lock()
if c.updateResolvedTs && minResolvedTs > c.status.ResolvedTs {
c.status.ResolvedTs = minResolvedTs
tsUpdated = true
}
c.syncpointMutex.Unlock()
} else if minResolvedTs > c.status.ResolvedTs {
c.status.ResolvedTs = minResolvedTs
tsUpdated = true
}
Expand Down Expand Up @@ -850,6 +903,37 @@ func (c *changeFeed) pullDDLJob() error {
return nil
}

// startSyncPeriod start a timer for every changefeed to create sync point by time
func (c *changeFeed) startSyncPeriod(ctx context.Context, interval time.Duration) {
log.Debug("sync ticker start", zap.Duration("sync-interval", interval))
go func(ctx context.Context) {
ticker := time.NewTicker(interval)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
c.syncpointMutex.Lock()
c.updateResolvedTs = false
Colins110 marked this conversation as resolved.
Show resolved Hide resolved
c.syncpointMutex.Unlock()
}
}
}(ctx)
}

// stopSyncPointTicker cancels the syncpoint ticker goroutine, if one is
// running, and clears the stored cancel function so repeated calls are no-ops.
func (c *changeFeed) stopSyncPointTicker() {
	if c.syncCancel == nil {
		return
	}
	c.syncCancel()
	c.syncCancel = nil
}

// startSyncPointTicker derives a cancelable context from ctx, remembers its
// cancel function so stopSyncPointTicker can end the goroutine later, and
// launches the periodic syncpoint timer with the given interval.
func (c *changeFeed) startSyncPointTicker(ctx context.Context, interval time.Duration) {
	syncCtx, cancel := context.WithCancel(ctx)
	c.syncCancel = cancel
	c.startSyncPeriod(syncCtx, interval)
}

func (c *changeFeed) Close() {
err := c.ddlHandler.Close()
if err != nil {
Expand All @@ -859,5 +943,11 @@ func (c *changeFeed) Close() {
if err != nil {
log.Warn("failed to close owner sink", zap.Error(err))
}
if c.syncpointSink != nil {
err = c.syncpointSink.Close()
if err != nil {
log.Warn("failed to close owner sink", zap.Error(err))
}
}
log.Info("changefeed closed", zap.String("id", c.id))
}
3 changes: 3 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type ChangeFeedInfo struct {
State FeedState `json:"state"`
ErrorHis []int64 `json:"history"`
Error *RunningError `json:"error"`

SyncPointEnabled bool `json:"sync-point-enabled"`
SyncPointInterval time.Duration `json:"sync-point-interval"`
}

var changeFeedIDRe *regexp.Regexp = regexp.MustCompile(`^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$`)
Expand Down
53 changes: 48 additions & 5 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,13 +333,13 @@ func (o *Owner) newChangeFeed(
}
errCh := make(chan error, 1)

sink, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, errCh)
primarySink, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, errCh)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
primarySink, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, errCh)
sink, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, errCh)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to use the package sink at L360 (var syncpointStore sink.SyncpointStore) to create a syncpointStore. If the variable were named sink, it would shadow the package sink, which could then no longer be referenced in the following code — hence the conflict.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe moving sink.SyncpointStore to another package would be better, although it reuses some code from the sink package.
We can refine this in another PR.

if err != nil {
return nil, errors.Trace(err)
}
defer func() {
if resultErr != nil && sink != nil {
sink.Close()
if resultErr != nil && primarySink != nil {
primarySink.Close()
}
}()
go func() {
Expand All @@ -352,11 +352,19 @@ func (o *Owner) newChangeFeed(
cancel()
}()

err = sink.Initialize(ctx, sinkTableInfo)
err = primarySink.Initialize(ctx, sinkTableInfo)
if err != nil {
log.Error("error on running owner", zap.Error(err))
}

var syncpointSink sink.SyncpointSink
if info.SyncPointEnabled {
syncpointSink, err = sink.NewSyncpointSink(ctx, id, info.SinkURI)
if err != nil {
return nil, errors.Trace(err)
}
}

cf = &changeFeed{
info: info,
id: id,
Expand All @@ -375,11 +383,16 @@ func (o *Owner) newChangeFeed(
ddlState: model.ChangeFeedSyncDML,
ddlExecutedTs: checkpointTs,
targetTs: info.GetTargetTs(),
ddlTs: 0,
updateResolvedTs: true,
startTimer: make(chan bool),
syncpointSink: syncpointSink,
syncCancel: nil,
taskStatus: processorsInfos,
taskPositions: taskPositions,
etcdCli: o.etcdClient,
filter: filter,
sink: sink,
sink: primarySink,
cyclicEnabled: info.Config.Cyclic.IsEnabled(),
lastRebalanceTime: time.Now(),
}
Expand Down Expand Up @@ -523,6 +536,19 @@ func (o *Owner) loadChangeFeeds(ctx context.Context) error {
zap.String("changefeed", changeFeedID), zap.Error(err))
continue
}

if newCf.info.SyncPointEnabled {
log.Info("syncpoint is on", zap.Bool("syncpoint", newCf.info.SyncPointEnabled))
Colins110 marked this conversation as resolved.
Show resolved Hide resolved
//create the sync table
err := newCf.syncpointSink.CreateSynctable(ctx)
if err != nil {
return err
}
newCf.startSyncPointTicker(ctx, newCf.info.SyncPointInterval)
} else {
log.Info("syncpoint is off", zap.Bool("syncpoint", newCf.info.SyncPointEnabled))
Colins110 marked this conversation as resolved.
Show resolved Hide resolved
}

o.changeFeeds[changeFeedID] = newCf
delete(o.stoppedFeeds, changeFeedID)
}
Expand Down Expand Up @@ -647,6 +673,16 @@ func (o *Owner) handleDDL(ctx context.Context) error {
return nil
}

// handleSyncPoint invokes handleSyncPoint on every changefeed this owner
// manages, stopping at (and wrapping) the first error encountered.
func (o *Owner) handleSyncPoint(ctx context.Context) error {
	for _, changefeed := range o.changeFeeds {
		err := changefeed.handleSyncPoint(ctx)
		if err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

// dispatchJob dispatches job to processors
func (o *Owner) dispatchJob(ctx context.Context, job model.AdminJob) error {
cf, ok := o.changeFeeds[job.CfID]
Expand Down Expand Up @@ -804,8 +840,10 @@ func (o *Owner) handleAdminJob(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
cf.stopSyncPointTicker()
case model.AdminRemove, model.AdminFinish:
if cf != nil {
cf.stopSyncPointTicker()
err := o.dispatchJob(ctx, job)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -1029,6 +1067,11 @@ func (o *Owner) run(ctx context.Context) error {
return errors.Trace(err)
}

err = o.handleSyncPoint(ctx)
if err != nil {
return errors.Trace(err)
}

err = o.handleAdminJob(ctx)
if err != nil {
return errors.Trace(err)
Expand Down
Loading