From 6b77bff2cfc4d9ebeccb6ef8f7fba1a63cedd0a6 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Sat, 25 Jun 2022 11:14:37 +0800 Subject: [PATCH] syncer(dm): fix different output format for operate-schema get (#5824) (#6031) close pingcap/tiflow#5688 --- dm/syncer/schema.go | 6 +++++- dm/syncer/syncer_test.go | 46 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/dm/syncer/schema.go b/dm/syncer/schema.go index 1190028f006..1971eaad9e5 100644 --- a/dm/syncer/schema.go +++ b/dm/syncer/schema.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "encoding/json" + "fmt" "strings" "github.com/pingcap/tidb-tools/pkg/filter" @@ -28,6 +29,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tiflow/dm/pkg/utils" "github.com/pingcap/tiflow/dm/syncer/dbconn" + "github.com/pingcap/tiflow/pkg/quotes" "go.uber.org/zap" "github.com/pingcap/tiflow/dm/dm/config" @@ -72,7 +74,9 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR s.tctx.L().Info("table schema is not in checkpoint, fetch from downstream", zap.String("table", sourceTable.String())) targetTable := s.route(sourceTable) - return dbconn.GetTableCreateSQL(s.tctx.WithContext(ctx), s.downstreamTrackConn, targetTable.String()) + result, err2 := dbconn.GetTableCreateSQL(s.tctx.WithContext(ctx), s.downstreamTrackConn, targetTable.String()) + result = strings.Replace(result, fmt.Sprintf("CREATE TABLE %s", quotes.QuoteName(targetTable.Name)), fmt.Sprintf("CREATE TABLE %s", quotes.QuoteName(sourceTable.Name)), 1) + return utils.CreateTableSQLToOneRow(result), err2 } result := bytes.NewBuffer(make([]byte, 0, 512)) diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index 0bdd73594e1..8ce318276e3 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -1024,6 +1024,52 @@ func (s *testSyncerSuite) TestRun(c *C) { testJobs.RUnlock() cancel() + <-resultCh // wait for the process to finish + + // test OperateSchema starts + ctx, cancel = context.WithCancel(context.Background()) + + syncer.sessCtx = utils.NewSessionCtx(map[string]string{"time_zone": "UTC"}) + sourceSchemaFromCheckPoint, err := syncer.OperateSchema(ctx, &pb.OperateWorkerSchemaRequest{Op: pb.SchemaOp_GetSchema, Database: "test_1", Table: "t_1"}) + c.Assert(err, IsNil) + + syncer.tableRouter, err = router.NewTableRouter(false, nil) + c.Assert(err, IsNil) + c.Assert(syncer.tableRouter.AddRule(&router.TableRule{ + SchemaPattern: "test_1", + TablePattern: "t_1", + TargetSchema: "test_1", + TargetTable: "t_2", + }), IsNil) + + syncer.checkpoint.(*RemoteCheckPoint).points = make(map[string]map[string]*binlogPoint) + + showTableResultString := "CREATE TABLE `t_2` (\n" + + " `id` int(11) NOT NULL,\n" + + " `name` varchar(24) DEFAULT NULL,\n" + + " PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */,\n" + + " KEY `index1` (`name`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin" + + mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_2`").WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("t_2", showTableResultString)) + + sourceSchemaFromDownstream, err := syncer.OperateSchema(ctx, &pb.OperateWorkerSchemaRequest{Op: pb.SchemaOp_GetSchema, Database: "test_1", Table: "t_1"}) + c.Assert(err, IsNil) + + sourceSchemaExpected := "CREATE TABLE `t_1` (" + + " `id` int(11) NOT NULL," + + " `name` varchar(24) DEFAULT NULL," + + " PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */," + + " KEY `index1` (`name`)" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin" + c.Assert(sourceSchemaFromCheckPoint, Equals, sourceSchemaExpected) + c.Assert(sourceSchemaFromDownstream, Equals, sourceSchemaExpected) + + cancel() + // test OperateSchema ends + syncer.Close() c.Assert(syncer.isClosed(), IsTrue)