-
Notifications
You must be signed in to change notification settings - Fork 283
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TiCDC supports a snapshot-map in replication #932
Changes from 33 commits
4e8689e
88318c3
e4cadbd
f26eead
3fd1b3d
97a1615
94ec1d4
6abad54
c13ccc3
abd124b
091927b
0a83609
1da2a73
8f25e2f
b1dad1d
7e2ca72
22fca2a
85d1482
2563a39
17de492
8a16cbd
fb8e6cc
a90718e
aeb7eb5
872c029
09f5043
a15dcec
7cfa8fe
4264afa
97099c6
91bfda1
3c846af
4753199
f6f3993
6ab63c7
416baee
5045d09
3383eab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -333,13 +333,13 @@ func (o *Owner) newChangeFeed( | |||||
} | ||||||
errCh := make(chan error, 1) | ||||||
|
||||||
sink, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, errCh) | ||||||
primarySink, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, errCh) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I should use package There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe move the |
||||||
if err != nil { | ||||||
return nil, errors.Trace(err) | ||||||
} | ||||||
defer func() { | ||||||
if resultErr != nil && sink != nil { | ||||||
sink.Close() | ||||||
if resultErr != nil && primarySink != nil { | ||||||
primarySink.Close() | ||||||
} | ||||||
}() | ||||||
go func() { | ||||||
|
@@ -352,11 +352,19 @@ func (o *Owner) newChangeFeed( | |||||
cancel() | ||||||
}() | ||||||
|
||||||
err = sink.Initialize(ctx, sinkTableInfo) | ||||||
err = primarySink.Initialize(ctx, sinkTableInfo) | ||||||
if err != nil { | ||||||
log.Error("error on running owner", zap.Error(err)) | ||||||
} | ||||||
|
||||||
var syncpointSink sink.SyncpointSink | ||||||
if info.SyncPointEnabled { | ||||||
syncpointSink, err = sink.NewSyncpointSink(ctx, id, info.SinkURI) | ||||||
if err != nil { | ||||||
return nil, errors.Trace(err) | ||||||
} | ||||||
} | ||||||
|
||||||
cf = &changeFeed{ | ||||||
info: info, | ||||||
id: id, | ||||||
|
@@ -375,11 +383,16 @@ func (o *Owner) newChangeFeed( | |||||
ddlState: model.ChangeFeedSyncDML, | ||||||
ddlExecutedTs: checkpointTs, | ||||||
targetTs: info.GetTargetTs(), | ||||||
ddlTs: 0, | ||||||
updateResolvedTs: true, | ||||||
startTimer: make(chan bool), | ||||||
syncpointSink: syncpointSink, | ||||||
syncCancel: nil, | ||||||
taskStatus: processorsInfos, | ||||||
taskPositions: taskPositions, | ||||||
etcdCli: o.etcdClient, | ||||||
filter: filter, | ||||||
sink: sink, | ||||||
sink: primarySink, | ||||||
cyclicEnabled: info.Config.Cyclic.IsEnabled(), | ||||||
lastRebalanceTime: time.Now(), | ||||||
} | ||||||
|
@@ -523,6 +536,19 @@ func (o *Owner) loadChangeFeeds(ctx context.Context) error { | |||||
zap.String("changefeed", changeFeedID), zap.Error(err)) | ||||||
continue | ||||||
} | ||||||
|
||||||
if newCf.info.SyncPointEnabled { | ||||||
log.Info("syncpoint is on", zap.Bool("syncpoint", newCf.info.SyncPointEnabled)) | ||||||
Colins110 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
//create the sync table | ||||||
err := newCf.syncpointSink.CreateSynctable(ctx) | ||||||
if err != nil { | ||||||
return err | ||||||
} | ||||||
newCf.startSyncPointTicker(ctx, newCf.info.SyncPointInterval) | ||||||
} else { | ||||||
log.Info("syncpoint is off", zap.Bool("syncpoint", newCf.info.SyncPointEnabled)) | ||||||
Colins110 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
} | ||||||
|
||||||
o.changeFeeds[changeFeedID] = newCf | ||||||
delete(o.stoppedFeeds, changeFeedID) | ||||||
} | ||||||
|
@@ -647,6 +673,16 @@ func (o *Owner) handleDDL(ctx context.Context) error { | |||||
return nil | ||||||
} | ||||||
|
||||||
// handleSyncPoint call handleSyncPoint of every changefeeds | ||||||
func (o *Owner) handleSyncPoint(ctx context.Context) error { | ||||||
for _, cf := range o.changeFeeds { | ||||||
if err := cf.handleSyncPoint(ctx); err != nil { | ||||||
return errors.Trace(err) | ||||||
} | ||||||
} | ||||||
return nil | ||||||
} | ||||||
|
||||||
// dispatchJob dispatches job to processors | ||||||
func (o *Owner) dispatchJob(ctx context.Context, job model.AdminJob) error { | ||||||
cf, ok := o.changeFeeds[job.CfID] | ||||||
|
@@ -804,8 +840,10 @@ func (o *Owner) handleAdminJob(ctx context.Context) error { | |||||
if err != nil { | ||||||
return errors.Trace(err) | ||||||
} | ||||||
cf.stopSyncPointTicker() | ||||||
case model.AdminRemove, model.AdminFinish: | ||||||
if cf != nil { | ||||||
cf.stopSyncPointTicker() | ||||||
err := o.dispatchJob(ctx, job) | ||||||
if err != nil { | ||||||
return errors.Trace(err) | ||||||
|
@@ -1029,6 +1067,11 @@ func (o *Owner) run(ctx context.Context) error { | |||||
return errors.Trace(err) | ||||||
} | ||||||
|
||||||
err = o.handleSyncPoint(ctx) | ||||||
if err != nil { | ||||||
return errors.Trace(err) | ||||||
} | ||||||
|
||||||
err = o.handleAdminJob(ctx) | ||||||
if err != nil { | ||||||
return errors.Trace(err) | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe atomic operations are more readable?