From a03c28adf8bfbc6d829630d5973dfade83a35d6a Mon Sep 17 00:00:00 2001
From: lance6716 
Date: Mon, 28 Nov 2022 18:16:02 +0800
Subject: [PATCH 01/16] checker(dm): port cluster free space lightning precheck

Signed-off-by: lance6716 
---
 dm/checker/checker.go       | 276 ++++++++++++++++++++++++++++--------
 dm/config/checking_item.go  |   1 +
 dm/pkg/checker/lightning.go |  32 +++++
 dm/pkg/utils/common.go      |  10 ++
 4 files changed, 260 insertions(+), 59 deletions(-)

diff --git a/dm/checker/checker.go b/dm/checker/checker.go
index 07a734dec98..ce5c7d79d9c 100644
--- a/dm/checker/checker.go
+++ b/dm/checker/checker.go
@@ -25,9 +25,13 @@ import (
 	"time"
 
 	_ "github.com/go-sql-driver/mysql" // for mysql
+	"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
+	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
 	"github.com/pingcap/tidb/br/pkg/lightning/restore"
+	"github.com/pingcap/tidb/br/pkg/lightning/restore/opts"
 	"github.com/pingcap/tidb/dumpling/export"
 	"github.com/pingcap/tidb/parser/mysql"
+	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/dbutil"
 	"github.com/pingcap/tidb/util/filter"
 	regexprrouter "github.com/pingcap/tidb/util/regexpr-router"
@@ -115,23 +119,32 @@ func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string, e
 	return c
 }
 
-// Init implements Unit interface.
-func (c *Checker) Init(ctx context.Context) (err error) {
-	rollbackHolder := fr.NewRollbackHolder("checker")
-	defer func() {
-		if err != nil {
-			rollbackHolder.RollbackReverseOrder()
-		}
-	}()
-
-	rollbackHolder.Add(fr.FuncRollback{Name: "close-DBs", Fn: c.closeDBs})
-
-	c.tctx = tcontext.NewContext(ctx, log.With(zap.String("unit", "task check")))
-
-	// 1. get allow-list of tables and routed table name from upstream and downstream
+// tablePairInfo records information about upstream-downstream table pairs. Its members
+// may have overlapping meanings but use different data structures to satisfy different usages.
+// TODO: unify upstream/downstream vs source/target
+type tablePairInfo struct {
+	// target table -> sourceID -> source tables
+	tablesPerTargetTable map[filter.Table]map[string][]filter.Table
+	// target database -> tables under this database
+	targetTablesPerDB map[string][]filter.Table
+	// number of sharding tables of a target table among all upstreams.
+ shardNumPerTargetTable map[filter.Table]int + // sourceID -> tables in allow-list + allowTablesPerUpstream map[string][]filter.Table + // sourceID -> databases that contain block-list tables + interestedDBPerUpstream []map[string]struct{} + // sourceID -> target table -> source tables + tableMapPerUpstreamWithSourceID map[string]map[filter.Table][]filter.Table + // target table -> extended columns + extendedColumnPerTable map[filter.Table][]string + // source index -> source tables -> size + tableSizePerSource []map[filter.Table]int64 +} +func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, err error) { eg, ctx2 := errgroup.WithContext(ctx) - // upstream instance index -> targetTable -> sourceTables + + // do network things concurrently tableMapPerUpstream := make([]map[filter.Table][]filter.Table, len(c.instances)) extendedColumnPerTable := map[filter.Table][]string{} extendedColumnPerTableMu := sync.Mutex{} @@ -154,55 +167,97 @@ func (c *Checker) Init(ctx context.Context) (err error) { }) } if egErr := eg.Wait(); egErr != nil { - return egErr + return nil, egErr + } + + var tableSizeMu sync.Mutex + tableSize := make([]map[filter.Table]int64, len(c.instances)) + if c.stCfgs[0].Mode != config.ModeIncrement { + // TODO: concurrently read it intra-source later + for idx := range c.instances { + i := idx + eg.Go(func() error { + for _, sourceTables := range tableMapPerUpstream[i] { + for _, sourceTable := range sourceTables { + size, err := utils.FetchTableEstimatedBytes( + ctx, + c.instances[i].sourceDB.DB, + sourceTable.Schema, + sourceTable.Name, + ) + if err != nil { + return err + } + tableSizeMu.Lock() + tableSize[i][sourceTable] = size + tableSizeMu.Unlock() + } + } + return nil + }) + } + } + if egErr := eg.Wait(); egErr != nil { + return nil, egErr } - // 2. calculate needed data structure, like sharding tables of a target table - // from multiple upstream... - - // targetTable -> sourceID -> sourceTables - tablesPerTargetTable := make(map[filter.Table]map[string][]filter.Table) - // sharding table number of a target table - shardNumPerTargetTable := make(map[filter.Table]int) + info.tableSizePerSource = tableSize + info.extendedColumnPerTable = extendedColumnPerTable + info.tablesPerTargetTable = make(map[filter.Table]map[string][]filter.Table) + info.shardNumPerTargetTable = make(map[filter.Table]int) + info.targetTablesPerDB = make(map[string][]filter.Table) for i, inst := range c.instances { mapping := tableMapPerUpstream[i] err = sameTableNameDetection(mapping) if err != nil { - return err + return nil, err } sourceID := inst.cfg.SourceID for targetTable, sourceTables := range mapping { - tablesPerSource, ok := tablesPerTargetTable[targetTable] + tablesPerSource, ok := info.tablesPerTargetTable[targetTable] if !ok { tablesPerSource = make(map[string][]filter.Table) - tablesPerTargetTable[targetTable] = tablesPerSource + info.tablesPerTargetTable[targetTable] = tablesPerSource } tablesPerSource[sourceID] = append(tablesPerSource[sourceID], sourceTables...) 
- shardNumPerTargetTable[targetTable] += len(sourceTables) + info.shardNumPerTargetTable[targetTable] += len(sourceTables) + info.targetTablesPerDB[targetTable.Schema] = append(info.targetTablesPerDB[targetTable.Schema], targetTable) } } - // calculate allow-list tables and databases they belongs to per upstream - // sourceID -> tables - allowTablesPerUpstream := make(map[string][]filter.Table, len(c.instances)) - relatedDBPerUpstream := make([]map[string]struct{}, len(c.instances)) - tableMapPerUpstreamWithSourceID := make(map[string]map[filter.Table][]filter.Table, len(c.instances)) + info.allowTablesPerUpstream = make(map[string][]filter.Table, len(c.instances)) + info.interestedDBPerUpstream = make([]map[string]struct{}, len(c.instances)) + info.tableMapPerUpstreamWithSourceID = make(map[string]map[filter.Table][]filter.Table, len(c.instances)) for i, inst := range c.instances { sourceID := inst.cfg.SourceID - relatedDBPerUpstream[i] = make(map[string]struct{}) + info.interestedDBPerUpstream[i] = make(map[string]struct{}) mapping := tableMapPerUpstream[i] - tableMapPerUpstreamWithSourceID[sourceID] = mapping + info.tableMapPerUpstreamWithSourceID[sourceID] = mapping for _, tables := range mapping { - allowTablesPerUpstream[sourceID] = append(allowTablesPerUpstream[sourceID], tables...) + info.allowTablesPerUpstream[sourceID] = append(info.allowTablesPerUpstream[sourceID], tables...) for _, table := range tables { - relatedDBPerUpstream[i][table.Schema] = struct{}{} + info.interestedDBPerUpstream[i][table.Schema] = struct{}{} } } } + return info, nil +} + +// Init implements Unit interface. +func (c *Checker) Init(ctx context.Context) (err error) { + rollbackHolder := fr.NewRollbackHolder("checker") + defer func() { + if err != nil { + rollbackHolder.RollbackReverseOrder() + } + }() - // 3. 
create checkers + rollbackHolder.Add(fr.FuncRollback{Name: "close-DBs", Fn: c.closeDBs}) + + c.tctx = tcontext.NewContext(ctx, log.With(zap.String("unit", "task check"))) + info, err := c.getTablePairInfo(ctx) if _, ok := c.checkingItems[config.ConnNumberChecking]; ok { if len(c.stCfgs) > 0 { @@ -260,7 +315,7 @@ func (c *Checker) Init(ctx context.Context) (err error) { c.checkList = append(c.checkList, checker.NewSourceDumpPrivilegeChecker( instance.sourceDB.DB, instance.sourceDBinfo, - allowTablesPerUpstream[sourceID], + info.allowTablesPerUpstream[sourceID], exportCfg.Consistency, c.dumpWholeInstance, )) @@ -284,10 +339,10 @@ func (c *Checker) Init(ctx context.Context) (err error) { c.checkList = append(c.checkList, checker.NewSourceReplicationPrivilegeChecker(instance.sourceDB.DB, instance.sourceDBinfo)) } if _, ok := c.checkingItems[config.OnlineDDLChecking]; c.onlineDDL != nil && ok { - c.checkList = append(c.checkList, checker.NewOnlineDDLChecker(instance.sourceDB.DB, relatedDBPerUpstream[i], c.onlineDDL, instance.baList)) + c.checkList = append(c.checkList, checker.NewOnlineDDLChecker(instance.sourceDB.DB, info.interestedDBPerUpstream[i], c.onlineDDL, instance.baList)) } if _, ok := c.checkingItems[config.BinlogDBChecking]; ok { - c.checkList = append(c.checkList, checker.NewBinlogDBChecker(instance.sourceDB, instance.sourceDBinfo, relatedDBPerUpstream[i], instance.cfg.CaseSensitive)) + c.checkList = append(c.checkList, checker.NewBinlogDBChecker(instance.sourceDB, instance.sourceDBinfo, info.interestedDBPerUpstream[i], instance.cfg.CaseSensitive)) } } } @@ -297,8 +352,8 @@ func (c *Checker) Init(ctx context.Context) (err error) { c.checkList = append(c.checkList, checker.NewTablesChecker( upstreamDBs, c.instances[0].targetDB.DB, - tableMapPerUpstreamWithSourceID, - extendedColumnPerTable, + info.tableMapPerUpstreamWithSourceID, + info.extendedColumnPerTable, dumpThreads, )) } @@ -314,8 +369,8 @@ func (c *Checker) Init(ctx context.Context) (err error) { return err } if isFresh { - for targetTable, shardingSet := range tablesPerTargetTable { - if shardNumPerTargetTable[targetTable] <= 1 { + for targetTable, shardingSet := range info.tablesPerTargetTable { + if info.shardNumPerTargetTable[targetTable] <= 1 { continue } if instance.cfg.ShardMode == config.ShardPessimistic { @@ -338,7 +393,7 @@ func (c *Checker) Init(ctx context.Context) (err error) { } } - if instance.cfg.Mode != config.ModeIncrement && instance.cfg.LoaderConfig.ImportMode == config.LoadModePhysical { + if instance.cfg.Mode != config.ModeIncrement { lCfg, err := loader.GetLightningConfig(loader.MakeGlobalConfig(instance.cfg), instance.cfg) if err != nil { return err @@ -350,30 +405,75 @@ func (c *Checker) Init(ctx context.Context) (err error) { return err } - builder, err := restore.NewPrecheckItemBuilderFromConfig(c.tctx.Context(), lCfg) + cpdb, err := checkpoints.OpenCheckpointsDB(ctx, lCfg) if err != nil { return err } - if _, ok := c.checkingItems[config.LightningEmptyRegionChecking]; ok { - lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterEmptyRegion) - if err != nil { - return err + targetDB, err := restore.DBFromConfig(ctx, lCfg.TiDB) + if err != nil { + return err + } + targetInfoGetter, err := restore.NewTargetInfoGetterImpl(lCfg, targetDB) + if err != nil { + return err + } + + var ( + dbMetas []*mydump.MDDatabaseMeta + ) + + // use downstream table for shard merging + for db, tables := range info.targetTablesPerDB { + mdTables := make([]*mydump.MDTableMeta, 0, len(tables)) + for _, table 
:= range tables { + mdTables = append(mdTables, &mydump.MDTableMeta{ + DB: db, + Name: table.Name, + }) } - c.checkList = append(c.checkList, checker.NewLightningEmptyRegionChecker(lChecker)) + dbMetas = append(dbMetas, &mydump.MDDatabaseMeta{ + Name: db, + Tables: mdTables, + }) } - if _, ok := c.checkingItems[config.LightningRegionDistributionChecking]; ok { - lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterRegionDist) + + builder := restore.NewPrecheckItemBuilder( + lCfg, + dbMetas, + newLightningPrecheckAdaptor(targetInfoGetter, info), + cpdb, + ) + + if _, ok := c.checkingItems[config.LightningFreeSpaceChecking]; ok { + lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterSize) if err != nil { return err } - c.checkList = append(c.checkList, checker.NewLightningRegionDistributionChecker(lChecker)) + c.checkList = append(c.checkList, checker.NewLightningFreeSpaceChecker(lChecker)) } - if _, ok := c.checkingItems[config.LightningDownstreamVersionChecking]; ok { - lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterVersion) - if err != nil { - return err + + if instance.cfg.LoaderConfig.ImportMode == config.LoadModePhysical { + if _, ok := c.checkingItems[config.LightningEmptyRegionChecking]; ok { + lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterEmptyRegion) + if err != nil { + return err + } + c.checkList = append(c.checkList, checker.NewLightningEmptyRegionChecker(lChecker)) + } + if _, ok := c.checkingItems[config.LightningRegionDistributionChecking]; ok { + lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterRegionDist) + if err != nil { + return err + } + c.checkList = append(c.checkList, checker.NewLightningRegionDistributionChecker(lChecker)) + } + if _, ok := c.checkingItems[config.LightningDownstreamVersionChecking]; ok { + lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterVersion) + if err != nil { + return err + } + c.checkList = append(c.checkList, checker.NewLightningClusterVersionChecker(lChecker)) } - c.checkList = append(c.checkList, checker.NewLightningClusterVersionChecker(lChecker)) } } @@ -689,3 +789,61 @@ func sameTableNameDetection(tables map[filter.Table][]filter.Table) error { return nil } + +// lightningPrecheckAdaptor implements the restore.PreRestoreInfoGetter interface. 
+type lightningPrecheckAdaptor struct { + restore.TargetInfoGetter + allTables map[string]*checkpoints.TidbDBInfo + sourceDataResult restore.EstimateSourceDataSizeResult +} + +func newLightningPrecheckAdaptor( + targetInfoGetter restore.TargetInfoGetter, + info *tablePairInfo, +) *lightningPrecheckAdaptor { + var ( + sourceDataResult restore.EstimateSourceDataSizeResult + allTables = make(map[string]*checkpoints.TidbDBInfo) + ) + if info != nil { + for _, tables := range info.tableSizePerSource { + for _, size := range tables { + sourceDataResult.SizeWithIndex += size + } + } + } + for db, tables := range info.targetTablesPerDB { + allTables[db] = &checkpoints.TidbDBInfo{ + Name: db, + Tables: make(map[string]*checkpoints.TidbTableInfo), + } + for _, table := range tables { + allTables[db].Tables[table.Name] = &checkpoints.TidbTableInfo{ + DB: db, + Name: table.Name, + } + } + } + return &lightningPrecheckAdaptor{ + TargetInfoGetter: targetInfoGetter, + allTables: allTables, + sourceDataResult: sourceDataResult, + } +} + +func (l *lightningPrecheckAdaptor) GetAllTableStructures(ctx context.Context, opts ...opts.GetPreInfoOption) (map[string]*checkpoints.TidbDBInfo, error) { + // re-use with other checker? or in fact we only use other information than structure? + return l.allTables, nil +} + +func (l *lightningPrecheckAdaptor) ReadFirstNRowsByTableName(ctx context.Context, schemaName string, tableName string, n int) (cols []string, rows [][]types.Datum, err error) { + return nil, nil, errors.New("not implemented") +} + +func (l *lightningPrecheckAdaptor) ReadFirstNRowsByFileMeta(ctx context.Context, dataFileMeta mydump.SourceFileMeta, n int) (cols []string, rows [][]types.Datum, err error) { + return nil, nil, errors.New("not implemented") +} + +func (l *lightningPrecheckAdaptor) EstimateSourceDataSize(ctx context.Context, opts ...opts.GetPreInfoOption) (*restore.EstimateSourceDataSizeResult, error) { + return &l.sourceDataResult, nil +} diff --git a/dm/config/checking_item.go b/dm/config/checking_item.go index c403e10b510..92a32645b6f 100644 --- a/dm/config/checking_item.go +++ b/dm/config/checking_item.go @@ -41,6 +41,7 @@ const ( LightningEmptyRegionChecking = "empty_region" LightningRegionDistributionChecking = "region_distribution" LightningDownstreamVersionChecking = "downstream_version" + LightningFreeSpaceChecking = "free_space" ) // AllCheckingItems contains all checking items. diff --git a/dm/pkg/checker/lightning.go b/dm/pkg/checker/lightning.go index 156560d0205..c63542642cc 100644 --- a/dm/pkg/checker/lightning.go +++ b/dm/pkg/checker/lightning.go @@ -135,3 +135,35 @@ func (c *LightningClusterVersionChecker) Check(ctx context.Context) *Result { ) return result } + +// LightningFreeSpaceChecker checks whether the cluster has enough free space. +type LightningFreeSpaceChecker struct { + inner restore.PrecheckItem +} + +// NewLightningFreeSpaceChecker creates a new LightningFreeSpaceChecker. +func NewLightningFreeSpaceChecker(lightningChecker restore.PrecheckItem) RealChecker { + return &LightningFreeSpaceChecker{inner: lightningChecker} +} + +// Name implements the RealChecker interface. +func (c *LightningFreeSpaceChecker) Name() string { + return "lightning_free_space" +} + +// Check implements the RealChecker interface. 
+func (c *LightningFreeSpaceChecker) Check(ctx context.Context) *Result { + result := &Result{ + Name: c.Name(), + Desc: "check whether the downstream has enough free space to store the data to be migrated", + State: StateFailure, + } + convertLightningPrecheck( + ctx, + result, + c.inner, + StateWarning, + `You can try to scale-out more TiKV to gain more storage space`, + ) + return result +} diff --git a/dm/pkg/utils/common.go b/dm/pkg/utils/common.go index 020166087e7..8c100a5b939 100644 --- a/dm/pkg/utils/common.go +++ b/dm/pkg/utils/common.go @@ -168,6 +168,16 @@ func FetchTargetDoTables( return tableMapper, extendedColumnPerTable, nil } +// FetchTableEstimatedBytes returns the estimated size (data + index) in bytes of the table. +func FetchTableEstimatedBytes(ctx context.Context, db *sql.DB, schema string, table string) (int64, error) { + var size int64 + err := db.QueryRowContext(ctx, "SELECT data_length + index_length FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?", schema, table).Scan(&size) + if err != nil { + return 0, terror.DBErrorAdapt(err, terror.ErrDBDriverError) + } + return size, nil +} + // LowerCaseTableNamesFlavor represents the type of db `lower_case_table_names` settings. type LowerCaseTableNamesFlavor uint8 From 12bfa0fd16dc4b007936ffcd93ca4d46238573c5 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 28 Nov 2022 19:24:42 +0800 Subject: [PATCH 02/16] sync code Signed-off-by: lance6716 --- dm/checker/check_test.go | 4 ++++ dm/checker/checker.go | 20 +++++++++++++++----- dm/config/checking_item.go | 11 ++++++++++- dm/pkg/utils/common.go | 6 ++++++ dm/tests/lightning_mode/run.sh | 11 ++++++++--- 5 files changed, 43 insertions(+), 9 deletions(-) diff --git a/dm/checker/check_test.go b/dm/checker/check_test.go index f2b1a75e2ac..f95468380a3 100644 --- a/dm/checker/check_test.go +++ b/dm/checker/check_test.go @@ -52,6 +52,10 @@ func ignoreExcept(itemMap map[string]struct{}) []string { config.OnlineDDLChecking, config.BinlogDBChecking, config.TargetDBPrivilegeChecking, + config.LightningFreeSpaceChecking, + config.LightningDownstreamVersionChecking, + config.LightningRegionDistributionChecking, + config.LightningEmptyRegionChecking, } ignoreCheckingItems := make([]string, 0, len(items)-len(itemMap)) for _, i := range items { diff --git a/dm/checker/checker.go b/dm/checker/checker.go index ce5c7d79d9c..f18ae6cf61f 100644 --- a/dm/checker/checker.go +++ b/dm/checker/checker.go @@ -142,6 +142,7 @@ type tablePairInfo struct { } func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, err error) { + info = &tablePairInfo{} eg, ctx2 := errgroup.WithContext(ctx) // do network things concurrently @@ -172,7 +173,7 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er var tableSizeMu sync.Mutex tableSize := make([]map[filter.Table]int64, len(c.instances)) - if c.stCfgs[0].Mode != config.ModeIncrement { + if _, ok := c.checkingItems[config.LightningFreeSpaceChecking]; ok && c.stCfgs[0].Mode != config.ModeIncrement { // TODO: concurrently read it intra-source later for idx := range c.instances { i := idx @@ -258,6 +259,9 @@ func (c *Checker) Init(ctx context.Context) (err error) { c.tctx = tcontext.NewContext(ctx, log.With(zap.String("unit", "task check"))) info, err := c.getTablePairInfo(ctx) + if err != nil { + return err + } if _, ok := c.checkingItems[config.ConnNumberChecking]; ok { if len(c.stCfgs) > 0 { @@ -393,7 +397,15 @@ func (c *Checker) Init(ctx context.Context) (err error) { } } - if 
instance.cfg.Mode != config.ModeIncrement { + hasLightningPrecheck := false + for _, item := range config.LightningPrechecks { + if _, ok := c.checkingItems[item]; ok { + hasLightningPrecheck = true + break + } + } + + if instance.cfg.Mode != config.ModeIncrement && hasLightningPrecheck { lCfg, err := loader.GetLightningConfig(loader.MakeGlobalConfig(instance.cfg), instance.cfg) if err != nil { return err @@ -418,9 +430,7 @@ func (c *Checker) Init(ctx context.Context) (err error) { return err } - var ( - dbMetas []*mydump.MDDatabaseMeta - ) + var dbMetas []*mydump.MDDatabaseMeta // use downstream table for shard merging for db, tables := range info.targetTablesPerDB { diff --git a/dm/config/checking_item.go b/dm/config/checking_item.go index 92a32645b6f..d001a7b2d99 100644 --- a/dm/config/checking_item.go +++ b/dm/config/checking_item.go @@ -20,7 +20,7 @@ import ( "github.com/pingcap/tiflow/dm/pkg/terror" ) -// DM definition checking items. +// DM definition checking items. Don't forget to update AllCheckingItems and LightningPrechecks. const ( AllChecking = "all" DumpPrivilegeChecking = "dump_privilege" @@ -44,6 +44,14 @@ const ( LightningFreeSpaceChecking = "free_space" ) +// LightningPrechecks returns all checking items for lightning. +var LightningPrechecks = []string{ + LightningEmptyRegionChecking, + LightningRegionDistributionChecking, + LightningDownstreamVersionChecking, + LightningFreeSpaceChecking, +} + // AllCheckingItems contains all checking items. var AllCheckingItems = map[string]string{ AllChecking: "all checking items", @@ -65,6 +73,7 @@ var AllCheckingItems = map[string]string{ LightningEmptyRegionChecking: "physical import mode empty region checking item", LightningRegionDistributionChecking: "physical import mode region distribution checking item", LightningDownstreamVersionChecking: "physical import mode downstream TiDB/PD/TiKV version checking item", + LightningFreeSpaceChecking: "downstream free space checking item", } // MaxSourceIDLength is the max length for dm-worker source id. diff --git a/dm/pkg/utils/common.go b/dm/pkg/utils/common.go index 8c100a5b939..3154c65d86a 100644 --- a/dm/pkg/utils/common.go +++ b/dm/pkg/utils/common.go @@ -170,6 +170,12 @@ func FetchTargetDoTables( // FetchTableEstimatedBytes returns the estimated size (data + index) in bytes of the table. func FetchTableEstimatedBytes(ctx context.Context, db *sql.DB, schema string, table string) (int64, error) { + failpoint.Inject("VeryLargeTable", func(val failpoint.Value) { + tblName := val.(string) + if tblName == table { + failpoint.Return(1<<62, nil) + } + }) var size int64 err := db.QueryRowContext(ctx, "SELECT data_length + index_length FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? 
AND TABLE_NAME = ?", schema, table).Scan(&size) if err != nil { diff --git a/dm/tests/lightning_mode/run.sh b/dm/tests/lightning_mode/run.sh index 6a9da556f14..33c9a417b33 100755 --- a/dm/tests/lightning_mode/run.sh +++ b/dm/tests/lightning_mode/run.sh @@ -13,6 +13,8 @@ function run() { run_downstream_cluster $WORK_DIR + export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/pkg/utils/VeryLargeTable=return("t1")' + run_sql_both_source "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'" run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 @@ -38,9 +40,11 @@ function run() { check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 - # start DM task only - cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml - dmctl_start_task "$WORK_DIR/dm-task.yaml" "--remove-meta" + read -p 123 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "check-task $cur/conf/dm-task.yaml" \ + "\"state\": \"warn\"" 1 + dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta" # use sync_diff_inspector to check full dump loader check_sync_diff $WORK_DIR $cur/conf/diff_config.toml @@ -71,6 +75,7 @@ function run() { killall -9 pd-server 2>/dev/null || true rm -rf /tmp/tidb || true run_tidb_server 4000 $TIDB_PASSWORD + export GO_FAILPOINTS='' } cleanup_data lightning_mode From a8706024de3ecd91a64b667f173d059765f051b8 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 28 Nov 2022 19:39:06 +0800 Subject: [PATCH 03/16] sync code Signed-off-by: lance6716 --- dm/checker/checker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dm/checker/checker.go b/dm/checker/checker.go index f18ae6cf61f..f7baec3ec5a 100644 --- a/dm/checker/checker.go +++ b/dm/checker/checker.go @@ -177,6 +177,7 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er // TODO: concurrently read it intra-source later for idx := range c.instances { i := idx + tableSize[i] = make(map[filter.Table]int64) eg.Go(func() error { for _, sourceTables := range tableMapPerUpstream[i] { for _, sourceTable := range sourceTables { From 2df9ee65218cadf411e8de57b7605c7e18d23613 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 28 Nov 2022 19:45:18 +0800 Subject: [PATCH 04/16] sync code Signed-off-by: lance6716 --- dm/tests/lightning_mode/run.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dm/tests/lightning_mode/run.sh b/dm/tests/lightning_mode/run.sh index 33c9a417b33..5629ab498a7 100755 --- a/dm/tests/lightning_mode/run.sh +++ b/dm/tests/lightning_mode/run.sh @@ -40,10 +40,10 @@ function run() { check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 - read -p 123 run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "check-task $cur/conf/dm-task.yaml" \ - "\"state\": \"warn\"" 1 + "Cluster doesn't have enough space" 1 \ + "but we need 4EiB" 1 dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta" # use sync_diff_inspector to check full dump loader From cbc51bf7280fd09757d7ea67efbc1b8c011de359 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 28 Nov 2022 20:18:24 +0800 Subject: [PATCH 05/16] refine comment Signed-off-by: lance6716 --- dm/checker/checker.go | 57 +++++++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/dm/checker/checker.go b/dm/checker/checker.go index f7baec3ec5a..419687dfcf8 100644 --- a/dm/checker/checker.go +++ b/dm/checker/checker.go @@ -121,23 +121,22 @@ func 
NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string, e // tablePairInfo records information about a upstream-downstream table pair including // members may have repeated meanings but have different data structure to satisfy different usages. -// TODO: unify upstream/downstream vs source/target type tablePairInfo struct { - // target table -> sourceID -> source tables - tablesPerTargetTable map[filter.Table]map[string][]filter.Table - // target database -> tables under this database - targetTablesPerDB map[string][]filter.Table - // number of sharding tables of a target table among all upstreams. - shardNumPerTargetTable map[filter.Table]int - // sourceID -> tables in allow-list - allowTablesPerUpstream map[string][]filter.Table + // downstream table -> sourceID -> upstream tables + tablesPerDownstreamTable map[filter.Table]map[string][]filter.Table + // downstream database -> tables under this database + downstreamTablesPerDB map[string][]filter.Table + // number of sharding tables of a downstream table among all upstreams. + shardNumPerDownstreamTable map[filter.Table]int + // sourceID -> tables of this upstream in allow-list + tablesPerSourceID map[string][]filter.Table // sourceID -> databases that contain block-list tables interestedDBPerUpstream []map[string]struct{} - // sourceID -> target table -> source tables + // sourceID -> downstream table -> source tables tableMapPerUpstreamWithSourceID map[string]map[filter.Table][]filter.Table - // target table -> extended columns + // downstream table -> extended columns extendedColumnPerTable map[filter.Table][]string - // source index -> source tables -> size + // source index -> upstream tables -> size tableSizePerSource []map[filter.Table]int64 } @@ -181,14 +180,14 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er eg.Go(func() error { for _, sourceTables := range tableMapPerUpstream[i] { for _, sourceTable := range sourceTables { - size, err := utils.FetchTableEstimatedBytes( + size, err2 := utils.FetchTableEstimatedBytes( ctx, c.instances[i].sourceDB.DB, sourceTable.Schema, sourceTable.Name, ) - if err != nil { - return err + if err2 != nil { + return err2 } tableSizeMu.Lock() tableSize[i][sourceTable] = size @@ -205,9 +204,9 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er info.tableSizePerSource = tableSize info.extendedColumnPerTable = extendedColumnPerTable - info.tablesPerTargetTable = make(map[filter.Table]map[string][]filter.Table) - info.shardNumPerTargetTable = make(map[filter.Table]int) - info.targetTablesPerDB = make(map[string][]filter.Table) + info.tablesPerDownstreamTable = make(map[filter.Table]map[string][]filter.Table) + info.shardNumPerDownstreamTable = make(map[filter.Table]int) + info.downstreamTablesPerDB = make(map[string][]filter.Table) for i, inst := range c.instances { mapping := tableMapPerUpstream[i] @@ -218,18 +217,18 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er sourceID := inst.cfg.SourceID for targetTable, sourceTables := range mapping { - tablesPerSource, ok := info.tablesPerTargetTable[targetTable] + tablesPerSource, ok := info.tablesPerDownstreamTable[targetTable] if !ok { tablesPerSource = make(map[string][]filter.Table) - info.tablesPerTargetTable[targetTable] = tablesPerSource + info.tablesPerDownstreamTable[targetTable] = tablesPerSource } tablesPerSource[sourceID] = append(tablesPerSource[sourceID], sourceTables...) 
- info.shardNumPerTargetTable[targetTable] += len(sourceTables) - info.targetTablesPerDB[targetTable.Schema] = append(info.targetTablesPerDB[targetTable.Schema], targetTable) + info.shardNumPerDownstreamTable[targetTable] += len(sourceTables) + info.downstreamTablesPerDB[targetTable.Schema] = append(info.downstreamTablesPerDB[targetTable.Schema], targetTable) } } - info.allowTablesPerUpstream = make(map[string][]filter.Table, len(c.instances)) + info.tablesPerSourceID = make(map[string][]filter.Table, len(c.instances)) info.interestedDBPerUpstream = make([]map[string]struct{}, len(c.instances)) info.tableMapPerUpstreamWithSourceID = make(map[string]map[filter.Table][]filter.Table, len(c.instances)) for i, inst := range c.instances { @@ -238,7 +237,7 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er mapping := tableMapPerUpstream[i] info.tableMapPerUpstreamWithSourceID[sourceID] = mapping for _, tables := range mapping { - info.allowTablesPerUpstream[sourceID] = append(info.allowTablesPerUpstream[sourceID], tables...) + info.tablesPerSourceID[sourceID] = append(info.tablesPerSourceID[sourceID], tables...) for _, table := range tables { info.interestedDBPerUpstream[i][table.Schema] = struct{}{} } @@ -320,7 +319,7 @@ func (c *Checker) Init(ctx context.Context) (err error) { c.checkList = append(c.checkList, checker.NewSourceDumpPrivilegeChecker( instance.sourceDB.DB, instance.sourceDBinfo, - info.allowTablesPerUpstream[sourceID], + info.tablesPerSourceID[sourceID], exportCfg.Consistency, c.dumpWholeInstance, )) @@ -374,8 +373,8 @@ func (c *Checker) Init(ctx context.Context) (err error) { return err } if isFresh { - for targetTable, shardingSet := range info.tablesPerTargetTable { - if info.shardNumPerTargetTable[targetTable] <= 1 { + for targetTable, shardingSet := range info.tablesPerDownstreamTable { + if info.shardNumPerDownstreamTable[targetTable] <= 1 { continue } if instance.cfg.ShardMode == config.ShardPessimistic { @@ -434,7 +433,7 @@ func (c *Checker) Init(ctx context.Context) (err error) { var dbMetas []*mydump.MDDatabaseMeta // use downstream table for shard merging - for db, tables := range info.targetTablesPerDB { + for db, tables := range info.downstreamTablesPerDB { mdTables := make([]*mydump.MDTableMeta, 0, len(tables)) for _, table := range tables { mdTables = append(mdTables, &mydump.MDTableMeta{ @@ -823,7 +822,7 @@ func newLightningPrecheckAdaptor( } } } - for db, tables := range info.targetTablesPerDB { + for db, tables := range info.downstreamTablesPerDB { allTables[db] = &checkpoints.TidbDBInfo{ Name: db, Tables: make(map[string]*checkpoints.TidbTableInfo), From 48523f9716b444a8c174d8096962dac4b9765334 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 29 Nov 2022 10:33:56 +0800 Subject: [PATCH 06/16] save work Signed-off-by: lance6716 --- dm/checker/checker.go | 19 +++++-------------- dm/pkg/checker/table_structure.go | 12 +++++++----- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/dm/checker/checker.go b/dm/checker/checker.go index 419687dfcf8..08eba836012 100644 --- a/dm/checker/checker.go +++ b/dm/checker/checker.go @@ -136,8 +136,8 @@ type tablePairInfo struct { tableMapPerUpstreamWithSourceID map[string]map[filter.Table][]filter.Table // downstream table -> extended columns extendedColumnPerTable map[filter.Table][]string - // source index -> upstream tables -> size - tableSizePerSource []map[filter.Table]int64 + // byte size of all upstream tables, counting both data and index + totalDataSize *atomic.Int64 } func 
(c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, err error) { @@ -170,13 +170,11 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er return nil, egErr } - var tableSizeMu sync.Mutex - tableSize := make([]map[filter.Table]int64, len(c.instances)) + info.totalDataSize = atomic.NewInt64(0) if _, ok := c.checkingItems[config.LightningFreeSpaceChecking]; ok && c.stCfgs[0].Mode != config.ModeIncrement { // TODO: concurrently read it intra-source later for idx := range c.instances { i := idx - tableSize[i] = make(map[filter.Table]int64) eg.Go(func() error { for _, sourceTables := range tableMapPerUpstream[i] { for _, sourceTable := range sourceTables { @@ -189,9 +187,7 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er if err2 != nil { return err2 } - tableSizeMu.Lock() - tableSize[i][sourceTable] = size - tableSizeMu.Unlock() + info.totalDataSize.Add(size) } } return nil @@ -202,7 +198,6 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er return nil, egErr } - info.tableSizePerSource = tableSize info.extendedColumnPerTable = extendedColumnPerTable info.tablesPerDownstreamTable = make(map[filter.Table]map[string][]filter.Table) info.shardNumPerDownstreamTable = make(map[filter.Table]int) @@ -816,11 +811,7 @@ func newLightningPrecheckAdaptor( allTables = make(map[string]*checkpoints.TidbDBInfo) ) if info != nil { - for _, tables := range info.tableSizePerSource { - for _, size := range tables { - sourceDataResult.SizeWithIndex += size - } - } + sourceDataResult.SizeWithIndex = info.totalDataSize.Load() } for db, tables := range info.downstreamTablesPerDB { allTables[db] = &checkpoints.TidbDBInfo{ diff --git a/dm/pkg/checker/table_structure.go b/dm/pkg/checker/table_structure.go index 03d998e787b..6b950e8319f 100644 --- a/dm/pkg/checker/table_structure.go +++ b/dm/pkg/checker/table_structure.go @@ -125,7 +125,7 @@ func (c *TablesChecker) Check(ctx context.Context) *Result { startTime := time.Now() sourceIDs := maps.Keys(c.tableMap) - concurrency, err := getConcurrency(ctx, sourceIDs, c.upstreamDBs, c.dumpThreads) + concurrency, err := GetConcurrency(ctx, sourceIDs, c.upstreamDBs, c.dumpThreads) if err != nil { markCheckError(r, err) return r @@ -574,7 +574,7 @@ func (c *ShardingTablesChecker) Check(ctx context.Context) *Result { } sourceIDs := maps.Keys(c.tableMap) - concurrency, err := getConcurrency(ctx, sourceIDs, c.dbs, c.dumpThreads) + concurrency, err := GetConcurrency(ctx, sourceIDs, c.dbs, c.dumpThreads) if err != nil { markCheckError(r, err) return r @@ -813,7 +813,7 @@ func (c *OptimisticShardingTablesChecker) Check(ctx context.Context) *Result { startTime := time.Now() sourceIDs := maps.Keys(c.tableMap) - concurrency, err := getConcurrency(ctx, sourceIDs, c.dbs, c.dumpThreads) + concurrency, err := GetConcurrency(ctx, sourceIDs, c.dbs, c.dumpThreads) if err != nil { markCheckError(r, err) return r @@ -948,12 +948,14 @@ func dispatchTableItemWithDownstreamTable( close(inCh) } -func getConcurrency(ctx context.Context, sourceIDs []string, dbs map[string]*sql.DB, dumpThreads int) (int, error) { +// GetConcurrency gets the concurrency of workers that we can randomly dispatch +// tasks on any sources to any of them, where each task needs a SQL connection. 
+func GetConcurrency(ctx context.Context, sourceIDs []string, dbs map[string]*sql.DB, dumpThreads int) (int, error) { concurrency := dumpThreads for _, sourceID := range sourceIDs { db, ok := dbs[sourceID] if !ok { - return 0, errors.NotFoundf("client for sourceID %s", sourceID) + return 0, errors.NotFoundf("SQL connection for sourceID %s", sourceID) } maxConnections, err := utils.GetMaxConnections(ctx, db) if err != nil { From 30ff163a8825f76a9d424b934cfaa7725800c792 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 29 Nov 2022 15:35:02 +0800 Subject: [PATCH 07/16] save work Signed-off-by: lance6716 --- dm/checker/checker.go | 14 ++++--- dm/config/checking_item.go | 3 ++ dm/pkg/checker/lightning.go | 78 +++++++++++++++++++++++++++++++++++-- 3 files changed, 86 insertions(+), 9 deletions(-) diff --git a/dm/checker/checker.go b/dm/checker/checker.go index 08eba836012..6051715fd13 100644 --- a/dm/checker/checker.go +++ b/dm/checker/checker.go @@ -450,11 +450,8 @@ func (c *Checker) Init(ctx context.Context) (err error) { ) if _, ok := c.checkingItems[config.LightningFreeSpaceChecking]; ok { - lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterSize) - if err != nil { - return err - } - c.checkList = append(c.checkList, checker.NewLightningFreeSpaceChecker(lChecker)) + c.checkList = append(c.checkList, checker.NewLightningFreeSpaceChecker( + info.totalDataSize.Load(), targetInfoGetter)) } if instance.cfg.LoaderConfig.ImportMode == config.LoadModePhysical { @@ -479,6 +476,13 @@ func (c *Checker) Init(ctx context.Context) (err error) { } c.checkList = append(c.checkList, checker.NewLightningClusterVersionChecker(lChecker)) } + if _, ok := c.checkingItems[config.LightningSortingSpaceChecking]; ok { + lChecker, err := builder.BuildPrecheckItem(restore.CheckLocalTempKVDir) + if err != nil { + return err + } + c.checkList = append(c.checkList, checker.NewLightningSortingSpaceChecker(lChecker)) + } } } diff --git a/dm/config/checking_item.go b/dm/config/checking_item.go index d001a7b2d99..ba5607a39a6 100644 --- a/dm/config/checking_item.go +++ b/dm/config/checking_item.go @@ -42,6 +42,7 @@ const ( LightningRegionDistributionChecking = "region_distribution" LightningDownstreamVersionChecking = "downstream_version" LightningFreeSpaceChecking = "free_space" + LightningSortingSpaceChecking = "enough_sorting_space" ) // LightningPrechecks returns all checking items for lightning. @@ -50,6 +51,7 @@ var LightningPrechecks = []string{ LightningRegionDistributionChecking, LightningDownstreamVersionChecking, LightningFreeSpaceChecking, + LightningSortingSpaceChecking, } // AllCheckingItems contains all checking items. @@ -74,6 +76,7 @@ var AllCheckingItems = map[string]string{ LightningRegionDistributionChecking: "physical import mode region distribution checking item", LightningDownstreamVersionChecking: "physical import mode downstream TiDB/PD/TiKV version checking item", LightningFreeSpaceChecking: "downstream free space checking item", + LightningSortingSpaceChecking: "enough disk space for physical import mode sorting data checking item", } // MaxSourceIDLength is the max length for dm-worker source id. 
diff --git a/dm/pkg/checker/lightning.go b/dm/pkg/checker/lightning.go index c63542642cc..672a11263d0 100644 --- a/dm/pkg/checker/lightning.go +++ b/dm/pkg/checker/lightning.go @@ -15,7 +15,9 @@ package checker import ( "context" + "fmt" + "github.com/docker/go-units" "github.com/pingcap/tidb/br/pkg/lightning/restore" ) @@ -138,12 +140,16 @@ func (c *LightningClusterVersionChecker) Check(ctx context.Context) *Result { // LightningFreeSpaceChecker checks whether the cluster has enough free space. type LightningFreeSpaceChecker struct { - inner restore.PrecheckItem + sourceDataSize int64 + infoGetter restore.TargetInfoGetter } // NewLightningFreeSpaceChecker creates a new LightningFreeSpaceChecker. -func NewLightningFreeSpaceChecker(lightningChecker restore.PrecheckItem) RealChecker { - return &LightningFreeSpaceChecker{inner: lightningChecker} +func NewLightningFreeSpaceChecker(sourceDataSize int64, getter restore.TargetInfoGetter) RealChecker { + return &LightningFreeSpaceChecker{ + sourceDataSize: sourceDataSize, + infoGetter: getter, + } } // Name implements the RealChecker interface. @@ -158,12 +164,76 @@ func (c *LightningFreeSpaceChecker) Check(ctx context.Context) *Result { Desc: "check whether the downstream has enough free space to store the data to be migrated", State: StateFailure, } + storeInfo, err := c.infoGetter.GetStorageInfo(ctx) + if err != nil { + markCheckError(result, err) + return result + } + clusterAvail := uint64(0) + for _, store := range storeInfo.Stores { + clusterAvail += uint64(store.Status.Available) + } + if clusterAvail < uint64(c.sourceDataSize) { + result.State = StateFailure + result.Errors = append(result.Errors, &Error{ + Severity: StateFailure, + ShortErr: fmt.Sprintf("Cluster doesn't have enough space, available is %s, but we need %s", + units.BytesSize(float64(clusterAvail)), units.BytesSize(float64(c.sourceDataSize))), + }) + result.Instruction = "you can try to scale-out more TiKV to gain more storage space" + return result + } + + replConfig, err := c.infoGetter.GetReplicationConfig(ctx) + if err != nil { + markCheckError(result, err) + return result + } + safeSize := uint64(c.sourceDataSize) * replConfig.MaxReplicas * 2 + if clusterAvail < safeSize { + result.State = StateWarning + result.Errors = append(result.Errors, &Error{ + Severity: StateWarning, + ShortErr: fmt.Sprintf("Cluster may not have enough space, available is %s, but we need %s", + units.BytesSize(float64(clusterAvail)), units.BytesSize(float64(safeSize))), + }) + result.Instruction = "you can try to scale-out more TiKV to gain more storage space" + return result + } + result.State = StateSuccess + return result +} + +// LightningSortingSpaceChecker checks the local disk has enough space for physical +// import mode sorting data. +type LightningSortingSpaceChecker struct { + inner restore.PrecheckItem +} + +// NewLightningSortingSpaceChecker creates a new LightningSortingSpaceChecker. +func NewLightningSortingSpaceChecker(lightningChecker restore.PrecheckItem) RealChecker { + return &LightningSortingSpaceChecker{inner: lightningChecker} +} + +// Name implements the RealChecker interface. +func (c *LightningSortingSpaceChecker) Name() string { + return "lightning_enough_sorting_space" +} + +// Check implements the RealChecker interface. 
+func (c *LightningSortingSpaceChecker) Check(ctx context.Context) *Result { + result := &Result{ + Name: c.Name(), + Desc: "check whether the free space of sorting-dir-physical is enough for Physical import mode", + State: StateFailure, + } + // TODO: don't expose lightning's config name in message? convertLightningPrecheck( ctx, result, c.inner, StateWarning, - `You can try to scale-out more TiKV to gain more storage space`, + `you can change sorting-dir-physical to another mounting disk or set disk-quota-physical`, ) return result } From 18cc8f74c629d2adb589dbc8c051e91881be5e91 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 29 Nov 2022 15:55:32 +0800 Subject: [PATCH 08/16] sync code Signed-off-by: lance6716 --- dm/config/task.go | 17 ++++++++++++----- dm/pkg/checker/lightning.go | 7 ++++++- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/dm/config/task.go b/dm/config/task.go index ed5bf8c43e8..0f926f0442f 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -30,6 +30,7 @@ import ( "github.com/dustin/go-humanize" bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/pingcap/tidb-tools/pkg/column-mapping" + "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/util/filter" router "github.com/pingcap/tidb/util/table-router" @@ -274,15 +275,17 @@ const ( // LoaderConfig represents loader process unit's specific config. type LoaderConfig struct { - PoolSize int `yaml:"pool-size" toml:"pool-size" json:"pool-size"` - Dir string `yaml:"dir" toml:"dir" json:"dir"` - SQLMode string `yaml:"-" toml:"-" json:"-"` // wrote by dump unit (DM op) or jobmaster (DM in engine) - ImportMode LoadMode `yaml:"import-mode" toml:"import-mode" json:"import-mode"` + PoolSize int `yaml:"pool-size" toml:"pool-size" json:"pool-size"` + Dir string `yaml:"dir" toml:"dir" json:"dir"` + SortingDirPhysical string `yaml:"sorting-dir-physical" toml:"sorting-dir-physical" json:"sorting-dir-physical"` + SQLMode string `yaml:"-" toml:"-" json:"-"` // wrote by dump unit (DM op) or jobmaster (DM in engine) + ImportMode LoadMode `yaml:"import-mode" toml:"import-mode" json:"import-mode"` // deprecated, use OnDuplicateLogical instead. OnDuplicate LogicalDuplicateResolveType `yaml:"on-duplicate" toml:"on-duplicate" json:"on-duplicate"` OnDuplicateLogical LogicalDuplicateResolveType `yaml:"on-duplicate-logical" toml:"on-duplicate-logical" json:"on-duplicate-logical"` - // TODO: no effects now + // TODO: OnDuplicatePhysical has no effects now OnDuplicatePhysical PhysicalDuplicateResolveType `yaml:"on-duplicate-physical" toml:"on-duplicate-physical" json:"on-duplicate-physical"` + DiskQuotaPhysical config.ByteSize `yaml:"disk-quota-physical" toml:"disk-quota-physical" json:"disk-quota-physical"` } // DefaultLoaderConfig return default loader config for task. 
@@ -326,6 +329,10 @@ func (m *LoaderConfig) adjust() error { m.PoolSize = defaultPoolSize } + if m.Dir != "" && m.SortingDirPhysical == "" { + m.SortingDirPhysical = m.Dir + } + if m.OnDuplicateLogical == "" && m.OnDuplicate != "" { m.OnDuplicateLogical = m.OnDuplicate } diff --git a/dm/pkg/checker/lightning.go b/dm/pkg/checker/lightning.go index 672a11263d0..3f8df0b6ee8 100644 --- a/dm/pkg/checker/lightning.go +++ b/dm/pkg/checker/lightning.go @@ -16,6 +16,7 @@ package checker import ( "context" "fmt" + "strings" "github.com/docker/go-units" "github.com/pingcap/tidb/br/pkg/lightning/restore" @@ -227,7 +228,6 @@ func (c *LightningSortingSpaceChecker) Check(ctx context.Context) *Result { Desc: "check whether the free space of sorting-dir-physical is enough for Physical import mode", State: StateFailure, } - // TODO: don't expose lightning's config name in message? convertLightningPrecheck( ctx, result, @@ -235,5 +235,10 @@ func (c *LightningSortingSpaceChecker) Check(ctx context.Context) *Result { StateWarning, `you can change sorting-dir-physical to another mounting disk or set disk-quota-physical`, ) + // don't expose lightning's config name in message + if len(result.Errors) > 0 { + result.Errors[0].ShortErr = strings.ReplaceAll(result.Errors[0].ShortErr, "tikv-importer.disk-quota", "disk-quota-physical") + result.Errors[0].ShortErr = strings.ReplaceAll(result.Errors[0].ShortErr, "mydumper.sorted-kv-dir", "sorting-dir-physical") + } return result } From 9b7e3b54d8b23f743913e9bad8b91df3dbb8941d Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 29 Nov 2022 16:30:06 +0800 Subject: [PATCH 09/16] sync code Signed-off-by: lance6716 --- dm/checker/checker.go | 53 +++++++++++++++++----------------- dm/tests/lightning_mode/run.sh | 12 +++++++- 2 files changed, 37 insertions(+), 28 deletions(-) diff --git a/dm/checker/checker.go b/dm/checker/checker.go index 6051715fd13..8dc0c09a598 100644 --- a/dm/checker/checker.go +++ b/dm/checker/checker.go @@ -400,7 +400,9 @@ func (c *Checker) Init(ctx context.Context) (err error) { } } - if instance.cfg.Mode != config.ModeIncrement && hasLightningPrecheck { + if instance.cfg.Mode != config.ModeIncrement && + instance.cfg.LoaderConfig.ImportMode == config.LoadModePhysical && + hasLightningPrecheck { lCfg, err := loader.GetLightningConfig(loader.MakeGlobalConfig(instance.cfg), instance.cfg) if err != nil { return err @@ -453,36 +455,33 @@ func (c *Checker) Init(ctx context.Context) (err error) { c.checkList = append(c.checkList, checker.NewLightningFreeSpaceChecker( info.totalDataSize.Load(), targetInfoGetter)) } - - if instance.cfg.LoaderConfig.ImportMode == config.LoadModePhysical { - if _, ok := c.checkingItems[config.LightningEmptyRegionChecking]; ok { - lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterEmptyRegion) - if err != nil { - return err - } - c.checkList = append(c.checkList, checker.NewLightningEmptyRegionChecker(lChecker)) + if _, ok := c.checkingItems[config.LightningEmptyRegionChecking]; ok { + lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterEmptyRegion) + if err != nil { + return err } - if _, ok := c.checkingItems[config.LightningRegionDistributionChecking]; ok { - lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterRegionDist) - if err != nil { - return err - } - c.checkList = append(c.checkList, checker.NewLightningRegionDistributionChecker(lChecker)) + c.checkList = append(c.checkList, checker.NewLightningEmptyRegionChecker(lChecker)) + } + if _, ok := 
c.checkingItems[config.LightningRegionDistributionChecking]; ok { + lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterRegionDist) + if err != nil { + return err } - if _, ok := c.checkingItems[config.LightningDownstreamVersionChecking]; ok { - lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterVersion) - if err != nil { - return err - } - c.checkList = append(c.checkList, checker.NewLightningClusterVersionChecker(lChecker)) + c.checkList = append(c.checkList, checker.NewLightningRegionDistributionChecker(lChecker)) + } + if _, ok := c.checkingItems[config.LightningDownstreamVersionChecking]; ok { + lChecker, err := builder.BuildPrecheckItem(restore.CheckTargetClusterVersion) + if err != nil { + return err } - if _, ok := c.checkingItems[config.LightningSortingSpaceChecking]; ok { - lChecker, err := builder.BuildPrecheckItem(restore.CheckLocalTempKVDir) - if err != nil { - return err - } - c.checkList = append(c.checkList, checker.NewLightningSortingSpaceChecker(lChecker)) + c.checkList = append(c.checkList, checker.NewLightningClusterVersionChecker(lChecker)) + } + if _, ok := c.checkingItems[config.LightningSortingSpaceChecking]; ok { + lChecker, err := builder.BuildPrecheckItem(restore.CheckLocalTempKVDir) + if err != nil { + return err } + c.checkList = append(c.checkList, checker.NewLightningSortingSpaceChecker(lChecker)) } } diff --git a/dm/tests/lightning_mode/run.sh b/dm/tests/lightning_mode/run.sh index 5629ab498a7..5d10daf27b2 100755 --- a/dm/tests/lightning_mode/run.sh +++ b/dm/tests/lightning_mode/run.sh @@ -43,7 +43,17 @@ function run() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "check-task $cur/conf/dm-task.yaml" \ "Cluster doesn't have enough space" 1 \ - "but we need 4EiB" 1 + "but we need 4EiB" 1 \ + "local disk space may not enough to finish import, estimate sorted data size is 4EiB" 1 + + export GO_FAILPOINTS='' + kill_dm_worker + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta" # use sync_diff_inspector to check full dump loader From 8ba4a7fb93083cd7632592a30ddcc7664853ca10 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 29 Nov 2022 16:49:20 +0800 Subject: [PATCH 10/16] finish Signed-off-by: lance6716 --- dm/tests/dmctl_basic/conf/get_task.yaml | 2 ++ dm/tests/import_v10x/conf/task.yaml | 2 ++ dm/tests/lightning_mode/run.sh | 9 +++------ 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/dm/tests/dmctl_basic/conf/get_task.yaml b/dm/tests/dmctl_basic/conf/get_task.yaml index 23c2a06c0d1..afb2da67e59 100644 --- a/dm/tests/dmctl_basic/conf/get_task.yaml +++ b/dm/tests/dmctl_basic/conf/get_task.yaml @@ -146,10 +146,12 @@ loaders: load-01: pool-size: 16 dir: ./dumped_data + sorting-dir-physical: ./dumped_data.test import-mode: logical on-duplicate: "" on-duplicate-logical: replace on-duplicate-physical: none + disk-quota-physical: 0 syncers: sync-01: meta-file: "" diff --git a/dm/tests/import_v10x/conf/task.yaml b/dm/tests/import_v10x/conf/task.yaml index bc5928107bd..96ffc29a115 100644 --- a/dm/tests/import_v10x/conf/task.yaml +++ b/dm/tests/import_v10x/conf/task.yaml @@ -89,10 +89,12 @@ loaders: load-01: pool-size: 16 dir: ./dumped_data + sorting-dir-physical: ./dumped_data.test import-mode: logical on-duplicate: 
"" on-duplicate-logical: replace on-duplicate-physical: none + disk-quota-physical: 0 syncers: sync-01: meta-file: "" diff --git a/dm/tests/lightning_mode/run.sh b/dm/tests/lightning_mode/run.sh index 5d10daf27b2..aca3dd91fbf 100755 --- a/dm/tests/lightning_mode/run.sh +++ b/dm/tests/lightning_mode/run.sh @@ -47,12 +47,9 @@ function run() { "local disk space may not enough to finish import, estimate sorted data size is 4EiB" 1 export GO_FAILPOINTS='' - kill_dm_worker - run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml - run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml - - check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT - check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + kill_dm_master + run_dm_master_info_log $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta" From 40942b2a86df8dbad6fd0c52661dd09d09ce8955 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 29 Nov 2022 17:44:10 +0800 Subject: [PATCH 11/16] fix UT Signed-off-by: lance6716 --- dm/config/task_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dm/config/task_test.go b/dm/config/task_test.go index 9d84f7220f4..c5a80e0751a 100644 --- a/dm/config/task_test.go +++ b/dm/config/task_test.go @@ -696,7 +696,8 @@ func TestGenAndFromSubTaskConfigs(t *testing.T) { }, LoaderConfig: LoaderConfig{ PoolSize: 32, - Dir: "./dumpped_data", + Dir: "./dumped_data", + SortingDirPhysical: "./dumped_data", ImportMode: LoadModePhysical, OnDuplicateLogical: OnDuplicateReplace, OnDuplicatePhysical: OnDuplicateNone, From 6e063f05fd45d25b3513160b503ee185a8b9a2dc Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 29 Nov 2022 18:28:17 +0800 Subject: [PATCH 12/16] reduce impact Signed-off-by: lance6716 --- dm/checker/checker.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dm/checker/checker.go b/dm/checker/checker.go index 8dc0c09a598..d628fb7b2d3 100644 --- a/dm/checker/checker.go +++ b/dm/checker/checker.go @@ -171,7 +171,9 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er } info.totalDataSize = atomic.NewInt64(0) - if _, ok := c.checkingItems[config.LightningFreeSpaceChecking]; ok && c.stCfgs[0].Mode != config.ModeIncrement { + if _, ok := c.checkingItems[config.LightningFreeSpaceChecking]; ok && + c.stCfgs[0].LoaderConfig.ImportMode == config.LoadModePhysical && + c.stCfgs[0].Mode != config.ModeIncrement { // TODO: concurrently read it intra-source later for idx := range c.instances { i := idx From 0221915b6354d5584dcd75d48860d063692acf64 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 30 Nov 2022 14:35:47 +0800 Subject: [PATCH 13/16] address some comments Signed-off-by: lance6716 --- dm/checker/checker.go | 3 +-- dm/config/checking_item.go | 18 +++++++-------- dm/config/checking_item_test.go | 36 ++++++++++++++++-------------- dm/config/source_converter_test.go | 10 +++++++++ dm/pkg/checker/lightning.go | 4 ++-- 5 files changed, 41 insertions(+), 30 deletions(-) diff --git a/dm/checker/checker.go b/dm/checker/checker.go index d628fb7b2d3..570db4ea1a8 100644 --- a/dm/checker/checker.go +++ b/dm/checker/checker.go @@ -137,7 +137,7 @@ type tablePairInfo struct { // downstream table -> extended columns extendedColumnPerTable map[filter.Table][]string // byte size of all upstream tables, counting both data and index - totalDataSize *atomic.Int64 + 
totalDataSize atomic.Int64 } func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, err error) { @@ -170,7 +170,6 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er return nil, egErr } - info.totalDataSize = atomic.NewInt64(0) if _, ok := c.checkingItems[config.LightningFreeSpaceChecking]; ok && c.stCfgs[0].LoaderConfig.ImportMode == config.LoadModePhysical && c.stCfgs[0].Mode != config.ModeIncrement { diff --git a/dm/config/checking_item.go b/dm/config/checking_item.go index ba5607a39a6..80b11ed2e58 100644 --- a/dm/config/checking_item.go +++ b/dm/config/checking_item.go @@ -45,15 +45,6 @@ const ( LightningSortingSpaceChecking = "enough_sorting_space" ) -// LightningPrechecks returns all checking items for lightning. -var LightningPrechecks = []string{ - LightningEmptyRegionChecking, - LightningRegionDistributionChecking, - LightningDownstreamVersionChecking, - LightningFreeSpaceChecking, - LightningSortingSpaceChecking, -} - // AllCheckingItems contains all checking items. var AllCheckingItems = map[string]string{ AllChecking: "all checking items", @@ -79,6 +70,15 @@ var AllCheckingItems = map[string]string{ LightningSortingSpaceChecking: "enough disk space for physical import mode sorting data checking item", } +// LightningPrechecks returns all checking items for lightning. +var LightningPrechecks = []string{ + LightningEmptyRegionChecking, + LightningRegionDistributionChecking, + LightningDownstreamVersionChecking, + LightningFreeSpaceChecking, + LightningSortingSpaceChecking, +} + // MaxSourceIDLength is the max length for dm-worker source id. const MaxSourceIDLength = 32 diff --git a/dm/config/checking_item_test.go b/dm/config/checking_item_test.go index 75703639729..96de6e3e0b9 100644 --- a/dm/config/checking_item_test.go +++ b/dm/config/checking_item_test.go @@ -16,28 +16,30 @@ package config import ( "testing" - . "github.com/pingcap/check" + "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" ) -func TestConfig(t *testing.T) { - TestingT(t) -} - -type testConfig struct{} - -var _ = Suite(&testConfig{}) - -func (t *testConfig) TestCheckingItems(c *C) { +func TestCheckingItems(t *testing.T) { + lightningCheck, normalCheck := 0, 0 for item := range AllCheckingItems { - c.Assert(ValidateCheckingItem(item), IsNil) + require.NoError(t, ValidateCheckingItem(item)) + if slices.Contains(LightningPrechecks, item) { + lightningCheck++ + } else { + normalCheck++ + } } - c.Assert(ValidateCheckingItem("xxx"), NotNil) + // remember to update the number when add new checking items. 
+ require.Equal(t, 5, lightningCheck)
+ require.Equal(t, 15, normalCheck)
+ require.Error(t, ValidateCheckingItem("xxx"))
// ignore all checking items
ignoredCheckingItems := []string{AllChecking}
- c.Assert(FilterCheckingItems(ignoredCheckingItems), IsNil)
+ require.Nil(t, FilterCheckingItems(ignoredCheckingItems))
ignoredCheckingItems = append(ignoredCheckingItems, ShardTableSchemaChecking)
- c.Assert(FilterCheckingItems(ignoredCheckingItems), IsNil)
+ require.Nil(t, FilterCheckingItems(ignoredCheckingItems))
// ignore shard checking items
checkingItems := make(map[string]string)
@@ -46,12 +48,12 @@ func (t *testConfig) TestCheckingItems(c *C) {
}
delete(checkingItems, AllChecking)
- c.Assert(FilterCheckingItems(ignoredCheckingItems[:0]), DeepEquals, checkingItems)
+ require.Equal(t, checkingItems, FilterCheckingItems(ignoredCheckingItems[:0]))
delete(checkingItems, ShardTableSchemaChecking)
- c.Assert(FilterCheckingItems(ignoredCheckingItems[1:]), DeepEquals, checkingItems)
+ require.Equal(t, checkingItems, FilterCheckingItems(ignoredCheckingItems[1:]))
ignoredCheckingItems = append(ignoredCheckingItems, ShardAutoIncrementIDChecking)
delete(checkingItems, ShardAutoIncrementIDChecking)
- c.Assert(FilterCheckingItems(ignoredCheckingItems[1:]), DeepEquals, checkingItems)
+ require.Equal(t, checkingItems, FilterCheckingItems(ignoredCheckingItems[1:]))
}
diff --git a/dm/config/source_converter_test.go b/dm/config/source_converter_test.go
index e109b79b229..e420f654ee9 100644
--- a/dm/config/source_converter_test.go
+++ b/dm/config/source_converter_test.go
@@ -14,10 +14,20 @@ package config
import (
+ "testing"
+
"github.com/pingcap/check"
"github.com/pingcap/tiflow/dm/openapi/fixtures"
)
+func TestConfig(t *testing.T) {
+ check.TestingT(t)
+}
+
+type testConfig struct{}
+
+var _ = check.Suite(&testConfig{})
+
func (t *testConfig) TestConverterWithSourceAndOpenAPISource(c *check.C) {
sourceCfg1, err := ParseYaml(SampleSourceConfig)
c.Assert(err, check.IsNil)
diff --git a/dm/pkg/checker/lightning.go b/dm/pkg/checker/lightning.go
index 3f8df0b6ee8..4df437eefd0 100644
--- a/dm/pkg/checker/lightning.go
+++ b/dm/pkg/checker/lightning.go
@@ -181,7 +181,7 @@ func (c *LightningFreeSpaceChecker) Check(ctx context.Context) *Result {
ShortErr: fmt.Sprintf("Cluster doesn't have enough space, available is %s, but we need %s",
units.BytesSize(float64(clusterAvail)), units.BytesSize(float64(c.sourceDataSize))),
})
- result.Instruction = "you can try to scale-out more TiKV to gain more storage space"
+ result.Instruction = "you can try to scale out TiKV storage or add more TiKV instances to gain more storage space"
return result
}
@@ -198,7 +198,7 @@ func (c *LightningFreeSpaceChecker) Check(ctx context.Context) *Result {
ShortErr: fmt.Sprintf("Cluster may not have enough space, available is %s, but we need %s",
units.BytesSize(float64(clusterAvail)), units.BytesSize(float64(safeSize))),
})
- result.Instruction = "you can try to scale-out more TiKV to gain more storage space"
+ result.Instruction = "you can try to scale out TiKV storage or add more TiKV instances to gain more storage space"
return result
}
result.State = StateSuccess

From c93258dba5f4e04d3825099041b36d147c25ea92 Mon Sep 17 00:00:00 2001
From: lance6716
Date: Wed, 30 Nov 2022 15:01:25 +0800
Subject: [PATCH 14/16] address comment

Signed-off-by: lance6716
---
 dm/checker/checker.go | 80 ++++++++++++++++-----------------
 dm/config/checking_item_test.go | 2 +
 2 files changed, 42 insertions(+), 40 deletions(-)

diff --git a/dm/checker/checker.go b/dm/checker/checker.go
index 570db4ea1a8..1403a2ce295 100644
--- a/dm/checker/checker.go
+++ b/dm/checker/checker.go
@@ -119,23 +119,23 @@ func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string, e
return c
}
-// tablePairInfo records information about a upstream-downstream table pair including
-// members may have repeated meanings but have different data structure to satisfy different usages.
+// tablePairInfo records information about an upstream-downstream (source-target) table pair.
+// Members may have repeated meanings, but they use different data structures to satisfy different usages.
type tablePairInfo struct {
- // downstream table -> sourceID -> upstream tables
- tablesPerDownstreamTable map[filter.Table]map[string][]filter.Table
- // downstream database -> tables under this database
- downstreamTablesPerDB map[string][]filter.Table
- // number of sharding tables of a downstream table among all upstreams.
- shardNumPerDownstreamTable map[filter.Table]int
- // sourceID -> tables of this upstream in allow-list
- tablesPerSourceID map[string][]filter.Table
- // sourceID -> databases that contain block-list tables
- interestedDBPerUpstream []map[string]struct{}
- // sourceID -> downstream table -> source tables
- tableMapPerUpstreamWithSourceID map[string]map[filter.Table][]filter.Table
- // downstream table -> extended columns
- extendedColumnPerTable map[filter.Table][]string
+ // target table -> sourceID -> source tables
+ targetTable2SourceTablesMap map[filter.Table]map[string][]filter.Table
+ // target database -> target tables under this database
+ db2TargetTables map[string][]filter.Table
+ // number of sharding tables (source tables) of a target table among all upstreams.
+ targetTableShardNum map[filter.Table]int
+ // sourceID -> tables of this source in allow-list
+ sourceID2SourceTables map[string][]filter.Table
+ // sourceID -> databases that contain allow-list tables
+ sourceID2InterestedDB []map[string]struct{}
+ // sourceID -> target table -> source tables
+ sourceID2TableMap map[string]map[filter.Table][]filter.Table
+ // target table -> extended columns
+ targetTable2ExtendedColumns map[filter.Table][]string
// byte size of all upstream tables, counting both data and index
totalDataSize atomic.Int64
}
@@ -199,10 +199,10 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er
return nil, egErr
}
- info.extendedColumnPerTable = extendedColumnPerTable
- info.tablesPerDownstreamTable = make(map[filter.Table]map[string][]filter.Table)
- info.shardNumPerDownstreamTable = make(map[filter.Table]int)
- info.downstreamTablesPerDB = make(map[string][]filter.Table)
+ info.targetTable2ExtendedColumns = extendedColumnPerTable
+ info.targetTable2SourceTablesMap = make(map[filter.Table]map[string][]filter.Table)
+ info.targetTableShardNum = make(map[filter.Table]int)
+ info.db2TargetTables = make(map[string][]filter.Table)
for i, inst := range c.instances {
mapping := tableMapPerUpstream[i]
@@ -213,29 +213,29 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er
sourceID := inst.cfg.SourceID
for targetTable, sourceTables := range mapping {
- tablesPerSource, ok := info.tablesPerDownstreamTable[targetTable]
+ tablesPerSource, ok := info.targetTable2SourceTablesMap[targetTable]
if !ok {
tablesPerSource = make(map[string][]filter.Table)
- info.tablesPerDownstreamTable[targetTable] = tablesPerSource
+ info.targetTable2SourceTablesMap[targetTable] = tablesPerSource
}
tablesPerSource[sourceID] = append(tablesPerSource[sourceID], sourceTables...)
- info.shardNumPerDownstreamTable[targetTable] += len(sourceTables)
- info.downstreamTablesPerDB[targetTable.Schema] = append(info.downstreamTablesPerDB[targetTable.Schema], targetTable)
+ info.targetTableShardNum[targetTable] += len(sourceTables)
+ info.db2TargetTables[targetTable.Schema] = append(info.db2TargetTables[targetTable.Schema], targetTable)
}
}
- info.tablesPerSourceID = make(map[string][]filter.Table, len(c.instances))
- info.interestedDBPerUpstream = make([]map[string]struct{}, len(c.instances))
- info.tableMapPerUpstreamWithSourceID = make(map[string]map[filter.Table][]filter.Table, len(c.instances))
+ info.sourceID2SourceTables = make(map[string][]filter.Table, len(c.instances))
+ info.sourceID2InterestedDB = make([]map[string]struct{}, len(c.instances))
+ info.sourceID2TableMap = make(map[string]map[filter.Table][]filter.Table, len(c.instances))
for i, inst := range c.instances {
sourceID := inst.cfg.SourceID
- info.interestedDBPerUpstream[i] = make(map[string]struct{})
+ info.sourceID2InterestedDB[i] = make(map[string]struct{})
mapping := tableMapPerUpstream[i]
- info.tableMapPerUpstreamWithSourceID[sourceID] = mapping
+ info.sourceID2TableMap[sourceID] = mapping
for _, tables := range mapping {
- info.tablesPerSourceID[sourceID] = append(info.tablesPerSourceID[sourceID], tables...)
+ info.sourceID2SourceTables[sourceID] = append(info.sourceID2SourceTables[sourceID], tables...)
for _, table := range tables {
- info.interestedDBPerUpstream[i][table.Schema] = struct{}{}
+ info.sourceID2InterestedDB[i][table.Schema] = struct{}{}
}
}
}
@@ -315,7 +315,7 @@ func (c *Checker) Init(ctx context.Context) (err error) {
c.checkList = append(c.checkList, checker.NewSourceDumpPrivilegeChecker(
instance.sourceDB.DB,
instance.sourceDBinfo,
- info.tablesPerSourceID[sourceID],
+ info.sourceID2SourceTables[sourceID],
exportCfg.Consistency,
c.dumpWholeInstance,
))
@@ -339,10 +339,10 @@ func (c *Checker) Init(ctx context.Context) (err error) {
c.checkList = append(c.checkList, checker.NewSourceReplicationPrivilegeChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.OnlineDDLChecking]; c.onlineDDL != nil && ok {
- c.checkList = append(c.checkList, checker.NewOnlineDDLChecker(instance.sourceDB.DB, info.interestedDBPerUpstream[i], c.onlineDDL, instance.baList))
+ c.checkList = append(c.checkList, checker.NewOnlineDDLChecker(instance.sourceDB.DB, info.sourceID2InterestedDB[i], c.onlineDDL, instance.baList))
}
if _, ok := c.checkingItems[config.BinlogDBChecking]; ok {
- c.checkList = append(c.checkList, checker.NewBinlogDBChecker(instance.sourceDB, instance.sourceDBinfo, info.interestedDBPerUpstream[i], instance.cfg.CaseSensitive))
+ c.checkList = append(c.checkList, checker.NewBinlogDBChecker(instance.sourceDB, instance.sourceDBinfo, info.sourceID2InterestedDB[i], instance.cfg.CaseSensitive))
}
}
}
@@ -352,8 +352,8 @@ func (c *Checker) Init(ctx context.Context) (err error) {
c.checkList = append(c.checkList, checker.NewTablesChecker(
upstreamDBs,
c.instances[0].targetDB.DB,
- info.tableMapPerUpstreamWithSourceID,
- info.extendedColumnPerTable,
+ info.sourceID2TableMap,
+ info.targetTable2ExtendedColumns,
dumpThreads,
))
}
@@ -369,8 +369,8 @@ func (c *Checker) Init(ctx context.Context) (err error) {
return err
}
if isFresh {
- for targetTable, shardingSet := range info.tablesPerDownstreamTable {
- if info.shardNumPerDownstreamTable[targetTable] <= 1 {
+ for targetTable, shardingSet := range info.targetTable2SourceTablesMap {
+ if info.targetTableShardNum[targetTable] <= 1 {
continue
}
if instance.cfg.ShardMode == config.ShardPessimistic {
@@ -431,7 +431,7 @@ func (c *Checker) Init(ctx context.Context) (err error) {
var dbMetas []*mydump.MDDatabaseMeta
// use downstream table for shard merging
- for db, tables := range info.downstreamTablesPerDB {
+ for db, tables := range info.db2TargetTables {
mdTables := make([]*mydump.MDTableMeta, 0, len(tables))
for _, table := range tables {
mdTables = append(mdTables, &mydump.MDTableMeta{
@@ -817,7 +817,7 @@ func newLightningPrecheckAdaptor(
if info != nil {
sourceDataResult.SizeWithIndex = info.totalDataSize.Load()
}
- for db, tables := range info.downstreamTablesPerDB {
+ for db, tables := range info.db2TargetTables {
allTables[db] = &checkpoints.TidbDBInfo{
Name: db,
Tables: make(map[string]*checkpoints.TidbTableInfo),
diff --git a/dm/config/checking_item_test.go b/dm/config/checking_item_test.go
index 96de6e3e0b9..650ae4d9ef2 100644
--- a/dm/config/checking_item_test.go
+++ b/dm/config/checking_item_test.go
@@ -33,6 +33,8 @@ func TestCheckingItems(t *testing.T) {
// remember to update the number when adding new checking items.
require.Equal(t, 5, lightningCheck)
require.Equal(t, 15, normalCheck)
+ // all LightningPrechecks can be found by iterating AllCheckingItems
+ require.Len(t, LightningPrechecks, lightningCheck)
require.Error(t, ValidateCheckingItem("xxx"))
// ignore all checking items

From 5b157b909dd87e9c056d955bfcb42f13db9c056d Mon Sep 17 00:00:00 2001
From: lance6716
Date: Thu, 1 Dec 2022 11:19:53 +0800
Subject: [PATCH 15/16] Update dm/pkg/checker/lightning.go

Co-authored-by: Obliviate <756541536@qq.com>
---
 dm/pkg/checker/lightning.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dm/pkg/checker/lightning.go b/dm/pkg/checker/lightning.go
index 4df437eefd0..de3a3bc0343 100644
--- a/dm/pkg/checker/lightning.go
+++ b/dm/pkg/checker/lightning.go
@@ -178,7 +178,7 @@ func (c *LightningFreeSpaceChecker) Check(ctx context.Context) *Result {
result.State = StateFailure
result.Errors = append(result.Errors, &Error{
Severity: StateFailure,
- ShortErr: fmt.Sprintf("Cluster doesn't have enough space, available is %s, but we need %s",
+ ShortErr: fmt.Sprintf("Downstream doesn't have enough space, available is %s, but we need %s",
units.BytesSize(float64(clusterAvail)), units.BytesSize(float64(c.sourceDataSize))),
})
result.Instruction = "you can try to scale out TiKV storage or add more TiKV instances to gain more storage space"

From 0105ac4c5dc98a2b82043752d409a7d403bd043b Mon Sep 17 00:00:00 2001
From: lance6716
Date: Fri, 2 Dec 2022 17:37:30 +0800
Subject: [PATCH 16/16] fix CI

Signed-off-by: lance6716
---
 dm/tests/lightning_mode/run.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dm/tests/lightning_mode/run.sh b/dm/tests/lightning_mode/run.sh
index aca3dd91fbf..620cb09f5bb 100755
--- a/dm/tests/lightning_mode/run.sh
+++ b/dm/tests/lightning_mode/run.sh
@@ -42,7 +42,7 @@ function run() {
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"check-task $cur/conf/dm-task.yaml" \
- "Cluster doesn't have enough space" 1 \
+ "Downstream doesn't have enough space" 1 \
"but we need 4EiB" 1 \
"local disk space may not enough to finish import, estimate sorted data size is 4EiB" 1
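
For readers following the precheck logic exercised above, the following standalone sketch illustrates the two-tier decision a cluster free-space precheck makes: a hard failure when the available space cannot hold even one copy of the source data, and a softer warning when it likely cannot hold the data once replicas and a safety margin are considered. The replicaCount, safetyRatio, and the checkClusterFreeSpace helper are illustrative assumptions for this sketch only, not the exact values or APIs used by DM or Lightning.

package main

import "fmt"

// checkClusterFreeSpace sketches the two-tier free-space decision:
//   - hard failure if the cluster cannot hold even one copy of the source data;
//   - warning if it cannot hold the data after replicas and a safety margin.
// replicaCount and safetyRatio are assumed inputs for this example.
func checkClusterFreeSpace(clusterAvail, sourceDataSize, replicaCount uint64, safetyRatio float64) string {
	if clusterAvail < sourceDataSize {
		return fmt.Sprintf("failure: available %d bytes, need at least %d bytes", clusterAvail, sourceDataSize)
	}
	// Estimated space after replication plus a margin for indexes and temporary data.
	safeSize := uint64(float64(sourceDataSize*replicaCount) * safetyRatio)
	if clusterAvail < safeSize {
		return fmt.Sprintf("warning: available %d bytes, estimated need %d bytes", clusterAvail, safeSize)
	}
	return "success"
}

func main() {
	// Example: 100 GiB available, 30 GiB of source data, 3 replicas, 1.3x margin.
	fmt.Println(checkClusterFreeSpace(100<<30, 30<<30, 3, 1.3))
}

In the real checker the available space is derived from the downstream cluster's stores, and the two branches correspond to the "doesn't have enough space" and "may not have enough space" messages seen in dm/pkg/checker/lightning.go.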