Skip to content

Commit

Permalink
syncer(dm): fix different output format for operate-schema get (#5824) (
Browse files Browse the repository at this point in the history
#6031)

close #5688
  • Loading branch information
ti-chi-bot authored Jun 25, 2022
1 parent f775a76 commit 6b77bff
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
6 changes: 5 additions & 1 deletion dm/syncer/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"

"github.com/pingcap/tidb-tools/pkg/filter"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/pingcap/tiflow/dm/syncer/dbconn"
"github.com/pingcap/tiflow/pkg/quotes"
"go.uber.org/zap"

"github.com/pingcap/tiflow/dm/dm/config"
Expand Down Expand Up @@ -72,7 +74,9 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR
s.tctx.L().Info("table schema is not in checkpoint, fetch from downstream",
zap.String("table", sourceTable.String()))
targetTable := s.route(sourceTable)
return dbconn.GetTableCreateSQL(s.tctx.WithContext(ctx), s.downstreamTrackConn, targetTable.String())
result, err2 := dbconn.GetTableCreateSQL(s.tctx.WithContext(ctx), s.downstreamTrackConn, targetTable.String())
result = strings.Replace(result, fmt.Sprintf("CREATE TABLE %s", quotes.QuoteName(targetTable.Name)), fmt.Sprintf("CREATE TABLE %s", quotes.QuoteName(sourceTable.Name)), 1)
return utils.CreateTableSQLToOneRow(result), err2
}

result := bytes.NewBuffer(make([]byte, 0, 512))
Expand Down
46 changes: 46 additions & 0 deletions dm/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,52 @@ func (s *testSyncerSuite) TestRun(c *C) {
testJobs.RUnlock()

cancel()
<-resultCh // wait for the process to finish

// test OperateSchema starts
ctx, cancel = context.WithCancel(context.Background())

syncer.sessCtx = utils.NewSessionCtx(map[string]string{"time_zone": "UTC"})
sourceSchemaFromCheckPoint, err := syncer.OperateSchema(ctx, &pb.OperateWorkerSchemaRequest{Op: pb.SchemaOp_GetSchema, Database: "test_1", Table: "t_1"})
c.Assert(err, IsNil)

syncer.tableRouter, err = router.NewTableRouter(false, nil)
c.Assert(err, IsNil)
c.Assert(syncer.tableRouter.AddRule(&router.TableRule{
SchemaPattern: "test_1",
TablePattern: "t_1",
TargetSchema: "test_1",
TargetTable: "t_2",
}), IsNil)

syncer.checkpoint.(*RemoteCheckPoint).points = make(map[string]map[string]*binlogPoint)

showTableResultString := "CREATE TABLE `t_2` (\n" +
" `id` int(11) NOT NULL,\n" +
" `name` varchar(24) DEFAULT NULL,\n" +
" PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */,\n" +
" KEY `index1` (`name`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"

mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_2`").WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("t_2", showTableResultString))

sourceSchemaFromDownstream, err := syncer.OperateSchema(ctx, &pb.OperateWorkerSchemaRequest{Op: pb.SchemaOp_GetSchema, Database: "test_1", Table: "t_1"})
c.Assert(err, IsNil)

sourceSchemaExpected := "CREATE TABLE `t_1` (" +
" `id` int(11) NOT NULL," +
" `name` varchar(24) DEFAULT NULL," +
" PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */," +
" KEY `index1` (`name`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"
c.Assert(sourceSchemaFromCheckPoint, Equals, sourceSchemaExpected)
c.Assert(sourceSchemaFromDownstream, Equals, sourceSchemaExpected)

cancel()
// test OperateSchema ends

syncer.Close()
c.Assert(syncer.isClosed(), IsTrue)

Expand Down

0 comments on commit 6b77bff

Please sign in to comment.