Skip to content

Commit

Permalink
syncer(dm): fix different output format for operate-schema get (#5824) (
Browse files Browse the repository at this point in the history
#5887)

close #5688
  • Loading branch information
ti-chi-bot authored Jul 27, 2022
1 parent 9f9a42a commit 2e7bf36
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 2 deletions.
2 changes: 1 addition & 1 deletion dm/pkg/conn/basedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestGetBaseConn(t *testing.T) {
require.NotNil(t, dbConn)

mock.ExpectQuery("select 1").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("1"))
// nolint:sqlclosecheck
// nolint:sqlclosecheck,rowserrcheck
rows, err := dbConn.QuerySQL(tctx, "select 1")
require.NoError(t, err)
ids := make([]int, 0, 1)
Expand Down
6 changes: 5 additions & 1 deletion dm/syncer/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"

"github.com/pingcap/tidb-tools/pkg/filter"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/pingcap/tidb/parser/format"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/pingcap/tiflow/pkg/quotes"
"go.uber.org/zap"

"github.com/pingcap/tiflow/dm/dm/config"
Expand Down Expand Up @@ -71,7 +73,9 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR
s.tctx.L().Info("table schema is not in checkpoint, fetch from downstream",
zap.String("table", sourceTable.String()))
targetTable := s.route(sourceTable)
return utils.GetTableCreateSQL(ctx, s.downstreamTrackConn.BaseConn.DBConn, targetTable.String())
result, err2 := utils.GetTableCreateSQL(ctx, s.downstreamTrackConn.BaseConn.DBConn, targetTable.String())
result = strings.Replace(result, fmt.Sprintf("CREATE TABLE %s", quotes.QuoteName(targetTable.Name)), fmt.Sprintf("CREATE TABLE %s", quotes.QuoteName(sourceTable.Name)), 1)
return utils.CreateTableSQLToOneRow(result), err2
}

result := bytes.NewBuffer(make([]byte, 0, 512))
Expand Down
45 changes: 45 additions & 0 deletions dm/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,51 @@ func (s *testSyncerSuite) TestRun(c *C) {
testJobs.RUnlock()

cancel()
<-resultCh // wait for the process to finish

// test OperateSchema starts
ctx, cancel = context.WithCancel(context.Background())

syncer.sessCtx = utils.NewSessionCtx(map[string]string{"time_zone": "UTC"})
sourceSchemaFromCheckPoint, err := syncer.OperateSchema(ctx, &pb.OperateWorkerSchemaRequest{Op: pb.SchemaOp_GetSchema, Database: "test_1", Table: "t_1"})
c.Assert(err, IsNil)

syncer.tableRouter, err = router.NewTableRouter(false, []*router.TableRule{{
SchemaPattern: "test_1",
TablePattern: "t_1",
TargetSchema: "test_1",
TargetTable: "t_2",
}})
c.Assert(err, IsNil)

syncer.checkpoint.(*RemoteCheckPoint).points = make(map[string]map[string]*binlogPoint)

showTableResultString := "CREATE TABLE `t_2` (\n" +
" `id` int(11) NOT NULL,\n" +
" `name` varchar(24) DEFAULT NULL,\n" +
" PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */,\n" +
" KEY `index1` (`name`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"

mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_2`").WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("t_2", showTableResultString))

sourceSchemaFromDownstream, err := syncer.OperateSchema(ctx, &pb.OperateWorkerSchemaRequest{Op: pb.SchemaOp_GetSchema, Database: "test_1", Table: "t_1"})
c.Assert(err, IsNil)

sourceSchemaExpected := "CREATE TABLE `t_1` (" +
" `id` int(11) NOT NULL," +
" `name` varchar(24) DEFAULT NULL," +
" PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */," +
" KEY `index1` (`name`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"
c.Assert(sourceSchemaFromCheckPoint, Equals, sourceSchemaExpected)
c.Assert(sourceSchemaFromDownstream, Equals, sourceSchemaExpected)

cancel()
// test OperateSchema ends

syncer.Close()
c.Assert(syncer.isClosed(), IsTrue)

Expand Down

0 comments on commit 2e7bf36

Please sign in to comment.