-
Notifications
You must be signed in to change notification settings - Fork 283
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
loader(dm): register tls config when clean checkpoint #3522
Changes from 28 commits
ceff2c5
38032dd
3b2d9cc
929a355
0a41374
02039f7
2a732cc
89940a8
16df138
557a06a
8007383
c787aca
0685924
8093c6e
c584bac
8ee46cd
e7ff6d7
f904eff
6bdda82
f63c58a
045d82c
b49cc99
2b04bb6
aead0da
388bfce
e2b938b
d134f15
4566dde
fa145d0
be7c29b
3966613
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,14 +19,11 @@ import ( | |
"strings" | ||
"sync" | ||
|
||
"github.com/docker/go-units" | ||
"github.com/pingcap/errors" | ||
"github.com/pingcap/failpoint" | ||
"github.com/pingcap/tidb-tools/pkg/dbutil" | ||
"github.com/pingcap/tidb/br/pkg/lightning" | ||
"github.com/pingcap/tidb/br/pkg/lightning/common" | ||
lcfg "github.com/pingcap/tidb/br/pkg/lightning/config" | ||
"github.com/pingcap/tidb/parser/mysql" | ||
"go.etcd.io/etcd/clientv3" | ||
"go.uber.org/atomic" | ||
"go.uber.org/zap" | ||
|
@@ -38,6 +35,7 @@ import ( | |
"github.com/pingcap/ticdc/dm/pkg/conn" | ||
tcontext "github.com/pingcap/ticdc/dm/pkg/context" | ||
"github.com/pingcap/ticdc/dm/pkg/log" | ||
"github.com/pingcap/ticdc/dm/pkg/terror" | ||
"github.com/pingcap/ticdc/dm/pkg/utils" | ||
) | ||
|
||
|
@@ -51,36 +49,38 @@ const ( | |
type LightningLoader struct { | ||
sync.RWMutex | ||
|
||
cfg *config.SubTaskConfig | ||
cli *clientv3.Client | ||
checkPoint CheckPoint | ||
checkPointList *LightningCheckpointList | ||
workerName string | ||
logger log.Logger | ||
core *lightning.Lightning | ||
toDB *conn.BaseDB | ||
toDBConns []*DBConn | ||
lightningConfig *lcfg.GlobalConfig | ||
timeZone string | ||
timeZone string | ||
lightningGlobalConfig *lcfg.GlobalConfig | ||
cfg *config.SubTaskConfig | ||
|
||
checkPoint CheckPoint | ||
checkPointList *LightningCheckpointList | ||
|
||
logger log.Logger | ||
cli *clientv3.Client | ||
core *lightning.Lightning | ||
cancel context.CancelFunc // for per task context, which maybe different from lightning context | ||
|
||
toDBConns []*DBConn | ||
toDB *conn.BaseDB | ||
|
||
workerName string | ||
finish atomic.Bool | ||
closed atomic.Bool | ||
metaBinlog atomic.String | ||
metaBinlogGTID atomic.String | ||
cancel context.CancelFunc // for per task context, which maybe different from lightning context | ||
} | ||
|
||
// NewLightning creates a new Loader importing data with lightning. | ||
func NewLightning(cfg *config.SubTaskConfig, cli *clientv3.Client, workerName string) *LightningLoader { | ||
lightningCfg := makeGlobalConfig(cfg) | ||
core := lightning.New(lightningCfg) | ||
loader := &LightningLoader{ | ||
cfg: cfg, | ||
cli: cli, | ||
core: core, | ||
lightningConfig: lightningCfg, | ||
logger: log.With(zap.String("task", cfg.Name), zap.String("unit", "lightning-load")), | ||
workerName: workerName, | ||
cfg: cfg, | ||
cli: cli, | ||
workerName: workerName, | ||
lightningGlobalConfig: lightningCfg, | ||
core: lightning.New(lightningCfg), | ||
logger: log.With(zap.String("task", cfg.Name), zap.String("unit", "lightning-load")), | ||
} | ||
return loader | ||
} | ||
|
@@ -91,19 +91,23 @@ func makeGlobalConfig(cfg *config.SubTaskConfig) *lcfg.GlobalConfig { | |
lightningCfg.Security.CAPath = cfg.To.Security.SSLCA | ||
lightningCfg.Security.CertPath = cfg.To.Security.SSLCert | ||
lightningCfg.Security.KeyPath = cfg.To.Security.SSLKey | ||
// use task name as tls config name to prevent multiple subtasks from conflicting with each other | ||
lightningCfg.Security.TLSConfigName = cfg.Name | ||
} | ||
lightningCfg.TiDB.Host = cfg.To.Host | ||
lightningCfg.TiDB.Psw = cfg.To.Password | ||
lightningCfg.TiDB.User = cfg.To.User | ||
lightningCfg.TiDB.Port = cfg.To.Port | ||
lightningCfg.TiDB.StatusPort = cfg.TiDB.StatusPort | ||
lightningCfg.TiDB.PdAddr = cfg.TiDB.PdAddr | ||
lightningCfg.TiDB.LogLevel = cfg.LogLevel | ||
lightningCfg.TikvImporter.Backend = cfg.TiDB.Backend | ||
lightningCfg.PostRestore.Checksum = lcfg.OpLevelOff | ||
if cfg.TiDB.Backend == lcfg.BackendLocal { | ||
lightningCfg.TikvImporter.SortedKVDir = cfg.Dir | ||
} | ||
lightningCfg.Mydumper.SourceDir = cfg.Dir | ||
lightningCfg.App.Config.File = "" // make lightning not init logger, see more in https://github.com/pingcap/tidb/pull/29291 | ||
return lightningCfg | ||
} | ||
|
||
|
@@ -156,8 +160,8 @@ func (l *LightningLoader) Init(ctx context.Context) (err error) { | |
} | ||
|
||
func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) error { | ||
l.Lock() | ||
taskCtx, cancel := context.WithCancel(ctx) | ||
l.Lock() | ||
l.cancel = cancel | ||
l.Unlock() | ||
err := l.core.RunOnce(taskCtx, cfg, nil) | ||
|
@@ -172,7 +176,6 @@ func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) er | |
} | ||
} | ||
}) | ||
l.logger.Info("end runLightning") | ||
return err | ||
} | ||
|
||
|
@@ -204,39 +207,30 @@ func (l *LightningLoader) restore(ctx context.Context) error { | |
} | ||
} | ||
cfg := lcfg.NewConfig() | ||
if err = cfg.LoadFromGlobal(l.lightningConfig); err != nil { | ||
if err = cfg.LoadFromGlobal(l.lightningGlobalConfig); err != nil { | ||
return err | ||
} | ||
cfg.Routes = l.cfg.RouteRules | ||
cfg.Checkpoint.Driver = lcfg.CheckpointDriverMySQL | ||
cfg.Checkpoint.Schema = config.TiDBLightningCheckpointPrefix + dbutil.TableName(l.workerName, l.cfg.Name) | ||
cfg.Checkpoint.KeepAfterSuccess = lcfg.CheckpointOrigin | ||
param := common.MySQLConnectParam{ | ||
Host: cfg.TiDB.Host, | ||
Port: cfg.TiDB.Port, | ||
User: cfg.TiDB.User, | ||
Password: cfg.TiDB.Psw, | ||
SQLMode: mysql.DefaultSQLMode, | ||
MaxAllowedPacket: 64 * units.MiB, | ||
TLS: cfg.TiDB.TLS, | ||
} | ||
cfg.Checkpoint.DSN = param.ToDSN() | ||
cfg.TiDB.Vars = make(map[string]string) | ||
if l.cfg.To.Session != nil { | ||
for k, v := range l.cfg.To.Session { | ||
cfg.TiDB.Vars[k] = v | ||
} | ||
} | ||
|
||
cfg.TiDB.StrSQLMode = l.cfg.LoaderConfig.SQLMode | ||
cfg.TiDB.Vars = map[string]string{ | ||
"time_zone": l.timeZone, | ||
} | ||
if err = cfg.Adjust(ctx); err != nil { | ||
return err | ||
} | ||
cfg.TiDB.Vars = map[string]string{"time_zone": l.timeZone} | ||
err = l.runLightning(ctx, cfg) | ||
if err == nil { | ||
// lightning will auto deregister tls when task done, so we need to register it again for removing checkpoint | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we might let lightning not deregister the certificate. If there are more than one subtask, the certificate may still be deregistered between line 245 and line 249 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. now lightning allows registering a tls config with a different name, and its test has passed, ptal 🧡 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CheckpointRemove should be postponed to cleanmeta. I will add this logic in a following PR, so LGTM. |
||
if l.cfg.To.Security != nil { | ||
if registerErr := cfg.Security.RegisterMySQL(); registerErr != nil { | ||
return terror.ErrConnRegistryTLSConfig.Delegate(registerErr) | ||
} | ||
defer cfg.Security.DeregisterMySQL() | ||
} | ||
err = lightning.CheckpointRemove(ctx, cfg, "all") | ||
} | ||
if err == nil { | ||
|
@@ -300,10 +294,7 @@ func (l *LightningLoader) Process(ctx context.Context, pr chan pb.ProcessResult) | |
default: | ||
} | ||
l.logger.Info("lightning load end", zap.Bool("IsCanceled", isCanceled)) | ||
pr <- pb.ProcessResult{ | ||
IsCanceled: isCanceled, | ||
Errors: errs, | ||
} | ||
pr <- pb.ProcessResult{IsCanceled: isCanceled, Errors: errs} | ||
} | ||
|
||
func (l *LightningLoader) isClosed() bool { | ||
|
@@ -345,7 +336,9 @@ func (l *LightningLoader) Resume(ctx context.Context, pr chan pb.ProcessResult) | |
l.logger.Warn("try to resume, but already closed") | ||
return | ||
} | ||
l.core = lightning.New(l.lightningConfig) | ||
l.Lock() | ||
lance6716 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
l.core = lightning.New(l.lightningGlobalConfig) | ||
l.Unlock() | ||
// continue the processing | ||
l.Process(ctx, pr) | ||
} | ||
|
@@ -355,7 +348,8 @@ func (l *LightningLoader) Resume(ctx context.Context, pr chan pb.ProcessResult) | |
// now no config diff implemented, so simply re-init use new config | ||
// no binlog filter for loader need to update. | ||
func (l *LightningLoader) Update(ctx context.Context, cfg *config.SubTaskConfig) error { | ||
// update l.cfg | ||
l.Lock() | ||
defer l.Unlock() | ||
l.cfg.BAList = cfg.BAList | ||
l.cfg.RouteRules = cfg.RouteRules | ||
l.cfg.ColumnMappingRules = cfg.ColumnMappingRules | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
check-struct-only = false | ||
check-thread-count = 4 | ||
export-fix-sql = true | ||
|
||
[routes.rule1] | ||
schema-pattern = "tls" | ||
target-schema = "tls2" | ||
|
||
[task] | ||
output-dir = "/tmp/ticdc_dm_test/output" | ||
source-instances = ["mysql1"] | ||
target-check-tables = ["tls2.t"] | ||
target-instance = "tidb0" | ||
|
||
[data-sources.mysql1] | ||
host = "127.0.0.1" | ||
password = "123456" | ||
port = 3306 | ||
route-rules = ["rule1"] | ||
user = "root" | ||
|
||
[data-sources.tidb0] | ||
host = "127.0.0.1" | ||
port = 4400 | ||
user = "root" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
--- | ||
name: test2 | ||
task-mode: all | ||
is-sharding: false | ||
meta-schema: "dm_meta" | ||
|
||
target-database: | ||
host: "127.0.0.1" | ||
port: 4400 | ||
user: "root" | ||
password: "" | ||
security: | ||
ssl-ca: "dir-placeholer/task-ca.pem" | ||
ssl-cert: "dir-placeholer/dm.pem" | ||
ssl-key: "dir-placeholer/dm.key" | ||
|
||
mysql-instances: | ||
- source-id: "mysql-replica-01" | ||
black-white-list: "instance" | ||
route-rules: ["route-rule-1"] | ||
mydumper-config-name: "global" | ||
loader-config-name: "global" | ||
syncer-config-name: "global" | ||
|
||
black-white-list: | ||
instance: | ||
do-dbs: ["tls"] | ||
|
||
routes: | ||
route-rule-1: | ||
schema-pattern: "tls" | ||
target-schema: "tls2" | ||
|
||
mydumpers: | ||
global: | ||
threads: 4 | ||
chunk-filesize: 0 | ||
skip-tz-utc: true | ||
extra-args: "--statement-size=100" | ||
|
||
loaders: | ||
global: | ||
pool-size: 16 | ||
dir: "./dumped_data" | ||
|
||
syncers: | ||
global: | ||
worker-count: 16 | ||
batch: 100 | ||
|
||
tidb: | ||
backend: "tidb" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,5 +42,5 @@ syncers: | |
worker-count: 16 | ||
batch: 100 | ||
|
||
#tidb: | ||
# backend: "tidb" | ||
tidb: | ||
backend: "tidb" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lightning will auto adjust config in hehe, don't need to adjust again
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hehe 😄