From 42c587e98c53bb1c71540749a4d170056e60d624 Mon Sep 17 00:00:00 2001 From: leoppro Date: Mon, 23 Mar 2020 23:10:34 +0800 Subject: [PATCH 01/14] check table --- cdc/capture.go | 23 ---------- cdc/entry/schema_storage.go | 5 +++ cdc/kv/store_op.go | 84 ++++++++++++++++++++++++++++++++++++- cdc/model/sink.go | 12 ++++++ cdc/owner.go | 67 +++++++++++++++++------------ cdc/processor.go | 65 ++-------------------------- cdc/puller/mock_puller.go | 8 ++-- cmd/client.go | 64 ++++++++++++++++++++++++++++ tests/dailytest/case.go | 43 ++++++++++++++++++- 9 files changed, 252 insertions(+), 119 deletions(-) diff --git a/cdc/capture.go b/cdc/capture.go index 05e3d9cee68..f328e739214 100644 --- a/cdc/capture.go +++ b/cdc/capture.go @@ -15,7 +15,6 @@ package cdc import ( "context" - "fmt" "sync" "time" @@ -25,11 +24,7 @@ import ( "github.com/pingcap/ticdc/cdc/kv" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/cdc/roles" - "github.com/pingcap/ticdc/pkg/flags" "github.com/pingcap/ticdc/pkg/util" - tidbkv "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/store" - "github.com/pingcap/tidb/store/tikv" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" "go.etcd.io/etcd/mvcc" @@ -189,21 +184,3 @@ func (c *Capture) Close(ctx context.Context) error { func (c *Capture) register(ctx context.Context) error { return errors.Trace(c.etcdClient.PutCaptureInfo(ctx, c.info, c.session.Lease())) } - -func createTiStore(urls string) (tidbkv.Storage, error) { - urlv, err := flags.NewURLsValue(urls) - if err != nil { - return nil, errors.Trace(err) - } - - // Ignore error if it is already registered. - _ = store.Register("tikv", tikv.Driver{}) - - tiPath := fmt.Sprintf("tikv://%s?disableGC=true", urlv.HostString()) - tiStore, err := store.New(tiPath) - if err != nil { - return nil, errors.Trace(err) - } - - return tiStore, nil -} diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index 9f4ea577410..3b0cb9aec2a 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -213,6 +213,11 @@ func (ti *TableInfo) IsColumnUnique(colID int64) bool { return exist } +// ExistTableUniqueColumn returns whether the table has the unique column +func (ti *TableInfo) ExistTableUniqueColumn() bool { + return len(ti.UniqueColumns) != 0 +} + // IsIndexUnique returns whether the index is unique func (ti *TableInfo) IsIndexUnique(indexInfo *timodel.IndexInfo) bool { if indexInfo.Primary { diff --git a/cdc/kv/store_op.go b/cdc/kv/store_op.go index dc450b648e8..ce58648cb68 100644 --- a/cdc/kv/store_op.go +++ b/cdc/kv/store_op.go @@ -14,16 +14,22 @@ package kv import ( + "fmt" "sort" + "github.com/pingcap/ticdc/pkg/flags" + "github.com/pingcap/tidb/store" + "github.com/pingcap/tidb/store/helper" + "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/errors" "github.com/pingcap/parser/model" tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" ) -// LoadHistoryDDLJobs loads all history DDL jobs from TiDB. -func LoadHistoryDDLJobs(tiStore tidbkv.Storage) ([]*model.Job, error) { +func loadHistoryDDLJobs(tiStore tidbkv.Storage) ([]*model.Job, error) { snapMeta, err := getSnapshotMeta(tiStore) if err != nil { return nil, errors.Trace(err) @@ -52,3 +58,77 @@ func getSnapshotMeta(tiStore tidbkv.Storage) (*meta.Meta, error) { } return meta.NewSnapshotMeta(snapshot), nil } + +// LoadHistoryDDLJobs loads all history DDL jobs from TiDB. +func LoadHistoryDDLJobs(kvStore tidbkv.Storage) ([]*model.Job, error) { + originalJobs, err := loadHistoryDDLJobs(kvStore) + jobs := make([]*model.Job, 0, len(originalJobs)) + if err != nil { + return nil, err + } + tikvStorage, ok := kvStore.(tikv.Storage) + for _, job := range originalJobs { + if job.State != model.JobStateSynced && job.State != model.JobStateDone { + continue + } + if ok { + err := resetFinishedTs(tikvStorage, job) + if err != nil { + return nil, errors.Trace(err) + } + } + jobs = append(jobs, job) + } + return jobs, nil +} + +func resetFinishedTs(kvStore tikv.Storage, job *model.Job) error { + helper := helper.NewHelper(kvStore) + diffKey := schemaDiffKey(job.BinlogInfo.SchemaVersion) + resp, err := helper.GetMvccByEncodedKey(diffKey) + if err != nil { + return errors.Trace(err) + } + mvcc := resp.GetInfo() + if mvcc == nil || len(mvcc.Writes) == 0 { + return errors.NotFoundf("mvcc info, ddl job id: %d, schema version: %d", job.ID, job.BinlogInfo.SchemaVersion) + } + var finishedTS uint64 + for _, w := range mvcc.Writes { + if finishedTS < w.CommitTs { + finishedTS = w.CommitTs + } + } + job.BinlogInfo.FinishedTS = finishedTS + return nil +} + +func CreateTiStore(urls string) (tidbkv.Storage, error) { + urlv, err := flags.NewURLsValue(urls) + if err != nil { + return nil, errors.Trace(err) + } + + // Ignore error if it is already registered. + _ = store.Register("tikv", tikv.Driver{}) + + tiPath := fmt.Sprintf("tikv://%s?disableGC=true", urlv.HostString()) + tiStore, err := store.New(tiPath) + if err != nil { + return nil, errors.Trace(err) + } + + return tiStore, nil +} + +func schemaDiffKey(schemaVersion int64) []byte { + metaPrefix := []byte("m") + mSchemaDiffPrefix := "Diff" + StringData := 's' + key := []byte(fmt.Sprintf("%s:%d", mSchemaDiffPrefix, schemaVersion)) + + ek := make([]byte, 0, len(metaPrefix)+len(key)+24) + ek = append(ek, metaPrefix...) + ek = codec.EncodeBytes(ek, key) + return codec.EncodeUint(ek, uint64(StringData)) +} diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 029369e77b2..6bc6d71f432 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -122,3 +122,15 @@ func (e *DDLEvent) FromMqMessage(key *MqMessageKey, value *MqMessageDDL) { e.Type = value.Type e.Query = value.Query } + +func (e *DDLEvent) FromJob(job *model.Job) { + var tableName string + if job.BinlogInfo.TableInfo != nil { + tableName = job.BinlogInfo.TableInfo.Name.O + } + e.Ts = job.BinlogInfo.FinishedTS + e.Query = job.Query + e.Schema = job.SchemaName + e.Table = tableName + e.Type = job.Type +} diff --git a/cdc/owner.go b/cdc/owner.go index 6c5f8a12b3c..c57486ba21d 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -18,6 +18,7 @@ import ( "fmt" "io" "math" + "strings" "sync" "time" @@ -317,12 +318,34 @@ func (c *changeFeed) banlanceOrphanTables(ctx context.Context, captures map[stri } } -func (c *changeFeed) applyJob(job *timodel.Job) error { +func (c *changeFeed) checkJob(job *timodel.Job) (skip bool) { + switch job.Type { + case timodel.ActionCreateTable: + tableInfo := entry.WrapTableInfo(job.BinlogInfo.TableInfo) + log.Warn("this table is not eligible to duplicate", zap.Reflect("job", job)) + return !tableInfo.ExistTableUniqueColumn() + case timodel.ActionDropColumn, timodel.ActionDropIndex, timodel.ActionDropPrimaryKey: + tableInfo := entry.WrapTableInfo(job.BinlogInfo.TableInfo) + if tableInfo.ExistTableUniqueColumn() { + return false + } + log.Warn("this table is not eligible to duplicate, stop to duplicate this table", zap.Reflect("job", job)) + c.removeTable(uint64(job.SchemaID), uint64(job.TableID)) + return true + } + return false +} + +func (c *changeFeed) applyJob(job *timodel.Job) (skip bool, err error) { log.Info("apply job", zap.String("sql", job.Query), zap.Stringer("job", job)) schamaName, tableName, _, err := c.schema.HandleDDL(job) if err != nil { - return errors.Trace(err) + return false, errors.Trace(err) + } + + if c.checkJob(job) { + return true, nil } schemaID := uint64(job.SchemaID) @@ -349,7 +372,7 @@ func (c *changeFeed) applyJob(job *timodel.Job) error { c.addTable(schemaID, addID, job.BinlogInfo.FinishedTS, entry.TableName{Schema: schamaName, Table: tableName}) } - return nil + return false, nil } type ownerImpl struct { @@ -517,7 +540,12 @@ func (o *ownerImpl) newChangeFeed( log.Info("Find new changefeed", zap.Reflect("info", info), zap.String("id", id), zap.Uint64("checkpoint ts", checkpointTs)) - jobs, err := getHistoryDDLJobs(o.pdEndpoints) + // TODO here we create another pb client,we should reuse them + kvStore, err := kv.CreateTiStore(strings.Join(o.pdEndpoints, ",")) + if err != nil { + return nil, err + } + jobs, err := kv.LoadHistoryDDLJobs(kvStore) if err != nil { return nil, errors.Trace(err) } @@ -823,7 +851,8 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C return nil } } - + ddlEvent := new(model.DDLEvent) + ddlEvent.FromJob(todoDDLJob) // Execute DDL Job asynchronously c.ddlState = model.ChangeFeedExecDDL log.Debug("apply job", zap.Stringer("job", todoDDLJob), @@ -831,32 +860,16 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C zap.String("query", todoDDLJob.Query), zap.Uint64("ts", todoDDLJob.BinlogInfo.FinishedTS)) - var tableName, schemaName string - if todoDDLJob.BinlogInfo.TableInfo != nil { - tableName = todoDDLJob.BinlogInfo.TableInfo.Name.O - } // TODO consider some newly added DDL types such as `ActionCreateSequence` - if todoDDLJob.Type != timodel.ActionCreateSchema { - dbInfo, exist := c.schema.SchemaByID(todoDDLJob.SchemaID) - if !exist { - return errors.NotFoundf("schema %d not found", todoDDLJob.SchemaID) - } - schemaName = dbInfo.Name.O - } else { - schemaName = todoDDLJob.BinlogInfo.DBInfo.Name.O - } - ddlEvent := &model.DDLEvent{ - Ts: todoDDLJob.BinlogInfo.FinishedTS, - Query: todoDDLJob.Query, - Schema: schemaName, - Table: tableName, - Type: todoDDLJob.Type, - } - - err := c.applyJob(todoDDLJob) + skip, err := c.applyJob(todoDDLJob) if err != nil { return errors.Trace(err) } + if skip { + c.ddlJobHistory = c.ddlJobHistory[1:] + c.ddlExecutedTs = todoDDLJob.BinlogInfo.FinishedTS + c.ddlState = model.ChangeFeedSyncDML + } c.banlanceOrphanTables(ctx, captures) diff --git a/cdc/processor.go b/cdc/processor.go index 79f16c55a55..fce87621e75 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -28,7 +28,6 @@ import ( "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/log" - timodel "github.com/pingcap/parser/model" pd "github.com/pingcap/pd/client" "github.com/pingcap/ticdc/cdc/entry" "github.com/pingcap/ticdc/cdc/kv" @@ -38,10 +37,7 @@ import ( "github.com/pingcap/ticdc/cdc/sink" "github.com/pingcap/ticdc/pkg/retry" "github.com/pingcap/ticdc/pkg/util" - "github.com/pingcap/tidb/store/helper" - "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" - "github.com/pingcap/tidb/util/codec" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" "go.uber.org/zap" @@ -553,7 +549,9 @@ func (p *processor) syncResolved(ctx context.Context) error { } func createSchemaBuilder(pdEndpoints []string, ddlEventCh <-chan *model.RawKVEntry) (*entry.StorageBuilder, error) { - jobs, err := getHistoryDDLJobs(pdEndpoints) + // TODO here we create another pb client,we should reuse them + kvStore, err := kv.CreateTiStore(strings.Join(pdEndpoints, ",")) + jobs, err := kv.LoadHistoryDDLJobs(kvStore) if err != nil { return nil, errors.Trace(err) } @@ -561,63 +559,6 @@ func createSchemaBuilder(pdEndpoints []string, ddlEventCh <-chan *model.RawKVEnt return builder, nil } -func getHistoryDDLJobs(pdEndpoints []string) ([]*timodel.Job, error) { - // TODO here we create another pb client,we should reuse them - kvStore, err := createTiStore(strings.Join(pdEndpoints, ",")) - if err != nil { - return nil, err - } - originalJobs, err := kv.LoadHistoryDDLJobs(kvStore) - jobs := make([]*timodel.Job, 0, len(originalJobs)) - if err != nil { - return nil, err - } - for _, job := range originalJobs { - if job.State != timodel.JobStateSynced && job.State != timodel.JobStateDone { - continue - } - err := resetFinishedTs(kvStore.(tikv.Storage), job) - if err != nil { - return nil, errors.Trace(err) - } - jobs = append(jobs, job) - } - return jobs, nil -} - -func resetFinishedTs(kvStore tikv.Storage, job *timodel.Job) error { - helper := helper.NewHelper(kvStore) - diffKey := schemaDiffKey(job.BinlogInfo.SchemaVersion) - resp, err := helper.GetMvccByEncodedKey(diffKey) - if err != nil { - return errors.Trace(err) - } - mvcc := resp.GetInfo() - if mvcc == nil || len(mvcc.Writes) == 0 { - return errors.NotFoundf("mvcc info, ddl job id: %d, schema version: %d", job.ID, job.BinlogInfo.SchemaVersion) - } - var finishedTS uint64 - for _, w := range mvcc.Writes { - if finishedTS < w.CommitTs { - finishedTS = w.CommitTs - } - } - job.BinlogInfo.FinishedTS = finishedTS - return nil -} - -func schemaDiffKey(schemaVersion int64) []byte { - metaPrefix := []byte("m") - mSchemaDiffPrefix := "Diff" - StringData := 's' - key := []byte(fmt.Sprintf("%s:%d", mSchemaDiffPrefix, schemaVersion)) - - ek := make([]byte, 0, len(metaPrefix)+len(key)+24) - ek = append(ek, metaPrefix...) - ek = codec.EncodeBytes(ek, key) - return codec.EncodeUint(ek, uint64(StringData)) -} - func createTsRWriter(cli kv.CDCEtcdClient, changefeedID, captureID string) (storage.ProcessorTsRWriter, error) { return storage.NewProcessorTsEtcdRWriter(cli, changefeedID, captureID) } diff --git a/cdc/puller/mock_puller.go b/cdc/puller/mock_puller.go index d118ddcc846..87e6869d5ac 100644 --- a/cdc/puller/mock_puller.go +++ b/cdc/puller/mock_puller.go @@ -11,7 +11,6 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/log" timodel "github.com/pingcap/parser/model" - "github.com/pingcap/ticdc/cdc/kv" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tidb/domain" @@ -243,9 +242,10 @@ func (m *MockPullerManager) GetTableInfo(schemaName, tableName string) *entry.Ta // GetDDLJobs returns the ddl jobs func (m *MockPullerManager) GetDDLJobs() []*timodel.Job { - jobs, err := kv.LoadHistoryDDLJobs(m.store) - m.c.Assert(err, check.IsNil) - return jobs + //jobs, err := kv.LoadHistoryDDLJobs(m.store) + //m.c.Assert(err, check.IsNil) + //return jobs + return nil } func (m *MockPullerManager) postPrewrite(req *kvrpcpb.PrewriteRequest, result []error) { diff --git a/cmd/client.go b/cmd/client.go index 845058d9691..c5b6d4b0dc2 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -9,6 +9,8 @@ import ( "strings" "time" + "github.com/pingcap/ticdc/cdc/entry" + "github.com/BurntSushi/toml" "github.com/chzyer/readline" _ "github.com/go-sql-driver/mysql" // mysql driver @@ -41,6 +43,7 @@ var ( sinkURI string configFile string cliPdAddr string + noConfirm bool cdcEtcdCli kv.CDCEtcdClient pdCli pd.Client @@ -191,6 +194,25 @@ func newCreateChangefeedCommand() *cobra.Command { Config: cfg, } + ineligibleTables, err := verifyTables(ctx, cfg) + if err != nil { + return err + } + if len(ineligibleTables) != 0 { + cmd.Printf("[WARN] some tables are not eligible to duplicate, %#v\n", ineligibleTables) + if !noConfirm { + cmd.Printf("Could you agree to ignore those tables, and continue to duplicate [Y/N]\n") + var yOrN string + _, err := fmt.Scan(&yOrN) + if err != nil { + return err + } + if strings.TrimSpace(yOrN) != "Y" { + cmd.Printf("Failed to create changefeed\n") + } + } + } + for _, opt := range opts { s := strings.SplitN(opt, "=", 2) if len(s) <= 0 { @@ -221,6 +243,7 @@ func newCreateChangefeedCommand() *cobra.Command { command.PersistentFlags().StringVar(&sinkURI, "sink-uri", "mysql://root:123456@127.0.0.1:3306/", "sink uri") command.PersistentFlags().StringVar(&configFile, "config", "", "Path of the configuration file") command.PersistentFlags().StringSliceVar(&opts, "opts", nil, "Extra options, in the `key=value` format") + command.PersistentFlags().BoolVar(&noConfirm, "no-confirm", false, "Extra options, in the `key=value` format") return command } @@ -243,6 +266,47 @@ func verifyStartTs(ctx context.Context, startTs uint64, cli kv.CDCEtcdClient) er return nil } +func verifyTables(ctx context.Context, cfg *util.ReplicaConfig) (ineligibleTables []entry.TableName, err error) { + kvStore, err := kv.CreateTiStore(cliPdAddr) + if err != nil { + return nil, err + } + jobs, err := kv.LoadHistoryDDLJobs(kvStore) + if err != nil { + return nil, errors.Trace(err) + } + + schemaStorage := entry.NewSingleStorage() + + for _, job := range jobs { + if job.BinlogInfo.FinishedTS > startTs { + break + } + _, _, _, err := schemaStorage.HandleDDL(job) + if err != nil { + return nil, errors.Trace(err) + } + } + filter, err := util.NewFilter(cfg) + if err != nil { + return nil, errors.Trace(err) + } + + for tID, tableName := range schemaStorage.CloneTables() { + tableInfo, exist := schemaStorage.TableByID(int64(tID)) + if !exist { + return nil, errors.NotFoundf("table %d", int64(tID)) + } + if filter.ShouldIgnoreTable(tableName.Schema, tableName.Table) { + continue + } + if !tableInfo.ExistTableUniqueColumn() { + ineligibleTables = append(ineligibleTables, tableName) + } + } + return +} + // strictDecodeFile decodes the toml file strictly. If any item in confFile file is not mapped // into the Config struct, issue an error and stop the server from starting. func strictDecodeFile(path, component string, cfg interface{}) error { diff --git a/tests/dailytest/case.go b/tests/dailytest/case.go index 2b1beeb79ee..f18412ebbd9 100644 --- a/tests/dailytest/case.go +++ b/tests/dailytest/case.go @@ -211,7 +211,7 @@ func (tr *testRunner) execSQLs(sqls []string) { // RunCase run some simple test case func RunCase(src *sql.DB, dst *sql.DB, schema string) { tr := &testRunner{src: src, dst: dst, schema: schema} - + ineligibleTable(tr, src, dst) runPKorUKcases(tr) tr.run(caseUpdateWhileAddingCol) @@ -327,6 +327,47 @@ func RunCase(src *sql.DB, dst *sql.DB, schema string) { // tr.execSQLs([]string{"DROP TABLE binlog_big;"}) } +func ineligibleTable(tr *testRunner, src *sql.DB, dst *sql.DB) { + sqls := []string{ + "CREATE TABLE ineligible_test.ineligible_table1 (uk int UNIQUE null, ncol int);", + "CREATE TABLE ineligible_test.ineligible_table2 (ncol1 int, ncol2 int);", + + "insert into ineligible_table1 (uk, ncol) values (1,1);", + "insert into ineligible_table1 (uk, ncol) values (null,2);", + + "insert into ineligible_table2 (uk, ncol) values (1,1);", + "insert into ineligible_table2 (uk, ncol) values (2,2);", + } + // execute SQL but don't check + for _, sql := range sqls { + mustExec(src, sql) + } + + sqls = []string{ + "CREATE TABLE eligible_table (uk int UNIQUE not null, ncol int);", + "insert into eligible_table (uk, ncol) values (1,1);", + "insert into eligible_table (uk, ncol) values (2,2);", + "insert into eligible_table (uk, ncol) values (3,4);", + } + // execute SQL and check + tr.execSQLs(sqls) + + rows, err := dst.Query("show tables") + if err != nil { + log.S().Fatalf("exec failed, sql: 'show tables', err: %+v", err) + } + for rows.Next() { + var tableName string + err := rows.Scan(&tableName) + if err != nil { + log.S().Fatalf("scan result set failed, err: %+v", err) + } + if tableName == "ineligible_table1" || tableName == "ineligible_table2" { + log.S().Fatalf("found unexpected table %s", tableName) + } + } +} + func caseUpdateWhileAddingCol(db *sql.DB) { mustExec(db, ` CREATE TABLE growing_cols ( From d6ea6fd69fe6e1ad918d83ae491a4494c0a2bb85 Mon Sep 17 00:00:00 2001 From: leoppro Date: Mon, 23 Mar 2020 23:12:37 +0800 Subject: [PATCH 02/14] fix lint --- cdc/kv/store_op.go | 1 + cdc/model/sink.go | 1 + cdc/processor.go | 3 +++ 3 files changed, 5 insertions(+) diff --git a/cdc/kv/store_op.go b/cdc/kv/store_op.go index ce58648cb68..b933fde9ca7 100644 --- a/cdc/kv/store_op.go +++ b/cdc/kv/store_op.go @@ -103,6 +103,7 @@ func resetFinishedTs(kvStore tikv.Storage, job *model.Job) error { return nil } +// CreateTiStore creates a new tikv storage client func CreateTiStore(urls string) (tidbkv.Storage, error) { urlv, err := flags.NewURLsValue(urls) if err != nil { diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 6bc6d71f432..ff62a9614be 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -123,6 +123,7 @@ func (e *DDLEvent) FromMqMessage(key *MqMessageKey, value *MqMessageDDL) { e.Query = value.Query } +// FromJob fills the values of DDLEvent from DDL job func (e *DDLEvent) FromJob(job *model.Job) { var tableName string if job.BinlogInfo.TableInfo != nil { diff --git a/cdc/processor.go b/cdc/processor.go index fce87621e75..bf0b7528670 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -551,6 +551,9 @@ func (p *processor) syncResolved(ctx context.Context) error { func createSchemaBuilder(pdEndpoints []string, ddlEventCh <-chan *model.RawKVEntry) (*entry.StorageBuilder, error) { // TODO here we create another pb client,we should reuse them kvStore, err := kv.CreateTiStore(strings.Join(pdEndpoints, ",")) + if err != nil { + return nil, errors.Trace(err) + } jobs, err := kv.LoadHistoryDDLJobs(kvStore) if err != nil { return nil, errors.Trace(err) From 4065c131e7492abe4dc518ca3bbf3fe5a7f13a61 Mon Sep 17 00:00:00 2001 From: leoppro Date: Mon, 23 Mar 2020 23:22:00 +0800 Subject: [PATCH 03/14] fix bug --- cdc/owner.go | 1 + cdc/puller/mock_puller.go | 8 +++++--- cmd/client.go | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index c57486ba21d..1a9547c4d3e 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -869,6 +869,7 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C c.ddlJobHistory = c.ddlJobHistory[1:] c.ddlExecutedTs = todoDDLJob.BinlogInfo.FinishedTS c.ddlState = model.ChangeFeedSyncDML + return nil } c.banlanceOrphanTables(ctx, captures) diff --git a/cdc/puller/mock_puller.go b/cdc/puller/mock_puller.go index 87e6869d5ac..ee315ca1cb7 100644 --- a/cdc/puller/mock_puller.go +++ b/cdc/puller/mock_puller.go @@ -5,6 +5,8 @@ import ( "sync" "time" + "github.com/pingcap/ticdc/cdc/kv" + "github.com/pingcap/ticdc/cdc/entry" "github.com/pingcap/check" @@ -242,9 +244,9 @@ func (m *MockPullerManager) GetTableInfo(schemaName, tableName string) *entry.Ta // GetDDLJobs returns the ddl jobs func (m *MockPullerManager) GetDDLJobs() []*timodel.Job { - //jobs, err := kv.LoadHistoryDDLJobs(m.store) - //m.c.Assert(err, check.IsNil) - //return jobs + jobs, err := kv.LoadHistoryDDLJobs(m.store) + m.c.Assert(err, check.IsNil) + return jobs return nil } diff --git a/cmd/client.go b/cmd/client.go index c5b6d4b0dc2..94c481bc7b7 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -243,7 +243,7 @@ func newCreateChangefeedCommand() *cobra.Command { command.PersistentFlags().StringVar(&sinkURI, "sink-uri", "mysql://root:123456@127.0.0.1:3306/", "sink uri") command.PersistentFlags().StringVar(&configFile, "config", "", "Path of the configuration file") command.PersistentFlags().StringSliceVar(&opts, "opts", nil, "Extra options, in the `key=value` format") - command.PersistentFlags().BoolVar(&noConfirm, "no-confirm", false, "Extra options, in the `key=value` format") + command.PersistentFlags().BoolVar(&noConfirm, "no-confirm", false, "Don't ask user whether to ignore ineligible table") return command } From 510a23e24bbbcbb5d4e1941b91bbd59f6f93a703 Mon Sep 17 00:00:00 2001 From: leoppro Date: Mon, 23 Mar 2020 23:23:11 +0800 Subject: [PATCH 04/14] fix bug --- cdc/puller/mock_puller.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cdc/puller/mock_puller.go b/cdc/puller/mock_puller.go index ee315ca1cb7..dfce0d392ab 100644 --- a/cdc/puller/mock_puller.go +++ b/cdc/puller/mock_puller.go @@ -247,7 +247,6 @@ func (m *MockPullerManager) GetDDLJobs() []*timodel.Job { jobs, err := kv.LoadHistoryDDLJobs(m.store) m.c.Assert(err, check.IsNil) return jobs - return nil } func (m *MockPullerManager) postPrewrite(req *kvrpcpb.PrewriteRequest, result []error) { From a39315205fabefe940f4665429253a62e4a1409b Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 24 Mar 2020 10:19:49 +0800 Subject: [PATCH 05/14] fix bug --- tests/dailytest/case.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/dailytest/case.go b/tests/dailytest/case.go index f18412ebbd9..d398e70ee5c 100644 --- a/tests/dailytest/case.go +++ b/tests/dailytest/case.go @@ -329,8 +329,8 @@ func RunCase(src *sql.DB, dst *sql.DB, schema string) { func ineligibleTable(tr *testRunner, src *sql.DB, dst *sql.DB) { sqls := []string{ - "CREATE TABLE ineligible_test.ineligible_table1 (uk int UNIQUE null, ncol int);", - "CREATE TABLE ineligible_test.ineligible_table2 (ncol1 int, ncol2 int);", + "CREATE TABLE ineligible_table1 (uk int UNIQUE null, ncol int);", + "CREATE TABLE ineligible_table2 (ncol1 int, ncol2 int);", "insert into ineligible_table1 (uk, ncol) values (1,1);", "insert into ineligible_table1 (uk, ncol) values (null,2);", From 43755643bf41136e536c6e8ee56f3a2362477c68 Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 24 Mar 2020 10:53:10 +0800 Subject: [PATCH 06/14] fix bug --- tests/dailytest/case.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/dailytest/case.go b/tests/dailytest/case.go index d398e70ee5c..bdda90595e1 100644 --- a/tests/dailytest/case.go +++ b/tests/dailytest/case.go @@ -335,8 +335,8 @@ func ineligibleTable(tr *testRunner, src *sql.DB, dst *sql.DB) { "insert into ineligible_table1 (uk, ncol) values (1,1);", "insert into ineligible_table1 (uk, ncol) values (null,2);", - "insert into ineligible_table2 (uk, ncol) values (1,1);", - "insert into ineligible_table2 (uk, ncol) values (2,2);", + "insert into ineligible_table2 (ncol1, ncol2) values (1,1);", + "insert into ineligible_table2 (ncol1, ncol2) values (2,2);", } // execute SQL but don't check for _, sql := range sqls { From 7764e10efc0d8b3990bbe89c0b52bdbf3bff1461 Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 24 Mar 2020 11:20:04 +0800 Subject: [PATCH 07/14] fix bug --- tests/dailytest/case.go | 42 ++++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/tests/dailytest/case.go b/tests/dailytest/case.go index bdda90595e1..a3a3b94bb4f 100644 --- a/tests/dailytest/case.go +++ b/tests/dailytest/case.go @@ -337,33 +337,37 @@ func ineligibleTable(tr *testRunner, src *sql.DB, dst *sql.DB) { "insert into ineligible_table2 (ncol1, ncol2) values (1,1);", "insert into ineligible_table2 (ncol1, ncol2) values (2,2);", - } - // execute SQL but don't check - for _, sql := range sqls { - mustExec(src, sql) - } - - sqls = []string{ "CREATE TABLE eligible_table (uk int UNIQUE not null, ncol int);", "insert into eligible_table (uk, ncol) values (1,1);", "insert into eligible_table (uk, ncol) values (2,2);", "insert into eligible_table (uk, ncol) values (3,4);", } - // execute SQL and check - tr.execSQLs(sqls) - - rows, err := dst.Query("show tables") - if err != nil { - log.S().Fatalf("exec failed, sql: 'show tables', err: %+v", err) + // execute SQL but don't check + for _, sql := range sqls { + mustExec(src, sql) } - for rows.Next() { - var tableName string - err := rows.Scan(&tableName) + + synced := false + for { + rows, err := dst.Query("show tables") if err != nil { - log.S().Fatalf("scan result set failed, err: %+v", err) + log.S().Fatalf("exec failed, sql: 'show tables', err: %+v", err) } - if tableName == "ineligible_table1" || tableName == "ineligible_table2" { - log.S().Fatalf("found unexpected table %s", tableName) + for rows.Next() { + var tableName string + err := rows.Scan(&tableName) + if err != nil { + log.S().Fatalf("scan result set failed, err: %+v", err) + } + if tableName == "ineligible_table1" || tableName == "ineligible_table2" { + log.S().Fatalf("found unexpected table %s", tableName) + } + if synced { + return + } + if tableName == "eligible_table" { + synced = true + } } } } From 5da35871e304284e3d41d2482eca544e8f7968dd Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 24 Mar 2020 11:25:01 +0800 Subject: [PATCH 08/14] fix bug --- tests/dailytest/case.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/tests/dailytest/case.go b/tests/dailytest/case.go index a3a3b94bb4f..d9444cd8381 100644 --- a/tests/dailytest/case.go +++ b/tests/dailytest/case.go @@ -348,6 +348,7 @@ func ineligibleTable(tr *testRunner, src *sql.DB, dst *sql.DB) { } synced := false +TestLoop: for { rows, err := dst.Query("show tables") if err != nil { @@ -363,13 +364,23 @@ func ineligibleTable(tr *testRunner, src *sql.DB, dst *sql.DB) { log.S().Fatalf("found unexpected table %s", tableName) } if synced { - return + break TestLoop } if tableName == "eligible_table" { synced = true } } } + + // clean up + sqls = []string{ + "DROP TABLE ineligible_table1;", + "DROP TABLE ineligible_table2;", + "DROP TABLE ineligible_table;", + } + for _, sql := range sqls { + mustExec(src, sql) + } } func caseUpdateWhileAddingCol(db *sql.DB) { From 0df26e8c229a4868f5c449884027c2716be8456f Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 24 Mar 2020 11:39:01 +0800 Subject: [PATCH 09/14] fix bug --- cmd/client.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/client.go b/cmd/client.go index 94c481bc7b7..3895f5302db 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -9,6 +9,8 @@ import ( "strings" "time" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/entry" "github.com/BurntSushi/toml" @@ -208,7 +210,7 @@ func newCreateChangefeedCommand() *cobra.Command { return err } if strings.TrimSpace(yOrN) != "Y" { - cmd.Printf("Failed to create changefeed\n") + log.S().Fatal("Failed to create changefeed\n") } } } From 915fbcb62973b7b755fb3b632bb272fd6623d1c6 Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 24 Mar 2020 12:50:00 +0800 Subject: [PATCH 10/14] fix bug --- tests/dailytest/case.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/dailytest/case.go b/tests/dailytest/case.go index d9444cd8381..b6012415283 100644 --- a/tests/dailytest/case.go +++ b/tests/dailytest/case.go @@ -376,7 +376,7 @@ TestLoop: sqls = []string{ "DROP TABLE ineligible_table1;", "DROP TABLE ineligible_table2;", - "DROP TABLE ineligible_table;", + "DROP TABLE eligible_table;", } for _, sql := range sqls { mustExec(src, sql) From 369ca2ff373dc7ab6fa45b39f189407202c997d6 Mon Sep 17 00:00:00 2001 From: leoppro Date: Wed, 25 Mar 2020 11:42:00 +0800 Subject: [PATCH 11/14] update --- cdc/entry/schema_storage.go | 29 ++++++++++++++++++++++++----- cdc/owner.go | 17 ++++++++++++----- cmd/client.go | 4 ++-- tests/dailytest/case.go | 15 ++++++++------- 4 files changed, 46 insertions(+), 19 deletions(-) diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index 3b0cb9aec2a..2f07006f204 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -38,7 +38,8 @@ type Storage struct { schemas map[int64]*timodel.DBInfo tables map[int64]*TableInfo - truncateTableID map[int64]struct{} + truncateTableID map[int64]struct{} + ineligibleTableID map[int64]struct{} schemaMetaVersion int64 lastHandledTs uint64 @@ -253,6 +254,7 @@ func NewSingleStorage() *Storage { s := &Storage{ version2SchemaTable: make(map[int64]TableName), truncateTableID: make(map[int64]struct{}), + ineligibleTableID: make(map[int64]struct{}), } s.tableIDToName = make(map[int64]TableName) @@ -387,6 +389,7 @@ func (s *Storage) DropTable(id int64) (string, error) { tableName := s.tableIDToName[id] delete(s.tableIDToName, id) delete(s.tableNameToID, tableName) + delete(s.ineligibleTableID, id) log.Debug("drop table success", zap.String("name", table.Name.O), zap.Int64("id", id)) return table.Name.O, nil @@ -400,7 +403,12 @@ func (s *Storage) CreateTable(schema *timodel.DBInfo, table *timodel.TableInfo) } schema.Tables = append(schema.Tables, table) - s.tables[table.ID] = WrapTableInfo(table) + tbl := WrapTableInfo(table) + s.tables[table.ID] = tbl + if !tbl.ExistTableUniqueColumn() { + log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID)) + s.ineligibleTableID[table.ID] = struct{}{} + } s.tableIDToName[table.ID] = TableName{Schema: schema.Name.O, Table: table.Name.O} s.tableNameToID[s.tableIDToName[table.ID]] = table.ID @@ -414,9 +422,12 @@ func (s *Storage) ReplaceTable(table *timodel.TableInfo) error { if !ok { return errors.NotFoundf("table %s(%d)", table.Name, table.ID) } - - s.tables[table.ID] = WrapTableInfo(table) - + tbl := WrapTableInfo(table) + s.tables[table.ID] = tbl + if !tbl.ExistTableUniqueColumn() { + log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID)) + s.ineligibleTableID[table.ID] = struct{}{} + } return nil } @@ -675,6 +686,9 @@ func (s *Storage) Clone() *Storage { for k, v := range s.truncateTableID { n.truncateTableID[k] = v } + for k, v := range s.ineligibleTableID { + n.ineligibleTableID[k] = v + } for k, v := range s.version2SchemaTable { n.version2SchemaTable[k] = v } @@ -693,6 +707,11 @@ func (s *Storage) IsTruncateTableID(id int64) bool { return ok } +func (s *Storage) IsIneligibleTableID(id int64) bool { + _, ok := s.ineligibleTableID[id] + return ok +} + // SkipJob skip the job should not be executed // TiDB write DDL Binlog for every DDL Job, we must ignore jobs that are cancelled or rollback // For older version TiDB, it write DDL Binlog in the txn that the state of job is changed to *synced* diff --git a/cdc/owner.go b/cdc/owner.go index 1a9547c4d3e..1e9e8ceb895 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -322,14 +322,17 @@ func (c *changeFeed) checkJob(job *timodel.Job) (skip bool) { switch job.Type { case timodel.ActionCreateTable: tableInfo := entry.WrapTableInfo(job.BinlogInfo.TableInfo) - log.Warn("this table is not eligible to duplicate", zap.Reflect("job", job)) - return !tableInfo.ExistTableUniqueColumn() + if !tableInfo.ExistTableUniqueColumn() { + log.Warn("this table is not eligible to replicate", zap.Reflect("job", job)) + return true + } + return false case timodel.ActionDropColumn, timodel.ActionDropIndex, timodel.ActionDropPrimaryKey: tableInfo := entry.WrapTableInfo(job.BinlogInfo.TableInfo) if tableInfo.ExistTableUniqueColumn() { return false } - log.Warn("this table is not eligible to duplicate, stop to duplicate this table", zap.Reflect("job", job)) + log.Warn("this table is not eligible to replicate, stop to replicate this table", zap.Reflect("job", job)) c.removeTable(uint64(job.SchemaID), uint64(job.TableID)) return true } @@ -344,11 +347,15 @@ func (c *changeFeed) applyJob(job *timodel.Job) (skip bool, err error) { return false, errors.Trace(err) } - if c.checkJob(job) { + schemaID := uint64(job.SchemaID) + if job.BinlogInfo != nil && job.BinlogInfo.TableInfo != nil && c.schema.IsIneligibleTableID(job.BinlogInfo.TableInfo.ID) { + tableID := uint64(job.BinlogInfo.TableInfo.ID) + if _, exist := c.tables[tableID]; exist { + c.removeTable(schemaID, tableID) + } return true, nil } - schemaID := uint64(job.SchemaID) // case table id set may change switch job.Type { case timodel.ActionCreateSchema: diff --git a/cmd/client.go b/cmd/client.go index 3895f5302db..2e123948cf2 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -201,9 +201,9 @@ func newCreateChangefeedCommand() *cobra.Command { return err } if len(ineligibleTables) != 0 { - cmd.Printf("[WARN] some tables are not eligible to duplicate, %#v\n", ineligibleTables) + cmd.Printf("[WARN] some tables are not eligible to replicate, %#v\n", ineligibleTables) if !noConfirm { - cmd.Printf("Could you agree to ignore those tables, and continue to duplicate [Y/N]\n") + cmd.Printf("Could you agree to ignore those tables, and continue to replicate [Y/N]\n") var yOrN string _, err := fmt.Scan(&yOrN) if err != nil { diff --git a/tests/dailytest/case.go b/tests/dailytest/case.go index b6012415283..378a50af981 100644 --- a/tests/dailytest/case.go +++ b/tests/dailytest/case.go @@ -333,14 +333,17 @@ func ineligibleTable(tr *testRunner, src *sql.DB, dst *sql.DB) { "CREATE TABLE ineligible_table2 (ncol1 int, ncol2 int);", "insert into ineligible_table1 (uk, ncol) values (1,1);", - "insert into ineligible_table1 (uk, ncol) values (null,2);", - - "insert into ineligible_table2 (ncol1, ncol2) values (1,1);", "insert into ineligible_table2 (ncol1, ncol2) values (2,2);", + "ALTER TABLE ineligible_table1 ADD COLUMN c1 INT NOT NULL;", + "ALTER TABLE ineligible_table2 ADD COLUMN c1 INT NOT NULL;", + "insert into ineligible_table1 (uk, ncol, c1) values (null,2,3);", + "insert into ineligible_table2 (ncol1, ncol2, c1) values (1,1,3);", + "CREATE TABLE eligible_table (uk int UNIQUE not null, ncol int);", "insert into eligible_table (uk, ncol) values (1,1);", "insert into eligible_table (uk, ncol) values (2,2);", - "insert into eligible_table (uk, ncol) values (3,4);", + "ALTER TABLE eligible_table ADD COLUMN c1 INT NOT NULL;", + "insert into eligible_table (uk, ncol, c1) values (3,4,5);", } // execute SQL but don't check for _, sql := range sqls { @@ -378,9 +381,7 @@ TestLoop: "DROP TABLE ineligible_table2;", "DROP TABLE eligible_table;", } - for _, sql := range sqls { - mustExec(src, sql) - } + tr.execSQLs(sqls) } func caseUpdateWhileAddingCol(db *sql.DB) { From 7b1c403ef26aead621db7dead9cd33414458614d Mon Sep 17 00:00:00 2001 From: leoppro Date: Wed, 25 Mar 2020 11:42:38 +0800 Subject: [PATCH 12/14] fix lint --- cdc/entry/schema_storage.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index 2f07006f204..9ad2ce23ca4 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -707,6 +707,7 @@ func (s *Storage) IsTruncateTableID(id int64) bool { return ok } +// IsTruncateTableID returns true if the table is ineligible func (s *Storage) IsIneligibleTableID(id int64) bool { _, ok := s.ineligibleTableID[id] return ok From 89096f21484d51042cfd1ac34e5a3a85802b6bf6 Mon Sep 17 00:00:00 2001 From: leoppro Date: Wed, 25 Mar 2020 11:43:28 +0800 Subject: [PATCH 13/14] fix lint --- cdc/entry/schema_storage.go | 2 +- cdc/owner.go | 21 --------------------- 2 files changed, 1 insertion(+), 22 deletions(-) diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index 9ad2ce23ca4..525d5db18b7 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -707,7 +707,7 @@ func (s *Storage) IsTruncateTableID(id int64) bool { return ok } -// IsTruncateTableID returns true if the table is ineligible +// IsIneligibleTableID returns true if the table is ineligible func (s *Storage) IsIneligibleTableID(id int64) bool { _, ok := s.ineligibleTableID[id] return ok diff --git a/cdc/owner.go b/cdc/owner.go index 1e9e8ceb895..e88e94ab367 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -318,27 +318,6 @@ func (c *changeFeed) banlanceOrphanTables(ctx context.Context, captures map[stri } } -func (c *changeFeed) checkJob(job *timodel.Job) (skip bool) { - switch job.Type { - case timodel.ActionCreateTable: - tableInfo := entry.WrapTableInfo(job.BinlogInfo.TableInfo) - if !tableInfo.ExistTableUniqueColumn() { - log.Warn("this table is not eligible to replicate", zap.Reflect("job", job)) - return true - } - return false - case timodel.ActionDropColumn, timodel.ActionDropIndex, timodel.ActionDropPrimaryKey: - tableInfo := entry.WrapTableInfo(job.BinlogInfo.TableInfo) - if tableInfo.ExistTableUniqueColumn() { - return false - } - log.Warn("this table is not eligible to replicate, stop to replicate this table", zap.Reflect("job", job)) - c.removeTable(uint64(job.SchemaID), uint64(job.TableID)) - return true - } - return false -} - func (c *changeFeed) applyJob(job *timodel.Job) (skip bool, err error) { log.Info("apply job", zap.String("sql", job.Query), zap.Stringer("job", job)) From 284f5ede71a69f35e07776671d833ca622eb9523 Mon Sep 17 00:00:00 2001 From: leoppro Date: Wed, 25 Mar 2020 12:23:10 +0800 Subject: [PATCH 14/14] fix test --- cdc/entry/schema_storage.go | 1 + cdc/entry/schema_storage_test.go | 210 ++++++++++++------------------- 2 files changed, 82 insertions(+), 129 deletions(-) diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index 525d5db18b7..67a44c18d02 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -666,6 +666,7 @@ func (s *Storage) Clone() *Storage { tables: make(map[int64]*TableInfo), truncateTableID: make(map[int64]struct{}), + ineligibleTableID: make(map[int64]struct{}), version2SchemaTable: make(map[int64]TableName), } for k, v := range s.tableIDToName { diff --git a/cdc/entry/schema_storage_test.go b/cdc/entry/schema_storage_test.go index 2481c49c76b..afeec450dc9 100644 --- a/cdc/entry/schema_storage_test.go +++ b/cdc/entry/schema_storage_test.go @@ -13,9 +13,7 @@ package entry -/* import ( - "context" "fmt" parser_types "github.com/pingcap/parser/types" @@ -24,7 +22,6 @@ import ( "github.com/pingcap/errors" timodel "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" - "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/tidb/types" ) @@ -32,21 +29,7 @@ type schemaSuite struct{} var _ = Suite(&schemaSuite{}) -func NewStorageForTest(ctx context.Context, c *C, jobs []*timodel.Job) *Storage { - jobCh := make(chan *model.RawKVEntry) - storageBuilder, err := NewStorageBuilder(jobs, jobCh) - c.Assert(err, IsNil) - go func() { - err := storageBuilder.Run(ctx) - if err != nil && errors.Cause(err) != context.Canceled { - c.Fatal(err) - } - }() - return storageBuilder.Build(0) -} - func (t *schemaSuite) TestSchema(c *C) { - var jobs []*timodel.Job dbName := timodel.NewCIStr("Test") // db and ignoreDB info dbInfo := &timodel.DBInfo{ @@ -55,7 +38,7 @@ func (t *schemaSuite) TestSchema(c *C) { State: timodel.StatePublic, } // `createSchema` job1 - job1 := &timodel.Job{ + job := &timodel.Job{ ID: 3, State: timodel.JobStateSynced, SchemaID: 1, @@ -63,7 +46,28 @@ func (t *schemaSuite) TestSchema(c *C) { BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 1, DBInfo: dbInfo, FinishedTS: 123}, Query: "create database test", } - jobDup := &timodel.Job{ + // reconstruct the local schema + schema := NewSingleStorage() + _, _, _, err := schema.HandleDDL(job) + c.Assert(err, IsNil) + _, exist := schema.SchemaByID(job.SchemaID) + c.Assert(exist, IsTrue) + + // test drop schema + job = &timodel.Job{ + ID: 6, + State: timodel.JobStateSynced, + SchemaID: 1, + Type: timodel.ActionDropSchema, + BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 3, FinishedTS: 124}, + Query: "drop database test", + } + _, _, _, err = schema.HandleDDL(job) + c.Assert(err, IsNil) + _, exist = schema.SchemaByID(job.SchemaID) + c.Assert(exist, IsFalse) + + job = &timodel.Job{ ID: 3, State: timodel.JobStateSynced, SchemaID: 1, @@ -71,75 +75,26 @@ func (t *schemaSuite) TestSchema(c *C) { BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 2, DBInfo: dbInfo, FinishedTS: 124}, Query: "create database test", } - job2 := &timodel.Job{ - ID: 5, - State: timodel.JobStateRollbackDone, - BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 2, DBInfo: dbInfo, FinishedTS: 125}} - - jobs = append(jobs, job1, job2) - // reconstruct the local schema - ctx, cancel := context.WithCancel(context.Background()) - schema := NewStorageForTest(ctx, c, jobs) - err := schema.HandlePreviousDDLJobIfNeed(123) + _, _, _, err = schema.HandleDDL(job) c.Assert(err, IsNil) - cancel() - - // test drop schema - jobs = append( - jobs, - &timodel.Job{ - ID: 6, - State: timodel.JobStateSynced, - SchemaID: 1, - Type: timodel.ActionDropSchema, - BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 3, FinishedTS: 124}, - Query: "drop database test", - }, - ) - ctx, cancel = context.WithCancel(context.Background()) - schema = NewStorageForTest(ctx, c, jobs) - err = schema.HandlePreviousDDLJobIfNeed(124) - c.Assert(err, IsNil) - cancel() - - // test create schema already exist error - jobs = jobs[:0] - jobs = append(jobs, job1, jobDup, job2) - - ctx, cancel = context.WithCancel(context.Background()) - schema = NewStorageForTest(ctx, c, jobs) - err = schema.HandlePreviousDDLJobIfNeed(125) + _, _, _, err = schema.HandleDDL(job) c.Log(err) c.Assert(errors.IsAlreadyExists(err), IsTrue) - cancel() // test schema drop schema error - jobs = jobs[:0] - jobs = append( - jobs, - &timodel.Job{ - ID: 9, - State: timodel.JobStateSynced, - SchemaID: 1, - Type: timodel.ActionDropSchema, - BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 1, FinishedTS: 123}, - Query: "drop database test", - }, - ) - ctx, cancel = context.WithCancel(context.Background()) - schema = NewStorageForTest(ctx, c, jobs) - err = schema.HandlePreviousDDLJobIfNeed(123) + job = &timodel.Job{ + ID: 9, + State: timodel.JobStateSynced, + SchemaID: 1, + Type: timodel.ActionDropSchema, + BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 1, FinishedTS: 123}, + Query: "drop database test", + } + _, _, _, err = schema.HandleDDL(job) + c.Assert(err, IsNil) + _, _, _, err = schema.HandleDDL(job) c.Assert(errors.IsNotFound(err), IsTrue) - cancel() - - // test unresolved error - jobs = append(jobs, job1, job2) - ctx, cancel = context.WithCancel(context.Background()) - schema = NewStorageForTest(ctx, c, jobs) - err = schema.HandlePreviousDDLJobIfNeed(200) - c.Assert(errors.Cause(err), Equals, model.ErrUnresolved) - cancel() } func (*schemaSuite) TestTable(c *C) { @@ -221,6 +176,7 @@ func (*schemaSuite) TestTable(c *C) { jobs = append(jobs, job) // construct a historical `addIndex` job + tblInfo = tblInfo.Clone() tblInfo.Indices = []*timodel.IndexInfo{idxInfo} job = &timodel.Job{ ID: 8, @@ -234,10 +190,11 @@ func (*schemaSuite) TestTable(c *C) { jobs = append(jobs, job) // reconstruct the local schema - ctx, cancel := context.WithCancel(context.Background()) - schema := NewStorageForTest(ctx, c, jobs) - err := schema.HandlePreviousDDLJobIfNeed(126) - c.Assert(err, IsNil) + schema := NewSingleStorage() + for _, job := range jobs { + _, _, _, err := schema.HandleDDL(job) + c.Assert(err, IsNil) + } // check the historical db that constructed above whether in the schema list of local schema _, ok := schema.SchemaByID(dbInfo.ID) @@ -247,72 +204,68 @@ func (*schemaSuite) TestTable(c *C) { c.Assert(ok, IsTrue) c.Assert(table.Columns, HasLen, 1) c.Assert(table.Indices, HasLen, 1) - cancel() + + // test ineligible tables + c.Assert(schema.IsIneligibleTableID(2), IsTrue) + // check truncate table tblInfo1 := &timodel.TableInfo{ ID: 9, Name: tbName, State: timodel.StatePublic, } - jobs = append( - jobs, - &timodel.Job{ - ID: 9, - State: timodel.JobStateSynced, - SchemaID: 3, - TableID: 2, - Type: timodel.ActionTruncateTable, - BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 5, TableInfo: tblInfo1, FinishedTS: 127}, - Query: "truncate table " + tbName.O, - }, - ) + job = &timodel.Job{ + ID: 9, + State: timodel.JobStateSynced, + SchemaID: 3, + TableID: 2, + Type: timodel.ActionTruncateTable, + BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 5, TableInfo: tblInfo1, FinishedTS: 127}, + Query: "truncate table " + tbName.O, + } - ctx, cancel = context.WithCancel(context.Background()) - schema1 := NewStorageForTest(ctx, c, jobs) - err = schema1.HandlePreviousDDLJobIfNeed(127) + _, _, _, err := schema.HandleDDL(job) c.Assert(err, IsNil) - _, ok = schema1.TableByID(tblInfo1.ID) + + _, ok = schema.TableByID(tblInfo1.ID) c.Assert(ok, IsTrue) - _, ok = schema1.TableByID(2) + _, ok = schema.TableByID(2) c.Assert(ok, IsFalse) - cancel() + + // test ineligible tables + c.Assert(schema.IsIneligibleTableID(9), IsTrue) + c.Assert(schema.IsIneligibleTableID(2), IsFalse) // check drop table - jobs = append( - jobs, - &timodel.Job{ - ID: 9, - State: timodel.JobStateSynced, - SchemaID: 3, - TableID: 9, - Type: timodel.ActionDropTable, - BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 6, FinishedTS: 128}, - Query: "drop table " + tbName.O, - }, - ) - ctx, cancel = context.WithCancel(context.Background()) - schema2 := NewStorageForTest(ctx, c, jobs) - err = schema2.HandlePreviousDDLJobIfNeed(128) + job = &timodel.Job{ + ID: 9, + State: timodel.JobStateSynced, + SchemaID: 3, + TableID: 9, + Type: timodel.ActionDropTable, + BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 6, FinishedTS: 128}, + Query: "drop table " + tbName.O, + } + _, _, _, err = schema.HandleDDL(job) c.Assert(err, IsNil) - _, ok = schema2.TableByID(tblInfo.ID) + _, ok = schema.TableByID(tblInfo.ID) c.Assert(ok, IsFalse) - // test GetTableNameByID - _, ok = schema1.GetTableNameByID(9) - c.Assert(ok, IsTrue) + + // test ineligible tables + c.Assert(schema.IsIneligibleTableID(9), IsFalse) + // drop schema - _, err = schema1.DropSchema(3) + _, err = schema.DropSchema(3) c.Assert(err, IsNil) // test schema version c.Assert(schema.SchemaMetaVersion(), Equals, int64(0)) - cancel() + } func (t *schemaSuite) TestHandleDDL(c *C) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - schema := NewStorageForTest(ctx, c, nil) + schema := NewSingleStorage() dbName := timodel.NewCIStr("Test") colName := timodel.NewCIStr("A") tbName := timodel.NewCIStr("T") @@ -517,4 +470,3 @@ func (s *getUniqueKeysSuite) TestPKShouldBeInTheFirstPlaceWhenPKIsHandle(c *C) { {"uid"}, {"job"}, }) } -*/