lightning, br/checksum: make the backoff weight of checksum requests configurable

Summary of the hunks below:
  * br/pkg/checksum: ExecutorBuilder gains SetBackoffWeight; Execute copies a
    positive weight into kv.Variables.BackOffWeight before sending a request.
  * lightning/common: GetBackoffWeightFromDB reads tidb_backoff_weight through
    SHOW VARIABLES.
  * lightning/config: new post-restore option checksum-via-sql, default true.
  * lightning/restore: newChecksumManager takes the TiKV checksum path only
    when PD major version >= 4 and checksum-via-sql is off; ADMIN CHECKSUM
    TABLE now runs on one dedicated connection whose session
    tidb_backoff_weight is raised to DefaultBackoffWeight (and restored
    afterwards) when the value read from TiDB is lower.
  * tidb_backoff_weight = "6" is added to defaultImportantVariables.

diff --git a/br/pkg/checksum/executor.go b/br/pkg/checksum/executor.go
index c30ae49fccdca..01a09b57d766b 100644
--- a/br/pkg/checksum/executor.go
+++ b/br/pkg/checksum/executor.go
@@ -26,7 +26,8 @@ type ExecutorBuilder struct {
 
 	oldTable *metautil.Table
 
-	concurrency uint
+	concurrency   uint
+	backoffWeight int
 }
 
 // NewExecutorBuilder returns a new executor builder.
@@ -51,13 +52,19 @@ func (builder *ExecutorBuilder) SetConcurrency(conc uint) *ExecutorBuilder {
 	return builder
 }
 
