Skip to content

Commit

Permalink
checker(dm): port some precheck from lightning (#7617)
Browse files Browse the repository at this point in the history
ref #3510
  • Loading branch information
lance6716 authored Nov 24, 2022
1 parent 507ba3c commit e0bce5e
Show file tree
Hide file tree
Showing 5 changed files with 351 additions and 14 deletions.
41 changes: 41 additions & 0 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import (
"time"

_ "github.com/go-sql-driver/mysql" // for mysql
"github.com/pingcap/tidb/br/pkg/lightning/restore"
"github.com/pingcap/tidb/dumpling/export"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/dbutil"
"github.com/pingcap/tidb/util/filter"
regexprrouter "github.com/pingcap/tidb/util/regexpr-router"
"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/loader"
"github.com/pingcap/tiflow/dm/pb"
"github.com/pingcap/tiflow/dm/pkg/binlog"
"github.com/pingcap/tiflow/dm/pkg/checker"
Expand Down Expand Up @@ -336,6 +338,45 @@ func (c *Checker) Init(ctx context.Context) (err error) {
}
}

if instance.cfg.Mode != config.ModeIncrement && instance.cfg.LoaderConfig.ImportMode == config.LoadModePhysical {
lCfg, err := loader.GetLightningConfig(loader.MakeGlobalConfig(instance.cfg), instance.cfg)
if err != nil {
return err
}
// Adjust will raise error when this field is empty, so we set any non empty value here.
lCfg.Mydumper.SourceDir = "noop://"
err = lCfg.Adjust(ctx)
if err != nil {
return err
}

builder, err := restore.NewPrecheckItemBuilderFromConfig(c.tctx.Context(), lCfg)
if err != nil {
return err
}
if _, ok := c.checkingItems[config.LightningEmptyRegionChecking]; ok {
lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterEmptyRegion)
if err != nil {
return err
}
c.checkList = append(c.checkList, checker.NewLightningEmptyRegionChecker(lChecker))
}
if _, ok := c.checkingItems[config.LightningRegionDistributionChecking]; ok {
lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterRegionDist)
if err != nil {
return err
}
c.checkList = append(c.checkList, checker.NewLightningRegionDistributionChecker(lChecker))
}
if _, ok := c.checkingItems[config.LightningDownstreamVersionChecking]; ok {
lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterVersion)
if err != nil {
return err
}
c.checkList = append(c.checkList, checker.NewLightningClusterVersionChecker(lChecker))
}
}

c.tctx.Logger.Info(c.displayCheckingItems())
return nil
}
Expand Down
8 changes: 8 additions & 0 deletions dm/config/checking_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ const (
BinlogDBChecking = "binlog_db"
ConnNumberChecking = "conn_number"
TargetDBPrivilegeChecking = "target_privilege"
// lightning prechecks.
LightningEmptyRegionChecking = "empty_region"
LightningRegionDistributionChecking = "region_distribution"
LightningDownstreamVersionChecking = "downstream_version"
)

// AllCheckingItems contains all checking items.
Expand All @@ -56,6 +60,10 @@ var AllCheckingItems = map[string]string{
BinlogDBChecking: "binlog db checking item",
ConnNumberChecking: "connection number checking item",
TargetDBPrivilegeChecking: "privileges of target DB checking item",
// lightning prechecks
LightningEmptyRegionChecking: "physical import mode empty region checking item",
LightningRegionDistributionChecking: "physical import mode region distribution checking item",
LightningDownstreamVersionChecking: "physical import mode downstream TiDB/PD/TiKV version checking item",
}

