Skip to content

Commit

Permalink
sink/mysql(ticdc): make mysql sink to handle ipv6 address correctly (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored Jul 1, 2022
1 parent 962d232 commit 6d1341c
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 8 deletions.
11 changes: 7 additions & 4 deletions cdc/sink/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"database/sql"
"fmt"
"net"
"net/url"
"strconv"
"strings"
Expand Down Expand Up @@ -96,17 +97,19 @@ func NewMySQLSink(
// dsn format of the driver:
// [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
username := sinkURI.User.Username()
password, _ := sinkURI.User.Password()
hostName := sinkURI.Hostname()
port := sinkURI.Port()
if username == "" {
username = "root"
}
password, _ := sinkURI.User.Password()
hostName := sinkURI.Hostname()
port := sinkURI.Port()
if port == "" {
port = "4000"
}

dsnStr := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", username, password, hostName, port, params.tls)
// This will handle the IPv6 address format.
host := net.JoinHostPort(hostName, port)
dsnStr := fmt.Sprintf("%s:%s@tcp(%s)/%s", username, password, host, params.tls)
dsn, err := dmysql.ParseDSN(dsnStr)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
Expand Down
10 changes: 7 additions & 3 deletions cdc/sink/mysql/mysql_syncpoint_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"database/sql"
"fmt"
"net"
"net/url"
"strings"

Expand Down Expand Up @@ -94,16 +95,19 @@ func newMySQLSyncpointStore(ctx context.Context,
// dsn format of the driver:
// [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
username := sinkURI.User.Username()
password, _ := sinkURI.User.Password()
port := sinkURI.Port()
if username == "" {
username = "root"
}
password, _ := sinkURI.User.Password()
port := sinkURI.Port()
if port == "" {
port = "4000"
}

dsnStr := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", username, password, sinkURI.Hostname(), port, tlsParam)
// This will handle the IPv6 address format.
host := net.JoinHostPort(sinkURI.Hostname(), port)

dsnStr := fmt.Sprintf("%s:%s@tcp(%s)/%s", username, password, host, tlsParam)
dsn, err := dmysql.ParseDSN(dsnStr)
if err != nil {
return nil, errors.Trace(err)
Expand Down
40 changes: 40 additions & 0 deletions cdc/sink/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1657,6 +1657,46 @@ func TestNewMySQLSink(t *testing.T) {
require.Nil(t, err)
}

func TestNewMySQLSinkWithIPv6Address(t *testing.T) {
dbIndex := 0
mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) {
require.Contains(t, dsnStr, "root@tcp([::1]:3306)")
defer func() {
dbIndex++
}()
if dbIndex == 0 {
// test db
db, err := mockTestDB(true)
require.Nil(t, err)
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
mock.ExpectClose()
require.Nil(t, err)
return db, nil
}
backupGetDBConn := GetDBConnImpl
GetDBConnImpl = mockGetDBConn
defer func() {
GetDBConnImpl = backupGetDBConn
}()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
changefeed := model.DefaultChangeFeedID("test-changefeed")
// See https://www.ietf.org/rfc/rfc2732.txt, we have to use brackets to wrap IPv6 address.
sinkURI, err := url.Parse("mysql://[::1]:3306/?time-zone=UTC&worker-count=4")
require.Nil(t, err)
rc := config.GetDefaultReplicaConfig()
sink, err := NewMySQLSink(ctx,
changefeed,
sinkURI, rc)
require.Nil(t, err)
err = sink.Close(ctx)
require.Nil(t, err)
}

func TestMySQLSinkClose(t *testing.T) {
dbIndex := 0
mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) {
Expand Down
6 changes: 5 additions & 1 deletion cdc/sink/mysql/simple_mysql_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"database/sql"
"fmt"
"net"
"net/url"
"strings"
"sync"
Expand Down Expand Up @@ -58,7 +59,10 @@ func NewSimpleMySQLSink(
port = "4000"
}

dsnStr := fmt.Sprintf("%s:%s@tcp(%s:%s)/?multiStatements=true", username, password, sinkURI.Hostname(), port)
// This will handle the IPv6 address format.
host := net.JoinHostPort(sinkURI.Hostname(), port)

dsnStr := fmt.Sprintf("%s:%s@tcp(%s)/?multiStatements=true", username, password, host)
dsn, err := dmysql.ParseDSN(dsnStr)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
Expand Down

0 comments on commit 6d1341c

Please sign in to comment.