loader(dm): register tls config when clean checkpoint (pingcap#3522)
Ehco1996 authored and zhaoxinyu committed Dec 29, 2021
1 parent 4278953 commit 524e26c
Showing 7 changed files with 248 additions and 114 deletions.
88 changes: 40 additions & 48 deletions dm/loader/lightning.go
@@ -19,14 +19,11 @@ import (
"strings"
"sync"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb/br/pkg/lightning"
"github.com/pingcap/tidb/br/pkg/lightning/common"
lcfg "github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/parser/mysql"
"go.etcd.io/etcd/clientv3"
"go.uber.org/atomic"
"go.uber.org/zap"
@@ -38,6 +35,7 @@ import (
"github.com/pingcap/ticdc/dm/pkg/conn"
tcontext "github.com/pingcap/ticdc/dm/pkg/context"
"github.com/pingcap/ticdc/dm/pkg/log"
"github.com/pingcap/ticdc/dm/pkg/terror"
"github.com/pingcap/ticdc/dm/pkg/utils"
)

@@ -51,36 +49,38 @@ const (
type LightningLoader struct {
sync.RWMutex

cfg *config.SubTaskConfig
cli *clientv3.Client
checkPoint CheckPoint
checkPointList *LightningCheckpointList
workerName string
logger log.Logger
core *lightning.Lightning
toDB *conn.BaseDB
toDBConns []*DBConn
lightningConfig *lcfg.GlobalConfig
timeZone string
timeZone string
lightningGlobalConfig *lcfg.GlobalConfig
cfg *config.SubTaskConfig

checkPoint CheckPoint
checkPointList *LightningCheckpointList

logger log.Logger
cli *clientv3.Client
core *lightning.Lightning
cancel context.CancelFunc // for per task context, which maybe different from lightning context

toDBConns []*DBConn
toDB *conn.BaseDB

workerName string
finish atomic.Bool
closed atomic.Bool
metaBinlog atomic.String
metaBinlogGTID atomic.String
cancel context.CancelFunc // for per task context, which maybe different from lightning context
}

// NewLightning creates a new Loader importing data with lightning.
func NewLightning(cfg *config.SubTaskConfig, cli *clientv3.Client, workerName string) *LightningLoader {
lightningCfg := makeGlobalConfig(cfg)
core := lightning.New(lightningCfg)
loader := &LightningLoader{
cfg: cfg,
cli: cli,
core: core,
lightningConfig: lightningCfg,
logger: log.With(zap.String("task", cfg.Name), zap.String("unit", "lightning-load")),
workerName: workerName,
cfg: cfg,
cli: cli,
workerName: workerName,
lightningGlobalConfig: lightningCfg,
core: lightning.New(lightningCfg),
logger: log.With(zap.String("task", cfg.Name), zap.String("unit", "lightning-load")),
}
return loader
}
@@ -91,19 +91,23 @@ func makeGlobalConfig(cfg *config.SubTaskConfig) *lcfg.GlobalConfig {
lightningCfg.Security.CAPath = cfg.To.Security.SSLCA
lightningCfg.Security.CertPath = cfg.To.Security.SSLCert
lightningCfg.Security.KeyPath = cfg.To.Security.SSLKey
// use task name as tls config name to prevent multiple subtasks from conflicting with each other
lightningCfg.Security.TLSConfigName = cfg.Name
}
lightningCfg.TiDB.Host = cfg.To.Host
lightningCfg.TiDB.Psw = cfg.To.Password
lightningCfg.TiDB.User = cfg.To.User
lightningCfg.TiDB.Port = cfg.To.Port
lightningCfg.TiDB.StatusPort = cfg.TiDB.StatusPort
lightningCfg.TiDB.PdAddr = cfg.TiDB.PdAddr
lightningCfg.TiDB.LogLevel = cfg.LogLevel
lightningCfg.TikvImporter.Backend = cfg.TiDB.Backend
lightningCfg.PostRestore.Checksum = lcfg.OpLevelOff
if cfg.TiDB.Backend == lcfg.BackendLocal {
lightningCfg.TikvImporter.SortedKVDir = cfg.Dir
}
lightningCfg.Mydumper.SourceDir = cfg.Dir
lightningCfg.App.Config.File = "" // make lightning not init logger, see more in https://github.com/pingcap/tidb/pull/29291
return lightningCfg
}

@@ -156,8 +160,8 @@ func (l *LightningLoader) Init(ctx context.Context) (err error) {
}

func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) error {
l.Lock()
taskCtx, cancel := context.WithCancel(ctx)
l.Lock()
l.cancel = cancel
l.Unlock()
err := l.core.RunOnce(taskCtx, cfg, nil)
@@ -172,7 +176,6 @@ func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) error
}
}
})
l.logger.Info("end runLightning")
return err
}

@@ -204,39 +207,30 @@ func (l *LightningLoader) restore(ctx context.Context) error {
}
}
cfg := lcfg.NewConfig()
if err = cfg.LoadFromGlobal(l.lightningConfig); err != nil {
if err = cfg.LoadFromGlobal(l.lightningGlobalConfig); err != nil {
return err
}
cfg.Routes = l.cfg.RouteRules
cfg.Checkpoint.Driver = lcfg.CheckpointDriverMySQL
cfg.Checkpoint.Schema = config.TiDBLightningCheckpointPrefix + dbutil.TableName(l.workerName, l.cfg.Name)
cfg.Checkpoint.KeepAfterSuccess = lcfg.CheckpointOrigin
param := common.MySQLConnectParam{
Host: cfg.TiDB.Host,
Port: cfg.TiDB.Port,
User: cfg.TiDB.User,
Password: cfg.TiDB.Psw,
SQLMode: mysql.DefaultSQLMode,
MaxAllowedPacket: 64 * units.MiB,
TLS: cfg.TiDB.TLS,
}
cfg.Checkpoint.DSN = param.ToDSN()
cfg.TiDB.Vars = make(map[string]string)
if l.cfg.To.Session != nil {
for k, v := range l.cfg.To.Session {
cfg.TiDB.Vars[k] = v
}
}

cfg.TiDB.StrSQLMode = l.cfg.LoaderConfig.SQLMode
cfg.TiDB.Vars = map[string]string{
"time_zone": l.timeZone,
}
if err = cfg.Adjust(ctx); err != nil {
return err
}
cfg.TiDB.Vars = map[string]string{"time_zone": l.timeZone}
err = l.runLightning(ctx, cfg)
if err == nil {
// lightning will auto deregister tls when task done, so we need to register it again for removing checkpoint
if l.cfg.To.Security != nil {
if registerErr := cfg.Security.RegisterMySQL(); registerErr != nil {
return terror.ErrConnRegistryTLSConfig.Delegate(registerErr)
}
defer cfg.Security.DeregisterMySQL()
}
err = lightning.CheckpointRemove(ctx, cfg, "all")
}
if err == nil {
@@ -300,10 +294,7 @@ func (l *LightningLoader) Process(ctx context.Context, pr chan pb.ProcessResult)
default:
}
l.logger.Info("lightning load end", zap.Bool("IsCanceled", isCanceled))
pr <- pb.ProcessResult{
IsCanceled: isCanceled,
Errors: errs,
}
pr <- pb.ProcessResult{IsCanceled: isCanceled, Errors: errs}
}

func (l *LightningLoader) isClosed() bool {
@@ -345,7 +336,7 @@ func (l *LightningLoader) Resume(ctx context.Context, pr chan pb.ProcessResult)
l.logger.Warn("try to resume, but already closed")
return
}
l.core = lightning.New(l.lightningConfig)
l.core = lightning.New(l.lightningGlobalConfig)
// continue the processing
l.Process(ctx, pr)
}
@@ -355,7 +346,8 @@ func (l *LightningLoader) Resume(ctx context.Context, pr chan pb.ProcessResult)
// now no config diff implemented, so simply re-init use new config
// no binlog filter for loader need to update.
func (l *LightningLoader) Update(ctx context.Context, cfg *config.SubTaskConfig) error {
// update l.cfg
l.Lock()
defer l.Unlock()
l.cfg.BAList = cfg.BAList
l.cfg.RouteRules = cfg.RouteRules
l.cfg.ColumnMappingRules = cfg.ColumnMappingRules
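The heart of the fix is the block added to restore: lightning registers the task's TLS config while a task runs and deregisters it as soon as RunOnce returns, so the subsequent checkpoint cleanup would otherwise try to open a MySQL connection with no TLS config registered. Below is a minimal sketch of that pattern, assuming only the lightning APIs visible in the diff above (Security.RegisterMySQL, Security.DeregisterMySQL, lightning.CheckpointRemove); the helper name is hypothetical, not part of the commit:

```go
package loader

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/lightning"
	lcfg "github.com/pingcap/tidb/br/pkg/lightning/config"
)

// removeCheckpointWithTLS mirrors the pattern this commit adds to restore:
// re-register the task's TLS config before cleaning the checkpoint, because
// lightning auto-deregisters it when the import task finishes.
func removeCheckpointWithTLS(ctx context.Context, cfg *lcfg.Config, hasTLS bool) error {
	if hasTLS {
		// Registers the config under cfg.Security.TLSConfigName (the task
		// name, per makeGlobalConfig) with the MySQL driver.
		if err := cfg.Security.RegisterMySQL(); err != nil {
			return err
		}
		// Drop the registration again once the cleanup connection is done.
		defer cfg.Security.DeregisterMySQL()
	}
	// "all" removes the checkpoint records of every table in the task.
	return lightning.CheckpointRemove(ctx, cfg, "all")
}
```

In the commit itself the registration error is wrapped as terror.ErrConnRegistryTLSConfig.Delegate(registerErr) so that it surfaces with a DM-specific error code.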
25 changes: 25 additions & 0 deletions dm/tests/tls/conf/diff_config-2.toml
@@ -0,0 +1,25 @@
check-struct-only = false
check-thread-count = 4
export-fix-sql = true

[routes.rule1]
schema-pattern = "tls"
target-schema = "tls2"

[task]
output-dir = "/tmp/ticdc_dm_test/output"
source-instances = ["mysql1"]
target-check-tables = ["tls2.t"]
target-instance = "tidb0"

[data-sources.mysql1]
host = "127.0.0.1"
password = "123456"
port = 3306
route-rules = ["rule1"]
user = "root"

[data-sources.tidb0]
host = "127.0.0.1"
port = 4400
user = "root"
52 changes: 52 additions & 0 deletions dm/tests/tls/conf/dm-task-2.yaml
@@ -0,0 +1,52 @@
---
name: test2
task-mode: all
is-sharding: false
meta-schema: "dm_meta"

target-database:
host: "127.0.0.1"
port: 4400
user: "root"
password: ""
security:
ssl-ca: "dir-placeholer/task-ca.pem"
ssl-cert: "dir-placeholer/dm.pem"
ssl-key: "dir-placeholer/dm.key"

mysql-instances:
- source-id: "mysql-replica-01"
black-white-list: "instance"
route-rules: ["route-rule-1"]
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

black-white-list:
instance:
do-dbs: ["tls"]

routes:
route-rule-1:
schema-pattern: "tls"
target-schema: "tls2"

mydumpers:
global:
threads: 4
chunk-filesize: 0
skip-tz-utc: true
extra-args: "--statement-size=100"

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100

tidb:
backend: "tidb"
4 changes: 2 additions & 2 deletions dm/tests/tls/conf/dm-task.yaml
@@ -42,5 +42,5 @@ syncers:
worker-count: 16
batch: 100

#tidb:
# backend: "tidb"
tidb:
backend: "tidb"
(Diffs for the remaining 3 changed files are not shown.)