Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TiCDC supports a snapshot-map in replication #932

Merged
merged 38 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
4e8689e
create dev branch
Colins110 Aug 12, 2020
88318c3
add create sync point logic
Colins110 Aug 18, 2020
e4cadbd
add sync table logic
Colins110 Aug 20, 2020
f26eead
add proxy for docker
Colins110 Sep 1, 2020
3fd1b3d
[9.1] fix the logic
Colins110 Sep 1, 2020
97a1615
record the sync point base on timer(the ddl and table migration sync…
Colins110 Sep 3, 2020
94ec1d4
add ddl syncpoint record
Colins110 Sep 5, 2020
6abad54
a liitle change for createSyncpointSink()
Colins110 Sep 7, 2020
c13ccc3
9.9 fix for syncpointsinl logic
Colins110 Sep 8, 2020
abd124b
add integration test for sync point record
Colins110 Sep 11, 2020
091927b
9.13 add sync-point and sync-interval flag for 'cdc cli changefeed cr…
Colins110 Sep 13, 2020
0a83609
add some flags in 'cdc cli changefeed'
Colins110 Sep 13, 2020
1da2a73
fix the changefeed resume logic
Colins110 Sep 14, 2020
8f25e2f
Merge branch 'master' into dev
Colins110 Sep 15, 2020
b1dad1d
merge master and fix some err
Colins110 Sep 15, 2020
7e2ca72
fix some err about sql for ci-lint
Colins110 Sep 15, 2020
22fca2a
fix code fmt
Colins110 Sep 15, 2020
85d1482
fix the test script to run all integration test by one command
Colins110 Sep 16, 2020
2563a39
fix the nil pointer err in owner.go
Colins110 Sep 17, 2020
17de492
fix the logic of deal with NewSyncpointSinklink's err in owner.go and…
Colins110 Sep 17, 2020
8a16cbd
remove unused var defaultSyncInterval
Colins110 Sep 17, 2020
fb8e6cc
fix sink's scheme identification in mysql.go
Colins110 Sep 18, 2020
a90718e
fix data race for c.updateResolvedTs in changefeed.go
Colins110 Sep 18, 2020
aeb7eb5
fix some log's info in changefeed.go
Colins110 Sep 18, 2020
872c029
change 'syncPoint' -> 'syncPointEnabled' and 'syncInterval' -> 'syncP…
Colins110 Sep 18, 2020
09f5043
WrapError about mysql in changefeed
Colins110 Sep 23, 2020
a15dcec
fix syncpointMutex bug in changefeed.go
Colins110 Sep 30, 2020
7cfa8fe
fix err log in changefeed.go
Colins110 Sep 30, 2020
4264afa
fix syncpoint bug with ddl(wait to exec) in changefeed.go
Colins110 Sep 30, 2020
97099c6
Merge branch 'master' into dev
Colins110 Sep 30, 2020
91bfda1
use raw sql.db to record syncpoint
Colins110 Oct 5, 2020
3c846af
Diversified ts-map storage implementation
Colins110 Oct 5, 2020
4753199
fix a liitle code about format
Colins110 Oct 9, 2020
f6f3993
change some info in log(owner.go)
Colins110 Oct 9, 2020
6ab63c7
set syncpoint's tablename and schema name to be constants
Colins110 Oct 10, 2020
416baee
change name 'syncpointSink' to 'syncpointLink'
Colins110 Oct 10, 2020
5045d09
change name 'syncpointLink' to 'syncpointStore'
Colins110 Oct 10, 2020
3383eab
Merge branch 'master' into dev
ti-srebot Oct 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,5 @@ for details on submitting patches and the contribution workflow.
## License

TiCDC is under the Apache 2.0 license. See the [LICENSE](./LICENSE) file for details.

## Branch For dev
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
138 changes: 129 additions & 9 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package cdc

import (
"context"
"database/sql"
"fmt"
"math"
"time"
Expand All @@ -35,6 +36,8 @@ import (
"go.uber.org/zap"
)

const defaultSyncInterval = time.Minute * 10 //default Interval for record syncpoint

type tableIDMap = map[model.TableID]struct{}

// OwnerDDLHandler defines the ddl handler for Owner
Expand Down Expand Up @@ -78,14 +81,19 @@ type changeFeed struct {
info *model.ChangeFeedInfo
status *model.ChangeFeedStatus

schema *entry.SingleSchemaSnapshot
ddlState model.ChangeFeedDDLState
targetTs uint64
taskStatus model.ProcessorsInfos
taskPositions map[model.CaptureID]*model.TaskPosition
filter *filter.Filter
sink sink.Sink
scheduler scheduler.Scheduler
schema *entry.SingleSchemaSnapshot
ddlState model.ChangeFeedDDLState
targetTs uint64
ddlTs uint64
updateResolvedTs bool
startTimer chan bool
syncDB *sql.DB
syncCancel context.CancelFunc
taskStatus model.ProcessorsInfos
taskPositions map[model.CaptureID]*model.TaskPosition
filter *filter.Filter
sink sink.Sink
scheduler scheduler.Scheduler

cyclicEnabled bool

Expand Down Expand Up @@ -712,6 +720,26 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error {
}
}

//sync-point on
if c.info.SyncPoint {
//if (c.status.ResolvedTs == c.status.CheckpointTs && (c.updateResolvedTs == false || c.status.ResolvedTs == c.ddlResolvedTs)) || c.status.ResolvedTs == 0 {
Colins110 marked this conversation as resolved.
Show resolved Hide resolved
if c.status.ResolvedTs == c.status.CheckpointTs && !c.updateResolvedTs {
log.Info("achive the sync point with timer", zap.Uint64("ResolvedTs", c.status.ResolvedTs), zap.Uint64("CheckpointTs", c.status.CheckpointTs), zap.Bool("updateResolvedTs", c.updateResolvedTs), zap.Uint64("ddlResolvedTs", c.ddlResolvedTs), zap.Uint64("ddlTs", c.ddlTs))
Colins110 marked this conversation as resolved.
Show resolved Hide resolved
c.updateResolvedTs = true
c.sinkSyncpoint(ctx)
}

if c.status.ResolvedTs == 0 {
c.updateResolvedTs = true
}

if c.status.ResolvedTs == c.status.CheckpointTs && c.status.ResolvedTs == c.ddlTs {
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
log.Info("achive the sync point with ddl", zap.Uint64("ResolvedTs", c.status.ResolvedTs), zap.Uint64("CheckpointTs", c.status.CheckpointTs), zap.Bool("updateResolvedTs", c.updateResolvedTs), zap.Uint64("ddlResolvedTs", c.ddlResolvedTs), zap.Uint64("ddlTs", c.ddlTs))
Colins110 marked this conversation as resolved.
Show resolved Hide resolved
c.sinkSyncpoint(ctx)
c.ddlTs = 0
}
}

if len(c.taskPositions) < len(c.taskStatus) {
log.Debug("skip update resolved ts",
zap.Int("taskPositions", len(c.taskPositions)),
Expand Down Expand Up @@ -782,6 +810,7 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error {

if minResolvedTs > c.ddlResolvedTs {
minResolvedTs = c.ddlResolvedTs
//c.ddlTs = c.ddlResolvedTs
Colins110 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -793,6 +822,7 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error {
if len(c.ddlJobHistory) > 0 && minResolvedTs >= c.ddlJobHistory[0].BinlogInfo.FinishedTS {
minResolvedTs = c.ddlJobHistory[0].BinlogInfo.FinishedTS
c.ddlState = model.ChangeFeedWaitToExecDDL
c.ddlTs = minResolvedTs
}

// if downstream sink is the MQ sink, the MQ sink do not promise that checkpoint is less than globalResolvedTs
Expand All @@ -803,7 +833,19 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error {

var tsUpdated bool

if minResolvedTs > c.status.ResolvedTs {
//syncpoint on
if c.info.SyncPoint {
if c.updateResolvedTs && minResolvedTs > c.status.ResolvedTs {
//prevResolvedTs := c.status.ResolvedTs
c.status.ResolvedTs = minResolvedTs
tsUpdated = true
/*if prevResolvedTs == c.status.CheckpointTs || prevResolvedTs == 0 {
//到达sync point
//todo 需要重新开始启动计时
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
c.startTimer <- true
}*/
}
} else if minResolvedTs > c.status.ResolvedTs {
c.status.ResolvedTs = minResolvedTs
tsUpdated = true
}
Expand Down Expand Up @@ -847,6 +889,84 @@ func (c *changeFeed) pullDDLJob() error {
return nil
}


// startSyncPeriod start a timer for every changefeed to create sync point by time
func (c *changeFeed) startSyncPeriod(ctx context.Context, interval time.Duration) {
log.Debug("sync ticker start", zap.Duration("sync-interval", interval))
go func(ctx context.Context) {
ticker := time.NewTicker(interval)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
c.updateResolvedTs = false
Colins110 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}(ctx)
}

//createSynctable create a sync table to record the
func (c *changeFeed) createSynctable(ctx context.Context) error {
database := "TiCDC"
tx, err := c.syncDB.BeginTx(ctx, nil)
if err != nil {
log.Info("create sync table: begin Tx fail")
return err
Colins110 marked this conversation as resolved.
Show resolved Hide resolved
}
_, err = tx.Exec("CREATE DATABASE IF NOT EXISTS " + database)
//_,err := c.syncDB.Exec("CREATE DATABASE IF NOT EXISTS " + database)
Colins110 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
tx.Rollback()
return err
}
_, err = tx.Exec("USE " + database)
if err != nil {
tx.Rollback()
return err
}
_, err = tx.Exec("CREATE TABLE IF NOT EXISTS syncpoint (cf varchar(255),primary_ts varchar(18),secondary_ts varchar(18),PRIMARY KEY ( `primary_ts` ) )")
Colins110 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
tx.Rollback()
return err
}
err = tx.Commit()
return err
}

//sinkSyncpoint record the syncpoint(a map with ts) in downstream db
func (c *changeFeed) sinkSyncpoint(ctx context.Context) error {
tx, err := c.syncDB.BeginTx(ctx, nil)
if err != nil {
log.Info("sync table: begin Tx fail")
return err
}
row := tx.QueryRow("select @@tidb_current_ts")
var slaveTs string
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
err = row.Scan(&slaveTs)
if err != nil {
log.Info("sync table: get tidb_current_ts err")
tx.Rollback()
return err
}
tx.Exec("insert into TiCDC.syncpoint(cf, primary_ts, secondary_ts) VALUES (?,?,?)", c.id, c.status.CheckpointTs, slaveTs)
//tx.Exec("insert into TiCDC.syncpoint( master_ts, slave_ts) VALUES (?,?)", c.status.CheckpointTs, 0)
tx.Commit() //TODO deal with error
return nil
}

func (c *changeFeed) stopSyncPointTicker() {
if c.syncCancel != nil {
c.syncCancel()
c.syncCancel = nil
}
}

func (c *changeFeed) startSyncPointTicker(ctx context.Context, interval time.Duration) {
var syncCtx context.Context
syncCtx, c.syncCancel = context.WithCancel(ctx)
c.startSyncPeriod(syncCtx, interval)

func (c *changeFeed) Close() {
err := c.ddlHandler.Close()
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type ChangeFeedInfo struct {
State FeedState `json:"state"`
ErrorHis []int64 `json:"history"`
Error *RunningError `json:"error"`

SyncPoint bool `json:"sync-point"`
SyncInterval string `json:"sync-interval"`
}

var changeFeedIDRe *regexp.Regexp = regexp.MustCompile(`^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$`)
Expand Down
82 changes: 79 additions & 3 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (o *Owner) newChangeFeed(
}
errCh := make(chan error, 1)

sink, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, errCh)
primarySink, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, errCh)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
primarySink, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, errCh)
sink, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, errCh)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should use package sink in L360 (var syncpointStore sink.SyncpointStore) to create a syncpointStore,if the var named sink,the package sink will not be used in the follow code,there is a conflict.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe move the sink.SyncpointStore to another package is better, but it reuses some code in the sink package.
We can refine this in another PR.

if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -352,7 +352,12 @@ func (o *Owner) newChangeFeed(
cancel()
}()

err = sink.Initialize(ctx, sinkTableInfo)
err = primarySink.Initialize(ctx, sinkTableInfo)
if err != nil {
log.Error("error on running owner", zap.Error(err))
}
Colins110 marked this conversation as resolved.
Show resolved Hide resolved

syncDB, err := sink.NewSyncpointSinklink(ctx, info, id)
if err != nil {
log.Error("error on running owner", zap.Error(err))
}
Colins110 marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -375,11 +380,16 @@ func (o *Owner) newChangeFeed(
ddlState: model.ChangeFeedSyncDML,
ddlExecutedTs: checkpointTs,
targetTs: info.GetTargetTs(),
ddlTs: 0,
updateResolvedTs: true,
startTimer: make(chan bool),
syncDB: syncDB,
syncCancel: nil,
taskStatus: processorsInfos,
taskPositions: taskPositions,
etcdCli: o.etcdClient,
filter: filter,
sink: sink,
sink: primarySink,
cyclicEnabled: info.Config.Cyclic.IsEnabled(),
lastRebalanceTime: time.Now(),
}
Expand Down Expand Up @@ -523,6 +533,25 @@ func (o *Owner) loadChangeFeeds(ctx context.Context) error {
zap.String("changefeed", changeFeedID), zap.Error(err))
continue
}

if newCf.info.SyncPoint {
log.Info("syncpoint is on", zap.Bool("syncpoint", newCf.info.SyncPoint))
//create the sync table
err := newCf.createSynctable(ctx)
if err != nil {
return err
}

//start SyncPeriod for create the sync point
syncInterval, err := time.ParseDuration(newCf.info.SyncInterval)
if err != nil {
syncInterval = defaultSyncInterval
}
newCf.startSyncPointTicker(ctx, syncInterval)
} else {
log.Info("syncpoint is off", zap.Bool("syncpoint", newCf.info.SyncPoint))
}

o.changeFeeds[changeFeedID] = newCf
delete(o.stoppedFeeds, changeFeedID)
}
Expand Down Expand Up @@ -804,6 +833,7 @@ func (o *Owner) handleAdminJob(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
cf.stopSyncPointTicker()
case model.AdminRemove, model.AdminFinish:
if cf != nil {
err := o.dispatchJob(ctx, job)
Expand Down Expand Up @@ -853,6 +883,7 @@ func (o *Owner) handleAdminJob(ctx context.Context) error {
return errors.Trace(err)
}
}
cf.stopSyncPointTicker()
case model.AdminResume:
// resume changefeed must read checkpoint from ChangeFeedStatus
if cerror.ErrChangeFeedNotExists.Equal(err) {
Expand Down Expand Up @@ -1295,3 +1326,48 @@ func (o *Owner) startCaptureWatcher(ctx context.Context) {
}
}()
}

/*func (o *Owner) configureSinkURI(ctx context.Context, dsnCfg *dmysql.Config, tz *time.Location) (string, error) {
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
if dsnCfg.Params == nil {
dsnCfg.Params = make(map[string]string, 1)
}
dsnCfg.DBName = ""
dsnCfg.InterpolateParams = true
dsnCfg.MultiStatements = true
dsnCfg.Params["time_zone"] = fmt.Sprintf(`"%s"`, tz.String())

testDB, err := sql.Open("mysql", dsnCfg.FormatDSN())
if err != nil {
return "", errors.Annotate(err, "fail to open MySQL connection when configuring sink")
}
defer testDB.Close()
log.Debug("Opened connection to configure some tidb special parameters")

var variableName string
var autoRandomInsertEnabled string
queryStr := "show session variables like 'allow_auto_random_explicit_insert';"
err = testDB.QueryRowContext(ctx, queryStr).Scan(&variableName, &autoRandomInsertEnabled)
if err != nil && err != sql.ErrNoRows {
return "", errors.Annotate(err, "fail to query sink for support of auto-random")
}
if err == nil && (autoRandomInsertEnabled == "off" || autoRandomInsertEnabled == "0") {
dsnCfg.Params["allow_auto_random_explicit_insert"] = "1"
log.Debug("Set allow_auto_random_explicit_insert to 1")
}

var txnMode string
queryStr = "show session variables like 'tidb_txn_mode';"
err = testDB.QueryRowContext(ctx, queryStr).Scan(&variableName, &txnMode)
if err != nil && err != sql.ErrNoRows {
return "", errors.Annotate(err, "fail to query sink for txn mode")
}
/*if err == nil {
dsnCfg.Params["tidb_txn_mode"] = params.tidbTxnMode
}*/

/* dsnClone := dsnCfg.Clone()
dsnClone.Passwd = "******"
log.Info("sink uri is configured", zap.String("format dsn", dsnClone.FormatDSN()))

return dsnCfg.FormatDSN(), nil
}*/
Loading