diff --git a/cdc/changefeed.go b/cdc/changefeed.go index 590da7f98cb..750928fa6d0 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "math" + "sync" "time" "github.com/pingcap/errors" @@ -78,14 +79,20 @@ type changeFeed struct { info *model.ChangeFeedInfo status *model.ChangeFeedStatus - schema *entry.SingleSchemaSnapshot - ddlState model.ChangeFeedDDLState - targetTs uint64 - taskStatus model.ProcessorsInfos - taskPositions map[model.CaptureID]*model.TaskPosition - filter *filter.Filter - sink sink.Sink - scheduler scheduler.Scheduler + schema *entry.SingleSchemaSnapshot + ddlState model.ChangeFeedDDLState + targetTs uint64 + ddlTs uint64 + syncpointMutex sync.Mutex + updateResolvedTs bool + startTimer chan bool + syncpointStore sink.SyncpointStore + syncCancel context.CancelFunc + taskStatus model.ProcessorsInfos + taskPositions map[model.CaptureID]*model.TaskPosition + filter *filter.Filter + sink sink.Sink + scheduler scheduler.Scheduler cyclicEnabled bool @@ -685,6 +692,43 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C return nil } +// handleSyncPoint records every syncpoint to the downstream if the syncpoint feature is enabled +func (c *changeFeed) handleSyncPoint(ctx context.Context) error { + //sync-point on + if c.info.SyncPointEnabled { + c.syncpointMutex.Lock() + defer c.syncpointMutex.Unlock() + // ResolvedTs == CheckpointTs means a syncpoint has been reached; + // !c.updateResolvedTs means the syncpoint is set by the ticker; + // c.ddlTs == 0 means no DDL is waiting to be executed and we can sink the syncpoint record safely (c.ddlTs != 0 means some DDL still needs to be sunk to the downstream, so this syncpoint is fake). + if c.status.ResolvedTs == c.status.CheckpointTs && !c.updateResolvedTs { + log.Info("sync point reached by ticker", zap.Uint64("ResolvedTs", c.status.ResolvedTs), zap.Uint64("CheckpointTs", c.status.CheckpointTs), zap.Bool("updateResolvedTs", c.updateResolvedTs), zap.Uint64("ddlResolvedTs", c.ddlResolvedTs), zap.Uint64("ddlTs", c.ddlTs), zap.Uint64("ddlExecutedTs", c.ddlExecutedTs)) + c.updateResolvedTs = true + err := c.syncpointStore.SinkSyncpoint(ctx, c.id, c.status.CheckpointTs) + if err != nil { + log.Error("syncpoint sink fail", zap.Uint64("ResolvedTs", c.status.ResolvedTs), zap.Uint64("CheckpointTs", c.status.CheckpointTs), zap.Error(err)) + } + } + + if c.status.ResolvedTs == 0 { + c.updateResolvedTs = true + } + + // ResolvedTs == CheckpointTs means a syncpoint has been reached; + // ResolvedTs == ddlTs means the syncpoint is set by a DDL; + // ddlTs <= ddlExecutedTs means the DDL has been executed. + if c.status.ResolvedTs == c.status.CheckpointTs && c.status.ResolvedTs == c.ddlTs && c.ddlTs <= c.ddlExecutedTs { + log.Info("sync point reached by ddl", zap.Uint64("ResolvedTs", c.status.ResolvedTs), zap.Uint64("CheckpointTs", c.status.CheckpointTs), zap.Bool("updateResolvedTs", c.updateResolvedTs), zap.Uint64("ddlResolvedTs", c.ddlResolvedTs), zap.Uint64("ddlTs", c.ddlTs), zap.Uint64("ddlExecutedTs", c.ddlExecutedTs)) + err := c.syncpointStore.SinkSyncpoint(ctx, c.id, c.status.CheckpointTs) + if err != nil { + log.Error("syncpoint sink fail", zap.Uint64("ResolvedTs", c.status.ResolvedTs), zap.Uint64("CheckpointTs", c.status.CheckpointTs), zap.Error(err)) + } + c.ddlTs = 0 + } + } + return nil +} + // calcResolvedTs updates every changefeed's resolved ts and checkpoint ts.
func (c *changeFeed) calcResolvedTs(ctx context.Context) error { if c.ddlState != model.ChangeFeedSyncDML && c.ddlState != model.ChangeFeedWaitToExecDDL { @@ -796,6 +840,7 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error { if len(c.ddlJobHistory) > 0 && minResolvedTs >= c.ddlJobHistory[0].BinlogInfo.FinishedTS { minResolvedTs = c.ddlJobHistory[0].BinlogInfo.FinishedTS c.ddlState = model.ChangeFeedWaitToExecDDL + c.ddlTs = minResolvedTs } // if downstream sink is the MQ sink, the MQ sink does not promise that checkpoint is less than globalResolvedTs @@ -806,7 +851,15 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error { var tsUpdated bool - if minResolvedTs > c.status.ResolvedTs { + //syncpoint on + if c.info.SyncPointEnabled { + c.syncpointMutex.Lock() + if c.updateResolvedTs && minResolvedTs > c.status.ResolvedTs { + c.status.ResolvedTs = minResolvedTs + tsUpdated = true + } + c.syncpointMutex.Unlock() + } else if minResolvedTs > c.status.ResolvedTs { c.status.ResolvedTs = minResolvedTs tsUpdated = true } @@ -850,6 +903,37 @@ func (c *changeFeed) pullDDLJob() error { return nil } +// startSyncPeriod starts a timer for the changefeed to create syncpoints periodically +func (c *changeFeed) startSyncPeriod(ctx context.Context, interval time.Duration) { + log.Debug("sync ticker start", zap.Duration("sync-interval", interval)) + go func(ctx context.Context) { + ticker := time.NewTicker(interval) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + c.syncpointMutex.Lock() + c.updateResolvedTs = false + c.syncpointMutex.Unlock() + } + } + }(ctx) +} + +func (c *changeFeed) stopSyncPointTicker() { + if c.syncCancel != nil { + c.syncCancel() + c.syncCancel = nil + } +} + +func (c *changeFeed) startSyncPointTicker(ctx context.Context, interval time.Duration) { + var syncCtx context.Context + syncCtx, c.syncCancel = context.WithCancel(ctx) + c.startSyncPeriod(syncCtx, interval) +} + func (c *changeFeed) Close() { err := c.ddlHandler.Close() if err != nil { @@ -859,5 +943,11 @@ func (c *changeFeed) Close() { if err != nil { log.Warn("failed to close owner sink", zap.Error(err)) } + if c.syncpointStore != nil { + err = c.syncpointStore.Close() + if err != nil { + log.Warn("failed to close syncpoint store", zap.Error(err)) + } + } log.Info("changefeed closed", zap.String("id", c.id)) } diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index a3f75c8f835..2043ce66c5a 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -81,6 +81,9 @@ type ChangeFeedInfo struct { State FeedState `json:"state"` ErrorHis []int64 `json:"history"` Error *RunningError `json:"error"` + + SyncPointEnabled bool `json:"sync-point-enabled"` + SyncPointInterval time.Duration `json:"sync-point-interval"` } var changeFeedIDRe *regexp.Regexp = regexp.MustCompile(`^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$`) diff --git a/cdc/owner.go b/cdc/owner.go index b052319384e..84c430ebf04 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -333,13 +333,13 @@ func (o *Owner) newChangeFeed( } errCh := make(chan error, 1) - sink, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, errCh) + primarySink, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, errCh) if err != nil { return nil, errors.Trace(err) } defer func() { - if resultErr != nil && sink != nil { - sink.Close() + if resultErr != nil && primarySink != nil { + primarySink.Close() } }() go func() { @@ -352,11 +352,19 @@ func (o *Owner) newChangeFeed( cancel() }() - err = sink.Initialize(ctx,
sinkTableInfo) + err = primarySink.Initialize(ctx, sinkTableInfo) if err != nil { log.Error("error on running owner", zap.Error(err)) } + var syncpointStore sink.SyncpointStore + if info.SyncPointEnabled { + syncpointStore, err = sink.NewSyncpointStore(ctx, id, info.SinkURI) + if err != nil { + return nil, errors.Trace(err) + } + } + cf = &changeFeed{ info: info, id: id, @@ -375,11 +383,16 @@ func (o *Owner) newChangeFeed( ddlState: model.ChangeFeedSyncDML, ddlExecutedTs: checkpointTs, targetTs: info.GetTargetTs(), + ddlTs: 0, + updateResolvedTs: true, + startTimer: make(chan bool), + syncpointStore: syncpointStore, + syncCancel: nil, taskStatus: processorsInfos, taskPositions: taskPositions, etcdCli: o.etcdClient, filter: filter, - sink: sink, + sink: primarySink, cyclicEnabled: info.Config.Cyclic.IsEnabled(), lastRebalanceTime: time.Now(), } @@ -523,6 +536,19 @@ func (o *Owner) loadChangeFeeds(ctx context.Context) error { zap.String("changefeed", changeFeedID), zap.Error(err)) continue } + + if newCf.info.SyncPointEnabled { + log.Info("syncpoint is on, creating the sync table") + // create the sync table + err := newCf.syncpointStore.CreateSynctable(ctx) + if err != nil { + return err + } + newCf.startSyncPointTicker(ctx, newCf.info.SyncPointInterval) + } else { + log.Info("syncpoint is off") + } + o.changeFeeds[changeFeedID] = newCf delete(o.stoppedFeeds, changeFeedID) } @@ -648,6 +674,16 @@ func (o *Owner) handleDDL(ctx context.Context) error { return nil } +// handleSyncPoint calls handleSyncPoint on every changefeed +func (o *Owner) handleSyncPoint(ctx context.Context) error { + for _, cf := range o.changeFeeds { + if err := cf.handleSyncPoint(ctx); err != nil { + return errors.Trace(err) + } + } + return nil +} + // dispatchJob dispatches job to processors func (o *Owner) dispatchJob(ctx context.Context, job model.AdminJob) error { cf, ok := o.changeFeeds[job.CfID] @@ -805,8 +841,10 @@ func (o *Owner) handleAdminJob(ctx context.Context) error { if err != nil { return errors.Trace(err) } + cf.stopSyncPointTicker() case model.AdminRemove, model.AdminFinish: if cf != nil { + cf.stopSyncPointTicker() err := o.dispatchJob(ctx, job) if err != nil { return errors.Trace(err) } @@ -1030,6 +1068,11 @@ func (o *Owner) run(ctx context.Context) error { return errors.Trace(err) } + err = o.handleSyncPoint(ctx) + if err != nil { + return errors.Trace(err) + } + err = o.handleAdminJob(ctx) if err != nil { return errors.Trace(err) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 75f0752820b..5f672fe3fac 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -65,6 +65,9 @@ const ( defaultSafeMode = true ) +// syncpointTableName is the name of the table where all syncpoint maps are stored +const syncpointTableName string = "syncpoint_v1" + var ( validSchemes = map[string]bool{ "mysql": true, @@ -74,6 +77,9 @@ var ( ) +type mysqlSyncpointStore struct { + db *sql.DB +} type mysqlSink struct { db *sql.DB params *sinkParams @@ -1105,3 +1111,160 @@ func buildColumnList(names []string) string { return b.String() } + +// newMySQLSyncpointStore creates a sink to record the syncpoint map in the downstream DB for every changefeed +func newMySQLSyncpointStore(ctx context.Context, id string, sinkURI *url.URL) (SyncpointStore, error) { + var syncDB *sql.DB + + // TODO: if the scheme is neither mysql nor tidb (e.g. kafka), just ignore this feature.
+ scheme := strings.ToLower(sinkURI.Scheme) + if scheme != "mysql" && scheme != "tidb" && scheme != "mysql+ssl" && scheme != "tidb+ssl" { + return nil, errors.New("can not create mysql sink with unsupported scheme") + } + params := defaultParams + s := sinkURI.Query().Get("tidb-txn-mode") + if s != "" { + if s == "pessimistic" || s == "optimistic" { + params.tidbTxnMode = s + } else { + log.Warn("invalid tidb-txn-mode, should be pessimistic or optimistic, use optimistic as default") + } + } + var tlsParam string + if sinkURI.Query().Get("ssl-ca") != "" { + credential := security.Credential{ + CAPath: sinkURI.Query().Get("ssl-ca"), + CertPath: sinkURI.Query().Get("ssl-cert"), + KeyPath: sinkURI.Query().Get("ssl-key"), + } + tlsCfg, err := credential.ToTLSConfig() + if err != nil { + return nil, errors.Annotate(err, "fail to open MySQL connection") + } + name := "cdc_mysql_tls" + "syncpoint" + id + err = dmysql.RegisterTLSConfig(name, tlsCfg) + if err != nil { + return nil, errors.Annotate(err, "fail to open MySQL connection") + } + tlsParam = "?tls=" + name + } + + // dsn format of the driver: + // [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN] + username := sinkURI.User.Username() + password, _ := sinkURI.User.Password() + port := sinkURI.Port() + if username == "" { + username = "root" + } + if port == "" { + port = "4000" + } + + dsnStr := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", username, password, sinkURI.Hostname(), port, tlsParam) + dsn, err := dmysql.ParseDSN(dsnStr) + if err != nil { + return nil, errors.Trace(err) + } + + tz := util.TimezoneFromCtx(ctx) + // create test db used for parameter detection + if dsn.Params == nil { + dsn.Params = make(map[string]string, 1) + } + dsn.Params["time_zone"] = fmt.Sprintf(`"%s"`, tz.String()) + testDB, err := sql.Open("mysql", dsn.FormatDSN()) + if err != nil { + return nil, errors.Annotate( + cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection when configuring sink") + } + defer testDB.Close() + dsnStr, err = configureSinkURI(ctx, dsn, tz, params, testDB) + if err != nil { + return nil, errors.Trace(err) + } + syncDB, err = sql.Open("mysql", dsnStr) + if err != nil { + return nil, errors.Annotate(err, "Open database connection failed") + } + err = syncDB.PingContext(ctx) + if err != nil { + return nil, errors.Annotate(err, "fail to open MySQL connection") + } + + log.Info("Start mysql syncpoint sink") + syncpointStore := &mysqlSyncpointStore{ + db: syncDB, + } + + return syncpointStore, nil +} + +func (s *mysqlSyncpointStore) CreateSynctable(ctx context.Context) error { + database := mark.SchemaName + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + log.Error("create sync table: begin Tx fail", zap.Error(err)) + return cerror.WrapError(cerror.ErrMySQLTxnError, err) + } + _, err = tx.Exec("CREATE DATABASE IF NOT EXISTS " + database) + if err != nil { + err2 := tx.Rollback() + if err2 != nil { + log.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2).Error()) + } + return cerror.WrapError(cerror.ErrMySQLTxnError, err) + } + _, err = tx.Exec("USE " + database) + if err != nil { + err2 := tx.Rollback() + if err2 != nil { + log.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2).Error()) + } + return cerror.WrapError(cerror.ErrMySQLTxnError, err) + } + _, err = tx.Exec("CREATE TABLE IF NOT EXISTS " + syncpointTableName + " (cf varchar(255),primary_ts varchar(18),secondary_ts varchar(18),PRIMARY KEY ( `cf`, `primary_ts` ) )") + if err != nil { + err2 := tx.Rollback() + if err2 !=
nil { + log.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2).Error()) + } + return cerror.WrapError(cerror.ErrMySQLTxnError, err) + } + err = tx.Commit() + return cerror.WrapError(cerror.ErrMySQLTxnError, err) +} + +func (s *mysqlSyncpointStore) SinkSyncpoint(ctx context.Context, id string, checkpointTs uint64) error { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + log.Error("sync table: begin Tx fail", zap.Error(err)) + return cerror.WrapError(cerror.ErrMySQLTxnError, err) + } + row := tx.QueryRow("select @@tidb_current_ts") + var secondaryTs string + err = row.Scan(&secondaryTs) + if err != nil { + log.Info("sync table: failed to get tidb_current_ts") + err2 := tx.Rollback() + if err2 != nil { + log.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2).Error()) + } + return cerror.WrapError(cerror.ErrMySQLTxnError, err) + } + _, err = tx.Exec("insert into "+mark.SchemaName+"."+syncpointTableName+"(cf, primary_ts, secondary_ts) VALUES (?,?,?)", id, checkpointTs, secondaryTs) + if err != nil { + err2 := tx.Rollback() + if err2 != nil { + log.Error(cerror.WrapError(cerror.ErrMySQLTxnError, err2).Error()) + } + return cerror.WrapError(cerror.ErrMySQLTxnError, err) + } + err = tx.Commit() + return cerror.WrapError(cerror.ErrMySQLTxnError, err) +} + +func (s *mysqlSyncpointStore) Close() error { + err := s.db.Close() + return cerror.WrapError(cerror.ErrMySQLConnectionError, err) +} diff --git a/cdc/sink/syncpointStore.go b/cdc/sink/syncpointStore.go new file mode 100644 index 00000000000..d20cb5e5960 --- /dev/null +++ b/cdc/sink/syncpointStore.go @@ -0,0 +1,50 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sink + +import ( + "context" + "net/url" + "strings" + + "github.com/pingcap/ticdc/cdc/model" + cerror "github.com/pingcap/ticdc/pkg/errors" +) + +// SyncpointStore is an abstraction for anything that a changefeed may record its syncpoints into.
+type SyncpointStore interface { + // CreateSynctable creates a table to record the syncpoints + CreateSynctable(ctx context.Context) error + + // SinkSyncpoint records the syncpoint (a ts map) in the downstream db + SinkSyncpoint(ctx context.Context, id string, checkpointTs uint64) error + + // Close closes the SyncpointStore + Close() error +} + +// NewSyncpointStore creates a new SyncpointStore with the sink-uri +func NewSyncpointStore(ctx context.Context, changefeedID model.ChangeFeedID, sinkURIStr string) (SyncpointStore, error) { + // parse sinkURI as a URI + sinkURI, err := url.Parse(sinkURIStr) + if err != nil { + return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + } + switch strings.ToLower(sinkURI.Scheme) { + case "mysql", "tidb", "mysql+ssl", "tidb+ssl": + return newMySQLSyncpointStore(ctx, changefeedID, sinkURI) + default: + return nil, cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", sinkURI.Scheme) + } +} diff --git a/cmd/client.go b/cmd/client.go index eccb7a04ffe..c3a12fced97 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -74,6 +74,9 @@ var ( captureID string interval uint + syncPointEnabled bool + syncPointInterval time.Duration + optForceRemove bool defaultContext context.Context diff --git a/cmd/client_changefeed.go b/cmd/client_changefeed.go index 431c297b6ae..1da8b8e6e01 100644 --- a/cmd/client_changefeed.go +++ b/cmd/client_changefeed.go @@ -275,15 +275,17 @@ func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate } } info := &model.ChangeFeedInfo{ - SinkURI: sinkURI, - Opts: make(map[string]string), - CreateTime: time.Now(), - StartTs: startTs, - TargetTs: targetTs, - Config: cfg, - Engine: model.SortEngine(sortEngine), - SortDir: sortDir, - State: model.StateNormal, + SinkURI: sinkURI, + Opts: make(map[string]string), + CreateTime: time.Now(), + StartTs: startTs, + TargetTs: targetTs, + Config: cfg, + Engine: model.SortEngine(sortEngine), + SortDir: sortDir, + State: model.StateNormal, + SyncPointEnabled: syncPointEnabled, + SyncPointInterval: syncPointInterval, } tz, err := util.GetTimezone(timezone) @@ -354,6 +356,8 @@ func changefeedConfigVariables(command *cobra.Command) { command.PersistentFlags().Uint64Var(&cyclicReplicaID, "cyclic-replica-id", 0, "(Expremental) Cyclic replication replica ID of changefeed") command.PersistentFlags().UintSliceVar(&cyclicFilterReplicaIDs, "cyclic-filter-replica-ids", []uint{}, "(Expremental) Cyclic replication filter replica ID of changefeed") command.PersistentFlags().BoolVar(&cyclicSyncDDL, "cyclic-sync-ddl", true, "(Expremental) Cyclic replication sync DDL of changefeed") + command.PersistentFlags().BoolVar(&syncPointEnabled, "sync-point", false, "(Experimental) Set and record syncpoints in replication (default off)") + command.PersistentFlags().DurationVar(&syncPointInterval, "sync-interval", 10*time.Minute, "(Experimental) Set the interval for recording syncpoints in replication (default 10min)") } func newCreateChangefeedCommand() *cobra.Command { diff --git a/tests/syncpoint/conf/diff_config_final.toml b/tests/syncpoint/conf/diff_config_final.toml new file mode 100644 index 00000000000..88b51f6546c --- /dev/null +++ b/tests/syncpoint/conf/diff_config_final.toml @@ -0,0 +1,27 @@ +# diff Configuration. + +log-level = "info" +chunk-size = 10 +check-thread-count = 4 +sample-percent = 100 +use-checksum = true +fix-sql-file = "fix.sql" + +# tables to check.
+[[check-tables]] + schema = "testSync" + tables = ["usertable","simple1","simple2"] + +[[source-db]] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + instance-id = "source-1" + +[target-db] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" + instance-id = "target-1" \ No newline at end of file diff --git a/tests/syncpoint/conf/diff_config_part1.toml b/tests/syncpoint/conf/diff_config_part1.toml new file mode 100644 index 00000000000..9ceb944eb58 --- /dev/null +++ b/tests/syncpoint/conf/diff_config_part1.toml @@ -0,0 +1,23 @@ +# diff Configuration. + +log-level = "info" +chunk-size = 10 +check-thread-count = 4 +sample-percent = 100 +use-checksum = true +fix-sql-file = "fix.sql" + +# tables need to check. +[[check-tables]] + schema = "testSync" + #tables = ["usertable","simple1","simple2"] + #tables = ["usertable"] + tables = ["~^"] + +[[source-db]] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + instance-id = "source-1" + \ No newline at end of file diff --git a/tests/syncpoint/conf/diff_config_part2.toml b/tests/syncpoint/conf/diff_config_part2.toml new file mode 100644 index 00000000000..e3c8d8dd60e --- /dev/null +++ b/tests/syncpoint/conf/diff_config_part2.toml @@ -0,0 +1,8 @@ + +[target-db] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" + instance-id = "target-1" + \ No newline at end of file diff --git a/tests/syncpoint/conf/workload b/tests/syncpoint/conf/workload new file mode 100644 index 00000000000..7649fd9afb7 --- /dev/null +++ b/tests/syncpoint/conf/workload @@ -0,0 +1,13 @@ +threadcount=2 +recordcount=10 +operationcount=0 +workload=core + +readallfields=true + +readproportion=0 +updateproportion=0 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform diff --git a/tests/syncpoint/run.sh b/tests/syncpoint/run.sh new file mode 100755 index 00000000000..d75466fb311 --- /dev/null +++ b/tests/syncpoint/run.sh @@ -0,0 +1,135 @@ +#!/bin/bash + +set -e + +CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +CDC_COUNT=3 +DB_COUNT=4 + +# The follow `sleep 2` make sure the ddl and dml operation can always execute during the test. +# So every syncpoint will have different data and the diff tool can judge the syncpoint's validity. 
+function ddl() { + run_sql "DROP table IF EXISTS testSync.simple1" + sleep 2 + run_sql "DROP table IF EXISTS testSync.simple2" + sleep 2 + run_sql "CREATE table testSync.simple1(id int primary key, val int);" + sleep 2 + run_sql "INSERT INTO testSync.simple1(id, val) VALUES (1, 1);" + sleep 2 + run_sql "INSERT INTO testSync.simple1(id, val) VALUES (2, 2);" + sleep 2 + run_sql "INSERT INTO testSync.simple1(id, val) VALUES (3, 3);" + sleep 2 + run_sql "INSERT INTO testSync.simple1(id, val) VALUES (4, 4);" + sleep 2 + run_sql "INSERT INTO testSync.simple1(id, val) VALUES (5, 5);" + sleep 2 + run_sql "INSERT INTO testSync.simple1(id, val) VALUES (6, 6);" + sleep 2 + run_sql "CREATE table testSync.simple2(id int primary key, val int);" + sleep 2 + run_sql "INSERT INTO testSync.simple2(id, val) VALUES (1, 1);" + sleep 2 + run_sql "INSERT INTO testSync.simple2(id, val) VALUES (2, 2);" + sleep 2 + run_sql "INSERT INTO testSync.simple2(id, val) VALUES (3, 3);" + sleep 2 + run_sql "INSERT INTO testSync.simple2(id, val) VALUES (4, 4);" + sleep 2 + run_sql "INSERT INTO testSync.simple2(id, val) VALUES (5, 5);" + sleep 2 + run_sql "INSERT INTO testSync.simple2(id, val) VALUES (6, 6);" + sleep 2 + run_sql "CREATE index simple1_val ON testSync.simple1(val);" + sleep 2 + run_sql "CREATE index simple2_val ON testSync.simple2(val);" + sleep 2 + run_sql "DELETE FROM testSync.simple1 where id=1;" + sleep 2 + run_sql "DELETE FROM testSync.simple2 where id=1;" + sleep 2 + run_sql "DELETE FROM testSync.simple1 where id=2;" + sleep 2 + run_sql "DELETE FROM testSync.simple2 where id=2;" + sleep 2 + run_sql "DROP index simple1_val ON testSync.simple1;" + sleep 2 + run_sql "DROP index simple2_val ON testSync.simple2;" + sleep 2 +} + +function goSql() { + for i in {1..3} + do + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=testSync + sleep 2 + ddl + sleep 2 + done +} + +function deployConfig() { + cat $CUR/conf/diff_config_part1.toml > $CUR/conf/diff_config.toml + echo "snapshot = \"$1\"" >> $CUR/conf/diff_config.toml + cat $CUR/conf/diff_config_part2.toml >> $CUR/conf/diff_config.toml + echo "snapshot = \"$2\"" >> $CUR/conf/diff_config.toml +} + +function checkDiff() { + primaryArr=(`grep primary_ts $OUT_DIR/sql_res.$TEST_NAME.txt | awk -F ": " '{print $2}'`) + secondaryArr=(`grep secondary_ts $OUT_DIR/sql_res.$TEST_NAME.txt | awk -F ": " '{print $2}'`) + num=${#primaryArr[*]} + for ((i=0;i<$num;i++)) + do + deployConfig ${primaryArr[$i]} ${secondaryArr[$i]} + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + done + rm $CUR/conf/diff_config.toml +} + +function run() { + if [ "$SINK_TYPE" != "mysql" ]; then + echo "kafka downstream isn't support syncpoint record" + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) + run_sql "CREATE DATABASE testSync;" + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + + SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --sync-point --sync-interval=10s + + goSql + + check_table_exists "testSync.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists "testSync.simple1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists "testSync.simple2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + sleep 60 + + run_sql "SELECT primary_ts, secondary_ts FROM 
tidb_cdc.syncpoint_v1;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + echo "____________________________________" + cat "$OUT_DIR/sql_res.$TEST_NAME.txt" + checkDiff + check_sync_diff $WORK_DIR $CUR/conf/diff_config_final.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
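Usage sketch: every row that SinkSyncpoint writes pairs an upstream TSO (primary_ts, the changefeed checkpoint) with the downstream TSO (secondary_ts, read from tidb_current_ts in the same transaction), so the two clusters can be compared at a consistent point; this is what deployConfig and checkDiff above do by feeding each pair into sync-diff-inspector's snapshot setting. A minimal manual equivalent, assuming a changefeed named "test-cf" (a hypothetical name) and TiDB's tidb_snapshot session variable:

  -- fetch the most recent syncpoint recorded for the changefeed (cf name is hypothetical)
  SELECT primary_ts, secondary_ts FROM tidb_cdc.syncpoint_v1 WHERE cf = 'test-cf' ORDER BY primary_ts DESC LIMIT 1;
  -- in a session on the upstream cluster, read as of the upstream TSO
  SET @@tidb_snapshot = '<primary_ts>';
  -- in a session on the downstream cluster, read as of the downstream TSO
  SET @@tidb_snapshot = '<secondary_ts>';
  -- queries in the two sessions now observe transactionally consistent views of the replicated data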