Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#6328
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
buchuitoudegou authored and ti-chi-bot committed Jul 20, 2022
1 parent 01a9b2e commit 13a7f71
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 0 deletions.
120 changes: 120 additions & 0 deletions dm/loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,104 @@ func (l *LightningLoader) restore(ctx context.Context) error {
if err := l.checkPoint.Init(tctx, lightningCheckpointFile, 1); err != nil {
return err
}
<<<<<<< HEAD
if err := l.checkPoint.Load(tctx); err != nil {
=======

var opts []lightning.Option
if l.cfg.MetricsFactory != nil {
// this branch means dataflow engine has set a Factory, the Factory itself
// will register and deregister metrics, so we must use NoopRegistry
// to avoid duplicated registration.
opts = append(opts,
lightning.WithPromFactory(
promutil.NewWrappingFactory(
l.cfg.MetricsFactory,
"",
prometheus.Labels{"task": l.cfg.Name, "source_id": l.cfg.SourceID},
)),
lightning.WithPromRegistry(tidbpromutil.NewNoopRegistry()))
} else {
registry := prometheus.DefaultGatherer.(prometheus.Registerer)
failpoint.Inject("DontUnregister", func() {
registry = promutil.NewOnlyRegRegister(registry)
})

opts = append(opts,
lightning.WithPromFactory(
promutil.NewWrappingFactory(
tidbpromutil.NewDefaultFactory(),
"",
prometheus.Labels{"task": l.cfg.Name, "source_id": l.cfg.SourceID},
),
),
lightning.WithPromRegistry(registry))
}
if l.cfg.ExtStorage != nil {
opts = append(opts,
lightning.WithDumpFileStorage(l.cfg.ExtStorage),
lightning.WithCheckpointStorage(l.cfg.ExtStorage, lightningCheckpointFileName))
}
if l.cfg.FrameworkLogger != nil {
opts = append(opts, lightning.WithLogger(l.cfg.FrameworkLogger))
}

err = l.core.RunOnceWithOptions(taskCtx, cfg, opts...)
failpoint.Inject("LoadDataSlowDown", nil)
failpoint.Inject("LoadDataSlowDownByTask", func(val failpoint.Value) {
tasks := val.(string)
taskNames := strings.Split(tasks, ",")
for _, taskName := range taskNames {
if l.cfg.Name == taskName {
l.logger.Info("inject failpoint LoadDataSlowDownByTask in lightning loader", zap.String("task", taskName))
<-taskCtx.Done()
}
}
})
return err
}

func (l *LightningLoader) getLightningConfig() (*lcfg.Config, error) {
cfg := lcfg.NewConfig()
if err := cfg.LoadFromGlobal(l.lightningGlobalConfig); err != nil {
return nil, err
}
// TableConcurrency is adjusted to the value of RegionConcurrency
// when using TiDB backend.
// TODO: should we set the TableConcurrency separately.
cfg.App.RegionConcurrency = l.cfg.LoaderConfig.PoolSize
cfg.Routes = l.cfg.RouteRules

cfg.Checkpoint.Driver = lcfg.CheckpointDriverFile
var cpPath string
// l.cfg.LoaderConfig.Dir may be a s3 path, and Lightning supports checkpoint in s3, we can use storage.AdjustPath to adjust path both local and s3.
cpPath, err := storage.AdjustPath(l.cfg.LoaderConfig.Dir, string(filepath.Separator)+lightningCheckpointFileName)
if err != nil {
return nil, err
}
cfg.Checkpoint.DSN = cpPath
cfg.Checkpoint.KeepAfterSuccess = lcfg.CheckpointOrigin

cfg.TikvImporter.OnDuplicate = string(l.cfg.OnDuplicate)
cfg.TiDB.Vars = make(map[string]string)
cfg.Routes = l.cfg.RouteRules
if l.cfg.To.Session != nil {
for k, v := range l.cfg.To.Session {
cfg.TiDB.Vars[k] = v
}
}
cfg.TiDB.StrSQLMode = l.sqlMode
cfg.TiDB.Vars = map[string]string{
"time_zone": l.timeZone,
// always set transaction mode to optimistic
"tidb_txn_mode": "optimistic",
}
return cfg, nil
}

func (l *LightningLoader) restore(ctx context.Context) error {
if err := putLoadTask(l.cli, l.cfg, l.workerName); err != nil {
>>>>>>> ac2d0906f (lightning(dm): enable to config concurrency when using lightning (#6328))
return err
}
db2Tables := make(map[string]Tables2DataFiles)
Expand All @@ -153,6 +250,7 @@ func (l *LightningLoader) restore(ctx context.Context) error {
if err = l.checkPoint.CalcProgress(db2Tables); err != nil {
return err
}
<<<<<<< HEAD
if !l.checkPoint.IsTableFinished(lightningCheckpointDB, lightningCheckpointTable) {
cfg := lcfg.NewConfig()
if err = cfg.LoadFromGlobal(l.lightningConfig); err != nil {
Expand Down Expand Up @@ -188,6 +286,28 @@ func (l *LightningLoader) restore(ctx context.Context) error {
offsetSQL := l.checkPoint.GenSQL(lightningCheckpointFile, 1)
err = l.toDBConns[0].executeSQL(tctx, []string{offsetSQL})
_ = l.checkPoint.UpdateOffset(lightningCheckpointFile, 1)
=======

if status < lightningStatusFinished {
if err = l.checkPointList.RegisterCheckPoint(ctx); err != nil {
return err
}
var cfg *lcfg.Config
cfg, err = l.getLightningConfig()
if err != nil {
return err
}
err = l.runLightning(ctx, cfg)
if err == nil {
l.finish.Store(true)
err = l.checkPointList.UpdateStatus(ctx, lightningStatusFinished)
if err != nil {
l.logger.Error("failed to update checkpoint status", zap.Error(err))
return err
}
} else {
l.logger.Error("failed to runlightning", zap.Error(err))
>>>>>>> ac2d0906f (lightning(dm): enable to config concurrency when using lightning (#6328))
}
} else {
l.finish.Store(true)
Expand Down
33 changes: 33 additions & 0 deletions dm/loader/lightning_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package loader

import (
"testing"

"github.com/pingcap/tiflow/dm/dm/config"
"github.com/stretchr/testify/require"
)

func TestSetLightningConfig(t *testing.T) {
stCfg := &config.SubTaskConfig{
LoaderConfig: config.LoaderConfig{
PoolSize: 10,
},
}
l := NewLightning(stCfg, nil, "")
cfg, err := l.getLightningConfig()
require.NoError(t, err)
require.Equal(t, stCfg.LoaderConfig.PoolSize, cfg.App.RegionConcurrency)
}

0 comments on commit 13a7f71

Please sign in to comment.