Skip to content

Commit

Permalink
db(dm): use net.JoinHostPort to generate host-port part of URI (#6248
Browse files Browse the repository at this point in the history
…) (#6343)

close #6249
  • Loading branch information
ti-chi-bot authored Jul 28, 2022
1 parent 2e7bf36 commit bcd3c7e
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 151 deletions.
1 change: 0 additions & 1 deletion cdc/redo/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ var InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStor
s3storage, err := storage.New(ctx, backend, &storage.ExternalStorageOptions{
SendCredentials: false,
HTTPClient: nil,
SkipCheckPath: true,
})
if err != nil {
return nil, cerror.WrapError(cerror.ErrS3StorageInitialize, err)
Expand Down
1 change: 0 additions & 1 deletion cdc/sink/cdclog/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,6 @@ func NewS3Sink(ctx context.Context, sinkURI *url.URL, errCh chan error) (*s3Sink
}
s3storage, err := storage.New(ctx, backend, &storage.ExternalStorageOptions{
SendCredentials: false,
SkipCheckPath: true,
HTTPClient: nil,
})
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion dm/dm/portal/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"path"
Expand Down Expand Up @@ -589,7 +590,8 @@ func generateTaskFileName(taskName string) string {

// openDB opens a mysql connection FD.
func openDB(cfg DBConfig, timeout int) (*sql.DB, error) {
dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&timeout=%ds", cfg.User, cfg.Password, cfg.Host, cfg.Port, timeout)
hostPort := net.JoinHostPort(cfg.Host, strconv.Itoa(cfg.Port))
dbDSN := fmt.Sprintf("%s:%s@tcp(%s)/?charset=utf8mb4&timeout=%ds", cfg.User, cfg.Password, hostPort, timeout)

dbConn, err := sql.Open("mysql", dbDSN)
if err != nil {
Expand Down
112 changes: 0 additions & 112 deletions dm/dm/portal/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package portal

import (
"bytes"
"database/sql"
"encoding/json"
"io"
"mime/multipart"
Expand All @@ -26,15 +25,12 @@ import (
"strings"
"testing"

sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/go-sql-driver/mysql"
. "github.com/pingcap/check"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/br/pkg/mock"

"github.com/pingcap/tiflow/dm/dm/config"
)

var _ = Suite(&testPortalSuite{})
Expand Down Expand Up @@ -155,82 +151,6 @@ func (t *testPortalSuite) initTaskCfg() {
}
}

func (t *testPortalSuite) TestCheck(c *C) {
wrongDBCfg := config.GetDBConfigForTest()
wrongDBCfg.User = "wrong"
wrongDBCfg.Port = t.mockClusterPort
dbCfgBytes := getTestDBCfgBytes(c, &wrongDBCfg)
req := httptest.NewRequest("POST", "/check", bytes.NewReader(dbCfgBytes))
resp := httptest.NewRecorder()

// will connection to database failed
t.portalHandler.Check(resp, req)
c.Log("resp", resp)
c.Assert(resp.Code, Equals, http.StatusBadRequest)

checkResult := &CheckResult{}
err := readJSON(resp.Body, checkResult)
c.Assert(err, IsNil)
c.Assert(checkResult.Result, Equals, failed)
c.Assert(checkResult.Error, Matches, "Error 1045: Access denied for user 'wrong'.*")

// don't need connection to database, and will return StatusOK
getDBConnFunc = t.getMockDB
defer func() {
getDBConnFunc = getDBConnFromReq
}()

resp = httptest.NewRecorder()
t.portalHandler.Check(resp, req)
c.Assert(resp.Code, Equals, http.StatusOK)

err = readJSON(resp.Body, checkResult)
c.Assert(err, IsNil)
c.Assert(checkResult.Result, Equals, success)
c.Assert(checkResult.Error, Equals, "")
}

func (t *testPortalSuite) TestGetSchemaInfo(c *C) {
dbCfg := config.GetDBConfigForTest()
dbCfg.User = "wrong"
dbCfg.Port = t.mockClusterPort
dbCfgBytes := getTestDBCfgBytes(c, &dbCfg)
req := httptest.NewRequest("POST", "/schema", bytes.NewReader(dbCfgBytes))
resp := httptest.NewRecorder()

t.portalHandler.GetSchemaInfo(resp, req)
c.Log("resp", resp)
c.Assert(resp.Code, Equals, http.StatusBadRequest)

schemaInfoResult := new(SchemaInfoResult)
err := readJSON(resp.Body, schemaInfoResult)
c.Assert(err, IsNil)
c.Assert(schemaInfoResult.Result, Equals, failed)
c.Assert(schemaInfoResult.Error, Matches, "Error 1045: Access denied for user 'wrong'.*")
c.Assert(schemaInfoResult.Tables, IsNil)

getDBConnFunc = t.getMockDB
defer func() {
getDBConnFunc = getDBConnFromReq
}()

resp = httptest.NewRecorder()
t.portalHandler.GetSchemaInfo(resp, req)
c.Assert(resp.Code, Equals, http.StatusOK)

err = readJSON(resp.Body, schemaInfoResult)
c.Assert(err, IsNil)
c.Assert(schemaInfoResult.Result, Equals, success)
c.Assert(schemaInfoResult.Error, Equals, "")
c.Assert(schemaInfoResult.Tables, HasLen, len(t.allTables)-1)
for i, schemaTables := range schemaInfoResult.Tables {
c.Assert(schemaTables.Schema, Equals, t.allTables[i].Schema)
for j, table := range schemaTables.Tables {
c.Assert(table, Equals, t.allTables[i].Tables[j])
}
}
}

func (t *testPortalSuite) TestGenerateAndDownloadAndAnalyzeConfig(c *C) {
t.initTaskCfg()

Expand Down Expand Up @@ -322,16 +242,6 @@ func (t *testPortalSuite) TestAnalyzeRuleName(c *C) {
}
}

func (t *testPortalSuite) getMockDB(req *http.Request, timeout int) (*sql.DB, string, error) {
db, mock, err := sqlmock.New()
if err != nil {
return nil, "", err
}
t.mockSchemaInfo(mock)

return db, "mock", nil
}

func (t *testPortalSuite) TestAdjustConfig(c *C) {
c.Assert(adjustConfig(t.taskConfig), IsNil)

Expand Down Expand Up @@ -385,30 +295,8 @@ func (t *testPortalSuite) TestGenerateMydumperCfgName(c *C) {
c.Assert(dumpCfgName, Equals, "source-1.dump")
}

func (t *testPortalSuite) mockSchemaInfo(mock sqlmock.Sqlmock) {
schemas := sqlmock.NewRows([]string{"Database"})
for _, tables := range t.allTables {
schemas.AddRow(tables.Schema)
}
mock.ExpectQuery("SHOW DATABASES").WillReturnRows(schemas)

for _, tables := range t.allTables {
tablesResult := sqlmock.NewRows([]string{"Tables_in_" + tables.Schema, "Table_type"})
for _, table := range tables.Tables {
tablesResult.AddRow(table, "BASE TABLE")
}
mock.ExpectQuery("SHOW FULL TABLES").WillReturnRows(tablesResult)
}
}

func (t *testPortalSuite) TestGenerateTaskFileName(c *C) {
taskName := "test"
fileName := generateTaskFileName(taskName)
c.Assert(fileName, Equals, "test-task.yaml")
}

func getTestDBCfgBytes(c *C, dbCfg *config.DBConfig) []byte {
dbCfgBytes, err := json.Marshal(dbCfg)
c.Assert(err, IsNil)
return dbCfgBytes
}
6 changes: 4 additions & 2 deletions dm/pkg/conn/basedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"database/sql"
"errors"
"fmt"
"net"
"net/url"
"strconv"
"sync"
Expand Down Expand Up @@ -56,8 +57,9 @@ func init() {
func (d *DefaultDBProviderImpl) Apply(config *config.DBConfig) (*BaseDB, error) {
// maxAllowedPacket=0 can be used to automatically fetch the max_allowed_packet variable from server on every connection.
// https://github.com/go-sql-driver/mysql#maxallowedpacket
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&interpolateParams=true&maxAllowedPacket=0",
config.User, config.Password, config.Host, config.Port)
hostPort := net.JoinHostPort(config.Host, strconv.Itoa(config.Port))
dsn := fmt.Sprintf("%s:%s@tcp(%s)/?charset=utf8mb4&interpolateParams=true&maxAllowedPacket=0",
config.User, config.Password, hostPort)

doFuncInClose := func() {}
if config.Security != nil {
Expand Down
1 change: 1 addition & 0 deletions dm/pkg/conn/basedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func TestGetBaseConn(t *testing.T) {
ids = append(ids, id)
}
require.Equal(t, []int{1}, ids)
require.NoError(t, rows.Err())

mock.ExpectBegin()
mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(1, 1))
Expand Down
28 changes: 16 additions & 12 deletions dm/syncer/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ import (
"context"
"errors"
"fmt"
"strconv"
"reflect"
"strings"

"github.com/DATA-DOG/go-sqlmock"
"github.com/go-sql-driver/mysql"
. "github.com/pingcap/check"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/server"
"go.uber.org/zap"

"github.com/pingcap/tiflow/dm/dm/config"
Expand Down Expand Up @@ -406,6 +406,18 @@ func (s *testDDLSuite) TestResolveGeneratedColumnSQL(c *C) {
}
}

// in TiDB 5.3 mock cluster has bug, it doesn't set DSN correctly
// but in newer version of DM, we rewrite the test and using our own mock cluster,
// see https://github.com/pingcap/tiflow/blob/45bee3c178920e81d95086e89dc0b896345d8512/dm/syncer/ddl_test.go#L452
// so in here, we'll not try to fix bug in TiDB 5.3 and just extract the port
// using reflection.
func extractClusterPort(s *server.Server) int {
v := reflect.ValueOf(s)
field := v.Elem().FieldByName("cfg")
portVal := field.Elem().FieldByName("Port")
return int(portVal.Uint())
}

func (s *testDDLSuite) TestResolveOnlineDDL(c *C) {
cases := []struct {
sql string
Expand Down Expand Up @@ -447,12 +459,8 @@ func (s *testDDLSuite) TestResolveOnlineDDL(c *C) {
cluster, err := mock.NewCluster()
c.Assert(err, IsNil)
c.Assert(cluster.Start(), IsNil)
mysqlConfig, err := mysql.ParseDSN(cluster.DSN)
c.Assert(err, IsNil)
mockClusterPort, err := strconv.Atoi(strings.Split(mysqlConfig.Addr, ":")[1])
c.Assert(err, IsNil)
dbCfg := config.GetDBConfigForTest()
dbCfg.Port = mockClusterPort
dbCfg.Port = extractClusterPort(cluster.Server)
dbCfg.Password = ""
cfg := s.newSubTaskCfg(dbCfg)

Expand Down Expand Up @@ -530,12 +538,8 @@ func (s *testDDLSuite) TestMistakeOnlineDDLRegex(c *C) {
cluster, err := mock.NewCluster()
c.Assert(err, IsNil)
c.Assert(cluster.Start(), IsNil)
mysqlConfig, err := mysql.ParseDSN(cluster.DSN)
c.Assert(err, IsNil)
mockClusterPort, err := strconv.Atoi(strings.Split(mysqlConfig.Addr, ":")[1])
c.Assert(err, IsNil)
dbCfg := config.GetDBConfigForTest()
dbCfg.Port = mockClusterPort
dbCfg.Port = extractClusterPort(cluster.Server)
dbCfg.Password = ""
cfg := s.newSubTaskCfg(dbCfg)
for _, ca := range cases {
Expand Down
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ require (
github.com/frankban/quicktest v1.11.1 // indirect
github.com/getkin/kin-openapi v0.79.0
github.com/gin-gonic/gin v1.7.0
github.com/go-mysql-org/go-mysql v1.4.1-0.20211217061939-06f932768788
github.com/go-mysql-org/go-mysql v1.6.1-0.20220718092400-c855c26b37bd
github.com/go-sql-driver/mysql v1.6.0
github.com/gogo/gateway v1.1.0
github.com/gogo/protobuf v1.3.2
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/google/btree v1.0.0
github.com/google/go-cmp v0.5.6
github.com/google/uuid v1.1.2
github.com/google/uuid v1.3.0
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
Expand All @@ -52,11 +52,11 @@ require (
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20210513014640-40f9a1999b3b
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
github.com/pingcap/kvproto v0.0.0-20211011060348-d957056f1551
github.com/pingcap/kvproto v0.0.0-20220614124712-cd832493ec73
github.com/pingcap/log v0.0.0-20210906054005-afc726e70354
github.com/pingcap/tidb v1.1.0-beta.0.20211111080905-76b00f3ec11e
github.com/pingcap/tidb v1.1.0-beta.0.20220727103312-d674b6486c83
github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible
github.com/pingcap/tidb/parser v0.0.0-20211111080905-76b00f3ec11e
github.com/pingcap/tidb/parser v0.0.0-20220727103312-d674b6486c83
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_model v0.2.0
Expand All @@ -74,8 +74,8 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965
github.com/tidwall/gjson v1.9.1
github.com/tidwall/sjson v1.2.2
github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211029104011-2fd3841894de
github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae
github.com/tikv/client-go/v2 v2.0.0-alpha.0.20220614073506-266c33cb2c82
github.com/tikv/pd v1.1.0-beta.0.20211104095303-69c86d05d379
github.com/tinylib/msgp v1.1.0
github.com/uber-go/atomic v1.4.0
github.com/unrolled/render v1.0.1
Expand Down
Loading

0 comments on commit bcd3c7e

Please sign in to comment.