+// SetBackoffWeight sets the backoff weight used when executing checksum requests.
+func (builder *ExecutorBuilder) SetBackoffWeight(backoffWeight int) *ExecutorBuilder {
+	builder.backoffWeight = backoffWeight
+	return builder
+}
+
 // Build builds a checksum executor.
 func (builder *ExecutorBuilder) Build() (*Executor, error) {
 	reqs, err := buildChecksumRequest(builder.table, builder.oldTable, builder.ts, builder.concurrency)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	return &Executor{reqs: reqs}, nil
+	return &Executor{reqs: reqs, backoffWeight: builder.backoffWeight}, nil
 }
 
 func buildChecksumRequest(
@@ -262,7 +269,8 @@ func updateChecksumResponse(resp, update *tipb.ChecksumResponse) {
 
 // Executor is a checksum executor.
 type Executor struct {
-	reqs []*kv.Request
+	reqs          []*kv.Request
+	backoffWeight int
 }
 
 // Len returns the total number of checksum requests.
@@ -308,7 +316,11 @@ func (exec *Executor) Execute(
 		//
 		// It is useful in TiDB, however, it's a place holder in BR.
 		killed := uint32(0)
-		resp, err := sendChecksumRequest(ctx, client, req, kv.NewVariables(&killed))
+		vars := kv.NewVariables(&killed)
+		if exec.backoffWeight > 0 {
+			vars.BackOffWeight = exec.backoffWeight
+		}
+		resp, err := sendChecksumRequest(ctx, client, req, vars)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
diff --git a/br/pkg/lightning/common/BUILD.bazel b/br/pkg/lightning/common/BUILD.bazel
index 2b36e457cd857..bb6c988e1e11a 100644
--- a/br/pkg/lightning/common/BUILD.bazel
+++ b/br/pkg/lightning/common/BUILD.bazel
@@ -23,6 +23,7 @@ go_library(
         "//br/pkg/utils",
         "//errno",
         "//parser/model",
+        "//sessionctx/variable",
         "//store/driver/error",
         "//table/tables",
         "//util",
diff --git a/br/pkg/lightning/common/util.go b/br/pkg/lightning/common/util.go
index 621c59d820e23..e66b679f0b10a 100644
--- a/br/pkg/lightning/common/util.go
+++ b/br/pkg/lightning/common/util.go
@@ -36,6 +36,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/utils"
 	tmysql "github.com/pingcap/tidb/errno"
 	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/table/tables"
 	"go.uber.org/zap"
 )
@@ -428,3 +429,45 @@ func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo {
 	}
 	return nil
 }
+
+// GetBackoffWeightFromDB gets the backoff weight from database.
+func GetBackoffWeightFromDB(ctx context.Context, db *sql.DB) (int, error) {
+	val, err := getSessionVariable(ctx, db, variable.TiDBBackOffWeight)
+	if err != nil {
+		return 0, err
+	}
+	return strconv.Atoi(val)
+}
+
+// copy from dbutil to avoid import cycle
+func getSessionVariable(ctx context.Context, db *sql.DB, variable string) (value string, err error) {
+	query := fmt.Sprintf("SHOW VARIABLES LIKE '%s'", variable)
+	rows, err := db.QueryContext(ctx, query)
+
+	if err != nil {
+		return "", errors.Trace(err)
+	}
+	defer rows.Close()
+
+	// Show an example.
+	/*
+		mysql> SHOW VARIABLES LIKE "binlog_format";
+		+---------------+-------+
+		| Variable_name | Value |
+		+---------------+-------+
+		| binlog_format | ROW   |
+		+---------------+-------+
+	*/
+
+	for rows.Next() {
+		if err = rows.Scan(&variable, &value); err != nil {
+			return "", errors.Trace(err)
+		}
+	}
+
+	if err := rows.Err(); err != nil {
+		return "", errors.Trace(err)
+	}
+
+	return value, nil
+}
diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go
index e1031c760f749..e372ad3e1bb18 100644
--- a/br/pkg/lightning/config/config.go
+++ b/br/pkg/lightning/config/config.go
@@ -440,6 +440,7 @@ type PostRestore struct {
 	Level1Compact     bool        `toml:"level-1-compact" json:"level-1-compact"`
 	PostProcessAtLast bool        `toml:"post-process-at-last" json:"post-process-at-last"`
 	Compact           bool        `toml:"compact" json:"compact"`
+	ChecksumViaSQL    bool        `toml:"checksum-via-sql" json:"checksum-via-sql"`
 }
 
 type CSVConfig struct {
@@ -745,6 +746,7 @@ func NewConfig() *Config {
 			Checksum:          OpLevelRequired,
 			Analyze:           OpLevelOptional,
 			PostProcessAtLast: true,
+			ChecksumViaSQL:    true,
 		},
 	}
 }
diff --git a/br/pkg/lightning/restore/BUILD.bazel b/br/pkg/lightning/restore/BUILD.bazel
index ef5aeb106585b..f2a08f22f7d51 100644
--- a/br/pkg/lightning/restore/BUILD.bazel
+++ b/br/pkg/lightning/restore/BUILD.bazel
@@ -53,6 +53,7 @@ go_library(
         "//parser/model",
         "//parser/mysql",
         "//planner/core",
+        "//sessionctx/variable",
         "//store/driver",
         "//store/pdtypes",
         "//table",
@@ -76,6 +77,7 @@ go_library(
         "@com_github_pingcap_kvproto//pkg/import_sstpb",
         "@com_github_pingcap_kvproto//pkg/metapb",
         "@com_github_pingcap_tipb//go-tipb",
+        "@com_github_tikv_client_go_v2//kv",
         "@com_github_tikv_client_go_v2//oracle",
         "@com_github_tikv_pd_client//:client",
         "@io_etcd_go_etcd_client_v3//:client",
diff --git a/br/pkg/lightning/restore/checksum.go b/br/pkg/lightning/restore/checksum.go
index b30fe14e01fc1..d433eac3196ff 100644
--- a/br/pkg/lightning/restore/checksum.go
+++ b/br/pkg/lightning/restore/checksum.go
@@ -33,8 +33,10 @@ import (
 	"github.com/pingcap/tidb/br/pkg/lightning/metric"
 	"github.com/pingcap/tidb/br/pkg/pdutil"
 	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/util/mathutil"
 	"github.com/pingcap/tipb/go-tipb"
+	tikvstore "github.com/tikv/client-go/v2/kv"
 	"github.com/tikv/client-go/v2/oracle"
 	pd "github.com/tikv/pd/client"
 	"go.uber.org/atomic"
@@ -51,6 +53,12 @@ var (
 	serviceSafePointTTL int64 = 10 * 60 // 10 min in seconds
 
 	minDistSQLScanConcurrency = 4
+
+	// DefaultBackoffWeight is the default value of tidb_backoff_weight for checksum.
+	// When the TiKV client encounters a "region not leader" error, it keeps retrying every 500 ms.
+	// If it still fails after 2 * 20 = 40 seconds, it will return "region unavailable".
+	// If we increase the BackOffWeight to 6, then the TiKV client will keep retrying for 120 seconds.
+	DefaultBackoffWeight = 3 * tikvstore.DefBackOffWeight
 )
 
 // RemoteChecksum represents a checksum result got from tidb.
@@ -80,14 +88,26 @@ func newChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (
 
 	// for v4.0.0 or upper, we can use the gc ttl api
 	var manager ChecksumManager
-	if pdVersion.Major >= 4 {
+	if pdVersion.Major >= 4 && !rc.cfg.PostRestore.ChecksumViaSQL {
 		tlsOpt := rc.tls.ToPDSecurityOption()
 		pdCli, err := pd.NewClientWithContext(ctx, []string{pdAddr}, tlsOpt)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
 
-		manager = newTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency))
+		db, err := rc.tidbGlue.GetDB()
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		backoffWeight, err := common.GetBackoffWeightFromDB(ctx, db)
+		// keep TiDB's value only if it is at least the default; on error or a smaller value, use the default
+		if err == nil && backoffWeight >= DefaultBackoffWeight {
+			log.L().Info("get tidb_backoff_weight", zap.Int("backoff_weight", backoffWeight))
+		} else {
+			log.L().Info("set tidb_backoff_weight to default", zap.Int("backoff_weight", DefaultBackoffWeight), zap.Error(err))
+			backoffWeight = DefaultBackoffWeight
+		}
+		manager = newTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight)
 	} else {
 		db, err := rc.tidbGlue.GetDB()
 		if err != nil {
@@ -125,6 +145,15 @@ func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *checkpoi
 
 	task := log.FromContext(ctx).With(zap.String("table", tableName)).Begin(zap.InfoLevel, "remote checksum")
 
+	conn, err := e.db.Conn(ctx)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	defer func() {
+		if err := conn.Close(); err != nil {
+			task.Warn("close connection failed", zap.Error(err))
+		}
+	}()
 	// ADMIN CHECKSUM TABLE <table>,<table>  example.
 	// mysql> admin checksum table test.t;
 	// +---------+------------+---------------------+-----------+-------------+
@@ -132,9 +161,26 @@ func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *checkpoi
 	// +---------+------------+---------------------+-----------+-------------+
 	// | test    | t          | 8520875019404689597 |   7296873 |   357601387 |
 	// +---------+------------+---------------------+-----------+-------------+
+	backoffWeight, err := common.GetBackoffWeightFromDB(ctx, e.db)
+	if err != nil {
+		// best effort: the checksum still runs with the session's current weight
+		task.Warn("get tidb_backoff_weight failed", zap.Error(err))
+	} else if backoffWeight < DefaultBackoffWeight {
+		task.Info("increase tidb_backoff_weight", zap.Int("original", backoffWeight), zap.Int("new", DefaultBackoffWeight))
+		// increase backoff weight
+		if _, err := conn.ExecContext(ctx, fmt.Sprintf("SET SESSION %s = '%d';", variable.TiDBBackOffWeight, DefaultBackoffWeight)); err != nil {
+			task.Warn("set tidb_backoff_weight failed", zap.Error(err))
+		} else {
+			defer func() {
+				if _, err := conn.ExecContext(ctx, fmt.Sprintf("SET SESSION %s = '%d';", variable.TiDBBackOffWeight, backoffWeight)); err != nil {
+					task.Warn("recover tidb_backoff_weight failed", zap.Error(err))
+				}
+			}()
+		}
+	}
 
 	cs := RemoteChecksum{}
-	err = common.SQLWithRetry{DB: e.db, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum",
+	err = common.SQLWithRetry{DB: conn, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum",
 		"ADMIN CHECKSUM TABLE "+tableName, &cs.Schema, &cs.Table, &cs.Checksum, &cs.TotalKVs, &cs.TotalBytes,
 	)
 	dur := task.End(zap.ErrorLevel, err)
@@ -257,20 +303,23 @@ type tikvChecksumManager struct {
 	client                 kv.Client
 	manager                gcTTLManager
 	distSQLScanConcurrency uint
+	backoffWeight          int
 }
 
 // newTiKVChecksumManager return a new tikv checksum manager
-func newTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanConcurrency uint) *tikvChecksumManager {
+func newTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanConcurrency uint, backoffWeight int) *tikvChecksumManager {
 	return &tikvChecksumManager{
 		client:                 client,
 		manager:                newGCTTLManager(pdClient),
 		distSQLScanConcurrency: distSQLScanConcurrency,
+		backoffWeight:          backoffWeight,
 	}
 }
 
 func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo, ts uint64) (*RemoteChecksum, error) {
 	executor, err := checksum.NewExecutorBuilder(tableInfo.Core, ts).
 		SetConcurrency(e.distSQLScanConcurrency).
+		SetBackoffWeight(e.backoffWeight).
 		Build()
 	if err != nil {
 		return nil, errors.Trace(err)
diff --git a/br/pkg/lightning/restore/checksum_test.go b/br/pkg/lightning/restore/checksum_test.go
index 20acc23fe6be0..ba920ee58ed84 100644
--- a/br/pkg/lightning/restore/checksum_test.go
+++ b/br/pkg/lightning/restore/checksum_test.go
@@ -56,6 +56,7 @@ func TestDoChecksum(t *testing.T) {
 		WithArgs("10m").
 		WillReturnResult(sqlmock.NewResult(2, 1))
 	mock.ExpectClose()
+	mock.ExpectClose()
 
 	ctx := MockDoChecksumCtx(db)
 	checksum, err := DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t"})
@@ -216,6 +217,7 @@ func TestDoChecksumWithErrorAndLongOriginalLifetime(t *testing.T) {
 		WithArgs("300h").
 		WillReturnResult(sqlmock.NewResult(1, 1))
 	mock.ExpectClose()
+	mock.ExpectClose()
 
 	ctx := MockDoChecksumCtx(db)
 	_, err = DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t"})
diff --git a/br/pkg/lightning/restore/table_restore_test.go b/br/pkg/lightning/restore/table_restore_test.go
index 17fb97e346e36..ad09add849a51 100644
--- a/br/pkg/lightning/restore/table_restore_test.go
+++ b/br/pkg/lightning/restore/table_restore_test.go
@@ -753,6 +753,7 @@ func (s *tableRestoreSuite) TestCompareChecksumSuccess() {
 		WithArgs("10m").
 		WillReturnResult(sqlmock.NewResult(2, 1))
 	mock.ExpectClose()
+	mock.ExpectClose()
 
 	ctx := MockDoChecksumCtx(db)
 	remoteChecksum, err := DoChecksum(ctx, s.tr.tableInfo)
@@ -783,6 +784,7 @@ func (s *tableRestoreSuite) TestCompareChecksumFailure() {
 		WithArgs("10m").
 		WillReturnResult(sqlmock.NewResult(2, 1))
 	mock.ExpectClose()
+	mock.ExpectClose()
 
 	ctx := MockDoChecksumCtx(db)
 	remoteChecksum, err := DoChecksum(ctx, s.tr.tableInfo)
diff --git a/br/pkg/lightning/restore/tidb.go b/br/pkg/lightning/restore/tidb.go
index 0e114bc035a56..33e9d5622a598 100644
--- a/br/pkg/lightning/restore/tidb.go
+++ b/br/pkg/lightning/restore/tidb.go
@@ -50,6 +50,7 @@ var defaultImportantVariables = map[string]string{
 	"default_week_format":     "0",
 	"block_encryption_mode":   "aes-128-ecb",
 	"group_concat_max_len":    "1024",
+	"tidb_backoff_weight":     "6",
 }
 
 // defaultImportVariablesTiDB is used in ObtainImportantVariables to retrieve the system
diff --git a/br/pkg/lightning/restore/tidb_test.go b/br/pkg/lightning/restore/tidb_test.go
index 9b204b2da22b1..b3ece883864f6 100644
--- a/br/pkg/lightning/restore/tidb_test.go
+++ b/br/pkg/lightning/restore/tidb_test.go
@@ -460,6 +460,7 @@ func TestObtainRowFormatVersionSucceed(t *testing.T) {
 
 	sysVars := ObtainImportantVariables(ctx, s.tiGlue.GetSQLExecutor(), true)
 	require.Equal(t, map[string]string{
+		"tidb_backoff_weight":     "6",
 		"tidb_row_format_version": "2",
 		"max_allowed_packet":      "1073741824",
 		"div_precision_increment": "10",
@@ -487,6 +488,7 @@ func TestObtainRowFormatVersionFailure(t *testing.T) {
 
 	sysVars := ObtainImportantVariables(ctx, s.tiGlue.GetSQLExecutor(), true)
 	require.Equal(t, map[string]string{
+		"tidb_backoff_weight":     "6",
 		"tidb_row_format_version": "1",
 		"max_allowed_packet":      "67108864",
 		"div_precision_increment": "4",