checker(dm): port free space lightning precheck #7733

Merged
18 commits merged on Dec 2, 2022
4 changes: 4 additions & 0 deletions dm/checker/check_test.go
@@ -52,6 +52,10 @@ func ignoreExcept(itemMap map[string]struct{}) []string {
config.OnlineDDLChecking,
config.BinlogDBChecking,
config.TargetDBPrivilegeChecking,
config.LightningFreeSpaceChecking,
config.LightningDownstreamVersionChecking,
config.LightningRegionDistributionChecking,
config.LightningEmptyRegionChecking,
}
ignoreCheckingItems := make([]string, 0, len(items)-len(itemMap))
for _, i := range items {
256 changes: 210 additions & 46 deletions dm/checker/checker.go
@@ -25,9 +25,13 @@ import (
"time"

_ "github.com/go-sql-driver/mysql" // for mysql
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/restore"
"github.com/pingcap/tidb/br/pkg/lightning/restore/opts"
"github.com/pingcap/tidb/dumpling/export"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/dbutil"
"github.com/pingcap/tidb/util/filter"
regexprrouter "github.com/pingcap/tidb/util/regexpr-router"
@@ -115,23 +119,32 @@ func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string, e
return c
}

// Init implements Unit interface.
func (c *Checker) Init(ctx context.Context) (err error) {
rollbackHolder := fr.NewRollbackHolder("checker")
defer func() {
if err != nil {
rollbackHolder.RollbackReverseOrder()
}
}()

rollbackHolder.Add(fr.FuncRollback{Name: "close-DBs", Fn: c.closeDBs})

c.tctx = tcontext.NewContext(ctx, log.With(zap.String("unit", "task check")))

// 1. get allow-list of tables and routed table name from upstream and downstream
// tablePairInfo records information about an upstream-downstream table pair.
// Its members may have overlapping meanings, but they use different data structures to satisfy different usages.
type tablePairInfo struct {
// downstream table -> sourceID -> upstream tables
tablesPerDownstreamTable map[filter.Table]map[string][]filter.Table
Contributor: I think the PerXXX naming makes it quite hard to know what's inside; maybe targetTable2SourceTableMap, and similarly below.

Contributor (author): There are two naming styles here: source/target and upstream/downstream. Do you prefer we always use source/target?

Contributor: I prefer source/target, they have fewer characters 😅; upstream/downstream is OK too.

// downstream database -> tables under this database
downstreamTablesPerDB map[string][]filter.Table
// number of sharding tables of a downstream table among all upstreams.
shardNumPerDownstreamTable map[filter.Table]int
// sourceID -> tables of this upstream in allow-list
tablesPerSourceID map[string][]filter.Table
// upstream instance index -> databases that contain allow-list tables
interestedDBPerUpstream []map[string]struct{}
// sourceID -> downstream table -> source tables
tableMapPerUpstreamWithSourceID map[string]map[filter.Table][]filter.Table
// downstream table -> extended columns
extendedColumnPerTable map[filter.Table][]string
// byte size of all upstream tables, counting both data and index
totalDataSize *atomic.Int64
}
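
To make these mappings concrete, here is a hypothetical example (the schema, table, and source names are made up for illustration, and the literal is assumed to live in the same package as tablePairInfo) of what the first fields would hold when two sharded tables from one upstream source are routed into a single downstream table:

// exampleTablePairInfo is illustrative only: shards `shard.t_0` and `shard.t_1`
// from source "mysql-01" are routed to the downstream table `merged.t`.
func exampleTablePairInfo() *tablePairInfo {
	return &tablePairInfo{
		tablesPerDownstreamTable: map[filter.Table]map[string][]filter.Table{
			{Schema: "merged", Name: "t"}: {
				"mysql-01": {
					{Schema: "shard", Name: "t_0"},
					{Schema: "shard", Name: "t_1"},
				},
			},
		},
		shardNumPerDownstreamTable: map[filter.Table]int{
			{Schema: "merged", Name: "t"}: 2, // two upstream shards merge into one target table
		},
		downstreamTablesPerDB: map[string][]filter.Table{
			"merged": {{Schema: "merged", Name: "t"}},
		},
	}
}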

func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, err error) {
info = &tablePairInfo{}
eg, ctx2 := errgroup.WithContext(ctx)
// upstream instance index -> targetTable -> sourceTables

// do network things concurrently
tableMapPerUpstream := make([]map[filter.Table][]filter.Table, len(c.instances))
extendedColumnPerTable := map[filter.Table][]string{}
extendedColumnPerTableMu := sync.Mutex{}
@@ -154,55 +167,98 @@ func (c *Checker) Init(ctx context.Context) (err error) {
})
}
if egErr := eg.Wait(); egErr != nil {
return egErr
return nil, egErr
}

info.totalDataSize = atomic.NewInt64(0)
if _, ok := c.checkingItems[config.LightningFreeSpaceChecking]; ok &&
c.stCfgs[0].LoaderConfig.ImportMode == config.LoadModePhysical &&
c.stCfgs[0].Mode != config.ModeIncrement {
// TODO: concurrently read it intra-source later
Contributor (author): This PR is a bit large; I prefer to fix it in the next PR.

for idx := range c.instances {
i := idx
eg.Go(func() error {
for _, sourceTables := range tableMapPerUpstream[i] {
for _, sourceTable := range sourceTables {
size, err2 := utils.FetchTableEstimatedBytes(
ctx,
c.instances[i].sourceDB.DB,
sourceTable.Schema,
sourceTable.Name,
)
if err2 != nil {
return err2
}
info.totalDataSize.Add(size)
}
}
return nil
})
}
}
if egErr := eg.Wait(); egErr != nil {
return nil, egErr
}

// 2. calculate the needed data structures, such as the sharding tables of a target
// table merged from multiple upstreams

// targetTable -> sourceID -> sourceTables
tablesPerTargetTable := make(map[filter.Table]map[string][]filter.Table)
// sharding table number of a target table
shardNumPerTargetTable := make(map[filter.Table]int)
info.extendedColumnPerTable = extendedColumnPerTable
info.tablesPerDownstreamTable = make(map[filter.Table]map[string][]filter.Table)
info.shardNumPerDownstreamTable = make(map[filter.Table]int)
info.downstreamTablesPerDB = make(map[string][]filter.Table)

for i, inst := range c.instances {
mapping := tableMapPerUpstream[i]
err = sameTableNameDetection(mapping)
if err != nil {
return err
return nil, err
}

sourceID := inst.cfg.SourceID
for targetTable, sourceTables := range mapping {
tablesPerSource, ok := tablesPerTargetTable[targetTable]
tablesPerSource, ok := info.tablesPerDownstreamTable[targetTable]
if !ok {
tablesPerSource = make(map[string][]filter.Table)
tablesPerTargetTable[targetTable] = tablesPerSource
info.tablesPerDownstreamTable[targetTable] = tablesPerSource
}
tablesPerSource[sourceID] = append(tablesPerSource[sourceID], sourceTables...)
shardNumPerTargetTable[targetTable] += len(sourceTables)
info.shardNumPerDownstreamTable[targetTable] += len(sourceTables)
info.downstreamTablesPerDB[targetTable.Schema] = append(info.downstreamTablesPerDB[targetTable.Schema], targetTable)
}
}

// calculate allow-list tables and the databases they belong to, per upstream
// sourceID -> tables
allowTablesPerUpstream := make(map[string][]filter.Table, len(c.instances))
relatedDBPerUpstream := make([]map[string]struct{}, len(c.instances))
tableMapPerUpstreamWithSourceID := make(map[string]map[filter.Table][]filter.Table, len(c.instances))
info.tablesPerSourceID = make(map[string][]filter.Table, len(c.instances))
info.interestedDBPerUpstream = make([]map[string]struct{}, len(c.instances))
info.tableMapPerUpstreamWithSourceID = make(map[string]map[filter.Table][]filter.Table, len(c.instances))
for i, inst := range c.instances {
sourceID := inst.cfg.SourceID
relatedDBPerUpstream[i] = make(map[string]struct{})
info.interestedDBPerUpstream[i] = make(map[string]struct{})
mapping := tableMapPerUpstream[i]
tableMapPerUpstreamWithSourceID[sourceID] = mapping
info.tableMapPerUpstreamWithSourceID[sourceID] = mapping
for _, tables := range mapping {
allowTablesPerUpstream[sourceID] = append(allowTablesPerUpstream[sourceID], tables...)
info.tablesPerSourceID[sourceID] = append(info.tablesPerSourceID[sourceID], tables...)
for _, table := range tables {
relatedDBPerUpstream[i][table.Schema] = struct{}{}
info.interestedDBPerUpstream[i][table.Schema] = struct{}{}
}
}
}
return info, nil
}
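
getTablePairInfo above sums per-table sizes through utils.FetchTableEstimatedBytes, whose implementation is not part of this diff. A minimal sketch of how such an estimate can be derived from MySQL metadata follows; the helper and its query are an assumption, not the actual DM implementation:

// estimateTableBytes is a hypothetical stand-in for utils.FetchTableEstimatedBytes.
// It reads the data and index sizes reported by information_schema.
// Assumes imports "context" and "database/sql".
func estimateTableBytes(ctx context.Context, db *sql.DB, schema, table string) (int64, error) {
	var dataLen, indexLen sql.NullInt64
	err := db.QueryRowContext(ctx,
		"SELECT IFNULL(DATA_LENGTH, 0), IFNULL(INDEX_LENGTH, 0) FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?",
		schema, table,
	).Scan(&dataLen, &indexLen)
	if err != nil {
		return 0, err
	}
	return dataLen.Int64 + indexLen.Int64, nil
}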

// Init implements Unit interface.
func (c *Checker) Init(ctx context.Context) (err error) {
rollbackHolder := fr.NewRollbackHolder("checker")
defer func() {
if err != nil {
rollbackHolder.RollbackReverseOrder()
}
}()

// 3. create checkers
rollbackHolder.Add(fr.FuncRollback{Name: "close-DBs", Fn: c.closeDBs})

c.tctx = tcontext.NewContext(ctx, log.With(zap.String("unit", "task check")))
info, err := c.getTablePairInfo(ctx)
if err != nil {
return err
}

if _, ok := c.checkingItems[config.ConnNumberChecking]; ok {
if len(c.stCfgs) > 0 {
@@ -260,7 +316,7 @@ func (c *Checker) Init(ctx context.Context) (err error) {
c.checkList = append(c.checkList, checker.NewSourceDumpPrivilegeChecker(
instance.sourceDB.DB,
instance.sourceDBinfo,
allowTablesPerUpstream[sourceID],
info.tablesPerSourceID[sourceID],
exportCfg.Consistency,
c.dumpWholeInstance,
))
@@ -284,10 +340,10 @@ func (c *Checker) Init(ctx context.Context) (err error) {
c.checkList = append(c.checkList, checker.NewSourceReplicationPrivilegeChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.OnlineDDLChecking]; c.onlineDDL != nil && ok {
c.checkList = append(c.checkList, checker.NewOnlineDDLChecker(instance.sourceDB.DB, relatedDBPerUpstream[i], c.onlineDDL, instance.baList))
c.checkList = append(c.checkList, checker.NewOnlineDDLChecker(instance.sourceDB.DB, info.interestedDBPerUpstream[i], c.onlineDDL, instance.baList))
}
if _, ok := c.checkingItems[config.BinlogDBChecking]; ok {
c.checkList = append(c.checkList, checker.NewBinlogDBChecker(instance.sourceDB, instance.sourceDBinfo, relatedDBPerUpstream[i], instance.cfg.CaseSensitive))
c.checkList = append(c.checkList, checker.NewBinlogDBChecker(instance.sourceDB, instance.sourceDBinfo, info.interestedDBPerUpstream[i], instance.cfg.CaseSensitive))
}
}
}
@@ -297,8 +353,8 @@ func (c *Checker) Init(ctx context.Context) (err error) {
c.checkList = append(c.checkList, checker.NewTablesChecker(
upstreamDBs,
c.instances[0].targetDB.DB,
tableMapPerUpstreamWithSourceID,
extendedColumnPerTable,
info.tableMapPerUpstreamWithSourceID,
info.extendedColumnPerTable,
dumpThreads,
))
}
@@ -314,8 +370,8 @@ func (c *Checker) Init(ctx context.Context) (err error) {
return err
}
if isFresh {
for targetTable, shardingSet := range tablesPerTargetTable {
if shardNumPerTargetTable[targetTable] <= 1 {
for targetTable, shardingSet := range info.tablesPerDownstreamTable {
if info.shardNumPerDownstreamTable[targetTable] <= 1 {
continue
}
if instance.cfg.ShardMode == config.ShardPessimistic {
@@ -338,7 +394,17 @@ func (c *Checker) Init(ctx context.Context) (err error) {
}
}

if instance.cfg.Mode != config.ModeIncrement && instance.cfg.LoaderConfig.ImportMode == config.LoadModePhysical {
hasLightningPrecheck := false
for _, item := range config.LightningPrechecks {
if _, ok := c.checkingItems[item]; ok {
hasLightningPrecheck = true
break
}
}

if instance.cfg.Mode != config.ModeIncrement &&
instance.cfg.LoaderConfig.ImportMode == config.LoadModePhysical &&
hasLightningPrecheck {
lCfg, err := loader.GetLightningConfig(loader.MakeGlobalConfig(instance.cfg), instance.cfg)
if err != nil {
return err
@@ -350,10 +416,47 @@ func (c *Checker) Init(ctx context.Context) (err error) {
return err
}

builder, err := restore.NewPrecheckItemBuilderFromConfig(c.tctx.Context(), lCfg)
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, lCfg)
if err != nil {
return err
}
targetDB, err := restore.DBFromConfig(ctx, lCfg.TiDB)
if err != nil {
return err
}
targetInfoGetter, err := restore.NewTargetInfoGetterImpl(lCfg, targetDB)
if err != nil {
return err
}

var dbMetas []*mydump.MDDatabaseMeta

// use downstream table for shard merging
for db, tables := range info.downstreamTablesPerDB {
mdTables := make([]*mydump.MDTableMeta, 0, len(tables))
for _, table := range tables {
mdTables = append(mdTables, &mydump.MDTableMeta{
DB: db,
Name: table.Name,
})
}
dbMetas = append(dbMetas, &mydump.MDDatabaseMeta{
Name: db,
Tables: mdTables,
})
}

builder := restore.NewPrecheckItemBuilder(
lCfg,
dbMetas,
newLightningPrecheckAdaptor(targetInfoGetter, info),
cpdb,
)

if _, ok := c.checkingItems[config.LightningFreeSpaceChecking]; ok {
c.checkList = append(c.checkList, checker.NewLightningFreeSpaceChecker(
info.totalDataSize.Load(), targetInfoGetter))
}
if _, ok := c.checkingItems[config.LightningEmptyRegionChecking]; ok {
lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterEmptyRegion)
if err != nil {
@@ -375,6 +478,13 @@ func (c *Checker) Init(ctx context.Context) (err error) {
}
c.checkList = append(c.checkList, checker.NewLightningClusterVersionChecker(lChecker))
}
if _, ok := c.checkingItems[config.LightningSortingSpaceChecking]; ok {
lChecker, err := builder.BuildPrecheckItem(restore.CheckLocalTempKVDir)
if err != nil {
return err
}
c.checkList = append(c.checkList, checker.NewLightningSortingSpaceChecker(lChecker))
}
}

c.tctx.Logger.Info(c.displayCheckingItems())
@@ -689,3 +799,57 @@ func sameTableNameDetection(tables map[filter.Table][]filter.Table) error {

return nil
}

// lightningPrecheckAdaptor implements the restore.PreRestoreInfoGetter interface.
type lightningPrecheckAdaptor struct {
restore.TargetInfoGetter
allTables map[string]*checkpoints.TidbDBInfo
sourceDataResult restore.EstimateSourceDataSizeResult
}

func newLightningPrecheckAdaptor(
targetInfoGetter restore.TargetInfoGetter,
info *tablePairInfo,
) *lightningPrecheckAdaptor {
var (
sourceDataResult restore.EstimateSourceDataSizeResult
allTables = make(map[string]*checkpoints.TidbDBInfo)
)
if info != nil {
sourceDataResult.SizeWithIndex = info.totalDataSize.Load()
}
for db, tables := range info.downstreamTablesPerDB {
allTables[db] = &checkpoints.TidbDBInfo{
Name: db,
Tables: make(map[string]*checkpoints.TidbTableInfo),
}
for _, table := range tables {
allTables[db].Tables[table.Name] = &checkpoints.TidbTableInfo{
DB: db,
Name: table.Name,
}
}
}
return &lightningPrecheckAdaptor{
TargetInfoGetter: targetInfoGetter,
allTables: allTables,
sourceDataResult: sourceDataResult,
}
}

func (l *lightningPrecheckAdaptor) GetAllTableStructures(ctx context.Context, opts ...opts.GetPreInfoOption) (map[string]*checkpoints.TidbDBInfo, error) {
// reuse with other checkers? or do we in fact only use information other than the structure?
return l.allTables, nil
}

func (l *lightningPrecheckAdaptor) ReadFirstNRowsByTableName(ctx context.Context, schemaName string, tableName string, n int) (cols []string, rows [][]types.Datum, err error) {
return nil, nil, errors.New("not implemented")
}

func (l *lightningPrecheckAdaptor) ReadFirstNRowsByFileMeta(ctx context.Context, dataFileMeta mydump.SourceFileMeta, n int) (cols []string, rows [][]types.Datum, err error) {
return nil, nil, errors.New("not implemented")
}

func (l *lightningPrecheckAdaptor) EstimateSourceDataSize(ctx context.Context, opts ...opts.GetPreInfoOption) (*restore.EstimateSourceDataSizeResult, error) {
return &l.sourceDataResult, nil
}
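
For context on what the free-space precheck wired up in Init validates: it compares the estimated source data size (info.totalDataSize, also surfaced to lightning through EstimateSourceDataSize above) against the target cluster's available space. The helper below is a conceptual sketch only; the replica handling and threshold are assumptions, since the real checker.NewLightningFreeSpaceChecker obtains cluster capacity via the TargetInfoGetter and may apply different ratios.

// hasEnoughFreeSpace is an illustrative helper, not part of this PR.
// Physical import writes each row replicaCount times across TiKV stores,
// so the cluster needs roughly estimatedSourceBytes*replicaCount of headroom.
func hasEnoughFreeSpace(estimatedSourceBytes, clusterAvailBytes, replicaCount int64) bool {
	needed := estimatedSourceBytes * replicaCount
	return needed <= clusterAvailBytes
}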