diff --git a/dm/pkg/conn/basedb_test.go b/dm/pkg/conn/basedb_test.go index f503f9a7743..d7216313df4 100644 --- a/dm/pkg/conn/basedb_test.go +++ b/dm/pkg/conn/basedb_test.go @@ -44,7 +44,7 @@ func TestGetBaseConn(t *testing.T) { require.NotNil(t, dbConn) mock.ExpectQuery("select 1").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("1")) - // nolint:sqlclosecheck + // nolint:sqlclosecheck,rowserrcheck rows, err := dbConn.QuerySQL(tctx, "select 1") require.NoError(t, err) ids := make([]int, 0, 1) diff --git a/dm/syncer/schema.go b/dm/syncer/schema.go index e4ffbb02432..0e2df132be9 100644 --- a/dm/syncer/schema.go +++ b/dm/syncer/schema.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "encoding/json" + "fmt" "strings" "github.com/pingcap/tidb-tools/pkg/filter" @@ -27,6 +28,7 @@ import ( "github.com/pingcap/tidb/parser/format" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tiflow/dm/pkg/utils" + "github.com/pingcap/tiflow/pkg/quotes" "go.uber.org/zap" "github.com/pingcap/tiflow/dm/dm/config" @@ -71,7 +73,9 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR s.tctx.L().Info("table schema is not in checkpoint, fetch from downstream", zap.String("table", sourceTable.String())) targetTable := s.route(sourceTable) - return utils.GetTableCreateSQL(ctx, s.downstreamTrackConn.BaseConn.DBConn, targetTable.String()) + result, err2 := utils.GetTableCreateSQL(ctx, s.downstreamTrackConn.BaseConn.DBConn, targetTable.String()) + result = strings.Replace(result, fmt.Sprintf("CREATE TABLE %s", quotes.QuoteName(targetTable.Name)), fmt.Sprintf("CREATE TABLE %s", quotes.QuoteName(sourceTable.Name)), 1) + return utils.CreateTableSQLToOneRow(result), err2 } result := bytes.NewBuffer(make([]byte, 0, 512)) diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index 7209d5302ca..5cfe71842ad 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -1008,6 +1008,51 @@ func (s *testSyncerSuite) TestRun(c *C) { testJobs.RUnlock() cancel() + <-resultCh // wait for the process to finish + + // test OperateSchema starts + ctx, cancel = context.WithCancel(context.Background()) + + syncer.sessCtx = utils.NewSessionCtx(map[string]string{"time_zone": "UTC"}) + sourceSchemaFromCheckPoint, err := syncer.OperateSchema(ctx, &pb.OperateWorkerSchemaRequest{Op: pb.SchemaOp_GetSchema, Database: "test_1", Table: "t_1"}) + c.Assert(err, IsNil) + + syncer.tableRouter, err = router.NewTableRouter(false, []*router.TableRule{{ + SchemaPattern: "test_1", + TablePattern: "t_1", + TargetSchema: "test_1", + TargetTable: "t_2", + }}) + c.Assert(err, IsNil) + + syncer.checkpoint.(*RemoteCheckPoint).points = make(map[string]map[string]*binlogPoint) + + showTableResultString := "CREATE TABLE `t_2` (\n" + + " `id` int(11) NOT NULL,\n" + + " `name` varchar(24) DEFAULT NULL,\n" + + " PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */,\n" + + " KEY `index1` (`name`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin" + + mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_2`").WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("t_2", showTableResultString)) + + sourceSchemaFromDownstream, err := syncer.OperateSchema(ctx, &pb.OperateWorkerSchemaRequest{Op: pb.SchemaOp_GetSchema, Database: "test_1", Table: "t_1"}) + c.Assert(err, IsNil) + + sourceSchemaExpected := "CREATE TABLE `t_1` (" + + " `id` int(11) NOT NULL," + + " `name` varchar(24) DEFAULT NULL," + + " PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */," + + " KEY `index1` (`name`)" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin" + c.Assert(sourceSchemaFromCheckPoint, Equals, sourceSchemaExpected) + c.Assert(sourceSchemaFromDownstream, Equals, sourceSchemaExpected) + + cancel() + // test OperateSchema ends + syncer.Close() c.Assert(syncer.isClosed(), IsTrue)