Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix not using tls when saving checkpoint #988

Merged
merged 2 commits into from
Jul 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,15 @@ port = 3306
# encrypted_password = ""
# password = ""
# port = 3306
# [syncer.to.checkpoint.security]
# Path of file that contains list of trusted SSL CAs.
# ssl-ca = "/path/to/ca.pem"
# Path of file that contains X509 certificate in PEM format.
# ssl-cert = "/path/to/drainer.pem"
# Path of file that contains X509 key in PEM format.
# ssl-key = "/path/to/drainer-key.pem"
# The common name which is allowed to connection with cluster components.
# cert-allowed-cn = ["binlog"]

# Uncomment this if you want to use file as db-type.
#[syncer.to]
Expand Down
10 changes: 7 additions & 3 deletions drainer/checkpoint/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

// mysql driver
_ "github.com/go-sql-driver/mysql"
pkgsql "github.com/pingcap/tidb-binlog/pkg/sql"
"github.com/pingcap/tidb-binlog/pkg/loader"
)

// MysqlCheckPoint is a local savepoint struct for mysql
Expand All @@ -45,12 +45,16 @@ type MysqlCheckPoint struct {

var _ CheckPoint = &MysqlCheckPoint{}

var sqlOpenDB = pkgsql.OpenDB
var sqlOpenDB = loader.CreateDB

func newMysql(cfg *Config) (CheckPoint, error) {
setDefaultConfig(cfg)

db, err := sqlOpenDB("mysql", cfg.Db.Host, cfg.Db.Port, cfg.Db.User, cfg.Db.Password)
if cfg.Db.TLS != nil {
log.Info("enable TLS for saving checkpoint")
}

db, err := sqlOpenDB(cfg.Db.User, cfg.Db.Password, cfg.Db.Host, cfg.Db.Port, cfg.Db.TLS)
if err != nil {
return nil, errors.Annotate(err, "open db failed")
}
Expand Down
5 changes: 3 additions & 2 deletions drainer/checkpoint/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package checkpoint

import (
"crypto/tls"
"database/sql"
"testing"

Expand Down Expand Up @@ -118,7 +119,7 @@ var _ = Suite(&newMysqlSuite{})
func (s *newMysqlSuite) TestCannotOpenDB(c *C) {
origOpen := sqlOpenDB
defer func() { sqlOpenDB = origOpen }()
sqlOpenDB = func(proto, host string, port int, username, password string) (*sql.DB, error) {
sqlOpenDB = func(user, password string, host string, port int, tls *tls.Config) (*sql.DB, error) {
return nil, errors.New("no db")
}

Expand All @@ -133,7 +134,7 @@ func (s *newMysqlSuite) TestCreationErrors(c *C) {

origOpen := sqlOpenDB
defer func() { sqlOpenDB = origOpen }()
sqlOpenDB = func(proto, host string, port int, username, password string) (*sql.DB, error) {
sqlOpenDB = func(user, password string, host string, port int, tls *tls.Config) (*sql.DB, error) {
return db, nil
}

Expand Down
10 changes: 6 additions & 4 deletions drainer/checkpoint/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package checkpoint

import (
"crypto/tls"
"database/sql"
stderrors "errors"
"fmt"
Expand All @@ -29,10 +30,11 @@ var ErrNoCheckpointItem = stderrors.New("no any checkpoint item")

// DBConfig is the DB configuration.
type DBConfig struct {
Host string `toml:"host" json:"host"`
User string `toml:"user" json:"user"`
Password string `toml:"password" json:"password"`
Port int `toml:"port" json:"port"`
Host string `toml:"host" json:"host"`
User string `toml:"user" json:"user"`
Password string `toml:"password" json:"password"`
Port int `toml:"port" json:"port"`
TLS *tls.Config `toml:"-" json:"-"`
}

// Config is the savepoint configuration
Expand Down
5 changes: 5 additions & 0 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,11 @@ func (cfg *Config) Parse(args []string) error {
if err != nil {
return errors.Errorf("tls config %+v error %v", cfg.SyncerCfg.To.Security, err)
}

cfg.SyncerCfg.To.Checkpoint.TLS, err = cfg.SyncerCfg.To.Checkpoint.Security.ToTLSConfig()
if err != nil {
return errors.Errorf("tls config %+v error %v", cfg.SyncerCfg.To.Checkpoint.Security, err)
}
}

if err = cfg.adjustConfig(); err != nil {
Expand Down
6 changes: 4 additions & 2 deletions drainer/sync/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ type CheckpointConfig struct {
User string `toml:"user" json:"user"`
Password string `toml:"password" json:"password"`
// if EncryptedPassword is not empty, Password will be ignore.
EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"`
Port int `toml:"port" json:"port"`
EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"`
Port int `toml:"port" json:"port"`
Security security.Config `toml:"security" json:"security"`
TLS *tls.Config `toml:"-" json:"-"`
}

type baseError struct {
Expand Down
3 changes: 2 additions & 1 deletion drainer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ func (g *taskGroup) Wait() {

// GenCheckPointCfg returns an CheckPoint config instance
func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error) {

checkpointCfg := &checkpoint.Config{
ClusterID: id,
InitialCommitTS: cfg.InitialCommitTS,
Expand All @@ -103,6 +102,7 @@ func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error) {
User: toCheckpoint.User,
Password: toCheckpoint.Password,
Port: toCheckpoint.Port,
TLS: toCheckpoint.TLS,
}
case "":
switch cfg.SyncerCfg.DestDBType {
Expand All @@ -113,6 +113,7 @@ func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error) {
User: cfg.SyncerCfg.To.User,
Password: cfg.SyncerCfg.To.Password,
Port: cfg.SyncerCfg.To.Port,
TLS: cfg.SyncerCfg.To.TLS,
}
case "pb", "file":
checkpointCfg.CheckpointType = "file"
Expand Down