diff --git a/br/pkg/checksum/executor.go b/br/pkg/checksum/executor.go index c30ae49fccdca..01a09b57d766b 100644 --- a/br/pkg/checksum/executor.go +++ b/br/pkg/checksum/executor.go @@ -26,7 +26,8 @@ type ExecutorBuilder struct { oldTable *metautil.Table - concurrency uint + concurrency uint + backoffWeight int } // NewExecutorBuilder returns a new executor builder. @@ -51,13 +52,19 @@ func (builder *ExecutorBuilder) SetConcurrency(conc uint) *ExecutorBuilder { return builder } +// SetBackoffWeight sets the backoff weight that the built Executor uses when sending checksum requests; Execute applies it only when it is greater than zero. +func (builder *ExecutorBuilder) SetBackoffWeight(backoffWeight int) *ExecutorBuilder { + builder.backoffWeight = backoffWeight + return builder +} + // Build builds a checksum executor. func (builder *ExecutorBuilder) Build() (*Executor, error) { reqs, err := buildChecksumRequest(builder.table, builder.oldTable, builder.ts, builder.concurrency) if err != nil { return nil, errors.Trace(err) } - return &Executor{reqs: reqs}, nil + return &Executor{reqs: reqs, backoffWeight: builder.backoffWeight}, nil } func buildChecksumRequest( @@ -262,7 +269,8 @@ func updateChecksumResponse(resp, update *tipb.ChecksumResponse) { // Executor is a checksum executor. type Executor struct { - reqs []*kv.Request + reqs []*kv.Request + backoffWeight int } // Len returns the total number of checksum requests. @@ -308,7 +316,11 @@ func (exec *Executor) Execute( // // It is useful in TiDB, however, it's a place holder in BR. 
killed := uint32(0) - resp, err := sendChecksumRequest(ctx, client, req, kv.NewVariables(&killed)) + vars := kv.NewVariables(&killed) + if exec.backoffWeight > 0 { + vars.BackOffWeight = exec.backoffWeight + } + resp, err := sendChecksumRequest(ctx, client, req, vars) if err != nil { return nil, errors.Trace(err) } diff --git a/br/pkg/lightning/common/BUILD.bazel b/br/pkg/lightning/common/BUILD.bazel index 2b36e457cd857..bb6c988e1e11a 100644 --- a/br/pkg/lightning/common/BUILD.bazel +++ b/br/pkg/lightning/common/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//br/pkg/utils", "//errno", "//parser/model", + "//sessionctx/variable", "//store/driver/error", "//table/tables", "//util", diff --git a/br/pkg/lightning/common/util.go b/br/pkg/lightning/common/util.go index 621c59d820e23..e66b679f0b10a 100644 --- a/br/pkg/lightning/common/util.go +++ b/br/pkg/lightning/common/util.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tidb/br/pkg/utils" tmysql "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table/tables" "go.uber.org/zap" ) @@ -428,3 +429,45 @@ func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo { } return nil } + +// GetBackoffWeightFromDB reads the session variable named by variable.TiDBBackOffWeight from the database and parses its value as an int. +func GetBackoffWeightFromDB(ctx context.Context, db *sql.DB) (int, error) { + val, err := getSessionVariable(ctx, db, variable.TiDBBackOffWeight) + if err != nil { + return 0, err + } + return strconv.Atoi(val) +} + +// getSessionVariable runs SHOW VARIABLES LIKE for the given name and returns the scanned value (empty if no row matches); copied from dbutil to avoid an import cycle. +func getSessionVariable(ctx context.Context, db *sql.DB, variable string) (value string, err error) { + query := fmt.Sprintf("SHOW VARIABLES LIKE '%s'", variable) + rows, err := db.QueryContext(ctx, query) + + if err != nil { + return "", errors.Trace(err) + } + defer rows.Close() + + // Example of the result set scanned below: 
+ /* + mysql> SHOW VARIABLES LIKE "binlog_format"; + +---------------+-------+ + | Variable_name | Value | + +---------------+-------+ + | binlog_format | ROW | + +---------------+-------+ + */ + + for rows.Next() { + if err = rows.Scan(&variable, &value); err != nil { + return "", errors.Trace(err) + } + } + + if err := rows.Err(); err != nil { + return "", errors.Trace(err) + } + + return value, nil +} diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index e1031c760f749..e372ad3e1bb18 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -440,6 +440,7 @@ type PostRestore struct { Level1Compact bool `toml:"level-1-compact" json:"level-1-compact"` PostProcessAtLast bool `toml:"post-process-at-last" json:"post-process-at-last"` Compact bool `toml:"compact" json:"compact"` + ChecksumViaSQL bool `toml:"checksum-via-sql" json:"checksum-via-sql"` } type CSVConfig struct { @@ -745,6 +746,7 @@ func NewConfig() *Config { Checksum: OpLevelRequired, Analyze: OpLevelOptional, PostProcessAtLast: true, + ChecksumViaSQL: true, }, } } diff --git a/br/pkg/lightning/restore/BUILD.bazel b/br/pkg/lightning/restore/BUILD.bazel index ef5aeb106585b..f2a08f22f7d51 100644 --- a/br/pkg/lightning/restore/BUILD.bazel +++ b/br/pkg/lightning/restore/BUILD.bazel @@ -53,6 +53,7 @@ go_library( "//parser/model", "//parser/mysql", "//planner/core", + "//sessionctx/variable", "//store/driver", "//store/pdtypes", "//table", @@ -76,6 +77,7 @@ go_library( "@com_github_pingcap_kvproto//pkg/import_sstpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_tipb//go-tipb", + "@com_github_tikv_client_go_v2//kv", "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_pd_client//:client", "@io_etcd_go_etcd_client_v3//:client", diff --git a/br/pkg/lightning/restore/checksum.go b/br/pkg/lightning/restore/checksum.go index b30fe14e01fc1..d433eac3196ff 100644 --- a/br/pkg/lightning/restore/checksum.go +++ 
b/br/pkg/lightning/restore/checksum.go @@ -33,8 +33,10 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/metric" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tipb/go-tipb" + tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" "go.uber.org/atomic" @@ -51,6 +53,12 @@ var ( serviceSafePointTTL int64 = 10 * 60 // 10 min in seconds minDistSQLScanConcurrency = 4 + + // DefaultBackoffWeight is the default value of tidb_backoff_weight for checksum. + // When the TiKV client encounters a "region not leader" error, it keeps retrying every 500 ms. + // If it still fails after 2 * 20 = 40 seconds, it returns "region unavailable". + // With the weight raised to 3 * DefBackOffWeight (i.e. 6), the TiKV client keeps retrying for 120 seconds. + DefaultBackoffWeight = 3 * tikvstore.DefBackOffWeight ) // RemoteChecksum represents a checksum result got from tidb. 
@@ -80,14 +88,26 @@ func newChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) ( // for v4.0.0 or upper, we can use the gc ttl api var manager ChecksumManager - if pdVersion.Major >= 4 { + if pdVersion.Major >= 4 && !rc.cfg.PostRestore.ChecksumViaSQL { tlsOpt := rc.tls.ToPDSecurityOption() pdCli, err := pd.NewClientWithContext(ctx, []string{pdAddr}, tlsOpt) if err != nil { return nil, errors.Trace(err) } - manager = newTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency)) + db, err := rc.tidbGlue.GetDB() + if err != nil { + return nil, errors.Trace(err) + } + backoffWeight, err := common.GetBackoffWeightFromDB(ctx, db) + // keep the value read from TiDB only when the query succeeded and it is at least the default; otherwise fall back to the default and attach any query error to the log + if err == nil && backoffWeight >= DefaultBackoffWeight { + log.L().Info("get tidb_backoff_weight", zap.Int("backoff_weight", backoffWeight)) + } else { + log.L().Info("set tidb_backoff_weight to default", zap.Int("backoff_weight", DefaultBackoffWeight), zap.Error(err)) + backoffWeight = DefaultBackoffWeight + } + manager = newTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight) } else { db, err := rc.tidbGlue.GetDB() if err != nil { @@ -125,6 +145,15 @@ func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *checkpoi task := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "remote checksum") + conn, err := e.db.Conn(ctx) + if err != nil { + return nil, errors.Trace(err) + } + defer func() { + if err := conn.Close(); err != nil { + task.Warn("close connection failed", zap.Error(err)) + } + }() // ADMIN CHECKSUM TABLE