From 099658873399d0791cb66d5d254081316c570a76 Mon Sep 17 00:00:00 2001 From: ForwardStar <61535734+ForwardStar@users.noreply.github.com> Date: Wed, 15 Jun 2022 14:16:34 +0800 Subject: [PATCH 1/5] This is an automated cherry-pick of #5824 Signed-off-by: ti-chi-bot --- dm/syncer/schema.go | 29 +++++++++++++++++++++ dm/syncer/syncer_test.go | 54 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/dm/syncer/schema.go b/dm/syncer/schema.go index a7d1c2360f5..e688994d61a 100644 --- a/dm/syncer/schema.go +++ b/dm/syncer/schema.go @@ -16,12 +16,23 @@ package syncer import ( "context" "encoding/json" +<<<<<<< HEAD +======= + "fmt" + "regexp" +>>>>>>> 1ba147108 (syncer(dm): fix different output format for operate-schema get (#5824)) "strings" "github.com/pingcap/tidb-tools/pkg/filter" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/format" "github.com/pingcap/tidb/parser/model" +<<<<<<< HEAD +======= + "github.com/pingcap/tidb/util/filter" + "github.com/pingcap/tiflow/dm/pkg/utils" + "github.com/pingcap/tiflow/pkg/quotes" +>>>>>>> 1ba147108 (syncer(dm): fix different output format for operate-schema get (#5824)) "go.uber.org/zap" "github.com/pingcap/tiflow/dm/dm/config" @@ -61,9 +72,27 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR } return string(tableListJSON), err case pb.SchemaOp_GetSchema: +<<<<<<< HEAD // we only try to get schema from schema-tracker now. // in other words, we can not get the schema if any DDL/DML has been replicated, or set a schema previously. return s.schemaTracker.GetCreateTable(ctx, sourceTable) +======= + // when task is paused, schemaTracker is closed. We get the table structure from checkpoint. + ti := s.checkpoint.GetTableInfo(req.Database, req.Table) + if ti == nil { + s.tctx.L().Info("table schema is not in checkpoint, fetch from downstream", + zap.String("table", sourceTable.String())) + targetTable := s.route(sourceTable) + result, err2 := dbconn.GetTableCreateSQL(s.tctx.WithContext(ctx), s.downstreamTrackConn, targetTable.String()) + result = strings.Replace(result, fmt.Sprintf("CREATE TABLE %s", quotes.QuoteName(targetTable.Name)), fmt.Sprintf("CREATE TABLE %s", quotes.QuoteName(sourceTable.Name)), 1) + return utils.CreateTableSQLToOneRow(result), err2 + } + + result := bytes.NewBuffer(make([]byte, 0, 512)) + err2 := executor.ConstructResultOfShowCreateTable(s.sessCtx, ti, autoid.Allocators{}, result) + return utils.CreateTableSQLToOneRow(result.String()), err2 + +>>>>>>> 1ba147108 (syncer(dm): fix different output format for operate-schema get (#5824)) case pb.SchemaOp_SetSchema: // for set schema, we must ensure it's a valid `CREATE TABLE` statement. // now, we only set schema for schema-tracker, diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index 245c9186767..a63ee48eaf7 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -55,6 +55,12 @@ import ( "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" pmysql "github.com/pingcap/tidb/parser/mysql" +<<<<<<< HEAD +======= + "github.com/pingcap/tidb/util/filter" + regexprrouter "github.com/pingcap/tidb/util/regexpr-router" + router "github.com/pingcap/tidb/util/table-router" +>>>>>>> 1ba147108 (syncer(dm): fix different output format for operate-schema get (#5824)) "go.uber.org/zap" "github.com/pingcap/tiflow/pkg/errorutil" @@ -1017,6 +1023,54 @@ func (s *testSyncerSuite) TestRun(c *C) { testJobs.RUnlock() cancel() +<<<<<<< HEAD +======= + <-resultCh // wait for the process to finish + + // test OperateSchema starts + ctx, cancel = context.WithCancel(context.Background()) + + syncer.sessCtx = utils.NewSessionCtx(map[string]string{"time_zone": "UTC"}) + sourceSchemaFromCheckPoint, err := syncer.OperateSchema(ctx, &pb.OperateWorkerSchemaRequest{Op: pb.SchemaOp_GetSchema, Database: "test_1", Table: "t_1"}) + c.Assert(err, IsNil) + + syncer.tableRouter = ®exprrouter.RouteTable{} + c.Assert(syncer.tableRouter.AddRule(&router.TableRule{ + SchemaPattern: "test_1", + TablePattern: "t_1", + TargetSchema: "test_1", + TargetTable: "t_2", + }), IsNil) + + syncer.checkpoint.(*RemoteCheckPoint).points = make(map[string]map[string]*binlogPoint) + + showTableResultString := "CREATE TABLE `t_2` (\n" + + " `id` int(11) NOT NULL,\n" + + " `name` varchar(24) DEFAULT NULL,\n" + + " PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */,\n" + + " KEY `index1` (`name`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin" + + mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_2`").WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("t_2", showTableResultString)) + + sourceSchemaFromDownstream, err := syncer.OperateSchema(ctx, &pb.OperateWorkerSchemaRequest{Op: pb.SchemaOp_GetSchema, Database: "test_1", Table: "t_1"}) + c.Assert(err, IsNil) + + sourceSchemaExpected := "CREATE TABLE `t_1` (" + + " `id` int(11) NOT NULL," + + " `name` varchar(24) DEFAULT NULL," + + " PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */," + + " KEY `index1` (`name`)" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin" + c.Assert(sourceSchemaFromCheckPoint, Equals, sourceSchemaExpected) + c.Assert(sourceSchemaFromDownstream, Equals, sourceSchemaExpected) + + cancel() + // test OperateSchema ends + +>>>>>>> 1ba147108 (syncer(dm): fix different output format for operate-schema get (#5824)) syncer.Close() c.Assert(syncer.isClosed(), IsTrue) From 0fdd386106b88e88d53b015d76a1045e61cf0ed8 Mon Sep 17 00:00:00 2001 From: ForwardStar <119010351@link.cuhk.edu.cn> Date: Thu, 23 Jun 2022 17:20:03 +0800 Subject: [PATCH 2/5] Fix cherry pick problems --- dm/pkg/terror/error_list.go | 2 ++ dm/pkg/utils/db.go | 7 ++++++ dm/syncer/checkpoint.go | 18 +++++++++++++++ dm/syncer/schema.go | 44 +++++++++++++++++++++++-------------- dm/syncer/syncer_test.go | 11 +--------- 5 files changed, 56 insertions(+), 26 deletions(-) diff --git a/dm/pkg/terror/error_list.go b/dm/pkg/terror/error_list.go index f1fac4e9406..9c7d2798807 100644 --- a/dm/pkg/terror/error_list.go +++ b/dm/pkg/terror/error_list.go @@ -436,6 +436,7 @@ const ( codeSyncerParseDDL codeSyncerUnsupportedStmt codeSyncerGetEvent + codeSyncerDownstreamTableNotFound ) // DM-master error code. @@ -1078,6 +1079,7 @@ var ( ErrSyncerParseDDL = New(codeSyncerParseDDL, ClassSyncUnit, ScopeInternal, LevelHigh, "parse DDL: %s", "Please confirm your DDL statement is correct and needed. For TiDB compatible DDL, see https://docs.pingcap.com/tidb/stable/mysql-compatibility#ddl. You can use `handle-error` command to skip or replace the DDL or add a binlog filter rule to ignore it if the DDL is not needed.") ErrSyncerUnsupportedStmt = New(codeSyncerUnsupportedStmt, ClassSyncUnit, ScopeInternal, LevelHigh, "`%s` statement not supported in %s mode", "") ErrSyncerGetEvent = New(codeSyncerGetEvent, ClassSyncUnit, ScopeUpstream, LevelHigh, "get binlog event error: %v", "Please check if the binlog file could be parsed by `mysqlbinlog`.") + ErrSyncerDownstreamTableNotFound = New(codeSyncerDownstreamTableNotFound, ClassSyncUnit, ScopeInternal, LevelHigh, "downstream table %s not found", "") // DM-master error. ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid", "") diff --git a/dm/pkg/utils/db.go b/dm/pkg/utils/db.go index c582a016823..806b48165d0 100644 --- a/dm/pkg/utils/db.go +++ b/dm/pkg/utils/db.go @@ -632,3 +632,10 @@ func GetTableCreateSQL(ctx context.Context, conn *sql.Conn, tableID string) (sql } return createStr, nil } + +// CreateTableSQLToOneRow formats the result of SHOW CREATE TABLE to one row. +func CreateTableSQLToOneRow(sql string) string { + sql = strings.ReplaceAll(sql, "\n", "") + sql = strings.ReplaceAll(sql, " ", " ") + return sql +} diff --git a/dm/syncer/checkpoint.go b/dm/syncer/checkpoint.go index 2143b778323..c023b4a61f1 100644 --- a/dm/syncer/checkpoint.go +++ b/dm/syncer/checkpoint.go @@ -270,6 +270,9 @@ type CheckPoint interface { // TablePoint returns all table's stream checkpoint TablePoint() map[string]map[string]binlog.Location + // GetTableInfo returns the saved table info from table checkpoint for the given table, return nil when not found + GetTableInfo(schema string, table string) *model.TableInfo + // FlushedGlobalPoint returns the flushed global binlog stream's checkpoint // corresponding to to Meta.Pos and gtid FlushedGlobalPoint() binlog.Location @@ -808,6 +811,21 @@ func (cp *RemoteCheckPoint) TablePoint() map[string]map[string]binlog.Location { return tablePoint } +func (cp *RemoteCheckPoint) GetTableInfo(schema string, table string) *model.TableInfo { + cp.RLock() + defer cp.RUnlock() + + tables, ok := cp.points[schema] + if !ok { + return nil + } + tablePoint, ok := tables[table] + if !ok { + return nil + } + return tablePoint.TableInfo() +} + // FlushedGlobalPoint implements CheckPoint.FlushedGlobalPoint. func (cp *RemoteCheckPoint) FlushedGlobalPoint() binlog.Location { cp.RLock() diff --git a/dm/syncer/schema.go b/dm/syncer/schema.go index e688994d61a..370fd77c8ee 100644 --- a/dm/syncer/schema.go +++ b/dm/syncer/schema.go @@ -14,25 +14,20 @@ package syncer import ( + "bytes" "context" "encoding/json" -<<<<<<< HEAD -======= "fmt" - "regexp" ->>>>>>> 1ba147108 (syncer(dm): fix different output format for operate-schema get (#5824)) "strings" "github.com/pingcap/tidb-tools/pkg/filter" + "github.com/pingcap/tidb/executor" + "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/format" "github.com/pingcap/tidb/parser/model" -<<<<<<< HEAD -======= - "github.com/pingcap/tidb/util/filter" "github.com/pingcap/tiflow/dm/pkg/utils" "github.com/pingcap/tiflow/pkg/quotes" ->>>>>>> 1ba147108 (syncer(dm): fix different output format for operate-schema get (#5824)) "go.uber.org/zap" "github.com/pingcap/tiflow/dm/dm/config" @@ -72,19 +67,37 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR } return string(tableListJSON), err case pb.SchemaOp_GetSchema: -<<<<<<< HEAD - // we only try to get schema from schema-tracker now. - // in other words, we can not get the schema if any DDL/DML has been replicated, or set a schema previously. - return s.schemaTracker.GetCreateTable(ctx, sourceTable) -======= // when task is paused, schemaTracker is closed. We get the table structure from checkpoint. ti := s.checkpoint.GetTableInfo(req.Database, req.Table) if ti == nil { s.tctx.L().Info("table schema is not in checkpoint, fetch from downstream", zap.String("table", sourceTable.String())) targetTable := s.route(sourceTable) - result, err2 := dbconn.GetTableCreateSQL(s.tctx.WithContext(ctx), s.downstreamTrackConn, targetTable.String()) - result = strings.Replace(result, fmt.Sprintf("CREATE TABLE %s", quotes.QuoteName(targetTable.Name)), fmt.Sprintf("CREATE TABLE %s", quotes.QuoteName(sourceTable.Name)), 1) + + // Get table create SQL + tableID := targetTable.String() + querySQL := fmt.Sprintf("SHOW CREATE TABLE %s", tableID) + var table, createStr string + + rows, err2 := s.downstreamTrackConn.QuerySQL(s.tctx.WithContext(ctx), querySQL) + if err2 != nil { + return "", err2 + } + + defer rows.Close() + if rows.Next() { + if scanErr := rows.Scan(&table, &createStr); scanErr != nil { + return "", terror.DBErrorAdapt(scanErr, terror.ErrDBDriverError) + } + } else { + return "", terror.ErrSyncerDownstreamTableNotFound.Generate(tableID) + } + + if err = rows.Close(); err != nil { + return "", terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError) + } + + result := strings.Replace(createStr, fmt.Sprintf("CREATE TABLE %s", quotes.QuoteName(targetTable.Name)), fmt.Sprintf("CREATE TABLE %s", quotes.QuoteName(sourceTable.Name)), 1) return utils.CreateTableSQLToOneRow(result), err2 } @@ -92,7 +105,6 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR err2 := executor.ConstructResultOfShowCreateTable(s.sessCtx, ti, autoid.Allocators{}, result) return utils.CreateTableSQLToOneRow(result.String()), err2 ->>>>>>> 1ba147108 (syncer(dm): fix different output format for operate-schema get (#5824)) case pb.SchemaOp_SetSchema: // for set schema, we must ensure it's a valid `CREATE TABLE` statement. // now, we only set schema for schema-tracker, diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index a63ee48eaf7..96b96366b9e 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -55,12 +55,6 @@ import ( "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" pmysql "github.com/pingcap/tidb/parser/mysql" -<<<<<<< HEAD -======= - "github.com/pingcap/tidb/util/filter" - regexprrouter "github.com/pingcap/tidb/util/regexpr-router" - router "github.com/pingcap/tidb/util/table-router" ->>>>>>> 1ba147108 (syncer(dm): fix different output format for operate-schema get (#5824)) "go.uber.org/zap" "github.com/pingcap/tiflow/pkg/errorutil" @@ -1023,8 +1017,6 @@ func (s *testSyncerSuite) TestRun(c *C) { testJobs.RUnlock() cancel() -<<<<<<< HEAD -======= <-resultCh // wait for the process to finish // test OperateSchema starts @@ -1034,7 +1026,7 @@ func (s *testSyncerSuite) TestRun(c *C) { sourceSchemaFromCheckPoint, err := syncer.OperateSchema(ctx, &pb.OperateWorkerSchemaRequest{Op: pb.SchemaOp_GetSchema, Database: "test_1", Table: "t_1"}) c.Assert(err, IsNil) - syncer.tableRouter = ®exprrouter.RouteTable{} + syncer.tableRouter = &router.Table{} c.Assert(syncer.tableRouter.AddRule(&router.TableRule{ SchemaPattern: "test_1", TablePattern: "t_1", @@ -1070,7 +1062,6 @@ func (s *testSyncerSuite) TestRun(c *C) { cancel() // test OperateSchema ends ->>>>>>> 1ba147108 (syncer(dm): fix different output format for operate-schema get (#5824)) syncer.Close() c.Assert(syncer.isClosed(), IsTrue) From 85640c4d2d5c67d47bf6de68be6987586b4b5145 Mon Sep 17 00:00:00 2001 From: ForwardStar <119010351@link.cuhk.edu.cn> Date: Fri, 24 Jun 2022 12:05:40 +0800 Subject: [PATCH 3/5] Fix unit test errors --- dm/syncer/syncer_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index 96b96366b9e..ae45cfe9a02 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -1026,7 +1026,8 @@ func (s *testSyncerSuite) TestRun(c *C) { sourceSchemaFromCheckPoint, err := syncer.OperateSchema(ctx, &pb.OperateWorkerSchemaRequest{Op: pb.SchemaOp_GetSchema, Database: "test_1", Table: "t_1"}) c.Assert(err, IsNil) - syncer.tableRouter = &router.Table{} + syncer.tableRouter, err = router.NewTableRouter(false, nil) + c.Assert(err, IsNil) c.Assert(syncer.tableRouter.AddRule(&router.TableRule{ SchemaPattern: "test_1", TablePattern: "t_1", From 96bb072957d541f6da99af2c7c06c7fbf12f1579 Mon Sep 17 00:00:00 2001 From: ForwardStar <119010351@link.cuhk.edu.cn> Date: Fri, 24 Jun 2022 12:06:58 +0800 Subject: [PATCH 4/5] Fix unit test error --- dm/syncer/schema.go | 1 - 1 file changed, 1 deletion(-) diff --git a/dm/syncer/schema.go b/dm/syncer/schema.go index fbca749a2aa..00850b8c518 100644 --- a/dm/syncer/schema.go +++ b/dm/syncer/schema.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tiflow/dm/pkg/utils" "github.com/pingcap/tiflow/pkg/quotes" - "github.com/pingcap/tiflow/dm/syncer/dbconn" "go.uber.org/zap" "github.com/pingcap/tiflow/dm/dm/config" From 87f929016aa2f30137abf32f31f7905808fee6aa Mon Sep 17 00:00:00 2001 From: ForwardStar <119010351@link.cuhk.edu.cn> Date: Fri, 24 Jun 2022 14:03:24 +0800 Subject: [PATCH 5/5] Solve linter-test failed problem --- dm/syncer/schema.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/dm/syncer/schema.go b/dm/syncer/schema.go index 00850b8c518..a8e4a2df6d5 100644 --- a/dm/syncer/schema.go +++ b/dm/syncer/schema.go @@ -51,21 +51,21 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR for i, schema := range allSchema { schemaList[i] = schema.Name.String() } - schemaListJSON, err := json.Marshal(schemaList) - if err != nil { - return "", terror.ErrSchemaTrackerMarshalJSON.Delegate(err, schemaList) + schemaListJSON, err2 := json.Marshal(schemaList) + if err2 != nil { + return "", terror.ErrSchemaTrackerMarshalJSON.Delegate(err2, schemaList) } - return string(schemaListJSON), err + return string(schemaListJSON), err2 case pb.SchemaOp_ListTable: - tables, err := s.schemaTracker.ListSchemaTables(req.Database) - if err != nil { - return "", err + tables, err2 := s.schemaTracker.ListSchemaTables(req.Database) + if err2 != nil { + return "", err2 } - tableListJSON, err := json.Marshal(tables) - if err != nil { - return "", terror.ErrSchemaTrackerMarshalJSON.Delegate(err, tables) + tableListJSON, err2 := json.Marshal(tables) + if err2 != nil { + return "", terror.ErrSchemaTrackerMarshalJSON.Delegate(err2, tables) } - return string(tableListJSON), err + return string(tableListJSON), err2 case pb.SchemaOp_GetSchema: // when task is paused, schemaTracker is closed. We get the table structure from checkpoint. ti := s.checkpoint.GetTableInfo(req.Database, req.Table)