From 8b34f0208bf800e7c1fdd670198ff4a9e8a8bb3b Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Mon, 18 Jul 2022 17:44:42 +0800 Subject: [PATCH 1/4] fix: config lightning region concurrency --- dm/loader/lightning.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dm/loader/lightning.go b/dm/loader/lightning.go index 6922f23c186..0f4a88eb33a 100644 --- a/dm/loader/lightning.go +++ b/dm/loader/lightning.go @@ -281,6 +281,10 @@ func (l *LightningLoader) restore(ctx context.Context) error { return err } cfg := lcfg.NewConfig() + // TableConcurrency is adjusted to the value of RegionConcurrency + // when using TiDB backend. + // TODO: should we set the TableConcurrency separately. + cfg.App.RegionConcurrency = l.cfg.LoaderConfig.PoolSize if err = cfg.LoadFromGlobal(l.lightningGlobalConfig); err != nil { return err } From df5017c185bb829304899d16e8fabeebb2825c04 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Tue, 19 Jul 2022 10:29:26 +0800 Subject: [PATCH 2/4] address comment --- dm/loader/lightning.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dm/loader/lightning.go b/dm/loader/lightning.go index 0f4a88eb33a..860763a925b 100644 --- a/dm/loader/lightning.go +++ b/dm/loader/lightning.go @@ -281,13 +281,13 @@ func (l *LightningLoader) restore(ctx context.Context) error { return err } cfg := lcfg.NewConfig() + if err = cfg.LoadFromGlobal(l.lightningGlobalConfig); err != nil { + return err + } // TableConcurrency is adjusted to the value of RegionConcurrency // when using TiDB backend. // TODO: should we set the TableConcurrency separately. cfg.App.RegionConcurrency = l.cfg.LoaderConfig.PoolSize - if err = cfg.LoadFromGlobal(l.lightningGlobalConfig); err != nil { - return err - } cfg.Routes = l.cfg.RouteRules cfg.Checkpoint.Driver = lcfg.CheckpointDriverFile From bcec1134c67af338e79104f1ddec89d91f9baee4 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Tue, 19 Jul 2022 14:13:33 +0800 Subject: [PATCH 3/4] add ut --- dm/loader/lightning.go | 70 +++++++++++++++++++++---------------- dm/loader/lightning_test.go | 33 +++++++++++++++++ 2 files changed, 72 insertions(+), 31 deletions(-) create mode 100644 dm/loader/lightning_test.go diff --git a/dm/loader/lightning.go b/dm/loader/lightning.go index 860763a925b..9d92f98a45c 100644 --- a/dm/loader/lightning.go +++ b/dm/loader/lightning.go @@ -266,6 +266,44 @@ func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) er return err } +func (l *LightningLoader) setLightningConfig() (*lcfg.Config, error) { + cfg := lcfg.NewConfig() + if err := cfg.LoadFromGlobal(l.lightningGlobalConfig); err != nil { + return nil, err + } + // TableConcurrency is adjusted to the value of RegionConcurrency + // when using TiDB backend. + // TODO: should we set the TableConcurrency separately. + cfg.App.RegionConcurrency = l.cfg.LoaderConfig.PoolSize + cfg.Routes = l.cfg.RouteRules + + cfg.Checkpoint.Driver = lcfg.CheckpointDriverFile + var cpPath string + // l.cfg.LoaderConfig.Dir may be a s3 path, and Lightning supports checkpoint in s3, we can use storage.AdjustPath to adjust path both local and s3. + cpPath, err := storage.AdjustPath(l.cfg.LoaderConfig.Dir, string(filepath.Separator)+lightningCheckpointFileName) + if err != nil { + return nil, err + } + cfg.Checkpoint.DSN = cpPath + cfg.Checkpoint.KeepAfterSuccess = lcfg.CheckpointOrigin + + cfg.TikvImporter.OnDuplicate = string(l.cfg.OnDuplicate) + cfg.TiDB.Vars = make(map[string]string) + cfg.Routes = l.cfg.RouteRules + if l.cfg.To.Session != nil { + for k, v := range l.cfg.To.Session { + cfg.TiDB.Vars[k] = v + } + } + cfg.TiDB.StrSQLMode = l.sqlMode + cfg.TiDB.Vars = map[string]string{ + "time_zone": l.timeZone, + // always set transaction mode to optimistic + "tidb_txn_mode": "optimistic", + } + return cfg, nil +} + func (l *LightningLoader) restore(ctx context.Context) error { if err := putLoadTask(l.cli, l.cfg, l.workerName); err != nil { return err @@ -280,40 +318,10 @@ func (l *LightningLoader) restore(ctx context.Context) error { if err = l.checkPointList.RegisterCheckPoint(ctx); err != nil { return err } - cfg := lcfg.NewConfig() - if err = cfg.LoadFromGlobal(l.lightningGlobalConfig); err != nil { - return err - } - // TableConcurrency is adjusted to the value of RegionConcurrency - // when using TiDB backend. - // TODO: should we set the TableConcurrency separately. - cfg.App.RegionConcurrency = l.cfg.LoaderConfig.PoolSize - cfg.Routes = l.cfg.RouteRules - - cfg.Checkpoint.Driver = lcfg.CheckpointDriverFile - var cpPath string - // l.cfg.LoaderConfig.Dir may be a s3 path, and Lightning supports checkpoint in s3, we can use storage.AdjustPath to adjust path both local and s3. - cpPath, err = storage.AdjustPath(l.cfg.LoaderConfig.Dir, string(filepath.Separator)+lightningCheckpointFileName) + cfg, err := l.setLightningConfig() if err != nil { return err } - cfg.Checkpoint.DSN = cpPath - cfg.Checkpoint.KeepAfterSuccess = lcfg.CheckpointOrigin - - cfg.TikvImporter.OnDuplicate = string(l.cfg.OnDuplicate) - cfg.TiDB.Vars = make(map[string]string) - cfg.Routes = l.cfg.RouteRules - if l.cfg.To.Session != nil { - for k, v := range l.cfg.To.Session { - cfg.TiDB.Vars[k] = v - } - } - cfg.TiDB.StrSQLMode = l.sqlMode - cfg.TiDB.Vars = map[string]string{ - "time_zone": l.timeZone, - // always set transaction mode to optimistic - "tidb_txn_mode": "optimistic", - } err = l.runLightning(ctx, cfg) if err == nil { l.finish.Store(true) diff --git a/dm/loader/lightning_test.go b/dm/loader/lightning_test.go new file mode 100644 index 00000000000..cf289f782c1 --- /dev/null +++ b/dm/loader/lightning_test.go @@ -0,0 +1,33 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package loader + +import ( + "testing" + + "github.com/pingcap/tiflow/dm/dm/config" + "github.com/stretchr/testify/require" +) + +func TestSetLightningConfig(t *testing.T) { + stCfg := &config.SubTaskConfig{ + LoaderConfig: config.LoaderConfig{ + PoolSize: 10, + }, + } + l := NewLightning(stCfg, nil, "") + cfg, err := l.setLightningConfig() + require.NoError(t, err) + require.Equal(t, stCfg.LoaderConfig.PoolSize, cfg.App.RegionConcurrency) +} From 0463e8f6d85aebb264a640a53f81f35d2e735637 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Tue, 19 Jul 2022 17:05:13 +0800 Subject: [PATCH 4/4] fix name & lint --- dm/loader/lightning.go | 9 +++++++-- dm/loader/lightning_test.go | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/dm/loader/lightning.go b/dm/loader/lightning.go index 9d92f98a45c..cce2e548941 100644 --- a/dm/loader/lightning.go +++ b/dm/loader/lightning.go @@ -266,7 +266,7 @@ func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) er return err } -func (l *LightningLoader) setLightningConfig() (*lcfg.Config, error) { +func (l *LightningLoader) getLightningConfig() (*lcfg.Config, error) { cfg := lcfg.NewConfig() if err := cfg.LoadFromGlobal(l.lightningGlobalConfig); err != nil { return nil, err @@ -318,7 +318,8 @@ func (l *LightningLoader) restore(ctx context.Context) error { if err = l.checkPointList.RegisterCheckPoint(ctx); err != nil { return err } - cfg, err := l.setLightningConfig() + var cfg *lcfg.Config + cfg, err = l.getLightningConfig() if err != nil { return err } @@ -326,6 +327,10 @@ func (l *LightningLoader) restore(ctx context.Context) error { if err == nil { l.finish.Store(true) err = l.checkPointList.UpdateStatus(ctx, lightningStatusFinished) + if err != nil { + l.logger.Error("failed to update checkpoint status", zap.Error(err)) + return err + } } else { l.logger.Error("failed to runlightning", zap.Error(err)) } diff --git a/dm/loader/lightning_test.go b/dm/loader/lightning_test.go index cf289f782c1..f86d6cdc3ee 100644 --- a/dm/loader/lightning_test.go +++ b/dm/loader/lightning_test.go @@ -27,7 +27,7 @@ func TestSetLightningConfig(t *testing.T) { }, } l := NewLightning(stCfg, nil, "") - cfg, err := l.setLightningConfig() + cfg, err := l.getLightningConfig() require.NoError(t, err) require.Equal(t, stCfg.LoaderConfig.PoolSize, cfg.App.RegionConcurrency) }