diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index be091d7e5..e1e7d47c3 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -135,6 +135,15 @@ port = 3306 # encrypted_password = "" # password = "" # port = 3306 +# [syncer.to.checkpoint.security] +# Path of file that contains list of trusted SSL CAs. +# ssl-ca = "/path/to/ca.pem" +# Path of file that contains X509 certificate in PEM format. +# ssl-cert = "/path/to/drainer.pem" +# Path of file that contains X509 key in PEM format. +# ssl-key = "/path/to/drainer-key.pem" +# The common name which is allowed to connection with cluster components. +# cert-allowed-cn = ["binlog"] # Uncomment this if you want to use file as db-type. #[syncer.to] diff --git a/drainer/checkpoint/mysql.go b/drainer/checkpoint/mysql.go index 1b1ea20ec..df0c58ee8 100644 --- a/drainer/checkpoint/mysql.go +++ b/drainer/checkpoint/mysql.go @@ -24,7 +24,7 @@ import ( // mysql driver _ "github.com/go-sql-driver/mysql" - pkgsql "github.com/pingcap/tidb-binlog/pkg/sql" + "github.com/pingcap/tidb-binlog/pkg/loader" ) // MysqlCheckPoint is a local savepoint struct for mysql @@ -45,12 +45,16 @@ type MysqlCheckPoint struct { var _ CheckPoint = &MysqlCheckPoint{} -var sqlOpenDB = pkgsql.OpenDB +var sqlOpenDB = loader.CreateDB func newMysql(cfg *Config) (CheckPoint, error) { setDefaultConfig(cfg) - db, err := sqlOpenDB("mysql", cfg.Db.Host, cfg.Db.Port, cfg.Db.User, cfg.Db.Password) + if cfg.Db.TLS != nil { + log.Info("enable TLS for saving checkpoint") + } + + db, err := sqlOpenDB(cfg.Db.User, cfg.Db.Password, cfg.Db.Host, cfg.Db.Port, cfg.Db.TLS) if err != nil { return nil, errors.Annotate(err, "open db failed") } diff --git a/drainer/checkpoint/mysql_test.go b/drainer/checkpoint/mysql_test.go index 5c6488cdb..30b06146c 100644 --- a/drainer/checkpoint/mysql_test.go +++ b/drainer/checkpoint/mysql_test.go @@ -14,6 +14,7 @@ package checkpoint import ( + "crypto/tls" "database/sql" "testing" @@ -118,7 +119,7 @@ var _ = Suite(&newMysqlSuite{}) func (s *newMysqlSuite) TestCannotOpenDB(c *C) { origOpen := sqlOpenDB defer func() { sqlOpenDB = origOpen }() - sqlOpenDB = func(proto, host string, port int, username, password string) (*sql.DB, error) { + sqlOpenDB = func(user, password string, host string, port int, tls *tls.Config) (*sql.DB, error) { return nil, errors.New("no db") } @@ -133,7 +134,7 @@ func (s *newMysqlSuite) TestCreationErrors(c *C) { origOpen := sqlOpenDB defer func() { sqlOpenDB = origOpen }() - sqlOpenDB = func(proto, host string, port int, username, password string) (*sql.DB, error) { + sqlOpenDB = func(user, password string, host string, port int, tls *tls.Config) (*sql.DB, error) { return db, nil } diff --git a/drainer/checkpoint/util.go b/drainer/checkpoint/util.go index 45867424c..0ecbc2a49 100644 --- a/drainer/checkpoint/util.go +++ b/drainer/checkpoint/util.go @@ -14,6 +14,7 @@ package checkpoint import ( + "crypto/tls" "database/sql" stderrors "errors" "fmt" @@ -29,10 +30,11 @@ var ErrNoCheckpointItem = stderrors.New("no any checkpoint item") // DBConfig is the DB configuration. type DBConfig struct { - Host string `toml:"host" json:"host"` - User string `toml:"user" json:"user"` - Password string `toml:"password" json:"password"` - Port int `toml:"port" json:"port"` + Host string `toml:"host" json:"host"` + User string `toml:"user" json:"user"` + Password string `toml:"password" json:"password"` + Port int `toml:"port" json:"port"` + TLS *tls.Config `toml:"-" json:"-"` } // Config is the savepoint configuration diff --git a/drainer/config.go b/drainer/config.go index 17e9e68b5..af8eb9eb3 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -309,6 +309,11 @@ func (cfg *Config) Parse(args []string) error { if err != nil { return errors.Errorf("tls config %+v error %v", cfg.SyncerCfg.To.Security, err) } + + cfg.SyncerCfg.To.Checkpoint.TLS, err = cfg.SyncerCfg.To.Checkpoint.Security.ToTLSConfig() + if err != nil { + return errors.Errorf("tls config %+v error %v", cfg.SyncerCfg.To.Checkpoint.Security, err) + } } if err = cfg.adjustConfig(); err != nil { diff --git a/drainer/sync/util.go b/drainer/sync/util.go index ee5785641..36409b1e8 100644 --- a/drainer/sync/util.go +++ b/drainer/sync/util.go @@ -56,8 +56,10 @@ type CheckpointConfig struct { User string `toml:"user" json:"user"` Password string `toml:"password" json:"password"` // if EncryptedPassword is not empty, Password will be ignore. - EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"` - Port int `toml:"port" json:"port"` + EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"` + Port int `toml:"port" json:"port"` + Security security.Config `toml:"security" json:"security"` + TLS *tls.Config `toml:"-" json:"-"` } type baseError struct { diff --git a/drainer/util.go b/drainer/util.go index f595ec6df..09c52d7b6 100644 --- a/drainer/util.go +++ b/drainer/util.go @@ -82,7 +82,6 @@ func (g *taskGroup) Wait() { // GenCheckPointCfg returns an CheckPoint config instance func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error) { - checkpointCfg := &checkpoint.Config{ ClusterID: id, InitialCommitTS: cfg.InitialCommitTS, @@ -103,6 +102,7 @@ func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error) { User: toCheckpoint.User, Password: toCheckpoint.Password, Port: toCheckpoint.Port, + TLS: toCheckpoint.TLS, } case "": switch cfg.SyncerCfg.DestDBType { @@ -113,6 +113,7 @@ func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error) { User: cfg.SyncerCfg.To.User, Password: cfg.SyncerCfg.To.Password, Port: cfg.SyncerCfg.To.Port, + TLS: cfg.SyncerCfg.To.TLS, } case "pb", "file": checkpointCfg.CheckpointType = "file"