// MaxSourceIDLength is the max length for dm-worker source id.
Expand Down
38 changes: 24 additions & 14 deletions dm/loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type LightningLoader struct {

// NewLightning creates a new Loader importing data with lightning.
func NewLightning(cfg *config.SubTaskConfig, cli *clientv3.Client, workerName string) *LightningLoader {
lightningCfg := makeGlobalConfig(cfg)
lightningCfg := MakeGlobalConfig(cfg)
logger := log.L()
if cfg.FrameworkLogger != nil {
logger = log.Logger{Logger: cfg.FrameworkLogger}
Expand All @@ -97,7 +97,8 @@ func NewLightning(cfg *config.SubTaskConfig, cli *clientv3.Client, workerName st
return loader
}

func makeGlobalConfig(cfg *config.SubTaskConfig) *lcfg.GlobalConfig {
// MakeGlobalConfig converts subtask config to lightning global config.
func MakeGlobalConfig(cfg *config.SubTaskConfig) *lcfg.GlobalConfig {
lightningCfg := lcfg.NewGlobalConfig()
if cfg.To.Security != nil {
lightningCfg.Security.CABytes = cfg.To.Security.SSLCABytes
Expand Down Expand Up @@ -281,42 +282,51 @@ func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) er
return terror.ErrLoadLightningRuntime.Delegate(err)
}

func (l *LightningLoader) getLightningConfig() (*lcfg.Config, error) {
// GetLightningConfig returns the lightning task config for the lightning global config and DM subtask config.
func GetLightningConfig(globalCfg *lcfg.GlobalConfig, subtaskCfg *config.SubTaskConfig) (*lcfg.Config, error) {
cfg := lcfg.NewConfig()
if err := cfg.LoadFromGlobal(l.lightningGlobalConfig); err != nil {
if err := cfg.LoadFromGlobal(globalCfg); err != nil {
return nil, err
}
// TableConcurrency is adjusted to the value of RegionConcurrency
// when using TiDB backend.
// TODO: should we set the TableConcurrency separately.
cfg.App.RegionConcurrency = l.cfg.LoaderConfig.PoolSize
cfg.Routes = l.cfg.RouteRules
cfg.App.RegionConcurrency = subtaskCfg.LoaderConfig.PoolSize
cfg.Routes = subtaskCfg.RouteRules

cfg.Checkpoint.Driver = lcfg.CheckpointDriverFile
var cpPath string
// l.cfg.LoaderConfig.Dir may be a s3 path, and Lightning supports checkpoint in s3, we can use storage.AdjustPath to adjust path both local and s3.
cpPath, err := storage.AdjustPath(l.cfg.LoaderConfig.Dir, string(filepath.Separator)+lightningCheckpointFileName)
cpPath, err := storage.AdjustPath(subtaskCfg.LoaderConfig.Dir, string(filepath.Separator)+lightningCheckpointFileName)
if err != nil {
return nil, err
}
cfg.Checkpoint.DSN = cpPath
cfg.Checkpoint.KeepAfterSuccess = lcfg.CheckpointOrigin

cfg.TikvImporter.OnDuplicate = string(l.cfg.OnDuplicate)
cfg.TikvImporter.OnDuplicate = string(subtaskCfg.OnDuplicate)
cfg.TiDB.Vars = make(map[string]string)
cfg.Routes = l.cfg.RouteRules
if l.cfg.To.Session != nil {
for k, v := range l.cfg.To.Session {
cfg.Routes = subtaskCfg.RouteRules
if subtaskCfg.To.Session != nil {
for k, v := range subtaskCfg.To.Session {
cfg.TiDB.Vars[k] = v
}
}
cfg.TiDB.StrSQLMode = l.sqlMode
cfg.TiDB.Vars = map[string]string{
"time_zone": l.timeZone,
// always set transaction mode to optimistic
"tidb_txn_mode": "optimistic",
}
cfg.Mydumper.SourceID = l.cfg.SourceID
cfg.Mydumper.SourceID = subtaskCfg.SourceID
return cfg, nil
}

// getLightningConfig builds the lightning task config for this loader.
// It delegates to GetLightningConfig with the loader's global config and
// subtask config, then fills in the two settings that come from the loader
// itself: the SQL mode and the "time_zone" session variable.
func (l *LightningLoader) getLightningConfig() (*lcfg.Config, error) {
	taskCfg, err := GetLightningConfig(l.lightningGlobalConfig, l.cfg)
	if err != nil {
		return nil, err
	}

	// Loader-specific overrides applied on top of the shared config.
	taskCfg.TiDB.Vars["time_zone"] = l.timeZone
	taskCfg.TiDB.StrSQLMode = l.sqlMode
	return taskCfg, nil
}

Expand Down
137 changes: 137 additions & 0 deletions dm/pkg/checker/lightning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package checker

import (
"context"

"github.com/pingcap/tidb/br/pkg/lightning/restore"
)

// convertLightningPrecheck runs lightningPrechecker and records its outcome
// in dmResult.
//
//   - If the precheck itself returns an error, the error is handed to
//     markCheckError and nothing else is changed here.
//   - If the precheck ran but did not pass, dmResult gets the state failLevel,
//     the given instruction, and one extra Error whose severity is failLevel
//     and whose short message is the lightning result message.
//   - If the precheck passed, dmResult is marked StateSuccess.
func convertLightningPrecheck(
	ctx context.Context,
	dmResult *Result,
	lightningPrechecker restore.PrecheckItem,
	failLevel State,
	instruction string,
) {
	precheckResult, checkErr := lightningPrechecker.Check(ctx)
	switch {
	case checkErr != nil:
		markCheckError(dmResult, checkErr)
	case precheckResult.Passed:
		dmResult.State = StateSuccess
	default:
		dmResult.State = failLevel
		dmResult.Instruction = instruction
		failure := &Error{Severity: failLevel, ShortErr: precheckResult.Message}
		dmResult.Errors = append(dmResult.Errors, failure)
	}
}

// LightningEmptyRegionChecker wraps the lightning precheck item that looks for
// too many empty regions in the cluster and exposes it as a RealChecker.
type LightningEmptyRegionChecker struct {
	// inner is the lightning precheck item that performs the actual check.
	inner restore.PrecheckItem
}

// NewLightningEmptyRegionChecker wraps lightningChecker in a
// LightningEmptyRegionChecker and returns it as a RealChecker.
func NewLightningEmptyRegionChecker(lightningChecker restore.PrecheckItem) RealChecker {
	emptyRegionChecker := new(LightningEmptyRegionChecker)
	emptyRegionChecker.inner = lightningChecker
	return emptyRegionChecker
}

// Name implements the RealChecker interface.
func (c *LightningEmptyRegionChecker) Name() string {
	return "lightning_empty_region"
}

// Check implements the RealChecker interface.
//
// The result is created in StateFailure and then handed to
// convertLightningPrecheck, which runs the wrapped precheck. A precheck that
// does not pass is reported with StateWarning together with an instruction
// about PD's region merge configuration.
func (c *LightningEmptyRegionChecker) Check(ctx context.Context) *Result {
	const (
		desc = "check whether there are too many empty Regions in the TiKV under Physical import mode"
		hint = `you can change "region merge" related configuration in PD to speed up eliminating empty regions`
	)

	res := &Result{Name: c.Name(), Desc: desc, State: StateFailure}
	convertLightningPrecheck(ctx, res, c.inner, StateWarning, hint)
	return res
}

// LightningRegionDistributionChecker wraps the lightning precheck item that
// verifies the region distribution is balanced and exposes it as a RealChecker.
type LightningRegionDistributionChecker struct {
	// inner is the lightning precheck item that performs the actual check.
	inner restore.PrecheckItem
}

// NewLightningRegionDistributionChecker wraps lightningChecker in a
// LightningRegionDistributionChecker and returns it as a RealChecker.
func NewLightningRegionDistributionChecker(lightningChecker restore.PrecheckItem) RealChecker {
	distributionChecker := new(LightningRegionDistributionChecker)
	distributionChecker.inner = lightningChecker
	return distributionChecker
}

// Name implements the RealChecker interface.
func (c *LightningRegionDistributionChecker) Name() string {
	return "lightning_region_distribution"
}

// Check implements the RealChecker interface.
//
// The result is created in StateFailure and then handed to
// convertLightningPrecheck, which runs the wrapped precheck. A precheck that
// does not pass is reported with StateWarning together with an instruction
// about PD's region schedule configuration.
func (c *LightningRegionDistributionChecker) Check(ctx context.Context) *Result {
	const (
		desc = "check whether the Regions in the TiKV cluster are distributed evenly under Physical import mode"
		hint = `you can change "region schedule" related configuration in PD to speed up balancing regions`
	)

	res := &Result{Name: c.Name(), Desc: desc, State: StateFailure}
	convertLightningPrecheck(ctx, res, c.inner, StateWarning, hint)
	return res
}

// LightningClusterVersionChecker wraps the lightning precheck item that
// verifies the cluster version is compatible with Lightning and exposes it as
// a RealChecker.
type LightningClusterVersionChecker struct {
	// inner is the lightning precheck item that performs the actual check.
	inner restore.PrecheckItem
}

// NewLightningClusterVersionChecker wraps lightningChecker in a
// LightningClusterVersionChecker and returns it as a RealChecker.
func NewLightningClusterVersionChecker(lightningChecker restore.PrecheckItem) RealChecker {
	versionChecker := new(LightningClusterVersionChecker)
	versionChecker.inner = lightningChecker
	return versionChecker
}

// Name implements the RealChecker interface.
func (c *LightningClusterVersionChecker) Name() string {
	return "lightning_cluster_version"
}

// Check implements the RealChecker interface.
//
// The result is created in StateFailure and then handed to
// convertLightningPrecheck, which runs the wrapped precheck. Unlike the region
// checkers in this file, a precheck that does not pass is reported with
// StateFailure, together with an instruction to switch to logical import mode.
func (c *LightningClusterVersionChecker) Check(ctx context.Context) *Result {
	const (
		desc = "check whether the downstream TiDB/PD/TiKV version meets the requirements of Physical import mode"
		hint = `you can switch to logical import mode which has no requirements on downstream cluster version`
	)

	res := &Result{Name: c.Name(), Desc: desc, State: StateFailure}
	convertLightningPrecheck(ctx, res, c.inner, StateFailure, hint)
	return res
}
Loading

0 comments on commit e0bce5e

Please sign in to comment.